In [3]:
import numpy as np
import pandas as pd
import cv2
import tensorflow as tf
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense, LSTM, Masking, Dropout
from tensorflow.keras.models import Sequential



In [None]:
print("Num GPUs Available:", len(tf.config.list_physical_devices('GPU')))


Num GPUs Available: 1


In [None]:
def padding(frames, max_frames):
    new_frames = frames
    while len(new_frames) < max_frames:
        zero_array = np.zeros((224,224, 3))
        new_frames.append(zero_array)
    
    return new_frames

In [None]:
def extract_and_resize_frames(video_path, target_size=(224,224), max_frames=120):
    cap = cv2.VideoCapture(video_path)
    frames = []
    
    while len(frames) < max_frames:
        ret, frame = cap.read()
        if not ret:
            break
    
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        frame_resized = cv2.resize(frame, target_size)
        frame_resized = frame_resized / 255.0
        frames.append(frame_resized)
    
    #while len(frames) < max_frames:
    #    frames.append(np.zeros((640, 360)))

    frames = padding(frames, 120)
    
    cap.release()
    return np.array(frames)

In [None]:
def create_padding_mask(frames, max_frames=120):
    mask = np.ones((max_frames,))  
    if len(frames) < max_frames:
        mask[len(frames):] = 0  
    return mask

In [None]:
def repeat_rows(df, num_repeats=120):
    repeated_df = pd.DataFrame(np.tile(df.values, (num_repeats, 1)), columns=df.columns)
    
    return repeated_df

In [None]:
def generate_df(X_folder, y_filepath):
    X_filepaths = []
    X_arrays = []
    
    for i in range(2):
        X_filename = f"{X_folder}{i+1}.mp4"
        X_filepaths.append(X_filename)
    
    for filepath in X_filepaths:
        X_array = extract_and_resize_frames(filepath)
        X_arrays.append(X_array)

    y_df = pd.read_csv(y_filepath)
    y_df.drop(columns='filename', axis=1, inplace=True)
    y_df = y_df.head(2)
    


    X_arrays = np.array(X_arrays)   
    y_array = y_df.to_numpy()
    
    return X_arrays, y_array

In [None]:
X, y = generate_df("../model/data/videos/", "../model/data/coordinates/coordinate_data.csv")
y.shape


(2, 2)

In [None]:
def create_feature_extraction_cnn(input_shape):
    base_model = MobileNetV2(weights='imagenet', include_top=False, input_shape=input_shape)
    
    for layer in base_model.layers:
        layer.trainable = False
        
    model = Sequential([
        base_model,
        GlobalAveragePooling2D(),
        Dense(256, activation='relu')
    ])
    
    return model

In [None]:
feature_extracter = create_feature_extraction_cnn(input_shape=(224,224,3))
feature_extracter.compile(
    optimizer = 'adam',
    loss='categorical_crossentropy',
    metrics=['accuracy']
)
feature_extracter.fit(
    X, y, epochs=25
)

Epoch 1/25


ValueError: Exception encountered when calling Sequential.call().

[1mInvalid input shape for input Tensor("data:0", shape=(None, 120, 224, 224, 3), dtype=float32). Expected shape (None, 224, 224, 3), but input has incompatible shape (None, 120, 224, 224, 3)[0m

Arguments received by Sequential.call():
  • inputs=tf.Tensor(shape=(None, 120, 224, 224, 3), dtype=float32)
  • training=True
  • mask=None

In [None]:
def build_model(input_shape, X, y):
    model = Sequential([
        Masking(mask_value=0.0, input_shape = input_shape),
        LSTM(64, return_sequences=True),
        Dropout(0.2),
        LSTM(64),
        Dropout(0.2),

        
        Dense(2)
    ])
    
    model.compile(
        optimizer='adam', 
        loss='mse',
        metrics = ['mae']
    )
    
    model.fit(X, y, epochs=5)
    
    return model

In [None]:
model = build_model(input_shape=(120,244,244), X=X, y=y)

  super().__init__(**kwargs)


ValueError: Input 0 of layer "lstm_13" is incompatible with the layer: expected ndim=3, found ndim=4. Full shape received: (None, 120, 244, 244)