In [1]:
!pip install pytube
!pip install tensorflow

Collecting pytube
  Downloading pytube-15.0.0-py3-none-any.whl (57 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/57.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.6/57.6 kB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pytube
Successfully installed pytube-15.0.0


In [2]:
import gc
import json
import logging
import os

import cv2
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import tensorflow as tf
from keras.layers import TimeDistributed, Conv2D, MaxPooling2D, Flatten, LSTM, Dense
from keras.models import Sequential
from pytube import YouTube, exceptions
from sklearn.metrics import confusion_matrix

In [3]:
def load_json(file_name):
    """Open ``file_name`` for reading and return its parsed JSON content."""
    with open(file_name, 'r') as handle:
        parsed = json.load(handle)
    return parsed

def load_data(path):
    """Load sample annotations and keep only those whose class is known.

    Reads a JSON list of sample dicts from ``path``, keeps the samples whose
    ``'clean_text'`` is a key of the module-level ``classes`` mapping, and
    rewrites each kept sample's ``'url'`` to the part that follows the
    YouTube host (e.g. ``'watch?v=<id>'``).

    Args:
        path: Path of the JSON annotation file.

    Returns:
        The list of kept sample dicts, with ``'url'`` rewritten in place.
    """
    with open(path, 'r') as file:
        data = json.load(file)

    # Membership is tested on the dict itself; `.keys()` adds nothing.
    filtered_data = [item for item in data if item['clean_text'] in classes]

    host_marker = 'youtube.com/'
    for item in filtered_data:
        # str.lstrip() removes a *set of characters*, not a prefix, so the
        # previous `'w' + url.lstrip('https://www.youtube.com')` only produced
        # the right result for paths starting with "watch". Cut at the host
        # explicitly instead; URLs without the host are left untouched.
        _, found, relative_url = item['url'].partition(host_marker)
        if found:
            item['url'] = relative_url
    return filtered_data

def download_video(url, output_path, filename):
    """Download a YouTube video, reusing an existing local copy if present.

    Args:
        url: Full URL of the video, passed to ``pytube.YouTube``.
        output_path: Directory the file is stored in.
        filename: Name of the file inside ``output_path``.

    Returns:
        The path ``output_path/filename`` when the file already exists or the
        download succeeded, otherwise ``None``. Download failures never raise.
    """
    file_path = os.path.join(output_path, filename)
    # Skip the download if the file already exists.
    if os.path.exists(file_path):
        return file_path

    try:
        stream = YouTube(url).streams.get_highest_resolution()
        stream.download(output_path=output_path, filename=filename)
    except Exception as error:
        # Best effort: every failure (private video, unavailable video, network
        # error, ...) is reported as None, exactly as the previous separate
        # handlers did, but the reason is now logged instead of discarded.
        logging.getLogger(__name__).warning("Could not download %s: %s", url, error)
        # A failed download may leave a partial file behind; remove it so the
        # "already exists" shortcut above does not pick it up on the next run.
        if os.path.exists(file_path):
            try:
                os.remove(file_path)
            except OSError as cleanup_error:
                logging.getLogger(__name__).warning(
                    "Could not remove partial file %s: %s", file_path, cleanup_error)
        return None
    return file_path

def extract_and_preprocess_frames(url, start_time, end_time, fps, box, width, height,
                                  target_size=(128, 128)):
    """Download a clip and return its cropped, resized, normalized frames.

    Args:
        url: URL relative to ``https://www.youtube.com/``; the text after the
            first ``=`` is used as the local file name.
        start_time: Clip start in seconds.
        end_time: Clip end in seconds.
        fps: Frames per second, used to derive how many frames to read.
        box: Four fractional crop coordinates, scaled by ``width``/``height``.
        width: Width used to convert ``box`` to pixels.
        height: Height used to convert ``box`` to pixels.
        target_size: ``(width, height)`` passed to ``cv2.resize``. Defaults to
            the previously hard-coded ``(128, 128)``.

    Returns:
        ``None`` if the video could not be downloaded, otherwise a list of at
        most ``int((end_time - start_time) * fps)`` float32 frames scaled to
        ``[0, 1]`` (fewer if the video ends early).
    """
    local_video_path = download_video('https://www.youtube.com/' + url, './videos',
                                      url.split("=")[1] + '.mp4')
    if local_video_path is None:
        return None

    frame_count = int((end_time - start_time) * fps)

    # The crop window is identical for every frame, so compute it once.
    # NOTE(review): box is interpreted as [x_min, y_min, x_max, y_max]; the
    # dataset documentation may specify [y0, x0, y1, x1] instead. TODO confirm.
    # NOTE(review): width/height come from the caller, not from the decoded
    # frame; confirm they match the resolution that was actually downloaded.
    x_min = int(box[0] * width)
    y_min = int(box[1] * height)
    x_max = int(box[2] * width)
    y_max = int(box[3] * height)

    frames = []
    video = cv2.VideoCapture(local_video_path)
    try:
        video.set(cv2.CAP_PROP_POS_MSEC, start_time * 1000)
        while len(frames) < frame_count:
            ret, frame = video.read()
            if not ret:
                break
            cropped_frame = frame[y_min:y_max, x_min:x_max]
            resized_frame = cv2.resize(cropped_frame, target_size)
            # Scale pixel values to [0, 1].
            frames.append(resized_frame.astype(np.float32) / 255.0)
    finally:
        # Always release the capture handle, even if decoding or resizing fails.
        video.release()
    return frames

def preprocess_data(data):
    """Turn annotation dicts into ``{'label', 'frames'}`` training samples.

    Each entry of ``data`` must provide ``'url'``, ``'start_time'``,
    ``'end_time'``, ``'fps'``, ``'box'``, ``'width'``, ``'height'`` and
    ``'label'``. Entries whose video cannot be downloaded, or that yield no
    frames at all, are skipped.

    Args:
        data: Iterable of annotation dicts.

    Returns:
        A list of dicts with the keys ``'label'`` and ``'frames'``.
    """
    processed_data = []
    for video in data:
        frames = extract_and_preprocess_frames(video['url'], video['start_time'],
                                               video['end_time'], video['fps'],
                                               video['box'], video['width'],
                                               video['height'])
        # None means the download failed; an empty list means nothing could be
        # decoded. Neither can be fed to the model, so drop both.
        if not frames:
            continue
        # The frames stay referenced by processed_data, so a per-video
        # `del frames` + gc.collect() cannot release them and only costs time.
        processed_data.append({'label': video['label'], 'frames': frames})

    return processed_data

In [None]:
# Class vocabulary: every entry of the classes file is mapped to its position,
# which serves as the class index.
classes_data = load_json('./drive/MyDrive/csc413_project_data/MSASL_classes.json')
classes = dict((word, idx) for idx, word in enumerate(classes_data))

# Read the annotation splits. load_data filters on `classes`, so the
# vocabulary above has to exist before this runs.
train_data, test_data, val_data = [
    load_data(annotation_path)
    for annotation_path in (
        './drive/MyDrive/csc413_project_data/MSASL_train.json',
        './drive/MyDrive/csc413_project_data/MSASL_test.json',
        './drive/MyDrive/csc413_project_data/MSASL_val.json',
    )
]

# Replace each split's annotations with its preprocessed samples.
train_data, test_data, val_data = [
    preprocess_data(split) for split in (train_data, test_data, val_data)
]

In [None]:
def create_model():
    """Build and compile the per-frame CNN + LSTM video classifier.

    A small convolutional stack is applied to every frame through
    ``TimeDistributed``; two LSTM layers followed by two dense layers turn the
    frame features into a softmax over ``len(classes)`` classes. The model is
    compiled with Adam and sparse categorical cross-entropy, tracking accuracy.

    Returns:
        The compiled model.
    """
    frame_shape = (128, 128, 3)

    # Per-frame feature extractor: three conv/pool stages, then flatten.
    cnn_layers = []
    for filters in (32, 64, 128):
        # Only the first layer of the stack declares the input shape.
        extra = {} if cnn_layers else {'input_shape': frame_shape}
        cnn_layers.append(Conv2D(filters, (3, 3), activation='relu', **extra))
        cnn_layers.append(MaxPooling2D((2, 2)))
    cnn_layers.append(Flatten())
    cnn_base = Sequential(cnn_layers)

    # Sequence model on top of the per-frame features; the leading None leaves
    # the number of frames per clip unspecified.
    sequence_layers = [
        TimeDistributed(cnn_base, input_shape=(None,) + frame_shape),
        LSTM(64, return_sequences=True),
        LSTM(64),
        Dense(64, activation='relu'),
        Dense(len(classes), activation='softmax'),
    ]
    model = Sequential(sequence_layers)

    compile_options = {
        'optimizer': 'adam',
        'loss': 'sparse_categorical_crossentropy',
        'metrics': ['accuracy'],
    }
    model.compile(**compile_options)
    return model

# Build and compile the network once; the training cell further down passes
# this instance to train_model.
model = create_model()

In [None]:
def evaluate_and_plot(model, test_data):
    """Print loss/accuracy for ``test_data`` and plot a confusion matrix.

    Two input layouts are accepted:

    * a dict with the keys ``'frames'`` and ``'labels'`` (the original
      layout), evaluated in one go with ``model.evaluate``/``model.predict``;
    * a list of ``{'label', 'frames'}`` dicts as produced by
      ``preprocess_data``. Each video is predicted on its own and the loss is
      the mean sparse categorical cross-entropy over the videos. Videos
      without frames are skipped.

    Args:
        model: Compiled Keras model.
        test_data: Evaluation data in one of the layouts above.
    """
    if isinstance(test_data, dict):
        labels = test_data['labels']
        test_loss, test_accuracy = model.evaluate(test_data['frames'], labels, verbose=0)
        predicted = np.argmax(model.predict(test_data['frames']), axis=1)
    else:
        labels, predicted, losses = [], [], []
        for video in test_data:
            if len(video['frames']) == 0:
                continue
            batch = np.array([video['frames']])  # batch of one video
            probabilities = np.asarray(model.predict_on_batch(batch))[0]
            label = int(video['label'])
            labels.append(label)
            predicted.append(int(np.argmax(probabilities)))
            # Cross-entropy of the true class; clipped so log(0) cannot occur.
            losses.append(-np.log(max(float(probabilities[label]), 1e-7)))
        if not labels:
            print("No samples to evaluate.")
            return
        test_loss = float(np.mean(losses))
        test_accuracy = float(np.mean(np.array(labels) == np.array(predicted)))

    print(f"Test Loss: {test_loss}, Test Accuracy: {test_accuracy}")

    # Confusion matrix of true vs. predicted class indices.
    cm = confusion_matrix(labels, predicted)
    sns.heatmap(cm, annot=True, fmt='d')
    plt.title('Confusion Matrix')
    plt.ylabel('Actual labels')
    plt.xlabel('Predicted labels')
    plt.show()

In [None]:
def train_model(model, data, val_data, test_data, epochs=10, batch_size=1):
    """Train ``model`` one video at a time and evaluate after every epoch.

    Args:
        model: Compiled Keras model.
        data: Sequence of ``{'label', 'frames'}`` training samples.
        val_data: Data passed to ``evaluate_and_plot`` after each epoch.
        test_data: Data passed to ``evaluate_and_plot`` once training is done.
        epochs: Number of passes over ``data``.
        batch_size: Currently unused; every video is fed as its own batch of
            one. Kept so existing calls remain valid.
    """
    for epoch in range(epochs):
        # Visit the videos in a fresh random order each epoch without
        # reordering the caller's list.
        for index in np.random.permutation(len(data)):
            video = data[index]
            # A video without frames cannot form a valid model input.
            if len(video['frames']) == 0:
                continue
            frames = np.array([video['frames']])  # Shape: (1, num_frames, height, width, channels)
            label = np.array([video['label']])
            model.train_on_batch(frames, label)
        evaluate_and_plot(model, val_data)
    evaluate_and_plot(model, test_data)

# train_data / val_data / test_data were already run through preprocess_data in
# the data-loading cell. Their items only carry 'label' and 'frames', so
# preprocessing them a second time would fail with KeyError: 'url'.
train_model(model, train_data, val_data, test_data, epochs=1, batch_size=10)