# Creation of the car crash detection model

## Step 1: Import libraries

In [None]:
import pandas as pd
from pathlib import Path
from torch.utils.data import DataLoader, Dataset
import cv2
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
import torch
from PIL import Image
from IPython.display import clear_output, display
import keyboard
import time
import torch.optim as optim
# to install pytorch, follow instructions on https://pytorch.org/get-started/locally/
# if CUDA is installed, this should allow GPU training
# -> pip install torchsummary
from torchsummary import summary
print(torch.cuda.is_available())
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import random_split

## Step 2 : Retrieve data

In [None]:
# CSV table (looked up relative to the working directory) that contains the
# "collision" label column
TABLENAME = "Train_Crash_Table.csv"

# Load the CSV file into a DataFrame
df = pd.read_csv(TABLENAME)
# Bare expression: rendered as the cell output when run in a notebook
df

In [None]:
# Select the "collision" column, which holds the labels
frame_data = df["collision"]

# Copy the column into a flat NumPy array (flatten() always returns a copy,
# so later in-place edits of `labels` do not write through to `df`)
labels = frame_data.values.flatten()
print(labels)
print(labels.shape)

In [None]:
# Folder that contains the .jpg frames (machine-specific absolute path)
IMAGE_PATH = "C:\\Users\\sacha\\OneDrive\\Bureau\\dataset"
# Upper bound on the number of images kept: at most one image per label
BATCH_SIZE = len(labels)

# Path.glob yields entries in a filesystem-dependent order. Images are paired
# with labels by position, so sort to make that pairing deterministic.
# NOTE(review): assumes the CSV rows follow the sorted file-name order — confirm.
image_paths = sorted(Path(IMAGE_PATH).glob("*.jpg"))[:BATCH_SIZE]
if not image_paths:
    # Fail with a clear message instead of an IndexError on image_paths[0]
    raise FileNotFoundError(f"No .jpg images found in {IMAGE_PATH}")
print(image_paths[0])
print(len(image_paths))

In [None]:
# Size (in pixels) every frame is resized to when it is loaded
img_width = 256
img_height = 256


# create a dataset class for the crash frames
class CrashFrameDataset(Dataset):
    """Dataset pairing frame image files with binary labels by position.

    Args:
        image_paths: Sequence of image file paths.
        labels: Mutable sequence of labels, where ``labels[i]`` belongs to
            ``image_paths[i]``. A label equal to 1 is counted as an accident.
        transform: Optional callable applied to the PIL image before it is
            returned.
        csv_path: Optional path of a CSV table with a ``collision`` column.
            When given, the table is loaded and every label toggle is
            written back to it.
    """

    def __init__(self, image_paths, labels, transform=None, csv_path=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform
        self.csv_path = csv_path

        if self.csv_path:
            self.df = pd.read_csv(self.csv_path)

    def __len__(self):
        """Return the number of usable (image, label) pairs."""
        return min(len(self.image_paths), len(self.labels))

    def __getitem__(self, idx):
        """Load, resize and transform the image at ``idx``.

        Returns:
            Tuple ``(image, label)``; ``image`` is a PIL image, or the result
            of ``transform`` when one was supplied.

        Raises:
            FileNotFoundError: If the image file cannot be read.
        """
        image_path = self.image_paths[idx]

        # cv2.imread signals failure by returning None instead of raising,
        # which would otherwise surface as an opaque cvtColor error.
        bgr_image = cv2.imread(str(image_path))
        if bgr_image is None:
            raise FileNotFoundError(f"Could not read image: {image_path}")

        image = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, (img_width, img_height))
        image = Image.fromarray(image.astype('uint8'))
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        return image, label

    def toggle_label(self, idx):
        """Flip the label at ``idx`` between 0 and 1 and persist the change.

        The in-memory label is always updated. When a CSV path was supplied,
        row ``idx`` of the ``collision`` column is updated as well and the
        whole table is rewritten to disk.
        """
        self.labels[idx] = 1 - self.labels[idx]

        if self.csv_path:
            self.df.at[idx, 'collision'] = self.labels[idx]
            self.df.to_csv(self.csv_path, index=False)

    def count_accidents(self):
        """Return ``(accidents, non_accidents)`` over the usable samples.

        Only the first ``len(self)`` labels are inspected; every label that
        is not equal to 1 counts as a non-accident.
        """
        sample_count = len(self)
        accidents = sum(1 for i in range(sample_count) if self.labels[i] == 1)
        return accidents, sample_count - accidents

# Only a tensor conversion is applied: ToTensor turns the PIL image into a
# channel-first float tensor
transform = transforms.Compose([
    transforms.ToTensor()
])

# Build the dataset from the image paths and labels; passing the CSV path
# lets label toggles be written back to the table
dataset = CrashFrameDataset(image_paths, labels, transform=transform, csv_path=TABLENAME)
# Bare expression: rendered as the cell output when run in a notebook
len(dataset)

# Step 3: Preparing the Data

In [None]:
# Figure and axis reused for every frame shown by the labeling tool
fig, ax = plt.subplots()

# Position of the frame currently on screen
current_index = 0


def update_image():
    """Redraw the figure with the frame and label at ``current_index``."""
    frame, frame_label = dataset[current_index]

    # imshow expects (H, W, C); tensors are transposed from (C, H, W)
    pixels = (
        frame.cpu().numpy().transpose(1, 2, 0)
        if isinstance(frame, torch.Tensor)
        else np.array(frame)
    )

    ax.clear()
    ax.imshow(pixels)
    ax.set_title(f"Label: {frame_label}")
    ax.axis("off")

    # Replace the previous cell output with the help line and the new frame
    clear_output(wait=True)
    print("press 'right' to go to the next image, 'left' for previous, 'space' to toggle label, 'esc' to exit")
    display(fig)

# Interactive labeling loop: poll the keyboard and react to navigation keys.
# Wrap on the dataset length rather than len(image_paths): the dataset is the
# object being indexed, and its length is also capped by the number of labels.
frame_count = len(dataset)

update_image()
while True:
    if keyboard.is_pressed("right"):  # Move to the next image (Right Arrow Key)
        current_index = (current_index + 1) % frame_count
        update_image()
        time.sleep(0.3)  # debounce so one key press moves a single step

    elif keyboard.is_pressed("left"):  # Move to the previous image (Left Arrow Key)
        current_index = (current_index - 1) % frame_count
        update_image()
        time.sleep(0.3)

    elif keyboard.is_pressed("space"):  # Toggle the label (Space Key)
        dataset.toggle_label(current_index)
        update_image()
        time.sleep(0.3)

    elif keyboard.is_pressed("esc"):  # Exit the loop (Escape Key)
        clear_output(wait=True)
        print("Exiting the labeling tool.")
        count_accidents, count_non_accidents = dataset.count_accidents()
        print(f"Dataset : {len(dataset)} images ({count_accidents} accidents and {count_non_accidents} non-accidents)")
        plt.close(fig)
        break

    # Short pause between polls to avoid spinning the CPU
    time.sleep(0.05)

In [None]:
import random

def _positions_with_label(wanted):
    """Return the positions in ``dataset.labels`` whose label equals ``wanted``."""
    return [pos for pos, value in enumerate(dataset.labels) if value == wanted]


# Split the sample positions by class
accident_indices = _positions_with_label(1)
non_accident_indices = _positions_with_label(0)

# Under-sample the larger class so both classes contribute equally
min_count = min(len(accident_indices), len(non_accident_indices))
accident_sample = random.sample(accident_indices, min_count)
non_accident_sample = random.sample(non_accident_indices, min_count)

# Merge both samples and mix them so the classes are interleaved
balanced_indices = [*accident_sample, *non_accident_sample]
random.shuffle(balanced_indices)

# Materialise the balanced subset as its own dataset (no CSV write-back)
balanced_image_paths = []
balanced_labels = []
for pos in balanced_indices:
    balanced_image_paths.append(dataset.image_paths[pos])
    balanced_labels.append(dataset.labels[pos])
balanced_dataset = CrashFrameDataset(
    balanced_image_paths,
    balanced_labels,
    transform=dataset.transform,
)

print(f"Balanced dataset : {len(balanced_dataset)} images ({min_count} accidents and {min_count} non-accidents)")

In [None]:
# Fraction of the balanced samples used for training; the rest is held out
train_ratio = 0.8
balanced_total = len(balanced_dataset)
train_size = int(train_ratio * balanced_total)
test_size = balanced_total - train_size

# Randomly partition the balanced dataset into the two subsets
split_lengths = [train_size, test_size]
train_dataset, test_dataset = random_split(balanced_dataset, split_lengths)

print(f"train dataset : {len(train_dataset)} images and test dataset : {len(test_dataset)} images")

# Step 4: Build a Model

In [None]:
class CrashDetection(nn.Module):
    """Small CNN that maps a batch of images to one raw score per image.

    Three convolutional stages (two 3x3 convolutions with ReLU, then max
    pooling with kernel sizes 4, 4 and 2) feed a three-layer fully connected
    head. The final layer has no activation, so the output is a logit.

    Args:
        input_channels: Number of channels of the input images.
        filter_base: Number of filters in the first stage; the second and
            third stages use 2x and 4x this value.
        input_height: Height of the input images in pixels.
        input_width: Width of the input images in pixels.
    """

    def __init__(self, input_channels=3, filter_base=8, input_height=256, input_width=256):
        super().__init__()
        self.input_channels = input_channels
        self.filter_base = filter_base
        self.input_height = input_height
        self.input_width = input_width

        # CNN layers
        self.conv1_1 = nn.Conv2d(input_channels, filter_base, kernel_size=3, padding=1)
        self.conv1_2 = nn.Conv2d(filter_base, filter_base, kernel_size=3, padding=1)
        self.pool1 = nn.MaxPool2d(kernel_size=4)

        self.conv2_1 = nn.Conv2d(filter_base, filter_base * 2, kernel_size=3, padding=1)
        self.conv2_2 = nn.Conv2d(filter_base * 2, filter_base * 2, kernel_size=3, padding=1)
        self.pool2 = nn.MaxPool2d(kernel_size=4)

        self.conv3_1 = nn.Conv2d(filter_base * 2, filter_base * 4, kernel_size=3, padding=1)
        self.conv3_2 = nn.Conv2d(filter_base * 4, filter_base * 4, kernel_size=3, padding=1)
        self.pool3 = nn.MaxPool2d(kernel_size=2)

        # Number of features entering the first linear layer; measured by
        # pushing a dummy input through the convolutional stages.
        self._to_linear = None
        self._compute_flattened_size()

        self.fc1 = nn.Linear(self._to_linear, 128)
        self.fc2 = nn.Linear(128, 16)
        self.fc3 = nn.Linear(16, 1)

    def _extract_features(self, x):
        """Apply the three convolutional stages and return the feature maps.

        Shared by ``forward`` and ``_compute_flattened_size`` so the size
        probe can never drift from the real forward pass.
        """
        x = F.relu(self.conv1_1(x))
        x = F.relu(self.conv1_2(x))
        x = self.pool1(x)

        x = F.relu(self.conv2_1(x))
        x = F.relu(self.conv2_2(x))
        x = self.pool2(x)

        x = F.relu(self.conv3_1(x))
        x = F.relu(self.conv3_2(x))
        x = self.pool3(x)
        return x

    def _compute_flattened_size(self):
        """Store in ``_to_linear`` the flattened feature count for one image."""
        with torch.no_grad():
            probe = torch.zeros(1, self.input_channels, self.input_height, self.input_width)
            features = self._extract_features(probe)
            self._to_linear = features.view(1, -1).shape[1]

    def forward(self, x):
        """Return one logit per image, shape ``(batch, 1)``."""
        x = self._extract_features(x)

        # Flatten everything except the batch dimension for the linear head
        x = x.reshape(x.size(0), -1)

        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

model = CrashDetection(input_channels=3, filter_base=8, input_height=img_height, input_width=img_width).to(device)
# The input size is (channels, height, width), matching the order the model
# uses for its own dummy input; the original passed width before height.
summary(model, (3, img_height, img_width))

# Step 5: Train the model

In [None]:
%%time

# Training hyper-parameters
batch_size = 50
epochs = 16

loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# The model outputs raw logits, so the sigmoid is folded into the loss
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Per-epoch metrics, kept for plotting
loss_history = []
accuracy_history = []

for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    # Batch variables are named batch_* so the loop does not overwrite the
    # notebook-level `labels` array that the dataset cells rely on.
    for batch_images, batch_labels in loader:
        batch_images = batch_images.to(device)

        # NOTE(review): this rescale looks like leftover image normalisation;
        # with 0/1 labels it never triggers — confirm and remove.
        if batch_labels.max() > 1:
            batch_labels = batch_labels / 255.0

        # Shape (batch, 1) float targets to match the model output
        batch_labels = batch_labels.unsqueeze(1).float().to(device)

        optimizer.zero_grad()
        outputs = model(batch_images)

        loss = criterion(outputs, batch_labels)
        loss.backward()
        optimizer.step()

        # The criterion returns the batch mean; weight it by the batch size
        # so a smaller final batch is not over-represented in the epoch loss.
        samples_in_batch = batch_labels.size(0)
        running_loss += loss.item() * samples_in_batch

        # prediction and accuracy calculation
        predicted = (torch.sigmoid(outputs) > 0.5).float()
        total += samples_in_batch
        correct += (predicted == batch_labels).sum().item()

    epoch_loss = running_loss / total
    epoch_accuracy = 100 * correct / total

    loss_history.append(epoch_loss)
    accuracy_history.append(epoch_accuracy)

    print(f"Epoch {epoch + 1}/{epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.2f}%")

In [None]:
def _plot_metric(axis, values, name, color):
    """Draw one metric on ``axis`` and colour its y-axis to match the line."""
    axis.set_ylabel(name, color=color)
    axis.plot(values, label=name, color=color)
    axis.tick_params(axis='y', labelcolor=color)


fig, ax1 = plt.subplots()

# Loss goes on the left-hand y-axis
ax1.set_xlabel('Epochs')
_plot_metric(ax1, loss_history, 'Loss', 'tab:blue')

# Accuracy shares the x-axis but gets its own right-hand y-axis
ax2 = ax1.twinx()
_plot_metric(ax2, accuracy_history, 'Accuracy', 'tab:orange')

# Title, layout and render
plt.title('Training Loss and Accuracy')
fig.tight_layout()
plt.show()

# Step 6 : Evaluation 

In [None]:
# Evaluation mode for the held-out pass
model.eval()

test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Running totals over the whole test set
total_correct, total_samples, total_loss = 0, 0, 0.0

# Per-sample predictions and targets, kept for the detailed report
all_preds, all_targets = [], []

with torch.no_grad():
    for test_images, test_labels in test_loader:
        test_images = test_images.to(device)
        # Shape (batch, 1) float targets to match the model output
        test_labels = test_labels.to(device).unsqueeze(1).float()

        outputs = model(test_images)

        # The criterion returns the batch mean, so scale it back to a sum
        loss = criterion(outputs, test_labels)
        total_loss += loss.item() * test_images.size(0)

        # Threshold the sigmoid probability at 0.5
        probabilities = torch.sigmoid(outputs)
        predicted_labels = (probabilities > 0.5).float()

        all_preds.extend(predicted_labels.cpu().numpy())
        all_targets.extend(test_labels.cpu().numpy())

        matches = predicted_labels == test_labels
        total_correct += matches.sum().item()
        total_samples += test_labels.size(0)

# Per-sample averages over the test set
avg_loss = total_loss / total_samples
accuracy = total_correct / total_samples

print(f'Test loss: {avg_loss:.4f}, Test accuracy: {accuracy:.4f}')

In [None]:
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
import seaborn as sns
import matplotlib.pyplot as plt

# Text summary of the per-class metrics and the overall accuracy
report_text = classification_report(all_targets, all_preds, digits=4)
print(report_text)
overall_accuracy = accuracy_score(all_targets, all_preds)
print(f"Accuracy: {overall_accuracy:.4f}")

# Confusion matrix rendered as an annotated heatmap
class_names = ["No Crash", "Crash"]
cm = confusion_matrix(all_targets, all_preds)
plt.figure(figsize=(6, 5))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_names,
    yticklabels=class_names,
)
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix')
plt.tight_layout()
plt.show()

# Step 7 : Save the model

In [None]:
from datetime import datetime

# The timestamp gives every saved checkpoint a unique file name
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# torch.save does not create missing directories, so make sure it exists
Path("Models").mkdir(parents=True, exist_ok=True)
# NOTE: the file holds a PyTorch state_dict; the ".keras" suffix is kept only
# because the loading cell globs for that extension.
torch.save(model.state_dict(), f"Models/CarCrashPytorch_{timestamp}.keras")
print(f"Model saved as CarCrashPytorch_{timestamp}.keras")

# Step 8 : Load and try the model with new data

In [None]:
import glob

model_paths = glob.glob("Models/CarCrashPytorch_*.keras")
if not model_paths:
    # Fail with a clear message instead of an IndexError on an empty list
    raise FileNotFoundError("No saved model matching Models/CarCrashPytorch_*.keras")

# glob returns matches in arbitrary order. The save cell embeds a
# %Y-%m-%d_%H-%M-%S timestamp in the name, which sorts chronologically as
# text, so the largest name is the most recent checkpoint.
model_path = max(model_paths)
print(f"Loading model from: {model_path}")

# Rebuild the network with the same input size used for training so the
# layer shapes match the saved weights.
model2 = CrashDetection(input_channels=3, filter_base=8, input_height=img_height, input_width=img_width).to(device)
# map_location lets a checkpoint saved on a GPU load on a CPU-only machine
model2.load_state_dict(torch.load(model_path, map_location=device))
model2.eval()  # Set the model to evaluation mode

In [None]:
# cv2.imread returns None when the file is missing or unreadable; check it
# here so the failure is reported clearly rather than inside cvtColor.
new_image_bgr = cv2.imread('NewCrash.jpg')
if new_image_bgr is None:
    raise FileNotFoundError("Could not read image: NewCrash.jpg")

NewData = cv2.cvtColor(new_image_bgr, cv2.COLOR_BGR2RGB)
# cv2.resize takes (width, height); reuse the size the dataset frames use
NewData = cv2.resize(NewData, (img_width, img_height))
plt.imshow(NewData)
# Bare expression: rendered as the cell output when run in a notebook
NewData.shape

In [None]:

# Turn the channel-last image array into a channel-first float tensor and
# scale the pixel values by 1/255
pixels_as_float = torch.tensor(NewData, dtype=torch.float32)
NewData_tensor = pixels_as_float.permute(2, 0, 1).to(device) / 255.0

# Inference only: evaluation mode, no gradient tracking
model2.eval()
with torch.no_grad():
    # Add the batch dimension; the model returns a raw logit despite the name
    y_sigmoid = model2(NewData_tensor.unsqueeze(0))
    crash_probability = torch.sigmoid(y_sigmoid)
    y_pred = (crash_probability > 0.5).int().cpu().numpy().item()

# 0 -> no crash, anything else -> crash
verdict = "No crash detected" if y_pred == 0 else "Crash detected "
print(verdict)