<a href="https://colab.research.google.com/github/aj1365/SGUMLP/blob/main/SGUMLP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from keras.layers import Conv2D, Conv3D, Flatten, Dense, Reshape, BatchNormalization
from keras.layers import Dropout, Input
from keras.models import Model
from tensorflow.keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint
from keras.utils import np_utils
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report, cohen_kappa_score
from operator import truediv
from plotly.offline import init_notebook_mode
import numpy as np
import matplotlib.pyplot as plt
import scipy.io as sio
import os
import spectral

import tensorflow as tf
import pandas as pd
from tensorflow import keras
from tensorflow.keras import layers

init_notebook_mode(connected=True)
%matplotlib inline

In [None]:
################## You need to modify the output data here depending on the dataset

def loadData(name):

    data_path = os.path.join(os.getcwd(),'E:/Datasets/')

    if name == 'Houston':

        MS = sio.loadmat(os.path.join(data_path, 'HS-MS Houston2013/data_MS_HR.mat'))['data_MS_HR']
        HS= sio.loadmat(os.path.join(data_path, 'HS-MS Houston2013/data_HS_LR.mat'))['data_HS_LR']
        Train = sio.loadmat(os.path.join(data_path, 'HS-MS Houston2013/Train.mat'))['Train']
        Test = sio.loadmat(os.path.join(data_path, 'HS-MS Houston2013/Validation.mat'))['Validation']

    if name == 'Berlin':

        SAR = sio.loadmat(os.path.join(data_path, 'HS-SAR Berlin/data_SAR_HR.mat'))['data_SAR_HR']
        HS = sio.loadmat(os.path.join(data_path, 'HS-SAR Berlin/data_HS_LR.mat'))['data_HS_LR']
        Train= sio.loadmat(os.path.join(data_path, 'HS-SAR Berlin/TrainImage.mat'))['TrainImage']
        Test = sio.loadmat(os.path.join(data_path, 'HS-SAR Berlin/TestImage.mat'))['TestImage']

    if name == 'Augsburg':

        SAR = sio.loadmat(os.path.join(data_path, 'HS-SAR-DSM Augsburg/data_SAR_HR.mat'))['data_SAR_HR']
        HS = sio.loadmat(os.path.join(data_path, 'HS-SAR-DSM Augsburg/data_HS_LR.mat'))['data_HS_LR']
        DSM = sio.loadmat(os.path.join(data_path, 'HS-SAR-DSM Augsburg/data_DSM.mat'))['data_DSM']
        Train= sio.loadmat(os.path.join(data_path, 'HS-SAR-DSM Augsburg/TrainImage.mat'))['TrainImage']
        Test = sio.loadmat(os.path.join(data_path, 'HS-SAR-DSM Augsburg/TestImage.mat'))['TestImage']

    return SAR, HS, DSM, Train, Test


In [None]:
## GLOBAL VARIABLES
windowSize = 9

def applyPCA(X, numComponents=75):
    newX = np.reshape(X, (-1, X.shape[2]))
    pca = PCA(n_components=numComponents, whiten=True)
    newX = pca.fit_transform(newX)
    newX = np.reshape(newX, (X.shape[0],X.shape[1], numComponents))
    return newX, pca

def splitTrainTestSet(X, y, testRatio, randomState=345):
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=testRatio, random_state=randomState,
                                                        stratify=y)
    return X_train, X_test, y_train, y_test

def padWithZeros(X, margin=2):
    newX = np.zeros((X.shape[0] + 2 * margin, X.shape[1] + 2* margin, X.shape[2]))
    x_offset = margin
    y_offset = margin
    newX[x_offset:X.shape[0] + x_offset, y_offset:X.shape[1] + y_offset, :] = X
    return newX

def createImageCubes(X, y, windowSize=9, removeZeroLabels = True):
    margin = int((windowSize - 1) / 2)
    zeroPaddedX = padWithZeros(X, margin=margin)
    # split patches
    patchesData = np.zeros((X.shape[0] * X.shape[1], windowSize, windowSize, X.shape[2]))
    patchesLabels = np.zeros((X.shape[0] * X.shape[1]))
    patchIndex = 0
    for r in range(margin, zeroPaddedX.shape[0] - margin):
        for c in range(margin, zeroPaddedX.shape[1] - margin):
            patch = zeroPaddedX[r - margin:r + margin + 1, c - margin:c + margin + 1]
            patchesData[patchIndex, :, :, :] = patch
            patchesLabels[patchIndex] = y[r-margin, c-margin]
            patchIndex = patchIndex + 1
    if removeZeroLabels:
        patchesData = patchesData[patchesLabels>0,:,:,:]
        patchesLabels = patchesLabels[patchesLabels>0]
        patchesLabels -= 1
    return patchesData, patchesLabels

In [None]:
dataset = 'Augsburg'

SAR, HS , DSM, Train, Test = loadData(dataset)

K = 15 if dataset == 'Berlin' else 12
HSr, pca = applyPCA(HS, numComponents=K)

DSM= DSM.reshape((DSM.shape[0],DSM.shape[1],1))

In [None]:
XTrain1, Train1 = createImageCubes(SAR, Train, windowSize=windowSize)
XTest1, Test1 = createImageCubes(SAR, Test, windowSize=windowSize)


XTrain2, Train2 = createImageCubes(HSr, Train, windowSize=windowSize)
XTest2, Test2 = createImageCubes(HSr, Test, windowSize=windowSize)


XTrain3, Train3 = createImageCubes(DSM, Train, windowSize=windowSize)
XTest3, Test3 = createImageCubes(DSM, Test, windowSize=windowSize)

XtrainS = np.concatenate((XTrain1, XTrain2, XTrain3) , axis = 3)
XtestS = np.concatenate((XTest1, XTest2, XTest3) , axis = 3)

YtestS = Train1
YtrainS = Test1
num_classes = 7
input_shape = (9, 9, 17)

In [None]:
image_size=9
data_augmentation = keras.Sequential(
    [
        layers.Normalization(),
        layers.Resizing(image_size, image_size),
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(factor=0.02),
        layers.RandomZoom(
            height_factor=0.2, width_factor=0.2
        ),
    ],
    name="data_augmentation",
)
# Compute the mean and the variance of the training data for normalization.
data_augmentation.layers[0].adapt(XtrainS)

In [None]:
from keras_cv_attention_models.backend import layers, models, functional, initializers

from keras_cv_attention_models.attention_layers import (
    ChannelAffine,
    CompatibleExtractPatches,
    depthwise_conv2d_no_bias,
    conv2d_no_bias,
    drop_block,
    layer_norm,
    mlp_block,
    output_block,
    add_pre_post_process,
)
from keras_cv_attention_models.download_and_load import reload_model_weights

In [None]:
from keras_cv_attention_models import backend
from keras_cv_attention_models.backend import layers, models, functional, image_data_format
from keras_cv_attention_models.attention_layers import activation_by_name, add_pre_post_process
from keras_cv_attention_models.download_and_load import reload_model_weights

BATCH_NORM_EPSILON = 1e-5

def spatial_gating_block(inputs, name=None):
    uu, vv = functional.split(inputs, 2, axis=-1 if backend.image_data_format() == "channels_last" else 1)
    vv = layer_norm(vv, name=name and name + "vv_ln")
    vv = layers.Permute((2, 1), name=name and name + "permute_1")(vv) if backend.image_data_format() == "channels_last" else vv
    ww_init = initializers.truncated_normal(stddev=1e-6)
    vv = layers.Dense(vv.shape[-1], kernel_initializer=ww_init, bias_initializer="ones", name=name and name + "vv_dense")(vv)
    vv = layers.Permute((2, 1), name=name and name + "permute_2")(vv) if backend.image_data_format() == "channels_last" else vv
    gated_out = layers.Multiply()([uu, vv])
    return gated_out

def layer_norm(inputs, axis="auto", name=None):
    """Typical LayerNormalization with epsilon=1e-5"""
    norm_axis = (-1 if backend.image_data_format() == "channels_last" else 1) if axis == "auto" else axis
    return layers.LayerNormalization(axis=norm_axis, epsilon=BATCH_NORM_EPSILON, name=name)(inputs)


def mlp_block(inputs, hidden_dim, output_channel=-1, drop_rate=0, use_conv=False, use_bias=True, activation="gelu", name=None):

    channnel_axis = -1 if image_data_format() == "channels_last" else (1 if use_conv else -1)
    output_channel = output_channel if output_channel > 0 else inputs.shape[channnel_axis]

    if use_conv:
        nn = layers.Conv2D(hidden_dim, kernel_size=1, use_bias=use_bias, name=name and name + "Conv_0")(inputs)
    else:
        nn = layers.Dense(hidden_dim, use_bias=use_bias, name=name and name + "Dense_0")(inputs)

    nn = spatial_gating_block(nn, name=name)

    nn = activation_by_name(nn, activation, name=name)
    nn = layers.Dropout(drop_rate)(nn) if drop_rate > 0 else nn

    if use_conv:
        nn = layers.Conv2D(output_channel, kernel_size=1, use_bias=use_bias, name=name and name + "Conv_1")(nn)
    else:

        nn = layers.Dense(output_channel, use_bias=use_bias, name=name and name + "Dense_1")(nn)

    nn = layers.Dropout(drop_rate)(nn) if drop_rate > 0 else nn
    return nn


def mixer_block(inputs, tokens_mlp_dim, channels_mlp_dim, use_bias=True, drop_rate=0, activation="gelu", data_format=None, name=None):

    data_format = backend.image_data_format() if data_format is None else data_format
    norm_axis = -1 if data_format == "channels_last" else 1

    nn = layer_norm(inputs, axis=norm_axis, name=name and name + "LayerNorm_0")
    nn = layers.Permute((2, 1), name=name and name + "permute_0")(nn) if data_format == "channels_last" else nn
    nn = mlp_block(nn, tokens_mlp_dim, use_bias=use_bias, activation=activation, name=name and name + "token_mixing/")
    nn = layers.Permute((2, 1), name=name and name + "permute_1")(nn) if data_format == "channels_last" else nn
    if drop_rate > 0:
        nn = layers.Dropout(drop_rate, noise_shape=(None, 1, 1), name=name and name + "token_drop")(nn)
    token_out = layers.Add(name=name and name + "add_0")([nn, inputs])

    nn = layer_norm(token_out, axis=norm_axis, name=name and name + "LayerNorm_1")
    nn = nn if data_format == "channels_last" else layers.Permute((2, 1), name=name and name + "permute_2")(nn)
    nn = mlp_block(nn, channels_mlp_dim, use_bias=use_bias, activation=activation, name=name and name + "channel_mixing/")
    channel_out = nn if data_format == "channels_last" else layers.Permute((2, 1), name=name and name + "permute_3")(nn)
    if drop_rate > 0:
        channel_out = layers.Dropout(drop_rate, noise_shape=(None, 1, 1), name=name and name + "channel_drop")(channel_out)
    return layers.Add(name=name and name + "output")([channel_out, token_out])


def Mixer(
    num_blocks,
    patch_size,
    stem_width,
    tokens_mlp_dim,
    channels_mlp_dim,
    input_shape=(9, 9, 17),
    attn_kernel_size=4,
    attn_drop_rate=0,
    num_heads=8,
    num_classes=7,
    activation="gelu",
    sam_rho=0,
    dropout=0,
    drop_connect_rate=0,
    classifier_activation="softmax",
    pretrained=None,
    model_name="mixer",
    kwargs=None,
):

    inputs = layers.Input(input_shape)

    X=inputs
    #X=data_augmentation(inputs)
    pos_emb1 = depthwise_conv2d_no_bias(X, kernel_size=1, padding="SAME", use_bias=True)
    pos_emb2 = depthwise_conv2d_no_bias(X, kernel_size=3, padding="SAME", use_bias=True)
    pos_emb3 = depthwise_conv2d_no_bias(X, kernel_size=5, padding="SAME", use_bias=True)
    pos_out = keras.layers.Add()([X, pos_emb1, pos_emb2, pos_emb3])

    X = layers.Add()([X, pos_out])

    nn = layers.Conv2D(stem_width, kernel_size=patch_size, strides=1, padding="VALID", name="stem")(X)
    new_shape = [nn.shape[1] * nn.shape[2], stem_width] if backend.image_data_format() == "channels_last" else [stem_width, nn.shape[2] * nn.shape[3]]
    nn = layers.Reshape(new_shape)(nn)

    drop_connect_s, drop_connect_e = drop_connect_rate if isinstance(drop_connect_rate, (list, tuple)) else [drop_connect_rate, drop_connect_rate]

    for ii in range(num_blocks):

        name = "{}_{}/".format("MixerBlock", str(ii))
        block_drop_rate = drop_connect_s + (drop_connect_e - drop_connect_s) * ii / num_blocks
        nn = mixer_block(nn, tokens_mlp_dim, channels_mlp_dim, drop_rate=block_drop_rate, activation=activation, name=name)
    nn = layer_norm(nn, name="pre_head_layer_norm")

    if num_classes > 0:
        nn = layers.GlobalAveragePooling1D()(nn)  # tf.reduce_mean(nn, axis=1)
        if dropout > 0 and dropout < 1:
            nn = layers.Dropout(dropout)(nn)
        nn = layers.Dense(num_classes, dtype="float32", activation=classifier_activation, name="head")(nn)

        model = models.Model(inputs, nn, name=model_name)

    return model


BLOCK_CONFIGS = {
    "s32": {
        "num_blocks": 1,
        "patch_size": 4,
        "stem_width": 256,
        "tokens_mlp_dim": 256,
        "channels_mlp_dim": 256,

}
}

def SGUMLP(input_shape=(9, 9, 17), num_classes=7, activation="gelu", classifier_activation="softmax", pretrained=None, **kwargs):
    return Mixer(**BLOCK_CONFIGS["s32"], **locals(), model_name="mixer_s32", **kwargs)


In [None]:
model=SGUMLP(input_shape=(9, 9, 17), num_classes=7)

In [None]:
weight_decay = 0.0001
batch_size = 256
dropout_rate = 0.4
learning_rate = 0.001

from sklearn.metrics import confusion_matrix, accuracy_score, classification_report, cohen_kappa_score
from operator import truediv
from sklearn.model_selection import KFold
from tensorflow.keras.losses import sparse_categorical_crossentropy
from tensorflow.keras.optimizers import Adam

def AA_andEachClassAccuracy(confusion_matrix):
    counter = confusion_matrix.shape[0]
    list_diag = np.diag(confusion_matrix)
    list_raw_sum = np.sum(confusion_matrix, axis=1)
    each_acc = np.nan_to_num(truediv(list_diag, list_raw_sum))
    average_acc = np.mean(each_acc)
    return average_acc


# Define per-fold score containers
loss_function = sparse_categorical_crossentropy
no_classes = 7
no_epochs = 100
optimizer = Adam()
verbosity = 1
num_folds = 5
aa_per_fold = []
oa_per_fold = []
ki_per_fold = []

loss_function = sparse_categorical_crossentropy
# Merge inputs and targets
inputs = XtrainS
targets = YtrainS

# Define the K-fold Cross Validator
kfold = KFold(n_splits=num_folds, shuffle=True)

# K-fold Cross Validation model evaluation
fold_no = 1
for train, test in kfold.split(inputs, targets):

  # Define the model architecture

  model = SGUMLP(input_shape=(9, 9, 17), num_classes=7)
  # Compile the model
  # Compile the model
  optimizer = tfa.optimizers.AdamW(
        learning_rate=learning_rate, weight_decay=weight_decay
    )

  model.compile(
        optimizer=optimizer,
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=False),
        metrics=[
            keras.metrics.SparseCategoricalAccuracy(name="accuracy"),
            keras.metrics.SparseTopKCategoricalAccuracy(5, name="top-5-accuracy"),
        ],
    )



  # Generate a print
  print('------------------------------------------------------------------------')
  print(f'Training for fold {fold_no} ...')

  # Fit data to model
  history = model.fit(inputs[train], targets[train],
              batch_size=batch_size,
              epochs=100)



  # Generate generalization metrics
  scores = model.evaluate(XtestS, YtestS, verbose=0)
  print(f'Score for fold {fold_no}: {model.metrics_names[0]} of {scores[0]}; {model.metrics_names[1]} of {scores[1]*100}%')

  Y_pred = model.predict(XtestS)
  y_pred = np.argmax(Y_pred, axis=1)
  confusion = confusion_matrix(YtestS, y_pred)
  oa = accuracy_score(YtestS, y_pred)

  oa_per_fold.append(oa * 100)
  aa = AA_andEachClassAccuracy(confusion)
  aa_per_fold.append(aa * 100)
  kappa = cohen_kappa_score(YtestS, y_pred)
  ki_per_fold.append(kappa * 100)


  # Increase fold number
  fold_no = fold_no + 1

print(f'> OA: {np.mean(oa_per_fold)} (+- {np.std(oa_per_fold)})')
print(f'> AA: {np.mean(aa_per_fold)} (+- {np.std(aa_per_fold)})')
print(f'> KI: {np.mean(ki_per_fold)} (+- {np.std(ki_per_fold)})')
