<a href="https://colab.research.google.com/github/aj1365/PolSARConvMixer/blob/main/PolSARConvMixer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import keras
from keras.layers import Conv2D, Dense, Reshape, BatchNormalization
from keras.layers import Dropout, Input
from keras.models import Model
from keras.callbacks import ModelCheckpoint
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report, cohen_kappa_score
from operator import truediv
from plotly.offline import init_notebook_mode
import numpy as np
import matplotlib.pyplot as plt
import scipy.io as sio
import os
import spectral
from keras import layers
import tensorflow as tf
init_notebook_mode(connected=True)
%matplotlib inline

In [None]:
def loadData(name):

    data_path = os.path.join(os.getcwd(),'C:/PolSAR/Data/')

    if name == 'Flevoland':

        data = sio.loadmat(os.path.join(data_path, 'Flevoland_T3RF.mat'))['T3RF']
        labels = sio.loadmat(os.path.join(data_path, 'Flevoland_15cls.mat'))['label']

    if name == 'SanFrancisco':

        data = sio.loadmat(os.path.join(data_path, 'SanFrancisco_T3RF.mat'))['T3RF']
        labels = sio.loadmat(os.path.join(data_path, 'SanFrancisco_gt.mat'))['SanFrancisco_gt']


    return data, labels

In [None]:
## GLOBAL VARIABLES
dataset = 'SanFrancisco'
test_ratio = 0.9
windowSize = 12

In [None]:
def splitTrainTestSet(X, y, testRatio, randomState=345):
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=testRatio, random_state=randomState,
                                                        stratify=y)
    return X_train, X_test, y_train, y_test

In [None]:
def applyPCA(X, numComponents=75):
    newX = np.reshape(X, (-1, X.shape[2]))
    pca = PCA(n_components=numComponents, whiten=True)
    newX = pca.fit_transform(newX)
    newX = np.reshape(newX, (X.shape[0],X.shape[1], numComponents))
    return newX, pca

In [None]:
def padWithZeros(X, margin=2):
    newX = np.zeros((X.shape[0] + 2 * margin, X.shape[1] + 2* margin, X.shape[2]))
    x_offset = margin
    y_offset = margin
    newX[x_offset:X.shape[0] + x_offset, y_offset:X.shape[1] + y_offset, :] = X
    return newX

In [None]:
def createImageCubes(X, y, windowSize=8, removeZeroLabels = True):
    margin = int((windowSize) / 2)
    zeroPaddedX = padWithZeros(X, margin=margin)
    # split patches
    patchesData = np.zeros((X.shape[0] * X.shape[1], windowSize, windowSize, X.shape[2]))
    patchesLabels = np.zeros((X.shape[0] * X.shape[1]))
    patchIndex = 0
    for r in range(margin, zeroPaddedX.shape[0] - margin):
        for c in range(margin, zeroPaddedX.shape[1] - margin):
            patch = zeroPaddedX[r - margin:r + margin , c - margin:c + margin ]
            patchesData[patchIndex, :, :, :] = patch
            patchesLabels[patchIndex] = y[r-margin, c-margin]
            patchIndex = patchIndex + 1
    if removeZeroLabels:
        patchesData = patchesData[patchesLabels>0,:,:,:]
        patchesLabels = patchesLabels[patchesLabels>0]
        patchesLabels -= 1
    return patchesData, patchesLabels

In [None]:
X , Y = loadData(dataset)
X=(X-np.min(X))/(np.max(X)-np.min(X))

In [None]:
X1, Y1 = createImageCubes(X, Y, windowSize=windowSize)
X1.shape, Y1.shape

In [None]:
Xtrain2, Xtest2, ytrain2, ytest2 = splitTrainTestSet(X1, Y1, test_ratio)
Xtrain, Xtest, ytrain, ytest = splitTrainTestSet(Xtrain2, ytrain2, 0.5)

Xtrain.shape, Xtest.shape, ytrain.shape, ytest.shape

In [None]:
del Xtrain2, Xtest2, Xtrain, Xtest, X1, Y1

In [None]:
from tensorflow.keras import layers
image_size=12
data_augmentation = keras.Sequential(
    [
        layers.Normalization(),
        layers.Resizing(image_size, image_size),
        layers.RandomFlip("horizontal"),
        layers.RandomZoom(
            height_factor=0.2, width_factor=0.2
        ),
    ],
    name="data_augmentation",
)
# Compute the mean and the variance of the training data for normalization.
data_augmentation.layers[0].adapt(Xtrain)

In [None]:
    ####################################################################### Create CNN model
#kSize=3
nBands=12
n_outputs=5

In [None]:
def activation_block(x):
    x = layers.Activation("gelu")(x)
    return layers.BatchNormalization()(x)

def conv_stem(x, filters: int, patch_size: int):
    x = layers.Conv2D(filters, kernel_size=patch_size, strides=patch_size)(x)
    return activation_block(x)

def conv_mixer_block(x, filters: int, kernel_size: int):
    # Depthwise convolution.
    x0 = x

    pos_emb1 = layers.DepthwiseConv2D(kernel_size=3, padding="same")(x)
    pos_emb2 = layers.DepthwiseConv2D(kernel_size=5, padding="same")(x)

    x = keras.layers.Add()([x0, pos_emb1, pos_emb2])

    # Pointwise convolution.
    x = layers.Conv2D(filters, kernel_size=1)(x)
    x = activation_block(x)
    x = layers.Add()([x, x0])  # Residual 2.

    return x

def get_PolSAR_conv_mixer(
    image_size=12, filters=256, depth=4, kernel_size=3, patch_size=2, num_classes=5
):

    inputs = keras.Input((image_size, image_size, 12))
    augmented = data_augmentation(inputs)
    x=augmented
    # Extract patch embeddings.
    x = conv_stem(x, filters, patch_size)

    # ConvMixer blocks.
    for _ in range(depth):
        x = conv_mixer_block(x, filters, kernel_size)

    # Classification block.
    x = layers.GlobalAvgPool2D()(x)
    outputs = layers.Dense(num_classes, activation="softmax")(x)

    return keras.Model(inputs, outputs)
  model=get_PolSAR_conv_mixer()

In [None]:
weight_decay = 0.0001
batch_size = 256
dropout_rate = 0.4
learning_rate = 0.001

In [None]:
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report, cohen_kappa_score
from operator import truediv
from sklearn.model_selection import KFold
from tensorflow.keras.losses import sparse_categorical_crossentropy
from tensorflow.keras.optimizers import Adam

def AA_andEachClassAccuracy(confusion_matrix):
    counter = confusion_matrix.shape[0]
    list_diag = np.diag(confusion_matrix)
    list_raw_sum = np.sum(confusion_matrix, axis=1)
    each_acc = np.nan_to_num(truediv(list_diag, list_raw_sum))
    average_acc = np.mean(each_acc)
    return average_acc


# Define per-fold score containers
loss_function = sparse_categorical_crossentropy
no_classes = 5
no_epochs = 5
optimizer = Adam()
verbosity = 1
num_folds = 3
aa_per_fold = []
oa_per_fold = []
ki_per_fold = []

loss_function = sparse_categorical_crossentropy
# Merge inputs and targets
inputs = np.concatenate((Xtrain, Xtest), axis=0)
targets = np.concatenate((ytrain, ytest), axis=0)

# Define the K-fold Cross Validator
kfold = KFold(n_splits=num_folds, shuffle=True)

# K-fold Cross Validation model evaluation
fold_no = 1
for train, test in kfold.split(inputs, targets):


  # Define the model architecture
    model=get_conv_mixer()
  # Compile the model
    optimizer = keras.optimizers.AdamW(
        learning_rate=learning_rate, weight_decay=weight_decay
    )

    model.compile(
        optimizer=optimizer,
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=False),
        metrics=[
            keras.metrics.SparseCategoricalAccuracy(name="accuracy"),
            keras.metrics.SparseTopKCategoricalAccuracy(5, name="top-5-accuracy"),
        ],
    )


  # Generate a print
    print('------------------------------------------------------------------------')
    print(f'Training for fold {fold_no} ...')

  # Fit data to model
    history = model.fit(inputs[train], targets[train],
              batch_size=batch_size,
              epochs=5)



  # Generate generalization metrics
    scores = model.evaluate(inputs[test], targets[test], verbose=0)
    print(f'Score for fold {fold_no}: {model.metrics_names[0]} of {scores[0]}; {model.metrics_names[1]} of {scores[1]*100}%')

    Y_pred = model.predict(inputs[test])
    y_pred = np.argmax(Y_pred, axis=1)
    confusion = confusion_matrix(targets[test], y_pred)
    oa = accuracy_score(targets[test], y_pred)

    oa_per_fold.append(oa * 100)
    aa = AA_andEachClassAccuracy(confusion)
    aa_per_fold.append(aa * 100)
    kappa = cohen_kappa_score(targets[test], y_pred)
    ki_per_fold.append(kappa * 100)


  # Increase fold number
    fold_no = fold_no + 1


In [None]:
print(f'> OA: {np.mean(oa_per_fold)} (+- {np.std(oa_per_fold)})')
print(f'> AA: {np.mean(aa_per_fold)} (+- {np.std(aa_per_fold)})')
print(f'> KI: {np.mean(ki_per_fold)} (+- {np.std(ki_per_fold)})')