In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
import cv2
import os
import numpy as np
from tensorflow.keras.utils import Sequence
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, BatchNormalization, ReLU, Dropout, Flatten, Dense, Input
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import MeanSquaredError

In [7]:
def load_images_and_speeds(images_path, speeds_file):
    images = sorted([os.path.join(images_path, img) for img in os.listdir(images_path) if img.endswith('.png')])
    speeds = np.loadtxt(speeds_file)
    return images, speeds

In [8]:
images_path = "/content/drive/MyDrive/assignment3 - computer vision - files/Q3/frames"
speeds_file = "/content/drive/MyDrive/assignment3 - computer vision - files/Q3/train.txt"
images, speeds = load_images_and_speeds(images_path, speeds_file)

In [9]:
class OpticalFlowDataset(tf.keras.utils.Sequence):
    def __init__(self, img_paths, speeds, batch_size=8, shuffle=True):
        self.img_paths = img_paths
        self.speeds = speeds[1:]
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        return (len(self.img_paths) - 1) // self.batch_size

    def __getitem__(self, idx):
        batch_indices = self.indices[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_img_paths = [self.img_paths[i] for i in batch_indices]
        batch_speeds = [self.speeds[i] for i in batch_indices]

        batch_flow_hsv = []
        batch_speed = []

        for i in range(len(batch_img_paths) - 1):
            img1_path = batch_img_paths[i]
            img2_path = batch_img_paths[i + 1]

            img1 = cv2.imread(img1_path)
            img2 = cv2.imread(img2_path)

            img1 = cv2.resize(img1, (128, 128))
            img2 = cv2.resize(img2, (128, 128))

            flow_hsv = self.compute_optical_flow_hsv(img1, img2)
            flow_hsv = flow_hsv.astype(np.float32)

            batch_flow_hsv.append(flow_hsv)
            batch_speed.append(batch_speeds[i])

        return np.array(batch_flow_hsv), np.array(batch_speed)

    def on_epoch_end(self):
        self.indices = np.arange(len(self.img_paths) - 1)
        if self.shuffle:
            np.random.shuffle(self.indices)

    def compute_optical_flow_hsv(self, img1, img2):
        prev_gray = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
        gray = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)
        flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)

        hsv = np.zeros_like(img1)
        mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
        hsv[..., 0] = ang * 180 / np.pi / 2
        hsv[..., 1] = 255
        hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)

        hsv = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

        return hsv

In [10]:
inputs = Input(shape=(128, 128, 3))

#first convolutional layer
x = Conv2D(16, kernel_size=5, strides=2, padding='same')(inputs)
x = BatchNormalization()(x)
x = ReLU()(x)

#second convolutional layer
x = Conv2D(32, kernel_size=3, strides=2, padding='same')(x)
x = BatchNormalization()(x)
x = ReLU()(x)

#third convolutional layer
x = Conv2D(64, kernel_size=3, strides=2, padding='same')(x)
x = BatchNormalization()(x)
x = ReLU()(x)

#fourth convolutional layer
x = Conv2D(128, kernel_size=3, strides=2, padding='same')(x)
x = BatchNormalization()(x)
x = ReLU()(x)

x = Flatten()(x)

print(f"Feature map size before flattening: {x.shape}")

#fully connected layer
x = Dense(128, activation='relu')(x)

#dropout for regularization
x = Dropout(0.5)(x)

outputs = Dense(1)(x)

model = Model(inputs, outputs)

model.summary()

Feature map size before flattening: (None, 8192)
Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 128, 128, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 64, 64, 16)        1216      
                                                                 
 batch_normalization (Batch  (None, 64, 64, 16)        64        
 Normalization)                                                  
                                                                 
 re_lu (ReLU)                (None, 64, 64, 16)        0         
                                                                 
 conv2d_1 (Conv2D)           (None, 32, 32, 32)        4640      
                                                                 
 batch_normalization_1 (Bat  (None, 32, 32, 32)        128       
 chNormaliza

In [12]:
dataset = OpticalFlowDataset(images, speeds, batch_size=8, shuffle=True)

model.compile(optimizer=Adam(learning_rate=0.001), loss=MeanSquaredError())

num_epochs = 2

#Keras data generator
data_gen = tf.data.Dataset.from_generator(
    lambda: dataset,
    output_signature=(
        tf.TensorSpec(shape=(None, 128, 128, 3), dtype=tf.float32),
        tf.TensorSpec(shape=(None,), dtype=tf.float32)
    )
)

#prefetch data
data_gen = data_gen.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

model.fit(data_gen, epochs=num_epochs)

Epoch 1/2
Epoch 2/2


<keras.src.callbacks.History at 0x7899519cd210>