In [None]:
# !pip install opencv-python scikit-learn scikit-image matplotlib spectral keras_tuner vis
# !pip install tensorflow numpy pandas
# !pip install spectral

In [None]:
import tensorflow as tf

In [None]:
from tensorflow import keras as keras
from keras import layers as layers
import numpy as np

In [None]:
import os, timeit
import cv2
import matplotlib.pyplot as plt
from math import inf as inf

In [None]:
from spectral import imshow

In [None]:
import keras_tuner as kt
import sys

In [None]:
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

In [None]:
#Constants
BAND_NUMBER = 60
FILLED_AREA_RATIO = 0.90
TOTAL_IMAGE_COUNT = 1200
IMAGE_COUNT = int(TOTAL_IMAGE_COUNT/4)
NUM_VARIETIES = 8
NUM_OF_BANDS = 15
FIRST_BAND = 21
LAST_BAND = 149

IMAGE_WIDTH = 30
IMAGE_HEIGHT = 30

NUM_LAYERS_EACH_BLOCK = [1, 1]
NUM_CHAN_PER_BLOCK = [[64,128,128],[128,128,256]]

ACTIVATION_TYPE =  "relu"
BATCH_SIZE = 2*NUM_VARIETIES

LEARNING_RATE_BASE = 0.0001

In [None]:
def start_timer():
    print("Testing started")
    return timeit.default_timer()

def end_timer():
    return timeit.default_timer()

def show_time(tic,toc): 
    test_time = toc - tic
    print('Testing time (s) = ' + str(test_time) + '\n')

In [None]:
from sys import platform
DATA_DIRECTORY = ""
SLASH = ""
if platform == "linux" or platform == "linux2":
    DATA_DIRECTORY = "/home/nitintyagi/wheat data/BULK/"
    SLASH = "/"
elif platform == "win32":
    DATA_DIRECTORY = "D:\mvl\wheat\data\BULK\\"
    SLASH="\\"

In [None]:
# List for All varieties
VARIETIES = []

for name in os.listdir(DATA_DIRECTORY):
    if (name.endswith(".hdr") or name.endswith(".bil")):
        continue
    VARIETIES.append(name)
    if len(VARIETIES)==NUM_VARIETIES:
        break

In [None]:
def dataset_file_name():
    return "./dataset/V"+str(NUM_VARIETIES).zfill(3)+"_IC_"+str(TOTAL_IMAGE_COUNT).zfill(5)+"_FilledArea_"+str(FILLED_AREA_RATIO)+"_NumOfBands_"+str(NUM_OF_BANDS)+"_FB_"+str(FIRST_BAND)+"_LB_"+str(LAST_BAND)+"_BandNo_"+str(BAND_NUMBER)+"_ImageHeight_"+str(IMAGE_HEIGHT)+"_ImageWidth_"+str(IMAGE_WIDTH)

In [None]:
DATASET_FILE_NAME = dataset_file_name()

train_dataset = np.load(DATASET_FILE_NAME+"_train_dataset.npy")
train_dataset_label = np.load(DATASET_FILE_NAME+"_train_dataset_label.npy")
test_dataset = np.load(DATASET_FILE_NAME+"_test_dataset.npy")
test_dataset_label = np.load(DATASET_FILE_NAME+"_test_dataset_label.npy")

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import pyplot as plt
# import math, sys, pdb, os
import keras
import keras.backend as K
import tensorflow as tf
from keras.layers import Input, Conv2D, MaxPooling2D, Activation, BatchNormalization, Add, Conv2DTranspose, Flatten, Dense, Conv1D, AveragePooling2D, LeakyReLU, PReLU, GlobalAveragePooling2D
from keras.layers.core import Dropout
from keras.layers import concatenate
from keras.models import Model

import os, pdb, timeit
import numpy as np
from keras.callbacks import TensorBoard
from keras.optimizers import Adam
import matplotlib
matplotlib.use('Agg')
from matplotlib import pyplot as plt

from sklearn.metrics import confusion_matrix, precision_recall_fscore_support
import matplotlib.cm as cm
import cv2
from keras import activations
import vis
# from vis.visualization import visualize_saliency, overlay
# from vis.utils import utils

In [None]:
def normalizeDataWholeSeed(data,normalization_type='max'):
    
    if normalization_type == 'max':
        for idx in range(data.shape[0]):
            data[idx,:,:,:] = data[idx,:,:,:]/np.max(abs(data[idx,:,:,:]))
            
    elif normalization_type == 'l2norm':
        from numpy import linalg as LA
        for idx in range(data.shape[0]):
            data[idx,:,:,:] = data[idx,:,:,:]/LA.norm(data[idx,:,:,:]) # L2-norm by default        
        
    return data

In [None]:
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):

    import itertools
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.clim(0,sum(cm[0,:]))
    plt.xlabel('Predicted label')

In [None]:
def top_K_classification_accuracy(y_predicted, y_true, K=1):

    num_samples = y_predicted.shape[0]
    num_classes = y_predicted.shape[1]

    if K > num_classes:
        sys.exit(1)

    temp = np.zeros((num_samples,))

    for idx in range(num_samples):
        curr_predicted = np.argsort(y_predicted[idx,:])
        curr_predicted = curr_predicted[::-1] # descending

        if y_true[idx] in curr_predicted[:K]:
            temp[idx] = 1

    return 100.0 * np.sum(temp)/num_samples

In [None]:
def single_conv_block(x, kernel_size=3, activation_type='relu', num_filters=128,use_bias=False):
    # Batch norm
    x = BatchNormalization()(x)
    x = Conv2D(num_filters, kernel_size, activation=None, use_bias=use_bias, padding='same',
               kernel_initializer='truncated_normal')(x)

    # Activation
    x = Activation(activation_type)(x)
    return x

In [None]:
def conv2D_ResNet(x, kernel_size, activation_type, dropout_rate, num_filters_first_conv1D=[32,64,128]):

    x = single_conv_block(x,1,activation_type,num_filters_first_conv1D[0])

    x = single_conv_block(x,kernel_size,activation_type,num_filters_first_conv1D[1],use_bias=True)

    x = single_conv_block(x,1,activation_type,num_filters_first_conv1D[2])

    # Dropout
    return Dropout(dropout_rate)(x)

In [None]:
def createBlock_ResNet2D(x, num_layers, kernel_size, activation_type, dropout_rate, num_filters_first_conv1D):

    for idx_layer in range(num_layers):
        x = conv2D_ResNet(x, kernel_size, activation_type, dropout_rate, num_filters_first_conv1D)

    return x

In [None]:
def ResNet2D_classifier(data_num_rows, data_num_cols, num_classes, kernel_size=3, num_layers_each_block=[],
                        num_chan_per_block=[], activation_type='relu', dropout_rate=0.0, num_input_chans=1, num_nodes_fc=64):

    input_data = Input(shape=(data_num_rows, data_num_cols, num_input_chans))

    # Input layer: Conv2D -> activation
    x = Conv2D(num_chan_per_block[0][0], kernel_size, activation=None, use_bias=True, padding='same',
               kernel_initializer='truncated_normal')(input_data)

    # Activation
    x = Activation(activation_type)(x)


    #  Blocks & Downsampling Layers
    for idx_block in range(len(num_layers_each_block)):
        x = createBlock_ResNet2D(x, num_layers_each_block[idx_block], kernel_size, activation_type, dropout_rate,
                                 num_chan_per_block[idx_block])

        x = BatchNormalization()(x)

        if idx_block != len(num_layers_each_block)-1:
            x = Conv2D(num_chan_per_block[idx_block][1]*2, kernel_size, strides = 2, activation=None, use_bias=True, padding='valid',
                   kernel_initializer='truncated_normal')(x)
        else:
            x = GlobalAveragePooling2D()(x)

        x = Dropout(dropout_rate)(x)

    # Output layer
    x = BatchNormalization()(x)
    
    x = Flatten()(x)

    x = Dense(units=num_nodes_fc, activation=None, kernel_initializer='truncated_normal')(x)
#     x = Dense(units=32, activation=None, kernel_initializer='truncated_normal')(x)

    # Activation
    x = Activation(activation_type)(x)

    x = BatchNormalization()(x)
    output_data = Dense(units=num_classes, activation='softmax', kernel_initializer='truncated_normal')(x)

    return Model(inputs=input_data, outputs=output_data)

In [None]:
def evaluate(model,dataset,dataset_label,normalization_type):
    print("--------------Make Predictions--------------")    
    x = np.array(dataset)
    labels = np.array(dataset_label)
    
    # Normalize the data
    x = normalizeDataWholeSeed(x,normalization_type=normalization_type)
    
    num = x.shape[0]

    tic = start_timer()
    labels_predicted = model.predict(x)
    toc = end_timer()
    show_time(tic,toc)
    
    print(labels_predicted)
    print("--------")
    # Classification accuracy
    labels_integer_format = labels
    labels_predicted_integer_format = np.argmax(labels_predicted, axis=1)

    acc_top2 = top_K_classification_accuracy(labels_predicted, labels_integer_format, K=2)
    acc_top1 = top_K_classification_accuracy(labels_predicted, labels_integer_format, K=1)
    
    # Confusion matrices
    confusion_matrix_results = confusion_matrix(labels_integer_format, labels_predicted_integer_format)
    print("Confusion matrix = ")
    print(confusion_matrix_results)
    print("------------------------------------------------")

In [None]:
def predict(model,normalization_type):
    evaluate(model,train_dataset,train_dataset_label,normalization_type)
    
    evaluate(model,test_dataset,test_dataset_label,normalization_type)
    


In [None]:
def createAndTrainResNetB():
    
    learning_rate_base = LEARNING_RATE_BASE
    kernel_size = 3
    dropout_rate = 0.15 
    activation_type = ACTIVATION_TYPE
    num_nodes_fc = 256
    wheat_types =  VARIETIES
    normalization_type = 'max'
    num_layers_each_block = NUM_LAYERS_EACH_BLOCK
    num_chan_per_block = NUM_CHAN_PER_BLOCK
    N_classes = len(wheat_types)
    
    ############ Load data ############
    print("--------------Load Data--------------")

    # Load training data and their corresponding labels
    x_training = np.array(train_dataset)
    labels_training = np.array(train_dataset_label)
    
    # Normalize the data
    x_training = normalizeDataWholeSeed(x_training,normalization_type=normalization_type)
    
    # Extract some information
    num_training = x_training.shape[0]
    N_spatial = x_training.shape[1:3]
    N_bands = x_training.shape[3]
    
    print("--------------Done--------------")
    
    ############ Create a model ############
    print("--------------Create a model--------------")
    
    # Generate a model
    model = ResNet2D_classifier(data_num_rows = N_spatial[0], 
                                data_num_cols = N_spatial[1], 
                                num_classes = N_classes,
                                kernel_size = kernel_size, 
                                num_layers_each_block = num_layers_each_block,
                                num_chan_per_block = num_chan_per_block, 
                                activation_type = activation_type,
                                dropout_rate = dropout_rate, 
                                num_input_chans = N_bands, 
                                num_nodes_fc = num_nodes_fc)

    # Compile the model
    adam_opt = Adam(learning_rate=LEARNING_RATE_BASE, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.01)
    model.compile(loss='sparse_categorical_crossentropy', optimizer=adam_opt, metrics=['accuracy'])
    print("---------Completed---------")
    return model

In [None]:
model = createAndTrainResNetB()

In [None]:
for x in range(2):
    print("From epochs: ",20*x+1," to ",20+20*x)
    tic = start_timer()
    print(model.fit(x=train_dataset,y=train_dataset_label,batch_size=BATCH_SIZE, epochs=20, initial_epoch = 0, verbose=2,validation_split=0.20,shuffle=True))
    toc = end_timer()
    show_time(tic,toc)
    print("for testing")
    print(model.evaluate(test_dataset,test_dataset_label))
    print("for training")
    print(model.evaluate(train_dataset,train_dataset_label))