In [1]:
#!pip install git+https://github.com/tensorflow/docs
#!pip install imutils
#!pip install imageio

In [1]:
from tensorflow_docs.vis import embed
from tensorflow import keras
from imutils import paths
import keras
import matplotlib.pyplot as plt
import tensorflow as tf
import pandas as pd
import numpy as np
import imageio
import cv2 
import os

In [2]:
# Read the train/test manifests; the displayed test frame shows that each row
# holds a `video_name` path and a `tag` class label.
train_df, test_df = (
    pd.read_csv(csv_name) for csv_name in ('train.csv', 'test.csv')
)
test_df

Unnamed: 0.1,Unnamed: 0,video_name,tag
0,0,dataset/test/dancing/dancing (21).mp4,dancing
1,1,dataset/test/dancing/dancing (22).mp4,dancing
2,2,dataset/test/dancing/dancing (23.mp4,dancing
3,3,dataset/test/dancing/dancing (24).mp4,dancing
4,4,dataset/test/dancing/dancing (25).mp4,dancing
5,5,dataset/test/exercise/exercis (1).mp4,exercise
6,6,dataset/test/exercise/exercis (2).mp4,exercise
7,7,dataset/test/exercise/exercis (3).mp4,exercise
8,8,dataset/test/exercise/exercise (25).mp4,exercise
9,9,dataset/test/exercise/exercise (26).mp4,exercise


In [3]:
IMG_SIZE=224  # side length, in pixels, of the square frames used downstream


def crop_image_square(frame):
    """Return the largest centred square region of an image array.

    Parameters
    ----------
    frame : numpy.ndarray
        Image whose first two axes are (rows, columns). Any trailing axes,
        such as colour channels, are kept untouched.

    Returns
    -------
    numpy.ndarray
        Slice of ``frame`` with shape ``(s, s, ...)`` where
        ``s = min(rows, columns)``.
    """
    # Axis 0 is rows (height) and axis 1 is columns (width); the row offset
    # must therefore be derived from the height and the column offset from
    # the width, otherwise non-square inputs are not cropped to a square.
    height,width=frame.shape[0:2]
    min_dim=min(height,width)
    start_y=(height//2)-(min_dim//2)
    start_x=(width//2)-(min_dim//2)
    return frame[start_y:start_y+min_dim,start_x:start_x+min_dim]

def load_video(path, max_frames=0,resize=(IMG_SIZE,IMG_SIZE)):
    """Decode a video file into a stack of cropped, resized frames.

    Parameters
    ----------
    path : str
        Location handed to ``cv2.VideoCapture``.
    max_frames : int, default 0
        Reading stops as soon as this many frames have been collected. With
        the default of 0 the count never matches, so every readable frame is
        kept.
    resize : tuple of int, default (IMG_SIZE, IMG_SIZE)
        Target size passed to ``cv2.resize``.

    Returns
    -------
    numpy.ndarray
        ``np.array`` built from the collected frames; empty when no frame
        could be read.
    """
    capture=cv2.VideoCapture(path)
    collected=[]
    try:
        ok,image=capture.read()
        while ok:
            image=cv2.resize(crop_image_square(image),resize)
            # Swap the first and last channel (OpenCV delivers BGR, so the
            # stored frames end up in RGB order).
            collected.append(image[:,:,[2,1,0]])
            if len(collected)==max_frames:
                break
            ok,image=capture.read()
    finally:
        # Always free the capture handle, even if a frame fails to process.
        capture.release()
    return np.array(collected)

In [4]:
def build_features_extractor():
    """Build the per-frame feature extractor.

    An InceptionV3 network with ImageNet weights, no classification head and
    global average pooling is wrapped together with its own
    ``preprocess_input`` step, so the returned model accepts
    ``(IMG_SIZE, IMG_SIZE, 3)`` frames directly.

    Returns
    -------
    keras.Model
        Model named ``"feature_extractor"`` that maps a batch of frames to
        one pooled feature vector per frame.
    """
    features_extractor=keras.applications.InceptionV3(
        weights='imagenet',
        include_top=False,
        pooling='avg',
        input_shape=(IMG_SIZE,IMG_SIZE,3),
    )
    preprocess_input=keras.applications.inception_v3.preprocess_input

    inputs=keras.Input((IMG_SIZE,IMG_SIZE,3))
    preprocessed=preprocess_input(inputs)

    # Chain the backbone built above onto the preprocessing step.
    outputs=features_extractor(preprocessed)
    return keras.Model(inputs,outputs,name="feature_extractor")


# Later cells call `feature_extractor.predict(...)`, so the model must exist
# as a module-level name for the notebook to run on a fresh kernel.
feature_extractor=build_features_extractor()

In [5]:
# Lookup layer that maps each class name to an integer id. The vocabulary is
# the set of distinct training tags and no out-of-vocabulary slot is reserved.
label_processor=keras.layers.StringLookup(
    vocabulary=pd.unique(train_df["tag"]),
    num_oov_indices=0,
)
print(label_processor.get_vocabulary())

# Encode the training tags as a column vector of class ids.
labels=label_processor(train_df['tag'].values[:,None]).numpy()
labels

['dancing', 'exercise', 'yoga']


array([[0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [0],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
       [1],
    

In [6]:
# Hyper-parameters shared by the cells below.
BATCH_SIZE = 64        # not referenced by any other visible cell
EPOCHS = 100           # reassigned to 30 in the training cell before it is used
MAX_SEQ_LENGTH = 20    # number of frames kept per video (sequence length)
NUM_FEATURES = 2048    # length of the feature vector stored for each frame

In [7]:
def prepare_all_videos(df, root_dir):
    """Turn every video listed in ``df`` into a fixed-length feature sequence.

    Parameters
    ----------
    df : pandas.DataFrame
        Must provide a ``video_name`` column (path joined onto ``root_dir``)
        and a ``tag`` column (class name passed to ``label_processor``).
    root_dir : str
        Directory prepended to every ``video_name``.

    Returns
    -------
    (frame_features, frame_masks) : tuple of numpy.ndarray
        ``frame_features`` is float32 with shape
        ``(len(df), MAX_SEQ_LENGTH, NUM_FEATURES)``. ``frame_masks`` is bool
        with shape ``(len(df), MAX_SEQ_LENGTH)``: True marks a real frame,
        False marks zero padding.
    labels : numpy.ndarray
        Class ids produced by ``label_processor``, one row per video.
    """
    import warnings

    num_samples=len(df)
    video_paths=df['video_name'].values.tolist()
    labels=df['tag'].values
    labels=label_processor(labels[...,None]).numpy()

    frame_masks=np.zeros(shape=(num_samples,MAX_SEQ_LENGTH),dtype="bool")
    frame_features=np.zeros(shape=(num_samples,MAX_SEQ_LENGTH,NUM_FEATURES),dtype="float32")

    for idx,path in enumerate(video_paths):
        video_path=os.path.join(root_dir,path)
        # Only the first MAX_SEQ_LENGTH frames are ever used, so stop decoding
        # there instead of loading the whole clip into memory.
        frames=load_video(video_path,max_frames=MAX_SEQ_LENGTH)
        length=min(MAX_SEQ_LENGTH,len(frames))

        if length==0:
            # Keep going (best effort), but make unreadable files visible:
            # an all-zero, fully masked row carries no signal for training.
            warnings.warn(
                f"No frames could be read from {video_path!r}; "
                "its features stay all-zero and fully masked."
            )
            continue

        # One batched forward pass per video instead of one call per frame.
        frame_features[idx,:length]=feature_extractor.predict(frames[:length],verbose=0)
        frame_masks[idx,:length]=True

    return (frame_features,frame_masks),labels

# Extract features for both splits. Every listed video is decoded and passed
# through the feature extractor, so this cell is slow.
train_data,train_labels=prepare_all_videos(train_df,"train")
test_data,test_labels=prepare_all_videos(test_df,"test")

print(f'Frame features in data set: {train_data[0].shape}')
print(f'Frame masks in data set: {train_data[1].shape}')

print(f'train_labels in train set: {train_labels.shape}')
print(f'test_labels in test set: {test_labels.shape}')


Frame features in data set: (145, 20, 2048)
Frame masks in data set: (145, 20)
train_labels in train set: (145, 1)
test_labels in train set: (22, 1)


In [12]:
def get_sequence_model():
    """Build and compile the recurrent classifier that consumes frame features.

    The model takes two inputs, a ``(MAX_SEQ_LENGTH, NUM_FEATURES)`` feature
    sequence and a boolean ``(MAX_SEQ_LENGTH,)`` mask, and stacks
    GRU(16, return_sequences=True) -> GRU(8) -> Dropout(0.4) ->
    Dense(8, relu) -> Dense(softmax) with one output unit per entry of the
    ``label_processor`` vocabulary.

    Returns
    -------
    keras.Model
        Model compiled with sparse categorical cross-entropy, the Adam
        optimizer and the accuracy metric.
    """
    class_vocab=label_processor.get_vocabulary()

    frame_features_input=keras.Input((MAX_SEQ_LENGTH,NUM_FEATURES))
    # The shape has to be a tuple; `(MAX_SEQ_LENGTH)` without the trailing
    # comma is a plain int.
    mask_input=keras.Input((MAX_SEQ_LENGTH,),dtype='bool')

    # The mask tells the first GRU which time steps are padding.
    x=keras.layers.GRU(16,return_sequences=True)(frame_features_input,mask=mask_input)
    x=keras.layers.GRU(8)(x)
    x=keras.layers.Dropout(0.4)(x)
    x=keras.layers.Dense(8,activation='relu')(x)
    output=keras.layers.Dense(len(class_vocab),activation='softmax')(x)

    rnn_model=keras.Model([frame_features_input,mask_input],output)
    rnn_model.compile(
        loss="sparse_categorical_crossentropy",optimizer="adam",metrics=['accuracy']
    )
    return rnn_model
EPOCHS=30  # overrides the value assigned in the configuration cell


def run_experiment(seed=42):
    """Train the sequence model, restore its best weights and score the test set.

    Parameters
    ----------
    seed : int, default 42
        Seed for the permutation that shuffles the training rows before
        ``fit`` splits off the validation part.

    Returns
    -------
    history
        Object returned by ``fit``.
    seq_model : keras.Model
        The trained model with the best checkpointed weights loaded.
    """
    file_path='./tmp/video_classifier'
    checkpoint=keras.callbacks.ModelCheckpoint(
        file_path,save_weights_only=True, save_best_only=True, verbose=1
    )

    # `validation_split` takes the LAST fraction of the rows, before any
    # shuffling. The training rows are grouped by class (see the encoded
    # labels printed earlier), so without this permutation the validation
    # part would come from the tail of that class ordering instead of being a
    # representative sample.
    order=np.random.default_rng(seed).permutation(len(train_labels))
    shuffled_features=train_data[0][order]
    shuffled_masks=train_data[1][order]
    shuffled_labels=train_labels[order]

    seq_model=get_sequence_model()
    history=seq_model.fit(
        [shuffled_features,shuffled_masks],
        shuffled_labels,
        validation_split=0.3,
        epochs=EPOCHS,
        callbacks=[checkpoint]
    )

    # Evaluate with the weights of the best epoch, not the last one.
    seq_model.load_weights(file_path)
    _,accuracy=seq_model.evaluate([test_data[0],test_data[1]],test_labels)
    print(f'Test accuracy: {round(accuracy*100,2)}%')
    return history, seq_model

_,sequence_model=run_experiment()

Epoch 1/30
Epoch 1: val_loss improved from inf to 1.10351, saving model to ./tmp\video_classifier
Epoch 2/30
Epoch 2: val_loss did not improve from 1.10351
Epoch 3/30
Epoch 3: val_loss did not improve from 1.10351
Epoch 4/30
Epoch 4: val_loss did not improve from 1.10351
Epoch 5/30
Epoch 5: val_loss did not improve from 1.10351
Epoch 6/30
Epoch 6: val_loss did not improve from 1.10351
Epoch 7/30
Epoch 7: val_loss did not improve from 1.10351
Epoch 8/30
Epoch 8: val_loss did not improve from 1.10351
Epoch 9/30
Epoch 9: val_loss did not improve from 1.10351
Epoch 10/30
Epoch 10: val_loss did not improve from 1.10351
Epoch 11/30
Epoch 11: val_loss did not improve from 1.10351
Epoch 12/30
Epoch 12: val_loss did not improve from 1.10351
Epoch 13/30
Epoch 13: val_loss did not improve from 1.10351
Epoch 14/30
Epoch 14: val_loss did not improve from 1.10351
Epoch 15/30
Epoch 15: val_loss did not improve from 1.10351
Epoch 16/30
Epoch 16: val_loss did not improve from 1.10351
Epoch 17/30
Epoch 

Test accuracy: 22.73%


In [14]:
def prepare_single_video(frames):
    """Compute padded per-frame features and the matching mask for one video.

    Parameters
    ----------
    frames : numpy.ndarray
        Frames of a single video; a leading batch axis is added here.

    Returns
    -------
    frame_features : numpy.ndarray
        float32 array of shape ``(1, MAX_SEQ_LENGTH, NUM_FEATURES)``; rows
        beyond the available frames stay zero.
    frame_mask : numpy.ndarray
        bool array of shape ``(1, MAX_SEQ_LENGTH)``, set for the positions
        that hold a real frame.
    """
    batched=frames[None,...]
    frame_features=np.zeros(shape=(1,MAX_SEQ_LENGTH,NUM_FEATURES),dtype='float32')
    frame_mask=np.zeros(shape=(1,MAX_SEQ_LENGTH,),dtype='bool')

    for video_idx,clip in enumerate(batched):
        # Use at most MAX_SEQ_LENGTH frames of the clip.
        n_used=min(MAX_SEQ_LENGTH,clip.shape[0])
        for frame_idx in range(n_used):
            single_frame=clip[None,frame_idx,:]
            frame_features[video_idx,frame_idx,:]=feature_extractor.predict(single_frame)
        frame_mask[video_idx,:n_used]=1

    return frame_features,frame_mask

def sequence_prediction(path):
    """Classify one video and print every class probability, highest first.

    Parameters
    ----------
    path : str
        Video path, joined onto the ``'test'`` directory before loading.

    Returns
    -------
    numpy.ndarray
        The frames returned by ``load_video`` for that video.
    """
    class_vocab=label_processor.get_vocabulary()

    frames=load_video(os.path.join('test',path))
    features,mask=prepare_single_video(frames)
    # `predict` returns one row per input video; only one video was passed.
    probabilities=sequence_model.predict([features,mask])[0]

    ranking=np.argsort(probabilities)[::-1]  # indices by descending probability
    for class_idx in ranking:
        print(f"{class_vocab[class_idx]}:{probabilities[class_idx]*100:5.2f}%")
    return frames
# Pick one test video at random and print the model's class ranking for it.
test_video=np.random.choice(
    test_df["video_name"].tolist()
)
print(f"Test video path: {test_video}")
test_frames=sequence_prediction(test_video)

Test video path: dataset/test/exercise/exercise (27).mp4
dancing:33.42%
exercise:33.41%
yoga:33.17%
