In [1]:
import os
import shutil
from sklearn.model_selection import train_test_split

# Base directory containing one sub-folder of videos per class.
# Previously used path (no longer active): '/kaggle/input/cric-shot-2-0/cricshot-cls-mp4'
base_dir = '/kaggle/input/cric-shot-yolo-pose-annotated/cricshot-yolo-ano'

# Root folder that receives the train/val/test copies
output_dir = '/kaggle/working/cricshot-split'

# Create output directories for train, val, and test sets
splits = ['train', 'val', 'test']
for split in splits:
    os.makedirs(os.path.join(output_dir, split), exist_ok=True)

# Define the split ratios
train_ratio = 0.7
val_ratio = 0.2
test_ratio = 0.1
# Guard against a typo in the ratios silently dropping or duplicating data.
assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-9, "split ratios must sum to 1"

# Function to split files into train, val, and test
def split_data(class_dir, files, train_ratio, val_ratio, test_ratio=None, random_state=42):
    """Randomly partition ``files`` into train, validation and test lists.

    Args:
        class_dir: Directory the files come from. Not used by the split itself;
            accepted so existing callers keep working.
        files: Sequence of file names to partition.
        train_ratio: Fraction of ``files`` that goes to the training list.
        val_ratio: Fraction of ``files`` that goes to the validation list.
        test_ratio: Fraction that goes to the test list. When omitted it is the
            remainder ``1 - train_ratio - val_ratio``, so the function no longer
            depends on a module-level variable.
        random_state: Seed forwarded to ``train_test_split`` so that re-running
            produces the same partition. Pass ``None`` for a different split
            on every call.

    Returns:
        Tuple ``(train_files, val_files, test_files)`` of lists.
    """
    files = list(files)
    # With fewer than two files nothing can be held out.
    if len(files) < 2:
        return files, [], []

    if test_ratio is None:
        # Rounded to strip floating-point noise such as 0.10000000000000003.
        test_ratio = round(1.0 - train_ratio - val_ratio, 10)

    train_files, temp_files = train_test_split(
        files, test_size=(1 - train_ratio), random_state=random_state
    )
    # A single held-out file cannot be divided further; keep it for validation.
    if len(temp_files) < 2:
        return train_files, temp_files, []

    val_files, test_files = train_test_split(
        temp_files,
        test_size=(test_ratio / (val_ratio + test_ratio)),
        random_state=random_state,
    )
    return train_files, val_files, test_files

# Traverse through each class folder (sorted so the order does not depend on the filesystem)
for class_name in sorted(os.listdir(base_dir)):
    class_path = os.path.join(base_dir, class_name)
    if not os.path.isdir(class_path):
        continue

    files = sorted(f for f in os.listdir(class_path) if f.endswith('.mp4'))
    if not files:
        # Nothing to split; avoids an error from the splitter on an empty list.
        print(f"Skipping '{class_name}': no .mp4 files found")
        continue

    # Split the files
    train_files, val_files, test_files = split_data(class_path, files, train_ratio, val_ratio)

    # Copy files to corresponding folders
    for split, split_files in zip(splits, [train_files, val_files, test_files]):
        split_class_dir = os.path.join(output_dir, split, class_name)
        # Drop copies left by an earlier run of this cell; otherwise a clip could
        # end up in more than one split once the random assignment changes.
        shutil.rmtree(split_class_dir, ignore_errors=True)
        os.makedirs(split_class_dir, exist_ok=True)
        for file in split_files:
            src = os.path.join(class_path, file)
            dst = os.path.join(split_class_dir, file)
            shutil.copyfile(src, dst)

print("Dataset successfully split into train, val, and test sets.")


Dataset successfully split into train, val, and test sets.


In [2]:
import os

# Root of the split dataset to inspect; reuses `output_dir` defined in the splitting cell
split_dir = output_dir

# Function to count files in each folder
def count_files_in_folders(base_dir):
    """Count regular files two directory levels below ``base_dir``.

    Args:
        base_dir: Directory whose immediate sub-directories are treated as
            splits, each containing one sub-directory per class.

    Returns:
        Nested dict ``{split_name: {class_name: file_count}}``. Entries that are
        not directories are ignored at both levels, and only regular files are
        counted inside a class directory.
    """
    def _subdirs(parent):
        # Yield (name, full_path) for every directory directly under `parent`.
        for entry in os.listdir(parent):
            full_path = os.path.join(parent, entry)
            if os.path.isdir(full_path):
                yield entry, full_path

    return {
        split_name: {
            label: sum(
                os.path.isfile(os.path.join(label_path, item))
                for item in os.listdir(label_path)
            )
            for label, label_path in _subdirs(split_path)
        }
        for split_name, split_path in _subdirs(base_dir)
    }

# Tally the files under every split/class folder
file_counts = count_files_in_folders(split_dir)

# Report one indented line per class beneath each split heading
for split, classes in file_counts.items():
    report_lines = [f"{split}:"]
    report_lines += [f"  {class_name}: {num_files} files" for class_name, num_files in classes.items()]
    print("\n".join(report_lines))


train:
  square_cut: 139 files
  lofted: 138 files
  straight: 135 files
  pull: 125 files
  cover: 131 files
  late_cut: 127 files
  sweep: 135 files
  flick: 126 files
  defense: 134 files
  hook: 126 files
val:
  square_cut: 40 files
  lofted: 40 files
  straight: 38 files
  pull: 36 files
  cover: 38 files
  late_cut: 36 files
  sweep: 39 files
  flick: 36 files
  defense: 38 files
  hook: 36 files
test:
  square_cut: 21 files
  lofted: 20 files
  straight: 20 files
  pull: 18 files
  cover: 19 files
  late_cut: 19 files
  sweep: 20 files
  flick: 19 files
  defense: 20 files
  hook: 19 files


In [3]:
import warnings

# Suppress all warnings
warnings.filterwarnings("ignore")


In [4]:
!pip install -q ultralytics

In [15]:
import cv2
import torch
import torch.nn as nn
from torch import optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import os

# Custom Dataset Class
class VideoDataset(Dataset):
    """Fixed-length frame clips sampled from class-labelled ``.mp4`` files.

    ``video_dir`` is expected to contain one sub-directory per class. Every
    ``.mp4`` inside a class directory becomes one sample, labelled with the
    index of its class in the alphabetically sorted ``classes`` list.

    Args:
        video_dir: Root directory holding the per-class sub-directories.
        transform: Optional callable applied ONCE to the whole clip, which is
            passed as a float tensor of shape (T, C, H, W) scaled to [0, 1].
            Applying it per clip rather than per frame means random
            augmentations (e.g. a horizontal flip) are the same for every
            frame of a clip. The callable must therefore accept batched
            tensors; torchvision's tensor transforms do.
        max_frames: Number of frames every clip contains.
    """

    def __init__(self, video_dir, transform=None, max_frames=16):
        self.video_dir = video_dir
        self.transform = transform
        self.max_frames = max_frames
        # Only directories are classes; stray files in the root are ignored.
        self.classes = sorted(
            entry for entry in os.listdir(video_dir)
            if os.path.isdir(os.path.join(video_dir, entry))
        )
        self.class_to_idx = {name: idx for idx, name in enumerate(self.classes)}
        self.video_files = []
        for class_name in self.classes:
            class_dir = os.path.join(video_dir, class_name)
            # Sorted so sample indices are stable across runs and machines.
            for video in sorted(os.listdir(class_dir)):
                if video.endswith('.mp4'):
                    self.video_files.append((os.path.join(class_dir, video), class_name))

    def __len__(self):
        """Return the number of videos found under ``video_dir``."""
        return len(self.video_files)

    def __getitem__(self, idx):
        """Return ``(video_tensor, label_idx)`` for sample ``idx``.

        ``video_tensor`` has shape (C, T, H, W). With a transform it is a float
        tensor produced from pixel values scaled to [0, 1]; without one it is
        the raw stacked frames exactly as returned by ``load_video``.
        """
        video_path, label = self.video_files[idx]
        frames = self.load_video(video_path)
        label_idx = self.class_to_idx[label]

        if self.transform:
            # (T, C, H, W) in [0, 1]; assumes 8-bit frames, hence the 255 divisor.
            clip = torch.stack(frames, dim=0).float() / 255.0
            clip = self.transform(clip)
            # Reorder to (C, T, H, W), the layout 3D convolutions expect.
            video_tensor = clip.permute(1, 0, 2, 3).contiguous()
        else:
            # Stack frames to create a tensor of shape (C, T, H, W)
            video_tensor = torch.stack(frames, dim=1)

        return video_tensor, label_idx

    def _pad_to_length(self, frames, video_path):
        """Repeat the last frame until ``frames`` holds ``max_frames`` entries.

        Raises:
            RuntimeError: If ``frames`` is empty, i.e. nothing could be decoded.
        """
        if not frames:
            raise RuntimeError(f"Could not read any frames from video: {video_path}")
        missing = self.max_frames - len(frames)
        if missing > 0:
            frames = frames + [frames[-1]] * missing
        return frames

    def load_video(self, video_path):
        """Decode ``max_frames`` evenly spaced RGB frames from ``video_path``.

        Returns:
            List of exactly ``max_frames`` tensors of shape (C, H, W), with the
            dtype OpenCV decoded them in. Videos that are shorter than
            ``max_frames`` or stop decoding early are padded by repeating the
            last frame that was read.

        Raises:
            RuntimeError: If no frame at all can be read.
        """
        cap = cv2.VideoCapture(video_path)
        frames = []
        try:
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            interval = max(1, frame_count // self.max_frames)  # Ensure interval is at least 1

            for i in range(self.max_frames):
                target = i * interval
                if frame_count > 0:
                    # Never seek past the last frame of a short video.
                    target = min(target, frame_count - 1)
                cap.set(cv2.CAP_PROP_POS_FRAMES, target)
                ret, frame = cap.read()
                if not ret:
                    break
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(torch.from_numpy(frame).permute(2, 0, 1))  # Convert to (C, H, W)
        finally:
            # Release the capture even if decoding raises.
            cap.release()

        return self._pad_to_length(frames, video_path)

# Preprocessing pipeline: resize to 224x224, random horizontal flip, then normalise
transform = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.Normalize(mean=(0.5,), std=(0.5,)),
    ]
)

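# NOTE: the commented-out snippet below is an unused, older approach that samples
# frames from each video and writes them to disk as JPEG files; kept for reference only.
# video_path = os.path.join(category_path, video_file)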
#             cap = cv2.VideoCapture(video_path)
#             frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
#             interval = max(1, frame_count // num_frames)  # Ensure interval is at least 1

#             for i in range(num_frames):
#                 cap.set(cv2.CAP_PROP_POS_FRAMES, i * interval)
#                 ret, frame = cap.read()
#                 if not ret:
#                     break
#                 resized_frame = cv2.resize(frame, img_size)
#                 frame_filename = os.path.join(output_category_path, f"{os.path.splitext(video_file)[0]}_frame_{i}.jpg")
#                 cv2.imwrite(frame_filename, resized_frame)
#             cap.release()

In [16]:
# Paths to train, val, test directories
train_dir = os.path.join(output_dir, 'train')
val_dir = os.path.join(output_dir, 'val')
test_dir = os.path.join(output_dir, 'test')

# Evaluation pipeline: same resize and normalisation as the training `transform`
# but without the random horizontal flip, so validation and test metrics are
# deterministic and not computed on augmented clips.
eval_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.Normalize((0.5,), (0.5,)),
])

# Datasets and DataLoaders
train_dataset = VideoDataset(train_dir, transform=transform)
val_dataset = VideoDataset(val_dir, transform=eval_transform)
test_dataset = VideoDataset(test_dir, transform=eval_transform)

batch_size = 2

# Only the training loader shuffles; evaluation order stays fixed.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)


In [17]:
# Sanity check: number of training videos the dataset discovered
len(train_dataset)

1316

In [18]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
import torch.nn.functional as F
import torchvision.models as models
import os
import cv2
import numpy as np
from tqdm import tqdm

# Residual block used by r3d_34
class BasicBlock3D(nn.Module):
    """Two-convolution 3D residual block.

    Main path: 3x3x3 conv -> BN -> ReLU -> 3x3x3 conv -> BN. The result is added
    to the skip connection and passed through a final ReLU.

    Args:
        in_planes: Number of input channels.
        planes: Number of output channels.
        stride: Stride of the first convolution.
        downsample: Optional module applied to the input to form the skip
            connection; when ``None`` the input itself is used.
    """

    expansion = 1

    def __init__(self, in_planes, planes, stride=1, downsample=None):
        super().__init__()
        conv_kwargs = dict(kernel_size=3, padding=1, bias=False)
        self.conv1 = nn.Conv3d(in_planes, planes, stride=stride, **conv_kwargs)
        self.bn1 = nn.BatchNorm3d(planes)
        self.conv2 = nn.Conv3d(planes, planes, stride=1, **conv_kwargs)
        self.bn2 = nn.BatchNorm3d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample

    def forward(self, x):
        """Apply the residual block to ``x`` and return the activated sum."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        # Skip connection: projected input if a downsample module was given.
        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)

class Bottleneck3D(nn.Module):
    """Three-convolution 3D bottleneck residual block.

    Main path: 1x1x1 conv -> BN -> ReLU -> 3x3x3 conv (carries the stride) ->
    BN -> ReLU -> 1x1x1 conv expanding to ``planes * expansion`` channels -> BN.
    The result is added to the skip connection and passed through a final ReLU.

    Args:
        in_planes: Number of input channels.
        planes: Width of the two inner convolutions; the block outputs
            ``planes * expansion`` channels.
        stride: Stride of the 3x3x3 convolution.
        downsample: Optional module applied to the input to form the skip
            connection; when ``None`` the input itself is used.
    """

    expansion = 4

    def __init__(self, in_planes, planes, stride=1, downsample=None):
        super().__init__()
        out_planes = planes * self.expansion
        self.conv1 = nn.Conv3d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm3d(planes)
        self.conv2 = nn.Conv3d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm3d(planes)
        self.conv3 = nn.Conv3d(planes, out_planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm3d(out_planes)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample

    def forward(self, x):
        """Apply the bottleneck block to ``x`` and return the activated sum."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        # Skip connection: projected input if a downsample module was given.
        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)

class ResNet3D(nn.Module):
    """ResNet-style 3D CNN: conv stem, four residual stages, global pooling, linear head.

    Args:
        block: Residual block class. It must expose an ``expansion`` attribute
            and accept ``block(in_planes, planes, stride, downsample)``.
        layers: Indexable with at least four integers, the number of blocks in
            each of the four stages.
        num_classes: Number of output logits.
    """

    def __init__(self, block, layers, num_classes=400):
        super().__init__()
        # Running channel count; advanced by every _make_layer call.
        self.in_planes = 64

        # Stem: strided 7x7x7 convolution followed by max pooling.
        self.conv1 = nn.Conv3d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm3d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool3d(kernel_size=3, stride=2, padding=1)

        # Four stages registered as layer1..layer4; all but the first halve the resolution.
        stage_specs = ((64, 1), (128, 2), (256, 2), (512, 2))
        for position, (width, stage_stride) in enumerate(stage_specs):
            stage = self._make_layer(block, width, layers[position], stride=stage_stride)
            setattr(self, f"layer{position + 1}", stage)

        self.avgpool = nn.AdaptiveAvgPool3d((1, 1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, planes, blocks, stride=1):
        """Build one stage of ``blocks`` residual blocks and update ``self.in_planes``.

        Only the first block receives ``stride`` and, when the stride or the
        channel count changes, a 1x1x1 conv + BN projection for its skip path.
        """
        out_planes = planes * block.expansion

        projection = None
        if not (stride == 1 and self.in_planes == out_planes):
            projection = nn.Sequential(
                nn.Conv3d(self.in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm3d(out_planes),
            )

        stage = [block(self.in_planes, planes, stride, projection)]
        self.in_planes = out_planes
        stage.extend(block(self.in_planes, planes) for _ in range(1, blocks))

        return nn.Sequential(*stage)

    def forward(self, x):
        """Map an input of shape (N, 3, T, H, W) to logits of shape (N, num_classes)."""
        x = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        pooled = torch.flatten(self.avgpool(x), 1)
        return self.fc(pooled)

def r3d_34(num_classes=400):
    """Build a ResNet3D from BasicBlock3D with stage depths [3, 4, 6, 3]."""
    stage_depths = [3, 4, 6, 3]
    return ResNet3D(BasicBlock3D, stage_depths, num_classes=num_classes)

def r3d_50(num_classes=400):
    """Build a ResNet3D from Bottleneck3D with stage depths [3, 4, 6, 3]."""
    stage_depths = [3, 4, 6, 3]
    return ResNet3D(Bottleneck3D, stage_depths, num_classes=num_classes)

# Instantiate both custom ResNet-3D variants with the dataset's class count
num_classes = 10  # Set the number of classes according to your dataset
model_34, model_50 = (build(num_classes=num_classes) for build in (r3d_34, r3d_50))


In [20]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.models.video import r3d_18
import copy

# Build torchvision's ResNet-3D 18 with randomly initialised weights (NOT pre-trained).
# Calling it without arguments avoids the deprecated `pretrained=` keyword.
model = r3d_18()

# Replace the final layer so it outputs one logit per dataset class
num_classes = len(train_dataset.classes)  # Number of classes in your dataset
model.fc = nn.Linear(model.fc.in_features, num_classes)

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

criterion = nn.CrossEntropyLoss()  # For multi-class classification
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Training and validation with early stopping on validation accuracy
num_epochs = 30
best_val_loss = float('inf')  # not used by the accuracy-based model selection below
best_accuracy = 0.0
# Start from the current weights so `best_model_wts` is always a loadable state
# dict, even if no epoch ever improves on `best_accuracy`.
best_model_wts = copy.deepcopy(model.state_dict())
prnt_aftr = 100  # print the running loss every `prnt_aftr` iterations

no_imp = 0       # consecutive epochs without a new best validation accuracy
epoch_limit = 7  # training stops once `no_imp` exceeds this value

for epoch in range(num_epochs):
    print(f'Epoch {epoch+1}/{num_epochs}')
    print('-' * 50)

    # Training phase
    model.train()
    running_loss = 0.0
    for i, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.float().to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

        if i % prnt_aftr == 0:
            print(f'[Training] Iteration {i}/{len(train_loader)} Loss: {loss.item():.4f}')

    epoch_train_loss = running_loss / len(train_loader)
    print(f'Training Loss: {epoch_train_loss:.4f}')

    # Validation phase
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for i, (inputs, labels) in enumerate(val_loader):
            inputs, labels = inputs.float().to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            if i % prnt_aftr == 0:
                print(f'[Validation] Iteration {i}/{len(val_loader)} Loss: {loss.item():.4f}')

    epoch_val_loss = val_loss / len(val_loader)
    print(f'Validation Loss: {epoch_val_loss:.4f}')
    val_accuracy = 100 * correct / total
    print(f'Validation Accuracy: {val_accuracy}%')

    # Compare this epoch BEFORE deciding whether to stop, so an epoch that sets
    # a new best is never discarded by the early-stopping check.
    if val_accuracy > best_accuracy:
        # Save the best model
        best_accuracy = val_accuracy
        best_model_wts = copy.deepcopy(model.state_dict())
        no_imp = 0
        torch.save(model.state_dict(), f'best_model_epoch_{epoch+1}.pth')
        print(f'Saved best model for epoch {epoch+1}')
        print(f"New best model saved with accuracy: {best_accuracy}%")
    else:
        no_imp += 1
        if no_imp > epoch_limit:
            print(f"No improvement for {no_imp} epoch, so terminating")
            break


Epoch 1/30
--------------------------------------------------


RuntimeError: CUDA error: CUBLAS_STATUS_ALLOC_FAILED when calling `cublasCreate(handle)`

In [None]:
import torch
from sklearn.metrics import classification_report, confusion_matrix
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd

# Restore the weights that scored best on the validation set
model.load_state_dict(best_model_wts)

# Evaluate on the held-out test set
model.eval()
test_loss = 0.0
correct = 0
total = 0
all_labels = []
all_preds = []
incorrect_count = 0  # number of misclassified test samples

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.float().to(device)
        labels = labels.to(device)

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        test_loss += loss.item()

        # Predicted class = index of the largest logit in each row
        predicted = outputs.max(dim=1).indices
        batch_size_seen = labels.size(0)
        batch_hits = (predicted == labels).sum().item()

        total += batch_size_seen
        correct += batch_hits
        incorrect_count += batch_size_seen - batch_hits

        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(predicted.cpu().numpy())

# Mean loss per batch and overall accuracy in percent
test_loss /= len(test_loader)
accuracy = 100 * correct / total

print(f"Test Accuracy: {accuracy:.2f}%")
print(f"Incorrect Predictions: {incorrect_count} out of {total}")
print(f"Test Loss: {test_loss:.4f}")

# Convert to numpy arrays
all_labels = np.array(all_labels)
all_preds = np.array(all_preds)

# Save accuracy and loss to a text file
with open("test_results.txt", "w") as f:
    f.write(f'Test Loss: {test_loss:.4f}\n')
    f.write(f'Test Accuracy: {accuracy:.2f}%\n')
    f.write(f'Incorrect Predictions: {incorrect_count} out of {total}\n')

# Classification report
class_names = train_dataset.classes
report = classification_report(
    all_labels,
    all_preds,
    # Explicit label ids keep `target_names` aligned even when a class never
    # appears in the test labels or predictions (otherwise sklearn raises).
    labels=list(range(len(class_names))),
    target_names=class_names,
)
print("Classification Report:\n", report)  # Display before saving
with open("classification_report.txt", "w") as f:
    f.write("Classification Report:\n")
    f.write(report)

# Confusion matrix over every class index (rows: true label, columns: predicted label)
class_names = train_dataset.classes
conf_matrix = confusion_matrix(all_labels, all_preds, labels=list(range(num_classes)))

# Persist the matrix as CSV with class names on both axes
conf_matrix_df = pd.DataFrame(conf_matrix, index=class_names, columns=class_names)
conf_matrix_df.to_csv("confusion_matrix.csv")

# Draw the heatmap on an explicit figure/axes pair, save it, then display it
fig, ax = plt.subplots(figsize=(10, 7))
sns.heatmap(
    conf_matrix,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names,
    ax=ax,
)
ax.set_xlabel('Predicted Label')
ax.set_ylabel('True Label')
ax.set_title('Confusion Matrix')
fig.savefig("confusion_matrix.png")
plt.show()


In [None]:
# Sanity check: both collections should have one entry per test sample
len(all_labels),len(all_preds)