In [None]:
from collections import defaultdict, deque
import cv2
from IPython.display import Video
import matplotlib.pyplot as plt
import mediapipe as mp
from moviepy.editor import *
import numpy as np
import os
import pandas as pd
import pickle
from pytube import YouTube
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve, auc
from sklearn.model_selection import train_test_split
import ssl
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.models.video import mc3_18, MC3_18_Weights
from tqdm import tqdm
from ultralytics import YOLO
from ultralytics.utils.ops import non_max_suppression
from urllib.request import urlretrieve
from yt_dlp import YoutubeDL
import yt_dlp as youtube_dl


In [None]:
# Dataset download

# URL of the UCF50 archive.
DATA_URL = 'https://www.crcv.ucf.edu/data/UCF50.rar'

# Local directory that holds the downloaded archive and the extracted dataset.
DATA_PATH = 'workspace'

# Directory the dataset lives in once the archive has been extracted.
UCF50_DATA_PATH = os.path.join(DATA_PATH, 'UCF50')

# Where the downloaded archive is stored inside DATA_PATH.
UCF50_ARCHIVE_PATH = os.path.join(DATA_PATH, 'UCF50.rar')

# Skip the download only when the archive or the extracted dataset is really
# there; the mere existence of DATA_PATH (e.g. after an interrupted run) is not
# proof that the data was downloaded.
if os.path.exists(UCF50_DATA_PATH) or os.path.exists(UCF50_ARCHIVE_PATH):
    print('[INFO] Data already exists.')
else:
    print('[INFO] Downloading data in the data directory.')
    os.makedirs(DATA_PATH, exist_ok=True)

    # NOTE(security): certificate verification is switched off for this one
    # download only and restored afterwards, instead of being left disabled for
    # every later HTTPS request made by the kernel.
    _original_https_context = ssl._create_default_https_context
    ssl._create_default_https_context = ssl._create_unverified_context

    # Download to a temporary name and rename on success so that an interrupted
    # transfer never leaves a truncated 'UCF50.rar' behind.
    _partial_archive_path = UCF50_ARCHIVE_PATH + '.part'
    try:
        urlretrieve(url=DATA_URL, filename=_partial_archive_path)
        os.replace(_partial_archive_path, UCF50_ARCHIVE_PATH)
    finally:
        ssl._create_default_https_context = _original_https_context
        if os.path.exists(_partial_archive_path):
            os.remove(_partial_archive_path)


In [None]:
# Dataset extraction

if os.path.exists(UCF50_DATA_PATH):
    # Nothing to do: the extracted dataset directory is already present.
    print('[INFO] UCF50 Data already exists, skipping extraction process.')
else:
    print(f'[INFO] Extracting data: "{UCF50_DATA_PATH}"')

    # rarfile is only needed for this one step and was never imported by the
    # notebook, so it is imported here, right where it is used.
    import rarfile

    # Open the archive from the location the download cell wrote it to
    # (DATA_PATH/UCF50.rar) rather than a hard-coded absolute path, and let the
    # context manager close it even if extraction fails.
    with rarfile.RarFile(os.path.join(DATA_PATH, 'UCF50.rar')) as archive:
        archive.extractall(DATA_PATH)

In [None]:
# Seed PyTorch's random number generator for reproducibility
torch.manual_seed(42)

# ---- Configuration constants ----
SEQUENCE_LENGTH = 15  # Number of frames to use for each video
IMAGE_SIZE = 224  # Frames are resized to IMAGE_SIZE x IMAGE_SIZE by `transform`
SELECTED_CLASSES = ['Kayaking', 'Basketball', 'JumpRope']
#SELECTED_CLASSES = ['Kayaking', 'Basketball', 'JumpRope', 'Diving', 'HorseRace', 'PullUps','MilitaryParade']
DATASET_DIR = 'workspace/UCF50'
BATCH_SIZE = 8  # Adjust based on your GPU memory
NUM_EPOCHS = 5
# Label names used to translate the PPE detector's class indices into text.
PPE_CLASSES = [
    'Hardhat',
    'Mask',
    'NO-Hardhat',
    'NO-Mask',
    'NO-Safety Vest',
    'Person',
    'Safety Cone',
    'Safety Vest',
    'machinery',
    'vehicle',
]

# Per-frame preprocessing: ndarray -> PIL image -> resize -> tensor -> normalise
transform = transforms.Compose(
    [
        transforms.ToPILImage(),
        transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
        transforms.ToTensor(),
        # Kinetics dataset mean and std
        transforms.Normalize(
            mean=[0.43216, 0.394666, 0.37645],
            std=[0.22803, 0.22145, 0.216989],
        ),
    ]
)


In [None]:
# Function to extract frames from a video
def extract_frames(video_path):
    """Sample SEQUENCE_LENGTH evenly spaced frames from a video as one tensor.

    Each sampled frame is converted from BGR to RGB and passed through the
    module-level `transform`. If fewer than SEQUENCE_LENGTH frames can be read,
    the clip is padded at the end with all-zero frames.

    Args:
        video_path: Path of the video file, handed to `cv2.VideoCapture`.

    Returns:
        Tensor laid out as [C, T, H, W] with T == SEQUENCE_LENGTH.

    Raises:
        ValueError: If not a single frame could be read from `video_path`
            (previously this surfaced as an opaque IndexError while padding).
    """
    frames = []
    video_reader = cv2.VideoCapture(video_path)
    try:
        frame_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))
        # Distance between sampled frames; at least 1 so short clips still advance.
        skip_frames_window = max(frame_count // SEQUENCE_LENGTH, 1)

        for frame_index in range(SEQUENCE_LENGTH):
            video_reader.set(cv2.CAP_PROP_POS_FRAMES, frame_index * skip_frames_window)
            success, frame = video_reader.read()
            if not success:
                break
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(transform(frame))
    finally:
        # Release the capture handle even if decoding or the transform fails.
        video_reader.release()

    if not frames:
        raise ValueError(f"Could not read any frames from video: {video_path}")

    # If we don't have enough frames, pad with zeros
    missing = SEQUENCE_LENGTH - len(frames)
    if missing > 0:
        frames.extend(torch.zeros_like(frames[0]) for _ in range(missing))

    return torch.stack(frames).permute(1, 0, 2, 3)  # [C, T, H, W]

# Custom Dataset class
class VideoDataset(Dataset):
    """Dataset of (clip, label) pairs.

    Despite the attribute name, `video_paths` may hold any of three things,
    because the notebook feeds this class the *preprocessed* data:

    * a path (str / os.PathLike) - the clip is decoded with `extract_frames`;
    * a `torch.Tensor` - an already preprocessed clip, returned as is;
    * a nested list - a clip restored from the pickle cache, converted back
      to a float32 tensor.

    Previously every item was passed to `extract_frames`, which fails for the
    preprocessed tensors/lists the notebook actually supplies.
    """

    def __init__(self, video_paths, labels):
        self.video_paths = video_paths
        self.labels = labels

    def __len__(self):
        return len(self.video_paths)

    def __getitem__(self, idx):
        item = self.video_paths[idx]
        if isinstance(item, (str, os.PathLike)):
            frames = extract_frames(item)
        elif isinstance(item, torch.Tensor):
            frames = item
        else:
            # Nested Python lists produced by the pickle cache.
            frames = torch.as_tensor(item, dtype=torch.float32)
        return frames, self.labels[idx]



In [None]:
# Function to load all video paths and labels
def load_dataset():
    """Collect the path and class index of every video in the selected classes.

    Looks in DATASET_DIR/<class_name> for each entry of SELECTED_CLASSES. The
    label of a video is the position of its class in SELECTED_CLASSES.

    Directory entries are visited in sorted order because `os.listdir` returns
    them in arbitrary order, which would make the seeded train/test split differ
    between machines. Entries that are not regular files are skipped.

    Returns:
        (video_paths, labels): two parallel lists.
    """
    video_paths = []
    labels = []
    for class_idx, class_name in enumerate(SELECTED_CLASSES):
        class_dir = os.path.join(DATASET_DIR, class_name)
        for video_name in sorted(os.listdir(class_dir)):
            video_path = os.path.join(class_dir, video_name)
            if not os.path.isfile(video_path):
                continue
            video_paths.append(video_path)
            labels.append(class_idx)
    return video_paths, labels

In [None]:
# Function to draw graphs
def draw_graphs(train_losses, val_losses, train_accuracies, val_accuracies):
    """Save a two-panel training-curve figure to 'training_progress.png'.

    The left panel shows training vs validation loss, the right panel training
    vs validation accuracy. The x axis counts epochs starting at 1; its length
    is taken from `train_losses`. The figure is closed after saving.
    """
    epochs = range(1, len(train_losses) + 1)

    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 5))

    # Loss graph
    loss_ax.plot(epochs, train_losses, 'b-', label='Training Loss')
    loss_ax.plot(epochs, val_losses, 'r-', label='Validation Loss')
    loss_ax.set_title('Total Loss vs Total Validation Loss')
    loss_ax.set_xlabel('Epochs')
    loss_ax.set_ylabel('Loss')
    loss_ax.legend()

    # Accuracy graph
    acc_ax.plot(epochs, train_accuracies, 'b-', label='Training Accuracy')
    acc_ax.plot(epochs, val_accuracies, 'r-', label='Validation Accuracy')
    acc_ax.set_title('Model Total Accuracy vs Model Total Validation Accuracy')
    acc_ax.set_xlabel('Epochs')
    acc_ax.set_ylabel('Accuracy')
    acc_ax.legend()

    fig.tight_layout()
    fig.savefig('training_progress.png')
    plt.close(fig)


In [None]:
# Function to save preprocessed dataset
def save_preprocessed_dataset(data, labels, file_path):
    """Pickle `(data, labels)` to `file_path`.

    Tensor samples are converted to nested Python lists before pickling;
    non-tensor samples are stored unchanged. An empty `data` sequence is
    handled (previously `data[0]` raised IndexError).

    Args:
        data: Sequence of samples (tensors or anything picklable).
        labels: Labels matching `data`; stored as given.
        file_path: Destination file, opened in binary write mode.
    """
    # Check every sample, not just the first one, so mixed input is handled too.
    if any(isinstance(sample, torch.Tensor) for sample in data):
        data = [
            sample.tolist() if isinstance(sample, torch.Tensor) else sample
            for sample in data
        ]
    with open(file_path, 'wb') as f:
        pickle.dump((data, labels), f)

# Function to load preprocessed dataset
def load_preprocessed_dataset(file_path):
    """Load a `(data, labels)` pair previously pickled to `file_path`.

    If the stored samples contain tensors they are converted to nested Python
    lists; otherwise the unpickled object is returned unchanged. An empty cache
    is handled (previously `data[0][0]` raised IndexError).

    Returns:
        The `(data, labels)` pair.
    """
    with open(file_path, 'rb') as f:
        # NOTE(security): pickle.load can execute arbitrary code; only load
        # cache files that this notebook wrote itself.
        data = pickle.load(f)
    samples = data[0]
    if any(isinstance(sample, torch.Tensor) for sample in samples):
        converted = [
            sample.tolist() if isinstance(sample, torch.Tensor) else sample
            for sample in samples
        ]
        return (converted, data[1])
    return data
        
# Function to preprocess the entire dataset
def preprocess_dataset(video_paths, labels):
    """Run `extract_frames` on every video and return the clips with their labels.

    Args:
        video_paths: Sequence of video file paths.
        labels: Labels aligned with `video_paths`.

    Returns:
        (preprocessed_data, preprocessed_labels): two parallel lists.
    """
    preprocessed_data, preprocessed_labels = [], []
    progress = tqdm(
        zip(video_paths, labels),
        desc="Preprocessing dataset",
        total=len(video_paths),
    )
    for path, target in progress:
        preprocessed_data.append(extract_frames(path))
        preprocessed_labels.append(target)
    return preprocessed_data, preprocessed_labels


In [None]:
print("Loading and preprocessing dataset...")

# Cache file name; switch it together with SELECTED_CLASSES.
preprocessed_file = 'preprocessed_dataset_pytorch-3classes-mc3.pkl'
#preprocessed_file = 'preprocessed_dataset_pytorch-7classes-mc3.pkl'

# NOTE: after this cell `video_paths` holds preprocessed clip data, not file
# paths (tensors on a fresh build, nested lists when restored from the cache).
if not os.path.exists(preprocessed_file):
    # No cache yet: list the videos, preprocess them and store the result.
    video_paths, labels = load_dataset()
    video_paths, labels = preprocess_dataset(video_paths, labels)
    save_preprocessed_dataset(video_paths, labels, preprocessed_file)
else:
    video_paths, labels = load_preprocessed_dataset(preprocessed_file)

In [None]:
# Stratified 80/20 train/test split with a fixed seed
train_paths, test_paths, train_labels, test_labels = train_test_split(
    video_paths,
    labels,
    test_size=0.2,
    random_state=42,
    stratify=labels,
)

# Wrap both splits in Dataset objects
train_dataset = VideoDataset(train_paths, train_labels)
test_dataset = VideoDataset(test_paths, test_labels)

# Only the training loader shuffles; the test loader keeps a fixed order
train_loader = DataLoader(
    train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4
)
test_loader = DataLoader(
    test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=4
)

In [None]:
# Calculate class weights for an inverse-frequency sampler.
# The weights are built from `train_labels` (not the full `labels` list) so that
# the sampler's indices line up with `train_dataset`, and so that this cell does
# not depend on `labels`, which the training loop reuses as a loop variable.
# NOTE: `sampler` is not passed to any DataLoader in this notebook; to use it,
# create the training loader with `sampler=sampler` and without `shuffle=True`.
class_counts = np.bincount(train_labels, minlength=len(SELECTED_CLASSES))
# Guard against division by zero for a class with no training samples.
class_weights = 1. / np.maximum(class_counts, 1)
weights = torch.tensor([class_weights[label] for label in train_labels], dtype=torch.float)
sampler = torch.utils.data.WeightedRandomSampler(weights, len(weights))

In [None]:
# Load the MC3-18 video model with Kinetics-400 pre-trained weights
model = mc3_18(weights=MC3_18_Weights.KINETICS400_V1)

# Swap the classification head for one sized to the selected classes
num_classes = len(SELECTED_CLASSES)
head_in_features = model.fc.in_features
model.fc = nn.Linear(head_in_features, num_classes)

# Run on the GPU when one is available, otherwise on the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu" )
model = model.to(device)

# Cross-entropy loss, optimised with Adam over all model parameters
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

In [None]:
# Initialize lists to store per-epoch metrics
train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []

# Training loop
# The batch variables are named batch_frames/batch_labels on purpose: reusing
# `labels` here would overwrite the dataset-level `labels` list and break any
# earlier cell that is re-run after training.
for epoch in range(NUM_EPOCHS):
    # ---- Training pass ----
    model.train()
    total_loss = 0
    correct = 0
    total = 0

    for batch_frames, batch_labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{NUM_EPOCHS}"):
        batch_frames, batch_labels = batch_frames.to(device), batch_labels.to(device)

        optimizer.zero_grad()
        outputs = model(batch_frames)
        loss = criterion(outputs, batch_labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        _, predicted = outputs.max(1)
        total += batch_labels.size(0)
        correct += predicted.eq(batch_labels).sum().item()

    # Loss is averaged over batches, accuracy over samples.
    avg_loss = total_loss / len(train_loader)
    accuracy = 100. * correct / total
    train_losses.append(avg_loss)
    train_accuracies.append(accuracy)
    print(f"Epoch {epoch+1}/{NUM_EPOCHS}, Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%")

    # ---- Validation pass (uses the test loader) ----
    model.eval()
    val_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_frames, batch_labels in tqdm(test_loader, desc="Validation"):
            batch_frames, batch_labels = batch_frames.to(device), batch_labels.to(device)
            outputs = model(batch_frames)
            loss = criterion(outputs, batch_labels)
            val_loss += loss.item()
            _, predicted = outputs.max(1)
            total += batch_labels.size(0)
            correct += predicted.eq(batch_labels).sum().item()

    avg_val_loss = val_loss / len(test_loader)
    val_accuracy = 100. * correct / total
    val_losses.append(avg_val_loss)
    val_accuracies.append(val_accuracy)
    print(f"Validation Loss: {avg_val_loss:.4f}, Accuracy: {val_accuracy:.2f}%")

# Draw the loss/accuracy curves once, after all epochs have finished
draw_graphs(train_losses, val_losses, train_accuracies, val_accuracies)

In [None]:
# After training, evaluate the model on the test set
model.eval()
all_predictions = []
all_labels = []
all_probabilities = []  # For ROC and AUC

# batch_frames/batch_labels (rather than frames/labels) keep the dataset-level
# `labels` list from being overwritten by a batch tensor.
with torch.no_grad():
    for batch_frames, batch_labels in tqdm(test_loader, desc="Testing"):
        batch_frames, batch_labels = batch_frames.to(device), batch_labels.to(device)
        outputs = model(batch_frames)
        _, predicted = outputs.max(1)
        all_predictions.extend(predicted.cpu().numpy())
        all_labels.extend(batch_labels.cpu().numpy())
        all_probabilities.extend(outputs.softmax(dim=1).cpu().numpy())  # For ROC and AUC

# Calculate and print overall accuracy (share of matching predictions, in percent)
accuracy = 100 * np.mean(np.array(all_predictions) == np.array(all_labels))
print(f"Overall Test Accuracy: {accuracy:.2f}%")



In [None]:
# Function to plot and save confusion matrix
def plot_confusion_matrix(y_true, y_pred, classes):
    """Save a row-normalised (percentage) confusion matrix to 'confusion_matrix.png'.

    Args:
        y_true: True class indices.
        y_pred: Predicted class indices.
        classes: Class names; index i names class i and labels the axes.
    """
    # Pin the label set so the matrix always has one row/column per entry of
    # `classes`, even when a class is missing from both y_true and y_pred.
    cm = confusion_matrix(y_true, y_pred, labels=list(range(len(classes))))

    # Row-normalise to percentages; a class without true samples yields a row
    # of zeros instead of NaN.
    row_totals = cm.sum(axis=1)[:, np.newaxis]
    cm_percentage = np.divide(
        cm.astype('float'),
        row_totals,
        out=np.zeros(cm.shape, dtype=float),
        where=row_totals != 0,
    ) * 100

    plt.figure(figsize=(10, 8))
    sns.heatmap(cm_percentage, annot=True, fmt='.2f', cmap='Blues', xticklabels=classes, yticklabels=classes)
    plt.title('Confusion Matrix (Percentage)')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.savefig('confusion_matrix.png')
    plt.close()
# Plot confusion matrix
plot_confusion_matrix(all_labels, all_predictions, SELECTED_CLASSES)

In [None]:
# Classification Report
# Function to save classification report as a table with styling
def save_classification_report(report_df, filename='classification_report.png'):
    """Render a DataFrame as a styled matplotlib table and save it as an image.

    Args:
        report_df: DataFrame to render; its columns become the header row and
            its index the row labels.
        filename: Output image path.
    """
    fig, ax = plt.subplots(figsize=(14, 12))  # Large canvas for readability
    ax.axis('off')  # No axis for the table

    # Render the DataFrame as a table
    table = ax.table(cellText=report_df.values,
                     colLabels=report_df.columns,
                     rowLabels=report_df.index,
                     cellLoc='center',
                     loc='center',
                     bbox=[0, 0, 1, 1])  # Adjust bbox to fill the figure area

    # Customize table appearance
    table.auto_set_font_size(False)  # Allow manual font size setting
    table.set_fontsize(10)  # Set a readable font size
    table.scale(1.2, 1.2)  # Scale table size

    # Use the public cell accessor instead of the private `_cells` attribute.
    cells = table.get_celld()
    column_indices = list(range(len(report_df.columns)))

    # Fit column widths to content and style the header row (row 0)
    table.auto_set_column_width(column_indices)
    for col_idx in column_indices:
        header_cell = cells[(0, col_idx)]
        header_cell.set_text_props(weight='bold', color='white')
        header_cell.set_facecolor('#4c72b0')  # Header color

    # Style every non-header cell (this includes the row-label cells)
    for (row_idx, _col_idx), cell in cells.items():
        if row_idx == 0:  # Header row
            continue
        cell.set_edgecolor('black')  # Cell border color
        cell.set_facecolor('#f5f5f5')  # Body background color
        cell.set_fontsize(10)
        cell.set_text_props(color='black')

    # Save as an image
    fig.savefig(filename, bbox_inches='tight', pad_inches=0.3)  # Padding for better fit
    plt.close(fig)

# Classification Report generation
# `labels` pins one report row per selected class, so `target_names` always
# matches even if a class never occurs in the test predictions.
report_dict = classification_report(
    all_labels,
    all_predictions,
    labels=list(range(len(SELECTED_CLASSES))),
    target_names=SELECTED_CLASSES,
    digits=4,
    output_dict=True,
)
# `digits` has no effect when output_dict=True, so round explicitly to keep the
# rendered table readable.
report_df = pd.DataFrame(report_dict).transpose().round(4)
report_df['support'] = report_df['support'].astype(int)
save_classification_report(report_df, 'classification_report.png')


In [None]:
# Calculate and plot one-vs-rest ROC curves and AUC for each class
n_classes = len(SELECTED_CLASSES)
fpr = dict()
tpr = dict()
roc_auc = dict()

# Convert once instead of on every loop iteration.
true_labels_array = np.array(all_labels)
probabilities_array = np.array(all_probabilities)

plt.figure(figsize=(10, 8))
for i in range(n_classes):
    # Class i against the rest, scored by the softmax probability of class i
    fpr[i], tpr[i], _ = roc_curve(true_labels_array == i, probabilities_array[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])
    plt.plot(fpr[i], tpr[i], lw=2, label=f'Class {SELECTED_CLASSES[i]} (AUC = {roc_auc[i]:0.2f})')

plt.plot([0, 1], [0, 1], 'k--', lw=2)  # Chance-level diagonal
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curves')
plt.legend(loc="lower right")
plt.grid(True)
# Save BEFORE showing: once plt.show() has run, the current figure is no longer
# the one that was drawn, so saving afterwards writes an empty image.
plt.savefig('roc_curves.png')
plt.show()

In [None]:
# Persist the trained model's parameters (its state_dict) to disk
trained_state = model.state_dict()
torch.save(trained_state, 'mc3_activity_recognition.pth')

In [None]:
# Load the PPE detection model from its weights file.
# Source: https://www.kaggle.com/code/plasticglass/yolov8-safety-helmet-detection
ppe_weights_file = 'ppe.pt'
ppe_model = YOLO(ppe_weights_file)

In [None]:
# Function to predict on a single video with PPE detection.
def predict_on_video(video_path):
    """Classify the activity in a video and run PPE detection on its last sampled frame.

    Args:
        video_path: Path of the video to analyse.

    Returns:
        (predicted class name, confidence, list of detected PPE class names).
    """
    model.eval()
    frames = extract_frames(video_path).unsqueeze(0).to(device)
    with torch.no_grad():
        outputs = model(frames)
        probabilities = F.softmax(outputs, dim=1)
        confidence, predicted = torch.max(probabilities, 1)

    # Perform PPE detection on the last frame.
    # The frames were normalised by `transform`, so the normalisation has to be
    # undone before converting to 8-bit; scaling normalised values by 255
    # directly produces a corrupted image. These constants must match the
    # Normalize step of `transform`.
    norm_mean = np.array([0.43216, 0.394666, 0.37645], dtype=np.float32)
    norm_std = np.array([0.22803, 0.22145, 0.216989], dtype=np.float32)

    last_frame = frames[0, :, -1].cpu().numpy()  # Shape: [C, H, W]
    last_frame = np.transpose(last_frame, (1, 2, 0))  # Shape: [H, W, C]
    last_frame = np.clip(last_frame * norm_std + norm_mean, 0.0, 1.0)  # Back to 0-1 range
    last_frame = (last_frame * 255).round().astype(np.uint8)  # Convert to 0-255 range
    last_frame_bgr = cv2.cvtColor(last_frame, cv2.COLOR_RGB2BGR)

    ppe_results = ppe_model(last_frame_bgr)[0]
    # Ignore detections whose class index has no name in PPE_CLASSES.
    ppe_detections = [PPE_CLASSES[int(det.cls)] for det in ppe_results.boxes if int(det.cls) < len(PPE_CLASSES)]

    return SELECTED_CLASSES[predicted.item()], confidence.item(), ppe_detections
# Annotate a video with the predicted activity and PPE detections
def process_video(input_video_path, output_video_path):
    """Write a copy of the input video annotated with activity and PPE results.

    A sliding window of the last SEQUENCE_LENGTH frames is classified by
    `model`; once the window is full, every frame gets the activity label, the
    PPE summary and a box per PPE detection drawn on it. Frames read before the
    window fills are written unannotated.

    Args:
        input_video_path: Video to read.
        output_video_path: Destination of the annotated 'mp4v' video.
    """
    model.eval()  # Do not rely on an earlier cell having switched to eval mode

    video_reader = cv2.VideoCapture(input_video_path)
    # Keep fractional frame rates instead of truncating to int, and fall back to
    # an assumed 30 fps when the container reports none (0 breaks the writer).
    fps = video_reader.get(cv2.CAP_PROP_FPS) or 30.0
    width = int(video_reader.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(video_reader.get(cv2.CAP_PROP_FRAME_HEIGHT))

    video_writer = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))

    # Bounded deque: appending to a full window drops the oldest frame.
    frame_buffer = deque(maxlen=SEQUENCE_LENGTH)
    try:
        while True:
            ret, frame = video_reader.read()
            if not ret:
                break

            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame_buffer.append(transform(frame_rgb))

            if len(frame_buffer) == SEQUENCE_LENGTH:
                input_frames = torch.stack(list(frame_buffer)).permute(1, 0, 2, 3).unsqueeze(0).to(device)
                with torch.no_grad():
                    outputs = model(input_frames)
                    probabilities = F.softmax(outputs, dim=1)
                    confidence, predicted = torch.max(probabilities, 1)
                predicted_class = SELECTED_CLASSES[predicted.item()]
                confidence_value = confidence.item()

                # Perform PPE detection on the untouched frame, before drawing on it
                ppe_results = ppe_model(frame)[0]
                ppe_detections = [PPE_CLASSES[int(det.cls)] for det in ppe_results.boxes if int(det.cls) < len(PPE_CLASSES)]

                # Display prediction, confidence, and PPE detections on frame
                activity_text = f"Activity: {predicted_class} ({confidence_value:.2f})"
                ppe_text = f"PPE: {', '.join(ppe_detections) if ppe_detections else 'None'}"
                cv2.putText(frame, activity_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
                cv2.putText(frame, ppe_text, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2)

                # Draw bounding boxes for PPE detections
                for box in ppe_results.boxes:
                    class_id = int(box.cls)
                    if class_id < len(PPE_CLASSES):
                        x1, y1, x2, y2 = map(int, box.xyxy[0])
                        cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)
                        cv2.putText(frame, PPE_CLASSES[class_id], (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

            video_writer.write(frame)
    finally:
        # Always release both handles so the output file is finalised.
        video_reader.release()
        video_writer.release()

In [None]:
# Prediction function usage (activity + PPE).
if __name__ == "__main__":
    # Folder holding the test clips and the clip to analyse
    test_videos_directory = 'test_videos'
    video_title = 'mask'  # Replace with the actual video title

    # Input and output file locations
    input_video_file_path = f'{test_videos_directory}/{video_title}.mp4'
    output_video_file_path = f'{test_videos_directory}/{video_title}-Output-SeqLen{SEQUENCE_LENGTH}.mp4'

    # Whole-clip prediction: activity, confidence and PPE items
    prediction, confidence, ppe_detections = predict_on_video(input_video_file_path)
    print(f"Predicted activity: {prediction}, Confidence: {confidence:.2f}")
    print(f"Detected PPE: {', '.join(ppe_detections) if ppe_detections else 'None'}")

    # Frame-by-frame annotated copy of the clip
    process_video(input_video_file_path, output_video_file_path)

    # Show the annotated clip inline, using its own fps or 24 when it has none
    clip = VideoFileClip(output_video_file_path, audio=False, target_resolution=(300, None))
    clip.ipython_display(fps=clip.fps or 24)




In [None]:
# Function to perform frame-by-frame analysis.
def frame_by_frame_analysis(video_path, model, transform, device, SELECTED_CLASSES, SEQUENCE_LENGTH):
    """Classify a video with a sliding window and record one result per frame.

    A window of the last SEQUENCE_LENGTH frames is classified after every frame.
    Until the window is full, the prediction is None and every class
    probability is 0.

    Args:
        video_path: Video to analyse.
        model: Classifier called on a [1, C, T, H, W] batch.
        transform: Per-frame preprocessing applied to each RGB frame.
        device: Device the input batch is moved to.
        SELECTED_CLASSES: Class names, index-aligned with the model outputs.
        SEQUENCE_LENGTH: Window length in frames.

    Returns:
        (frame_predictions, class_probabilities, fps) where `frame_predictions`
        has one entry per frame actually read, `class_probabilities` maps each
        class name to a list of the same length, and `fps` is the frame rate
        reported by the container (not truncated to an integer).
    """
    model.eval()
    video_reader = cv2.VideoCapture(video_path)

    frame_predictions = []
    class_probabilities = defaultdict(list)
    for class_name in SELECTED_CLASSES:
        class_probabilities[class_name]  # Create the key even if no frame is read
    # Bounded deque: appending to a full window drops the oldest frame.
    frame_buffer = deque(maxlen=SEQUENCE_LENGTH)

    try:
        reported_frame_count = int(video_reader.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = video_reader.get(cv2.CAP_PROP_FPS)

        # The reported frame count is used for the progress bar only; reading
        # continues until the decoder stops, since the count can be missing or
        # inaccurate.
        progress_total = reported_frame_count if reported_frame_count > 0 else None
        with tqdm(total=progress_total, desc="Processing frames") as progress:
            while True:
                ret, frame = video_reader.read()
                if not ret:
                    break
                progress.update(1)

                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame_buffer.append(transform(frame_rgb))

                if len(frame_buffer) == SEQUENCE_LENGTH:
                    input_frames = torch.stack(list(frame_buffer)).permute(1, 0, 2, 3).unsqueeze(0).to(device)
                    with torch.no_grad():
                        outputs = model(input_frames)
                        probabilities = F.softmax(outputs, dim=1).squeeze().cpu().numpy()

                    frame_predictions.append(SELECTED_CLASSES[probabilities.argmax()])
                    for i, class_name in enumerate(SELECTED_CLASSES):
                        class_probabilities[class_name].append(probabilities[i])
                else:
                    # Window not full yet: placeholders keep all lists frame-aligned.
                    frame_predictions.append(None)
                    for class_name in SELECTED_CLASSES:
                        class_probabilities[class_name].append(0)
    finally:
        video_reader.release()

    # No extra padding step is needed: the placeholders above already cover the
    # first SEQUENCE_LENGTH - 1 frames, and padding again would make the lists
    # longer than the video for clips shorter than the window.
    return frame_predictions, class_probabilities, fps

def plot_frame_by_frame_results(frame_predictions, class_probabilities, fps, output_path):
    """Plot per-frame class probabilities and predicted classes, and save the figure.

    Args:
        frame_predictions: One class name (or None) per frame.
        class_probabilities: Mapping of class name to per-frame probabilities.
        fps: Frame rate used to convert frame indices to seconds. When it is 0
            or missing, the x axis falls back to frame indices.
        output_path: Where the figure is saved.
    """
    frame_count = len(frame_predictions)
    frame_indices = np.arange(frame_count)
    if fps and fps > 0:
        time_axis = frame_indices / fps
        x_label = "Time (seconds)"
    else:
        # Dividing by a zero frame rate would give inf/NaN positions.
        time_axis = frame_indices
        x_label = "Frame index"

    plt.figure(figsize=(15, 10))

    # Plot class probabilities
    plt.subplot(2, 1, 1)
    for class_name, probs in class_probabilities.items():
        plt.plot(time_axis, probs, label=class_name)
    plt.title("Class Probabilities Over Time")
    plt.xlabel(x_label)
    plt.ylabel("Probability")
    plt.legend()
    plt.grid(True)

    # Plot predicted classes
    plt.subplot(2, 1, 2)
    # Sorted so the class-to-row mapping is the same on every run (set order is not).
    unique_classes = sorted({cls for cls in frame_predictions if cls is not None})
    class_to_num = {cls: i for i, cls in enumerate(unique_classes)}
    numeric_predictions = [class_to_num[cls] if cls is not None else -1 for cls in frame_predictions]
    plt.scatter(time_axis, numeric_predictions, marker='.')

    tick_positions = list(range(len(unique_classes)))
    tick_labels = list(unique_classes)
    if any(cls is None for cls in frame_predictions):
        # Frames without a prediction are drawn at -1; give that row a label.
        tick_positions.insert(0, -1)
        tick_labels.insert(0, "No prediction")
    plt.yticks(tick_positions, tick_labels)
    plt.title("Predicted Class Over Time")
    plt.xlabel(x_label)
    plt.ylabel("Predicted Class")
    plt.grid(True)

    plt.tight_layout()
    plt.savefig(output_path)
    plt.close()



In [None]:
# Frame-by-frame analysis function usage.
if __name__ == "__main__":
    # Folder holding the test clips and the clip to analyse
    test_videos_directory = 'test_videos'
    video_title = 'mask'  # Replace with the actual video title

    # Input clip and destination of the result graph
    input_video_file_path = f'{test_videos_directory}/{video_title}.mp4'
    output_graph_path = f'{test_videos_directory}/{video_title}_frame_analysis.png'

    # Sliding-window classification of every frame
    frame_predictions, class_probabilities, fps = frame_by_frame_analysis(
        input_video_file_path,
        model,
        transform,
        device,
        SELECTED_CLASSES,
        SEQUENCE_LENGTH,
    )

    # Plot the per-frame results and save them next to the clip
    plot_frame_by_frame_results(frame_predictions, class_probabilities, fps, output_graph_path)

    print(f"Frame-by-frame analysis graph saved to: {output_graph_path}")


In [None]:
# Body straight detection for Pull Ups
# Pick the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

def predict_on_video(video_path):
    """Classify the activity shown in a video.

    NOTE: this replaces the earlier `predict_on_video` that also returned PPE
    detections; once this cell has run, the function returns two values.

    Args:
        video_path: Path of the video to analyse.

    Returns:
        (predicted class name, confidence of that class).
    """
    model.eval()
    clip = extract_frames(video_path)
    batch = clip.unsqueeze(0).to(device)
    with torch.no_grad():
        class_probs = F.softmax(model(batch), dim=1)
        top_prob, top_index = class_probs.max(dim=1)

    return SELECTED_CLASSES[top_index.item()], top_prob.item()
    
# MediaPipe handles: the pose solution and the landmark drawing helpers
mp_pose, mp_drawing = mp.solutions.pose, mp.solutions.drawing_utils

def check_body_straight(landmarks, tolerance=0.1):
    """Return True when shoulders, hips and ankles line up horizontally.

    On each body side the shoulder-hip pair and the hip-ankle pair are
    compared; the body counts as straight when the x coordinates of every pair
    differ by less than `tolerance`.

    Args:
        landmarks: Pose landmarks indexable by `mp_pose.PoseLandmark.<NAME>.value`,
            each exposing an `x` attribute.
        tolerance: Maximum allowed x difference within a pair, in the same units
            as the landmark coordinates. Defaults to 0.1, the value that was
            previously hard-coded.

    Returns:
        bool: True if all four pairs are within `tolerance`.
    """
    joint = mp_pose.PoseLandmark
    aligned_pairs = (
        (joint.LEFT_SHOULDER, joint.LEFT_HIP),
        (joint.RIGHT_SHOULDER, joint.RIGHT_HIP),
        (joint.LEFT_HIP, joint.LEFT_ANKLE),
        (joint.RIGHT_HIP, joint.RIGHT_ANKLE),
    )

    # Check if body is straight (shoulders, hips, and ankles aligned)
    body_straight = all(
        abs(landmarks[upper.value].x - landmarks[lower.value].x) < tolerance
        for upper, lower in aligned_pairs
    )

    return body_straight

def process_video(input_video_path, output_video_path):
    """Write a copy of the input video annotated with activity and posture.

    Pose landmarks are drawn on every frame in which a pose is found. Once the
    sliding window holds SEQUENCE_LENGTH frames, each frame also gets the
    predicted activity and a "Body straight" verdict (or "No pose detected").

    NOTE: this replaces the earlier PPE-annotating `process_video`.

    Args:
        input_video_path: Video to read.
        output_video_path: Destination of the annotated 'mp4v' video.
    """
    model.eval()  # Do not rely on an earlier cell having switched to eval mode

    video_reader = cv2.VideoCapture(input_video_path)
    # Keep fractional frame rates instead of truncating to int, and fall back to
    # an assumed 30 fps when the container reports none (0 breaks the writer).
    fps = video_reader.get(cv2.CAP_PROP_FPS) or 30.0
    width = int(video_reader.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(video_reader.get(cv2.CAP_PROP_FRAME_HEIGHT))

    video_writer = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))

    # Bounded deque: appending to a full window drops the oldest frame.
    frame_buffer = deque(maxlen=SEQUENCE_LENGTH)

    try:
        with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
            while True:
                ret, frame = video_reader.read()
                if not ret:
                    break

                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame_buffer.append(transform(frame_rgb))

                # Perform pose estimation
                pose_results = pose.process(frame_rgb)

                # Draw pose landmarks on the frame
                if pose_results.pose_landmarks:
                    mp_drawing.draw_landmarks(frame, pose_results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

                if len(frame_buffer) == SEQUENCE_LENGTH:
                    input_frames = torch.stack(list(frame_buffer)).permute(1, 0, 2, 3).unsqueeze(0).to(device)
                    with torch.no_grad():
                        outputs = model(input_frames)
                        probabilities = F.softmax(outputs, dim=1)
                        confidence, predicted = torch.max(probabilities, 1)
                    predicted_class = SELECTED_CLASSES[predicted.item()]
                    confidence_value = confidence.item()

                    # Display activity and confidence
                    activity_text = f"Activity: {predicted_class} ({confidence_value:.2f})"
                    cv2.putText(frame, activity_text, (10, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

                    # Check body straight for pull-ups
                    if pose_results.pose_landmarks:
                        body_straight = check_body_straight(pose_results.pose_landmarks.landmark)
                        posture_text = f"Body straight: {'Yes' if body_straight else 'No'}"
                        cv2.putText(frame, posture_text, (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
                    else:
                        cv2.putText(frame, "No pose detected", (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

                video_writer.write(frame)
    finally:
        # Always release both handles so the output file is finalised.
        video_reader.release()
        video_writer.release()

In [None]:
    #Body straight detection function usage.
if __name__ == "__main__":
    # Define the directory where the video is stored
    test_videos_directory = 'test_videos'

    # Define the video title
    video_title = 'fall2'  # Replace with the actual video title

    # Construct the input video file path
    input_video_file_path = f'{test_videos_directory}/{video_title}.mp4'

    # Predict on a single video
    prediction, confidence = predict_on_video(input_video_file_path)
    print(f"Predicted activity: {prediction}, Confidence: {confidence:.2f}")

    # Construct the output video path
    output_video_file_path = f'{test_videos_directory}/{video_title}-Output-SeqLen{SEQUENCE_LENGTH}.mp4'

    # Process the video and save predictions
    process_video(input_video_file_path, output_video_file_path)

    print(f"Processed video saved as: {output_video_file_path}")

    # Get the duration of the processed video
    cap = cv2.VideoCapture(output_video_file_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # A file that cannot be opened reports 0 fps; avoid a ZeroDivisionError.
    duration = frame_count / fps if fps > 0 else 0.0
    cap.release()


In [None]:
# Fall detection function.
# Use CUDA when it is available; otherwise run on the CPU.
device_kind = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_kind)
print(f"Using device: {device}")

def predict_on_video(video_path):
    """Return the most likely activity for a video together with its confidence.

    This is a repeat of the two-value `predict_on_video` defined earlier in the
    notebook; running this cell rebinds the name.

    Args:
        video_path: Path of the video to analyse.

    Returns:
        (predicted class name, confidence of that class).
    """
    model.eval()
    model_input = extract_frames(video_path).unsqueeze(0)
    model_input = model_input.to(device)
    with torch.no_grad():
        logits = model(model_input)
        scores = F.softmax(logits, dim=1)
        best_score, best_class = torch.max(scores, 1)

    label = SELECTED_CLASSES[best_class.item()]
    return label, best_score.item()

# MediaPipe pose solution and drawing utilities
_mp_solutions = mp.solutions
mp_pose = _mp_solutions.pose
mp_drawing = _mp_solutions.drawing_utils

def detect_fall(landmarks, horizontal_threshold=0.1):
    """Heuristically decide whether the detected pose looks like a fall.

    A fall is reported when either
      * the nose's y coordinate is greater than the average hip y (the code
        treats a larger y as "below"), or
      * the average hip y and average ankle y differ by less than
        `horizontal_threshold`, i.e. the body appears horizontal.

    Args:
        landmarks: Pose landmarks indexable by `mp_pose.PoseLandmark.<NAME>.value`,
            each exposing a `y` attribute.
        horizontal_threshold: Maximum hip/ankle y difference still counted as
            horizontal. Defaults to 0.1, the value that was previously hard-coded.

    Returns:
        bool: True if a fall is detected.
    """
    # Get relevant landmark positions
    nose = landmarks[mp_pose.PoseLandmark.NOSE.value]
    left_hip = landmarks[mp_pose.PoseLandmark.LEFT_HIP.value]
    right_hip = landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value]
    left_ankle = landmarks[mp_pose.PoseLandmark.LEFT_ANKLE.value]
    right_ankle = landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE.value]

    # Calculate the average y-position of hips and ankles
    hip_y = (left_hip.y + right_hip.y) / 2
    ankle_y = (left_ankle.y + right_ankle.y) / 2

    # Check if the nose is below the hips or if the body is horizontal
    fall_detected = nose.y > hip_y or abs(hip_y - ankle_y) < horizontal_threshold

    return fall_detected

def process_video(input_video_path, output_video_path):
    """Write a copy of the input video annotated with activity and fall status.

    Pose landmarks are drawn on every frame in which a pose is found. Once the
    sliding window holds SEQUENCE_LENGTH frames, each frame also gets the
    predicted activity and a "Fall detected" verdict (or "No pose detected").

    NOTE: this replaces the `process_video` definitions from earlier cells.

    Args:
        input_video_path: Video to read.
        output_video_path: Destination of the annotated 'mp4v' video.
    """
    model.eval()  # Do not rely on an earlier cell having switched to eval mode

    video_reader = cv2.VideoCapture(input_video_path)
    # Keep fractional frame rates instead of truncating to int, and fall back to
    # an assumed 30 fps when the container reports none (0 breaks the writer).
    fps = video_reader.get(cv2.CAP_PROP_FPS) or 30.0
    width = int(video_reader.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(video_reader.get(cv2.CAP_PROP_FRAME_HEIGHT))

    video_writer = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))

    # Bounded deque: appending to a full window drops the oldest frame.
    frame_buffer = deque(maxlen=SEQUENCE_LENGTH)

    try:
        with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
            while True:
                ret, frame = video_reader.read()
                if not ret:
                    break

                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame_buffer.append(transform(frame_rgb))

                # Perform pose estimation
                pose_results = pose.process(frame_rgb)

                # Draw pose landmarks on the frame
                if pose_results.pose_landmarks:
                    mp_drawing.draw_landmarks(frame, pose_results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

                if len(frame_buffer) == SEQUENCE_LENGTH:
                    input_frames = torch.stack(list(frame_buffer)).permute(1, 0, 2, 3).unsqueeze(0).to(device)
                    with torch.no_grad():
                        outputs = model(input_frames)
                        probabilities = F.softmax(outputs, dim=1)
                        confidence, predicted = torch.max(probabilities, 1)
                    predicted_class = SELECTED_CLASSES[predicted.item()]
                    confidence_value = confidence.item()

                    # Display activity and confidence
                    activity_text = f"Activity: {predicted_class} ({confidence_value:.2f})"
                    cv2.putText(frame, activity_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

                    # Check for fall
                    if pose_results.pose_landmarks:
                        fall_detected = detect_fall(pose_results.pose_landmarks.landmark)
                        fall_text = f"Fall detected: {'Yes' if fall_detected else 'No'}"
                        cv2.putText(frame, fall_text, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
                    else:
                        cv2.putText(frame, "No pose detected", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

                video_writer.write(frame)
    finally:
        # Always release both handles so the output file is finalised.
        video_reader.release()
        video_writer.release()

In [None]:
if __name__ == "__main__":
    # Define the directory where the video is stored
    test_videos_directory = 'test_videos'

    # Define the video title
    video_title = 'fall2'  # Replace with the actual video title

    # Construct the input video file path
    input_video_file_path = f'{test_videos_directory}/{video_title}.mp4'

    # Predict on a single video
    prediction, confidence = predict_on_video(input_video_file_path)
    print(f"Predicted activity: {prediction}, Confidence: {confidence:.2f}")

    # Construct the output video path
    output_video_file_path = f'{test_videos_directory}/{video_title}-Output-SeqLen{SEQUENCE_LENGTH}.mp4'

    # Process the video and save predictions
    process_video(input_video_file_path, output_video_file_path)

    print(f"Processed video saved as: {output_video_file_path}")

    # Get the duration of the processed video
    cap = cv2.VideoCapture(output_video_file_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # A file that cannot be opened reports 0 fps; avoid a ZeroDivisionError.
    duration = frame_count / fps if fps > 0 else 0.0
    cap.release()


In [None]:
# EXTRA:
# Function to download videos from YouTube.
def download_yt_videos(yt_url_list, save_dir):
    """Download each URL in `yt_url_list` into `save_dir` using yt-dlp.

    The target directory is created when missing. Files are named after the
    video title and merged into mp4. A failing URL is reported with a printed
    message and the remaining URLs are still attempted.

    Args:
        yt_url_list: Iterable of video URLs.
        save_dir: Directory the downloads are written to.
    """
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    output_template = os.path.join(save_dir, '%(title)s.%(ext)s')
    ydl_opts = dict(
        outtmpl=output_template,
        format='bestvideo+bestaudio/best',
        merge_output_format='mp4',
    )

    for video_url in yt_url_list:
        try:
            with YoutubeDL(ydl_opts) as downloader:
                downloader.download([video_url])
                print(f"Downloaded: {video_url}")
        except Exception as error:
            # Best effort: report the failure and continue with the next URL.
            print(f"Failed to download {video_url}: {error}")

# URLs to fetch and the folder they are stored in.
# NOTE(review): the usage cells read clips from 'test_videos', while downloads
# land in 'test_data' - confirm which directory is intended.
yt_url_list = ['https://www.youtube.com/shorts/PbA0JXVph8E']
save_dir = 'test_data'
download_yt_videos(yt_url_list, save_dir)