In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt

In [None]:
# Set random seeds for reproducibility
# Only PyTorch's global generator is seeded here (fixed value 42).
torch.manual_seed(42)

In [None]:
# Pick the compute device: CUDA when it is available, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

In [None]:
# Load the Fashion-MNIST training CSV; the path is relative to the working directory.
df = pd.read_csv('dataset/fashion-mnist_train.csv')
df.head()  # preview the first rows

In [None]:
# (rows, columns) of the loaded frame
df.shape

In [None]:
# Create a 4x4 grid of images
fig, axes = plt.subplots(4, 4, figsize=(10, 10))
fig.suptitle("First 16 Images", fontsize=16)

# Plot the first 16 rows of the dataset: column 0 is used as the label and the
# remaining columns are reshaped into a 28x28 image.
for i, ax in enumerate(axes.flat):
    img = df.iloc[i, 1:].values.reshape(28, 28)  # Reshape to 28x28
    # An explicit colormap is required for a grayscale rendering: without
    # `cmap`, imshow draws single-channel data with the default colour map.
    ax.imshow(img, cmap='gray')
    ax.axis('off')  # Remove axis for a cleaner look
    ax.set_title(f"Label: {df.iloc[i, 0]}")  # Show the label

plt.tight_layout(rect=[0, 0, 1, 0.96])  # Leave room at the top for the suptitle
plt.show()


In [None]:
# Separate labels from features: column 0 becomes y, every other column becomes X

y = df.iloc[:, 0].to_numpy()
X = df.iloc[:, 1:].to_numpy()

In [None]:
# Hold out 20% of the rows; the fixed random_state keeps the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

In [None]:
# Scale both splits by 255.0.
# NOTE: this cell rebinds X_train / X_test, so running it a second time
# divides the already-scaled values again.
X_train, X_test = X_train / 255.0, X_test / 255.0

In [None]:
class CustomDataset(Dataset):
    """In-memory dataset that pairs a feature tensor with a label tensor.

    Args:
        features: Array-like of per-sample features; stored as float32.
        labels: Array-like of per-sample labels; stored as torch.long.

    Raises:
        ValueError: If `features` and `labels` contain a different number of
            samples.
    """

    def __init__(self, features, labels):

        # Convert to PyTorch tensors
        self.features = torch.tensor(features, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)

        # Fail fast on misaligned inputs instead of surfacing an IndexError
        # (or silently ignoring trailing labels) somewhere inside a DataLoader.
        if len(self.features) != len(self.labels):
            raise ValueError(
                f"features and labels must have the same length, "
                f"got {len(self.features)} and {len(self.labels)}"
            )

    def __len__(self):
        """Return the number of samples."""
        return len(self.features)

    def __getitem__(self, index):
        """Return the `(features, label)` pair stored at `index`."""
        return self.features[index], self.labels[index]

In [None]:
# Wrap the training split in a Dataset
train_dataset = CustomDataset(features=X_train, labels=y_train)

In [None]:
# Wrap the held-out split in a Dataset
test_dataset = CustomDataset(features=X_test, labels=y_test)

In [None]:
# Number of samples in the training dataset
len(train_dataset)

In [None]:
class MyNN(nn.Module):
    """Configurable fully connected network for scripted experiments and tuning.

    Every hidden block is Linear -> BatchNorm1d -> ReLU -> Dropout and all
    hidden blocks share the same width. A final Linear layer maps to
    `output_dim`.

    Args:
        input_dim: Number of input features per sample.
        output_dim: Number of values produced by the final layer.
        num_hidden_layers: Number of hidden blocks to stack; 0 yields a single
            Linear layer from `input_dim` to `output_dim`.
        neurons_per_layer: Width of every hidden block.
        dropout_rate: Probability passed to each nn.Dropout.
    """
    def __init__(self, input_dim, output_dim, num_hidden_layers, neurons_per_layer, dropout_rate):

        super().__init__()

        layers = []
        current_dim = input_dim  # width feeding the next layer

        for _ in range(num_hidden_layers):  # making hidden layers - dynamic

            layers.append(nn.Linear(current_dim, neurons_per_layer))
            layers.append(nn.BatchNorm1d(neurons_per_layer))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout_rate))
            current_dim = neurons_per_layer

        # Size the output layer from the width that actually feeds it. Using
        # neurons_per_layer unconditionally breaks the zero-hidden-layer case,
        # where the output layer receives the raw input.
        layers.append(nn.Linear(current_dim, output_dim))

        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Run `x` through the layer stack and return the final layer's output."""
        return self.model(x)

In [None]:
# Uncomment to install the extra dependencies used below (run once per environment):
# !pip install optuna mlflow

In [None]:
import optuna

# Seed the sampler so the sequence of suggested hyperparameters is repeatable;
# an unseeded sampler proposes different trials on every run.
study = optuna.create_study(
    direction='maximize',
    sampler=optuna.samplers.TPESampler(seed=42),
)

In [None]:
# objective function
def objective(trial):
    """Train one sampled MyNN configuration and return its held-out accuracy.

    Hyperparameters are drawn from `trial`, a model is trained on
    `train_dataset` for the sampled number of epochs, and the fraction of
    correctly classified samples in `test_dataset` is returned.

    Args:
        trial: Optuna trial used to sample the hyperparameters.

    Returns:
        correct / total over `test_dataset`.
    """

    # next hyperparameter values from the search space
    num_hidden_layers = trial.suggest_int("num_hidden_layers", 1, 5)
    neurons_per_layer = trial.suggest_int("neurons_per_layer", 8, 128, step=8)
    epochs = trial.suggest_int("epochs", 10, 50, step=10)
    learning_rate = trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True)
    dropout_rate = trial.suggest_float("dropout_rate", 0.1, 0.5, step=0.1)
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64, 128])
    optimizer_name = trial.suggest_categorical("optimizer", ['Adam', 'SGD', 'RMSprop'])
    weight_decay = trial.suggest_float("weight_decay", 1e-5, 1e-3, log=True)

    # Pinned host memory only speeds up host-to-GPU copies; asking for it on a
    # CPU-only machine just makes the DataLoader emit a warning.
    use_pinned_memory = device.type == 'cuda'
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, pin_memory=use_pinned_memory)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, pin_memory=use_pinned_memory)

    # model init
    input_dim = 784
    output_dim = 10

    model = MyNN(input_dim, output_dim, num_hidden_layers, neurons_per_layer, dropout_rate)
    model.to(device)

    # loss and optimizer selection
    criterion = nn.CrossEntropyLoss()

    if optimizer_name == 'Adam':
        optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    elif optimizer_name == 'SGD':
        optimizer = optim.SGD(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    else:
        optimizer = optim.RMSprop(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

    # training loop
    for epoch in range(epochs):

        # Be explicit about the mode so dropout and batch norm are in their
        # training behaviour regardless of what ran before.
        model.train()

        for batch_features, batch_labels in train_loader:

            # move data to the selected device
            batch_features, batch_labels = batch_features.to(device), batch_labels.to(device)

            # forward pass
            outputs = model(batch_features)

            # calculate loss
            loss = criterion(outputs, batch_labels)

            # backward pass
            optimizer.zero_grad()
            loss.backward()

            # update parameters
            optimizer.step()

    # evaluation on the held-out data
    model.eval()
    total = 0
    correct = 0

    with torch.no_grad():

        for batch_features, batch_labels in test_loader:

            # move data to the selected device
            batch_features, batch_labels = batch_features.to(device), batch_labels.to(device)

            outputs = model(batch_features)

            # predicted class = index of the largest output per sample
            _, predicted = torch.max(outputs, 1)

            total += batch_labels.shape[0]
            correct += (predicted == batch_labels).sum().item()

    accuracy = correct / total

    return accuracy

In [None]:
# Run 10 trials; each trial trains a fresh model inside `objective`.
study.optimize(objective, n_trials=10)

In [None]:
# Best objective value found so far (the study maximizes the returned accuracy)
study.best_value

In [None]:
# Hyperparameters of the best trial
study.best_params

## Better implementation: Optuna with MLflow tracking

In [None]:
# Suppress a few noisy warnings.
# NOTE(review): the first filter silences every UserWarning process-wide, not
# only those raised by MLflow; the second ignores warnings from modules whose
# name matches "mlflow".
import warnings
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", module="mlflow")

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import optuna
import mlflow
import mlflow.pytorch
from optuna.integration.mlflow import MLflowCallback
import numpy as np
import random

# Set device: CUDA when available, CPU otherwise
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Seed setup for reproducibility
def set_seed(seed=42):
    """Seed the PyTorch, NumPy and `random` generators with one value.

    Args:
        seed: Integer seed applied to every generator (default 42).
    """
    for seeder in (torch.manual_seed, np.random.seed, random.seed):
        seeder(seed)
    if not torch.cuda.is_available():
        return
    # Also seed the generators of all CUDA devices.
    torch.cuda.manual_seed_all(seed)

set_seed()

# Flexible Neural Network
class MyNN(nn.Module):
    """Fully connected network whose hidden widths are given as a sequence.

    Each entry of `hidden_layer_sizes` adds a
    Linear -> BatchNorm1d -> ReLU -> Dropout block; a final Linear layer maps
    the last width (or `input_dim` when the sequence is empty) to `output_dim`.

    Args:
        input_dim: Number of input features per sample.
        output_dim: Number of values produced by the final layer.
        hidden_layer_sizes: Iterable of hidden widths, in order.
        dropout_rate: Probability passed to each nn.Dropout.
    """

    def __init__(self, input_dim, output_dim, hidden_layer_sizes, dropout_rate):
        super().__init__()
        stack = []
        fan_in = input_dim
        for width in hidden_layer_sizes:
            stack.append(nn.Linear(fan_in, width))
            stack.append(nn.BatchNorm1d(width))
            stack.append(nn.ReLU())
            stack.append(nn.Dropout(dropout_rate))
            fan_in = width  # the next layer consumes this block's output
        stack.append(nn.Linear(fan_in, output_dim))
        self.model = nn.Sequential(*stack)

    def forward(self, x):
        """Apply the layer stack to `x`."""
        return self.model(x)

# Objective function for Optuna
def objective(trial):
    """Train one sampled MyNN configuration, log it to MLflow, return accuracy.

    Hyperparameters are drawn from `trial`. Inside a dedicated MLflow run the
    sampled parameters are logged, the model is trained on `train_dataset`,
    evaluated on `test_dataset`, and the accuracy metric plus the trained
    model are logged.

    Args:
        trial: Optuna trial used to sample the hyperparameters.

    Returns:
        correct / total over `test_dataset`.
    """
    # Suggest hyperparameters
    num_hidden_layers = trial.suggest_int("num_hidden_layers", 1, 5)
    neurons_per_layer = trial.suggest_int("neurons_per_layer", 8, 128, step=8)
    # Every hidden layer gets the same width.
    hidden_layer_sizes = [neurons_per_layer] * num_hidden_layers

    learning_rate = trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True)
    dropout_rate = trial.suggest_float("dropout_rate", 0.1, 0.5, step=0.1)
    weight_decay = trial.suggest_float("weight_decay", 1e-5, 1e-3, log=True)
    optimizer_name = trial.suggest_categorical("optimizer", ['Adam', 'SGD', 'RMSprop'])
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64, 128])
    epochs = trial.suggest_int("epochs", 10, 50, step=10)

    # Data loaders.
    # Pinned host memory only speeds up host-to-GPU copies; asking for it on a
    # CPU-only machine just makes the DataLoader emit a warning.
    use_pinned_memory = device.type == "cuda"
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, pin_memory=use_pinned_memory)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, pin_memory=use_pinned_memory)

    # Model
    input_dim = 784
    output_dim = 10
    model = MyNN(input_dim, output_dim, hidden_layer_sizes, dropout_rate).to(device)

    # Loss & Optimizer
    criterion = nn.CrossEntropyLoss()
    if optimizer_name == 'Adam':
        optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    elif optimizer_name == 'SGD':
        optimizer = optim.SGD(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
    else:
        optimizer = optim.RMSprop(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

    # MLflow logging: one run per trial; the run is closed when the block exits.
    with mlflow.start_run(nested=True):
        mlflow.set_tag("trial_id", trial.number)
        mlflow.log_params({
            "num_hidden_layers": num_hidden_layers,
            "neurons_per_layer": neurons_per_layer,
            "dropout_rate": dropout_rate,
            "learning_rate": learning_rate,
            "weight_decay": weight_decay,
            "optimizer": optimizer_name,
            "batch_size": batch_size,
            "epochs": epochs
        })

        # Training loop
        for epoch in range(epochs):
            model.train()
            for batch_features, batch_labels in train_loader:
                batch_features, batch_labels = batch_features.to(device), batch_labels.to(device)
                outputs = model(batch_features)
                loss = criterion(outputs, batch_labels)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        # Evaluation
        model.eval()
        correct = total = 0
        with torch.no_grad():
            for batch_features, batch_labels in test_loader:
                batch_features, batch_labels = batch_features.to(device), batch_labels.to(device)
                outputs = model(batch_features)
                # predicted class = index of the largest output per sample
                _, predicted = torch.max(outputs, 1)
                total += batch_labels.size(0)
                correct += (predicted == batch_labels).sum().item()

        accuracy = correct / total
        mlflow.log_metric("accuracy", accuracy)

        # Log model
        mlflow.pytorch.log_model(model, name="model")

        return accuracy

# Setup MLflow experiment and callback
EXPERIMENT_NAME = "optuna_mynn_experiment2"
mlflow.set_experiment(EXPERIMENT_NAME)
mlflow_callback = MLflowCallback(
    tracking_uri=mlflow.get_tracking_uri(),
    metric_name="accuracy"
)

# Run Optuna optimization.
# MLflowCallback activates an MLflow experiment named after the study every
# time it fires. The study therefore gets the experiment's name; with an
# auto-generated study name, every run created after the first trial would be
# recorded under a different experiment than the one selected above.
# The seeded sampler makes the sequence of suggested hyperparameters repeatable.
study = optuna.create_study(
    study_name=EXPERIMENT_NAME,
    direction='maximize',
    sampler=optuna.samplers.TPESampler(seed=42),
)
study.optimize(objective, n_trials=10, callbacks=[mlflow_callback])

# Optional: log best trial manually in its own summary run
best_trial = study.best_trial
with mlflow.start_run(run_name="best_trial_summary"):
    mlflow.log_params(best_trial.params)
    mlflow.log_metric("best_accuracy", best_trial.value)

In [None]:
# Best accuracy and the hyperparameters that produced it
study.best_value, study.best_params

In [None]:
# Start the MLflow tracking UI from a shell.
# NOTE(review): presumably this cell keeps running until the server is stopped - confirm.
!mlflow ui
