In [None]:
## IMPORTING OF LIBRARIES

import numpy as np
from sklearn.decomposition import PCA
import scipy.io as sio
from sklearn.model_selection import train_test_split
from sklearn import preprocessing
import os
import random
from random import shuffle
import scipy.ndimage
from keras.utils import to_categorical
from sklearn.model_selection import StratifiedKFold
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
from sklearn.metrics import classification_report, confusion_matrix
import pickle
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Conv2D, MaxPool2D, Flatten, Dense, Dropout, Input, Concatenate
import time

In [None]:
# Load the full feature array together with its ground-truth label map.
def loadData(path, filename, gt_filename='gt.mat', data_key='data', label_key='label'):
    """Load a feature array and its ground-truth label array from MATLAB files.

    Args:
        path: Directory holding the files. It is joined onto the current
            working directory, so a relative path is resolved against the CWD
            while an absolute path is used unchanged.
        filename: Name of the feature file inside ``path``. The ``.mat``
            extension may be omitted (``scipy.io.loadmat`` appends it).
        gt_filename: Name of the ground-truth file inside ``path``.
        data_key: Name of the variable to read from the feature file.
        label_key: Name of the variable to read from the ground-truth file.

    Returns:
        Tuple ``(x, labels)``: the feature array and the label array exactly
        as stored in the two files.

    Raises:
        KeyError: If ``data_key`` / ``label_key`` is not a variable of the
            corresponding file.
    """
    data_dir = os.path.join(os.getcwd(), path)
    feature_vars = sio.loadmat(os.path.join(data_dir, filename))
    label_vars = sio.loadmat(os.path.join(data_dir, gt_filename))

    # Fail with a message that names the file and lists what it does contain,
    # instead of a bare KeyError on the dictionary lookup.
    for contents, key, source in ((feature_vars, data_key, filename),
                                  (label_vars, label_key, gt_filename)):
        if key not in contents:
            available = sorted(name for name in contents if not name.startswith('__'))
            raise KeyError(
                "variable {!r} not found in {!r}; available variables: {}".format(
                    key, source, available))

    return feature_vars[data_key], label_vars[label_key]


# Feature file to load (the extension is omitted; see loadData).
# filename = 'T3_SVD'
filename = 'RF_svd'

# Dataset directory. These are absolute paths, so joining them onto
# os.getcwd() had no effect (os.path.join discards everything before an
# absolute component); loadData anchors relative paths to the CWD itself.
# data_path = "/home/workspace/RS2/"
# data_path = "/home/workspace/airsar-sf/"
data_path = "/home/workspace/flevo/"

X, labels = loadData(data_path, filename)
X.shape


In [None]:

def padWithZeros(X, margin=2):
    """Return a copy of ``X`` surrounded by ``margin`` zero pixels on its first two axes.

    Args:
        X: Array indexed as ``X[row, col, band]``.
        margin: Number of zero rows/columns added on every side.

    Returns:
        A new float64 array of shape
        ``(rows + 2 * margin, cols + 2 * margin, bands)`` holding ``X`` in its
        centre; the third axis is not padded.
    """
    rows, cols, bands = X.shape[0], X.shape[1], X.shape[2]
    padded = np.zeros((rows + 2 * margin, cols + 2 * margin, bands))
    # Drop the original data into the middle of the zero canvas.
    padded[margin:margin + rows, margin:margin + cols, :] = X
    return padded


def createPatches(X, y, windowSize=5, removeZeroLabels=True):
    """Cut a square neighbourhood around pixels of ``X`` and pair it with the pixel label.

    The image is zero-padded so that border pixels also get a full window.
    Only the pixels that end up in the result are extracted, so memory scales
    with the number of returned patches rather than with the whole image.

    Args:
        X: Array indexed as ``X[row, col, band]``.
        y: Label map of shape ``(rows, cols)``; label 0 marks unlabelled pixels.
        windowSize: Side length of the square patch. Must be a positive odd
            number so that the patch has a centre pixel.
        removeZeroLabels: If True, pixels whose label is not > 0 are skipped
            and the remaining labels are shifted down by one (1..K -> 0..K-1).
            If False, every pixel is returned with its label unchanged.

    Returns:
        Tuple ``(patchesData, patchesLabels)``:
        ``patchesData`` is float64 of shape ``(n, windowSize, windowSize, bands)``
        and ``patchesLabels`` is float64 of shape ``(n,)``; both follow
        row-major pixel order.

    Raises:
        ValueError: If ``windowSize`` is not a positive odd number, ``X`` is
            not 3-D, or ``y`` does not match the first two axes of ``X``.
    """
    # An even window has no centre pixel; previously windowSize=2 silently
    # produced patches that just repeated the centre value.
    if windowSize < 1 or windowSize % 2 == 0:
        raise ValueError(
            "windowSize must be a positive odd number, got {!r}".format(windowSize))

    X = np.asarray(X)
    y = np.asarray(y)
    if X.ndim != 3:
        raise ValueError("X must be 3-D (rows, cols, bands), got shape {}".format(X.shape))
    rows, cols, bands = X.shape
    if y.shape != (rows, cols):
        raise ValueError(
            "y has shape {} but X needs labels of shape {}".format(y.shape, (rows, cols)))

    margin = (windowSize - 1) // 2
    padded = np.pad(X, ((margin, margin), (margin, margin), (0, 0)), mode='constant')

    # Decide up front which pixels are kept so no patch is built and then discarded.
    keep = (y > 0) if removeZeroLabels else np.ones((rows, cols), dtype=bool)
    centres = np.argwhere(keep)  # row-major order, same as the label lookup below

    patchesData = np.zeros((len(centres), windowSize, windowSize, bands))
    for patchIndex, (r, c) in enumerate(centres):
        # (r, c) is in unpadded coordinates; the padding offset makes the
        # window that starts at (r, c) in the padded image centred on the pixel.
        patchesData[patchIndex] = padded[r:r + windowSize, c:c + windowSize, :]

    patchesLabels = y[keep].astype(np.float64)
    if removeZeroLabels:
        patchesLabels -= 1
    return patchesData, patchesLabels

# Side length (in pixels) of the square window cut around each labelled pixel.
windowSize = 7
XPatches, yPatches = createPatches(
    X, labels, windowSize=windowSize, removeZeroLabels=True
)

(XPatches.shape, yPatches.shape)

In [None]:
def reports(model, X_test, y_test, target_names=None):
    """Evaluate ``model`` on one-hot labelled data and summarise the result.

    Args:
        model: Object exposing Keras-style ``predict`` and ``evaluate``.
            ``evaluate`` is expected to return ``[loss, accuracy]``, i.e. the
            model was compiled with accuracy as its first metric.
        X_test: Model input(s); passed unchanged to ``predict`` and ``evaluate``.
        y_test: One-hot labels with the classes along axis 1.
        target_names: Display name of every class, indexed by class id.
            Defaults to the 15 placeholder names used for the Flevoland data;
            for the 5-class SF data pass ``['a', 'b', 'c', 'd', 'e']``.

    Returns:
        Tuple ``(classification, confusion, Test_Loss, Test_accuracy)``:
        the text report, the confusion matrix with one row/column per entry
        of ``target_names``, and the loss and accuracy returned by
        ``evaluate``, each multiplied by 100.

    Raises:
        ValueError: If a true or predicted class id has no entry in
            ``target_names``.
    """
    if target_names is None:
        ## target_names for Flevo
        target_names = ['a', 'b', 'c', 'd', 'e', 'f', 'g',
                        'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o']
    class_ids = np.arange(len(target_names))

    y_pred = np.argmax(model.predict(X_test), axis=1)
    y_true = np.argmax(y_test, axis=1)

    # Passing explicit labels below makes scikit-learn ignore ids outside the
    # list, so reject them here instead of silently dropping samples.
    seen_ids = np.union1d(y_true, y_pred)
    if seen_ids.size and seen_ids[-1] >= len(target_names):
        raise ValueError(
            "class id {} has no entry in target_names ({} names given)".format(
                int(seen_ids[-1]), len(target_names)))

    # Explicit labels keep the report and the matrix at a fixed size even when
    # a class is absent from both the truth and the predictions.
    classification = classification_report(
        y_true, y_pred, labels=class_ids, target_names=target_names)
    confusion = confusion_matrix(y_true, y_pred, labels=class_ids)

    score = model.evaluate(X_test, y_test, batch_size=256)
    print(score)

    Test_Loss = score[0] * 100
    Test_accuracy = score[1] * 100

    return classification, confusion, Test_Loss, Test_accuracy

def generate_reports(XPatches, yPatches, model, filename, index, split_band=6, num_bands=30):
    """Evaluate a two-input model on the given patches and save the metrics to a text file.

    The last axis of ``XPatches`` is split into the two model inputs:
    bands ``[:split_band]`` and bands ``[split_band:num_bands]``.

    Args:
        XPatches: Patch array indexed as ``[sample, row, col, band]``.
        yPatches: Integer class id per patch (0-based).
        model: Two-input model handed to ``reports``.
        filename: Tag used in the output file name.
        index: Run number used in the output file name.
        split_band: First band of the second input. 6 for RF_SVD
            (use 9 for T3_SVD).
        num_bands: One past the last band of the second input. 30 for RF_SVD
            (use 45 for T3_SVD).

    Side effects:
        Prints the metrics and writes ``"Accy of {filename}-{index}.txt"`` in
        the current working directory (an existing file is overwritten).
    """
    # Every patch passed in is evaluated, split into the two input branches.
    X_test = [XPatches[:, :, :, :split_band], XPatches[:, :, :, split_band:num_bands]]

    # convert the y labels to categorical
    y_test = to_categorical(yPatches)

    classification, confusion, Test_loss, Test_accuracy = reports(model, X_test, y_test)

    # Per-class recall; a class with no true samples yields nan (0 / 0).
    class_wise_acc = np.diag(confusion) / np.sum(confusion, axis=1)

    # Fraction of all samples on the diagonal.
    overall_acc = np.sum(np.diag(confusion)) / np.sum(confusion)

    # Unweighted mean of the per-class values.
    average_acc = np.mean(class_wise_acc)

    # Agreement expected by chance, from the row and column marginals.
    chance_agreement = np.sum(np.sum(confusion, axis=1) * np.sum(confusion, axis=0)) / (np.sum(confusion) ** 2)

    # Cohen's kappa: agreement beyond chance, normalised.
    kappa = (overall_acc - chance_agreement) / (1 - chance_agreement)

    print("Class-wise accuracy:", class_wise_acc)
    print("Overall accuracy:", overall_acc)
    print("Average accuracy:", average_acc)
    print("Kappa:", kappa)

    # Empty strings produce the blank separator lines; the file has no
    # trailing newline.
    report_lines = [
        '{} Test loss (%)'.format(Test_loss),
        '{} Test accuracy (%)'.format(Test_accuracy),
        '',
        str(classification),
        str(confusion),
        '',
        "Class-wise accuracy:{}".format(class_wise_acc),
        "Overall accuracy:{}".format(overall_acc),
        "Average accuracy:{}".format(average_acc),
        "Kappa:{}".format(kappa),
    ]
    file_name = f"Accy of {filename}-{index}.txt"
    with open(file_name, 'w') as x_file:
        x_file.write('\n'.join(report_lines))

In [None]:
def splitTrainTestSet(X, y, testRatio=0.9):
    """Split samples into train and test parts, keeping class proportions.

    Args:
        X: Samples, indexed along the first axis.
        y: Class label per sample; also used for stratification.
        testRatio: Value passed as ``test_size``; the default 0.9 holds out
            90 % of the samples for testing.

    Returns:
        Tuple ``(X_train, X_test, y_train, y_test)``. The split uses the fixed
        seed 345, so it is the same on every call with the same data.
    """
    split_options = dict(test_size=testRatio, random_state=345, stratify=y)
    train_X, test_X, train_y, test_y = train_test_split(X, y, **split_options)
    return train_X, test_X, train_y, test_y


# Stratified split with the default test ratio; print the four resulting shapes.
X_train, X_test, y_train, y_test = splitTrainTestSet(XPatches, yPatches)
for _subset in (X_train, X_test, y_train, y_test):
    print(_subset.shape)

In [None]:
#     *****************Reshape Train & Test set*****************
# The band axis is split into the two groups fed to the two model inputs.
# RF_SVD:  bands [:6]  -> input 1, bands [6:30] -> input 2
# T3_SVD:  bands [:9]  -> input 1, bands [9:45] -> input 2

## Train set
# X_train1 = X_train[:, :, :,:9] ## FOR T3_SVD
X_train1 = X_train[:, :, :,:6] ## FOR RF_SVD
# X_train2 = X_train[:, :, :, 9:45] ## FOR T3_SVD
X_train2 = X_train[:, :, :, 6:30] ## FOR RF_SVD
print("train inputs: ", X_train1.shape, X_train2.shape)

## Test set
# X_test1 = X_test[:, :, :,:9] ## FOR T3_SVD
X_test1 = X_test[:, :, :,:6] ## FOR RF_SVD
# X_test2 = X_test[:, :, :, 9:45] ## FOR T3_SVD
X_test2 = X_test[:, :, :, 6:30] ## FOR RF_SVD
print("test inputs: ", X_test1.shape, X_test2.shape)

# One-hot encode the training labels. The guard keeps the cell safe to re-run:
# without it a second run would one-hot encode the one-hot matrix again.
if np.ndim(y_train) == 1:
    y_train = to_categorical(y_train)

# Per-sample input shapes (the sample axis is dropped) for the two model inputs
input_shape1 = X_train1[0].shape
print("input_shape1: ",input_shape1)
input_shape2 = X_train2[0].shape
print("input_shape2: ",input_shape2)


In [None]:
#     *************************MODEL*****************
num_classes = 15 ## FOR FLEVO
# num_classes = 5 ## FOR SF


def _conv_branch(branch_input):
    """Build one Conv(30)-pool-Conv(60)-pool-Conv(120) stack on ``branch_input``.

    Every call creates new layers, so the two branches have the same
    architecture but do not share weights.
    """
    features = branch_input
    for n_filters in (30, 60):
        features = Conv2D(n_filters, (2, 2), strides=1, padding="same", activation='relu')(features)
        features = MaxPool2D(pool_size=(2, 2), strides=(2, 2))(features)
    return Conv2D(120, (2, 2), strides=1, padding="same", activation='relu')(features)


# One input per band group
input_channel1 = Input(shape=input_shape1)
input_channel2 = Input(shape=input_shape2)

# Identical (not weight-shared) convolutional stacks, one per input
conv3_channel1 = _conv_branch(input_channel1)
conv3_channel2 = _conv_branch(input_channel2)

# Concatenate the outputs of both branches along the filter axis
concatenated_channels = Concatenate()([conv3_channel1, conv3_channel2])

# Classification head
flatten = Flatten()(concatenated_channels)
dense1 = Dense(240, activation='relu')(flatten)
dropout = Dropout(0.5)(dense1)
output = Dense(num_classes, activation='softmax')(dropout)

# Create the final model with two inputs
two_channel_model = Model(inputs=[input_channel1, input_channel2], outputs=output)

# Labels are one-hot encoded, hence categorical cross-entropy
two_channel_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Print the model summary
two_channel_model.summary()

# Train and report the wall-clock training time
start = time.time()
two_channel_model.fit([X_train1, X_train2], y_train, batch_size=256, epochs=2)
print("Total Training time: ", time.time() - start, "seconds")

In [None]:
# Evaluate on every labelled patch (XPatches also contains the training
# samples) and save the metrics of run 1 to a text file.
generate_reports(
    XPatches=XPatches,
    yPatches=yPatches,
    model=two_channel_model,
    filename=filename,
    index=1,
)