<a href="https://colab.research.google.com/github/aj1365/PolSARFormer/blob/main/PoLSARFormer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import keras
from keras.layers import Conv2D, Conv3D, Flatten, Dense, Reshape, BatchNormalization,GlobalAveragePooling2D
from keras.layers import Dropout, Input
from keras.models import Model
#from keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint
from keras.utils import np_utils

from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report, cohen_kappa_score

from operator import truediv

from plotly.offline import init_notebook_mode

import numpy as np
import matplotlib.pyplot as plt
import scipy.io as sio
import os
import spectral

init_notebook_mode(connected=True)
%matplotlib inline

In [None]:
import torch
import torch.nn as nn
from timm.models.layers import trunc_normal_, DropPath
from timm.models.registry import register_model



In [None]:
from tensorflow.python.client import device_lib
device_lib.list_local_devices()

In [None]:
def loadData(name):
    
    data_path = os.path.join(os.getcwd(),'E:/PolSAR/Data/')
   
    if name == 'Flevoland':
        
        data = sio.loadmat(os.path.join(data_path, 'Flevoland_T3RF.mat'))['T3RF']
        labels = sio.loadmat(os.path.join(data_path, 'Flevoland_15cls.mat'))['label']
  
    if name == 'SanFrancisco':
        
        data = sio.loadmat(os.path.join(data_path, 'SanFrancisco_T3RF.mat'))['T3RF']
        labels = sio.loadmat(os.path.join(data_path, 'SanFrancisco_gt.mat'))['SanFrancisco_gt']

    
    return data, labels

In [None]:
## GLOBAL VARIABLES
dataset = 'SanFrancisco'
test_ratio = 0.90
windowSize = 12

In [None]:
def splitTrainTestSet(X, y, testRatio, randomState=345):
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=testRatio, random_state=randomState,
                                                        stratify=y)
    return X_train, X_test, y_train, y_test

In [None]:
def applyPCA(X, numComponents=75):
    newX = np.reshape(X, (-1, X.shape[2]))
    pca = PCA(n_components=numComponents, whiten=True)
    newX = pca.fit_transform(newX)
    newX = np.reshape(newX, (X.shape[0],X.shape[1], numComponents))
    return newX, pca

In [None]:
def padWithZeros(X, margin=2):
    newX = np.zeros((X.shape[0] + 2 * margin, X.shape[1] + 2* margin, X.shape[2]))
    x_offset = margin
    y_offset = margin
    newX[x_offset:X.shape[0] + x_offset, y_offset:X.shape[1] + y_offset, :] = X
    return newX

In [None]:
def createImageCubes(X, y, windowSize=8, removeZeroLabels = True):
    margin = int((windowSize) / 2)
    zeroPaddedX = padWithZeros(X, margin=margin)
    # split patches
    patchesData = np.zeros((X.shape[0] * X.shape[1], windowSize, windowSize, X.shape[2]))
    patchesLabels = np.zeros((X.shape[0] * X.shape[1]))
    patchIndex = 0
    for r in range(margin, zeroPaddedX.shape[0] - margin):
        for c in range(margin, zeroPaddedX.shape[1] - margin):
            patch = zeroPaddedX[r - margin:r + margin , c - margin:c + margin ]   
            patchesData[patchIndex, :, :, :] = patch
            patchesLabels[patchIndex] = y[r-margin, c-margin]
            patchIndex = patchIndex + 1
    if removeZeroLabels:
        patchesData = patchesData[patchesLabels>0,:,:,:]
        patchesLabels = patchesLabels[patchesLabels>0]
        patchesLabels -= 1
    return patchesData, patchesLabels

In [None]:
X , Y = loadData(dataset)

X=(X-np.min(X))/(np.max(X)-np.min(X))

In [None]:
Y.shape, np.min(Y), np.max(Y)

In [None]:
X1, Y1 = createImageCubes(X, Y, windowSize=windowSize)
X1.shape, Y1.shape

In [None]:
X1 = X1.reshape((X1.shape[0],windowSize,windowSize,12,1))
X1.shape

In [None]:
Xtrain, Xtest, ytrain, ytest = splitTrainTestSet(X1, Y1, test_ratio)

np.min(ytrain), np.max(ytrain)

In [None]:
import tensorflow as tf
from tensorflow import keras
from keras_cv_attention_models.attention_layers import (
    ChannelAffine,
    CompatibleExtractPatches,
    conv2d_no_bias,
    drop_block,
    layer_norm,
    mlp_block,
    output_block,
    add_pre_post_process,
)
from keras_cv_attention_models.download_and_load import reload_model_weights


In [None]:
class MultiHeadRelativePositionalKernelBias(tf.keras.layers.Layer):
    def __init__(self, input_height=-1, is_heads_first=False, **kwargs):
        super().__init__(**kwargs)
        self.input_height, self.is_heads_first = input_height, is_heads_first

    def build(self, input_shape):
        # input (is_heads_first=False): `[batch, height * width, num_heads, ..., size * size]`
        # input (is_heads_first=True): `[batch, num_heads, height * width, ..., size * size]`
        blocks, num_heads = (input_shape[2], input_shape[1]) if self.is_heads_first else (input_shape[1], input_shape[2])
        size = int(tf.math.sqrt(float(input_shape[-1])))
        height = self.input_height if self.input_height > 0 else int(tf.math.sqrt(float(blocks)))
        width = blocks // height
        pos_size = 2 * size - 1
        initializer = tf.initializers.truncated_normal(stddev=0.02)
        self.pos_bias = self.add_weight(name="positional_embedding", shape=(num_heads, pos_size * pos_size), initializer=initializer, trainable=True)

        idx_hh, idx_ww = tf.range(0, size), tf.range(0, size)
        coords = tf.reshape(tf.expand_dims(idx_hh, -1) * pos_size + idx_ww, [-1])
        bias_hh = tf.concat([idx_hh[: size // 2], tf.repeat(idx_hh[size // 2], height - size + 1), idx_hh[size // 2 + 1 :]], axis=-1)
        bias_ww = tf.concat([idx_ww[: size // 2], tf.repeat(idx_ww[size // 2], width - size + 1), idx_ww[size // 2 + 1 :]], axis=-1)
        bias_hw = tf.expand_dims(bias_hh, -1) * pos_size + bias_ww
        bias_coords = tf.expand_dims(bias_hw, -1) + coords
        bias_coords = tf.reshape(bias_coords, [-1, size**2])[::-1]  # torch.flip(bias_coords, [0])

        bias_coords_shape = [bias_coords.shape[0]] + [1] * (len(input_shape) - 4) + [bias_coords.shape[1]]
        self.bias_coords = tf.reshape(bias_coords, bias_coords_shape)  # [height * width, 1 * n, size * size]
        if not self.is_heads_first:
            self.transpose_perm = [1, 0] + list(range(2, len(input_shape) - 1))  # transpose [num_heads, height * width] -> [height * width, num_heads]

    def call(self, inputs):
        if self.is_heads_first:
            return inputs + tf.gather(self.pos_bias, self.bias_coords, axis=-1)
        else:
            return inputs + tf.transpose(tf.gather(self.pos_bias, self.bias_coords, axis=-1), self.transpose_perm)

    def get_config(self):
        base_config = super().get_config()
        base_config.update({"input_height": self.input_height, "is_heads_first": self.is_heads_first})
        return base_config


def LWA(
    inputs, kernel_size=7, num_heads=4, key_dim=0, out_weight=True, qkv_bias=True, out_bias=True, attn_dropout=0, output_dropout=0, name=None
):
    _, hh, ww, cc = inputs.shape
    key_dim = key_dim if key_dim > 0 else cc // num_heads
    qk_scale = 1.0 / (float(key_dim) ** 0.5)
    out_shape = cc
    qkv_out = num_heads * key_dim

    should_pad_hh, should_pad_ww = max(0, kernel_size - hh), max(0, kernel_size - ww)
    if should_pad_hh or should_pad_ww:
        inputs = tf.pad(inputs, [[0, 0], [0, should_pad_hh], [0, should_pad_ww], [0, 0]])
        _, hh, ww, cc = inputs.shape

    qkv = keras.layers.Dense(qkv_out * 3, use_bias=qkv_bias, name=name and name + "qkv")(inputs)
    query, key_value = tf.split(qkv, [qkv_out, qkv_out * 2], axis=-1)  # Matching weights from PyTorch
    query = tf.expand_dims(tf.reshape(query, [-1, hh * ww, num_heads, key_dim]), -2)  # [batch, hh * ww, num_heads, 1, key_dim]

    # key_value: [batch, height // kernel_size, width // kernel_size, kernel_size, kernel_size, key + value]
    key_value = CompatibleExtractPatches(sizes=kernel_size, strides=1, padding="VALID", compressed=False)(key_value)
    padded = (kernel_size - 1) // 2
    # torch.pad 'replicate'
    key_value = tf.concat([tf.repeat(key_value[:, :1], padded, axis=1), key_value, tf.repeat(key_value[:, -1:], padded, axis=1)], axis=1)
    key_value = tf.concat([tf.repeat(key_value[:, :, :1], padded, axis=2), key_value, tf.repeat(key_value[:, :, -1:], padded, axis=2)], axis=2)

    key_value = tf.reshape(key_value, [-1, kernel_size * kernel_size, key_value.shape[-1]])
    key, value = tf.split(key_value, 2, axis=-1)  # [batch * block_height * block_width, kernel_size * kernel_size, key_dim]
    key = tf.transpose(tf.reshape(key, [-1, key.shape[1], num_heads, key_dim]), [0, 2, 3, 1])  # [batch * hh*ww, num_heads, key_dim, kernel_size * kernel_size]
    key = tf.reshape(key, [-1, hh * ww, num_heads, key_dim, kernel_size * kernel_size])  # [batch, hh*ww, num_heads, key_dim, kernel_size * kernel_size]
    value = tf.transpose(tf.reshape(value, [-1, value.shape[1], num_heads, key_dim]), [0, 2, 1, 3])
    value = tf.reshape(value, [-1, hh * ww, num_heads, kernel_size * kernel_size, key_dim])  # [batch, hh*ww, num_heads, kernel_size * kernel_size, key_dim]
    # print(f">>>> {query.shape = }, {key.shape = }, {value.shape = }")

    # [batch, hh * ww, num_heads, 1, kernel_size * kernel_size]
    attention_scores = keras.layers.Lambda(lambda xx: tf.matmul(xx[0], xx[1]))([query, key]) * qk_scale
    attention_scores = MultiHeadRelativePositionalKernelBias(input_height=hh, name=name and name + "pos")(attention_scores)
    attention_scores = keras.layers.Softmax(axis=-1, name=name and name + "attention_scores")(attention_scores)
    attention_scores = keras.layers.Dropout(attn_dropout, name=name and name + "attn_drop")(attention_scores) if attn_dropout > 0 else attention_scores

    # attention_output = [batch, block_height * block_width, num_heads, 1, key_dim]
    attention_output = keras.layers.Lambda(lambda xx: tf.matmul(xx[0], xx[1]))([attention_scores, value])
    attention_output = tf.reshape(attention_output, [-1, hh, ww, num_heads * key_dim])
    # print(f">>>> {attention_output.shape = }, {attention_scores.shape = }")

    if should_pad_hh or should_pad_ww:
        attention_output = attention_output[:, : hh - should_pad_hh, : ww - should_pad_ww, :]

    if out_weight:
        # [batch, hh, ww, num_heads * key_dim] * [num_heads * key_dim, out] --> [batch, hh, ww, out]
        attention_output = keras.layers.Dense(out_shape, use_bias=out_bias, name=name and name + "output")(attention_output)
    attention_output = keras.layers.Dropout(output_dropout, name=name and name + "out_drop")(attention_output) if output_dropout > 0 else attention_output
    return attention_output


def LWA_block(inputs, attn_kernel_size=7, num_heads=4, mlp_ratio=4, mlp_drop_rate=0, attn_drop_rate=0, drop_rate=0, layer_scale=-1, name=None):
    input_channel = inputs.shape[-1]

    attn = layer_norm(inputs, name=name + "attn_")
    attn = LWA(attn, attn_kernel_size, num_heads, attn_dropout=attn_drop_rate, name=name + "attn_")
    attn = ChannelAffine(use_bias=False, weight_init_value=layer_scale, name=name + "1_gamma")(attn) if layer_scale >= 0 else attn
    attn = drop_block(attn, drop_rate=drop_rate, name=name + "attn_")
    attn_out = keras.layers.Add(name=name + "attn_out")([inputs, attn])

    mlp = layer_norm(attn_out, name=name + "mlp_")
    mlp = mlp_block(mlp, int(input_channel * mlp_ratio), activation="gelu", name=name + "mlp_")
    mlp = ChannelAffine(use_bias=False, weight_init_value=layer_scale, name=name + "2_gamma")(mlp) if layer_scale >= 0 else mlp
    mlp = drop_block(mlp, drop_rate=drop_rate, name=name + "mlp_")
    return keras.layers.Add(name=name + "output")([attn_out, mlp])

def FExtractor(inputs):
    
    x = Conv3D(filters=16, kernel_size=(1, 1, 7), activation='relu', padding='same')(inputs)
    x = Conv3D(filters=32, kernel_size=(3, 3, 5), activation='relu',padding='same')(x)
    x = Conv3D(filters=64, kernel_size=(5, 5, 7), activation='relu',padding='same')(x)
    x_shape = x.shape
    x = Reshape((x_shape[1], x_shape[2], x_shape[3]*x_shape[4]))(x)
    x = Conv2D(filters=12, kernel_size=(3,3), activation='relu',padding='same')(x)
    
    return x
    

def PolSARFormer(
    num_blocks=[3, 4],
    out_channels=[64, 128],
    num_heads=[2, 2],
    stem_width=-1,
    attn_kernel_size=7,
    mlp_ratio=3,
    layer_scale=-1,
    input_shape=(12, 12, 12,1),
    num_classes=5,
    drop_connect_rate=0,
    classifier_activation="softmax",
    dropout=0,
    pretrained=None,
    model_name="PolF",
    kwargs=None,
):
    """ConvTokenizer stem"""
    inputs = keras.layers.Input(input_shape)
    x=FExtractor(inputs)
    
    stem_width = stem_width if stem_width > 0 else out_channels[0]
    nn = conv2d_no_bias(x, stem_width // 2, kernel_size=3, strides=2, use_bias=True, padding="SAME", name="stem_1_")
    nn = conv2d_no_bias(nn, stem_width, kernel_size=3, strides=2, use_bias=True, padding="SAME", name="stem_2_")
    nn = layer_norm(nn, name="stem_")

    """ stages """
    total_blocks = sum(num_blocks)
    global_block_id = 0
    for stack_id, (num_block, out_channel, num_head) in enumerate(zip(num_blocks, out_channels, num_heads)):
        stack_name = "stack{}_".format(stack_id + 1)
        if stack_id > 0:
            ds_name = stack_name + "downsample_"
            nn = conv2d_no_bias(nn, out_channel, kernel_size=3, strides=2, padding="SAME", name=ds_name)
            nn = layer_norm(nn, name=ds_name)
        for block_id in range(num_block):
            block_name = stack_name + "block{}_".format(block_id + 1)
            block_drop_rate = drop_connect_rate * global_block_id / total_blocks
            nn = LWA_block(nn, attn_kernel_size, num_head, mlp_ratio, drop_rate=block_drop_rate, layer_scale=layer_scale, name=block_name)
            global_block_id += 1
    nn = layer_norm(nn, name="pre_output_")

    nn = output_block(x, num_classes=num_classes, drop_rate=dropout, classifier_activation=classifier_activation)
    model = keras.models.Model(inputs, nn, name=model_name)
    add_pre_post_process(model, rescale_mode="torch")
    return model



In [None]:
# Defining your Model .
model = PolSARFormer(input_shape=(12, 12, 12,1),out_channels=[64,128],num_heads=[2,2], num_classes=5)
model.summary()

In [None]:
import tensorflow_addons as tfa

weight_decay = 0.0001
batch_size = 256
dropout_rate = 0.4
learning_rate = 0.001

In [None]:
####                               K-Fold data validation



from sklearn.metrics import confusion_matrix, accuracy_score, classification_report, cohen_kappa_score
from operator import truediv
from sklearn.model_selection import KFold
from tensorflow.keras.losses import sparse_categorical_crossentropy
from tensorflow.keras.optimizers import Adam

def AA_andEachClassAccuracy(confusion_matrix):
    counter = confusion_matrix.shape[0]
    list_diag = np.diag(confusion_matrix)
    list_raw_sum = np.sum(confusion_matrix, axis=1)
    each_acc = np.nan_to_num(truediv(list_diag, list_raw_sum))
    average_acc = np.mean(each_acc)
    return average_acc


# Define per-fold score containers
loss_function = sparse_categorical_crossentropy
no_classes = 5
no_epochs = 40
optimizer = Adam()
verbosity = 1
num_folds = 3
aa_per_fold = []
oa_per_fold = []
ki_per_fold = []

loss_function = sparse_categorical_crossentropy
# Merge inputs and targets
inputs = np.concatenate((Xtrain, Xtest), axis=0)
targets = np.concatenate((ytrain, ytest), axis=0)

# Define the K-fold Cross Validator
kfold = KFold(n_splits=num_folds, shuffle=True)

# K-fold Cross Validation model evaluation
fold_no = 1
for train, test in kfold.split(inputs, targets):

  # Define the model architecture

  model = PolSARFormer(input_shape=(12, 12, 12,1),out_channels=[64,128],num_heads=[2,2], num_classes=5)
  # Compile the model
  # Compile the model
  optimizer = tfa.optimizers.AdamW(
        learning_rate=learning_rate, weight_decay=weight_decay
    )

  model.compile(
        optimizer=optimizer,
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=False),
        metrics=[
            keras.metrics.SparseCategoricalAccuracy(name="accuracy"),
            keras.metrics.SparseTopKCategoricalAccuracy(5, name="top-5-accuracy"),
        ],
    )



  # Generate a print
  print('------------------------------------------------------------------------')
  print(f'Training for fold {fold_no} ...')

  # Fit data to model
  history = model.fit(inputs[train], targets[train],
              batch_size=batch_size,
              epochs=5)



  # Generate generalization metrics
  scores = model.evaluate(inputs[test], targets[test], verbose=0)
  print(f'Score for fold {fold_no}: {model.metrics_names[0]} of {scores[0]}; {model.metrics_names[1]} of {scores[1]*100}%')
    
  Y_pred = model.predict(inputs[test])
  y_pred = np.argmax(Y_pred, axis=1)
  confusion = confusion_matrix(targets[test], y_pred)
  oa = accuracy_score(targets[test], y_pred)
  
  oa_per_fold.append(oa * 100)
  aa = AA_andEachClassAccuracy(confusion)
  aa_per_fold.append(aa * 100)
  kappa = cohen_kappa_score(targets[test], y_pred)
  ki_per_fold.append(kappa * 100)


  # Increase fold number
  fold_no = fold_no + 1
    
print(f'> OA: {np.mean(oa_per_fold)} (+- {np.std(oa_per_fold)})')
print(f'> AA: {np.mean(aa_per_fold)} (+- {np.std(aa_per_fold)})')
print(f'> KI: {np.mean(ki_per_fold)} (+- {np.std(ki_per_fold)})')


In [None]:
########################################## Without k-fold data validation

optimizer = tfa.optimizers.AdamW(
        learning_rate=learning_rate, weight_decay=weight_decay
    )

model.compile(
        optimizer=optimizer,
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=False),
        metrics=[
            keras.metrics.SparseCategoricalAccuracy(name="accuracy"),
            keras.metrics.SparseTopKCategoricalAccuracy(5, name="top-5-accuracy"),
        ],
    )

checkpoint_filepath = "E:/San.h5"
checkpoint_callback = keras.callbacks.ModelCheckpoint(
        checkpoint_filepath,
        monitor="val_accuracy",
        save_best_only=True,
        save_weights_only=True,
    )

history = model.fit(
        x=Xtrain,
        y=ytrain,
        batch_size=batch_size,
        epochs=100,
        validation_split=0.1,
        callbacks=[checkpoint_callback],
    )

In [None]:
Xtest = Xtest.reshape(-1, 12, 12, 12,1)


Xtest.shape

In [None]:
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report, cohen_kappa_score
from operator import truediv

def AA_andEachClassAccuracy(confusion_matrix):
    counter = confusion_matrix.shape[0]
    list_diag = np.diag(confusion_matrix)
    list_raw_sum = np.sum(confusion_matrix, axis=1)
    each_acc = np.nan_to_num(truediv(list_diag, list_raw_sum))
    average_acc = np.mean(each_acc)
    return average_acc


Y_pred = model.predict(Xtest)
y_pred = np.argmax(Y_pred, axis=1)
confusion = confusion_matrix(ytest, y_pred)
oa = accuracy_score(ytest, y_pred)
aa = AA_andEachClassAccuracy(confusion)
kappa = cohen_kappa_score(ytest, y_pred)


oa, aa, kappa