In [88]:
from pathlib import Path
from typing import Tuple

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms

import utils

In [14]:
# Global File Paths
# ZIP_FILES_PATH and DATASET_NAME are the two arguments handed to
# utils.unzip_files; DATASET_ROOT_PATH is the directory scanned for per-class
# sub-folders of *.avi files and used as `from_dir` of the train/test split;
# TO_DIR is the split's `to_dir` and the parent of the "train" and "test" folders.
ZIP_FILES_PATH = "data"
DATASET_NAME = "CricShot10"
DATASET_ROOT_PATH = "CricShot10/"
TO_DIR = "dataset"

In [15]:
# Fraction of each class's videos assigned to the training split; passed as
# `train_ratio` to utils.setup_dataset_structure.
TRAIN_SET_RATIO = 0.8
# Complement of the training fraction. Because of binary floating point this
# evaluates to 0.19999999999999996 rather than exactly 0.2, so never compare
# it with `==` against 0.2.
TEST_SET_RATIO = 1 - TRAIN_SET_RATIO

In [89]:
# Number of frames sampled from every video (passed as `n_frames` to CricShot10).
N_FRAMES = 10
# Size every frame is resized to (passed as `target_size` to CricShot10, which
# forwards it to cv2.resize, i.e. the order is (width, height)).
FRAME_SHAPE = (224, 224)

In [90]:
# Number of video clips per batch; shared by the train and test DataLoaders.
BATCH_SIZE = 16

In [5]:
# Extracting all files
# Hands the archive directory (ZIP_FILES_PATH) and the dataset name
# (DATASET_NAME) to the project-local helper.
# NOTE(review): judging by the recorded output, the helper creates a directory
# named after the dataset and unzips one archive per shot class into it —
# confirm against utils.unzip_files.
utils.unzip_files(ZIP_FILES_PATH, DATASET_NAME)

Creating directory: CricShot10
Unzipping all the files: 
Unzipping: cover-20250328T152434Z-001.zip
Unzipping: defense-20250328T152432Z-001.zip
Unzipping: flick-20250328T152430Z-001.zip
Unzipping: hook-20250328T152430Z-001.zip
Unzipping: late_cut-20250328T152307Z-001.zip
Unzipping: lofted-20250328T152550Z-001.zip
Unzipping: pull-20250328T152305Z-001.zip
Unzipping: square_cut-20250328T152247Z-001.zip
Unzipping: straight-20250328T152224Z-001.zip
Unzipping: sweep-20250328T152202Z-001.zip
All files unzipped to: `C:\Users\Vaibhav Rastogi\Documents\projects\CricShot10`


In [91]:
# Size of the smallest class: count the *.avi files directly inside each class
# directory of the raw dataset and keep the minimum, so that every class can
# later contribute the same number of videos.
samples_per_class = min(
    sum(1 for _ in class_dir.glob("*.avi"))
    for class_dir in filter(Path.is_dir, Path(DATASET_ROOT_PATH).iterdir())
)
print("Minimum Number of samples per class taken:", samples_per_class)

Minimum Number of samples per class taken: 179


In [None]:
# Setup the directory structure with defined train and test directories
# Inputs: the raw dataset root (`from_dir`), the destination root (`to_dir`),
# the training fraction and the per-class sample cap computed above.
# NOTE(review): presumably this populates `<TO_DIR>/train/<class>/` and
# `<TO_DIR>/test/<class>/`, which the following cells glob for videos —
# confirm against utils.setup_dataset_structure, including whether it is safe
# to re-run on an already populated destination.
utils.setup_dataset_structure(
    from_dir=DATASET_ROOT_PATH,
    to_dir=TO_DIR,
    train_ratio=TRAIN_SET_RATIO,
    samples_per_class=samples_per_class,
)

In [92]:
# Locations of the two dataset splits underneath the destination root.
root_dir = Path(TO_DIR)
train_dir = root_dir.joinpath("train")
test_dir = root_dir.joinpath("test")

In [93]:
# Collect every video of each split; videos sit one level down, inside their
# class directory (`<split>/<class>/<file>.avi`).
train_paths = [*train_dir.glob("*/*.avi")]
test_paths = [*test_dir.glob("*/*.avi")]
print("No. of video files in training set:", len(train_paths))
print("No. of video files in testing set:", len(test_paths))

No. of video files in training set: 1430
No. of video files in testing set: 360


In [94]:
# Class names of the training split and their integer labels, plus the inverse
# lookup (label -> class name) for decoding predictions.
class_names, classes_to_idx = utils.get_classes(train_dir)
idx_to_classes = dict(zip(classes_to_idx.values(), classes_to_idx.keys()))
print("All Classes in dataset:", class_names)

All Classes in dataset: ['cover', 'defense', 'flick', 'hook', 'late_cut', 'lofted', 'pull', 'square_cut', 'straight', 'sweep']


In [95]:
class CricShot10(torch.utils.data.Dataset):
    """Dataset of fixed-length, uniformly sampled clips from ``.avi`` videos.

    Videos are discovered as ``<target_dir>/<class_name>/*.avi``. Each item is
    a ``(video, class_idx)`` pair where ``video`` is a ``uint8`` array laid out
    as ``(C, D, H, W)`` = (RGB channels, frames, height, width).

    Args:
        target_dir: Root directory of one split (``str`` or ``Path``).
        transform: Optional callable applied to the ``(C, D, H, W)`` array
            before it is returned.
        n_frames: Number of frames sampled from every video (must be >= 1).
        target_size: Frame size as ``(width, height)``; this is the order
            ``cv2.resize`` expects for its ``dsize`` argument.

    Raises:
        ValueError: If ``n_frames`` is smaller than 1.
    """

    def __init__(
        self,
        target_dir: "str | Path",
        transform=None,
        n_frames: int = 16,
        target_size: Tuple[int, int] = (224, 224),
    ):
        if n_frames < 1:
            raise ValueError(f"n_frames must be a positive integer, got {n_frames}")

        # Accept plain strings as well as Path objects (``.glob`` needs a Path).
        self.target_dir = Path(target_dir)
        # Sorted so that an index always refers to the same file, independent
        # of the order in which the filesystem lists directory entries.
        self.paths = sorted(self.target_dir.glob("*/*.avi"))

        self.transform = transform
        self.n_frames = n_frames
        self.target_size = target_size
        self.class_names, self.class_to_idx = utils.get_classes(self.target_dir)

    def __len__(self):
        """Return the number of videos found under ``target_dir``."""
        return len(self.paths)

    def _blank_frame(self) -> np.ndarray:
        """Return a black RGB frame with the same shape as a resized frame."""
        width, height = self.target_size
        return np.zeros((height, width, 3), dtype=np.uint8)

    def load_video(self, idx: int) -> np.ndarray:
        """Decode ``n_frames`` evenly spaced frames of the video at ``idx``.

        A frame that cannot be read is replaced by the previous successfully
        decoded frame, or by a black frame when nothing has been decoded yet,
        so the result always holds exactly ``n_frames`` frames.

        Returns:
            ``uint8`` array of shape ``(3, n_frames, height, width)`` in RGB.
        """
        video_path = str(self.paths[idx])
        cap = cv2.VideoCapture(video_path)

        frames = []
        try:
            # OpenCV reports the frame count as a float; clamp so an unreadable
            # video (count 0) still produces valid, non-negative indices.
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            frame_indices = np.linspace(
                0, max(frame_count - 1, 0), self.n_frames, dtype="int"
            )

            for frame_idx in frame_indices:
                cap.set(cv2.CAP_PROP_POS_FRAMES, int(frame_idx))
                ret, frame = cap.read()

                if not ret:
                    frames.append(frames[-1] if frames else self._blank_frame())
                    continue

                frame = cv2.resize(frame, self.target_size)
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(frame)
        finally:
            # Always free the decoder, even if resizing/conversion raised.
            cap.release()

        # Stacked frames are (D, H, W, C); PyTorch 3D convs want (C, D, H, W).
        return np.transpose(np.array(frames), (3, 0, 1, 2))

    def __getitem__(self, idx: int) -> Tuple[np.ndarray, int]:
        """Return ``(video, class_idx)`` for the video at position ``idx``.

        The class name is taken from the file name (everything before the last
        ``_``); when that is not a known class, the name of the directory that
        contains the file is used instead. If ``transform`` is set, its result
        replaces the raw array.
        """
        video = self.load_video(idx)

        path = self.paths[idx]
        class_name = "_".join(path.name.split("_")[:-1])
        if class_name not in self.class_to_idx:
            class_name = path.parent.name
        class_idx = self.class_to_idx[class_name]

        if self.transform is not None:
            return self.transform(video), class_idx
        return video, class_idx

In [96]:
# train_transform = transforms.Compose(
#     [
#         transforms.ToTensor(),
#         transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
#     ]
# )

# test_transform = transforms.Compose(
#     [
#         transforms.ToTensor(),
#         transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
#     ]
# )

In [97]:
# Build one dataset per split with identical sampling settings. No transform is
# applied for now; to enable one, pass e.g. `transform=train_transform` to the
# training dataset and `transform=test_transform` to the test dataset.
train_dataset = CricShot10(
    target_dir=train_dir,
    n_frames=N_FRAMES,
    target_size=FRAME_SHAPE,
)

test_dataset = CricShot10(
    target_dir=test_dir,
    n_frames=N_FRAMES,
    target_size=FRAME_SHAPE,
)

In [98]:
# Create Dataloaders
# A full batch has shape [BATCH_SIZE, 3, N_FRAMES, 224, 224] = [16, 3, 10, 224, 224];
# the last batch of a split may hold fewer clips.
# Only the training split is shuffled; the test split keeps a fixed order.
train_dataloader = DataLoader(
    dataset=train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
)
test_dataloader = DataLoader(
    dataset=test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
)

In [101]:
class _ChannelLayerNorm(nn.Module):
    """Layer normalisation over the channel axis (dim 1) of a channels-first tensor."""

    def __init__(self, num_channels: int):
        super().__init__()
        self.norm = nn.LayerNorm(num_channels)

    def forward(self, x):
        # nn.LayerNorm normalises the trailing dimension(s), so move the
        # channel axis to the end, normalise, and move it back.
        return self.norm(x.movedim(1, -1)).movedim(-1, 1)


class Conv2Plus1D(nn.Module):
    """(2+1)D convolution: a spatial convolution followed by a temporal one.

    Expects input of shape ``(N, in_channels, D, H, W)`` and returns
    ``(N, out_channels, D', H', W')``; with the default ``padding="same"`` the
    depth/height/width are unchanged.

    Args:
        in_channels: Channels of the input tensor.
        out_channels: Channels produced by both convolutions.
        kernel_size: ``(depth, height, width)`` of the factorised kernel. The
            spatial conv uses ``(1, height, width)``, the temporal conv
            ``(depth, 1, 1)``.
        padding: ``"same"``/``"valid"``, or an int / ``(depth, height, width)``
            triple. Numeric padding is split so that each convolution pads
            only the axes its kernel spans.
    """

    def __init__(
        self,
        in_channels: int = 3,
        out_channels: int = 16,
        kernel_size: Tuple[int, int, int] = (3, 7, 7),
        padding="same",
    ):
        super().__init__()

        if isinstance(padding, str):
            spatial_padding = temporal_padding = padding
        else:
            pad_d, pad_h, pad_w = (
                (padding,) * 3 if isinstance(padding, int) else padding
            )
            spatial_padding = (0, pad_h, pad_w)
            temporal_padding = (pad_d, 0, 0)

        self.block = nn.Sequential(
            # Spatial decomposition
            nn.Conv3d(
                in_channels=in_channels,
                out_channels=out_channels,
                kernel_size=(1, kernel_size[1], kernel_size[2]),
                padding=spatial_padding,
            ),
            # Temporal decomposition
            nn.Conv3d(
                in_channels=out_channels,
                out_channels=out_channels,
                kernel_size=(kernel_size[0], 1, 1),
                padding=temporal_padding,
            ),
        )

    def forward(self, x):
        """Apply the spatial and then the temporal convolution to ``x``."""
        return self.block(x)


class ResidualMain(nn.Module):
    """Main branch of a residual block: conv -> norm -> ReLU -> conv -> norm.

    Both convolutions are ``Conv2Plus1D`` layers with ``"same"`` padding, so an
    input of shape ``(N, in_channels, D, H, W)`` yields
    ``(N, out_channels, D, H, W)``. Normalisation is layer normalisation over
    the channel axis.

    Args:
        in_channels: Channels of the input tensor.
        out_channels: Channels produced by the branch.
        kernel_size: ``(depth, height, width)`` kernel passed to both
            ``Conv2Plus1D`` layers.
    """

    def __init__(
        self,
        in_channels: int = 3,
        out_channels: int = 16,
        kernel_size: Tuple[int, int, int] = (3, 7, 7),
    ):
        super().__init__()
        self.block = nn.Sequential(
            Conv2Plus1D(
                in_channels=in_channels,
                out_channels=out_channels,
                kernel_size=kernel_size,
            ),
            _ChannelLayerNorm(out_channels),
            nn.ReLU(),
            # The first convolution already produced `out_channels` channels,
            # so that is what the second one receives.
            Conv2Plus1D(
                in_channels=out_channels,
                out_channels=out_channels,
                kernel_size=kernel_size,
            ),
            _ChannelLayerNorm(out_channels),
        )

    def forward(self, x):
        """Run ``x`` through the residual main branch."""
        return self.block(x)


class Project(nn.Module):
    """Linear projection of the channel/feature axis followed by layer norm.

    The projection acts on dim 1, so a channels-first video tensor
    ``(N, in_features, D, H, W)`` becomes ``(N, out_features, D, H, W)`` and a
    flat feature tensor ``(N, in_features)`` becomes ``(N, out_features)``.

    NOTE(review): projecting dim 1 (rather than the last axis, which is what a
    bare ``nn.Linear`` does) is chosen to match the channels-first layout used
    by ``Conv2Plus1D``/``ResidualMain`` — confirm against how the model uses it.

    Args:
        in_features: Size of dim 1 of the input.
        out_features: Size of dim 1 of the output.
    """

    def __init__(self, in_features: int, out_features: int):
        super().__init__()
        self.block = nn.Sequential(
            nn.Linear(in_features=in_features, out_features=out_features),
            nn.LayerNorm(out_features),
        )

    def forward(self, x):
        """Project and normalise dim 1 of ``x``, leaving other axes untouched."""
        # Linear/LayerNorm act on the last axis: move dim 1 there and back.
        # For a 2-D input dim 1 already is the last axis, so this is a no-op.
        return self.block(x.movedim(1, -1)).movedim(-1, 1)

In [100]:
class Model(nn.Module):
    def __init__(
        self, in_channels: int = 3, kernel_size: Tuple[int, int, int] = (3, 7, 7)
    ):
        super.__init__()