In [3]:
# === Basic libraries ===
import argparse
import os
from typing import List, Optional

import numpy as np
import pandas as pd
from sklearn import preprocessing as pp

# === Neural networks from Pytorch Lightning ===
import torch
from torch import nn as nn
from torch.utils.data import DataLoader
from torch.utils.data import TensorDataset, random_split
from torch.nn import functional as F
import lightning as L
from lightning.pytorch import seed_everything

# === Optimization ===
#import optuna

# === Check for GPU ===
# Report the active CUDA device, or a plain message when none is present.
# The device index is passed explicitly: the builtin function `id` is not a
# device and was only accepted because unknown values fall back to the
# current device.
(
    torch.cuda.get_device_name(torch.cuda.current_device())
    if torch.cuda.is_available()
    else 'CUDA not available'
)

'NVIDIA GeForce GTX 980 Ti'

In [2]:
# === Set project ===
# Prefix of the input/output CSV files. 'LSI' used to be assigned first and was
# immediately overwritten; change the value here to switch dataset.
proj = 'RS'

# === Read Xy ===
X_pd = pd.read_csv(proj+'_X.csv', index_col=0)
X = X_pd.to_numpy()
y = np.genfromtxt(proj+'_y.csv', delimiter=',')

# === Filter data and scale ===
# DataFrame copy: drop columns containing NaN, then zero-variance columns.
# Note that the saved frame is filtered only; it is not scaled.
Xs_pd = X_pd.dropna(axis=1)
Xs_pd = Xs_pd.loc[:, Xs_pd.var()!=0]
# to_csv does not create missing directories, so make sure the target exists.
os.makedirs('temp_out', exist_ok=True)
Xs_pd.to_csv('temp_out/'+proj+'_Xs.csv', header=True)

# Same two filters on the NumPy copy, followed by min-max scaling.
# NOTE(review): the pandas and NumPy filters are computed independently;
# confirm they keep the same columns if the CSV is meant to match `Xs`.
X = X[:,~np.any(np.isnan(X), axis=0)]
X = X[:, np.var(X, axis=0) != 0]
Xs = pp.MinMaxScaler().fit_transform(X)
sh1 = np.shape(Xs)
print(f'Shape | filtered/scaled: {sh1}')

Shape | filtered/scaled: (158, 656)


In [None]:
class Net(nn.Module):
    """Feed-forward network built from Linear -> ReLU -> Dropout blocks.

    Args:
        dropout: Dropout probability applied after every hidden activation.
        output_dims: Width of each hidden layer, in order. An empty list
            yields a single linear layer from input to output.
        input_dim: Number of input features. Defaults to 656, the value that
            used to be hard-coded in the constructor.
        n_classes: Width of the final linear layer. When ``None`` the
            module-level ``CLASSES`` constant is read, as before.
    """

    def __init__(
        self,
        dropout: float,
        output_dims: List[int],
        input_dim: int = 656,
        n_classes: Optional[int] = None,
    ) -> None:
        super().__init__()
        if n_classes is None:
            # Backward-compatible fallback to the notebook-level constant.
            n_classes = CLASSES

        layers: List[nn.Module] = []
        for output_dim in output_dims:
            layers.append(nn.Linear(input_dim, output_dim))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout))
            # The next block consumes what this one produced.
            input_dim = output_dim

        layers.append(nn.Linear(input_dim, n_classes))

        self.layers = nn.Sequential(*layers)

    def forward(self, data: torch.Tensor) -> torch.Tensor:
        """Return the raw logits for ``data``; no softmax or sigmoid is applied."""
        return self.layers(data)

class LightningNet(L.LightningModule):
    """Lightning wrapper around ``Net``.

    Trains with binary cross-entropy on logits and logs validation accuracy
    as ``val_acc`` and ``hp_metric``.

    Args:
        dropout: Dropout probability forwarded to ``Net``.
        output_dims: Hidden layer widths forwarded to ``Net``.
    """

    def __init__(self, dropout: float, output_dims: List[int]) -> None:
        super().__init__()
        self.model = Net(dropout, output_dims)

    def forward(self, data: torch.Tensor) -> torch.Tensor:
        """Flatten ``data`` to (batch, n_features) and return the model's logits."""
        # Take the width from the first Linear layer instead of repeating the
        # literal 656, so the reshape always agrees with the wrapped network.
        n_features = self.model.layers[0].in_features
        return self.model(data.view(-1, n_features))

    def training_step(self, batch: List[torch.Tensor], batch_idx: int) -> torch.Tensor:
        """Return the BCE-with-logits loss for one ``(data, target)`` batch."""
        data, target = batch
        output = self(data)
        # The loss requires a floating-point target with the logits' shape.
        target = target.to(output.dtype).view_as(output)
        return F.binary_cross_entropy_with_logits(output, target)

    def validation_step(self, batch: List[torch.Tensor], batch_idx: int) -> None:
        """Log the accuracy of one ``(data, target)`` validation batch."""
        data, target = batch
        output = self(data)
        if output.shape[1] == 1:
            # With a single logit, argmax over dim 1 is always 0. Threshold at
            # logit 0 (probability 0.5) to get the predicted binary label.
            pred = (output > 0).to(target.dtype)
        else:
            pred = output.argmax(dim=1, keepdim=True)
        accuracy = pred.eq(target.view_as(pred)).float().mean()
        self.log("val_acc", accuracy)
        self.log("hp_metric", accuracy, on_step=False, on_epoch=True)

    def configure_optimizers(self) -> torch.optim.Optimizer:
        """Return Adam with its default settings over the wrapped model's parameters."""
        return torch.optim.Adam(self.model.parameters())


class qsardm(L.LightningDataModule):
    """Data module serving the notebook's scaled features ``Xs`` and targets ``y``.

    Args:
        data_dir: Stored on the instance; not read by this class.
        batch_size: Batch size used by all three loaders.
        val_fraction: Share of samples held out for validation. The default
            0.2 matches the 120/30 proportion that was hard-coded before.
        split_seed: Seed of the train/validation split, so every instance
            (e.g. every tuning trial) sees the same partition.

    Raises:
        ValueError: If ``val_fraction`` is not strictly between 0 and 1, or
            (in ``setup``) if features and targets disagree in length or
            the split would leave the training set empty.
    """

    def __init__(
        self,
        data_dir: str,
        batch_size: int,
        val_fraction: float = 0.2,
        split_seed: int = 0,
    ):
        super().__init__()
        if not 0.0 < val_fraction < 1.0:
            raise ValueError(f"val_fraction must be in (0, 1), got {val_fraction}")
        self.data_dir = data_dir
        self.batch_size = batch_size
        self.val_fraction = val_fraction
        self.split_seed = split_seed

    def setup(self, stage: Optional[str] = None) -> None:
        """Build the (features, target) dataset and split it into train/validation."""
        # Reads the notebook-level arrays. The scaled matrix `Xs` is used and
        # paired with `y` so that each batch unpacks into (data, target).
        features = torch.as_tensor(Xs, dtype=torch.float32)
        targets = torch.as_tensor(y, dtype=torch.float32)
        if targets.ndim == 1:
            # One value per sample -> column vector, matching a single-logit output.
            targets = targets.unsqueeze(1)
        if features.shape[0] != targets.shape[0]:
            raise ValueError(
                f"Xs has {features.shape[0]} rows but y has {targets.shape[0]}"
            )

        dataset = TensorDataset(features, targets)
        # Derive the lengths from the data: random_split requires them to sum
        # to len(dataset), which a fixed [120, 30] does not for 158 rows.
        n_val = max(1, int(round(len(dataset) * self.val_fraction)))
        n_trn = len(dataset) - n_val
        if n_trn < 1:
            raise ValueError("Not enough samples to form a training split")

        generator = torch.Generator().manual_seed(self.split_seed)
        self.X_trn, self.X_val = random_split(
            dataset, [n_trn, n_val], generator=generator
        )
        # The test split aliases the validation split; it is not independent.
        self.X_tst = self.X_val

    def train_dataloader(self) -> DataLoader:
        """Shuffled loader over the training split."""
        return DataLoader(
            self.X_trn, batch_size=self.batch_size, shuffle=True, pin_memory=True
        )

    def val_dataloader(self) -> DataLoader:
        """Ordered loader over the validation split."""
        return DataLoader(
            self.X_val, batch_size=self.batch_size, shuffle=False, pin_memory=True
        )

    def test_dataloader(self) -> DataLoader:
        """Ordered loader over the test split (same samples as validation)."""
        return DataLoader(
            self.X_tst, batch_size=self.batch_size, shuffle=False, pin_memory=True
        )


def objective(trial: optuna.trial.Trial) -> float:
    """Optuna objective: train one ``LightningNet`` and return its validation accuracy.

    Samples the number of hidden layers (1-3), the dropout rate (0.2-0.5) and
    the width of each hidden layer (4-128, log scale), then fits the model.

    NOTE(review): reads the module-level names ``DIR``, ``BATCHSIZE``,
    ``PERCENT_VALID_EXAMPLES`` and ``EPOCHS``, plus ``optuna`` and
    ``PyTorchLightningPruningCallback``; none of them is defined in the
    visible notebook cells, so they must be provided before calling.

    Args:
        trial: The Optuna trial supplying the hyperparameter suggestions.

    Returns:
        The ``val_acc`` metric recorded by the trainer after fitting.
    """
    # We optimize the number of layers, hidden units in each layer and dropouts.
    n_layers = trial.suggest_int("n_layers", 1, 3)
    dropout = trial.suggest_float("dropout", 0.2, 0.5)
    output_dims = [
        trial.suggest_int(f"n_units_l{i}", 4, 128, log=True) for i in range(n_layers)
    ]

    model = LightningNet(dropout, output_dims)
    datamodule = qsardm(data_dir=DIR, batch_size=BATCHSIZE)

    # The notebook imports Lightning as `L`; there is no `pl` alias in scope.
    trainer = L.Trainer(
        logger=True,
        limit_val_batches=PERCENT_VALID_EXAMPLES,
        enable_checkpointing=False,
        max_epochs=EPOCHS,
        accelerator="auto",
        devices=1,
        # Lets Optuna prune the trial based on the logged validation accuracy.
        callbacks=[PyTorchLightningPruningCallback(trial, monitor="val_acc")],
    )
    hyperparameters = dict(n_layers=n_layers, dropout=dropout, output_dims=output_dims)
    trainer.logger.log_hyperparams(hyperparameters)
    trainer.fit(model, datamodule=datamodule)

    return trainer.callback_metrics["val_acc"].item()

if __name__ == "__main__":
    # Run the hyperparameter search and print the best trial.
    parser = argparse.ArgumentParser(description="PyTorch Lightning example.")
    parser.add_argument(
        "--pruning",
        "-p",
        action="store_true",
        help="Activate the pruning feature. `MedianPruner` stops unpromising "
        "trials at the early stages of training.",
    )
    # parse_known_args instead of parse_args: inside a Jupyter kernel sys.argv
    # carries the kernel's own options (e.g. `-f <connection file>`), which
    # parse_args rejects with SystemExit.
    args, _unknown_args = parser.parse_known_args()

    pruner = optuna.pruners.MedianPruner() if args.pruning else optuna.pruners.NopPruner()

    # Accuracy is being optimised, so higher is better.
    study = optuna.create_study(direction="maximize", pruner=pruner)
    # Stops after 100 trials or 600 seconds, whichever comes first.
    study.optimize(objective, n_trials=100, timeout=600)

    print("Number of finished trials: {}".format(len(study.trials)))

    print("Best trial:")
    trial = study.best_trial

    print("  Value: {}".format(trial.value))

    print("  Params: ")
    for key, value in trial.params.items():
        print("    {}: {}".format(key, value))

In [None]:
# === Optuna ===
def func(trial):
    """Optuna objective: sample hidden-layer sizes, fit ``Model``, return its score.

    Reads the module-level names ``l_min``/``l_max`` (bounds on the number of
    layers), ``n_min``/``n_max`` (bounds on units per layer), ``Model`` and
    the ``X_trn``/``y_trn``/``X_tst``/``y_tst`` splits.

    Args:
        trial: Object exposing ``suggest_int(name, low, high)``.

    Returns:
        Whatever ``Model.score`` returns on the test split.
    """
    n_layers = trial.suggest_int('n_layers', l_min, l_max)
    # One suggestion per layer, named by its index. The misspelled `trail`
    # used here before raised NameError.
    layers = [trial.suggest_int(str(i), n_min, n_max) for i in range(n_layers)]
    clf = Model(hidden_layer_sizes=tuple(layers))
    clf.fit(X_trn, y_trn)
    return clf.score(X_tst, y_tst)

# create_study() minimises by default, but `func` returns `clf.score(...)`.
# NOTE(review): this assumes the score is higher-is-better (as for
# scikit-learn style estimators) - confirm for the `Model` in use.
study = optuna.create_study(direction='maximize')
study.optimize(func, n_trials=100)