In [24]:
import cv2
import os
import shutil

def extract_frames(video_path, output_dir, fps=5):
    """
    Extract frames from a video and save them as JPEG images.

    Any existing ``output_dir`` is deleted and recreated, so every call starts
    from an empty directory. Every ``interval``-th frame is written as
    ``frame_<index>.jpg``, where ``<index>`` is the zero-based position of the
    frame in the source video and ``interval = int(video_fps / fps)``, clamped
    to a minimum of 1 (i.e. every frame is kept when the source is slower than
    the requested rate or reports no frame rate at all).

    Args:
    - video_path (str): Path to the video file.
    - output_dir (str): Directory to save the frames (wiped if it exists).
    - fps (int): Number of frames per second to extract.
    """
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    os.makedirs(output_dir)

    cap = cv2.VideoCapture(video_path)
    try:
        video_fps = cap.get(cv2.CAP_PROP_FPS)
        # int() truncates to 0 when video_fps < fps (or when the container
        # reports 0 fps), which would make the modulo below raise
        # ZeroDivisionError; fall back to keeping every frame instead.
        interval = max(1, int(video_fps / fps))

        frame_count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # End of stream (or a read failure): stop extracting.
                break
            if frame_count % interval == 0:
                frame_filename = os.path.join(output_dir, f"frame_{frame_count}.jpg")
                cv2.imwrite(frame_filename, frame)
            frame_count += 1
    finally:
        # Release the capture even if reading or writing raised.
        cap.release()


In [25]:
def find_videos_and_run_extract_frames(base_path, label_map):
    """
    Run ``extract_frames`` on every ``.avi`` video found under the class folders.

    The directory layout walked is
    ``<base_path>/<class_name>/<subfolder>/<video>.avi``. Frames of each video
    are written to ``<base_path>/<class_name>/<subfolder>/output_frames/`` at a
    requested rate of 9 frames per second.

    Note: ``extract_frames`` wipes its output directory on every call, so if a
    subfolder holds several videos only the frames of the last one processed
    remain.

    Args:
    - base_path (str): Root directory that contains one folder per class.
    - label_map (dict): Maps class index -> class folder name; only the folder
      names (the values) are used here.
    """
    for class_name in label_map.values():
        class_folder = f'{base_path}/{class_name}'

        if not os.path.exists(class_folder):
            print(f"Folder does not exist: {class_folder}\n")  # Print if the folder doesn't exist
            continue

        # Loop through each subfolder
        for subfolder in os.listdir(class_folder):
            clip_folder = f'{class_folder}/{subfolder}'

            # Skip stray files (.DS_Store, readme files, ...) that would make
            # os.listdir raise NotADirectoryError.
            if not os.path.isdir(clip_folder):
                continue

            for image_file in os.listdir(clip_folder):
                # Test the file's own extension, not a substring of the whole
                # path, so a directory name containing ".avi" or a file such
                # as "x.avi.txt" is not mistaken for a video.
                if image_file.lower().endswith('.avi'):
                    extract_frames(f'{clip_folder}/{image_file}', f'{clip_folder}/output_frames/', fps=9)

# Class index -> class folder name; the index is simply the position in this tuple
label_map = dict(enumerate((
    "Diving-Side", "Golf-Swing-Back", "Golf-Swing-Front",
    "Golf-Swing-Side", "Kicking-Front", "Kicking-Side",
    "Lifting", "Riding-Horse", "Run-Side",
    "SkateBoarding-Front", "Swing-Bench", "Walk-Front",
)))

# Extract video frames for the training tree first, then for the test tree
for dataset_root in ("./ucf_sports_actions/ucf action", "./ucf_sports_actions_test/ucf action"):
    print(f"extracting frames ---> {dataset_root}")
    find_videos_and_run_extract_frames(dataset_root, label_map)
    print(f"extracted frames ---> {dataset_root}\n")

extracting frames ---> ./ucf_sports_actions/ucf action
extracted frames ---> ./ucf_sports_actions/ucf action

extracting frames ---> ./ucf_sports_actions_test/ucf action
extracted frames ---> ./ucf_sports_actions_test/ucf action



In [27]:
import torch
import torch.nn as nn
import torchvision.models as models

class ActionRecognitionModel(nn.Module):
    """
    CNN + LSTM action classifier.

    A ResNet-50 backbone (created with ``pretrained=True``) encodes every frame
    into a 2048-dimensional vector, an LSTM consumes the per-frame vectors in
    order, and a linear layer classifies the LSTM output of the last time step.

    ``forward`` accepts either
    - a batch of single frames ``(batch_size, c, h, w)``, treated as clips of
      length 1, or
    - a batch of clips ``(batch_size, time_steps, c, h, w)``.

    Args:
    - num_classes (int): Size of the output (logit) vector.
    """

    def __init__(self, num_classes):
        super().__init__()

        # Pretrained 2D CNN (ResNet) for feature extraction; the last two
        # children are dropped so the output is a spatial feature map that
        # forward() pools itself.
        resnet = models.resnet50(pretrained=True)
        self.feature_extractor = nn.Sequential(*list(resnet.children())[:-2])

        # 3D Convolution Layer
        # NOTE: never called in forward(); kept only so the parameter set and
        # state_dict keys stay unchanged for existing checkpoints.
        self.conv3d = nn.Conv3d(1, 64, kernel_size=(3, 3, 3), stride=1, padding=1)

        # LSTM for Temporal Dynamics
        self.lstm = nn.LSTM(2048, 512, batch_first=True)

        # Fully Connected Layer for classification
        self.fc = nn.Linear(512, num_classes)

    def forward(self, x):
        """
        Compute class logits.

        Args:
        - x (Tensor): ``(batch_size, c, h, w)`` or
          ``(batch_size, time_steps, c, h, w)``.

        Returns:
        - Tensor of shape ``(batch_size, num_classes)``.

        Raises:
        - ValueError: if ``x`` is neither 4- nor 5-dimensional.
        """
        if x.dim() == 4:
            # Single frames from the DataLoader: add a time axis of length 1.
            x = x.unsqueeze(1)
        elif x.dim() != 5:
            raise ValueError(
                f"expected a 4-D (batch, c, h, w) or 5-D (batch, time, c, h, w) input, got {x.dim()}-D"
            )
        batch_size, time_steps, c, h, w = x.size()

        # Fold the time axis into the batch axis so the CNN runs once over all
        # frames instead of once per time step.
        frames = x.reshape(batch_size * time_steps, c, h, w)
        feature_maps = self.feature_extractor(frames)  # (batch*time, 2048, H', W')

        # Global average pooling: (2048, H', W') -> (2048, 1, 1) per frame.
        pooled = torch.nn.functional.adaptive_avg_pool2d(feature_maps, (1, 1))

        # Restore the time axis: (batch_size, time_steps, 2048).
        cnn_out = pooled.flatten(1).reshape(batch_size, time_steps, -1)

        # LSTM for sequence processing
        lstm_out, _ = self.lstm(cnn_out)

        # Classification layer on the output of the last time step
        out = self.fc(lstm_out[:, -1, :])
        return out

# Instantiate the model
# NOTE(review): num_classes=13, but label_map in the frame-extraction cell lists
# only 12 class names — presumably ImageFolder discovers a 13th class folder on
# disk; confirm this equals len(train_dataset.classes).
model = ActionRecognitionModel(num_classes=13)


In [28]:
from torch.optim import Adam
from torch.utils.data import random_split, DataLoader
from torchvision.datasets import ImageFolder
import torchvision.transforms as transforms
import torch.nn.functional as F
from PIL import Image
# Pick the compute device: Apple MPS if available, otherwise CUDA, otherwise CPU
device = torch.device("mps" if torch.backends.mps.is_available() else "cuda" if torch.cuda.is_available() else "cpu")
print(f'device ===========> {device}')
# The model is not moved here; the training cell does that with model.to(device)

# Custom function to pad images to a target size
def pad_tensor(image, target_size):
    """
    Convert ``image`` to a tensor (if it is not one already) and pad it on the
    right and bottom edges up to ``target_size``.

    Args:
    - image: A ``torch.Tensor`` laid out as (C, H, W), or anything
      ``transforms.ToTensor`` accepts.
    - target_size (tuple): Desired (C, H, W); only the H and W entries are used.

    Returns:
    - Tensor: The image with ``target_size[1] - H`` rows appended at the bottom
      and ``target_size[2] - W`` columns appended on the right, filled with
      ``F.pad``'s default value. If the image is larger than the target, the
      amount is negative and is passed to ``F.pad`` unchanged.
    """
    # Convert only when needed: ToTensor raises on inputs that are already
    # tensors, which made the function unusable after any tensor-producing step.
    tensor = image if isinstance(image, torch.Tensor) else transforms.ToTensor()(image)

    # F.pad takes pads for the last dimension first: (left, right, top, bottom).
    pad_right = target_size[2] - tensor.size(2)
    pad_bottom = target_size[1] - tensor.size(1)
    padded_tensor = F.pad(tensor, (0, pad_right, 0, pad_bottom))
    return padded_tensor

# Callable transform that pads an image to a fixed (C, H, W) size
class ResizeOrPadTransform:
    """
    Transform object that can be placed in a ``transforms.Compose`` pipeline.

    Calling an instance hands the image to ``pad_tensor`` together with the
    target size given at construction. Despite the class name, nothing in this
    class resizes the image.
    """

    def __init__(self, target_size):
        # Stored as given and forwarded to pad_tensor on every call
        self.target_size = target_size

    def __call__(self, image):
        """Return ``pad_tensor(image, self.target_size)``."""
        padded = pad_tensor(image, self.target_size)
        return padded

# Target size as (C, H, W); every image is padded to 404 x 720 so batches can be stacked
target_size = (3, 404, 720)

# Transformation pipeline: a single step that converts to a tensor and pads
# NOTE(review): no mean/std normalisation is applied although the model uses a
# pretrained ResNet backbone — confirm this is intentional.
transform = transforms.Compose([
    ResizeOrPadTransform(target_size)  # Apply padding to match target size
])
# Training images; ImageFolder takes one sub-folder of this directory per class
train_dataset = ImageFolder('./ucf_sports_actions/ucf action/', transform=transform)
# DataLoader for the training set only (the test loader is built in a later cell)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
# Print the number of training images
print(f"Training Dataset Size: {len(train_dataset)}")

Training Dataset Size: 8185


In [18]:
print(f'device ===========> {device}')

# Training configuration
num_epochs = 10

# Move the model to the target device BEFORE building the optimizer, as the
# torch.optim documentation recommends, so the optimizer is guaranteed to hold
# the on-device parameters. `trained_model` is bound to the return value of
# `model.to(device)`.
trained_model = model.to(device)
trained_model.train()

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = Adam(trained_model.parameters(), lr=0.0001)

# Training loop
for epoch in range(num_epochs):
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Clear gradients left over from the previous step
        optimizer.zero_grad()

        # Forward pass
        outputs = trained_model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Mean of the per-batch losses over the epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}")


Epoch [1/10], Loss: 0.0045
Epoch [2/10], Loss: 0.0023
Epoch [3/10], Loss: 0.0018
Epoch [4/10], Loss: 0.0021
Epoch [5/10], Loss: 0.0008
Epoch [6/10], Loss: 0.0004
Epoch [7/10], Loss: 0.0005
Epoch [8/10], Loss: 0.0023
Epoch [9/10], Loss: 0.0005
Epoch [10/10], Loss: 0.0012


In [29]:
# Held-out images, loaded with the same padding transform as the training set
# NOTE(review): label indices only line up with training if this directory has
# the same class sub-folders as the training directory — confirm.
test_dataset = ImageFolder('./ucf_sports_actions_test/ucf action/', transform=transform)
print(f"Test Dataset Size: {len(test_dataset)}")
# Evaluation loader; shuffle is left at its default
test_loader = DataLoader(test_dataset, batch_size=16)

Test Dataset Size: 1456


In [31]:
def calculate_accuracy(model, data_loader, device):
    """
    Calculate the accuracy of a model on a given dataset (data_loader).

    The model is switched to evaluation mode and is left in that mode.

    Args:
    - model: The PyTorch model to evaluate.
    - data_loader: Iterable of ``(inputs, labels)`` batches.
    - device: The device to perform calculations on (CPU or GPU).

    Returns:
    - accuracy (float): Percentage of correctly classified samples, or ``0.0``
      when ``data_loader`` yields no samples.
    """
    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = 0

    with torch.no_grad():  # Disable gradient calculations
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Predicted class = index of the largest score along dim 1
            predicted = model(inputs).argmax(dim=1)

            total += labels.size(0)  # Increment total count
            correct += (predicted == labels).sum().item()  # Increment correct count

    if total == 0:
        # Empty loader: avoid dividing by zero.
        return 0.0
    return 100 * correct / total  # Accuracy as a percentage

# Accuracy (in percent) of the trained model on the test loader
test_accuracy = calculate_accuracy(trained_model, test_loader, device)
print(f"Test Accuracy: {test_accuracy:.2f}%")

Test Accuracy: 81.11%


In [32]:
from sklearn.metrics import precision_score

def calculate_precision(model, data_loader, device):
    """
    Calculate the weighted-average precision of a model on a given dataset.

    The model is switched to evaluation mode and is left in that mode.

    Args:
    - model: The PyTorch model to evaluate.
    - data_loader: Iterable of ``(inputs, labels)`` batches.
    - device: The device to perform calculations on (CPU or GPU).

    Returns:
    - precision (float): Support-weighted mean of the per-class precision.
      Classes the model never predicts contribute a precision of 0.
    """
    model.eval()  # Set the model to evaluation mode
    all_labels = []
    all_preds = []

    with torch.no_grad():  # Disable gradient calculations
        for inputs, labels in data_loader:
            # Labels are only compared on the CPU, so only inputs need the device
            inputs = inputs.to(device)

            # Predicted class = index of the largest score along dim 1
            predicted = model(inputs).argmax(dim=1)

            # Store labels and predictions
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(predicted.cpu().numpy())

    # zero_division=0 yields the same value as sklearn's default ("warn") but
    # without the ill-defined-precision warning for never-predicted classes.
    precision = precision_score(all_labels, all_preds, average='weighted', zero_division=0)
    return precision
# Weighted precision of the trained model on the test loader
test_precision = calculate_precision(trained_model, test_loader, device)
print(f"Test Precision: {test_precision:.4f}")


Test Precision: 0.7547


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [33]:
from sklearn.metrics import recall_score

def calculate_recall(model, data_loader, device):
    """
    Weighted-average recall of ``model`` over every batch in ``data_loader``.

    Puts the model in evaluation mode (and leaves it there), runs it on each
    batch without tracking gradients, and passes the collected true and
    predicted class indices to sklearn's ``recall_score``.

    Args:
    - model: The PyTorch model to evaluate.
    - data_loader: Iterable yielding ``(inputs, labels)`` batches.
    - device: Device the batches are moved to before the forward pass.

    Returns:
    - recall (float): ``recall_score(..., average='weighted')``.
    """
    model.eval()
    targets, predictions = [], []

    with torch.no_grad():
        for batch_inputs, batch_labels in data_loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_inputs)
            # torch.max over dim 1 gives (values, indices); the indices are the
            # predicted classes
            predicted_classes = torch.max(logits.data, 1)[1]

            # Gather on the CPU as numpy values for sklearn
            targets.extend(batch_labels.cpu().numpy())
            predictions.extend(predicted_classes.cpu().numpy())

    # 'weighted' averages per-class recall by each class's support
    return recall_score(targets, predictions, average='weighted')
# Weighted recall of the trained model on the test loader
test_recall = calculate_recall(trained_model, test_loader, device)
print(f"Test Recall: {test_recall:.4f}")

Test Recall: 0.8111


In [34]:
from sklearn.metrics import f1_score

def calculate_f1(model, data_loader, device):
    """
    Weighted-average F1 score of ``model`` over every batch in ``data_loader``.

    The model is put in evaluation mode (and stays in it); predictions are
    computed with gradient tracking disabled and scored with sklearn's
    ``f1_score``.

    Args:
    - model: The PyTorch model to evaluate.
    - data_loader: Iterable yielding ``(inputs, labels)`` batches.
    - device: Device the batches are moved to before the forward pass.

    Returns:
    - f1 (float): ``f1_score(..., average='weighted')``.
    """
    y_true = []
    y_pred = []

    model.eval()
    with torch.no_grad():
        for features, classes in data_loader:
            features, classes = features.to(device), classes.to(device)

            scores = model(features)
            # Index of the largest score along dim 1 = predicted class
            best = torch.max(scores.data, 1).indices

            # Collect ground truth and predictions as numpy values on the CPU
            y_true.extend(classes.cpu().numpy())
            y_pred.extend(best.cpu().numpy())

    # 'weighted' averages per-class F1 by each class's support
    f1 = f1_score(y_true, y_pred, average='weighted')
    return f1
# Weighted F1 score of the trained model on the test loader
test_f1 = calculate_f1(trained_model, test_loader, device)
print(f"Test F1 Score: {test_f1:.4f}")

Test F1 Score: 0.7779
