# MoViNet Tutorial

This notebook provides basic example code to build, run, and fine-tune [MoViNets (Mobile Video Networks)](https://arxiv.org/pdf/2103.11511.pdf).

Pretrained models are provided by [TensorFlow Hub](https://tfhub.dev/google/collections/movinet/) and the [TensorFlow Model Garden](https://github.com/tensorflow/models/tree/master/official/projects/movinet), trained on [Kinetics 600](https://deepmind.com/research/open-source/kinetics) for video action classification. All Models use TensorFlow 2 with Keras for inference and training.

The following steps will be performed:

1. [Running base model inference with TensorFlow Hub](#scrollTo=6g0tuFvf71S9&line=8&uniqifier=1)
2. [Running streaming model inference with TensorFlow Hub and plotting predictions](#scrollTo=ADrHPmwGcBZ5&line=4&uniqifier=1)
3. [Exporting a streaming model to TensorFlow Lite for mobile](#scrollTo=W3CLHvubvdSI&line=3&uniqifier=1)
4. [Fine-Tuning a base Model with the TensorFlow Model Garden](#scrollTo=_s-7bEoa3f8g&line=11&uniqifier=1)

![jumping jacks plot](https://storage.googleapis.com/tf_model_garden/vision/movinet/artifacts/jumpingjacks_plot.gif)

To generate video plots like the one above, see [section 2](#scrollTo=ADrHPmwGcBZ5&line=4&uniqifier=1).

## Setup

For inference on smaller models (A0-A2), CPU is sufficient for this Colab. For fine-tuning, it is recommended to run the models using GPUs.

To select a GPU in Colab, select `Runtime > Change runtime type > Hardware accelerator > GPU` dropdown in the top menu.

In [16]:
!pip install numpy==2.0.2
!pip install pandas==2.2.2
!pip install tensorflow==2.18.0



In [17]:
# Run imports
import os
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import PIL
import pandas as pd
import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_hub as hub
import tqdm
import absl.logging

tf.get_logger().setLevel('ERROR')
absl.logging.set_verbosity(absl.logging.ERROR)
mpl.rcParams.update({
    'font.size': 10,
})

In [18]:
!pip install opencv-python



Run the cell below to define helper functions and create variables.

In [None]:
#@title Run this cell to set up some helper code.

# Download Kinetics 600 label map
!wget https://raw.githubusercontent.com/tensorflow/models/f8af2291cced43fc9f1d9b41ddbf772ae7b0d7d2/official/projects/movinet/files/kinetics_600_labels.txt -O labels.txt -q

with tf.io.gfile.GFile('labels.txt') as f:
  lines = f.readlines()
  KINETICS_600_LABELS_LIST = [line.strip() for line in lines]
  KINETICS_600_LABELS = tf.constant(KINETICS_600_LABELS_LIST)

def get_top_k(probs, k=5, label_map=KINETICS_600_LABELS):
  """Outputs the top k model labels and probabilities on the given video."""
  top_predictions = tf.argsort(probs, axis=-1, direction='DESCENDING')[:k]
  top_labels = tf.gather(label_map, top_predictions, axis=-1)
  top_labels = [label.decode('utf8') for label in top_labels.numpy()]
  top_probs = tf.gather(probs, top_predictions, axis=-1).numpy()
  return tuple(zip(top_labels, top_probs))

def predict_top_k(model, video, k=5, label_map=KINETICS_600_LABELS):
  """Outputs the top k model labels and probabilities on the given video."""
  outputs = model.predict(video[tf.newaxis])[0]
  probs = tf.nn.softmax(outputs)
  return get_top_k(probs, k=k, label_map=label_map)

def load_movinet_from_hub(model_id, model_mode, hub_version=3):
  """Loads a MoViNet model from TF Hub."""
  hub_url = f'https://tfhub.dev/tensorflow/movinet/{model_id}/{model_mode}/kinetics-600/classification/{hub_version}'

  encoder = hub.KerasLayer(hub_url, trainable=True)

  inputs = tf.keras.layers.Input(
      shape=[None, None, None, 3],
      dtype=tf.float32)

  if model_mode == 'base':
    inputs = dict(image=inputs)
  else:
    # Define the state inputs, which is a dict that maps state names to tensors.
    init_states_fn = encoder.resolved_object.signatures['init_states']
    state_shapes = {
        name: ([s if s > 0 else None for s in state.shape], state.dtype)
        for name, state in init_states_fn(tf.constant([0, 0, 0, 0, 3])).items()
    }
    states_input = {
        name: tf.keras.Input(shape[1:], dtype=dtype, name=name)
        for name, (shape, dtype) in state_shapes.items()
    }

    # The inputs to the model are the states and the video
    inputs = {**states_input, 'image': inputs}

  # Output shape: [batch_size, 600]
  outputs = encoder(inputs)

  model = tf.keras.Model(inputs, outputs)
  model.build([1, 1, 1, 1, 3])

  return model

# Download example gif
!wget https://github.com/tensorflow/models/raw/f8af2291cced43fc9f1d9b41ddbf772ae7b0d7d2/official/projects/movinet/files/jumpingjack.gif -O jumpingjack.gif -q

def load_gif(file_path, image_size=(224, 224)):
  """Loads a gif file into a TF tensor."""
  with tf.io.gfile.GFile(file_path, 'rb') as f:
    video = tf.io.decode_gif(f.read())
  video = tf.image.resize(video, image_size)
  video = tf.cast(video, tf.float32) / 255.
  return video

def get_top_k_streaming_labels(probs, k=5, label_map=KINETICS_600_LABELS_LIST):
  """Returns the top-k labels over an entire video sequence.

  Args:
    probs: probability tensor of shape (num_frames, num_classes) that represents
      the probability of each class on each frame.
    k: the number of top predictions to select.
    label_map: a list of labels to map logit indices to label strings.

  Returns:
    a tuple of the top-k probabilities, labels, and logit indices
  """
  top_categories_last = tf.argsort(probs, -1, 'DESCENDING')[-1, :1]
  categories = tf.argsort(probs, -1, 'DESCENDING')[:, :k]
  categories = tf.reshape(categories, [-1])

  counts = sorted([
      (i.numpy(), tf.reduce_sum(tf.cast(categories == i, tf.int32)).numpy())
      for i in tf.unique(categories)[0]
  ], key=lambda x: x[1], reverse=True)

  top_probs_idx = tf.constant([i for i, _ in counts[:k]])
  top_probs_idx = tf.concat([top_categories_last, top_probs_idx], 0)
  top_probs_idx = tf.unique(top_probs_idx)[0][:k+1]

  top_probs = tf.gather(probs, top_probs_idx, axis=-1)
  top_probs = tf.transpose(top_probs, perm=(1, 0))
  top_labels = tf.gather(label_map, top_probs_idx, axis=0)
  top_labels = [label.decode('utf8') for label in top_labels.numpy()]

  return top_probs, top_labels, top_probs_idx

def plot_streaming_top_preds_at_step(
    top_probs,
    top_labels,
    step=None,
    image=None,
    legend_loc='lower left',
    duration_seconds=10,
    figure_height=500,
    playhead_scale=0.8,
    grid_alpha=0.3):
  """Generates a plot of the top video model predictions at a given time step.

  Args:
    top_probs: a tensor of shape (k, num_frames) representing the top-k
      probabilities over all frames.
    top_labels: a list of length k that represents the top-k label strings.
    step: the current time step in the range [0, num_frames].
    image: the image frame to display at the current time step.
    legend_loc: the placement location of the legend.
    duration_seconds: the total duration of the video.
    figure_height: the output figure height.
    playhead_scale: scale value for the playhead.
    grid_alpha: alpha value for the gridlines.

  Returns:
    A tuple of the output numpy image, figure, and axes.
  """
  num_labels, num_frames = top_probs.shape
  if step is None:
    step = num_frames

  fig = plt.figure(figsize=(6.5, 7), dpi=300)
  gs = mpl.gridspec.GridSpec(8, 1)
  ax2 = plt.subplot(gs[:-3, :])
  ax = plt.subplot(gs[-3:, :])

  if image is not None:
    ax2.imshow(image, interpolation='nearest')
    ax2.axis('off')

  preview_line_x = tf.linspace(0., duration_seconds, num_frames)
  preview_line_y = top_probs

  line_x = preview_line_x[:step+1]
  line_y = preview_line_y[:, :step+1]

  for i in range(num_labels):
    ax.plot(preview_line_x, preview_line_y[i], label=None, linewidth='1.5',
            linestyle=':', color='gray')
    ax.plot(line_x, line_y[i], label=top_labels[i], linewidth='2.0')


  ax.grid(which='major', linestyle=':', linewidth='1.0', alpha=grid_alpha)
  ax.grid(which='minor', linestyle=':', linewidth='0.5', alpha=grid_alpha)

  min_height = tf.reduce_min(top_probs) * playhead_scale
  max_height = tf.reduce_max(top_probs)
  ax.vlines(preview_line_x[step], min_height, max_height, colors='red')
  ax.scatter(preview_line_x[step], max_height, color='red')

  ax.legend(loc=legend_loc)

  plt.xlim(0, duration_seconds)
  plt.ylabel('Probability')
  plt.xlabel('Time (s)')
  plt.yscale('log')

  fig.tight_layout()
  fig.canvas.draw()

  data = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
  data = data.reshape(fig.canvas.get_width_height()[::-1] + (3,))
  plt.close()

  figure_width = int(figure_height * data.shape[1] / data.shape[0])
  image = PIL.Image.fromarray(data).resize([figure_width, figure_height])
  image = np.array(image)

  return image, (fig, ax, ax2)

def plot_streaming_top_preds(
    probs,
    video,
    top_k=5,
    video_fps=25.,
    figure_height=500,
    use_progbar=True):
  """Generates a video plot of the top video model predictions.

  Args:
    probs: probability tensor of shape (num_frames, num_classes) that represents
      the probability of each class on each frame.
    video: the video to display in the plot.
    top_k: the number of top predictions to select.
    video_fps: the input video fps.
    figure_fps: the output video fps.
    figure_height: the height of the output video.
    use_progbar: display a progress bar.

  Returns:
    A numpy array representing the output video.
  """
  video_fps = 8.
  figure_height = 500
  steps = video.shape[0]
  duration = steps / video_fps

  top_probs, top_labels, _ = get_top_k_streaming_labels(probs, k=top_k)

  images = []
  step_generator = tqdm.trange(steps) if use_progbar else range(steps)
  for i in step_generator:
    image, _ = plot_streaming_top_preds_at_step(
        top_probs=top_probs,
        top_labels=top_labels,
        step=i,
        image=video[i],
        duration_seconds=duration,
        figure_height=figure_height,
    )
    images.append(image)

  return np.array(images)

## Fine-Tune a Base Model with the TensorFlow Model Garden

We will Fine-tune MoViNet-A0-Base on [UCF-101](https://www.crcv.ucf.edu/research/data-sets/ucf101/).

The following code will:

- Load the UCF-101 dataset with [TensorFlow Datasets](https://www.tensorflow.org/datasets/catalog/ucf101).
- Create a simple [`tf.data.Dataset`](https://www.tensorflow.org/api_docs/python/tf/data/Dataset) pipeline for training and evaluation.
- Display some example videos from the dataset.
- Build a MoViNet model and load pretrained weights.
- Fine-tune the final classifier layers on UCF-101 and evaluate accuracy on the validation set.

### Load the UCF-101 Dataset with TensorFlow Datasets

Calling `download_and_prepare()` will automatically download the dataset. This step may take up to 1 hour depending on the download and extraction speed. After downloading, the next cell will output information about the dataset.

In [19]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [20]:
DATASET_PATH = '/content/drive/MyDrive/dataset'
TRAIN_DIR = os.path.join(DATASET_PATH, 'training')
TEST_DIR = os.path.join(DATASET_PATH, 'testing')

In [81]:
def get_video_paths_and_labels(base_path):
    paths = []
    labels = []
    label_map = {'passive': 0, 'active': 1}

    for label_name, label_value in label_map.items():
        folder = os.path.join(base_path, label_name)
        for fname in os.listdir(folder):
            if fname.endswith('.mp4'):
                paths.append(os.path.join(folder, fname))
                labels.append(label_value)

    return paths, labels

train_paths, train_labels = get_video_paths_and_labels('/content/drive/MyDrive/dataset/training')
test_paths, test_labels = get_video_paths_and_labels('/content/drive/MyDrive/dataset/testing')


In [78]:
import tensorflow as tf
import cv2
import numpy as np

# Paramètres
frame_stride = 2
num_frames = 16
resolution = 224
num_classes = 2
batch_size = 4

# Fonction utilisée via tf.py_function
def read_video_opencv(path):
    path = path.decode() if isinstance(path, bytes) else path
    cap = cv2.VideoCapture(path)
    frames = []
    success = True

    while success:
        success, frame = cap.read()
        if success:
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(frame)
    cap.release()

    frames = frames[::frame_stride]
    frames = [cv2.resize(f, (resolution, resolution)) for f in frames]

    if len(frames) >= num_frames:
        frames = frames[:num_frames]
    else:
        pad = [np.zeros((resolution, resolution, 3), dtype=np.uint8) for _ in range(num_frames - len(frames))]
        frames += pad

    video = np.stack(frames, axis=0).astype(np.float32) / 255.0
    return video

def format_features_opencv(path, label):
    # Explicitly run tf.py_function on the CPU, ensure data types match.
    video = tf.numpy_function(read_video_opencv, [path], tf.float32)
    # Set the shape explicitly
    video.set_shape([num_frames, resolution, resolution, 3])
    label = tf.one_hot(label, num_classes)
    return video, label

def make_dataset(paths, labels, training=True):
    dataset = tf.data.Dataset.from_tensor_slices((paths, labels))
    if training:
        dataset = dataset.shuffle(100)
    # Apply dataset.map() outside the strategy.scope
    dataset = dataset.map(
        format_features_opencv,
        num_parallel_calls=tf.data.AUTOTUNE,
    )
    # Add a tf.function decorator to format_features_opencv for performance
    dataset = dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return dataset


In [80]:
train_dataset = make_dataset(train_paths, train_labels, training=True)
test_dataset = make_dataset(test_paths, test_labels, training=False)

In [25]:
!pip install -U -q "tf-models-official"

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/43.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.6/43.6 kB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m51.8/51.8 kB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.0/61.0 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.9/2.9 MB[0m [31m39.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.5/3.5 MB[0m [31m61.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m644.9/644.9 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m242.5/242.5 kB[0m [31m16.0 MB/s[0m eta [36m0:00

In [26]:
# Run imports
import tensorflow_datasets as tfds

from official.vision.configs import video_classification
from official.projects.movinet.configs import movinet as movinet_configs
from official.projects.movinet.modeling import movinet
from official.projects.movinet.modeling import movinet_layers
from official.projects.movinet.modeling import movinet_model

In [29]:
model_id = 'a5'

tf.keras.backend.clear_session()

backbone = movinet.Movinet(model_id=model_id)
model = movinet_model.MovinetClassifier(backbone=backbone, num_classes=600)
model.build([1, 1, 1, 1, 3])

# Load pretrained weights
!wget https://storage.googleapis.com/tf_model_garden/vision/movinet/movinet_a5_base.tar.gz -O movinet_a5_base.tar.gz -q
!tar -xvf movinet_a5_base.tar.gz

checkpoint_dir = 'movinet_a0_base'
checkpoint_path = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint = tf.train.Checkpoint(model=model)

movinet_a5_base/
movinet_a5_base/checkpoint
movinet_a5_base/ckpt-1.data-00000-of-00001
movinet_a5_base/ckpt-1.index


In [37]:
import tensorflow as tf
from keras import layers

In [65]:
def build_classifier(backbone, num_classes, freeze_backbone=False):
  """Builds a classifier on top of a backbone model."""
  model = movinet_model.MovinetClassifier(
      backbone=backbone,
      num_classes=num_classes)
  model.build([batch_size, num_frames, resolution, resolution, 3])

  if freeze_backbone:
    for layer in model.layers[:-1]:
      layer.trainable = False
    model.layers[-1].trainable = True

  return model



#def build_classifier(backbone, num_classes, freeze_backbone=False):
#    """
#    Fonction qui crée un modèle avec un backbone et une couche de classification binaire (2 classes).
#    """
#    model = tf.keras.Sequential([
#        backbone,  # Backbone pré-entraîné (MoViNet-A5)
#        layers.GlobalAveragePooling3D(),  # Pooling global pour réduire la dimension
#        layers.Dense(1024, activation='relu'),  # Couche dense pour apprendre la représentation
#        layers.Dropout(0.5),  # Dropout pour éviter le surapprentissage
#        layers.Dense(num_classes, activation='softmax')  # Dernière couche : 2 classes pour la classification binaire
#    ])

#    if freeze_backbone:
        # Geler toutes les couches du backbone sauf la dernière
#        backbone.trainable = False

#    return model

In [56]:
import tensorflow as tf

In [93]:
num_epochs = 3

#train_steps = num_examples['train'] // batch_size
#total_train_steps = train_steps * num_epochs
#test_steps = num_examples['test'] // batch_size

loss_obj = tf.keras.losses.CategoricalCrossentropy(
    from_logits=True)

metrics = [
    tf.keras.metrics.CategoricalAccuracy(name='accuracy'),
    tf.keras.metrics.AUC(name='auc'),
    tf.keras.metrics.Precision(name='precision'),
    tf.keras.metrics.Recall(name='recall'),
]

total_train_steps = 1000
initial_learning_rate = 0.01
learning_rate = tf.keras.optimizers.schedules.CosineDecay(
    initial_learning_rate, decay_steps=total_train_steps,
)
optimizer = tf.keras.optimizers.Adam(
    learning_rate, epsilon=1.0, clipnorm=1.0)

callbacks = [
    tf.keras.callbacks.TensorBoard(),
]

In [92]:
from keras import optimizers

In [94]:
backbone = movinet.Movinet(model_id=model_id)
model = build_classifier(backbone, num_classes=2, freeze_backbone=True)
# Instead of passing the instantiated optimizer, pass the class or string identifier
model.compile(optimizer='adam', loss=loss_obj, metrics=metrics) # or 'adam'

In [95]:
results = model.fit(
    train_dataset,
    epochs=num_epochs,  # Ajuste en fonction du nombre d'époques
    validation_data=test_dataset,
    verbose=2
)


InvalidArgumentError: Graph execution error:

Node 'training/RMSprop/gradients/gradients/classifier_head_8/classifier/conv3d/StatefulPartitionedCall_grad/PartitionedCall': Connecting to invalid output 1 of source node classifier_head_8/classifier/conv3d/StatefulPartitionedCall which has 1 outputs.