In [214]:
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# !unzip /content/drive/MyDrive/Senior/Spring/EC523/FinalProject/ec523/data.zip
!unzip /content/drive/MyDrive/School/Graduate/Classes/2024\ Spring/EC523\ Deep\ Learning/data.zip


# Performing lip detection and extraction from a video using a Haar cascade classifier

In [235]:
import os

import cv2
import tensorflow as tf


def process_video_file(path: str) -> tf.Tensor:
    """
    Images are now fully pre-processed here before being stored or saved.
    """

    # Load video
    cap = cv2.VideoCapture(path)  # e.g. /content/data/videos/bbaf2n.mpg

    frame_count = 0
    # processed_frames = []
    tensors_list = []

    while True:

        # Read frame from vid
        ret, frame = cap.read()
        if not ret:
            break

        # Convert to grey scale
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Lip detection
        for x in reversed(range(5, 8)):  # lower conditions as needed to find best match
            lips = lip_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=x, minSize=(60, 30), maxSize=(120, 60))
            if len(lips) > 0:
                break
        if len(lips) == 0:  # if still no match, use static vals
            lips = [[50, 150, 200, 100]]
        (x, y, w, h) = lips[0]
        # print(f'{frame_count:02d}', ': ', lips[0])

        # Store normalized, grayscaled, and uniformly cropped
        gray_normal_crop = cv2.normalize(gray, gray, 0, 100, cv2.NORM_MINMAX)[y:y+h, x:x+w]
        final_img = cv2.resize(gray_normal_crop, (80, 40))
        tensors_list.append(final_img)

        # Stored in tensor, rather than saving to file
        # cv2.imwrite(os.path.join(output_dir, f'frame_{frame_count}.jpg'), final_img)
        # cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 200, 0), 2)  # draws rectangle
        # cv2.imwrite(os.path.join(output_dir, f'frame_{frame_count}.jpg'), frame)
        # processed_frames.append(cv2.normalize(gray, gray, 0, 100, cv2.NORM_MINMAX)[y:y+h, x:x+w])

        frame_count += 1

    # TODO: tf.normalize across all images
    # mean = tf.math.reduce_mean(processed_frames)
    # std = tf.math.reduce_std(tf.cast(processed_frames, tf.float32))
    # processed_frames = (tf.cast((processed_frames - mean), tf.float32) / std).numpy()
    # for i, v in enumerate(processed_frames):
    #     output_path = os.path.join(output_dir, f"frame_{i}.jpg")
    #     cv2.imwrite(output_path, v)
    # print(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)))  # number of frames in video

    cap.release()

    video_tensor = tf.convert_to_tensor(np.array(tensors_list))

    # print(f"Finished processing video; {frame_count} frames saved in:", output_dir)
    # print(f'\nvideo_tensor ({type(video_tensor)}, {video_tensor.shape}): \n', video_tensor)

    return video_tensor


# Load annotations
def load_annotations(annotation_file: str) -> list[str]:

    with open(annotation_file, 'r') as file:
        lines = file.readlines()
        annotations = [line.strip().split() for line in lines]

    return annotations


def process_annotations_file(path: str):
    """
    Should experiment with syllable labels vs. video labels...
    """

    # annotation_file = "/content/data/annotations/s1/bbaf2n.align"
    annotations = load_annotations(path)

    # Map frames to annotations based on filename; each annotation line contains:
    # [<start of syllable to thousandth of a frame>,
    #    <end of syllable to thousandth of a frame>,
    #    <syllable/annotation>]
    # frame_labels = []
    video_label = []
    for line in annotations:
        # rng = np.array([int(line[0]), int(line[1])]) // 1000
        # frame_labels[rng[0]:rng[1] + 1] = [line[2]] * ((rng[1] + 1) - rng[0])
        for letter in line[2]:
            video_label.append(ord(letter) - 97)
    # print(f'\nannotated_frames ({(cnt := len(frame_labels))}): \n', frame_labels)
    # print(f'\nvideo_label ({len(video_label)}): \n', video_label)

    # Tests (for bbaf2n)
    # assert cnt == frame_count
    # assert frame_labels[33] == 'blue' and frame_labels[34]      == 'at'
    # assert frame_labels[0]  == 'sil'  and frame_labels[cnt - 1] == 'sil'
    # assert video_label[0]   == 18     and video_label[3]        == 1

    # TODO: Remove 'sil' (i.e. silence) frames?

    return video_label


In [242]:
import os

import cv2
import tensorflow as tf

# Directory to store processed frames
output_dir = "/content/processed_frames"
os.makedirs(output_dir, exist_ok=True)
videos_dir = '/content/data/videos'
annotations_dir = '/content/data/annotations/s1'
video_limit = 100  # On Google Colab Pro, 100 takes about ~6 mins to load

# Use Haar cascade for lip detection
lip_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_smile.xml')

# Loop over files, loading the
# video_subset = ['bbaf2n', 'bbaf3s', 'bbaf4p']
inputs = []
labels = []

for i, video_name in enumerate(os.listdir(videos_dir)):
    if video_name.endswith('.mpg'):
        inputs.append(process_video_file(f'{videos_dir}/{video_name}'))
        print(i, ': ', video_name)
    if i > video_limit:
        break

for i, annotations_name in enumerate(os.listdir(annotations_dir)):
    if annotations_name.endswith('.align'):
        labels.append(process_annotations_file(f'{annotations_dir}/{annotations_name}'))
        print(i, ': ', annotations_name)
    if i > video_limit:
        break

inputs = np.array([inputs])
labels = np.array([labels])

print(f'\ninputs ({inputs.shape}): \n')#, inputs)
print(f'\nlabels ({labels.shape}): \n')#, labels)


ValueError: setting an array element with a sequence. The requested array has an inhomogeneous shape after 1 dimensions. The detected shape was (204,) + inhomogeneous part.

In [197]:
"""
Now being handled above
"""

import os

import cv2
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow import keras
from keras import layers, models
from keras.utils import to_categorical

# Define width, height, and number of frames
width      = 80  # originally 224
height     = 40  # originally 224
num_frames = 75  # Number of frames per video (currently using all frames)
channels   = 3


# extract label from filename
def extract_label(filename):
    label = filename.split("_")[1].split(".")[0]  # labels are now defined above
    return label


# Function to load and preprocess frames
# (Now being handled above, before the images are saved, to reduce space)
def load_and_preprocess_frames(directory):
    frames = []
    labels = []

    for filename in os.listdir(directory):
        if filename.endswith(".jpg"):
            # load the frame
            img = cv2.imread(os.path.join(directory, filename))
            # preprocess frame (resize, normalize, etc.)
            img = cv2.resize(img, (width, height))  # resize frame
            img = img / 255.0  # normalize pixel values to [0, 1]
            # append the frame and label
            frames.append(img)
            labels.append(extract_label(filename))  # extract label

    return np.array(labels)#, np.array(frames)


# Load and preprocess frames
# labels = load_and_preprocess_frames("/content/processed_frames")
# print(labels)


In [None]:
from sklearn.model_selection import train_test_split

# split the data into training and validation sets
# X_train, X_val, y_train, y_val = train_test_split(frames, labels, test_size=0.2, random_state=42)
# print(f'\nX_train ({X_train.shape}): \n', X_train)
# print(f'\nX_val   ({X_val.shape}):   \n', X_val)
# print(f'\ny_train ({y_train.shape}): \n', y_train)
# print(f'\ny_val   ({y_val.shape}):   \n', y_val)


# Model


In [74]:
import tensorflow as tf
from tensorflow import keras
from keras import layers, models
from keras.utils import to_categorical

# Assuming 'height', 'width', 'frames', and 'channels' are defined
# Calculate the number of unique classes or labels
num_classes = len(set(labels))

# Convert labels to one-hot encoding
y_train_encoded = to_categorical(y_train, num_classes)
y_val_encoded   = to_categorical(y_val,   num_classes)

# Define the lip reading model architecture
model = models.Sequential([
    # 3D Convolutional layers
    layers.Conv3D(32,   (3, 3, 3), activation='relu', padding='same', input_shape=(num_frames, height, width, channels)),
    layers.MaxPooling3D((2, 2, 2)),
    layers.Conv3D(64,   (3, 3, 3), activation='relu', padding='same'),
    layers.MaxPooling3D((2, 2, 2)),
    layers.Conv3D(128,  (3, 3, 3), activation='relu', padding='same'),
    layers.MaxPooling3D((1, 2, 2)),  # Adjusted pooling size

    # Flatten layer
    layers.Flatten(),

    # Fully connected layers
    layers.Dense(128, activation='relu'),
    layers.Dense(num_classes, activation='softmax')
])

# Compile the model
model.compile(optimizer='adam',
              loss='categorical_crossentropy',
              metrics=['accuracy'])


ValueError: The `kernel_size` argument must be a tuple of 2 integers. Received: (3, 3, 3)

# Model Accuracy


In [68]:
import matplotlib.pyplot as plt

# train model using your processed frames as input data
history = model.fit(X_train, y_train_encoded, epochs=10, batch_size=32, validation_data=(X_val, y_val_encoded))

# model performance
loss, accuracy = model.evaluate(X_val, y_val_encoded)
print("Validation Loss:", loss)
print("Validation Accuracy:", accuracy)

# plot training history
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.show()


Epoch 1/10


ValueError: in user code:

    File "/usr/local/lib/python3.10/dist-packages/keras/src/engine/training.py", line 1401, in train_function  *
        return step_function(self, iterator)
    File "/usr/local/lib/python3.10/dist-packages/keras/src/engine/training.py", line 1384, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "/usr/local/lib/python3.10/dist-packages/keras/src/engine/training.py", line 1373, in run_step  **
        outputs = model.train_step(data)
    File "/usr/local/lib/python3.10/dist-packages/keras/src/engine/training.py", line 1150, in train_step
        y_pred = self(x, training=True)
    File "/usr/local/lib/python3.10/dist-packages/keras/src/utils/traceback_utils.py", line 70, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "/usr/local/lib/python3.10/dist-packages/keras/src/engine/input_spec.py", line 298, in assert_input_compatibility
        raise ValueError(

    ValueError: Input 0 of layer "sequential_12" is incompatible with the layer: expected shape=(None, 10, 224, 224, 3), found shape=(None, 224, 224, 3)
