In [None]:
import torch
from torch.utils.data import DataLoader, TensorDataset, random_split
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from mamba_ssm import Mamba
import numpy as np
import scipy.io

# Read the characteristic data of HSI
mat_file_path = r'.../.mat'

mat_data = scipy.io.loadmat(mat_file_path)

variable_name = list(mat_data.keys())[-1]

data = mat_data[variable_name]

print(f"The dimensions of X is: {data.shape}")
X = data


# Read the label data of HSI
mat_file_path = r'.../.mat'


mat_data = scipy.io.loadmat(mat_file_path)

variable_name = list(mat_data.keys())[-1]  # 获取最后一个键名，通常是我们需要的变量名

data = mat_data[variable_name]

print(f"The dimensions of Y is: {data.shape}")
Y = data

In [None]:
flattened_Y = Y.flatten()
# Defines the number of HSI label types
unique_elements = set(flattened_Y)

In [None]:
# Convert HSI to two-dimensional data
X = X.reshape(X.shape[0] * X.shape[1], X.shape[2])
Y = Y.reshape(X.shape[0] * X.shape[1], X.shape[2])

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import SelectFromModel
import numpy as np

# Converts a label to a 1-dimensional array
Y_pooled_reshaped = Y.ravel()

# A random forest classifier was used to evaluate the importance of features
clf = RandomForestClassifier(n_estimators=100, random_state=0)
clf.fit(X, Y_pooled_reshaped)

# Acquired feature importance
importances = clf.feature_importances_

# Use SelectFromModel to select features that are above average in importance
threshold = np.mean(importances)
sfm = SelectFromModel(clf, threshold=threshold)

# Fits the data to a selector to transform the data set
X_important = sfm.fit_transform(X, Y_pooled_reshaped)

A = X_important.shape[1]

In [None]:
from collections import Counter
X = X_important
y = Y_pooled_reshaped
X.shape
print(Counter(y))

In [None]:
# Delete samples of class 0 in y and record the number of rows for these samples
rows_to_delete = []
for index, label in enumerate(y):
    if label == 0.0:
        rows_to_delete.append(index)

# Delete the sample of class 0 in y
y_filtered = np.delete(y, rows_to_delete, axis=0)

# Delete the corresponding row in X
X_filtered = np.delete(X, rows_to_delete, axis=0)

In [None]:
# Data preprocessing
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_filtered)

# Convert data to PyTorch tensor
X_tensor = torch.tensor(X_scaled, dtype=torch.float32)
y_tensor = torch.tensor(y_filtered, dtype=torch.long)

# print(Counter(y_tensor))

In [None]:
import numpy as np
from imblearn.over_sampling import SMOTE
from sklearn.preprocessing import StandardScaler
import torch
from torch.utils.data import DataLoader, TensorDataset, random_split
from collections import Counter

# Partition data set
train_size = int(0.7 * len(X_tensor))
val_size = int(0.2 * len(X_tensor))
test_size = len(X_tensor) - train_size - val_size

# Split the dataset
train_dataset, val_dataset, test_dataset = random_split(TensorDataset(X_tensor, y_tensor), [train_size, val_size, test_size])


X_train, y_train = train_dataset.dataset.tensors
X_train, y_train = X_train.numpy(), y_train.numpy()

# Apply SMOTE only to the training set
smote = SMOTE(k_neighbors=2)
X_train_smote, y_train_smote = smote.fit_resample(X_train, y_train)


# Converts the processed data back to the PyTorch tensor
X_train_tensor_smote = torch.tensor(X_train_smote, dtype=torch.float32)
y_train_tensor_smote = torch.tensor(y_train_smote, dtype=torch.long)

# Create a new TensorDataset
train_dataset_smote = TensorDataset(X_train_tensor_smote, y_train_tensor_smote)

# Create a DataLoader
batch_size = 64
train_loader = DataLoader(train_dataset_smote, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class CustomMamba(Mamba):
    def __init__(self, d_model, d_state, d_conv, expand, cnn_kernel_size, cnn_stride, cnn_padding):
        super(CustomMamba, self).__init__(d_model, d_state, d_conv, expand)
        self.gcn = GCNConv(in_channels=d_model, out_channels=d_model)
        self.linear = nn.Linear(d_model, d_model)

    def forward(self, x, edge_index):
        batch_size, seq_length, d_model = x.size()
        x = x.view(-1, d_model)

        x = self.gcn(x, edge_index)

        x = self.linear(x)

        x = x.view(batch_size, seq_length, d_model)

        x = super(CustomMamba, self).forward(x)
        return x

class GCNConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(GCNConv, self).__init__()
        self.linear = nn.Linear(in_channels, out_channels)

    def forward(self, x, edge_index):
        row, col = edge_index
        num_nodes = x.size(0)
        adj_matrix = torch.zeros((num_nodes, num_nodes), device=x.device)
        adj_matrix[row, col] = 1

        x = self.linear(x)
        x = torch.matmul(adj_matrix, x)
        return F.relu(x)

dim = X_tensor.shape[1]
model2 = CustomMamba(
    d_model=dim,
    d_state=16,
    d_conv=4,
    expand=2,
    cnn_kernel_size=3,
    cnn_stride=1,
    cnn_padding=1
).to("cuda")

criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model2.parameters(), lr=0.001)

In [None]:
num_epochs = 200
for epoch in range(num_epochs):
    model2.train()
    for inputs, labels in train_loader:
        inputs, labels = inputs.to("cuda"), labels.to("cuda")

        batch_size = inputs.size(0)
        edge_index = torch.stack([
            torch.arange(batch_size, device=inputs.device),
            torch.arange(batch_size, device=inputs.device)
        ], dim=0)

        inputs = inputs.unsqueeze(1)  # (batch_size, 1, dim)
        outputs = model2(inputs, edge_index)  # 传入edge_index
        loss = criterion(outputs.squeeze(1), labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    model2.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in train_loader:
            inputs, labels = inputs.to("cuda"), labels.to("cuda")

            batch_size = inputs.size(0)
            edge_index = torch.stack([
                torch.arange(batch_size, device=inputs.device),
                torch.arange(batch_size, device=inputs.device)
            ], dim=0)

            inputs = inputs.unsqueeze(1)
            outputs = model2(inputs, edge_index)
            _, predicted = torch.max(outputs.squeeze(1).data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}, Accuracy: {100 * correct / total}%')

In [None]:
from sklearn.metrics import cohen_kappa_score

def evaluate_model_with_kappa(model, val_loader, device="cuda"):
    """
    Extended model evaluation function
    Parameter：
        model
        test_loader
        device:
    return：
        accuracy
        kappa
    """
    model.eval()
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            batch_size = inputs.size(0)
            edge_index = torch.stack([
                torch.arange(batch_size, device=device),
                torch.arange(batch_size, device=device)
            ], dim=0)

            inputs = inputs.unsqueeze(1)
            outputs = model(inputs, edge_index)

            _, predicted = torch.max(outputs.squeeze(1).data, 1)

            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(predicted.cpu().numpy())

    accuracy = 100 * sum(np.array(all_preds) == np.array(all_labels)) / len(all_labels)
    kappa = cohen_kappa_score(all_labels, all_preds)

    print(f"Test Accuracy: {accuracy:.2f}%")
    print(f"Cohen's Kappa: {kappa:.4f}")

    return accuracy, kappa

print("\nStarting final evaluation with Kappa...")
test_acc, test_kappa = evaluate_model_with_kappa(model2, val_loader)
print(f"Final Metrics - Accuracy: {test_acc:.2f}% | Kappa: {test_kappa:.4f}")

In [None]:
from sklearn.metrics import cohen_kappa_score, classification_report
import numpy as np

def evaluate_full_metrics(model, test_loader, device="cuda"):
    """
    Extended model evaluation function
    Parameter：
        model
        test_loader
        device:
    return：
        accuracy
        kappa
    """
    model.eval()
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            batch_size = inputs.size(0)
            edge_index = torch.stack([
                torch.arange(batch_size, device=device),
                torch.arange(batch_size, device=device)
            ], dim=0)

            inputs = inputs.unsqueeze(1)
            outputs = model(inputs, edge_index)
            _, predicted = torch.max(outputs.squeeze(1).data, 1)

            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(predicted.cpu().numpy())

    true = np.array(all_labels)
    pred = np.array(all_preds)

    oa = 100 * np.sum(true == pred) / len(true)

    unique_classes = np.unique(true)
    class_acc = []
    for cls in unique_classes:
        mask = (true == cls)
        if np.sum(mask) > 0:
            class_acc.append(np.sum(pred[mask] == cls) / np.sum(mask))
    aa = 100 * np.mean(class_acc)

    kappa = cohen_kappa_score(true, pred)

    report = classification_report(true, pred, output_dict=True)
    f1 = 100 * report['macro avg']['f1-score']

    # 打印结果
    print(f"OA (Overall Accuracy): {oa:.2f}%")
    print(f"AA (Average Accuracy): {aa:.2f}%")
    print(f"Kappa Coefficient: {kappa:.4f}")
    print(f"Macro F1-Score: {f1:.2f}%")

    return oa, aa, kappa, f1

print("\nStarting comprehensive evaluation...")
oa, aa, kappa, f1 = evaluate_full_metrics(model2, test_loader)