In [12]:
import pandas as pd
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow import keras
from keras import Sequential
from keras.layers import Dense,Conv2D,MaxPooling2D,Flatten,BatchNormalization,Dropout,AveragePooling2D,Softmax
from tensorflow.keras.layers import Conv3D,LayerNormalization,ReLU
from tensorflow.keras import Sequential
from tensorflow.keras.optimizers import SGD

In [3]:
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/

In [4]:
# Download the car-crash-dataset-ccd archive with the Kaggle CLI; per the log
# below it is saved as /content/car-crash-dataset-ccd.zip.
!kaggle datasets download -d asefjamilajwad/car-crash-dataset-ccd

Dataset URL: https://www.kaggle.com/datasets/asefjamilajwad/car-crash-dataset-ccd
License(s): other
Downloading car-crash-dataset-ccd.zip to /content
100% 7.60G/7.61G [02:06<00:00, 67.4MB/s]
100% 7.61G/7.61G [02:06<00:00, 64.7MB/s]


In [5]:
import zipfile

# Unpack the downloaded archive into /content. The context manager closes the
# archive even when extraction fails part-way through.
with zipfile.ZipFile('/content/car-crash-dataset-ccd.zip', 'r') as zip_ref:
    zip_ref.extractall('/content')

## Data Preprocessing

In [8]:
# Severity table; its "Severity of the Crash" column is copied into `df` in a later cell.
df1=pd.read_csv("/content/Severity.csv")
# Only a handful of videos are processed here; enable the next line instead to
# use every row of the severity table.
# num_videos=df1.shape[0]
num_videos=5

In [12]:
from google.colab import files
import shutil

# Directory holding the extracted crash frames, and the working directory that
# receives the subset of frames used by the rest of the notebook.
source_path = "/content/CrashBest/"
dest_path = "/content/test1/"
os.makedirs(dest_path, exist_ok=True)

# Copy all 50 frames (01..50) of each of the first `num_videos` crash videos.
# Frame files follow the pattern C_<6-digit video id>_<2-digit frame>.jpg.
for video_name in range(1, num_videos + 1):
  for i in range(1, 51):
    frame_number = str(i).zfill(2)  # Ensure two digits with leading zeros
    file_name = f"C_{video_name:06}_{frame_number}.jpg"
    shutil.copy(os.path.join(source_path, file_name),
                os.path.join(dest_path, file_name))



In [13]:
# Directory that load_video() reads frames from (the copy destination directory).
BASE_PATH =dest_path
# CSV file that is loaded into `df` in the next cell.
TABLENAME = "/content/Crash_Table.csv"

In [20]:
col_class = "Severity of the Crash"
img_size = [225, 225]

# Load the crash table and attach the severity column from `df1`
# (pandas aligns the assignment on the row index).
df = pd.read_csv(TABLENAME)
df[col_class] = df1[col_class]

# Missing severities default to 'minor'; labels are normalised to lower case.
df[col_class] = df[col_class].fillna('minor').str.lower()

# NOTE: despite its name, `num_classes` holds the array of distinct class
# labels, not their count.
num_classes = df[col_class].unique()

# One-hot vector (list of 0/1 ints) for every distinct class label.
label_encoding = {
    category: [int(category == other) for other in num_classes]
    for category in num_classes
}

In [21]:
def load_video(video_name):
  """Load the frames of one video as a single float32 tensor.

  Frame files are looked up in ``BASE_PATH`` using the naming scheme
  ``C_<video_name:06>_<frame:02>.jpg`` for frames 1..50. Frames whose file
  does not exist are skipped; every frame that is found is resized with
  ``cv2.resize(img, (img_size[0], img_size[1]))``.

  Args:
    video_name: Integer video id used in the frame file names.

  Returns:
    A ``tf.float32`` tensor with the found frames stacked along axis 0, in
    frame order. If no frame exists the tensor is empty (shape ``(0,)``).

  Raises:
    IOError: If a frame file exists but OpenCV cannot decode it
      (``cv2.imread`` returns ``None`` in that case).
  """
  frames = []
  for i in range(1, 51):
    file_name = f"C_{video_name:06}_{i:02}.jpg"  # six-digit video id, two-digit frame
    frame_path = os.path.join(BASE_PATH, file_name)
    if not os.path.exists(frame_path):
      continue
    img = cv2.imread(frame_path)
    if img is None:
      # Fail with the offending path instead of an opaque cv2.resize error.
      raise IOError(f"Could not decode frame image: {frame_path}")
    frames.append(cv2.resize(img, (img_size[0], img_size[1])))
  # Stack through one NumPy array first: converting a Python list of arrays
  # element by element is much slower.
  return tf.convert_to_tensor(np.asarray(frames, dtype=np.float32))

In [22]:
def attention_func(data):
  """Build the per-frame attention weights for one row of ``df``.

  For each of the columns ``frame_1`` .. ``frame_50`` the weight is 0.8 when
  the cell equals 1 and 0.2 otherwise.

  Args:
    data: Row label used to index the ``frame_<n>`` columns of ``df``.

  Returns:
    A ``tf.float32`` tensor with 50 weights, one per frame column.
  """
  accident_frame_atten = 0.8
  non_accident_frame_atten = 0.2
  frame_scene = [
      accident_frame_atten if df[f"frame_{index}"][data] == 1 else non_accident_frame_atten
      for index in range(1, 51)
  ]
  return tf.convert_to_tensor(frame_scene, dtype=tf.float32)


In [29]:
from sklearn.model_selection import train_test_split
def Create_dataset():
  """Assemble train/validation/test tensors for the first ``num_videos`` rows.

  For every row the frames (``load_video``), the one-hot label
  (``label_encoding``) and the per-frame attention weights
  (``attention_func``) are collected. The samples are split 70% / 30% and the
  30% part is split in half again into validation and test, both with
  ``random_state=42``.

  Returns:
    A 9-tuple of tensors ordered as
    (train_images, val_images, test_images,
     train_labels, val_labels, test_labels,
     train_attentions, val_attentions, test_attentions).
  """
  videos, targets, weights = [], [], []
  for row in range(num_videos):
    videos.append(load_video(df['vidname'][row]))
    targets.append(label_encoding[df[col_class][row]])
    weights.append(attention_func(row))

  stacked = (np.array(videos), np.array(targets), np.array(weights))

  # train_test_split returns [a_train, a_rest, b_train, b_rest, ...]:
  # even positions are the first partition, odd positions the second.
  first_split = train_test_split(
      *stacked, train_size=0.7, random_state=42, shuffle=True
  )
  train_parts, held_out = first_split[0::2], first_split[1::2]

  second_split = train_test_split(
      *held_out, test_size=0.5, random_state=42, shuffle=True
  )
  val_parts, test_parts = second_split[0::2], second_split[1::2]

  # Emit train/val/test for images, then labels, then attentions.
  return tuple(
      tf.convert_to_tensor(parts[kind])
      for kind in range(3)
      for parts in (train_parts, val_parts, test_parts)
  )

train_images, val_images, test_images,  train_labels,val_labels, test_labels,  train_attentions, val_attentions, test_attentions=Create_dataset()


## Model

In [2]:
class Conv2Plus1D(tf.keras.layers.Layer):
    """Factorised 3-D convolution: a spatial Conv3D followed by a temporal Conv3D.

    Args:
        filters: Number of output filters used by both convolutions.
        kernel_size: Indexable of three ints. Elements 1 and 2 size the
            spatial kernel ``(1, k1, k2)``; element 0 sizes the temporal
            kernel ``(k0, 1, 1)``.
        padding: Padding mode passed to both convolutions.
    """

    def __init__(self, filters, kernel_size, padding):
        super().__init__()
        spatial_kernel = (1, kernel_size[1], kernel_size[2])
        temporal_kernel = (kernel_size[0], 1, 1)
        # Spatial decomposition first, temporal decomposition second.
        spatial_conv = Conv3D(filters=filters,
                              kernel_size=spatial_kernel,
                              padding=padding)
        temporal_conv = Conv3D(filters=filters,
                               kernel_size=temporal_kernel,
                               padding=padding)
        self.seq = Sequential([spatial_conv, temporal_conv])

    def call(self, inputs):
        """Apply the spatial convolution, then the temporal one."""
        return self.seq(inputs)


In [3]:
class CombineTemporalLayers(tf.keras.layers.Layer):
    """Collapse the frame axis of a video tensor with a bias-free Conv3D.

    The convolution kernel spans ``num_frames`` positions along axis 1 with
    'valid' padding, so an input with exactly ``num_frames`` frames is reduced
    to a single position on that axis, which is then squeezed away.

    Args:
        num_frames: Number of frames on axis 1 of the input. Defaults to 50,
            the previously hard-coded value.
        filters: Number of output channels. Defaults to 3, the previously
            hard-coded value.
    """

    def __init__(self, num_frames=50, filters=3):
        super().__init__()
        self.conv3d = tf.keras.layers.Conv3D(
            filters=filters,
            kernel_size=(num_frames, 1, 1),
            strides=(1, 1, 1),
            padding='valid',
            activation=None,
            use_bias=False
        )

    def call(self, inputs):
        """Combine the frames and drop the (now size-1) frame axis.

        ``tf.squeeze(axis=1)`` fails unless axis 1 has size 1 after the
        convolution, i.e. unless the input carries exactly ``num_frames``
        frames.
        """
        combined = self.conv3d(inputs)
        return tf.squeeze(combined, axis=1)

In [4]:
class AttentionLayer(tf.keras.layers.Layer):
    """Scale a tensor by attention weights broadcast over three trailing axes."""

    def __init__(self):
        super().__init__()

    def build(self, input_shape):
        super().build(input_shape)

    def call(self, x, attention):
        """Return ``x`` multiplied element-wise by the expanded ``attention``.

        Three trailing singleton axes are appended to ``attention`` so that it
        broadcasts against ``x``.
        """
        weights = attention
        for _ in range(3):
            weights = tf.expand_dims(weights, axis=-1)
        return x * weights

In [5]:
class Encoder(tf.keras.layers.Layer):
  """(2+1)D conv -> LayerNorm -> ReLU, attention weighting, then frame combine.

  Args:
    filters: Number of filters of the ``Conv2Plus1D`` block.
    kernel_size: Kernel size forwarded to ``Conv2Plus1D``.
  """

  def __init__(self, filters, kernel_size):
    super().__init__()
    conv_block = Conv2Plus1D(filters=filters,
                             kernel_size=kernel_size,
                             padding='same')
    self.seq = keras.Sequential([conv_block, LayerNormalization(), ReLU()])
    self.attention = AttentionLayer()
    self.combine = CombineTemporalLayers()

  def call(self, x, attention):
    """Encode ``x``, weight it by ``attention`` and merge the frame axis."""
    encoded = self.seq(x)
    weighted = self.attention(encoded, attention)
    return self.combine(weighted)


In [6]:
class inception(keras.Model):
    """Four-branch block whose branch outputs are concatenated on the last axis.

    Args:
        num_first_filter: Filters of the 1x1 stem convolution (``conv1``).
        num_second_filter: Filters of every other convolution in the block.
    """

    def __init__(self,num_first_filter,num_second_filter):
        super().__init__()

        def conv_bn_relu(filters, kernel, padding):
            # Stride-1 Conv2D followed by BatchNormalization and ReLU.
            return Sequential([
                Conv2D(filters, kernel_size=kernel, strides=1, padding=padding),
                BatchNormalization(),
                ReLU(),
            ])

        self.conv1 = conv_bn_relu(num_first_filter, (1, 1), 'valid')
        self.conv2A = conv_bn_relu(num_second_filter, (1, 1), 'valid')
        self.conv2B = conv_bn_relu(num_second_filter, (3, 3), 'same')
        self.conv2C = conv_bn_relu(num_second_filter, (3, 3), 'same')
        self.pool = MaxPooling2D(pool_size=2, strides=1, padding='same')

    def call(self,inputs):
        """Run the four branches and concatenate them along the channel axis.

        ``conv2A`` is applied both to the pooled input and to the raw input,
        so those two branches share weights.
        """
        branch_3x3 = self.conv2B(self.conv1(inputs))
        branch_stacked_3x3 = self.conv2C(branch_3x3)
        branch_pool = self.conv2A(self.pool(inputs))
        branch_1x1 = self.conv2A(inputs)
        return tf.concat(
            [branch_3x3, branch_stacked_3x3, branch_pool, branch_1x1], -1
        )


In [7]:
class inception_reduction(keras.Model):
    """Three-branch block that halves the spatial resolution.

    Two branches end in a stride-2 3x3 convolution and the third uses a
    stride-2 max pool; the results are concatenated on the last axis.

    Args:
        num_first_filter: Filters of the 1x1 stem and the stride-1 3x3 conv.
        num_second_filter: Filters of the two stride-2 3x3 convolutions.
        num_third_filter: Filters of the 1x1 conv applied after pooling.
    """

    def __init__(self,num_first_filter,num_second_filter,num_third_filter):
        super().__init__()

        def conv_bn_relu(filters, kernel, strides, padding):
            # Conv2D followed by BatchNormalization and ReLU.
            return Sequential([
                Conv2D(filters, kernel_size=kernel, strides=strides, padding=padding),
                BatchNormalization(),
                ReLU(),
            ])

        self.conv1 = conv_bn_relu(num_first_filter, (1, 1), 1, 'valid')
        self.conv1A = conv_bn_relu(num_first_filter, (3, 3), 1, 'same')
        self.conv2A = conv_bn_relu(num_second_filter, (3, 3), 2, 'same')
        self.conv2B = conv_bn_relu(num_second_filter, (3, 3), 2, 'same')

        self.pool = MaxPooling2D(pool_size=2, strides=2)
        self.conv3 = conv_bn_relu(num_third_filter, (1, 1), 1, 'valid')

    def call(self,inputs):
        """Run the three down-sampling branches and concatenate them.

        The shared 1x1 stem ``conv1`` is evaluated once per branch that
        uses it (two calls in total).
        """
        deep_branch = self.conv2A(self.conv1A(self.conv1(inputs)))
        short_branch = self.conv2B(self.conv1(inputs))
        pool_branch = self.conv3(self.pool(inputs))
        return tf.concat([deep_branch, short_branch, pool_branch], -1)

In [8]:
class inception_resnet(keras.Model):
    """Three-branch, resolution-preserving block concatenated on the last axis.

    Args:
        num_first_filter: Filters of the 1x1 stem and the first 3x3 conv.
        num_second_filter: Filters of the remaining convolutions.
    """

    def __init__(self,num_first_filter,num_second_filter):
        super().__init__()

        def conv_bn_relu(filters, kernel, padding):
            # Stride-1 Conv2D followed by BatchNormalization and ReLU.
            return Sequential([
                Conv2D(filters, kernel_size=kernel, strides=1, padding=padding),
                BatchNormalization(),
                ReLU(),
            ])

        self.conv1 = conv_bn_relu(num_first_filter, (1, 1), 'valid')
        self.conv2 = conv_bn_relu(num_second_filter, (1, 1), 'valid')
        self.conv1A = conv_bn_relu(num_first_filter, (3, 3), 'same')
        self.conv2A = conv_bn_relu(num_second_filter, (3, 3), 'same')
        self.conv2B = conv_bn_relu(num_second_filter, (3, 3), 'same')

    def call(self,inputs):
        """Run the three branches and concatenate them along the channel axis.

        The shared 1x1 stem ``conv1`` is evaluated once per branch that
        uses it (two calls in total).
        """
        deep_branch = self.conv2A(self.conv1A(self.conv1(inputs)))
        short_branch = self.conv2B(self.conv1(inputs))
        pointwise_branch = self.conv2(inputs)
        return tf.concat([deep_branch, short_branch, pointwise_branch], -1)

In [9]:
class inception_resnet_V2(keras.Model):
    """Four-branch, resolution-preserving block concatenated on the last axis.

    Args:
        num_first_filter: Filters of the 1x1 stem convolution (``conv1``).
        num_second_filter: Filters of the first 3x3 conv of the deep branch.
        num_third_filter: Filters of the final convolution of every branch.
    """

    def __init__(self,num_first_filter,num_second_filter,num_third_filter):
        super().__init__()

        def conv_bn_relu(filters, kernel, padding):
            # Stride-1 Conv2D followed by BatchNormalization and ReLU.
            return Sequential([
                Conv2D(filters, kernel_size=kernel, strides=1, padding=padding),
                BatchNormalization(),
                ReLU(),
            ])

        self.conv1 = conv_bn_relu(num_first_filter, (1, 1), 'valid')
        self.conv3 = conv_bn_relu(num_third_filter, (1, 1), 'valid')
        self.conv2 = conv_bn_relu(num_second_filter, (3, 3), 'same')
        self.conv3A = conv_bn_relu(num_third_filter, (3, 3), 'same')
        self.conv3B = conv_bn_relu(num_third_filter, (3, 3), 'same')

        self.pool = MaxPooling2D(pool_size=2, strides=1, padding='same')
        self.conv3C = conv_bn_relu(num_third_filter, (1, 1), 'valid')

    def call(self,inputs):
        """Run the four branches and concatenate them along the channel axis.

        The shared 1x1 stem ``conv1`` is evaluated once per branch that
        uses it (two calls in total).
        """
        deep_branch = self.conv3A(self.conv2(self.conv1(inputs)))
        short_branch = self.conv3B(self.conv1(inputs))
        pointwise_branch = self.conv3(inputs)
        pool_branch = self.conv3C(self.pool(inputs))
        return tf.concat(
            [deep_branch, short_branch, pointwise_branch, pool_branch], -1
        )

In [10]:
class Attention(keras.Model):
    """Channel-softmax attention pooling of a feature map against a guide tensor."""

    def __init__(self):
        super().__init__()
        self.conv1 = Conv2D(1024, kernel_size=(1, 1), strides=1,
                            padding='valid', activation='relu')
        self.bn1 = BatchNormalization()
        self.softmax = Softmax(axis=-1)

    def call(self,input1,input2):
        """Pool ``input1`` into one vector per sample, guided by ``input2``.

        ``input1`` is projected to 1024 channels (1x1 conv + ReLU, then batch
        norm), multiplied element-wise with ``input2``, and a softmax over the
        last axis yields the weights. The weighted projection is summed over
        axes 1 and 2.
        """
        projected = self.bn1(self.conv1(input1))
        weights = self.softmax(tf.multiply(projected, input2))
        weighted = tf.multiply(weights, projected)
        return tf.reduce_sum(weighted, axis=[1, 2])


In [11]:
class Neural_Network(keras.Model):
    """Inception-style 2-D backbone that maps an image to a 512-d feature vector.

    Pipeline: two conv stems and a max pool, two ``inception`` blocks, then
    three stages each made of an ``inception_reduction`` followed (for the
    first two stages) by two inception-resnet blocks, a 1024-filter 3x3 conv
    and a 7x7 average pool. The pooled tensor guides two ``Attention``
    poolings (over the outputs of stage 1 and stage 2); their results are
    concatenated and passed through ``Dense(512)`` + batch norm.

    NOTE(review): the 7x7 average pool presumably expects a 7x7 map at that
    point, which matches 225x225 inputs (225 -> 112 -> 56 -> 28 -> 14 -> 7);
    confirm before feeding other resolutions.
    """

    def __init__(self):
        super().__init__()
        self.conv1=Sequential([Conv2D(32,kernel_size=(3,3),strides=2,padding='valid'),
                                BatchNormalization(),
                                ReLU()])
        self.conv2=Sequential([Conv2D(64,kernel_size=(3,3),strides=1,padding='same'),
                                BatchNormalization(),
                                ReLU()])
        self.pool3=MaxPooling2D(pool_size=2, strides=2, padding='valid')
        self.inception4=inception(48,64)
        self.inception5=inception(48,64)
        self.inception_red6=inception_reduction(64,96,192)
        self.inception_red10=inception_reduction(96,112,384)
        self.inception_resnet7=inception_resnet(96,128)
        self.inception_resnet8=inception_resnet(96,128)
        self.inception_resnet11=inception_resnet_V2(112,128,152)
        self.inception_resnet12=inception_resnet_V2(112,128,152)
        self.inception_red14=inception_reduction(112,128,608)
        self.conv15=Sequential([Conv2D(1024,kernel_size=(3,3),strides=1,padding='same'),
                                BatchNormalization(),
                                ReLU()])
        self.pool16=AveragePooling2D(pool_size=(7,7))
        self.attention1=Attention()
        self.attention2=Attention()
        self.ffn18=Dense(512, activation='relu')
        self.bn18=BatchNormalization()

    def call(self,inputs):
        """Compute the 512-d feature vector for a batch of images.

        The numeric suffixes of the intermediate names follow the layer
        numbering used in ``__init__``.
        """
        # Stem.
        x_1=self.conv1(inputs)
        x_2=self.conv2(x_1)
        x_3=self.pool3(x_2)
        # Stage 1.
        x_4=self.inception4(x_3)
        x_5=self.inception5(x_4)
        x_6=self.inception_red6(x_5)
        x_7=self.inception_resnet7(x_6)
        x_8=self.inception_resnet8(x_7)
        # Stage 2.
        x_10=self.inception_red10(x_8)
        x_11=self.inception_resnet11(x_10)
        x_12=self.inception_resnet12(x_11)
        # Stage 3 and global descriptor.
        x_14=self.inception_red14(x_12)
        x_15=self.conv15(x_14)
        x_16=self.pool16(x_15)
        # Attention pooling of the stage-1 and stage-2 maps, guided by x_16.
        x_9=self.attention1(x_8,x_16)
        x_13=self.attention2(x_12,x_16)
        x_17=tf.concat([x_9,x_13],-1)
        output=self.bn18(self.ffn18(x_17))
        return output

In [18]:
class Decision_Network(keras.Model):
    def __init__(self, num_classes, output_dim):
        super().__init__()
        self.conv1 = Dense(2, activation='relu')
        self.conv2 = Dense(2, activation='relu')
        self.softmax = Softmax()
        self.Encoder = Encoder(filters=3, kernel_size=(3, 3, 3))
        self.NN1 = Neural_Network()
        self.centers = tf.Variable(tf.random.normal((num_classes, output_dim)), trainable=True)
        self.g = 0.5  # learning rate for the class centers

    def call(self, inputs, attention):
        x = self.Encoder(inputs, attention)
        x = self.conv1(self.NN1(x))
        softmax_output = self.softmax(x)
        center = self.conv2(self.NN1(inputs))
        output = softmax_output + center
        return output

    def center_loss(self, features, labels):
        batch_size = tf.shape(features)[0]
        expanded_features = tf.expand_dims(features, 1)
        expanded_centers = tf.expand_dims(self.centers, 0)
        squared_distances = tf.reduce_sum(tf.square(expanded_features - expanded_centers), axis=2)

        labels = tf.expand_dims(labels, axis=1)
        mask = tf.cast(tf.equal(labels, tf.range(self.centers.shape[0])), dtype=tf.float32)
        masked_distances = squared_distances * mask

        epsilon = 1e-6
        average_loss = tf.reduce_sum(masked_distances) / (batch_size * self.centers.shape[0] + epsilon)
        return average_loss

    def update_centers(self, features, labels):
        delta_centers = tf.zeros_like(self.centers)
        for i in range(tf.shape(features)[0]):
            feature = features[i]
            label = labels[i]
            delta_centers += tf.where(tf.equal(tf.range(self.centers.shape[0]), label), feature - self.centers[label], 0)

        label_counts = tf.reduce_sum(tf.cast(tf.equal(tf.expand_dims(labels, axis=1), tf.range(self.centers.shape[0])), tf.float32), axis=0)
        delta_centers = delta_centers / (label_counts[:, None] + 1e-6)

        self.centers.assign_sub(self.g * delta_centers)

    def combined_loss(self, y_true, y_pred, features):
        softmax_loss = tf.keras.losses.categorical_crossentropy(y_true, y_pred)
        center_loss_value = self.center_loss(features, tf.argmax(y_true, axis=1))
        d = 0.006
        total_loss = softmax_loss + d * center_loss_value
        return total_loss

    def train_step(self, data):
        inputs, y_true = data
        attention = None  # Assuming no attention input for simplicity
        with tf.GradientTape() as tape:
            features = self.NN1(self.Encoder(inputs, attention))
            y_pred = self(inputs, attention)
            loss = self.combined_loss(y_true, y_pred, features)

        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        self.update_centers(features, tf.argmax(y_true, axis=1))

        self.compiled_metrics.update_state(y_true, y_pred)

        return {"loss": loss, **{m.name: m.result() for m in self.metrics}}

    def compile(self, optimizer, loss_weights=None, metrics=None):
        super().compile(optimizer=optimizer, metrics=metrics)

# Example usage
model = Decision_Network(num_classes=4, output_dim=4)
model.compile(optimizer=SGD(), metrics=["accuracy"])

In [None]:
history = model.fit([train_images,train_attentions], train_labels, epochs=5, validation_data=([val_images,val_attentions], val_labels))

# Evaluate the model
test_loss, test_acc = model.evaluate([test_images,test_attentions], test_labels)
print("Test accuracy:", test_acc)