## 1. Import packages

In [1]:
!pip install immutabledict sacrebleu sentencepiece seqeval tensorflow-model-optimization>=0.4.1 tensorflow-text~=2.13.0
!pip install tf-models-official --no-deps --force-reinstall pyyaml>=6.0.0

In [2]:
import tqdm
import random
import pathlib
import itertools
import collections

import cv2
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

import keras
import tensorflow as tf
import tensorflow_hub as hub
from tensorflow.keras import layers
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from sklearn.metrics import classification_report,accuracy_score

# Import the MoViNet model from TensorFlow Models (tf-models-official) for the MoViNet model
from official.projects.movinet.modeling import movinet
from official.projects.movinet.modeling import movinet_model

In [3]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import tensorflow as tf
import random
from pathlib import Path
import os
import cv2
import re
import collections
from sklearn.model_selection import train_test_split

kaggle_username = "rahaf8"
kaggle_key = "f59b8cb26f2973bc6fb4c52b1516ac19"
os.environ["KAGGLE_USERNAME"] = kaggle_username
os.environ["KAGGLE_KEY"] = kaggle_key
import kaggle

import warnings
warnings.filterwarnings('ignore')

## 2. Define Functions

#### 2.1 Get data from Kaggle

In [4]:
def get_kaggle_dataset(dataset_name):
    """Get dataset from Kaggle.

    Args:
        dataset_name: the dataset name.
    """

    # Download the dataset using the Kaggle API
    kaggle.api.dataset_download_files(dataset_name, path=".", unzip=True)

#### 2.2 Video Dataframe Generator

In [5]:
def video_dataframe(data_dir):
  """Get video dataframe.

    Args:
      files_path: A path from which the files can be stored.

    Returns:
      Video dataframe containing the labels , videos name , and videos path.
  """
  vidDf = pd.DataFrame(columns=['Label','VidName','VidPath'])

  for dirname, _, filenames in os.walk(data_dir):
      for name in filenames:
            vidDf =  vidDf.append({'Label': re.match(r'^[^\d_]+', name).group(),
                                   'VidName': name,
                                   'VidPath': os.path.join(dirname, name)},
                                    ignore_index=True)
  return vidDf

#### 2.3 Split Dataset

In [6]:
def SplitData(testsize, df, classes):
    min_samples_per_class = min(df.groupby("Label").size())
    print(f"{min_samples_per_class} Samples per Class")

    df_TrainingSet = pd.DataFrame(columns=df.columns)
    df_TestSet = pd.DataFrame(columns=df.columns)

    for class_label in classes:
        df_class = df[df['Label'] == class_label].sample(min_samples_per_class, random_state=42)

        training_set, test_set = train_test_split(df_class, test_size=testsize, random_state=42)

        df_TrainingSet = df_TrainingSet.append(training_set)
        df_TestSet = df_TestSet.append(test_set)

    df_TrainingSet = df_TrainingSet.sample(frac=1, random_state=42)
    df_TestSet = df_TestSet.sample(frac=1, random_state=42)

    return df_TrainingSet, df_TestSet

#### 2.3 Move video into train and test folder

In [7]:
import os
import shutil

def delete_empty_folders(directory):
    for root, dirs, files in os.walk(directory, topdown=False):
        for dir_name in dirs:
            folder_path = os.path.join(root, dir_name)
            delete_empty_folders(folder_path)  # Recursively check subdirectories
        if not os.listdir(root) and not files:
            os.rmdir(root)

def move_videos_to_folders(df, destination_dir):
    for _, row in df.iterrows():
        label = row['Label']
        vid_name = row['VidName']
        vid_path = row['VidPath']

        folder_path = os.path.join(destination_dir, label)
        os.makedirs(folder_path, exist_ok=True)

        new_vid_path = os.path.join(folder_path, vid_name)
        shutil.move(vid_path, new_vid_path)

        df.loc[df['VidPath'] == vid_path, 'VidPath'] = new_vid_path

#### 2.4 Find minimal frame count

In [8]:
def get_minimal_frame_count(file_name):
    with open(file_name, 'r') as file:
        annotations = file.readlines()

    minimal_value = float('inf')
    for annotation in annotations:
        video_info = annotation.split()
        start_frame = int(video_info[2])
        end_frame = int(video_info[3])
        frame_count = end_frame - start_frame

        if frame_count > 0:
            minimal_value = min(minimal_value, frame_count)

    if minimal_value == float('inf'):
        minimal_value = 0

    return minimal_value

#### 2.4 This code has been copied from tensorflow website without any changes - task for tomorrow

In [9]:
def format_frames(frame, output_size):
  """
    Pad and resize an image from a video.

    Args:
      frame: Image that needs to resized and padded.
      output_size: Pixel size of the output frame image.

    Return:
      Formatted frame with padding of specified output size.
  """
  frame = tf.image.convert_image_dtype(frame, tf.float32)
  frame = tf.image.resize_with_pad(frame, *output_size)
  return frame

In [10]:
import os

def get_start_frame_from_annotation(annotation_file, video_path):
    """
    Retrieves the starting frame from the annotation file.

    Args:
        annotation_file: File path to the annotation file.
        video_path: File path to the video.

    Return:
        The starting frame for the video.
    """
    #print(re.search(r'([^/]+)\.mp4$', video_path).group(1))
    video_name = re.search(r'[^/]+$', video_path).group(0)

    starting_frame = 0
    with open(annotation_file, 'r') as file:
        for line in file:
            video_info = line.strip().split()
            if len(video_info) >= 3 and video_info[0] == video_name:
                starting_frame = int(video_info[2])
                if(starting_frame == -1):
                  starting_frame = 0
    return starting_frame

In [11]:
def frames_from_video_file(video_path, n_frames, output_size=(320, 320)):
    """
    Creates frames from each video file present for each category.

    Args:
        video_path: File path to the video.
        n_frames: Number of frames to be created per video file.
        output_size: Pixel size of the output frame image.

    Return:
        A NumPy array of frames in the shape of (n_frames, height, width, channels).
    """
    annotation_file = '/content/dataset/Filtered_Anomaly_Annotation.txt'

    result = []
    src = cv2.VideoCapture(str(video_path))

    video_length = src.get(cv2.CAP_PROP_FRAME_COUNT)

    need_length = n_frames

    start = get_start_frame_from_annotation(annotation_file, str(video_path))

    src.set(cv2.CAP_PROP_POS_FRAMES, start)

    ret, frame = src.read()
    result.append(format_frames(frame, output_size))

    for _ in range(n_frames - 1):
        ret, frame = src.read()
        if ret:
            frame = format_frames(frame, output_size)
            result.append(frame)
        else:
            result.append(np.zeros_like(result[0]))

    src.release()
    result = np.array(result)[..., [2, 1, 0]]

    return result

In [12]:
def generate_frames(path, df, n_frames, training=False):
    pairs = list(zip(df['VidPath'], df['Label']))
    class_names = df['Label'].unique().tolist()
    class_ids_for_name = {name: idx for idx, name in enumerate(class_names)}
    print(class_ids_for_name)

    if training:
        random.shuffle(pairs)

    for path, name in pairs:
        video_frames = frames_from_video_file(path, n_frames)
        label = class_ids_for_name[name]
        yield video_frames, label

## 3. Data Preprocessing

In [13]:
! pip install opendatasets --quiet


import opendatasets as od

#{"username":"rahaf8","key":"f59b8cb26f2973bc6fb4c52b1516ac19"}

dataset_url = 'https://www.kaggle.com/datasets/saharyatimi/dataset'

# Specify the folder or file you want to download

od.download(dataset_url)

Please provide your Kaggle credentials to download this dataset. Learn more: http://bit.ly/kaggle-creds
Your Kaggle username: rahaf8
Your Kaggle Key: ··········
Downloading dataset.zip to ./dataset


100%|██████████| 552M/552M [00:11<00:00, 52.5MB/s]





In [14]:
dataset_name = "saharyatimi/datasett"
dataset_dir = "/content/dataset/dataset"

#get_kaggle_dataset(dataset_name)

In [15]:
video_df = video_dataframe(dataset_dir)
classes = video_df['Label'].unique().tolist()
print('Number of classes', len(classes))
print('Num videos for each class: : ')
print(video_df['Label'].value_counts())

Number of classes 8
Num videos for each class: : 
Stealing       5
Vandalism      5
Shooting       5
Normal         5
Robbery        5
Shoplifting    5
Burglary       5
Fighting       5
Name: Label, dtype: int64


In [16]:
train_df, test_df = SplitData(0.2, video_df,classes)

5 Samples per Class


In [17]:
ddata = {"Training":train_df.groupby("Label").size(),"Test":test_df.groupby("Label").size()}

ddataframe = pd.DataFrame(data=ddata)
ddataframe.plot.bar(stacked= True, rot= 15, title='Training vs Test data',figsize=(15,5))
plt.show(block= True)

In [18]:
train_destination_dir = '/content/dataset/train'
test_destination_dir = '/content/dataset/test'

# Move videos to train folders
move_videos_to_folders(train_df, train_destination_dir)

# Move videos to test folders
move_videos_to_folders(test_df, test_destination_dir)

#delete empty folders
delete_empty_folders('/content/dataset/dataset')

In [19]:
file_name = '/content/dataset/Filtered_Anomaly_Annotation.txt'
batch_size = 8
num_frames =  get_minimal_frame_count(file_name)

output_signature = (tf.TensorSpec(shape = (None, None, None, 3), dtype = tf.float32),
                    tf.TensorSpec(shape = (), dtype = tf.int16))

train_ds = tf.data.Dataset.from_generator(
    lambda: generate_frames(Path(train_destination_dir), train_df, num_frames, training=True),
    output_signature=output_signature
)
train_ds = train_ds.batch(batch_size)

test_ds = tf.data.Dataset.from_generator(
    lambda: generate_frames(Path(test_destination_dir), test_df, num_frames),
    output_signature=output_signature
)
test_ds = test_ds.batch(batch_size)

In [20]:
for frames, labels in test_ds.take(5):
  print(labels)

{'Vandalism': 0, 'Shoplifting': 1, 'Stealing': 2, 'Fighting': 3, 'Shooting': 4, 'Robbery': 5, 'Normal': 6, 'Burglary': 7}
tf.Tensor([0 1 2 3 4 5 6 7], shape=(8,), dtype=int16)


In [21]:
print(f"Shape: {frames.shape}")
print(f"Label: {labels.shape}")

Shape: (8, 60, 320, 320, 3)
Label: (8,)


In [22]:
import PIL
def tensor_to_image(tensor):
    tensor = tensor*255
    tensor = np.array(tensor, dtype=np.uint8)
    if np.ndim(tensor)>3:
        assert tensor.shape[0] == 1
        tensor = tensor[0]
    return tensor

In [23]:
import numpy as np
from PIL import Image

# Retrieve one batch of frames from the train_ds dataset
batch = next(iter(test_ds))

# Unpack the batch into frames and labels
frames, labels = batch

# Select the first frame from the frames tensor
frame = frames[0, 0]  # Assuming the batch dimension is the first dimension


# Create a list to hold the frames
frames2 = []

# Convert each NumPy array to a PIL Image and append to the frames list
for i in range(0,60):
    frames2.append(Image.fromarray(tensor_to_image(frames[0, i])))


# Create a GIF file with the frames
output_file = 'output.gif'
frames2[0].save(output_file, save_all=True, append_images=frames2[1:], loop=0, duration=200)

print(f"Animated GIF saved as '{output_file}'")

{'Vandalism': 0, 'Shoplifting': 1, 'Stealing': 2, 'Fighting': 3, 'Shooting': 4, 'Robbery': 5, 'Normal': 6, 'Burglary': 7}
Animated GIF saved as 'output.gif'


## the code below is copied from tensorflow website without any changes - task for tomorrow

In [None]:
model_id = 'a5'
resolution = 320

tf.keras.backend.clear_session()

backbone = movinet.Movinet(model_id=model_id)
backbone.trainable = False

# Set num_classes=600 to load the pre-trained weights from the original model
model = movinet_model.MovinetClassifier(backbone=backbone, num_classes=600)
model.build([None, None, None, None, 3])

# Load pre-trained weights
!wget https://storage.googleapis.com/tf_model_garden/vision/movinet/movinet_a0_base.tar.gz -O movinet_a0_base.tar.gz -q
!tar -xvf movinet_a0_base.tar.gz

checkpoint_dir = f'movinet_{model_id}_base'
checkpoint_path = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint = tf.train.Checkpoint(model=model)
status = checkpoint.restore(checkpoint_path)
status.assert_existing_objects_matched()

In [None]:
def build_classifier(batch_size, num_frames, resolution, backbone, num_classes):
  """Builds a classifier on top of a backbone model."""
  model = movinet_model.MovinetClassifier(
      backbone=backbone,
      num_classes=num_classes)
  model.build([batch_size, num_frames, resolution, resolution, 3])

  return model

In [None]:
model = build_classifier(batch_size, num_frames, resolution, backbone, 8)

In [None]:
num_epochs = 15

loss_obj = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

optimizer = tf.keras.optimizers.Adam(learning_rate = 0.01)

model.compile(loss=loss_obj, optimizer=optimizer, metrics=['accuracy'])

In [None]:
results = model.fit(train_ds,
                    validation_data=test_ds,
                    epochs=num_epochs,
                    validation_freq=1,
                    verbose=1)

In [None]:
model.evaluate(test_ds, return_dict=True)

In [None]:
def get_actual_predicted_labels(dataset):
  """
    Create a list of actual ground truth values and the predictions from the model.

    Args:
      dataset: An iterable data structure, such as a TensorFlow Dataset, with features and labels.

    Return:
      Ground truth and predicted values for a particular dataset.
  """
  actual = [labels for _, labels in dataset.unbatch()]
  predicted = model.predict(dataset)

  actual = tf.stack(actual, axis=0)
  predicted = tf.concat(predicted, axis=0)
  predicted = tf.argmax(predicted, axis=1)

  return actual, predicted

In [None]:
def plot_confusion_matrix(actual, predicted, labels, ds_type):
  cm = tf.math.confusion_matrix(actual, predicted)
  ax = sns.heatmap(cm, annot=True, fmt='g')
  sns.set(rc={'figure.figsize':(12, 12)})
  sns.set(font_scale=1.4)
  ax.set_title('Confusion matrix of action recognition for ' + ds_type)
  ax.set_xlabel('Predicted Action')
  ax.set_ylabel('Actual Action')
  plt.xticks(rotation=90)
  plt.yticks(rotation=0)
  ax.xaxis.set_ticklabels(labels)
  ax.yaxis.set_ticklabels(labels)

In [None]:
fg = FrameGenerator(Path(train_destination_dir), num_frames, training = True)
label_names = list(fg.class_ids_for_name.keys())

In [None]:
actual, predicted = get_actual_predicted_labels(test_ds)

In [None]:
print(f'MobileNet Model accuracy on the test set is : {accuracy_score(actual, predicted )*100:.2f}%')