## Install required libraries

In [None]:
!pip install numpy
!pip install opencv-python
!pip install tensorflow
!pip install matplotlib

In [35]:
# Load API's needed for program

import tensorflow as tf
from tensorflow.keras import datasets, layers, models
import matplotlib.pyplot as plt
import numpy as np
import cv2
import glob, os, time, random
import shutil
from google.colab import drive

# Initialize globally used variables

average_frame_size = (144, 256, 3)
categories_int = {}
int_categories = {}
color_model = None
content_model = None

In [21]:
# Plot function to display images for testing
def plotImage(image):
  plt.figure(figsize = (15,2))
  plt.imshow(image)

# Load training data from directory

In [62]:
# Load actual data from file path

# Folder structure should be:
# Level 1 : names of categories as folder name -> timelapse animation frogs ... etc
# Level 2 : name of video as folder name -> frog_in_sun dogs_jumping ... etc
# Level 3 : frames as png with sequential numbering as name -> 1 2 3 4 ... 10

def loadData(file_path_to_folders):
  '''Video data returns a tuple, the first element is the loaded frames
    The second element is a list of the categories as strings

    The loaded frames data is formatted as a dictionary, values are as follows:
    {category_type : [[video1_frames], [video2_frames], ...], category2_type : [...], ...}
    video_data[0]["timelapse"][2][3] gives the third timelapse video's 4th frame
  '''

  categories = next(os.walk(file_path_to_folders))[1]
  video_categories = {}

  for category in categories:
    video_categories[category] = []
    video_titles = next(os.walk(file_path_to_folders + "/" + category))[1]

    for video_title in video_titles:
      final_path = file_path_to_folders + "/" + category + "/" + video_title + "/"
      video_frames = []

      for image in sorted(glob.glob(final_path + "*.png")):
        frame = cv2.imread(image)
        if frame.shape != average_frame_size:
          frame = cv2.resize(frame, (256, 144))
          print(frame.shape , "")
        video_frames.append(frame)

      video_categories[category].append(video_frames)


  for index, category in enumerate(categories):
    categories_int[category] = index
    int_categories[index] = category
  return (video_categories, categories)

# Functions used for average color network data formatting

In [101]:
# Average all frames selected in video into a single image
def averageFrames(frames):
  frameAmount = len(frames)
  averageFrame = np.zeros(average_frame_size)
  for frame in frames:
    #print(averageFrame.shape , " : " , frame.shape)
    averageFrame = averageFrame + frame
  averageFrame = averageFrame / frameAmount
  return averageFrame

# Normalize image for input
def normalizeFrame(frame):
  return (frame / 255)

def get_frames_averaged(frame_data, categories):
  average_frames = {}
  for category in categories:
    average_frames[category] = []
    for video in frame_data[category]:
      average_frames[category].append(normalizeFrame(averageFrames(video)))
  return average_frames

# Functions used for content network data formatting

In [38]:
def differentiateFrames(frames):
  frameAmount = len(frames)
  newFrames = []
  if frameAmount == 1:
    return frames
  for index, frame in enumerate(frames):
    if(index + 1 >= frameAmount):
      return newFrames
    frameCurrent = frame
    frameNext = frames[index + 1]
    frameDifference = frameCurrent - frameNext
    newFrames.append(normalizeFrame(frameDifference))

def get_frames_difference(frame_data, categories):
  differentiated_frames = {}
  for category in categories:
    differentiated_frames[category] = []
    for video in frame_data[category]:
      differentiated_frames[category].append(differentiateFrames(video))
  return differentiated_frames

# Create models for both networks

In [39]:
# Load models as convolutional network with 2 filter layers and 2 pooling layers
# The final network is a dense network of 64 neurons with a final output layer of neurons equal to the category list amount
def create_network_models():
  global color_model
  global content_model
  
  color_model = models.Sequential([
    layers.Conv2D(filters=32, kernel_size=(3, 3), activation='relu', input_shape=average_frame_size),
    layers.MaxPooling2D((2, 2)),
    
    layers.Conv2D(filters=64, kernel_size=(3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    
    layers.Flatten(),
    layers.Dense(64, activation='relu'),
    layers.Dense(len(categories), activation='softmax')
  ])

  content_model = models.Sequential([
    layers.Conv2D(filters=32, kernel_size=(3, 3), activation='relu', input_shape=average_frame_size),
    layers.MaxPooling2D((2, 2)),
    
    layers.Conv2D(filters=64, kernel_size=(3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    
    layers.Flatten(),
    layers.Dense(64, activation='relu'),
    layers.Dense(len(categories), activation='softmax')
  ])

  color_model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
  
  content_model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Format data information into model usable variables

In [40]:
# format data for color model in x and y dimensions
def format_color_data(average_frames):
  x_color_train = []
  y_color_train = []
  x_color_test = []
  y_color_test = []

  for category in categories_int.keys():
    for frame in average_frames[category]:
      x_color_train.append(frame)
      y_color_train.append(categories_int[category])

  return (np.asarray(x_color_train), np.asarray(y_color_train), np.asarray(x_color_test), np.asarray(y_color_test))

In [41]:
# format data for content model in x and y dimensions
def format_content_data(differentiated_frames):
  x_content_train = []
  y_content_train = []
  x_content_test = []
  y_content_test = []

  for category in categories_int.keys():
    for video in differentiated_frames[category]:
      for frame in video:
        x_content_train.append(frame)
        y_content_train.append(categories_int[category])

  return (np.asarray(x_content_train), np.asarray(y_content_train), np.asarray(x_content_test), np.asarray(y_content_test))

# Train models and get accuracy

In [28]:
def train_color_model(x_color_train, y_color_train, epoch_count):
  color_model.fit(x_color_train, y_color_train, epochs=epoch_count)

In [29]:
def train_content_model(x_content_train, y_content_train, epoch_count):
  content_model.fit(x_content_train, y_content_train, epochs=epoch_count)

# Load testing video

In [102]:
video = []

def get_video_from_path(path_to_video):
  vidmp4 = cv2.VideoCapture(path_to_video)
  return vidmp4

def get_split_video_frames(video, amount, category):
  video_frames = {}
  for category_type in categories:
    video_frames[category_type] = []
  video_frames[category].append([])

  data_path = "." + "/testingimage"
  if os.path.exists(data_path):
    shutil.rmtree(data_path)
  os.mkdir(data_path)

  iteration = int((video.get(cv2.CAP_PROP_FRAME_COUNT) - 150) / amount)
  success,image = video.read()
  count = 0
  frame_count = 0
  while success:
    if (count % iteration) == 0:
      frame = image
      if frame.shape != average_frame_size:
          frame = cv2.resize(frame, (144, 256))
      cv2.imwrite(data_path + "/" + str(frame_count) + ".png", frame)
      read_frame = cv2.imread(data_path + "/" + str(frame_count) + ".png")
      video_frames[category][0].append(read_frame)
      frame_count = frame_count + 1
    success,image = video.read()
    count += 1
    return video_frames

# Predict a video category from video data

In [105]:
def predict_video_type(color_data, content_data):
  color_pred = color_model.predict(color_data[0])

  predicted_color_category = [np.argmax(element) for element in color_pred][0]
  print("Color prediction : " + str(predicted_color_category))

  content_pred = content_model.predict(content_data[0])

  predicted_content_category = [np.argmax(element) for element in content_pred]
  avg_content_pred = round((np.sum(predicted_content_category)) / len(predicted_content_category))
  print("Content prediction : " + str(avg_content_pred))

  avg_prediction = round((predicted_color_category + avg_content_pred) / 2)
  print("Average prediction : " + str(avg_prediction))

  return (predicted_color_category, avg_content_pred, avg_prediction)

# **Run all methods to train network and test it**

In [100]:
# Mount google drive for reading in data

drive.mount('/content/drive')

# Print categories for referencing

print("\nCategories : " , categories_int)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).

Categories :  {'frogs': 0, 'dogs': 1, 'cats': 2}


In [125]:
# Set path variables according to what is shown in mounted google drive

path_to_train_data = "/content/drive/MyDrive/final_data/data"
path_to_test_data = "/content/drive/MyDrive/final_data/testdata"
test_video_category = "cats"
path_to_test_video = "/content/drive/MyDrive/final_data/test_video/test_video.mp4"

In [None]:
# Use functions above to train the models and load data

# Load data from training directory
path_to_data = path_to_train_data
video_data = loadData(path_to_data)

categories = video_data[1]
frame_data = video_data[0]

# Modify frames into correct data style
average_frames = get_frames_averaged(frame_data, categories)
differentiated_frames = get_frames_difference(frame_data, categories)

# Format the modified frames into dataset usuable by models
frame_color_data = format_color_data(average_frames)
frame_content_data = format_content_data(differentiated_frames)

# Generate the network's models
create_network_models()

# Train the models with formatted data (this may take a while depending on the data loaded)
train_color_model(frame_color_data[0], frame_color_data[1], 20)
train_content_model(frame_content_data[0], frame_content_data[1], 20)

In [None]:
# Use functions above to test the models

# Load data from test directory
test_path_to_data = path_to_test_data
test_video_data = loadData(test_path_to_data)

test_frame_data = test_video_data[0]

# Modify frames into correct data style
test_average_frames = get_frames_averaged(test_frame_data, categories)
test_differentiated_frames = get_frames_difference(test_frame_data, categories)

# Format the modified frames into dataset usuable by models
test_frame_color_data = format_color_data(test_average_frames)
test_frame_content_data = format_content_data(test_differentiated_frames)

# Evaluate models
color_model.evaluate(test_frame_color_data[0], test_frame_color_data[1])
content_model.evaluate(test_frame_content_data[0], test_frame_content_data[1])

In [135]:
# Get data for testing a video

test_video_frames = get_split_video_frames((get_video_from_path(path_to_test_video)), 10, test_video_category)

average_frames_test = get_frames_averaged(test_video_frames, categories)
differentiated_frames_test = get_frames_difference(test_video_frames, categories)

frame_color_data_test = format_color_data(average_frames_test)
frame_content_data_test = format_content_data(differentiated_frames_test)

In [138]:
# Print results of video prediction

results = predict_video_type(frame_color_data_test, frame_content_data_test)

print("Original Category : " + test_video_category)
print("Predicted Category : " + int_categories[results[2]])

Color prediction : 0
Content prediction : 1
Average prediction : 0
Original Category : cats
Predicted Category : frogs
