# Functions for running training loops

## K-Fold Training and Cross-Validation

In [1]:
%load_ext autoreload
#default_exp trainers
#export
import datetime
import logging
import os
import tempfile
from os.path import dirname

import torch
import pandas as pd
import pytorch_lightning as lit
import wandb
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger, CSVLogger, WandbLogger

from reappraisalmodel.lightningreapp import LightningReapp
from reappraisalmodel.utils import upload_file

In [None]:
#exporti
# Hyperparameters handed to every `LightningReapp` that `kfold_train` builds.
default_config = dict(
    lr=1e-3,
    hidden_layer_size=50,
)

#export
def _metric_value(value):
    """Converts a logged metric to a plain Python number.

    Values exposing `.item()` (e.g. 0-d tensors) are unwrapped; anything else
    is returned unchanged.
    """
    return value.item() if hasattr(value, 'item') else value


def _fit_fold(split, ldhdata, strat, today, tempdir, max_epochs, gpus, trainer_kwargs):
    """Trains a fresh LightningReapp on one split; returns its metrics, best checkpoint path and epoch count."""
    # Select the dataloaders for the given split.
    train_dl = ldhdata.get_train_dataloader(split)
    val_dl = ldhdata.get_val_dataloader(split)

    save_dir = 'reapp_logs'
    name = f"{split:02d}foldCV_{strat}_{today}"
    version = "split"

    # Both loggers get identical save_dir/name/version so their runs line up on disk.
    # NOTE(review): prefix is passed as an int (unchanged from before); confirm the
    # loggers format it as intended, especially for split 0.
    tb_logger = TensorBoardLogger(save_dir=save_dir, name=name, version=version, prefix=split)
    csv_logger = CSVLogger(save_dir=save_dir, name=name, version=version, prefix=split)

    early_stopping = EarlyStopping(
        monitor='val_loss',
        mode='min',
        min_delta=0.001,
        patience=3,
        verbose=False
    )

    # Keep only the single best (lowest val_loss) checkpoint, inside the temp dir.
    best_checkpoint = ModelCheckpoint(
        monitor='val_loss',
        mode='min',
        dirpath=os.path.join(tempdir, name),
        filename=f'{split}_' + '{epoch:02d}-{val_loss:.02f}',
        verbose=False,
        save_last=False,
        save_top_k=1,
        save_weights_only=False,
    )

    model = LightningReapp(default_config)
    trainer = lit.Trainer(
        benchmark=True,
        logger=[tb_logger, csv_logger],
        gpus=gpus,
        gradient_clip_val=1.0,
        max_epochs=max_epochs,
        terminate_on_nan=True,
        weights_summary=None,
        callbacks=[best_checkpoint, early_stopping],
        **trainer_kwargs)
    print(f"Training on split {split}")
    trainer.fit(model, train_dl, val_dl)
    return {
        # Copy so later trainers cannot affect this fold's recorded metrics.
        'metrics': dict(trainer.logged_metrics),
        'checkpoint': best_checkpoint.best_model_path,
        'num_epochs': trainer.current_epoch,
    }


def kfold_train(k: int, ldhdata, strat, s3_bucket=None, **trainer_kwargs) -> pd.DataFrame:
    """Fits a LightningReapp instance with k-fold cross-validation.

    For each split `0..k-1` a fresh model is trained with TensorBoard and CSV
    logging, early stopping on `val_loss` (min_delta 0.001, patience 3), and a
    best-`val_loss` checkpoint saved to a temporary directory. After all splits
    finish, one row of final metrics per split is collected into a DataFrame,
    written to a CSV report, and the checkpoints/report are uploaded to S3.

    Args:
        k (int): Number of splits to train. Each index is passed to
            `ldhdata.get_train_dataloader` / `ldhdata.get_val_dataloader`.
        ldhdata : See `reappraisalmodel.ldhdata.LDHDataModule`
        strat: Strategy label used in log names, report name and S3 keys.
        s3_bucket (str, optional): Bucket for the checkpoints and the report.
            When None, the report is not uploaded and checkpoints go to the
            legacy default bucket 'ldhdata'. The temporary directory holding
            the checkpoints and report is deleted when this function returns.
        **trainer_kwargs: Extra keyword arguments for `pytorch_lightning.Trainer`.
            `max_epochs` (default 20) and `gpus` (default 1 if CUDA is
            available, else None) are consumed here.

    Returns:
        pd.DataFrame: One row per split with columns `val_loss`, `train_loss`,
        `num_epochs`, `r2score` and `explained_var`.

    Raises:
        ValueError: If `k` is less than 1.
    """
    if k < 1:
        raise ValueError(f"k must be at least 1, got {k}")

    max_epochs = trainer_kwargs.pop('max_epochs', 20)
    gpus = trainer_kwargs.pop('gpus', 1 if torch.cuda.is_available() else None)

    today = datetime.datetime.today().strftime('%Y%m%d_%H%M%S')

    # Honour the caller's bucket; fall back to the historical hard-coded bucket so
    # callers that relied on checkpoints being uploaded by default keep working.
    ckpt_bucket = 'ldhdata' if s3_bucket is None else s3_bucket

    # Create temporary directory to store checkpoint files.
    with tempfile.TemporaryDirectory() as tempdir:
        print(f'Created temporary directory: {tempdir}')

        fold_results = [
            _fit_fold(split, ldhdata, strat, today, tempdir, max_epochs, gpus, trainer_kwargs)
            for split in range(k)
        ]

        outputs = []
        for split, result in enumerate(fold_results):
            metrics = result['metrics']
            row = {
                'val_loss': _metric_value(metrics['val_loss']),
                'train_loss': _metric_value(metrics['train_loss']),
                'num_epochs': result['num_epochs'],
                'r2score': _metric_value(metrics['r2score']),
                'explained_var': _metric_value(metrics['explained_var'])
            }

            ckpt_path = result['checkpoint']
            filename = os.path.split(ckpt_path)[-1]
            if ckpt_path:
                # Key uses this fold's own index (previously a stale loop variable).
                upload_result = upload_file(ckpt_path, ckpt_bucket, f'{strat}/{split}-{today}-{filename}')
                print(f"Successful {filename} to s3: {upload_result}")
            else:
                print(f"No checkpoint saved for split {split}; skipping upload")

            print(row)
            outputs.append(row)

        df = pd.DataFrame(outputs)

        report_name = f'{today}-report.csv'
        report_path = os.path.join(tempdir, f"{strat}-{report_name}")
        df.to_csv(report_path)
        if s3_bucket is not None:
            upload_report = upload_file(report_path, s3_bucket, f'{strat}/{report_name}')
            print(f"Successful Uploading Report to s3: {upload_report}")
        print(df.describe())
        return df

## Hyperparameter Tuning

Sources:
- [Scaling Up PyTorch Lightning Hyperparameter Tuning w/ Ray](https://medium.com/distributed-computing-with-ray/scaling-up-pytorch-lightning-hyperparameter-tuning-with-ray-tune-4bd9e1ff9929)

In [2]:
from pathlib import Path

from reappraisalmodel.ldhdata import LDHDataModule

# Project root: one level above the notebook's working directory.
ROOT_DIR = Path().resolve().parent
print(ROOT_DIR)

# Strategy whose training data is loaded (also reused by later cells).
STRAT = 'obj'
ldhdata = LDHDataModule(data_dir=ROOT_DIR, strat=STRAT)
ldhdata.load_train_data()

/home/ubuntu/reapp


Loading cached processed dataset at /home/ubuntu/reapp/output/training/obj/cache-8217e0ac99d256e6.arrow


Training data loaded from disk.
Encoding Training Data:


In [34]:
%autoreload
from reappraisalmodel.utils import download_file


download_file('ldhdata', 'Master_Final_TrainingData.csv',f"{ROOT_DIR}/data/training/Master_Final_TrainingData.csv")

Downloaded file: Master_Final_TrainingData.csv (ldhdata)


In [3]:
# Stop any Ray processes still running (e.g. from an interrupted earlier tune.run) before tuning below.
!ray stop

Did not find any active Ray processes.

In [10]:
%autoreload
import torch
import pytorch_lightning as lit
from ray import tune
from ray.tune import JupyterNotebookReporter, CLIReporter
from ray.tune.integration.pytorch_lightning import TuneReportCallback
from ray.tune.suggest.hyperopt import HyperOptSearch
from ray.tune.schedulers import ASHAScheduler
from ray.tune.stopper import TrialPlateauStopper
from ray.util import inspect_serializability


from reappraisalmodel.lightningreapp import LightningReapp

default_tune_config = {
    "lr": tune.loguniform(1e-4, 1e-1), # loguniform samples by magnitude
    "num_embedding_layers": tune.choice([1,2,3])
}

hp_search = HyperOptSearch(metric='loss', mode='min')

scheduler = ASHAScheduler(
    time_attr='training_iteration', 
    grace_period=2, 
    max_t=15)

reporter = JupyterNotebookReporter(
    overwrite=True,
    parameter_columns=["lr", 'num_embedding_layers'],
    metric_columns=["loss", "training_iteration", "explained_variance"],
    print_intermediate_tables=True
)


callback_tuner_val = TuneReportCallback(
    {
        "loss": "val_loss",
        "explained_var": "explained_var"
    },
    on="validation_end",
)

trial_stopper = TrialPlateauStopper("explained_var", mode='max', num_results=5)



def train_tune(config, ldhdata, num_gpus=None, num_epochs=10):
    """Trains one LightningReapp model for a single Ray Tune trial.

    Metrics are reported back to Tune by `callback_tuner_val` (defined earlier
    in this cell) at the end of each validation run.

    Args:
        config (dict): Hyperparameters sampled by Tune, passed to `LightningReapp`.
        ldhdata: Passed to `trainer.fit` alongside the model; presumably an
            `LDHDataModule` — confirm it provides the dataloader hooks Lightning expects.
        num_gpus: Value for the Trainer's `gpus` argument.
        num_epochs (int): Maximum number of training epochs.
    """
    model = LightningReapp(config)
    trainer = lit.Trainer(
        max_epochs=num_epochs,
        gpus=num_gpus,
        progress_bar_refresh_rate=0,
        weights_summary=None,
        # Was `callback_tuner`, an undefined name (NameError on a fresh kernel).
        callbacks=[callback_tuner_val],
    )
    trainer.fit(model, ldhdata)

# NOTE(review): with num_epochs=1 each trial likely reports only once, so ASHA's
# grace_period=2/max_t=15 and the plateau stopper (num_results=5) probably never act —
# confirm this short run is intended.
trainable = tune.with_parameters(
    train_tune,
    ldhdata=ldhdata,
    num_epochs=1,
    num_gpus=1,
)
# Fractional GPU share lets several trials run concurrently on one device.
resources = {"cpu": 1, "gpu": 0.25}

analysis = tune.run(
    trainable,
    config=default_tune_config,
    num_samples=10,
    search_alg=hp_search,
    scheduler=scheduler,
    stop=trial_stopper,
    metric="loss",
    mode='min',
    resources_per_trial=resources,
    local_dir=f"{ROOT_DIR}/reapp_logs/tune",
    progress_reporter=reporter,
    fail_fast=True,
)
print("Best hyperparameters found were: ", analysis.best_config)

Trial name,status,loc,lr
_inner_27032a60,RUNNING,,0.000726001


[2m[36m(pid=4135)[0m GPU available: True, used: True
[2m[36m(pid=4135)[0m TPU available: None, using: 0 TPU cores
[2m[36m(pid=5049)[0m GPU available: True, used: True
[2m[36m(pid=5049)[0m TPU available: None, using: 0 TPU cores
[2m[36m(pid=5047)[0m GPU available: True, used: True
[2m[36m(pid=5047)[0m TPU available: None, using: 0 TPU cores
[2m[36m(pid=5051)[0m GPU available: True, used: True
[2m[36m(pid=5051)[0m TPU available: None, using: 0 TPU cores
[2m[36m(pid=4135)[0m   return torch.tensor(x, **format_kwargs)
[2m[36m(pid=5049)[0m   return torch.tensor(x, **format_kwargs)
[2m[36m(pid=5047)[0m   return torch.tensor(x, **format_kwargs)
[2m[36m(pid=5051)[0m   return torch.tensor(x, **format_kwargs)


KeyboardInterrupt: 

In [11]:
#hide
# Export this notebook's `#export` cells into the library (output: "Converted Trainers.ipynb.").
from nbdev.export import notebook2script

notebook2script("Trainers.ipynb")

Converted Trainers.ipynb.


In [14]:
# Presumably syncs edits made directly in the library .py files back into their notebooks — confirm intent.
!nbdev_update_lib


Converted trainers.py.
Converted ldhdata.py.
Converted utils.py.
Converted lightningreapp.py.
