In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt#to plot accuracy
import cv2
import tensorflow as tf
from PIL import Image
import os
from sklearn.model_selection import train_test_split #to split training and testing data
from keras.utils import to_categorical#to convert the labels present in y_train and t_test into one-hot encoding
from keras.models import Sequential, load_model
from keras.layers import Conv2D, MaxPool2D, Dense, Flatten, Dropout#to create CNN
from keras.preprocessing.image import ImageDataGenerator  # Import the ImageDataGenerator
from keras.optimizers import Adam as AdamLegacy
from keras.callbacks import LearningRateScheduler

# Dataset sizing: per-class cap on loaded images, and the number of classes
num_images_per_class, classes = 2000, 43

# Root folder of the raw dataset, and the folder that receives resized copies
dataset_dir = "/Users/adityachaturvedi/Desktop/7th Sem/Minor Project"
output_dir = "/Users/adityachaturvedi/Desktop/7th Sem/Minor Project/CleanedData"

# Define the function to clean the data
def clean_data(input_dir, output_dir, num_images_per_class):
    """Write 30x30 resized copies of the training images under ``output_dir``.

    For each class index ``i`` in ``range(classes)`` (``classes`` is the
    module-level class count), up to ``num_images_per_class`` files from
    ``<input_dir>/train/<i>`` are opened, resized to 30x30 and saved under the
    same file name in ``<output_dir>/train/<i>``. The output folder is created
    when it does not exist yet.

    Files that cannot be processed are reported on stdout and skipped.

    Args:
        input_dir: Dataset root containing ``train/<class index>`` folders.
        output_dir: Root under which the resized copies are written.
        num_images_per_class: Maximum number of files processed per class.
    """
    for i in range(classes):
        input_path = os.path.join(input_dir, 'train', str(i))
        output_path = os.path.join(output_dir, 'train', str(i))

        # exist_ok replaces the racy "check, then create" pair
        os.makedirs(output_path, exist_ok=True)

        # os.listdir order is arbitrary; sort so the chosen subset is reproducible
        images = sorted(os.listdir(input_path))[:num_images_per_class]
        for a in images:
            source_file = os.path.join(input_path, a)
            try:
                # The context manager closes the file handle even when
                # resizing or saving fails
                with Image.open(source_file) as image:
                    image.resize((30, 30)).save(os.path.join(output_path, a))
            except Exception as err:
                # Best-effort: skip whatever Pillow cannot process, but (unlike
                # a bare except) let KeyboardInterrupt/SystemExit through and
                # show why the file was skipped
                print("Error cleaning image:", source_file, err)

# Run the cleaning pass over the raw dataset
clean_data(
    input_dir=dataset_dir,
    output_dir=output_dir,
    num_images_per_class=num_images_per_class,
)

def lr_schedule(epoch):
    """Return the learning rate to use for the given epoch index.

    Epochs below 10 get the base rate of 0.001. From epoch 10 onwards the
    base rate is multiplied once by 0.95 (a single step down, not a
    compounding decay).
    """
    initial_learning_rate = 0.001
    decay_rate = 0.95
    scale = decay_rate if epoch >= 10 else 1
    return initial_learning_rate * scale

data = []
labels = []
classes = 43
cur_path = os.getcwd()

# Load a smaller subset of images from each class.
# NOTE(review): this reads <cwd>/train, not the resized copies that clean_data
# writes under output_dir -- confirm which folder is intended.
for i in range(classes):
    path = os.path.join(cur_path, 'train', str(i))
    # os.listdir order is arbitrary; sort so the chosen subset is reproducible
    images = sorted(os.listdir(path))[:num_images_per_class]
    for a in images:
        image_file = os.path.join(path, a)
        try:
            # The context manager closes the file handle. convert("RGB") gives
            # every image three channels so np.array(data) stacks cleanly.
            with Image.open(image_file) as image:
                pixels = np.array(image.convert("RGB").resize((30, 30)))
        except Exception as err:
            # Best-effort: skip unreadable files, but (unlike a bare except)
            # let KeyboardInterrupt/SystemExit through and say which file failed
            print("Error loading image", image_file, err)
            continue
        # Append only after a successful load so data and labels stay aligned
        data.append(pixels)
        labels.append(i)


# Turn the Python lists into numpy arrays and divide the pixel values by 255
# (presumably 8-bit images, which puts them in [0, 1] -- confirm)
labels = np.array(labels)
data = np.array(data) / 255.0
print(data.shape, labels.shape)

# Hold out 20% of the samples, with a fixed seed for a repeatable split
X_t1, X_t2, y_t1, y_t2 = train_test_split(
    data, labels, test_size=0.2, random_state=42
)
print(*(part.shape for part in (X_t1, X_t2, y_t1, y_t2)))

# Converting the labels into one hot encoding
y_t1 = to_categorical(y_t1, 43)
y_t2 = to_categorical(y_t2, 43)


# Data augmentation settings
datagen = ImageDataGenerator(
    rotation_range=15,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.15,
    zoom_range=0.2,
    # Mirroring is disabled: a flipped direction-dependent sign (e.g. a
    # left/right turn or keep-left/keep-right sign) would keep its original
    # label while showing a different sign, which injects wrong labels.
    horizontal_flip=False,
    fill_mode='nearest'
)

# Building the model: two conv-conv-pool-dropout stages, then a dense head
# ending in a 43-way softmax
model = Sequential([
    Conv2D(filters=32, kernel_size=(5, 5), activation='relu', input_shape=X_t1.shape[1:]),
    Conv2D(filters=32, kernel_size=(5, 5), activation='relu'),
    MaxPool2D(pool_size=(2, 2)),
    Dropout(rate=0.25),
    Conv2D(filters=64, kernel_size=(3, 3), activation='relu'),
    Conv2D(filters=64, kernel_size=(3, 3), activation='relu'),
    MaxPool2D(pool_size=(2, 2)),
    Dropout(rate=0.25),
    Flatten(),
    Dense(256, activation='relu'),
    Dropout(rate=0.5),
    Dense(43, activation='softmax'),
])

# Adam optimizer with the initial learning rate; categorical cross-entropy
# matches the one-hot labels
optimizer = AdamLegacy(learning_rate=0.001)
model.compile(
    loss='categorical_crossentropy',
    optimizer=optimizer,
    metrics=['accuracy'],
)


# Training the model with data augmentation
eps = 30
batch_size = 32

# Define the learning rate scheduler callback
lr_scheduler = LearningRateScheduler(lr_schedule)

# Train the model using the augmented data generator with the learning rate
# scheduler. steps_per_epoch and validation_steps are deliberately left unset:
# the generator and the validation arrays already define their own length, and
# flooring len(...) // batch_size would drop the final partial batch whenever
# the sample count is not a multiple of the batch size.
history = model.fit(
    datagen.flow(X_t1, y_t1, batch_size=batch_size),
    epochs=eps,
    validation_data=(X_t2, y_t2),
    callbacks=[lr_scheduler]
)

# Plot the training and validation curves: figure 0 shows accuracy,
# figure 1 shows loss
for fig_id, (metric, heading) in enumerate((('accuracy', 'Accuracy'), ('loss', 'Loss'))):
    plt.figure(fig_id)
    plt.plot(history.history[metric], label=f'training {metric}')
    plt.plot(history.history[f'val_{metric}'], label=f'val {metric}')
    plt.title(heading)
    plt.xlabel('epochs')
    plt.ylabel(metric)
    plt.legend()
    plt.show()

#testing accuracy on test dataset
from sklearn.metrics import accuracy_score

# Load the test index: one row per image, with its class id and relative path
y_test = pd.read_csv('Test.csv')
print(y_test)
imgs = y_test["Path"].values

base_dir = "/Users/adityachaturvedi/Desktop/7th Sem/Minor Project"

# Load and preprocess test data. Labels are collected alongside the images so
# that a missing file drops both its image and its label; otherwise labels and
# predictions would have different lengths and be misaligned.
test_data = []
found_labels = []
for img_path, class_id in zip(imgs, y_test["ClassId"].values):
    full_img_path = os.path.join(base_dir, img_path)
    if not os.path.exists(full_img_path):
        print("Image not found:", full_img_path)  # Print if the image is not found
        continue
    # The context manager closes the file handle; convert("RGB") keeps the
    # channel count uniform so the list stacks into one array
    with Image.open(full_img_path) as image:
        test_data.append(np.array(image.convert("RGB").resize((30, 30))))
    found_labels.append(class_id)

labels = np.array(found_labels)

# Apply the same /255.0 scaling that was applied to the training data; feeding
# unscaled pixels to the trained model would distort the reported accuracy
X_test = np.array(test_data) / 255.0

# Predictions on test data
pred_probs_test = model.predict(X_test)
predicted_classes_test = pred_probs_test.argmax(axis=-1)

# Calculate accuracy on test data
accuracy_test = accuracy_score(labels, predicted_classes_test)
print("Test accuracy:", accuracy_test)

# Persist the trained model to disk
model.save(filepath='traffic_classifier.keras')
# import numpy as np
# import pandas as pd
# import matplotlib.pyplot as plt
# import cv2
# from sklearn.metrics import accuracy_score
# import tensorflow as tf
# from PIL import Image
# import os
# from sklearn.model_selection import train_test_split
# from keras.utils import to_categorical
# from keras.models import Sequential
# from keras.layers import Conv2D, MaxPool2D, Dense, Flatten, Dropout
# from sklearn.model_selection import StratifiedKFold
# from keras.preprocessing.image import ImageDataGenerator

# # Define a function to create the model
# def create_model(optimizer='adam', dropout_rate=0.25):
#     model = Sequential()
#     model.add(Conv2D(filters=32, kernel_size=(5, 5), activation='relu', input_shape=X_t1.shape[1:]))
#     model.add(Conv2D(filters=32, kernel_size=(5, 5), activation='relu'))
#     model.add(MaxPool2D(pool_size=(2, 2)))
#     model.add(Dropout(rate=dropout_rate))
#     model.add(Conv2D(filters=64, kernel_size=(3, 3), activation='relu'))
#     model.add(Conv2D(filters=64, kernel_size=(3, 3), activation='relu'))
#     model.add(MaxPool2D(pool_size=(2, 2)))
#     model.add(Dropout(rate=dropout_rate))
#     model.add(Flatten())
#     model.add(Dense(256, activation='relu'))
#     model.add(Dropout(rate=dropout_rate))
#     model.add(Dense(43, activation='softmax'))

#     model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])
#     return model

# # Load and preprocess training data
# data = []
# labels = []
# classes = 43
# cur_path = os.getcwd()

# for i in range(classes):
#     path = os.path.join(cur_path, 'train', str(i))
#     images = os.listdir(path)
#     for a in images:
#         try:
#             image = Image.open(os.path.join(path, a))
#             image = image.resize((30, 30))
#             image = np.array(image)
#             data.append(image)
#             labels.append(i)
#         except:
#             print("Error loading image")

# data = np.array(data)
# labels = np.array(labels)
# print(data.shape, labels.shape)

# X_t1, X_t2, y_t1, y_t2 = train_test_split(data, labels, test_size=0.2, random_state=42)
# print(X_t1.shape, X_t2.shape, y_t1.shape, y_t2.shape)

# y_t1 = to_categorical(y_t1, 43)
# y_t2 = to_categorical(y_t2, 43)

# param_grid = {
#     'optimizer': ['adam', 'rmsprop'],
#     'dropout_rate': [0.25, 0.5]
# }

# y_t1_original = np.argmax(y_t1, axis=1)

# cv = StratifiedKFold(n_splits=3, shuffle=True)
# results = []

# for train_index, val_index in cv.split(X_t1, y_t1_original):
#     X_train, X_val = X_t1[train_index], X_t1[val_index]
#     y_train, y_val = y_t1_original[train_index], y_t1_original[val_index]
    
#     y_train_onehot = to_categorical(y_train, num_classes=43)
#     y_val_onehot = to_categorical(y_val, num_classes=43)
    
#     model = create_model(optimizer='adam', dropout_rate=0.25)
    
#     datagen = ImageDataGenerator(
#         rotation_range=10,
#         width_shift_range=0.1,
#         height_shift_range=0.1,
#         shear_range=0.2,
#         zoom_range=0.2,
#         horizontal_flip=True,
#         fill_mode='nearest')
    
#     datagen.fit(X_train)
    
#     model.fit_generator(datagen.flow(X_train, y_train_onehot, batch_size=32),
#               steps_per_epoch= len(X_train) // 32,
#               epochs=15,
#               validation_data=(X_val, y_val_onehot),
#               verbose=0)
    
#     val_score = model.evaluate(X_val, y_val_onehot, verbose=0)
#     results.append((val_score[1], 'adam', 0.25))  # Store validation accuracy and hyperparameters

# # Print the best hyperparameters and accuracy
# best_result = max(results, key=lambda x: x[0])
# print("Best Accuracy:", best_result[0])
# print("Best Optimizer:", best_result[1])
# print("Best Dropout Rate:", best_result[2])

# # Building the model with best hyperparameters
# best_model = create_model(optimizer=best_result[1], dropout_rate=best_result[2])
# best_model.fit_generator(X_t1, y_t1, batch_size=32, epochs=15, validation_data=(X_t2, y_t2), verbose=1)
# best_model.save("my_model.h5.keras")


# # Train the model and store the training history
# history = model.fit(X_train, y_train_onehot, batch_size=32, epochs=15, validation_data=(X_val, y_val_onehot), verbose=1)

# # Plotting graphs for accuracy
# plt.figure(0)
# plt.plot(history.history['accuracy'], label='training accuracy')
# plt.plot(history.history['val_accuracy'], label='val accuracy')
# plt.title('Accuracy')
# plt.xlabel('epochs')
# plt.ylabel('accuracy')
# plt.legend()
# plt.show()

# plt.figure(1)
# plt.plot(history.history['loss'], label='training loss')
# plt.plot(history.history['val_loss'], label='val loss')
# plt.title('Loss')
# plt.xlabel('epochs')
# plt.ylabel('loss')
# plt.legend()
# plt.show()


# # Load and preprocess test data
# y_test = pd.read_csv('Test.csv')
# labels = y_test["ClassId"].values
# imgs = y_test["Path"].values
# test_data = []


# base_dir = "/Users/adityachaturvedi/Desktop/7th Sem/Minor Project"


# for img_path in imgs:
#     full_img_path = os.path.join(base_dir, img_path)
#     print("Processing image:", full_img_path)

#     if os.path.exists(full_img_path):
#         image = Image.open(full_img_path)
#         image = image.resize((30, 30))
#         image = np.array(image)
#         test_data.append(image)
#     else:
#         print("Image not found:", full_img_path)

# X_test = np.array(test_data)

# # Predictions on test data
# pred_probs_test = best_model.predict(X_test)
# predicted_classes_test = pred_probs_test.argmax(axis=-1)

# # Calculate accuracy on test data
# accuracy_test = accuracy_score(labels, predicted_classes_test)
# print("Test accuracy:", accuracy_test)

# # Save the Trained Model
# best_model.save('traffic_classifier.h5')

Error loading image
(38398, 30, 30, 3) (38398,)




(30718, 30, 30, 3) (7680, 30, 30, 3) (30718,) (7680,)
