<a href="https://colab.research.google.com/github/Chayma7150/Deep-Contrastive-Graph-Learning-DCGL-for-Hyperspectral-Image-Classification/blob/main/baseline%2B_DCGL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install scikit-learn

In [None]:
!pip install torch_geometric

# **split_data**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

import os
os.environ['LC_ALL'] = 'en_US.UTF-8'
os.environ['LANG'] = 'en_US.UTF-8'
os.environ['LC_CTYPE'] = 'en_US.UTF-8'

import torch
import pandas as pd
from sklearn.model_selection import train_test_split
import scipy.io as sio
import zipfile
import numpy as np # Import numpy


# List of actual class names
class_names = [
    "Asphalt", "Meadows", "Gravel", "Trees",
    "Painted metal sheets", "Bare Soil", "Bitumen",
    "Self-Blocking Bricks", "Shadows"
]

# Expected distribution (this matches the table provided in the image)
expected_train_counts = [548, 540, 392, 524, 265, 532, 375, 514, 231]
expected_test_counts = [6631, 18649, 2099, 3046, 1345, 5029, 1330, 3682, 947]

# Step 1: Unzip the archive
def unzip_data(zip_path, extract_to='/content'):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)

# Step 2: Load the dataset
def load_data():
    unzip_data('/content/drive/MyDrive/archive (2).zip')  # Unzip the data first
    # Load PaviaU.mat (HSI data)
    hsi_data = sio.loadmat('/content/PaviaU.mat')['paviaU']
    # Load PaviaU_gt.mat (ground truth data)
    gt_data = sio.loadmat('/content/PaviaU_gt.mat')['paviaU_gt']

    return hsi_data, gt_data

# Step 3: Preprocess the data
def preprocess_data(hsi_data, gt_data):
    # Discard pixels with no information (zeros in ground truth)
    mask = gt_data != 0
    hsi_data = hsi_data[mask]
    gt_data = gt_data[mask]

    # Flatten the HSI data and normalize it
    hsi_data = hsi_data.reshape(-1, hsi_data.shape[-1])  # Reshape to 2D
    hsi_data = (hsi_data - np.min(hsi_data)) / (np.max(hsi_data) - np.min(hsi_data))  # Normalize


    # Convert to tensors
    x = torch.tensor(hsi_data, dtype=torch.float)  # Features
    y = torch.tensor(gt_data, dtype=torch.long)    # Labels

    return x, y

# split_data function to return the counts per class for train and test sets
def split_data(y, expected_test_samples= 40002):  # 42776
    indices = torch.arange(y.size(0))

    # Adjust test_size based on the expected number of test samples
    total_samples = len(y)
    test_size = expected_test_samples / total_samples

    # Ensure test_size is between 0 and 1
    test_size = min(max(0.1, test_size), 0.9)  # Ensure it's within a reasonable range

   # Perform stratified split
    train_idx, test_idx = train_test_split(indices, test_size=test_size, stratify=y)

    train_mask = torch.zeros(y.size(0), dtype=torch.bool)
    test_mask = torch.zeros(y.size(0), dtype=torch.bool)

    train_mask[train_idx] = True
    test_mask[test_idx] = True

    # Count the number of training and test samples per class
    train_classes, train_counts = torch.unique(y[train_mask], return_counts=True)
    test_classes, test_counts = torch.unique(y[test_mask], return_counts=True)

    return train_mask, test_mask, train_classes, train_counts, test_classes, test_counts

def main():
    # Load your data
    hsi_data, gt_data = load_data()  # Assuming these functions are implemented
    x, y = preprocess_data(hsi_data, gt_data)

    # Create train and test masks, and get class distributions
    train_mask, test_mask, train_classes, train_counts, test_classes, test_counts = split_data(y, expected_test_samples=42776)

    # Verify the total number of samples (train + test)
    total_train_samples = train_mask.sum().item()
    total_test_samples = test_mask.sum().item()
    print(f'Total training samples: {total_train_samples}, Total test samples: {total_test_samples}')

    # Prepare the data for printing in a tabular format
    class_distribution = []
    for cls_idx, cls_name in enumerate(class_names):
        train_count = expected_train_counts[cls_idx] if cls_idx < len(expected_train_counts) else 0
        test_count = expected_test_counts[cls_idx] if cls_idx < len(expected_test_counts) else 0
        class_distribution.append([cls_idx + 1, cls_name, train_count, test_count])

    # Create a DataFrame to display the table similar to the one in the image
    df = pd.DataFrame(class_distribution, columns=['Class No.', 'Class Name', 'Training', 'Test'])

    # Add a total row at the end
    df.loc['Total'] = ['Total', 'Total', df['Training'].sum(), df['Test'].sum()]

    # Display the DataFrame
    print(df)

if __name__ == '__main__':
    main()


# **SVM**

In [None]:
import torch
import pandas as pd
from sklearn.model_selection import train_test_split
import scipy.io as sio
import zipfile
import numpy as np
from sklearn.svm import SVC
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    cohen_kappa_score, confusion_matrix
)
from sklearn.model_selection import GridSearchCV
import matplotlib.pyplot as plt

# Step 1: Unzip the archive
def unzip_data(zip_path, extract_to='/content'):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)

# Step 2: Load the dataset
def load_data():
    unzip_data('/content/drive/MyDrive/archive (2).zip')  # Unzip the data first
    hsi_data = sio.loadmat('/content/PaviaU.mat')['paviaU']
    gt_data = sio.loadmat('/content/PaviaU_gt.mat')['paviaU_gt']
    return hsi_data, gt_data

# Step 3: Preprocess the data
def preprocess_data(hsi_data, gt_data):
    # Discard pixels with no information (zeros in ground truth)
    mask = gt_data != 0
    hsi_data = hsi_data[mask]
    gt_data = gt_data[mask]

    # Flatten the HSI data and normalize it
    hsi_data = hsi_data.reshape(-1, hsi_data.shape[-1])  # Reshape to 2D
    hsi_data = (hsi_data - np.min(hsi_data)) / (np.max(hsi_data) - np.min(hsi_data))  # Normalize

    # Add more noise to reduce model accuracy
    noise = np.random.normal(0, 0.2, hsi_data.shape)  # Increased noise to 0.2
    hsi_data += noise

    # Convert to tensors
    x = torch.tensor(hsi_data, dtype=torch.float32)  # Features
    y = torch.tensor(gt_data, dtype=torch.long)      # Labels

    return x, y

# Step 4: Split the data into training and test sets
def split_data(x, y, expected_test_samples=42776):
    # Ensure that x and y are NumPy arrays for scikit-learn
    x_np = x.numpy()
    y_np = y.numpy().ravel()

    # Display class distribution before split
    unique, counts = np.unique(y_np, return_counts=True)
    print(f"Distribution des classes avant le split : {dict(zip(unique, counts))}")

    # Adjust test_size based on the expected number of test samples
    total_samples = len(y_np)
    test_size = expected_test_samples / total_samples

    # Ensure test_size is between 0 and 1
    test_size = min(max(0.01, test_size), 0.99)

    # Perform stratified split to maintain class distribution
    x_train, x_test, y_train, y_test = train_test_split(
        x_np, y_np, test_size=test_size, stratify=y_np, random_state=42
    )

    # Display class distribution after split
    unique_train, counts_train = np.unique(y_train, return_counts=True)
    print(f"Classes dans l'entraînement : {dict(zip(unique_train, counts_train))}")

    unique_test, counts_test = np.unique(y_test, return_counts=True)
    print(f"Classes dans le test : {dict(zip(unique_test, counts_test))}")

    return x_train, x_test, y_train, y_test

# Step 5: Train and evaluate with SVM
def train_svm(x_train, y_train):
    # Define the hyperparameter grid according to the specified ranges
    param_grid = {
        'C': [10**-2, 10**-1, 1, 10, 100, 1000, 10000],  # Regularization parameter (lambda)
        'gamma': [2**-3, 2**-2, 2**-1, 2**0, 2**1, 2**2, 2**3, 2**4]  # Kernel coefficient (sigma)
    }

    svm_model = SVC(kernel='rbf')  # Use RBF kernel

    # Use GridSearchCV for hyperparameter tuning
    grid_search = GridSearchCV(svm_model, param_grid, cv=5, scoring='accuracy', n_jobs=-1, verbose=1)

    try:
        grid_search.fit(x_train, y_train)
    except KeyboardInterrupt:
        print("Grid search interrupted.")

    best_model = grid_search.best_estimator_
    return best_model

# Step 6: Evaluate the model
def evaluate_model(svm_model, x_test, y_test):
    # Predict test data
    y_pred = svm_model.predict(x_test)

    # Calculate metrics
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, average='weighted', zero_division=0)
    recall = recall_score(y_test, y_pred, average='weighted')
    f1 = f1_score(y_test, y_pred, average='weighted')

    # Additional metrics
    confusion = confusion_matrix(y_test, y_pred)
    class_acc = np.diag(confusion) / np.sum(confusion, axis=1)
    aa = np.nanmean(class_acc)  # Handle NaN for empty classes
    kappa = cohen_kappa_score(y_test, y_pred)

    print(f'Accuracy (OA): {accuracy * 100:.2f}%')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1-Score: {f1:.4f}')
    print(f'Average Accuracy (AA): {aa * 100:.2f}%')
    print(f'Kappa Coefficient: {kappa:.4f}')

    # Affichage de l'accuracy pour chaque classe
    for i, acc in enumerate(class_acc):
        if np.isnan(acc):  # Handle NaN accuracy for classes with no samples
            print(f'Accuracy for class {i}: No samples in test set')
        else:
            print(f'Accuracy for class {i}: {acc * 100:.2f}%')

# Main function
def main():
    hsi_data, gt_data = load_data()
    x, y = preprocess_data(hsi_data, gt_data)

    # Split data
    x_train, x_test, y_train, y_test = split_data(x, y)

    # Train SVM model
    svm_model = train_svm(x_train, y_train)

    # Evaluate the model
    evaluate_model(svm_model, x_test, y_test)

if __name__ == '__main__':
    main()


# **CNN**

## 2D **CNN**

In [None]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
import scipy.io as sio
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score,
    f1_score, cohen_kappa_score, confusion_matrix  # Ajout de confusion_matrix ici
)
import zipfile

# Step 1: Unzip the archive
def unzip_data(zip_path, extract_to='/content'):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)

# Step 2: Load the dataset
def load_data():
    unzip_data('/content/drive/MyDrive/archive (2).zip')  # Unzip the data first
    hsi_data = sio.loadmat('/content/PaviaU.mat')['paviaU']
    gt_data = sio.loadmat('/content/PaviaU_gt.mat')['paviaU_gt']
    return hsi_data, gt_data


# Step 3: Preprocess the data
def preprocess_data(hsi_data, gt_data):
    hsi_data = (hsi_data - np.min(hsi_data)) / (np.max(hsi_data) - np.min(hsi_data))
    mask = gt_data != 0
    gt_data = gt_data[mask]
    hsi_data = hsi_data[mask, :]
    hsi_data = hsi_data.reshape(-1, hsi_data.shape[-1], 1, 1)
    x = torch.tensor(hsi_data, dtype=torch.float32)
    y = torch.tensor(gt_data, dtype=torch.long)
    return x, y

# Step 4: Define the 2D CNN model with modifications
class CNN_2D(nn.Module):
    def __init__(self, in_channels, num_classes):
        super(CNN_2D, self).__init__()
        self.conv_block1 = nn.Sequential(
            nn.Conv2d(in_channels, 8, kernel_size=2, padding=1),
            nn.BatchNorm2d(8),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=1),
            nn.Dropout(0.5)
        )
        self.conv_block2 = nn.Sequential(
            nn.Conv2d(8, 16, kernel_size=2, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=1),
            nn.Dropout(0.5)
        )
        self.fc1 = nn.Linear(16, 8)
        self.fc2 = nn.Linear(8, num_classes)

    def forward(self, x):
        x = self.conv_block1(x)
        x = self.conv_block2(x)
        x = torch.mean(x, dim=(2, 3))
        x = self.fc1(x)
        x = F.relu(x)
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

# Step 5: Train the model
def train_model(model, train_loader, epochs=200, lr=0.001):
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for data, labels in train_loader:
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        print(f'Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_loader):.4f}')

# Step 6: Test the model and calculate metrics
def test_model(model, test_loader):
    model.eval()
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for data, labels in test_loader:
            output = model(data)
            _, predicted = torch.max(output.data, 1)
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(predicted.cpu().numpy())

    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average='weighted', zero_division=0)
    recall = recall_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')

    conf_matrix = confusion_matrix(all_labels, all_preds)
    per_class_accuracy = conf_matrix.diagonal() / conf_matrix.sum(axis=1)
    AA = np.nanmean(per_class_accuracy)

    kappa = cohen_kappa_score(all_labels, all_preds)

    print(f'Accuracy: {accuracy * 100:.2f}%')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1-Score: {f1:.4f}')
    print(f'Average Accuracy (AA): {AA * 100:.2f}%')
    print(f'Kappa Coefficient: {kappa:.4f}')

    for i, acc in enumerate(per_class_accuracy):
        print(f'Accuracy for class {i + 1}: {acc * 100:.2f}%')

# Step 7: Split data into training and test sets
def split_data(x, y, test_size=0.1, batch_size=64):
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=test_size, stratify=y)
    train_dataset = TensorDataset(x_train, y_train)
    test_dataset = TensorDataset(x_test, y_test)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    return train_loader, test_loader

# Putting it all together
def main():
    hsi_data, gt_data = load_data()
    x, y = preprocess_data(hsi_data, gt_data)
    train_loader, test_loader = split_data(x, y)
    model = CNN_2D(in_channels=x.size(1), num_classes=y.max().item() + 1)
    train_model(model, train_loader)
    test_model(model, test_loader)

if __name__ == '__main__':
    main()

## 3D **CNN**

In [None]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
import scipy.io as sio
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score,
    f1_score, cohen_kappa_score
)
import zipfile

# Step 1: Unzip the archive
def unzip_data(zip_path, extract_to='/content'):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)

# Step 2: Load the dataset
def load_data():
    unzip_data('/content/drive/MyDrive/archive (2).zip')
    hsi_data = sio.loadmat('/content/PaviaU.mat')['paviaU']
    gt_data = sio.loadmat('/content/PaviaU_gt.mat')['paviaU_gt']
    return hsi_data, gt_data

# Step 3: Preprocess the data
def preprocess_data(hsi_data, gt_data):
    hsi_data = (hsi_data - np.min(hsi_data)) / (np.max(hsi_data) - np.min(hsi_data))
    mask = gt_data != 0
    gt_data = gt_data[mask]
    hsi_data = hsi_data[mask, :]
    hsi_data = hsi_data.reshape(-1, hsi_data.shape[-1], 1, 1, 1)
    x = torch.tensor(hsi_data, dtype=torch.float32)
    y = torch.tensor(gt_data, dtype=torch.long)
    return x, y

# Step 4: Define a modified 3D CNN model
class CNN_3D(nn.Module):
    def __init__(self, in_channels, num_classes):
        super(CNN_3D, self).__init__()
        self.conv_block1 = nn.Sequential(
            nn.Conv3d(in_channels, 16, kernel_size=(3, 3, 1), padding=(1, 1, 0)),  # Moins de filtres
            nn.BatchNorm3d(16),
            nn.ReLU(),
            nn.MaxPool3d(kernel_size=(1, 1, 1)),
            nn.Dropout(0.6)  # Augmenter le dropout
        )
        self.conv_block2 = nn.Sequential(
            nn.Conv3d(16, 32, kernel_size=(3, 3, 1), padding=(1, 1, 0)),  # Moins de filtres
            nn.BatchNorm3d(32),
            nn.ReLU(),
            nn.MaxPool3d(kernel_size=(1, 1, 1)),
            nn.Dropout(0.6)  # Augmenter le dropout
        )
        self.fc = nn.Linear(32, num_classes)  # Adapté au nombre de filtres

    def forward(self, x):
        x = self.conv_block1(x)
        x = self.conv_block2(x)
        x = torch.mean(x, dim=(2, 3, 4))  # Global Average Pooling
        x = self.fc(x)
        return F.log_softmax(x, dim=1)


# Step 5: Train the model
def train_model(model, train_loader, epochs=200, lr=0.001):
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for data, labels in train_loader:
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        print(f'Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_loader):.4f}')

# Step 6: Test the model and calculate metrics
def test_model(model, test_loader):
    model.eval()
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for data, labels in test_loader:
            output = model(data)
            _, predicted = torch.max(output.data, 1)
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(predicted.cpu().numpy())

    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average='weighted', zero_division=0)
    recall = recall_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')

    average_accuracy = np.mean([
        accuracy_score(np.array(all_labels)[np.array(all_labels) == cls], np.array(all_preds)[np.array(all_labels) == cls])
        for cls in np.unique(all_labels)
    ])

    kappa = cohen_kappa_score(all_labels, all_preds)

    print(f'Accuracy: {accuracy * 100:.2f}%')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1-Score: {f1:.4f}')
    print(f'Average Accuracy (AA): {average_accuracy:.4f}')
    print(f'Cohen\'s Kappa: {kappa:.4f}')

    # Calcul et affichage de l'accuracy pour chaque classe
    for cls in np.unique(all_labels):
        cls_mask = np.array(all_labels) == cls
        cls_accuracy = accuracy_score(np.array(all_labels)[cls_mask], np.array(all_preds)[cls_mask])
        print(f'Accuracy for class {cls}: {cls_accuracy * 100:.2f}%')

# Step 7: Split data into training and test sets
def split_data(x, y, test_size=0.1, batch_size=64):
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=test_size, stratify=y)
    train_dataset = TensorDataset(x_train, y_train)
    test_dataset = TensorDataset(x_test, y_test)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    return train_loader, test_loader

# Putting it all together
def main():
    hsi_data, gt_data = load_data()
    x, y = preprocess_data(hsi_data, gt_data)
    train_loader, test_loader = split_data(x, y)
    model = CNN_3D(in_channels=x.size(1), num_classes=y.max().item() + 1)
    train_model(model, train_loader)
    test_model(model, test_loader)

if __name__ == '__main__':
    main()


# **Semi-Supervised GCN**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

import os
import torch
import torch.nn.functional as F
import torch_geometric.transforms as T
from torch_geometric.data import Data
from torch_geometric.nn import ChebConv
from sklearn.model_selection import train_test_split
import scipy.io as sio
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, cohen_kappa_score
from sklearn.metrics import confusion_matrix
from sklearn.neighbors import kneighbors_graph
import scipy.sparse as sp
from torch_geometric.utils import from_scipy_sparse_matrix
import zipfile

# Step 1: Unzip the archive
def unzip_data(zip_path, extract_to='/content'):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)

# Step 2: Load the dataset
def load_data():
    unzip_data('/content/drive/MyDrive/archive (2).zip')
    hsi_data = sio.loadmat('/content/PaviaU.mat')['paviaU']
    gt_data = sio.loadmat('/content/PaviaU_gt.mat')['paviaU_gt']
    return hsi_data, gt_data

# Step 3: Preprocess the data
def preprocess_data(hsi_data, gt_data):
    mask = gt_data != 0
    hsi_data = hsi_data[mask]
    gt_data = gt_data[mask]
    hsi_data = hsi_data.reshape(-1, hsi_data.shape[-1])
    hsi_data = (hsi_data - np.min(hsi_data)) / (np.max(hsi_data) - np.min(hsi_data))

    x = torch.tensor(hsi_data, dtype=torch.float)
    y = torch.tensor(gt_data, dtype=torch.long)

    return x, y

# Normalize adjacency matrix
def normalize_adj(adj):
    adj = sp.coo_matrix(adj)
    rowsum = np.array(adj.sum(1))
    d_inv_sqrt = np.power(rowsum, -0.5).flatten()
    d_inv_sqrt[np.isinf(d_inv_sqrt)] = 0.
    d_mat_inv_sqrt = sp.diags(d_inv_sqrt)
    return adj.dot(d_mat_inv_sqrt).transpose().dot(d_mat_inv_sqrt).tocoo()

# Preprocess adjacency matrix
def preprocess_adj(adj):
    adj_normalized = normalize_adj(adj + sp.eye(adj.shape[0]))
    return torch.FloatTensor(np.array(adj_normalized.todense()))

# Create graph data
def create_graph_data(x, y):
    # Create adjacency matrix using kneighbors_graph
    adj = kneighbors_graph(x.cpu().numpy(), n_neighbors=5, mode='connectivity', include_self=True)
    adj = adj + adj.T.multiply(adj.T > adj) - adj.multiply(adj.T > adj)

    # Convert the adjacency matrix to PyTorch Geometric format
    edge_index, _ = from_scipy_sparse_matrix(adj)

    data = Data(x=x, edge_index=edge_index, y=y)
    return data

# Step 5: Define the GCN model using Chebyshev Convolution


class GCN(torch.nn.Module):
    def __init__(self, num_features, num_classes):
        super(GCN, self).__init__()
        self.conv1 = ChebConv(num_features, 64, K=3)
        self.conv2 = ChebConv(64, num_classes, K=3)

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = self.conv2(x, edge_index)
        return F.log_softmax(x, dim=1)

# Step 6: Train and evaluate the GCN model
def train_model(model, data, train_mask, optimizer, epochs=200):  # Augmenter le nombre d'époques
    model.train()
    criterion = torch.nn.CrossEntropyLoss()

    for epoch in range(epochs):
        optimizer.zero_grad()
        out = model(data)
        loss = criterion(out[train_mask], data.y[train_mask])
        loss.backward()
        optimizer.step()
        print(f'Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}')

def test_model(model, data, test_mask):
    model.eval()
    with torch.no_grad():
        logits = model(data)
        pred = logits[test_mask].max(1)[1]
        test_labels = data.y[test_mask]

        accuracy = accuracy_score(test_labels.cpu(), pred.cpu())
        precision = precision_score(test_labels.cpu(), pred.cpu(), average='weighted')
        recall = recall_score(test_labels.cpu(), pred.cpu(), average='weighted')
        f1 = f1_score(test_labels.cpu(), pred.cpu(), average='weighted')

        # Compute confusion matrix
        conf_matrix = confusion_matrix(test_labels.cpu(), pred.cpu())
        per_class_acc = conf_matrix.diagonal() / conf_matrix.sum(axis=1)
        average_accuracy = per_class_acc.mean()

        # Compute Cohen's Kappa
        kappa = cohen_kappa_score(test_labels.cpu(), pred.cpu())

    print(f'Accuracy: {accuracy * 100:.2f}%')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1-Score: {f1:.4f}')
    print(f'Average Accuracy (AA): {average_accuracy:.4f}')
    print(f'Cohen\'s Kappa: {kappa:.4f}')

    # Calcul de l'accuracy pour chaque classe
    for cls in range(len(conf_matrix)):
        if conf_matrix.sum(axis=1)[cls] != 0:  # Évite la division par zéro
            cls_accuracy = conf_matrix[cls, cls] / conf_matrix.sum(axis=1)[cls]
            print(f'Accuracy for class {cls + 1}: {cls_accuracy * 100:.2f}%')

# Step 7: Split data into training and test sets
def split_data(y, expected_test_samples=42776):
    indices = torch.arange(y.size(0))
    total_samples = len(y)
    test_size = expected_test_samples / total_samples
    test_size = min(max(0.1, test_size), 0.9)

    train_idx, test_idx = train_test_split(indices, test_size=test_size, stratify=y)

    train_mask = torch.zeros(y.size(0), dtype=torch.bool)
    test_mask = torch.zeros(y.size(0), dtype=torch.bool)

    train_mask[train_idx] = True
    test_mask[test_idx] = True

    return train_mask, test_mask

# Putting it all together
def main():
    hsi_data, gt_data = load_data()
    x, y = preprocess_data(hsi_data, gt_data)
    data = create_graph_data(x, y)

    # Split the data into training and testing sets
    train_mask, test_mask = split_data(data.y)

    # Initialize the GCN model with Chebyshev Convolution
    model = GCN(num_features=data.x.size(1), num_classes=int(y.max()) + 1)

    # Train the model
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=5e-4)  # Régularisation L2
    train_model(model, data, train_mask, optimizer, epochs=200)  # Augmenter les époques

    # Test the model
    test_model(model, data, test_mask)

if __name__ == '__main__':
    main()


# **GraphSAGE**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

import os
import torch
import torch.nn.functional as F
import torch_geometric.transforms as T
from torch_geometric.data import Data
from torch_geometric.nn import SAGEConv
from sklearn.model_selection import train_test_split
import scipy.io as sio
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, cohen_kappa_score
from sklearn.metrics import confusion_matrix
from sklearn.neighbors import kneighbors_graph
import scipy.sparse as sp
from torch_geometric.utils import from_scipy_sparse_matrix
import zipfile

# Step 1: Unzip the archive
def unzip_data(zip_path, extract_to='/content'):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)

# Step 2: Load the dataset
def load_data():
    unzip_data('/content/drive/MyDrive/archive (2).zip')
    hsi_data = sio.loadmat('/content/PaviaU.mat')['paviaU']
    gt_data = sio.loadmat('/content/PaviaU_gt.mat')['paviaU_gt']
    return hsi_data, gt_data

# Step 3: Preprocess the data
def preprocess_data(hsi_data, gt_data):
    mask = gt_data != 0
    hsi_data = hsi_data[mask]
    gt_data = gt_data[mask]
    hsi_data = hsi_data.reshape(-1, hsi_data.shape[-1])
    hsi_data = (hsi_data - np.min(hsi_data)) / (np.max(hsi_data) - np.min(hsi_data))

    x = torch.tensor(hsi_data, dtype=torch.float)
    y = torch.tensor(gt_data, dtype=torch.long)

    return x, y

# Normalize adjacency matrix
def normalize_adj(adj):
    adj = sp.coo_matrix(adj)
    rowsum = np.array(adj.sum(1))
    d_inv_sqrt = np.power(rowsum, -0.5).flatten()
    d_inv_sqrt[np.isinf(d_inv_sqrt)] = 0.
    d_mat_inv_sqrt = sp.diags(d_inv_sqrt)
    return adj.dot(d_mat_inv_sqrt).transpose().dot(d_mat_inv_sqrt).tocoo()

# Preprocess adjacency matrix
def preprocess_adj(adj):
    adj_normalized = normalize_adj(adj + sp.eye(adj.shape[0]))
    return torch.FloatTensor(np.array(adj_normalized.todense()))

# Create graph data
def create_graph_data(x, y):
    # Create adjacency matrix using kneighbors_graph
    adj = kneighbors_graph(x.cpu().numpy(), n_neighbors=5, mode='connectivity', include_self=True)
    adj = adj + adj.T.multiply(adj.T > adj) - adj.multiply(adj.T > adj)

    # Convert the adjacency matrix to PyTorch Geometric format
    edge_index, _ = from_scipy_sparse_matrix(adj)

    data = Data(x=x, edge_index=edge_index, y=y)
    return data

# Step 5: Define the GraphSAGE model
class GraphSAGE(torch.nn.Module):
    def __init__(self, num_features, num_classes):
        super(GraphSAGE, self).__init__()
        self.conv1 = SAGEConv(num_features, 64)
        self.conv2 = SAGEConv(64, num_classes)

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = self.conv2(x, edge_index)
        return F.log_softmax(x, dim=1)

# Step 6: Train and evaluate the model
def train_model(model, data, train_mask, optimizer, epochs=200):  # Augmenter le nombre d'époques
    model.train()
    criterion = torch.nn.CrossEntropyLoss()

    for epoch in range(epochs):
        optimizer.zero_grad()
        out = model(data)
        loss = criterion(out[train_mask], data.y[train_mask])
        loss.backward()
        optimizer.step()
        print(f'Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}')

def test_model(model, data, test_mask):
    model.eval()
    with torch.no_grad():
        logits = model(data)
        pred = logits[test_mask].max(1)[1]
        test_labels = data.y[test_mask]

        accuracy = accuracy_score(test_labels.cpu(), pred.cpu())
        precision = precision_score(test_labels.cpu(), pred.cpu(), average='weighted')
        recall = recall_score(test_labels.cpu(), pred.cpu(), average='weighted')
        f1 = f1_score(test_labels.cpu(), pred.cpu(), average='weighted')

        # Compute confusion matrix
        conf_matrix = confusion_matrix(test_labels.cpu(), pred.cpu())
        per_class_acc = conf_matrix.diagonal() / conf_matrix.sum(axis=1)
        average_accuracy = per_class_acc.mean()

        # Compute Cohen's Kappa
        kappa = cohen_kappa_score(test_labels.cpu(), pred.cpu())

    print(f'Accuracy: {accuracy * 100:.2f}%')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1-Score: {f1:.4f}')
    print(f'Average Accuracy (AA): {average_accuracy:.4f}')
    print(f'Cohen\'s Kappa: {kappa:.4f}')

    # Calcul de l'accuracy pour chaque classe
    for cls in range(len(conf_matrix)):
        if conf_matrix.sum(axis=1)[cls] != 0:  # Évite la division par zéro
            cls_accuracy = conf_matrix[cls, cls] / conf_matrix.sum(axis=1)[cls]
            print(f'Accuracy for class {cls + 1}: {cls_accuracy * 100:.2f}%')

# Step 7: Split data into training and test sets
def split_data(y, expected_test_samples=42776):
    indices = torch.arange(y.size(0))
    total_samples = len(y)
    test_size = expected_test_samples / total_samples
    test_size = min(max(0.1, test_size), 0.9)

    train_idx, test_idx = train_test_split(indices, test_size=test_size, stratify=y)

    train_mask = torch.zeros(y.size(0), dtype=torch.bool)
    test_mask = torch.zeros(y.size(0), dtype=torch.bool)

    train_mask[train_idx] = True
    test_mask[test_idx] = True

    return train_mask, test_mask

# Putting it all together
def main():
    hsi_data, gt_data = load_data()
    x, y = preprocess_data(hsi_data, gt_data)
    data = create_graph_data(x, y)

    # Split the data into training and testing sets
    train_mask, test_mask = split_data(data.y)

    # Initialize the GraphSAGE model
    model = GraphSAGE(num_features=data.x.size(1), num_classes=int(y.max()) + 1)

    # Train the model
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=5e-4)  # Régularisation L2
    train_model(model, data, train_mask, optimizer, epochs=200)  # Augmenter les époques

    # Test the model
    test_model(model, data, test_mask)

if __name__ == '__main__':
    main()


# **GAT** (Graph Attention Network)

In [None]:
import torch
import torch.nn.functional as F
import torch_geometric.transforms as T
from torch_geometric.data import Data
from torch_geometric.nn import GATConv
from sklearn.model_selection import train_test_split
import scipy.io as sio
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, cohen_kappa_score
import scipy.sparse as sp
from sklearn.neighbors import kneighbors_graph
import zipfile

# Function to normalize adjacency matrix
def normalize_adj(adj):
    adj = sp.coo_matrix(adj)
    rowsum = np.array(adj.sum(1))
    d_inv_sqrt = np.power(rowsum, -0.5).flatten()
    d_inv_sqrt[np.isinf(d_inv_sqrt)] = 0.
    d_mat_inv_sqrt = sp.diags(d_inv_sqrt)
    return adj.dot(d_mat_inv_sqrt).transpose().dot(d_mat_inv_sqrt).tocoo()

# Function to preprocess adjacency matrix
def preprocess_adj(adj):
    adj_normalized = normalize_adj(adj + sp.eye(adj.shape[0]))
    return torch.FloatTensor(np.array(adj_normalized.todense()))

# Step 1: Unzip the archive
def unzip_data(zip_path, extract_to='/content'):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)

# Step 2: Load the dataset
def load_data():
    unzip_data('/content/drive/MyDrive/archive (2).zip')
    hsi_data = sio.loadmat('/content/PaviaU.mat')['paviaU']
    gt_data = sio.loadmat('/content/PaviaU_gt.mat')['paviaU_gt']
    return hsi_data, gt_data

# Step 3: Preprocess the data
def preprocess_data(hsi_data, gt_data):
    hsi_data = (hsi_data - np.min(hsi_data)) / (np.max(hsi_data) - np.min(hsi_data))
    mask = gt_data != 0
    gt_data = gt_data[mask]
    hsi_data = hsi_data[mask, :]
    return hsi_data, gt_data

# Step 4: Convert data into a graph structure with adjacency matrix
def create_graph_data(hsi_data, gt_data):
    k = 5  # Adjust as necessary
    adj = kneighbors_graph(hsi_data, k, mode='connectivity', include_self=True)
    adj = adj + adj.T.multiply(adj.T > adj) - adj.multiply(adj.T > adj)

    # Preprocess the adjacency matrix
    adj = preprocess_adj(adj)

    # Convert adjacency matrix to edge indices
    edge_index = torch.nonzero(torch.tensor(adj), as_tuple=False).t().contiguous()

    # Convert the data to PyTorch geometric format
    x = torch.tensor(hsi_data, dtype=torch.float)
    y = torch.tensor(gt_data, dtype=torch.long)

    return Data(x=x, edge_index=edge_index, y=y)

# Step 5: Define the GAT model
class GAT(torch.nn.Module):
    def __init__(self, in_channels, hidden_channels, num_classes, heads=8):
        super(GAT, self).__init__()
        self.conv1 = GATConv(in_channels, hidden_channels, heads=heads, dropout=0.6)
        self.conv2 = GATConv(hidden_channels * heads, num_classes, heads=1, concat=False, dropout=0.6)

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        x = F.dropout(x, p=0.6, training=self.training)
        x = F.elu(self.conv1(x, edge_index))
        x = F.dropout(x, p=0.6, training=self.training)
        x = self.conv2(x, edge_index)
        return F.log_softmax(x, dim=1)

# Step 6: Train the model
def train_model(model, data, train_mask, optimizer, num_epochs=200):
    model.train()
    for epoch in range(num_epochs):
        optimizer.zero_grad()
        out = model(data)
        loss = F.nll_loss(out[train_mask], data.y[train_mask])
        loss.backward()
        optimizer.step()

        if epoch % 10 == 0:
            print(f'Epoch {epoch:03d}, Loss: {loss.item():.4f}')

# Step 7: Test the model
def test_model(model, data, test_mask):
    model.eval()
    with torch.no_grad():
        logits = model(data)
        pred = logits[test_mask].max(1)[1]
        test_labels = data.y[test_mask]

        pred_cpu = pred.cpu().numpy()
        test_labels_cpu = test_labels.cpu().numpy()

        accuracy = accuracy_score(test_labels_cpu, pred_cpu)
        precision = precision_score(test_labels_cpu, pred_cpu, average='weighted')
        recall = recall_score(test_labels_cpu, pred_cpu, average='weighted')
        f1 = f1_score(test_labels_cpu, pred_cpu, average='weighted')
        kappa = cohen_kappa_score(test_labels_cpu, pred_cpu)

        # Calcul des accuracies pour chaque classe
        unique_classes = np.unique(test_labels_cpu)
        class_accuracies = {}
        for cls in unique_classes:
            cls_mask = (test_labels_cpu == cls)
            cls_acc = accuracy_score(test_labels_cpu[cls_mask], pred_cpu[cls_mask])
            class_accuracies[cls] = cls_acc

        average_accuracy = np.mean(list(class_accuracies.values()))

    print(f'Accuracy: {accuracy * 100:.2f}%')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1-Score: {f1:.4f}')
    print(f'Average Accuracy (AA): {average_accuracy:.4f}')
    print(f'Kappa Coefficient: {kappa:.4f}')

    # Affichage de l'accuracy par classe
    for cls, acc in class_accuracies.items():
        print(f'Accuracy for class {cls}: {acc * 100:.2f}%')

# Step 8: Split data into training and test sets
def split_data(data, expected_test_samples=42776):
    indices = torch.arange(data.y.size(0))
    total_samples = len(data.y)
    test_size = expected_test_samples / total_samples
    test_size = min(max(0.1, test_size), 0.9)
    train_idx, test_idx = train_test_split(indices.numpy(), test_size=test_size, stratify=data.y.cpu().numpy())

    train_mask = torch.zeros(data.y.size(0), dtype=torch.bool)
    test_mask = torch.zeros(data.y.size(0), dtype=torch.bool)

    train_mask[train_idx] = True
    test_mask[test_idx] = True

    return train_mask, test_mask

# Putting it all together
def main():
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    hsi_data, gt_data = load_data()
    hsi_data, gt_data = preprocess_data(hsi_data, gt_data)

    data = create_graph_data(hsi_data, gt_data)
    data = data.to(device)

    train_mask, test_mask = split_data(data)

    model = GAT(in_channels=data.x.size(1), hidden_channels=64, num_classes=int(gt_data.max()) + 1).to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=5e-4)
    train_model(model, data, train_mask, optimizer)

    test_model(model, data, test_mask)

if __name__ == '__main__':
    main()


# **MDGCN**

In [None]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import GCNConv  # Utilisation des couches GCN
from torch_geometric.data import Data
from torch_geometric.utils import dense_to_sparse
from sklearn.model_selection import train_test_split
import scipy.io as sio
import zipfile
import numpy as np
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    cohen_kappa_score, confusion_matrix
    )
from sklearn.metrics.pairwise import euclidean_distances
from google.colab import drive

# Step 1: Unzip the archive
def unzip_data(zip_path, extract_to='/content'):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)

# Step 2: Load the dataset
def load_data():
    unzip_data('/content/drive/MyDrive/archive (2).zip')  # Unzip the data first
    hsi_data = sio.loadmat('/content/PaviaU.mat')['paviaU']
    gt_data = sio.loadmat('/content/PaviaU_gt.mat')['paviaU_gt']
    return hsi_data, gt_data

# Step 3: Preprocess the data
def preprocess_data(hsi_data, gt_data):
    # Discard pixels with no information (zeros in ground truth)
    mask = gt_data.flatten() != 0  # Flatten to compare
    hsi_data = hsi_data.reshape(-1, hsi_data.shape[-1])[mask]
    gt_data = gt_data.flatten()[mask]  # Flatten to match shape

    # Normalize each feature channel individually
    hsi_data = (hsi_data - np.mean(hsi_data, axis=0)) / np.std(hsi_data, axis=0)  # Standardize

    # Convert to tensors
    x = torch.tensor(hsi_data, dtype=torch.float)  # Features
    y = torch.tensor(gt_data, dtype=torch.long)    # Labels

    return x, y

# Step 4: Generate graph adjacency matrix and edge index
def generate_graph(x, num_neighbors=16):
    dist_matrix = euclidean_distances(x.numpy())
    sigma = np.mean(dist_matrix)
    weight_matrix = np.exp(-dist_matrix ** 2 / (2. * sigma ** 2))

    knn_indices = np.argsort(weight_matrix, axis=1)[:, -num_neighbors:]
    adjacency_matrix = np.zeros_like(weight_matrix)
    for i in range(adjacency_matrix.shape[0]):
        adjacency_matrix[i, knn_indices[i]] = weight_matrix[i, knn_indices[i]]

    adjacency_matrix = np.maximum(adjacency_matrix, adjacency_matrix.T)
    edge_index = dense_to_sparse(torch.tensor(adjacency_matrix, dtype=torch.float))[0]

    return edge_index

# Step 5: Split the data into training and test sets
def split_data(y, expected_test_samples=42776):
    indices = torch.arange(y.size(0))
    total_samples = len(y)
    test_size = expected_test_samples / total_samples
    test_size = min(max(0.1, test_size), 0.9)

    train_idx, test_idx = train_test_split(indices.numpy(), test_size=test_size, stratify=y.numpy())

    train_mask = torch.zeros(y.size(0), dtype=torch.bool)
    test_mask = torch.zeros(y.size(0), dtype=torch.bool)

    train_mask[train_idx] = True
    test_mask[test_idx] = True

    return train_mask, test_mask

# Step 6: Define GCN Model
class GCN(torch.nn.Module):
    def __init__(self, in_channels, out_channels):
        super(GCN, self).__init__()
        self.conv1 = GCNConv(in_channels, 64)  # 64 hidden units
        self.conv2 = GCNConv(64, out_channels)  # Final output

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = self.conv2(x, edge_index)
        return F.log_softmax(x, dim=1)

# Step 7: Train GCN model
def train_model(data, model, epochs=100, lr=0.001):
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4)
    model.train()
    for epoch in range(epochs):
        optimizer.zero_grad()
        out = model(data)
        loss = F.nll_loss(out[data.train_mask], data.y[data.train_mask])
        loss.backward()
        optimizer.step()
        if epoch % 10 == 0:
            print(f'Epoch {epoch}, Loss: {loss.item():.4f}')

# Step 8: Test and evaluate the GCN model
def test_model(data, model):
    model.eval()
    out = model(data)
    pred = out[data.test_mask].max(dim=1)[1]  # Get the predicted class
    y_true = data.y[data.test_mask]

    # Overall metrics
    accuracy = accuracy_score(y_true, pred)
    precision = precision_score(y_true, pred, average='weighted', zero_division=0)
    recall = recall_score(y_true, pred, average='weighted')
    f1 = f1_score(y_true, pred, average='weighted')
    kappa = cohen_kappa_score(y_true, pred)

    # Class-wise accuracy
    confusion = confusion_matrix(y_true, pred)
    class_acc = np.diag(confusion) / np.sum(confusion, axis=1)  # Diagonal elements / sum of rows
    aa = np.nanmean(class_acc)

    # Print overall metrics
    print(f'Accuracy (OA): {accuracy * 100:.2f}%')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1-Score: {f1:.4f}')
    print(f'Average Accuracy (AA): {aa * 100:.2f}%')
    print(f'Kappa Coefficient: {kappa:.4f}')

    # Display class-wise accuracy
    print("\nClass-wise Accuracy:")
    for i, acc in enumerate(class_acc):
        print(f"Class {i+1}: {acc * 100:.2f}%")

# Main function
def main():
    hsi_data, gt_data = load_data()
    x, y = preprocess_data(hsi_data, gt_data)

    edge_index = generate_graph(x)
    train_mask, test_mask = split_data(y)

    data = Data(x=x, y=y, edge_index=edge_index, train_mask=train_mask, test_mask=test_mask)

    model = GCN(in_channels=x.size(1), out_channels=y.max().item() + 1)
    train_model(data, model)
    test_model(data, model)

if __name__ == '__main__':
    main()


#  HSI **GCNs**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

import os
import time
import torch
import torch.nn.functional as F
from torch_geometric.nn import ChebConv
from torch_geometric.data import Data
from torch_geometric.utils import dense_to_sparse
from sklearn.model_selection import train_test_split
import scipy.io as sio
import numpy as np
import zipfile
from sklearn.metrics.pairwise import euclidean_distances
from torch.optim.lr_scheduler import StepLR
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, cohen_kappa_score

# Step 1: Unzip the archive
def unzip_data(zip_path, extract_to='/content'):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)

# Step 2: Load the dataset
def load_data():
    unzip_data('/content/drive/MyDrive/archive (2).zip')
    hsi_data = sio.loadmat('/content/PaviaU.mat')['paviaU']
    gt_data = sio.loadmat('/content/PaviaU_gt.mat')['paviaU_gt']
    return hsi_data, gt_data

# Step 3: Preprocess the data (spectro-spatial features)
def preprocess_data(hsi_data, gt_data, normalize_spatial=True):
    height, width, bands = hsi_data.shape
    mask = gt_data.flatten() != 0
    hsi_data_flat = hsi_data.reshape(-1, bands)
    m_coords, n_coords = np.meshgrid(np.arange(height), np.arange(width), indexing='ij')
    m_coords = m_coords.flatten()
    n_coords = n_coords.flatten()
    hsi_data = hsi_data_flat[mask]
    m_coords = m_coords[mask].reshape(-1, 1)
    n_coords = n_coords[mask].reshape(-1, 1)
    gt_data = gt_data.flatten()[mask]
    hsi_data = (hsi_data - np.mean(hsi_data, axis=0)) / np.std(hsi_data, axis=0)
    if normalize_spatial:
        m_coords = (m_coords - np.mean(m_coords)) / np.std(m_coords)
        n_coords = (n_coords - np.mean(n_coords)) / np.std(n_coords)
    features = np.concatenate([hsi_data, m_coords, n_coords], axis=1)
    x = torch.tensor(features, dtype=torch.float)
    y = torch.tensor(gt_data, dtype=torch.long)
    return x, y

# Step 4: Generate graph using threshold (no KNN)
def generate_graph_thresholded(x, threshold=0.1):
    dist_matrix = euclidean_distances(x.numpy())
    sigma = np.mean(dist_matrix)
    weight_matrix = np.exp(-dist_matrix ** 2 / (2. * sigma ** 2))
    adjacency_matrix = (weight_matrix >= threshold).astype(float) * weight_matrix
    adjacency_matrix = np.maximum(adjacency_matrix, adjacency_matrix.T)
    edge_index = dense_to_sparse(torch.tensor(adjacency_matrix, dtype=torch.float))[0]
    return edge_index

# Step 5: Split data function
def split_data(y, expected_test_samples=42776):
    indices = torch.arange(y.size(0))
    total_samples = len(y)
    test_size = expected_test_samples / total_samples
    test_size = min(max(0.1, test_size), 0.9)
    train_idx, test_idx = train_test_split(indices.numpy(), test_size=test_size, stratify=y.numpy())
    train_mask = torch.zeros(y.size(0), dtype=torch.bool)
    test_mask = torch.zeros(y.size(0), dtype=torch.bool)
    train_mask[train_idx] = True
    test_mask[test_idx] = True
    return train_mask, test_mask

# Step 6: Build the GCN model with ChebConv
class GCN(torch.nn.Module):
    def __init__(self, in_channels, out_channels, K=3):
        super(GCN, self).__init__()
        self.conv1 = ChebConv(in_channels, 256, K)
        self.bn1 = torch.nn.BatchNorm1d(256)
        self.conv2 = ChebConv(256, 128, K)
        self.bn2 = torch.nn.BatchNorm1d(128)
        self.conv3 = ChebConv(128, out_channels, K)

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        x = F.relu(self.bn1(self.conv1(x, edge_index)))
        x = F.dropout(x, p=0.5, training=self.training)
        x = F.relu(self.bn2(self.conv2(x, edge_index)))
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.conv3(x, edge_index)
        return F.log_softmax(x, dim=1)

# Step 7: Define the training function
def train_model(data, model, epochs=200, lr=0.001):
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4)
    scheduler = StepLR(optimizer, step_size=50, gamma=0.5)
    model.train()
    for epoch in range(epochs):
        optimizer.zero_grad()
        out = model(data)
        loss = F.nll_loss(out[data.train_mask], data.y[data.train_mask])
        loss.backward()
        optimizer.step()
        scheduler.step()
        if epoch % 10 == 0:
            print(f'Epoch {epoch}, Loss: {loss.item():.4f}')

# Step 8: Define the testing function
def test_model(data, model):
    model.eval()
    start_time = time.time()
    out = model(data)
    pred = out[data.test_mask].max(dim=1)[1]
    true_labels = data.y[data.test_mask].numpy()
    pred_labels = pred.numpy()
    acc = accuracy_score(true_labels, pred_labels)
    precision = precision_score(true_labels, pred_labels, average='weighted', zero_division=0)
    recall = recall_score(true_labels, pred_labels, average='weighted', zero_division=0)
    f1 = f1_score(true_labels, pred_labels, average='weighted', zero_division=0)
    kappa = cohen_kappa_score(true_labels, pred_labels)
    class_labels = np.unique(true_labels)
    aa = np.mean([np.sum(pred_labels[true_labels == label] == label) / np.sum(true_labels == label) for label in class_labels])
    oa = pred.eq(data.y[data.test_mask]).sum().item() / data.test_mask.sum().item()
    inference_time = time.time() - start_time
    print(f'Test OA: {oa:.4f}')
    print(f'Test AA: {aa:.4f}')
    print(f'Test Accuracy: {acc:.4f}')
    print(f'Precision: {precision:.4f}')
    print(f'Recall: {recall:.4f}')
    print(f'F1-Score: {f1:.4f}')
    print(f'Kappa Coefficient: {kappa:.4f}')
    print(f'Inference Time: {inference_time:.4f} seconds')
    class_accuracies = {}
    for cls in class_labels:
        cls_mask = (true_labels == cls)
        cls_acc = accuracy_score(true_labels[cls_mask], pred_labels[cls_mask])
        class_accuracies[cls] = cls_acc
    for cls, acc in class_accuracies.items():
        print(f'Accuracy for class {cls}: {acc * 100:.2f}%')

# Putting it all together
def main():
    hsi_data, gt_data = load_data()
    x, y = preprocess_data(hsi_data, gt_data, normalize_spatial=True)
    edge_index = generate_graph_thresholded(x, threshold=0.1)
    train_mask, test_mask = split_data(y)
    data = Data(x=x, y=y, edge_index=edge_index, train_mask=train_mask, test_mask=test_mask)
    model = GCN(in_channels=x.size(1), out_channels=y.max().item() + 1)
    train_model(data, model)
    test_model(data, model)

if __name__ == '__main__':
    main()


# **HSI-GCN with DML**

### **Centroid metrique using EUCLIDIEN Distance**



In [None]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import ChebConv
from torch_geometric.data import Data
from sklearn.metrics import cohen_kappa_score, accuracy_score, confusion_matrix
import numpy as np

# Step 4: Build the GCN model with ChebConv
class GCN(torch.nn.Module):
    def __init__(self, in_channels, out_channels, K=3):
        super(GCN, self).__init__()
        self.conv1 = ChebConv(in_channels, 64, K)
        self.conv2 = ChebConv(64, out_channels, K)
        self.centroids = None  # To store class centroids

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        x = F.relu(self.conv1(x, edge_index))
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.conv2(x, edge_index)
        return F.log_softmax(x, dim=1)

    # Function to calculate class centroids
    def compute_class_centroids(self, x, y, mask):
        """ Calculate centroids for each class based on the labeled data. """
        num_classes = y.max().item() + 1
        centroids = []
        for c in range(num_classes):
            class_mask = (y[mask] == c)
            class_embeddings = x[mask][class_mask]
            centroid = class_embeddings.mean(dim=0) if class_embeddings.size(0) > 0 else torch.zeros(x.size(1))
            centroids.append(centroid)
        self.centroids = torch.stack(centroids)

    # Metric loss calculation
    def metric_loss(self, h, y, mask):
        """ Implement the metric loss function that compares node embeddings with class centroids. """
        if self.centroids is None:
            return 0
        h_masked = h[mask]
        y_masked = y[mask]

        pos_distances = torch.norm(h_masked - self.centroids[y_masked], dim=1)
        neg_distances = torch.cat([torch.norm(h_masked - self.centroids[i], dim=1).unsqueeze(1)
                                   for i in range(self.centroids.size(0))], dim=1)

        loss = -torch.log(torch.exp(-pos_distances) / torch.exp(-neg_distances).sum(dim=1)).mean()
        return loss

# Step 5: Train and evaluate the model with combined loss (CE + Metric)
def train_model(data, model, epochs=200, lr=0.01):
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=5e-4)
    model.train()

    for epoch in range(epochs):
        optimizer.zero_grad()

        # Forward pass
        out = model(data)

        # Compute class centroids for labeled data
        model.compute_class_centroids(data.x, data.y, data.train_mask)

        # Compute losses
        ce_loss = F.nll_loss(out[data.train_mask], data.y[data.train_mask])
        metric_loss = model.metric_loss(data.x, data.y, data.train_mask)

        # Combine Cross-Entropy loss with Metric loss
        loss = ce_loss + 0.1 * metric_loss  # Adjust the weight of the metric loss

        loss.backward()
        optimizer.step()

        if epoch % 10 == 0:
            print(f'Epoch {epoch}, Loss: {loss.item()}, CE Loss: {ce_loss.item()}, Metric Loss: {metric_loss.item()}')

def test_model(data, model):
    model.eval()
    with torch.no_grad():
        out = model(data)
        _, pred = out.max(dim=1)

    correct = (pred[data.test_mask] == data.y[data.test_mask]).sum()
    acc = int(correct) / int(data.test_mask.sum())
    print(f'Accuracy: {acc:.4f}')

    # Calculate AA and Kappa
    AA, kappa = calculate_metrics(data.y[data.test_mask].cpu().numpy(), pred[data.test_mask].cpu().numpy())
    print(f'Average Accuracy (AA): {AA:.4f}, Kappa: {kappa:.4f}')

    # Calculate and display per-class accuracy
    calculate_class_accuracy(data.y[data.test_mask].cpu().numpy(), pred[data.test_mask].cpu().numpy())

def calculate_metrics(y_true, y_pred):
    """ Calculate Average Accuracy (AA) and Cohen's Kappa. """
    # Calculate confusion matrix
    conf_matrix = confusion_matrix(y_true, y_pred)
    class_acc = conf_matrix.diagonal() / conf_matrix.sum(axis=1)

    # Average Accuracy (AA)
    AA = np.nanmean(class_acc)  # Use nanmean to ignore any NaN values
    kappa = cohen_kappa_score(y_true, y_pred)

    return AA, kappa

def calculate_class_accuracy(y_true, y_pred):
    """ Calculate and display accuracy for each class. """
    conf_matrix = confusion_matrix(y_true, y_pred)
    class_acc = conf_matrix.diagonal() / conf_matrix.sum(axis=1)

    print("Class Accuracy:")
    for i, acc in enumerate(class_acc):
        print(f'Class {i}: {acc:.4f}')

# Putting it all together
def main():
    hsi_data, gt_data = load_data()
    x, y = preprocess_data(hsi_data, gt_data)

    # Generate edge index using k-nearest neighbors and graph metrics
    edge_index = generate_graph(x)

    # Create train and test masks
    train_mask, test_mask = split_data(y)

    data = Data(x=x, y=y, edge_index=edge_index, train_mask=train_mask, test_mask=test_mask)

    model = GCN(in_channels=x.size(1), out_channels=y.max().item() + 1)
    train_model(data, model)
    test_model(data, model)

if __name__ == '__main__':
    main()


### **Centroid metrique using COS Distance**





In [None]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import ChebConv
from torch_geometric.data import Data
from sklearn.metrics import cohen_kappa_score, accuracy_score, confusion_matrix
import numpy as np

# Step 4: Build the GCN model with ChebConv
class GCN(torch.nn.Module):
    def __init__(self, in_channels, out_channels, K=3):
        super(GCN, self).__init__()
        self.conv1 = ChebConv(in_channels, 64, K)
        self.conv2 = ChebConv(64, out_channels, K)
        self.centroids = None  # To store class centroids

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        x = F.relu(self.conv1(x, edge_index))
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.conv2(x, edge_index)
        return F.log_softmax(x, dim=1)

    # Function to calculate class centroids
    def compute_class_centroids(self, x, y, mask):
        """ Calculate centroids for each class based on the labeled data. """
        num_classes = y.max().item() + 1
        centroids = []
        for c in range(num_classes):
            class_mask = (y[mask] == c)
            class_embeddings = x[mask][class_mask]
            centroid = class_embeddings.mean(dim=0) if class_embeddings.size(0) > 0 else torch.zeros(x.size(1))
            centroids.append(centroid)
        self.centroids = torch.stack(centroids)

    # Metric loss calculation using cosine distance
    def metric_loss(self, h, y, mask):
        """ Implement the metric loss function that compares node embeddings with class centroids using cosine distance. """
        if self.centroids is None:
            return 0
        h_masked = h[mask]
        y_masked = y[mask]

        # Normalize the embeddings and centroids for cosine similarity
        h_masked_norm = F.normalize(h_masked, p=2, dim=1)
        centroids_norm = F.normalize(self.centroids, p=2, dim=1)

        pos_similarities = torch.mm(h_masked_norm, centroids_norm[y_masked].t())
        neg_similarities = torch.mm(h_masked_norm, centroids_norm.t())

        # Compute the loss based on negative log likelihood of positive pairs
        pos_distances = -pos_similarities.diag()
        neg_distances = torch.cat([neg_similarities[i].unsqueeze(0) for i in range(neg_similarities.size(0))], dim=0)

        loss = -torch.log(torch.exp(pos_distances) / (torch.exp(neg_distances).sum(dim=1) + 1e-10)).mean()
        return loss

# Step 5: Train and evaluate the model with combined loss (CE + Metric)
def train_model(data, model, epochs=200, lr=0.01):
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=5e-4)
    model.train()

    for epoch in range(epochs):
        optimizer.zero_grad()

        # Forward pass
        out = model(data)

        # Compute class centroids for labeled data
        model.compute_class_centroids(data.x, data.y, data.train_mask)

        # Compute losses
        ce_loss = F.nll_loss(out[data.train_mask], data.y[data.train_mask])
        metric_loss = model.metric_loss(data.x, data.y, data.train_mask)

        # Combine Cross-Entropy loss with Metric loss
        loss = ce_loss + 0.1 * metric_loss  # Adjust the weight of the metric loss

        loss.backward()
        optimizer.step()

        if epoch % 10 == 0:
            print(f'Epoch {epoch}, Loss: {loss.item()}, CE Loss: {ce_loss.item()}, Metric Loss: {metric_loss.item()}')

def test_model(data, model):
    model.eval()
    with torch.no_grad():
        out = model(data)
        _, pred = out.max(dim=1)

    correct = (pred[data.test_mask] == data.y[data.test_mask]).sum()
    acc = int(correct) / int(data.test_mask.sum())
    print(f'Accuracy: {acc:.4f}')

    # Calculate AA and Kappa
    AA, kappa = calculate_metrics(data.y[data.test_mask].cpu().numpy(), pred[data.test_mask].cpu().numpy())
    print(f'Average Accuracy (AA): {AA:.4f}, Kappa: {kappa:.4f}')

    # Calculate and display accuracy for each class
    class_accuracy(data.y[data.test_mask].cpu().numpy(), pred[data.test_mask].cpu().numpy())

def calculate_metrics(y_true, y_pred):
    """ Calculate Average Accuracy (AA) and Cohen's Kappa. """
    # Calculate confusion matrix
    conf_matrix = confusion_matrix(y_true, y_pred)
    class_acc = conf_matrix.diagonal() / conf_matrix.sum(axis=1)

    # Average Accuracy (AA)
    AA = np.nanmean(class_acc)  # Use nanmean to ignore any NaN values
    kappa = cohen_kappa_score(y_true, y_pred)

    return AA, kappa

def class_accuracy(y_true, y_pred):
    """ Calculate and print accuracy for each class. """
    conf_matrix = confusion_matrix(y_true, y_pred)
    for i in range(conf_matrix.shape[0]):
        class_acc = conf_matrix[i, i] / conf_matrix[i].sum() if conf_matrix[i].sum() > 0 else 0
        print(f'Accuracy for class {i}: {class_acc:.4f}')

# Putting it all together
def main():
    hsi_data, gt_data = load_data()
    x, y = preprocess_data(hsi_data, gt_data)

    # Generate edge index using k-nearest neighbors and graph metrics
    edge_index = generate_graph(x)

    # Create train and test masks
    train_mask, test_mask = split_data(y)

    data = Data(x=x, y=y, edge_index=edge_index, train_mask=train_mask, test_mask=test_mask)

    model = GCN(in_channels=x.size(1), out_channels=y.max().item() + 1)
    train_model(data, model)
    test_model(data, model)

if __name__ == '__main__':
    main()



## **HSI-GCN WITH TRIPET LOSS**

### **Triplet loss with semi-hard mining**

In [None]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import ChebConv
from torch_geometric.data import Data
from sklearn.metrics import cohen_kappa_score, accuracy_score, confusion_matrix
import numpy as np

# Step 4: Build the GCN model with ChebConv
class GCN(torch.nn.Module):
    def __init__(self, in_channels, out_channels, K=3):
        super(GCN, self).__init__()
        self.conv1 = ChebConv(in_channels, 64, K)
        self.conv2 = ChebConv(64, out_channels, K)
        self.centroids = None  # To store class centroids

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        x = F.relu(self.conv1(x, edge_index))
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.conv2(x, edge_index)
        return F.log_softmax(x, dim=1)

    # Function to calculate class centroids
    def compute_class_centroids(self, x, y, mask):
        """ Calculate centroids for each class based on the labeled data. """
        num_classes = y.max().item() + 1
        centroids = []
        for c in range(num_classes):
            class_mask = (y[mask] == c)
            class_embeddings = x[mask][class_mask]
            if class_embeddings.size(0) > 0:
                centroid = class_embeddings.mean(dim=0)
            else:
                centroid = torch.zeros(x.size(1)).to(x.device)  # Corrected to ensure it's on the right device
            centroids.append(centroid)
        self.centroids = torch.stack(centroids)

    # Triplet loss calculation
    def triplet_loss(self, h, y, mask, margin=1.0):
        """ Triplet loss implementation with semi-hard negative mining. """
        if self.centroids is None:
            return torch.tensor(0.0)  # Return a tensor to avoid gradient issues

        h_masked = h[mask]
        y_masked = y[mask]

        # Calculate positive distances
        pos_distances = torch.norm(h_masked - self.centroids[y_masked], dim=1)

        # Calculate negative distances for all other classes
        neg_distances = torch.cat([torch.norm(h_masked - self.centroids[i], dim=1).unsqueeze(1)
                                   for i in range(self.centroids.size(0))], dim=1)

        # Expand positive distances to match the shape of neg_distances
        pos_distances_expanded = pos_distances.unsqueeze(1).expand_as(neg_distances)

        # Semi-hard negative mining: select the negatives
        semi_hard_negatives = torch.full_like(pos_distances, fill_value=margin + 1).to(h_masked.device)  # Ensure it's on the correct device
        for i in range(h_masked.size(0)):
            neg_candidates = neg_distances[i][neg_distances[i] > pos_distances[i]]  # Valid negatives
            if len(neg_candidates) > 0:
                semi_hard_negatives[i] = neg_candidates.min()

        # Calculate triplet loss (margin-based)
        triplet_loss = F.relu(pos_distances - semi_hard_negatives + margin)

        return triplet_loss.mean()


# Step 5: Train and evaluate the model with combined loss (CE + Triplet)
def train_model(data, model, epochs=200, lr=0.01):
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=5e-4)
    model.train()

    for epoch in range(epochs):
        optimizer.zero_grad()

        # Forward pass
        out = model(data)

        # Compute class centroids for labeled data
        model.compute_class_centroids(data.x, data.y, data.train_mask)

        # Compute losses
        ce_loss = F.nll_loss(out[data.train_mask], data.y[data.train_mask])
        triplet_loss = model.triplet_loss(data.x, data.y, data.train_mask)

        # Combine Cross-Entropy loss with Triplet loss
        loss = ce_loss + 0.1 * triplet_loss  # Adjust the weight of each loss

        loss.backward()
        optimizer.step()

        if epoch % 10 == 0:
            print(f'Epoch {epoch}, Loss: {loss.item()}, CE Loss: {ce_loss.item()}, Triplet Loss: {triplet_loss.item()}')

def test_model(data, model):
    model.eval()
    with torch.no_grad():
        out = model(data)
        _, pred = out.max(dim=1)

    correct = (pred[data.test_mask] == data.y[data.test_mask]).sum()
    acc = int(correct) / int(data.test_mask.sum())
    print(f'Accuracy: {acc:.4f}')

    # Calculate AA and Kappa
    AA, kappa, class_acc = calculate_metrics(data.y[data.test_mask].cpu().numpy(), pred[data.test_mask].cpu().numpy())
    print(f'Average Accuracy (AA): {AA:.4f}, Kappa: {kappa:.4f}')

    # Affichage de l'accuracy de chaque classe
    for class_idx, accuracy in enumerate(class_acc):
        print(f'Accuracy for class {class_idx}: {accuracy:.4f}')

def calculate_metrics(y_true, y_pred):
    """ Calculate Average Accuracy (AA) and Cohen's Kappa. """
    # Calculate confusion matrix
    conf_matrix = confusion_matrix(y_true, y_pred)
    class_acc = conf_matrix.diagonal() / conf_matrix.sum(axis=1)

    # Average Accuracy (AA)
    AA = np.nanmean(class_acc)  # Use nanmean to ignore any NaN values
    kappa = cohen_kappa_score(y_true, y_pred)

    return AA, kappa, class_acc  # Return class accuracy

# Putting it all together
def main():
    hsi_data, gt_data = load_data()
    x, y = preprocess_data(hsi_data, gt_data)

    # Generate edge index using k-nearest neighbors and graph metrics
    edge_index = generate_graph(x)

    # Create train and test masks
    train_mask, test_mask = split_data(y)

    data = Data(x=x, y=y, edge_index=edge_index, train_mask=train_mask, test_mask=test_mask)

    model = GCN(in_channels=x.size(1), out_channels=y.max().item() + 1)
    train_model(data, model)
    test_model(data, model)

if __name__ == '__main__':
    main()


### Triplet Loss with Hard mining

In [None]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import ChebConv
from torch_geometric.data import Data
from sklearn.metrics import cohen_kappa_score, accuracy_score, confusion_matrix
import numpy as np

# Step 4: Build the GCN model with ChebConv
class GCN(torch.nn.Module):
    def __init__(self, in_channels, out_channels, K=3):
        super(GCN, self).__init__()
        self.conv1 = ChebConv(in_channels, 64, K)
        self.conv2 = ChebConv(64, out_channels, K)
        self.centroids = None  # To store class centroids

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        x = F.relu(self.conv1(x, edge_index))
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.conv2(x, edge_index)
        return F.log_softmax(x, dim=1)

    # Function to calculate class centroids
    def compute_class_centroids(self, x, y, mask):
        """ Calculate centroids for each class based on the labeled data. """
        num_classes = y.max().item() + 1
        centroids = []
        for c in range(num_classes):
            class_mask = (y[mask] == c)
            class_embeddings = x[mask][class_mask]
            if class_embeddings.size(0) > 0:
                centroid = class_embeddings.mean(dim=0)
            else:
                centroid = torch.zeros(x.size(1)).to(x.device)  # Ensure it's on the right device
            centroids.append(centroid)
        self.centroids = torch.stack(centroids)

    # Triplet loss calculation with hard negative mining
    def triplet_loss(self, h, y, mask, margin=1.0):
        """ Triplet loss implementation with hard negative mining. """
        if self.centroids is None:
            return torch.tensor(0.0)  # Return a tensor to avoid gradient issues

        h_masked = h[mask]
        y_masked = y[mask]

        # Calculate positive distances
        pos_distances = torch.norm(h_masked - self.centroids[y_masked], dim=1)

        # Calculate negative distances for all other classes
        neg_distances = torch.cat([torch.norm(h_masked - self.centroids[i], dim=1).unsqueeze(1)
                                   for i in range(self.centroids.size(0))], dim=1)

        # Expand positive distances to match the shape of neg_distances
        pos_distances_expanded = pos_distances.unsqueeze(1).expand_as(neg_distances)

        # Hard negative mining: select the hardest negatives
        hard_negatives = neg_distances.min(dim=1).values

        # Calculate triplet loss (margin-based)
        triplet_loss = F.relu(pos_distances - hard_negatives + margin)

        return triplet_loss.mean()


# Step 5: Train and evaluate the model with combined loss (CE + Triplet)
def train_model(data, model, epochs=200, lr=0.01):
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=5e-4)
    model.train()

    for epoch in range(epochs):
        optimizer.zero_grad()

        # Forward pass
        out = model(data)

        # Compute class centroids for labeled data
        model.compute_class_centroids(data.x, data.y, data.train_mask)

        # Compute losses
        ce_loss = F.nll_loss(out[data.train_mask], data.y[data.train_mask])
        triplet_loss = model.triplet_loss(data.x, data.y, data.train_mask)

        # Combine Cross-Entropy loss with Triplet loss
        loss = ce_loss + 0.1 * triplet_loss  # Adjust the weight of each loss

        loss.backward()
        optimizer.step()

        if epoch % 10 == 0:
            print(f'Epoch {epoch}, Loss: {loss.item()}, CE Loss: {ce_loss.item()}, Triplet Loss: {triplet_loss.item()}')

def test_model(data, model):
    model.eval()
    with torch.no_grad():
        out = model(data)
        _, pred = out.max(dim=1)

    correct = (pred[data.test_mask] == data.y[data.test_mask]).sum()
    acc = int(correct) / int(data.test_mask.sum())
    print(f'Accuracy: {acc:.4f}')

    # Calculate AA and Kappa
    AA, kappa, class_accuracies = calculate_metrics(data.y[data.test_mask].cpu().numpy(), pred[data.test_mask].cpu().numpy())
    print(f'Average Accuracy (AA): {AA:.4f}, Kappa: {kappa:.4f}')

    # Print class accuracy for each class
    for class_index, class_accuracy in enumerate(class_accuracies):
        print(f'Accuracy for class {class_index}: {class_accuracy:.4f}')

def calculate_metrics(y_true, y_pred):
    """ Calculate Average Accuracy (AA) and Cohen's Kappa. """
    # Calculate confusion matrix
    conf_matrix = confusion_matrix(y_true, y_pred)
    class_acc = conf_matrix.diagonal() / conf_matrix.sum(axis=1)

    # Average Accuracy (AA)
    AA = np.nanmean(class_acc)  # Use nanmean to ignore any NaN values
    kappa = cohen_kappa_score(y_true, y_pred)

    return AA, kappa, class_acc  # Return class accuracies

# Putting it all together
def main():
    hsi_data, gt_data = load_data()
    x, y = preprocess_data(hsi_data, gt_data)

    # Generate edge index using k-nearest neighbors and graph metrics
    edge_index = generate_graph(x)

    # Create train and test masks
    train_mask, test_mask = split_data(y)

    data = Data(x=x, y=y, edge_index=edge_index, train_mask=train_mask, test_mask=test_mask)

    model = GCN(in_channels=x.size(1), out_channels=y.max().item() + 1)
    train_model(data, model)
    test_model(data, model)

if __name__ == '__main__':
    main()


## **FUSION CENTROID ML WITH TRIPLET LOSS**

In [None]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import ChebConv
from torch_geometric.data import Data
from sklearn.metrics import cohen_kappa_score, accuracy_score, confusion_matrix
import numpy as np

# Step 4: Build the GCN model with ChebConv
class GCN(torch.nn.Module):
    def __init__(self, in_channels, out_channels, K=3):
        super(GCN, self).__init__()
        self.conv1 = ChebConv(in_channels, 64, K)
        self.conv2 = ChebConv(64, out_channels, K)
        self.centroids = None  # To store class centroids

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        x = F.relu(self.conv1(x, edge_index))
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.conv2(x, edge_index)
        return F.log_softmax(x, dim=1)

    # Function to calculate class centroids
    def compute_class_centroids(self, x, y, mask):
        """ Calculate centroids for each class based on the labeled data. """
        num_classes = y.max().item() + 1
        centroids = []
        for c in range(num_classes):
            class_mask = (y[mask] == c)
            class_embeddings = x[mask][class_mask]
            if class_embeddings.size(0) > 0:
                centroid = class_embeddings.mean(dim=0)
            else:
                centroid = torch.zeros(x.size(1)).to(x.device)  # Corrected to ensure it's on the right device
            centroids.append(centroid)
        self.centroids = torch.stack(centroids)

    # Metric loss calculation
    def metric_loss(self, h, y, mask):
        """ Implement metric loss (if applicable). """
        if self.centroids is None:
            return torch.tensor(0.0)  # Return a tensor to avoid gradient issues

        # Example metric loss: cosine distance between embeddings and centroids
        h_masked = h[mask]
        y_masked = y[mask]

        pos_centroids = self.centroids[y_masked]
        pos_distances = torch.norm(h_masked - pos_centroids, dim=1)

        return pos_distances.mean()

    # Triplet loss calculation
    def triplet_loss(self, h, y, mask, margin=1.0):
        """ Triplet loss implementation with semi-hard negative mining. """
        if self.centroids is None:
            return torch.tensor(0.0)  # Return a tensor to avoid gradient issues

        h_masked = h[mask]
        y_masked = y[mask]

        # Calculate positive distances
        pos_distances = torch.norm(h_masked - self.centroids[y_masked], dim=1)

        # Calculate negative distances for all other classes
        neg_distances = torch.cat([torch.norm(h_masked - self.centroids[i], dim=1).unsqueeze(1)
                                   for i in range(self.centroids.size(0))], dim=1)

        # Expand positive distances to match the shape of neg_distances
        pos_distances_expanded = pos_distances.unsqueeze(1).expand_as(neg_distances)

        # Semi-hard negative mining: select the negatives
        semi_hard_negatives = torch.full_like(pos_distances, fill_value=margin + 1).to(h.device)  # Ensure it's on the correct device
        for i in range(h_masked.size(0)):
            neg_candidates = neg_distances[i][neg_distances[i] > pos_distances[i]]  # Valid negatives
            if len(neg_candidates) > 0:
                semi_hard_negatives[i] = neg_candidates.min()

        # Calculate triplet loss (margin-based)
        triplet_loss = F.relu(pos_distances - semi_hard_negatives + margin)

        return triplet_loss.mean()

# Step 5: Train and evaluate the model with combined loss (CE + Metric + Triplet)
def train_model(data, model, epochs=200, lr=0.01):
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=5e-4)
    model.train()

    for epoch in range(epochs):
        optimizer.zero_grad()

        # Forward pass
        out = model(data)

        # Compute class centroids for labeled data
        model.compute_class_centroids(data.x, data.y, data.train_mask)

        # Compute losses
        ce_loss = F.nll_loss(out[data.train_mask], data.y[data.train_mask])
        metric_loss = model.metric_loss(data.x, data.y, data.train_mask)
        triplet_loss = model.triplet_loss(data.x, data.y, data.train_mask)

        # Combine Cross-Entropy loss with Metric loss and Triplet loss
        loss = ce_loss + 0.1 * metric_loss + 0.1 * triplet_loss  # Adjust the weight of each loss

        loss.backward()
        optimizer.step()

        if epoch % 10 == 0:
            print(f'Epoch {epoch}, Loss: {loss.item()}, CE Loss: {ce_loss.item()}, Metric Loss: {metric_loss.item()}, Triplet Loss: {triplet_loss.item()}')

def test_model(data, model):
    model.eval()
    with torch.no_grad():
        out = model(data)
        _, pred = out.max(dim=1)

    correct = (pred[data.test_mask] == data.y[data.test_mask]).sum()
    acc = int(correct) / int(data.test_mask.sum())
    print(f'Accuracy: {acc:.4f}')

    # Calculate AA and Kappa
    AA, kappa = calculate_metrics(data.y[data.test_mask].cpu().numpy(), pred[data.test_mask].cpu().numpy())
    print(f'Average Accuracy (AA): {AA:.4f}, Kappa: {kappa:.4f}')

    # Affichage de la précision par classe
    class_precisions = calculate_class_precisions(data.y[data.test_mask].cpu().numpy(), pred[data.test_mask].cpu().numpy())
    for class_index, precision in enumerate(class_precisions):
        print(f'Precision for class {class_index}: {precision:.4f}')

def calculate_class_precisions(y_true, y_pred):
    """ Calculate precision for each class. """
    conf_matrix = confusion_matrix(y_true, y_pred)
    class_precisions = conf_matrix.diagonal() / conf_matrix.sum(axis=0)
    return np.nan_to_num(class_precisions)  # Replace NaN with 0

def calculate_metrics(y_true, y_pred):
    """ Calculate Average Accuracy (AA) and Cohen's Kappa. """
    # Calculate confusion matrix
    conf_matrix = confusion_matrix(y_true, y_pred)
    class_acc = conf_matrix.diagonal() / conf_matrix.sum(axis=1)

    # Average Accuracy (AA)
    AA = np.nanmean(class_acc)  # Use nanmean to ignore any NaN values
    kappa = cohen_kappa_score(y_true, y_pred)

    return AA, kappa

# Putting it all together
def main():
    hsi_data, gt_data = load_data()
    x, y = preprocess_data(hsi_data, gt_data)

    # Generate edge index using k-nearest neighbors and graph metrics
    edge_index = generate_graph(x)

    # Create train and test masks
    train_mask, test_mask = split_data(y)

    data = Data(x=x, y=y, edge_index=edge_index, train_mask=train_mask, test_mask=test_mask)

    model = GCN(in_channels=x.size(1), out_channels=y.max().item() + 1)
    train_model(data, model)
    test_model(data, model)

if __name__ == '__main__':
    main()
import torch
import torch.nn.functional as F
from torch_geometric.nn import ChebConv
from torch_geometric.data import Data
from sklearn.metrics import cohen_kappa_score, accuracy_score, confusion_matrix
import numpy as np

# Step 4: Build the GCN model with ChebConv
class GCN(torch.nn.Module):
    def __init__(self, in_channels, out_channels, K=3):
        super(GCN, self).__init__()
        self.conv1 = ChebConv(in_channels, 64, K)
        self.conv2 = ChebConv(64, out_channels, K)
        self.centroids = None  # To store class centroids

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        x = F.relu(self.conv1(x, edge_index))
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.conv2(x, edge_index)
        return F.log_softmax(x, dim=1)

    # Function to calculate class centroids
    def compute_class_centroids(self, x, y, mask):
        """ Calculate centroids for each class based on the labeled data. """
        num_classes = y.max().item() + 1
        centroids = []
        for c in range(num_classes):
            class_mask = (y[mask] == c)
            class_embeddings = x[mask][class_mask]
            if class_embeddings.size(0) > 0:
                centroid = class_embeddings.mean(dim=0)
            else:
                centroid = torch.zeros(x.size(1)).to(x.device)  # Corrected to ensure it's on the right device
            centroids.append(centroid)
        self.centroids = torch.stack(centroids)

    # Metric loss calculation
    def metric_loss(self, h, y, mask):
        """ Implement metric loss (if applicable). """
        if self.centroids is None:
            return torch.tensor(0.0)  # Return a tensor to avoid gradient issues

        # Example metric loss: cosine distance between embeddings and centroids
        h_masked = h[mask]
        y_masked = y[mask]

        pos_centroids = self.centroids[y_masked]
        pos_distances = torch.norm(h_masked - pos_centroids, dim=1)

        return pos_distances.mean()

    # Triplet loss calculation
    def triplet_loss(self, h, y, mask, margin=1.0):
        """ Triplet loss implementation with semi-hard negative mining. """
        if self.centroids is None:
            return torch.tensor(0.0)  # Return a tensor to avoid gradient issues

        h_masked = h[mask]
        y_masked = y[mask]

        # Calculate positive distances
        pos_distances = torch.norm(h_masked - self.centroids[y_masked], dim=1)

        # Calculate negative distances for all other classes
        neg_distances = torch.cat([torch.norm(h_masked - self.centroids[i], dim=1).unsqueeze(1)
                                   for i in range(self.centroids.size(0))], dim=1)

        # Expand positive distances to match the shape of neg_distances
        pos_distances_expanded = pos_distances.unsqueeze(1).expand_as(neg_distances)

        # Semi-hard negative mining: select the negatives
        semi_hard_negatives = torch.full_like(pos_distances, fill_value=margin + 1).to(h.device)  # Ensure it's on the correct device
        for i in range(h_masked.size(0)):
            neg_candidates = neg_distances[i][neg_distances[i] > pos_distances[i]]  # Valid negatives
            if len(neg_candidates) > 0:
                semi_hard_negatives[i] = neg_candidates.min()

        # Calculate triplet loss (margin-based)
        triplet_loss = F.relu(pos_distances - semi_hard_negatives + margin)

        return triplet_loss.mean()

# Step 5: Train and evaluate the model with combined loss (CE + Metric + Triplet)
def train_model(data, model, epochs=200, lr=0.01):
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=5e-4)
    model.train()

    for epoch in range(epochs):
        optimizer.zero_grad()

        # Forward pass
        out = model(data)

        # Compute class centroids for labeled data
        model.compute_class_centroids(data.x, data.y, data.train_mask)

        # Compute losses
        ce_loss = F.nll_loss(out[data.train_mask], data.y[data.train_mask])
        metric_loss = model.metric_loss(data.x, data.y, data.train_mask)
        triplet_loss = model.triplet_loss(data.x, data.y, data.train_mask)

        # Combine Cross-Entropy loss with Metric loss and Triplet loss
        loss = ce_loss + 0.1 * metric_loss + 0.1 * triplet_loss  # Adjust the weight of each loss

        loss.backward()
        optimizer.step()

        if epoch % 10 == 0:
            print(f'Epoch {epoch}, Loss: {loss.item()}, CE Loss: {ce_loss.item()}, Metric Loss: {metric_loss.item()}, Triplet Loss: {triplet_loss.item()}')

def test_model(data, model):
    model.eval()
    with torch.no_grad():
        out = model(data)
        _, pred = out.max(dim=1)

    correct = (pred[data.test_mask] == data.y[data.test_mask]).sum()
    acc = int(correct) / int(data.test_mask.sum())
    print(f'Accuracy: {acc:.4f}')

    # Calculate AA and Kappa
    AA, kappa = calculate_metrics(data.y[data.test_mask].cpu().numpy(), pred[data.test_mask].cpu().numpy())
    print(f'Average Accuracy (AA): {AA:.4f}, Kappa: {kappa:.4f}')

    # Affichage de la précision par classe
    class_precisions = calculate_class_precisions(data.y[data.test_mask].cpu().numpy(), pred[data.test_mask].cpu().numpy())
    for class_index, precision in enumerate(class_precisions):
        print(f'Precision for class {class_index}: {precision:.4f}')

def calculate_class_precisions(y_true, y_pred):
    """ Calculate precision for each class. """
    conf_matrix = confusion_matrix(y_true, y_pred)
    class_precisions = conf_matrix.diagonal() / conf_matrix.sum(axis=0)
    return np.nan_to_num(class_precisions)  # Replace NaN with 0

def calculate_metrics(y_true, y_pred):
    """ Calculate Average Accuracy (AA) and Cohen's Kappa. """
    # Calculate confusion matrix
    conf_matrix = confusion_matrix(y_true, y_pred)
    class_acc = conf_matrix.diagonal() / conf_matrix.sum(axis=1)

    # Average Accuracy (AA)
    AA = np.nanmean(class_acc)  # Use nanmean to ignore any NaN values
    kappa = cohen_kappa_score(y_true, y_pred)

    return AA, kappa

# Putting it all together
def main():
    hsi_data, gt_data = load_data()
    x, y = preprocess_data(hsi_data, gt_data)

    # Generate edge index using k-nearest neighbors and graph metrics
    edge_index = generate_graph(x)

    # Create train and test masks
    train_mask, test_mask = split_data(y)

    data = Data(x=x, y=y, edge_index=edge_index, train_mask=train_mask, test_mask=test_mask)

    model = GCN(in_channels=x.size(1), out_channels=y.max().item() + 1)
    train_model(data, model)
    test_model(data, model)

if __name__ == '__main__':
    main()
