HP tuning
=========



In [None]:
!pip install glasspy
!pip install optuna

## Introduction



Finding the best set of hyperparameters for a given problem is NP-hard. There are many strategies to find a reasonable set of hyperparameters. Here we will use a Bayesian strategy with the `optuna` module.



## Imports



In [None]:
import torch
import lightning as L
import optuna
from sklearn.model_selection import train_test_split
from glasspy.data import SciGlass
from sklearn.preprocessing import MaxAbsScaler
from torch.utils.data import DataLoader, TensorDataset
from optuna import create_study, Trial
from glasspy.predict.base import MLP

## Data pipeline



Collect, process, and split the data. You know the drill by now.



### Collecting the data



In [None]:
config_prop = {
    "must_have_and": ["Tg"],
}

config_comp = {
    "must_have_only": [
        "SiO2",
        "Li2O",
        "Na2O",
        "K2O",
        "CaO",
        "MgO",
        "BaO",
        "Al2O3",
        "TiO2",
    ],
}

source = SciGlass(
    elements_cfg={},
    properties_cfg=config_prop,
    compounds_cfg=config_comp,
)

source.remove_duplicate_composition(
    scope="compounds",
    decimals=3,
    aggregator="median",
)

df = source.data

df["property"].info()

In [None]:
idx = df.index

X = df["compounds"]
y = df["property"]["Tg"]

### Splitting the data



In [None]:
TEST_SIZE = 0.1
RANDOM_SEED = 61455

In [None]:
indices = df.index
train_val_idx, test_idx = train_test_split(
    indices, test_size=TEST_SIZE, random_state=RANDOM_SEED
)

X_train_val = X.loc[train_val_idx]
y_train_val = y.loc[train_val_idx]

X_test = X.loc[test_idx].values
y_test = y.loc[test_idx].values.reshape(-1,1)

In [None]:
train_idx, val_idx = train_test_split(
    train_val_idx, test_size=TEST_SIZE, random_state=RANDOM_SEED
)

X_train = X.loc[train_idx].values
y_train = y.loc[train_idx].values.reshape(-1,1)

X_val = X.loc[val_idx].values
y_val = y.loc[val_idx].values.reshape(-1,1)

### Normalization



In [None]:
x_scaler = MaxAbsScaler()
x_scaler.fit(X_train)

y_scaler = MaxAbsScaler()
y_scaler.fit(y_train)

X_train = x_scaler.transform(X_train)
y_train = y_scaler.transform(y_train)

X_val = x_scaler.transform(X_val)
y_val = y_scaler.transform(y_val)

X_test = x_scaler.transform(X_test)
y_test = y_scaler.transform(y_test)

In [None]:
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)

X_val = torch.tensor(X_val, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.float32)

X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)

### The DataModule class



In [None]:
class DataModule(L.LightningDataModule):
    def __init__(
        self,
        X_train,
        y_train,
        X_val,
        y_val,
        X_test,
        y_test,
        x_scaler=None,
        y_scaler=None,
        batch_size = 256,
        num_workers = 2,
    ):
        super().__init__()

        self.batch_size = batch_size
        self.num_workers = num_workers

        self.X_train = X_train
        self.y_train = y_train
        self.X_val = X_val
        self.y_val = y_val
        self.X_test = X_test
        self.y_test = y_test
        self.x_scaler = x_scaler
        self.y_scaler = y_scaler


    def train_dataloader(self):
        return DataLoader(
            TensorDataset(self.X_train, self.y_train),
            batch_size=self.batch_size,
            num_workers=self.num_workers,
        )

    def val_dataloader(self):
        return DataLoader(
            TensorDataset(self.X_val, self.y_val),
            batch_size=self.batch_size,
            num_workers=self.num_workers,
        )

    def test_dataloader(self):
        return DataLoader(
            TensorDataset(self.X_test, self.y_test),
            batch_size=self.batch_size,
            num_workers=self.num_workers,
        )

In [None]:
dm = DataModule(
    X_train, y_train, X_val, y_val, X_test, y_test, x_scaler, y_scaler
)

## Hyperparameter tuning



### Search space



We need a function that creates an instance of the desired model. This function must also define the search space using the `suggest_int`, `suggest_float`, and `suggest_categorical`.



In [None]:
num_features = X.shape[1]
num_targets = 1


def create_model_instance(trial):
    activation = trial.suggest_categorical(
        "activation", ["Tanh", "Sigmoid", "ReLU"]
    )

    hparams = {
        "num_layers": 2,
        "n_features": num_features,
        "n_targets": num_targets,
        "layer_1_activation": activation,
        "layer_1_size": trial.suggest_int("layer_1_size", 1, 20),
        "layer_2_activation": activation,
        "layer_2_size": trial.suggest_int("layer_2_size", 1, 20),
        "loss": "mse",
        "max_epochs": trial.suggest_int("max_epochs", 5, 30),
        "optimizer": trial.suggest_categorical("optimizer", ["SGD", "Adam"]),
        "lr": trial.suggest_float("learning_rate", 1e-6, 1e-2, log=True),
        "batch_size": trial.suggest_categorical(
            "batch_size", [256, 512, 1024]
        ),
    }

    model = MLP(**hparams)

    return model

### Objective function



The objective function will create an instance of the model (selecting a set of hyperparameters from the search space), train that model, and then compute and return a performance metric.



In [None]:
def objective_function(trial, dm):
    model = create_model_instance(trial)

    dm.setup("train")
    trainer = L.Trainer(max_epochs=model.hparams["max_epochs"])
    trainer.fit(model, dm)

    X_true = dm.X_val
    y_true = dm.y_val
    y_true = dm.y_scaler.inverse_transform(y_true)

    y_pred = model.predict(X_true)
    y_pred = dm.y_scaler.inverse_transform(y_pred)

    RMSE = model.RMSE(y_true, y_pred)

    return RMSE

Due to the way `optuna` works, the objective function must have only one argument. To meet this requirement, we create a partial objective function.



In [None]:
def partial_objective_function(trial):
    return objective_function(trial, dm)

### Searching



The search starts by creating a study object&#x2026;



In [None]:
STUDY_NAME = "Tg_model"

study = create_study(
    study_name=STUDY_NAME,
    storage=f"sqlite:///{STUDY_NAME}.db",
    direction="minimize",
    load_if_exists=True,
)

&#x2026; and then running the `optimize` method. The number of trials is controlled by `n_trials`.



In [None]:
NUM_TRIALS = 10

study.optimize(partial_objective_function, n_trials=NUM_TRIALS)

### Investigating the result



Let&rsquo;s examine the results.



In [None]:
df = study.trials_dataframe()

df

Let&rsquo;s check the best set of hyperparameters we found so far.



In [None]:
best_trial = study.best_trial

print(f"Number of best trial: {best_trial.number}")
print(f"Parameters of best trial: {best_trial.params}")

If this is the end of the search, we can build a model with the winning HP set and check its performance.



In [None]:
model = create_model_instance(best_trial)
trainer = L.Trainer(max_epochs=model.hparams["max_epochs"])
trainer.fit(model, dm)

In [None]:
dm.setup("test")

X_true = dm.X_test

y_true = dm.y_test
y_true = dm.y_scaler.inverse_transform(y_true)

y_pred = model.predict(X_true)
y_pred = dm.y_scaler.inverse_transform(y_pred)

RMSE = model.RMSE(y_true, y_pred)

print(RMSE)

### Saving and loading



In [None]:
STATE_DICT = "model.pth"
LEARNING_CURVE = "learning_curve.p"
HPARAMS = "hparams.p"

model.save_training(STATE_DICT, LEARNING_CURVE, HPARAMS)

In [None]:
loaded_model = MLP.from_file(HPARAMS, STATE_DICT, LEARNING_CURVE)

In [None]:
dm.setup("test")

X_true = dm.X_test

y_true = dm.y_test
y_true = dm.y_scaler.inverse_transform(y_true)

y_pred = loaded_model.predict(X_true)
y_pred = dm.y_scaler.inverse_transform(y_pred)

RMSE = loaded_model.RMSE(y_true, y_pred)

print(RMSE)