# LSTM Production
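Notebook for putting the LSTM model into production on AWS: it loads and preprocesses the data, tunes the model's hyperparameters with Ray Tune, and reports the best trial.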

In [None]:
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import sqlalchemy
from sqlalchemy import create_engine

from functools import partial
import ray
from ray import tune
from ray.tune import JupyterNotebookReporter
from ray.tune.schedulers import ASHAScheduler

import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F

from dotenv import load_dotenv

In [None]:
# Populate os.environ from a dotenv file (python-dotenv's default lookup);
# load_data_to_df reads db_password and db_location from the environment.
load_dotenv()

## Data Loading, Cleaning, and Processing

In [None]:
# Data Processing Functions
def load_data_to_df(subject_id=2033176):
    """Load one subject's rows from ``public.vw_final_dataset`` into a DataFrame.

    The Postgres URL is built from the ``db_password`` and ``db_location``
    environment variables. The connection is closed and the engine disposed
    before returning, even if the query fails.

    Args:
        subject_id: value matched against the view's ``subjectid`` column.
            Defaults to the subject that was previously hard-coded.

    Returns:
        pandas.DataFrame with every column of the view for that subject.
    """
    location = f"postgresql://postgres:{os.environ.get('db_password')}@{os.environ.get('db_location')}"
    engine = create_engine(location)
    # Bound parameter instead of splicing the id into the SQL text.
    query = sqlalchemy.text(
        "select * from public.vw_final_dataset where subjectid = :subject_id"
    )
    try:
        # `with` guarantees the connection is returned/closed.
        with engine.connect() as conn:
            raw_df = pd.read_sql(query, conn, params={"subject_id": subject_id})
    finally:
        # Release the engine's connection pool; a new engine is created per call.
        engine.dispose()
    return raw_df

def clean_data(df):
    """Clean the raw dataset and one-hot encode the weekday column.

    Steps:
      1. drop rows with no ``bg`` value;
      2. sort by ``timestamp_clean`` (stable) and make it the index;
      3. drop each subject's first row;
      4. drop ``subjectid``, ``entryid``, ``timestamp``, ``date`` and ``time``;
      5. print per-column null counts, then fill remaining nulls with 0;
      6. replace ``weekday`` with numeric ``ohe_<value>`` indicator columns.

    Args:
        df: raw DataFrame (as from ``load_data_to_df``) containing at least the
            columns named above plus ``weekday``.

    Returns:
        A new DataFrame indexed by ``timestamp_clean``.
    """
    # Drop rows with no Y value
    df = df.dropna(subset=['bg'])

    # Sort chronologically and index by the timestamp. A stable sort keeps the
    # "first row per subject" below deterministic when timestamps tie.
    df = df.sort_values(by="timestamp_clean", kind="stable").set_index("timestamp_clean")

    # Drop first row by subject which has data quality issues
    df = df[df.groupby('subjectid').cumcount() > 0]

    # Drop columns that are indices, irrelevant, or captured in OHE variables
    df = df.drop(columns=['subjectid', 'entryid', 'timestamp', 'date', 'time'])

    # Fill nulls (lag BG values) with 0 to indicate data is unavailable
    print("Null values to be filled by column:")
    null_counts = df.isna().sum()
    for col, count in null_counts[null_counts > 0].items():
        print(col, count)
    df = df.fillna(0)

    # One-hot encode weekdays as ohe_<day>. The dtype is pinned because
    # pandas >= 2.0 returns bool dummies; mixed with float columns, .values
    # then becomes an object array that torch.tensor cannot convert.
    weekday_dummies = pd.get_dummies(df['weekday'], prefix='ohe', dtype=np.uint8)
    df = df.drop(columns='weekday')
    df[list(weekday_dummies.columns)] = weekday_dummies

    return df

def split_and_scale(df):
    """Split rows into train/validation/test sets and Min-Max scale features.

    Rows are assigned by the 0/1 flags in ``train_set``, ``validation_set``
    and ``test_set``. A ``MinMaxScaler`` is fit on the training rows only and
    applied to all three splits. Columns whose name contains ``'ohe'`` pass
    through unscaled.

    Returns:
        ``(train_df, val_df, test_df)``. Each frame has the scaled feature
        columns, then the unscaled ``ohe`` columns, then ``bg``.
    """
    split_flags = ('train_set', 'validation_set', 'test_set')
    non_feature_cols = [*split_flags, 'bg']

    # Select each split's rows, keep its target, then strip flags/target
    # from the feature frame.
    subsets = [df.loc[df[flag] == 1, :] for flag in split_flags]
    targets = [subset['bg'] for subset in subsets]
    features = [subset.drop(labels=non_feature_cols, axis=1) for subset in subsets]

    # One-hot columns stay 0/1; all other feature columns get scaled.
    # The training split decides which columns are which.
    train_columns = features[0].columns
    ohe_cols = train_columns[train_columns.str.contains('ohe')]
    scaling_cols = train_columns.difference(ohe_cols)
    print(f"{len(ohe_cols)} one hot encoded columns ")
    print(f"{len(scaling_cols)} scaled columns")

    # Fit on training rows only so validation/test statistics don't leak in.
    scaler = MinMaxScaler()
    scaler.fit(features[0][scaling_cols])

    rebuilt = []
    for feats, target in zip(features, targets):
        scaled = pd.DataFrame(
            scaler.transform(feats[scaling_cols]),
            columns=scaling_cols,
            index=feats.index,
        )
        rebuilt.append(pd.concat([scaled, feats.loc[:, ohe_cols], target], axis=1))

    train_out, val_out, test_out = rebuilt
    return train_out, val_out, test_out

def df_to_Xy_tensors(df, window_size=12):
    """Slice a time-ordered DataFrame into fixed-length windows for the LSTM.

    Args:
        df: DataFrame with a ``bg`` target column; every other column is a
            feature and must be numeric or boolean.
        window_size: number of consecutive rows per window (must be >= 1).

    Returns:
        ``(X, y)`` float32 tensors. ``X`` has shape
        ``(n_windows, window_size, n_features)``; ``y[k]`` is the ``bg`` value
        of the first row of window ``k``. Windows end (exclusive) at row
        positions ``window_size .. len(df) - window_size - 1``, so
        ``n_windows = max(0, len(df) - 2 * window_size)``. Too-short input
        yields empty tensors instead of an error.

    Raises:
        ValueError: if ``window_size < 1``.
    """
    if window_size < 1:
        raise ValueError(f"window_size must be >= 1, got {window_size}")

    # Convert once to float32 arrays instead of slicing the DataFrame with
    # .iloc per window. dtype=float32 also casts bool columns, which would
    # otherwise make .values an object array that torch rejects.
    features = df.loc[:, df.columns != 'bg'].to_numpy(dtype=np.float32)
    target = df['bg'].to_numpy(dtype=np.float32)
    num_features = features.shape[1]

    # Exclusive end row of each window; same bounds as the original loop.
    # NOTE(review): windows ending in the last window_size rows are skipped
    # even though they fit, because y is taken from inside the window.
    # Confirm this margin is intended.
    window_ends = range(window_size, len(df) - window_size)
    if len(window_ends) == 0:
        return (torch.empty((0, window_size, num_features), dtype=torch.float32),
                torch.empty(0, dtype=torch.float32))

    X = np.stack([features[end - window_size:end] for end in window_ends])
    # The first element is the y value associated with the sequence of X values.
    # NOTE(review): X includes that row and the rows after it; if features
    # hold lagged bg values, the target may leak into X. Confirm.
    y = target[np.asarray(window_ends) - window_size]
    return torch.from_numpy(X), torch.from_numpy(y)

In [None]:
class Net(nn.Module):
    """LSTM regressor that emits one scalar output per time step.

    With the default ``batch_first=True`` the input is
    ``(batch, seq_len, input_size)``, the layout ``df_to_Xy_tensors`` builds,
    and the output is ``(batch, seq_len, 1)``. The training loop uses the last
    time step as the prediction.

    Args:
        input_size: number of features per time step.
        hidden_size: LSTM hidden state size.
        num_lstm_layers: number of stacked LSTM layers.
        dropout: dropout between stacked LSTM layers. It has no effect with a
            single layer.
        batch_first: pass False only to recover the old ``(seq, batch, feature)``
            input layout.
    """

    def __init__(self, input_size, hidden_size=8, num_lstm_layers=1, dropout=0,
                 batch_first=True):
        super().__init__()
        # batch_first=True: the data is (batch, window, features). With
        # PyTorch's default (seq, batch, feature) layout, the LSTM would recur
        # across the samples of a minibatch instead of across time.
        # nn.LSTM applies dropout only between layers and warns if dropout > 0
        # with one layer, so it is passed only where it has an effect.
        self.lstm = nn.LSTM(input_size=input_size,
                            hidden_size=hidden_size,
                            num_layers=num_lstm_layers,
                            dropout=dropout if num_lstm_layers > 1 else 0,
                            batch_first=batch_first)
        self.fc1 = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Return per-time-step predictions with a trailing size-1 dimension."""
        lstm_out, _ = self.lstm(x)
        return self.fc1(lstm_out)

In [None]:
def train_lstm(config, checkpoint_dir=None):
    """Train and validate one LSTM trial inside Ray Tune.

    Loads and preprocesses the dataset, builds windowed tensors of
    ``config['window_size']`` rows, and trains ``Net`` with Adam for
    ``config['epoch']`` epochs. After every epoch it writes a checkpoint and
    reports ``val_loss`` and ``train_loss`` (mean per-batch MSE) to Tune.

    Args:
        config: dict with keys ``hidden_size``, ``num_lstm_layers``,
            ``dropout``, ``learning_rate``, ``window_size``, ``batch_size``
            and ``epoch``.
        checkpoint_dir: directory supplied by Tune when resuming a trial. Its
            ``checkpoint`` file holds ``(model_state, optimizer_state)``.

    Raises:
        ValueError: if the window size leaves no training or validation windows.
    """
    # Load and clean data
    print('Loading Data')
    raw_df = load_data_to_df()

    print('Processing Data')
    clean_df = clean_data(raw_df)
    # The test split is not used while tuning, so it is not windowed here.
    train_df, val_df, _test_df = split_and_scale(clean_df)

    # Process data for a given window_size
    window_size = config['window_size']
    train_X, train_y = df_to_Xy_tensors(train_df, window_size=window_size)
    val_X, val_y = df_to_Xy_tensors(val_df, window_size=window_size)
    print("Data processing finished")
    if len(train_X) == 0 or len(val_X) == 0:
        raise ValueError(
            f"window_size={window_size} leaves {len(train_X)} training and "
            f"{len(val_X)} validation windows; both must be non-empty"
        )

    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    print(f"DEVICE: {device}")
    # Tensor.to() returns a new tensor; without reassignment the data stays
    # on the CPU while the model moves to the GPU.
    train_X, train_y = train_X.to(device), train_y.to(device)
    val_X, val_y = val_X.to(device), val_y.to(device)

    # Width of the dataframe - 1 (the bg target) is the feature set size
    input_size = train_df.shape[1] - 1
    net = Net(input_size=input_size,
              hidden_size=config['hidden_size'],
              num_lstm_layers=config['num_lstm_layers'],
              dropout=config['dropout'])
    if torch.cuda.device_count() > 1:
        net = nn.DataParallel(net)
    net.to(device)

    optimizer = optim.Adam(net.parameters(), lr=config['learning_rate'])

    # Resume from a Tune-provided checkpoint
    if checkpoint_dir:
        checkpoint = os.path.join(checkpoint_dir, "checkpoint")
        # map_location allows a GPU-saved checkpoint to be restored on CPU.
        model_state, optimizer_state = torch.load(checkpoint, map_location=device)
        net.load_state_dict(model_state)
        optimizer.load_state_dict(optimizer_state)

    batch_size = config['batch_size']
    log_every = 1000  # batches between progress prints
    for epoch in range(config['epoch']):
        epoch_start = datetime.now()
        print(f"Epoch: {epoch}")

        # Training pass (dropout active)
        net.train()
        epoch_loss = 0.0
        recent_loss = 0.0
        epoch_steps = 0
        # Step up to len(train_X) so the final (possibly partial) batch is used.
        for i in range(0, len(train_X), batch_size):
            X = train_X[i:i + batch_size]
            y = train_y[i:i + batch_size]
            optimizer.zero_grad()
            loss = F.mse_loss(_last_step_prediction(net, X), y)
            loss.backward()
            # Apply the gradients; without this the weights never change.
            optimizer.step()

            batch_loss = loss.item()
            epoch_loss += batch_loss
            recent_loss += batch_loss
            epoch_steps += 1

            # Print average loss every `log_every` batches
            if epoch_steps % log_every == 0:
                print(f"Epoch {epoch}, steps {epoch_steps - log_every}:{epoch_steps} "
                      f"avg loss: {recent_loss / log_every}")
                recent_loss = 0.0

        # Validation pass (eval mode, no gradients)
        val_loss = _mean_batch_loss(net, val_X, val_y, batch_size)

        with tune.checkpoint_dir(step=epoch) as epoch_dir:
            path = os.path.join(epoch_dir, "checkpoint")
            torch.save((net.state_dict(), optimizer.state_dict()), path)

        tune.report(val_loss=val_loss, train_loss=epoch_loss / epoch_steps)
        print(f"Finished epoch {epoch} in {datetime.now() - epoch_start}")
    print("finished!")


def _last_step_prediction(net, X):
    """Return the network output at the last position of dim 1, shape ``(batch,)``."""
    # The model output has a trailing size-1 dim; dim 1 is the time axis.
    return net(X)[:, -1, 0]


def _mean_batch_loss(net, X_all, y_all, batch_size):
    """Mean per-batch MSE of ``net`` over non-empty ``X_all``/``y_all`` in eval mode."""
    net.eval()
    total, steps = 0.0, 0
    with torch.no_grad():
        for i in range(0, len(X_all), batch_size):
            pred = _last_step_prediction(net, X_all[i:i + batch_size])
            total += F.mse_loss(pred, y_all[i:i + batch_size]).item()
            steps += 1
    return total / steps

In [None]:
# Start from a clean Ray runtime: shut down any existing instance (e.g. from
# re-running this cell), then start a new one.
ray.shutdown()
ray.init()

In [None]:
# Hyperparameter search space. np.random.randint's upper bound is exclusive,
# so each upper bound below is one past the largest value drawn.
config = {
    'hidden_size': tune.sample_from(lambda spec: 2 ** np.random.randint(3, 9)),  # 2^3 to 2^8, 8 to 256
    'num_lstm_layers': tune.sample_from(lambda spec: np.random.randint(1, 5)),  # 1 to 4
    'dropout': tune.sample_from(lambda spec: np.random.rand() * 0.5),  # [0, 0.5)
    # NOTE(review): 10 ** (-10 * U[0, 1)) spans (1e-10, 1]; very wide for Adam.
    'learning_rate': tune.sample_from(lambda spec: 10 ** (-10 * np.random.rand())),
    'window_size': tune.sample_from(lambda spec: 6 * np.random.randint(1, 13)),  # 6 Observations = 30 minutes, 6*12 = 6 hours
    'batch_size': tune.sample_from(lambda spec: 2 ** np.random.randint(3, 9)),  # 2^3 to 2^8, 8 to 256
    'epoch': tune.sample_from(lambda spec: np.random.randint(1, 50)),  # 1 to 49
}

# ASHA stops under-performing trials early, ranking them by val_loss.
# NOTE(review): max_t=10 stops every trial after 10 reported epochs, even
# though config['epoch'] can be up to 49.
scheduler = ASHAScheduler(
    metric="val_loss",
    mode="min",
    max_t=10,
    grace_period=1,
    reduction_factor=2)

# Show the metrics train_lstm actually reports ("train_loss", not "loss").
reporter = JupyterNotebookReporter(
    metric_columns=["val_loss", "train_loss", "training_iteration"]
)

result = tune.run(
    train_lstm,
    resources_per_trial={"cpu": 1},
    config=config,
    num_samples=3,
    # The scheduler was previously built but never passed, so ASHA never ran.
    scheduler=scheduler,
    progress_reporter=reporter,
)

In [None]:
# Pick the trial with the lowest val_loss, judged on each trial's last reported result.
best_trial = result.get_best_trial(metric='val_loss', mode='min', scope='last')

In [None]:
# Summarise the winning trial's hyperparameters and its final validation loss.
print(f"Best trial config: {best_trial.config}")
print(f"Best trial final validation loss: {best_trial.last_result['val_loss']}")