In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))
# /kaggle/input/car-crash-dataset-ccd/CrashBest
# /kaggle/input/car-crash-dataset-ccd/CrashBest/C_000001_01.jpg
# /kaggle/input/crash-1500/Crash-1500/000001.mp4
# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [7]:
import os

# Define the file you want to delete
file_to_delete = 'best_model.pth'

# Delete the file
if os.path.exists(file_to_delete):
    os.remove(file_to_delete)
    print(f"File '{file_to_delete}' deleted successfully.")
else:
    print(f"File '{file_to_delete}' does not exist.")

File 'best_model.pth' deleted successfully.


In [None]:
!sudo apt-get update
!sudo apt-get install ffmpeg

In [8]:
import pandas as pd

csv_file_path = '/kaggle/input/preprocessed-df/preprocessed_df (1).csv'
annotations = pd.read_csv(csv_file_path)

class_counts = annotations['Severity of the Crash'].value_counts()
print(class_counts)

Severity of the Crash
Moderate    622
Minor       348
Severe      244
moderate     70
severe       57
minor        31
fatal        25
Fatal        18
Name: count, dtype: int64


In [9]:
annotations['Severity of the Crash'] = annotations['Severity of the Crash'].str.lower()
class_counts = annotations['Severity of the Crash'].value_counts()
print(class_counts)

Severity of the Crash
moderate    692
minor       379
severe      301
fatal        43
Name: count, dtype: int64


In [None]:
import os
import pandas as pd

video_dir = '/kaggle/input/crash-1500/Crash-1500'
frames_dir = '/kaggle/working/frames' 

for idx, row in annotations.iterrows():
    video_number = row['Video Number']
    video_path = os.path.join(video_dir, f'{video_number:06}.mp4')
    output_dir = os.path.join(frames_dir, f'{video_number:06}')
    os.makedirs(output_dir, exist_ok=True)

    ffmpeg_command = f'ffmpeg -ss 00:00:01 -i "{video_path}" -vf fps=5 "{output_dir}/frame%04d.png"'
#     print(f"Running FFmpeg command: {ffmpeg_command}")
    os.system(ffmpeg_command)
    
    extracted_frames = os.listdir(output_dir)
    print(f"Extracted {len(extracted_frames)} frames for video {video_number}")

In [10]:
from PIL import ImageFilter,ImageEnhance
import cv2
import numpy as np

def denoise_frame(frame):
    frame = np.array(frame)
    denoised_frame = cv2.medianBlur(frame, 5)
    return Image.fromarray(denoised_frame)

def deblur_frame(frame):
    return frame.filter(ImageFilter.UnsharpMask(radius=2, percent=150, threshold=3))

def enhance_contrast(frame):
    frame = np.array(frame)
    lab = cv2.cvtColor(frame, cv2.COLOR_RGB2LAB)
    l, a, b = cv2.split(lab)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    cl = clahe.apply(l)
    limg = cv2.merge((cl, a, b))
    enhanced_frame = cv2.cvtColor(limg, cv2.COLOR_LAB2RGB)
    return Image.fromarray(enhanced_frame)

def sharpen_frame(frame, factor=2.0):
    enhancer = ImageEnhance.Sharpness(frame)
    return enhancer.enhance(factor)

In [11]:
import os
frames_dir = '/kaggle/working/frames'
video_dirs = next(os.walk(frames_dir))[1]
num_videos = len(video_dirs)

print(f"Number of videos with extracted frames: {num_videos}")

Number of videos with extracted frames: 1415


In [12]:
import os
import glob
import pandas as pd
import torch
from torch.utils.data import Dataset
from torchvision.io import read_image
from torchvision import transforms
from PIL import Image

class VideoDataset(Dataset):
    def __init__(self, annotations_df, root_dir, transform=None, num_frames=20):
        self.annotations = annotations_df
        self.annotations['Severity of the Crash'] = self.annotations['Severity of the Crash'].str.lower()
        self.root_dir = root_dir
        self.transform = transform
        self.label_map = {'minor': 0, 'moderate': 1, 'severe': 2,'fatal': 3}
        self.num_frames = num_frames

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        video_number = self.annotations.iloc[idx]['Video Number']
        label = self.label_map[self.annotations.iloc[idx]['Severity of the Crash']]
        frame_dir = os.path.join(self.root_dir, f'{video_number:06}')
        frames = []
        # print(f"Looking for frames in: {frame_dir}")
        for frame_path in sorted(glob.glob(os.path.join(frame_dir, '*.png'))):
            frame = read_image(frame_path)
            frame = transforms.ToPILImage()(frame)
            frame = denoise_frame(frame)
            frame = sharpen_frame(frame)
            frame = enhance_contrast(frame)
            frame = deblur_frame(frame)
            if self.transform:
                frame = self.transform(frame)
            frames.append(frame)
        # print(f"Found {len(frames)} frames.")

        if not frames: 
            raise ValueError(f"No frames found for video {video_number} in directory {frame_dir}")
        frames = torch.stack(frames)
        return frames, label

transform = transforms.Compose([
    transforms.Resize((224, 224)),  
    transforms.RandomHorizontalFlip(),  
    transforms.RandomRotation(10), 
    transforms.ToTensor(),  
])

dataset = VideoDataset(annotations_df=annotations, root_dir='/kaggle/working/frames', transform=transform)

In [13]:
import torch
from transformers import TimesformerModel, TimesformerConfig
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [14]:
class VideoClassifier(nn.Module):
    def __init__(self, num_classes, num_frames, hidden_size=512, num_layers=1, bidirectional=True, num_heads=4):
        super().__init__()
        self.config = TimesformerConfig(
            num_frames=num_frames,
            num_classes=num_classes  
        )
        self.timesformer = TimesformerModel(self.config)
        self.lstm = nn.LSTM(
            input_size=self.config.hidden_size, 
            hidden_size=hidden_size, 
            num_layers=num_layers, 
            bidirectional=bidirectional, 
            batch_first=True,
        )
        lstm_output_size = hidden_size * 2 if bidirectional else hidden_size

        # Multihead attention mechanism
        self.multihead_attention = nn.MultiheadAttention(
            embed_dim=lstm_output_size, 
            num_heads=num_heads, 
            batch_first=True
        )
        self.classifier = nn.Linear(lstm_output_size, num_classes)

    def forward(self, x):
        transformer_output = self.timesformer(x).last_hidden_state  
        lstm_output, _ = self.lstm(transformer_output)          
        
        # Applying multihead attention
        attn_output, _ = self.multihead_attention(lstm_output, lstm_output, lstm_output)         
        context_vector = torch.sum(attn_output, dim=1)       
        logits = self.classifier(context_vector) 
        return logits

model = VideoClassifier(num_classes=4, num_frames=20)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

VideoClassifier(
  (timesformer): TimesformerModel(
    (embeddings): TimesformerEmbeddings(
      (patch_embeddings): TimesformerPatchEmbeddings(
        (projection): Conv2d(3, 768, kernel_size=(16, 16), stride=(16, 16))
      )
      (pos_drop): Dropout(p=0.0, inplace=False)
      (time_drop): Dropout(p=0.0, inplace=False)
    )
    (encoder): TimesformerEncoder(
      (layer): ModuleList(
        (0-11): 12 x TimesformerLayer(
          (drop_path): Identity()
          (attention): TimeSformerAttention(
            (attention): TimesformerSelfAttention(
              (qkv): Linear(in_features=768, out_features=2304, bias=True)
              (attn_drop): Dropout(p=0.0, inplace=False)
            )
            (output): TimesformerSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.0, inplace=False)
            )
          )
          (intermediate): TimesformerIntermediate(
            (dense): Linear(in_featu

In [None]:
class_counts = annotations['label'].value_counts()
print(class_counts)

In [15]:
import numpy as np
import torch
from torch.utils.data import DataLoader, WeightedRandomSampler, SubsetRandomSampler
from collections import Counter

np.random.seed(21)
dataset_size = len(dataset)
indices = list(range(dataset_size))
np.random.shuffle(indices)

train_split = int(np.floor(0.7 * dataset_size))
val_split = int(np.floor(0.2 * dataset_size))
test_split = dataset_size - train_split - val_split

train_indices = indices[:train_split]
val_indices = indices[train_split:train_split + val_split]
test_indices = indices[train_split + val_split:]

print(f"Total dataset size: {dataset_size}")
print(f"Training set size: {len(train_indices)}")
print(f"Validation set size: {len(val_indices)}")
print(f"Test set size: {len(test_indices)}")

# train_labels = [annotations['Severity of the Crash'][idx] for idx in train_indices]
# class_counts = Counter(train_labels)
# total_samples = len(train_labels)
# class_weights = {cls: total_samples / count for cls, count in class_counts.items()}
# print(class_weights)
# sample_weights = [class_weights[annotations['Severity of the Crash'][idx]] for idx in train_indices]
# sample_weights = torch.tensor(sample_weights, dtype=torch.float)

# train_sampler = WeightedRandomSampler(weights=sample_weights, num_samples=len(sample_weights), replacement=True)
train_sampler = SubsetRandomSampler(train_indices)
val_sampler = SubsetRandomSampler(val_indices)
test_sampler = SubsetRandomSampler(test_indices)

train_loader = DataLoader(dataset, batch_size=2, sampler=train_sampler)
val_loader = DataLoader(dataset, batch_size=2, sampler=val_sampler)
test_loader = DataLoader(dataset, batch_size=2, sampler=test_sampler)

print("Total effective samples in training set:", len(train_loader) * train_loader.batch_size)

Total dataset size: 1415
Training set size: 990
Validation set size: 283
Test set size: 142
Total effective samples in training set: 990


In [16]:
import torch.optim as optim

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
num_epochs = 3
best_val_acc = 0.0  
save_path = '/kaggle/working/best_model.pth'  

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for data in train_loader:
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # Here the model is evaluated on the validation set
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data in val_loader:
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    val_acc = 100 * correct / total

    print('[%d] loss: %.3f, val_acc: %.3f' % (epoch + 1, running_loss / len(train_loader), val_acc))
    
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), save_path)
        print(f"New best model saved with val_acc: {best_val_acc:.3f}")

[1] loss: 341.700, val_acc: 23.322
New best model saved with val_acc: 23.322
[2] loss: 8.172, val_acc: 27.208
New best model saved with val_acc: 27.208
[3] loss: 4.421, val_acc: 46.290
New best model saved with val_acc: 46.290


TEST ACCURACY, RECALL PART

In [17]:
import torch
from sklearn.metrics import precision_score, recall_score, f1_score

def calculate_metrics(model, data_loader, device):
    model.eval()
    correct_predictions = 0
    custom_correct = 0
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for data in data_loader:
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            # The probabilities from the outputs
            probabilities = torch.softmax(outputs, dim=1)
            top_probs, top_classes = torch.topk(probabilities, 2, dim=1)

            p1 = top_classes[:, 0]  
            p2 = top_classes[:, 1]  

            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(p1.cpu().numpy())

            for i in range(len(labels)):
                true_label = labels[i].item()
                predicted1 = p1[i].item()
                predicted2 = p2[i].item()
                
                if abs(predicted1-predicted2) == 1:
                    if(predicted1==true_label or predicted2==true_label):  
                        custom_correct += 1 
                else:
                    if predicted1 == true_label:
                        custom_correct += 1

    total_samples = len(all_labels)
    custom_accuracy = custom_correct / total_samples
    precision = precision_score(all_labels, all_preds, average='weighted', zero_division=0)
    recall = recall_score(all_labels, all_preds, average='weighted', zero_division=0)
    f1 = f1_score(all_labels, all_preds, average='weighted', zero_division=0)

    print(f"Custom Accuracy: {custom_accuracy * 100:.2f}%")
    print(f"Precision: {precision:.2f}")
    print(f"Recall: {recall:.2f}")
    print(f"F1 Score: {f1:.2f}")

In [18]:
model.load_state_dict(torch.load(save_path))
calculate_metrics(model, test_loader, device)

Custom Accuracy: 73.94%
Precision: 0.33
Recall: 0.57
F1 Score: 0.41


In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score

model.load_state_dict(torch.load('/kaggle/working/best_model.pth'))
model.eval()

correct = 0
total = 0
custom_correct = 0
all_true_labels = []
all_predicted_labels = []

with torch.no_grad():
    for data in test_loader:
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)

        all_true_labels.extend(labels.cpu().numpy())
        all_predicted_labels.extend(predicted.cpu().numpy())

        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Custom accuracy calculation
        for i in range(len(labels)):
            p1 = predicted[i].item()
            true_label = labels[i].item()

            # Find the two closest severity classes
            p2 = p1 + 1 if p1 < 3 else p1 - 1

            if abs(p1 - p2) == 1:
                if p1 == true_label or p2 == true_label:
                    custom_correct += 1
            else:
                if p1 == true_label:
                    custom_correct += 1

# Calculate test accuracy, custom accuracy, precision, recall, and F1 score
test_acc = 100 * correct / total
custom_accuracy = 100 * custom_correct / total
precision = precision_score(all_true_labels, all_predicted_labels, average='weighted')
recall = recall_score(all_true_labels, all_predicted_labels, average='weighted')
f1 = f1_score(all_true_labels, all_predicted_labels, average='weighted')

print(f"Test Accuracy: {test_acc:.2f}%")
print(f"Custom Accuracy: {custom_accuracy:.2f}%")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"F1 Score: {f1:.2f}")

In [None]:
import torch
from sklearn.metrics import precision_score, recall_score, f1_score

def calculate_metrics(model, data_loader, device):
    model.eval()
    correct_predictions = 0
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for data in data_loader:
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(predicted.cpu().numpy())
            
            # Custom accuracy calculation
            for i in range(len(labels)):
                true_severity = labels[i].item()
                p1 = predicted[i].item()

                # Check if the prediction is correct or close to the correct value
                if abs(p1 - true_severity) <= 1:
                    correct_predictions += 1

    # Calculate custom accuracy
    total_samples = len(all_labels)
    custom_accuracy = correct_predictions / total_samples
    print(f"Custom accuracy: {custom_accuracy * 100:.2f}%")

    # Calculate precision, recall, and F1 score
    precision = precision_score(all_labels, all_preds, average='weighted', zero_division=0)
    recall = recall_score(all_labels, all_preds, average='weighted', zero_division=0)
    f1 = f1_score(all_labels, all_preds, average='weighted', zero_division=0)

    print(f"Precision: {precision * 100:.2f}%")
    print(f"Recall: {recall * 100:.2f}%")
    print(f"F1 Score: {f1 * 100:.2f}%")

# Load the best model
model.load_state_dict(torch.load(save_path))

# Calculate metrics on the test set
calculate_metrics(model, test_loader, device)
