## Import required libraries

In [1]:
import os, fnmatch
import imageio
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from PIL import Image
Image.LOAD_TRUNCATED_IMAGES = True
from urllib.error import HTTPError
from urllib.request import urlretrieve

from random import shuffle
from sklearn.model_selection import train_test_split
from sklearn import metrics
from keras.models import Model, Sequential
from keras.utils import np_utils, plot_model, to_categorical
from keras.layers import Maximum, ZeroPadding2D, BatchNormalization
from keras.layers import Input, Dense, Flatten, Activation, Dropout
from keras.optimizers import Adam, SGD
from keras import optimizers, regularizers
from keras import backend as K

from keras.initializers import glorot_uniform

from keras.layers.merge import concatenate
from keras.layers.convolutional import Conv2D
from keras.layers.pooling import MaxPooling2D

from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import ModelCheckpoint, ReduceLROnPlateau
from keras.layers.advanced_activations import LeakyReLU, ReLU

from keras import applications
from keras.applications.inception_v3 import InceptionV3

Using TensorFlow backend.


### Set variables for further use

In [2]:
## Containers filled while the input folder is scanned
labelCode = dict()                              # class-folder name -> integer class code
trainImage, testImage, allImages = [], [], []   # image paths: train split, test split, every kept image

## Image geometry handed to the pre-trained base model as (rows, cols, channels)
rows, cols, channels = 256, 256, 3

## Base folder that is walked to discover the images
inputFileFolder = "TrainTestImages"
print('Input Files Base Folder: %s' % inputFileFolder)

Input Files Base Folder: TrainTestImages


## Read all the folders/files present in the input location and build the train/test lists of image paths

In [3]:
## Walk through the base folder and every sub-folder below it
for root, dirs, files in os.walk(inputFileFolder):

    ## Decode each file of this folder and keep the paths of those whose array is 3-D;
    ## anything else is treated as a gray scale image and left out
    file = []
    for f in files:
        imgPath = os.path.join(root, f)
        img = imageio.imread(imgPath)
        if len(img.shape) == 3:
            file.append(imgPath)
            allImages.append(imgPath)

    ## A folder that contributed no usable image needs neither a label nor a split
    if not file:
        continue

    ## A folder without sub-folders names a class: register it with the next free code
    if not dirs:
        imageLabel = os.path.basename(os.path.normpath(root))
        if imageLabel not in labelCode:
            labelCode[imageLabel] = len(labelCode)

    ## Shuffle this folder's paths, then send the first 70% to train and the rest to test
    shuffle(file)
    idxTrain = int(len(file) * 0.7)
    trainImage.extend(file[:idxTrain])
    testImage.extend(file[idxTrain:])

### Count the number of images in train and test

In [4]:
## Size of each split
cntTrainImages = len(trainImage)
cntTestImages = len(testImage)

## The two figures on the "Total" line should agree: kept images vs. train + test
print('Train Images: %s, Test Images: %s' % (cntTrainImages, cntTestImages))
print('Total (%s): %s' % (len(allImages), cntTestImages + cntTrainImages))

Train Images: 19341, Test Images: 8304
Total (27645): 27645


## Create methods for Image reading and model creation

In [5]:
def getImageAndLabels(lsImagePath):
    ''' Load the images at the given paths together with their class information.

        The label of an image is the name of the folder that contains it; its code is
        looked up in the module-level ``labelCode`` dictionary. Images whose decoded
        array is not 3-dimensional (gray scale) are left out.

        Parameters:
            lsImagePath: iterable of image file paths.

        Returns:
            Tuple of three parallel lists: (image arrays, label codes, label names).

        Raises:
            KeyError: if the folder name of a kept image is not a key of ``labelCode``.
    '''
    images, codes, labels = [], [], []

    for path in lsImagePath:
        pixels = imageio.imread(path)
        ## Only images that decode to a 3-D array are used
        if len(pixels.shape) == 3:
            ## Name of the folder holding the file; normpath strips any trailing slash
            folder = os.path.basename(os.path.normpath(os.path.dirname(path)))

            images.append(pixels)
            codes.append(labelCode[folder])
            labels.append(folder)

    return (images, codes, labels)

### Methods for creating CNN model

In [6]:
# Dense layers set
def dense_set(inp_layer, n, activation, drop_rate=0):
    """Stack Dropout -> Dense -> BatchNormalization -> Activation on ``inp_layer``.

    Parameters:
        inp_layer: tensor the stack is applied to.
        n: number of units of the Dense layer.
        activation: activation passed to the final ``Activation`` layer.
        drop_rate: rate of the leading Dropout layer (0 by default).

    Returns:
        The output tensor of the Activation layer.
    """
    x = Dropout(drop_rate)(inp_layer)
    x = Dense(n)(x)
    x = BatchNormalization(axis=-1)(x)
    return Activation(activation=activation)(x)

In [7]:
# Conv. layers set
def conv_layer(feature_batch, feature_map, kernel_size=(3, 3),strides=(1,1), padding='same'):
    """Stack Conv2D -> BatchNormalization (axis 3) -> ReLU on ``feature_batch``.

    Parameters:
        feature_batch: tensor the stack is applied to.
        feature_map: number of filters of the convolution.
        kernel_size: convolution kernel size.
        strides: convolution strides.
        padding: convolution padding mode.

    Returns:
        The output tensor of the ReLU layer.
    """
    convolved = Conv2D(filters=feature_map,
                       kernel_size=kernel_size,
                       strides=strides,
                       padding=padding)(feature_batch)
    normalised = BatchNormalization(axis=3)(convolved)
    return ReLU()(normalised)

### Creating models

In [8]:
def get10LayerModel(myoptim = SGD(lr=1 * 1e-1, momentum=0.9, nesterov=True)):
    """Build, compile and summarise the deeper CNN classifier.

    Three convolution stages (2 x 64, 2 x 128 and 3 x 256 feature maps), each
    closed by a 3x3 max-pool with stride 2, feed a 128-unit ReLU dense block and
    a softmax block whose width is the module-level ``num_classes``.

    Parameters:
        myoptim: optimizer handed to ``model.compile``. The default object is
            created once, when this function is defined.

    Returns:
        The compiled model (its summary is printed as a side effect).
    """
    inp_img = Input(shape=(256, 256, 3))

    # (feature maps, number of stacked conv blocks) for every stage
    features = inp_img
    for feature_map, depth in ((64, 2), (128, 2), (256, 3)):
        for _ in range(depth):
            features = conv_layer(features, feature_map)
        features = MaxPooling2D(pool_size=(3, 3), strides=(2, 2))(features)

    # classifier head
    flat = Flatten()(features)
    hidden = dense_set(flat, 128, activation='relu')
    out = dense_set(hidden, num_classes, activation='softmax')

    model = Model(inputs=inp_img, outputs=out)
    model.compile(loss='categorical_crossentropy', optimizer=myoptim, metrics=['accuracy'])
    model.summary()
    return model

In [9]:
def get6LayerModel(myoptim = SGD(lr=1 * 1e-1, momentum=0.9, nesterov=True)):
    """Build, compile and summarise the four-convolution CNN classifier.

    Each stage is one conv block followed by a stride-2 max-pool; the feature maps
    grow 32 -> 64 -> 128 -> 256 while the pool window grows 3 -> 5 -> 5 -> 7.
    Two ReLU dense blocks (64 and 128 units) precede the softmax block, whose
    width is the module-level ``num_classes``.

    Parameters:
        myoptim: optimizer handed to ``model.compile``. The default object is
            created once, when this function is defined.

    Returns:
        The compiled model (its summary is printed as a side effect).
    """
    inp_img = Input(shape=(256, 256, 3))

    # (feature maps, side of the square pooling window) for every stage
    features = inp_img
    for feature_map, window in ((32, 3), (64, 5), (128, 5), (256, 7)):
        features = conv_layer(features, feature_map, padding='same')
        features = MaxPooling2D(pool_size=(window, window), strides=(2, 2))(features)

    # classifier head
    flat = Flatten()(features)
    hidden = dense_set(flat, 64, activation='relu')
    hidden = dense_set(hidden, 128, activation='relu')
    out = dense_set(hidden, num_classes, activation='softmax')

    model = Model(inputs=inp_img, outputs=out)
    model.compile(loss='categorical_crossentropy', optimizer=myoptim, metrics=['accuracy'])
    model.summary()
    return model

In [10]:
# simple model 
def getSimpleModel(myoptim = SGD(lr=1 * 1e-1, momentum=0.9, nesterov=True)):
    """Build, compile and summarise the smallest CNN classifier of the notebook.

    One 32-filter conv block and a 4x4 max-pool feed a 256-unit ReLU dense block
    and a softmax block whose width is the module-level ``num_classes``.

    Parameters:
        myoptim: optimizer handed to ``model.compile``. The default object is
            created once, when this function is defined.

    Returns:
        The compiled model (its summary is printed as a side effect).
    """
    inp_img = Input(shape=(256, 256, 3))

    # feature extractor
    convolved = conv_layer(inp_img, 32)
    pooled = MaxPooling2D(pool_size=(4, 4))(convolved)

    # classifier head
    flat = Flatten()(pooled)
    hidden = dense_set(flat, 256, activation='relu')
    out = dense_set(hidden, num_classes, activation='softmax')

    model = Model(inputs=inp_img, outputs=out)
    model.compile(loss='categorical_crossentropy', optimizer=myoptim, metrics=['accuracy'])
    model.summary()
    return model


In [11]:
def trainAndPredictModel(model, x_train, x_test, y_train, y_test, epochs=50, batchSize=16):
    ''' Train the model, print its score on the test data and one-hot encode its predictions.

        Parameters:
            model: compiled model exposing fit / evaluate / predict.
            x_train, y_train: training inputs and one-hot targets.
            x_test, y_test: test inputs and one-hot targets.
            epochs: number of training epochs.
            batchSize: mini-batch size used while training.

        Returns:
            (model, ypred): the trained model and an array shaped like ``y_test``
            holding a single 1 per row, at the position of the highest predicted score.
    '''

    ## Training the model
    model.fit(x=x_train, y=y_train, epochs=epochs, batch_size=batchSize, verbose=2, shuffle=True)

    ## Score the model on the test data passed in as arguments (not on a notebook global).
    ## The value is printed exactly as evaluate() returns it.
    acc = model.evaluate(x_test, y_test)
    print("=====================================================================================")
    print("Accuracy of the model: %s" %(str(acc)))
    print("=====================================================================================")

    ## One-hot encode the predictions: mark the arg-max column of every predicted row
    y_predict = np.asarray(model.predict(x_test))
    ypred = np.zeros((y_test.shape))
    ypred[np.arange(len(y_predict)), np.argmax(y_predict, axis=1)] = 1

    return (model, ypred)

In [12]:
def createConfusionMatrix(y_test, y_pred):
    ''' Print the classification report and plot the confusion matrix as a heat-map.

        Parameters:
            y_test: one-hot encoded true classes.
            y_pred: one-hot encoded predicted classes, same shape as ``y_test``.

        Rows and columns of the heat-map are named after the keys of the module-level
        ``labelCode`` dictionary, whose insertion order follows the class codes.
    '''
    print(metrics.classification_report(y_test, y_pred))

    ## Compare against the y_pred argument (not a notebook global) and request one
    ## row/column per known class code, so the matrix always matches the label names
    ## even when a class is missing from both y_test and y_pred
    classCodes = list(range(len(labelCode)))
    cm = metrics.confusion_matrix(y_test.argmax(axis=1), y_pred.argmax(axis=1),
                                  labels=classCodes)

    dfCM = pd.DataFrame(cm, index=list(labelCode), columns=list(labelCode))
    plt.figure(figsize=(80,20))
    sns.heatmap(dfCM, vmax=8, square=True, fmt='.2f', annot=True,
                linecolor='white', linewidths=0.1)
    plt.show()

## Load the image arrays and corresponding labels from the train/test image paths

In [13]:
## For each split: list of image arrays, list of class codes, list of label names
xtrain, ytrain, lbl_train = getImageAndLabels(trainImage)
xtest, ytest, lbl_test = getImageAndLabels(testImage)

In [14]:
## The one-hot width is fixed to the number of known labels, so train and test targets
## always have the same number of columns whatever class codes each split happens to hold
num_classes = len(labelCode)

## Stack the images, convert to float32 and divide by 255
## (maps 8-bit pixel values into [0, 1] -- assumes the images are 8-bit)
X_train = np.asarray(xtrain).astype('float32') / 255
X_test = np.asarray(xtest).astype('float32') / 255

## One-hot encode the class codes
y_train = np_utils.to_categorical(np.array(ytrain), num_classes=num_classes)
y_test = np_utils.to_categorical(np.array(ytest), num_classes=num_classes)

print("X_Train Shape:%s" %(str(X_train.shape)))
print("X_Test Shape:%s" %(str(X_test.shape)))
print("Y_Train Shape:%s" %(str(y_train.shape)))
print("Y_Test Shape:%s" %(str(y_test.shape)))
print("Number of classes:%s" %(str(num_classes)))

X_Train Shape:(19341, 256, 256, 3)
X_Test Shape:(8304, 256, 256, 3)
Y_Train Shape:(19341, 22)
Y_Test Shape:(8304, 22)
Number of classes:22


### Randomly viewing the train image and corresponding label for verification

In [None]:
## Position of the training sample to inspect (any valid index into X_train works)
idx = 1380
## Label name, integer class code and one-hot row of that sample, followed by the image itself
print("Label:%s, ClassCode:%s, Encoding:%s" %(lbl_train[idx], ytrain[idx], y_train[idx]))
plt.imshow(X_train[idx])

## Running various models

### Running simple model

In [None]:
## Simple model compiled with its default optimizer (SGD, lr 0.1, Nesterov momentum 0.9)
model1 = getSimpleModel()

In [None]:
## Train with the default 50 epochs / batch size 16; ypred holds the one-hot test predictions
_, ypred = trainAndPredictModel(model1, X_train, X_test, y_train, y_test)

In [None]:
## Classification report and confusion-matrix heat-map for the simple model's test predictions
createConfusionMatrix(y_test, ypred)

#### Viewing random predicted values

In [None]:
## Position of the test sample to inspect (any valid index into X_test works)
idx = 160
## One-hot prediction vs. one-hot truth; labelCode maps each label name to its column
print("Predicted value: %s" %(ypred[idx]))
print("Actual value: %s" %(y_test[idx]))
print(labelCode)
plt.imshow(X_test[idx])

### Running simple model with a different learning rate

In [None]:
## Same architecture as model1, compiled with SGD at learning rate 0.0001 and momentum 0.9
model2 = getSimpleModel(optimizers.SGD(lr=0.0001, momentum=0.9))

In [None]:
## Train with the default 50 epochs / batch size 16; ypred is overwritten with model2's predictions
_, ypred = trainAndPredictModel(model2, X_train, X_test, y_train, y_test)

In [None]:
## Classification report and confusion-matrix heat-map for model2's test predictions
createConfusionMatrix(y_test, ypred)

### Running 6 layer model

In [None]:
## Four-convolution model, compiled with the optimizer named by the string 'adam'
model3 = get6LayerModel(myoptim='adam')

In [None]:
## Train with the default 50 epochs / batch size 16; ypred is overwritten with model3's predictions
_, ypred = trainAndPredictModel(model3, X_train, X_test, y_train, y_test)

In [None]:
## Classification report and confusion-matrix heat-map for model3's test predictions
createConfusionMatrix(y_test, ypred)

## Transfer Learning

In [None]:
def createTransferModel(base_model, freezeLayers=10,
                        optimizer = optimizers.SGD(lr=0.0001, momentum=0.9)):
    """Put a new classification head on a pre-trained base model and compile the result.

    Parameters:
        base_model: pre-trained model whose output feeds the new head. Its first
            ``freezeLayers`` layers are marked non-trainable in place.
        freezeLayers: number of leading layers of ``base_model`` to freeze.
        optimizer: optimizer handed to ``compile``. The default object is created
            once, when this function is defined.

    Returns:
        The compiled model (its summary is printed as a side effect). The width of
        its softmax output is the module-level ``num_classes``.
    """
    # Freeze the first `freezeLayers` layers so training leaves them unchanged
    for layer in base_model.layers[:freezeLayers]:
        layer.trainable = False

    # Custom head: flatten, two 1024-unit ReLU layers, softmax over the classes
    x = base_model.output
    x = Flatten()(x)
    x = Dense(1024, activation="relu")(x)
    x = Dense(1024, activation="relu")(x)
    predictions = Dense(num_classes, activation="softmax")(x)

    # Final model; `inputs` / `outputs` are the keyword names the other builders in this file use
    model_final = Model(inputs=base_model.input, outputs=predictions)

    model_final.compile(loss="categorical_crossentropy",
                        optimizer=optimizer,
                        metrics=["accuracy"])

    model_final.summary()
    return model_final

In [None]:
# Fetch the pre-trained base model (ImageNet weights, no top classifier) that the new model is built on.
# Alternative base model, currently disabled:
#base_model = InceptionV3(weights='imagenet', include_top=False, input_shape = (rows, cols, channels))
base_model = applications.VGG16(weights = "imagenet",include_top=False, 
                                input_shape = (rows, cols, channels))

In [None]:
# Print the layer list of the base model (useful for choosing how many layers to freeze)
base_model.summary()

In [None]:
# Transfer model with the defaults: first 10 base layers frozen, SGD at lr 0.0001 / momentum 0.9
premodel = createTransferModel(base_model)

In [None]:
# Train with the default 50 epochs / batch size 16; ypred is overwritten with the transfer model's predictions
_, ypred = trainAndPredictModel(premodel, X_train, X_test, y_train, y_test)

In [None]:
# Classification report and confusion-matrix heat-map for the transfer model's test predictions
createConfusionMatrix(y_test, ypred)