##### Data_utils_file:

In [None]:
%%writefile data_utils.py


import cv2
import os
import pickle
import matplotlib.pyplot as plt
from typing import List
import torch
from torch.utils.data import Dataset
import numpy as np
from PIL import Image
import random
from torchvision.transforms import (Compose, Resize, CenterCrop, ToTensor, Normalize, 
                                    RandomAffine, ColorJitter, RandomHorizontalFlip, 
                                    RandomVerticalFlip, RandomRotation, RandomPerspective, 
                                    GaussianBlur, RandomGrayscale, RandomErasing)
from torchvision import  transforms

from torchvision.transforms import Compose, RandomAffine, ColorJitter, RandomRotation, RandomPerspective, GaussianBlur, RandomAdjustSharpness, RandomPosterize



class BoxInfo:
    """One parsed row of a player-tracking annotation file.

    A row is whitespace-separated and laid out as
    ``player_ID x1 y1 x2 y2 frame_ID lost grouping generated category``.
    Every token except the trailing category is converted to ``int``.

    Raises:
        IndexError: if the line has no tokens, or only a category token.
        ValueError: if a numeric token is not an integer, or if the number
            of integers after the player id is not exactly eight.
    """

    def __init__(self, line):
        fields = line.split()
        # The category label is the last token; everything before it is numeric.
        self.category = fields.pop()
        numbers = list(map(int, fields))
        self.player_ID = numbers[0]
        # Unpacking enforces that exactly eight integers follow the player id.
        x1, y1, x2, y2, frame_ID, lost, grouping, generated = numbers[1:]
        self.box = (x1, y1, x2, y2)
        self.frame_ID = frame_ID
        self.lost = lost
        self.grouping = grouping
        self.generated = generated




def load_tracking_annot(path):
    """Load a clip's tracking file and group the kept boxes by frame id.

    Every line of ``path`` is parsed with ``BoxInfo``. Rows whose player id
    is greater than 11 are ignored. For each remaining player the rows are
    kept in file order and then sliced down to a single entry: the one at
    position 10 of that player's list (players with fewer than 17 rows
    contribute nothing).

    Args:
        path: path to the tracking annotation text file.

    Returns:
        dict mapping ``frame_ID`` to the list of ``BoxInfo`` kept for that
        frame, filled in ascending player-id order.
    """
    per_player = {player: [] for player in range(12)}

    with open(path, 'r') as file:
        for line in file:
            parsed = BoxInfo(line)
            if parsed.player_ID <= 11:
                per_player[parsed.player_ID].append(parsed)

    # Build the frame -> boxes view from the per-player lists.
    frame_boxes_dct = {}
    for track in per_player.values():
        # Drop the first 5 and last 6 rows, then take only element 5 of
        # what is left (a one-element slice, or empty for short tracks).
        for parsed in track[5:-6][5:6]:
            frame_boxes_dct.setdefault(parsed.frame_ID, []).append(parsed)

    return frame_boxes_dct



def vis_clip(annot_path, video_dir):
    """Draw the annotated boxes of a clip on its frames and show them inline.

    Args:
        annot_path: tracking annotation file, read via ``load_tracking_annot``.
        video_dir: directory holding the clip's ``<frame_id>.jpg`` images.

    Frames whose image cannot be read are reported and skipped.
    """
    text_font = cv2.FONT_HERSHEY_SIMPLEX
    green = (0, 255, 0)

    for frame_id, boxes in load_tracking_annot(annot_path).items():
        img_path = os.path.join(video_dir, f'{frame_id}.jpg')
        print(img_path)

        canvas = cv2.imread(img_path)
        if canvas is None:
            # cv2.imread signals an unreadable/missing file by returning None.
            print(f"Image not found: {img_path}")
            continue

        for annotated in boxes:
            x1, y1, x2, y2 = annotated.box
            cv2.rectangle(canvas, (x1, y1), (x2, y2), green, 2)
            # Put the category label slightly above the box's (x1, y1) corner.
            cv2.putText(canvas, annotated.category, (x1, y1 - 10), text_font, 0.5, green, 2)

        # OpenCV images are BGR; matplotlib expects RGB.
        plt.imshow(cv2.cvtColor(canvas, cv2.COLOR_BGR2RGB))
        plt.axis('off')
        plt.show()


def load_video_annot(video_annot):
    """Read a video-level annotation file into a clip-id -> category dict.

    Each non-blank line is expected to start with two whitespace-separated
    tokens, ``<name>.jpg <category>``; any further tokens on the line are
    ignored. The ``.jpg`` suffix is stripped from the first token to form
    the clip id.

    Args:
        video_annot: path to the annotation text file.

    Returns:
        dict mapping clip id (str) to its category (str).

    Raises:
        IndexError: if a non-blank line has fewer than two tokens.
    """
    clip_category_dct = {}
    with open(video_annot, 'r') as file:
        for line in file:
            # split() without arguments tolerates tabs and repeated spaces.
            items = line.split()
            if not items:
                # Blank lines (e.g. a trailing newline at end of file)
                # carry no annotation.
                continue
            clip_dir = items[0].replace('.jpg', '')
            clip_category_dct[clip_dir] = items[1]

    return clip_category_dct


def load_volleyball_dataset(videos_root, annot_root):
    """Collect clip categories and tracking boxes for every video directory.

    Args:
        videos_root: directory with one sub-directory per video; each video
            directory holds an ``annotations.txt`` and one sub-directory per
            clip.
        annot_root: root of the tracking annotations, laid out as
            ``<video>/<clip>/<clip>.txt``.

    Returns:
        Nested dict ``{video_dir: {clip_dir: {'category': str,
        'frame_boxes_dct': dict}}}``. Entries that are not directories are
        skipped at both levels.
    """
    video_names = sorted(os.listdir(videos_root))
    videos_annot = {}

    for position, video_dir in enumerate(video_names, start=1):
        video_dir_path = os.path.join(videos_root, video_dir)
        if not os.path.isdir(video_dir_path):
            continue

        print(f'{position}/{len(video_names)} - Processing Dir {video_dir_path}')
        clip_category_dct = load_video_annot(
            os.path.join(video_dir_path, 'annotations.txt')
        )

        clip_annot = {}
        for clip_dir in sorted(os.listdir(video_dir_path)):
            if not os.path.isdir(os.path.join(video_dir_path, clip_dir)):
                continue

            # Every clip directory must have a category in annotations.txt.
            assert clip_dir in clip_category_dct
            annot_file = os.path.join(annot_root, video_dir, clip_dir, f'{clip_dir}.txt')
            frame_boxes_dct = load_tracking_annot(annot_file)

            clip_annot[clip_dir] = {
                'category': clip_category_dct[clip_dir],
                'frame_boxes_dct': frame_boxes_dct,
            }

        videos_annot[video_dir] = clip_annot

    return videos_annot





def create_class_mapping(dataset):
    """Map every clip category found in ``dataset`` to an integer index.

    Args:
        dataset: nested dict ``{video: {clip: {'category': ...}}}``.

    Returns:
        dict from category to index, where indices follow the sorted order
        of the distinct categories (so the mapping is deterministic).
    """
    categories = set()
    for clips in dataset.values():
        for clip in clips.values():
            categories.add(clip['category'])

    return {name: index for index, name in enumerate(sorted(categories))}



class Volleyball_training_dataset_baseline_1(Dataset):
    """In-memory training set of single frames with pre-computed augmentations.

    At construction time every frame listed in each clip's
    ``frame_boxes_dct`` is loaded, resized/cropped to 224x224, and stored as
    a normalized tensor together with its clip-level label. Each frame also
    gets two random draws from each of the five augmentation pipelines, so a
    readable frame contributes 11 ``(tensor, label)`` samples. All samples
    are kept in ``self.samples``; ``__getitem__`` just indexes that list.

    Args:
        dataset: nested dict as returned by ``load_volleyball_dataset``.
        videos_path: root directory of the frame images
            (``<videos_path>/<video>/<clip>/<frame_id>.jpg``).
        videos_indices: collection of video keys to keep from ``dataset``.
    """

    def __init__(self, dataset, videos_path, videos_indices):
        self.videos_path = videos_path

        # Keep only the requested videos, then derive labels from them.
        self.dataset = {
            video: clips for video, clips in dataset.items() if video in videos_indices
        }
        self.class_to_index = create_class_mapping(self.dataset)

        # PIL -> PIL: bring every frame to a fixed 224x224 size.
        self.pre_transform = Compose([Resize((256, 256)), CenterCrop((224, 224))])

        # PIL -> tensor, normalized with the mean/std values below.
        self.post_transform = Compose([
            ToTensor(),
            Normalize(mean=[0.485, 0.456, 0.406],
                      std=[0.229, 0.224, 0.225]),
        ])

        # Moderate affine transform with color jitter.
        affine_jitter = Compose([
            RandomAffine(degrees=10, translate=(0.0, 0.1)),
            ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.05),
        ])
        # Rotation with perspective distortion and a mild blur.
        rotate_warp_blur = Compose([
            RandomRotation(degrees=10),
            RandomPerspective(distortion_scale=0.3, p=0.5),
            GaussianBlur(kernel_size=3),
        ])
        # Larger rotation with color jitter and a sharpness adjustment.
        rotate_jitter_sharpen = Compose([
            RandomRotation(degrees=20),
            ColorJitter(brightness=0.25, contrast=0.25, saturation=0.25, hue=0.1),
            RandomAdjustSharpness(sharpness_factor=2, p=0.5),
        ])
        # Posterization with color jitter.
        posterize_jitter = Compose([
            RandomPosterize(bits=4),
            ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.05),
        ])
        # Occasional grayscale conversion.
        grayscale = Compose([
            RandomGrayscale(p=0.2),
        ])
        self.augmentation_transforms = [
            affine_jitter,
            rotate_warp_blur,
            rotate_jitter_sharpen,
            posterize_jitter,
            grayscale,
        ]

        draws_per_pipeline = 2

        self.samples = []
        for video_index, clips in self.dataset.items():
            for clip_id, clip_data in clips.items():
                # The clip-level category is the label of each of its frames.
                label = self.class_to_index[clip_data['category']]

                for frame_id in clip_data['frame_boxes_dct']:
                    frame_path = os.path.join(
                        self.videos_path, video_index, clip_id, f"{frame_id}.jpg"
                    )
                    image = self._open_rgb(frame_path)
                    if image is None:
                        continue

                    # Un-augmented sample first.
                    pre_image = self.pre_transform(image)
                    self.samples.append((self.post_transform(pre_image), label))

                    # Then the random variants, pipeline by pipeline.
                    for pipeline in self.augmentation_transforms:
                        for _ in range(draws_per_pipeline):
                            variant = pipeline(pre_image)
                            self.samples.append((self.post_transform(variant), label))

        print(f"Training Dataset contains {len(self.samples)} samples.")

    @staticmethod
    def _open_rgb(frame_path):
        """Return the image at ``frame_path`` as RGB, or None after logging a failure."""
        try:
            return Image.open(frame_path).convert("RGB")
        except Exception as e:
            print(f"Error loading image {frame_path}: {e}")
            return None

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        return self.samples[idx]





def get_val_transform():
    """Build the deterministic preprocessing pipeline for evaluation images.

    Returns:
        A ``transforms.Compose`` that resizes to 256x256, center-crops to
        224x224, converts to a tensor, and normalizes with the mean/std
        values listed below.
    """
    steps = [
        transforms.Resize((256, 256)),
        transforms.CenterCrop((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]
    return transforms.Compose(steps)


class Volleyball_testing_dataset_baseline_1(Dataset):
    """Evaluation dataset that yields one lazily-loaded frame per clip.

    Args:
        dataset: nested dict as returned by ``load_volleyball_dataset``.
        videos_path: root directory of the frame images
            (``<videos_path>/<video>/<clip>/<frame_id>.jpg``).
        videos_indices: collection of video keys to keep from ``dataset``.
        transform: optional callable applied to the loaded PIL image.
        class_to_index: optional category -> index mapping; when omitted it
            is derived from the filtered dataset.

    ``self.indices`` holds one ``(video, clip, frame_id)`` triple per clip
    that has at least one frame in its ``frame_boxes_dct``.
    """

    def __init__(self, dataset, videos_path, videos_indices, transform=None, class_to_index=None):
        self.videos_path = videos_path
        self.videos_indices = videos_indices
        self.transform = transform

        # Restrict the annotations to the requested videos.
        self.dataset = {
            video: clips for video, clips in dataset.items() if video in videos_indices
        }

        # Reuse the caller's mapping when given so label indices stay
        # consistent with whatever produced that mapping.
        if class_to_index is not None:
            self.class_to_index = class_to_index
        else:
            self.class_to_index = create_class_mapping(self.dataset)

        self.indices = []
        for video_index, clips in self.dataset.items():
            for clip_id, clip_data in clips.items():
                frame_ids = list(clip_data['frame_boxes_dct'])
                try:
                    # Numeric ordering when the ids are int-convertible.
                    frame_ids = sorted(frame_ids, key=int)
                except ValueError:
                    # Otherwise fall back to the ids' natural ordering.
                    frame_ids = sorted(frame_ids)

                if not frame_ids:
                    continue

                # len // 2 selects the exact middle for odd counts and the
                # upper of the two middle entries for even counts.
                middle_frame = frame_ids[len(frame_ids) // 2]
                self.indices.append((video_index, clip_id, middle_frame))

        print(f"Class mapping for testing: {self.class_to_index}")

    def __len__(self):
        return len(self.indices)

    def __getitem__(self, idx):
        """Return ``(frame, label)`` where label is a ``torch.long`` scalar tensor."""
        video_index, clip_id, frame_id = self.indices[idx]

        # Clip category -> integer label tensor.
        category = self.dataset[video_index][clip_id]['category']
        label = torch.tensor(self.class_to_index[category], dtype=torch.long)

        # Load the frame from disk on demand, always as RGB.
        frame_path = os.path.join(self.videos_path, video_index, clip_id, f"{frame_id}.jpg")
        frame = Image.open(frame_path).convert("RGB")

        if self.transform:
            frame = self.transform(frame)

        return frame, label



# Model_utils_file :

In [None]:
%%writefile modeling_utils.py
import torch
import torch.nn as nn
from torchvision import models, transforms
import torch.optim as optim
import numpy
from sklearn.metrics import accuracy_score,confusion_matrix,classification_report
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns



def Get_device():
    """Return the CUDA device when CUDA is available, otherwise the CPU device."""
    if torch.cuda.is_available():
        return torch.device("cuda")
    return torch.device("cpu")




def Get_model(num_classes, device):
    """Create a ResNet-50 classifier initialised from ImageNet weights.

    All parameters are trainable except those of the stem layers ``conv1``
    and ``bn1``, which are frozen. The original ``fc`` layer is replaced by
    dropout (p=0.5) followed by a linear layer with ``num_classes`` outputs.

    Args:
        num_classes: number of output logits.
        device: device the model is moved to.

    Returns:
        The model, on ``device`` and in training mode.
    """
    model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)

    # Start with everything trainable, then freeze the stem.
    model.requires_grad_(True)
    model.conv1.requires_grad_(False)
    model.bn1.requires_grad_(False)

    # Swap the classification head for dropout + a fresh linear layer.
    in_features = model.fc.in_features
    model.fc = nn.Sequential(
        nn.Dropout(p=0.5),
        nn.Linear(in_features, num_classes),
    )

    model = model.to(device)
    model.train()
    return model








def train(model, dataloader, val_loader, optimizer, criterion, device, class_to_index_dict,
          epochs=50, checkpoint_path="best_model.pt"):
    """Train ``model`` and checkpoint the weights with the best validation accuracy.

    After every epoch the model is evaluated on ``val_loader`` via
    ``evaluate_model``; whenever validation accuracy improves, the model's
    ``state_dict`` is saved to ``checkpoint_path``.

    Args:
        model: network to optimise (put into training mode here).
        dataloader: iterable of ``(inputs, labels)`` training batches.
        val_loader: iterable of ``(inputs, labels)`` validation batches.
        optimizer: optimizer stepping the model's parameters.
        criterion: loss taking ``(outputs, labels)``.
        device: device batches are moved to.
        class_to_index_dict: category -> index mapping; its keys are passed
            to ``evaluate_model`` as class names.
        epochs: number of passes over ``dataloader``.
        checkpoint_path: where the best ``state_dict`` is written. Defaults
            to ``"best_model.pt"``, the path previously hard-coded here.

    Returns:
        ``(train_losses, train_accuracies, val_losses, val_accuracies)``,
        each a list with one entry per epoch. Losses are means over batches;
        accuracies are percentages.
    """
    model.train()
    train_losses = []
    train_accuracies = []
    val_losses = []
    val_accuracies = []

    class_names = list(class_to_index_dict.keys())

    # Start below any reachable accuracy so the first epoch always produces a
    # checkpoint, even if validation accuracy is 0%.
    best_val_accuracy = float("-inf")

    for epoch in range(epochs):
        running_loss = 0.0
        correct = 0
        total = 0

        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Zero the parameter gradients
            optimizer.zero_grad()

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            # Accumulate the per-batch mean loss
            running_loss += loss.item()

            # Count correct top-1 predictions
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        # Record epoch statistics
        epoch_loss = running_loss / len(dataloader)
        epoch_accuracy = 100 * correct / total

        train_losses.append(epoch_loss)
        train_accuracies.append(epoch_accuracy)

        # evaluate_model switches the model back to training mode before returning.
        val_loss, val_accuracy = evaluate_model(
            model, val_loader, device, criterion, "Validation", class_names
        )
        val_losses.append(val_loss)
        val_accuracies.append(val_accuracy)

        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            torch.save(model.state_dict(), checkpoint_path)
            print(f"Best model updated at epoch {epoch+1} with validation accuracy: {best_val_accuracy:.2f}%")

        # Print statistics for the current epoch
        print(f"Epoch {epoch+1}/{epochs}, Training_Loss: {epoch_loss:.4f}, Training_Accuracy:{epoch_accuracy:.2f}%")
        print(f"Epoch {epoch+1}/{epochs}, Validation_loss:{val_loss:.4f}, Validation_Accuracy:{val_accuracy:.2f}%")
    return train_losses, train_accuracies, val_losses, val_accuracies



    
def plot_losses_and_accuracies(train_losses, val_losses, train_accuracies, val_accuracies):
    """Show loss and accuracy curves side by side, one point per epoch.

    Args:
        train_losses: per-epoch training losses (its length sets the x-axis).
        val_losses: per-epoch validation losses.
        train_accuracies: per-epoch training accuracies.
        val_accuracies: per-epoch validation accuracies.
    """
    epochs = range(1, len(train_losses) + 1)

    # (train series, val series, train legend, val legend, title, y-axis label)
    panels = (
        (train_losses, val_losses,
         'Training Loss', 'Validation Loss', 'Loss over Epochs', 'Loss'),
        (train_accuracies, val_accuracies,
         'Training Accuracy', 'Validation Accuracy', 'Accuracy over Epochs', 'Accuracy (%)'),
    )

    plt.figure(figsize=(12, 6))

    for position, panel in enumerate(panels, start=1):
        train_series, val_series, train_label, val_label, title, y_label = panel

        plt.subplot(1, 2, position)
        plt.plot(epochs, train_series, label=train_label, color='red', marker='o')
        plt.plot(epochs, val_series, label=val_label, color='blue', linestyle='dashed', marker='s')
        plt.xticks(epochs)
        plt.title(title)
        plt.xlabel('Epochs')
        plt.ylabel(y_label)
        plt.legend()
        plt.grid(True)

    plt.tight_layout()
    plt.show()




def evaluate_model(model, data_loader, device, criterion,mode="Validation",class_names=None):
    """Evaluate ``model`` on ``data_loader`` and return loss and accuracy.

    Args:
        model: network to evaluate; switched to eval mode for the pass and
            to training mode before returning.
        data_loader: iterable of ``(inputs, labels)`` batches.
        device: device batches are moved to.
        criterion: loss taking ``(outputs, labels)``.
        mode: ``"Testing"`` additionally prints loss/accuracy, prints a
            classification report (when ``class_names`` is given) and shows
            a confusion-matrix heatmap. Any other value only computes the
            returned metrics.
        class_names: optional list of class names. It is assumed that the
            name at position ``i`` belongs to label index ``i``.

    Returns:
        ``(avg_loss, accuracy_percent)``: mean of the per-batch losses
        (0.0 when there are no batches) and top-1 accuracy in percent.
    """
    all_preds = []
    all_labels = []
    total_loss = 0.0
    batch_count = 0

    model.eval()  # Set model to evaluation mode
    with torch.no_grad():  # Disable gradient computation
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)  # Forward pass

            # Compute loss for the batch
            loss = criterion(outputs, labels)
            total_loss += loss.item()
            batch_count += 1

            # Predicted class = index of the largest logit
            preds = torch.argmax(outputs, dim=1).cpu().numpy()

            # Collect predictions and true labels
            all_preds.extend(preds)
            all_labels.extend(labels.cpu().numpy())

    # Calculate average loss over all batches
    avg_loss = total_loss / batch_count if batch_count > 0 else 0.0

    # Convert lists to numpy arrays for metric computations
    all_preds = np.array(all_preds)
    all_labels = np.array(all_labels)

    accuracy = accuracy_score(all_labels, all_preds)

    if mode=="Testing":
        print(f"Loss: {avg_loss:.4f}")
        print(f"Accuracy: {accuracy * 100:.2f}%")

        if class_names:
            # Pin the label set to every known class so the report and the
            # matrix keep one row per class name even when a class is
            # missing from this split's labels and predictions.
            label_ids = list(range(len(class_names)))
            tick_labels = class_names
            print("\nClassification Report:")
            print(classification_report(all_labels, all_preds,
                                        labels=label_ids, target_names=class_names))
        else:
            # Without names, label the axes with the label values that
            # confusion_matrix itself uses: those seen in truth or predictions.
            label_ids = None
            tick_labels = np.unique(np.concatenate([all_labels, all_preds]))

        # Only needed for the heatmap, so it is computed in Testing mode only.
        conf_matrix = confusion_matrix(all_labels, all_preds, labels=label_ids)

        plt.figure(figsize=(10, 8))
        sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues',
                    xticklabels=tick_labels,
                    yticklabels=tick_labels)
        plt.xlabel("Predicted Labels")
        plt.ylabel("True Labels")
        plt.title("Confusion Matrix")
        plt.show()

    model.train()  # Set the model back to training mode before returning
    return avg_loss, accuracy*100




# Main File
  - ### step 1: import libraries
  - ### step 2: Loading Data
  - ### step 3: Training
  - ### step 4: Testing

# step 1: importing libraries

In [None]:
# Notebook-level imports used by the cells below.
import importlib
from data_utils import Volleyball_testing_dataset_baseline_1
from torch.utils.data import Dataset,DataLoader

# NOTE(review): later cells call helpers such as load_volleyball_dataset,
# Get_model, nn and optim unqualified, so they presumably rely on %run
# exposing each script's names in the notebook namespace — confirm before
# removing these lines.
%run data_utils.py
%run modeling_utils.py

import data_utils
import modeling_utils


# Reload both modules so that a re-written data_utils.py / modeling_utils.py
# is re-executed instead of reusing an already-imported copy.
importlib.reload(data_utils)
importlib.reload(modeling_utils)


# step 2: Loading Data

In [None]:
# Kaggle input locations of the frame images and of the tracking annotations.
videos_path = '/kaggle/input/volleyball/volleyball_/videos'
annots_path = '/kaggle/input/volleyball/volleyball_tracking_annotation/volleyball_tracking_annotation'

# Nested dict: video -> clip -> {'category', 'frame_boxes_dct'}.
dataset = load_volleyball_dataset(videos_path, annots_path)


In [None]:


# Video ids that make up the training split.
train_ids = [
    "1", "3", "6", "7", "10", "13", "15", "16", "18", "22", "23", "31",
    "32", "36", "38", "39", "40", "41", "42", "48", "50", "52", "53", "54",
]

# The training dataset loads and augments its frames at construction time.
Training_dataset = Volleyball_training_dataset_baseline_1(dataset, videos_path, train_ids)
train_loader = DataLoader(Training_dataset, batch_size=64, shuffle=True)

# Validation split: deterministic preprocessing, and the training class
# mapping is reused so label indices agree between the two datasets.
val_transform = get_val_transform()

val_ids = [
    "0", "2", "8", "12", "17", "19", "24", "26",
    "27", "28", "30", "33", "46", "49", "51",
]

val_dataset = Volleyball_testing_dataset_baseline_1(
    dataset,
    videos_path,
    val_ids,
    transform=val_transform,
    class_to_index=Training_dataset.class_to_index,
)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

# step 3: Training


In [None]:
# Pick the compute device and build the classifier.
device=Get_device()

# Size the output layer from the training class mapping rather than a
# hard-coded 8, so the model always matches the labels the dataset emits.
model = Get_model(
    num_classes=len(Training_dataset.class_to_index),
    device=device)


# Cross-entropy loss with label smoothing of 0.1.
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)


# Set up the AdamW optimizer. conv1 and bn1 are frozen in Get_model and are
# left out of the parameter groups; every listed group uses the same rate.
optimizer = optim.AdamW([
    {'params': model.layer1.parameters(), 'lr': 1e-4},
    {'params': model.layer2.parameters(), 'lr': 1e-4},
    {'params': model.layer3.parameters(), 'lr': 1e-4},
    {'params': model.layer4.parameters(), 'lr': 1e-4},
    {'params': model.fc.parameters(),    'lr': 1e-4}
], 
  
    weight_decay=1e-4  # Proper decoupled weight decay
)


# Returns one loss and one accuracy value per epoch for both splits.
train_loss,train_accuracy,val_loss,val_accuracy=train(model, train_loader, val_loader,optimizer, criterion, device,Training_dataset.class_to_index, epochs=35)

# Training vs Validation Curves 

In [None]:
# Plot the per-epoch training and validation histories returned by train().
plot_losses_and_accuracies(train_loss,val_loss,train_accuracy,val_accuracy)

# step 4:Testing 

In [None]:
# Restore the checkpoint written by train(). map_location remaps the saved
# tensors onto the current device, so a checkpoint saved on a GPU can still
# be loaded in a CPU-only session.
model.load_state_dict(torch.load("best_model.pt", map_location=device))

# Move the model to the correct device (if not already done in Get_model)
model.to(device)


# Video ids that make up the held-out test split.
test_ids = ["4","5","9","11","14","20","21", "25", "29", "34", "35", "37", "43", "44" ,"45" ,"47"]

# Same preprocessing and class mapping as the validation split.
test_dataset=Volleyball_testing_dataset_baseline_1(dataset,videos_path,test_ids,val_transform,Training_dataset.class_to_index)

test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

print("Evaluating the model on the Testing set...")

# "Testing" mode prints the metrics and shows the confusion matrix.
evaluate_model(model, test_loader,device,criterion,"Testing",list(Training_dataset.class_to_index.keys()))