<a href="https://colab.research.google.com/github/03axdov/Notebooks/blob/main/ActionClassification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install remotezip tqdm opencv-python
!pip install -q git+https://github.com/tensorflow/docs

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import tqdm
import random
import pathlib

import itertools
import collections

import os
import cv2
import numpy as np
import remotezip as rz

import tensorflow as tf

# Some modules to display an animation using imageio.
import imageio
from IPython import display
from urllib import request
from tensorflow_docs.vis import embed

In [None]:
URL = 'https://storage.googleapis.com/thumos14_files/UCF101_videos.zip'

In [None]:
def list_files_from_zip_url(zip_url):
  files= []
  with rz.RemoteZip(URL) as zip:
    for zip_info in zip.infolist():
      files.append(zip_info.filename)

  return files

In [None]:
files = list_files_from_zip_url(URL)
files = [f for f in files if f.endswith('.avi')]
files[:10]

['UCF101/v_ApplyEyeMakeup_g01_c01.avi',
 'UCF101/v_ApplyEyeMakeup_g01_c02.avi',
 'UCF101/v_ApplyEyeMakeup_g01_c03.avi',
 'UCF101/v_ApplyEyeMakeup_g01_c04.avi',
 'UCF101/v_ApplyEyeMakeup_g01_c05.avi',
 'UCF101/v_ApplyEyeMakeup_g01_c06.avi',
 'UCF101/v_ApplyEyeMakeup_g02_c01.avi',
 'UCF101/v_ApplyEyeMakeup_g02_c02.avi',
 'UCF101/v_ApplyEyeMakeup_g02_c03.avi',
 'UCF101/v_ApplyEyeMakeup_g02_c04.avi']

In [None]:
def get_class(fname):
  return fname.split("_")[1]

In [None]:
def get_files_per_class(files):
  files_for_class = collections.defaultdict(list)

  for fname in files:
    class_name = get_class(fname)
    files_for_class[class_name].append(fname)

  return files_for_class

In [None]:
NUM_CLASSES = 10
FILES_PER_CLASS = 50

In [None]:
files_for_class = get_files_per_class(files)
classes = list(files_for_class.keys())

In [None]:
print(f"NUM CLASSES: {len(classes)}")
print(f"NUM VIDEOS FOR CLASS 1: {len(files_for_class[classes[0]])}")

NUM CLASSES: 101
NUM VIDEOS FOR CLASS 1: 145


In [None]:
def select_subset_of_classes(files_for_class, classes, files_per_class):
  files_subset = {}

  for class_name in classes:
    files_subset[class_name] = files_for_class[class_name][:files_per_class]

  return files_subset

In [None]:
files_subset = select_subset_of_classes(files_for_class, classes[:NUM_CLASSES], FILES_PER_CLASS)
list(files_subset.keys())

['ApplyEyeMakeup',
 'ApplyLipstick',
 'Archery',
 'BabyCrawling',
 'BalanceBeam',
 'BandMarching',
 'BaseballPitch',
 'BasketballDunk',
 'Basketball',
 'BenchPress']

In [None]:
def download_from_zip(zip_url, to_dir, file_names):
  with rz.RemoteZip(zip_url) as zip:
    for fn in tqdm.tqdm(file_names):
      class_name = get_class(fn)
      zip.extract(fn, str(to_dir / class_name))
      unzipped_file = to_dir / class_name / fn

      fn = pathlib.Path(fn).parts[-1]
      output_file = to_dir / class_name / fn
      unzipped_file.rename(output_file)

In [None]:
def split_class_lists(files_for_class, count):
  split_files = []
  remainder = {}
  for cls in files_for_class:
    split_files.extend(files_for_class[cls][:count])
    remainder[cls] = files_for_class[cls][count:]

  return split_files, remainder

In [None]:
def download_ucf_101_subset(zip_url, num_classes, splits, download_dir):
  files = list_files_from_zip_url(zip_url)

  for f in files:
    path = os.path.normpath(f)  # ex. A/./B become A/B
    tokens = path.split(os.sep)
    if len(tokens) <= 2:  # No filename
      files.remove(f)

  files_for_class = get_files_per_class(files)

  classes = list(files_for_class.keys())[:num_classes]

  for cls in classes:
    random.shuffle(files_for_class[cls])

  files_for_class = {x: files_for_class[x] for x in classes}

  dirs = {}
  for split_name, split_count in splits.items():
    print(split_name, ":")
    split_dir = download_dir / split_name
    split_files, files_for_class = split_class_lists(files_for_class, split_count)

    download_from_zip(zip_url, split_dir, split_files)
    dirs[split_name] = split_dir

  return dirs

In [None]:
download_dir = pathlib.Path('./UCF101_subset/')
subset_paths = download_ucf_101_subset(URL,
                                       num_classes = NUM_CLASSES,
                                       splits = {"train": 30, "val": 10, "test": 10},
                                       download_dir = download_dir)

train :


100%|██████████| 300/300 [00:38<00:00,  7.80it/s]


val :


100%|██████████| 100/100 [00:11<00:00,  8.64it/s]


test :


100%|██████████| 100/100 [00:11<00:00,  8.93it/s]


In [None]:
video_count_train = len(list(download_dir.glob("train/*/*.avi")))
video_count_val = len(list(download_dir.glob("val/*/*.avi")))
test_count_val = len(list(download_dir.glob("test/*/*.avi")))

video_total = video_count_train + video_count_val + test_count_val
print(video_total)

900


In [None]:
!find ./UCF101_subset

In [None]:
def frames_from_video_file(video_path, n_frames, output_size=(224, 224)):
  count = 0
  result = []
  src = cv2.VideoCapture(str(video_path))

  video_length = src.get(cv2.CAP_PROP_FRAME_COUNT)

  if n_frames > video_length:
    start = 0
  else:
    max_start = video_length - n_frames
    start = random.randint(0, max_start)

  src.set(cv2.CAP_PROP_POS_FRAMES, start)

  for _ in range(n_frames):
    ret, frame = src.read()
    if ret:
      frame = tf.image.convert_image_dtype(frame, dtype=tf.float32)
      frame = tf.image.resize_with_pad(frame, *output_size)
      result.append(frame)

    else:
      result.append(np.zeros_like(result[0]))
  src.release()
  result = np.array(result)[..., [2, 1, 0]]

  return result

In [None]:
!curl -O https://upload.wikimedia.org/wikipedia/commons/8/86/End_of_a_jam.ogv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 55.0M  100 55.0M    0     0  20.1M      0  0:00:02  0:00:02 --:--:-- 20.1M


In [None]:
video_path = "End_of_a_jam.ogv"

In [None]:
sample_video = frames_from_video_file(video_path, n_frames=10)
sample_video.shape

(10, 224, 224, 3)

In [None]:
def to_gif(images):
  converted_images = np.clip(images * 255, 0, 255).astype(np.uint8)
  imageio.mimsave('./animation.gif', converted_images, fps=10)
  return embed.embed_file('./animation.gif')

In [None]:
to_gif(sample_video)

In [None]:
ucf_sample_video = frames_from_video_file(next(subset_paths['train'].glob('*/*.avi')), 50)
to_gif(ucf_sample_video)

In [None]:
class FrameGenerator:
  def __init__(self, path, n_frames):
    self.path = path
    self.n_frames = n_frames
    self.class_names = sorted(set(p.name for p in self.path.iterdir() if p.is_dir()))
    self.class_ids_for_name = dict((name, idx) for idx, name in enumerate(self.class_names))

  def get_files_and_class_names(self):
    video_paths = list(self.path.glob('*/*.avi'))
    classes = [p.parent.name for p in video_paths] 
    return video_paths, classes

  def __call__(self):
    video_paths, classes = self.get_files_and_class_names()

    pairs = list(zip(video_paths, classes))

    random.shuffle(pairs)

    shuffled_video_paths, shuffled_classes = zip(*pairs)

    for path, name in pairs:
      video_frames = frames_from_video_file(path, self.n_frames) 
      label = self.class_ids_for_name[name] # Encode labels
      yield video_frames, label

In [None]:
fg = FrameGenerator(subset_paths['train'], 10)

frames, label = next(fg())

print(f"Shape: {frames.shape}")
print(f"Label: {label}")

Shape: (10, 224, 224, 3)
Label: 6


In [None]:
output_signature = (tf.TensorSpec(shape=(None, None, None, 3), dtype=tf.float32),
                    tf.TensorSpec(shape= (), dtype=tf.int16))

train_ds = tf.data.Dataset.from_generator(FrameGenerator(subset_paths['train'], 10),
                                          output_signature=output_signature)

In [None]:
val_ds = tf.data.Dataset.from_generator(FrameGenerator(subset_paths['val'], 10),
                                        output_signature=output_signature)

In [None]:
print(train_ds)

<FlatMapDataset element_spec=(TensorSpec(shape=(None, None, None, 3), dtype=tf.float32, name=None), TensorSpec(shape=(), dtype=tf.int16, name=None))>


In [None]:
train_frames, train_labels = next(iter(train_ds))
print(f'Shape of training set of frames: {train_frames.shape}')
print(f'Shape of training labels: {train_labels.shape}')

val_frames, val_labels = next(iter(val_ds))
print(f'Shape of validation set of frames: {val_frames.shape}')
print(f'Shape of validation labels: {val_labels.shape}')

Shape of training set of frames: (10, 224, 224, 3)
Shape of training labels: ()
Shape of validation set of frames: (10, 224, 224, 3)
Shape of validation labels: ()


In [None]:
AUTOTUNE = tf.data.AUTOTUNE

train_ds = train_ds.cache().shuffle(1000).prefetch(buffer_size=AUTOTUNE)
val_ds = val_ds.cache().shuffle(1000).prefetch(buffer_size=AUTOTUNE)

In [None]:
train_ds = train_ds.batch(2)
val_ds = val_ds.batch(2)

train_frames, train_labels = next(iter(train_ds))
print(f'Shape of training set of frames: {train_frames.shape}')
print(f'Shape of training labels: {train_labels.shape}')

val_frames, val_labels = next(iter(val_ds))
print(f'Shape of validation set of frames: {val_frames.shape}')
print(f'Shape of validation labels: {val_labels.shape}')

Shape of training set of frames: (2, 2, 2, 10, 224, 224, 3)
Shape of training labels: (2, 2, 2)
Shape of validation set of frames: (2, 2, 2, 10, 224, 224, 3)
Shape of validation labels: (2, 2, 2)


In [94]:
net = tf.keras.applications.EfficientNetB0(include_top=False)
net.trainable = False

model = tf.keras.Sequential([
    tf.keras.layers.TimeDistributed(net),
    tf.keras.layers.Dense(10),
    tf.keras.layers.GlobalAveragePooling3D()
])

model.compile(
    optimizer = tf.keras.optimizers.Adam(),
    loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=["accuracy"]
)

In [None]:
model.fit(
    train_ds,
    epochs=10,
    validation_data = val_ds,
    callbacks = tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=2)
)