In [14]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import numpy as np
import scipy.io as sio

# Load the hyperspectral data and ground-truth labels
data = sio.loadmat('Indian_pines_corrected.mat')  # Replace with your file path
ground_truth_data = sio.loadmat('Indian_pines_gt.mat')  # Replace with your file path

# Extract hyperspectral image and ground truth
hsi_data = data['indian_pines_corrected']  # HSI data, shape (145, 145, 200)
ground_truth = ground_truth_data['indian_pines_gt']  # Ground truth labels, shape (145, 145)

def extract_patches(hsi_data, ground_truth, patch_size=3):
    pad_size = patch_size // 2
    hsi_padded = np.pad(hsi_data, ((pad_size, pad_size), (pad_size, pad_size), (0, 0)), mode='reflect')
    
    patches = []
    labels = []
    for i in range(pad_size, hsi_data.shape[0] + pad_size):
        for j in range(pad_size, hsi_data.shape[1] + pad_size):
            patch = hsi_padded[i - pad_size:i + pad_size + 1, j - pad_size:j + pad_size + 1, :]
            patches.append(patch)
            labels.append(ground_truth[i - pad_size, j - pad_size])
    
    patches = np.array(patches)
    labels = np.array(labels)
    valid_indices = labels > 0
    return patches[valid_indices], labels[valid_indices] - 1  # Only keep labeled pixels

# Example usage with patch extraction
patch_size = 3
data_patches, labels = extract_patches(hsi_data, ground_truth, patch_size=patch_size)
data_patches = torch.tensor(data_patches, dtype=torch.float32).permute(0, 3, 1, 2)  # [N, bands, patch_size, patch_size]
labels = torch.tensor(labels, dtype=torch.long)

# Split data into training and testing sets
data_train, data_test, labels_train, labels_test = train_test_split(
    data_patches, labels, test_size=0.2, random_state=42
)

# Convert to PyTorch tensors
data_train = data_train.clone().detach()
data_test = data_test.clone().detach()
labels_train = labels_train.clone().detach()
labels_test = labels_test.clone().detach()

# Create DataLoader for training and testing
batch_size = 64
train_dataset = TensorDataset(data_train, labels_train)
test_dataset = TensorDataset(data_test, labels_test)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Spatial Attention Block
class SpatialAttention(nn.Module):
    def __init__(self, in_channels):
        super(SpatialAttention, self).__init__()
        self.reduce_conv = nn.Conv2d(2, 1, kernel_size=1)  # 1x1 conv to reduce to 1 channel
        self.conv = nn.Conv2d(1, 1, kernel_size=7, padding=3, bias=False)
        
    def forward(self, x):
        avg_out = torch.mean(x, dim=1, keepdim=True)
        max_out, _ = torch.max(x, dim=1, keepdim=True)
        out = torch.cat([avg_out, max_out], dim=1)
        
        out = F.relu(self.reduce_conv(out))  # Reduce to 1 channel before spatial attention
        out = torch.sigmoid(self.conv(out))
        return x * out

# Model with Spatial-Spectral Attention 
class ImprovedHyperspectralCNN(nn.Module):
    def __init__(self, num_bands, num_classes):
        super(ImprovedHyperspectralCNN, self).__init__()
        
        self.conv1 = nn.Conv2d(num_bands, 64, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(64)
        self.spatial_att1 = SpatialAttention(64)
        
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(128)
        self.spectral_att1 = SpatialAttention(128)
        
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(256)
        self.spectral_att2 = SpatialAttention(256)
        
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.fc1 = nn.Linear(256, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.spatial_att1(x)
        
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.spectral_att1(x)
        
        x = F.relu(self.bn3(self.conv3(x)))
        x = self.spectral_att2(x)
        
        x = self.pool(x).view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        return self.fc2(x)

# Focal Loss with Class Weights
class FocalLoss(nn.Module):
    def __init__(self, alpha=None, gamma=2.0):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma

    def forward(self, inputs, targets):
        # Compute standard Cross Entropy Loss with no reduction
        BCE_loss = F.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-BCE_loss)

        # Gather the correct class weight for each target in the batch
        alpha_t = self.alpha[targets] if self.alpha is not None else 1.0

        # Compute Focal Loss with class weights applied per sample
        F_loss = (alpha_t * (1 - pt) ** self.gamma * BCE_loss).mean()
        return F_loss

# Compute class weights and initialize Focal Loss
class_counts = torch.bincount(labels_train)
class_weights = 1.0 / class_counts.float()
class_weights = class_weights / class_weights.sum()
focal_loss = FocalLoss(alpha=class_weights.to(data_train.device))

# Training parameters
num_classes = len(torch.unique(labels_train))
learning_rate = 0.0005
num_epochs = 30

# Initialize model, optimizer, and loss function
model = ImprovedHyperspectralCNN(num_bands=data_patches.shape[1], num_classes=num_classes)
optimizer = optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=0.01)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)

# Training loop
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    
    for data, target in train_loader:
        outputs = model(data)
        loss = focal_loss(outputs, target)
        
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        
        running_loss += loss.item() * data.size(0)
    
    epoch_loss = running_loss / len(train_loader.dataset)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}")
    
    scheduler.step()

# Evaluation function
def evaluate_model(model, data_loader):
    model.eval()
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for data, labels in data_loader:
            outputs = model(data)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    accuracy = accuracy_score(all_labels, all_preds)
    conf_matrix = confusion_matrix(all_labels, all_preds)
    class_report = classification_report(all_labels, all_preds, digits=4)
    
    print(f"Accuracy: {accuracy:.4f}")
    print("Confusion Matrix:\n", conf_matrix)
    print("Classification Report:\n", class_report)
    
    return accuracy, conf_matrix, class_report

# Evaluate the model on the test set
test_accuracy, test_conf_matrix, test_class_report = evaluate_model(model, test_loader)


Epoch [1/30], Loss: 0.0163
Epoch [2/30], Loss: 0.0079
Epoch [3/30], Loss: 0.0055
Epoch [4/30], Loss: 0.0041
Epoch [5/30], Loss: 0.0038
Epoch [6/30], Loss: 0.0029
Epoch [7/30], Loss: 0.0025
Epoch [8/30], Loss: 0.0022
Epoch [9/30], Loss: 0.0018
Epoch [10/30], Loss: 0.0018
Epoch [11/30], Loss: 0.0013
Epoch [12/30], Loss: 0.0012
Epoch [13/30], Loss: 0.0011
Epoch [14/30], Loss: 0.0011
Epoch [15/30], Loss: 0.0012
Epoch [16/30], Loss: 0.0009
Epoch [17/30], Loss: 0.0009
Epoch [18/30], Loss: 0.0010
Epoch [19/30], Loss: 0.0008
Epoch [20/30], Loss: 0.0007
Epoch [21/30], Loss: 0.0009
Epoch [22/30], Loss: 0.0007
Epoch [23/30], Loss: 0.0006
Epoch [24/30], Loss: 0.0006
Epoch [25/30], Loss: 0.0006
Epoch [26/30], Loss: 0.0007
Epoch [27/30], Loss: 0.0006
Epoch [28/30], Loss: 0.0005
Epoch [29/30], Loss: 0.0005
Epoch [30/30], Loss: 0.0005
Accuracy: 0.7732
Confusion Matrix:
 [[ 14   0   0   0   1   0   1   0   0   0   0   0   0   0   0   0]
 [  0 188   1  12   2   0   0   0   0  13  13  40   0   0   0   0]

In [26]:
import numpy as np
from sklearn.covariance import EllipticEnvelope

# Function to get attention scores from the spectral attention layers
def get_combined_spectral_attention_scores(model, data_loader, num_bands=200):
    spectral_attention_scores_layer1 = []
    spectral_attention_scores_layer2 = []

    with torch.no_grad():
        for data, _ in data_loader:
            # Forward pass through each layer to capture spectral attention scores
            x = F.relu(model.bn1(model.conv1(data)))
            x = model.spatial_att1(x)
            
            x = F.relu(model.bn2(model.conv2(x)))
            a1 = model.spectral_att1(x)  # First spectral attention layer
            spectral_attention_scores_layer1.append(a1.cpu().numpy().mean(axis=(2, 3)))  # Average across spatial dimensions

            x = F.relu(model.bn3(model.conv3(x)))
            a2 = model.spectral_att2(x)  # Second spectral attention layer
            spectral_attention_scores_layer2.append(a2.cpu().numpy().mean(axis=(2, 3)))  # Average across spatial dimensions

    # Concatenate scores across all batches for each layer and compute the mean
    spectral_attention_scores_layer1 = np.concatenate(spectral_attention_scores_layer1, axis=0).mean(axis=0)
    spectral_attention_scores_layer2 = np.concatenate(spectral_attention_scores_layer2, axis=0).mean(axis=0)
    
    # Normalize scores to the number of original bands
    combined_scores = np.concatenate([spectral_attention_scores_layer1, spectral_attention_scores_layer2])
    
    # Reshape or truncate to match the number of original spectral bands
    if combined_scores.shape[0] > num_bands:
        combined_scores = combined_scores[:num_bands]
    elif combined_scores.shape[0] < num_bands:
        combined_scores = np.pad(combined_scores, (0, num_bands - combined_scores.shape[0]), mode='constant')

    return combined_scores




In [27]:
# Extract combined mean attention scores aligned to original band count
mean_attention_scores = get_combined_spectral_attention_scores(model, train_loader, num_bands=data_patches.shape[1])

# Function to select important bands using EllipticEnvelope
def select_important_bands(attention_scores, contamination_rate):
    """
    Selects important bands using EllipticEnvelope with an adjusted support_fraction.
    """
    env_model = EllipticEnvelope(contamination=contamination_rate, support_fraction=0.5)
    selected_bands = env_model.fit_predict(attention_scores.reshape(-1, 1))
    important_band_indices = [i for i, v in enumerate(selected_bands) if v == -1]
    return important_band_indices

# Identify important bands based on combined attention scores
important_bands = select_important_bands(mean_attention_scores, contamination_rate=0.02)
print("Most Important Bands Selected:", important_bands)

Most Important Bands Selected: [139, 146, 188, 192]
