### DataModule

In [1]:
import pandas as pd
import os
import os.path as path
from random import choice
import numpy as np
import torch
import torch.nn as nn
import pytorch_lightning as pl
from pytorch_lightning.loggers import MLFlowLogger
import torch.optim as optim
import mlflow
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from yacs.config import CfgNode as CN
from typing import Any, Optional, Union, List
from datetime import datetime
import warnings
warnings.filterwarnings("ignore", ".*does not have many workers.*")

# load csv
# DATA_PATH = "../data/custom/tmp_dataset_w_bldtype_area.csv"
DATA_PATH = "../data/custom/tmp_dataset_with_interest_col_modified.csv"

In [2]:
# Dataset - Single Series
class TSSingleDataset(Dataset):
    def __init__(self, data, x_cols, input_steps, output_steps, scaler=None):
        self.dataframe= data if isinstance(data, pd.DataFrame) else pd.DataFrame(data)
        self.input_steps = input_steps
        self.output_steps = output_steps

        # setup scaler
        if scaler is not None:
            self.scaler = scaler
        else:
            self.scaler = StandardScaler()
            self.scaler.fit(self.dataframe[x_cols])

        # prepare data
        self.data = torch.tensor(
            self.scaler.transform(self.dataframe[x_cols]), 
            dtype=torch.float32,
        )
        self.n_features = self.data.shape[1]    # feature-dim

    def __len__(self):
        return len(self.data) - self.input_steps - self.output_steps + 1

    def __getitem__(self, idx):
        # x : (input_steps, n_features)
        x = self.data[idx:idx + self.input_steps,:]
        y = self.data[idx + self.input_steps:idx + self.input_steps + self.output_steps,:]
        return x, y

In [3]:
# Dataset - Multi Series
class TSMultiDataset(Dataset):
    def __init__(self, 
                 data: Union[List[pd.DataFrame], pd.DataFrame], 
                 x_cols, 
                 input_steps, 
                 output_steps):
        
        # data : list of dataframe or single dataframe (all same shape / time series length)
        self.input_steps = input_steps
        self.output_steps = output_steps
        self.df_list = [data] if isinstance(data, pd.DataFrame) else data
        self.df_combined = pd.concat(self.df_list, axis=0)
        self.series_length = self.df_list[0].shape[0] - self.input_steps - self.output_steps + 1   # length of each series
        
        # setup scaler & data
        self.scaler = StandardScaler()
        self.scaler.fit(self.df_combined[x_cols])

        # dim : (N_series, N_timesteps, N_features)
        self.data = torch.tensor(
            np.asarray([self.scaler.transform(df[x_cols]) for df in self.df_list]),
            dtype=torch.float32,
        )
        self.n_features = self.data.shape[2]

    def __len__(self):
        return self.series_length * len(self.df_list)

    def __getitem__(self, idx):
        idx_1 = idx // self.series_length
        idx_2 = idx % self.series_length

        # shape : X - (input_steps, N_features), Y - (output_steps, N_features)
        x = self.data[idx_1,idx_2:idx_2 + self.input_steps,:]
        y = self.data[idx_1,idx_2 + self.input_steps:idx_2 + self.input_steps + self.output_steps,:]
        return x, y

### LightningDataModule

In [4]:
# DataModule - Single Series
class TSSingleDataModule(pl.LightningDataModule):
    def __init__(self, data_path: str, emd_cd: str, input_steps: int, output_steps: int, x_cols: list=None) -> None:
        super().__init__()
        self.data_path = data_path
        self.input_steps = input_steps
        self.output_steps = output_steps
        self.emd_cd = emd_cd
        self.x_cols = x_cols
    
    def prepare_data(self) -> None:
        df = pd.read_csv(self.data_path, low_memory=False)
        df.dropna(how='any',inplace=True)
        self.dataframe = df.loc[df['EMD_CD']==self.emd_cd].sort_values(by='STD_YM')
        if not self.x_cols is not None:
            self.x_cols = list(df.columns[2:])    # exclude index columns
        
        # Y (vacancy_rate) column must be at the front
        if self.x_cols[0] != 'vacancy_rate':
            self.x_cols.insert(0, self.x_cols.pop(self.x_cols.index('vacancy_rate')))

    def setup(self, stage: str=None) -> None:
        # (train, val, test) -> (0.6, 0.2, 0.2)
        train, test = train_test_split(
            self.dataframe,
            test_size=0.2,
            shuffle=False,
        )
        train, val = train_test_split(
            train,
            test_size=0.25,
            shuffle=False
        )
        # scaler is set from train-set only
        self.train = TSSingleDataset(train, self.x_cols, self.input_steps, self.output_steps)
        self.scaler = self.train.scaler
        self.n_features = self.train.n_features
        self.validation = TSSingleDataset(val, self.x_cols, self.input_steps, self.output_steps, scaler=self.scaler)
        self.test = TSSingleDataset(test, self.x_cols, self.input_steps, self.output_steps, scaler=self.scaler)
    
    def train_dataloader(self, batch_size: int=8):
        return DataLoader(self.train, batch_size=batch_size, shuffle=True)
    
    def val_dataloader(self, batch_size: int=8):
        return DataLoader(self.validation, batch_size=batch_size, shuffle=False)
    
    def test_dataloader(self, batch_size: int=8):
        return DataLoader(self.test, batch_size=batch_size, shuffle=False)

In [5]:
# DataModule - Multi Series
class TSMultiDataModule(pl.LightningDataModule):
    def __init__(self, data_path: str, input_steps: int, output_steps: int, x_cols: list=None) -> None:
        super().__init__()
        self.data_path = data_path
        self.input_steps = input_steps
        self.output_steps = output_steps
        self.x_cols = x_cols
    
    def prepare_data(self) -> None:
        df = pd.read_csv(self.data_path, low_memory=False)
        df.dropna(how='any',inplace=True)
        df.set_index('EMD_CD',inplace=True)
        self.df_list = [df.loc[emd].reset_index(drop=False).sort_values(by='STD_YM') for emd in df.index.unique()]
        if not self.x_cols:
            self.x_cols = list(self.df_list[0].columns[2:])    # exclude index columns

        # Y (vacancy_rate) column must be at the front
        if self.x_cols[0] != 'vacancy_rate':
            self.x_cols.insert(0, self.x_cols.pop(self.x_cols.index('vacancy_rate')))

    def setup(self, stage: str=None) -> None:
        # (train, val, test) -> (0.6, 0.2, 0.2)
        splits = [train_test_split(df, test_size=0.2, shuffle=False) for df in self.df_list]
        trains_t, tests = [x[0] for x in splits], [x[1] for x in splits]
        splits = [train_test_split(df, test_size=0.25, shuffle=False) for df in trains_t]
        trains, vals = [x[0] for x in splits], [x[1] for x in splits]

        # scaler is set from train-set only
        self.train = TSMultiDataset(trains, x_cols=self.x_cols, input_steps=self.input_steps, output_steps=self.output_steps)
        self.scaler = self.train.scaler
        self.n_features = self.train.n_features
        self.validation = TSMultiDataset(vals, x_cols=self.x_cols, input_steps=self.input_steps, output_steps=self.output_steps)
        self.test = TSMultiDataset(tests, x_cols=self.x_cols, input_steps=self.input_steps, output_steps=self.output_steps)
    
    def train_dataloader(self, batch_size: int=8):
        return DataLoader(self.train, batch_size=batch_size, shuffle=True)
    
    def val_dataloader(self, batch_size: int=8):
        return DataLoader(self.validation, batch_size=batch_size, shuffle=False)
    
    def test_dataloader(self, batch_size: int=8):
        return DataLoader(self.test, batch_size=batch_size, shuffle=False)

### LSTM (Simple)

In [6]:
# Model
class LSTMSimple(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size, bidirectional=False, scaler=None):
        super(LSTMSimple, self).__init__()
        self.input_size = input_size
        self.output_size = output_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.bidirectional = bidirectional
        self.scaler = scaler
        self.lstm = nn.LSTM(input_size, hidden_size, 
                            num_layers=num_layers, 
                            dropout=0, 
                            bidirectional=bidirectional, 
                            batch_first=True)
        self.fc = nn.Linear(hidden_size * (2 if self.bidirectional else 1), output_size)

    def forward(self, x):
        # hidden state
        h_0 = torch.zeros(self.num_layers * (2 if self.bidirectional else 1), x.size(0), self.hidden_size).to(x.device)
        c_0 = torch.zeros(self.num_layers * (2 if self.bidirectional else 1), x.size(0), self.hidden_size).to(x.device)

        # forward lstm & fcn
        out, _ = self.lstm(x, (h_0, c_0))
        out = self.fc(out[:, -1, :])
        return out

In [7]:
# Lightning module
class LSTMSimpleLightningModule(pl.LightningModule):
    def __init__(self, model=None, cfg=None, scaler=None):
        super(LSTMSimpleLightningModule, self).__init__()
        assert (model is not None) or (cfg is not None)
        
        # init by either model or CfgNode
        if model is not None:
            self.model = model
        else:
            assert scaler is not None, "Dataset Scaler must be provided with CfgNode"
            self.model = LSTMSimple(
                input_size=cfg['input_size'],
                output_size=cfg['output_size'],
                hidden_size=cfg['hidden_size'],
                num_layers=cfg['num_layers'],
                bidirectional=cfg['LSTM_bidirectional'] if 'LSTM_bidirectional' in cfg else False,
                scaler=scaler,
            )
        
        self.scaler = self.model.scaler
        self.criterion = nn.MSELoss()
        self.test_predictions = []

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        y = y.view(y.size(0), -1)
        y_hat = self(x)
        loss = self.criterion(y_hat, y)
        self.log_dict(
            {
                'train_mse_loss': loss,
            },
            on_epoch=True,
            # on_step=False,
        )
        return {'loss': loss}
    
    def validation_step(self, batch, batch_idx):
        x, y = batch
        y = y.view(y.size(0), -1)
        y_hat = self(x)
        loss = self.criterion(y_hat, y)
        self.log_dict(
            {
                'val_mse_loss': loss,
            },
            on_epoch=True,
            on_step=False,
        )
        return {'loss': loss}
    
    def test_step(self, batch, batch_idx):
        x, y = batch
        y = y.view(y.size(0), -1)
        y_hat = self(x)
        loss = self.criterion(y_hat, y)
        self.test_predictions.append((y_hat, y))

        self.log_dict(
            {
                'test_mse_loss': loss,
            },
            on_step=True,
        )
        return {'loss': loss}
    
    def on_test_epoch_end(self):
        preds = [x[0] for x in self.test_predictions]
        gt = [x[1] for x in self.test_predictions]
        print(len(preds), len(gt))
        print(preds[0].shape, gt[0].shape)
        return None

    def configure_optimizers(self):
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        self.scheduler = {
            "scheduler": torch.optim.lr_scheduler.ReduceLROnPlateau(
                self.optimizer,
                mode="min",
                patience=2,
                min_lr=1e-5,
            ),
            "monitor": "val_mse_loss",
        }
        return {"optimizer": self.optimizer, "lr_scheduler": self.scheduler}

### MLFlow Run

In [8]:
# Lightning-Datamodule : Multi
data_module = TSMultiDataModule(
    data_path=DATA_PATH,
    input_steps=3,
    output_steps=1,
)

data_module.prepare_data()
data_module.setup()

In [9]:
# lightningModule
cfg = CN(dict(
    # input/output dimensions
    input_size=data_module.n_features,
    output_size=data_module.n_features * data_module.output_steps,
    # hyperparams
    num_layers=1,
    hidden_size=32,
    LSTM_bidirectional=True,
))
l_model = LSTMSimpleLightningModule(
    cfg=cfg,
    scaler=data_module.scaler,
)

In [10]:
# Train with MLflow
mlflow.login()
name = "Compas_LSTM"
timestamp = datetime.strftime(datetime.now(),"%Y-%m-%d_%H-%m-%s")

trainer = pl.Trainer(
    max_epochs=25, 
    devices="auto",
    # callbacks=[],
    logger=MLFlowLogger(
        experiment_name=f"/Users/cwwojin@gmail.com/{name}",
        run_name=f"run_{timestamp}",
        tracking_uri="databricks",
        log_model=True,
    ),
    # log_every_n_steps=1,
    check_val_every_n_epoch=1,
)

trainer.fit(
    model=l_model, 
    datamodule=data_module,
)

2024/07/11 17:00:03 INFO mlflow.utils.credentials: Successfully connected to MLflow hosted tracking server! Host: https://community.cloud.databricks.com.
GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs

  | Name      | Type       | Params | Mode 
-------------------------------------------------
0 | model     | LSTMSimple | 16.4 K | train
1 | criterion | MSELoss    | 0      | train
-------------------------------------------------
16.4 K    Trainable params
0         Non-trainable params
16.4 K    Total params
0.066     Total estimated model params size (MB)


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

/Users/cwwojin/anaconda3/envs/ml2/lib/python3.10/site-packages/pytorch_lightning/loops/fit_loop.py:298: The number of training batches (29) is smaller than the logging interval Trainer(log_every_n_steps=50). Set a lower value for log_every_n_steps if you want to see logs for the training epoch.


Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

`Trainer.fit` stopped: `max_epochs=25` reached.


Uploading artifacts:   0%|          | 0/2 [00:00<?, ?it/s]

In [11]:
trainer.test(
    model=l_model,
    datamodule=data_module,
)

Testing: |          | 0/? [00:00<?, ?it/s]

8 8
torch.Size([8, 24]) torch.Size([8, 24])
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       Test metric             DataLoader 0
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
   test_mse_loss_epoch      0.5889014601707458
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────


[{'test_mse_loss_epoch': 0.5889014601707458}]

#### from GPT

In [12]:
# import pandas as pd
# import numpy as np
# import torch
# import torch.nn as nn
# import pytorch_lightning as pl
# import torch.optim as optim
# import mlflow.pytorch
# from torch.utils.data import Dataset, DataLoader
# from sklearn.preprocessing import StandardScaler

# # Create dummy data
# data = np.random.randn(48, 7)
# df = pd.DataFrame(data, columns=[f'feature_{i}' for i in range(7)])

# # Standardize data
# scaler = StandardScaler()
# scaled_data = scaler.fit_transform(df)
# data_tensor = torch.tensor(scaled_data, dtype=torch.float32)

# # Dataset
# class TimeSeriesDataset(Dataset):
#     def __init__(self, data, input_steps, output_steps):
#         self.data = data
#         self.input_steps = input_steps
#         self.output_steps = output_steps

#     def __len__(self):
#         return len(self.data) - self.input_steps - self.output_steps + 1

#     def __getitem__(self, idx):
#         x = self.data[idx:idx + self.input_steps]
#         y = self.data[idx + self.input_steps:idx + self.input_steps + self.output_steps]
#         return x, y

# input_steps = 36
# output_steps = 12
# dataset = TimeSeriesDataset(data_tensor, input_steps, output_steps)
# dataloader = DataLoader(dataset, batch_size=8, shuffle=True)

# # Model
# class LSTMModel(nn.Module):
#     def __init__(self, input_size, hidden_size, num_layers, output_size):
#         super(LSTMModel, self).__init__()
#         self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
#         self.fc = nn.Linear(hidden_size, output_size)

#     def forward(self, x):
#         h_0 = torch.zeros(num_layers, x.size(0), hidden_size).to(x.device)
#         c_0 = torch.zeros(num_layers, x.size(0), hidden_size).to(x.device)
#         out, _ = self.lstm(x, (h_0, c_0))
#         out = self.fc(out[:, -1, :])
#         return out

# input_size = 7
# hidden_size = 64
# num_layers = 2
# output_size = 7 * output_steps
# model = LSTMModel(input_size, hidden_size, num_layers, output_size)

# # Lightning module
# class TimeSeriesLightningModule(pl.LightningModule):
#     def __init__(self, model):
#         super(TimeSeriesLightningModule, self).__init__()
#         self.model = model
#         self.criterion = nn.MSELoss()

#     def forward(self, x):
#         return self.model(x)

#     def training_step(self, batch, batch_idx):
#         x, y = batch
#         y = y.view(y.size(0), -1)
#         y_hat = self(x)
#         loss = self.criterion(y_hat, y)
#         return loss

#     def configure_optimizers(self):
#         optimizer = optim.Adam(self.model.parameters(), lr=0.001)
#         return optimizer

# lightning_model = TimeSeriesLightningModule(model)

# # Train with MLflow
# mlflow.pytorch.autolog()
# trainer = pl.Trainer(max_epochs=50, gpus=1 if torch.cuda.is_available() else 0)
# with mlflow.start_run() as run:
#     trainer.fit(lightning_model, dataloader)
#     mlflow.pytorch.log_model(lightning_model.model, "model")