In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# !pip install einops --quiet

import os
import shutil
import cv2

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

#------- torch imports -----------#
import torch
from torch import nn
import torchvision.utils as vutils
from torch.utils.data import Dataset, DataLoader
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from tqdm import tqdm

#------------- Video Testing Imports -----------#
import time
from PIL import Image
from IPython.display import display, clear_output


## Configs

In [None]:
class Config:
    """Central settings for dataset location, preprocessing and training."""

    # --- Dataset download ---
    # Set to True to (re)download the dataset from KaggleHub; False reuses the
    # copy already present at DATASET_PATH.
    DOWNLOAD_DATASET = False
    DATASET_NAME = "matthewjansen/ucf101-action-recognition"
    LOCAL_FOLDER_NAME = "ucf101_data"

    # --- Preprocessing ---
    SELECTED_CLASSES = 10   # default number of (alphabetically first) classes used by load_data
    SEQUENCE_LENGTH = 30    # frames read from the start of every video
    IMAGE_SIZE = 32         # frames are resized to IMAGE_SIZE x IMAGE_SIZE
    CHANNELS = 3
    BATCH_SIZE = 4

    # --- Training ---
    MODEL = "SingleFrameCNN"
    NUM_EPOCHS = 100
    SAVE_EVERY = 1          # write a periodic checkpoint every SAVE_EVERY epochs
    LEARNING_RATE = 1e-4

    # Root folder for the dataset copy, checkpoints, logs and test artifacts.
    SAVE_PATH = "/path_to_save_model"
    DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

    @property
    def DATASET_PATH(self):
        """Dataset directory: LOCAL_FOLDER_NAME inside SAVE_PATH."""
        return os.path.join(self.SAVE_PATH, self.LOCAL_FOLDER_NAME)

config = Config()

## Download Dataset

In [None]:
# Set KaggleHub path early
try:
    import kagglehub
except ImportError:
    !pip install kagglehub
    import kagglehub

In [None]:
# Fetch the dataset from KaggleHub only when requested; otherwise reuse the
# copy that is already stored at config.DATASET_PATH.
if not config.DOWNLOAD_DATASET:
    print(f"Using existing dataset at: {config.DATASET_PATH}")
else:
    print("Downloading dataset from KaggleHub...")
    dataset_path = kagglehub.dataset_download(config.DATASET_NAME)
    print(f"Dataset downloaded to: {dataset_path}")

    # Copy the downloaded files into the configured dataset folder.
    dest_path = config.DATASET_PATH
    shutil.copytree(dataset_path, dest_path, dirs_exist_ok=True)
    print(f"Dataset copied to: {dest_path}")

## Process Video / Load Data

In [None]:
# Custom PyTorch Dataset class for video data
class VideoDataset(Dataset):
    """Map-style dataset pairing pre-loaded video clips with their labels.

    ``data`` is indexed per sample; each sample must be a 4-D array laid out as
    (T, H, W, C). ``labels`` is indexed with the same positions as ``data``.
    """

    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        """Number of clips in the dataset."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(clip, label)`` tensors for sample ``idx``."""
        clip = torch.tensor(self.data[idx])
        # Channels-last frames -> channels-first: (T, H, W, C) -> (T, C, H, W).
        clip = clip.permute(0, 3, 1, 2)
        target = torch.tensor(self.labels[idx])
        return clip, target

In [None]:
def process_video(file_path):
    """Load the first ``config.SEQUENCE_LENGTH`` frames of a video as a clip.

    Every frame is resized to ``config.IMAGE_SIZE`` x ``config.IMAGE_SIZE`` and
    scaled to [0, 1]. No colour conversion is applied, so frames keep the
    channel order delivered by OpenCV (BGR).

    NOTE: a later cell of this notebook defines another ``process_video`` with a
    different signature; once that cell has run, this definition is shadowed.

    Args:
        file_path: path of the video file to read.

    Returns:
        A NumPy array of shape (SEQUENCE_LENGTH, IMAGE_SIZE, IMAGE_SIZE, C), or
        ``None`` when the video yields fewer than SEQUENCE_LENGTH frames.
    """
    cap = cv2.VideoCapture(file_path)
    frames = []

    try:
        # Read frames until we have enough or the stream ends / fails.
        while len(frames) < config.SEQUENCE_LENGTH:
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.resize(frame, (config.IMAGE_SIZE, config.IMAGE_SIZE)) / 255.0
            frames.append(frame)
    finally:
        # Release the capture even if decoding/resizing raised.
        cap.release()

    # Only complete clips are usable; short videos are reported as None.
    return np.array(frames) if len(frames) == config.SEQUENCE_LENGTH else None

In [None]:
def load_data(split_folder="train", num_classes=None):
    """Load all usable clips of one dataset split into memory.

    Class folders inside ``config.DATASET_PATH/<split_folder>`` are sorted
    alphabetically and only the first ``num_classes`` of them are used; the
    position of a class in that list is its integer label.

    Args:
        split_folder: name of the split sub-folder (e.g. "train", "val", "test").
        num_classes: number of classes to keep; ``None`` falls back to
            ``config.SELECTED_CLASSES``.

    Returns:
        ``(X, y, n)`` where ``X`` is a float32 array of clips, ``y`` holds the
        one-hot labels produced by ``to_categorical`` and ``n`` is the number of
        classes actually selected.
    """
    base_path = os.path.join(config.DATASET_PATH, split_folder)

    # Alphabetical order keeps class indices stable across splits and runs.
    action_classes = sorted(
        d for d in os.listdir(base_path) if os.path.isdir(os.path.join(base_path, d))
    )

    selected_classes = num_classes if num_classes is not None else config.SELECTED_CLASSES
    selected_action_classes = action_classes[:selected_classes]

    print(f"\n[{split_folder.upper()}] Found {len(selected_action_classes)} Classes:")
    print("\n".join(f" - {cls}" for cls in selected_action_classes))

    X, y = [], []

    for i, action in enumerate(selected_action_classes):
        action_path = os.path.join(base_path, action)

        # os.listdir order is filesystem-dependent; sorting makes the sample
        # order (and therefore indices in saved results) reproducible.
        for file in sorted(os.listdir(action_path)):
            video_path = os.path.join(action_path, file)

            if not video_path.lower().endswith(('.avi', '.mp4', '.mov')):
                continue

            video_data = process_video(video_path)

            # Videos that are too short come back as None and are skipped.
            if video_data is not None:
                X.append(video_data)
                y.append(i)

    X = np.array(X, dtype=np.float32)
    y = to_categorical(y, num_classes=len(selected_action_classes))

    return X, y, len(selected_action_classes)


#### Loading Data

In [None]:
# Sanity check: list the contents of the dataset folder that load_data() reads from.
sorted(os.listdir(config.DATASET_PATH))

test  test.csv	train  train.csv  val  val.csv


In [None]:
# Load every split with the same class count so label indices and one-hot
# widths agree between train, validation and test.
X_train, y_train, num_classes = load_data("train", num_classes=config.SELECTED_CLASSES)
X_val, y_val, _ = load_data("val", num_classes=config.SELECTED_CLASSES)
X_test, y_test, _ = load_data("test", num_classes=config.SELECTED_CLASSES)

# Create DataLoaders (only the training data is shuffled)
train_loader = DataLoader(VideoDataset(X_train, y_train), batch_size=config.BATCH_SIZE, shuffle=True)
val_loader = DataLoader(VideoDataset(X_val, y_val), batch_size=config.BATCH_SIZE, shuffle=False)
test_loader = DataLoader(VideoDataset(X_test, y_test), batch_size=config.BATCH_SIZE, shuffle=False)


[TRAIN] Found 10 Classes:
 - ApplyEyeMakeup
 - ApplyLipstick
 - Archery
 - BabyCrawling
 - BalanceBeam
 - BandMarching
 - BaseballPitch
 - Basketball
 - BasketballDunk
 - BenchPress

[VAL] Found 10 Classes:
 - ApplyEyeMakeup
 - ApplyLipstick
 - Archery
 - BabyCrawling
 - BalanceBeam
 - BandMarching
 - BaseballPitch
 - Basketball
 - BasketballDunk
 - BenchPress

[TEST] Found 10 Classes:
 - ApplyEyeMakeup
 - ApplyLipstick
 - Archery
 - BabyCrawling
 - BalanceBeam
 - BandMarching
 - BaseballPitch
 - Basketball
 - BasketballDunk
 - BenchPress


In [None]:
# Report how many clips ended up in each split.
split_reports = (
    f"\nTrain samples: {len(X_train)}",
    f"Val samples: {len(X_val)}",
    f"Test samples: {len(X_test)}",
)
for report_line in split_reports:
    print(report_line)


Train samples: 1125
Val samples: 186
Test samples: 194


## Models

### SingleFrameCNN

In [None]:
# --- SingleFrameCNN ---
class SingleFrameCNN(nn.Module):
    """Baseline classifier that looks only at the first frame of each clip.

    Args:
        num_classes: number of output logits.
        image_size: side length of the square input frames. The default (32)
            reproduces the original fixed 64 * 8 * 8 classifier input.
    """

    def __init__(self, num_classes, image_size=32):
        super().__init__()
        # Convolutional feature extractor for a single 3-channel frame.
        self.conv = nn.Sequential(
            nn.Conv2d(3, 32, 3, padding=1),   # padding=1 keeps the spatial size
            nn.ReLU(),
            nn.MaxPool2d(2),                  # halves height and width
            nn.Conv2d(32, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        # Two 2x2 max-pools shrink each side by a factor of 4 (floor division).
        pooled_side = image_size // 4
        self.fc = nn.Sequential(
            nn.Linear(64 * pooled_side * pooled_side, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        """Map clips of shape (B, T, C, H, W) to logits of shape (B, num_classes)."""
        x = x[:, 0]                    # keep only the first frame: (B, C, H, W)
        x = self.conv(x)
        x = x.reshape(x.size(0), -1)   # flatten for the fully connected head
        return self.fc(x)


### C3DLite

In [None]:
# --- C3DLite ---
class C3DLite(nn.Module):
    """Small 3D-convolutional network operating on whole clips.

    The classifier head is built in ``__init__`` (not lazily inside
    ``forward``), so its parameters are visible to an optimizer created right
    after construction and a fresh instance can load a saved ``state_dict``.

    Args:
        num_classes: number of output logits.
        sequence_length: number of frames per clip the model will receive.
        image_size: side length of the square input frames.

    Raises:
        ValueError: if the clip is too small to survive the four 2x poolings.
    """

    def __init__(self, num_classes, sequence_length=30, image_size=32):
        super().__init__()
        # 3D conv layers extract spatiotemporal features from video clips.
        self.conv3d = nn.Sequential(
            nn.Conv3d(3, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool3d(2),                             # halves T, H and W
            nn.Conv3d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool3d(2),
            nn.Conv3d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool3d(2),
            nn.Conv3d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool3d(2),
        )

        # The padded 3x3x3 convs keep sizes; each of the four max-pools floors
        # every dimension by 2, i.e. a total floor division by 16.
        pooled_t = sequence_length // 16
        pooled_side = image_size // 16
        flat_features = 256 * pooled_t * pooled_side * pooled_side
        if flat_features == 0:
            raise ValueError(
                "C3DLite needs sequence_length >= 16 and image_size >= 16, got "
                f"sequence_length={sequence_length}, image_size={image_size}."
            )

        self.fc = nn.Sequential(
            nn.Linear(flat_features, 512),
            nn.ReLU(),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        """Map clips of shape (B, T, C, H, W) to logits of shape (B, num_classes)."""
        # Conv3d expects channels before time: (B, T, C, H, W) -> (B, C, T, H, W).
        x = x.permute(0, 2, 1, 3, 4)
        x = self.conv3d(x)
        x = x.reshape(x.size(0), -1)  # flatten
        return self.fc(x)

### CNN LSTM

In [None]:
# --- CNNLSTM ---
class CNNLSTM(nn.Module):
    """Per-frame CNN features fed through an LSTM, classified from the last step.

    Args:
        num_classes: number of output logits.
        image_size: side length of the square input frames; used to size the
            LSTM input. The default (32) matches the original behaviour.
    """

    def __init__(self, num_classes, image_size=32):
        super().__init__()
        # CNN to extract spatial features frame-wise.
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(128, 256, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(256, 256, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )

        # Probe the CNN with one dummy frame to learn the per-frame feature size.
        with torch.no_grad():
            dummy_input = torch.zeros(1, 3, image_size, image_size)
            cnn_out_size = self.cnn(dummy_input).numel()

        # LSTM to model temporal dependencies across frames.
        self.lstm = nn.LSTM(input_size=cnn_out_size, hidden_size=128, batch_first=True, num_layers=2)

        # Fully connected layers for classification.
        self.fc = nn.Sequential(
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, num_classes)
        )

    def forward(self, x):
        """Map clips of shape (B, T, C, H, W) to logits of shape (B, num_classes)."""
        B, T, C, H, W = x.size()
        # Fold time into the batch so the CNN sees individual frames.
        # reshape (unlike view) also accepts non-contiguous inputs.
        x = x.reshape(B * T, C, H, W)
        x = self.cnn(x)
        x = x.reshape(B, T, -1)       # back to (batch, time, features)

        x, _ = self.lstm(x)
        return self.fc(x[:, -1])      # classify from the last time step

### Slow Fast Net

In [None]:
# --- SlowFast ---
class SlowFast(nn.Module):
    """Two-pathway 3D CNN: a "fast" path on every 2nd frame and a "slow" path
    on every 4th frame, whose pooled features are concatenated and classified.

    Args:
        num_classes: number of output logits.
    """

    # Channels of the last conv layer; AdaptiveAvgPool3d((1, 1, 1)) reduces
    # every pathway output to exactly this many features for any input size.
    PATHWAY_FEATURES = 256

    def __init__(self, num_classes):
        super().__init__()

        def conv_stack():
            """Build one pathway; pooling is spatial only, time is kept until the end."""
            return nn.Sequential(
                nn.Conv3d(3, 64, 3, padding=1),
                nn.ReLU(),
                nn.MaxPool3d((1, 2, 2)),
                nn.Conv3d(64, 128, 3, padding=1),
                nn.ReLU(),
                nn.MaxPool3d((1, 2, 2)),
                nn.Conv3d(128, 256, 3, padding=1),
                nn.ReLU(),
                nn.MaxPool3d((1, 2, 2)),
                nn.Conv3d(256, self.PATHWAY_FEATURES, 3, padding=1),
                nn.ReLU(),
                nn.AdaptiveAvgPool3d((1, 1, 1))
            )

        self.fast_conv = conv_stack()  # Fast pathway conv layers
        self.slow_conv = conv_stack()  # Slow pathway conv layers

        # The global average pool fixes each pathway's output size, so no dummy
        # forward pass is needed to size the classifier.
        fast_dim = slow_dim = self.PATHWAY_FEATURES

        # Fully connected layers for the combined features.
        self.fc = nn.Sequential(
            nn.Linear(fast_dim + slow_dim, 512),
            nn.ReLU(),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        """Map clips of shape (B, T, C, H, W) to logits of shape (B, num_classes)."""
        # Temporal sub-sampling, then reorder to (B, C, T, H, W) for Conv3d.
        fast = x[:, ::2].permute(0, 2, 1, 3, 4)   # every 2nd frame
        slow = x[:, ::4].permute(0, 2, 1, 3, 4)   # every 4th frame

        f = self.fast_conv(fast)
        s = self.slow_conv(slow)

        f = f.reshape(f.size(0), -1)
        s = s.reshape(s.size(0), -1)

        x = torch.cat((f, s), dim=1)  # concatenate features from both paths
        return self.fc(x)

## Training Models

In [None]:
def get_model_instance(model_name, num_classes):
    """Instantiate the architecture called ``model_name`` with ``num_classes`` outputs.

    Raises:
        ValueError: if ``model_name`` is not one of the supported architectures.
    """
    # Constructors are wrapped in lambdas so only the requested class is touched.
    factories = {
        "SingleFrameCNN": lambda: SingleFrameCNN(num_classes=num_classes),
        "C3DLite": lambda: C3DLite(num_classes=num_classes),
        "CNNLSTM": lambda: CNNLSTM(num_classes=num_classes),
        "SlowFast": lambda: SlowFast(num_classes=num_classes),
    }
    for known_name, build in factories.items():
        if model_name == known_name:
            return build()
    raise ValueError(f"Model '{model_name}' is not supported.")

#### Train and Validate Function

In [None]:
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one full pass over ``loader`` and return ``(mean batch loss, accuracy %)``.

    With an ``optimizer`` the model is put in train mode and updated after every
    batch; without one it is put in eval mode and gradients are disabled.
    """
    training = optimizer is not None
    model.train(training)

    loss_sum, n_correct, n_seen = 0.0, 0, 0
    with torch.set_grad_enabled(training):
        for videos, labels in loader:
            videos, labels = videos.to(device), labels.to(device).float()

            if training:
                optimizer.zero_grad()
            outputs = model(videos)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            loss_sum += loss.item()

            # Labels are one-hot, so argmax recovers the class index.
            preds = torch.argmax(outputs, dim=1)
            targets = torch.argmax(labels, dim=1)
            n_correct += (preds == targets).sum().item()
            n_seen += labels.size(0)

    if n_seen == 0:
        raise ValueError("The data loader yielded no samples; cannot compute loss/accuracy.")

    return loss_sum / len(loader), 100 * n_correct / n_seen


def train_and_validate(model, train_loader, val_loader, optimizer, criterion, model_name=None, patience=5):
    """Train ``model`` with early stopping on the validation loss.

    All artifacts are written to ``config.SAVE_PATH/<model_name>``:
    ``training_log.txt`` (appended), ``<model_name>_best.pth`` (lowest
    validation loss), ``<model_name>_epoch<N>.pth`` every ``config.SAVE_EVERY``
    epochs and ``training_plots.png``.

    Args:
        model: network to train; moved to ``config.DEVICE``.
        train_loader: loader yielding ``(videos, one-hot labels)`` for training.
        val_loader: loader yielding ``(videos, one-hot labels)`` for validation.
        optimizer: optimizer already bound to ``model``'s parameters.
        criterion: loss called as ``criterion(outputs, labels.float())``.
        model_name: sub-folder / file prefix; defaults to the model's class name.
        patience: epochs without validation-loss improvement before stopping.

    Returns:
        dict with the per-epoch lists ``train_loss``, ``val_loss``,
        ``train_acc`` and ``val_acc``.

    Raises:
        ValueError: if one of the loaders yields no samples.
    """
    model.to(config.DEVICE)

    # Without a name os.path.join would fail; fall back to the class name.
    if model_name is None:
        model_name = type(model).__name__

    save_path = os.path.join(config.SAVE_PATH, model_name)
    os.makedirs(save_path, exist_ok=True)
    log_file = os.path.join(save_path, "training_log.txt")

    train_losses, val_losses = [], []
    train_accuracies, val_accuracies = [], []

    best_val_loss = float('inf')
    patience_counter = 0

    for epoch in range(config.NUM_EPOCHS):
        avg_train_loss, train_accuracy = _run_epoch(
            model, train_loader, criterion, config.DEVICE, optimizer
        )
        avg_val_loss, val_accuracy = _run_epoch(
            model, val_loader, criterion, config.DEVICE
        )

        train_losses.append(avg_train_loss)
        val_losses.append(avg_val_loss)
        train_accuracies.append(train_accuracy)
        val_accuracies.append(val_accuracy)

        log_msg = (f"Epoch [{epoch+1}/{config.NUM_EPOCHS}] "
                   f"- Train Loss: {avg_train_loss:.4f}, Train Acc: {train_accuracy:.2f}% "
                   f"- Val Loss: {avg_val_loss:.4f}, Val Acc: {val_accuracy:.2f}%")
        print(log_msg)

        with open(log_file, "a") as f:
            f.write(log_msg + "\n")

        # Keep the weights with the lowest validation loss seen so far.
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            patience_counter = 0
            torch.save(model.state_dict(), os.path.join(save_path, f"{model_name}_best.pth"))
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"Early stopping at epoch {epoch+1} due to no improvement in validation loss.")
                break

        # Periodic checkpoint (skipped for the epoch that triggers early stopping).
        if (epoch + 1) % config.SAVE_EVERY == 0:
            torch.save(model.state_dict(), os.path.join(save_path, f"{model_name}_epoch{epoch+1}.pth"))

    # Plot loss and accuracy curves side by side and store them next to the logs.
    epochs = range(1, len(train_losses) + 1)
    fig, (ax_loss, ax_acc) = plt.subplots(1, 2, figsize=(12, 5))

    ax_loss.plot(epochs, train_losses, 'b-', label='Train Loss')
    ax_loss.plot(epochs, val_losses, 'r-', label='Validation Loss')
    ax_loss.set_xlabel('Epoch')
    ax_loss.set_ylabel('Loss')
    ax_loss.set_title('Train & Validation Loss')
    ax_loss.legend()

    ax_acc.plot(epochs, train_accuracies, 'b-', label='Train Accuracy')
    ax_acc.plot(epochs, val_accuracies, 'r-', label='Validation Accuracy')
    ax_acc.set_xlabel('Epoch')
    ax_acc.set_ylabel('Accuracy (%)')
    ax_acc.set_title('Train & Validation Accuracy')
    ax_acc.legend()

    fig.tight_layout()
    fig.savefig(os.path.join(save_path, "training_plots.png"))
    plt.close(fig)

    return {
        "train_loss": train_losses,
        "val_loss": val_losses,
        "train_acc": train_accuracies,
        "val_acc": val_accuracies,
    }


#### Test Function

In [None]:
def test_model(model, test_loader, class_names, save_path, frames_are_bgr=True):
    """Evaluate ``model`` on ``test_loader`` and write the results to ``save_path``.

    Written artifacts: ``classified_images/<pred>_<real>_<batch>_<i>.jpg`` (first
    frame of every clip), ``test_results.csv`` and ``test_accuracy.txt``.

    Args:
        model: trained network; moved to ``config.DEVICE`` and set to eval mode.
        test_loader: loader yielding ``(videos, one-hot labels)``.
        class_names: class labels indexed by class index.
        save_path: existing directory that receives the artifacts.
        frames_are_bgr: clips loaded through OpenCV are in BGR order while the
            image writer expects RGB; when True the channels are reversed
            before saving so the stored JPEGs have correct colours.

    Returns:
        Test accuracy in percent.

    Raises:
        ValueError: if ``test_loader`` yields no samples.
    """
    model.to(config.DEVICE)
    model.eval()

    predictions = []  # predicted class indices
    real_labels = []  # true class indices

    image_save_dir = os.path.join(save_path, "classified_images")
    os.makedirs(image_save_dir, exist_ok=True)

    with torch.no_grad():
        for idx, (videos, labels) in enumerate(tqdm(test_loader)):
            videos = videos.to(config.DEVICE)
            labels = labels.to(config.DEVICE)

            outputs = model(videos)
            # Plain Python ints are safe to use as list indices further down.
            preds = torch.argmax(outputs, dim=1).tolist()
            true_labels = torch.argmax(labels, dim=1).tolist()

            predictions.extend(preds)
            real_labels.extend(true_labels)

            # Save the first frame of every clip, labelled via the file name.
            for i, (pred, real) in enumerate(zip(preds, true_labels)):
                img_name = f"{class_names[pred]}_{class_names[real]}_{idx}_{i}.jpg"
                img_path = os.path.join(image_save_dir, img_name)

                frame = videos[i][0]            # first frame: (C, H, W)
                if frames_are_bgr:
                    frame = frame.flip(0)       # BGR -> RGB
                vutils.save_image(frame.unsqueeze(0), img_path)  # add batch dim

    total = len(predictions)
    if total == 0:
        raise ValueError("test_loader yielded no samples; nothing to evaluate.")
    correct = sum(p == r for p, r in zip(predictions, real_labels))

    # Save predictions and true labels to CSV for analysis.
    result_path = os.path.join(save_path, "test_results.csv")
    df = pd.DataFrame({
        "Video_Index": list(range(total)),
        "Predicted_Class": [class_names[p] for p in predictions],
        "Real_Class": [class_names[r] for r in real_labels]
    })
    df.to_csv(result_path, index=False)

    accuracy = 100 * correct / total
    acc_file = os.path.join(save_path, "test_accuracy.txt")
    with open(acc_file, "w") as f:
        f.write(f"Test Accuracy: {accuracy:.2f}%\n")

    print(f"\nTest accuracy: {accuracy:.2f}%")
    print(f"Test results saved to: {result_path}")
    print(f"Images saved in: {image_save_dir}")
    print(f"Accuracy saved in: {acc_file}")

    return accuracy


#### Run Training and Testing Function

In [None]:
def run_training_and_testing(models, train_loader, val_loader, test_loader, num_classes, testing=True):
    """Train (and optionally test) one or several architectures in sequence.

    Args:
        models: a model name or a list of model names accepted by
            ``get_model_instance``.
        train_loader, val_loader, test_loader: data loaders for the three splits.
        num_classes: number of classes the models must predict.
        testing: when True, each model is evaluated on ``test_loader`` after
            training, using its best-validation checkpoint if one was saved.
    """
    # Accept a single name as well as a list of names.
    if isinstance(models, str):
        models = [models]

    for model_name in models:
        print(f"\n=== Training and Testing Model: {model_name} ===")

        model = get_model_instance(model_name, num_classes)
        optimizer = torch.optim.Adam(model.parameters(), lr=config.LEARNING_RATE)
        criterion = nn.CrossEntropyLoss()

        train_and_validate(model, train_loader, val_loader, optimizer, criterion, model_name=model_name)

        if not testing:
            continue

        save_path = os.path.join(config.SAVE_PATH, model_name)
        os.makedirs(save_path, exist_ok=True)

        # After training the model holds the last-epoch weights; evaluate the
        # checkpoint with the lowest validation loss instead when it exists.
        best_checkpoint = os.path.join(save_path, f"{model_name}_best.pth")
        if os.path.exists(best_checkpoint):
            model.load_state_dict(torch.load(best_checkpoint, map_location=config.DEVICE))

        # Class names follow the same rule as load_data: sorted folder names,
        # truncated to the number of classes the model was built for.
        class_names_dir = os.path.join(config.DATASET_PATH, "train")
        class_names = sorted(
            d for d in os.listdir(class_names_dir)
            if os.path.isdir(os.path.join(class_names_dir, d))
        )[:num_classes]

        test_model(model, test_loader, class_names, save_path)


#### Run Model

In [None]:
# Architectures to run; supported names: "SingleFrameCNN", "C3DLite", "CNNLSTM", "SlowFast"
config.MODEL = ["SingleFrameCNN"]
run_training_and_testing(
    models=config.MODEL,
    train_loader=train_loader,
    val_loader=val_loader,
    test_loader=test_loader,
    num_classes=num_classes,
)


=== Training and Testing Model: SingleFrameCNN ===
Epoch [1/100] - Train Loss: 2.1669, Train Acc: 20.62% - Val Loss: 2.0146, Val Acc: 25.81%
Epoch [2/100] - Train Loss: 1.7738, Train Acc: 36.18% - Val Loss: 1.6115, Val Acc: 48.39%
Epoch [3/100] - Train Loss: 1.4977, Train Acc: 47.64% - Val Loss: 1.4404, Val Acc: 48.39%
Epoch [4/100] - Train Loss: 1.3337, Train Acc: 53.24% - Val Loss: 1.2473, Val Acc: 50.54%
Epoch [5/100] - Train Loss: 1.1814, Train Acc: 57.42% - Val Loss: 1.1200, Val Acc: 52.69%
Epoch [6/100] - Train Loss: 1.0631, Train Acc: 61.42% - Val Loss: 0.9609, Val Acc: 68.28%
Epoch [7/100] - Train Loss: 0.9591, Train Acc: 65.78% - Val Loss: 0.9664, Val Acc: 61.83%
Epoch [8/100] - Train Loss: 0.8848, Train Acc: 66.93% - Val Loss: 0.8723, Val Acc: 67.74%
Epoch [9/100] - Train Loss: 0.8181, Train Acc: 70.22% - Val Loss: 0.7776, Val Acc: 67.74%
Epoch [10/100] - Train Loss: 0.7518, Train Acc: 71.73% - Val Loss: 0.6928, Val Acc: 73.12%
Epoch [11/100] - Train Loss: 0.6948, Train Acc:

100%|██████████| 49/49 [00:02<00:00, 20.85it/s]


Test accuracy: 80.41%
Test results saved to: /content/drive/MyDrive/MSAI/DeepLearning/assignment_07/SingleFrameCNN/test_results.csv
Images saved in: /content/drive/MyDrive/MSAI/DeepLearning/assignment_07/SingleFrameCNN/classified_images
Accuracy saved in: /content/drive/MyDrive/MSAI/DeepLearning/assignment_07/SingleFrameCNN/test_accuracy.txt





## Detect Live Video

In [None]:
import os
import time
import cv2
import torch
import numpy as np
from PIL import Image
from IPython.display import display, clear_output

# ---------------- CONFIG ---------------- #
class Config:
    """Settings used by the video inference cells.

    NOTE: this reuses the name ``Config`` and therefore replaces the training
    ``Config`` class defined earlier in the notebook.
    """

    IMAGE_SIZE = 32   # frames are resized to IMAGE_SIZE x IMAGE_SIZE before prediction
    FRAME_RATE = 30   # target frame rate for sampling, display pacing and output video
    DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Label for every model output index, in index order.
    CLASS_NAMES = [
        'ApplyEyeMakeup',
        'ApplyLipstick',
        'Archery',
        'BabyCrawling',
        'BalanceBeam',
        'BandMarching',
        'BaseballPitch',
        'Basketball',
        'BasketballDunk',
        'BenchPress',
    ]

# ---------------- MODEL LOADER ---------------- #
def get_model_instance(model_name, num_classes):
    """Return a new, untrained instance of the architecture named ``model_name``.

    This repeats the factory defined in the training section of the notebook.

    Raises:
        ValueError: if ``model_name`` is not a supported architecture.
    """
    supported = ("SingleFrameCNN", "C3DLite", "CNNLSTM", "SlowFast")

    # Reject unknown names up front, then dispatch with flat guard clauses.
    if model_name not in supported:
        raise ValueError(f"Model '{model_name}' is not supported.")

    if model_name == "SingleFrameCNN":
        return SingleFrameCNN(num_classes=num_classes)
    if model_name == "C3DLite":
        return C3DLite(num_classes=num_classes)
    if model_name == "CNNLSTM":
        return CNNLSTM(num_classes=num_classes)
    return SlowFast(num_classes=num_classes)

In [None]:
def load_model(model_path, model_name, num_classes):
    """Build ``model_name``, restore its weights from ``model_path`` and return
    it on ``Config.DEVICE`` in evaluation mode."""
    net = get_model_instance(model_name, num_classes)
    weights = torch.load(model_path, map_location=Config.DEVICE)
    net.load_state_dict(weights)
    # Both .to() and .eval() return the module itself, so the calls can be chained.
    return net.to(Config.DEVICE).eval()

# ---------------- FRAME PROCESSING ---------------- #
def process_frame(frame, model):
    """Classify a single image with ``model``.

    The frame is resized to ``Config.IMAGE_SIZE``, scaled to [0, 1] and wrapped
    as a one-frame clip of shape (1, 1, C, H, W).

    Returns:
        ``(pred_idx, input_tensor)``: the predicted class index and the tensor
        that was fed to the model.
    """
    side = Config.IMAGE_SIZE
    small = cv2.resize(frame, (side, side))
    scaled = small.astype(np.float32) / 255.0

    # HWC -> CHW, then prepend batch and time axes.
    chw = scaled.transpose(2, 0, 1)
    input_tensor = torch.tensor(chw, dtype=torch.float32)[None, None].to(Config.DEVICE)

    with torch.no_grad():
        logits = model(input_tensor)
    pred_idx = torch.argmax(logits, dim=1).item()

    return pred_idx, input_tensor

In [None]:
import random

# ---------------- VIDEO PROCESSING ---------------- #
def process_video(video_path, model, output_path=None, show=False, predict=True):
    """Run frame-by-frame prediction on a video and optionally save/show the result.

    NOTE: this replaces the earlier ``process_video(file_path)`` loader defined
    in the data-loading section of the notebook.

    Args:
        video_path: video file to read.
        model: classifier passed to ``process_frame``.
        output_path: when given, the annotated frames are written to this file.
        show: when True, frames are displayed inline in the notebook.
        predict: when False, frames are passed through without classification.

    Raises:
        IOError: if the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise IOError(f"Cannot open video: {video_path}")

    out_writer = None
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            # Some containers report 0/NaN; assume the target rate in that case.
            fps = float(Config.FRAME_RATE)
        frame_skip = max(int(fps / Config.FRAME_RATE), 1)

        if output_path:
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            # Only every frame_skip-th frame is written, so the effective rate is
            # fps / frame_skip; using it keeps the output at real-time speed.
            out_writer = cv2.VideoWriter(output_path, fourcc, fps / frame_skip, (width, height))

        print(f"Processing: {os.path.basename(video_path)}")
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Sub-sample the stream down to roughly Config.FRAME_RATE.
            current_frame = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
            if current_frame % frame_skip != 0:
                continue

            display_frame = frame.copy()

            if predict:
                # The model expects 3-channel input; expand grayscale frames.
                if len(frame.shape) == 2 or frame.shape[2] == 1:
                    frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)

                pred_idx, _ = process_frame(frame, model)
                label = Config.CLASS_NAMES[pred_idx]

                cv2.putText(display_frame, f"Predicted: {label}", (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)

            if show:
                clear_output(wait=True)
                disp_img = cv2.resize(display_frame, (640, 480))
                disp_img = cv2.cvtColor(disp_img, cv2.COLOR_BGR2RGB)
                display(Image.fromarray(disp_img))
                time.sleep(1 / Config.FRAME_RATE)

            if out_writer:
                out_writer.write(display_frame)
    finally:
        # Always free the capture/writer handles, even when prediction fails.
        cap.release()
        if out_writer is not None:
            out_writer.release()

    print(f"Finished processing: {os.path.basename(video_path)}")

# ---------------- FOLDER SUPPORT ---------------- #
def process_path(input_path, model, output_dir=None, show=False, predict=True):
    """Process a single video file, or one randomly chosen video of a folder.

    Args:
        input_path: a video file or a directory containing videos.
        model: classifier forwarded to ``process_video``.
        output_dir: directory for the annotated output video (created when
            missing); ``None`` disables saving.
        show, predict: forwarded to ``process_video``.

    Raises:
        ValueError: if ``input_path`` is neither a file nor a directory.
    """
    if os.path.isfile(input_path):
        selected = [input_path]
    elif os.path.isdir(input_path):
        # Case-insensitive match on the same extensions load_data accepts;
        # sorted so a seeded `random` picks the same video on every run.
        video_files = sorted(
            f for f in os.listdir(input_path)
            if f.lower().endswith(('.avi', '.mp4', '.mov'))
        )
        picked = random.sample(video_files, min(1, len(video_files)))
        selected = [os.path.join(input_path, vid) for vid in picked]
    else:
        raise ValueError("Invalid input path. Must be a video file or directory.")

    if output_dir and selected:
        # The video writer does not create missing directories itself.
        os.makedirs(output_dir, exist_ok=True)

    for full_path in selected:
        out_path = os.path.join(output_dir, os.path.basename(full_path)) if output_dir else None
        process_video(full_path, model, output_path=out_path, show=show, predict=predict)

In [None]:
# Customize here
model_name = "SingleFrameCNN"
model_path = '/SingleFrameCNN/SingleFrameCNN_best.pth'
output_dir = '/SingleFrameCNN/output_videos'  # or None
show_video = False
predict_video = True

if output_dir:
    os.makedirs(output_dir, exist_ok=True)

# Load the checkpoint once; it is the same model for every class folder.
model = load_model(model_path, model_name, num_classes=len(Config.CLASS_NAMES))

# Iterate over the class attribute directly so the training `config` object
# created earlier in the notebook is not overwritten.
for class_name in Config.CLASS_NAMES:
    input_path = f'/ucf101_data/test/{class_name}/'
    print("I path :", input_path)

    process_path(input_path, model, output_dir=output_dir, show=show_video, predict=predict_video)

In [None]:
import cv2
import os
from glob import glob
from IPython.display import display, clear_output
from PIL import Image as PILImage
import time

def combine_and_play_videos(input_folder, download=False, save_path=None, show_video=True):
    """Play all videos of a folder back to back and optionally merge them into one file.

    Frame size and frame rate of the combined video are taken from the first
    video (in sorted path order); frames of other sizes are resized to match.

    Args:
        input_folder: directory searched for .avi/.mp4/.mov/.mkv files.
        download: when True, the concatenated video is written to ``save_path``.
        save_path: output file; required when ``download`` is True.
        show_video: when True, frames are displayed inline in the notebook.

    Raises:
        ValueError: if ``download`` is True, videos were found and ``save_path``
            is None.
    """
    video_extensions = ('*.avi', '*.mp4', '*.mov', '*.mkv')
    video_files = []
    for ext in video_extensions:
        video_files.extend(glob(os.path.join(input_folder, ext)))

    if save_path is not None:
        # Never read back a previous combined output that lives in the same folder.
        target = os.path.abspath(save_path)
        video_files = [p for p in video_files if os.path.abspath(p) != target]

    # glob order is filesystem-dependent; sort for a reproducible sequence.
    video_files.sort()

    if not video_files:
        print("No video files found in the specified folder.")
        return

    print(f"Found {len(video_files)} videos. Combining and playing...")

    # Read properties from the first video.
    cap0 = cv2.VideoCapture(video_files[0])
    width  = int(cap0.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap0.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap0.get(cv2.CAP_PROP_FPS)
    cap0.release()
    if not fps > 0:
        fps = 30.0  # unknown frame rate reported as 0/NaN; avoids division by zero

    out_writer = None
    cap = None
    try:
        if download:
            if save_path is None:
                raise ValueError("Save path must be specified when download is True.")
            # Pick a codec tag that matches the container of the output file.
            extension = os.path.splitext(str(save_path))[1].lower()
            codec = 'mp4v' if extension in ('.mp4', '.m4v', '.mov') else 'XVID'
            fourcc = cv2.VideoWriter_fourcc(*codec)
            out_writer = cv2.VideoWriter(save_path, fourcc, fps, (width, height))

        for video_path in video_files:
            cap = cv2.VideoCapture(video_path)
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                if out_writer is not None:
                    # The writer only accepts frames of the size it was opened with.
                    if (frame.shape[1], frame.shape[0]) != (width, height):
                        out_writer.write(cv2.resize(frame, (width, height)))
                    else:
                        out_writer.write(frame)

                if show_video:
                    resized = cv2.resize(frame, (640, 480))
                    rgb_img = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
                    pil_img = PILImage.fromarray(rgb_img)

                    clear_output(wait=True)  # Clear previous frame
                    display(pil_img)         # Show current frame
                    time.sleep(1 / fps)

            cap.release()
            cap = None
    finally:
        # Free handles even if reading, writing or displaying failed.
        if cap is not None:
            cap.release()
        if out_writer is not None:
            out_writer.release()

    if download:
        print(f"Video saved to {save_path}")

    print("All videos played and processed.")

In [None]:
save_pth = "SingleFrameCNN/output_30.mp4"
# cv2.VideoWriter does not create missing directories, so make sure the
# target folder exists before writing the combined video.
os.makedirs(os.path.dirname(save_pth), exist_ok=True)
combine_and_play_videos(output_dir, download=True, save_path=save_pth, show_video=True)