In [1]:
import os
import cv2
import shutil
import numpy as np
import pandas as pd
import tensorflow as tf
from google.colab.patches import cv2_imshow
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Sequential
from tensorflow.keras.models import load_model
from tensorflow.keras.applications import VGG16
from tensorflow.keras.layers import Dense, Flatten
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

In [2]:
!unzip -q /content/Dataset_Video_Based_Classification_EKF.zip

In [3]:
import tensorflow as tf

import os

import shutil

import numpy as np

from sklearn.model_selection import train_test_split

classes = ['Black Ice','Dry Asphalt','Dry Cobblestone','Dry Concrete','Dry Gravel','Dry Mud',
               'Heavy Snow','Light Snow','Muddy Wet','Packed Snow','Refrozen Ice','Sleet','Slush',
               'Wet Asphalt','Wet Cobblestone','Wet Concrete','Wet Gravel','Wet Heavy']
# replace with your actual class names

data_dir = '/content/Dataset_Video_Based_Classification_EKF'

train_dir = '/content/Dataset_Video_Based_Classification_EKF/train'

test_dir = '/content/Dataset_Video_Based_Classification_EKF/test'

# Create train and test directories

os.makedirs(train_dir, exist_ok=True)

os.makedirs(test_dir, exist_ok=True)

for cls in classes:

    # Create class-specific train and test directories

    os.makedirs(os.path.join(train_dir, cls), exist_ok=True)

    os.makedirs(os.path.join(test_dir, cls), exist_ok=True)

    # Get a list of all image files for this class

    image_files = os.listdir(os.path.join(data_dir, cls))

    # Split the files into train and test sets

    train_files, test_files = train_test_split(image_files, test_size=0.3, random_state=42)  # 80% train, 20% test

    # Copy the train files into the train directory

    for file in train_files:

        shutil.copy(os.path.join(data_dir, cls, file), os.path.join(train_dir, cls, file))

    # Copy the test files into the test directory

    for file in test_files:

        shutil.copy(os.path.join(data_dir, cls, file), os.path.join(test_dir, cls, file))

In [4]:
class_names = ['Black Ice','Dry Asphalt','Dry Cobblestone','Dry Concrete','Dry Gravel','Dry Mud',
               'Heavy Snow','Light Snow','Muddy Wet','Packed Snow','Refrozen Ice','Sleet','Slush',
               'Wet Asphalt','Wet Cobblestone','Wet Concrete','Wet Gravel','Wet Heavy']

train_dir = '/content/Dataset_Video_Based_Classification_EKF/train'
test_dir = '/content/Dataset_Video_Based_Classification_EKF/test'

In [5]:
# Define the Data Generators using Data Augmentation
train_datagen = ImageDataGenerator(rescale=1./255)

train_generator = train_datagen.flow_from_directory(
    train_dir,
    target_size=(224, 224),
    batch_size=32,
    class_mode='categorical')

test_datagen = ImageDataGenerator(rescale=1./255)

test_generator = test_datagen.flow_from_directory(
    test_dir,
    target_size=(224, 224),
    batch_size=32,
    class_mode='categorical')

Found 1414 images belonging to 18 classes.
Found 609 images belonging to 18 classes.


In [6]:
baseModel = VGG16(weights="imagenet", include_top=False, input_tensor=tf.keras.Input(shape=(224, 224, 3)))

# Construct the head of the model that will be placed on top of the base model
headModel = baseModel.output
headModel = Flatten(name="flatten")(headModel)
headModel = Dense(1024, activation="relu")(headModel)
headModel = Dense(18, activation="softmax")(headModel)  # Change the number here according to your number of class_names

# Place the head FC model on top of the base model (this will become the actual model we will train)
model = tf.keras.Model(inputs=baseModel.input, outputs=headModel)

# Loop over all layers in the base model
for layer in baseModel.layers:
    layer.trainable = False

# Compile the model
model.compile(loss='categorical_crossentropy', optimizer=Adam(learning_rate=1e-5), metrics=['accuracy'])

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m58889256/58889256[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step


In [7]:
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
model_checkpoint = ModelCheckpoint('Road_Surface_Classification_Video_Based.keras', save_best_only=True, monitor='val_loss')

model.fit(train_generator, validation_data=test_generator, epochs=35, callbacks=[early_stopping, model_checkpoint])

Epoch 1/35


  self._warn_if_super_not_called()


[1m45/45[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m33s[0m 408ms/step - accuracy: 0.1493 - loss: 2.8010 - val_accuracy: 0.5057 - val_loss: 2.1500
Epoch 2/35
[1m45/45[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 233ms/step - accuracy: 0.6069 - loss: 1.9461 - val_accuracy: 0.6502 - val_loss: 1.6569
Epoch 3/35
[1m45/45[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 226ms/step - accuracy: 0.7275 - loss: 1.4163 - val_accuracy: 0.6634 - val_loss: 1.3872
Epoch 4/35
[1m45/45[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m22s[0m 258ms/step - accuracy: 0.7495 - loss: 1.1480 - val_accuracy: 0.6913 - val_loss: 1.2083
Epoch 5/35
[1m45/45[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 234ms/step - accuracy: 0.8277 - loss: 0.9046 - val_accuracy: 0.7126 - val_loss: 1.1121
Epoch 6/35
[1m45/45[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 234ms/step - accuracy: 0.8244 - loss: 0.8043 - val_accuracy: 0.7553 - val_loss: 1.0163
Epoch 7/35
[1m45/45[0m [32m━━━

<keras.src.callbacks.history.History at 0x7c11f4058a00>

In [15]:
import math
import random
import csv

# Vehicle parameters
VEHICLE_MASS = 1500.0  # kg
GRAVITY = 9.81  # m/s^2
TIRE_RADIUS = 0.3  # meters (approximately)

# Kalman filter variables
Q = 0.0001  # Process noise covariance
R = 0.01  # Measurement noise covariance
A = 1.0  # State transition matrix (friction coefficient evolution)
H = 1.0  # Measurement matrix (direct observation)
friction_estimate = [0.8, 0.8, 0.8, 0.8]  # Initial friction estimates
P = [1.0, 1.0, 1.0, 1.0]

# Function to calculate slip ratio for a wheel
def calculate_slip_ratio(vehicle_speed, tire_angular_velocity):
    tire_speed = tire_angular_velocity * TIRE_RADIUS  # m/s
    return (vehicle_speed - tire_speed) / vehicle_speed  # Slip ratio

# Kalman filter function to update friction coefficient estimate for a wheel
def kalman_filter(measured_mu, wheel_index):
    # Prediction step (no control input)
    friction_estimate[wheel_index] = A * friction_estimate[wheel_index]

    # Prediction covariance update
    P[wheel_index] = A * P[wheel_index] * A + Q

    # Kalman gain
    K = P[wheel_index] * H / (H * P[wheel_index] * H + R)

    # Update step with the measurement
    friction_estimate[wheel_index] = friction_estimate[wheel_index] + K * (measured_mu - H * friction_estimate[wheel_index])

    # Update covariance
    P[wheel_index] = (1 - K * H) * P[wheel_index]

    return friction_estimate[wheel_index]

In [17]:
model.save("Latest_EKF_Model.keras")

In [9]:
# Define Friction Coefficients
friction_coefficients = {
    "Wet Asphalt" :"0.40-0.50",
    "Black Ice": "0.005-0.02",
    "Wet Cobblestone" : "0.30-0.40",
    "Wet Concrete" : "0.40-0.50",
    "Wet Gravel" : "0.30-0.40",
    "Heavy Snow": "0.15-0.30",
    "Wet Heavy" :  "0.30-0.50",
    "Light Snow": "0.05-0.20",
    "Muddy Wet" :   "0.20-0.30",
    "Packed Snow": "0.05-0.15",
    "Refrozen Ice": "0.30-0.40",
    "Sleet": "0.50",
    "Slush": "0.35-0.40",
    "Dry Asphalt" : "0.70-0.80",
    "Dry Cobblestone" : "0.60-0.70",
    "Dry Concrete" :  "0.70-0.80",
    "Dry Mud" :  "0.50-0.60",
    "Dry Gravel" : "0.60-0.70"
}

In [26]:
input_video_path = '/content/Dashcam_Day_Snowy_2.mp4'
output_video_path = '/content/Dashcam_Day_Snowy_2_Output_Both.mp4'
dashboard_image_path = '/content/Dashboard.png'  # Dashboard path

# Load the dashboard image
dashboard = cv2.imread(dashboard_image_path, cv2.IMREAD_UNCHANGED)

# Check if the image has 4 channels (RGBA)
if dashboard.shape[-1] == 4:
    # Convert the image from RGBA to RGB
    dashboard = cv2.cvtColor(dashboard, cv2.COLOR_BGRA2BGR)

# Resize the dashboard to fit the video frame (adjust dimensions as needed)
dashboard_height, dashboard_width = 110, 235  # Adjust the size if needed
dashboard = cv2.resize(dashboard, (dashboard_width, dashboard_height))

cap = cv2.VideoCapture(input_video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))

class_data = []
class_names_data = []
confidences_data = []
mu_values_data = []
slip_ratios_data = []
friction_data_EKF = []
friction_data = []

# Font settings for displaying text
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 0.27
font_color = (255, 255, 255)  # White text
thickness = 1

vehicle_speed = 20.0  # m/s (example value)
tire_angular_velocity = 10.0  # rad/s (example value)
tire_angular_velocities = [10.0, 9.8, 10.2, 9.7]
slip_ratios=[]

while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Overlay the dashboard at the top-left corner of the frame
    frame[0:dashboard_height, 0:dashboard_width] = dashboard

    # Prediction road surface type for a frame
    resized_frame = cv2.resize(frame, (224, 224))
    normalized = resized_frame / 255.0
    frame_array = np.array([normalized])
    predictions = model.predict(frame_array)
    class_id = np.argmax(predictions)
    predicted_class = class_names[class_id]

    predicted_class_indices = np.argsort(predictions[0])[::-1][:3]  # Indices of top 3 probabilities
    predicted_class_names = [class_names[index] for index in predicted_class_indices]
    confidences = [predictions[0, index] for index in predicted_class_indices]



    # Calculate slip ratio dynamically
    # slip_ratios = [calculate_slip_ratio(vehicle_speed, tire_angular_velocity)
    #                        for _ in range(len(predicted_class_names))]

    # Calculate mu and w values
    mu_values = []
    estimated_mu=[]
    for class_name in predicted_class_names:
        mu_range = friction_coefficients[class_name].split('-')
        mu = (float(mu_range[0]) + float(mu_range[1])) / 2 if len(mu_range) == 2 else float(mu_range[0])
        mu_values.append(mu)


    sum_confidences = sum(confidences)
    friction = sum((conf / sum_confidences) * mu for conf, mu in zip(confidences, mu_values))

    for i, tire_angular_velocity in enumerate(tire_angular_velocities):
                slip_ratio = calculate_slip_ratio(vehicle_speed, tire_angular_velocity)
                slip_ratios.append(slip_ratio)


                # friction_EKF = [kalman_filter(mu,i) for mu in mu_values]
                # friction_EKF=format(friction_EKF,'.2f')
                # estimated_mu.append(friction_EKF)

                estimated_mu.append(kalman_filter(mu_values[0],0))
                estimated_mu.append(kalman_filter(mu_values[0],1))
                estimated_mu.append(kalman_filter(mu_values[0],2))
                estimated_mu.append(kalman_filter(mu_values[0],3))

                # surface_friction_EKF = f'{friction_EKF[0]:.2f}'
                surface_friction = f'{friction:.2f}'

    # class_data.append(predicted_class)
    # class_names_data.append(predicted_class_names)
    # confidences_data.append(confidences)
    # mu_values_data.append(mu_values)
    # slip_ratios_data.append(slip_ratios)
    # friction_data_EKF.append(surface_friction_EKF)
    # friction_data.append(surface_friction)

    # data = {
    #     'Predicted Class': class_data,
    #     'Class Names': class_names_data,
    #     'Confidences (P1, P2, P3)': confidences_data,
    #     'Mu Values (mu1, mu2, mu3)': mu_values_data,
    #     'Slip Ratios': slip_ratios_data,
    #     'Friction_VehicleDynamics': friction_data_EKF,
    #     'Surface Friction': friction_data
    # }
    # df = pd.DataFrame(data)
    # df.to_excel('Dashcam_Day_Dry_with_EKF_4_Wheels.xlsx', index=False)

    # Add predictions and Slip Mu values to the dashboard
    # Mu_VehicleDynamics
    cv2.putText(
       frame,
       f"Mu_VehicleDynamics: {estimated_mu[0]:.2f}",
       (40, 40),
       font, font_scale, (0,255,0), thickness, cv2.LINE_AA
   )

    cv2.putText(
        frame,
        f"Vehicle Speed: 72 km/h",
        (50, 50),
        font, font_scale, (0,255,0), thickness, cv2.LINE_AA
    )
    # WFL
    cv2.putText(
        frame,
        f"WFL: {predicted_class}",
        (10, 20),  # Top-left corner of the dashboard
        font, font_scale, font_color, thickness, cv2.LINE_AA
    )

    cv2.putText(
        frame,
        f"Mu: {friction:.2f}",
        (33, 23 + int(font_scale * 25)),  # Adjusted y-coordinate for the second line
        font, font_scale, font_color, thickness, cv2.LINE_AA
    )
    # WFR
    cv2.putText(
        frame,
        f"WFR: {predicted_class}",
        (dashboard_width - 100, 20),  # Top-right corner of the dashboard
        font, font_scale, font_color, thickness, cv2.LINE_AA
    )
    cv2.putText(
        frame,
        f"Mu: {friction:.2f}",
        (dashboard_width - 77, 23 + int(font_scale * 25)),  # Adjusted y-coordinate for the second line
        font, font_scale, font_color, thickness, cv2.LINE_AA
    )
    # WRL
    cv2.putText(
        frame,
        f"WRL: {predicted_class}",
        (12, dashboard_height - 20),  # Bottom-left corner of the dashboard
        font, font_scale, font_color, thickness, cv2.LINE_AA
    )
    cv2.putText(
        frame,
        f"Mu: {friction:.2f}",
        (35, dashboard_height - 17 + int(font_scale * 25)),  # Adjusted y-coordinate for the second line
        font, font_scale, font_color, thickness, cv2.LINE_AA
    )
    # WRR
    cv2.putText(
        frame,
        f"WRR: {predicted_class}",
        (dashboard_width - 100, dashboard_height - 20),  # Bottom-right corner of the dashboard
        font, font_scale, font_color, thickness, cv2.LINE_AA
    )
    cv2.putText(
        frame,
        f"Mu: {friction:.2f}",
        (dashboard_width - 77, dashboard_height - 17 + int(font_scale * 25)),  # Adjusted y-coordinate for the second line
        font, font_scale, font_color, thickness, cv2.LINE_AA
    )

    # Write the frame to the output video
    out.write(frame)


cap.release()
out.release()

print(f'Output video saved to: {output_video_path}')

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 27ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19

In [21]:
import numpy as np
import csv

# Constants
NUM_WHEELS = 4
VEHICLE_MASS = 1500.0
GRAVITY = 9.81
Q = 0.0001
R = 0.01
H = 1.0
A = 1.0
HISTORY_SIZE = 10

# Pacejka parameter structure
class PacejkaParams:
    def __init__(self, D, C, B, E):
        self.D = D
        self.C = C
        self.B = B
        self.E = E

# Define Pacejka parameters for different road surfaces
dry_asphalt = PacejkaParams(1.0, 1.3, 15.0, 0.97)
wet_asphalt = PacejkaParams(0.7, 1.2, 10.0, 1.0)
snow = PacejkaParams(0.4, 1.1, 7.0, 1.1)
ice = PacejkaParams(0.2, 1.0, 5.0, 1.2)
gravel = PacejkaParams(0.7, 1.2, 8.0, 0.95)
dirt_road = PacejkaParams(0.6, 1.2, 9.0, 1.0)

# Kalman filter state and covariance for all wheels
friction_estimate = [0.8, 0.8, 0.8, 0.8]
P = [1.0, 1.0, 1.0, 1.0]

# Slip ratio history buffers for each wheel
slip_history = [[0.0 for _ in range(HISTORY_SIZE)] for _ in range(NUM_WHEELS)]
slip_index = [0 for _ in range(NUM_WHEELS)]

def calculate_slip_ratio(vehicle_speed, wheel_speed):
    if vehicle_speed == 0:
        return 0
    return (vehicle_speed - wheel_speed) / vehicle_speed

def classify_road_surface(mean_slip_ratio):
    if mean_slip_ratio < 0.1:
        return dry_asphalt
    elif mean_slip_ratio < 0.2:
        return wet_asphalt
    elif mean_slip_ratio < 0.4:
        return gravel
    elif mean_slip_ratio < 0.5:
        return snow
    else:
        return ice

def pacejka_mu(slip_ratio, params):
    if slip_ratio < 0:
        slip_ratio = 0
    if slip_ratio > 1:
        slip_ratio = 1

    theta = params.B * slip_ratio
    mu = params.D * np.sin(params.C * np.arctan(theta - params.E * (theta - np.arctan(theta))))
    return mu

def kalman_filter(measured_mu, wheel_index):
    global friction_estimate, P
    # Prediction step
    friction_estimate[wheel_index] = A * friction_estimate[wheel_index]
    P[wheel_index] = A * P[wheel_index] * A + Q

    # Kalman gain calculation
    K = P[wheel_index] * H / (H * P[wheel_index] * H + R)

    # Update step with the measurement
    friction_estimate[wheel_index] = friction_estimate[wheel_index] + K * (measured_mu - H * friction_estimate[wheel_index])

    # Update covariance
    P[wheel_index] = (1 - K * H) * P[wheel_index]

    return friction_estimate[wheel_index]

def calculate_mean_slip_ratio(wheel_index):
    return sum(slip_history[wheel_index]) / HISTORY_SIZE

def calculate_mu_with_ekf(vehicle_speed, wheel_speeds):
    with open('friction_estimates.csv', 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['Wheel', 'Slip Ratio', 'Mean Slip Ratio', 'Measured μ', 'Estimated μ', 'Road Surface'])

        for i in range(NUM_WHEELS):
            slip_ratio = calculate_slip_ratio(vehicle_speed, wheel_speeds[i])
            slip_history[i][slip_index[i]] = slip_ratio
            slip_index[i] = (slip_index[i] + 1) % HISTORY_SIZE

            mean_slip_ratio = calculate_mean_slip_ratio(i)
            selected_params = classify_road_surface(mean_slip_ratio)

            true_mu = pacejka_mu(slip_ratio, selected_params)
            measured_mu = true_mu + (np.random.rand() - 0.5) * 0.05

            estimated_mu = kalman_filter(measured_mu, i)

            surface_type = ''
            if selected_params.D == 1.0:
                surface_type = 'Dry Asphalt'
            elif selected_params.D == 0.7:
                surface_type = 'Wet Asphalt'
            elif selected_params.D == 0.4:
                surface_type = 'Snow'
            elif selected_params.D == 0.2:
                surface_type = 'Ice'
            elif selected_params.D == 0.7:
                surface_type = 'Gravel'
            else:
                surface_type = 'Dirt Road'

            writer.writerow([i+1, slip_ratio, mean_slip_ratio, measured_mu, estimated_mu, surface_type])

# Example inputs for vehicle and wheel speeds
vehicle_speed = 20.0
wheel_speeds = [19.0, 18.5, 19.2, 18.8]

calculate_mu_with_ekf(vehicle_speed, wheel_speeds)

print('Results written to friction_estimates.csv')

Results written to friction_estimates.csv
