# Set up and import libraries

In [0]:
%pip install openpyxl polars lightgbm xgboost

In [0]:
# NOTE(review): presumably restarts the Python process so that the packages
# installed by the %pip cell above are picked up — confirm against the Databricks docs.
dbutils.library.restartPython()

In [0]:
# Decorator that reports how long the wrapped function takes to run.
def with_time_review(func):
    """Wrap ``func`` so that every call prints its wall-clock duration.

    Parameters
    ----------
    func : callable
        The function to time.

    Returns
    -------
    callable
        A wrapper that forwards all positional and keyword arguments to
        ``func``, prints the elapsed time (rounded to 5 decimals) followed by
        a separator line, and returns whatever ``func`` returned. If ``func``
        raises, the exception propagates and nothing is printed.
    """
    import functools
    import time

    # functools.wraps keeps the original __name__/__doc__ on the wrapper, so
    # the decorated function still identifies itself correctly.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the measurement cannot be skewed by
        # system clock adjustments the way time.time() can.
        begin = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = round(time.perf_counter() - begin, 5)

        print (f"Function: '{func.__name__}' runs for: {elapsed} seconds.")
        print ("----------------------------\n")
        return result
    return wrapper

In [0]:
import glob
import math
import os

import pandas as pd

# Import parallel libraries
import pyspark.pandas as ps_pd
import polars as pl
import ray
import multiprocessing as mp
from joblib import Parallel, delayed

# Import machine learning model
from sklearn.ensemble import RandomForestRegressor
import lightgbm as lgbm
import xgboost as xgb

# Ray Setup
# Initialise Ray for the Ray-based benchmark further down in this notebook.
ray.init(
    # NOTE(review): presumably keeps worker logs out of the notebook output — confirm.
    log_to_driver=False,
    # NOTE(review): presumably makes re-running this cell harmless when Ray is already initialised — confirm.
    ignore_reinit_error=True,
    runtime_env={
        # Packages requested for the runtime environment; they match the
        # model libraries imported at the top of this cell.
        "pip": ["lightgbm", "xgboost"], 
        # Environment variables requested for the runtime environment.
        "env_vars": {"PYTHONHASHSEED": "0", }},
)

# Set up the output folder and read the data

In [0]:
# Output location without a leading slash: later cells prefix it with '/dbfs/'
# for pandas file access and with 'dbfs:/' for the Spark writer.
OUTPUT_FOLDER = 'mnt/adls_gen2/HENRYDUCLAI/TRAINING/SESSION_01'
# Create the folder up front; exist_ok=True keeps the cell safe to re-run.
os.makedirs('/dbfs/' + OUTPUT_FOLDER, exist_ok=True)

In [0]:
FILE_PATH = '../dataset/Stallion-AbinBev-kaggle.csv'

# Load the CSV, add a composite 'KEY' (Agency + '_' + SKU) that identifies one
# series, and drop 'Price'. Per the original author's note, 'Price' is a
# leakage variable (PRICE = SALES + PROMO) and must not reach the models.
df_dataset = (
    pd.read_csv(FILE_PATH)
    .assign(KEY=lambda frame: frame['Agency'] + '_' + frame['SKU'])
    .drop(columns=['Price'])
)

# Spark copy of the same data, consumed by the Spark benchmark.
spark_df_dataset = spark.createDataFrame(df_dataset)

# Quick look at the data: the Spark frame first, then the per-KEY row counts
# as the cell's output.
display(spark_df_dataset)
df_dataset.groupby('KEY').count()

# Forecast module

In [0]:
def codeblock_feature_engineering(key, df_group):
    """Build lag and rolling-window features for one KEY's rows.

    Parameters
    ----------
    key : str
        Identifier of the group. Not used by the computation; kept so every
        pipeline step shares the same ``(key, frame)`` calling convention.
    df_group : pandas.DataFrame
        Rows of a single group. Must contain 'YearMonth', 'Sales',
        'Promotions', 'Agency' and 'SKU'. The input frame is not modified.

    Returns
    -------
    pandas.DataFrame
        The rows sorted by 'YearMonth', with 'Agency' and 'SKU' removed and,
        for each of 'Sales' and 'Promotions':
        * ``f__LAG_<col>_<k>`` for k = 1..12 (value k rows earlier),
        * ``f__MA_<col>_<w>`` / ``f__MSTD_<col>_<w>`` for w in 3, 6, 9, 12
          (rolling mean / std, shifted by one row so the current row is
          excluded from its own window).
        Rows containing any missing value are dropped, which removes the
        leading rows that lack enough history.
    """
    ordered = df_group.sort_values(by=['YearMonth']).reset_index(drop=True)

    # Collect every derived column first, in the order they must appear.
    features = {}
    for source_col in ('Sales', 'Promotions'):
        series = ordered[source_col]
        features.update(
            {f'f__LAG_{source_col}_{step}': series.shift(step) for step in range(1, 13)}
        )
        for size in (3, 6, 9, 12):
            rolled = series.rolling(size)
            features[f'f__MA_{source_col}_{size}'] = rolled.mean().shift(1)
            features[f'f__MSTD_{source_col}_{size}'] = rolled.std().shift(1)

    return ordered.assign(**features).drop(columns=['Agency', 'SKU']).dropna()

def codeblock_model_forecasting(key, df_train, df_test):
    """Fit three regressors on ``df_train`` and predict 'Sales' for ``df_test``.

    Parameters
    ----------
    key : str
        Identifier of the group. Not used by the computation; kept so every
        pipeline step shares the same calling convention.
    df_train : pandas.DataFrame
        Training rows; must contain the target column 'Sales'.
    df_test : pandas.DataFrame
        Rows to predict; must contain 'Sales' and the same feature columns
        as ``df_train``. It is NOT modified.

    Returns
    -------
    pandas.DataFrame
        A copy of ``df_test`` with one extra column per model, named after
        the model class ('RandomForestRegressor', 'LGBMRegressor',
        'XGBRegressor'), holding that model's predictions.
    """
    import numpy as np
    # Seed NumPy's global RNG in addition to the explicit random_state that
    # each model receives below.
    np.random.seed(1234)

    models_list = [
        RandomForestRegressor(n_jobs=1, random_state=1234),
        lgbm.LGBMRegressor(n_jobs=1, random_state=1234),
        xgb.XGBRegressor(n_jobs=1, random_state=1234),
    ]

    X_train, y_train = df_train.drop(columns=['Sales']), df_train['Sales']
    # Taken before any prediction column is added, so predictions never leak
    # into the feature matrix.
    X_test = df_test.drop(columns=['Sales'])

    # Only numeric columns are used as features; the selection is derived
    # from the training frame and applied to both frames.
    list_numeric_cols = X_train.select_dtypes(include=np.number).columns.tolist()

    # Write predictions into a copy: df_test is usually a slice of a larger
    # frame, and assigning into it would mutate the caller's data and raise
    # pandas' SettingWithCopyWarning.
    df_forecast = df_test.copy()
    for model in models_list:
        model.fit(X_train[list_numeric_cols], y_train)
        df_forecast[model.__class__.__name__] = model.predict(X_test[list_numeric_cols])

    return df_forecast

def codeblock_evalutation_pipeline(key, df_group, training_yearmonth):
    """Run feature engineering, a time-based split and forecasting for one KEY.

    Parameters
    ----------
    key : str
        Identifier of the group, forwarded to the sub-steps.
    df_group : pandas.DataFrame
        Raw rows of a single group.
    training_yearmonth : int
        Cut-off value: rows with ``YearMonth`` strictly below it are used for
        training, rows at or above it are forecast.

    Returns
    -------
    pandas.DataFrame
        Whatever ``codeblock_model_forecasting`` returns for the test rows.
    """
    features = codeblock_feature_engineering(key, df_group)

    # Split on the cut-off: earlier rows train, the cut-off and later rows are scored.
    train_part = features.query(f"YearMonth < {training_yearmonth}")
    test_part = features.query(f"YearMonth >= {training_yearmonth}")

    return codeblock_model_forecasting(key, train_part, test_part)

## Demo on 1 Key

In [0]:
# Cut-off handed to the pipeline: rows with YearMonth below this value train
# the models, rows at or above it are forecast.
training_yearmonth = 201707
key = 'Agency_01_SKU_01'
# Run the whole pipeline on a single KEY.
df_group = df_dataset.query(f"KEY == '{key}'")
df_demo = codeblock_evalutation_pipeline(key, df_group, training_yearmonth)
# The Spark version of the demo output supplies the schema that test_SPARK
# passes to applyInPandas, so this cell must run before the Spark benchmark.
df_schema_spark = spark.createDataFrame(df_demo)

# Parallel setup for Ray and Spark

In [0]:
@with_time_review
def test_RAY(pandas_df, training_yearmonth):
    """Run the evaluation pipeline once per KEY through Ray and save the result.

    Parameters
    ----------
    pandas_df : pandas.DataFrame
        Full dataset; it is split into groups on its 'KEY' column.
    training_yearmonth : int
        Train/test cut-off forwarded to the pipeline.

    Returns
    -------
    None
        The concatenated forecasts are written to
        ``/dbfs/<OUTPUT_FOLDER>/RAY_PANDAS.parquet`` (without the index).
    """
    remote_pipeline = ray.remote(codeblock_evalutation_pipeline)

    # Submit one remote call per KEY group.
    pending = []
    for group_key, group_frame in pandas_df.groupby("KEY"):
        pending.append(remote_pipeline.remote(group_key, group_frame, training_yearmonth))

    # Collect the per-KEY frames and stack them into a single frame.
    forecasts = ray.get(pending)
    combined = pd.concat(forecasts)
    combined.to_parquet(f"/dbfs/{OUTPUT_FOLDER}/RAY_PANDAS.parquet" , index=False)

########################################################################################

@with_time_review
def test_SPARK(spark_df, training_yearmonth):
    """Run the evaluation pipeline once per KEY through Spark and save the result.

    Parameters
    ----------
    spark_df : pyspark.sql.DataFrame
        Full dataset; it is grouped on its 'KEY' column.
    training_yearmonth : int
        Train/test cut-off forwarded to the pipeline.

    Returns
    -------
    None
        The forecasts are written (mode 'overwrite') to
        ``dbfs:/<OUTPUT_FOLDER>/SPARK_OUTPUT_parquet``. The output schema is
        taken from the module-level ``df_schema_spark``.
    """
    def run_pipeline_for_group(df_group):
        # The frame holds the rows of one groupBy("KEY") group, so the first
        # row's KEY is used as the group's identifier.
        return codeblock_evalutation_pipeline(
            key=df_group["KEY"].iloc[0],
            df_group=df_group,
            training_yearmonth=training_yearmonth,
        )

    grouped = spark_df.groupBy("KEY")
    spark_df_output = grouped.applyInPandas(run_pipeline_for_group, schema=df_schema_spark.schema)
    spark_df_output.write.mode('overwrite').parquet(f"dbfs:/{OUTPUT_FOLDER}/SPARK_OUTPUT_parquet")

# Run the evaluation 5 times

In [0]:
# Repeat the Ray benchmark five times; test_RAY is wrapped by
# with_time_review, so each call prints its own duration.
for i in range(5):
    print(f"Test: {i}")
    test_RAY(pandas_df=df_dataset, training_yearmonth=training_yearmonth)

In [0]:
# Repeat the Spark benchmark five times; test_SPARK is wrapped by
# with_time_review, so each call prints its own duration.
for i in range(5):
    print(f"Test: {i}")
    test_SPARK(spark_df=spark_df_dataset, training_yearmonth=training_yearmonth)

In [0]:
# Reload both benchmark outputs and check that the two engines produced the
# same forecasts, model by model.
ray_output = pd.read_parquet(f"/dbfs/{OUTPUT_FOLDER}/RAY_PANDAS.parquet")
spark_output = pd.read_parquet(f"/dbfs/{OUTPUT_FOLDER}/SPARK_OUTPUT_parquet")

# The two outputs may not share a row order, and floating-point addition is
# order-sensitive, so exact `==` on the totals can print False even when every
# individual forecast matches. Compare with a relative tolerance instead.
for model_name in ('RandomForestRegressor', 'LGBMRegressor', 'XGBRegressor'):
    print(math.isclose(ray_output[model_name].sum(), spark_output[model_name].sum(), rel_tol=1e-9))