In [25]:
import os
import random
import torch
import pandas as pd
import numpy as np
from ray.data.preprocessor import Preprocessor
import ray


DATASET_LOC = "/Users/ankushgarg/Desktop/projects/comed-pricing/data/raw_data.csv"
HOLDOUT_LOC = "/Users/ankushgarg/Desktop/projects/comed-pricing/data/holdout.csv"

# Initialize Ray
if ray.is_initialized():
    ray.shutdown()
ray.init()

num_workers = 6  # prefer to do a few less than total available CPU (1 for head node + 1 for background tasks)
resources_per_worker={"CPU": 1, "GPU": 0}

2023-09-06 11:07:39,361	INFO worker.py:1621 -- Started a local Ray instance.


In [26]:
def get_x_y_splits(data, columns, targets, n_steps_in, n_steps_out, gap, include_target_in_X=False):
    """This function converts a dataframe into X and Y sequences for training"""

    # Include target column
    if include_target_in_X:
        columns = columns + targets

    complete_x_array = data[columns].to_numpy()
    complete_y_array = data[targets].to_numpy()

    upper_bound = len(data) - (n_steps_in + n_steps_out + gap)
    
    # Pre-allocate arrays for performance
    X_shape = (upper_bound, n_steps_in, complete_x_array.shape[1])
    y_shape = (upper_bound, n_steps_out, complete_y_array.shape[1])

    X_arrays = np.empty(X_shape, dtype=np.float32)
    y_arrays = np.empty(y_shape, dtype=np.float32)

    for index in range(upper_bound):
        starting_X_index = index
        ending_X_index = starting_X_index + n_steps_in
        starting_y_index = ending_X_index + gap
        ending_y_index = starting_y_index + n_steps_out

        X_arrays[index] = complete_x_array[starting_X_index: ending_X_index]
        y_arrays[index] = complete_y_array[starting_y_index: ending_y_index]

    return torch.tensor(X_arrays, dtype=torch.float32), torch.tensor(y_arrays, dtype=torch.float32)

def get_x_y_split(data, columns, targets, n_steps_in, n_steps_out, gap, include_target_in_X = False):
    """This function converts a dataframe into X and Y sequences for training"""
    # columns = [column for column in data.columns if column not in targets]

    # Include target column
    if include_target_in_X:
        columns = columns + targets

    complete_x_array = data[columns].to_numpy()
    complete_y_array = data[targets].to_numpy()

    X_arrays = []
    y_arrays = []

    upper_bound = len(data) - (n_steps_in + n_steps_out + gap)

    # Loop through the entire dataset
    for index in range(0, upper_bound):
        # Based on parameters construct the training data made up of 
        # smaller arrays
        starting_X_index = index
        ending_X_index = starting_X_index + n_steps_in # number of features

        starting_y_index = ending_X_index + gap
        ending_y_index = starting_y_index + n_steps_out

        individual_x_array = complete_x_array[starting_X_index: ending_X_index, :]
        individual_y_array = complete_y_array[starting_y_index: ending_y_index, :]

        X_arrays.append(individual_x_array)
        y_arrays.append(individual_y_array)

    # Convert a list of arrays into an array
    X_array = np.array(X_arrays)
    y_array = np.array(y_arrays)

    # Flatten the array in case it's 3 dimensional
    if len(y_array.shape) == 3:
        y_array = np.array([individual_array.flatten() for individual_array in y_array])
    
    return X_array, y_array


def preprocess(data, columns, targets, n_steps_in, n_steps_out, gap, include_target_in_X=False, resample_units=None):
    # # reset_index
    #  # Convert the 'timestamp' column to datetime format and set it as the index
    # data['millisUTC'] = pd.to_datetime(data['millisUTC'])
    # data.set_index('millisUTC', inplace=True)

    # # Resample dataset
    # if resample_units is not None:
    #     data = data.resample(resample_units, label="right").mean()

    # # Need a better way to handle missing values
    # data['price'] = data['price'].ffill()
    # data.reset_index(drop=True, inplace=True)
    
    X, y = get_x_y_split(
        data, 
        columns=columns, 
        targets=targets, 
        n_steps_in=n_steps_in, 
        n_steps_out=n_steps_out, 
        gap=gap, 
        include_target_in_X=include_target_in_X
    )
    
    return {"X": X, "y": y}



In [27]:
def set_seeds(seed=42):
    """Set seeds for reproducibility."""
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    eval("setattr(torch.backends.cudnn, 'deterministic', True)")
    eval("setattr(torch.backends.cudnn, 'benchmark', False)")
    os.environ["PYTHONHASHSEED"] = str(seed)

In [28]:
def load_data(num_samples=None):
    ds = ray.data.read_csv(DATASET_LOC)
    ds = ds.random_shuffle(seed=1234)
    ds = ray.data.from_items(ds.take(num_samples)) if num_samples else ds
    return ds

In [29]:
class CustomPreprocessor(Preprocessor):
    def _fit(self, ds):
        return self

    def _transform_pandas(self, batch):
        return preprocess(
            batch,
            # resample_units="10T", # Resample values by 60 minutes
            columns=['price'],
            targets=['price'], 
            n_steps_in=5, 
            n_steps_out=10, 
            gap=60, 
            include_target_in_X=True
        )

In [94]:
class LSTM(torch.nn.Module):
    """LSTM neural network"""
    
    def __init__(self, input_size, hidden_size, output_size):
        super(LSTM, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.linear = nn.Linear(hidden_size, output_size)
    
    def forward(self, X, h=None):
        X, _ = X['X'], X['y']
        
        if h is None:
            h = (torch.zeros(1, X.size(0), 32).to(X.device),
                 torch.zeros(1, X.size(0), 32).to(X.device))

        print(f"Type of X: {X.dtype}")
        
        output, hidden_state = self.lstm(X, h)
        last_hidden_state = output[:, -1, :]
        output = self.linear(last_hidden_state)
        output = F.relu(output)  # Apply ReLU activation
        return output

    @torch.inference_mode()
    def predict(self, batch):
        print("calling predict")
        print("SNAP")
        self.eval()
        return self(batch)


In [95]:
from ray.air import Checkpoint, session
from ray.air.config import CheckpointConfig, DatasetConfig, RunConfig, ScalingConfig
import ray.train as train
from ray.train.torch import TorchCheckpoint, TorchTrainer
import torch.nn.functional as F
from ray.train.torch import get_device

In [96]:
def collate_fn(batch):
    dtypes = {"X": torch.float32, "y": torch.float32}
    tensor_batch = {}
    for key, array in batch.items():
        tensor_batch[key] = torch.as_tensor(array, dtype=dtypes[key], device=get_device())
    return tensor_batch


In [97]:
def train_step(ds, batch_size, model, loss_fn, optimizer):
    model.train()
    loss = 0.0
    ds_generator = ds.iter_torch_batches(batch_size=batch_size, collate_fn=collate_fn)
    for i, batch in enumerate(ds_generator):
        optimizer.zero_grad()  # reset gradients        
        z = model(batch)  # forward pass
        targets = batch['y']
        J = loss_fn(z, targets)  # define loss
        J.backward()  # backward pass
        optimizer.step()  # update weights
        loss += (J.detach().item() - loss) / (i + 1)  # cumulative loss
    return loss

In [98]:
def eval_step(ds, batch_size, model, loss_fn):
    """Eval step."""
    model.eval()
    loss = 0.0
    y_trues, y_preds = [], []
    ds_generator = ds.iter_torch_batches(batch_size=batch_size, collate_fn=collate_fn)
    with torch.inference_mode():
        for i, batch in enumerate(ds_generator):
            z = model(batch)
            targets = batch['y']
            J = loss_fn(z, targets).item()
            loss += (J - loss) / (i + 1)
            y_trues.extend(batch["y"].cpu().numpy())
            y_preds.extend(torch.argmax(z, dim=1).cpu().numpy())
    return loss, np.vstack(y_trues), np.vstack(y_preds)

In [99]:
# Training loop
def train_loop_per_worker(config):
    # Hyperparameters
    dropout_p = config["dropout_p"]
    lr = config["lr"]
    lr_factor = config["lr_factor"]
    lr_patience = config["lr_patience"]
    num_epochs = config["num_epochs"]
    batch_size = config["batch_size"]
    input_size = config["input_size"]
    hidden_size = config["hidden_size"]
    output_size = config["output_size"]

    # Get datasets
    set_seeds()
    train_ds = session.get_dataset_shard("train")
    val_ds = session.get_dataset_shard("val")

    # Model
    model =  LSTM(input_size=input_size, hidden_size=hidden_size, output_size=output_size)
    model = train.torch.prepare_model(model)

    # Training components
    loss_fn = nn.L1Loss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=lr_factor, patience=lr_patience)

    # Training
    batch_size_per_worker = batch_size // session.get_world_size()
    for epoch in range(num_epochs):
        # Step
        train_loss = train_step(train_ds, batch_size_per_worker, model, loss_fn, optimizer)
        val_loss, _, _ = eval_step(val_ds, batch_size_per_worker, model, loss_fn)
        scheduler.step(val_loss)

        # Checkpoint
        metrics = dict(epoch=epoch, lr=optimizer.param_groups[0]["lr"], train_loss=train_loss, val_loss=val_loss)
        checkpoint = TorchCheckpoint.from_model(model=model)
        session.report(metrics, checkpoint=checkpoint)

In [100]:
# Train loop config
train_loop_config = {
    "dropout_p": 0.5,
    "lr": 1e-4,
    "lr_factor": 0.8,
    "lr_patience": 3,
    "num_epochs": 1,
    "batch_size": 256,
    "input_size": 2,
    "hidden_size": 32,
    "output_size": 10
}

# Scaling config
scaling_config = ScalingConfig(
    num_workers=num_workers,
    use_gpu=bool(resources_per_worker["GPU"]),
    resources_per_worker=resources_per_worker,
    _max_cpu_fraction_per_node=0.8,
)

In [101]:
def split_dataset(data):
    # Split the data into training and testing sets
    train_size = int(0.8 * len(data))
    train_data = data.iloc[:train_size]
    test_data = data.iloc[train_size:]

    return train_data, test_data

In [102]:
data = pd.read_csv("/Users/ankushgarg/Desktop/projects/comed-pricing/data/raw_data.csv")
# This is a bit strange to do
# We're reading the dataset using pandas, then splitting it, and then creating distributed datasets from pandas
train_data, val_data = split_dataset(data)

In [103]:
train_ds = ray.data.from_pandas(train_data)
val_ds = ray.data.from_pandas(val_data)

In [104]:
# Preprocess
preprocessor = CustomPreprocessor()
train_ds =  preprocessor.fit_transform(train_ds)
val_ds = preprocessor.transform(val_ds)
train_ds = train_ds.materialize()
val_ds = val_ds.materialize()

2023-09-06 11:12:25,208	INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(CustomPreprocessor._transform_pandas)]
2023-09-06 11:12:25,209	INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-09-06 11:12:25,210	INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
2023-09-06 11:12:25,355	INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(CustomPreprocessor._transform_pandas)]
2023-09-06 11:12:25,356	INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_

In [105]:
# Dataset config
dataset_config = {
    "train": DatasetConfig(fit=False, transform=False, randomize_block_order=False),
    "val": DatasetConfig(fit=False, transform=False, randomize_block_order=False),
}

# MLFlow

In [106]:
import mlflow
from pathlib import Path
from ray.air.integrations.mlflow import MLflowLoggerCallback
import time

In [107]:
# Config MLflow
MODEL_REGISTRY = Path("/tmp/mlflow")
Path(MODEL_REGISTRY).mkdir(parents=True, exist_ok=True)
MLFLOW_TRACKING_URI = "file://" + str(MODEL_REGISTRY.absolute())
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
print(mlflow.get_tracking_uri())


file:///tmp/mlflow


In [108]:
# MLflow callback
experiment_name = f"comed-{int(time.time())}"
mlflow_callback = MLflowLoggerCallback(
    tracking_uri=MLFLOW_TRACKING_URI,
    experiment_name=experiment_name,
    save_artifact=True)


In [109]:
# Trainer
checkpoint_config = CheckpointConfig(num_to_keep=1, checkpoint_score_attribute="val_loss", checkpoint_score_order="min")

# Run configuration with MLflow callback
run_config = RunConfig(
    callbacks=[mlflow_callback],
    checkpoint_config=checkpoint_config,
)

trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config=train_loop_config,
    scaling_config=scaling_config,
    run_config=run_config,
    datasets={"train": train_ds, "val": val_ds},
    dataset_config=dataset_config,
    preprocessor=preprocessor,
)




In [110]:
results = trainer.fit()

0,1
Current time:,2023-09-06 11:12:36
Running for:,00:00:07.89
Memory:,17.9/32.0 GiB

Trial name,status,loc,iter,total time (s),epoch,lr,train_loss
TorchTrainer_2caf8_00000,TERMINATED,127.0.0.1:86546,1,4.14249,0,0.0001,3.20721


[2m[36m(TorchTrainer pid=86546)[0m The dict form of `dataset_config` is deprecated. Use the DataConfig class instead. Support for this will be dropped in a future release.
[2m[36m(TorchTrainer pid=86546)[0m The `preprocessor` arg to Trainer is deprecated. Apply preprocessor transformations ahead of time by calling `preprocessor.transform(ds)`. Support for the preprocessor arg will be dropped in a future release.
[2m[36m(TorchTrainer pid=86546)[0m Starting distributed worker processes: ['86550 (127.0.0.1)', '86551 (127.0.0.1)', '86552 (127.0.0.1)', '86553 (127.0.0.1)', '86554 (127.0.0.1)', '86555 (127.0.0.1)']
[2m[36m(RayTrainWorker pid=86550)[0m Setting up process group for: env:// [rank=0, world_size=6]


[2m[36m(RayTrainWorker pid=86555)[0m Type of X: torch.float32
[2m[36m(RayTrainWorker pid=86555)[0m Type of X: torch.float32
[2m[36m(RayTrainWorker pid=86555)[0m Type of X: torch.float32
[2m[36m(RayTrainWorker pid=86555)[0m Type of X: torch.float32
[2m[36m(RayTrainWorker pid=86555)[0m Type of X: torch.float32
[2m[36m(RayTrainWorker pid=86553)[0m 
[2m[36m(RayTrainWorker pid=86553)[0m 


[2m[36m(RayTrainWorker pid=86550)[0m Moving model to device: cpu
[2m[36m(RayTrainWorker pid=86550)[0m Wrapping provided model in DistributedDataParallel.
2023-09-06 11:12:36,067	INFO tune.py:1148 -- Total run time: 7.90 seconds (7.89 seconds for the tuning loop).


In [111]:
# Sorted runs
sorted_runs = mlflow.search_runs(experiment_names=[experiment_name], order_by=["metrics.train_loss ASC"])
sorted_runs


Unnamed: 0,run_id,experiment_id,status,artifact_uri,start_time,end_time,metrics.config/train_loop_config/lr_patience,metrics.train_loss,metrics.config/train_loop_config/batch_size,metrics.time_this_iter_s,...,params.train_loop_config/input_size,params.train_loop_config/batch_size,params.train_loop_config/lr,params.train_loop_config/num_epochs,params.train_loop_config/dropout_p,params.train_loop_config/lr_factor,params.train_loop_config/lr_patience,params.train_loop_config/output_size,tags.trial_name,tags.mlflow.runName
0,9dc915d0ed2d4cdab2d8a53f4828f606,767405631797657633,FINISHED,file:///tmp/mlflow/767405631797657633/9dc915d0...,2023-09-06 16:12:29.822000+00:00,2023-09-06 16:12:36.060000+00:00,3.0,3.207213,256.0,4.142494,...,2,256,0.0001,1,0.5,0.8,3,10,TorchTrainer_2caf8_00000,TorchTrainer_2caf8_00000


In [112]:
# Sorted runs
sorted_runs = mlflow.search_runs(experiment_names=[experiment_name], order_by=["metrics.val_loss ASC"])
sorted_runs['run_id'].values[0]

'9dc915d0ed2d4cdab2d8a53f4828f606'

In [144]:
from ray.train.torch import TorchPredictor

best_checkpoint = results.best_checkpoints[0][0]
predictor = TorchPredictor.from_checkpoint(best_checkpoint)
preprocessor = predictor.get_preprocessor()


test_df = pd.read_csv(HOLDOUT_LOC, dtype="float32")
test_ds = ray.data.from_pandas(test_df)
preprocessed_ds = preprocessor.transform(test_ds)
preprocessed_ds.take(1)

2023-09-06 12:14:10,403	INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(CustomPreprocessor._transform_pandas)]
2023-09-06 12:14:10,403	INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-09-06 12:14:10,404	INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


[{'X': array([[2.1, 2.1],
         [2.2, 2.2],
         [2.4, 2.4],
         [2.2, 2.2],
         [2.3, 2.3]], dtype=float32),
  'y': array([2.9, 2.6, 2.6, 3. , 2.9, 2.9, 2.9, 2.5, 2.5, 2.5], dtype=float32)}]

In [114]:
# y_true
values = preprocessed_ds.select_columns(cols=["y"]).take_all()
y_true = np.stack([item["y"] for item in values])

2023-09-06 11:12:36,243	INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV->SplitBlocks(20)] -> TaskPoolMapOperator[MapBatches(CustomPreprocessor._transform_pandas)->MapBatches(<lambda>)]
2023-09-06 11:12:36,244	INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-09-06 11:12:36,244	INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


In [145]:
# y_pred
y_pred = predictor.predict(test_ds.to_pandas())["predictions"]

Type of X: torch.float32


In [146]:
import ray.data
from ray.train.torch import TorchPredictor
from ray.data import ActorPoolStrategy

class Predictor:
    def __init__(self, checkpoint):
        self.predictor = TorchPredictor.from_checkpoint(checkpoint)
        
    def __call__(self, batch):
        z = self.predictor.predict(batch)["predictions"]
        y_pred = np.stack(z)
        return {"prediction": y_pred}

In [147]:
# Batch predict
predictions = test_ds.map_batches(
    Predictor,
    batch_size=128,
    compute=ActorPoolStrategy(min_size=1, max_size=2),  # scaling
    batch_format="pandas",
    fn_constructor_kwargs={"checkpoint": best_checkpoint})


In [149]:
predictions.take(3)

2023-09-06 12:15:07,489	INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches(Predictor)]
2023-09-06 12:15:07,490	INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-09-06 12:15:07,491	INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
2023-09-06 12:15:07,517	INFO actor_pool_map_operator.py:117 -- MapBatches(Predictor): Waiting for 1 pool actors to start...


[2m[36m(MapWorker(MapBatches(Predictor)) pid=90331)[0m Type of X: torch.float32[32m [repeated 21x across cluster][0m


[{'prediction': array([0.34149176, 0.3253997 , 0.34754878, 0.        , 0.        ,
         0.46825808, 0.        , 0.        , 0.5945572 , 0.80085206],
        dtype=float32)},
 {'prediction': array([0.34712058, 0.33643743, 0.35290688, 0.        , 0.        ,
         0.4759032 , 0.        , 0.        , 0.60753024, 0.8126106 ],
        dtype=float32)},
 {'prediction': array([0.34332034, 0.3337593 , 0.35338175, 0.        , 0.        ,
         0.4740445 , 0.        , 0.        , 0.6021232 , 0.8105459 ],
        dtype=float32)}]