In [None]:
import numpy as np
import time
import json
import os
from datetime import datetime
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

# Pick the compute device: CUDA when PyTorch can see a GPU, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")
# When running on CUDA, also report the name of GPU index 0.
if device.type == 'cuda':
    print(f"GPU: {torch.cuda.get_device_name(0)}")

# Directory that contains one sub-folder per dataset listed below.
path = "../../data/binary/processed/"

# Dataset sub-folder names, in the order they are processed:
# mnist_01_pca_4, mnist_01_pca_8, mnist_38_pca_4, mnist_38_pca_8.
datasets = [
    f"mnist_{digit_pair}_pca_{suffix}"
    for digit_pair in ("01", "38")
    for suffix in (4, 8)
]

# Random seeds; every (dataset, sample size) combination is repeated once per seed.
seeds = [42, 100, 20, 5, 99]
# Training-set sizes to evaluate.
sample_sizes = [500, 2000, 4000]

In [None]:
class ShallowNN(nn.Module):
    """Feed-forward classifier with a single hidden layer (Linear -> ReLU -> Linear).

    Args:
        input_dim: Number of input features per sample.
        hidden_dim: Width of the hidden layer.
        output_dim: Number of output logits (defaults to 2).

    Attributes:
        total_params: Number of parameter elements across both linear layers,
            computed once at construction time.
    """

    def __init__(self, input_dim, hidden_dim, output_dim=2):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, output_dim)

        # Count the parameter elements once so get_param_count() is a plain lookup.
        element_total = 0
        for tensor in self.parameters():
            element_total += tensor.numel()
        self.total_params = element_total

    def forward(self, x):
        """Return the raw logits produced for the input batch ``x``."""
        return self.fc2(self.relu(self.fc1(x)))

    def get_param_count(self):
        """Return the parameter count stored by ``__init__``."""
        return self.total_params

def train_model(model, train_loader, criterion, optimizer, epochs, device):
    """Optimise ``model`` in place with mini-batches for a fixed number of epochs.

    Args:
        model: Module to train; it is switched to training mode first.
        train_loader: Iterable yielding ``(inputs, targets)`` batches.
        criterion: Callable computing the loss from ``(outputs, targets)``.
        optimizer: Optimizer that updates the parameters of ``model``.
        epochs: Number of complete passes over ``train_loader``.
        device: Device each batch is moved to before the forward pass.

    Returns:
        None. The model parameters are updated in place.
    """
    model.train()
    for _ in range(epochs):
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)

            # Drop the gradients of the previous step before computing new ones.
            optimizer.zero_grad()
            batch_loss = criterion(model(inputs), targets)
            batch_loss.backward()
            optimizer.step()

def evaluate_model(model, X, y, device):
    """Predict a class index for every row of ``X``.

    The model is switched to evaluation mode and the forward pass runs with
    gradient tracking disabled.

    Args:
        model: Module producing one row of class scores per input row.
        X: Feature data accepted by ``torch.FloatTensor``.
        y: Not used by this function; kept so existing callers keep working.
        device: Device on which the forward pass is executed.

    Returns:
        NumPy array holding, for each row, the index of the largest score.
    """
    model.eval()
    inputs = torch.FloatTensor(X).to(device)
    with torch.no_grad():
        scores = model(inputs)
    # Reduce over the class dimension and keep only the winning indices.
    return scores.max(dim=1).indices.cpu().numpy()


def remap_labels(y):
    """
    Remap the two labels of a binary problem to 0 and 1.

    PyTorch's CrossEntropyLoss expects class indices starting at 0, so the
    smaller label (in ``np.unique`` sort order) becomes 0 and the larger one
    becomes 1.

    For example:
    - [0, 1] -> [0, 1] (no change)
    - [3, 8] -> [0, 1]

    Args:
        y: Array-like of labels with exactly two distinct values. Any shape is
            accepted (e.g. a flat vector or an (n, 1) column).

    Returns:
        Integer NumPy array of 0/1 values with the same shape as ``y``.

    Raises:
        ValueError: If ``y`` does not contain exactly two distinct labels.
    """
    y = np.asarray(y)
    unique_labels = np.unique(y)
    if len(unique_labels) != 2:
        raise ValueError(f"Expected 2 unique labels, got {len(unique_labels)}: {unique_labels}")

    # With exactly two labels, "equals the larger one" is the whole mapping;
    # np.where does it in one vectorised pass instead of a per-element dict lookup.
    return np.where(y == unique_labels[1], 1, 0)


# Candidate values for the hyperparameter grid search.
# A ShallowNN with the default output_dim=2 has hidden_dim * (input_dim + 3) + 2
# parameters, e.g. 58 / 114 / 226 for input_dim=4 and 90 / 178 / 354 for input_dim=8.
hidden_dims = [8, 16, 32]
learning_rates = [1e-3, 1e-2, 1e-1]
batch_sizes = [32, 64]
epochs_options = [50, 100]

In [None]:
# Results container: start a fresh one, or resume from the JSON file when it
# already exists so that finished experiments can be recognised later on.
results_path = "../../results/classical_nn_baseline_results.json"
if not os.path.exists(results_path):
    # No previous run found: record the experiment metadata and an empty result list.
    all_results = {
        "experiment_info": {
            "model_type": "shallow_neural_network",
            "date": datetime.now().isoformat(),
            "architecture": "Single hidden layer with ReLU activation",
            "hyperparameter_tuning": "Grid search over hidden_dim, learning_rate, batch_size, epochs",
            "optimizer": "Adam",
            "device": str(device)
        },
        "results": []
    }
    print("Created new results file")
else:
    # Previous run found: keep its metadata and results as they are.
    with open(results_path, 'r') as results_file:
        all_results = json.load(results_file)
    print(f"Loaded existing results with {len(all_results['results'])} experiments")

# Main experiment loop
#
# For every (dataset, training-set size, seed) combination that is not yet in
# all_results["results"]:
#   1. draw the training subset and an 80/20 stratified train/validation split,
#   2. grid-search the hyperparameters, scored on the validation split,
#   3. retrain the best configuration on the whole subset and score it on the
#      test set,
#   4. append the record and write the results file immediately, so that an
#      interrupted run resumes from the "already exists" check instead of
#      starting over.

# Configurations whose model has more parameters than this are not trained
# (the target is roughly 50-100 parameters, with some slack).
MAX_PARAM_COUNT = 150

for dataset in datasets:
    print(f"\n{'='*70}")
    print(f"Dataset: {dataset}")

    # Load data
    X_full = np.load(os.path.join(path, f"{dataset}/X_train.npy"))
    y_full = np.load(os.path.join(path, f"{dataset}/y_train.npy"))
    X_test = np.load(os.path.join(path, f"{dataset}/X_test.npy"))
    y_test = np.load(os.path.join(path, f"{dataset}/y_test.npy"))

    # CrossEntropyLoss needs class indices, so map the two labels to 0/1.
    y_full = remap_labels(y_full)
    y_test = remap_labels(y_test)

    print(f"Available training samples: {X_full.shape[0]}")
    print(f"Input dimension: {X_full.shape[1]}")
    print(f"Unique labels in training: {np.unique(y_full)}")  # Should be [0, 1]
    print(f"Unique labels in test: {np.unique(y_test)}")      # Should be [0, 1]
    print(f"{'='*70}\n")

    input_dim = X_full.shape[1]

    # Test different sample sizes
    for n_samples in sample_sizes:
        # Never ask for more samples than the dataset provides.
        n_samples = min(n_samples, X_full.shape[0])

        print(f"{'─'*70}")
        if n_samples == X_full.shape[0]:
            print(f"Training with FULL dataset ({n_samples} samples)")
        else:
            print(f"Training with {n_samples} samples (subsampled)")
        print(f"{'─'*70}")

        for seed in seeds:

            # Resume support: skip combinations that already have a record.
            experiment_exists = any(
                r["dataset"] == dataset and
                r["n_train"] == n_samples and
                r["seed"] == seed
                for r in all_results["results"]
            )

            if experiment_exists:
                print(f"  Seed {seed:3d}: SKIPPING (already exists)")
                continue

            # Subsample if needed (seeded, without replacement)
            if n_samples < X_full.shape[0]:
                indices = np.random.RandomState(seed).choice(
                    X_full.shape[0], n_samples, replace=False
                )
                X_train = X_full[indices]
                y_train = y_full[indices]
            else:
                X_train = X_full
                y_train = y_full

            # Split into train/validation (80/20)
            X_train_split, X_val, y_train_split, y_val = train_test_split(
                X_train, y_train, test_size=0.2, random_state=seed, stratify=y_train
            )

            # Set random seeds for reproducibility
            torch.manual_seed(seed)
            np.random.seed(seed)
            if torch.cuda.is_available():
                torch.cuda.manual_seed(seed)

            # The training-split tensors do not depend on the hyperparameters,
            # so build them once instead of once per grid point.
            split_dataset = TensorDataset(
                torch.FloatTensor(X_train_split),
                torch.LongTensor(y_train_split)
            )

            # Hyperparameter tuning via grid search on validation set.
            # best_params stays None until a configuration has been evaluated,
            # so even a validation accuracy of 0 is recorded.
            best_val_acc = -1.0
            best_params = None

            for hidden_dim in hidden_dims:
                for lr in learning_rates:
                    for batch_size in batch_sizes:
                        for epochs in epochs_options:
                            # Create model
                            model = ShallowNN(input_dim, hidden_dim).to(device)

                            # Skip configurations that exceed the parameter budget
                            param_count = model.get_param_count()
                            if param_count > MAX_PARAM_COUNT:
                                continue

                            train_loader = DataLoader(
                                split_dataset,
                                batch_size=batch_size,
                                shuffle=True
                            )

                            # Train
                            criterion = nn.CrossEntropyLoss()
                            optimizer = optim.Adam(model.parameters(), lr=lr)
                            train_model(model, train_loader, criterion, optimizer, epochs, device)

                            # Validate
                            y_val_pred = evaluate_model(model, X_val, y_val, device)
                            val_acc = accuracy_score(y_val, y_val_pred)

                            # Track best (strict '>' keeps the first of tied configurations)
                            if val_acc > best_val_acc:
                                best_val_acc = val_acc
                                best_params = {
                                    'hidden_dim': hidden_dim,
                                    'learning_rate': lr,
                                    'batch_size': batch_size,
                                    'epochs': epochs,
                                    'param_count': param_count
                                }

            if best_params is None:
                # Every grid point was filtered out; fail with an explicit
                # message instead of a KeyError further down.
                raise RuntimeError(
                    f"No hyperparameter configuration with at most {MAX_PARAM_COUNT} "
                    f"parameters for dataset '{dataset}' (input_dim={input_dim})"
                )

            # Train final model with best hyperparameters on the whole subset
            # (training split + validation split)
            print(f"  Seed {seed:3d}: Best params - hidden={best_params['hidden_dim']}, "
                  f"lr={best_params['learning_rate']}, batch={best_params['batch_size']}, "
                  f"epochs={best_params['epochs']}, params={best_params['param_count']}")

            # Set seeds again for final training
            torch.manual_seed(seed)
            np.random.seed(seed)
            if torch.cuda.is_available():
                torch.cuda.manual_seed(seed)

            final_model = ShallowNN(input_dim, best_params['hidden_dim']).to(device)

            # Create data loader for full training set
            train_dataset = TensorDataset(
                torch.FloatTensor(X_train),
                torch.LongTensor(y_train)
            )
            train_loader = DataLoader(
                train_dataset,
                batch_size=best_params['batch_size'],
                shuffle=True
            )

            criterion = nn.CrossEntropyLoss()
            optimizer = optim.Adam(final_model.parameters(), lr=best_params['learning_rate'])

            # Training time. CUDA kernels are launched asynchronously, so wait
            # for the device before reading the clock on either side.
            if device.type == 'cuda':
                torch.cuda.synchronize()
            start_time = time.perf_counter()
            train_model(final_model, train_loader, criterion, optimizer,
                       best_params['epochs'], device)
            if device.type == 'cuda':
                torch.cuda.synchronize()
            training_time = time.perf_counter() - start_time

            # Inference time (evaluate_model copies the predictions back to the
            # host, which already waits for the device to finish)
            start_time = time.perf_counter()
            y_pred = evaluate_model(final_model, X_test, y_test, device)
            inference_time = time.perf_counter() - start_time

            # Metrics
            accuracy = accuracy_score(y_test, y_pred)
            f1 = f1_score(y_test, y_pred, average='macro')

            result = {
                "dataset": dataset,
                "n_train": int(n_samples),
                "n_test": int(X_test.shape[0]),
                "n_features": int(X_train.shape[1]),
                "seed": int(seed),
                "best_hidden_dim": int(best_params['hidden_dim']),
                "best_learning_rate": float(best_params['learning_rate']),
                "best_batch_size": int(best_params['batch_size']),
                "best_epochs": int(best_params['epochs']),
                "param_count": int(best_params['param_count']),
                "accuracy": float(accuracy),
                "f1_score": float(f1),
                "training_time_seconds": float(training_time),
                "inference_time_seconds": float(inference_time),
                "timestamp": datetime.now().isoformat()
            }

            all_results["results"].append(result)

            # Persist right away. Writing to a temporary file and renaming it
            # keeps the previous results file intact if the write is interrupted.
            os.makedirs(os.path.dirname(results_path) or ".", exist_ok=True)
            tmp_results_path = results_path + ".tmp"
            with open(tmp_results_path, 'w') as f:
                json.dump(all_results, f, indent=2)
            os.replace(tmp_results_path, results_path)

            print(f"            Test - Acc={accuracy:.4f}, F1={f1:.4f}, "
                  f"Train={training_time:.2f}s")

In [None]:
# Save results
# The output directory is derived from results_path so the two cannot drift
# apart; "or '.'" covers a results_path without a directory component.
os.makedirs(os.path.dirname(results_path) or ".", exist_ok=True)

# Write to a temporary file first and rename it over the target, so an
# interrupted write cannot leave a truncated JSON file behind (the file is
# read back on the next run to decide which experiments to skip).
tmp_results_path = results_path + ".tmp"
with open(tmp_results_path, 'w') as f:
    json.dump(all_results, indent=2, fp=f)
os.replace(tmp_results_path, results_path)

print(f"\n{'='*70}")
print(f"Results saved to {results_path}")
print(f"Total experiments: {len(all_results['results'])}")
print(f"{'='*70}")