# Library

In [29]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import kornia.augmentation as K
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

In [30]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [31]:
# Constants
NUM_BANDS = 100  # Number of hyperspectral bands
PATCH_SIZE = 64  # Default patch size for images
NUM_CLASSES = 3  # Update this based on your dataset


In [32]:
# Define paths
DATA_DIR = "Deep_Learning/Project (Spectral)/Data/ot"
TRAIN_CSV = "C:/IIUM/AI Note IIUM/Deep_Learning/Project (Spectral)/Data/train.csv"
TEST_CSV = "C:/IIUM/AI Note IIUM/Deep_Learning/Project (Spectral)/Data/test.csv"

In [33]:
BANDS = 100
BATCH_SIZE = 32
EPOCHS = 50
LEARNING_RATE = 0.001
NUM_BANDS = 100

In [34]:
# Define the working HyperspectralDataset class
class HyperspectralDataset(Dataset):
    def __init__(self, dataframe, base_path, patch_size=PATCH_SIZE, augment=False, num_bands=NUM_BANDS):
        super().__init__()
        self.df = dataframe.reset_index(drop=True)
        self.base_path = base_path
        self.patch_size = patch_size
        self.augment = augment
        self.num_bands = num_bands

        # Set up transformations
        self.transform = K.AugmentationSequential(
            K.RandomHorizontalFlip(p=0.3),
            K.RandomVerticalFlip(p=0.3),
            K.RandomAffine(degrees=5, translate=(0.05, 0.05), scale=(0.95, 1.05), p=0.5),
            K.RandomCrop((patch_size, patch_size), padding=4, p=0.5),
            data_keys=["input"]
        ) if augment else None

    def __len__(self):
        return len(self.df)

    def _load_image(self, img_path):
        try:
            img = np.load(img_path)
            if img.ndim == 2:
                img = np.repeat(img[:, :, None], self.num_bands, axis=2)
            elif img.shape[2] != self.num_bands:
                diff = self.num_bands - img.shape[2]
                if diff > 0:
                    img = np.pad(img, ((0,0), (0,0), (0,diff)), mode='constant')
                else:
                    img = img[:, :, :self.num_bands]
            return img.astype(np.float32) / 65535.0
        except Exception as e:
            print(f"💀 Error loading {img_path}: {e}")
            return np.zeros((self.patch_size, self.patch_size, self.num_bands), dtype=np.float32)

    def _prepare_tensor(self, img_np):
        img_tensor = torch.from_numpy(img_np).permute(2, 0, 1)  # [C, H, W]
        if self.augment:
            img_tensor = self.transform(img_tensor.unsqueeze(0))[0] # type: ignore
        if img_tensor.shape[1:] != (self.patch_size, self.patch_size):
            img_tensor = F.interpolate(img_tensor.unsqueeze(0), size=(self.patch_size, self.patch_size), mode='bilinear').squeeze(0)
        return img_tensor

    def _process_label(self, raw_label):
        label = torch.tensor(raw_label, dtype=torch.long)
        return label - 1 if label > 0 else label

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        img_path = os.path.join(self.base_path, row['id'])
        img_np = self._load_image(img_path)
        img_tensor = self._prepare_tensor(img_np)
        label_tensor = self._process_label(row['label'])
        return img_tensor, label_tensor

In [35]:
# Define an improved CNN model for hyperspectral images
class HyperspectralCNN(nn.Module):
    def __init__(self, in_channels=NUM_BANDS, num_classes=NUM_CLASSES):
        super(HyperspectralCNN, self).__init__()
        
        # Initial convolution to reduce the number of channels
        self.spectral_reduction = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True)
        )
        
        # Spatial feature extraction path
        self.spatial_features = nn.Sequential(
            # First block
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            
            # Second block
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            
            # Third block
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            
            # Fourth block
            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        
        # Adaptive pooling to handle different input sizes
        self.adaptive_pool = nn.AdaptiveAvgPool2d((2, 2))
        
        # Classifier
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(512 * 2 * 2, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )
        
    def forward(self, x):
        x = self.spectral_reduction(x)
        x = self.spatial_features(x)
        x = self.adaptive_pool(x)
        x = self.classifier(x)
        return x

In [36]:
# Function to train the model
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler=None, num_epochs=25):
    best_val_acc = 0.0
    history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}
    
    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)
        
        # Each epoch has training and validation phases
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
                dataloader = train_loader
            else:
                model.eval()
                dataloader = val_loader
            
            running_loss = 0.0
            running_corrects = 0
            total_samples = 0
            
            # Iterate over data
            pbar = tqdm(dataloader, desc=f"{phase}")
            for inputs, labels in pbar:
                inputs = inputs.to(device)
                labels = labels.to(device)
                
                # Zero the parameter gradients
                optimizer.zero_grad()
                
                # Forward pass
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)
                    
                    # Backward + optimize only in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                
                # Statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
                total_samples += inputs.size(0)
                
                # Update progress bar
                pbar.set_postfix(loss=f"{loss.item():.4f}")
            
            epoch_loss = running_loss / total_samples
            epoch_acc = running_corrects.double() / total_samples
            
            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
            
            # Record history
            if phase == 'train':
                history['train_loss'].append(epoch_loss)
                history['train_acc'].append(epoch_acc.item())
            else:
                history['val_loss'].append(epoch_loss)
                history['val_acc'].append(epoch_acc.item())
                
                # Save best model
                if epoch_acc > best_val_acc:
                    best_val_acc = epoch_acc
                    torch.save({
                        'epoch': epoch,
                        'model_state_dict': model.state_dict(),
                        'optimizer_state_dict': optimizer.state_dict(),
                        'val_acc': epoch_acc,
                    }, 'best_hyperspectral_model.pth')
                    print(f"Saved new best model with validation accuracy: {epoch_acc:.4f}")
        
        # Step the scheduler if provided
        if scheduler is not None and phase == 'val':
            scheduler.step(epoch_loss)
            
        print()
    
    print(f'Best val Acc: {best_val_acc:.4f}')
    
    # Load best model weights
    checkpoint = torch.load('best_hyperspectral_model.pth')
    model.load_state_dict(checkpoint['model_state_dict'])
    return model, history

In [37]:
# Function to evaluate the model on the test set
def evaluate_model(model, test_loader):
    model.eval()
    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for inputs, labels in tqdm(test_loader, desc="Evaluating"):
            inputs = inputs.to(device)
            labels = labels.to(device)
            
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    # Calculate metrics
    accuracy = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds, average='weighted')
    conf_matrix = confusion_matrix(all_labels, all_preds)
    
    print(f"Test Accuracy: {accuracy:.4f}")
    print(f"Test F1 Score: {f1:.4f}")
    
    # Plot confusion matrix
    plt.figure(figsize=(10, 8))
    sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues')
    plt.title('Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.savefig('confusion_matrix.png')
    plt.close()
    
    return accuracy, f1, conf_matrix

In [38]:
def plot_training_history(history):
    # Plot training & validation accuracy and loss
    plt.figure(figsize=(12, 5))
    
    plt.subplot(1, 2, 1)
    plt.plot(history['train_acc'])
    plt.plot(history['val_acc'])
    plt.title('Model accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='upper left')
    
    plt.subplot(1, 2, 2)
    plt.plot(history['train_loss'])
    plt.plot(history['val_loss'])
    plt.title('Model loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='upper left')
    
    plt.tight_layout()
    plt.savefig('training_history.png')
    plt.close()


In [40]:
def main():
    # Read CSV files
    train_df = pd.read_csv(TRAIN_CSV)
    test_df = pd.read_csv(TEST_CSV)
    
    # Print dataset information
    print(f"Training samples: {len(train_df)}")
    print(f"Testing samples: {len(test_df)}")
    print(f"Training CSV columns: {train_df.columns.tolist()}")
    
    if 'label' in train_df.columns:
        print("Class distribution in training set:")
        print(train_df['label'].value_counts())
        # Update NUM_CLASSES based on data
        global NUM_CLASSES
        NUM_CLASSES = len(train_df['label'].unique())
        print(f"Number of classes detected: {NUM_CLASSES}")
    
    # Create datasets
    train_val_ratio = 0.8
    train_size = int(train_val_ratio * len(train_df))
    val_size = len(train_df) - train_size
    
    # Split dataframe for train/val
    train_df_split = train_df.sample(frac=1, random_state=42).reset_index(drop=True)
    train_df_subset = train_df_split.iloc[:train_size]
    val_df_subset = train_df_split.iloc[train_size:]
    
    # Create datasets with augmentation for training
    train_dataset = HyperspectralDataset(
        dataframe=train_df_subset,
        base_path=DATA_DIR,
        augment=True  # Apply augmentation to training
    )
    
    val_dataset = HyperspectralDataset(
        dataframe=val_df_subset,
        base_path=DATA_DIR,
        augment=False  # No augmentation for validation
    )
    
    test_dataset = HyperspectralDataset(
        dataframe=test_df,
        base_path=DATA_DIR,
        augment=False  # No augmentation for testing
    )
    
    # Test data loading
    print("Testing data loading...")
    try:
        sample_img, sample_label = train_dataset[0]
        print(f"Sample image shape: {sample_img.shape}, Label: {sample_label}")
    except Exception as e:
        print(f"Error loading sample: {e}")
        return
    
    # Create data loaders
    batch_size = 16
    
    train_loader = DataLoader(
        train_dataset, 
        batch_size=batch_size, 
        shuffle=True, 
        num_workers=2,
        pin_memory=True
    )
    
    val_loader = DataLoader(
        val_dataset, 
        batch_size=batch_size, 
        shuffle=False, 
        num_workers=2,
        pin_memory=True
    )
    
    test_loader = DataLoader(
        test_dataset, 
        batch_size=batch_size, 
        shuffle=False, 
        num_workers=2,
        pin_memory=True
    )
    
    # Initialize model
    model = HyperspectralCNN(in_channels=NUM_BANDS, num_classes=NUM_CLASSES).to(device)
    print(f"Model initialized on: {device}")
    print(f"Model parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad)}")
    
    # Define loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)
    
    # Learning rate scheduler
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, 
        mode='min',
        factor=0.5,
        patience=5,
        verbose=True
    )
    
    # Train the model
    print("Starting training...")
    model, history = train_model(
        model, 
        train_loader, 
        val_loader, 
        criterion, 
        optimizer,
        scheduler,
        num_epochs=30
    )
    
    # Plot training history
    plot_training_history(history)
    
    # Evaluate the model
    accuracy, f1, conf_matrix = evaluate_model(model, test_loader)
    
    print("Training and evaluation complete!")
    print(f"Final Test Accuracy: {accuracy:.4f}")
    print(f"Final Test F1 Score: {f1:.4f}")
    
    # Save the final model
    torch.save({
        'model_state_dict': model.state_dict(),
        'num_bands': NUM_BANDS,
        'num_classes': NUM_CLASSES,
        'patch_size': PATCH_SIZE,
        'test_accuracy': accuracy,
        'test_f1': f1
    }, 'final_hyperspectral_model.pth')
    
    print("Model saved to 'final_hyperspectral_model.pth'")

if __name__ == "__main__":
    main()

Training samples: 2177
Testing samples: 545
Training CSV columns: ['id', 'label']
Class distribution in training set:
label
44    34
21    33
12    31
92    30
66    30
      ..
34    13
48    13
31    12
89    12
10     9
Name: count, Length: 101, dtype: int64
Number of classes detected: 101
Testing data loading...
💀 Error loading Deep_Learning/Project (Spectral)/Data/ot\sample453.npy: [Errno 2] No such file or directory: 'Deep_Learning/Project (Spectral)/Data/ot\\sample453.npy'
Sample image shape: torch.Size([100, 64, 64]), Label: 78
Model initialized on: cuda
Model parameters: 2673797




Starting training...
Epoch 1/30
----------


train:   0%|          | 0/109 [00:10<?, ?it/s]


RuntimeError: DataLoader worker (pid(s) 21532, 9056) exited unexpectedly