In [None]:
import numpy as np
import pandas as pd
from PIL import Image 

from sklearn.metrics import classification_report
from skimage import transform
from skimage import exposure
from skimage import io

import numpy as np
#import argparse
import random
import os

In [None]:
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
matplotlib.use("Agg")

In [None]:
import tensorflow as tf
import tensorflow.keras as TK
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D,MaxPooling2D,Dense
from tensorflow.keras.layers import BatchNormalization,Flatten,Dropout
from tensorflow.keras.layers import Activation

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import Adam


In [None]:
class Traffic:
    @staticmethod
    def build(width,height,depth,classes) :
        
        model = Sequential() # model is developed in sequential way
        chanDim = -1
        inputShape = (height, width, depth)
        
        #conv=>Relu=>BN=>Pool
        model.add(Conv2D(8,(5,5),padding='same',input_shape=inputShape))
        model.add(Activation('relu'))
        model.add(BatchNormalization(axis=chanDim))
        model.add(MaxPooling2D(pool_size=(2,2)))
        
        model.add(Conv2D(16,(3,3),padding='same'))
        model.add(Activation('relu'))
        model.add(BatchNormalization(axis=chanDim))
        
        model.add(Conv2D(16,(3,3),padding='same'))
        model.add(Activation('relu'))
        model.add(BatchNormalization(axis=chanDim))
        model.add(MaxPooling2D(pool_size=(2,2)))
        
        model.add(Conv2D(32,(3,3),padding='same'))
        model.add(Activation('relu'))
        model.add(BatchNormalization(axis=chanDim))
        model.add(Conv2D(32,(3,3),padding='same'))
        model.add(Activation('relu'))
        model.add(BatchNormalization(axis=chanDim))
        model.add(MaxPooling2D(pool_size=(2,2))) 
        
        model.add(Flatten())
        model.add(Dense(128))
        model.add(Activation('relu'))
        model.add(BatchNormalization())
        model.add(Dropout(0.5)) #regularization use to  penalize overfitting
        
        model.add(Flatten())
        model.add(Dense(128))
        model.add(Activation('relu'))
        model.add(BatchNormalization())
        model.add(Dropout(0.5)) 
        #Since numer of class is greater than 2 we are using softmax activation function
        model.add(Dense(classes))
        model.add(Activation("softmax"))
        return model

In [None]:
def load_split(basePath,csvPath):
    data=[]
    labels = []
    rows = open(csvPath).read().strip().split("\n")[1:]
    random.shuffle(rows)
    for (i, row) in enumerate(rows):
        # check to see if we should show a status update
        if i > 0 and i % 1000 == 0:
            print("[INFO] processed {} total images".format(i))
        # split the row into components and then grab the class ID
     # and image path
        (label, imagePath) = row.strip().split(",")[-2:]
    # derive the full path to the image file and load it
        imagePath = os.path.sep.join([basePath, imagePath])
        image = io.imread(imagePath)

        image = transform.resize(image, (32, 32))
        image = exposure.equalize_adapthist(image, clip_limit=0.1)
        # update the list of data and labels, respectively
        data.append(image)
        labels.append(int(label))
    # convert the data and labels to NumPy arrays
    data = np.array(data)
    labels = np.array(labels)
    print("You are using {} images".format(len(labels)))
    # return a tuple of the data and labels
    return (data, labels)

In [None]:
basePath = "Data"


trainPath = os.path.sep.join([basePath, "Train.csv"])
testPath = os.path.sep.join([basePath, "Test.csv"])

In [None]:
# initialize the number of epochs to train for, base learning rate,
# and batch size
NUM_EPOCHS = 30
INIT_LR = 1e-3
BS = 64
# load the label names
labelNames = open("signnames.csv").read().strip().split("\n")[1:]
labelNames = [l.split(",")[1] for l in labelNames]
print(labelNames)

In [None]:

# load the training and testing data
print("[INFO] loading training data...")
(train_x, train_y) = load_split(basePath, trainPath)
print("\n[INFO] loading testing data...")
(test_x, test_y) = load_split(basePath, testPath)
# scale data to the range of [0, 1]
train_x = train_x.astype("float32") / 255.0
test_x = test_x.astype("float32") / 255.0



In [None]:
print(np.unique(train_y))

'''
# calculate the total number of images in each class and
# initialize a dictionary to store the class weights
classTotals = trainY.sum(axis=0)
classWeight = dict()
print(classTotals)
# loop over all classes and calculate the class weight
for i in range(0, len(classTotals)):
	classWeight[i] = classTotals.max() / classTotals[i]
'''
from sklearn.utils import class_weight
c_weights = class_weight.compute_class_weight('balanced',
                                                 classes=np.unique(train_y),
                                                 y=train_y)
print(c_weights)

In [None]:
c_weights = {i:w for i,w in enumerate(c_weights)}
c_weights

In [None]:
# one-hot encode the training and testing labels
numLabels = len(np.unique(train_y))
trainY = to_categorical(train_y, numLabels)
testY = to_categorical(test_y, numLabels)

In [None]:
print(train_x.shape)
print(trainY[0])
print(train_x[0])


In [None]:
# construct the image generator for data augmentation
aug = ImageDataGenerator(
	rotation_range=10,
	zoom_range=0.15,
	width_shift_range=0.1,
	height_shift_range=0.1,
	shear_range=0.15,
	horizontal_flip=False,
	vertical_flip=False,
	fill_mode="nearest")
# initialize the optimizer and compile the model
print("[INFO] compiling model...")
opt = Adam(learning_rate=INIT_LR, weight_decay=INIT_LR / (NUM_EPOCHS * 0.5))
model = Traffic.build(width=32, height=32, depth=3,classes=numLabels)
model.compile(loss="categorical_crossentropy", optimizer=opt,
	metrics=["accuracy"])
# train the network
print("[INFO] training network...")
H = model.fit(
	aug.flow(train_x, trainY, batch_size=BS),
	validation_data=(test_x, testY),
	steps_per_epoch=train_x.shape[0] // BS,
	epochs=NUM_EPOCHS,
	class_weight=c_weights,
	verbose=1)

In [None]:
# evaluate the network
print("[INFO] evaluating network...")
predictions = model.predict(test_x, batch_size=BS)
print(classification_report(testY.argmax(axis=1),
	predictions.argmax(axis=1), target_names=labelNames))
# save the network to disk
print("[INFO] serializing network to '{}'...".format("Trafiic Signal/Model"))
model.save("Trafiic Signal/Model")

In [None]:
# plot the training loss and accuracy
N = np.arange(0, NUM_EPOCHS)
plt.style.use("ggplot")
plt.figure()
plt.plot(N, H.history["loss"], label="train_loss")
plt.plot(N, H.history["val_loss"], label="val_loss")
plt.plot(N, H.history["accuracy"], label="train_acc")
plt.plot(N, H.history["val_accuracy"], label="val_acc")
plt.title("Training Loss and Accuracy on Dataset")
plt.xlabel("Epoch #")
plt.ylabel("Loss/Accuracy")
plt.legend(loc="lower left")
plt.savefig(args["plot"])