# Importing Needed Libraries

In [None]:
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense , GlobalAveragePooling2D
from tensorflow.keras.callbacks import Callback, LearningRateScheduler, ModelCheckpoint

In [None]:
from tensorflow.keras.utils import to_categorical

In [None]:
import math
import os
import pickle
import struct
import warnings

import matplotlib.pyplot as plt
import numpy as np

In [None]:
import tensorflow as tf
import random
# Seed the NumPy, TensorFlow and Python random generators with the same value.
for seed_fn in (np.random.seed, tf.random.set_seed, random.seed):
    seed_fn(1)

# Get CNN model

In [None]:
def get_CNN_Mod(input_shape, N):
    """
    Build a small two-stage convolutional classifier.

    Layer stack, in order: Conv2D (20 filters, 3x3, ReLU) -> MaxPooling2D (2x2) ->
    Conv2D (15 filters, 3x3, tanh) -> MaxPooling2D whose window equals the height of
    the incoming feature map -> Flatten -> Dense (10 units, tanh) ->
    Dense (N units, softmax).

    :param input_shape: shape of a single input sample, handed to the Input layer
    :param N: number of units in the softmax output layer (one per class)
    :return: the assembled (not yet compiled) Keras Model
    """
    image_in = Input(name='input1', shape=input_shape)

    # Convolution / pooling stages.
    features = Conv2D(20, 3, strides=1, activation='relu', padding='valid')(image_in)
    features = MaxPooling2D(pool_size=2)(features)
    features = Conv2D(15, 3, strides=1, activation='tanh', padding='valid')(features)
    # The pooling window is set to the current feature-map height.
    features = MaxPooling2D(pool_size=features.shape[1])(features)

    # Classification head.
    features = Flatten()(features)
    features = Dense(10, activation="tanh")(features)
    class_probs = Dense(N, activation="softmax")(features)

    return Model(inputs=image_in, outputs=class_probs)

In [None]:
def get_CNN(input_shape, N):
    """
    Build a single-convolution classifier with global average pooling.

    Layer stack, in order: Conv2D (20 filters, 3x3, tanh) -> GlobalAveragePooling2D ->
    Dense (10 units, ReLU) -> Dense (N units, softmax).

    :param input_shape: shape of a single input sample, handed to the Input layer
    :param N: number of units in the softmax output layer (one per class)
    :return: the assembled (not yet compiled) Keras Model
    """
    image_in = Input(name='input1', shape=input_shape)

    features = Conv2D(20, 3, strides=1, activation='tanh', padding='valid')(image_in)
    # Average every feature map down to one value per filter.
    features = GlobalAveragePooling2D()(features)
    features = Dense(10, activation="relu")(features)
    class_probs = Dense(N, activation="softmax")(features)

    return Model(inputs=image_in, outputs=class_probs)

# Input Function

In [None]:
def Data_entry ():
    """
    Interactively ask for the dataset description.

    Prompts, in order, for the dataset name, the number of channels, the window
    (patch) size, the image width and the image height. Every answer except the
    name is converted with int().

    :return: tuple (name, channels, patch, width, height, width * height)
    """
    name = input("please enter dataset name \n")
    channel_count = int(input("Please enter number of channels \n"))
    window = int(input("please enter window size \n" ))
    width = int(input("please enter width of the image \n"))
    height = int(input("please enter hight of the image \n"))
    return name, channel_count, window, width, height, width * height

In [None]:
# Ask the user for the dataset description used by the following cells.
(dataset, channels, patch_size, m_W, m_H, filesize) = Data_entry()
# When True, the training cell loads saved weights instead of fitting the model.
weights = False
# Common prefix for the checkpoint, loss-history and loss-plot file names.
modelName = '{}_patch_{}'.format(dataset, patch_size)

Loading the data from the pickle file.

In [None]:
# File layout: <dataset>/<dataset>_<channels>_<patch_size>.pkl
# NOTE(review): pickle.load executes code embedded in the file - only open trusted files.
train_pickle_path = '{0}/{0}_{1}_{2}.pkl'.format(dataset, channels, patch_size)
with open(train_pickle_path, 'rb') as pickle_file:
    x_train, y_train, x_test, y_test = pickle.load(pickle_file)

Checking the Data Limits

In [None]:
# Report the value range of the test split first, then the training split.
print("Max and min values of data:")
for split in (x_test, x_train):
    print(split.max())
    print(split.min())

In [None]:
# The number of classes is the number of distinct labels in the training set.
distinct_labels = np.unique(y_train)
N = len(distinct_labels)
print('Number of classes: ', N)

The code below converts the `y_train` labels into a one-hot (categorical) array.

In [None]:
# Copy the labels into an array, then one-hot encode them.
# NOTE(review): this overwrites y_train, so re-running the cell encodes the
# already encoded labels again - reload the pickle before re-running.
y_temp_train = np.array(y_train)
y_train = to_categorical(
    y_temp_train
)

In [None]:
# Shape of one sample: the last three axes of x_train.
input_shape = tuple(x_train.shape[axis] for axis in (-3, -2, -1))

In [None]:
# Build the global-average-pooling CNN, print its layers, then compile it.
model = get_CNN(input_shape, N)
model.summary()
model.compile(
    optimizer='sgd',
    loss='mse',
    metrics=['accuracy'],
)

In [None]:
def step_decay(epoch):
    """
    Step-wise learning-rate schedule.

    Starts at 0.1 and multiplies the rate by 0.94 once for every 2 epochs,
    counted as floor((1 + epoch) / 2).

    :param epoch: index of the current epoch
    :return: learning rate to use for that epoch
    """
    initial_lrate, drop, epochs_drop = 0.1, 0.94, 2.0
    drops_so_far = math.floor((1 + epoch) / epochs_drop)
    return initial_lrate * math.pow(drop, drops_so_far)
# Keras callback that applies step_decay's learning rate for each epoch.
lrate = LearningRateScheduler(schedule=step_decay)

The callback below stops training once the monitored loss falls below 0.01. Note that it is only defined here; it is not included in `callbacks_list`.

In [None]:
class EarlyStoppingByLossVal(Callback):
    """Keras callback that stops training once a logged metric drops below a threshold."""

    def __init__(self, monitor='loss', value=0.01, verbose=0):
        """
        Store the stopping criterion.

        :param monitor: key of the metric to read from the epoch logs, defaults to 'loss'
        :param value: training stops when the monitored metric is strictly below this
        :param verbose: any value greater than 0 prints a message when training is stopped
        """
        # super(Callback, self) would start the lookup *after* Callback and so skip
        # Callback.__init__ entirely; the zero-argument form runs the base initialiser.
        super().__init__()
        self.monitor = monitor
        self.value = value
        self.verbose = verbose

    def on_epoch_end(self, epoch, logs=None):
        """
        Request a stop when the monitored metric has fallen below the threshold.

        :param epoch: index of the epoch that just finished
        :param logs: dict of metric values for the epoch; may be None
        """
        current = (logs or {}).get(self.monitor)
        if current is None:
            warnings.warn("Early stopping requires %s available!" % self.monitor, RuntimeWarning)
            # Nothing to compare against; returning avoids a TypeError on `None < value`.
            return
        if current < self.value:
            if self.verbose > 0:
                print("Epoch %05d: early stopping THR" % epoch)
            self.model.stop_training = True

In [None]:
# Write the model to <modelName>.h5 whenever the training accuracy improves.
mc = ModelCheckpoint(
    filepath=modelName + '.h5',
    monitor='accuracy',
    mode='max',
    save_best_only=True,
    verbose=1,
)
# Callbacks passed to model.fit: learning-rate schedule and checkpointing.
callbacks_list = [lrate, mc]

# Training the model.

In [None]:
# Either restore previously saved weights or train the model, depending on `weights`.
if weights:
    model.load_weights(modelName + '.h5')
else:
    history = model.fit(
        x_train,
        y_train,
        epochs=150,
        batch_size=1,
        shuffle=True,
        callbacks=callbacks_list,
    )
    # Go back to the checkpoint saved by ModelCheckpoint instead of the last epoch.
    model.load_weights(modelName + '.h5')

    # Store the per-epoch training loss both as text and as a pickle.
    loss_history = history.history["loss"]
    numpy_loss_history = np.array(loss_history)
    np.savetxt(modelName + "_loss_history.txt", numpy_loss_history, delimiter=",")
    with open(modelName + '_loss.pkl', 'wb') as loss_file:
        pickle.dump([numpy_loss_history], loss_file, protocol=4)

    # Plot the loss curve and save it next to the other artefacts.
    plt.figure(0)
    plt.plot(history.history['loss'])
    plt.title('Model Loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.savefig(modelName + "_loss.png")

In [None]:
# Predict the test split and turn the class probabilities into class indices.
y_pred = np.argmax(model.predict(x_test), axis=1)

# accuracy_score is documented as (y_true, y_pred); the arguments used to be swapped.
print("Test accuracy is: ", accuracy_score(y_test, y_pred))

# Rows of the confusion matrix are true classes, columns are predicted classes.
C = confusion_matrix(y_test, y_pred)
print("Class Accuracies: ")
s = 0
for correct, true_row in zip(np.diag(C), C):
    # Running total of correctly classified samples (the matrix diagonal).
    s = s + correct
    # Share of this true class that was predicted correctly.
    print(correct / np.sum(true_row))
print("Accuracy: ", s / len(y_pred))
print(C)

# Saving the Model with best weights

In [None]:
# Save the whole model under Models/<dataset>_model.
model.save('Models/{}_model'.format(dataset))

The code below asks for the dataset parameters again and loads the patch data from the pickle file.

In [None]:
# Ask for the dataset description again, then load its patch data.
# File layout: <dataset>/<dataset>_<channels>_<patch_size>_patchdata.pkl
(dataset, channels, patch_size, m_W, m_H, filesize) = Data_entry()
patch_pickle_path = '{0}/{0}_{1}_{2}_patchdata.pkl'.format(dataset, channels, patch_size)
with open(patch_pickle_path, 'rb') as pickle_file:
    Patch_Data_c = pickle.load(pickle_file)

sfbay_c<br>
colors_c = np.zeros([5, 3])<br>
colors_c[0, :] = [57, 83, 160]<br>
colors_c[1, :] = [232, 33, 38]<br>
colors_c[2, :] = [185, 79, 151]<br>
colors_c[3, :] = [283, 97, 105]<br>
colors_c[4, :] = [104, 192, 70]

The code below creates a 15x3 colour table: one row per class, each row holding<br>
that class's three colour components.

In [None]:
#flevo_l
# Colour table: row k holds the three colour components used for class index k.
colors_c = np.array(
    [
        [0, 1, 254],
        [0, 131, 71],
        [0, 253, 255],
        [0, 255, 0],
        [255, 126, 0],
        [180, 0, 255],
        [251, 255, 7],
        [91, 8, 227],
        [253, 0, 0],
        [172, 138, 78],
        [255, 181, 230],
        [191, 191, 255],
        [201, 222, 188],
        [127, 21, 25],
        [249, 226, 150],
    ],
    dtype=np.float64,
)

flevo_c<br>
colors_c = np.zeros([4, 3])<br>
colors_c[0, :] = [0, 1, 254]<br>
colors_c[1, :] = [0, 131, 71]<br>
colors_c[2, :] = [253, 0, 0]<br>
colors_c[3, :] = [0, 253, 255]

In [None]:
#predicting labels for the whole image
y_all_c = model.predict(Patch_Data_c)
y_all_c = np.argmax(y_all_c, axis = 1)

The code below builds a colour mask for the whole image from the predicted classes.

In [None]:
import cv2

# Predictions are stored row by row, so pixel (i, j) belongs to prediction
# i * m_W + j. One fancy-indexing lookup replaces the per-pixel Python loop.
pixel_count = m_H * m_W
color_mask_c = colors_c[y_all_c[:pixel_count]].reshape(m_H, m_W, 3)

color_mask = color_mask_c.astype(np.uint8)
# Swap the first and third channel before writing.
# NOTE(review): presumably colors_c is RGB and imwrite expects BGR - confirm.
color_mask = cv2.cvtColor(color_mask, cv2.COLOR_BGR2RGB)
cv2.imwrite("file_dataset.jpg", color_mask)

Read the positions and labels of the test samples only, so the image can be masked with them.

In [None]:
def read_gtd(filename, size):
    """
    Read the first `size` values from a pair of .gtd ground-truth files.

    Both `<filename>_positions.gtd` and `<filename>_labels.gtd` are read as raw
    native-format 4-byte floats (struct format 'f').

    :param filename: common path prefix of the two files
    :param size: number of values to read from each file
    :return: tuple (positions, labels) of float32 arrays with shape (size,)
    :raises struct.error: if either file holds fewer than `size` values
    """
    record_format = '%df' % size
    byte_count = struct.calcsize(record_format)
    # `with` closes each handle even when unpacking fails; the files used to stay open.
    with open(filename + '_positions.gtd', 'rb') as positions_file:
        positions = struct.unpack(record_format, positions_file.read(byte_count))
    with open(filename + '_labels.gtd', 'rb') as labels_file:
        labels = struct.unpack(record_format, labels_file.read(byte_count))
    return np.array(positions, dtype='float32'), np.array(labels, dtype='float32')
### Read gtd
gtd_prefix = dataset + '/' + dataset + '_test'
# `test_size` was never assigned anywhere, so this cell raised NameError.
# read_gtd consumes one 4-byte float per sample, so the count is taken from the file length.
# NOTE(review): assumes the positions file holds nothing but those records - confirm.
test_size = os.path.getsize(gtd_prefix + '_positions.gtd') // 4
test_positions, test_labels = read_gtd(gtd_prefix, test_size)
#Converting from float to int
test_positions = np.array(test_positions, dtype = 'int')
test_labels = np.array(test_labels, dtype = 'int')

Creating a mask of the image with the colors of the predicted classes.

In [None]:
import cv2

# Colour only the pixels whose flat index (i * m_W + j) appears in test_positions;
# all other pixels stay black. np.isin tests every index at once instead of
# scanning test_positions again for each pixel.
pixel_count = m_H * m_W
test_pixels = np.flatnonzero(np.isin(np.arange(pixel_count), test_positions))
flat_layout_c = np.zeros([pixel_count, 3])
flat_layout_c[test_pixels] = colors_c[y_all_c[test_pixels]]
color_mask_layout_c = flat_layout_c.reshape(m_H, m_W, 3)

color_mask_layout = color_mask_layout_c.astype(np.uint8)
# Swap the first and third channel before writing.
# NOTE(review): presumably colors_c is RGB and imwrite expects BGR - confirm.
color_mask_layout = cv2.cvtColor(color_mask_layout, cv2.COLOR_BGR2RGB)
cv2.imwrite("file22_c.jpg", color_mask_layout)

# Prediction and Masking for the sfbay_l

Our novel approach: train the model on the C-band data and use it to predict the L-band labels.<br>
Loading the data from the pickle file. (General Model)

In [None]:
# Ask for the dataset description again, then load its patch data.
# File layout: <dataset>/<dataset>_<channels>_<patch_size>_patchdata.pkl
(dataset, channels, patch_size, m_W, m_H, filesize) = Data_entry()
patch_pickle_path = '{0}/{0}_{1}_{2}_patchdata.pkl'.format(dataset, channels, patch_size)
with open(patch_pickle_path, 'rb') as pickle_file:
    Patch_Data_l = pickle.load(pickle_file)

The code below creates a 4x3 colour table: one row per class, each row holding<br>
that class's three colour components.

In [None]:
#sfbay_l
colors_l = np.zeros([4, 3])
colors_l[0, :] = [0, 1, 254]
colors_l[1, :] = [0, 131, 71]
colors_l[2, :] = [253, 0, 0]
colors_l[3, :] = [0, 253, 255]

In [None]:
# Predict a class for every patch: take the index of the largest probability.
class_probs_l = model.predict(Patch_Data_l)
y_all_l = np.argmax(class_probs_l, axis=1)

In [None]:
# Ask for the dataset description again, then load its train/test splits.
# File layout: <dataset>/<dataset>_<channels>_<patch_size>.pkl
(dataset, channels, patch_size, m_W, m_H, filesize) = Data_entry()
split_pickle_path = '{0}/{0}_{1}_{2}.pkl'.format(dataset, channels, patch_size)
with open(split_pickle_path, 'rb') as pickle_file:
    x_ltrain, y_ltrain, x_ltest, y_ltest = pickle.load(pickle_file)

In [None]:
# Predict the loaded test split and turn the class probabilities into class indices.
pred = np.argmax(model.predict(x_ltest), axis=1)
# accuracy_score is documented as (y_true, y_pred); the arguments used to be swapped.
print("Test accuracy is: ", accuracy_score(y_ltest, pred))

In [None]:
# Rows of the confusion matrix are true classes, columns are predicted classes.
C = confusion_matrix(y_ltest, pred)
print("Class Accuracies: ")
s = 0
for correct, true_row in zip(np.diag(C), C):
    # Running total of correctly classified samples (the matrix diagonal).
    s = s + correct
    # Share of this true class that was predicted correctly.
    print(correct / np.sum(true_row))
print("Accuracy: ", s / len(pred))
print(C)

The code below builds a colour mask for the L-band image from the predicted classes.

In [None]:
import cv2

# Predictions are stored row by row, so pixel (i, j) belongs to prediction
# i * m_W + j. One fancy-indexing lookup replaces the per-pixel Python loop.
pixel_count = m_H * m_W
color_mask_l = colors_l[y_all_l[:pixel_count]].reshape(m_H, m_W, 3)

color_mask = color_mask_l.astype(np.uint8)
# Swap the first and third channel before writing.
# NOTE(review): presumably colors_l is RGB and imwrite expects BGR - confirm.
color_mask = cv2.cvtColor(color_mask, cv2.COLOR_BGR2RGB)
cv2.imwrite("file12_l.jpg", color_mask)

In [None]:
import struct
def read_gtd(filename, size):
    """
    Read the first `size` values from a pair of .gtd ground-truth files.

    Both `<filename>_positions.gtd` and `<filename>_labels.gtd` are read as raw
    native-format 4-byte floats (struct format 'f').
    NOTE(review): a function with this name is also defined in an earlier cell;
    this later definition replaces it once the cell runs.

    :param filename: common path prefix of the two files
    :param size: number of values to read from each file
    :return: tuple (positions, labels) of float32 arrays with shape (size,)
    :raises struct.error: if either file holds fewer than `size` values
    """
    record_format = '%df' % size
    byte_count = struct.calcsize(record_format)
    # `with` closes each handle even when unpacking fails; the files used to stay open.
    with open(filename + '_positions.gtd', 'rb') as positions_file:
        positions = struct.unpack(record_format, positions_file.read(byte_count))
    with open(filename + '_labels.gtd', 'rb') as labels_file:
        labels = struct.unpack(record_format, labels_file.read(byte_count))
    return np.array(positions, dtype='float32'), np.array(labels, dtype='float32')
### Read gtd
gtd_prefix = dataset + '/' + dataset + '_test'
# The call used to pass `test size` (with a space), which is a SyntaxError, and no
# `test_size` variable was ever assigned.
# read_gtd consumes one 4-byte float per sample, so the count is taken from the file length.
# NOTE(review): assumes the positions file holds nothing but those records - confirm.
test_size = os.path.getsize(gtd_prefix + '_positions.gtd') // 4
test_positions, test_labels = read_gtd(gtd_prefix, test_size)
#Converting from float to int
test_positions = np.array(test_positions, dtype = 'int')
test_labels = np.array(test_labels, dtype = 'int')

Creating a mask of the layout of the image.

In [None]:
import cv2

# Colour only the pixels whose flat index (i * m_W + j) appears in test_positions;
# all other pixels stay black. np.isin tests every index at once instead of
# scanning test_positions again for each pixel.
pixel_count = m_H * m_W
test_pixels = np.flatnonzero(np.isin(np.arange(pixel_count), test_positions))
flat_layout_l = np.zeros([pixel_count, 3])
flat_layout_l[test_pixels] = colors_l[y_all_l[test_pixels]]
color_mask_layout_l = flat_layout_l.reshape(m_H, m_W, 3)

color_mask_layout = color_mask_layout_l.astype(np.uint8)
# Swap the first and third channel before writing.
# NOTE(review): presumably colors_l is RGB and imwrite expects BGR - confirm.
color_mask_layout = cv2.cvtColor(color_mask_layout, cv2.COLOR_BGR2RGB)
cv2.imwrite("file2_l.jpg", color_mask_layout)