<a href="https://colab.research.google.com/github/grillinr/evolutionary-computing/blob/main/final/final_proj.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Import libraries and seed for easier checking

In [1]:
import random
import os
import argparse
import math
from typing import List, Tuple


import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.optim as optim

from sklearn.metrics import accuracy_score, fbeta_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler


SEED = 5173
device = torch.device("cpu") if not torch.cuda.is_available() else torch.device("cuda")
print(device)

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)

cuda


# Define helper functions

In [2]:
def prepare_data_with_scaler(data, scaler=None, fit=False):
    data = data.dropna()
    X = data.drop(columns=["id", "record", "type"]).values.astype(np.float32)
    y = data["type"].astype("category").cat.codes.values

    if fit:
        X = scaler.fit_transform(X)
    else:
        X = scaler.transform(X)

    return torch.tensor(X, dtype=torch.float32), torch.tensor(y, dtype=torch.long)


def evaluate(model, X, y, criterion):
    model.eval()
    with torch.no_grad():
        logits = model(X)
        loss = criterion(logits, y)
        y_pred = logits.argmax(dim=1).cpu().numpy()

    y_true = y.cpu().numpy()
    return {
        "loss": loss.item(),
        "accuracy": accuracy_score(y_true, y_pred),
        "f_beta_macro": fbeta_score(y_true, y_pred, average="macro", beta=2, zero_division=0)
    }


def estimate_flops(model, input_shape):
    """
    Estimate FLOPs for Linear and Conv2d layers only.
    Args:
        model (nn.Module): PyTorch model
        input_shape (tuple): shape of one input sample, e.g., (1, 3, 224, 224) or (1, input_dim)
    Returns:
        total_flops (int)
    """
    flops = 0

    def count_layer(layer, x_in, x_out):
        nonlocal flops
        # Conv2d FLOPs = Kx * Ky * Cin * Cout * Hout * Wout
        if isinstance(layer, nn.Conv2d):
            out_h, out_w = x_out.shape[2:]
            kernel_ops = layer.kernel_size[0] * layer.kernel_size[1]
            flops += kernel_ops * layer.in_channels * layer.out_channels * out_h * out_w
        # Linear FLOPs = input_features * output_features
        elif isinstance(layer, nn.Linear):
            flops += layer.in_features * layer.out_features

    hooks = []
    for layer in model.modules():
        if isinstance(layer, (nn.Conv2d, nn.Linear)):
            hooks.append(layer.register_forward_hook(count_layer))

    dummy = torch.randn(input_shape).to(next(model.parameters()).device)
    with torch.no_grad():
        model(dummy)

    for h in hooks:
        h.remove()

    return flops

# Create Model Architecture (DNN)

In [3]:
class DNN(nn.Module):
    def __init__(self, input_size=32, hidden=(32, 16, 8), num_classes=5, dropout_rate=0.5):
        super().__init__()
        layers = []
        input_dim = input_size

        for h in hidden:
            layers.append(nn.Linear(input_dim, h))
            layers.append(nn.ReLU(inplace=True))
            layers.append(nn.Dropout(dropout_rate))
            input_dim = h

        layers.append(nn.Linear(input_dim, num_classes))
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        return self.net(x)

# Main Training loop

In [5]:
# Load data
dataset = pd.read_csv("/content/train.csv")
train_dataset, val_dataset = train_test_split(dataset, train_size=0.7, random_state=SEED)
scaler = StandardScaler()
X_train, y_train = prepare_data_with_scaler(train_dataset, scaler, fit=True)
X_val, y_val = prepare_data_with_scaler(val_dataset, scaler, fit=False)

X_train, y_train = X_train.to(device), y_train.to(device)
X_val, y_val = X_val.to(device), y_val.to(device)

In [16]:
# Configuration
class Hyperparameters:
    def __init__(self, lr, epochs, hidden, dropout_rate, patience):
        self.lr = lr
        self.epochs = epochs
        self.hidden = hidden
        self.dropout_rate = dropout_rate
        self.patience = patience

    def __repr__(self):
        return f"Hyperparameters(lr={self.lr}, epochs={self.epochs}, hidden={self.hidden}, dropout_rate={self.dropout_rate}, patience={self.patience})"

    def __str__(self):
        return self.__repr__()

In [7]:
def train(params: Hyperparameters):
  # Create model
  model = DNN(hidden=params.hidden, dropout_rate=params.dropout_rate).to(device)

  class_counts = train_dataset['type'].value_counts()
  weights = 1.0 / class_counts.values
  weights = torch.FloatTensor(weights).to(device)
  criterion = nn.CrossEntropyLoss(weight=weights)
  optimizer = optim.Adam(model.parameters(), lr=params.lr)

  # Training loop with early stopping
  best_val_loss = float('inf')
  patience_counter = 0
  epochs_run = params.epochs
  for epoch in range(1, params.epochs + 1):
      model.train()
      optimizer.zero_grad()
      out = model(X_train)
      loss = criterion(out, y_train)
      loss.backward()
      optimizer.step()
      train_loss = loss.item()

      train_metrics = evaluate(model, X_train, y_train, criterion)
      val_metrics = evaluate(model, X_val, y_val, criterion)

      if epoch % (params.epochs // 10) == 0:
        print(
            f"Epoch {epoch}/{params.epochs} | "
            f"train_loss={train_loss:.4f} train_acc={train_metrics['accuracy']:.4f} "
            f"train_f1={train_metrics['f_beta_macro']:.4f} "
            f"val_loss={val_metrics['loss']:.4f} val_acc={val_metrics['accuracy']:.4f} "
            f"val_f1={val_metrics['f_beta_macro']:.4f} "
        )

      # Early stopping check
      if val_metrics['loss'] < best_val_loss:
          best_val_loss = val_metrics['loss']
          patience_counter = 0
      else:
          patience_counter += 1
          if patience_counter >= params.patience:
              print(f"Early stopping at epoch {epoch}")
              epochs_run = epoch
              break

  # Return a tuple of fitness values for domination comparison
  # Maximize accuracy, f1, and minimize loss, flops.
  # For loss and flops, we take the negative value.
  return (val_metrics["accuracy"], val_metrics["f_beta_macro"], -val_metrics["loss"], -estimate_flops(model, (1, 32)))

In [17]:
# Test the function
hyperparameters = Hyperparameters(lr=1e-3, epochs=500, hidden=(64, 32, 16, 8), dropout_rate=0.5, patience=100)
# result = train(hyperparameters)

# Neuroevolution

In [8]:
def init_population(pop_size: int) -> List[Hyperparameters]:
    population = []
    for _ in range(pop_size):
        lr = random.uniform(1e-5, 1e-1)
        epochs = random.randint(10, 200)

        # Generate variable-length hidden layer tuple
        num_layers = random.randint(1, 5)
        hidden = tuple(2 ** random.randint(3, 8) for _ in range(num_layers))

        dropout_rate = random.uniform(0.0, 0.5)
        patience = random.randint(5, 30)

        population.append(Hyperparameters(lr, epochs, hidden, dropout_rate, patience))

    return population

In [9]:
def count_dominated(fitnesses: List[Tuple[float, ...]], idx: int) -> int:
    """Count how many points are dominated by fitnesses[idx]"""
    point = fitnesses[idx]
    dominated = 0
    for other in fitnesses:
        if other == point:
            continue
        # Check if point dominates other (all >= and at least one >)
        if all(p >= o for p, o in zip(point, other)) and any(p > o for p, o in zip(point, other)):
            dominated += 1
    return dominated

## Define Evolution Strategy to Optimize Hyperparameters

In [14]:
def evolution_strategy(mu: int, lambda_: int, tau: float, max_gens: int) -> List[Hyperparameters]:
    population = init_population(mu)

    for generation_number in range(1, max_gens + 1):
        print(f"Generation {generation_number} starting initial evaluation")
        fitnesses = []
        for i, member in enumerate(population):
            print(f"Evaluating member {i}: {member}")
            fitnesses.append(train(member))

        # fitnesses = [train(member) for member in population]

        # Calculate proportion of dominated points for each individual
        domination_counts = [count_dominated(fitnesses, i) for i in range(mu)]
        domination_proportions = [count / mu for count in domination_counts]

        offspring = []
        for _ in range(lambda_):
            # Select parent using tournament based on domination proportion
            candidates = random.sample(range(mu), 2)
            parent_idx = max(candidates, key=lambda i: domination_proportions[i])
            parent = population[parent_idx]

            # Mutate hyperparameters
            lr = parent.lr * math.exp(tau * random.gauss(0.0, 1.0))
            lr = max(1e-5, min(1e-1, lr))

            epochs = int(parent.epochs + random.gauss(0.0, 10))
            epochs = max(10, min(500, epochs))

            # Mutate hidden layers
            hidden = list(parent.hidden)
            if random.random() < 0.3:
                if len(hidden) > 1 and random.random() < 0.5:
                    hidden.pop(random.randrange(len(hidden)))
                elif len(hidden) < 5:
                    hidden.insert(random.randrange(len(hidden) + 1), 2 ** random.randint(3, 8))
            else:
                idx = random.randrange(len(hidden))
                hidden[idx] = max(8, min(256, int(hidden[idx] + random.gauss(0.0, 16))))

            dropout_rate = parent.dropout_rate + random.gauss(0.0, 0.05)
            dropout_rate = max(0.0, min(0.5, dropout_rate))

            patience = int(parent.patience + random.gauss(0.0, 3))
            patience = max(5, min(30, patience))

            offspring.append(Hyperparameters(lr, epochs, tuple(hidden), dropout_rate, patience))

        offspring_fitnesses = [train(member) for member in offspring]

        # Calculate domination for offspring
        offspring_domination_counts = [count_dominated(offspring_fitnesses, i) for i in range(lambda_)]
        offspring_domination_proportions = [count / lambda_ for count in offspring_domination_counts]

        # Logging
        best_idx = max(range(mu), key=lambda i: domination_proportions[i])
        print(f"Gen {generation_number} Best: {fitnesses[best_idx]}")

        # Select best member from offspring based on domination
        indexed = [(prop, i) for i, prop in enumerate(offspring_domination_proportions)]
        indexed.sort(key=lambda x: x[0], reverse=True)

        population = [offspring[i] for _, i in indexed[:mu]]

    return population

In [18]:
final_pop = evolution_strategy(mu=10, lambda_=10, tau=0.05, max_gens=10)
for member in final_pop:
    print(member)

Generation 1 starting initial evaluation
Evaluating member 0: Hyperparameters(lr=0.04779887398429091, epochs=148, hidden=(64, 32, 64, 256), dropout_rate=0.19037090294267395, patience=14)
Epoch 14/148 | train_loss=0.7306 train_acc=0.0676 train_f1=0.0532 val_loss=0.6898 val_acc=0.0676 val_f1=0.0532 
Epoch 28/148 | train_loss=0.5724 train_acc=0.0676 train_f1=0.0532 val_loss=0.4762 val_acc=0.0676 val_f1=0.0532 
Epoch 42/148 | train_loss=0.5017 train_acc=0.8950 train_f1=0.3388 val_loss=0.3891 val_acc=0.8961 val_f1=0.3393 
Epoch 56/148 | train_loss=0.4836 train_acc=0.9032 train_f1=0.3433 val_loss=0.3681 val_acc=0.9046 val_f1=0.3438 
Epoch 70/148 | train_loss=0.4701 train_acc=0.9100 train_f1=0.3474 val_loss=0.3516 val_acc=0.9112 val_f1=0.3479 
Epoch 84/148 | train_loss=0.4677 train_acc=0.9049 train_f1=0.3442 val_loss=0.3418 val_acc=0.9065 val_f1=0.3451 
Epoch 98/148 | train_loss=0.4609 train_acc=0.9121 train_f1=0.3480 val_loss=0.3311 val_acc=0.9133 val_f1=0.3486 
Epoch 112/148 | train_loss=0.

# Test output

In [None]:
X_test, y_test = prepare_data(test_dataset, device)

# Get predictions
with torch.no_grad():
    logits = model(X_test)
    probs = torch.softmax(logits, dim=1).cpu().numpy()
    predictions = probs.argmax(axis=1)

y_true = y_test.cpu().numpy()

# Calculate metrics
accuracy = accuracy_score(y_true, predictions)
f_beta = fbeta_score(y_true, predictions, average="macro", beta=2)

print(f"Test Accuracy: {accuracy:.4f}")
print(f"Test F-Beta (macro): {f_beta:.4f}")