In [26]:
!pip show ray

Name: ray
Version: 2.40.0
Summary: Ray provides a simple, universal API for building distributed applications.
Home-page: https://github.com/ray-project/ray
Author: Ray Team
Author-email: ray-dev@googlegroups.com
License: Apache 2.0
Location: /root/miniconda3/envs/py3920/lib/python3.9/site-packages
Requires: aiosignal, click, filelock, frozenlist, jsonschema, msgpack, packaging, protobuf, pyyaml, requests
Required-by: 


In [2]:
%pip -q install -r /root/thu/BigDataProject/requirements.txt

[0mNote: you may need to restart the kernel to use updated packages.


In [3]:
# %pip -q install -r /root/thu/BigDataProject/ray/requirements.txt
#%pip -q install pendulum
%pip -q install "ray[tune]"
%pip -q install -U ipywidgets

[0mNote: you may need to restart the kernel to use updated packages.
[0mNote: you may need to restart the kernel to use updated packages.


In [None]:
import os
from datetime import datetime, timedelta
from typing import Dict, List

import mlflow
import pandas as pd
import pendulum

import ray
# from ray import tune
# ray.init(
#     include_dashboard=False,  # Disable dashboard to avoid port conflicts
#     ignore_reinit_error=True,  # Ignore if Ray is already initialized
#     # _temp_dir="./ray_temp"     # Use a custom temp directory
# )
# print(f"✅ Using local Ray - available resources: {ray.cluster_resources()}")

from ray.train.xgboost import XGBoostTrainer
from ray.tune.schedulers import ASHAScheduler
from ray.tune.search.optuna import OptunaSearch


ray.init("ray://10.200.2.51:10001", namespace="experiment-1", log_to_driver=False)

Note: you may need to restart the kernel to use updated packages.


2025-04-16 16:10:26,971	INFO client_builder.py:244 -- Passing the following kwargs to ray.init() on the server: log_to_driver
SIGTERM handler is not set because current thread is not the main thread.


0,1
Python version:,3.9.20
Ray version:,2.40.0
Dashboard:,http://172.20.0.13:8265


Error in data channel:
Queue filler thread failed to join before timeout: 10
2025-04-16 16:21:54,167	ERROR dataclient.py:330 -- Unrecoverable error in data channel.


In [5]:
# Training Configuration
TRAINING_CONFIG = {
    "model_path": "model-checkpoints/final-model/xgb_model",
    "test_size": 0.3,
    "num_workers": 1,
    "resources_per_worker": {"CPU": 1}, #4
    "use_gpu": False,
    "num_boost_round": 1,
}

# XGBoost Parameters
XGBOOST_PARAMS = {
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error", "rmse", "mae", "auc"],
    "tree_method": "hist",
    "max_depth": 1,
    "eta": 0.3,
    "subsample": 0.8,
    "colsample_bytree": 0.8,
}

# Feature Configuration
FEATURE_COLUMNS = [
    "brand",
    "price",
    "event_weekday",
    "category_code_level1",
    "category_code_level2",
    "activity_count",
    "is_purchased",
]

CATEGORICAL_COLUMNS = [
    "brand",
    "event_weekday",
    "category_code_level1",
    "category_code_level2",
]

# DAG Configuration
DEFAULT_ARGS = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=30),
    "execution_timeout": timedelta(hours=2),
    "start_date": pendulum.datetime(2024, 1, 1, tz="UTC"),
}

# Tune Configuration
TUNE_CONFIG = {
    "model_path": "model-checkpoints/hyperparameter-tuning/xgb_model",
    "num_trials": 4,  # Number of trials for hyperparameter search
    "max_epochs": 4,  # Maximum epochs per trial
    "grace_period": 4,  # Minimum epochs before pruning
    "mlflow_tracking_uri": os.getenv("MLFLOW_TRACKING_URI", "http://localhost:5001"),
}

# Tune Search Space
TUNE_SEARCH_SPACE = {
    "max_depth": tune.randint(3, 5),
    "learning_rate": tune.loguniform(1e-4, 1e-1),
    "min_child_weight": tune.choice([1, 2, 3, 4, 5]),
    "subsample": tune.uniform(0.5, 1.0),
    "colsample_bytree": tune.uniform(0.5, 1.0),
    "gamma": tune.uniform(0, 1),
}

# Model Configuration
MODEL_NAME = "purchase_prediction_model"


In [6]:
import psycopg2
import pandas as pd

In [7]:
class ModelPipeline:
    def __init__(self):        
        # self.postgres_conn = "postgresql://airflow:airflow@10.200.2.51:5432/airflow"

        # Kết nối đến PostgreSQL
        self.postgres_conn = psycopg2.connect(
            dbname="mlflow",
            user="mlflow",
            password="mlflow",
            host="localhost",
            port="5432"
        )
    
        mlflow.set_tracking_uri(TUNE_CONFIG["mlflow_tracking_uri"])
    
    def filter_features(df: pd.DataFrame) -> pd.DataFrame:
        columns_to_exclude = ["event_timestamp", "user_id", "product_id", "user_session", "is_purchased"]
        feature_columns = [col for col in df.columns if col not in columns_to_exclude]
        return df[feature_columns + ["is_purchased"]]

    def load_training_data(self) -> Dict[str, List[Dict]]:
        # Truy vấn, ép cột event_timestamp về dạng text để tránh chuyển đổi tự động thành datetime
        query = """
        SELECT
            CAST(event_timestamp AS text) AS event_timestamp,
            price,
            brand,
            category_code_level1,
            category_code_level2,
            event_weekday,
            activity_count,
            is_purchased
        FROM processed_data
        """

        # Đọc dữ liệu từ cơ sở dữ liệu
        df = pd.read_sql(query, self.postgres_conn)

        # Đóng kết nối sau khi đọc xong
        self.postgres_conn.close()

        # Chuyển đổi cột event_timestamp sang datetime; các giá trị không hợp lệ sẽ trở thành NaT
        df['event_timestamp'] = pd.to_datetime(df['event_timestamp'], errors='coerce')

        print(f"Loaded {len(df)} rows of data")

        # Data preprocessing
        df["price"] = df["price"].astype(float)

        # Create category mappings
        category_mappings = {}
        for col in CATEGORICAL_COLUMNS:
            unique_values = df[col].dropna().unique()
            category_mapping = {
                val: idx for idx, val in enumerate(sorted(unique_values))
            }
            category_mappings[col] = category_mapping
            df[col] = df[col].map(category_mapping).fillna(-1)

        return {
            "data": df.to_dict(orient="records"),
            "category_mappings": category_mappings,
        }

    def tune_hyperparameters(self, data: dict) -> dict:
        experiment_name = f"experiment-1_xgb_tune_{datetime.now().strftime('%Y%m%d%H%M%S')}"
        mlflow.create_experiment(experiment_name)

        df = pd.DataFrame(data["data"])
        # dataset = ray.data.from_pandas(df)
        filtered_df = self.filter_features(df)
        dataset = ray.data.from_pandas(filtered_df)

        def train_xgboost(config):
            training_params = {**XGBOOST_PARAMS, **config}
            trainer = XGBoostTrainer(
                label_column="is_purchased",
                num_boost_round=TRAINING_CONFIG["num_boost_round"],
                params=training_params,
                datasets={"train": dataset},
            )
            results = trainer.fit()
            ray.train.report(results.metrics)

        tuner = ray.tune.run(
            train_xgboost,
            config=TUNE_SEARCH_SPACE,
            num_samples=TUNE_CONFIG["num_trials"],
            scheduler=ASHAScheduler(
                metric="train_rmse",
                mode="min",
            ),
            search_alg=OptunaSearch(
                metric="train_rmse",
                mode="min",
            ),
        )

        best_trial = tuner.get_best_trial("train_rmse", "min")
        return {
            "best_config": best_trial.config,
            "best_metrics": best_trial.last_result,
        }

    def train_final_model(self, data: dict, best_params: dict) -> dict:
        experiment_name = f"xgb_final_{datetime.now().strftime('%Y%m%d%H%M%S')}"
        mlflow.create_experiment(experiment_name)

        with mlflow.start_run() as run:
            model_params = {**XGBOOST_PARAMS, **best_params["best_config"]}
            df = pd.DataFrame(data["data"])
            # dataset = ray.data.from_pandas(df)
            filtered_df = self.filter_features(df)
            dataset = ray.data.from_pandas(filtered_df)

            trainer = XGBoostTrainer(
                label_column="is_purchased",
                num_boost_round=TRAINING_CONFIG["num_boost_round"],
                params=model_params,
                datasets={"train": dataset},
            )

            result = trainer.fit()

            # Log model and metrics
            mlflow.xgboost.log_model(
                result.checkpoint.get_model(),
                "model",
                registered_model_name=MODEL_NAME,
            )

            # Log category mappings if available
            if "category_mappings" in data:
                mlflow.log_dict(data["category_mappings"], "category_mappings.json")

            return {
                "metrics": result.metrics,
                "checkpoint_path": result.checkpoint.path,
                "mlflow_run_id": run.info.run_id,
                "mlflow_model_uri": f"models:/{MODEL_NAME}/Staging",
            }

datetime chỉ cho phép năm trong khoảng từ 1 đến 9999.

In [8]:
pipeline = ModelPipeline()
# Load and preprocess data
data = pipeline.load_training_data()

  df = pd.read_sql(query, self.postgres_conn)
  df['event_timestamp'] = pd.to_datetime(df['event_timestamp'], errors='coerce')


Loaded 532576 rows of data


In [11]:
import ray
ray.init()

experiment_name = f"experiment-1_xgb_tune_{datetime.now().strftime('%Y%m%d%H%M%S')}"
mlflow.create_experiment(experiment_name)

df = pd.DataFrame(data["data"])
# dataset = ray.data.from_pandas(df)
filtered_df = ModelPipeline.filter_features(df)
dataset = ray.data.from_pandas(filtered_df)

dataset.show(1)

I0000 00:00:1744794682.888536  984639 fork_posix.cc:77] Other threads are currently calling into gRPC, skipping fork() handlers
I0000 00:00:1744794683.082648  984639 fork_posix.cc:77] Other threads are currently calling into gRPC, skipping fork() handlers
I0000 00:00:1744794683.268277  984639 fork_posix.cc:77] Other threads are currently calling into gRPC, skipping fork() handlers
I0000 00:00:1744794683.326710  984639 fork_posix.cc:77] Other threads are currently calling into gRPC, skipping fork() handlers
I0000 00:00:1744794683.357759  984639 fork_posix.cc:77] Other threads are currently calling into gRPC, skipping fork() handlers
2025-04-16 16:11:23,457	INFO worker.py:1812 -- Started a local Ray instance. View the dashboard at [1m[32m172.20.0.13:8265 [39m[22m
2025-04-16 16:11:29,214	INFO dataset.py:2631 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2025-04-16 16:11:29,228	INFO streaming_executor.py:108 -- Starting execu

Running 0: 0.00 row [00:00, ? row/s]

- limit=1 1: 0.00 row [00:00, ? row/s]

{'price': 742.36, 'brand': 2134, 'category_code_level1': 8, 'category_code_level2': 41, 'event_weekday': 3, 'activity_count': 2, 'is_purchased': 0}


![image.png](attachment:image.png)

In [None]:
%pip -q install optuna

I0000 00:00:1744794689.629525  984639 fork_posix.cc:77] Other threads are currently calling into gRPC, skipping fork() handlers


E0000 00:00:1744794689.688300  986367 completion_queue.cc:736] Kick failed: UNKNOWN:eventfd_write: Bad file descriptor (9) {created_time:"2025-04-16T16:11:29.688282793+07:00"}
[0mNote: you may need to restart the kernel to use updated packages.


In [None]:
import optuna
print(f"✅ Optuna version: {optuna.__version__}")

✅ Optuna version: 4.3.0


In [14]:
%pip -q install ipython

I0000 00:00:1744794694.416538  984639 fork_posix.cc:77] Other threads are currently calling into gRPC, skipping fork() handlers


[0mNote: you may need to restart the kernel to use updated packages.


In [16]:
def train_xgboost(config):
    training_params = {**XGBOOST_PARAMS, **config}
    trainer = XGBoostTrainer(
        label_column="is_purchased",
        num_boost_round=TRAINING_CONFIG["num_boost_round"],
        params=training_params,
        datasets={"train": dataset},
    )
    results = trainer.fit()
    ray.train.report(results.metrics)

In [19]:
!pip -q install IPython
ray.shutdown()
ray.init()

I0000 00:00:1744795340.786907  984639 fork_posix.cc:77] Other threads are currently calling into gRPC, skipping fork() handlers


E0000 00:00:1744795340.813279  994332 completion_queue.cc:736] Kick failed: UNKNOWN:eventfd_write: Bad file descriptor (9) {created_time:"2025-04-16T16:22:20.81324872+07:00"}
[0m

RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by calling 'ray.shutdown()' prior to 'ray.init()'.

In [None]:
tuner = ray.tune.run(
    train_xgboost,
    config=TUNE_SEARCH_SPACE,
    num_samples=TUNE_CONFIG["num_trials"],
    scheduler=ASHAScheduler(
        metric="train_rmse",
        mode="min",
    ),
    name="xgboost_grid",
    resources_per_trial={"cpu": 2, "gpu": 0},
    reuse_actors=False,
    keep_checkpoints_num=1,
    verbose=1,
    log_to_file=True  # You can also use: ["stdout.log", "stderr.log"]
)

ConnectionError: Failed during this or a previous request. Exception that broke the connection: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.FAILED_PRECONDITION
	details = "No module named 'IPython'"
	debug_error_string = "UNKNOWN:Error received from peer ipv4:10.200.2.51:10001 {grpc_message:"No module named \'IPython\'", grpc_status:9, created_time:"2025-04-16T16:21:54.167211844+07:00"}"
>

In [17]:
best_trial = tuner.get_best_trial("train_rmse", "min")

NameError: name 'tuner' is not defined

In [None]:
# Train final model
results = pipeline.train_final_model(data, best_params)

print("Training completed successfully!")
print(f"Final metrics: {results['metrics']}")
print(f"Model URI: {results['mlflow_model_uri']}")