In [None]:
import sys

import numpy as np
import matplotlib.pyplot as plt
from tabulate import tabulate
from sklearn.preprocessing import MinMaxScaler
import idx2numpy
import os
from PIL import Image
import pickle
import shutil
import cv2
from scipy.special import expit


In [None]:
class VideoClassifier:
    """Frame-level video classifier backed by a pickled database of frames.

    Two classifiers share the same database (``self.X`` frames, ``self.y``
    labels, one label per frame):

    * ``classifyVideo``   - nearest-frame search over the whole database.
    * ``classifyVideoNN`` - a small two-branch sigmoid network trained with
      ``train``. The flattened frame is split into four quarters; quarters 1-2
      feed hidden branch 1 and quarters 3-4 feed hidden branch 2.

    Frames with an integer dtype are treated as 8-bit images and divided by 255
    wherever they are consumed, so database frames (stored as uint8) and frames
    returned by ``normalizeBinary`` (floats in [0, 1]) are compared on one scale.
    """

    FRAME_SIZE = 64    # frames are resized to FRAME_SIZE x FRAME_SIZE grayscale
    HIDDEN_UNITS = 64  # units in each of the two hidden branches

    # Parameter attributes; the matching gradient key is "d" + name.
    _PARAM_NAMES = ("w1", "w2", "w3", "w4", "w5", "w6", "b1", "b2", "b3")

    def __init__(self, databaseName, numClasses=6, seed=None):
        """Create an untrained classifier.

        Args:
            databaseName: prefix of the ``<name>_X.pkl`` / ``<name>_y.pkl`` files.
            numClasses: number of output units (default 6, the original size).
            seed: optional int for reproducible weights. When None the global
                NumPy RNG is used, exactly as before.
        """
        self.databaseName = databaseName
        self.X = None
        self.y = None
        # Output index -> label, fixed by train() so inference can map back.
        self.labels = None

        rng = np.random if seed is None else np.random.RandomState(seed)
        quarter = (self.FRAME_SIZE * self.FRAME_SIZE) // 4  # (64 * 64) / 4 = 1024
        hidden = self.HIDDEN_UNITS
        # Draw order w1..w6 is kept so a globally seeded run matches older runs.
        self.w1 = rng.randn(hidden, quarter) * 0.01
        self.w2 = rng.randn(hidden, quarter) * 0.01
        self.w3 = rng.randn(hidden, quarter) * 0.01
        self.w4 = rng.randn(hidden, quarter) * 0.01
        self.w5 = rng.randn(numClasses, hidden) * 0.01
        self.w6 = rng.randn(numClasses, hidden) * 0.01
        self.b1 = np.zeros(hidden)
        self.b2 = np.zeros(hidden)
        self.b3 = np.zeros(numClasses)

    @staticmethod
    def _readFrames(video_path):
        """Decode a video into a list of FRAME_SIZE x FRAME_SIZE uint8 grayscale arrays."""
        size = VideoClassifier.FRAME_SIZE
        frames = []
        vidcap = cv2.VideoCapture(video_path)
        try:
            while vidcap.isOpened():
                ret, frame = vidcap.read()
                if not ret:
                    break
                # OpenCV decodes to BGR; convert to RGB before handing to PIL.
                img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                frames.append(np.array(img.convert('L').resize((size, size))))
        finally:
            # Always free the decoder handle, even if a frame fails to convert.
            vidcap.release()
        return frames

    @staticmethod
    def _toUnitRange(frame):
        """Return ``frame`` as floats; integer frames are scaled from 0-255 to 0-1."""
        arr = np.asarray(frame)
        if np.issubdtype(arr.dtype, np.integer):
            return arr / 255.0
        return arr.astype(float, copy=False)

    def _requireDatabase(self):
        """Raise ValueError when no frames/labels are loaded."""
        if self.X is None or self.y is None or len(self.X) == 0:
            raise ValueError("database is empty or not loaded; call openDatabase() first")

    def _labelOrder(self):
        """Sorted unique labels; sorting makes the output index of a label stable across runs."""
        return sorted(set(self.y))

    @staticmethod
    def createDatabase(folder, databaseName):
        """Decode every ``.avi`` under ``folder/<subfolder>/`` into a frame database.

        The class label is the second ``_``-separated token of the file name
        (``v_Archery_g01_c01.avi`` -> ``Archery``). Frames are stored as uint8
        grayscale arrays in ``<databaseName>_X.pkl`` with one label per frame in
        ``<databaseName>_y.pkl``.
        """
        X_data = []
        y_data = []
        # Nothing below writes into this directory; it is still created so the
        # on-disk side effects match earlier runs.
        os.makedirs("trained_data", exist_ok=True)
        # Sorted traversal keeps the database order independent of the file system.
        for subfolder in sorted(os.listdir(folder)):
            subfolder_path = os.path.join(folder, subfolder)
            if not os.path.isdir(subfolder_path):
                continue  # stray files next to the class folders
            print("accessing subfolder: " + subfolder)
            for file in sorted(os.listdir(subfolder_path)):
                if not file.endswith('.avi'):
                    continue
                class_label = file.split("_")[1]
                frames = VideoClassifier._readFrames(os.path.join(subfolder_path, file))
                X_data.extend(frames)
                y_data.extend([class_label] * len(frames))
                # One line per file instead of one per frame keeps the output readable.
                print("reading file: " + file + " (" + str(len(frames)) + " frames, label " + class_label + ")")
        with open(databaseName + "_X" + ".pkl", "wb") as db:
            pickle.dump(X_data, db)
        with open(databaseName + "_y" + ".pkl", "wb") as db:
            pickle.dump(y_data, db)

    def openDatabase(self):
        """Load ``self.X`` / ``self.y`` from the pickles written by ``createDatabase``.

        Raises:
            ValueError: if the two files hold a different number of entries.
        """
        # NOTE: pickle.load executes arbitrary code from the file; only open
        # databases you created yourself.
        with open(self.databaseName + "_X" + ".pkl", "rb") as X:
            self.X = pickle.load(X)
        with open(self.databaseName + "_y" + ".pkl", "rb") as y:
            self.y = pickle.load(y)
        if len(self.X) != len(self.y):
            raise ValueError("database is inconsistent: {} frames but {} labels".format(len(self.X), len(self.y)))

    @staticmethod
    def normalizeBinary(video_path):
        """Decode a video into a list of 64x64 grayscale frames scaled to [0, 1]."""
        return [VideoClassifier._toUnitRange(frame) for frame in VideoClassifier._readFrames(video_path)]

    def classifyVideo(self, video_path, normFunction):
        """Return the label of the database frame closest to the video.

        For every database frame the Euclidean distances to all video frames are
        summed; the label of the frame with the smallest sum wins (first one on
        ties). Cost is O(database frames * video frames).

        Args:
            video_path: passed unchanged to ``normFunction``.
            normFunction: callable returning the video as a list of frames.

        Returns:
            The winning label, or None if the video yields no frames.

        Raises:
            ValueError: if the database is empty or not loaded.
        """
        normalized_video = normFunction(video_path)
        if len(normalized_video) == 0:
            return None
        self._requireDatabase()
        # One (frames, pixels) matrix so each database frame needs a single norm call.
        video = np.stack([self._toUnitRange(frame).ravel() for frame in normalized_video])

        best_label = None
        best_distance = np.inf
        for f, label in zip(self.X, self.y):
            # Database frames are uint8; bring them onto the video's [0, 1] scale.
            reference = self._toUnitRange(f).ravel()
            distance = np.linalg.norm(video - reference, axis=1).sum()
            if distance < best_distance:
                best_distance = distance
                best_label = label
        return best_label

    def sigmoid(self, x):
        """Numerically stable logistic function."""
        return expit(x)

    def forwardPass(self, x):
        """Run one frame through the network.

        Args:
            x: a frame, either 2-D or already flattened; it is split into four
               equal quarters after flattening.

        Returns:
            (output, cache): the sigmoid output vector and the intermediate
            values ``backwardPass`` needs.
        """
        flat = np.asarray(x, dtype=float).ravel()
        x1, x2, x3, x4 = np.array_split(flat, 4)
        a1 = np.dot(self.w1, x1) + np.dot(self.w2, x2) + self.b1
        h1 = self.sigmoid(a1)
        a2 = np.dot(self.w3, x3) + np.dot(self.w4, x4) + self.b2
        h2 = self.sigmoid(a2)
        a3 = np.dot(self.w5, h1) + np.dot(self.w6, h2) + self.b3
        cache = {"x1": x1, "x2": x2, "x3": x3, "x4": x4, "a1": a1, "a2": a2, "a3": a3, "h1": h1, "h2": h2, "w5": self.w5, "w6": self.w6}
        return self.sigmoid(a3), cache

    def backwardPass(self, ytarget, ypred, cache):
        """Back-propagate one sample and return the parameter gradients.

        ``ypred - ytarget`` is the gradient of the sigmoid cross-entropy loss
        with respect to the output pre-activation.

        Returns:
            dict with keys dw1..dw6 and db1..db3, shaped like the parameters.
        """
        x1, x2, x3, x4 = cache["x1"], cache["x2"], cache["x3"], cache["x4"]
        h1, h2, w5, w6 = cache["h1"], cache["h2"], cache["w5"], cache["w6"]

        grad_a3 = ypred - ytarget
        grad_w5 = np.outer(grad_a3, h1)
        grad_w6 = np.outer(grad_a3, h2)
        grad_b3 = grad_a3

        grad_h1 = np.dot(w5.T, grad_a3)
        grad_h2 = np.dot(w6.T, grad_a3)
        # multiply by the derivative of the sigmoid function
        grad_a1 = grad_h1 * (h1 * (1 - h1))
        grad_a2 = grad_h2 * (h2 * (1 - h2))

        gradients = {
            "dw1": np.outer(grad_a1, x1),
            "dw2": np.outer(grad_a1, x2),
            "dw3": np.outer(grad_a2, x3),
            "dw4": np.outer(grad_a2, x4),
            "dw5": grad_w5,
            "dw6": grad_w6,
            "db1": grad_a1,
            "db2": grad_a2,
            "db3": grad_b3,
        }
        return gradients

    def train(self, learning_rate, repetitions):
        """Train the network with per-sample gradient descent.

        Database frames are scaled to [0, 1] before the forward pass, matching
        what ``normalizeBinary`` feeds the network at inference time. A learning
        rate tuned for raw 0-255 input may need to be raised.

        Args:
            learning_rate: step size applied to every gradient.
            repetitions: number of passes over the database.

        Returns:
            List with the mean squared error of each pass (monitoring only; the
            updates themselves follow the cross-entropy gradient).

        Raises:
            ValueError: if the database is empty, or its number of distinct
                labels differs from the number of output units.
        """
        self._requireDatabase()
        labels = self._labelOrder()
        if len(labels) != self.b3.shape[0]:
            raise ValueError("database has {} classes but the network has {} outputs".format(len(labels), self.b3.shape[0]))
        # Remember the mapping so classifyVideoNN can turn an output index into a label.
        self.labels = labels
        label_to_number = {label: i for i, label in enumerate(labels)}

        history = []
        for z in range(repetitions):
            loss = 0.0
            for x, y in zip(self.X, self.y):
                y_target = np.zeros(len(labels))
                y_target[label_to_number[y]] = 1
                y_pred, cache = self.forwardPass(self._toUnitRange(x))
                loss += np.mean((y_pred - y_target) ** 2)
                gradients = self.backwardPass(y_target, y_pred, cache)
                for name in self._PARAM_NAMES:
                    param = getattr(self, name)
                    param -= gradients["d" + name] * learning_rate  # in-place update
            mean_loss = loss / len(self.y)
            history.append(mean_loss)
            print("{} out of {} repetitions complete (mean loss {:.6f})".format(z + 1, repetitions, mean_loss))
        return history

    def classifyVideoNN(self, video_path, normFunction):
        """Classify a video with the trained network.

        Every frame is passed through the network, the output vectors are
        averaged over the frames, and the label of the strongest output wins.

        Args:
            video_path: passed unchanged to ``normFunction``.
            normFunction: callable returning the video as a list of frames.

        Returns:
            The predicted label, or None if the video yields no frames.

        Raises:
            ValueError: if no label mapping is available, or it does not match
                the number of output units.
        """
        normalized_video = normFunction(video_path)
        if len(normalized_video) == 0:
            return None
        labels = self.labels
        if labels is None:
            # Not trained in this session (e.g. weights restored by hand):
            # rebuild the same sorted mapping train() would have used.
            self._requireDatabase()
            labels = self._labelOrder()
        if len(labels) != self.b3.shape[0]:
            raise ValueError("{} labels known but the network has {} outputs".format(len(labels), self.b3.shape[0]))

        outputs = np.array([self.forwardPass(self._toUnitRange(frame))[0] for frame in normalized_video])
        scores = outputs.mean(axis=0)
        for label, score in zip(labels, scores):
            print("current label: " + label + " and its score: " + str(score))
        best_label = labels[int(np.argmax(scores))]
        print("best label: " + best_label)
        return best_label


In [None]:
# Example: build (or reuse) the frame database, then train the network.
# The test folder holds 6 UCF-101 activities: Archery, BasketballDunk, HorseRace,
# PlayingPiano, PlayingViolin and Surfing.
folder = "UCF-101/TestFolder"
databaseName = "exampleset"

# Decoding every video is slow, so the database is only rebuilt when its pickles
# are missing. Delete exampleset_X.pkl / exampleset_y.pkl to force a rebuild.
if not (os.path.exists(databaseName + "_X.pkl") and os.path.exists(databaseName + "_y.pkl")):
    VideoClassifier.createDatabase(folder, databaseName)

# The initial weights come from NumPy's global RNG; seed it so reruns are repeatable.
np.random.seed(42)
Classifier = VideoClassifier(databaseName)
Classifier.openDatabase()
# Summarise what was loaded (the default object repr carries no information).
print("loaded " + str(len(Classifier.X)) + " frames across " + str(len(set(Classifier.y))) + " classes")

# Nearest-frame baseline, for comparison (slow: scans the whole database):
#   Classifier.classifyVideo("test_set/v_Archery_g01_c01.avi", Classifier.normalizeBinary)

# Neural network training: learning rate 0.0001, 10 passes over the database.
Classifier.train(0.0001, 10)

In [None]:
# Neural network classification example: predict the label of one held-out clip
# using the classifier trained in the previous cell.
test_video = "test_set/v_Archery_g01_c01.avi"
result = Classifier.classifyVideoNN(test_video, Classifier.normalizeBinary)
print(result)