In [17]:
import pandas as pd
import numpy as np
import os
import csv
from keras.utils import to_categorical
import keras
from keras.layers import Input, Conv2D, Activation, MaxPool2D, Flatten, Dense, BatchNormalization, Dropout, InputLayer
from keras.models import Model, Sequential
from keras import backend as K
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier 
from sklearn.ensemble import RandomForestClassifier 
from sklearn.neighbors import KNeighborsClassifier
import pickle 
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.applications.vgg19 import VGG19
from tensorflow.keras.applications.inception_v3 import InceptionV3
from tensorflow.keras.applications.xception import Xception
from keras.models import load_model 

# Functions for Data Input


In [18]:
# Mount Google Drive at /content/drive; every dataset and model path used below lives under it.
# NOTE(review): google.colab is only importable inside Colab, so this cell is Colab-specific.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [19]:
# Both datasets are CSV files in the mounted Drive folder: one image per row,
# class label in the first column followed by the 32*32*3 pixel values.
_DRIVE_DIR = '/content/drive/MyDrive'

# training images, read by load_train_data
train_path = _DRIVE_DIR + '/images_color.csv'

# test images, read by load_test_data (which skips the first row as a header)
test_path = _DRIVE_DIR + '/test_set_in_csv.csv'

In [20]:
# function that inputs training data and returns training data and labels in proper shape
def load_train_data():
  '''
  function: loads training data from the CSV file at train_path
  param:
  returns: X_train as a uint8 array of shape (n, 32, 32, 3) and
           Y_train as a one-hot array of shape (n, 43)
  '''
  img_size = 32
  channels = 3
  num_classes = 43

  X_train = []
  Y_train = []

  with open(train_path, 'r', newline='') as f:
    for row in csv.reader(f):
      # skip blank lines and rows whose first cell is empty (those rows carry no label)
      if not row or row[0] == '':
        continue
      image = np.array([int(a) for a in row[1:]], dtype='uint8')
      X_train.append(image.reshape((img_size, img_size, channels)))
      Y_train.append(int(row[0]))

  X_train = np.array(X_train)
  Y_train = np.array(Y_train)

  # fix the one-hot width explicitly: without num_classes the width is max(label) + 1,
  # which no longer matches the 43-unit softmax layer when the highest class is absent
  Y_train = to_categorical(Y_train, num_classes=num_classes)

  return X_train, Y_train

In [21]:
# function that inputs testing data and returns test data and its labels in proper shape
def load_test_data():
  '''
  function: loads testing data from the CSV file at test_path
  param:
  returns: X_test as a uint8 array of shape (n, 32, 32, 3) and
           Y_test as a one-hot array of shape (n, 43)
  '''
  img_size = 32
  channels = 3
  num_classes = 43

  X_test = []
  Y_test = []

  with open(test_path, 'r', newline='') as f:
    reader = csv.reader(f)
    # the first row is a header; the default keeps an empty file from raising StopIteration
    next(reader, None)
    for row in reader:
      if not row:
        continue
      image = np.array([int(a) for a in row[1:]], dtype='uint8')
      X_test.append(image.reshape((img_size, img_size, channels)))
      Y_test.append(int(row[0]))

  X_test = np.array(X_test)
  Y_test = np.array(Y_test)

  # fix the one-hot width explicitly: without num_classes the width is max(label) + 1,
  # which breaks validation against the 43-unit softmax when the highest class is absent
  Y_test = to_categorical(Y_test, num_classes=num_classes)

  return X_test, Y_test

# Function to Design CNN + softmax layer



In [22]:
# layer types that can be entered at the 'Enter the type of layer:' prompt
layers = [
    'Conv2D',
    'MaxPool2D',
    'Flatten',
    'Dense',
    'BatchNormalization',
    'Dropout',
]

# optimizer names that can be entered at the 'Choose Optimizer:' prompt
optimizers = [
    'SGD',
    'RMSprop',
    'Adam',
]

In [23]:
# Drive folder where trained models are saved
# (used by design_CNN_softmax, pre_trained_softmax and my_model when calling model.save)
model_path = '/content/drive/MyDrive/Models'

In [24]:
# helpers used by design_CNN_softmax to read layer hyperparameters from the user
def _prompt_int_tuple(prompt):
  '''
  function: reads comma-separated integers such as "3,3" from the user
  param: prompt text shown to the user
  returns: tuple of ints, or a single int when only one value was entered
  '''
  values = tuple(int(part) for part in input(prompt).split(','))
  # a one-element tuple is not a valid 2D size for Keras; a bare int applies to both dimensions
  return values[0] if len(values) == 1 else values


def _prompt_layer(layer_type, is_first_layer, input_shape):
  '''
  function: prompts for the hyperparameters of one layer and builds it
  param: layer type entered by the user, whether no layer has been added yet,
         shape of the input images
  returns: the Keras layer, or None when the layer type is not recognised
  '''
  if layer_type == 'Conv2D':
    filters = int(input('Enter filter size: recommended-16, 32, 64, 128, 256, 512:'))
    kernel_size = _prompt_int_tuple('Enter kernel size:')
    strides = _prompt_int_tuple('Enter strides:')
    padding = str(input('Enter type of padding:'))
    activation = str(input('Enter the activation function:'))
    # only a Conv2D that opens the model declares the input shape
    extra = {'input_shape': input_shape} if is_first_layer else {}
    return Conv2D(filters, kernel_size, strides=strides, padding=padding,
                  activation=activation, **extra)

  if layer_type == 'MaxPool2D':
    pool_size = _prompt_int_tuple('Enter pool size:')
    strides = _prompt_int_tuple('Enter strides:')
    padding = str(input('Enter type of padding:'))
    return MaxPool2D(pool_size, strides=strides, padding=padding)

  if layer_type == 'Flatten':
    return Flatten()

  if layer_type == 'Dense':
    units = int(input('Enter number of dense layer units:'))
    activation = str(input('Enter the activation function:'))
    return Dense(units, activation=activation)

  if layer_type == 'BatchNormalization':
    return BatchNormalization()

  if layer_type == 'Dropout':
    rate = float(input('Enter dropout value:'))
    return Dropout(rate=rate)

  return None


# function to design CNN architecture which is then compiled and trained. returns trained model and its evaluation on test data.
def design_CNN_softmax(model, model_name):
  '''
  function: inputs number of CNN architecture layers to be defined and defines each layer
            with corresponding parameters. A fixed classification head
            (Flatten, Dense(512), BatchNormalization, Dropout(0.5), Dense(43, softmax))
            is appended, then the model is compiled, trained and saved in drive folder.
  param: model to add the layers to (an empty Sequential), name of model defined by user
  returns: trained model, result of model.evaluate on the test data ([loss, accuracy])
  '''

  img_size = 32
  channels = 3
  num_classes = 43
  input_shape = (img_size, img_size, channels)
  layer_num = 1

  remaining = int(input('Enter the total number of layers of the architecture:'))

  # "> 0" rather than "!= 0" so a negative count cannot loop forever
  while remaining > 0:
    add = str(input('Enter the type of layer:'))
    layer = _prompt_layer(add, layer_num == 1, input_shape)
    if layer is None:
      # ask again instead of silently using up one of the requested layers
      print('Unknown layer type. Choose one of: Conv2D, MaxPool2D, Flatten, Dense, BatchNormalization, Dropout')
      continue
    model.add(layer)
    layer_num += 1
    remaining -= 1

  model.add(Flatten())
  model.add(Dense(512, activation='relu'))
  model.add(BatchNormalization())
  model.add(Dropout(rate=0.5))
  model.add(Dense(num_classes, activation='softmax'))

  print('Model architecture has been defined.')

  # optimizer class and the hyperparameters asked for (in this order) when tuning
  optimizer_specs = {
      'SGD': (keras.optimizers.SGD,
              (('learning_rate', 'Enter learning rate:'),
               ('momentum', 'Enter momentum value:'))),
      'RMSprop': (keras.optimizers.RMSprop,
                  (('learning_rate', 'Enter learning rate:'),
                   ('momentum', 'Enter momentum value:'),
                   ('rho', 'Enter rho value:'),
                   ('epsilon', 'Enter epsilon value:'))),
      'Adam': (keras.optimizers.Adam,
               (('learning_rate', 'Enter learning rate:'),
                ('beta_1', 'Enter beta_1 value:'),
                ('beta_2', 'Enter beta_2 value:'),
                ('epsilon', 'Enter epsilon value:'))),
  }

  # keep asking until a supported optimizer is chosen; otherwise no optimizer would exist at compile time
  while True:
    optimizer = str(input('Choose Optimizer:'))
    if optimizer in optimizer_specs:
      break
    print('Unknown optimizer. Choose one of: ' + ', '.join(optimizer_specs))

  optimizer_class, hyperparameters = optimizer_specs[optimizer]
  tuning = input('Do you want to tune optimizer hyperparameters? (y/n)')

  if tuning == 'y':
    # keyword arguments so every entered value (including epsilon) reaches the right parameter
    opt = optimizer_class(**{name: float(input(prompt)) for name, prompt in hyperparameters})
  else:
    opt = optimizer_class()

  model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])

  X_train, Y_train = load_train_data()
  X_test, Y_test = load_test_data()
  batch_size = int(input('Enter batch size:'))
  epochs = int(input('Enter number of epochs:'))

  # NOTE(review): the test set doubles as validation data here, so validation metrics are not independent of the final score
  model.fit(X_train, Y_train,
            batch_size=batch_size,
            epochs=epochs,
            validation_data=(X_test, Y_test),
            shuffle=True,)

  model.save(model_path+'/' + model_name + '.hdf5')
  predictions = model.evaluate(X_test, Y_test)

  return model, predictions

# Function to Design CNN + Classifier 


In [25]:
# dropdown of available classifiers; the spellings are the ones design_CNN_classifier compares against
classifiers = ['Decision Tree', 'SVC', 'KNN', 'Random Forest' ]

In [26]:
def design_CNN_classifier(model_name):
  '''
  function: uses design_CNN_softmax function to train model, then takes the output of the
            layer just before the final softmax layer as features and fits the classifier
            chosen by the user on them. Saves the classifier as <model_name>.pkl in model_path
            (the CNN itself is saved by design_CNN_softmax). The working directory is left unchanged.
  param: name of model defined by user
  returns: trained model, accuracy of the classifier on the test data, trained classifier
  '''

  model = Sequential()
  model, predictions = design_CNN_softmax(model, model_name)

  # sub-model that stops at the layer feeding the softmax; predict() runs in inference
  # mode and in batches, so the full dataset is not pushed through in a single call
  feature_extractor = Model(inputs=model.inputs, outputs=model.layers[-2].output)

  X_train, Y_train = load_train_data()
  X_test, Y_test = load_test_data()

  test = feature_extractor.predict(X_test)
  Y_test = np.argmax(Y_test, axis = 1)
  train = feature_extractor.predict(X_train)
  Y_train = np.argmax(Y_train, axis = 1)

  # keys are lower-case so that 'Decision tree' and 'Decision Tree' both work
  available = {
      'svc': SVC,
      'knn': KNeighborsClassifier,
      'decision tree': DecisionTreeClassifier,
      'random forest': RandomForestClassifier,
  }

  # keep asking until a supported classifier is chosen; an unmatched name used to
  # leave a plain string behind, which then failed on .score()
  while True:
    choice = str(input('choose classfier for final image classification:')).strip().lower()
    if choice in available:
      break
    print('Unknown classifier. Choose one of: Decision Tree, SVC, KNN, Random Forest')

  classifier = available[choice]()
  classifier.fit(train, Y_train)

  accuracy = classifier.score(test, Y_test)

  # write straight into the models folder instead of changing directory with %cd
  pkl_filename = os.path.join(model_path, model_name + '.pkl')
  with open(pkl_filename, 'wb') as file:
    pickle.dump(classifier, file)

  return model, accuracy, classifier

# Function for Pre-Trained + softmax 


In [27]:
# names of the pre-trained backbones that pre_trained_softmax recognises at its
# 'Choose your pre-trained model:' prompt
pre_trained_models = ['ResNet50', 'VGG16', 'VGG19', 'InceptionV3', 'Xception']

In [28]:
def pre_trained_softmax(model, model_name):
  '''
  function: inputs pre-trained model and optimizer. The chosen ImageNet backbone is frozen and
            followed by a classification head (Flatten, Dense(512), BatchNormalization,
            Dropout(0.5), Dense(43, softmax)). model is compiled, fitted and saved in drive folder.
  param: model to add the layers to (an empty Sequential) and name of model defined by user
  returns: trained model and the output of model.predict on the test data
           (one row of class probabilities per test image, not an accuracy)
  '''

  img_size = 32
  channels = 3
  num_classes = 43
  input_shape = (img_size, img_size, channels)

  # backbone constructor and the smallest input width/height it accepts
  # (minimum sizes as documented for the Keras applications)
  backbones = {
      'ResNet50': (ResNet50, 32),
      'VGG16': (VGG16, 32),
      'VGG19': (VGG19, 32),
      'InceptionV3': (InceptionV3, 75),
      'Xception': (Xception, 71),
  }

  # keep asking until a supported backbone is chosen; an unknown name used to leave the model empty
  while True:
    x = str(input('Choose your pre-trained model:'))
    if x in backbones:
      break
    print('Unknown pre-trained model. Choose one of: ' + ', '.join(backbones))

  backbone_class, min_size = backbones[x]

  # InceptionV3 and Xception reject 32x32 images, so the images are enlarged by the
  # smallest whole factor that reaches the backbone's minimum size (32 -> 96 for both)
  scale = -(-min_size // img_size)
  if scale > 1:
    model.add(keras.layers.UpSampling2D(size=(scale, scale), input_shape=input_shape))
  backbone_size = img_size * scale

  base = backbone_class(include_top = False, weights = 'imagenet',
                        input_shape = (backbone_size, backbone_size, channels))
  # freeze the pre-trained weights; only the head added below is trained
  base.trainable = False

  model.add(base)
  model.add(Flatten())
  model.add(Dense(512, activation='relu'))
  model.add(BatchNormalization())
  model.add(Dropout(rate=0.5))
  model.add(Dense(num_classes, activation = 'softmax'))

  # optimizer class and the hyperparameters asked for (in this order) when tuning
  optimizer_specs = {
      'SGD': (keras.optimizers.SGD,
              (('learning_rate', 'Enter learning rate:'),
               ('momentum', 'Enter momentum value:'))),
      'RMSprop': (keras.optimizers.RMSprop,
                  (('learning_rate', 'Enter learning rate:'),
                   ('momentum', 'Enter momentum value:'),
                   ('rho', 'Enter rho value:'),
                   ('epsilon', 'Enter epsilon value:'))),
      'Adam': (keras.optimizers.Adam,
               (('learning_rate', 'Enter learning rate:'),
                ('beta_1', 'Enter beta_1 value:'),
                ('beta_2', 'Enter beta_2 value:'),
                ('epsilon', 'Enter epsilon value:'))),
  }

  # keep asking until a supported optimizer is chosen; otherwise no optimizer would exist at compile time
  while True:
    optimizer = str(input('Choose Optimizer:'))
    if optimizer in optimizer_specs:
      break
    print('Unknown optimizer. Choose one of: ' + ', '.join(optimizer_specs))

  optimizer_class, hyperparameters = optimizer_specs[optimizer]
  tuning = input('Do you want to tune optimizer hyperparameters? (y/n)')

  if tuning == 'y':
    # keyword arguments so every entered value (including epsilon) reaches the right parameter
    opt = optimizer_class(**{name: float(input(prompt)) for name, prompt in hyperparameters})
  else:
    opt = optimizer_class()

  model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['accuracy'])

  X_train, Y_train = load_train_data()
  X_test, Y_test = load_test_data()
  batch_size = int(input('Enter batch size:'))
  epochs = int(input('Enter number of epochs:'))

  # NOTE(review): the test set doubles as validation data here, so validation metrics are not independent of the final score
  model.fit(X_train, Y_train,
            batch_size=batch_size,
            epochs=epochs,
            validation_data=(X_test, Y_test),
            shuffle=True,)

  model.save(model_path + '/' + model_name + '.hdf5')
  predictions = model.predict(X_test)

  return model, predictions

# Function for Pre-trained + Classifier 

In [29]:
# def pre_trained_classifier(model_name):
#   '''
#   function:
#   param:
#   returns:
#   '''
#   model = Sequential()
#   model, predictions = pre_trained_softmax(model, model_name)

#   n = len(model.layers)
#   getFeature = K.function([model.layers[0].input],
#                         [model.layers[n-2].output])
  
#   X_train, Y_train = load_train_data()
#   X_test, Y_test = load_test_data()

#   test = getFeature([X_test])[0]
#   Y_test = np.argmax(Y_test, axis = 1)
#   train = getFeature([X_train])[0]
#   Y_train = np.argmax(Y_train, axis = 1)

#   classifier = str(input('choose classfier for final image classification:'))

#   if classifier == 'SVC':
#     classifier = SVC()
#     classifier.fit(train, Y_train)

#   elif classifier == 'KNN':
#     classifier = KNeighborsClassifier()
#     classifier.fit(train, Y_train)

#   elif classifier == 'Decision Tree':
#     classifier = DecisionTreeClassifier()
#     classifier.fit(train, Y_train)

#   elif classifier == 'Random Forest':
#     classifier = RandomForestClassifier ()
#     classifier.fit(train, Y_train)

#   accuracy = classifier.score(test, Y_test)

#   %cd '/content/drive/MyDrive/Models'
#   pkl_filename = model_name + '.pkl'
#   with open(pkl_filename, 'wb') as file:
#     pickle.dump(classifier, file)
#   %cd '/content/drive'

#   return model, accuracy, classifier

# Function of our Trained Model

In [30]:
def my_model():
  '''
  function: loads the already-trained GTSRB model from drive, evaluates it on the
            test data and stores a copy of it in the models folder
  param:
  returns: result of model.evaluate on the test data
  '''
  source_file = '/content/drive/MyDrive/GTSRB_classification.h5'
  images, labels = load_test_data()

  trained = load_model(source_file)
  evaluation = trained.evaluate(images, labels)

  # keep the original file name when copying the model next to the other saved models
  file_name = os.path.basename(os.path.normpath(source_file))
  trained.save(f'{model_path}/{file_name}')

  return evaluation

# Choose Types of Models to be trained on


In [31]:
def model_to_be_trained(m):
  '''
  function: calls the functions defined earlier to define a model as per user choice,
            once for each of the m models requested
  param: number of models to be trained
  returns: dictionary mapping each model name to its accuracy on the test data
  '''

  models_dict={}

  # count down the models still to be trained; the counter was never decremented before,
  # so the loop could not finish for any m other than 0
  remaining = m

  while remaining > 0:

    choice = str(input('Select the type of model do you want to train on:'))

    if choice == 'design CNN architecture + softmax':
      model_name = str(input('Enter the name of your model:'))

      model = Sequential()
      trained_model, prediction = design_CNN_softmax(model, model_name)

      # design_CNN_softmax returns model.evaluate output: [loss, accuracy]
      models_dict[model_name]=prediction[1]

    elif choice == 'design CNN architecture + classifiers':
      model_name = str(input('Enter the name of your model:'))

      model, accuracy, classifier = design_CNN_classifier(model_name)

      models_dict[model_name]=accuracy

    elif choice == 'use pre-trained models + softmax':
      model_name = str(input('Enter the name of your model:'))

      model = Sequential()
      model, prediction = pre_trained_softmax(model, model_name)

      # pre_trained_softmax returns model.predict output (class probabilities per image),
      # so the accuracy is computed here by comparing predicted and true classes
      _, Y_test = load_test_data()
      predicted_classes = np.argmax(prediction, axis=1)
      true_classes = np.argmax(Y_test, axis=1)
      models_dict[model_name]=float(np.mean(predicted_classes == true_classes))

    # if choice == 'use pre-trained + classifiers':
    #   model_name = str(input('Enter the name of your model:'))

    #   model, accuracy, classifier = pre_trained_classifier(model_name)

    #   models_dict[model_name]=accuracy

    elif choice == 'trained model':
      pred = my_model()
      models_dict['GTSRB_classification']=pred[1]

    else:
      # an unrecognised choice does not count towards the m models
      print('Unknown choice. Choose one of: design CNN architecture + softmax, '
            'design CNN architecture + classifiers, use pre-trained models + softmax, trained model')
      continue

    remaining -= 1

  return models_dict

In [None]:
# choices accepted by model_to_be_trained at its 'Select the type of model...' prompt
# (the pre-trained + classifiers option is disabled together with its function)
options = ['design CNN architecture + softmax', 
           'design CNN architecture + classifiers',
           'use pre-trained models + softmax', 
          #  'use pre-trained + classifiers',
           'trained model']

# train the requested number of models; models_dict maps model name -> accuracy
# and is read as a global by the ensemble functions below
m = int(input('choose the number of models to be trained'))
models_dict = model_to_be_trained(m)

# Ensemble Function


In [None]:
# number of models to include in the ensemble (argument for ensemble(no_of_models) defined in the next cell)
no_of_models = int(input('Enter no of models'))

In [None]:
def ensemble(no_of_models):
  '''
  function: picks the top accuracy models from the global models_dict and averages
            their accuracies (the models' predictions themselves are not combined)
  param: number of models to create ensemble with
  returns: mean accuracy of the selected models, names of the models used (best first)
  '''
  # NOTE(review): a later cell defines ensemble(models_list) under the same name;
  # once that cell has run, it replaces this function.

  # highest accuracy first, so the slice below really selects the best models
  ranked = sorted(models_dict.items(), key=lambda item: item[1], reverse=True)
  selected = ranked[0:no_of_models]

  models_used = [name for name, _ in selected]
  acc_mean = np.mean([accuracy for _, accuracy in selected])
  return acc_mean, models_used

In [None]:
# ask for the names of the models to ensemble; the names are expected to be keys of models_dict
models_to_ensemble=[]
# note: this overwrites the no_of_models value entered in the earlier cell
no_of_models=int(input('Enter no of models'))
for i in range(0, no_of_models): 
    model = input('Enter the model name') 
    models_to_ensemble.append(model)

In [None]:
def ensemble(models_list):
  '''
  function: averages the accuracies of the models chosen by the user, looked up in the
            global models_dict (the models' predictions themselves are not combined)
  param: list of names of the models to be ensembled
  returns: mean accuracy of the listed models
  raises: KeyError when a name in models_list is not a key of models_dict
  '''
  # NOTE(review): this definition replaces the earlier ensemble(no_of_models) once this cell has run.

  # report every unknown name at once instead of failing on the first lookup
  missing = [name for name in models_list if name not in models_dict]
  if missing:
    raise KeyError('Unknown model name(s): ' + ', '.join(str(name) for name in missing))

  # plain lookup: ordering the dictionary by accuracy has no effect on the mean
  acc = [models_dict[name] for name in models_list]
  mean_acc=np.mean(acc)
  return mean_acc