In [2]:
# PSO FROM SCRATCH — Hyperparameter tuning for a Keras model (minimize validation loss)
# Paste this entire block into a Colab cell and run.
!pip install tensorflow numpy matplotlib pandas scikit-learn -q
import numpy as np
import tensorflow as tf
from tensorflow import keras
from sklearn.model_selection import train_test_split
import time
import os
import random

# -------------------------
# Configuration / Settings
# -------------------------
# A single seed value is shared by every random source used in this notebook.
SEED = 42
os.environ['PYTHONHASHSEED'] = str(SEED)   # exported as a string: env values must be str
random.seed(SEED)                          # Python's built-in RNG
np.random.seed(SEED)                       # NumPy global RNG (used by the PSO sampler)
tf.random.set_seed(SEED)                   # TensorFlow global seed

# --- Swarm settings -------------------------------------------------------
# Keep the swarm and the iteration count small while testing: every particle
# evaluation trains a model.
NUM_PARTICLES = 4
MAX_ITERS = 5

# Velocity-update coefficients.
INERTIA = 0.7    # w  : weight of the previous velocity
C1 = 1.5         # c1 : cognitive pull towards the particle's own best
C2 = 1.5         # c2 : social pull towards the swarm's best

# None disables clamping; otherwise a (min, max) pair passed to np.clip.
VELOCITY_CLAMP = None

# --- Objective (model training) settings ------------------------------------
EPOCHS_PER_EVAL = 3   # training epochs per particle evaluation (small for speed)
VERBOSE = True        # print detailed per-iteration progress

# --- Search space -----------------------------------------------------------
# One (low, high) pair per tuned hyperparameter, in the order the objective
# function reads them: learning rate, dropout rate, batch size.
BOUNDS = [
    (1e-4, 1e-2),   # learning rate
    (0.05, 0.5),    # dropout rate
    (16, 128),      # batch size (rounded to an integer when used)
]

# -------------------------
# Load dataset & split
# -------------------------
# We'll use Fashion MNIST for demonstration
(x_train_full, y_train_full), (x_test, y_test) = keras.datasets.fashion_mnist.load_data()

# Cast to float32, divide by 255, and flatten every 28x28 image into a
# 784-long vector so it can be fed to a plain dense network.
x_train_full = (x_train_full.astype("float32") / 255.0).reshape((-1, 28*28))
x_test = (x_test.astype("float32") / 255.0).reshape((-1, 28*28))

# Hold out 10% of the training data once, with a fixed random_state, so every
# particle is scored against the same validation set.
x_train, x_val, y_train, y_val = train_test_split(
    x_train_full,
    y_train_full,
    test_size=0.1,
    shuffle=True,
    random_state=SEED,
)

print(f"Train shape: {x_train.shape}, Val shape: {x_val.shape}, Test shape: {x_test.shape}")

# -------------------------
# Model factory
# -------------------------
def create_model(learning_rate, dropout_rate):
    """
    Build and compile the dense classifier used for every evaluation.

    learning_rate: step size handed to the Adam optimizer.
    dropout_rate: rate of the single Dropout layer after the first hidden layer.
    Returns: a compiled keras.Sequential model (784 inputs -> 256 -> 128 -> 10 softmax).

    The Keras session is cleared before the model is built, so repeated calls
    do not keep accumulating TF state.
    """
    tf.keras.backend.clear_session()

    layer_stack = [
        keras.layers.Input(shape=(784,)),
        keras.layers.Dense(256, activation='relu'),
        keras.layers.Dropout(dropout_rate),
        keras.layers.Dense(128, activation='relu'),
        keras.layers.Dense(10, activation='softmax'),
    ]
    model = keras.Sequential(layer_stack)

    # Integer class labels are used directly, hence the sparse cross-entropy loss.
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    return model

# -------------------------
# Objective function
# -------------------------
def evaluate_hyperparams(params, epochs=EPOCHS_PER_EVAL, verbose_fit=0):
    """
    Fitness function for the optimizer: train a fresh model and score it.

    params: array-like [lr, dropout, batch]; batch is rounded to the nearest
            integer and floored at 1.
    epochs: number of training epochs for this evaluation.
    verbose_fit: verbosity level forwarded to model.fit.
    Returns: validation loss of the last epoch (float).
    """
    lr, dropout = float(params[0]), float(params[1])
    batch = max(1, int(np.round(params[2])))  # batch size must be a positive integer

    # Train a brand-new model on the fixed train/validation split.
    model = create_model(lr, dropout)
    fit_log = model.fit(
        x_train,
        y_train,
        validation_data=(x_val, y_val),
        epochs=epochs,
        batch_size=batch,
        verbose=verbose_fit,
    )

    # Fitness is the validation loss recorded after the final epoch.
    val_loss = float(fit_log.history['val_loss'][-1])

    # Drop the model reference and reset the Keras session to help free memory.
    del model
    tf.keras.backend.clear_session()
    return val_loss

# -------------------------
# PSO Implementation
# -------------------------
def pso_optimize(func, bounds, num_particles=NUM_PARTICLES, max_iter=MAX_ITERS,
                 w=INERTIA, c1=C1, c2=C2, velocity_clamp=VELOCITY_CLAMP, verbose=VERBOSE):
    """
    Minimise `func` over a box-bounded search space with a global-best particle swarm.

    Parameters
    ----------
    func : callable
        Takes one position vector (a row of the swarm) and returns a scalar
        fitness; lower is better.
    bounds : list of (low, high)
        One pair per dimension. Initial positions are sampled uniformly inside
        the bounds and positions are clipped back into them after every move.
    num_particles : int
        Swarm size; must be at least 1.
    max_iter : int
        Number of velocity/position updates performed after the initial evaluation.
    w, c1, c2 : float
        Inertia, cognitive (personal-best) and social (global-best) coefficients.
    velocity_clamp : tuple or None
        When not None, velocities are clipped to
        (velocity_clamp[0], velocity_clamp[1]) after each update.
    verbose : bool
        Print per-particle and per-iteration progress.

    Returns
    -------
    (gbest, gbest_val, history_gbest)
        Best position found (ndarray), its fitness (float), and the list of
        best-so-far fitness values: the initial value followed by one entry
        per iteration.

    Raises
    ------
    ValueError
        If num_particles is smaller than 1.

    Notes
    -----
    An evaluation that raises an Exception, or that returns NaN, is scored as
    +inf. This applies both to the initial evaluation and to the main loop, so
    a single failed or diverged evaluation neither aborts the run nor becomes
    the "best" result (np.argmin would otherwise select a NaN entry, and no
    later value could ever compare as smaller than NaN).
    """
    if num_particles < 1:
        raise ValueError("num_particles must be at least 1")

    def _score(idx, position):
        """Evaluate one particle; failures and NaN results count as +inf."""
        try:
            fitness = float(func(position))
        except Exception as e:
            print("Error evaluating particle", idx, e)
            return np.inf
        if np.isnan(fitness):
            print("Error evaluating particle", idx, "fitness is NaN")
            return np.inf
        return fitness

    dim = len(bounds)
    lb = np.array([b[0] for b in bounds], dtype=float)
    ub = np.array([b[1] for b in bounds], dtype=float)

    # Initialize particle positions uniformly inside the bounds
    X = np.random.uniform(lb, ub, (num_particles, dim))
    # Initialize small random velocities (at most 10% of each dimension's range)
    V = np.random.uniform(-0.1*(ub-lb), 0.1*(ub-lb), (num_particles, dim))

    # Personal bests start at the initial positions
    pbest = X.copy()
    pbest_val = np.full(num_particles, np.inf, dtype=float)

    # Evaluate initial particles
    for i in range(num_particles):
        fitness = _score(i, X[i])
        pbest_val[i] = fitness
        if verbose:
            print(f"[Init] Particle {i:02d} pos={X[i]} val_loss={fitness:.6f}")

    # Global best
    gbest_idx = int(np.argmin(pbest_val))
    gbest = pbest[gbest_idx].copy()
    gbest_val = float(pbest_val[gbest_idx])

    if verbose:
        print(f"\nInitial global best idx={gbest_idx}, val_loss={gbest_val:.6f}, params={gbest}\n")

    # PSO main loop
    history_gbest = [gbest_val]
    start_time = time.time()
    for t in range(max_iter):
        iter_start = time.time()
        if verbose:
            print(f"=== Iteration {t+1}/{max_iter} ===")

        # Fresh random weights per particle and per dimension
        r1 = np.random.rand(num_particles, dim)
        r2 = np.random.rand(num_particles, dim)

        # Velocity update: inertia + pull to personal best + pull to global best
        V = (w * V
             + c1 * r1 * (pbest - X)
             + c2 * r2 * (gbest - X))

        # Optionally clamp velocities
        if velocity_clamp is not None:
            V = np.clip(V, velocity_clamp[0], velocity_clamp[1])

        # Update positions and clip to bounds
        X = X + V
        X = np.clip(X, lb, ub)

        # Evaluate each particle and update its personal best
        for i in range(num_particles):
            val = _score(i, X[i])
            improved = val < pbest_val[i]
            if improved:
                pbest_val[i] = val
                pbest[i] = X[i].copy()

            if verbose:
                pos_pretty = ", ".join([f"{v:.6g}" for v in X[i]])
                print(f"Particle {i:02d} | pos=[{pos_pretty}] | val_loss={val:.6f} | pbest_val={pbest_val[i]:.6f} | improved={improved}")

        # Update global best (only on strict improvement)
        min_idx = int(np.argmin(pbest_val))
        g_improved = bool(pbest_val[min_idx] < gbest_val)
        if g_improved:
            gbest_val = float(pbest_val[min_idx])
            gbest = pbest[min_idx].copy()

        history_gbest.append(gbest_val)

        if verbose:
            print(f"Iteration {t+1} summary: gbest_val={gbest_val:.6f} params=[{', '.join(f'{v:.6g}' for v in gbest)}] improved={g_improved}")
            print(f"Iteration time: {time.time()-iter_start:.1f}s\n")

    total_time = time.time() - start_time
    if verbose:
        print(f"PSO finished in {total_time:.1f}s. Best val_loss={gbest_val:.6f}, best_params={gbest}")

    return gbest, gbest_val, history_gbest

# -------------------------
# Run PSO
# -------------------------
print("Running PSO optimization (this will train multiple small models)...")
best_params, best_val, g_history = pso_optimize(
    evaluate_hyperparams,
    bounds=BOUNDS,
    num_particles=NUM_PARTICLES,
    max_iter=MAX_ITERS,
    w=INERTIA,
    c1=C1,
    c2=C2,
    velocity_clamp=VELOCITY_CLAMP,
    verbose=VERBOSE,
)

# Convert the winning position into usable hyperparameters; the batch size is
# rounded to the nearest integer and kept at 1 or above.
best_lr, best_dropout = float(best_params[0]), float(best_params[1])
best_batch = max(1, int(np.round(best_params[2])))

# Report the result (one line per value).
print("\n".join([
    "\n=== PSO Result ===",
    f"Best learning rate : {best_lr:.8f}",
    f"Best dropout       : {best_dropout:.6f}",
    f"Best batch size    : {best_batch}",
    f"Best validation loss: {best_val:.6f}",
]))



[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m620.7/620.7 MB[0m [31m813.3 kB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.5/57.5 kB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.5/24.5 MB[0m [31m102.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.5/5.5 MB[0m [31m144.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.6/6.6 MB[0m [31m141.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m224.5/224.5 kB[0m [31m23.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m72.5/72.5 kB[0m [31m6.8 MB/s[0m eta [36m0:00:00[0m
[?25h



Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
[1m29515/29515[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
[1m26421880/26421880[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
[1m5148/5148[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz
[1m4422102/4422102[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step
Train shape: (54000, 784), Val shape: (6000, 784), Test shape: (10000, 784)
Running PSO optimization (this will train multiple small models)...
[Init] Particle 00 pos=[3.80794718e-03 4.77821438e-01 9.79833215e+01] val_loss=0.392332
[Init] Par

In [3]:
# -------------------------
# Train final model with best params (longer training if desired)
# -------------------------
final_epochs = 8  # longer than the per-particle budget used during the search
print(f"\nTraining final model with PSO-tuned hyperparameters for {final_epochs} epochs...")

# Rebuild the network from scratch with the tuned learning rate and dropout.
final_model = create_model(best_lr, best_dropout)
final_model.fit(
    x_train,
    y_train,
    validation_data=(x_val, y_val),
    epochs=final_epochs,
    batch_size=best_batch,
    verbose=1,
)

# Score the trained model on the test split.
final_test_loss, final_test_acc = final_model.evaluate(x_test, y_test, verbose=0)
print(f"Final test accuracy: {final_test_acc:.4f}, test loss: {final_test_loss:.6f}")


Training final model with PSO-tuned hyperparameters for 8 epochs...
Epoch 1/8
[1m520/520[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 3ms/step - accuracy: 0.7608 - loss: 0.6917 - val_accuracy: 0.8548 - val_loss: 0.4029
Epoch 2/8
[1m520/520[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/step - accuracy: 0.8600 - loss: 0.3903 - val_accuracy: 0.8700 - val_loss: 0.3717
Epoch 3/8
[1m520/520[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/step - accuracy: 0.8736 - loss: 0.3456 - val_accuracy: 0.8750 - val_loss: 0.3461
Epoch 4/8
[1m520/520[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/step - accuracy: 0.8814 - loss: 0.3215 - val_accuracy: 0.8787 - val_loss: 0.3399
Epoch 5/8
[1m520/520[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/step - accuracy: 0.8893 - loss: 0.3000 - val_accuracy: 0.8735 - val_loss: 0.3463
Epoch 6/8
[1m520/520[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/step - accuracy: 0.8942 - loss: 0.2827 - val_a