# Classification Phase #1 - MobileNet (Method 1)

### CTCB (Classification of Toxigenic CyanoBacterial genera) project

* **Author : Iman Kianian**
* **Paper Link:**

---------------

#### Import the necessary libraries

In [None]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix,ConfusionMatrixDisplay
from sklearn.preprocessing import LabelEncoder
import collections
from random import choices
from sklearn.metrics import accuracy_score

In [None]:
import tensorflow as tf
import keras
from keras.models import Model
from keras.callbacks import *
from keras.layers import *
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import plot_model

# convert to one-hot-encoding
from keras.utils.np_utils import to_categorical 

# Transfer learning model
from tensorflow.keras.applications import *

# Data Augmentation
from keras.preprocessing.image import ImageDataGenerator

---------------

### Fine Tuning

#### Preparing Training and Test sets

Loading Train set:

In [None]:
# List the entries of the training-set folder; each entry name is later used as a class label.
# NOTE(review): os.listdir needs a local directory path - presumably this GitHub URL is a
# placeholder for a local copy of the dataset; confirm before running.
Categories = list(os.listdir("https://github.com/iman2693/CTCB/tree/main/dataset-processed/Train"))
Categories

Creating dictionary from images of the training set:

In [None]:
# Load every training image, grouped by class folder.
# NOTE(review): os.listdir / os.path.isdir / cv2.imread need local paths - presumably the
# GitHub URL below is a placeholder for a local copy of the dataset; confirm before running.
images = {}  # class name -> list of grayscale images divided by 255
images_directory = {}  # class name -> list of the matching image paths
train_root = "https://github.com/iman2693/CTCB/tree/main/dataset-processed/Train/"
for dirname in os.listdir(train_root):
    class_dir = train_root + dirname
    if not os.path.isdir(class_dir):
        continue
    imags = []
    imgs_directory = []
    for filename in os.listdir(class_dir):
        if 'jpg' in filename or 'png' in filename:
            addr = class_dir + "/" + filename
            image = cv2.imread(addr, 0)  # flag 0 -> load as single-channel grayscale
            if image is None:
                # cv2.imread signals an unreadable file by returning None instead of
                # raising; skip it so the division below cannot crash the whole load.
                print(f'Skipping unreadable image: {addr}')
                continue
            imags.append(image / 255.0)
            imgs_directory.append(addr)
    images[dirname] = imags
    images_directory[dirname] = imgs_directory

Counting all images in training set:

In [None]:
# Total number of loaded training images, summed over every class.
countofallimages = sum(len(images[cls]) for cls in Categories)

In [None]:
# Report the dataset size, the number of classes and the rounded mean number of images per class.
print(f'Count of all images in Dataset = {countofallimages}')
print(f'There are {len(Categories )} classes')
print(f'Average number of essential images for each class is = {round(countofallimages/len(Categories))}')

Creating a list of images and their labels :

In [None]:
# Flatten the per-class dictionaries into parallel arrays, class by class in
# `Categories` order, so X[k], y[k] and X_addr[k] describe the same sample.
X = np.array([picture for cls in Categories for picture in images[cls]])
y = np.array([cls for cls in Categories for _ in images[cls]])
X_addr = np.array([path for cls in Categories for path in images_directory[cls]])

Count of images from each class in training set:

In [None]:
# Number of samples per class label in the full training data.
counter = collections.Counter(y)
counter

Split the training data into training and validation sets:

In [None]:
# Split sample indices (not the image array itself) so the very same split can
# be applied to both the pixel data and the file-path array.
X_train_ind, X_val_ind, y_train, y_val = train_test_split(
    list(range(len(y))),
    y,
    test_size=0.01,
    random_state=1,
)
X_train, X_val = X[X_train_ind], X[X_val_ind]
X_train_addr, X_val_addr = X_addr[X_train_ind], X_addr[X_val_ind]

Count of images from each class in validation set:

In [None]:
# Number of samples per class label in the validation split.
counter = collections.Counter(y_val)
counter

----------------------

Loading the test set:

In [None]:
# Load every test image, grouped by class folder.
# NOTE(review): os.listdir / os.path.isdir / cv2.imread need local paths - presumably the
# GitHub URL below is a placeholder for a local copy of the dataset; confirm before running.
images = {}  # class name -> list of grayscale images divided by 255
images_directory = {}  # class name -> list of the matching image paths
test_root = "https://github.com/iman2693/CTCB/tree/main/dataset-processed/Test"
for dirname in os.listdir(test_root):
    class_dir = test_root + "/" + dirname
    if not os.path.isdir(class_dir):
        continue
    imags = []
    imgs_directory = []
    for filename in os.listdir(class_dir):
        if 'jpg' in filename or 'png' in filename:
            addr = class_dir + "/" + filename
            image = cv2.imread(addr, 0)  # flag 0 -> load as single-channel grayscale
            if image is None:
                # cv2.imread signals an unreadable file by returning None instead of
                # raising; skip it so the division below cannot crash the whole load.
                print(f'Skipping unreadable image: {addr}')
                continue
            imags.append(image / 255.0)
            imgs_directory.append(addr)
    images[dirname] = imags
    images_directory[dirname] = imgs_directory

In [None]:
# Flatten the per-class test dictionaries into parallel arrays, class by class
# in `Categories` order, so X_test[k], y_test[k] and X_test_addr[k] line up.
X_test = np.array([picture for cls in Categories for picture in images[cls]])
y_test = np.array([cls for cls in Categories for _ in images[cls]])
X_test_addr = np.array([path for cls in Categories for path in images_directory[cls]])

Count of images from each class in test set:

In [None]:
# Number of samples per class label in the test set.
counter = collections.Counter(y_test)
counter

In [None]:
# Drop the names of intermediate containers so their memory can be reclaimed.
# X_addr, X_train_addr and the train/val/test arrays are intentionally kept.
del images
del images_directory
del X
del X_val_addr
del y

------------------

Data Augmentation and Downsampling:

In [None]:
def generateimage(img,y,count):
    """Produce `count` augmented variants of one grayscale image.

    The image is reshaped to a one-sample, one-channel batch and streamed
    through the module-level `demo_datagen` generator.

    Args:
        img: image array whose first two axes are rows and columns.
        y: label of the image; it is attached to every generated variant.
        count: number of augmented images to collect.

    Returns:
        Tuple `(images, labels)` of NumPy arrays holding the augmented images
        and the label emitted alongside each of them.
    """
    rows, cols = img.shape[0], img.shape[1]
    label = np.array(y).reshape((1,))
    stream = demo_datagen.flow(img.reshape((1, rows, cols, 1)), label, batch_size=12)

    augmented = []
    labels = []
    # The generator is consumed until enough variants have been collected;
    # the number gathered so far doubles as the loop counter.
    for batch, batch_labels in stream:
        if len(augmented) >= count:
            break
        # Each batch holds a single image: drop the batch and channel axes.
        augmented.append(batch.reshape((batch.shape[1], batch.shape[2])))
        labels.append(batch_labels[0])
    return np.array(augmented), np.array(labels)

In [None]:
def preprocessing(X_train,y_train,downsampling=True,upsampling=True):
  """Balance the training set around the mean class size.

  Classes larger than the rounded mean class count are randomly thinned
  (when `downsampling` is true); classes smaller than it are enlarged with
  augmented copies from `generateimage` (when `upsampling` is true). Every
  other class - including one whose size equals the mean - is copied through
  unchanged.

  Reads the module-level names `Categories` (class order of the output) and
  `X_train_addr` (file path of each training sample, indexed like `X_train`).

  Args:
    X_train: indexable collection of training images.
    y_train: class label of each image, aligned with `X_train`.
    downsampling: thin out over-represented classes.
    upsampling: augment under-represented classes.

  Returns:
    Tuple `(new_X, new_y, new_X_addr)` of NumPy arrays: images, labels and
    the source file path of each image (an augmented image carries the path
    of the image it was generated from).
  """
  new_X = []
  new_y = []
  new_X_addr = []
  counter = collections.Counter(y_train)
  # Target class size; loop-invariant, so it is computed once.
  target = np.round(np.mean(list(counter.values())))

  for cls in Categories:
    print(cls)
    class_count = counter[cls]
    class_indices = [ind for ind, label in enumerate(y_train) if label == cls]

    if class_count > target and downsampling:
      # Keep each sample with probability target/count plus a constant 0.3
      # margin, clamped so the complementary weight can never go negative.
      select_prob = min(1.0, (target / class_count) + 0.3)
      for ind in class_indices:
        if choices([True, False], [select_prob, 1 - select_prob])[0]:
          new_X.append(X_train[ind])
          new_y.append(cls)
          new_X_addr.append(X_train_addr[ind])

    elif 0 < class_count < target and upsampling:
      # Number of augmented copies to create per original image.
      generate_number = int(np.round(target / class_count))
      for ind in class_indices:
        n_x, _ = generateimage(X_train[ind], y_train[ind], generate_number)
        for xx in n_x:
          new_X.append(xx)
          new_y.append(cls)
          new_X_addr.append(X_train_addr[ind])
        # The original image is kept after its augmented copies.
        new_X.append(X_train[ind])
        new_y.append(cls)
        new_X_addr.append(X_train_addr[ind])

    else:
      # Resampling disabled for this class, or the class already sits exactly
      # at the target size: keep every sample as is. `ind` is a position in
      # the training split, so the path must come from X_train_addr.
      for ind in class_indices:
        new_X.append(X_train[ind])
        new_y.append(cls)
        new_X_addr.append(X_train_addr[ind])

  new_X = np.array(new_X)
  new_y = np.array(new_y)
  new_X_addr = np.array(new_X_addr)
  return new_X,new_y,new_X_addr

### parameters of augmentation

In [None]:
# Augmentation generator used by generateimage(): random rotations, shifts,
# zooms and flips, with newly exposed pixels filled using the 'nearest' mode.
demo_datagen = ImageDataGenerator(
    rotation_range=90,
    width_shift_range=0.2,
    height_shift_range=0.2,
    zoom_range=0.2,
    fill_mode='nearest',
    horizontal_flip=True,
    vertical_flip=True
)

In [None]:
# Build the balanced training set (both down-sampling and up-sampling enabled).
new_X_train, new_y_train, new_X_addr = preprocessing(X_train,y_train,downsampling=True,upsampling=True)

Count of images from each class in training set after upsampling and downsampling:

In [None]:
# Number of samples per class label after balancing.
counter = collections.Counter(new_y_train)
counter

In [None]:
# Show a 3x4 grid of 12 randomly drawn images from the balanced training set,
# each titled with its class label.
plt.figure(figsize=(16,12))
for n , i in enumerate(list(np.random.randint(0,len(new_X_train),12))) : 
  plt.subplot(3,4,n+1)  # subplot positions are 1-based
  plt.imshow(new_X_train[i],cmap="gray")   
  plt.axis('off')
  plt.title(new_y_train[i])

In [None]:
def Gray3channel(X):
  """Replicate grayscale images across three identical channels.

  Args:
    X: array-like of shape (n_images, rows, cols). A trailing singleton
      channel axis, i.e. (n_images, rows, cols, 1), is accepted as well.

  Returns:
    NumPy array of shape (n_images, rows, cols, 3) in which every channel is
    a copy of the input image.
  """
  X = np.asarray(X)
  if X.ndim == 4 and X.shape[-1] == 1:
    # Drop the singleton channel axis so the stack below yields 3 channels.
    X = X[..., 0]
  # One vectorized stack replaces a per-image Python loop and also keeps the
  # image dimensions when the batch is empty.
  return np.stack((X, X, X), axis=-1)

In [None]:
# The balanced arrays (new_X_train / new_y_train) replace the raw training
# split from here on, so drop the raw names.
del X_train
del y_train

In [None]:
# Sanity check: shape of the training images after expansion to three channels.
Gray3channel(new_X_train).shape

In [None]:
def ReshapeGrey(X):
  """Append a single trailing channel axis to a stack of grayscale images.

  Args:
    X: array of shape (n_images, rows, cols).

  Returns:
    The same data reshaped to (n_images, rows, cols, 1).
  """
  samples, rows, cols = X.shape[0], X.shape[1], X.shape[2]
  return X.reshape(samples, rows, cols, 1)

In [None]:
# Sanity check: shape of the training images with an explicit single channel axis.
ReshapeGrey(new_X_train).shape

In [None]:
# Map class names to integer ids. The encoder learns its classes from the
# balanced training labels and is then reused for the other two splits.
encoder = LabelEncoder()
encoded_labels_train = encoder.fit_transform(new_y_train)
encoded_labels_val = encoder.transform(y_val)
encoded_labels_test = encoder.transform(y_test)

# One-hot encode the integer ids, one column per entry of `Categories`.
OneHot_labels_train = to_categorical(encoded_labels_train, num_classes=len(Categories))
OneHot_labels_val = to_categorical(encoded_labels_val, num_classes=len(Categories))
OneHot_labels_test = to_categorical(encoded_labels_test, num_classes=len(Categories))

In [None]:
# Display the (string) class labels of the balanced training set.
new_y_train

In [None]:
# Sanity check: shape of the one-hot encoded training labels.
OneHot_labels_train.shape

------------------

In [None]:
def labelwithoutput(output):
  """Turn per-sample score vectors into predicted class indices.

  Args:
    output: iterable of score vectors, one per sample.

  Returns:
    NumPy array holding, for every sample, the index of its largest score.
  """
  return np.array([np.argmax(scores) for scores in output])

# **Method 1 - MobileNet**

In [None]:
# ImageNet-pretrained MobileNet backbone without its classification head.
mobilenet_model = MobileNet(input_shape=(150, 150, 3), include_top=False, weights='imagenet')

# New head: global average pooling, two small dense layers and a softmax
# classifier. The output width follows the number of classes instead of a
# hard-coded 10, so it always matches the one-hot labels, which are built
# with num_classes=len(Categories).
mobilenet_output = GlobalAveragePooling2D()(mobilenet_model.output)
mobilenet_output = Dense(32, activation='relu')(mobilenet_output)
mobilenet_output = Dense(16, activation='relu')(mobilenet_output)
mobilenet_output = Dense(len(Categories), activation='softmax')(mobilenet_output)
mobilenetmodel = Model(inputs=[mobilenet_model.input], outputs=mobilenet_output)
mobilenetmodel.summary()

In [None]:
def scheduler(epoch, lr):
    """Learning-rate schedule: constant for five epochs, then exponential decay.

    Args:
        epoch: zero-based epoch index.
        lr: learning rate currently in use.

    Returns:
        `lr` unchanged while `epoch < 5`; afterwards `lr * exp(-0.1)` as a
        plain Python float.
    """
    if epoch < 5:
        return lr
    # Computed with NumPy and cast to float: tf.math.exp would yield a Tensor,
    # which Keras 3's LearningRateScheduler does not accept as a schedule output.
    return float(lr * np.exp(-0.1))
# Keras callback that asks `scheduler` for the learning rate during training.
callback = tf.keras.callbacks.LearningRateScheduler(scheduler)

In [None]:
opt = tf.keras.optimizers.Adam(learning_rate=0.001)

filepath = 'filepath/callback.h5'
# The checkpoint file cannot be written unless its folder exists.
os.makedirs(os.path.dirname(filepath), exist_ok=True)

# Keep only the weights of the epoch with the lowest validation loss.
checkpoint = ModelCheckpoint(filepath,
                             monitor='val_loss',
                             save_best_only=True,
                             verbose=1)

# The network ends in a softmax over mutually exclusive classes and the labels
# are one-hot vectors, so the matching loss is categorical cross-entropy
# (binary cross-entropy would score every class as an independent yes/no task).
mobilenetmodel.compile(optimizer=opt,
                       loss='categorical_crossentropy',
                       metrics=['accuracy'])

# MobileNet expects 3-channel input, hence the grayscale -> 3-channel expansion.
history = mobilenetmodel.fit(Gray3channel(new_X_train),
                             OneHot_labels_train,
                             validation_data=(Gray3channel(X_val), OneHot_labels_val),
                             epochs=40,
                             callbacks=[callback, checkpoint],
                             verbose=1)

In [None]:
#mobilenetmodel.load_weights('/content/gdrive/MyDrive/Cyano/callback.h5')

--------------

#### Load Fine-tuned MobileNet

In [None]:
# Restore the fine-tuned MobileNet weights into the model built above.
# NOTE(review): load_weights presumably needs a local .h5 file; this GitHub page URL looks
# like a pointer to where the weights can be downloaded - confirm and use a local path.
mobilenetmodel.load_weights('https://github.com/iman2693/CTCB/blob/main/weights/Model1%20-%20MobileNet/mobilenet.h5')

------------

In [None]:
# Expand the test images to 3 channels once and reuse the result for both
# evaluation and prediction.
X_test_rgb = Gray3channel(X_test)

test_Loss, test_Acc = mobilenetmodel.evaluate(X_test_rgb, OneHot_labels_test)
print('Test Loss:', test_Loss)
print('Test Accuracy :', test_Acc)

# Softmax scores -> class indices -> original class names.
Y_pred = mobilenetmodel.predict(X_test_rgb)
Y_pred = encoder.inverse_transform(labelwithoutput(Y_pred))
print(classification_report(y_test, Y_pred, digits=4))

# Pin the row/column order of the confusion matrix to the encoder's class
# order and derive the tick labels from that same list. Taking the tick labels
# from `Categories` (directory-listing order) can mislabel the axes, because
# confusion_matrix orders classes by sorted label value by default.
class_labels = list(encoder.classes_)
disp = ConfusionMatrixDisplay(confusion_matrix(y_test, Y_pred, labels=class_labels),
                              display_labels=[cl[:3].upper() for cl in class_labels])
fig, ax = plt.subplots(figsize=(10, 10))

disp.plot(ax=ax, cmap='Blues')
# savefig fails when the target folder is missing, so create it first.
os.makedirs("filepath", exist_ok=True)
plt.savefig("filepath/MobileNet-Confusion.svg")
plt.show()

In [None]:
# Plot the training / validation loss and accuracy curves recorded by fit().
# `go` is used throughout this cell but is not imported anywhere in the notebook.
import plotly.graph_objects as go

# Add data
val_loss = history.history['val_loss']
val_accuracy = history.history['val_accuracy']
train_loss = history.history['loss']
train_accuracy = history.history['accuracy']
# One x-axis label per recorded epoch, so the plot stays correct when the
# number of training epochs is changed.
epochs = [str(i) for i in range(1, len(train_loss) + 1)]

fig = go.Figure()
# Create and style traces: solid lines for accuracy, dashed lines for loss;
# red for training, blue for validation.
fig.add_trace(go.Scatter(x=epochs, y=train_accuracy, name='Train Accuracy',
                         line=dict(color='firebrick', width=4)))
fig.add_trace(go.Scatter(x=epochs, y=val_loss, name='Val Loss',
                         line=dict(color='royalblue', width=4, dash='dash')))
fig.add_trace(go.Scatter(x=epochs, y=train_loss, name='Train Loss',
                         line=dict(color='firebrick', width=4,
                                   dash='dash')))  # dash options include 'dash', 'dot', and 'dashdot'
fig.add_trace(go.Scatter(x=epochs, y=val_accuracy, name='Val Accuracy',
                         line=dict(color='royalblue', width=4)))

# Edit the layout
fig.update_layout(title='Epoch/Loss plot',
                  xaxis_title='Epochs',
                  yaxis_title='Loss / Accuracy (%)')

fig.show()

In [None]:
# Build a feature extractor that outputs the activations of the fourth-from-last
# layer of the fine-tuned network, flattened to one vector per image.
# NOTE(review): with the head defined in this notebook (pooling + three Dense layers),
# layers[-4] is presumably the GlobalAveragePooling2D layer - confirm via model.summary().
new_layer = Flatten()(mobilenetmodel.layers[-4].output)
intermediate_layer_model = Model(inputs=mobilenetmodel.input,
                                       outputs=new_layer)
# Feature vectors for the balanced training set and for the test set.
intermediate_output_train = intermediate_layer_model.predict(Gray3channel(new_X_train))
intermediate_output_test = intermediate_layer_model.predict(Gray3channel(X_test))

In [None]:
# Reduce the extracted features to 100 principal components. The projection is
# fitted on the training features only and then applied to both sets.
from sklearn.decomposition import PCA
pca = PCA(n_components=100)
pca.fit(intermediate_output_train)
X_new = pca.transform(intermediate_output_train)
X_test_new = pca.transform(intermediate_output_test)

In [None]:
import xgboost as xgb

# Gradient-boosted trees on the PCA-reduced MobileNet features.
# The task has more than two classes, so a multi-class objective is declared
# rather than "binary:logistic". Newer XGBoost releases also expect integer
# class ids (0..n_classes-1) instead of string labels, so the model is trained
# on the encoded labels and its predictions are mapped back to class names.
xgb_cl = xgb.XGBClassifier(objective="multi:softprob")
xgb_cl.fit(X_new, encoded_labels_train)
Y_pred = encoder.inverse_transform(xgb_cl.predict(X_test_new))
print(accuracy_score(y_test, Y_pred))
print(classification_report(y_test, Y_pred,digits=4))

In [None]:
# Random forest on the PCA-reduced features; prints test accuracy and the per-class report.
from sklearn.ensemble import RandomForestClassifier
clf = RandomForestClassifier(max_depth=100, random_state=0)
clf.fit(X_new, new_y_train)
Y_pred = clf.predict(X_test_new)
print(accuracy_score(y_test, Y_pred))
print(classification_report(y_test, Y_pred,digits=4))

In [None]:
# 3-nearest-neighbours classifier on the PCA-reduced features; prints test accuracy and the per-class report.
from sklearn.neighbors import KNeighborsClassifier
neigh = KNeighborsClassifier(n_neighbors=3)
neigh.fit(X_new, new_y_train)
Y_pred = neigh.predict(X_test_new)
print(accuracy_score(y_test, Y_pred))
print(classification_report(y_test, Y_pred,digits=4))

In [None]:
# Gaussian naive Bayes on the PCA-reduced features; prints test accuracy and the per-class report.
# Note: `clf` is rebound here, replacing the classifier from the earlier cell.
from sklearn.naive_bayes import GaussianNB
clf = GaussianNB()
clf.fit(X_new, new_y_train)
Y_pred = clf.predict(X_test_new)
print(accuracy_score(y_test, Y_pred))
print(classification_report(y_test, Y_pred,digits=4))

In [None]:
# RBF-kernel support vector classifier on the PCA-reduced features; prints test accuracy and the per-class report.
# Note: `clf` is rebound here, replacing the classifier from the earlier cell.
from sklearn.svm import SVC
clf = SVC(kernel='rbf')
clf.fit(X_new, new_y_train)
Y_pred = clf.predict(X_test_new)
print(accuracy_score(y_test, Y_pred))
print(classification_report(y_test, Y_pred,digits=4))

In [None]:
# Multi-layer perceptron (at most 300 iterations) on the PCA-reduced features;
# prints test accuracy and the per-class report.
# Note: `clf` is rebound here, replacing the classifier from the earlier cell.
from sklearn.neural_network import MLPClassifier
clf = MLPClassifier(random_state=0, max_iter=300).fit(X_new, new_y_train)
Y_pred = clf.predict(X_test_new)
print(accuracy_score(y_test, Y_pred))
print(classification_report(y_test, Y_pred,digits=4))

Save Weights of model:

In [None]:
#mobilenetmodel.save_weights('filepath/mobilenetweights.h5') 

-----------------------------