In [1]:
from model import preprocess_data_lgbm as preprocess_data
import os
import pandas as pd
import ray
from ray.train import CheckpointConfig, RunConfig, ScalingConfig
from ray.train.lightgbm import LightGBMTrainer
from utils import create_train_test_group

In [None]:
# ------------------------------------------
# Input data and target definition (forwarded to preprocess_data)
csv_file_name = "./Data/mro_daily_clean.csv"
target_mro = ["mro"]
maintain_repair_mro = "full"

# ------------------------------------------
# Feature-engineering switches (forwarded to preprocess_data)
add_mro_prev = True
add_purchase_time = True
add_driver_behavior = True
agg_weeks = 1
agg_fun = ["mean", "sum", "max", "min", "std", "skew"]
# time window could be 4, 8, 12
time_window = 8

# ------------------------------------------
# LightGBM Parameters
metric: list = ["binary_logloss", "binary_error", "auc", "average_precision"]
learning_rate: float = 0.05
num_leaves: int = 64
max_depth: int = 8
is_unbalance: bool = True
# boosting could be "gbdt", "rf" (random forest) and "dart"
boosting: str = "gbdt"

# ------------------------------------------
# Tag encoding the feature-engineering settings; it is embedded in both the
# cached-data file name and the model file name so that artifacts produced
# with different settings do not overwrite each other.
feature_tag = (
    f"db{int(add_driver_behavior)}"
    f"_mp{int(add_mro_prev)}"
    f"_pt{int(add_purchase_time)}"
    f"_aw{agg_weeks}"
    f"_tw{time_window}"
)

# ------------------------------------------
# data record folder
# NOTE: the file is read/written as Parquet elsewhere in this notebook; the
# ".gzip" extension only reflects the compression codec used on write.
data_lgbm_file_name = f"data_lgbm_{feature_tag}.gzip"
data_lgbm_path = os.path.abspath(os.path.join("./Data", data_lgbm_file_name))

# ------------------------------------------
# model record folder (created on the fly if missing)
model_name = f"model_lgbm_{boosting}_{feature_tag}.json"
model_output_dir = "./output/lgbm"
os.makedirs(model_output_dir, exist_ok=True)
model_path = os.path.join(model_output_dir, model_name)

In [None]:
# Build the model-ready table once and cache it as Parquet; later runs with
# the same settings reload the cached file instead of re-running preprocessing.
if not os.path.isfile(data_lgbm_path):
    print(f"{data_lgbm_path} does not exist.")

    # control parameter: data preparation
    data = preprocess_data(
        file_name=csv_file_name,
        target_mro=target_mro,
        maintain_repair_mro=maintain_repair_mro,
        add_mro_prev=add_mro_prev,
        add_purchase_time=add_purchase_time,
        add_driver_behavior=add_driver_behavior,
        agg_weeks=agg_weeks,
        agg_fun=agg_fun,
        time_window=time_window,
    )

    # Assign every row to a train/valid/test group (fixed seed for repeatability).
    data_lgbm = create_train_test_group(
        data=data,
        sample_frac=1.0,
        test_size=0.1,
        valid_size=0.1,
        random_state=42,
    )

    # Persist the result so the next run takes the cached branch.
    data_lgbm.to_parquet(data_lgbm_path, compression="gzip", engine="pyarrow")
else:
    print(f"Data file {data_lgbm_path} exists.")
    data_lgbm = pd.read_parquet(data_lgbm_path)

In [None]:
def prepare_data(data: pd.DataFrame):
    """Split a pre-grouped frame into train, validation, and test partitions.

    Rows are routed by the value of the ``"group"`` column (``"train"``,
    ``"valid"`` or ``"test"``). The bookkeeping columns ``"group"`` and
    ``"id"`` are removed from every partition; all other columns and the
    original row index are kept. The input frame is not modified.

    Returns:
        A ``(train, valid, test)`` tuple of DataFrames.
    """
    bookkeeping_cols = ["group", "id"]

    train_part, valid_part, test_part = (
        data[data["group"] == split_name].drop(bookkeeping_cols, axis=1)
        for split_name in ("train", "valid", "test")
    )

    return train_part, valid_part, test_part

In [None]:
train_dataset, valid_dataset, test_dataset = prepare_data(data_lgbm)

# Ray Train consumes Ray Datasets, so wrap the pandas partitions used for fitting.
ray_train_dataset = ray.data.from_pandas(train_dataset)
ray_valid_dataset = ray.data.from_pandas(valid_dataset)

# Checkpointing: save every 10 boosting iterations and keep at most 20 checkpoints.
checkpoint_config = CheckpointConfig(
    checkpoint_frequency=10,
    num_to_keep=20,
)
run_config = RunConfig(checkpoint_config=checkpoint_config)

# Data-parallel training across 4 workers, each scheduled on a GPU.
scaling_config = ScalingConfig(
    num_workers=4,
    use_gpu=True,
)

# LightGBM training parameters; the tunable values come from the configuration cell.
lgbm_params = {
    "objective": "binary",
    "metric": metric,
    "learning_rate": learning_rate,
    "num_leaves": num_leaves,
    "max_depth": max_depth,
    "is_unbalance": is_unbalance,
    "boosting": boosting,
    "device_type": "gpu",
}

# Set up the LightGBM trainer and run distributed training.
trainer = LightGBMTrainer(
    scaling_config=scaling_config,
    label_column="target_mro",
    num_boost_round=20,
    params=lgbm_params,
    datasets={"train": ray_train_dataset, "valid": ray_valid_dataset},
    run_config=run_config,
)
result = trainer.fit()

In [None]:
import json

# Rebuild the LightGBM booster from the checkpoint attached to the training
# result and export its structure as JSON.
booster = trainer.get_model(result.checkpoint)
json_model = booster.dump_model()

# The context manager closes the file even if serialization fails.
with open(model_path, 'w') as model_json_file:
    json.dump(json_model, model_json_file, indent=4)

In [None]:
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score


def get_X_y(df, label_column="target_mro"):
    """Separate a frame into its feature matrix and label vector.

    Args:
        df: DataFrame that contains the label column alongside the features.
        label_column: Name of the label column. Defaults to ``"target_mro"``,
            the label used throughout this notebook.

    Returns:
        ``(X, y)`` where ``X`` is ``df`` without the label column and ``y`` is
        the label column as a Series. ``df`` itself is left unchanged.

    Raises:
        KeyError: If ``label_column`` is not a column of ``df``.
    """
    X = df.drop(columns=[label_column])
    y = df[label_column]
    return X, y


# Split each partition into features and labels for evaluation.
(X_train, y_train), (X_valid, y_valid), (X_test, y_test) = map(
    get_X_y, (train_dataset, valid_dataset, test_dataset)
)


def predict_and_eval(booster, X, y_true, dataset_name="dataset", threshold=0.5):
    """Score a dataset with ``booster`` and print/return classification metrics.

    Args:
        booster: Trained model exposing ``predict(X)``. Its output is treated
            as one positive-class score per row (assumes a binary objective).
        X: Feature matrix passed straight to ``booster.predict``.
        y_true: Ground-truth labels; must expose ``.values`` (e.g. a pandas
            Series) and align row-for-row with ``X``.
        dataset_name: Label used in the printed report header.
        threshold: Scores greater than or equal to this value are classified
            as positive. Defaults to 0.5, the previously hard-coded cut-off;
            exposing it allows tuning the operating point, which is often
            needed when the classes are imbalanced.

    Returns:
        ``(accuracy, precision, recall, f1, result_df)`` where ``result_df``
        has the columns ``y_true``, ``y_prob`` and ``y_pred`` on a fresh
        default index.
    """
    y_prob = booster.predict(X)
    y_pred = (y_prob >= threshold).astype(int)

    # zero_division=0 reports 0 instead of warning when a metric's denominator
    # is zero (e.g. no positive predictions were made).
    acc = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, zero_division=0)
    recall = recall_score(y_true, y_pred, zero_division=0)
    f1 = f1_score(y_true, y_pred, zero_division=0)

    print(f"\nEvaluation on {dataset_name}:")
    print(f"Accuracy:  {acc:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall:    {recall:.4f}")
    print(f"F1 Score:  {f1:.4f}")

    # Keep per-row outputs so errors can be inspected after the fact.
    result_df = pd.DataFrame(
        {"y_true": y_true.values, "y_prob": y_prob, "y_pred": y_pred}
    )

    return acc, precision, recall, f1, result_df


# Report performance on the held-out test partition. The train and validation
# partitions can be scored the same way by passing X_train/y_train or
# X_valid/y_valid instead.
acc, precision, recall, f1, df_result = predict_and_eval(
    booster=booster, X=X_test, y_true=y_test, dataset_name="Test Set"
)