In [None]:
COLAB = True # if notebook is run in Google Colab
if COLAB:
    !pip install boto3 mlforecast optuna lightgbm GPUtil -q

    import requests

    utils = requests.get("https://raw.githubusercontent.com/gabriel1628/End-to-end-MLOps-for-Time-Series/main/utils.py")
    open('utils.py', 'wb').write(utils.content)

    !mkdir -p preprocessing
    preprocessing = requests.get("https://raw.githubusercontent.com/gabriel1628/End-to-end-MLOps-for-Time-Series/main/preprocessing/preprocessing.py")
    open('preprocessing/preprocessing.py', 'wb').write(preprocessing.content)

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/139.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m11.0 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/72.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m72.5/72.5 kB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/364.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m364.4/364.4 kB[0m [31m26.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m233.5/233.5 kB[0m [31m18.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.2/13.2 MB[0m [31m86.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import numpy as np
import pandas as pd

from lightgbm import LGBMRegressor

from sklearn.model_selection import TimeSeriesSplit, cross_val_score
from sklearn.metrics import mean_absolute_error

import logging
import optuna

import boto3

import GPUtil
import os
import sys
from pathlib import Path
import yaml
import time

from utils import train_test_split
from preprocessing.preprocessing import *

import warnings
warnings.filterwarnings("ignore", category=UserWarning)

Dask dataframe query planning is disabled because dask-expr is not installed.

You can install it with `pip install dask[dataframe]` or `conda install dask`.
This will raise in a future version.



In [None]:
# IDs of the GPUs that GPUtil reports as available; an empty list (as below) means none was found.
GPUtil.getAvailable()

[]

# Global variables

In [None]:
forecast_horizon = 48  # NOTE(review): not referenced in the cells shown here — presumably the forecast length in time steps
n_lags = 48  # NOTE(review): not referenced in the cells shown here; the feature table starts at lag48 — confirm it matches preprocessing_2
model_name = "lightgbm"  # sub-directory of the config files and prefix of the experiment name
preprocessing_version = 2  # selects preprocessing_<version> from preprocessing/preprocessing.py
config_files_loc = "./configuration_files"  # local root of the YAML experiment configs (mirrored from S3)
study_path = "./optuna_studies"  # local directory holding one SQLite Optuna database per experiment

# Downloading data and Optuna studies from S3

In [None]:
# Build the S3 client. On Colab the keys come from the notebook secrets (key icon in the
# left panel); elsewhere boto3's default credential chain (e.g. the ~/.aws folder) is used.
if COLAB:
    from google.colab import userdata
    s3 = boto3.client(
        's3',
        aws_access_key_id=userdata.get("ACCESS_KEY"),
        aws_secret_access_key=userdata.get("SECRET_KEY"),
    )
else:
    s3 = boto3.client("s3")

# Sanity check: list a few keys of the enefit-competition bucket to confirm the connection.
# "Contents" is absent from the response when the bucket holds no keys, hence the default.
response = s3.list_objects_v2(
    Bucket='enefit-competition',
    MaxKeys=5,
)
for content in response.get("Contents", []):
    print(content["Key"])

configuration_files/lightgbm/config_1.yaml
configuration_files/lightgbm/config_2.yaml
data/consumption.csv
optuna-studies/lightgbm-preprocessing2-config2.db


In [None]:
def download_s3_folder(bucket_name, s3_folder, local_dir, client=None):
    """Download every object under the `s3_folder` prefix of `bucket_name` into `local_dir`.

    Each object's key, taken relative to `s3_folder`, is recreated below `local_dir`.
    Keys ending in "/" (zero-byte "folder" placeholder objects) are skipped. They have
    no content, and writing one as a file would block creation of the directory of the
    same name that real files below it need.

    Parameters
    ----------
    bucket_name : str
        Name of the S3 bucket.
    s3_folder : str
        Key prefix to download, e.g. "data/".
    local_dir : str
        Destination directory; created if it does not exist.
    client : optional
        boto3 S3 client. Defaults to the notebook-wide `s3` client.
    """
    client = s3 if client is None else client
    if local_dir:
        os.makedirs(local_dir, exist_ok=True)

    # Paginate: a single list call returns at most 1000 keys.
    paginator = client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket_name, Prefix=s3_folder):
        # Pages without matching keys carry no "Contents" entry.
        for obj in page.get("Contents", []):
            s3_key = obj["Key"]
            if s3_key == s3_folder or s3_key.endswith("/"):
                continue
            relative_path = os.path.relpath(s3_key, s3_folder)
            local_file_path = os.path.join(local_dir, relative_path)

            local_file_dir = os.path.dirname(local_file_path)
            if local_file_dir:
                os.makedirs(local_file_dir, exist_ok=True)

            print(f"Downloading s3://{bucket_name}/{s3_key} to {local_file_path}...")
            client.download_file(bucket_name, s3_key, local_file_path)

In [None]:
bucket_name = "enefit-competition"

# S3 "folders" (key prefixes) to mirror locally; each lands in a same-named local directory.
# NOTE(review): the bucket listing above shows older studies under "optuna-studies/" (hyphen),
# which the "optuna_studies/" prefix does not match — confirm which prefix is intended.
s3_folders = ["data/", "optuna_studies/", "configuration_files/"]
local_dirs = [f"./{prefix}" for prefix in s3_folders]

for prefix_and_target in zip(s3_folders, local_dirs):
    download_s3_folder(bucket_name, *prefix_and_target)

Downloading s3://enefit-competition/data/consumption.csv to ./data/consumption.csv...
Downloading s3://enefit-competition/configuration_files/lightgbm/config_1.yaml to ./configuration_files/lightgbm/config_1.yaml...
Downloading s3://enefit-competition/configuration_files/lightgbm/config_2.yaml to ./configuration_files/lightgbm/config_2.yaml...


# Read the data

In [None]:
# Load the consumption table and parse its timestamp column in a single chain.
df = pd.read_csv("./data/consumption.csv").assign(
    datetime=lambda frame: pd.to_datetime(frame["datetime"])
)
df.head()

Unnamed: 0,datetime,prediction_unit_id,consumption
0,2021-09-01,0,96.59
1,2021-09-01,1,17.314
2,2021-09-01,2,656.859
3,2021-09-01,3,59.0
4,2021-09-01,4,501.76


In [None]:
# (rows, columns) of the raw consumption table
df.shape

(1009176, 3)

# Train/Test split

In [None]:
# Hold out the last 60 days of each prediction unit as the test set.
# test_window=24 * 60 presumably counts hourly rows (24 per day x 60 days) — TODO confirm in utils.train_test_split.
df_train, df_test = train_test_split(df, test_window=24 * 60)

In [None]:
# The split must neither drop nor duplicate rows, and both parts keep every column.
assert len(df) == len(df_train) + len(df_test)
assert df.shape[1] == df_train.shape[1] == df_test.shape[1]

In [None]:
# Share of all rows that went to the test set.
test_size = len(df_test) / len(df)
print(f"test set : {round(100 * test_size, 2)}% of the data set")

test set : 9.85% of the data set


# Preprocessing

In [None]:
# Pick preprocessing_<version> explicitly from preprocessing/preprocessing.py instead of looking
# it up in vars(). That lookup depended on the star import and failed with a bare KeyError
# for an unknown version.
import importlib

_preprocessing_module = importlib.import_module("preprocessing.preprocessing")
_preprocessing_fn_name = f"preprocessing_{preprocessing_version}"
if not hasattr(_preprocessing_module, _preprocessing_fn_name):
    raise AttributeError(
        f"preprocessing/preprocessing.py defines no function named {_preprocessing_fn_name!r}"
    )
preprocessing = getattr(_preprocessing_module, _preprocessing_fn_name)

In [None]:
# Build the feature matrix / target for the training period and report their shapes.
X_train, y_train = preprocessing(df_train)
print(f"X_train shape : {X_train.shape}", f"y_train shape : {y_train.shape}", sep="\n")
X_train.head()

  df[feat_name] = feat_vals[restore_idxs]
  df[feat_name] = feat_vals[restore_idxs]


X_train shape : (854079, 99)
y_train shape : (854079,)


Unnamed: 0,lag48,lag49,lag50,lag51,lag52,lag53,lag54,lag55,lag56,lag57,...,rolling_mean_lag68_window_size24,expanding_mean_lag69,rolling_mean_lag69_window_size24,expanding_mean_lag70,rolling_mean_lag70_window_size24,expanding_mean_lag71,rolling_mean_lag71_window_size24,month,dayofweek,hour
5795,120.54,134.986,150.412,152.763,136.13,121.033,80.621,43.428,46.84,43.671,...,82.505417,82.612111,81.8605,81.756654,81.308,81.1456,80.502083,9,5,23
5856,107.129,120.54,134.986,150.412,152.763,136.13,121.033,80.621,43.428,46.84,...,83.241833,83.355714,82.505417,82.612111,81.8605,81.756654,81.308,9,6,0
5917,81.92,107.129,120.54,134.986,150.412,152.763,136.13,121.033,80.621,43.428,...,83.893958,84.131655,83.241833,83.355714,82.505417,82.612111,81.8605,9,6,1
5978,96.193,81.92,107.129,120.54,134.986,150.412,152.763,136.13,121.033,80.621,...,84.539375,84.841667,83.893958,84.131655,83.241833,83.355714,82.505417,9,6,2
6039,94.536,96.193,81.92,107.129,120.54,134.986,150.412,152.763,136.13,121.033,...,84.552333,85.716806,84.539375,84.841667,83.893958,84.131655,83.241833,9,6,3


In [None]:
# Build the feature matrix / target for the test period and report their shapes.
# NOTE(review): df_test is preprocessed in isolation, so lag/rolling/expanding features presumably
# cannot use the training history (X_test also has fewer rows than df_test) — confirm against
# preprocessing_2 whether test features should be computed with the train history prepended.
X_test, y_test = preprocessing(df_test)
print(f"X_test shape : {X_test.shape}", f"y_test shape : {y_test.shape}", sep="\n")
X_test.head()

X_test shape : (84735, 99)
y_test shape : (84735,)


  df[feat_name] = feat_vals[restore_idxs]
  df[feat_name] = feat_vals[restore_idxs]


Unnamed: 0,lag48,lag49,lag50,lag51,lag52,lag53,lag54,lag55,lag56,lag57,...,rolling_mean_lag68_window_size24,expanding_mean_lag69,rolling_mean_lag69_window_size24,expanding_mean_lag70,rolling_mean_lag70_window_size24,expanding_mean_lag71,rolling_mean_lag71_window_size24,month,dayofweek,hour
920263,1057.285,1107.518,1177.874,1114.973,860.687,624.847,382.365,246.753,193.661,160.944,...,722.741458,747.325111,723.395542,740.683269,723.535208,734.97888,725.296292,4,2,23
920328,1055.621,1057.285,1107.518,1177.874,1114.973,860.687,624.847,382.365,246.753,193.661,...,723.69675,753.981429,722.741458,747.325111,723.395542,740.683269,723.535208,4,3,0
920393,999.628,1055.621,1057.285,1107.518,1177.874,1114.973,860.687,624.847,382.365,246.753,...,724.803458,761.656966,723.69675,753.981429,722.741458,747.325111,723.395542,4,3,1
920458,1001.917,999.628,1055.621,1057.285,1107.518,1177.874,1114.973,860.687,624.847,382.365,...,726.249667,769.130867,724.803458,761.656966,723.69675,753.981429,722.741458,4,3,2
920523,1014.902,1001.917,999.628,1055.621,1057.285,1107.518,1177.874,1114.973,860.687,624.847,...,731.313625,778.516258,726.249667,769.130867,724.803458,761.656966,723.69675,4,3,3


# HPO with Optuna

## Experiment configurations

In [None]:
experiment_config = dict(
    # Each entry is unpacked as keyword arguments into `trial.suggest_int(**spec)`.
    int_params=[
        dict(name="num_trees", low=3, high=10, log=False),
        dict(name="max_depth", low=3, high=10),
        dict(name="num_leaves", low=5, high=25),
    ],
    # Each entry is unpacked as keyword arguments into `trial.suggest_float(**spec)`.
    float_params=[
        dict(name="learning_rate", low=0.001, high=0.1, log=True),
        dict(name="feature_fraction", low=0.1, high=0.75),
    ],
    objective_values="mean",
)

In [None]:
# Create the config-file directory for this model if it does not exist yet.
# exist_ok=True replaces a bare `except: pass`, which also hid real failures
# (permission errors, a regular file at that path, even KeyboardInterrupt).
path = Path(config_files_loc, model_name)
path.mkdir(parents=True, exist_ok=True)

In [None]:
def _find_or_create_config(experiment_config, config_dir):
    """Return ``(version, path)`` of the config in `config_dir` equal to `experiment_config`.

    Config files are named ``config_<version>.yaml``. The version is parsed from the file
    name rather than inferred from glob order, which is unspecified. Files are compared in
    ascending numeric order. If none matches, `experiment_config` is written to
    ``config_<highest version + 1>.yaml`` (``config_1.yaml`` for an empty directory).
    """
    versioned_files = []
    for candidate in config_dir.glob("config_*.yaml"):
        suffix = candidate.stem[len("config_"):]
        if suffix.isdecimal():
            versioned_files.append((int(suffix), candidate))
    versioned_files.sort()

    for version, candidate in versioned_files:
        with open(candidate, "rb") as fh:
            if yaml.safe_load(fh) == experiment_config:
                return version, candidate

    new_version = versioned_files[-1][0] + 1 if versioned_files else 1
    new_path = Path(config_dir, f"config_{new_version}.yaml")
    with open(new_path, "w") as fh:
        yaml.dump(experiment_config, fh)
    return new_version, new_path


# Reuse the version number of an identical, already-tested config; otherwise register a new one.
# (Previously `config_file` was undefined on the very first run, when no config existed yet.)
config_files_path = Path(config_files_loc, model_name)
config_version, config_file = _find_or_create_config(experiment_config, config_files_path)
print(f"using {config_file}")

with open(config_file, "rb") as file:
    config = yaml.safe_load(file)

using configuration_files/lightgbm/config_3.yaml


In [None]:
# Mirror every local config file to S3, using its relative path as the object key.
list_config_files = list(config_files_path.glob("*.yaml"))
for config_path in list_config_files:
    s3.upload_file(config_path, bucket_name, str(config_path))

## Objective function

In [None]:
def objective(trial):
    """Optuna objective: mean MAE of an LGBMRegressor over 5 time-series CV splits.

    Hyperparameters come from the loaded `config`: "int_params" via `trial.suggest_int`,
    then "float_params" via `trial.suggest_float`. Besides the returned mean CV error,
    the trial records these user attributes:
    - the per-split errors;
    - their standard deviation;
    - the train and test MAE of a model refit on the whole training set.
    The train/test MAEs are only logged; they do not affect the returned value.

    NOTE(review): a later cell in this notebook defines another `objective`, which
    replaces this one if that cell is executed.
    """
    params = {"verbosity": -1, "random_state": 0}
    params.update(
        {spec["name"]: trial.suggest_int(**spec) for spec in config["int_params"]}
    )
    params.update(
        {spec["name"]: trial.suggest_float(**spec) for spec in config["float_params"]}
    )

    regressor = LGBMRegressor(**params)
    # scikit-learn reports negated MAE for this scorer; flip the sign back.
    fold_maes = -cross_val_score(
        regressor,
        X_train,
        y_train,
        scoring="neg_mean_absolute_error",
        cv=TimeSeriesSplit(n_splits=5),
    )
    for split_number, split_mae in enumerate(fold_maes, start=1):
        trial.set_user_attr(f"error_split_{split_number}", split_mae)
    trial.set_user_attr("cv_errors_std", fold_maes.std())

    regressor.fit(X_train, y_train)
    trial.set_user_attr("train_mae", mean_absolute_error(y_train, regressor.predict(X_train)))
    trial.set_user_attr("test_mae", mean_absolute_error(y_test, regressor.predict(X_test)))

    return fold_maes.mean()

## Create and run study

In [None]:
experiment_name = f"{model_name}_preprocessing{preprocessing_version}_config{config_version}"
# One SQLite database per experiment, stored under study_path.
storage_name = f"sqlite:///{study_path}/{experiment_name}.db"
print(f"experiment name : {experiment_name}")

experiment name : lightgbm_preprocessing2_config3


In [None]:
# Send Optuna's log records to stdout through exactly one stream handler.
# Clearing the logger's own handlers first stops re-runs of this cell from stacking duplicates.
logger = optuna.logging.get_logger("optuna")
logger.handlers.clear()
logger.addHandler(logging.StreamHandler(sys.stdout))

# Seeded TPE sampler, so the sequence of suggested hyperparameters is reproducible.
# Alternatives: RandomSampler, GridSampler, CmaEsSampler, NSGAIISampler, QMCSampler,
# GPSampler, BoTorchSampler, BruteForceSampler.
sampler = optuna.samplers.TPESampler(seed=0)

In [None]:
# Create the study in its SQLite storage, or resume it if one with this name already exists there.
study = optuna.create_study(
    storage=storage_name,
    study_name=experiment_name,
    directions=["minimize"],
    sampler=sampler,
    load_if_exists=True,
)

A new study created in RDB with name: lightgbm_preprocessing2_config3


In [None]:
# Number of trials already stored in the study (0 for a newly created study, as here)
len(study.trials)

0

In [None]:
# Local SQLite file backing the study (same file as in storage_name).
experiment_path = Path(study_path) / f"{experiment_name}.db"
experiment_path

PosixPath('optuna_studies/lightgbm_preprocessing2_config3.db')

### Run the study and upload it to S3 after each trial (sequentially)

In [None]:
# Execute the hyperparameter optimization trials
%%time
for i in range(3):
    study.optimize(objective, n_trials=1)
    s3.upload_file(
        experiment_path,
        bucket_name,
        str(experiment_path),
    )

Trial 0 finished with value: 562.0289973165827 and parameters: {'num_trees': 7, 'max_depth': 8, 'num_leaves': 17, 'learning_rate': 0.012296071107325713, 'feature_fraction': 0.3753756195702881}. Best is trial 0 with value: 562.0289973165827.
Trial 1 finished with value: 326.0017935863218 and parameters: {'num_trees': 8, 'max_depth': 6, 'num_leaves': 23, 'learning_rate': 0.08459126528049378, 'feature_fraction': 0.34923698723675556}. Best is trial 1 with value: 326.0017935863218.
Trial 2 finished with value: 341.7653447494029 and parameters: {'num_trees': 9, 'max_depth': 7, 'num_leaves': 16, 'learning_rate': 0.07098936257405905, 'feature_fraction': 0.14617343782862652}. Best is trial 1 with value: 326.0017935863218.
CPU times: user 2min 11s, sys: 3.47 s, total: 2min 15s
Wall time: 2min 17s


In [None]:
# Delete the local copy of the study database.
# NOTE(review): presumably this mimics a fresh runtime. The next cell then creates an empty study
# unless the database is downloaded from S3 again first.
# unlink(missing_ok=True) does not fail when the file is already gone and avoids
# interpolating a path into a shell command.
experiment_path.unlink(missing_ok=True)

rm: cannot remove 'optuna_studies/lightgbm_preprocessing2_config3.db': No such file or directory


In [None]:
# Initialize the Optuna study
study = optuna.create_study(
    study_name=experiment_name,
    storage=storage_name,
    load_if_exists=True,
    directions=["minimize"],
    # sampler=sampler,
    # pruner=pruner,
)

A new study created in RDB with name: lightgbm-preprocessing2-config2


In [None]:
# Number of trials in the re-opened study (0 here: the local database was deleted beforehand)
len(study.trials)

0

## With asyncio

Version 1 (drafted with ChatGPT; kept commented out)

In [None]:
# import asyncio

# async def optimize_task(n_iter=10):
#     """Task to continuously run the optimization."""
#     for i in range(n_iter):
#         study.optimize(objective, n_trials=1)
#         await asyncio.sleep(0)  # Yield control to allow other tasks to run


# async def upload_task(
#     sleep_time=60, file_name=f"{study_path}/{experiment_name}.db",
#     bucket=bucket_name,
#     object_name=f"optuna-studies/{experiment_name}.db"):
#     """Task to upload the file every 60 seconds."""
#     while True:
#         await asyncio.sleep(sleep_time)  # Wait for 60 seconds
#         s3.upload_file(
#             file_name, bucket, object_name
#         )


# async def main():
#     # Run both tasks concurrently
#     await asyncio.gather(
#         optimize_task(),
#         upload_task()
#     )

In [None]:
# import nest_asyncio
# nest_asyncio.apply()

In [None]:
# %%time
# # Run the event loop
# asyncio.run(main())


Version 2 (drafted with Qwen2.5 Coder 32B Instruct)

In [None]:
!pip install optuna boto3 aiofiles aiobotocore

In [None]:
import aiofiles
import aiobotocore
import asyncio

# Define your objective function
def objective(trial):
    x = trial.suggest_float('x', -10, 10)
    return (x - 2) ** 2

# Asynchronous function to upload the study database to S3
async def upload_to_s3(file_name, bucket_name, object_name=None):
    if object_name is None:
        object_name = os.path.basename(file_name)

    session = aiobotocore.get_session()
    async with session.create_client('s3', region_name='us-west-2') as client:
        try:
            async with aiofiles.open(file_name, 'rb') as f:
                file_content = await f.read()
                response = await client.put_object(
                    Bucket=bucket_name,
                    Key=object_name,
                    Body=file_content
                )
                print(f"Successfully uploaded {file_name} to {bucket_name}/{object_name}")
        except Exception as e:
            print(f"Failed to upload {file_name} to {bucket_name}/{object_name}: {e}")

# Asynchronous function to periodically backup the study database
async def backup_study(db_path, bucket_name, interval=300):
    while True:
        try:
            await upload_to_s3(db_path, bucket_name)
        except Exception as e:
            print(f"Error during backup: {e}")
        await asyncio.sleep(interval)

# Main function to run the study and backup
async def run_study_and_backup(db_path, bucket_name, n_trials=100):
    study = optuna.load_study(study_name="example_study", storage=f"sqlite:///{db_path}")

    # Create a task for the backup
    backup_task = asyncio.create_task(backup_study(db_path, bucket_name))

    # Run the optimization
    study.optimize(objective, n_trials=n_trials)

    # Wait for the backup task to finish
    await backup_task

# Set up the study database and bucket name
db_path = "optuna_study.db"
bucket_name = "your-s3-bucket-name"

# Ensure the study database exists
if not os.path.exists(db_path):
    optuna.create_study(study_name="example_study", storage=f"sqlite:///{db_path}")

# Run the study and backup using asyncio
await run_study_and_backup(db_path, bucket_name)

## With threading and concurrent.futures

In [None]:
import lightgbm as lgb
import time
import concurrent.futures
import threading

# CPU-bound task
def optimize_task(n_trials):
    """Run `n_trials` trials of the global `study` with the global `objective`."""
    print("HPO optimization started")
    study.optimize(objective, n_trials=n_trials)
    print("HPO optimization finished")


# I/O-bound task
def upload_to_s3(
        sleep_time=60,
        file_name=None,
        bucket=None,
        object_name=None,
        stop_event=None,
    ):
    """Upload the study's SQLite file to S3.

    Without `stop_event`: wait `sleep_time` seconds, then upload once.
    With `stop_event` (a threading.Event): upload every `sleep_time` seconds until the event
    is set, then upload one last time, so trials finished after the previous upload are kept.

    Defaults are resolved at call time (not when the function is defined), so they follow the
    current `study_path`, `experiment_name` and `bucket_name`. The default object key is the
    study file's relative path ("optuna_studies/<experiment>.db"). That is the same key used
    by the sequential upload loop (`str(experiment_path)`) and by the download cell.

    NOTE(review): the SQLite file is read while Optuna may be writing to it from the other
    thread, so an upload can capture a mid-transaction file — confirm or snapshot first.
    """
    if file_name is None:
        file_name = f"{study_path}/{experiment_name}.db"
    if bucket is None:
        bucket = bucket_name
    if object_name is None:
        object_name = str(Path(study_path, f"{experiment_name}.db"))

    def upload_once():
        print("Uploading files to S3")
        s3.upload_file(file_name, bucket, object_name)
        print("Files uploaded")

    if stop_event is None:
        time.sleep(sleep_time)
        upload_once()
        return
    # Event.wait returns False on timeout and True as soon as the event is set, so this loop
    # uploads periodically and exits promptly once the optimisation is over.
    while not stop_event.wait(sleep_time):
        upload_once()
    upload_once()


def run_upload_to_s3(stop_event=None):
    """Thread target: run `upload_to_s3` with its defaults and the given stop event."""
    return upload_to_s3(stop_event=stop_event)


# Upload periodically from a background thread while the optimisation runs in this one.
stop_uploads = threading.Event()
io_thread = threading.Thread(target=run_upload_to_s3, kwargs={"stop_event": stop_uploads})
io_thread.start()

try:
    optimize_task(n_trials=3)
finally:
    # Stop the uploader (it does a final upload) even if the optimisation is interrupted.
    stop_uploads.set()
    io_thread.join()

print("Both tasks are done")

HPO optimization started
Uploading files to S3
Trial 3 failed with parameters: {'num_trees': 3, 'max_depth': 3, 'num_leaves': 22, 'learning_rate': 0.0360009119291161, 'feature_fraction': 0.6655078963604325} because of the following error: KeyboardInterrupt().
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/optuna/study/_optimize.py", line 197, in _run_trial
    value_or_values = func(trial)
  File "<ipython-input-31-67417427e59f>", line 15, in objective
    cv_errors = cross_val_score(
  File "/usr/local/lib/python3.10/dist-packages/sklearn/utils/_param_validation.py", line 213, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/sklearn/model_selection/_validation.py", line 712, in cross_val_score
    cv_results = cross_validate(
  File "/usr/local/lib/python3.10/dist-packages/sklearn/utils/_param_validation.py", line 213, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packag

KeyboardInterrupt: 

In [None]:
# Summary of the best trial found so far.
print(f"best trial number : {study.best_trial.number}")
print(f"best params : {study.best_params}")
print(f"best error : {study.best_value}")

best trial number : 400
best params : {'num_trees': 93, 'learning_rate': 0.09965941125937848, 'max_depth': 9, 'num_leaves': 35}
best error : 63.19364006608713


## Experimental history

In [None]:
# Load the stored study for inspection. `load_study` raises if no study with this name exists
# in the storage, whereas `create_study(load_if_exists=True)` would silently create an empty one.
study = optuna.load_study(study_name=experiment_name, storage=storage_name)
print("number of trials in the study :", len(study.trials))
trials_df = study.trials_dataframe().drop(
    columns=["datetime_start", "datetime_complete"]
)
# Best (lowest mean CV error) trials first.
trials_df.sort_values(by="value").head()

Using an existing study with name 'lightgbm-preprocessing2-config1' instead of creating a new one.
number of trials in the study : 691


Unnamed: 0,number,value,duration,params_learning_rate,params_max_depth,params_num_leaves,params_num_trees,user_attrs_cv_errors_std,user_attrs_error_split_1,user_attrs_error_split_2,user_attrs_error_split_3,user_attrs_error_split_4,user_attrs_error_split_5,user_attrs_test_mae,user_attrs_train_mae,state
400,400,63.19364,0 days 00:00:15.705318,0.099659,9,35,93,38.742354,53.388416,57.152069,35.108136,138.161619,32.157961,73.871434,55.813876,COMPLETE
301,301,63.213407,0 days 00:00:15.849273,0.072456,9,32,100,37.942814,53.909184,57.993,35.362319,136.415355,32.387178,74.998595,57.314667,COMPLETE
296,296,63.215329,0 days 00:00:16.442244,0.087321,9,49,96,39.351986,53.197649,56.591022,35.165786,139.474059,31.64813,73.422721,54.379721,COMPLETE
132,132,63.219725,0 days 00:00:15.500934,0.09971,9,35,95,38.808659,53.380351,57.049509,35.512525,138.334141,31.8221,73.83719,55.568589,COMPLETE
628,628,63.239345,0 days 00:00:16.001369,0.086211,9,36,100,38.632593,53.273483,57.820008,34.992164,137.894456,32.216613,73.6644,55.962821,COMPLETE
