In [7]:
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical, Sequence
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import CSVLogger, Callback

# Parameters
DATASET_DIR = r'D:\M.Ali\Semester 8\FYP-2\Final DataSet\DataSet 1.4'
IMG_SIZE = 224
N_FRAMES = 60
BATCH_SIZE = 8
EPOCHS = 100
LEARNING_RATE = 1e-4
CHECKPOINT_DIR = r'D:\M.Ali\Semester 8\FYP-2\Final DataSet\New Trained I3D'

# Ensure checkpoint directory exists
os.makedirs(CHECKPOINT_DIR, exist_ok=True)

# Data augmentation
datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)

# Function to augment video frames
def augment_video(frames):
    aug_frames = []
    for frame in frames:
        frame = datagen.random_transform(frame)
        aug_frames.append(frame)
    return np.array(aug_frames)

# Function to load and preprocess video frames
def load_video(video_path, max_frames=N_FRAMES):
    cap = cv2.VideoCapture(video_path)
    frames = []
    while len(frames) < max_frames:
        ret, frame = cap.read()
        if not ret:
            break
        frame = cv2.resize(frame, (IMG_SIZE, IMG_SIZE))
        frames.append(frame)
    cap.release()

    if len(frames) < max_frames:
        frames.extend([frames[-1]] * (max_frames - len(frames)))  # Pad with last frame

    frames = np.array(frames)
    frames = frames / 255.0  # Normalize
    frames = augment_video(frames)  # Apply augmentation
    return frames

# Function to load dataset paths
def load_dataset_paths(dataset_dir):
    video_paths = []
    labels = []
    label_map = {}
    print(f'Loading dataset paths from {dataset_dir}')
    for i, label in enumerate(os.listdir(dataset_dir)):
        label_map[i] = label
        label_dir = os.path.join(dataset_dir, label)
        for video_name in os.listdir(label_dir):
            video_path = os.path.join(label_dir, video_name)
            video_paths.append(video_path)
            labels.append(i)
    print('Dataset paths loaded successfully.')
    return video_paths, labels, label_map

# Load dataset paths
video_paths, labels, label_map = load_dataset_paths(DATASET_DIR)

# Encode labels
label_encoder = LabelEncoder()
labels = label_encoder.fit_transform(labels)
labels = to_categorical(labels)

# Split dataset
train_paths, val_paths, train_labels, val_labels = train_test_split(video_paths, labels, test_size=0.2, random_state=42)

# Data generator
class VideoDataGenerator(Sequence):
    def __init__(self, video_paths, labels, batch_size=BATCH_SIZE, n_frames=N_FRAMES, img_size=IMG_SIZE, shuffle=True):
        self.video_paths = video_paths
        self.labels = labels
        self.batch_size = batch_size
        self.n_frames = n_frames
        self.img_size = img_size
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        return int(np.floor(len(self.video_paths) / self.batch_size))

    def __getitem__(self, index):
        indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        video_paths_temp = [self.video_paths[k] for k in indexes]
        labels_temp = [self.labels[k] for k in indexes]

        X, y = self.__data_generation(video_paths_temp, labels_temp)
        return X, y

    def on_epoch_end(self):
        self.indexes = np.arange(len(self.video_paths))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, video_paths_temp, labels_temp):
        X = np.empty((self.batch_size, self.n_frames, self.img_size, self.img_size, 3), dtype=np.float32)
        y = np.empty((self.batch_size, len(self.labels[0])), dtype=np.float32)

        for i, video_path in enumerate(video_paths_temp):
            frames = load_video(video_path, self.n_frames)
            X[i,] = frames
            y[i,] = labels_temp[i]

        return X, y

# Create data generators
train_generator = VideoDataGenerator(train_paths, train_labels)
val_generator = VideoDataGenerator(val_paths, val_labels, shuffle=False)

# Define I3D model
def build_i3d_model(num_classes):
    base_model = tf.keras.applications.InceptionV3(include_top=False, weights='imagenet', input_shape=(IMG_SIZE, IMG_SIZE, 3))
    base_model.trainable = False  # Freeze base model

    inputs = layers.Input(shape=(N_FRAMES, IMG_SIZE, IMG_SIZE, 3))
    x = layers.TimeDistributed(base_model)(inputs)
    x = layers.TimeDistributed(layers.GlobalAveragePooling2D())(x)
    x = layers.LSTM(256)(x)
    x = layers.Dense(1024, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(num_classes, activation='softmax')(x)
    
    model = models.Model(inputs, outputs)
    return model

# Build and compile model
num_classes = len(label_map)
model = build_i3d_model(num_classes)
model.compile(optimizer=optimizers.Adam(learning_rate=LEARNING_RATE), loss='categorical_crossentropy', metrics=['accuracy'])

# Custom callback to save the model after each epoch with the epoch number in the filename
class SaveModelPerEpoch(Callback):
    def on_epoch_end(self, epoch, logs=None):
        model_path = os.path.join(CHECKPOINT_DIR, f'model_checkpoint_{epoch+1}.keras')
        self.model.save(model_path)
        print(f'Model saved to {model_path}')

# Checkpoint and logging
csv_logger = CSVLogger(os.path.join(CHECKPOINT_DIR, 'training_log.csv'), append=True)
save_model_per_epoch = SaveModelPerEpoch()

# Train model with checkpointing and logging
print('Starting training...')
model.fit(
    train_generator,
    validation_data=val_generator,
    epochs=EPOCHS,
    callbacks=[save_model_per_epoch, csv_logger],
    verbose=1
)

# Save final model
final_model_path = os.path.join(CHECKPOINT_DIR, 'i3d_model_final.keras')
model.save(final_model_path)
print(f'Final model saved to {final_model_path}')

# Evaluate model
loss, accuracy = model.evaluate(val_generator)
print(f'Validation accuracy: {accuracy:.4f}')
# // //

Loading dataset paths from F:\DataSet Ali
Dataset paths loaded successfully.
Starting training...
Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
 14/123 [==>...........................] - ETA: 28:04 - loss: 0.2448 - accuracy: 0.9554

KeyboardInterrupt: 