# Model Training with PyTorch Lightning

> **Note:** This tutorial is currently a minimal version with few annotations. More detailed explanations will be added in a future update.

> Author: Tianyu Du

In [1]:
from time import time
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F

from torch_choice.data import ChoiceDataset, utils
from torch_choice.model import ConditionalLogitModel

# ======
# delete import statements below.

from torch_choice.utils.run_helper_lightning import LightningModelWrapper, section_print
import time
from copy import deepcopy
from typing import Optional, Union

import pandas as pd
import torch
import torch.nn.functional as F
import pytorch_lightning as pl
from pytorch_lightning.callbacks.early_stopping import EarlyStopping

from torch_choice.data import ChoiceDataset
from torch_choice.data.utils import create_data_loader
from torch_choice.model.conditional_logit_model import ConditionalLogitModel
from torch_choice.model.nested_logit_model import NestedLogitModel
from torch_choice.utils.std import parameter_std

  Referenced from: <B3E58761-2785-34C6-A89B-F37110C88A05> /Users/tianyudu/miniforge3/envs/dev/lib/python3.9/site-packages/torchvision/image.so
  Expected in:     <AE6DCE26-A528-35ED-BB3D-88890D27E6B9> /Users/tianyudu/miniforge3/envs/dev/lib/python3.9/site-packages/torch/lib/libtorch_cpu.dylib
  warn(f"Failed to load image Python extension: {e}")


# Load Dataset and Create the Model

In [12]:
# TODO: move this to a separate file.
def load_mode_canada_dataset(path: str = './public_datasets/ModeCanada.csv'):
    """Read the ModeCanada CSV file and package it as a `ChoiceDataset`.

    Rows are kept only where `noalt == 4` (presumably the cases in which all four
    travel modes were available -- confirm against the dataset description).
    The chosen alternative of every case is encoded as an integer using the
    order air=0, bus=1, car=2, train=3.

    Args:
        path (str, optional): location of the ModeCanada CSV file. Defaults to
            './public_datasets/ModeCanada.csv', the path used by this tutorial.

    Returns:
        ChoiceDataset: dataset holding `item_index`, the `cost`/`freq`/`ovt`
            observables (`price_cost_freq_ovt`), the `ivt` observable (`price_ivt`)
            and one income value per case (`session_income`).
    """
    df = (pd.read_csv(path)
          .query('noalt == 4')
          .reset_index(drop=True)
          .sort_values(by='case'))

    # integer-encode the chosen alternative of each case.
    item_names = ['air', 'bus', 'car', 'train']
    num_items = len(item_names)
    encoder = dict(zip(item_names, range(num_items)))
    print(f"{encoder=:}")
    item_index = df[df['choice'] == 1].sort_values(by='case')['alt'].reset_index(drop=True)
    # look up through the dictionary directly so that an unexpected alternative name raises a KeyError.
    item_index = item_index.map(lambda x: encoder[x])
    item_index = torch.LongTensor(item_index.values)
    print(f"{item_index=:}")

    # observables pivoted by (case, alt); the printed shapes show the resulting layout.
    price_cost_freq_ovt = utils.pivot3d(df, dim0='case', dim1='alt',
                                        values=['cost', 'freq', 'ovt'])
    print(f'{price_cost_freq_ovt.shape=:}')

    price_ivt = utils.pivot3d(df, dim0='case', dim1='alt', values='ivt')
    print(f'{price_ivt.shape=:}')

    # income is taken from the first row of every case and stored as a column vector.
    session_income = df.groupby('case')['income'].first()
    session_income = torch.Tensor(session_income.values).view(-1, 1)

    return ChoiceDataset(item_index=item_index,
                         price_cost_freq_ovt=price_cost_freq_ovt,
                         session_income=session_income,
                         price_ivt=price_ivt)

In [13]:
# Build the ModeCanada dataset; the bare `dataset` expression on the last line makes the notebook display its summary.
dataset = load_mode_canada_dataset()
dataset

encoder={'air': 0, 'bus': 1, 'car': 2, 'train': 3}
item_index=tensor([0, 0, 0,  ..., 2, 2, 2])
price_cost_freq_ovt.shape=torch.Size([2779, 4, 3])
price_ivt.shape=torch.Size([2779, 4, 1])
No `session_index` is provided, assume each choice instance is in its own session.


ChoiceDataset(label=[], item_index=[2779], user_index=[], session_index=[2779], item_availability=[], price_cost_freq_ovt=[2779, 4, 3], session_income=[2779, 1], price_ivt=[2779, 4, 1], device=cpu)

# Option 1: Use the `run()` Helper Function as Before

In [10]:

def run(model: Union[ConditionalLogitModel, NestedLogitModel],
        dataset_train: ChoiceDataset,
        dataset_val: Optional[ChoiceDataset]=None,
        dataset_test: Optional[ChoiceDataset]=None,
        batch_size: int=-1,
        learning_rate: float=0.01,
        num_epochs: int=10,
        num_workers: int=0,
        device: Optional[str]=None,
        **kwargs) -> Union[ConditionalLogitModel, NestedLogitModel]:
    """Train a choice model with PyTorch Lightning and print a coefficient report.

    The function wraps the model in a `LightningModelWrapper`, fits it with a
    `pl.Trainer` (with early stopping), optionally evaluates it on the test set,
    computes standard errors of the coefficients with `parameter_std` and prints
    the final log-likelihoods together with a coefficient table.

    Args:
        model (Union[ConditionalLogitModel, NestedLogitModel]): the constructed model.
        dataset_train (ChoiceDataset): the dataset for training.
        dataset_val (ChoiceDataset): an optional dataset for validation.
        dataset_test (ChoiceDataset): an optional dataset for testing.
        batch_size (int, optional): batch size for model training. Defaults to -1.
        learning_rate (float, optional): learning rate for model training. Defaults to 0.01.
        num_epochs (int, optional): maximum number of epochs for the training. Defaults to 10.
        num_workers (int, optional): number of parallel workers for data loading. Defaults to 0.
        device (Optional[str], optional): the device that trains the model, if None is specified, the function will
            use the current device of the provided model. Defaults to None.
        **kwargs: other keyword arguments for the pytorch lightning trainer, this is for users with experience in
            pytorch lightning and wish to customize the training process.

    Returns:
        Union[ConditionalLogitModel, NestedLogitModel]: the `model` object that was passed in.

    Raises:
        TypeError: if `model` is neither a ConditionalLogitModel nor a NestedLogitModel, because no likelihood
            function is available for the standard error calculation.
    """
    # ==================================================================================================================
    # Setup the lightning wrapper.
    # ==================================================================================================================
    lightning_model = LightningModelWrapper(model, learning_rate=learning_rate)
    if device is None:
        # infer from the model device.
        device = model.device
    # the cloned model will be used for standard error calculation later.
    model_clone = deepcopy(model)
    section_print('model received')
    print(model)

    # ==================================================================================================================
    # Prepare the data.
    # ==================================================================================================================
    # present a summary of datasets received.
    section_print('data set received')
    print('[Train dataset]', dataset_train)
    print('[Validation dataset]', dataset_val)
    print('[Test dataset]', dataset_test)

    # move every dataset to the target device once, so the data loaders, the standard error calculation and the final
    # log-likelihood evaluation all use data that sits on the same device as the model.
    dataset_train = dataset_train.to(device)
    if dataset_val is not None:
        dataset_val = dataset_val.to(device)
    if dataset_test is not None:
        dataset_test = dataset_test.to(device)

    # create pytorch dataloader objects.
    train_dataloader = create_data_loader(dataset_train, batch_size=batch_size, shuffle=True, num_workers=num_workers)

    val_dataloader = None
    if dataset_val is not None:
        val_dataloader = create_data_loader(dataset_val, batch_size=batch_size, shuffle=False, num_workers=num_workers)

    test_dataloader = None
    if dataset_test is not None:
        test_dataloader = create_data_loader(dataset_test, batch_size=batch_size, shuffle=False, num_workers=num_workers)

    # ==================================================================================================================
    # Training the model.
    # ==================================================================================================================
    # if the validation dataset is provided, do early stopping.
    callbacks = list()
    if val_dataloader is not None:
        print("Validation dataset provided, do early stopping based on validation log-likelihood.")
        callbacks.append(EarlyStopping(monitor="val_ll", mode="max", patience=10, min_delta=0.001))
    else:
        print("No validation dataset provided, do early stopping based on training loss.")
        # TODO: figure out why early stopping isn't working.
        callbacks.append(EarlyStopping(monitor="train_loss", mode="min", patience=10, min_delta=0.001))

    # validate/log roughly 100 times over the run; the interval must be at least 1, otherwise any
    # `num_epochs` below 100 (including the default of 10) would request an interval of 0.
    interval = max(1, num_epochs // 100)
    trainer = pl.Trainer(accelerator="auto",
                         devices="auto",
                         auto_lr_find=False,
                         max_epochs=num_epochs,
                         check_val_every_n_epoch=interval,
                         log_every_n_steps=interval,
                         callbacks=callbacks,
                         **kwargs)
    start_time = time.time()
    trainer.fit(lightning_model, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader)
    print(f'Time taken for training: {time.time() - start_time}')
    if test_dataloader is not None:
        # `dataloaders` is the keyword accepted by `Trainer.test`; the older `test_dataloaders` name was removed.
        trainer.test(lightning_model, dataloaders=test_dataloader)
    else:
        print('Skip testing, no test dataset is provided.')

    # ====== get the standard error of the model ====== #
    # current methods of computing standard deviation will corrupt the model, so the trained weights are loaded into
    # the clone and the clone (not the trained model) is handed to `parameter_std`.
    state_dict = deepcopy(lightning_model.model.state_dict())
    model_clone.load_state_dict(state_dict)
    # keep the clone on the same device as the dataset used for the likelihood below.
    model_clone.to(device)

    # get mean of estimation.
    mean_dict = dict()
    for k, v in lightning_model.model.named_parameters():
        mean_dict[k] = v.clone()

    # estimate the standard error of the model.
    dataset_for_std = dataset_train.clone()

    if isinstance(model, ConditionalLogitModel):
        def nll_loss(model):
            y_pred = model(dataset_for_std)
            return F.cross_entropy(y_pred, dataset_for_std.item_index, reduction='sum')
    elif isinstance(model, NestedLogitModel):
        def nll_loss(model):
            d = dataset_for_std[torch.arange(len(dataset_for_std))]
            return model.negative_log_likelihood(d, d['item'].item_index)
    else:
        raise TypeError('Standard errors can only be computed for ConditionalLogitModel or NestedLogitModel, '
                        f'got {type(model).__name__}.')
    std_dict = parameter_std(model_clone, nll_loss)

    print('=' * 20, 'model results', '=' * 20)
    report = list()
    for coef_name, std in std_dict.items():
        # flatten both arrays so that parameters with more than one dimension are enumerated element by element;
        # assumes `parameter_std` returns one standard error per parameter element -- TODO confirm.
        std = std.cpu().detach().numpy().reshape(-1)
        mean = mean_dict[coef_name].cpu().detach().numpy().reshape(-1)
        coef_name = coef_name.replace('coef_dict.', '').replace('.coef', '')
        for i, (mean_i, std_i) in enumerate(zip(mean, std)):
            report.append({'Coefficient': coef_name + f'_{i}',
                           'Estimation': float(mean_i),
                           'Std. Err.': float(std_i)})
    report = pd.DataFrame(report).set_index('Coefficient')

    # the model is moved (back) to the requested device before the final evaluation.
    lightning_model.model.to(device)
    train_ll = - lightning_model.model.negative_log_likelihood(dataset_train, dataset_train.item_index).detach().item()

    if dataset_val is not None:
        val_ll = - lightning_model.model.negative_log_likelihood(dataset_val, dataset_val.item_index).detach().item()
    else:
        val_ll = 'N/A'

    if dataset_test is not None:
        test_ll = - lightning_model.model.negative_log_likelihood(dataset_test, dataset_test.item_index).detach().item()
    else:
        test_ll = 'N/A'
    print(f'Final Log-likelihood: [Training] {train_ll}, [Validation] {val_ll}, [Test] {test_ll}\n')
    print('Coefficients:\n')
    print(report.to_markdown())
    return model


In [11]:
# Conditional logit specification: each key names an observable (or the intercept), `coef_variation_dict` gives how
# its coefficient varies and `num_param_dict` gives the number of parameters for that observable.
model = ConditionalLogitModel(coef_variation_dict={'price_cost_freq_ovt': 'constant',
                                                   'session_income': 'item',
                                                   'price_ivt': 'item-full',
                                                   'intercept': 'item'},
                              num_param_dict={'price_cost_freq_ovt': 3,
                                              'session_income': 1,
                                              'price_ivt': 1,
                                              'intercept': 1},
                              num_items=4)
# Pick an accelerator that exists on the current machine instead of assuming Apple's Metal backend ("mps"),
# so the notebook also runs on CUDA or CPU-only machines.
if torch.backends.mps.is_available():
    DEVICE = "mps"
elif torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
trained_model = run(model.to(DEVICE), dataset.to(DEVICE), learning_rate=0.03, num_epochs=1000, device=DEVICE)

GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
  rank_zero_warn(

  | Name  | Type                  | Params
------------------------------------------------
0 | model | ConditionalLogitModel | 13    
------------------------------------------------
13        Trainable params
0         Non-trainable params
13        Total params
0.000     Total estimated model params size (MB)
  rank_zero_warn(
  rank_zero_warn(


ConditionalLogitModel(
  (coef_dict): ModuleDict(
    (price_cost_freq_ovt[constant]): Coefficient(variation=constant, num_items=4, num_users=None, num_params=3, 3 trainable parameters in total, device=mps:0).
    (session_income[item]): Coefficient(variation=item, num_items=4, num_users=None, num_params=1, 3 trainable parameters in total, device=mps:0).
    (price_ivt[item-full]): Coefficient(variation=item-full, num_items=4, num_users=None, num_params=1, 4 trainable parameters in total, device=mps:0).
    (intercept[item]): Coefficient(variation=item, num_items=4, num_users=None, num_params=1, 3 trainable parameters in total, device=mps:0).
  )
)
Conditional logistic discrete choice model, expects input features:

X[price_cost_freq_ovt[constant]] with 3 parameters, with constant level variation.
X[session_income[item]] with 1 parameters, with item level variation.
X[price_ivt[item-full]] with 1 parameters, with item-full level variation.
X[intercept[item]] with 1 parameters, with ite

`Trainer.fit` stopped: `max_epochs=1000` reached.


Epoch 999: 100%|██████████| 1/1 [00:00<00:00, 25.79it/s, loss=8.46e+03, v_num=30, train_loss=7.93e+3]
Time taken for training: 22.78681492805481
Skip testing, no test dataset is provided.
Final Log-likelihood: [Training] -7899.5009765625, [Validation] N/A, [Test] N/A

Coefficients:

| Coefficient                     |   Estimation |   Std. Err. |
|:--------------------------------|-------------:|------------:|
| price_cost_freq_ovt[constant]_0 |   -0.235216  |         nan |
| price_cost_freq_ovt[constant]_1 |    0.357215  |         nan |
| price_cost_freq_ovt[constant]_2 |   -0.109957  |         nan |
| session_income[item]_0          |   -2.5607    |         nan |
| session_income[item]_1          |   -0.0483948 |         nan |
| session_income[item]_2          |   -0.179991  |         nan |
| price_ivt[item-full]_0          |    0.290493  |         nan |
| price_ivt[item-full]_1          |   -0.790084  |         nan |
| price_ivt[item-full]_2          |   -0.0222478 |         nan |
|

# Option 2: Have Full Control over the Training Loop