In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))
# /kaggle/input/car-crash-dataset-ccd/CrashBest
# /kaggle/input/car-crash-dataset-ccd/CrashBest/C_000001_01.jpg
# /kaggle/input/crash-1500/Crash-1500/000001.mp4
# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [1]:
import os

# Define the file you want to delete
file_to_delete = 'best_model.pth'

# Delete the file
if os.path.exists(file_to_delete):
    os.remove(file_to_delete)
    print(f"File '{file_to_delete}' deleted successfully.")
else:
    print(f"File '{file_to_delete}' does not exist.")

File 'best_model.pth' deleted successfully.


In [None]:
!sudo apt-get update
!sudo apt-get install ffmpeg

In [1]:
import pandas as pd

csv_file_path = '/kaggle/input/preprocessed-df/preprocessed_df (1).csv'
annotations = pd.read_csv(csv_file_path)

class_counts = annotations['Severity of the Crash'].value_counts()
print(class_counts)

Severity of the Crash
Moderate    622
Minor       348
Severe      244
moderate     70
severe       57
minor        31
fatal        25
Fatal        18
Name: count, dtype: int64


In [2]:
annotations['Severity of the Crash'] = annotations['Severity of the Crash'].str.lower()
class_counts = annotations['Severity of the Crash'].value_counts()
print(class_counts)

Severity of the Crash
moderate    692
minor       379
severe      301
fatal        43
Name: count, dtype: int64


In [None]:
import os
import pandas as pd

video_dir = '/kaggle/input/crash-1500/Crash-1500'
frames_dir = '/kaggle/working/frames' 

for idx, row in annotations.iterrows():
    video_number = row['Video Number']
    video_path = os.path.join(video_dir, f'{video_number:06}.mp4')
    output_dir = os.path.join(frames_dir, f'{video_number:06}')
    os.makedirs(output_dir, exist_ok=True)

    ffmpeg_command = f'ffmpeg -ss 00:00:01 -i "{video_path}" -vf fps=5 "{output_dir}/frame%04d.png"'
#     print(f"Running FFmpeg command: {ffmpeg_command}")
    os.system(ffmpeg_command)
    
    extracted_frames = os.listdir(output_dir)
    print(f"Extracted {len(extracted_frames)} frames for video {video_number}")

In [3]:
from PIL import ImageFilter,ImageEnhance
import cv2
import numpy as np

def denoise_frame(frame):
    frame = np.array(frame)
    denoised_frame = cv2.medianBlur(frame, 5)
    return Image.fromarray(denoised_frame)

def deblur_frame(frame):
    return frame.filter(ImageFilter.UnsharpMask(radius=2, percent=150, threshold=3))

def enhance_contrast(frame):
    frame = np.array(frame)
    lab = cv2.cvtColor(frame, cv2.COLOR_RGB2LAB)
    l, a, b = cv2.split(lab)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    cl = clahe.apply(l)
    limg = cv2.merge((cl, a, b))
    enhanced_frame = cv2.cvtColor(limg, cv2.COLOR_LAB2RGB)
    return Image.fromarray(enhanced_frame)

def sharpen_frame(frame, factor=2.0):
    enhancer = ImageEnhance.Sharpness(frame)
    return enhancer.enhance(factor)

In [4]:
import os
frames_dir = '/kaggle/working/frames'
video_dirs = next(os.walk(frames_dir))[1]
num_videos = len(video_dirs)

print(f"Number of videos with extracted frames: {num_videos}")

Number of videos with extracted frames: 1415


In [4]:
import os
import glob
import pandas as pd
import torch
from torch.utils.data import Dataset
from torchvision.io import read_image
from torchvision import transforms
from PIL import Image

class VideoDataset(Dataset):
    def __init__(self, annotations_df, root_dir, transform=None, num_frames=20):
        self.annotations = annotations_df
        self.annotations['Severity of the Crash'] = self.annotations['Severity of the Crash'].str.lower()
        self.root_dir = root_dir
        self.transform = transform
        self.label_map = {'minor': 0, 'moderate': 1, 'severe': 2,'fatal': 3}
        self.num_frames = num_frames

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        video_number = self.annotations.iloc[idx]['Video Number']
        label = self.label_map[self.annotations.iloc[idx]['Severity of the Crash']]
        frame_dir = os.path.join(self.root_dir, f'{video_number:06}')
        frames = []
        # print(f"Looking for frames in: {frame_dir}")
        for frame_path in sorted(glob.glob(os.path.join(frame_dir, '*.png'))):
            frame = read_image(frame_path)
            frame = transforms.ToPILImage()(frame)
            frame = denoise_frame(frame)
            frame = sharpen_frame(frame)
            frame = enhance_contrast(frame)
            frame = deblur_frame(frame)
            if self.transform:
                frame = self.transform(frame)
            frames.append(frame)
        # print(f"Found {len(frames)} frames.")

        if not frames: 
            raise ValueError(f"No frames found for video {video_number} in directory {frame_dir}")
        frames = torch.stack(frames)
        return frames, label

transform = transforms.Compose([
    transforms.Resize((112, 112)),  
    transforms.RandomHorizontalFlip(),  
    transforms.RandomRotation(10), 
    transforms.ToTensor(),  
])

dataset = VideoDataset(annotations_df=annotations, root_dir='/kaggle/working/frames', transform=transform)

In [5]:
import torch
from transformers import TimesformerModel, TimesformerConfig
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
from transformers import TimesformerConfig, TimesformerModel
import torch.nn.functional as F

class Hybrid3DCNNTimeSformer(nn.Module):
    def __init__(self, num_classes, num_frames, cnn_out_channels=128, hidden_size=512, num_layers=1, bidirectional=True, num_heads=4):
        super(Hybrid3DCNNTimeSformer, self).__init__()

        # 3D CNN Layers
        self.conv1 = nn.Conv3d(in_channels=3, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool3d(kernel_size=(1, 2, 2), stride=(1, 2, 2))  # Pooling only spatial dimensions
        self.conv2 = nn.Conv3d(32, 64, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv3d(64, cnn_out_channels, kernel_size=3, stride=1, padding=1)
        
        # TimeSformer configuration
        self.timesformer_config = TimesformerConfig(num_classes=num_classes)
        self.timesformer = TimesformerModel(self.timesformer_config)
        
        # Calculate output size for LSTM
        self.dummy_input = torch.zeros(1, 3, num_frames, 112, 112)
        self.flattened_size = self._get_conv_output_size(self.dummy_input)
        
        # Additional LSTM and Attention layers
        self.lstm = nn.LSTM(input_size=self.flattened_size, hidden_size=hidden_size, num_layers=num_layers,
                            bidirectional=bidirectional, batch_first=True)
        lstm_output_size = hidden_size * 2 if bidirectional else hidden_size
        self.multihead_attention = nn.MultiheadAttention(embed_dim=lstm_output_size, num_heads=num_heads, batch_first=True)
        
        # Classifier layer
        self.classifier = nn.Linear(lstm_output_size, num_classes)

    def _get_conv_output_size(self, input_tensor):
        x = self.conv1(input_tensor)
        x = self.pool(F.relu(x))
        x = self.conv2(x)
        x = self.pool(F.relu(x))
        x = self.conv3(x)
        x = self.pool(F.relu(x))
        print("Shape after final CNN layer:", x.shape)  
        return x.numel()
    
    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))

        print("Shape before reshaping for TimeSformer:", x.shape)
        
        # Reshape to (batch_size, num_frames, num_channels, height, width)
        cnn_features = x.view(x.size(0), 20, 128, 14, 14)  
        print("Shape of cnn_features after reshaping:", cnn_features.shape)

        # TimeSformer processing
        transformer_output = self.timesformer(cnn_features).last_hidden_state

        # LSTM processing
        lstm_output, _ = self.lstm(transformer_output)
        attn_output, _ = self.multihead_attention(lstm_output, lstm_output, lstm_output)
 
        # Sum over attention outputs
        context_vector = torch.sum(attn_output, dim=1)

        logits = self.classifier(context_vector)
        return logits

# Model instantiation
model = Hybrid3DCNNTimeSformer(num_classes=4, num_frames=20)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)


Shape after final CNN layer: torch.Size([1, 128, 20, 14, 14])


OutOfMemoryError: CUDA out of memory. Tried to allocate 7.67 GiB. GPU 0 has a total capacty of 14.74 GiB of which 6.46 GiB is free. Process 6118 has 8.28 GiB memory in use. Of the allocated memory 8.12 GiB is allocated by PyTorch, and 51.59 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF

In [9]:
import numpy as np
import torch
from torch.utils.data import DataLoader, WeightedRandomSampler, SubsetRandomSampler
from collections import Counter

np.random.seed(21)
dataset_size = len(dataset)
indices = list(range(dataset_size))
np.random.shuffle(indices)

train_split = int(np.floor(0.7 * dataset_size))
val_split = int(np.floor(0.2 * dataset_size))
test_split = dataset_size - train_split - val_split

train_indices = indices[:train_split]
val_indices = indices[train_split:train_split + val_split]
test_indices = indices[train_split + val_split:]

print(f"Total dataset size: {dataset_size}")
print(f"Training set size: {len(train_indices)}")
print(f"Validation set size: {len(val_indices)}")
print(f"Test set size: {len(test_indices)}")

# train_labels = [annotations['Severity of the Crash'][idx] for idx in train_indices]
# class_counts = Counter(train_labels)
# total_samples = len(train_labels)
# class_weights = {cls: total_samples / count for cls, count in class_counts.items()}
# print(class_weights)
# sample_weights = [class_weights[annotations['Severity of the Crash'][idx]] for idx in train_indices]
# sample_weights = torch.tensor(sample_weights, dtype=torch.float)

# train_sampler = WeightedRandomSampler(weights=sample_weights, num_samples=len(sample_weights), replacement=True)
train_sampler = SubsetRandomSampler(train_indices)
val_sampler = SubsetRandomSampler(val_indices)
test_sampler = SubsetRandomSampler(test_indices)

train_loader = DataLoader(dataset, batch_size=2, sampler=train_sampler)
val_loader = DataLoader(dataset, batch_size=2, sampler=val_sampler)
test_loader = DataLoader(dataset, batch_size=2, sampler=test_sampler)

print("Total effective samples in training set:", len(train_loader) * train_loader.batch_size)

Total dataset size: 1415
Training set size: 990
Validation set size: 283
Test set size: 142
Total effective samples in training set: 990


In [26]:
import torch.optim as optim

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)
num_epochs = 3
best_val_acc = 0.0  
save_path = '/kaggle/working/best_model.pth'  

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for data in train_loader:
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)
        inputs = inputs.permute(0, 2, 1, 3, 4)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # Here the model is evaluated on the validation set
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data in val_loader:
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    val_acc = 100 * correct / total

    print('[%d] loss: %.3f, val_acc: %.3f' % (epoch + 1, running_loss / len(train_loader), val_acc))
    
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), save_path)
        print(f"New best model saved with val_acc: {best_val_acc:.3f}")

RuntimeError: shape '[2, 20, 128, 14, 14]' is invalid for input of size 100352

TEST ACCURACY, RECALL PART

In [None]:
import torch
from sklearn.metrics import precision_score, recall_score, f1_score

def calculate_metrics(model, data_loader, device):
    model.eval()
    correct_predictions = 0
    custom_correct = 0
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for data in data_loader:
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            # The probabilities from the outputs
            probabilities = torch.softmax(outputs, dim=1)
            top_probs, top_classes = torch.topk(probabilities, 2, dim=1)

            p1 = top_classes[:, 0]  
            p2 = top_classes[:, 1]  

            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(p1.cpu().numpy())

            for i in range(len(labels)):
                true_label = labels[i].item()
                predicted1 = p1[i].item()
                predicted2 = p2[i].item()
                
                if abs(predicted1-predicted2) == 1:
                    if(predicted1==true_label or predicted2==true_label):  
                        custom_correct += 1 
                else:
                    if predicted1 == true_label:
                        custom_correct += 1

    total_samples = len(all_labels)
    custom_accuracy = custom_correct / total_samples
    precision = precision_score(all_labels, all_preds, average='weighted', zero_division=0)
    recall = recall_score(all_labels, all_preds, average='weighted', zero_division=0)
    f1 = f1_score(all_labels, all_preds, average='weighted', zero_division=0)

    print(f"Custom Accuracy: {custom_accuracy * 100:.2f}%")
    print(f"Precision: {precision:.2f}")
    print(f"Recall: {recall:.2f}")
    print(f"F1 Score: {f1:.2f}")

In [None]:
model.load_state_dict(torch.load(save_path))
calculate_metrics(model, test_loader, device)

PARTS WHERE MY CODE GAVE AN ERROR

In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ThreeDCNN(nn.Module):
    def __init__(self, out_features=512):
        super(ThreeDCNN, self).__init__()
        self.conv1 = nn.Conv3d(in_channels=3, out_channels=32, kernel_size=3, stride=2, padding=2)
        self.pool = nn.MaxPool3d(kernel_size=2, stride=2, padding=0)
        self.conv2 = nn.Conv3d(32, 64, kernel_size=3, stride=2, padding=2)
        self.conv3 = nn.Conv3d(64, 128, kernel_size=3, stride=2, padding=2)
        self.fc = nn.Linear(512, out_features)  

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool(x)
        x = F.relu(self.conv2(x))
        x = self.pool(x)
        x = F.relu(self.conv3(x))
        x = self.pool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

In [7]:
class VideoClassifier(nn.Module):
    def __init__(self, num_classes=4, num_frames=20, cnn_output_dim=512):
        super(VideoClassifier, self).__init__()
        self.cnn = ThreeDCNN(out_features=cnn_output_dim)  # 3D CNN feature extractor
        self.timesformer_config = TimesformerConfig(
            num_frames=num_frames,
            hidden_size=cnn_output_dim,
            num_classes=num_classes
        )
        self.timesformer = TimesformerModel(self.timesformer_config)
        self.fc = nn.Linear(cnn_output_dim, num_classes)

    def forward(self, x):
        batch_size, channels, depth, height, width = x.size()
        cnn_features = []
        for i in range(depth):  
            frame = x[:, :, i, :, :].unsqueeze(2)  
            cnn_features.append(self.cnn(frame))
        cnn_features = torch.stack(cnn_features, dim=1)  

        cnn_features = cnn_features.permute(0, 2, 1, 3, 4)  
        timesformer_output = self.timesformer(cnn_features).last_hidden_state
        logits = self.fc(torch.mean(timesformer_output, dim=1)) 

        return logits

model = VideoClassifier(num_classes=4, num_frames=20)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)


VideoClassifier(
  (cnn): ThreeDCNN(
    (conv1): Conv3d(3, 32, kernel_size=(3, 3, 3), stride=(2, 2, 2), padding=(2, 2, 2))
    (pool): MaxPool3d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (conv2): Conv3d(32, 64, kernel_size=(3, 3, 3), stride=(2, 2, 2), padding=(2, 2, 2))
    (conv3): Conv3d(64, 128, kernel_size=(3, 3, 3), stride=(2, 2, 2), padding=(2, 2, 2))
    (fc): Linear(in_features=512, out_features=512, bias=True)
  )
  (timesformer): TimesformerModel(
    (embeddings): TimesformerEmbeddings(
      (patch_embeddings): TimesformerPatchEmbeddings(
        (projection): Conv2d(3, 512, kernel_size=(16, 16), stride=(16, 16))
      )
      (pos_drop): Dropout(p=0.0, inplace=False)
      (time_drop): Dropout(p=0.0, inplace=False)
    )
    (encoder): TimesformerEncoder(
      (layer): ModuleList(
        (0-11): 12 x TimesformerLayer(
          (drop_path): Identity()
          (attention): TimeSformerAttention(
            (attention): TimesformerSelfAttentio

In [9]:
import torch.optim as optim

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
num_epochs = 3
best_val_acc = 0.0  
save_path = '/kaggle/working/best_model.pth'  

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for data in train_loader:
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)
        inputs = inputs.permute(0, 2, 1, 3, 4)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # Here the model is evaluated on the validation set
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data in val_loader:
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    val_acc = 100 * correct / total

    print('[%d] loss: %.3f, val_acc: %.3f' % (epoch + 1, running_loss / len(train_loader), val_acc))
    
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), save_path)
        print(f"New best model saved with val_acc: {best_val_acc:.3f}")

RuntimeError: permute(sparse_coo): number of dimensions in the tensor input does not match the length of the desired ordering of dimensions i.e. input.dim() = 3 is not equal to len(dims) = 5