In [1]:
######### push from local machine
import tensorflow as tf
from tensorflow import keras
import numpy as np
import cv2
from imutils import paths
from tensorflow.keras import backend as K
import os
import random
from google.colab import drive
from sklearn.preprocessing import LabelBinarizer
from sklearn.model_selection import train_test_split
from sklearn import metrics


In [2]:
# !mkdir('/content/drive/My Drive/2020-11-01-Data') ## Uncomment this line if there is not such directory
drive.mount('/content/drive')
# !unzip '/content/drive/My Drive/2020-11-01-Data.zip' -d '/content/drive/My Drive/2020-11-01-Data'  ## Uncomment this line if u want to unzip 
input_image_directory = '/content/drive/My Drive/2020-11-01-Data/2020-11-01'
imagePaths = sorted(list(paths.list_images(input_image_directory)))
np.random.seed(0)
imagePaths = np.random.permutation(imagePaths)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
IMAGE_DIMS = (224, 224, 3)   # # if u want to use VGG  or ReSnet50 uncomment this line
# IMAGE_DIMS = (299, 299, 3) # if u want to use InceptionV3 uncomment this line
# initialize the data and labels
data = []
labels = []
size = []
for imagePath in imagePaths:
  image = cv2.imread(imagePath)
  a=image.shape[:2]
  size.append(list(a))
  image = cv2.resize(image, (IMAGE_DIMS[1], IMAGE_DIMS[0]))
  image = tf.keras.preprocessing.image.img_to_array(image)
  data.append(image)
  label = imagePath.split(os.path.sep)[-2]
  labels.append(label)

In [4]:
## preprocess images and split data to train and test
data = np.array(data, dtype="float") / 255.0
labels = np.array(labels)
lb = LabelBinarizer()
labels = lb.fit_transform(labels)
(trainX, testX, trainY, testY) = train_test_split(data,labels, test_size=0.1, random_state=42)

In [5]:
## this function takes source model like ResNet50 and makes new model. 
#INPUTS 
# model: source model like ResNet or VGG
# opt: optimizer object for compiling model
#nClass: number of classes
# OUTPUT
# model: created model
def makeCustomeModel(model,nClass):
  predictions0 = keras.layers.Dense(128, activation='softmax')(model.layers[-2].output)
  predictions2 = keras.layers.Dense(nClass, activation='softmax')(predictions0)
  model = tf.keras.Model(inputs=model.input, outputs=predictions2)
  return model


In [6]:
## choose source model by setting NUMBER
# be carefule about IMAGE_DIMS(u set it before)
NUMBER = 1 
###################ResNet50
if NUMBER == 1:
  sourceModel = tf.keras.applications.ResNet50(
      include_top=True,
      weights="imagenet",
      # weights=None,
      input_shape=(IMAGE_DIMS[0],IMAGE_DIMS[1],3),
      pooling=True  )
########## InceptionV3
elif  NUMBER == 2:
  sourceModel = tf.keras.applications.InceptionV3(
      include_top=True,
      weights="imagenet",
      input_tensor=None,
      input_shape=(IMAGE_DIMS[0],IMAGE_DIMS[1],3),
      pooling=None,
      classes=6,
      classifier_activation="softmax"
  )
elif  NUMBER == 3:
###############VGG16
  sourceModel = keras.applications.vgg16.VGG16(include_top=True, input_shape = (IMAGE_DIMS[0],IMAGE_DIMS[1],3), weights="imagenet")
###freeze all layers in source model except BatchNormalization layer. also change momentum of these layers 
# decreasing  momentum helps us to have good preiction even few epochs
for layer in sourceModel.layers:
    if isinstance(layer, keras.layers.BatchNormalization):
        layer.trainable = True
        layer.momentum = 0.01
    else:
        layer.trainable = False

sourceModel.summary()        


Model: "resnet50"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
conv1_pad (ZeroPadding2D)       (None, 230, 230, 3)  0           input_1[0][0]                    
__________________________________________________________________________________________________
conv1_conv (Conv2D)             (None, 112, 112, 64) 9472        conv1_pad[0][0]                  
__________________________________________________________________________________________________
conv1_bn (BatchNormalization)   (None, 112, 112, 64) 256         conv1_conv[0][0]                 
___________________________________________________________________________________________

In [7]:
INIT_LR = 0.001
EPOCHS = 50
BS = 20
numClass = 6
opt = keras.optimizers.Adam(lr=INIT_LR)
finalModel = makeCustomeModel(sourceModel, numClass)
#### unfreeze last layers to have good convergence. 
for layer in finalModel.layers[170:]: ## change depth if u dont use Resnet. set trainable for layers end-10 :end
  layer.trainable = True
finalModel.summary()
finalModel.compile(optimizer=opt, loss='categorical_crossentropy', 	metrics=tf.keras.metrics.CategoricalAccuracy())

Model: "functional_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
conv1_pad (ZeroPadding2D)       (None, 230, 230, 3)  0           input_1[0][0]                    
__________________________________________________________________________________________________
conv1_conv (Conv2D)             (None, 112, 112, 64) 9472        conv1_pad[0][0]                  
__________________________________________________________________________________________________
conv1_bn (BatchNormalization)   (None, 112, 112, 64) 256         conv1_conv[0][0]                 
_______________________________________________________________________________________

In [8]:
## this functions takes 3 inputs and make a callbacks_list which used for storing best models in specific address and specific name
def pathToSaveCheckpoint(pathPreFix,folderNam, CheckPointName):
  path = pathPreFix + '/' + folderNam 
  filePath = path + '/' + CheckPointName
  os.mkdir( path )
  checkpoint = keras.callbacks.ModelCheckpoint(filePath, monitor='val_loss', verbose=1, save_best_only=True, mode='min')
  callbacks_list = [checkpoint]
  return filePath, callbacks_list

In [9]:
##### this function takes features and labels,... and learns the model. It 's Possible to choose learning with or without data augmentaion
# INPUTS:
# trainX: features 
# trainY : labels
# compiledModel: model, it should be compiled before passing to function
# batchSize: size of batch
# useAug: determine how to model learns (with or without data augmentation)
#callbacks_list: for storing only best trained model based on choosed parameter(val_accuracy or val loss)
def doTrain(trainX, trainY, compiledModel,callbacks_list,batchSize, useAug=False):
  model =compiledModel
   
  if useAug :
    (trainX, validationX, trainY, validationY) = train_test_split(trainX, trainY, test_size=0.1,
  random_state=40 )
    aug = tf.keras.preprocessing.image.ImageDataGenerator(rotation_range=10, width_shift_range=0.1,
      height_shift_range=0.2, shear_range=0.2, zoom_range=0.2,
      horizontal_flip=True, fill_mode="nearest")
    it =aug.flow(x= trainX, y= trainY,batch_size=batchSize)
    itValidation =aug.flow(x= validationX, y= validationY,batch_size=batchSize)

    H = model.fit_generator(it,epochs=EPOCHS,validation_data=itValidation,callbacks=callbacks_list)
  
  else:
    H = model.fit(x=trainX, y=trainY, batch_size=batchSize,
      validation_split= 0.1 ,
      steps_per_epoch=None,
      epochs=EPOCHS, verbose=1,callbacks=callbacks_list)

In [10]:
def doTest(model, testX, testY, batchSize):
  y_pred = model.predict(x= testX,batch_size=batchSize)
  print('confusion_matrix\n')
  print(metrics.confusion_matrix(testY.argmax(axis=1), y_pred.argmax(axis=1)),'\n')
  print('report\n')
  print(metrics.classification_report(testY.argmax(axis=1), y_pred.argmax(axis=1)))


In [11]:
# training
(pathToBestModel, callbacks_list) = pathToSaveCheckpoint('/content/drive/My Drive','checkPointResNet', 'weights-improvement.hdf5')
doTrain(trainX, trainY, finalModel,callbacks_list,batchSize=BS, useAug=True)



KeyboardInterrupt: ignored

In [12]:
### load the best model and Test it
loadedModel = tf.keras.models.load_model(pathToBestModel)
doTest(loadedModel, testX, testY, BS)

confusion_matrix

[[3 0 0 0 0 0]
 [0 4 0 0 0 1]
 [0 0 3 1 0 0]
 [0 0 0 4 0 0]
 [0 0 0 0 0 7]
 [0 0 0 0 0 7]] 

report

              precision    recall  f1-score   support

           0       1.00      1.00      1.00         3
           1       1.00      0.80      0.89         5
           2       1.00      0.75      0.86         4
           3       0.80      1.00      0.89         4
           4       0.00      0.00      0.00         7
           5       0.47      1.00      0.64         7

    accuracy                           0.70        30
   macro avg       0.71      0.76      0.71        30
weighted avg       0.62      0.70      0.63        30



  _warn_prf(average, modifier, msg_start, len(result))
