In [1]:
# Install packages

# tf-models-official is the stable Model Garden package
# tf-models-nightly includes latest changes
!pip install -U -q "tf-models-official"


# Install the mediapy package for visualizing images/videos.
# See https://github.com/google/mediapy
!command -v ffmpeg >/dev/null || (apt update && apt install -y ffmpeg)
!pip install -q mediapy

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m51.8/51.8 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.6/43.6 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.9/2.9 MB[0m [31m50.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.2/2.2 MB[0m [31m84.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m615.3/615.3 MB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m242.5/242.5 kB[0m [31m22.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.2/5.2 MB[0m [31m53.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m82.5 MB/s[0m eta [36m0:00

In [2]:
# Run imports
import os
import matplotlib as mpl
import matplotlib.pyplot as plt
import mediapy as media
import numpy as np
import PIL
import pandas as pd
import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_hub as hub
import tqdm
import absl.logging

tf.get_logger().setLevel('ERROR')
absl.logging.set_verbosity(absl.logging.ERROR)
mpl.rcParams.update({
    'font.size': 10,
})

In [3]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [4]:
import shutil

# Source and destination paths
source = '/content/drive/MyDrive/CarCrash'  # Replace with your actual source path
destination = '/content/MyData'

# Copy the directory
shutil.copytree(source, destination,dirs_exist_ok=True)


'/content/MyData'

In [5]:
import zipfile
import os

# Path to the zip file
zip_path = '/content/MyData/Crash-1500.zip'

# Destination directory to extract
extract_path = '/content/MyData2/Crash'

# Check if the file exists
if os.path.exists(zip_path):
    # Unzipping the file
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)
    print(f"Unzipped '{zip_path}' to '{extract_path}' successfully.")
else:
    print(f"Zip file '{zip_path}' does not exist.")


Unzipped '/content/MyData/Crash-1500.zip' to '/content/MyData2/Crash' successfully.


In [7]:
import zipfile
import os

# Path to the zip file
zip_path = '/content/MyData/Normal.zip'

# Destination directory to extract
extract_path = '/content/MyData2/Normal'

# Check if the file exists
if os.path.exists(zip_path):
    # Unzipping the file
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)
    print(f"Unzipped '{zip_path}' to '{extract_path}' successfully.")
else:
    print(f"Zip file '{zip_path}' does not exist.")


Unzipped '/content/MyData/Normal.zip' to '/content/MyData2/Normal' successfully.


In [8]:
import os
import shutil
from tqdm import tqdm
from sklearn.model_selection import train_test_split

# Source folders
source_crash = '/content/MyData2/Crash'
source_normal = '/content/MyData2/Normal'

# Destination folders
train_crash = '/content/DATA/train/crash'
train_normal = '/content/DATA/train/normal'
val_crash = '/content/DATA/val/crash'
val_normal = '/content/DATA/val/normal'
test_crash = '/content/DATA/test/crash'
test_normal = '/content/DATA/test/normal'

# Create destination directories
os.makedirs(train_crash, exist_ok=True)
os.makedirs(train_normal, exist_ok=True)
os.makedirs(val_crash, exist_ok=True)
os.makedirs(val_normal, exist_ok=True)
os.makedirs(test_crash, exist_ok=True)
os.makedirs(test_normal, exist_ok=True)

# Get list of all files in crash and normal folders
crash_files = [os.path.join(source_crash, f) for f in os.listdir(source_crash) if f.endswith('.mp4')]
normal_files = [os.path.join(source_normal, f) for f in os.listdir(source_normal) if f.endswith('.mp4')]

# Split the data (70% train, 20% val, 10% test)
train_crash_files, temp_crash_files = train_test_split(crash_files, test_size=0.3, random_state=42)
val_crash_files, test_crash_files = train_test_split(temp_crash_files, test_size=1/3, random_state=42)

train_normal_files, temp_normal_files = train_test_split(normal_files, test_size=0.3, random_state=42)
val_normal_files, test_normal_files = train_test_split(temp_normal_files, test_size=1/3, random_state=42)

# Function to copy files with tqdm for progress tracking
def copy_files(file_list, destination_folder, desc):
    for file in tqdm(file_list, desc=desc):
        shutil.copy(file, destination_folder)

# Copy crash files
copy_files(train_crash_files, train_crash, "Copying crash train files")
copy_files(val_crash_files, val_crash, "Copying crash val files")
copy_files(test_crash_files, test_crash, "Copying crash test files")

# Copy normal files
copy_files(train_normal_files, train_normal, "Copying normal train files")
copy_files(val_normal_files, val_normal, "Copying normal val files")
copy_files(test_normal_files, test_normal, "Copying normal test files")

print("Data split and copy completed!")


Copying crash train files: 100%|██████████| 1050/1050 [00:00<00:00, 1745.23it/s]
Copying crash val files: 100%|██████████| 300/300 [00:00<00:00, 715.02it/s]
Copying crash test files: 100%|██████████| 150/150 [00:00<00:00, 965.93it/s]
Copying normal train files: 100%|██████████| 2100/2100 [00:12<00:00, 163.99it/s]
Copying normal val files: 100%|██████████| 600/600 [00:08<00:00, 72.21it/s]
Copying normal test files: 100%|██████████| 300/300 [00:01<00:00, 261.19it/s]

Data split and copy completed!





In [14]:
from pathlib import PosixPath

data_paths = {
    'train': PosixPath('/content/DATA/train'),
    'test': PosixPath('/content/DATA/test'),
    'val': PosixPath('/content/DATA/val')
}

print(data_paths)
print(data_paths['train'])
print(data_paths['test'])
subset_paths = data_paths

{'train': PosixPath('/content/DATA/train'), 'test': PosixPath('/content/DATA/test'), 'val': PosixPath('/content/DATA/val')}
/content/DATA/train
/content/DATA/test


In [10]:
print(subset_paths)

{'train': PosixPath('/content/DATA/train'), 'test': PosixPath('/content/DATA/test')}


In [33]:
def format_frames(frame, output_size):
  """
    Pad and resize an image from a video.

    Args:
      frame: Image that needs to resized and padded.
      output_size: Pixel size of the output frame image.

    Return:
      Formatted frame with padding of specified output size.
  """
  frame = tf.image.convert_image_dtype(frame, tf.float32)
  frame = tf.image.resize_with_pad(frame, *output_size)
  return frame

def frames_from_video_file(video_path, n_frames, output_size = (172,172), frame_step = 15):
  """
    Creates frames from each video file present for each category.

    Args:
      video_path: File path to the video.
      n_frames: Number of frames to be created per video file.
      output_size: Pixel size of the output frame image.

    Return:
      An NumPy array of frames in the shape of (n_frames, height, width, channels).
  """
  # Read each video frame by frame
  result = []
  src = cv2.VideoCapture(str(video_path))

  video_length = src.get(cv2.CAP_PROP_FRAME_COUNT)

  need_length = 1 + (n_frames - 1) * frame_step

  if need_length > video_length:
    start = 0
  else:
    max_start = video_length - need_length
    start = random.randint(0, max_start + 1)

  src.set(cv2.CAP_PROP_POS_FRAMES, start)
  # ret is a boolean indicating whether read was successful, frame is the image itself
  ret, frame = src.read()
  result.append(format_frames(frame, output_size))

  for _ in range(n_frames - 1):
    for _ in range(frame_step):
      ret, frame = src.read()
    if ret:
      frame = format_frames(frame, output_size)
      result.append(frame)
    else:
      result.append(np.zeros_like(result[0]))
  src.release()
  result = np.array(result)[..., [2, 1, 0]]

  return result


def to_gif(images):
  converted_images = np.clip(images * 255, 0, 255).astype(np.uint8)
  imageio.mimsave('./animation.gif', converted_images, fps=10)
  return embed.embed_file('./animation.gif')

class FrameGenerator:
  def __init__(self, path, n_frames, training = False):
    """ Returns a set of frames with their associated label.

      Args:
        path: Video file paths.
        n_frames: Number of frames.
        training: Boolean to determine if training dataset is being created.
    """
    self.path = path
    self.n_frames = n_frames
    self.training = training
    self.class_names = sorted(set(p.name for p in self.path.iterdir() if p.is_dir()))
    self.class_ids_for_name = dict((name, idx) for idx, name in enumerate(self.class_names))

  def get_files_and_class_names(self):
    video_paths = list(self.path.glob('*/*.mp4'))
    classes = [p.parent.name for p in video_paths]
    return video_paths, classes

  def __call__(self):
    video_paths, classes = self.get_files_and_class_names()

    pairs = list(zip(video_paths, classes))

    if self.training:
      random.shuffle(pairs)

    for path, name in pairs:
      video_frames = frames_from_video_file(path, self.n_frames)
      label = self.class_ids_for_name[name] # Encode labels
      yield video_frames, label

In [12]:
import cv2
import os

# Path to the directory containing the videos
video_dir = '/content/DATA/test/crash'

# Initialize variables to calculate the average frames
total_frames = 0
video_count = 0

# Loop through all video files in the directory
for video_file in os.listdir(video_dir):
    video_path = os.path.join(video_dir, video_file)

    # Ensure the file is a video (e.g., .mp4)
    if video_file.endswith('.mp4'):  # Adjust the extension if needed
        video_count += 1
        cap = cv2.VideoCapture(video_path)

        # Get the number of frames in the video
        frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        total_frames += frames

        cap.release()

# Calculate the average number of frames
if video_count > 0:
    average_frames = total_frames / video_count
    print(f"Average frames per video: {average_frames:.2f}")
    print(f"Total videos processed: {video_count}")
else:
    print("No videos found in the directory.")


Average frames per video: 50.00
Total videos processed: 150


In [34]:
batch_size = 10
num_frames = 8

CLASSES = sorted(os.listdir('/content/DATA/train'))
print(CLASSES)
output_signature = (tf.TensorSpec(shape = (None, None, None, 3), dtype = tf.float32),
                    tf.TensorSpec(shape = (), dtype = tf.int16))

train_ds = tf.data.Dataset.from_generator(FrameGenerator(subset_paths['train'], num_frames, training = True),
                                          output_signature = output_signature)
train_ds = train_ds.batch(batch_size)

val_ds = tf.data.Dataset.from_generator(FrameGenerator(subset_paths['val'], num_frames),
                                          output_signature = output_signature)
val_ds = val_ds.batch(batch_size)

test_ds = tf.data.Dataset.from_generator(FrameGenerator(subset_paths['test'], num_frames),
                                         output_signature = output_signature)
test_ds = test_ds.batch(batch_size)

['crash', 'normal']


In [17]:
from official.projects.movinet.modeling import movinet
from official.projects.movinet.modeling import movinet_model
from official.projects.movinet.tools import export_saved_model

In [18]:
model_id = 'a5'
use_positional_encoding = model_id in {'a3', 'a4', 'a5'}
resolution = 172

backbone = movinet.Movinet(
    model_id=model_id,
    causal=True,
    conv_type='2plus1d',
    se_type='2plus3d',
    activation='hard_swish',
    gating_activation='hard_sigmoid',
    use_positional_encoding=use_positional_encoding,
    use_external_states=False,
)


In [19]:
model = movinet_model.MovinetClassifier(
    backbone,
    num_classes=600,
    output_states=True)

!wget https://storage.googleapis.com/tf_model_garden/vision/movinet/movinet_a5_stream.tar.gz -O movinet_a5_stream.tar.gz -q
!tar -xvf movinet_a5_stream.tar.gz

checkpoint_dir = 'movinet_a5_stream'
checkpoint_path = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint = tf.train.Checkpoint(model=model)
status = checkpoint.restore(checkpoint_path)
status.assert_existing_objects_matched()


movinet_a5_stream/
movinet_a5_stream/ckpt-1.data-00000-of-00001
movinet_a5_stream/ckpt-1.index
movinet_a5_stream/checkpoint


<tensorflow.python.checkpoint.checkpoint.CheckpointLoadStatus at 0x7f785d538eb0>

In [20]:
# Detect hardware
try:
  tpu_resolver = tf.distribute.cluster_resolver.TPUClusterResolver() # TPU detection
except ValueError:
  tpu_resolver = None
  gpus = tf.config.experimental.list_logical_devices("GPU")

# Select appropriate distribution strategy
if tpu_resolver:
  tf.config.experimental_connect_to_cluster(tpu_resolver)
  tf.tpu.experimental.initialize_tpu_system(tpu_resolver)
  distribution_strategy = tf.distribute.experimental.TPUStrategy(tpu_resolver)
  print('Running on TPU ', tpu_resolver.cluster_spec().as_dict()['worker'])
elif len(gpus) > 1:
  distribution_strategy = tf.distribute.MirroredStrategy([gpu.name for gpu in gpus])
  print('Running on multiple GPUs ', [gpu.name for gpu in gpus])
elif len(gpus) == 1:
  distribution_strategy = tf.distribute.get_strategy() # default strategy that works on CPU and single GPU
  print('Running on single GPU ', gpus[0].name)
else:
  distribution_strategy = tf.distribute.get_strategy() # default strategy that works on CPU and single GPU
  print('Running on CPU')

print("Number of accelerators: ", distribution_strategy.num_replicas_in_sync)

Running on single GPU  /device:GPU:0
Number of accelerators:  1


In [22]:
def build_classifier(batch_size, num_frames, resolution, backbone, num_classes):
  """Builds a classifier on top of a backbone model."""
  model = movinet_model.MovinetClassifier(
      backbone=backbone,
      num_classes=num_classes)
  # model.build([batch_size, num_frames, resolution, resolution, 3])

  return model

# Construct loss, optimizer and compile the model
with distribution_strategy.scope():
  model = build_classifier(batch_size, num_frames, resolution, backbone, 2)
  loss_obj = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
  optimizer = tf.keras.optimizers.Adam(learning_rate = 0.001)
  model.compile(loss=loss_obj, optimizer='adam', metrics=['accuracy'])

In [27]:
checkpoint_path = "/content/trained_model/cp.weights.h5"
checkpoint_dir = os.path.dirname(checkpoint_path)

cp_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    save_weights_only=True,  # Saves only the model's weights
    save_best_only=False,    # If True, only saves the best weights
    monitor='val_loss',      # Metric to monitor for `save_best_only`
    mode='auto',             # Can be 'min', 'max', or 'auto'
    verbose=1
)

In [35]:
import random

results = model.fit(train_ds,
                    validation_data=val_ds,
                    epochs=2,
                    validation_freq=1,
                    verbose=1,
                    )
tf.saved_model.save(model, 'movinet_a0_saved_model4')

Epoch 1/2
Epoch 2/2


In [36]:
tf.saved_model.save(model, 'movinet_a0_saved_model4')

In [37]:
import shutil

# Define the folder to be zipped and the output zip file name
folder_to_zip = '/content/movinet_a0_saved_model4'  # Replace with the path to your folder
output_zip_file = '/content/model'  # Replace with desired zip file name

# Zip the folder
shutil.make_archive(output_zip_file.replace('.zip', ''), 'zip', folder_to_zip)

print(f"Folder zipped successfully as {output_zip_file}")


Folder zipped successfully as /content/model


In [None]:
model.evaluate(test_ds, return_dict=True)

In [None]:
def get_actual_predicted_labels(dataset):
  """
    Create a list of actual ground truth values and the predictions from the model.

    Args:
      dataset: An iterable data structure, such as a TensorFlow Dataset, with features and labels.

    Return:
      Ground truth and predicted values for a particular dataset.
  """
  actual = [labels for _, labels in dataset.unbatch()]
  predicted = model.predict(dataset)

  actual = tf.stack(actual, axis=0)
  predicted = tf.concat(predicted, axis=0)
  predicted = tf.argmax(predicted, axis=1)

  return actual, predicted

In [None]:
def plot_confusion_matrix(actual, predicted, labels, ds_type):
  cm = tf.math.confusion_matrix(actual, predicted)
  ax = sns.heatmap(cm, annot=True, fmt='g')
  sns.set(rc={'figure.figsize':(6, 16)})
  sns.set(font_scale=1.4)
  ax.set_title('Confusion matrix of action recognition for ' + ds_type)
  ax.set_xlabel('Predicted Action')
  ax.set_ylabel('Actual Action')
  plt.xticks(rotation=90)
  plt.yticks(rotation=0)
  ax.xaxis.set_ticklabels(labels)
  ax.yaxis.set_ticklabels(labels)
  plt.show()

In [None]:
fg = FrameGenerator(subset_paths['train'], num_frames, training = True)
label_names = list(fg.class_ids_for_name.keys())


In [None]:
actual, predicted = get_actual_predicted_labels(test_ds)
plot_confusion_matrix(actual, predicted, label_names, 'test')

In [47]:
# Save the weights to a checkpoint
checkpoint_path = '/content/trained_model/movinet_weights.ckpt'
checkpoint = tf.train.Checkpoint(model=model)
checkpoint.save(checkpoint_path)

print(f"Weights saved to {checkpoint_path}")

Weights saved to /content/trained_model/movinet_weights.ckpt


In [50]:
import tensorflow as tf
from official.projects.movinet.modeling import movinet, movinet_model

# Model configuration
model_id = 'a5'
use_positional_encoding = model_id in {'a3', 'a4', 'a5'}
resolution = 172

# Create backbone and model
backbone = movinet.Movinet(
    model_id=model_id,
    causal=True,
    conv_type='2plus1d',
    se_type='2plus3d',
    activation='hard_swish',
    gating_activation='hard_sigmoid',
    use_positional_encoding=use_positional_encoding,
    use_external_states=True,
)

model = movinet_model.MovinetClassifier(
    backbone,
    num_classes=2,
    output_states=True
)

# Load weights using a checkpoint
checkpoint_dir = '/content/trained_model'
checkpoint = tf.train.Checkpoint(model=model)
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)

if latest_checkpoint:
    checkpoint.restore(latest_checkpoint).expect_partial()
    print(f"Weights restored from {latest_checkpoint}")
else:
    print("No checkpoint found in the directory.")


Weights restored from /content/trained_model/movinet_weights.ckpt-1


In [42]:
def get_top_k(probs, k=5, label_map=CLASSES):
  """Outputs the top k model labels and probabilities on the given video."""
  top_predictions = tf.argsort(probs, axis=-1, direction='DESCENDING')[:k]
  top_labels = tf.gather(label_map, top_predictions, axis=-1)
  top_labels = [label.decode('utf8') for label in top_labels.numpy()]
  top_probs = tf.gather(probs, top_predictions, axis=-1).numpy()
  return tuple(zip(top_labels, top_probs))


In [43]:
# Create initial states for the stream model
init_states_fn = model.init_states
init_states = init_states_fn(tf.shape(tf.ones(shape=[1, 1, 172, 172, 3])))

all_logits = []

# To run on a video, pass in one frame at a time
states = init_states
for frames, label in test_ds.take(1):
  for clip in frames[0]:
    # Input shape: [1, 1, 172, 172, 3]
    clip = tf.expand_dims(tf.expand_dims(clip, axis=0), axis=0)
    logits, states = model.predict({**states, 'image': clip}, verbose=0)
    all_logits.append(logits)

logits = tf.concat(all_logits, 0)
probs = tf.nn.softmax(logits)

final_probs = probs[-1]
top_k = get_top_k(final_probs)
print()
for label, prob in top_k:
  print(label, prob)

frames, label = list(test_ds.take(1))[0]
to_gif(frames[0].numpy())

InvalidArgumentError: {{function_node __wrapped__GatherV2_device_/job:localhost/replica:0/task:0/device:CPU:0}} indices[0] = 5 is not in [0, 2) [Op:GatherV2]

In [45]:
import tensorflow as tf

# Path to the saved model
model_path = '/content/movinet_a0_saved_model4'

# Load the model
model = tf.saved_model.load(model_path)

# Verify the model is loaded
print("Model loaded successfully.")


Model loaded successfully.


AttributeError: '_UserObject' object has no attribute 'summary'

In [46]:
print(model.signatures)


_SignatureMap({'serving_default': <ConcreteFunction (*, image: TensorSpec(shape=(None, None, None, None, 3), dtype=tf.float32, name='image')) -> Dict[['classifier_head_2', TensorSpec(shape=(None, 2), dtype=tf.float32, name='classifier_head_2')]] at 0x7F714A65C220>})


In [51]:
saved_model_dir = '/content/model'
tflite_filename = 'model.tflite'
input_shape = [1, 1, 172, 172, 3]

# Convert to saved model
export_saved_model.export_saved_model(
    model=model,
    input_shape=input_shape,
    export_path=saved_model_dir,
    causal=True,
    bundle_input_init_states_fn=False
    )

In [52]:
converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_dir)
tflite_model = converter.convert()

with open(tflite_filename, 'wb') as f:
  f.write(tflite_model)

# Create the interpreter and signature runner
interpreter = tf.lite.Interpreter(model_path=tflite_filename)
runner = interpreter.get_signature_runner()

init_states = {
    name: tf.zeros(x['shape'], dtype=x['dtype'])
    for name, x in runner.get_input_details().items()
}
del init_states['image']

In [None]:
# To run on a video, pass in one frame at a time
states = init_states
for frames, label in test_ds.take(1):
  for clip in frames[0]:
    # Input shape: [1, 1, 172, 172, 3]
    outputs = runner(**states, image=clip)
    logits = outputs.pop('logits')[0]
    states = outputs

probs = tf.nn.softmax(logits)
top_k = get_top_k(probs)
print()
for label, prob in top_k:
  print(label, prob)

frames, label = list(test_ds.take(1))[0]
to_gif(frames[0].numpy())