<div align='center'>
    <img src="https://github.com/HamzaElshafie/Spectral-Spatial-Transformers-for-Precise-Crop-Classification-from-UAV-borne-Hyperspectral-Images/blob/HybridSN/Figs/HybridSNTemplate.png?raw=1" width="80%"/>
</div>

---
S. K. Roy, G. Krishna, S. R. Dubey and B. B. Chaudhuri, "HybridSN: Exploring 3-D–2-D CNN Feature Hierarchy for Hyperspectral Image Classification," in IEEE Geoscience and Remote Sensing Letters, vol. 17, no. 2, pp. 277-281, Feb. 2020, doi: 10.1109/LGRS.2019.2918719.



In [1]:
!apt-get install git-lfs
!git lfs install --skip-smudge
!git clone https://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@github.com/HamzaElshafie/Spectral-Spatial-Transformers-for-Precise-Crop-Classification-from-UAV-borne-Hyperspectral-Images.git

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
git-lfs is already the newest version (3.0.2-1ubuntu0.2).
0 upgraded, 0 newly installed, 0 to remove and 45 not upgraded.
Git LFS initialized.
Cloning into 'Spectral-Spatial-Transformers-for-Precise-Crop-Classification-from-UAV-borne-Hyperspectral-Images'...
remote: Enumerating objects: 452, done.[K
remote: Counting objects: 100% (173/173), done.[K
remote: Compressing objects: 100% (128/128), done.[K
remote: Total 452 (delta 97), reused 103 (delta 43), pack-reused 279 (from 1)[K
Receiving objects: 100% (452/452), 5.85 MiB | 4.81 MiB/s, done.
Resolving deltas: 100% (230/230), done.


In [2]:
# Comment out the checkout operation if we are in mainn
!cd /content/Spectral-Spatial-Transformers-for-Precise-Crop-Classification-from-UAV-borne-Hyperspectral-Images && git checkout HybridSN

Branch 'HybridSN' set up to track remote branch 'HybridSN' from 'origin'.
Switched to a new branch 'HybridSN'


In [3]:
!pip install einops
!pip install optuna
#!pip install torch
# Uninstall any existing PyTorch installation
#!pip uninstall -y torch
# Install PyTorch (GPU version with CUDA 12.1)
#!pip install torch==2.3.0 torchvision==0.18.0 torchaudio==2.3.0 --index-url https://download.pytorch.org/whl/cu121

Collecting optuna
  Downloading optuna-3.6.1-py3-none-any.whl.metadata (17 kB)
Collecting alembic>=1.5.0 (from optuna)
  Downloading alembic-1.13.2-py3-none-any.whl.metadata (7.4 kB)
Collecting colorlog (from optuna)
  Downloading colorlog-6.8.2-py3-none-any.whl.metadata (10 kB)
Collecting Mako (from alembic>=1.5.0->optuna)
  Downloading Mako-1.3.5-py3-none-any.whl.metadata (2.9 kB)
Downloading optuna-3.6.1-py3-none-any.whl (380 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m380.1/380.1 kB[0m [31m8.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading alembic-1.13.2-py3-none-any.whl (232 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m233.0/233.0 kB[0m [31m11.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading colorlog-6.8.2-py3-none-any.whl (11 kB)
Downloading Mako-1.3.5-py3-none-any.whl (78 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m78.6/78.6 kB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: Ma

In [4]:
import numpy as np
import scipy.io as sio
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report, cohen_kappa_score
import torch
import torch.nn as nn
import torch.optim as optim
import optuna
from operator import truediv
import sys
import time
import matplotlib.pyplot as plt
import os
import zipfile
import argparse
import scipy.io as sio
import psutil
import gc

In [5]:
from google.colab import files
files.upload()  # Manually upload kaggle.json

# Move kaggle.json to the correct location
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

Saving kaggle.json to kaggle.json


# **Training + Bayesian Optimization**



---



## HongHu


---



In [None]:
main_dir = '/content/Spectral-Spatial-Transformers-for-Precise-Crop-Classification-from-UAV-borne-Hyperspectral-Images'
sys.path.append(main_dir)

data_dir = '/content/Data'
sys.path.append(data_dir)

from data_fetcher import loadData
from HybridSN import HybridSN
# from HybridSN import get_cls_map

BATCH_SIZE_TRAIN = 64

NUM_CLASSES = {
    'HanChuan': 16,
    'HongHu': 22,
    'LongKou': 9
}

dataset_mapping = {
    'HanChuan': 'WHU-Hi-HanChuan',
    'HongHu': 'WHU-Hi-HongHu',
    'LongKou': 'WHU-Hi-LongKou'
}

def loadDataWrapper(dataset, kaggle_json_path, train_samples):
    dataset_name = dataset_mapping[dataset]
    full_data_file = f'{data_dir}/{dataset_name}/WHU_Hi_{dataset}.mat'
    train_file = f'{data_dir}/{dataset_name}/Training samples and test samples/Train{train_samples}.mat'
    test_file = f'{data_dir}/{dataset_name}/Training samples and test samples/Test{train_samples}.mat'

    data, labels = loadData(dataset_name, kaggle_json_path)

    if dataset == 'HongHu':
        train_mask = sio.loadmat(train_file)[f'HHCYtrain{train_samples}']
        test_mask = sio.loadmat(test_file)[f'HHCYtest{train_samples}']
    elif dataset == 'HanChuan':
        train_mask = sio.loadmat(train_file)[f'Train{train_samples}']
        test_mask = sio.loadmat(test_file)[f'Test{train_samples}']
    elif dataset == 'LongKou':
        train_mask = sio.loadmat(train_file)[f'LKtrain{train_samples}']
        test_mask = sio.loadmat(test_file)[f'LKtest{train_samples}']

    return data, labels, train_mask, test_mask

def applyPCA(X_train, X_test, pca_components=30):
    newX_train = np.reshape(X_train, (-1, X_train.shape[-1])).astype(np.float32)
    newX_test = np.reshape(X_test, (-1, X_test.shape[-1])).astype(np.float32)

    pca = PCA(n_components=pca_components, whiten=True)
    newX_train = pca.fit_transform(newX_train).astype(np.float32)

    valid_components = pca.n_components_
    print(f"Valid components: {valid_components}")

    newX_test = pca.transform(newX_test).astype(np.float32)
    newX_train = np.reshape(newX_train, (X_train.shape[0], X_train.shape[1], X_train.shape[2], valid_components))
    newX_test = np.reshape(newX_test, (X_test.shape[0], X_test.shape[1], X_test.shape[2], valid_components))

    return newX_train, newX_test, pca

def padWithZeros(X, margin=2):
    newX = np.zeros((X.shape[0] + 2 * margin, X.shape[1] + 2 * margin, X.shape[2]), dtype=np.float32)
    x_offset = margin
    y_offset = margin
    newX[x_offset:X.shape[0] + x_offset, y_offset:X.shape[1] + y_offset, :] = X.astype(np.float32)
    return newX

def createImageCubesWithMask(X, y, mask, mask_type, windowSize=13):
    margin = int((windowSize - 1) / 2)
    print(f"Margin: {margin}")
    zeroPaddedX = padWithZeros(X, margin=margin)
    print(f"Padded X shape: {zeroPaddedX.shape}")
    positions = np.argwhere(mask >= 1)
    patchesData = np.zeros((len(positions), windowSize, windowSize, X.shape[2]), dtype=np.float32)
    patchesLabels = np.zeros(len(positions))

    unique, counts = np.unique(mask, return_counts=True)
    mask_stats = dict(zip(unique, counts))
    print(f"{mask_type} statistics: {mask_stats}")

    print(f"y shape: {y.shape}, type: {type(y)}")
    print(f"{mask_type} positions shape: {positions.shape}, type: {type(positions)}")
    print("_________________________________________________________________________\n")

    for i, pos in enumerate(positions):
        x, y_pos = pos
        x_padded = x + margin
        y_padded = y_pos + margin
        patch = zeroPaddedX[x_padded - margin:x_padded + margin + 1, y_padded - margin:y_padded + margin + 1, :].astype(np.float32)
        patchesData[i, :, :, :] = patch
        patchesLabels[i] = y[x, y_pos] - 1

    return patchesData, patchesLabels

def createImageCubes(X, y, windowSize=13, removeZeroLabels=True):
    margin = int((windowSize - 1) / 2)
    zeroPaddedX = padWithZeros(X, margin=margin)
    patchesData = np.zeros((X.shape[0] * X.shape[1], windowSize, windowSize, X.shape[2]), dtype=np.float32)
    patchesLabels = np.zeros((X.shape[0] * X.shape[1]))
    patchIndex = 0
    for r in range(margin, zeroPaddedX.shape[0] - margin):
        for c in range(margin, zeroPaddedX.shape[1] - margin):
            patch = zeroPaddedX[r - margin:r + margin + 1, c - margin:c + margin + 1].astype(np.float32)
            patchesData[patchIndex, :, :, :] = patch
            patchesLabels[patchIndex] = y[r-margin, c-margin]
            patchIndex = patchIndex + 1
    if removeZeroLabels:
        patchesData = patchesData[patchesLabels > 0, :, :, :]
        patchesLabels = patchesLabels[patchesLabels > 0]
        patchesLabels -= 1

    return patchesData, patchesLabels

def create_data_loader(dataset, kaggle_json_path, patch_size, train_samples):
    X, y, train_mask, test_mask = loadDataWrapper(dataset, kaggle_json_path, train_samples)
    pca_components = 30

    print('Hyperspectral data shape: ', X.shape)
    print('Label shape: ', y.shape)

    print(f'Train mask shape: {train_mask.shape}')
    print(f'Test mask shape: {test_mask.shape}')

    print(f"Patch (window) size: {patch_size}")

    print('\n... ... Create data cubes with masks (Before PCA to avoid data leakage) ... ...')
    Xtrain, ytrain = createImageCubesWithMask(X, y, train_mask, mask_type="Training mask", windowSize=patch_size)
    Xtest, ytest = createImageCubesWithMask(X, y, test_mask, mask_type="Testing mask", windowSize=patch_size)
    print('Xtrain shape: ', Xtrain.shape)
    print('Xtest shape: ', Xtest.shape)
    print('ytrain shape: ', ytrain.shape)
    print('ytest shape: ', ytest.shape)

    gc.collect()

    print('\n... ... PCA tranformation ... ...')
    X_train_pca, X_test_pca, pca_model = applyPCA(Xtrain, Xtest, pca_components)
    print('Xtrain shape after PCA: ', X_train_pca.shape)
    print('Xtest shape after PCA: ', X_test_pca.shape)

    X_train_pca = X_train_pca.reshape(-1, patch_size, patch_size, pca_model.n_components_, 1)
    X_test_pca = X_test_pca.reshape(-1, patch_size, patch_size, pca_model.n_components_, 1)
    print('before transpose: Xtrain shape: ', X_train_pca.shape)
    print('before transpose: Xtest shape: ', X_test_pca.shape)

    X_train_pca = X_train_pca.transpose(0, 4, 3, 1, 2)
    X_test_pca = X_test_pca.transpose(0, 4, 3, 1, 2)
    print('after transpose: Xtrain shape: ', X_train_pca.shape)
    print('after transpose: Xtest shape: ', X_test_pca.shape)

    trainset = TrainDS(X_train_pca, ytrain)
    testset = TestDS(X_test_pca, ytest)
    train_loader = torch.utils.data.DataLoader(dataset=trainset, batch_size=BATCH_SIZE_TRAIN, shuffle=True, num_workers=2)
    test_loader = torch.utils.data.DataLoader(dataset=testset, batch_size=BATCH_SIZE_TRAIN, shuffle=False, num_workers=2)

    gc.collect()

    newX = np.reshape(X, (-1, X.shape[-1])).astype(np.float32)
    X_pca_full = pca_model.transform(newX).astype(np.float32)
    X_pca_full = np.reshape(X_pca_full, (X.shape[0], X.shape[1], pca_model.n_components_))

    X_pca_full, y_all = createImageCubes(X_pca_full, y, windowSize=patch_size)
    X_pca_full = X_pca_full.reshape(-1, patch_size, patch_size, pca_model.n_components_, 1)
    X_pca_full = X_pca_full.transpose(0, 4, 3, 1, 2)
    all_data_loader = torch.utils.data.DataLoader(dataset=TestDS(X_pca_full, y_all), batch_size=BATCH_SIZE_TRAIN, shuffle=False, num_workers=2)

    return train_loader, test_loader, all_data_loader, y, pca_model.n_components_

def train(train_loader, num_classes, pca_components, lr, dropout, epochs=100):
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(f"PCA components passed: {pca_components}")
    net = HybridSN.HybridSN(
        num_classes=num_classes,
        pca_components=pca_components,
        dropout=dropout).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(net.parameters(), lr=lr)

    total_loss = 0
    for epoch in range(epochs):
        for i, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            outputs = net(data)
            loss = criterion(outputs, target)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print('[Epoch: %d]   [loss avg: %.4f]   [current loss: %.4f]' % (epoch + 1, total_loss / (epoch + 1), loss.item()))
    print('Finished Training')
    return net, device

def test(device, net, test_loader):
    count = 0
    net.eval()
    y_pred_test = 0
    y_test = 0
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        outputs = net(inputs)
        outputs = np.argmax(outputs.detach().cpu().numpy(), axis=1)
        if count == 0:
            y_pred_test = outputs
            y_test = labels
            count = 1
        else:
            y_pred_test = np.concatenate((y_pred_test, outputs))
            y_test = np.concatenate((y_test, labels))

    return y_pred_test, y_test

def objective(trial):
    lr = trial.suggest_loguniform('lr', 1e-5, 1e-1)
    dropout = trial.suggest_float('dropout', 0.0, 0.5)  # Changed to suggest_float due to deprecation
    patch_size = trial.suggest_categorical('patch_size', [7, 9, 11, 13, 15])

    train_loader, test_loader, all_data_loader, y_all, pca_components = create_data_loader(
        args.dataset, args.kaggle_json_path, patch_size, train_samples=300
    )

    net, _ = train(
        train_loader,
        num_classes=num_classes,
        pca_components=pca_components,
        lr=lr,
        dropout=dropout,
        epochs=100
    )

    y_pred_test, y_test = test(torch.device("cuda:0" if torch.cuda.is_available() else "cpu"), net, test_loader)
    oa = accuracy_score(y_test, y_pred_test)

    return oa

def AA_andEachClassAccuracy(confusion_matrix):
    list_diag = np.diag(confusion_matrix)
    list_raw_sum = np.sum(confusion_matrix, axis=1)
    each_acc = np.nan_to_num(np.divide(list_diag, list_raw_sum))
    average_acc = np.mean(each_acc)
    return each_acc, average_acc

def acc_reports(y_test, y_pred_test, dataset):
    target_mapping = {
    'HanChuan': ['Strawberry', 'Cowpea', 'Soybean', 'Sorghum', 'Water spinach', 'Watermelon', 'Greens', 'Trees', 'Grass', 'Red roof',
                 'Gray roof', 'Plastic', 'Bare soil', 'Road', 'Bright object', 'Water'],
    'HongHu': ['Red roof', 'Road', 'Bare soil', 'Cotton', 'Cotton firewood', 'Rape', 'Chinese cabbage', 'Pakchoi', 'Cabbage', 'Tuber mustard',
               'Brassica parachinensis', 'Brassica chinensis', 'Small Brassica chinensis', 'Lactuca sativa', 'Celtuce', 'Film covered lettuce',
               'Romaine lettuce', 'Carrot', 'White radish', 'Garlic sprout', 'Broad bean', 'Tree'],
    'LongKou': ['Corn', 'Cotton', 'Sesame', 'Broad-leaf soybean', 'Narrow-leaf soybean', 'Rice', 'Water', 'Roads and houses', 'Mixed weed']
    }
    target_names = target_mapping[dataset]
    classification = classification_report(y_test, y_pred_test, digits=4, target_names=target_names)
    oa = accuracy_score(y_test, y_pred_test)
    confusion = confusion_matrix(y_test, y_pred_test)
    each_acc, aa = AA_andEachClassAccuracy(confusion)
    kappa = cohen_kappa_score(y_test, y_pred_test)

    return classification, oa*100, confusion, each_acc*100, aa*100, kappa*100

class TestDS():
    def __init__(self, Xtest, ytest):
        self.len = Xtest.shape[0]
        self.x_data = torch.FloatTensor(Xtest)
        self.y_data = torch.LongTensor(ytest)

    def __getitem__(self, index):
        return self.x_data[index], self.y_data[index]

    def __len__(self):
        return self.len

class TrainDS():
    def __init__(self, Xtrain, ytrain):
        self.len = Xtrain.shape[0]
        self.x_data = torch.FloatTensor(Xtrain)
        self.y_data = torch.LongTensor(ytrain)

    def __getitem__(self, index):
        return self.x_data[index], self.y_data[index]

    def __len__(self):
        return self.len

class Args:
    def __init__(self, dataset, kaggle_json_path):
        self.dataset = dataset
        self.kaggle_json_path = kaggle_json_path

args = Args(dataset='HongHu', kaggle_json_path='~/.kaggle/kaggle.json')
num_classes = NUM_CLASSES[args.dataset]

study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=50)

best_params = study.best_params
print("Best parameters: ", best_params)


train_loader, test_loader, all_data_loader, y_all, pca_components = create_data_loader(
    args.dataset, args.kaggle_json_path, patch_size=best_params['patch_size'],train_samples=300
)

# Training model using the optimal parameters found
tic1 = time.perf_counter()
best_net, device = train(train_loader, num_classes, pca_components, lr=best_params['lr'], dropout=best_params['dropout'], epochs=100)
toc1 = time.perf_counter()

torch.save(best_net.state_dict(), f'{main_dir}/HybridSN/cls_params/HybridSN_params_{args.dataset}.pth')

tic2 = time.perf_counter()
y_pred_test, y_test = test(device, best_net, test_loader)
toc2 = time.perf_counter()

classification, oa, confusion, each_acc, aa, kappa = acc_reports(y_test, y_pred_test, args.dataset)
print(f"{args.dataset} overall accuracy: {oa}")
classification = str(classification)
Training_Time = toc1 - tic1
Test_time = toc2 - tic2
report_file_name = f"{main_dir}/HybridSN/cls_results/classification_report_{args.dataset}.txt"
with open(report_file_name, 'w') as x_file:
    x_file.write('{} Training_Time (s)'.format(Training_Time))
    x_file.write('\n')
    x_file.write('{} Test_time (s)'.format(Test_time))
    x_file.write('\n')
    x_file.write('{} Kappa accuracy (%)'.format(kappa))
    x_file.write('\n')
    x_file.write('{} Overall accuracy (%)'.format(oa))
    x_file.write('\n')
    x_file.write('{} Average accuracy (%)'.format(aa))
    x_file.write('\n')
    x_file.write('{} Each accuracy (%)'.format(each_acc))
    x_file.write('\n')
    x_file.write('{}'.format(classification))
    x_file.write('\n')
    x_file.write('{}'.format(confusion))

#get_cls_map.get_cls_map(best_net, device, all_data_loader, y_all, args.dataset)