In [1]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
import torch.optim as optim


from zenml.pipelines import pipeline
from zenml.steps import step, Output

import optuna
from optuna.trial import TrialState

device = "cuda" if torch.cuda.is_available() else "cpu"

In [2]:
@pipeline
def pytorch_hpo_pipeline(
    load_data,
    run_hpo,
    train_test,
):
    """A `pipeline` to load data, load model, and train/evaluate the model."""
    train_dataloader, test_dataloader = load_data()
    best_hparams = run_hpo(train_dataloader, test_dataloader)
    train_test(best_hparams, train_dataloader, test_dataloader)


In [3]:
def get_mnist():
    
    batch_size = 64

    # Download training data from open datasets.
    training_data = datasets.FashionMNIST(
        root="data",
        train=True,
        download=True,
        transform=ToTensor(),
    )

    # Download test data from open datasets.
    test_data = datasets.FashionMNIST(
        root="data",
        train=False,
        download=True,
        transform=ToTensor(),
    )

    # Create data loaders.
    train_dataloader = DataLoader(training_data, batch_size=batch_size)
    test_dataloader = DataLoader(test_data, batch_size=batch_size)

    return train_dataloader, test_dataloader

@step
def load_data() -> Output(
    train_dataloader=DataLoader, test_dataloader=DataLoader
):
    """A `step` to load the Fashion MNIST dataset as a tuple of torch Datasets."""

    train_dataloader, test_dataloader = get_mnist()

    return train_dataloader, test_dataloader

In [4]:
def define_hpo_model(trial):
    CLASSES = 10
    n_layers = trial.suggest_int("n_layers", 1, 3)
    layers = []
    in_features = 28 * 28
    layers.append(nn.Flatten())
    
    for i in range(n_layers):
        out_features = trial.suggest_int(f"n_units_l{i}", 4, 128)
        layers.append(nn.Linear(in_features, out_features))
        layers.append(nn.ReLU())
        p = trial.suggest_float(f"dropout_l{i}", 0.2, 0.5)
        layers.append(nn.Dropout(p))
        in_features = out_features
        
    layers.append(nn.Linear(in_features, CLASSES))
    layers.append(nn.LogSoftmax(dim=1))

    return nn.Sequential(*layers)

def objective(trial, train_dataloader, test_dataloader):

    # Generate model
    model = define_hpo_model(trial)
    
    # Train model
    model = model.to(device)
    loss_fn = nn.CrossEntropyLoss()
    optimizer_name = trial.suggest_categorical("optimizer_name", ["Adam", "RMSprop", "SGD"])
    lr = trial.suggest_float("lr", 1e-5, 1e-1, log=True)
    optimizer = getattr(optim, optimizer_name)(model.parameters(), lr=lr)
    
    size = len(train_dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(train_dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    
    size = len(test_dataloader.dataset)
    num_batches = len(test_dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in test_dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    test_accuracy = 100*correct
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    
    return test_accuracy


@step
def run_hpo(
    train_dataloader: DataLoader, 
    test_dataloader: DataLoader
) -> dict:
    
    study = optuna.create_study(direction="maximize")
    study.optimize(lambda trial: objective(trial, train_dataloader, test_dataloader), n_trials=30, timeout=600)
    
    pruned_trials = study.get_trials(deepcopy=False, states=[TrialState.PRUNED])
    complete_trials = study.get_trials(deepcopy=False, states=[TrialState.COMPLETE])

    print("Study statistics: ")
    print("  Number of finished trials: ", len(study.trials))
    print("  Number of pruned trials: ", len(pruned_trials))
    print("  Number of complete trials: ", len(complete_trials))

    print("Best trial:")
    trial = study.best_trial

    print("  Value: ", trial.value)

    print("  Params: ")
    for key, value in trial.params.items():
        print("    {}: {}".format(key, value))
    
    best = trial.params

    print("Found best hyperparam dict from optimization work")
    print(best)
    
    return best

In [5]:
def define_model(hparam:dict):
    CLASSES = 10
    layers = []
    in_features = 28 * 28
    layers.append(nn.Flatten())
    
    print(hparam)
    
    for i in range(hparam['n_layers']):
        out_features = hparam[f"n_units_l{i}"]
        layers.append(nn.Linear(in_features, out_features))
        layers.append(nn.ReLU())
        p = hparam[f"dropout_l{i}"]
        layers.append(nn.Dropout(p))
        in_features = out_features
        
    layers.append(nn.Linear(in_features, CLASSES))
    layers.append(nn.LogSoftmax(dim=1))

    return nn.Sequential(*layers)

In [6]:
def train(dataloader, model, loss_fn, optimizer):
    """A function to train a model for one epoch."""
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

def test(dataloader, model, loss_fn):
    """A function to test a model on the validation / test dataset."""
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    test_accuracy = 100*correct
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

    return test_accuracy

In [7]:
@step
def train_test(
    best_hparams: dict,
    train_dataloader: DataLoader, 
    test_dataloader: DataLoader
) -> Output(trained_model=nn.Module, test_acc=float):
    """A `step` to train and evaluate a torch model on given dataloaders."""
    
    epochs = 5
    
    model = define_model(best_hparams)
    
    print("Constructing optimized model:")
    print(model)

    print("Using optimized hyperparams")
    print(best_hparams)

    model = model.to(device)
    loss_fn = nn.CrossEntropyLoss()
    
    lr = best_hparams['lr']
    optimizer_name = best_hparams['optimizer_name']
    optimizer = getattr(optim, optimizer_name)(model.parameters(), lr=lr)
    
    test_acc = 0
    for t in range(epochs):
        print(f"Epoch {t+1}\n-------------------------------")
        train(train_dataloader, model, loss_fn, optimizer)
        test_acc = test(test_dataloader, model, loss_fn)
    print("Done!")

    return model, test_acc

In [8]:
pytorch_hpo_pipeline(
    load_data=load_data(),
    run_hpo=run_hpo(),
    train_test=train_test(),
).run(unlisted=True)