https://thebinarynotes.com/video-classification-keras-convlstm/

Version History:
- v1.0.0: Original tutorial code, with I/O from boiling videos
- v1.1.0: Running code with more frames and epochs over the weekend.
- v1.2.0: Changing frame reading to skip frames.
- v1.3.0: Using new videos to test pre-trained model. Test A videos.
- v1.3.1: Using new videos to test pre-trained model. Test B videos.

In [1]:
import keras
from keras import applications
from keras.preprocessing.image import ImageDataGenerator
from keras import optimizers
from keras.models import Sequential, Model, load_model
from keras.layers import *
from keras.callbacks import ModelCheckpoint, LearningRateScheduler, TensorBoard, EarlyStopping
 
import os
import cv2
import numpy as np
import matplotlib.pyplot
import seaborn as sns
import math

from sklearn.model_selection import train_test_split
import keras_metrics as km 
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.metrics import cohen_kappa_score
from sklearn.metrics import roc_auc_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report

Using TensorFlow backend.


In [2]:
data_dir = "videos/"
img_height , img_width = 128, 128
seq_len = 10  # Number of frames to extract

# 0: 2W, 1: Others, 2: CHF
classes = ["Class 0", "Class 1", "Class 2"]
 
#  Creating frames from videos
 
def frames_extraction(video_path):    
    frames_list = []    
    vidObj = cv2.VideoCapture(video_path)
    
    # Extract total number of frames, then set spacing using desired number of frames (seq_len)
    total_frames = vidObj.get(7)
    skip = math.floor(total_frames / seq_len)
    
    # Used as counter variable 
    count = 1
    
    while count <= total_frames: 
        vidObj.set(1, count) 
        success, image = vidObj.read()
        #print(vidObj.get(1))
        if success:
            image = cv2.resize(image, (img_height, img_width))
            frames_list.append(image)
            count += skip
        else:
            #print("Defected frame: ", vidObj.get(1))
            print("Defected frame")
            break
 
    vidObj.release()        
    
    return frames_list, count
 
def create_data(input_dir):
    X = []
    Y = []
     
    classes_list = os.listdir(input_dir)
     
    for c in classes_list:
        if c != "Class 1, small":
            print(c)
            files_list = os.listdir(os.path.join(input_dir, c))
            for f in files_list:
                print (f, end="\r")
                frames, count = frames_extraction(os.path.join(os.path.join(input_dir, c), f))
                if len(frames) == (seq_len):
                    #print("yes")
                    X.append(frames)

                    y = [0]*len(classes)
                    y[classes.index(c)] = 1
                    Y.append(y)
        else:
            pass
     
    X = np.asarray(X)
    Y = np.asarray(Y)
    return X, Y, frames

In [3]:
X, Y, frames = create_data(data_dir)

Class 0
Defected framei
2W_split009.avi

KeyboardInterrupt: 

In [None]:
# Made directories with just 1 video each for testing

#classes = ["1", "2"]
#A, B, test_frames = create_data("test_videos/")

In [None]:
# Shows a desired frame in a separate window. Press '0' to close.

#cv2.imshow('Frame', test_frames[5])
#cv2.waitKey(0)
#cv2.destroyAllWindows()

In [7]:
# Testing printing filenames on one line:
import time
array = np.arange(1000)
for x in range(len(array)):  
    print (array[x], end="\r")
    time.sleep(1)

5

KeyboardInterrupt: 

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.20, shuffle=True, random_state=0)

In [None]:
loaded_model = load_model("convlstm_v1.1.0.h5")

In [None]:
y_pred = loaded_model.predict(X)

In [None]:
y_pred_report = np.argmax(y_pred, axis = 1)
y_test_report = np.argmax(y_test, axis = 1)

In [None]:
print(classification_report(y_test_report, y_pred_report))

In [None]:
matrix = confusion_matrix(y_test_report, y_pred_report)

In [None]:
print(matrix)

In [None]:
figure = matplotlib.pyplot.figure(figsize=(8, 8))
sns.heatmap(matrix, annot=True,cmap=matplotlib.pyplot.cm.Blues)
#matplotlib.pyplot.
matplotlib.pyplot.tight_layout()
matplotlib.pyplot.ylabel('True label')
matplotlib.pyplot.xlabel('Predicted label')
#matplotlib.pyplot.savefig('conmatrix_convlstm_v1.2.0.png',bbox_inches='tight')
matplotlib.pyplot.show()

In [None]:
print(type(X_test))
print(type(X))