# Installing required packages

In [None]:
!pip install -q ultralytics roboflow

In [2]:
import tensorflow as tf
import os
from roboflow import Roboflow
from tensorflow.keras import models, layers
import matplotlib.pyplot as plt
import pandas as pd

# Downloading the dataset locally

In [3]:
rf = Roboflow(api_key="0VZYY90r4l05ICemwLNZ")
project = rf.workspace("selfdriving-car-qtywx").project("self-driving-cars-lfjou")
version = project.version(6)
dataset = version.download("yolov8")

!mv ./Self* ./dataset
!cd dataset && rm data.yaml README*

loading Roboflow workspace...
loading Roboflow project...


Downloading Dataset Version Zip in Self-Driving-Cars-6 to yolov8:: 100%|██████████| 100921/100921 [00:01<00:00, 55223.43it/s]





Extracting Dataset Version Zip to Self-Driving-Cars-6 in yolov8:: 100%|██████████| 9950/9950 [00:00<00:00, 10365.29it/s]


Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.


# Merging all the dataset images and labels

In [4]:
import shutil
from sklearn.model_selection import train_test_split

def merge_directories(base_dir, output_dir):
    """Flatten the test/train/valid splits under ``base_dir`` into one dataset.

    Creates ``output_dir/images`` and ``output_dir/labels`` and copies every
    regular file found in ``base_dir/<split>/images`` and
    ``base_dir/<split>/labels`` into the matching folder. Split folders that do
    not exist are ignored; sub-directories inside them are not copied.

    Args:
        base_dir: Directory containing the ``test``, ``train`` and ``valid`` folders.
        output_dir: Directory that receives the merged ``images`` and ``labels`` folders.
    """
    targets = {kind: os.path.join(output_dir, kind) for kind in ('images', 'labels')}
    for target in targets.values():
        os.makedirs(target, exist_ok=True)

    for split in ('test', 'train', 'valid'):
        for kind, target in targets.items():
            source = os.path.join(base_dir, split, kind)
            if not os.path.exists(source):
                continue
            for entry in os.listdir(source):
                candidate = os.path.join(source, entry)
                # Only plain files are merged; nested folders are left behind.
                if os.path.isfile(candidate):
                    shutil.copy(candidate, os.path.join(target, entry))


# Folder the downloaded dataset was renamed to; merge_directories looks for
# test/train/valid sub-folders inside it.
base_dir = "./dataset"
# Destination that receives the merged images/ and labels/ folders.
output_dir = "./combined_dataset"

In [5]:
# Merge the test/train/valid folders of base_dir into output_dir/images and output_dir/labels.
merge_directories(base_dir, output_dir)

print("Processing complete. Directoy contents were merged successfully.")

Processing complete. Directoy contents were merged successfully.


# Splitting the dataset

In [6]:
def split_dataset(images_dir, labels_dir, output_dir, test_ratio=0.2, val_ratio=0.2):
    """Split paired image/label files into train, valid and test subsets.

    Every ``.jpg``/``.png`` file in ``images_dir`` is paired with the label file
    in ``labels_dir`` that has the same stem and a ``.txt`` extension. Pairing by
    stem (instead of zipping two independently sorted listings) keeps an image
    and its label together even when a file is missing or when the extensions
    change the sort order. Images without a label are reported and skipped;
    labels without an image are ignored.

    The pairs are split with ``train_test_split`` (``random_state=15``) and
    copied to ``output_dir/<subset>/images`` and ``output_dir/<subset>/labels``
    for the subsets ``train``, ``valid`` and ``test``.

    Args:
        images_dir: Directory containing the image files.
        labels_dir: Directory containing the ``.txt`` label files.
        output_dir: Directory that receives the three subset folders.
        test_ratio: Fraction of all pairs assigned to the test subset.
        val_ratio: Fraction of all pairs assigned to the validation subset.
    """
    available_labels = {f for f in os.listdir(labels_dir) if f.endswith('.txt')}

    image_files = []
    label_files = []
    for image in sorted(os.listdir(images_dir)):
        if not image.endswith(('.jpg', '.png')):
            continue
        label = os.path.splitext(image)[0] + '.txt'
        if label not in available_labels:
            print(f"Warning: no label file found for {image}. Skipping.")
            continue
        image_files.append(image)
        label_files.append(label)

    # First carve off the combined validation + test share ...
    holdout_ratio = test_ratio + val_ratio
    images_train, images_temp, labels_train, labels_temp = train_test_split(
        image_files, label_files, test_size=holdout_ratio, random_state=15)

    # ... then divide that share between validation and test.
    test_size = test_ratio / holdout_ratio
    images_val, images_test, labels_val, labels_test = train_test_split(
        images_temp, labels_temp, test_size=test_size, random_state=15)

    subsets = {
        'train': (images_train, labels_train),
        'test': (images_test, labels_test),
        'valid': (images_val, labels_val)
    }

    for subset, (image_list, label_list) in subsets.items():
        subset_images_dir = os.path.join(output_dir, subset, 'images')
        subset_labels_dir = os.path.join(output_dir, subset, 'labels')
        os.makedirs(subset_images_dir, exist_ok=True)
        os.makedirs(subset_labels_dir, exist_ok=True)

        for image, label in zip(image_list, label_list):
            shutil.copy(os.path.join(images_dir, image), subset_images_dir)
            shutil.copy(os.path.join(labels_dir, label), subset_labels_dir)

In [7]:
# Destination of the re-split dataset (train/valid/test sub-folders are created by split_dataset).
random_split_data = './Split_Dataset'
# Merged images/labels produced by merge_directories under output_dir.
images_dir = os.path.join(output_dir, './images')
labels_dir = os.path.join(output_dir, './labels')

# Uses the default ratios of split_dataset (test_ratio=0.2, val_ratio=0.2).
split_dataset(images_dir, labels_dir, random_split_data)

print("Processing complete. Classes extracted and dataset split successfully.")

Processing complete. Classes extracted and dataset split successfully.


# Organizing the images by folders based on classes

In [8]:
import shutil

# Maps a class id, as it is written as text in a label file, to the class name
# that is used as the folder name for that class.
classes = {
    str(index): name
    for index, name in enumerate((
        'Green Light',
        'Red Light',
        'Speed Limit 10',
        'Speed Limit 100',
        'Speed Limit 110',
        'Speed Limit 120',
        'Speed Limit 20',
        'Speed Limit 30',
        'Speed Limit 40',
        'Speed Limit 50',
        'Speed Limit 60',
        'Speed Limit 70',
        'Speed Limit 80',
        'Speed Limit 90',
        'Stop',
    ))
}


def organize_images_by_class(image_dir, label_dir, output_dir, class_map=None,
                             image_extensions=('.jpg', '.png')):
    """Copy each labelled image into a folder named after its first annotated class.

    For every ``.txt`` file in ``label_dir`` the class id is taken from the first
    token of the first non-blank line. The image with the same stem is then
    copied to ``output_dir/<class name>/``. Only the first annotation decides the
    folder, even when the label file lists several objects.

    Label files that are empty, that reference a class id missing from
    ``class_map``, or that have no matching image are reported and skipped
    instead of aborting the whole run.

    Args:
        image_dir: Directory containing the images.
        label_dir: Directory containing the ``.txt`` label files.
        output_dir: Directory that receives one sub-folder per class.
        class_map: Mapping of class id (str) to class name. Defaults to the
            module-level ``classes`` table.
        image_extensions: Extensions tried, in order, when looking for the image
            that belongs to a label file.
    """
    if class_map is None:
        class_map = classes

    os.makedirs(output_dir, exist_ok=True)

    label_files = [f for f in os.listdir(label_dir) if f.endswith('.txt')]

    for label_file in label_files:
        label_path = os.path.join(label_dir, label_file)
        with open(label_path, 'r') as file:
            # First non-blank line, already split into its whitespace-separated tokens.
            first_annotation = next((line.split() for line in file if line.strip()), None)
        if first_annotation is None:
            print(f"Warning: {label_file} is empty. Skipping.")
            continue

        class_id = first_annotation[0]
        class_name = class_map.get(class_id)
        if class_name is None:
            print(f"Warning: {label_file} has unknown class id {class_id!r}. Skipping.")
            continue

        image_name = os.path.splitext(label_file)[0]
        candidates = (os.path.join(image_dir, image_name + ext) for ext in image_extensions)
        image_path = next((path for path in candidates if os.path.isfile(path)), None)
        if image_path is None:
            print(f"Warning: no image found for {label_file}. Skipping.")
            continue

        class_dir = os.path.join(output_dir, class_name)
        os.makedirs(class_dir, exist_ok=True)

        shutil.copy(image_path, class_dir)

    print("Organizing complete successfully.")


In [9]:
# Sort the training split into one folder per class under ./train.
image_train_directory = os.path.join(random_split_data, './train/images')
label_train_directory = os.path.join(random_split_data, './train/labels')
output_train_directory = "./train"

organize_images_by_class(image_train_directory, label_train_directory, output_train_directory)

Organizing complete successfully.


In [10]:
# Sort the validation split into one folder per class under ./valid/.
image_valid_directory = os.path.join(random_split_data, './valid/images')
label_valid_directory = os.path.join(random_split_data, './valid/labels')
output_valid_directory = "./valid/"

organize_images_by_class(image_valid_directory, label_valid_directory, output_valid_directory)

Organizing complete successfully.


In [11]:
# Sort the test split into one folder per class under ./test/.
image_test_directory = os.path.join(random_split_data, './test/images')
label_test_directory = os.path.join(random_split_data, './test/labels')
output_test_directory = "./test/"

organize_images_by_class(image_test_directory, label_test_directory, output_test_directory)

Organizing complete successfully.


# Initialization variables

In [12]:
# NOTE(review): BATCH_SIZE is not passed to any dataset loader or fit() call in the
# visible cells — confirm whether it is still needed.
BATCH_SIZE = 16
# Images are resized to IMAGE_SIZE x IMAGE_SIZE when the datasets are loaded.
IMAGE_SIZE = 416
CHANNELS = 3
# Upper bound on training epochs; the early-stopping callback may end training sooner.
EPOCHS = 200
# NOTE(review): input_shape is not referenced in the visible cells; presumably it is
# consumed by model_build — verify.
input_shape = (BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE, CHANNELS)

# Loading the different datasets from their respective directories

In [24]:
# Load the per-class training folders as a tf.data dataset of resized images.
# No batch_size is passed here, so the library's default batch size applies.
train_data = tf.keras.utils.image_dataset_from_directory(
    output_train_directory,
    image_size=(IMAGE_SIZE, IMAGE_SIZE),
    shuffle=True
)

Found 2978 files belonging to 15 classes.


In [25]:
# Load the per-class validation folders as a tf.data dataset of resized images.
# No batch_size is passed here, so the library's default batch size applies.
valid_data = tf.keras.utils.image_dataset_from_directory(
    output_valid_directory,
    image_size=(IMAGE_SIZE, IMAGE_SIZE),
    shuffle=True
)

Found 993 files belonging to 15 classes.


In [26]:
# Load the per-class test folders as a tf.data dataset of resized images.
# shuffle=True means the batch order differs between iterations, so labels and
# predictions must be collected from the same pass over the dataset.
test_data = tf.keras.utils.image_dataset_from_directory(
    output_test_directory,
    image_size=(IMAGE_SIZE, IMAGE_SIZE),
    shuffle=True
    )

Found 994 files belonging to 15 classes.


In [16]:
# Class names as reported by the training dataset, and how many there are.
class_names = train_data.class_names
num_classes = len(class_names)

# Building the Model

In [None]:
# NOTE(review): model_build is not defined in any cell visible here; on a fresh
# kernel this line raises NameError unless its defining cell is restored and run first.
model = model_build()

In [19]:
# Print the layer-by-layer summary of the model built above.
model.summary()

In [None]:
# One independently built model per batch size under comparison (8, 16, 32, 64).
model_8  = model_build()
model_16 =  model_build()
model_32 =  model_build()
model_64 =  model_build()

# NOTE(review): this rebinds the name `models`, which the import cell also uses for
# the `tensorflow.keras.models` module; after this line the module is no longer
# reachable under that name.
models = [model_8, model_16, model_32, model_64]

# Callbacks

In [None]:
import tensorflow as tf

class BestWeightsCallback(tf.keras.callbacks.Callback):
    """Keep the weights of the epoch with the highest validation accuracy.

    After every epoch the ``val_accuracy`` entry of the logs is compared with the
    best value seen so far; on improvement a copy of the model weights is stored.
    When training ends, the stored weights (if any) are loaded back into the model.
    """

    def __init__(self):
        super().__init__()
        self.best_val_accuracy = 0.0
        self.best_weights = None

    def on_epoch_end(self, epoch, logs=None):
        """Store the current weights when ``val_accuracy`` improves on the best so far."""
        val_accuracy = (logs or {}).get('val_accuracy')
        if val_accuracy is None:
            # No validation metric was reported for this epoch, so there is
            # nothing to compare; keep the current snapshot untouched.
            return
        if val_accuracy > self.best_val_accuracy:
            self.best_val_accuracy = val_accuracy
            self.best_weights = self.model.get_weights()

    def on_train_end(self, logs=None):
        """Load the best stored weights back into the model, if any were stored."""
        if self.best_weights is not None:
            self.model.set_weights(self.best_weights)
            print(f"Restoring the best weights based on validation accuracy: {self.best_val_accuracy}")

class EarlyStoppingAccuracy(tf.keras.callbacks.Callback):
    """Stop training once validation accuracy has stopped improving.

    Args:
        patience: Number of consecutive epochs without a new best
            ``val_accuracy`` after which training is stopped.
    """

    def __init__(self, patience=20):
        super().__init__()
        self.patience = patience
        self.wait = 0
        self.best_val_accuracy = 0.0

    def on_train_begin(self, logs=None):
        """Start every training run with a fresh patience counter."""
        self.wait = 0

    def on_epoch_end(self, epoch, logs=None):
        """Update the patience counter and request a stop when it is exhausted."""
        val_accuracy = (logs or {}).get('val_accuracy')
        if val_accuracy is None:
            # No validation metric was reported for this epoch; it counts neither
            # as an improvement nor as a stalled epoch.
            return
        if val_accuracy > self.best_val_accuracy:
            self.best_val_accuracy = val_accuracy
            self.wait = 0
        else:
            self.wait += 1
            if self.wait >= self.patience:
                print(f"Stopping training early after {self.patience} epochs with no improvement in validation accuracy.")
                self.model.stop_training = True

# Each model gets its own pair of callbacks because both callback classes keep
# per-run state (best accuracy, stored weights, patience counter).
best_weights_callback8 = BestWeightsCallback()
early_stopping_callback8 = EarlyStoppingAccuracy(patience=20)

best_weights_callback16 = BestWeightsCallback()
early_stopping_callback16 = EarlyStoppingAccuracy(patience=20)

best_weights_callback32 = BestWeightsCallback()
early_stopping_callback32 = EarlyStoppingAccuracy(patience=20)

best_weights_callback64 = BestWeightsCallback()
early_stopping_callback64 = EarlyStoppingAccuracy(patience=20)

In [21]:
# Compile every model with the same optimizer, loss and metric.
# NOTE(review): from_logits=True assumes the final layer produced by model_build
# has no softmax activation — confirm against model_build.
# NOTE(review): the loop variable rebinds the global name `model`, so after this
# cell `model` refers to the last entry of `models`.
for model in models:
    model.compile(
        optimizer = 'adam',
        loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics = ['accuracy'])

# Training and evaluating every model

In [None]:
# Create the output folder idempotently, so that re-running the cell does not fail.
os.makedirs('./saved_models', exist_ok=True)

# Train model_8 with mini-batches of 8 images.
# train_data is already batched when it is loaded, and fit(batch_size=...) is not
# applied to tf.data.Dataset inputs, so the dataset is re-batched explicitly to
# make the batch size under test take effect.
history_8 = model_8.fit(
    train_data.unbatch().batch(8),
    epochs = EPOCHS,
    verbose = 1,
    validation_data=valid_data,
    callbacks=[best_weights_callback8, early_stopping_callback8]
)

model_8.save('./saved_models/8.keras')

In [None]:
# Evaluate model_8 on the test dataset.
model_8.evaluate(test_data)

In [None]:
# Train model_16 with mini-batches of 16 images.
# train_data is already batched when it is loaded, and fit(batch_size=...) is not
# applied to tf.data.Dataset inputs, so the dataset is re-batched explicitly to
# make the batch size under test take effect.
history_16 = model_16.fit(
    train_data.unbatch().batch(16),
    epochs = EPOCHS,
    verbose = 1,
    validation_data=valid_data,
    callbacks=[best_weights_callback16, early_stopping_callback16]
)

model_16.save('./saved_models/16.keras')

In [None]:
# Evaluate model_16 on the test dataset.
model_16.evaluate(test_data)

In [None]:
# Train model_32 with mini-batches of 32 images.
# train_data is already batched when it is loaded, and fit(batch_size=...) is not
# applied to tf.data.Dataset inputs, so the dataset is re-batched explicitly to
# make the batch size under test take effect.
history_32 = model_32.fit(
    train_data.unbatch().batch(32),
    epochs = EPOCHS,
    verbose = 1,
    validation_data=valid_data,
    callbacks=[best_weights_callback32, early_stopping_callback32]
)

model_32.save('./saved_models/32.keras')

In [None]:
# Evaluate model_32 on the test dataset.
model_32.evaluate(test_data)

In [None]:
# Train model_64 with mini-batches of 64 images.
# train_data is already batched when it is loaded, and fit(batch_size=...) is not
# applied to tf.data.Dataset inputs, so the dataset is re-batched explicitly to
# make the batch size under test take effect.
history_64 = model_64.fit(
    train_data.unbatch().batch(64),
    epochs = EPOCHS,
    verbose = 1,
    validation_data=valid_data,
    callbacks=[best_weights_callback64, early_stopping_callback64]
)

model_64.save('./saved_models/64.keras')

In [None]:
# Evaluate model_64 on the test dataset.
model_64.evaluate(test_data)

# Visualizing Accuracy Improvements

In [None]:
def visualize_accuracy(history, EPOCHS=None):
    """Plot training and validation accuracy for every recorded epoch.

    Args:
        history: Object returned by ``model.fit``; its ``history`` dict must
            contain the ``accuracy`` and ``val_accuracy`` series.
        EPOCHS: Optional upper bound on the number of epochs to plot. When
            omitted, or larger than the number of recorded epochs (training may
            end early), every recorded epoch is plotted.
    """
    accuracy = history.history['accuracy']
    validation_accuracy = history.history['val_accuracy']

    # The x axis must match the number of epochs that actually ran, which can be
    # smaller than the configured maximum when training was stopped early.
    recorded = min(len(accuracy), len(validation_accuracy))
    shown = recorded if EPOCHS is None else min(EPOCHS, recorded)
    epoch_axis = range(shown)

    plt.figure(figsize=(30, 10))
    plt.subplot(1, 2, 1)
    plt.plot(epoch_axis, accuracy[:shown], label = 'Training Accuracy')
    plt.plot(epoch_axis, validation_accuracy[:shown], label = 'Validation Accuracy')
    plt.legend(loc = 'lower right')
    plt.title('Training and Validation accuracy')
    plt.show()

In [None]:
# Pass the number of epochs that actually ran; training may have stopped early.
visualize_accuracy(history_8, len(history_8.history['accuracy']))

In [None]:
# Pass the number of epochs that actually ran; training may have stopped early.
visualize_accuracy(history_16, len(history_16.history['accuracy']))

In [None]:
# Pass the number of epochs that actually ran; training may have stopped early.
visualize_accuracy(history_32, len(history_32.history['accuracy']))

In [None]:
# Pass the number of epochs that actually ran; training may have stopped early.
visualize_accuracy(history_64, len(history_64.history['accuracy']))

# Visualizing Loss Improvements

In [None]:
def visualize_loss(history, EPOCHS=None):
    """Plot training and validation loss for every recorded epoch.

    Args:
        history: Object returned by ``model.fit``; its ``history`` dict must
            contain the ``loss`` and ``val_loss`` series.
        EPOCHS: Optional upper bound on the number of epochs to plot. When
            omitted, or larger than the number of recorded epochs (training may
            end early), every recorded epoch is plotted.
    """
    loss = history.history['loss']
    validation_loss = history.history['val_loss']

    # The x axis must match the number of epochs that actually ran, which can be
    # smaller than the configured maximum when training was stopped early.
    recorded = min(len(loss), len(validation_loss))
    shown = recorded if EPOCHS is None else min(EPOCHS, recorded)
    epoch_axis = range(shown)

    plt.figure(figsize=(30, 10))
    plt.subplot(1, 2, 1)
    plt.plot(epoch_axis, loss[:shown], label = 'Training Loss')
    plt.plot(epoch_axis, validation_loss[:shown], label = 'Validation Loss')
    plt.legend(loc = 'upper right')
    plt.title('Training and Validation loss')
    plt.show()

In [None]:
# Pass the number of epochs that actually ran; training may have stopped early.
visualize_loss(history_8, len(history_8.history['loss']))

In [None]:
# Pass the number of epochs that actually ran; training may have stopped early.
visualize_loss(history_16, len(history_16.history['loss']))

In [None]:
# Pass the number of epochs that actually ran; training may have stopped early.
visualize_loss(history_32, len(history_32.history['loss']))

In [None]:
# Pass the number of epochs that actually ran; training may have stopped early.
visualize_loss(history_64, len(history_64.history['loss']))

# Confusion Matrix

In [57]:
import tensorflow as tf
from sklearn.metrics import confusion_matrix
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns



def confusion_m(model):
    """Plot the confusion matrix of ``model`` on the module-level ``test_data``.

    Labels and predictions are collected batch by batch within a single pass
    over ``test_data``, so they stay aligned even though the dataset is shuffled.

    Args:
        model: Object whose ``predict(images)`` returns one score per class for
            every image; the predicted class is the arg-max over axis 1.
    """
    class_names = test_data.class_names

    true_labels = []
    predicted_labels = []
    for images, labels in test_data:
        true_labels.extend(labels.numpy())
        # verbose=0 keeps the per-batch progress bars out of the cell output.
        predictions = model.predict(images, verbose=0)
        predicted_labels.extend(np.argmax(predictions, axis=1))

    true_labels = np.array(true_labels)
    predicted_labels = np.array(predicted_labels)

    # Pin the label set so the matrix always has one row/column per class, even
    # when a class never occurs among the true or predicted labels; otherwise
    # the matrix would be smaller than the tick labels below.
    class_indices = list(range(len(class_names)))
    cm = confusion_matrix(true_labels, predicted_labels, labels=class_indices)

    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=class_names, yticklabels=class_names)
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.title("Confusion Matrix")
    plt.show()

In [None]:
# Confusion matrix of model_8 on the test dataset.
confusion_m(model_8)

In [None]:
# Confusion matrix of model_16 on the test dataset.
confusion_m(model_16)

In [None]:
# Confusion matrix of model_32 on the test dataset.
confusion_m(model_32)

In [None]:
# Confusion matrix of model_64 on the test dataset.
confusion_m(model_64)

# Precision and Recall

In [None]:
# NOTE(review): compute_precision_recall is not defined in any cell visible here.
# NOTE(review): this passes train_data, not test_data — confirm that training-set
# metrics are what is intended.
compute_precision_recall(model_8, train_data)

In [None]:
# NOTE(review): compute_precision_recall is not defined in any cell visible here.
# NOTE(review): this passes train_data, not test_data — confirm that training-set
# metrics are what is intended.
compute_precision_recall(model_16, train_data)

In [None]:
# NOTE(review): compute_precision_recall is not defined in any cell visible here.
# NOTE(review): this passes train_data, not test_data — confirm that training-set
# metrics are what is intended.
compute_precision_recall(model_32, train_data)

In [None]:
# NOTE(review): compute_precision_recall is not defined in any cell visible here.
# NOTE(review): this passes train_data, not test_data — confirm that training-set
# metrics are what is intended.
compute_precision_recall(model_64, train_data)