# Plastic Classification with PCA Band Reduction
## Training on Google Colab Pro+

This notebook trains a 1D CNN classifier on PCA-reduced hyperspectral bands.

**Pipeline:**
1. Load normalized hyperspectral data (459 bands)
2. Apply PCA to reduce dimensions (e.g., 459 → 150 bands)
3. Train pixel-wise 1D CNN classifier
4. Compare multiple PCA configurations

**Setup Requirements:**
- Google Colab Pro+ (for GPU/High-RAM)
- Runtime: GPU (T4, V100, or A100)
- Upload your dataset to Google Drive

## 1. Setup Environment

In [None]:
# Check GPU availability
!nvidia-smi

In [None]:
# Install required packages
!pip install -q scikit-learn pillow tqdm matplotlib seaborn

In [None]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Import libraries
import os
import json
import pickle
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
from PIL import Image
from tqdm import tqdm
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# Set random seeds for reproducibility
np.random.seed(42)
torch.manual_seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed(42)

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')
if torch.cuda.is_available():
    print(f'GPU: {torch.cuda.get_device_name(0)}')
    print(f'Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB')

## 2. Configure Paths

**IMPORTANT:** Update these paths to point to your dataset in Google Drive

In [None]:
# ==============================================================================
# CONFIGURE THESE PATHS
# ==============================================================================

# Example: If your dataset is in 'My Drive/plastic_classification/'
DRIVE_BASE = '/content/drive/MyDrive/plastic_classification'

# Dataset paths
TRAIN_DATASET = os.path.join(DRIVE_BASE, 'training_dataset')
LABEL_PATH = os.path.join(DRIVE_BASE, 'Ground_Truth/labels.json')

# Output paths (results will be saved here)
OUTPUT_DIR = os.path.join(DRIVE_BASE, 'colab_results')
os.makedirs(OUTPUT_DIR, exist_ok=True)

print(f"Dataset: {TRAIN_DATASET}")
print(f"Labels: {LABEL_PATH}")
print(f"Output: {OUTPUT_DIR}")

## 3. PCA Band Reduction

In [None]:
class PCABandReducer:
    """PCA-based dimensionality reduction for hyperspectral data."""
    
    def __init__(self, n_components=None, variance_threshold=0.99):
        self.n_components = n_components
        self.variance_threshold = variance_threshold
        self.pca = None
        self.scaler = None
        self.n_components_selected = None
        
    def fit(self, hypercube):
        """Fit PCA on hypercube."""
        n_bands, height, width = hypercube.shape
        X = hypercube.reshape(n_bands, -1).T  # (n_pixels, n_bands)
        
        print(f"Fitting PCA on {X.shape[0]:,} pixels x {X.shape[1]} bands...")
        
        # Standardize
        self.scaler = StandardScaler()
        X = self.scaler.fit_transform(X)
        
        # Determine n_components
        if self.n_components is None:
            pca_full = PCA()
            pca_full.fit(X)
            cumsum_var = np.cumsum(pca_full.explained_variance_ratio_)
            self.n_components_selected = np.argmax(cumsum_var >= self.variance_threshold) + 1
        else:
            self.n_components_selected = self.n_components
        
        # Fit PCA
        self.pca = PCA(n_components=self.n_components_selected)
        self.pca.fit(X)
        
        variance_explained = np.sum(self.pca.explained_variance_ratio_)
        
        print(f"✓ PCA fitted: {n_bands} → {self.n_components_selected} components")
        print(f"✓ Variance explained: {variance_explained*100:.2f}%")
        
        return self
    
    def transform(self, spectrum):
        """Transform spectrum (1D array) to PCA space."""
        spectrum_2d = spectrum.reshape(1, -1)
        spectrum_std = self.scaler.transform(spectrum_2d)
        spectrum_pca = self.pca.transform(spectrum_std)
        return spectrum_pca.flatten()
    
    def save(self, filepath):
        """Save PCA model."""
        with open(filepath, 'wb') as f:
            pickle.dump({
                'pca': self.pca,
                'scaler': self.scaler,
                'n_components_selected': self.n_components_selected,
                'variance_threshold': self.variance_threshold
            }, f)
        print(f"✓ PCA model saved to {filepath}")
    
    @classmethod
    def load(cls, filepath):
        """Load PCA model."""
        with open(filepath, 'rb') as f:
            data = pickle.load(f)
        
        reducer = cls()
        reducer.pca = data['pca']
        reducer.scaler = data['scaler']
        reducer.n_components_selected = data['n_components_selected']
        reducer.variance_threshold = data.get('variance_threshold', 0.99)
        
        print(f"✓ PCA model loaded: {reducer.n_components_selected} components")
        return reducer

In [None]:
def load_hypercube(dataset_path):
    """Load normalized hyperspectral cube."""
    dataset_path = Path(dataset_path)
    
    # Load wavelengths
    with open(dataset_path / 'header.json', 'r') as f:
        header = json.load(f)
    wavelengths = header['wavelength (nm)']
    
    # Load bands
    print(f"Loading {len(wavelengths)} bands...")
    bands = []
    for i in tqdm(range(1, len(wavelengths) + 1), desc='Loading bands'):
        img_path = dataset_path / f'ImagesStack{i:03d}.png'
        if img_path.exists():
            img = np.array(Image.open(img_path).convert('L'), dtype=np.float32) / 255.0
            bands.append(img)
    
    hypercube = np.stack(bands, axis=0)
    print(f"✓ Hypercube loaded: {hypercube.shape}")
    
    return hypercube, wavelengths

## 4. Dataset and DataLoader

In [None]:
class HyperspectralDataset(Dataset):
    """Dataset for pixel-wise hyperspectral classification."""
    
    def __init__(self, dataset_path, label_path, pca_reducer=None, max_samples=None):
        self.dataset_path = Path(dataset_path)
        self.pca_reducer = pca_reducer
        
        # Load wavelengths
        with open(self.dataset_path / 'header.json', 'r') as f:
            header = json.load(f)
        self.wavelengths = header['wavelength (nm)']
        self.n_bands = len(self.wavelengths)
        
        # Load labels
        with open(label_path, 'r') as f:
            labels_data = json.load(f)
        
        # Extract samples
        self.samples = []
        for label_info in labels_data:
            class_id = label_info['label']
            for coord in label_info['coordinates']:
                x, y = coord
                self.samples.append({'x': x, 'y': y, 'label': class_id})
        
        if max_samples:
            self.samples = self.samples[:max_samples]
        
        print(f"✓ Dataset: {len(self.samples):,} samples")
    
    def load_spectrum(self, x, y):
        """Load spectral signature for pixel (x, y)."""
        spectrum = np.zeros(self.n_bands, dtype=np.float32)
        
        for i in range(1, self.n_bands + 1):
            img_path = self.dataset_path / f'ImagesStack{i:03d}.png'
            if img_path.exists():
                img = Image.open(img_path).convert('L')
                spectrum[i - 1] = np.array(img)[y, x] / 255.0
        
        return spectrum
    
    def __len__(self):
        return len(self.samples)
    
    def __getitem__(self, idx):
        sample = self.samples[idx]
        x, y, label = sample['x'], sample['y'], sample['label']
        
        # Load spectrum
        spectrum = self.load_spectrum(x, y)
        
        # Apply PCA if available
        if self.pca_reducer:
            spectrum = self.pca_reducer.transform(spectrum)
        
        return torch.from_numpy(spectrum).float(), torch.tensor(label, dtype=torch.long)

## 5. 1D CNN Model

In [None]:
class SpectralCNN1D(nn.Module):
    """1D CNN for pixel-wise hyperspectral classification."""
    
    def __init__(self, n_bands, n_classes, dropout_rate=0.5):
        super().__init__()
        
        self.n_bands = n_bands
        self.n_classes = n_classes
        
        # 1D Convolutional layers
        self.conv1 = nn.Conv1d(1, 64, kernel_size=5, padding=2)
        self.bn1 = nn.BatchNorm1d(64)
        self.pool1 = nn.MaxPool1d(2)
        
        self.conv2 = nn.Conv1d(64, 128, kernel_size=5, padding=2)
        self.bn2 = nn.BatchNorm1d(128)
        self.pool2 = nn.MaxPool1d(2)
        
        self.conv3 = nn.Conv1d(128, 256, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm1d(256)
        
        # Calculate flattened size
        self.flat_size = 256 * (n_bands // 4)
        
        # Fully connected layers
        self.fc1 = nn.Linear(self.flat_size, 512)
        self.dropout1 = nn.Dropout(dropout_rate)
        
        self.fc2 = nn.Linear(512, 256)
        self.dropout2 = nn.Dropout(dropout_rate)
        
        self.fc3 = nn.Linear(256, n_classes)
        
        self.relu = nn.ReLU()
    
    def forward(self, x):
        # Input: (batch, n_bands)
        x = x.unsqueeze(1)  # (batch, 1, n_bands)
        
        # Conv blocks
        x = self.pool1(self.relu(self.bn1(self.conv1(x))))
        x = self.pool2(self.relu(self.bn2(self.conv2(x))))
        x = self.relu(self.bn3(self.conv3(x)))
        
        # Flatten
        x = x.view(x.size(0), -1)
        
        # FC layers
        x = self.relu(self.fc1(x))
        x = self.dropout1(x)
        
        x = self.relu(self.fc2(x))
        x = self.dropout2(x)
        
        x = self.fc3(x)
        
        return x
    
    def get_num_params(self):
        return sum(p.numel() for p in self.parameters())

## 6. Training Function

In [None]:
def train_model(model, train_loader, val_loader, n_epochs, learning_rate, output_dir, device):
    """Train the model."""
    
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    
    # Learning rate scheduler
    scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(
        optimizer, T_0=10, T_mult=2, eta_min=learning_rate/10
    )
    
    best_val_acc = 0.0
    history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}
    
    for epoch in range(1, n_epochs + 1):
        # Training
        model.train()
        train_loss = 0.0
        train_correct = 0
        train_total = 0
        
        pbar = tqdm(train_loader, desc=f'Epoch {epoch}/{n_epochs} [Train]')
        for spectra, labels in pbar:
            spectra, labels = spectra.to(device), labels.to(device)
            
            optimizer.zero_grad()
            outputs = model(spectra)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
            _, predicted = outputs.max(1)
            train_total += labels.size(0)
            train_correct += predicted.eq(labels).sum().item()
            
            pbar.set_postfix({
                'loss': f'{train_loss/(pbar.n+1):.4f}',
                'acc': f'{100.*train_correct/train_total:.2f}%'
            })
        
        train_loss /= len(train_loader)
        train_acc = 100. * train_correct / train_total
        
        # Validation
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0
        
        with torch.no_grad():
            for spectra, labels in tqdm(val_loader, desc=f'Epoch {epoch}/{n_epochs} [Val]'):
                spectra, labels = spectra.to(device), labels.to(device)
                outputs = model(spectra)
                loss = criterion(outputs, labels)
                
                val_loss += loss.item()
                _, predicted = outputs.max(1)
                val_total += labels.size(0)
                val_correct += predicted.eq(labels).sum().item()
        
        val_loss /= len(val_loader)
        val_acc = 100. * val_correct / val_total
        
        # Update scheduler
        scheduler.step()
        
        # Save history
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)
        
        # Print epoch summary
        print(f"\nEpoch {epoch}: Train Loss={train_loss:.4f}, Train Acc={train_acc:.2f}% | "
              f"Val Loss={val_loss:.4f}, Val Acc={val_acc:.2f}%")
        
        # Save best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'val_acc': val_acc,
            }, os.path.join(output_dir, 'best_model.pth'))
            print(f"✓ Best model saved (Val Acc: {val_acc:.2f}%)")
    
    return history, best_val_acc

## 7. Main Training Pipeline

In [None]:
# ==============================================================================
# TRAINING CONFIGURATION
# ==============================================================================

CONFIG = {
    'n_classes': 11,
    'batch_size': 1024,  # Large batch for Colab Pro+ GPU
    'n_epochs': 50,
    'learning_rate': 0.001,
    'dropout_rate': 0.5,
    'train_split': 0.9,
    'num_workers': 2,
    'max_samples': None,  # None = use all samples
    
    # PCA configurations to test
    'pca_configs': [None, 50, 100, 150, 200],  # None = no PCA
}

print("Training Configuration:")
for key, value in CONFIG.items():
    print(f"  {key}: {value}")

In [None]:
# Load hypercube for PCA fitting (once)
print("\n" + "="*80)
print("LOADING HYPERCUBE FOR PCA")
print("="*80)

hypercube, wavelengths = load_hypercube(TRAIN_DATASET)
n_original_bands = hypercube.shape[0]

print(f"\n✓ Loaded hypercube: {hypercube.shape}")
print(f"✓ Wavelengths: {wavelengths[0]:.1f} - {wavelengths[-1]:.1f} nm")

## 8. Train with Different PCA Configurations

In [None]:
results = []

for pca_comp in CONFIG['pca_configs']:
    print("\n" + "="*80)
    if pca_comp is None:
        print("TRAINING WITHOUT PCA (BASELINE)")
        exp_name = "no_pca"
        n_bands = n_original_bands
    else:
        print(f"TRAINING WITH PCA: {pca_comp} COMPONENTS")
        exp_name = f"pca_{pca_comp}"
        n_bands = pca_comp
    print("="*80)
    
    # Create output directory
    exp_output_dir = os.path.join(OUTPUT_DIR, exp_name)
    os.makedirs(exp_output_dir, exist_ok=True)
    
    # Fit PCA if needed
    if pca_comp is None:
        pca_reducer = None
    else:
        print(f"\nFitting PCA with {pca_comp} components...")
        pca_reducer = PCABandReducer(n_components=pca_comp)
        pca_reducer.fit(hypercube)
        pca_reducer.save(os.path.join(exp_output_dir, 'pca_model.pkl'))
    
    # Create datasets
    print("\nCreating datasets...")
    full_dataset = HyperspectralDataset(
        TRAIN_DATASET, LABEL_PATH, pca_reducer, CONFIG['max_samples']
    )
    
    # Split train/val
    n_samples = len(full_dataset)
    n_train = int(n_samples * CONFIG['train_split'])
    n_val = n_samples - n_train
    
    train_dataset, val_dataset = torch.utils.data.random_split(
        full_dataset, [n_train, n_val]
    )
    
    # Create dataloaders
    train_loader = DataLoader(
        train_dataset,
        batch_size=CONFIG['batch_size'],
        shuffle=True,
        num_workers=CONFIG['num_workers'],
        pin_memory=True
    )
    
    val_loader = DataLoader(
        val_dataset,
        batch_size=CONFIG['batch_size'],
        shuffle=False,
        num_workers=CONFIG['num_workers'],
        pin_memory=True
    )
    
    print(f"✓ Train samples: {n_train:,}")
    print(f"✓ Val samples: {n_val:,}")
    
    # Create model
    print(f"\nCreating model with {n_bands} input bands...")
    model = SpectralCNN1D(
        n_bands=n_bands,
        n_classes=CONFIG['n_classes'],
        dropout_rate=CONFIG['dropout_rate']
    ).to(device)
    
    print(f"✓ Model created: {model.get_num_params():,} parameters")
    
    # Train
    print(f"\nTraining for {CONFIG['n_epochs']} epochs...")
    history, best_val_acc = train_model(
        model, train_loader, val_loader,
        CONFIG['n_epochs'], CONFIG['learning_rate'],
        exp_output_dir, device
    )
    
    # Save history
    with open(os.path.join(exp_output_dir, 'history.json'), 'w') as f:
        json.dump(history, f, indent=2)
    
    # Record results
    results.append({
        'config': exp_name,
        'n_components': pca_comp,
        'n_bands': n_bands,
        'best_val_acc': best_val_acc,
        'output_dir': exp_output_dir
    })
    
    print(f"\n✓ Experiment '{exp_name}' completed!")
    print(f"  Best Val Acc: {best_val_acc:.2f}%")
    print(f"  Results saved to: {exp_output_dir}")

## 9. Compare Results

In [None]:
# Print comparison table
print("\n" + "="*80)
print("TRAINING COMPARISON SUMMARY")
print("="*80)
print(f"\n{'Configuration':<20} {'Bands':<10} {'Best Val Acc':<15}")
print("-" * 80)

for result in results:
    config_name = result['config'].replace('_', ' ').title()
    print(f"{config_name:<20} {result['n_bands']:<10} {result['best_val_acc']:>13.2f}%")

# Find best
best_result = max(results, key=lambda x: x['best_val_acc'])
print("\n" + "="*80)
print(f"BEST CONFIGURATION: {best_result['config'].replace('_', ' ').title()}")
print(f"  Validation Accuracy: {best_result['best_val_acc']:.2f}%")
print(f"  Number of Bands: {best_result['n_bands']}")
print(f"  Model saved at: {best_result['output_dir']}/best_model.pth")
if best_result['n_components']:
    print(f"  PCA model saved at: {best_result['output_dir']}/pca_model.pkl")
print("="*80)

# Save comparison
comparison_file = os.path.join(OUTPUT_DIR, 'comparison_results.json')
with open(comparison_file, 'w') as f:
    json.dump({
        'results': results,
        'best_config': best_result,
        'config': CONFIG
    }, f, indent=2)

print(f"\n✓ Comparison saved to: {comparison_file}")

## 10. Visualize Results

In [None]:
# Plot comparison
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))

# Validation accuracy comparison
configs = [r['config'].replace('_', ' ').title() for r in results]
val_accs = [r['best_val_acc'] for r in results]
bands = [r['n_bands'] for r in results]

ax1.bar(configs, val_accs, alpha=0.7, color='steelblue')
ax1.set_ylabel('Validation Accuracy (%)', fontsize=12)
ax1.set_title('Best Validation Accuracy by Configuration', fontsize=14, fontweight='bold')
ax1.grid(axis='y', alpha=0.3)
plt.setp(ax1.xaxis.get_majorticklabels(), rotation=45, ha='right')

# Accuracy vs Number of Bands
ax2.scatter(bands, val_accs, s=100, alpha=0.7, color='darkgreen')
for i, config in enumerate(configs):
    ax2.annotate(config, (bands[i], val_accs[i]), 
                xytext=(5, 5), textcoords='offset points', fontsize=9)
ax2.set_xlabel('Number of Bands', fontsize=12)
ax2.set_ylabel('Validation Accuracy (%)', fontsize=12)
ax2.set_title('Accuracy vs Dimensionality', fontsize=14, fontweight='bold')
ax2.grid(alpha=0.3)

plt.tight_layout()
plt.savefig(os.path.join(OUTPUT_DIR, 'comparison_plot.png'), dpi=150, bbox_inches='tight')
plt.show()

print(f"✓ Plot saved to: {os.path.join(OUTPUT_DIR, 'comparison_plot.png')}")

## 11. Download Results to Local Machine

Run this cell to download all results as a zip file.

In [None]:
# Create zip file of results
import shutil

zip_path = '/content/colab_results.zip'
shutil.make_archive('/content/colab_results', 'zip', OUTPUT_DIR)

print(f"✓ Results zipped to: {zip_path}")
print(f"\nDownload the file using:")
print(f"  from google.colab import files")
print(f"  files.download('{zip_path}')")

# Uncomment to auto-download
# from google.colab import files
# files.download(zip_path)

## Summary

This notebook:
1. ✓ Loads hyperspectral data from Google Drive
2. ✓ Applies PCA for dimensionality reduction
3. ✓ Trains pixel-wise 1D CNN classifier
4. ✓ Compares multiple PCA configurations
5. ✓ Saves best model and results

**Next Steps:**
- Download the best model and PCA reducer
- Use for inference on test images
- Fine-tune hyperparameters if needed