In [1]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix
import matplotlib.pyplot as plt

In [2]:
class HyperspectralDataset(Dataset):
    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        self.class_labels = ['canola', 'soybean', 'sugarbeet', 'kochia', 
                           'common_ragweed', 'common_waterhemp', 'redroot_pigweed']
        self.file_paths = []
        self.labels = []
        
        # Collect all .npy files and their labels
        for label_idx, label in enumerate(self.class_labels):
            for i in range(1, 21):  # Assuming 20 samples per class
                file_path = os.path.join(data_dir, f"{label}_{i}.npy")
                if os.path.exists(file_path):
                    self.file_paths.append(file_path)
                    self.labels.append(label_idx)
    
    def __len__(self):
        return len(self.file_paths)
    
    def __getitem__(self, idx):
        # Load hyperspectral cube
        cube = np.load(self.file_paths[idx])
        
        # Resize to (32, 32, 224)
        cube = torch.tensor(cube, dtype=torch.float32).permute(2, 0, 1)  # (C, H, W)
        cube = nn.functional.interpolate(cube.unsqueeze(0), size=(32, 32), mode='bilinear').squeeze(0)
        
        # Normalize
        cube = (cube - cube.min()) / (cube.max() - cube.min())
        
        # Get label
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        
        return cube, label

In [3]:
class HyperspectralCNN(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.conv1 = nn.Conv3d(1, 32, kernel_size=(3, 3, 10), stride=1)
        self.pool1 = nn.MaxPool3d(kernel_size=(2, 2, 2))
        self.conv2 = nn.Conv3d(32, 64, kernel_size=(3, 3, 10), stride=1)
        self.fc1 = nn.Linear(64 * 15 * 15 * 53, 128)  # Adjusted for input size
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(128, num_classes)
        
    def forward(self, x):
        x = x.unsqueeze(1)  # Add channel dim (B, 1, C, H, W)
        x = self.pool1(torch.relu(self.conv1(x)))
        x = torch.relu(self.conv2(x))
        x = torch.flatten(x, 1)
        x = torch.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [4]:
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25):
    best_acc = 0.0
    train_loss, val_acc = [], []
    
    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)
        
        # Each epoch has training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()
            
            running_loss = 0.0
            running_corrects = 0
            
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)
                
                optimizer.zero_grad()
                
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)
                    
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            
            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)
            
            if phase == 'train':
                train_loss.append(epoch_loss)
            else:
                val_acc.append(epoch_acc)
            
            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
    
    return model, train_loss, val_acc

In [8]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    
# Initialize dataset
data_dir = "Users/tahmi/Documents/Work/ETAAV/DataSet/kochia/kochia/kochia"
dataset = HyperspectralDataset(data_dir)

In [9]:
# Ensure the dataset is not empty
if len(dataset) == 0:
    raise ValueError("The dataset is empty. Please check the data_dir and ensure it contains the expected .npy files.")

train_idx, val_idx = train_test_split(
    np.arange(len(dataset)), 
    test_size=0.2,
    stratify=dataset.labels,
    random_state=42
)

ValueError: The dataset is empty. Please check the data_dir and ensure it contains the expected .npy files.