Calvin's Scalable Paraellel Downsampler (CSPD)

Written by Calvin W.Y. Chan <calvin.chan@bayer.com>, June 2021

# Initialization

In [1]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import KFold, train_test_split
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data.dataloader import default_collate
from siuba import _, select, rename, left_join
from pytorch_forecasting.metrics import MAPE
import ray
from ray import tune
from ray.tune.schedulers import HyperBandScheduler
from ray.tune.suggest.basic_variant import BasicVariantGenerator
from ray.tune.suggest.bayesopt import BayesOptSearch
from ray.tune.suggest.bohb import TuneBOHB


import os
import itertools
import warnings
import filelock

import string
import time
import random

import pdb

# Environment Setting

In [2]:
import sys
sys.version_info

sys.version_info(major=3, minor=9, micro=5, releaselevel='final', serial=0)

In [3]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cpu


In [4]:
rm -rf /tmp/cpu.lock

# Environmental Variables

In [5]:
# Input Data Parameters
subgroup_col = "subgroup"   # Use "None" if no subgrouping is given
y_id_col = "cluster_id"

# Fake Data Creation Parameters
N_SAMPLE = 500000
N_FEATURE = 4
N_CLUSTER = 5000
N_SUBGROUP = 5

In [6]:
# Data Handling Parameters
test_split_ratio = 0.2
k = 5

In [7]:
# Training Parameters
num_epochs = 5

# Learning Algorithm Parameters
lr_min = 1e-4
lr_max = 1e-1
# batch_size = [1,16,32,64]
batch_size = [64]

# Architecture Sampling Parameters
h_total_min = 8*N_SUBGROUP
h_total_max = 11*N_SUBGROUP
h_total_step = N_SUBGROUP

h_subgroup_min_neuron_per_sgrp = 7
h_subgroup_max_neuron_per_sgrp = 12

h_branch_min_neuron_per_layer = 2
h_branch_max_neuron_per_layer = 5
h_branch_max_layer = None

dropout_p_min = 0
dropout_p_max = 0.7

# Only use for Architecture Table Search
h_subgroup_n_samples = 6
h_branch_n_samples = 2

# Ray Tune Hyperparameter Search
num_hp_search_samples = 1
chkpt_dir = "/home/calvin_chan/data/output/checkpoint/testing"

In [8]:
# Setup output directories
if not os.path.exists(chkpt_dir):
    os.makedirs(chkpt_dir)

# Data I/O

Fake Dataset

In [9]:
features_colname = [ 'feature_' + x for x in string.ascii_lowercase[:N_FEATURE] ]

clusterid_start = 1 + N_SAMPLE 
clusterid_end = N_SAMPLE + N_CLUSTER
hcp_features = pd.DataFrame(np.random.randn(N_SAMPLE,N_FEATURE),columns=features_colname)
hcp_subgroup = pd.DataFrame(np.random.randint(1,N_SUBGROUP+1,size=N_SAMPLE),columns=['subgroup'])
hcp_clusterid = pd.DataFrame(np.random.randint(clusterid_start,clusterid_end+1,size=N_SAMPLE+1),columns=['cluster_id'],dtype=int)
cluster_y_out = pd.DataFrame(np.random.randint(1000,80000,size=N_CLUSTER+1),columns=['y_out'])
cluster_clusterid = pd.DataFrame(np.linspace(clusterid_start,clusterid_end,N_CLUSTER).reshape(N_CLUSTER,1),columns=['cluster_id'],dtype=int)

In [10]:
features_colname = [ 'feature_' + x for x in string.ascii_lowercase[:N_FEATURE] ]

hcp_features = pd.DataFrame(np.random.randn(N_SAMPLE,N_FEATURE),columns=features_colname)
hcp_subgroup = pd.DataFrame(np.random.randint(1,N_SUBGROUP+1,size=N_SAMPLE),columns=['subgroup'])
hcp_clusterid = pd.DataFrame(np.random.randint(1,N_CLUSTER+1,size=N_SAMPLE),columns=['cluster_id'],dtype=int)
cluster_y_out = pd.DataFrame(np.random.randint(1000,80000,size=N_CLUSTER),columns=['y_out'])
cluster_clusterid = pd.DataFrame(np.linspace(1,N_CLUSTER,N_CLUSTER).reshape(N_CLUSTER,1),columns=['cluster_id'],dtype=int)

In [11]:
# s = pd.get_dummies(hcp_subgroup.subgroup, prefix='Subgroup')
x = pd.concat([hcp_clusterid.astype(str),
               hcp_subgroup,
               hcp_features], axis=1)
y = pd.concat([cluster_clusterid.astype(str),
               cluster_y_out], axis=1)

---

# Neural Network Module

## Calvin's Scalable Paraellel Downsampler (CSPD)

<center>
    <img src="./graphics/DataFlow.jpg" width="1340" alt="subgroup_branch"  />
</center>

---

## Scalable Layer Block

This allow different number of layer blocks to be used in the architecture.

<center>
    <img src="./graphics/ParameterSharedLayerBlock.jpg" width="947" alt="parameter_shared_layer"  />
</center>

In [12]:
class SAMPLE_Branch(nn.Module):

    # Constructor
    def __init__(self, in_feat, layers, dropout_p=None, act_fn=torch.relu):
        super(SAMPLE_Branch, self).__init__()
        layers = [in_feat] + layers   # Add input layer
        self.hidden = nn.ModuleList()
        self.out = nn.Linear(layers[-1],1).double()
        self.act_fn = act_fn
        self.dropout = nn.Dropout(p=dropout_p)
        # --- Scalable Layers ---
        for input_size, output_size in zip(layers, layers[1:]):
            self.hidden.append(nn.Linear(input_size,output_size).double())
            
    # Prediction
    def forward(self, x):

        L = len(self.hidden)
        for (l, single_layer) in zip(range(L), self.hidden):
            x = single_layer(x)
            x = torch.unbind(x,dim=1)                                        # Separate activation sum
            x = [ self.dropout(self.act_fn(x_sample)) for x_sample in x ]    # Compute sum activation for each X_hcp separately
            x = torch.stack(x, dim=1)                                        # Stack hcp_samples back together for next layer
        x = torch.unbind(x,dim=1)
        x = [ self.act_fn(x_sample.sum(dim=1)) for x_sample in x ]   # Compute sum activation for each X_hcp separately
        x = torch.stack(x,dim=1)                                     # Stack back together for output

        return x

## Subgroup Parallel Branch Block

This allow the achitecture to scale with respect to the total number of subgroups in the input data, as well, to encapsulate each subgroup into a submodel.

<center>
    <img src="./graphics/SubgroupBranch.jpg" width="1340" alt="subgroup_branch"  />
</center>

In [13]:
class cspd(nn.Module):
    
    # Constructor
    def __init__(self, in_feat, Subgroups, Architecture, dropout_p=None, act_fn=torch.relu):
        super(cspd, self).__init__()
        self.hidden = nn.ModuleList()
        # --- Scalable Subgroup Branch ---
        for s_id in range(Subgroups):
            self.hidden.append(SAMPLE_Branch(in_feat, Architecture[s_id], dropout_p, act_fn))
            
    # Prediction
    def forward(self, x, s):
        z = [ z_s(x) for z_s in self.hidden ]
        z = torch.stack(z, dim=2)
        branch_avg_factor = s.sum(dim=[2], keepdim=True)             # Cases with unsure subgrouping, normalize them across all subgroups
        branch_avg_factor = torch.max(branch_avg_factor,             # Avoid divide by zero
                                      torch.ones(branch_avg_factor.shape, 
                                                 device=branch_avg_factor.device.type))   
        z = (z * s).sum(dim=[2], keepdim=True) / branch_avg_factor   # Sum across subgroup branches
        z = z.sum(dim=[1], keepdim=True)                             # Sum across samples
        return z

<center>
    <img src="./graphics/DataArchitecture.jpg" width="1153" alt="architecture"  />
</center>

In [14]:
def initialize_weights(m):
    if isinstance(m, nn.Linear):
        nn.init.kaiming_uniform_(m.weight.data)
        nn.init.constant_(m.bias.data, 0)

---

# Data Handling and Hyperparameter Tunning

### Overview

* Modeling = Train + Validation 80%, Test 20% Data Split
* Modeling = Train + Validation 80%, Using K-Fold for Hyperparameters Tunning

<ol>
    <li>Split the data into 80/20</li>
    <li>Use K-Fold cross-validation splitting given k (eg. k=5 would results 80%/5=16% of each fold)</li>
    <ol>
        <li>For each fold, use the K-Fold training set for building model for each hyperparameters set</li>
        <li>For each fold, evaluate all models with different hyperparameters with the K-Fold test set</li>
        <li>Summarize the results into a table of (hyperparameter index, K-Fold index)</li>
        <li>Find the best hyperparameter set
    </ol>
    <li>Use the entire "Modeling = Train + Validation 80%" dataset to train a model</li>
    <li>Evaluate the model using the 20% Test Data
</ol>

<center>
    <img src="./graphics/DataHandling.jpg" width="1074" alt="data_splitting"  />
</center>

### Implementation

Data Splitting
* Using `sklearn.model_selection.train_test_split` function to perform the 80/20 split
* Generate index for K-fold of the 80% train+validation dataset using `sklearn.model_selection.KFold`
* Create a list of pytorch dataloader for each of the K-fold for training
* Create a list of pytorch dataloader for each of the K-fold for validation

Hyperparameter Tunning
* Use K in K-Fold as grid (must run) hyperparameter
* Use __custom sampling function__ to describe the hierachical neuron distribution between:
 * total neuron: $H_{total}$
 * neuron per subgroup: $H_{subgroup}$
 * neuron per layer: $H_{branch}$

<p style="margin-left: 100px">$H_{total}=15\quad\longrightarrow\quad H_{subgroup}=\begin{bmatrix}3\\4\\5\\3\end{bmatrix}\quad \longrightarrow\quad H_{branch}=\begin{bmatrix}[2,1]\\ [2,2]\\ [2,2,1]\\ [1,2] \end{bmatrix}$ </p>
    
* __Custom Sampling Function__
 * Using the total number of neuron from the last level, create all possible combination given the number of element
 * Sample an element from the list of combination and returns it


## Data Handling

Modeling-Testing 80/20 Split

In [15]:
def convert_multidimensional_labels(df,col):
    '''
    Convert Multiple Column Label into Single Column
    
    Args:
        df: A pandas dataframe with row as samples, and column as N-dimensional subgroup to be encoded.
        col: Column name of the combined column
        
    Returns:
        df: A pandas dataframe with new label column

    Raises:
        -

    Author:
        Dr. Calvin Chan
        calvin.chan@bayer.com
    '''
    if df.shape[1] == 1:
        df = pd.concat([df,df],axis=1)
        df.columns = [df.columns[0],col]
    else:
        df[col] = tuple(labels.values.tolist())
        df[col] = labels[col].apply(lambda x: ','.join([str(c) for c in x ]))
    return(df)

def combine_multidimensional_ohe(s):
    '''
    One-Hot-Encoding (OHE) based on joint label of multiple columns
    The default OHE feature of Pandas and sklearn takes each column as independent OHE. 
    This function uses the 2D unique label combination as a single dimension for OHE.
    
    Args:
        s: A pandas dataframe with row as samples, and column as N-dimensional subgroup to be encoded.

    Returns:
        s_ohe: A pandas dataframe with N-D OHE
        conversion_table: The conversion table for N-D OHE

    Raises:
        -

    Author:
        Dr. Calvin Chan
        calvin.chan@bayer.com
    '''
    unique_labels = [ sorted(s[name].unique().tolist()) for name in s.columns.tolist() ]
    multidimensional_labels = [*itertools.product(*unique_labels)]
    labels = pd.DataFrame(multidimensional_labels, columns=s.columns.tolist())
    labels = convert_multidimensional_labels(labels,'sgrp')
    conversion_table = pd.get_dummies(labels, columns=['sgrp'])
    s_ohe = pd.merge(s,conversion_table,on=s.columns.tolist(),how='left').drop(s.columns.tolist(),axis=1)
    return(s_ohe,conversion_table)

In [16]:
def model_test_split(*args, id_col=None, test_ratio=0.2, random_state=25, report_id=False):
    
    '''
    Split the dataset into modeling and test set
    
    This function is to encapsulate the variying input feature size given the grouping by id_col,
    and this decompose the one-hot-encoding column into a separate feature set to be used in the
    deep learning model as separate input.
    
    Args:
        *args:
            x: A pandas dataframe with row as samples, and column as ID and feature type
            y: A pandas dataframe with row as samples, and column as output
        ohe_col: A list of column names indicating the one-hot-encoding columns in x
        id_col: Column name of the grouping column to be converted to one-hot-encoding
        test_size: The split ratio of the test set
        random_state: Random seed use by the `sklearn.model_selection.train_test_split` function

    Returns:
        x_model, x_test: List of numpy matrix as model/test data split with from commond id_col labels of x and y
        s_model, s_test: List of numpy matrix as model/test data split with from commond id_col labels of x and y
        y_model, y_test: List of numpy matrix as model/test data split with from commond id_col labels of x and y

    Raises:
        Warning when the labels in id_col of x and y do not match
        
    Author:
        Dr. Calvin Chan
        calvin.chan@bayer.com
    '''
    
    if id_col is not None:

        inds = []
        data = []
        for arg in args:
            (ind,dat) = zip(*list(arg.groupby(id_col)))
            inds.append(ind)
            data.append(dat)

        # Determine of ID entry is missing from any of the input dataset
        id_not_match_flag = !(len(set.intersection(*[set(ind) for ind in inds])) == len(set.union(*[set(ind) for ind in inds])))
        if not id_not_match_flag:
            warnings.warn("Unmatch ID entries in one or more data inputs (eg. x, y)!")

        # Extract Common ID from x, s, y Samples
        select_ids = set.intersection(*[set(ind) for ind in inds])

        dataset = []
        for i, dat in enumerate(data):
            dataset.append([ dat[inds[i].index(single_id)].drop(id_col,axis=1) for single_id in select_ids ])

    else:
        # Determine number of elements in each input dataset is the same
        data_length = list(set([ len(arg) for arg in args ]))
        id_not_match_flag = !(len(data_length) == 1)
        assert id_not_match_flag, "Unmatch length in one or more data inputs (eg. x, y)!"

        select_ids = [*range(data_length[0])]
        dataset = args

    out = train_test_split(*dataset, test_size=test_ratio, random_state=random_state)

    if report_id:
        return(out, select_ids)
    else:
        return(out)

In [17]:
def model_test_split(*args, id_col=None, test_ratio=0.2, random_state=25, report_id=False):
    
    '''
    Split the dataset into modeling and test set
    
    This function is to encapsulate the variying input feature size given the grouping by id_col,
    and this decompose the one-hot-encoding column into a separate feature set to be used in the
    deep learning model as separate input.
    
    Args:
        *args:
            x: A pandas dataframe with row as samples, and column as ID and feature type
            y: A pandas dataframe with row as samples, and column as output
        ohe_col: A list of column names indicating the one-hot-encoding columns in x
        id_col: Column name of the grouping column to be converted to one-hot-encoding
        test_size: The split ratio of the test set
        random_state: Random seed use by the `sklearn.model_selection.train_test_split` function
        retain_df: If this is 'True' and the input 'args' are dataframes, do not convert them to list of single row dataframe

    Returns:
        x_model, x_test: List of numpy matrix as model/test data split with from commond id_col labels of x and y
        s_model, s_test: List of numpy matrix as model/test data split with from commond id_col labels of x and y
        y_model, y_test: List of numpy matrix as model/test data split with from commond id_col labels of x and y

    Raises:
        Warning when the labels in id_col of x and y do not match
        
    Author:
        Dr. Calvin Chan
        calvin.chan@bayer.com
    '''

    if id_col is not None:

        inds = []
        data = []
        for arg in args:
            (ind,dat) = zip(*list(arg.groupby(id_col)))
            inds.append(ind)
            data.append(dat)

        # Determine of ID entry is missing from any of the input dataset
        id_not_match_flag = !(len(set.intersection(*[set(ind) for ind in inds])) == len(set.union(*[set(ind) for ind in inds])))
        if not id_not_match_flag:
            warnings.warn("Unmatch ID entries in one or more data inputs (eg. x, y)!")

        # Extract Common ID from x, s, y Samples
        select_ids = list(set.intersection(*[set(ind) for ind in inds]))

        # Split dataframes into sample list
        # (multi-resolution: each list element contains multiple x and single y based on id_col)
        dataset = []
        for i, dat in enumerate(data):
            dataset.append([ dat[inds[i].index(single_id)].drop(id_col,axis=1) for single_id in select_ids ])

    else:
        # Determine index labels in each input dataset is the same
        dataset_indices = [ list(dataset.index) for dataset in args ]
        select_ids = unique_list(dataset_indices)
        
        id_not_match_flag = !(len(data_length) == 1)
        assert id_not_match_flag, "Unmatch length in one or more data inputs (eg. x, y)!"
        select_ids = select_ids[0]
        
        # Split dataframes into sample list
        # (equal resolution: each list element contains one row in both x and y)
        dataset = []
        for i, dat in enumerate(args):
            dataset.append([ dat.loc[[single_id]] for single_id in select_ids ])

    # Including index as one of the splitting dataset
    dataset = dataset + [select_ids]
    out = train_test_split(*dataset, test_size=test_ratio, random_state=random_state)
    split_ids = out[-2:]
    out = out[0:-2]

    if report_id:
        return(out, split_ids)
    else:
        return(out)

Data Loader

In [18]:
# class SgrpData(Dataset):
#     def __init__(self, x, s, y, transform=None, id_col="cluster_id"):
        
#         self.x_col = x[0].columns
#         self.s_col = s[0].columns
#         self.len = len(y)
#         self.transform = transform
        
#         # === Preconvert Pandas Dataframe to Torch Tensor and move to GPU for Performance ===
#         # Remove "id_col" and convert to matrix
#         # (Common samples are extracted, therefore row ID of x,s,y must be aligned)
#         self.x = [ torch.tensor(x_ele.values).type(torch.double) for x_ele in x ]
#         self.s = [ torch.tensor(s_ele.values).type(torch.double) for s_ele in s ]
#         self.y = [ torch.tensor(y_ele.values).type(torch.double) for y_ele in y ]
        
#     def __getitem__(self, index):
        
#         sample = [self.x[index],
#                   self.s[index],
#                   self.y[index]]
#         if self.transform:
#             sample = self.transform(sample)
#         return sample
    
#     def __len__(self):
#         return self.len

In [19]:
class SgrpDataBatch(Dataset):
    '''
    Data object for precomputing zero-patched dataset
    
    This is the data class for pytorch dataloader for input dataset with subgrouping/sgrpregation 
    input designed specifically for input x with higher resolution than output y.  Due to the
    difference between the resolution between each input sample, zero patching is required to
    perform mini-batch training. 
    
    This class precompute the zero-patched samples and provides an internal flag to decide whether
    a zero-patched sample is outputted during loading.

    The initializing input data x, s, y are all converted to list of dataframe already by the 
    data spliting function `sgrp_split_parser`.  This is neede because the x and y have different
    resolution, in order to match the resolution, splitting must be done prior creating the dataset
    object.

    Author:
        Dr. Calvin Chan
        calvin.chan@bayer.com
    '''

    def __init__(self, x, s, y, transform=None, zero_patch=False, dtype=torch.double, samples_id=None):
        
        input_equal_length_flag = (len(set([len(y), len(x), len(s)])) == 1)
        assert input_equal_length_flag, "Number of x and y samples do not match!"
        
        self.x_col = x[0].columns
        self.s_col = s[0].columns
        self.len = len(y)
        self.transform = transform
        self.zero_patched = zero_patch
        self.samples_id = samples_id
        
        # === Convert sample data type ===
        # (Common samples are extracted, therefore row ID of x,s,y must be aligned)
        self.x = self._sample_type_convert(x,dtype)
        self.s = self._sample_type_convert(s,dtype)
        self.y = self._sample_type_convert(y,dtype)
        
        # === Zero Patching x and s for Batch Training ===
        max_sample = max([ x.size(0) for x in self.x ])
        self.x_zero_patched = [ torch.cat([x.to(device), torch.zeros(max_sample - x.size(0), len(self.x_col)).to(device)], dim=0) for x in self.x ]
        self.s_zero_patched = [ torch.cat([s.to(device), torch.zeros(max_sample - s.size(0), len(self.s_col)).to(device)], dim=0) for s in self.s ]
        
    def __getitem__(self, index):
        
        if self.zero_patched:
            sample = [self.x_zero_patched[index],
                      self.s_zero_patched[index],
                      self.y[index]]
            
        else:
            sample = [self.x[index],
                      self.s[index],
                      self.y[index]]

        if self.transform:
            sample = self.transform(sample)
        return sample
    
    def __len__(self):
        return self.len
    
    def _sample_type_convert(self, samples, dtype):
        samples_out = [ torch.tensor(sample_ele.values).type(dtype) for sample_ele in samples ]
        return samples_out

K-Fold Data Preparation

In [20]:
def get_k_fold_indices(n_samples, k=5, shuffle=False):
    '''
    Drawing sample indices for K-Fold
    
    Args:
        samples: Number of samples in the dataset
        shuffle: Shuffling of samples

    Returns:
        kfold_train_ind: Indices for training set
        kfold_valid_ind: Indices for validation set

    Raises:
        -

    Author:
        Dr. Calvin Chan
        calvin.chan@bayer.com
    '''
    kfold = KFold(n_splits=k, shuffle=shuffle).split([*range(n_samples)])
    i, kfold_ind = zip(*[*enumerate(kfold)])   # Expand the index obtained by the K-Fold function
    kfold_train_ind, kfold_valid_ind = zip(*kfold_ind)
    return(kfold_train_ind, kfold_valid_ind)

Dataset Creator
* generalized for data with and without subgroup
* x and y with same or different resolution

In [21]:
def sgrp_split_parser(x, y, test_ratio=0.2, sgrp_col=None, y_id_col=None, report_id=False):
    '''
    Parsing Subgroup and Dualscaled Data
    Split the dualscaled x and y dataframe into a list of dataframe and extract the subgrouping information for Pytorch data object
    
    Args:
        x: Pandas dataframe at same or higher resoultion than y
        y: Pandas dataframe at same or lower resolution than x
        test_ratio: The split ratio of the test set
        sgrp_col: Column name of the column(s) indicating the branch/subgroup/sgrpregation in x
        y_id_col: Common column for dualscaled x and y, the column id name which joins the 2 dataframes

    Returns:
        out: Tuple of 6 variables - x_model, x_test, s_model, s_test, y_model, y_test indicates the splited dataset
        s_ohe_table: Subgroup one-hot-encoding conversion table
            
    Author:
        Dr. Calvin Chan
        calvin.chan@bayer.com
    '''
    if sgrp_col is None:
        s_ohe = pd.DataFrame(np.ones([x.shape[0],1]), columns=['no_subgroup'])
        s_ohe.index = x.index
        s_ohe_table = None
    else:
        (s_ohe, s_ohe_table) = combine_multidimensional_ohe(x[[sgrp_col]])
        x = x.drop(sgrp_col,axis=1)
        
    if y_id_col is not None:
        s_ohe[y_id_col] = x[y_id_col]
        
    out, sample_ids = model_test_split(x, s_ohe, y, id_col=y_id_col, test_ratio=test_ratio, random_state=25, report_id=True)
    
    if report_id:
        return (out, s_ohe_table, sample_ids)
    else:
        return (out, s_ohe_table)

In [22]:
def select_ind(ls,ind):
    return [ ls[i] for i in ind.tolist() ]

Process and Split the Data

In [23]:
def patitioned_data_object_cspd(x, y, sgrp_col, y_id_col, test_split_ratio, k):
    # Model/Test Splitting
    ((x_model, x_test, 
      s_model, s_test, 
      y_model, y_test), 
      s_ohe_table, 
     (samples_id_model, 
      samples_id_test)) = sgrp_split_parser(x, y, 
                                           test_ratio=test_split_ratio, 
                                           sgrp_col=subgroup_col, 
                                           y_id_col=y_id_col,
                                           report_id=True)    
    # K-Fold Index Sampling
    [kfold_train_ind, kfold_valid_ind] = get_k_fold_indices(n_samples=len(y_model), k=k, shuffle=False)   # Shuffle is NOT needed, since the samples were shuffled in the model/test split

    # Create K-set of datasets for Pytorch data loader
    dataset_train_kfold = [ SgrpDataBatch(select_ind(x_model,fold_ind),
                                         select_ind(s_model,fold_ind),
                                         select_ind(y_model,fold_ind),
                                         zero_patch = False,
                                         samples_id = select_ind(samples_id_model, fold_ind) )
                                               for fold_ind in kfold_train_ind ]
    dataset_valid_kfold = [ SgrpDataBatch(select_ind(x_model,fold_ind),
                                         select_ind(s_model,fold_ind),
                                         select_ind(y_model,fold_ind),
                                         zero_patch = False,
                                         samples_id = select_ind(samples_id_model, fold_ind) )
                                               for fold_ind in kfold_valid_ind ]

    dataset_model = SgrpDataBatch(x_model, s_model, y_model, zero_patch = False, samples_id = samples_id_model)
    dataset_test = SgrpDataBatch(x_test, s_test, y_test, zero_patch = False, samples_id = samples_id_test)
    
    return dataset_model, dataset_test, dataset_train_kfold, dataset_valid_kfold

## Hyperparameter Sampling

#### Neuron Custom Sampling Function

In [24]:
def integer_partitions(n_ele, n_min=1, max_dim=None, recursion_level=1):
    '''
    Fast Integer Partitioning
    Dividing a single integer into a list of integer that sums up to the given number
    
    Args:
        num_ele: Total number of elements to be distributed
        n_min: Minimum number of elements per output dimension

    Returns:
        Iterator as list of elements splitted into multiple dimensions
        
    Original Source :
    (Modification made to speed up by skpping recurrsion exceed max_dim)
        https://stackoverflow.com/questions/10035752/elegant-python-code-for-integer-partitioning
    
    Author:
        Dr. Calvin Chan
        calvin.chan@bayer.com
    '''
    if (max_dim is not None) and (recursion_level > max_dim):
        yield None
    else:
        yield (n_ele,)
        for i in range(n_min, n_ele//2 + 1):
            for p in integer_partitions(n_ele-i, i, max_dim, recursion_level+1):
                if p is not None:
                    yield (i,) + p
                elif recursion_level != 1:
                    yield None

In [25]:
def split_sampling(num_ele, num_layers=None, n_min=1, n_max=None, n_samples=1, prepend=[], postpend=[], single_sample=False):
    '''
    Randomly split the elements into multiple dimensions
    This is use for neuron sampling the number of elements and layer for multibranch neural network
    
    Args:
        num_ele: Total number of elements to be distributed
        n_min: Minimum number of elements per output dimension
        n_max: Maximum number of elements per output dimension
        num_layers: Number of layers to distribute the element, random dimensions will be given with None given

    Returns:
        sample: List of elements splitted into multiple dimensions
        
    Raises:
        -
        
    Example:
        >>> split_sampling(14, n_min=2, num_layers=4)
        [2, 5, 4, 3]
        
    Author:
        Dr. Calvin Chan
        calvin.chan@bayer.com
    '''
    # !!! DEBUG !!!
    # print(f"num_ele: {num_ele}; n_min: {n_min}; num_layers: {num_layers}")
    
    # Generate the Integer Partitions
    splits = integer_partitions(num_ele, n_min=n_min, max_dim=num_layers)
    if n_max is not None:
        splits = [ split for split in splits if max(split) <= n_max ]
    if num_layers is not None:
        splits = [ split for split in splits if len(list(split)) == num_layers ]
    else:
        splits = [ split for split in splits ]
    
    # Filter with Number of Output Dimension
    splits_perm = [list(set(itertools.permutations(split))) for split in splits ]
    unique_splits_perm = list(itertools.chain.from_iterable(splits_perm))
        
    # Randomly Sample one of the permutation
    if n_samples <= len(unique_splits_perm):
        sample = list([ prepend+list(sample)+postpend for sample in random.sample(unique_splits_perm, k=n_samples)])
    else:
        sample = list([ prepend+list(sample)+postpend for sample in random.choices(unique_splits_perm, k=n_samples)])
    if single_sample:
        sample = sample[0]
    
    return(sample)

#### Hyperparameters

 __Remark__: Random Architecture Configuration Table could be useful when training needs to be done with repeated samples at different level for comparison.  Ray Tune grid search only run each given hyperparameter for once and mix with random choices of hyperparameters, therefore, for fair K-fold comparison the Radom Architecture Configuration Table method is requred!

Feature request had been make to Ray Tune repeated sampling on grid search to fix the above problem: https://discuss.ray.io/t/tune-feature-request-using-ray-tune-for-k-fold-with-repeated-grid-sampling/2541/2

Once Ray Tune has this feature, Random Architecture Configuration Table will not be needed.

Sample Neuron/Layer Architecture as Hyperparameter

In [26]:
def compile_architecture_table(h_total_min, h_total_max, h_total_step,
                               h_subgroup_dim, h_subgroup_min_neuron_per_sgrp, h_subgroup_max_neuron_per_sgrp, h_subgroup_n_samples,
                               h_branch_min_neuron_per_layer, h_branch_max_neuron_per_layer, h_branch_n_samples):
    '''
    Generate Architecture Table for the Neural Network Architecture
    
    Args:
        h_total_min, h_total_max, h_total_step: Equally spaced ampling criteria for total number of neuron
        subgroup_dim: Subgroup output dimension (# subgroups)
        h_subgroup_min_neuron_per_sgrp: Minimum total number of neuron per subgroup branch
        h_subgroup_max_neuron_per_sgrp: Maximum total number of neuron per subgroup branch
        h_subgroup_n_samples: Total number of sample for subgroup distribution (from splitted neuron distribution)
        h_branch_min_neuron_per_layer: Minimum total number of neuron per layer
        h_branch_max_neuron_per_layer: Maximum total number of neuron per layer
        h_branch_n_samples: Total number of sample for branch distriubtion (from splitted neuron distribution)

    Returns:
        Pandas Dataframe with each row containing one random neural network architecture sample with the corresponding architecture information in each column.
            
    Author:
        Dr. Calvin Chan
        calvin.chan@bayer.com
    '''
    # Sampling the Architecture given the criteria
    h_total = [*range(h_total_min, h_total_max+1, h_total_step)]
    h_subgroup = [ split_sampling(num_ele = n_neuron, 
                             n_min = h_subgroup_min_neuron_per_sgrp, 
                             n_max = h_subgroup_max_neuron_per_sgrp,
                             n_samples = h_subgroup_n_samples, 
                             num_layers = h_subgroup_dim) for n_neuron in h_total ]
    h_branch = [[[ split_sampling(num_ele = n_subgroup, 
                              n_min = h_branch_min_neuron_per_layer,
                              n_max = h_branch_max_neuron_per_layer,
                              n_samples = h_branch_n_samples,
                              num_layers = None) 
                for n_subgroup in sample ] for sample in total ] for total in h_subgroup ]
    
    # Compile Random Architecture Configuration Table (Table index is used as grid search for hyperparameter tunning)
    total_table = pd.DataFrame({'total_id': [*range(len(h_total))], 'total': h_total})
    
    subgroup_table = pd.DataFrame(columns=['total_id', 'subgroup_id', 'subgroup'])
    for total_id, total in enumerate(h_subgroup):
        for subgroup_id, subgroup in enumerate(total):
                subgroup_table.loc[len(subgroup_table)] = [total_id, subgroup_id, subgroup]
                
    sample_table = pd.DataFrame(columns=['total_id', 'subgroup_id', 'branch_id', 'sample_id', 'h_branch'])
    for total_id, total in enumerate(h_branch):
        for subgroup_id, subgroup in enumerate(total):
            for branch_id, branch in enumerate(subgroup):
                for sample_id, sample in enumerate(branch):
                    sample_table.loc[len(sample_table)] = [total_id, subgroup_id, branch_id, sample_id, sample]
                    
    architecture_table = sample_table.groupby(["total_id","subgroup_id","sample_id"])["h_branch"].apply(list).reset_index()
    architecture_table = pd.merge(architecture_table, subgroup_table, how="left", on=["total_id", "subgroup_id"])
    architecture_table = pd.merge(architecture_table, total_table, how="left", on=["total_id"])
    
    return(architecture_table)

In [27]:
architecture_table = compile_architecture_table(h_total_min,
                                                h_total_max, 
                                                h_total_step,
                                                N_SUBGROUP,
                                                h_subgroup_min_neuron_per_sgrp,
                                                h_subgroup_max_neuron_per_sgrp,
                                                h_subgroup_n_samples,
                                                h_branch_min_neuron_per_layer,
                                                h_branch_max_neuron_per_layer,
                                                h_branch_n_samples,
                                                )

architecture_table[["total_id","subgroup_id","total","subgroup","h_branch"]].head()

Unnamed: 0,total_id,subgroup_id,total,subgroup,h_branch
0,0,0,40,"[7, 7, 7, 7, 12]","[[4, 3], [2, 2, 3], [2, 2, 3], [4, 3], [2, 2, ..."
1,0,0,40,"[7, 7, 7, 7, 12]","[[2, 2, 3], [4, 3], [2, 5], [2, 2, 3], [5, 2, 5]]"
2,0,1,40,"[7, 7, 8, 11, 7]","[[2, 3, 2], [4, 3], [3, 5], [5, 2, 2, 2], [2, ..."
3,0,1,40,"[7, 7, 8, 11, 7]","[[4, 3], [3, 4], [2, 4, 2], [4, 3, 4], [2, 5]]"
4,0,2,40,"[9, 7, 7, 8, 9]","[[2, 5, 2], [4, 3], [3, 2, 2], [2, 2, 4], [2, ..."


---

# Training Procedure

Reporting Functions

In [28]:
def remove_zero_padded(y_est, y, sgrp, return_length=False):
    '''
    Remove Zero Padding Data for Batch Data
    Due to the variable input resolution, zero padding is required for batch gradient decent for cspd algorithm.  Therefore, the zero padded batches could introduce a bias in the loss metric computation.  To avoid this problem, the zero padded data with all zeros for the subgroup indicator is used to remove these entries during error computation.
    
    Args:
        y_est: Model prediction output
        y: Training data output ground truth
        sgrp: Subgrouping one-hot-encoded matrix for the batch data (B x O x S matrix, where B is batch size, O is output dimensions, S is number of subgroups)

    Returns:
        out: Loss metric variable

    Raises:
        -

    Example:
        

    Author:
        Dr. Calvin Chan
        calvin.chan@bayer.com
    '''
    # identify non-zero padded data (not all subgroup flags are zero)
    # :: first `torch.any` perform non-zero padded sample checking across subgroups
    # :: second `torch.any` perform non-zero padded sample checking across input sample space within output sample
    flag = torch.any(torch.any(sgrp,dim=2,keepdim=False),dim=1,keepdim=False)   
    # select non-zero padded entries
    y_est = y_est[flag]
    y = y[flag]
    
    if not return_length:
        return(y_est, y)
    else:
        return(y_est, y, sum(flag).item())

In [29]:
def loss_fifo(y_est, y, sgrp=None, history=None, queue_len=1000):
    '''
    Record Loss of Output Data
    Due to the variable input resolution, zero padding is required for batch gradient decent for cspd algorithm.  Therefore, the zero padded batches could introduce a bias in the loss metric computation.  To avoid this problem, the zero padded data with all zeros for the subgroup indicator is used to remove these entries during error computation.
    
    Args:
        y_est: Model prediction output
        y: Training data output ground truth
        sgrp: Subgrouping one-hot-encoded matrix for the batch data (B x O x S matrix, where B is batch size, O is output dimensions, S is number of subgroups)
        queue_len: Maximum records to be stored in the history queue

    Returns:
        history: Dictionary of output to be reported, each dictionary element is a numpy array as a queue containing the history of past results.

    Raises:
        -

    Example:
        

    Author:
        Dr. Calvin Chan
        calvin.chan@bayer.com
    '''
       
    # remove zero-padded cases
    if sgrp is not None:
        y_est, y, num_pts = remove_zero_padded(y_est, y, sgrp, return_length=True)
    else:
        assert y_est.shape[0] == y.shape[0], "y and y_est has different shape"
        num_pts = y_est.shape[0]
    
    num_pts = min(num_pts, queue_len)   # if queue is smaller than the number of results, truncate the front
    y_est = y_est[-num_pts:]
    y = y[-num_pts:]
        
    # managing the results FIFO queue
    # :: push new sample and remove older samples
    # :: keep the y, y_est in a FIFO for computing statistics
    if history is None or len(history) == 0:
        # initialize for the queue
        history = {'y': y, 'y_est': y_est}
    elif len(history['y']) < queue_len:
        # insert y into non-empty queue and trim data extended beyond queue size
        history['y'] = torch.cat( (history['y'], y), dim=0)[-queue_len:]
        history['y_est'] = torch.cat( (history['y_est'], y_est), dim=0)[-queue_len:]
    else:
        # shift element and replace (push on FIFO)
        history['y'] = torch.roll(history['y'], -num_pts, dims=0)
        history['y'][-num_pts:] = y
        history['y_est'] = torch.roll(history['y_est'], -num_pts, dims=0)
        history['y_est'][-num_pts:] = y_est
    
    return(history)

In [30]:
def compute_iqr(e):
    '''
    Compute Loss IQR
    
    Args:
        e: Error/Loss

    Returns:
        iqr: Interquartile range of the error

    Raises:
        -

    Example:
        

    Author:
        Dr. Calvin Chan
        calvin.chan@bayer.com
    '''
    q75 = torch.quantile(e, 0.75)
    q25 = torch.quantile(e, 0.25)
    iqr = q75 - q25
    return iqr

In [31]:
def compute_l1_iqr(y_est, y):
    '''
    Compute L1 Loss IQR
    
    Args:
        y_est: Model prediction output
        y: Training data output ground truth

    Returns:
        iqr: Interquartile range of the error

    Raises:
        -

    Example:
        

    Author:
        Dr. Calvin Chan
        calvin.chan@bayer.com
    '''
    e = torch.abs(y_est - y)
    q75 = torch.quantile(e, 0.75)
    q25 = torch.quantile(e, 0.25)
    iqr = q75 - q25
    return iqr

In [32]:
def compute_mape_iqr(y_est, y):
    '''
    Compute Error IQR
    
    Args:
        y_est: Model prediction output
        y: Training data output ground truth

    Returns:
        iqr: Interquartile range of the error

    Raises:
        -

    Example:
        

    Author:
        Dr. Calvin Chan
        calvin.chan@bayer.com
    '''
    e = torch.abs(y_est - y)
    q75 = torch.quantile(e, 0.75)
    q25 = torch.quantile(e, 0.25)
    iqr = q75 - q25
    return iqr

Ray Tune Training Procedure

In [33]:
# Training procedure
def train_cspd_raytune(config, 
                       num_in_feat,
                       num_branch,
                       criterion=nn.MSELoss(),
                       checkpoint_dir=None, 
                       num_epochs=100, 
                       train_dataset=None, 
                       valid_dataset=None,
                       metric_dict={'rmse':     lambda y_est,y: torch.sqrt(nn.MSELoss(reduction="mean")(y_est,y)),
                                    'mean_l1':  lambda y_est,y: nn.L1Loss(reduction="mean")(y_est,y),
                                    'l1_iqr':   lambda y_est,y: compute_iqr(nn.L1Loss(reduction="none")(y_est,y)),
                                    'med-ape':  lambda y_est,y: torch.median((y-y_est).abs()/y.abs()),
                                    'mape':     lambda y_est,y: torch.mean((y-y_est).abs()/y.abs()),
                                    'mape_iqr': lambda y_est,y: compute_iqr((y-y_est).abs()/y.abs())},
                       train_metric_samples=None,
                       force_cpu=False
                      ):
    '''
    Training procedure for cspd regression with Ray Tune hyperparameter tuning
    This function is to be used for training with hyperparameter tuning based on Ray Tune. A cspd architecture table is given and the following hyperparameters are sampled by Ray Tune:
        lr: learning rate
        h_branch: neural network architecture definition
        dropout_p: dropout probability of all the neurons in the network
        k: k-fold index k for the dataset
        batch_size: the batch size use for the mini-batch use for batch gradient descent

    Args:
    (Note: This function is not meant to run directly by user, these arguemnts are passed indirectly by tune.run.)
        config: Ray Tune hyperparameter sampling configuration (for details, please refer to: https://docs.ray.io/en/master/tune/user-guide.html)
        checkpoint_dir: Output directory of training log, including the tensorboard output
        num_epochs: Number of training epochs
        num_in_feat: Number of input features for the network
        num_branch: Number of parallel branches in the network (subgroups)
        train_dataset: List of K element SgrpDataBatch class Pytorch dataloader object
        valid_dataset: List of K element SgrpDataBatch class Pytorch dataloader object
        metric_dict: Dictionary of loss function to be use for metric reporting (Attention: These are only used for reporting, not as training loss function!)

    Returns:
        result is return indirectly with tune.run

    Raises:
        -

    Example:
        -
    '''

    #====================== Ray Tune Parameters Setup ======================#

    if 'dropout_p' in config.keys():
        _dropout_p = config['dropout_p']
    else:
        _dropout_p = 0

    if 'batch_size' in config.keys():
        _batch_size = config['batch_size']
    else:
        _batch_size = 1
        
    # determine if input is k-fold dataset or single dataset
    if (type(train_dataset) is list) and (type(valid_dataset) is list) and ('k' in config.keys()):
        _train_dataset = train_dataset[config['k']]
        _valid_dataset = valid_dataset[config['k']]
    else:
        _train_dataset = train_dataset
        _valid_dataset = valid_dataset

    # zero patch high-res dimension dataset if we are doing mini-batch
    if _batch_size > 1:
        _train_dataset.zero_patched = True
        _valid_dataset.zero_patched = True
    else:
        _train_dataset.zero_patched = False
        _valid_dataset.zero_patched = False

    # measure error metric across whole epoch if no sample length is given
    # (the latest progress might not be shown properly and error could be overestimated by earlier samples)
    if train_metric_samples is None:
        train_metric_samples = len(_train_dataset)
    
    # gpu usage
    if not force_cpu:
        device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    else:
        device = "cpu"

    train_loader = torch.utils.data.DataLoader(dataset=_train_dataset, batch_size=_batch_size, shuffle=True, 
                                               collate_fn=lambda x: [ x_ele.to(device) for x_ele in default_collate(x) ] )
    valid_loader = torch.utils.data.DataLoader(dataset=_valid_dataset, batch_size=_batch_size, shuffle=True,
                                               collate_fn=lambda x: [ x_ele.to(device) for x_ele in default_collate(x) ] )

    #====================== Model Setup ======================#

    # initialize ANN architecture
    model = cspd(in_feat = num_in_feat, 
                 Subgroups = num_branch, 
                 Architecture = config['h_branch'], 
                 dropout_p = _dropout_p,
                 act_fn = torch.relu)
    model.apply(initialize_weights)

    # gpu usage
    if torch.cuda.device_count() > 1:
        model = nn.DataParallel(model)   # for multiple GPUs
    model.to(device)

    
    # optimizer is controlled by ray tune hyperparameter
    optimizer = torch.optim.Adam(model.parameters(), lr = config["lr"])
    
    # The `checkpoint_dir` parameter gets passed by Ray Tune when a checkpoint
    # should be restored.
    if checkpoint_dir:
        checkpoint = os.path.join(checkpoint_dir, "checkpoint")
        model_state, optimizer_state = torch.load(checkpoint)
        model.load_state_dict(model_state)
        optimizer.load_state_dict(optimizer_state)
    

    # create loss metric dictionary to store results
    history = {'train': {}, 'valid': {}}
    metric_output = {}

    for epoch in range(num_epochs):

        #====================== Training ======================#

        # training using all training samples
        for i, (x, s, y) in enumerate(train_loader):
            # zero the parameter gradients
            optimizer.zero_grad()

            # set the model to training mode
            model.train()
            
            # forward + backward + optimize
            y_est = model(x, s)
            y_est, y = remove_zero_padded(y_est, y, s)
            loss = criterion(y_est, y)
            loss.backward()
            optimizer.step()
            
            # record the prediction results
            # :: the following function is use to remove zero-padded samples in batch training
            # :: loss metrics are kept in a FIFO queue per latest samples in order to compute statistics
            history['train'] = loss_fifo(y_est, y, sgrp=s, history=history['train'], queue_len=train_metric_samples)
            
        #====================== Validation ======================#    
        
        # set the model to evaluation mode
        model.eval()

        # training using all validation samples
        with torch.no_grad():
            for  i, (x, s, y) in enumerate(valid_loader):
                y_est = model(x, s)
                # record the prediction results
                # :: the following function is use to remove zero-padded samples in batch training
                # :: loss metrics are kept in a FIFO queue per latest samples in order to compute statistics
                history['valid'] = loss_fifo(y_est, y, sgrp=s, history=history['valid'], queue_len=len(_valid_dataset))
                
        for metric in metric_dict.keys():
            for dataset in history.keys():
                metric_label = '_'.join([dataset,metric])
                metric_output[metric_label] = metric_dict[metric](history[dataset]['y_est'],history[dataset]['y']).item()
                
        # Here we save a checkpoint. It is automatically registered with
        # Ray Tune and will potentially be passed as the `checkpoint_dir`
        # parameter in future iterations.
        with tune.checkpoint_dir(step=epoch) as checkpoint_dir:
            path = os.path.join(checkpoint_dir, "checkpoint")
            torch.save( (model.state_dict(), optimizer.state_dict()), path )

        tune.report(**metric_output)

Training routine to be use for manual training with __No Hyperparameter Tuning__ with Ray Tune

In [34]:
# Training procedure
def train_cspd(model, train_dataset, valid_dataset, criterion, optimizer, 
               epochs=100, 
               batch_size=1, 
               metric_dict={'rmse':     lambda y_est,y: torch.sqrt(nn.MSELoss(reduction="mean")(y_est,y)),
                            'mean_l1':  lambda y_est,y: nn.L1Loss(reduction="mean")(y_est,y),
                            'l1_iqr':   lambda y_est,y: compute_iqr(nn.L1Loss(reduction="none")(y_est,y)),
                            'med-ape':  lambda y_est,y: torch.median((y-y_est).abs()/y.abs()),
                            'mape':     lambda y_est,y: torch.mean((y-y_est).abs()/y.abs()),
                            'mape_iqr': lambda y_est,y: compute_iqr((y-y_est).abs()/y.abs())},
               train_metric_samples=None,
               ):
    '''
    Training procedure for cspd regression
    This function is to be used for training without hyperparameter optimization, this function is usually use for test run to make sure all modification on the cspd architecture is working before submitting a list of models for hyperparameter search. To use hyperparameter optimization, please use either `train_cspd_raytune` or `train_cspd_raytune_auto_architecture`.
    
    Args:
        model: Pytorch model object of cspd
        train_dataset: List of K element SgrpDataBatch class Pytorch dataloader object
        valid_dataset: List of K element SgrpDataBatch class Pytorch dataloader object
        criterion: Training criterion to be used (eg. criterion = nn.MSELoss())
        optimizer: Training optimizer to be used (eg. optimizer = torch.optim.Adam(model.parameters(), lr = 0.1))
        epochs: Number of training epochs to be used
        batch_size: The batch size to use for batch gradient descent of the output dimension, the input dimension will be setted to zero patching within the SgrpDataBatch object for comparable input size to perform the stacked computation
        metric_dict: Dictionary of loss function to be use for metric reporting (Attention: These are only used for reporting, not as training loss function!)
        history_queue_len: The number of loss result samples to keep for statistical reporting

    Returns:
        history: Training and validation results summary
        model: Implicitly updated in the model object

    Raises:
        -

    Example:
        # Example of cspd training with no subgroupings
        # (Remark: s_model and s_test are all generated with all 1's by model_test_split function with ohe_cols=None)
        dataset_model, dataset_test, dataset_train_kfold, dataset_valid_kfold = patitioned_data_object_cspd(x, y, subgroup_col, y_id_col, test_split_ratio, k)
        dataset_train = dataset_train_kfold
        dataset_valid = dataset_valid_kfold
        architecture = [ [2,2,3,2,2] ]   # single branch with 5 layers
        model = cspd(in_feat=10, Subgroups=1, Architecture=architecture, dropout_p=0.3)
        model.apply(initialize_weights)
        optimizer = torch.optim.Adam(model.parameters(), lr = 0.1)
        criterion = nn.MSELoss()
        metric_dict = {'rmse': lambda y_est,y: torch.sqrt(nn.MSELoss(reduction="none")(y_est,y)), 
                       'mape': lambda y_est,y: (y-y_est).abs()/y.abs()}
        training_results = train_cspd(model=model, 
                                      train_dataset=dataset_model, 
                                      valid_dataset=dataset_test, 
                                      criterion=criterion,
                                      optimizer=optimizer,
                                      metric_dict=metric_dict,
                                      epochs=num_epochs, 
                                      batch_size=64)        

    Author:
        Dr. Calvin Chan
        calvin.chan@bayer.com
    '''
    history = {'train': {}, 'valid': {}}
    metric_output = {}

    if train_metric_samples is None:
        train_metric_samples = len(train_dataset)
    
    if batch_size > 1:
        train_dataset.zero_patched = True
        valid_dataset.zero_patched = True
    else:
        train_dataset.zero_patched = False
        valid_dataset.zero_patched = False

    train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
    valid_loader = torch.utils.data.DataLoader(dataset=valid_dataset, batch_size=batch_size, shuffle=True)
    
    for epoch in range(epochs):

        #====================== Training ======================#
        running_loss = 0.0
        epoch_steps = 0

        # training using all training samples
        for i, (x, sgrp, y) in enumerate(train_loader):
            # zero the parameter gradients
            optimizer.zero_grad()

            # set the model to training mode
            model.train()
            
            # forward + backward + optimize
            y_est = model(x, sgrp)
            y_est, y = remove_zero_padded(y_est, y, sgrp)
            loss = criterion(y_est, y)
            loss.backward()
            optimizer.step()
            history['train'] = loss_fifo(y_est, y, sgrp=sgrp, history=history['train'], queue_len=train_metric_samples)

        #====================== Validation ======================#
        
        # set the model to evaluation mode
        model.eval()

        # training using all validation samples
        with torch.no_grad():
            for  i, (x, sgrp, y) in enumerate(valid_loader):
                y_est = model(x, sgrp)
                history['valid'] = loss_fifo(y_est, y, sgrp=sgrp, history=history['valid'], queue_len=len(valid_dataset))
                
    
        metric_labels = []
        for metric in metric_dict.keys():
            for dataset in history.keys():
                metric_label = '_'.join([dataset,metric])
                metric_output[metric_label] = metric_dict[metric](history[dataset]['y_est'],history[dataset]['y']).item()
                metric_labels.append(metric_label)
                        
        print(f"[Epoch: { epoch+1 }]", end=" " )
        for metric_label in metric_labels:
            print(f"{metric_label}: {metric_output[metric_label]:.3f},", end=" ")
        print(f"")

    return (history)

In [35]:
def train_cspd_raytune_cpu_gpu_distributed(config, 
                                           num_in_feat,
                                           num_branch,
                                           criterion=nn.MSELoss(),
                                           checkpoint_dir=None, 
                                           num_epochs=100, 
                                           train_dataset=None, 
                                           valid_dataset=None,
                                           metric_dict={'rmse':     lambda y_est,y: torch.sqrt(nn.MSELoss(reduction="mean")(y_est,y)),
                                                        'mean_l1':  lambda y_est,y: nn.L1Loss(reduction="mean")(y_est,y),
                                                        'l1_iqr':   lambda y_est,y: compute_iqr(nn.L1Loss(reduction="none")(y_est,y)),
                                                        'med-ape':  lambda y_est,y: torch.median((y-y_est).abs()/y.abs()),
                                                        'mape':     lambda y_est,y: torch.mean((y-y_est).abs()/y.abs()),
                                                        'mape_iqr': lambda y_est,y: compute_iqr((y-y_est).abs()/y.abs())},
                                           train_metric_samples=None,
                                           ):
    '''
    CPU/GPU Distributed Wrapper Function for Training procedure for cspd regression
    This function is written to allow training done on both CPU and GPU of a single machine at the same time.
    
    Args:

    Returns:
        result: Training metric results

    Source:
        This code is modified from the following: https://discuss.ray.io/t/different-trial-on-cpu-and-gpu-separately/2883

    Author:
        Dr. Calvin Chan
        calvin.chan@bayer.com
    '''
    
    a = filelock.FileLock("/tmp/gpu.lock")
    try:
        # Makes it so that 1 trial will use the GPU at once.
        a.acquire(timeout=1)
        result = train_cspd_raytune(config, 
                                    num_in_feat,
                                    num_branch,
                                    criterion,
                                    checkpoint_dir, 
                                    num_epochs, 
                                    train_dataset, 
                                    valid_dataset,
                                    metric_dict,
                                    train_metric_samples,
                                    force_cpu=False
                                    )
    except filelock.Timeout:
        # If the lock is acquired, you can just use CPU, and disable GPU access.
        result = train_cspd_raytune(config, 
                                    num_in_feat,
                                    num_branch,
                                    criterion,
                                    checkpoint_dir, 
                                    num_epochs, 
                                    train_dataset, 
                                    valid_dataset,
                                    metric_dict,
                                    train_metric_samples,
                                    force_cpu=True
                                    )
    finally:
        # Release the lock after training is done.
        a.release()
    return result


---

# Training

Data Preparation

In [36]:
dataset_model, dataset_test, dataset_train_kfold, dataset_valid_kfold = patitioned_data_object_cspd(x, y, subgroup_col, y_id_col, test_split_ratio, k)

Functions for Joining Results and Architecture Table

In [37]:
def convert_nested_numeric_to_string(in_list):
    return(' ; '.join([' '.join([str(c) for c in lst]) for lst in in_list]))

In [38]:
def join_nested(left, right, on):
    left['key'] = left[on].apply(convert_nested_numeric_to_string)
    right['key'] = right[on].apply(convert_nested_numeric_to_string)
    out = pd.merge(left.drop(columns=[on]), right, on='key', how='left')
    out = out.drop(columns=['key'])
    return(out)

### Hyperparameter Tuning (Ray Tune Using Network Architecture Table)

Model Training for Ray Tune with __Ramdom Sampled Network Architecture Hyperparameters__ 

In [39]:
architectures = architecture_table['h_branch'][0:5].tolist()

In [40]:
report_metrics = ["training_iteration",
                  "train_rmse", 
                  "valid_rmse",
                  "train_mean_l1", 
                  "valid_mean_l1",
                  "train_l1_iqr",
                  "valid_l1_iqr",
                  "train_med-ape",
                  "valid_med-ape",
                  "train_mape", 
                  "valid_mape",
                  "train_mape_iqr", 
                  "valid_mape_iqr",
                 ]

# reporter = tune.CLIReporter(max_progress_rows=35, metric_columns=report_metrics)
reporter = tune.JupyterNotebookReporter(overwrite=True, max_progress_rows=35,  metric_columns= report_metrics)
scheduler = HyperBandScheduler(metric="valid_mape", mode="min", max_t=num_epochs)
searchopt = BasicVariantGenerator(max_concurrent=15)

config = {'lr': tune.loguniform(lr_min, lr_max),                     # Learning Rate
          "dropout_p": tune.uniform(dropout_p_min, dropout_p_max),   # Dropout On/Off
          'k': tune.grid_search([*range(k)]),                        # K-Fold Index
          'batch_size': tune.choice(batch_size),                     # 1: SGD; 2+: Zero-Filled BGD
          'h_branch': tune.grid_search(architectures),
         }

In [41]:
t0 = time.time()

result = tune.run(
    tune.with_parameters(train_cspd_raytune_cpu_gpu_distributed, 
                         num_in_feat   = N_FEATURE,
                         num_branch    = N_SUBGROUP,
                         num_epochs    = num_epochs, 
                         train_dataset = dataset_train_kfold, 
                         valid_dataset = dataset_valid_kfold,
                         train_metric_samples = round(len(dataset_train_kfold[0])/10),
                         ),
    config = config,
    resources_per_trial = {"cpu": 2 ,"gpu": 0.1},
    num_samples = num_hp_search_samples,
    local_dir = chkpt_dir,
    max_failures = 5,
    progress_reporter = reporter,
    scheduler = scheduler,
    search_alg = searchopt,
)

t1 = time.time()
print(f"Time elapsed: {t1-t0}s")

Trial name,status,loc,batch_size,dropout_p,h_branch,k,lr,training_iteration,train_rmse,valid_rmse,train_mean_l1,valid_mean_l1,train_l1_iqr,valid_l1_iqr,train_med-ape,valid_med-ape,train_mape,valid_mape,train_mape_iqr,valid_mape_iqr
train_cspd_raytune_cpu_gpu_distributed_e4721_00000,TERMINATED,,64,0.0853914,"[[4, 3], [2, 2, 3], [2, 2, 3], [4, 3], [2, 2, 3, 3, 2]]",0,0.00176027,5,44517.7,45598.1,38371.2,39433.7,37274.8,38981.5,0.963136,0.962158,0.916497,0.91754,0.0527474,0.0454284
train_cspd_raytune_cpu_gpu_distributed_e4721_00001,TERMINATED,,64,0.697423,"[[2, 2, 3], [4, 3], [2, 5], [2, 2, 3], [5, 2, 5]]",0,0.0033187,5,42677.2,43943.8,36553.9,37580.6,38286.4,39306.7,0.914488,0.918149,0.862635,0.858257,0.103201,0.0915404
train_cspd_raytune_cpu_gpu_distributed_e4721_00002,TERMINATED,,64,0.453953,"[[2, 3, 2], [4, 3], [3, 5], [5, 2, 2, 2], [2, 3, 2]]",0,0.000382137,1,46084.4,46850.0,40217.4,40871.4,38416.5,39236.2,0.996796,0.99705,0.992111,0.993479,0.00485981,0.00352699
train_cspd_raytune_cpu_gpu_distributed_e4721_00003,TERMINATED,,64,0.403146,"[[4, 3], [3, 4], [2, 4, 2], [4, 3, 4], [2, 5]]",0,0.0346138,1,29202.0,28360.4,23474.6,23395.6,29565.5,26093.8,0.602103,0.556996,0.852773,1.05081,0.364137,0.325352
train_cspd_raytune_cpu_gpu_distributed_e4721_00004,TERMINATED,,64,0.612245,"[[2, 5, 2], [4, 3], [3, 2, 2], [2, 2, 4], [2, 2, 5]]",0,0.00632615,3,43081.1,42682.3,37001.7,36225.2,35847.7,38796.3,0.895171,0.884017,0.835857,0.830839,0.120791,0.125901
train_cspd_raytune_cpu_gpu_distributed_e4721_00005,TERMINATED,,64,0.388295,"[[4, 3], [2, 2, 3], [2, 2, 3], [4, 3], [2, 2, 3, 3, 2]]",1,0.0508286,5,23754.4,23089.2,20412.6,19869.4,19479.5,19547.6,0.429355,0.420032,1.47403,1.37029,0.605378,0.584318
train_cspd_raytune_cpu_gpu_distributed_e4721_00006,TERMINATED,,64,0.328781,"[[2, 2, 3], [4, 3], [2, 5], [2, 2, 3], [5, 2, 5]]",1,0.0016617,5,43369.5,45609.2,36738.0,39547.6,42050.7,39837.9,0.958085,0.961793,0.916795,0.922185,0.0647695,0.0442157
train_cspd_raytune_cpu_gpu_distributed_e4721_00007,TERMINATED,,64,0.376204,"[[2, 3, 2], [4, 3], [3, 5], [5, 2, 2, 2], [2, 3, 2]]",1,0.0384199,3,24629.3,24030.4,20722.6,20356.9,21638.8,20620.4,0.452192,0.431487,1.38091,1.30625,0.415867,0.461189
train_cspd_raytune_cpu_gpu_distributed_e4721_00008,TERMINATED,,64,0.459848,"[[4, 3], [3, 4], [2, 4, 2], [4, 3, 4], [2, 5]]",1,0.0141262,1,47510.9,45282.2,42019.0,39176.0,37301.9,39709.3,0.960628,0.952846,0.922977,0.907597,0.0378298,0.0582972
train_cspd_raytune_cpu_gpu_distributed_e4721_00009,TERMINATED,,64,0.259113,"[[2, 5, 2], [4, 3], [3, 2, 2], [2, 2, 4], [2, 2, 5]]",1,0.000479242,1,45112.6,46853.7,39517.6,40970.7,35672.1,39696.5,0.996909,0.997088,0.992551,0.993888,0.00365959,0.00334135


2022-03-25 16:49:21,401	INFO tune.py:561 -- Total run time: 259.66 seconds (258.95 seconds for the tuning loop).


Time elapsed: 269.33325386047363s


In [44]:
all_trials.columns

Index(['train_rmse', 'valid_rmse', 'train_mean_l1', 'valid_mean_l1',
       'train_l1_iqr', 'valid_l1_iqr', 'train_med-ape', 'valid_med-ape',
       'train_mape', 'valid_mape', 'train_mape_iqr', 'valid_mape_iqr',
       'time_this_iter_s', 'should_checkpoint', 'done', 'timesteps_total',
       'episodes_total', 'training_iteration', 'experiment_id', 'date',
       'timestamp', 'time_total_s', 'pid', 'hostname', 'node_ip',
       'time_since_restore', 'timesteps_since_restore',
       'iterations_since_restore', 'experiment_tag', 'lr', 'dropout_p', 'k',
       'batch_size', 'h_branch', 'key'],
      dtype='object')

In [46]:
best_trial = result.get_best_trial("valid_med-ape", "min", "last")

all_trials = result.results_df
all_trials.columns = all_trials.columns.str.replace('config.', '', regex=False).tolist()
join_nested(all_trials, architecture_table, on='h_branch')
all_trials[['k','batch_size','h_branch','dropout_p','lr','valid_med-ape']]

Unnamed: 0_level_0,k,batch_size,h_branch,dropout_p,lr,valid_med-ape
trial_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
e4721_00000,0,64,"[[4, 3], [2, 2, 3], [2, 2, 3], [4, 3], [2, 2, ...",0.085391,0.00176,0.962158
e4721_00001,0,64,"[[2, 2, 3], [4, 3], [2, 5], [2, 2, 3], [5, 2, 5]]",0.697423,0.003319,0.918149
e4721_00002,0,64,"[[2, 3, 2], [4, 3], [3, 5], [5, 2, 2, 2], [2, ...",0.453953,0.000382,0.99705
e4721_00003,0,64,"[[4, 3], [3, 4], [2, 4, 2], [4, 3, 4], [2, 5]]",0.403146,0.034614,0.556996
e4721_00004,0,64,"[[2, 5, 2], [4, 3], [3, 2, 2], [2, 2, 4], [2, ...",0.612245,0.006326,0.884017
e4721_00005,1,64,"[[4, 3], [2, 2, 3], [2, 2, 3], [4, 3], [2, 2, ...",0.388295,0.050829,0.420032
e4721_00006,1,64,"[[2, 2, 3], [4, 3], [2, 5], [2, 2, 3], [5, 2, 5]]",0.328781,0.001662,0.961793
e4721_00007,1,64,"[[2, 3, 2], [4, 3], [3, 5], [5, 2, 2, 2], [2, ...",0.376204,0.03842,0.431487
e4721_00008,1,64,"[[4, 3], [3, 4], [2, 4, 2], [4, 3, 4], [2, 5]]",0.459848,0.014126,0.952846
e4721_00009,1,64,"[[2, 5, 2], [4, 3], [3, 2, 2], [2, 2, 4], [2, ...",0.259113,0.000479,0.997088


In [47]:
print("Best trial config: {}".format(best_trial.config))
print("Best trial final validation loss: {}".format(best_trial.last_result["valid_med-ape"]))

Best trial config: {'lr': 0.03184793123042965, 'dropout_p': 0.4266056926273974, 'k': 4, 'batch_size': 64, 'h_branch': [[4, 3], [2, 2, 3], [2, 2, 3], [4, 3], [2, 2, 3, 3, 2]]}
Best trial final validation loss: 0.4125843968327914


---

### Hyperparameter Tuning (Ray Tune using Auto Network Architecture Tunning)

__Note__: Ray Tune does not yet support same sampling for all set of Random+Grid Random Search with same set of Grid Parameters.  Therefore, this is not suitable for K-Fold fair comparision yet.

In [48]:
report_metrics = ["training_iteration",
                  "train_rmse", 
                  "valid_rmse",
                  "train_mean_l1", 
                  "valid_mean_l1",
                  "train_l1_iqr",
                  "valid_l1_iqr",
                  "train_med-ape",
                  "valid_med-ape",
                  "train_mape", 
                  "valid_mape",
                  "train_mape_iqr", 
                  "valid_mape_iqr",
                 ]

reporter = tune.JupyterNotebookReporter(overwrite=True, max_progress_rows=35, metric_columns= report_metrics)
scheduler = HyperBandScheduler(metric="valid_mape", mode="min", max_t=num_epochs)
searchopt = BasicVariantGenerator(max_concurrent=15)

config = {"lr": tune.loguniform(lr_min, lr_max),                       # Learning Rate
          "dropout_p": tune.uniform(dropout_p_min, dropout_p_max),     # Dropout On/Off
          "k": tune.grid_search([*range(k)]),                          # K-Fold Index
          "batch_size": tune.choice(batch_size),                       # 1: SGD; 2+: Zero-Filled BGD
          "h_total": tune.choice([*range(h_total_min, h_total_max, h_total_step)]),
          "h_subgroup": tune.sample_from(lambda spec: split_sampling(num_ele = spec.config.h_total, 
                                                                    num_layers = N_SUBGROUP,
                                                                    n_min = h_subgroup_min_neuron_per_sgrp,
                                                                    n_max = h_subgroup_max_neuron_per_sgrp,
                                                                    single_sample = True)),
          "h_branch": tune.sample_from(lambda spec: [ split_sampling(num_ele = h_sgrp_ele, 
                                                                     num_layers = h_branch_max_layer,
                                                                     n_min = h_branch_min_neuron_per_layer,
                                                                     n_max = h_branch_max_neuron_per_layer,
                                                                     single_sample = True) for h_sgrp_ele in spec.config.h_subgroup ]),
         }

In [49]:
t0 = time.time()

result = tune.run(
    tune.with_parameters(train_cspd_raytune_cpu_gpu_distributed, 
                         num_in_feat   = N_FEATURE,
                         num_branch    = N_SUBGROUP,
                         num_epochs    = num_epochs, 
                         train_dataset = dataset_train_kfold, 
                         valid_dataset = dataset_valid_kfold,
                         train_metric_samples = round(len(dataset_train_kfold[0])/10),
                         ),
    config = config,
    resources_per_trial={"cpu": 1},
    num_samples = num_hp_search_samples,
    local_dir = chkpt_dir,
    progress_reporter = reporter,
    scheduler = scheduler,
    search_alg = searchopt,
)

t1 = time.time()
print(f"Time elapsed: {t1-t0}s")

Trial name,status,loc,batch_size,dropout_p,h_branch,h_subgroup,h_total,k,lr,training_iteration,train_rmse,valid_rmse,train_mean_l1,valid_mean_l1,train_l1_iqr,valid_l1_iqr,train_med-ape,valid_med-ape,train_mape,valid_mape,train_mape_iqr,valid_mape_iqr
train_cspd_raytune_cpu_gpu_distributed_9d03e_00000,TERMINATED,,64,0.201241,"[[2, 3, 3, 2], [2, 5], [5, 3], [4, 2, 2, 2, 2], [2, 3, 3]]","[10, 7, 8, 12, 8]",45,0,0.0208903,5,23584.4,23914.7,19694.2,20299.0,21419.0,20809.1,0.434326,0.415087,1.59344,1.55312,0.980396,0.524093
train_cspd_raytune_cpu_gpu_distributed_9d03e_00001,TERMINATED,,64,0.486102,"[[5, 3], [3, 2, 3], [3, 2, 3], [3, 4, 2], [2, 5]]","[8, 8, 8, 9, 7]",40,1,0.00200156,5,43392.3,45486.0,37211.7,39404.7,37562.6,39649.8,0.955967,0.9574,0.904384,0.915648,0.0507016,0.0492652
train_cspd_raytune_cpu_gpu_distributed_9d03e_00002,TERMINATED,,64,0.302968,"[[5, 3, 2, 2], [3, 5], [3, 2, 2, 2], [4, 3], [2, 2, 5]]","[12, 8, 9, 7, 9]",45,2,0.00607891,1,45329.0,45358.5,39151.5,39348.1,39064.5,39932.4,0.984127,0.983779,0.96545,0.966027,0.0234205,0.0211702
train_cspd_raytune_cpu_gpu_distributed_9d03e_00003,TERMINATED,,64,0.190956,"[[4, 4], [2, 3, 2, 2], [3, 2, 2], [2, 3, 2, 2], [2, 5]]","[8, 9, 7, 9, 7]",40,3,0.00906328,3,40002.8,39058.6,33800.0,32678.8,37461.2,37264.9,0.816422,0.795858,0.820393,0.800123,0.183966,0.190475
train_cspd_raytune_cpu_gpu_distributed_9d03e_00004,TERMINATED,,64,0.431493,"[[2, 4, 2], [2, 2, 3], [2, 2, 3, 3], [3, 4, 2], [3, 5, 3]]","[8, 7, 10, 9, 11]",45,4,0.00295892,1,47423.3,46102.1,41672.5,40352.4,38152.6,37861.1,0.992448,0.992815,0.983752,0.984764,0.00956165,0.00853514


2022-03-25 16:58:20,983	INFO tune.py:561 -- Total run time: 62.04 seconds (60.75 seconds for the tuning loop).


Time elapsed: 69.64378261566162s


In [50]:
best_trial = result.get_best_trial("valid_med-ape", "min", "last")

all_trials = result.results_df
all_trials.columns = all_trials.columns.str.replace('config.', '', regex=False).tolist()
all_trials[['k','batch_size','h_branch','h_subgroup','h_total','dropout_p','lr','valid_med-ape']]

Unnamed: 0_level_0,k,batch_size,h_branch,h_subgroup,h_total,dropout_p,lr,valid_med-ape
trial_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
9d03e_00000,0,64,"[[2, 3, 3, 2], [2, 5], [5, 3], [4, 2, 2, 2, 2]...","[10, 7, 8, 12, 8]",45,0.201241,0.02089,0.415087
9d03e_00001,1,64,"[[5, 3], [3, 2, 3], [3, 2, 3], [3, 4, 2], [2, 5]]","[8, 8, 8, 9, 7]",40,0.486102,0.002002,0.9574
9d03e_00002,2,64,"[[5, 3, 2, 2], [3, 5], [3, 2, 2, 2], [4, 3], [...","[12, 8, 9, 7, 9]",45,0.302968,0.006079,0.983779
9d03e_00003,3,64,"[[4, 4], [2, 3, 2, 2], [3, 2, 2], [2, 3, 2, 2]...","[8, 9, 7, 9, 7]",40,0.190956,0.009063,0.795858
9d03e_00004,4,64,"[[2, 4, 2], [2, 2, 3], [2, 2, 3, 3], [3, 4, 2]...","[8, 7, 10, 9, 11]",45,0.431493,0.002959,0.992815


In [51]:
print("Best trial config: {}".format(best_trial.config))
print("Best trial final validation loss: {}".format(best_trial.last_result["valid_med-ape"]))

Best trial config: {'lr': 0.020890300100544615, 'dropout_p': 0.20124093472181864, 'k': 0, 'batch_size': 64, 'h_total': 45, 'h_subgroup': [10, 7, 8, 12, 8], 'h_branch': [[2, 3, 3, 2], [2, 5], [5, 3], [4, 2, 2, 2, 2], [2, 3, 3]]}
Best trial final validation loss: 0.4150868202638076


---

### Modeling (No Ray Tune)

In [None]:
arch_ind = 2

t0 = time.time()

architecture = architecture_table['h_branch'][arch_ind]
model = cspd(N_FEATURE,N_SUBGROUP,architecture,dropout_p=0.5).to(device)
model.apply(initialize_weights)
optimizer = torch.optim.Adam(model.parameters(), lr = 0.1)
criterion = nn.MSELoss()
metric_dict = {'rmse': lambda y_est,y: torch.sqrt(nn.MSELoss(reduction="mean")(y_est,y)), 
               'mape': lambda y_est,y: torch.mean((y-y_est).abs()/y.abs()),
               'l1_iqr': lambda y_est,y: compute_iqr(nn.L1Loss(reduction="none")(y_est,y))}


training_results = train_cspd(model=model, 
                              train_dataset=dataset_model, 
                              valid_dataset=dataset_test, 
                              criterion=criterion,
                              optimizer=optimizer,
                              metric_dict=metric_dict,
                              epochs=num_epochs, 
                              batch_size=64)

t1 = time.time()
print(f"Time elapsed: {t1-t0}s")