In [1]:
# import libraries
import logging
import os
import warnings
from typing import Any, Callable, Dict, Optional

warnings.filterwarnings("ignore")

import numpy as np
import ray
import torch
from ray import tune
from sklearn import metrics
from torch import nn
from torch.utils.data import DataLoader, WeightedRandomSampler, random_split

In [None]:
# module-level logger; used below to announce early stopping
logger = logging.getLogger(name=__name__)
logger.setLevel(level=logging.INFO)

In [None]:
class EarlyStopper:
    """Signal when training should stop because the monitored loss has
    stopped improving.

    Every call records one loss value:

    * a value strictly below the best value seen so far becomes the new best
      and clears the strike counter;
    * a value more than ``min_delta`` above the best adds one strike;
    * anything in between leaves the state untouched.

    The call returns ``True`` once the strike count reaches ``patience``.
    """

    def __init__(self, patience: int = 1, min_delta: float = 0) -> None:
        """Create a stopper with an empty history.

        Parameters
        ----------
        patience: int, default 1
            how many worsening losses are tolerated (since the last new best)
            before stopping is signalled
        min_delta: float, default 0
            tolerance above the best loss; only a loss greater than
            ``best + min_delta`` counts as a worsening step
        """
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.min_loss = np.inf

    def __call__(self, loss: float) -> bool:
        """Record ``loss`` and report whether training should stop.

        Parameters
        ----------
        loss: float
            the latest monitored loss (e.g. a validation loss)

        Returns
        -------
        bool
            True when the number of worsening steps since the last new best
            has reached ``patience``
        """
        if loss < self.min_loss:
            # new best: remember it and forget earlier strikes
            self.min_loss, self.counter = loss, 0
            return False
        # written as `not (... > ...)` so NaN losses are neither a best nor
        # a strike
        if not (loss > self.min_loss + self.min_delta):
            return False
        self.counter += 1
        return self.counter >= self.patience

In [None]:
# linear network for regression
class DNNNetRegressor(nn.Module):
    """Fully connected regression network with three hidden ReLU layers.

    Each sample is flattened (all dimensions after the batch dimension), then
    mapped through ``input_size -> l1 -> l2 -> l3 -> output_size`` linear
    layers with a ReLU after every hidden layer and no activation on the
    output.
    """

    def __init__(
        self,
        input_size: int,
        output_size: int,
        l1: int = 512,
        l2: int = 128,
        l3: int = 64,
    ) -> None:
        """Build the layers.

        Parameters
        ----------
        input_size: int
            number of features per sample after flattening
        output_size: int
            number of regression targets produced per sample
        l1: int, default 512
            width (output features) of the first hidden layer
        l2: int, default 128
            width of the second hidden layer
        l3: int, default 64
            width of the third hidden layer
        """
        super().__init__()
        self.flatten = nn.Flatten()
        widths = [input_size, l1, l2, l3]
        # (Linear, ReLU) pairs created in order so Sequential indices 0..5 and
        # the parameter-initialisation order match a hand-written stack
        hidden = [
            module
            for n_in, n_out in zip(widths, widths[1:])
            for module in (nn.Linear(n_in, n_out), nn.ReLU())
        ]
        self.linear_relu_stack = nn.Sequential(*hidden, nn.Linear(l3, output_size))

    def forward(self, x) -> torch.Tensor:
        """Flatten ``x`` per sample and return the regression output."""
        return self.linear_relu_stack(self.flatten(x))

In [None]:
# CNN network for regression
class CNNNetRegressor(nn.Module):
    """Single-convolution Pytorch regression model.

    Layout: ``Conv1d(1 -> out) -> MaxPool1d -> Flatten -> Linear(l1) -> ReLU
    -> Linear(output_size)``; no activation is applied to the output.
    """

    def __init__(
        self,
        input_size: int,
        output_size: int,
        out: int = 16,
        kernel_size: int = 3,
        max_pool: int = 2,
        l1: int = 32,
    ) -> None:
        """Constructor

        Parameters
        ----------
        input_size: int
            the number of features for one sample (the sequence length seen
            by the convolution)
        output_size: int
            the final output dimension
        out: int, default 16
            the number of output channels of the CNN layer
        kernel_size: int, default 3
            the kernel size of the CNN layer
        max_pool: int, default 2
            the kernel size (and stride) of the max-pool layer
        l1: int, default 32
            the number of output features of the first linear layer

        Raises
        ------
        ValueError
            if ``input_size`` is too short for ``kernel_size`` and
            ``max_pool`` to leave at least one pooled position
        """
        super().__init__()
        # Conv1d (stride 1, no padding) yields input_size - kernel_size + 1
        # positions; MaxPool1d (stride = kernel) then floors that by max_pool.
        pooled_len = (input_size - kernel_size + 1) // max_pool
        if pooled_len < 1:
            # otherwise torch fails later with an opaque shape error
            raise ValueError(
                f"input_size={input_size} is too small for "
                f"kernel_size={kernel_size} and max_pool={max_pool}"
            )
        self.cnn_relu_stack = nn.Sequential(
            nn.Conv1d(1, out, kernel_size),
            nn.MaxPool1d(max_pool),
            nn.Flatten(),
            nn.Linear(out * pooled_len, l1),
            nn.ReLU(),
            nn.Linear(l1, output_size),
        )

    def forward(self, x) -> torch.Tensor:
        """Run the network.

        Parameters
        ----------
        x: torch.Tensor
            shape ``(batch, 1, input_size)``, or ``(batch, input_size)`` in
            which case the single input channel is inserted

        Returns
        -------
        torch.Tensor
            shape ``(batch, output_size)``
        """
        if x.dim() == 2:
            # accept the same flat (batch, features) input the DNN models take
            x = x.unsqueeze(1)
        return self.cnn_relu_stack(x)

In [None]:
# linear network for classification
class DNNNetClassifier(nn.Module):
    """Fully connected classification network with three hidden ReLU layers.

    With ``output_size == 1`` a sigmoid is appended (under the module name
    ``"sigmoid"``) so the output lies in (0, 1); for a larger
    ``output_size`` the raw scores of the last linear layer are returned.
    """

    def __init__(
        self,
        input_size: int,
        output_size: int = 1,
        l1: int = 512,
        l2: int = 128,
        l3: int = 64,
    ) -> None:
        """Build the layers.

        Parameters
        ----------
        input_size: int
            number of features per sample after flattening
        output_size: int, default 1
            number of outputs; 1 means a single sigmoid output
        l1: int, default 512
            width (output features) of the first hidden layer
        l2: int, default 128
            width of the second hidden layer
        l3: int, default 64
            width of the third hidden layer
        """
        super().__init__()
        self.flatten = nn.Flatten()
        sizes = (input_size, l1, l2, l3)
        layers = []
        for fan_in, fan_out in zip(sizes[:-1], sizes[1:]):
            layers.append(nn.Linear(fan_in, fan_out))
            layers.append(nn.ReLU())
        layers.append(nn.Linear(l3, output_size))
        self.linear_relu_stack = nn.Sequential(*layers)
        if output_size == 1:
            self.linear_relu_stack.add_module("sigmoid", nn.Sigmoid())

    def forward(self, x) -> torch.Tensor:
        """Flatten ``x`` per sample and return the classifier output."""
        flat = self.flatten(x)
        return self.linear_relu_stack(flat)

In [None]:
# CNN network for classification
class CNNNetClassifier(nn.Module):
    """Single-convolution Pytorch classification model.

    Layout: ``Conv1d(1 -> out) -> MaxPool1d -> Flatten -> Linear(l1) -> ReLU
    -> Linear(output_size)``, followed by a sigmoid (module name
    ``"sigmoid"``) when ``output_size == 1``.
    """

    def __init__(
        self,
        input_size: int,
        output_size: int = 1,
        out: int = 16,
        kernel_size: int = 3,
        max_pool: int = 2,
        l1: int = 32,
    ) -> None:
        """Constructor

        Parameters
        ----------
        input_size: int
            the number of features for one sample (the sequence length seen
            by the convolution)
        output_size: int, default 1
            the dimension of the output; 1 means a single sigmoid output
        out: int, default 16
            the number of output channels of the CNN layer
        kernel_size: int, default 3
            the kernel size of the CNN layer
        max_pool: int, default 2
            the kernel size (and stride) of the max-pool layer
        l1: int, default 32
            the number of output features of the first linear layer

        Raises
        ------
        ValueError
            if ``input_size`` is too short for ``kernel_size`` and
            ``max_pool`` to leave at least one pooled position
        """
        super().__init__()
        # Conv1d (stride 1, no padding) yields input_size - kernel_size + 1
        # positions; MaxPool1d (stride = kernel) then floors that by max_pool.
        pooled_len = (input_size - kernel_size + 1) // max_pool
        if pooled_len < 1:
            # otherwise torch fails later with an opaque shape error
            raise ValueError(
                f"input_size={input_size} is too small for "
                f"kernel_size={kernel_size} and max_pool={max_pool}"
            )
        self.cnn_relu_stack = nn.Sequential(
            nn.Conv1d(1, out, kernel_size),
            nn.MaxPool1d(max_pool),
            nn.Flatten(),
            nn.Linear(out * pooled_len, l1),
            nn.ReLU(),
            nn.Linear(l1, output_size),
        )
        if output_size == 1:
            self.cnn_relu_stack.add_module("sigmoid", nn.Sigmoid())

    def forward(self, x) -> torch.Tensor:
        """Run the network.

        Parameters
        ----------
        x: torch.Tensor
            shape ``(batch, 1, input_size)``, or ``(batch, input_size)`` in
            which case the single input channel is inserted

        Returns
        -------
        torch.Tensor
            shape ``(batch, output_size)``
        """
        if x.dim() == 2:
            # accept the same flat (batch, features) input the DNN models take
            x = x.unsqueeze(1)
        return self.cnn_relu_stack(x)

In [2]:
def tune_classifier(
    config: Dict[str, Any],
    network_name: str,
    train_ray: ray.ObjectRef,
    loss_fn: Callable,
    val_ray: Optional[ray.ObjectRef] = None,
    val_size: Optional[float] = None,
    last_checkpoint: Optional[str] = None,
    class_weight: bool = False,
    num_workers: int = 0,
    multiclass: bool = False,
    epochs: int = 10,
    early_stopping: Optional[EarlyStopper] = None,
    verbose: int = 0,
    visual_batch: int = 2000,
    random_state: int = 0,
) -> None:
    """Ray Tune trainable that trains and validates a PyTorch classifier

    After every epoch the model is evaluated on the whole validation set,
    model and optimizer state are checkpointed via ``tune.checkpoint_dir``,
    and the validation ``loss``, ``accuracy`` and ``f1`` are passed to
    ``tune.report``.

    Parameters
    ----------
    config: dict
        one sampled hyperparameter set; the keys read are ``"model_params"``
        (keyword arguments for the network constructor), ``"optimizer"``
        (``"Adam"`` or ``"SGD"``), ``"lr"`` and ``"batch_size"``
    network_name: str
        the name of the model, ``"DNN"`` or ``"CNN"``
    train_ray: ray.ObjectRef
        reference to the training dataset; items are indexed as ``(X, y)``
        and ``dataset[:]`` is assumed to return batched tensors (e.g. a
        ``TensorDataset``) -- TODO confirm with callers
    loss_fn: Callable
        the PyTorch loss function, called as ``loss_fn(pred, y)``
    val_ray: ray.ObjectRef, default None
        reference to a separate validation dataset; takes precedence over
        ``val_size``
    val_size: float, default None
        fraction of the training data split off (seeded by ``random_state``)
        for validation when ``val_ray`` is not given
    last_checkpoint: str, default None
        directory of a previous checkpoint to resume model and optimizer
        state from
    class_weight: bool, default False
        if True, draw training samples with probability inversely
        proportional to the frequency of their class
    num_workers: int, default 0
        the number of worker processes of the training DataLoader
    multiclass: bool, default False
        True for multi-class targets with one column per class (decoded with
        ``argmax``); False for binary targets whose predictions are
        thresholded at 0.5
    epochs: int, default 10
        the maximum number of epochs
    early_stopping: EarlyStopper, default None
        called with the validation loss after every epoch; training stops
        when it returns True
    verbose: int, default 0
        0 means no logs, 1 means epoch logs, 2 means epoch and batch logs
    visual_batch: int, default 2000
        the number of batches between two running-loss printouts
    random_state: int, default 0
        seed of the train/validation split

    Raises
    ------
    ValueError
        if neither ``val_ray`` nor ``val_size`` is given
    NameError
        if ``network_name`` or ``config["optimizer"]`` is not recognised
    """
    # explicit check instead of `assert`, which is stripped under `python -O`
    if val_ray is None and val_size is None:
        raise ValueError("Either val_ray or val_size must be given")

    # build model
    if network_name == "DNN":
        network_cls = DNNNetClassifier
    elif network_name == "CNN":
        network_cls = CNNNetClassifier
    else:
        raise NameError(f"Wrong network name selected: {network_name}")
    # fetch the training data once instead of calling ray.get for every use
    train_data = ray.get(train_ray)
    first_item = train_data[0]
    network = network_cls(
        input_size=first_item[0].shape[-1],
        output_size=first_item[1].shape[-1],
        **config["model_params"],
    )

    # define optimizer
    if config["optimizer"] == "Adam":
        optimizer = torch.optim.Adam(network.parameters(), lr=config["lr"])
    elif config["optimizer"] == "SGD":
        optimizer = torch.optim.SGD(network.parameters(), lr=config["lr"])
    else:
        raise NameError(f"Wrong optimizer name selected: {config['optimizer']}")

    # load the model and optimizer from the last time
    if last_checkpoint:
        model_state, optimizer_state = torch.load(
            os.path.join(last_checkpoint, "checkpoint")
        )
        network.load_state_dict(model_state)
        optimizer.load_state_dict(optimizer_state)

    # get training and val sets
    if val_ray is not None:
        train_subset = train_data
        val_subset = ray.get(val_ray)
    else:
        n_val = int(len(train_data) * val_size)
        train_subset, val_subset = random_split(
            train_data,
            [len(train_data) - n_val, n_val],
            generator=torch.Generator().manual_seed(random_state),
        )

    # materialize the whole validation set once; it is reused every epoch
    val_batch = val_subset[:]
    val_X, val_y = val_batch[0], val_batch[1]
    # real validation labels
    if multiclass:
        val_real = torch.argmax(val_y, dim=1).numpy()
    else:
        val_real = val_y.numpy().flatten()
    # define method for metrics
    average = "weighted" if multiclass else "binary"

    # class weights
    train_sampler = None
    if class_weight:
        train_y = train_subset[:][1]
        # one-hot targets must be reduced to class indices first; counting
        # the raw matrix would count 0/1 entries and give n_samples * n_classes
        # weights for n_samples items
        if multiclass:
            train_labels = torch.argmax(train_y, dim=1).numpy()
        else:
            train_labels = train_y.numpy().flatten()
        # `inverse` maps each sample to its class position, so this also
        # works when a class is absent or labels are not 0..K-1
        _, inverse, counts = np.unique(
            train_labels, return_inverse=True, return_counts=True
        )
        # inverse class frequency: rarer classes are drawn more often
        train_sample_weight = (counts.sum() / counts)[inverse.reshape(-1)]
        train_sampler = WeightedRandomSampler(
            train_sample_weight.tolist(),
            len(train_sample_weight),
            replacement=True,
        )

    # build dataloaders (a sampler and shuffle=True are mutually exclusive)
    train_loader = DataLoader(
        train_subset,
        sampler=train_sampler,
        batch_size=int(config["batch_size"]),
        shuffle=train_sampler is None,
        num_workers=num_workers,
    )

    # training
    n_batches = len(train_loader)
    for epoch in range(epochs):
        running_loss = 0.0
        network.train()
        for batch, (X, y) in enumerate(train_loader):
            optimizer.zero_grad()
            loss = loss_fn(network(X), y)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            # mean loss over the last `visual_batch` batches
            if batch % visual_batch == visual_batch - 1:
                if verbose > 1:
                    print(
                        f"epoch {epoch + 1}  batch [{batch+1:<4}/{n_batches}]"
                        + f"  loss: {(running_loss / visual_batch):.6f}"
                    )
                running_loss = 0.0

        # validation in eval mode (relevant once dropout/batch-norm is used)
        network.eval()
        with torch.no_grad():
            val_out = network(val_X)
            val_loss = loss_fn(val_out, val_y).item()
        # transform for univariate or multi-class prediction
        if multiclass:
            val_pred = torch.argmax(val_out, dim=1).numpy()
        else:
            val_pred = val_out.numpy().flatten() > 0.5
        # metrics
        accuracy = metrics.accuracy_score(val_real, val_pred)
        f1 = metrics.f1_score(val_real, val_pred, average=average)

        if verbose > 0:
            print(
                f"epoch {epoch + 1}/{epochs}  val_loss: {val_loss:.6f}"
                + f"  accuracy: {accuracy:.4f}  f1: {f1:.4f}"
            )

        with tune.checkpoint_dir(epoch) as checkpoint_dir:
            path = os.path.join(checkpoint_dir, "checkpoint")
            torch.save((network.state_dict(), optimizer.state_dict()), path)

        tune.report(loss=val_loss, accuracy=accuracy, f1=f1)
        if early_stopping and early_stopping(loss=val_loss):
            logger.info(f"Early stopping at epoch {epoch + 1}")
            break
    print("Finished Training")

In [None]:
def training_regressor(
    config: Dict[str, Any],
    network_name: str,
    train_ray: ray.ObjectRef,
    loss_fn: Callable,
    val_ray: Optional[ray.ObjectRef] = None,
    val_size: Optional[float] = None,
    num_workers: int = 0,
    last_checkpoint: Optional[str] = None,
    epochs: int = 10,
    early_stopping: Optional[EarlyStopper] = None,
    verbose: int = 0,
    visual_batch: int = 2000,
    random_state: int = 0,
) -> None:
    """Ray Tune trainable that trains and validates a PyTorch regressor

    After every epoch the model is evaluated on the whole validation set,
    model and optimizer state are checkpointed via ``tune.checkpoint_dir``,
    and the validation ``loss``, ``mae``, ``mse`` and ``r2`` are passed to
    ``tune.report``.

    Parameters
    ----------
    config: dict
        one sampled hyperparameter set; the keys read are ``"model_params"``
        (keyword arguments for the network constructor), ``"lr"``,
        ``"batch_size"`` and, optionally, ``"optimizer"`` (``"Adam"`` or
        ``"SGD"``, default ``"Adam"``)
    network_name: str
        the name of the model, ``"DNN"`` or ``"CNN"``
    train_ray: ray.ObjectRef
        reference to the training dataset; items are indexed as ``(X, y)``
        and ``dataset[:]`` is assumed to return batched tensors (e.g. a
        ``TensorDataset``) -- TODO confirm with callers
    loss_fn: Callable
        the PyTorch loss function, called as ``loss_fn(pred, y)``
    val_ray: ray.ObjectRef, default None
        reference to a separate validation dataset; takes precedence over
        ``val_size``
    val_size: float, default None
        fraction of the training data split off (seeded by ``random_state``)
        for validation when ``val_ray`` is not given
    num_workers: int, default 0
        the number of worker processes of the training DataLoader
    last_checkpoint: str, default None
        directory of a previous checkpoint to resume model and optimizer
        state from
    epochs: int, default 10
        the maximum number of epochs
    early_stopping: EarlyStopper, default None
        called with the validation loss after every epoch; training stops
        when it returns True
    verbose: int, default 0
        0 means no logs, 1 means epoch logs, 2 means epoch and batch logs
    visual_batch: int, default 2000
        the number of batches between two running-loss printouts
    random_state: int, default 0
        seed of the train/validation split

    Raises
    ------
    ValueError
        if neither ``val_ray`` nor ``val_size`` is given
    NameError
        if ``network_name`` or ``config["optimizer"]`` is not recognised
    """
    # explicit check instead of `assert`, which is stripped under `python -O`
    if val_ray is None and val_size is None:
        raise ValueError("Either val_ray or val_size must be given")

    # build model
    if network_name == "DNN":
        network_cls = DNNNetRegressor
    elif network_name == "CNN":
        network_cls = CNNNetRegressor
    else:
        raise NameError(f"Wrong model name selected: {network_name}")
    # fetch the training data once instead of calling ray.get for every use
    train_data = ray.get(train_ray)
    first_item = train_data[0]
    network = network_cls(
        input_size=first_item[0].shape[-1],
        output_size=first_item[1].shape[-1],
        **config["model_params"],
    )

    # define optimizer; honour config["optimizer"] when present, Adam otherwise
    optimizer_name = config.get("optimizer", "Adam")
    if optimizer_name == "Adam":
        optimizer = torch.optim.Adam(network.parameters(), lr=config["lr"])
    elif optimizer_name == "SGD":
        optimizer = torch.optim.SGD(network.parameters(), lr=config["lr"])
    else:
        raise NameError(f"Wrong optimizer name selected: {optimizer_name}")

    # load the model and optimizer from the last time
    if last_checkpoint:
        model_state, optimizer_state = torch.load(
            os.path.join(last_checkpoint, "checkpoint")
        )
        network.load_state_dict(model_state)
        optimizer.load_state_dict(optimizer_state)

    # split into training and val sets
    if val_ray is not None:
        train_subset = train_data
        val_subset = ray.get(val_ray)
    else:
        n_val = int(len(train_data) * val_size)
        train_subset, val_subset = random_split(
            train_data,
            [len(train_data) - n_val, n_val],
            generator=torch.Generator().manual_seed(random_state),
        )

    # materialize the whole validation set once; it is reused every epoch
    val_batch = val_subset[:]
    val_X, val_y = val_batch[0], val_batch[1]
    val_true_flat = val_y.numpy().flatten()

    # build dataloaders
    train_loader = DataLoader(
        train_subset,
        batch_size=int(config["batch_size"]),
        shuffle=True,
        num_workers=num_workers,
    )

    # training
    n_batches = len(train_loader)
    for epoch in range(epochs):
        running_loss = 0.0
        network.train()
        for batch, (X, y) in enumerate(train_loader):
            optimizer.zero_grad()
            loss = loss_fn(network(X), y)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            # mean loss over the last `visual_batch` batches
            if batch % visual_batch == visual_batch - 1:
                if verbose > 1:
                    print(
                        f"epoch {epoch + 1}  batch [{batch+1:<4}/{n_batches}]"
                        + f"  loss: {(running_loss / visual_batch):.6f}"
                    )
                running_loss = 0.0

        # validation in eval mode (relevant once dropout/batch-norm is used)
        network.eval()
        with torch.no_grad():
            val_out = network(val_X)
            val_loss = loss_fn(val_out, val_y).item()
        # metrics on flattened targets/predictions
        val_pred_flat = val_out.numpy().flatten()
        mae = metrics.mean_absolute_error(val_true_flat, val_pred_flat)
        mse = metrics.mean_squared_error(val_true_flat, val_pred_flat)
        r2 = metrics.r2_score(val_true_flat, val_pred_flat)

        if verbose > 0:
            print(
                f"epoch {epoch + 1}/{epochs}  val_loss: {val_loss:.6f}"
                + f"  mae: {mae:.6f}  r2: {r2:.4f}"
            )

        with tune.checkpoint_dir(epoch) as checkpoint_dir:
            path = os.path.join(checkpoint_dir, "checkpoint")
            torch.save((network.state_dict(), optimizer.state_dict()), path)

        tune.report(loss=val_loss, mae=mae, mse=mse, r2=r2)
        if early_stopping and early_stopping(loss=val_loss):
            logger.info(f"Early stopping at epoch {epoch + 1}")
            break
    print("Finished Training")