In [18]:
import torch
# Pick the compute device used by the rest of the notebook. Later cells
# reference `mps_device` unconditionally, so it has to be bound even on
# machines without MPS support; in that case it falls back to the CPU.
if torch.backends.mps.is_available():
    mps_device = torch.device("mps")
    x = torch.ones(1, device=mps_device)
    print (x)
else:
    print ("MPS device not found.")
    mps_device = torch.device("cpu")  # keeps downstream `.to(mps_device)` calls working

tensor([1.], device='mps:0')


## Data preparation

In [None]:
import os
import torch
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms
from PIL import Image
from sklearn.model_selection import train_test_split

In [None]:
import os
import cv2
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np

class CrashDatasetLoader(Dataset):
    """Video-level crash / no_crash dataset read from ``root_dir``.

    Crash clips are looked up as ``<root_dir>/crash/<vidname>.mp4`` for every
    ``vidname`` listed in ``annotation_file`` (entries whose file is missing
    are skipped). Every ``.mp4`` in ``<root_dir>/no_crash`` is added as a
    negative sample.

    Each item is a dict with:
        * ``'video'``: float tensor of grayscale frames, ``[frame_count, H, W]``
        * ``'label'``: float32 scalar tensor, 1 for crash and 0 for no_crash
        * ``'metadata'``: the per-frame binary label list parsed from the
          annotation line for crash clips, an empty dict for no_crash clips
    """

    def __init__(self, root_dir, annotation_file):
        """Index the clips on disk.

        Args:
            root_dir: Folder containing the ``crash`` and ``no_crash`` folders.
            annotation_file: Text file with one annotation line per crash clip.
        """
        self.root_dir = root_dir
        self.annotation_file = annotation_file
        self.video_paths = []
        self.labels = []
        self.metadata = []

        # Load the annotation file and process the data
        with open(annotation_file, 'r') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue  # blank lines carry no annotation

                # Line layout:
                #   <vidname>,[b0, b1, ...],<startframe>,<youtubeID>,<timing>,<weather>,<egoinvolve>
                # The label list contains commas itself, so cut at the first
                # comma and at the closing bracket instead of relying on fixed
                # column offsets. The trailing fields are not used here.
                vidname, _, remainder = line.partition(',')
                binlabels_text = remainder[:remainder.index(']') + 1]
                # NOTE(security): eval() executes whatever the file contains;
                # only use annotation files you trust (ast.literal_eval is the
                # safe way to read a plain list literal).
                binlabels = eval(binlabels_text)  # Convert the string list to an actual list

                # Determine the video path
                video_path = os.path.join(root_dir, 'crash', f'{vidname}.mp4')

                # Check if the video exists in the crash folder
                if os.path.exists(video_path):
                    self.video_paths.append(video_path)
                    self.labels.append(1)  # Label 1 for crash
                    self.metadata.append(binlabels)  # Store the binlabels as metadata

        # Add the no_crash videos (sorted so indices are stable between runs)
        no_crash_folder = os.path.join(root_dir, 'no_crash')
        for video_file in sorted(os.listdir(no_crash_folder)):
            if video_file.endswith('.mp4'):
                video_path = os.path.join(no_crash_folder, video_file)
                self.video_paths.append(video_path)
                self.labels.append(0)  # Label 0 for no_crash
                self.metadata.append(None)  # No metadata for no_crash videos

    def __len__(self):
        """Number of indexed clips (crash + no_crash)."""
        return len(self.video_paths)

    def __getitem__(self, idx):
        """Decode clip ``idx`` into grayscale frames.

        Raises:
            ValueError: If no frame could be read from the video file.
        """
        video_path, label, metadata = self.video_paths[idx], self.labels[idx], self.metadata[idx]

        # Load the video and convert to grayscale
        cap = cv2.VideoCapture(video_path)
        frames = []
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))
        finally:
            cap.release()  # release the capture even if decoding raised

        if not frames:
            raise ValueError(f"No frames could be read from {video_path}")
        frames = np.stack(frames)  # shape: (frame_count, H, W)

        # Convert to tensor
        frames = torch.from_numpy(frames).float()  # shape: [frame_count, H, W]

        # If metadata is None, return an empty dict instead
        if metadata is None:
            metadata = {}

        return {'video': frames, 'label': torch.tensor(label, dtype=torch.float32), 'metadata': metadata}


# Build the dataset from the clips on disk and wrap it in a shuffling loader
# that yields one clip per batch.
root_dir = 'CarCrashDatasetV/'
annotation_file = 'annotations.txt'
crash_dataset = CrashDatasetLoader(root_dir, annotation_file)

data_loader = DataLoader(crash_dataset, batch_size=1, shuffle=True)

# Smoke test: show what the first nine batches look like, then stop.
i = 0
for i, batch in enumerate(data_loader, start=1):
    # batch['video'] is [1, frame_count, H, W]; batch['label'] is 1 for crash,
    # 0 for no_crash; batch['metadata'] holds the crash annotations.
    video, label, metadata = batch['video'], batch['label'], batch['metadata']

    print('vid', video.shape)
    print('label', label)
    print('metadata', metadata)

    if i == 9:
        break


In [None]:
import os
import cv2
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np
from torchvision import transforms

class Resize:
    """Callable that resizes an image to a fixed ``(height, width)``."""

    def __init__(self, size):
        # `size` is read as (height, width) by __call__.
        self.size = size

    def __call__(self, image):
        """Return ``image`` resized with ``cv2.resize``."""
        height, width = self.size[0], self.size[1]
        # cv2.resize expects the target size as (width, height).
        return cv2.resize(image, (width, height))

class Normalize:
    """Callable that scales an image to [0, 1] and standardises its first three channels."""

    def __init__(self, mean, std):
        # Per-channel statistics, indexed by channel position.
        self.mean = mean
        self.std = std

    def __call__(self, image):
        """Return a float32 copy of ``image`` with ``(x / 255 - mean) / std`` applied per channel."""
        scaled = image.astype(np.float32) / 255.0  # Scale to [0, 1]
        channel = 0
        while channel < 3:
            plane = scaled[:, :, channel]
            scaled[:, :, channel] = (plane - self.mean[channel]) / self.std[channel]
            channel += 1
        return scaled

class CrashDatasetLoader(Dataset):
    """Video-level crash / no_crash dataset with resizing and normalisation.

    Crash clips are looked up as ``<root_dir>/crash/<vidname>.mp4`` for every
    ``vidname`` listed in ``annotation_file`` (entries whose file is missing
    are skipped). Every ``.mp4`` in ``<root_dir>/no_crash`` is added as a
    negative sample.

    Every frame is resized to 480x640, normalised per channel and reduced to a
    single grayscale plane. Each item is a dict with:
        * ``'video'``: float tensor, ``[frame_count, H, W]``
        * ``'label'``: float32 scalar tensor, 1 for crash and 0 for no_crash
        * ``'metadata'``: the per-frame binary label list parsed from the
          annotation line for crash clips, an empty dict for no_crash clips
    """

    def __init__(self, root_dir, annotation_file):
        """Index the clips on disk.

        Args:
            root_dir: Folder containing the ``crash`` and ``no_crash`` folders.
            annotation_file: Text file with one annotation line per crash clip.
        """
        self.root_dir = root_dir
        self.annotation_file = annotation_file
        self.video_paths = []
        self.labels = []
        self.metadata = []

        # Define transformations
        self.resize = Resize((480, 640))
        # These statistics are listed in R, G, B order.
        self.normalize = Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

        # Load the annotation file and process the data
        with open(annotation_file, 'r') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue  # blank lines carry no annotation

                # Line layout:
                #   <vidname>,[b0, b1, ...],<startframe>,<youtubeID>,<timing>,<weather>,<egoinvolve>
                # The label list contains commas itself, so cut at the first
                # comma and at the closing bracket instead of relying on fixed
                # column offsets. The trailing fields are not used here.
                vidname, _, remainder = line.partition(',')
                binlabels_text = remainder[:remainder.index(']') + 1]
                # NOTE(security): eval() executes whatever the file contains;
                # only use annotation files you trust (ast.literal_eval is the
                # safe way to read a plain list literal).
                binlabels = eval(binlabels_text)  # Convert the string list to an actual list

                # Determine the video path
                video_path = os.path.join(root_dir, 'crash', f'{vidname}.mp4')

                # Check if the video exists in the crash folder
                if os.path.exists(video_path):
                    self.video_paths.append(video_path)
                    self.labels.append(1)  # Label 1 for crash
                    self.metadata.append(binlabels)  # Store the binlabels as metadata

        # Add the no_crash videos (sorted so indices are stable between runs)
        no_crash_folder = os.path.join(root_dir, 'no_crash')
        for video_file in sorted(os.listdir(no_crash_folder)):
            if video_file.endswith('.mp4'):
                video_path = os.path.join(no_crash_folder, video_file)
                self.video_paths.append(video_path)
                self.labels.append(0)  # Label 0 for no_crash
                self.metadata.append(None)  # No metadata for no_crash videos

    def __len__(self):
        """Number of indexed clips (crash + no_crash)."""
        return len(self.video_paths)

    def __getitem__(self, idx):
        """Decode, resize, normalise and gray-convert clip ``idx``.

        Raises:
            ValueError: If no frame could be read from the video file.
        """
        video_path, label, metadata = self.video_paths[idx], self.labels[idx], self.metadata[idx]

        # Load the video
        cap = cv2.VideoCapture(video_path)
        frames = []
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # OpenCV decodes frames as B, G, R while the normalisation
                # statistics are in R, G, B order; reorder the channels first
                # so each channel is standardised with its own mean/std.
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                # Apply transformations
                frame = self.resize(frame)
                frame = self.normalize(frame)

                # Convert to grayscale (the frame is RGB at this point)
                gray_frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
                frames.append(gray_frame)
        finally:
            cap.release()  # release the capture even if a transform raised

        if not frames:
            raise ValueError(f"No frames could be read from {video_path}")
        frames = np.stack(frames)  # shape: (frame_count, H, W)

        # Convert to tensor (the batch dimension is added by the DataLoader)
        frames = torch.from_numpy(frames).float()  # shape: [frame_count, H, W]

        # If metadata is None, return an empty dict instead
        if metadata is None:
            metadata = {}

        return {'video': frames, 'label': torch.tensor(label, dtype=torch.float32), 'metadata': metadata}

# Build the dataset from the clips on disk and wrap it in a shuffling loader
# that yields one clip per batch.
root_dir = 'CarCrashDatasetV/'
annotation_file = 'annotations.txt'
crash_dataset = CrashDatasetLoader(root_dir, annotation_file)

data_loader = DataLoader(crash_dataset, batch_size=1, shuffle=True)

# Smoke test: show what the first nine batches look like, then stop.
i = 0
for i, batch in enumerate(data_loader, start=1):
    # batch['video'] is [1, frame_count, H, W]; batch['label'] is 1 for crash,
    # 0 for no_crash; batch['metadata'] holds the crash annotations.
    video, label, metadata = batch['video'], batch['label'], batch['metadata']

    print('video shape:', video.shape)
    print('label:', label)
    print('metadata:', metadata)

    if i == 9:
        break


In [12]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models, transforms
import math

import os 
import torch
from sklearn.utils import shuffle
from src.saliency.mlnet import MLNet, ModMSELoss
from src.DADALoader import DADALoader
import time, argparse
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torchvision.io import write_video
from src.data_transform import ProcessImages, padding_inv
import numpy as np
# from tqdm import tqdm
# from tensorboardX import SummaryWriter

In [13]:
# Spatial size handed to MLNet below; the dataset cells resize frames to the
# same (480, 640). Presumably (height, width) - confirm against MLNet.
input_shape = (480, 640)

In [14]:
# Instantiate the MLNet network imported from src.saliency.mlnet (weights are
# loaded from a checkpoint in a later cell).
model = MLNet(input_shape)



In [15]:
# Model class must be defined somewhere
# Load the saved checkpoint onto the CPU. Despite its name, `state_dict` is the
# whole checkpoint dict (keys 'epoch', 'model', 'optimizer'); the network
# weights live under 'model'.
PATH = 'models/mlnet_25.pth'
state_dict = torch.load(PATH, map_location=torch.device('cpu'))
state_dict

  state_dict = torch.load(PATH, map_location=torch.device('cpu'))


{'epoch': 25,
 'model': OrderedDict([('prior',
               tensor([[[[0.9999, 0.9999, 1.0000, 0.9999, 0.9999, 1.0000, 0.9999, 1.0000],
                         [0.9999, 1.0000, 0.9999, 0.9998, 0.9999, 0.9999, 1.0000, 0.9999],
                         [0.9999, 0.9999, 1.0000, 1.0000, 1.0005, 1.0001, 1.0000, 0.9999],
                         [0.9998, 1.0001, 0.9999, 0.9992, 0.9999, 0.9998, 0.9997, 0.9999],
                         [0.9999, 0.9998, 0.9999, 1.0002, 0.9999, 0.9999, 1.0000, 1.0000],
                         [0.9999, 1.0000, 0.9999, 0.9999, 0.9999, 1.0000, 1.0000, 0.9999]]]])),
              ('features.0.weight',
               tensor([[[[-5.5373e-01,  1.4270e-01,  5.2896e-01],
                         [-5.8312e-01,  3.5655e-01,  7.6566e-01],
                         [-6.9022e-01, -4.8019e-02,  4.8409e-01]],
               
                        [[ 1.7548e-01,  9.8630e-03, -8.1413e-02],
                         [ 4.4089e-02, -7.0323e-02, -2.6035e-01],
                   

In [16]:
# Inspect the top-level layout of the loaded checkpoint.
len(state_dict), state_dict.keys()

(3, dict_keys(['epoch', 'model', 'optimizer']))

In [19]:
# Load the state_dict into the model
# Only the 'model' entry of the checkpoint holds weights. strict=False lets
# loading proceed when checkpoint and model keys do not match exactly, so a
# mismatch is not raised as an error here.
model.load_state_dict(state_dict['model'], strict=False)

# Move the model to MPS
# `mps_device` is defined by the device-check cell at the top of the notebook.
model = model.to(mps_device)
model

MLNet(
  (features): ModuleList(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation

In [None]:
import torch
import numpy as np
from sklearn.metrics import average_precision_score, roc_auc_score
from tqdm import tqdm

def get_predictions(model, dataloader, device):
    """Run ``model`` over every batch and collect sigmoid scores and labels.

    Args:
        model: Module called on each batch's ``'video'`` tensor.
        dataloader: Iterable of dict batches with ``'video'`` and ``'label'``.
        device: Device the inputs are moved to before the forward pass.

    Returns:
        Tuple ``(predictions, labels)`` of NumPy arrays built from the
        per-batch sigmoid outputs and labels.
    """
    model.eval()
    score_rows = []
    label_rows = []

    # Inference only: no gradients are needed.
    with torch.no_grad():
        for batch in tqdm(dataloader):
            images, labels = batch['video'], batch['label']
            images = images.to(device)
            labels = labels.to(device)
            scores = torch.sigmoid(model(images))
            score_rows.extend(scores.cpu().numpy())
            label_rows.extend(labels.cpu().numpy())

    return np.array(score_rows), np.array(label_rows)

def calculate_metrics(predictions, labels):
    """Return ``(ap, auc)`` for ``predictions`` scored against ``labels``.

    ``ap`` comes from ``average_precision_score`` and ``auc`` from
    ``roc_auc_score``; both receive ``labels`` as the ground truth.
    """
    return (
        average_precision_score(labels, predictions),
        roc_auc_score(labels, predictions),
    )

def mean_tta(dataloader, model, device, num_augmentations):
    """Average per-batch forward-pass time over repeated passes of ``dataloader``.

    Despite the name, what is measured is wall-clock time: for every batch the
    host-to-device copy of the inputs plus the ``model`` call is timed, the
    timings are averaged per pass, and the per-pass means are averaged again.

    Args:
        dataloader: Iterable of batches. A batch is either a dict with a
            ``'video'`` entry (as consumed by ``get_predictions``) or a
            sequence whose first element is the input tensor, e.g.
            ``(images, labels)``.
        model: Module to time; it is switched to eval mode.
        device: Device the inputs are moved to.
        num_augmentations: Number of full passes over ``dataloader``.

    Returns:
        Mean seconds per batch. ``nan`` (via ``np.mean`` of an empty list)
        when nothing was timed.
    """
    model.eval()
    per_pass_means = []

    for _ in range(num_augmentations):
        batch_times = []

        with torch.no_grad():
            for batch in tqdm(dataloader):
                # Accept both batch layouts used in this notebook.
                images = batch['video'] if isinstance(batch, dict) else batch[0]

                # perf_counter is monotonic, so intervals cannot go negative.
                # NOTE(review): on asynchronous backends the call may return
                # before the device finishes - confirm whether an explicit
                # synchronisation is wanted here.
                start_time = time.perf_counter()
                images = images.to(device)
                model(images)
                batch_times.append(time.perf_counter() - start_time)

        per_pass_means.append(np.mean(batch_times))

    return np.mean(per_pass_means)




In [None]:
predictions, labels = get_predictions(model, data_loader, mps_device)
ap, auc = calculate_metrics(predictions, labels)
# Keep the result under its own name: assigning it to `mean_tta` would replace
# the function with a number, and re-running this cell would then fail.
# NOTE(review): `dataloader` and `device` are only defined in later cells,
# while the line above uses `data_loader` / `mps_device` - confirm which pair
# is intended here.
mean_tta_seconds = mean_tta(dataloader, model, device, num_augmentations=10)

print(f'Average Precision (AP): {ap:.4f}')
print(f'AUC: {auc:.4f}')
print(f'Mean TTA: {mean_tta_seconds:.4f} seconds')

In [None]:
# Inspect the shape of one element three levels into the collected predictions.
predictions[0][0][0].shape

In [None]:
# import os
# import pandas as pd
# import numpy as np
# import cv2
# from PIL import Image
# import torch
# from torch.utils.data import Dataset, DataLoader
# from torchvision import transforms

# # Define image transformations
# image_transforms = transforms.Compose([
#     transforms.Resize((480, 640)),  # Adjust size as needed
#     transforms.ToTensor(),
#     transforms.Normalize(
#         mean=[0.485, 0.456, 0.406],  # ImageNet mean
#         std=[0.229, 0.224, 0.225]    # ImageNet std
#     )
# ])

# class CCDDataset(Dataset):
#     def __init__(self, annotations_file, root_dir, transform=None, frame_selection='middle'):
#         self.annotations = pd.read_csv(annotations_file)
#         self.root_dir = root_dir
#         self.transform = transform
#         self.frame_selection = frame_selection
        
#         self.video_info = []
#         for idx, row in self.annotations.iterrows():
#             vidname = row['vidname']
#             binlabels = list(map(int, row['binlabels'].split(',')))
#             category = 'crash' if 1 in binlabels else 'no_crash'
#             video_path = os.path.join(self.root_dir, category, vidname)
#             if os.path.exists(video_path):
#                 self.video_info.append({
#                     'vidname': vidname,
#                     'video_path': video_path,
#                     'binlabels': binlabels,
#                     'startframe': row['startframe'],
#                     'youtubeID': row['youtubeID'],
#                     'timing': row['timing'],
#                     'weather': row['weather'],
#                     'egoinvolve': row['egoinvolve'],
#                     'label': 1 if category == 'crash' else 0
#                 })
#             else:
#                 print(f"Warning: Video path {video_path} does not exist.")

#     def __len__(self):
#         return len(self.video_info)

#     def __getitem__(self, idx):
#         video_data = self.video_info[idx]
#         video_path = video_data['video_path']
#         label = video_data['label']
        
#         # Load selected frame
#         frame = self.load_frame(video_path, video_data['binlabels'])
        
#         if self.transform:
#             frame = self.transform(frame)
        
#         sample = {
#             'image': frame,
#             'label': torch.tensor(label, dtype=torch.long),
#             'metadata': {
#                 'vidname': video_data['vidname'],
#                 'startframe': video_data['startframe'],
#                 'youtubeID': video_data['youtubeID'],
#                 'timing': video_data['timing'],
#                 'weather': video_data['weather'],
#                 'egoinvolve': video_data['egoinvolve']
#             }
#         }
        
#         return sample
    
#     def load_frame(self, video_path, binlabels):
#         frame_files = sorted([os.path.join(video_path, img) for img in os.listdir(video_path) if img.endswith(('.png', '.jpg', '.jpeg'))])
        
#         if not frame_files:
#             raise FileNotFoundError(f"No frames found in {video_path}")
        
#         if self.frame_selection == 'accident' and 1 in binlabels:
#             frame_idx = binlabels.index(1)
#         elif self.frame_selection == 'middle':
#             frame_idx = len(frame_files) // 2
#         elif self.frame_selection == 'random':
#             frame_idx = np.random.randint(0, len(frame_files))
#         else:
#             frame_idx = 0  # Default to first frame
        
#         frame_path = frame_files[frame_idx]
#         frame = Image.open(frame_path).convert('RGB')
        
#         return frame

# def get_dataloaders(annotations_file, root_dir, batch_size=16, frame_selection='middle', num_workers=4):
#     dataset = CCDDataset(
#         annotations_file=annotations_file,
#         root_dir=root_dir,
#         transform=image_transforms,
#         frame_selection=frame_selection
#     )
    
#     dataloader = DataLoader(
#         dataset,
#         batch_size=batch_size,
#         shuffle=True,
#         num_workers=num_workers,
#         pin_memory=True
#     )
    
#     return dataloader

# # Usage example
# if __name__ == '__main__':
#     annotations_file = './annotations.csv'
#     root_dir = './data'
#     batch_size = 32
#     frame_selection = 'accident'  # Options: 'accident', 'middle', 'random'
#     num_workers = 8
    
#     dataloader = get_dataloaders(
#         annotations_file=annotations_file,
#         root_dir=root_dir,
#         batch_size=batch_size,
#         frame_selection=frame_selection,
#         num_workers=num_workers
#     )
    
#     # Initialize your pretrained MLNet model
#     # model = MLNet()
#     # model.load_state_dict(torch.load('mlnet_pretrained.pth'))
#     # model.eval()
#     # model.to(device)
    
#     for batch in dataloader:
#         images = batch['image']          # [batch_size, 3, 480, 640]
#         labels = batch['label']          # [batch_size]
#         metadata = batch['metadata']     # dict
        
#         # Perform inference
#         # outputs = model(images.to(device))
        
#         # Calculate metrics, loss, etc.
#         # ...
        
#         # For demonstration, let's just print batch shapes
#         print(f'Images batch shape: {images.shape}')
#         print(f'Labels batch shape: {labels.shape}')
#         print(f'Metadata example: {metadata[0]}')
#         break  # Remove this break to iterate over entire dataset


In [None]:
import os
import cv2
import torch
from torch.utils.data import Dataset
import numpy as np

class CrashDatasetLoader(Dataset):
    """Crash / no_crash video dataset that discovers clips by listing folders.

    All ``.mp4`` files under ``<root_dir>/crash`` are positives and all
    ``.mp4`` files under ``<root_dir>/no_crash`` are negatives; crash clips come
    first in the index. Annotations are read from
    ``<root_dir>/crash/annotations.txt``.

    Each item is a dict with:
        * ``'video'``: grayscale frames with a leading singleton dimension,
          ``(1, frame_count, H, W)`` (a float tensor unless ``transform`` is
          given, in which case it is whatever ``transform`` returns)
        * ``'label'``: ``1`` for crash, ``0`` for no_crash
        * ``'metadata'``: the annotation dict of a crash clip, or an empty
          dict when there is none (including all no_crash clips)
    """

    def __init__(self, root_dir, transform=None):
        """Index the clips and load the annotation file.

        Args:
            root_dir: Folder containing the ``crash`` and ``no_crash`` folders.
            transform: Optional callable applied to the raw NumPy frame array.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.crash_videos = self._load_videos(os.path.join(root_dir, 'crash'))
        self.no_crash_videos = self._load_videos(os.path.join(root_dir, 'no_crash'))
        self.annotation_file = os.path.join(root_dir, 'crash', 'annotations.txt')
        self.annotations = self._load_annotations()

    def _load_videos(self, folder):
        """Return the paths of all ``.mp4`` files in ``folder``, sorted by name."""
        # Sorted so that dataset indices are stable between runs.
        return [
            os.path.join(folder, video_name)
            for video_name in sorted(os.listdir(folder))
            if video_name.endswith('.mp4')  # Assuming videos are in .mp4 format
        ]

    def _load_annotations(self):
        """Parse the annotation file into ``{vidname: fields}``.

        Line layout:
            <vidname>,[b0, b1, ...],<startframe>,<youtubeID>,<timing>,<weather>,<egoinvolve>
        The label list may be comma- or space-separated. Because it can contain
        commas, the line is cut at the first comma and at the closing bracket
        rather than split on every comma.
        """
        annotations = {}
        with open(self.annotation_file, 'r') as file:
            for line in file:
                line = line.strip()
                if not line:
                    continue  # blank lines carry no annotation

                vidname, _, remainder = line.partition(',')
                list_end = remainder.index(']') + 1
                label_text = remainder[:list_end].strip('[]')
                binlabels = [int(token) for token in label_text.replace(',', ' ').split()]

                # Skip the comma that follows the closing bracket.
                fields = remainder[list_end + 1:].split(',')
                annotations[vidname] = {
                    'binlabels': binlabels,
                    'startframe': fields[0],
                    'youtubeID': fields[1],
                    'timing': fields[2],
                    'weather': fields[3],
                    'egoinvolve': fields[4]
                }
        return annotations

    def __len__(self):
        """Number of indexed clips (crash + no_crash)."""
        return len(self.crash_videos) + len(self.no_crash_videos)

    def __getitem__(self, idx):
        """Load clip ``idx``; indices below ``len(crash_videos)`` are crash clips."""
        if idx < len(self.crash_videos):
            video_path = self.crash_videos[idx]
            label = 1
            vidname = os.path.basename(video_path).split('.')[0]
            metadata = self.annotations.get(vidname, {})
        else:
            video_path = self.no_crash_videos[idx - len(self.crash_videos)]
            label = 0
            # An empty dict instead of None: the default DataLoader collate
            # cannot batch None values.
            metadata = {}

        frames = self._load_video_frames(video_path)
        if self.transform:
            frames = self.transform(frames)
        else:
            frames = torch.from_numpy(frames).float()

        return {
            'video': frames,
            'label': label,
            'metadata': metadata
        }

    def _load_video_frames(self, video_path):
        """Decode ``video_path`` into a ``(1, frame_count, H, W)`` grayscale array.

        Raises:
            ValueError: If no frame could be read from the video file.
        """
        # Load video frames and convert to grayscale
        cap = cv2.VideoCapture(video_path)
        frames = []
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))
        finally:
            cap.release()  # release the capture even if decoding raised

        if not frames:
            raise ValueError(f"No frames could be read from {video_path}")
        frames = np.stack(frames, axis=0)  # Shape: (frame_count, H, W)
        # Leading singleton dimension: (1, frame_count, H, W). A DataLoader
        # adds its own batch dimension in front of this one.
        frames = np.expand_dims(frames, axis=0)
        return frames


# # Usage
# root_dir = 'CarCrashDatasetV'
# dataset = CrashDataset(root_dir)
# data_loader = DataLoader(dataset, batch_size=1, shuffle=True)

# for batch in data_loader:
#     frames, label, metadata = batch
#     print(frames.shape)  # [1, frame_count, H, W]
#     print(label)  # 1 (crash) or 0 (no_crash)
#     if label == 1:
#         print(metadata)  # metadata for crash videos
#         break

# Point at the dataset folder, build the dataset, and wrap it in a shuffling
# loader that yields one clip per batch.
root_dir = 'CarCrashDatasetV/'
crash_dataset = CrashDatasetLoader(root_dir)
data_loader = DataLoader(crash_dataset, batch_size=1, shuffle=True)

# Look at a single batch only.
for batch in data_loader:
    # 'video' holds the frames, 'label' is 1 for crash / 0 for no_crash, and
    # 'metadata' carries the annotation fields when there are any.
    video, label, metadata = batch['video'], batch['label'], batch['metadata']

    # Process the data (e.g., pass it through a model)
    print("Video shape:", video.shape)
    print("Label:", label)
    if not metadata:
        print("No metadata for this video")
    else:
        print("Metadata:", metadata)

    # Stop after the first batch.
    break


## New method 1

In [8]:
import os
import json

def parse_annotations(annotations_path):
    """Parse the crash annotation file into ``{vidname: fields}``.

    Each non-blank line is expected to look like::

        <vidname>,[b0, b1, ...],<startframe>,<youtubeID>,<timing>,<weather>,<egoinvolve>

    The bracketed label list contains commas itself, so the line is cut at the
    first comma and at the closing bracket instead of at fixed column offsets;
    this works for any video-name length and any number of frame labels.

    Args:
        annotations_path: Path to the annotation text file.

    Returns:
        dict mapping each video name to a dict with keys ``'binlabels'``
        (list), ``'startframe'`` (int), ``'youtubeID'``, ``'timing'``,
        ``'weather'`` and ``'egoinvolve'`` (strings).

    Raises:
        ValueError: If a line has no closing ``]`` or a non-integer start frame.
        IndexError: If a line has fewer than five fields after the label list.
    """
    annotations = {}
    with open(annotations_path, 'r') as file:
        for line in file:
            line = line.strip()
            if not line:
                continue  # blank lines carry no annotation

            vidname, _, remainder = line.partition(',')
            list_end = remainder.index(']') + 1
            # NOTE(security): eval() executes whatever the file contains; only
            # use annotation files you trust (ast.literal_eval is the safe way
            # to read a plain list literal).
            binlabels = eval(remainder[:list_end])  # Convert the string list to an actual list

            # Skip the comma that follows the closing bracket.
            fields = remainder[list_end + 1:].strip().split(',')
            startframe = int(fields[0])
            youtubeID = fields[1]
            timing = fields[2]
            weather = fields[3]
            egoinvolve = fields[4]

            annotations[vidname] = {
                'binlabels': binlabels,
                'startframe': startframe,
                'youtubeID': youtubeID,
                'timing': timing,
                'weather': weather,
                'egoinvolve': egoinvolve
            }
    return annotations


In [21]:
import cv2

def extract_frames_from_video(video_path, output_folder):
    """Decode ``video_path`` and write every frame into ``output_folder`` as JPEG.

    Frames are written as ``frame_0000.jpg``, ``frame_0001.jpg``, ... in
    decoding order. ``output_folder`` is created if it does not exist.

    Args:
        video_path: Path of the video file to decode.
        output_folder: Folder that receives the frame images.

    Returns:
        Number of frames read from the video (0 when nothing could be read),
        so callers can detect clips that produced no frames.
    """
    os.makedirs(output_folder, exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    frame_count = 0

    try:
        success, image = cap.read()
        while success:
            frame_filename = f"frame_{frame_count:04d}.jpg"
            frame_path = os.path.join(output_folder, frame_filename)
            cv2.imwrite(frame_path, image)
            frame_count += 1
            success, image = cap.read()
    finally:
        cap.release()  # release the capture even if writing a frame raised

    return frame_count

def convert_videos_to_frames(root_dir):
    """Extract frames for every video under ``root_dir``'s two label folders.

    For each ``.mp4`` / ``.avi`` / ``.mov`` file in ``<root_dir>/crash`` and
    ``<root_dir>/no_crash``, the frames are written to a sibling folder named
    after the video file without its extension.
    """
    video_extensions = ('.mp4', '.avi', '.mov')

    for label_dir in ('crash', 'no_crash'):
        label_path = os.path.join(root_dir, label_dir)

        # os.listdir returns a complete list up front, so the folders created
        # while extracting do not affect this loop.
        for entry in os.listdir(label_path):
            if not entry.endswith(video_extensions):
                continue
            stem, _ = os.path.splitext(entry)
            extract_frames_from_video(
                os.path.join(label_path, entry),
                os.path.join(label_path, stem),
            )



import torch
from torch.utils.data import Dataset
from torchvision import transforms
from PIL import Image

class CarCrashDataset(Dataset):
    """Frame-folder dataset: one item per extracted video folder.

    Every sub-directory of ``<root_dir>/crash`` and ``<root_dir>/no_crash`` is
    treated as one video whose image files are its frames. Crash folders come
    first in the index, each group sorted by name.

    Each item is ``(images, frame_labels)``:
        * ``images``: the transformed frames stacked along a new first
          dimension, in file-name order
        * ``frame_labels``: float32 tensor of per-frame labels. For crash
          videos it is the ``'binlabels'`` entry of that video's annotation
          (empty if the video has none). For no_crash videos it is all zeros,
          one per frame.
    """

    # File extensions accepted as frames inside a video folder.
    _IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

    def __init__(self, root_dir, annotations, transform=None):
        """Index the frame folders.

        Args:
            root_dir: Folder containing the ``crash`` and ``no_crash`` folders.
            annotations: Mapping from video name to a dict with a
                ``'binlabels'`` entry.
            transform: Callable applied to every PIL image. It has to return
                tensors for the frames to be stackable.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.annotations = annotations
        self.video_folders = []
        self.labels = []

        # Collect video folder paths and labels
        for label_dir in ['crash', 'no_crash']:
            label_path = os.path.join(root_dir, label_dir)
            # Keep directories only: the label folders also contain the source
            # video files (and possibly hidden files), which are not frame
            # folders. Sorted so indices are stable between runs.
            video_folders = [
                os.path.join(label_path, entry)
                for entry in sorted(os.listdir(label_path))
                if os.path.isdir(os.path.join(label_path, entry))
            ]
            self.video_folders.extend(video_folders)
            self.labels.extend([1 if label_dir == 'crash' else 0] * len(video_folders))

    def __len__(self):
        """Number of indexed video folders."""
        return len(self.video_folders)

    def __getitem__(self, idx):
        """Load all frames of video ``idx`` together with its per-frame labels.

        Raises:
            RuntimeError: If the video folder contains no image files.
        """
        video_folder = self.video_folders[idx]
        video_name = os.path.basename(video_folder)
        image_files = sorted(
            os.path.join(video_folder, img)
            for img in os.listdir(video_folder)
            if img.lower().endswith(self._IMAGE_EXTENSIONS)
        )
        if not image_files:
            raise RuntimeError(f"No frame images found in {video_folder}")

        # Load images; the context manager closes each file handle once the
        # pixel data has been copied by convert().
        images = []
        for img_file in image_files:
            with Image.open(img_file) as img:
                images.append(img.convert('RGB'))

        if self.transform:
            images = [self.transform(img) for img in images]

        images = torch.stack(images, dim=0)

        if self.labels[idx] == 1:
            # Annotations describe crash videos only.
            binlabels = self.annotations.get(video_name, {}).get('binlabels', [])
        else:
            # Do not look the name up in `annotations`: a no_crash folder may
            # share its name with a crash video. No frame of a no_crash clip
            # is an accident frame, so label every frame 0.
            binlabels = [0] * len(image_files)

        return images, torch.tensor(binlabels, dtype=torch.float32)



In [22]:
import numpy as np
from sklearn.metrics import average_precision_score, roc_auc_score

def calculate_metrics(predictions, ground_truth, frame_rate=30, threshold=0.5):
    """
    Calculate average precision, AUC, and TTA.

    AP and AUC are computed over all frames of all videos pooled together.
    TTA is computed per video as the gap between the first frame whose score
    reaches ``threshold`` and the first ground-truth accident frame. It is
    counted only when the alarm comes strictly before the accident. Videos
    without any accident frame are ignored for TTA.

    Args:
        predictions (list of lists): Model output accident scores for each frame (0-1) for each video.
        ground_truth (list of lists): Ground truth binary labels (0 or 1) for each frame for each video.
        frame_rate (int): Frames per second (FPS) of the video.
        threshold (float): Score at or above which a frame counts as an alarm.

    Returns:
        avg_precision (float): Average precision score.
        auc (float): Area under the ROC curve.
        avg_tta (float): Average Time-to-Accident (TTA) in seconds, 0 when no
            video produced an early alarm.
    """

    all_predictions = np.concatenate(predictions)
    all_ground_truth = np.concatenate(ground_truth)

    # Calculate Average Precision
    avg_precision = average_precision_score(all_ground_truth, all_predictions)

    # Calculate AUC
    auc = roc_auc_score(all_ground_truth, all_predictions)

    # Calculate TTA
    tta_list = []
    for video_preds, video_gt in zip(predictions, ground_truth):
        # Convert to an array first: comparing a plain Python list with 1
        # yields a single False instead of an element-wise mask.
        accident_frames = np.flatnonzero(np.asarray(video_gt) == 1)
        if accident_frames.size == 0:
            continue  # no accident in this video, so there is no TTA to measure
        accident_start_frame = int(accident_frames[0])

        predicted_accident_frame = next(
            (i for i, score in enumerate(video_preds) if score >= threshold), None
        )

        if predicted_accident_frame is not None and predicted_accident_frame < accident_start_frame:
            tta = (accident_start_frame - predicted_accident_frame) / frame_rate
            tta_list.append(tta)

    avg_tta = np.mean(tta_list) if tta_list else 0

    return avg_precision, auc, avg_tta


In [None]:
# Step 1: Parse annotations
# `annotations` maps each video name to its parsed annotation fields.
annotations_path = 'annotations.txt'
annotations = parse_annotations(annotations_path)


# Step 2: Convert videos to frames (run this once)
# Writes a folder of JPEG frames next to every video under crash/ and
# no_crash/; every run decodes all videos again.
root_dir = 'CarCrashDatasetV'
convert_videos_to_frames(root_dir)


In [25]:
# Step 3: Initialize the dataset
# Each frame is resized to (480, 640) and converted to a tensor.
transform = transforms.Compose([
    transforms.Resize((480, 640)),
    transforms.ToTensor(),
])

car_crash_dataset = CarCrashDataset(root_dir=root_dir, annotations=annotations, transform=transform)

# Step 4: Create a DataLoader
# One video per batch, in dataset order (no shuffling).
dataloader = DataLoader(car_crash_dataset, batch_size=1, shuffle=False)


In [26]:
# Step 5: Evaluate the model
# NOTE: this cell fails with the RuntimeError recorded in its output. The
# loader yields a whole video as one 5-D tensor ([1, 50, 3, 480, 640]) but the
# model's conv2d layers accept 3-D or 4-D input only. The following cells feed
# the model one frame at a time instead.
all_predictions = []
all_ground_truth = []

for images, binlabels in dataloader:
    # Assuming 'model' is the trained DRIVE model
    # NOTE(review): the `model` built earlier in this notebook is an MLNet - confirm.
    # You need to modify this according to how you obtain predictions from the model
    with torch.no_grad():
        predictions = model(images)  # Example: [B, T, 1]
        predictions = predictions.squeeze().cpu().numpy().tolist()
    
    all_predictions.append(predictions)
    all_ground_truth.append(binlabels.squeeze().cpu().numpy().tolist())

# Step 6: Calculate metrics
avg_precision, auc, avg_tta = calculate_metrics(all_predictions, all_ground_truth)
print(f'Average Precision: {avg_precision:.4f}')
print(f'AUC: {auc:.4f}')
print(f'Average TTA: {avg_tta:.4f} seconds')

RuntimeError: Expected 3D (unbatched) or 4D (batched) input to conv2d, but got input of size: [1, 50, 3, 480, 640]

In [27]:
# Per-frame evaluation: run the model on one frame at a time.
# NOTE: this cell fails with the RuntimeError recorded in its output, because
# the frames stay on the CPU while the model weights are on the MPS device.
# The following cell moves the inputs to the model's device.
all_predictions = []
all_ground_truth = []

for images, binlabels in dataloader:
    # Assuming 'model' is the trained DRIVE model
    # NOTE(review): the `model` built earlier in this notebook is an MLNet - confirm.
    # Loop over each frame in the video
    batch_predictions = []
    with torch.no_grad():
        for frame in images.squeeze(0):  # [50, 3, 480, 640]
            frame = frame.unsqueeze(0)  # Add batch dimension, [1, 3, 480, 640]
            prediction = model(frame)  # Example: [1, 1]
            batch_predictions.append(prediction.squeeze().cpu().item())
    
    all_predictions.append(batch_predictions)
    all_ground_truth.append(binlabels.squeeze().cpu().numpy().tolist())

# Calculate metrics
avg_precision, auc, avg_tta = calculate_metrics(all_predictions, all_ground_truth)
print(f'Average Precision: {avg_precision:.4f}')
print(f'AUC: {auc:.4f}')
print(f'Average TTA: {avg_tta:.4f} seconds')

RuntimeError: slow_conv2d_forward_mps: input(device='cpu') and weight(device=mps:0')  must be on the same device

In [39]:
import torch

# Use the MPS backend (Apple M1/M2 devices) when it is available, otherwise
# stay on the CPU.
if torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

# Model and inputs have to live on the same device.
model = model.to(device)

all_predictions = []
all_ground_truth = []

for images, binlabels in dataloader:
    images = images.to(device)

    # Run the model on one frame at a time.
    batch_predictions = []
    with torch.no_grad():
        for frame in images.squeeze(0):  # drop the loader's batch dimension
            # Re-add a batch dimension for the single frame and make sure it
            # is on the model's device.
            frame = frame.unsqueeze(0).to(device)
            prediction = model(frame)
            print(prediction.shape)
            batch_predictions.append(np.squeeze(prediction))

    all_predictions.append(batch_predictions)
    all_ground_truth.append(binlabels.squeeze().cpu().numpy().tolist())
    # Only the first video is processed while the output shape is inspected.
    break

# # Calculate metrics
# avg_precision, auc, avg_tta = calculate_metrics(all_predictions, all_ground_truth)
# print(f'Average Precision: {avg_precision:.4f}')
# print(f'AUC: {auc:.4f}')
# print(f'Average TTA: {avg_tta:.4f} seconds')


torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
torch.Size([1, 1, 60, 80])
t

In [41]:
# Number of ground-truth entries collected for the first video.
len(all_ground_truth[0])

50

In [44]:
# Number of per-frame predictions collected for the first video.
len(all_predictions[0])

50

In [46]:
# Shape of one per-frame prediction: a 2-D map (see the output below), not a
# single score per frame.
all_predictions[0][0].shape

torch.Size([60, 80])