In [4]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from PIL import Image
from torch.utils.data import Dataset
from torchvision import transforms

In [None]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device: {}".format(device))

Using device: cuda


## Section 1: Data Preprocessing

In [16]:
# Define the first 10 classes.
# manual_split_dataset enumerates this list, so the position of a name here
# is the class index it iterates with; keep the order stable between runs.
classes = [
    'BaseballPitch',
    'Basketball',
    'BenchPress',
    'Biking',
    'Billiards',
    'BreastStroke',
    'CleanAndJerk',
    'Diving',
    'Drumming',
    'Fencing'
]

# Define the root directory where extracted frames are stored, laid out as
#   <frames_root_dir>/<class_name>/<video_folder>/<frame>.jpg|.png
# It must be a string: `None` cannot be joined into a path (os.path.join
# raises TypeError). The location is read from the FRAMES_ROOT_DIR environment
# variable and falls back to a relative "frames" folder.
# TODO: point this at the directory that actually holds your extracted frames.
frames_root_dir = os.environ.get("FRAMES_ROOT_DIR", "frames")

### Section 1-1: Split Data paths:

In [4]:
def manual_split_dataset(frames_root_dir, classes, num_frames=20, train_ratio=0.8, max_videos_per_class=5):
    """
    Manually splits the dataset into training and testing sets.

    The expected layout is ``frames_root_dir/<class>/<video_folder>/<frame>``.
    For every class, video folders are sorted by name, truncated to the first
    ``max_videos_per_class`` entries, and the first
    ``int(train_ratio * n_selected)`` of those positions go to the training
    set; the remaining positions go to the test set. A video with fewer than
    ``num_frames`` frames is skipped (a message is printed), but it still
    occupies its position, so skipping does not shift later videos between
    the two sets.

    Args:
        frames_root_dir (str): Path to the root directory containing class subdirectories with frame folders.
        classes (list): List of class names to process. The index of a name in this list is its label.
        num_frames (int, optional): Number of frames kept per video (the first ones in sorted order). Defaults to 20.
        train_ratio (float, optional): Proportion of data to be used for training. Defaults to 0.8.
        max_videos_per_class (int, optional): Maximum number of videos per class. Defaults to 5.

    Returns:
        train_frame_paths (list of list of str): Frame paths for training videos.
        train_labels (list of int): Labels for training videos.
        test_frame_paths (list of list of str): Frame paths for testing videos.
        test_labels (list of int): Labels for testing videos.
    """
    image_extensions = ('.jpg', '.png')  # Extend as needed

    train_frame_paths = []
    train_labels = []
    test_frame_paths = []
    test_labels = []

    for class_idx, class_name in enumerate(classes):
        class_dir = os.path.join(frames_root_dir, class_name)
        if not os.path.isdir(class_dir):
            print(f"Class directory does not exist: {class_dir}")
            continue

        video_folders = sorted(
            d for d in os.listdir(class_dir)
            if os.path.isdir(os.path.join(class_dir, d))
        )

        # Limit to the first `max_videos_per_class` videos
        selected_video_folders = video_folders[:max_videos_per_class]

        num_train = int(train_ratio * len(selected_video_folders))
        for i, video_folder in enumerate(selected_video_folders):
            video_dir = os.path.join(class_dir, video_folder)
            # Sorting is lexicographic, so frame names should be zero-padded
            # if temporal order matters.
            frame_files = sorted(
                f for f in os.listdir(video_dir)
                if f.endswith(image_extensions)
            )

            # Require at least `num_frames` frames; extra frames are dropped below
            if len(frame_files) < num_frames:
                print(f"Not enough frames in {video_dir}. Expected {num_frames}, got {len(frame_files)}.")
                continue

            # Get full paths for the first `num_frames` frames
            selected_frame_paths = [
                os.path.join(video_dir, f) for f in frame_files[:num_frames]
            ]

            if i < num_train:
                train_frame_paths.append(selected_frame_paths)
                train_labels.append(class_idx)
            else:
                test_frame_paths.append(selected_frame_paths)
                test_labels.append(class_idx)

    return train_frame_paths, train_labels, test_frame_paths, test_labels

# Build the per-video train/test path lists and their labels:
# 20 frames per video, an 80/20 split, at most 5 videos per class.
(
    train_frame_paths,
    train_labels,
    test_frame_paths,
    test_labels,
) = manual_split_dataset(
    frames_root_dir,
    classes,
    num_frames=20,
    train_ratio=0.8,
    max_videos_per_class=5,
)

### Section 1-2: Data Transformation

In [5]:
# Per-frame preprocessing applied to every PIL image before frames are stacked.
# - Resize to 112x112: the classifier's MLP is specified for 14x14 feature maps
#   after three stride-2 poolings, and 112 / 2 / 2 / 2 = 14.
# - ToTensor: converts the PIL image to a float tensor of shape (C, H, W)
#   scaled to [0, 1], which is what torch.stack and Conv2d need.
transform = transforms.Compose([
    transforms.Resize((112, 112)),
    transforms.ToTensor(),
])

### Section 1-3: Create Dataset Class

In [None]:
class CustomFrameDataset(Dataset):
    """Dataset in which one item is one video: a stack of its frames plus a label."""

    def __init__(self, frame_paths, labels, transform=None):
        """
        Args:
            frame_paths (list of list of str): List where each element is a list of frame paths for a video.
            labels (list of int): List of labels corresponding to each video.
            transform (callable, optional): Optional transform applied to every frame (a PIL RGB image).
                Defaults to None, in which case frames are only converted to tensors.

        Raises:
            ValueError: If `frame_paths` and `labels` have different lengths.
        """
        if len(frame_paths) != len(labels):
            raise ValueError(
                f"frame_paths and labels must have the same length, "
                f"got {len(frame_paths)} and {len(labels)}."
            )
        self.frame_paths = frame_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Return the number of videos."""
        return len(self.frame_paths)

    def __getitem__(self, idx):
        """
        Load and stack the frames of video `idx`.

        Returns:
            tuple: (frames_tensor, label), where frames_tensor has shape
            (num_frames, C, H, W).

        Raises:
            RuntimeError: If none of the video's frames could be loaded.
        """
        frame_paths = self.frame_paths[idx]
        label = self.labels[idx]

        frames = []
        for frame_path in frame_paths:
            try:
                with Image.open(frame_path) as img:
                    img = img.convert('RGB')  # Ensure 3-channel RGB
                    if self.transform is not None:
                        img = self.transform(img)
                    # torch.stack needs tensors; convert when no transform was
                    # given or the transform still returned a PIL image.
                    if not torch.is_tensor(img):
                        img = transforms.ToTensor()(img)

                    frames.append(img)
            except Exception as e:
                # Best effort: report the unreadable frame and keep going.
                print(f"Error loading frame {frame_path}: {e}")

        if not frames:
            # torch.stack would fail on an empty list with a less helpful message.
            raise RuntimeError(f"No frames could be loaded for sample {idx}.")

        # Stack frames into a tensor of shape (num_frames, C, H, W)
        frames_tensor = torch.stack(frames)

        return frames_tensor, label

### Section 1-4: Create Dataset Instances

In [7]:
# Create Dataset objects: one item per video, sharing the same per-frame transform
train_dataset = CustomFrameDataset(train_frame_paths, train_labels, transform=transform)

test_dataset = CustomFrameDataset(test_frame_paths, test_labels, transform=transform)

# Fail early with a readable message instead of an obscure DataLoader error later
if len(train_dataset) == 0:
    raise ValueError("The training dataset is empty. Please ensure frames are correctly extracted and organized.")
if len(test_dataset) == 0:
    raise ValueError("The testing dataset is empty. Please ensure frames are correctly extracted and organized.")

### Section 1-5: Create Dataloaders

In [None]:
# Create DataLoaders for training and testing.
# Each sample is a whole video (num_frames x C x H x W), so the batch is kept small.
# Training batches are reshuffled every epoch; test order is kept fixed.
trainloader = DataLoader(train_dataset, batch_size=4, shuffle=True)

testloader = DataLoader(test_dataset, batch_size=4, shuffle=False)

## Section 2: Network Design and Training

### Section 2-1: Designing the Video Classification Model

In [9]:
class VideoClassificationModel(nn.Module):
    """
    Frame-wise CNN followed by an MLP over the concatenated frame features.

    Every frame goes through the same small CNN (three Conv-ReLU-MaxPool
    stages, each halving the spatial size). The per-frame feature maps are
    concatenated along the channel dimension, flattened, and classified by
    a two-layer MLP.

    Args:
        num_classes (int, optional): Number of output classes. Defaults to 10.
        num_frames (int, optional): Number of frames per video the model expects.
            Defaults to 20.
        frame_size (int, optional): Height and width of the (square) input frames.
            Defaults to 112, which gives 14x14 feature maps and therefore
            64 * 14 * 14 * 20 MLP input features with the default `num_frames`.

    Raises:
        ValueError: If `frame_size` is too small to survive three 2x poolings.
    """

    def __init__(self, num_classes=10, num_frames=20, frame_size=112):
        super().__init__()

        # Three stride-2 poolings with floor division reduce the side by a factor of 8.
        feature_side = frame_size // 8
        if feature_side < 1:
            raise ValueError(f"frame_size must be at least 8, got {frame_size}.")
        self.num_frames = num_frames

        # Define a simple CNN model to process each frame
        self.frame_cnn = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

        # MLP for aggregation after passing through CNN.
        # Input size is (channels * H' * W' * num_frames); with the defaults
        # this is 64 * 14 * 14 * 20.
        self.mlp = nn.Sequential(
            nn.Linear(64 * feature_side * feature_side * num_frames, 512),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(512, num_classes),
        )

    def forward(self, x):
        """
        Args:
            x (Tensor): Batch of videos with shape (batch_size, num_frames, C, H, W).

        Returns:
            Tensor: Class scores (logits) with shape (batch_size, num_classes).

        Raises:
            ValueError: If the number of frames differs from what the model was built for.
        """
        batch_size, num_frames, c, h, w = x.size()
        if num_frames != self.num_frames:
            raise ValueError(
                f"Expected {self.num_frames} frames per video, got {num_frames}."
            )

        # Run the shared CNN on each frame: x[:, t] has shape (batch_size, C, H, W)
        frame_features = []
        for t in range(num_frames):
            frame_features.append(self.frame_cnn(x[:, t]))

        # Concatenate all frame features along the channel dimension
        # Resulting shape: (batch_size, 64 * num_frames, H/8, W/8)
        features = torch.cat(frame_features, dim=1)

        # Flatten the features
        features = features.flatten(start_dim=1)

        # Pass through MLP
        output = self.mlp(features)
        return output

### Section 2-3: Training Setup

In [10]:
# Model, loss and optimizer.
# One output per entry in `classes`; the model lives on the selected device.
model = VideoClassificationModel(num_classes=len(classes)).to(device)
# CrossEntropyLoss takes raw logits and integer class labels.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Optional: Learning rate scheduler (multiplies the LR by 0.1 every 5 epochs)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

### Section 2-4: Train the network


In [None]:
# Training Loop
# Each epoch: one pass over trainloader with gradient updates, then an
# evaluation pass over testloader without gradients. Per-epoch loss and
# accuracies are collected for plotting.

num_epochs = 10
train_loss_list = []
train_accuracy_list = []
test_accuracy_list = []

for epoch in range(num_epochs):
    # Training mode enables Dropout
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, labels in trainloader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Gradients accumulate by default, so clear them before each step
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Calculate accuracy: predicted class is the index of the largest logit
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    # Mean of the per-batch losses
    epoch_loss = running_loss / len(trainloader)
    train_loss_list.append(epoch_loss)
    epoch_accuracy = 100 * correct / total
    train_accuracy_list.append(epoch_accuracy)

    # Evaluate on test set (evaluation mode disables Dropout)
    model.eval()
    correct_test = 0
    total_test = 0

    with torch.no_grad():
        for inputs, labels in testloader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)

            _, predicted = torch.max(outputs, 1)
            total_test += labels.size(0)
            correct_test += (predicted == labels).sum().item()

    test_accuracy = 100 * correct_test / total_test
    test_accuracy_list.append(test_accuracy)

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, "
          f"Train Accuracy: {epoch_accuracy:.2f}%, Test Accuracy: {test_accuracy:.2f}%")

    # Step the scheduler once per epoch, after the optimizer updates
    scheduler.step()

### Section 2-5: Plot Loss and Accuracy Curves


In [None]:
def plot_metrics(train_loss, train_acc, test_acc):
    """
    Draw the per-epoch training curves in a single two-panel figure.

    The left panel shows the training loss; the right panel shows training
    and test accuracy. Epochs are numbered from 1 on the x-axis.

    Args:
        train_loss: Sequence of per-epoch training loss values.
        train_acc: Sequence of per-epoch training accuracies (in percent).
        test_acc: Sequence of per-epoch test accuracies (in percent).
    """
    epochs = range(1, len(train_loss) + 1)

    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(15, 5))

    # Left panel: loss
    loss_ax.plot(epochs, train_loss, 'b-', marker='o', label='Training Loss')
    loss_ax.set_title('Training Loss over Epochs')
    loss_ax.set_xlabel('Epoch')
    loss_ax.set_ylabel('Loss')
    loss_ax.grid(True)
    loss_ax.legend()

    # Right panel: accuracy
    acc_ax.plot(epochs, train_acc, 'g-', marker='o', label='Training Accuracy')
    acc_ax.plot(epochs, test_acc, 'r-', marker='x', label='Test Accuracy')
    acc_ax.set_title('Accuracy over Epochs')
    acc_ax.set_xlabel('Epoch')
    acc_ax.set_ylabel('Accuracy (%)')
    acc_ax.grid(True)
    acc_ax.legend()

    fig.tight_layout()
    plt.show()

# Visualise how loss and accuracy evolved over the training epochs
plot_metrics(
    train_loss=train_loss_list,
    train_acc=train_accuracy_list,
    test_acc=test_accuracy_list,
)