In [1]:
import os

import pyspark
from pyspark.sql import SparkSession

# Connection settings for MinIO (S3-compatible storage) and the Nessie catalog.
# Credentials fall back to the local MinIO defaults but can be supplied through
# the standard AWS environment variables instead of being hard-coded here.
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "minioadmin")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "minioadmin")
# One endpoint shared by the Iceberg catalog properties and the s3a filesystem so
# the two cannot point at different hosts. Defaults to the host the s3a connector
# uses, which is what HadoopFileIO (io-impl below) reads table files through.
AWS_S3_ENDPOINT = os.environ.get("AWS_S3_ENDPOINT", "http://minio:9000")
WAREHOUSE = "s3a://gold/"
NESSIE_URI = "http://nessie:19120/api/v1"

conf = (
    pyspark.SparkConf()
    .setAppName("Lakehouse-Iceberg-TrainModel")
    # Iceberg Spark runtime, Nessie Spark extensions and the Hadoop S3A connector.
    .set('spark.jars.packages',
         'org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1,'
         'org.projectnessie.nessie-integrations:nessie-spark-extensions-3.3_2.12:0.67.0,'
         'org.apache.hadoop:hadoop-aws:3.3.4,'
         'com.amazonaws:aws-java-sdk-bundle:1.12.300')
    # Iceberg catalog named "nessie", backed by the Nessie server on ref "main".
    .set("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.nessie.uri", NESSIE_URI)
    .set("spark.sql.catalog.nessie.ref", "main")
    .set("spark.sql.catalog.nessie.authentication.type", "NONE")
    .set("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
    .set("spark.sql.catalog.nessie.warehouse", WAREHOUSE)
    # Table files are read through Hadoop's s3a filesystem (fs.s3a.* below).
    .set("spark.sql.catalog.nessie.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
    # NOTE(review): the s3.* catalog properties are consumed by Iceberg's S3FileIO;
    # with HadoopFileIO selected above they are presumably unused - kept as-is.
    .set("spark.sql.catalog.nessie.s3.endpoint", AWS_S3_ENDPOINT)
    .set("spark.sql.catalog.nessie.s3.access-key", AWS_ACCESS_KEY)
    .set("spark.sql.catalog.nessie.s3.secret-key", AWS_SECRET_KEY)
    # S3A connector settings (reuse the constants above); MinIO needs path-style URLs.
    .set("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY)
    .set("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_KEY)
    .set("spark.hadoop.fs.s3a.endpoint", AWS_S3_ENDPOINT)
    .set("spark.hadoop.fs.s3a.path.style.access", "true")
)

spark = (
    SparkSession.builder
    .config(conf=conf)
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

# Also set path-style access directly on the live Hadoop configuration; this matters
# when getOrCreate() returned an already-running session that predates the conf above.
spark._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")


In [2]:
# Lazy handles to the star-schema tables in the Nessie catalog; no table data is
# scanned until an action runs.
# NOTE(review): these DataFrames are not referenced later in this notebook - the
# join below is written in SQL against the catalog tables directly.
df_fact, df_customer, df_product, df_time, df_location = (
    spark.table(table_name)
    for table_name in (
        "nessie.fact_order",
        "nessie.dim_customer",
        "nessie.dim_product",
        "nessie.dim_time",
        "nessie.dim_location",
    )
)


In [3]:
# Denormalise the star schema: every fact_order row joined to its time, customer,
# product and location attributes. LEFT JOINs keep fact rows that have no matching
# dimension row; the dimension columns of such rows come back as NULL.
query = """
SELECT  
    f.time_id,
    f.customer_id,
    f.product_id,
    f.location_id,
    f.purchase_price_per_unit,
    f.quantity,
    f.total_price,

    -- Dim_time
    t.order_date,
    t.year,
    t.month,
    t.day,
    t.quarter,
    t.weekday_name,

    -- Dim_customer
    c.age_group,
    c.gender,
    c.education,
    c.income,
    c.race,
    c.state,

    -- Dim_product
    p.product_title,
    p.product_category,

    -- Dim_location
    l.state_code,
    l.state_name,
    l.region

FROM nessie.fact_order AS f
LEFT JOIN nessie.dim_time AS t ON f.time_id = t.time_id
LEFT JOIN nessie.dim_customer AS c ON f.customer_id = c.customer_id
LEFT JOIN nessie.dim_product AS p ON f.product_id = p.product_id
LEFT JOIN nessie.dim_location AS l ON f.location_id = l.location_id
"""

In [4]:
# Build the joined DataFrame (lazy), then preview its first rows as a pandas table.
df_fact_full = spark.sql(query)
(
    df_fact_full
    .limit(10)
    .toPandas()
)


Unnamed: 0,time_id,customer_id,product_id,location_id,purchase_price_per_unit,quantity,total_price,order_date,year,month,...,gender,education,income,race,state,product_title,product_category,state_code,state_name,region
0,439,R_1jZkLNE1JdtyVpH,000217653X,44,29.99,1.0,29.99,2020-09-16,2020,9,...,Female,High school diploma or GED,"Less than $25,000",White or Caucasian,Florida,THE DINAH'S CUPBOARD COOK BOOK: Recipes and Me...,ABIS_BOOK,FL,Unknown,Unknown
1,439,R_1jZkLNE1JdtyVpH,000217653X,39,13.55,1.0,13.55,2020-09-16,2020,9,...,Female,High school diploma or GED,"Less than $25,000",White or Caucasian,Florida,THE DINAH'S CUPBOARD COOK BOOK: Recipes and Me...,ABIS_BOOK,TX,Texas,South
2,444,R_3qIPMah81MezsJn,0007137508,33,19.95,1.0,19.95,2022-12-05,2022,12,...,Male,Bachelor's degree,"$50,000 - $74,999",White or Caucasian,Tennessee,Wellington: The Iron Duke,ABIS_BOOK,TN,Unknown,Unknown
3,428,R_vD2O13NgdnWBXMt,0007302622,4,13.25,1.0,13.25,2019-08-10,2019,8,...,Female,"Graduate or professional degree (MA, MS, MBA, ...","$50,000 - $74,999",White or Caucasian,New Jersey,Duck in the Truck,ABIS_BOOK,NJ,New Jersey,Northeast
4,1573,R_1QsZS0nI2sw5gl5,000745287X,41,14.96,1.0,14.96,2022-06-27,2022,6,...,Male,"Graduate or professional degree (MA, MS, MBA, ...","$150,000 or more",White or Caucasian,Georgia,Sharpe's Regiment: Richard Sharpe and the Inva...,ABIS_BOOK,GA,Unknown,Unknown
5,1815,R_2aldwxmUZox7Yfd,0007483791,16,10.84,1.0,10.84,2018-03-21,2018,3,...,Male,"Graduate or professional degree (MA, MS, MBA, ...","$150,000 or more",White or Caucasian,California,Deep Time,ABIS_BOOK,CA,California,West
6,23,R_3GD1CL4OyjglmbZ,0007510837,36,24.04,1.0,24.04,2020-01-21,2020,1,...,Female,High school diploma or GED,"$25,000 - $49,999",White or Caucasian,Pennsylvania,Collins German Dictionary Complete and Unabrid...,ABIS_BOOK,PA,Unknown,Unknown
7,328,R_3Pc1ZZfNy58AvgE,0007544790,16,18.72,1.0,18.72,2019-07-23,2019,7,...,Male,Bachelor's degree,"$100,000 - $149,999",Other,California,My Virgin Kitchen: Delicious recipes you can m...,ABIS_BOOK,CA,California,West
8,354,R_27Nf8ImFlWu3J9O,000756032X,4,9.99,1.0,9.99,2018-06-20,2018,6,...,Male,High school diploma or GED,"Less than $25,000",White or Caucasian,California,Born into the Children of God: My life in a re...,ABIS_BOOK,NJ,New Jersey,Northeast
9,1201,R_3Pp1HTLxoglta9u,0008100713,32,23.08,1.0,23.08,2022-06-08,2022,6,...,Male,"Graduate or professional degree (MA, MS, MBA, ...","$75,000 - $99,999",White or Caucasian,Ohio,Well Gardened Mind,ABIS_BOOK,OH,Unknown,Unknown


In [5]:
# Show the schema of the joined table. No cleaning has been applied at this point;
# the columns are exactly those selected by the join query.
df_fact_full.printSchema()

root
 |-- time_id: long (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- location_id: long (nullable = true)
 |-- purchase_price_per_unit: double (nullable = true)
 |-- quantity: double (nullable = true)
 |-- total_price: double (nullable = true)
 |-- order_date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- weekday_name: string (nullable = true)
 |-- age_group: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- education: string (nullable = true)
 |-- income: string (nullable = true)
 |-- race: string (nullable = true)
 |-- state: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- state_name: string (nullable = true)
 |-- region: string (nullable = true)



In [6]:
# (rows, columns) of the joined table: count() runs a full Spark job,
# while .columns only inspects the schema.
num_rows, num_cols = df_fact_full.count(), len(df_fact_full.columns)
print(f"\nKích thước dữ liệu: ({num_rows}, {num_cols})")


Kích thước dữ liệu: (1675015, 24)


In [7]:
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator

# =====================================================
# Per-customer features, computed in a SINGLE groupBy
# =====================================================
# NOTE(review): the joined table has no order id, so an "order" below is a distinct
# time_id for the customer (presumably a distinct purchase date), not a distinct order.
df_final = (
    df_fact_full.groupBy("customer_id")
    .agg(
        # Total spend
        F.sum("total_price").alias("total_spend"),

        # Average spend per "order" (distinct time_id)
        (F.sum("total_price") / F.countDistinct("time_id")).alias("avg_order_value"),

        # Sample standard deviation of total_price. NOTE(review): computed over the
        # individual fact rows, not over per-order totals; NULL for customers with a
        # single row (filled with 0 below).
        F.stddev("total_price").alias("std_order_value"),

        # Average quantity per "order"
        (F.sum("quantity") / F.countDistinct("time_id")).alias("avg_items_per_order"),

        # Number of "orders" (distinct time_id)
        F.countDistinct("time_id").alias("total_orders"),

        # First and last active year
        F.min("year").alias("first_year"),
        F.max("year").alias("last_year")
    )
    # Number of active years (inclusive) and average spend per year
    .withColumn("years_active", (F.col("last_year") - F.col("first_year") + 1))
    # Guard the divisor. years_active is NULL when none of the customer's rows has a
    # year (e.g. a time_id with no match in the LEFT JOIN). A plain "<= 0" test lets
    # that NULL through to na.fill(0), which would then produce years_active = 0 and
    # a spurious avg_spend_per_year of 0 for a customer who did spend money.
    .withColumn(
        "years_active",
        F.when(F.col("years_active").isNull() | (F.col("years_active") <= 0), 1)
         .otherwise(F.col("years_active"))
    )
    .withColumn("avg_spend_per_year", F.col("total_spend") / F.col("years_active"))
    .na.fill(0)  # replace any remaining NULLs in numeric columns with 0
)

In [8]:

# Preview the engineered features for the first 10 customers
preview_cols = [
    "customer_id",
    "total_spend",
    "avg_spend_per_year",
    "avg_order_value",
    "std_order_value",
    "avg_items_per_order",
    "total_orders",
    "years_active",
]
df_final.select(*preview_cols).show(n=10, truncate=False)


+-----------------+------------------+------------------+------------------+------------------+-------------------+------------+------------+
|customer_id      |total_spend       |avg_spend_per_year|avg_order_value   |std_order_value   |avg_items_per_order|total_orders|years_active|
+-----------------+------------------+------------------+------------------+------------------+-------------------+------------+------------+
|R_1l6oxKA9uiM9GUo|5212.49           |1042.498          |66.82679487179487 |37.65645361471535 |2.3205128205128207 |78          |5           |
|R_1jO4s7oht3pyKEc|13041.71          |2173.6183333333333|60.37828703703703 |25.637963404064184|2.8333333333333335 |216         |6           |
|R_10TV1zyi4yCEEkl|30578.98999999998 |5096.49833333333  |74.4014355231143  |83.67537499523928 |2.67639902676399   |411         |6           |
|R_2dyITPHbbfmCXJn|16020.08          |2670.0133333333333|92.06942528735632 |49.117791465504325|3.1666666666666665 |174         |6           |
|R_1eW

In [9]:
# =====================================================
#  Features & target
# =====================================================
# NOTE(review): TARGET LEAKAGE. The target is total_spend / years_active, and both
# total_spend and years_active are input features. total_spend is also recoverable
# as avg_order_value * total_orders. The models can therefore reconstruct the target
# almost exactly from their inputs, so the near-perfect R2 scores below say little
# about real predictive power. TODO: switch to a leakage-free design, e.g. features
# built from purchases before a cutoff date and spend after it as the target.
num_features = [
    "total_spend",
    "years_active",
    "avg_order_value",
    "std_order_value",
    "avg_items_per_order",
    "total_orders"
]
target = "avg_spend_per_year"

# df_final is already null-filled, so this drop is only a safety net.
# cache(): df_clean feeds every model fit and evaluation below; without it, each
# Spark action re-runs the full join + groupBy over the fact table.
df_clean = df_final.na.drop(subset=num_features + [target]).cache()

# 80/20 train/test split with a fixed seed
train_df, test_df = df_clean.randomSplit([0.8, 0.2], seed=42)
print(f"Train: {train_df.count()} dòng | Test: {test_df.count()} dòng")

# Pack the numeric features into one vector column. handleInvalid="keep" keeps rows
# with null/NaN feature values instead of raising an error.
assembler = VectorAssembler(
    inputCols=num_features,
    outputCol="features",
    handleInvalid="keep"
)

# Standardise the feature vector (zero mean, unit variance).
# NOTE(review): tree ensembles (RF, XGBoost) are insensitive to feature scaling, so
# this stage is not required by the models below; kept to leave the pipelines as-is.
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withMean=True,
    withStd=True
)

Train: 4030 dòng | Test: 930 dòng


## Random Forest

In [10]:
# --------------------------
# Random Forest pipeline
# --------------------------
rf = RandomForestRegressor(
    labelCol=target,
    featuresCol="scaledFeatures",
    seed=42
)

rf_pipeline = Pipeline(stages=[assembler, scaler, rf])

# --------------------------
# Hyper-parameter grid (2 x 3 = 6 candidates)
# --------------------------
param_grid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [50, 200])
    .addGrid(rf.maxDepth, [8, 10, 12])
    .build()
)

# --------------------------
# TrainValidationSplit: picks the candidate with the best validation R2
# --------------------------
evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="r2")

tvs = TrainValidationSplit(
    estimator=rf_pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    trainRatio=0.8,
    parallelism=4,
    # Fixed seed so the internal train/validation split (and hence the selected
    # model) is reproducible; the XGBoost TrainValidationSplit also uses seed=42.
    seed=42
)

# --------------------------
# Fit
# --------------------------
rf_tvs_model = tvs.fit(train_df)

# --------------------------
# Report the best model
# --------------------------
best_rf = rf_tvs_model.bestModel.stages[-1]
print("Best Model:")
# getNumTrees is a property on the fitted RandomForestRegressionModel (hence no
# parentheses); getMaxDepth() is a regular method.
print(" - numTrees:", best_rf.getNumTrees)
print(" - maxDepth:", best_rf.getMaxDepth())


Best Model:
 - numTrees: 200
 - maxDepth: 12


In [11]:
# --------------------------
# Evaluate the best RF pipeline on the train and test splits
# --------------------------
train_pred, test_pred = (
    rf_tvs_model.bestModel.transform(split_df) for split_df in (train_df, test_df)
)

metrics = ['r2', 'mae', 'rmse']
for metric in metrics:
    evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName=metric)
    train_score = round(evaluator.evaluate(train_pred), 4)
    print(f"{metric.upper()} Train:", train_score)
    test_score = round(evaluator.evaluate(test_pred), 4)
    print(f"{metric.upper()} Test :", test_score)
    print("----------")

R2 Train: 0.9914
R2 Test : 0.9684
----------
MAE Train: 53.4572
MAE Test : 88.3366
----------
RMSE Train: 152.5212
RMSE Test : 294.7476
----------


In [14]:
import mlflow
import mlflow.spark
from pyspark.ml.evaluation import RegressionEvaluator

mlflow.set_tracking_uri("http://mlflow:5000")
mlflow.set_experiment("Customer_Spending_Prediction")

# Train/test scores of the best RF pipeline, keyed "<metric>_<split>".
# NOTE(review): relies on train_pred/test_pred still holding the RF predictions,
# i.e. on this cell running before the XGBoost evaluation cell reassigns them.
metrics_dict = {}
for metric in metrics:  # ['r2', 'mae', 'rmse']
    evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName=metric)
    for split_name, split_pred in (("train", train_pred), ("test", test_pred)):
        metrics_dict[f"{metric}_{split_name}"] = round(evaluator.evaluate(split_pred), 4)

with mlflow.start_run(run_name="RandomForest_Model"):
    # Model type plus the tuned hyper-parameters of the winning model
    mlflow.log_param("model", "RandomForestRegressor")
    mlflow.log_param("numTrees", best_rf.getNumTrees)  # property, not a method
    mlflow.log_param("maxDepth", best_rf.getMaxDepth())

    for metric_key, metric_value in metrics_dict.items():
        mlflow.log_metric(metric_key, metric_value)

    # The complete fitted PipelineModel (assembler + scaler + RF)
    mlflow.spark.log_model(rf_tvs_model.bestModel, "rf_model")

    print("Logged Random Forest model to MLflow")




Logged Random Forest model to MLflow


## XGBoost

In [15]:
from pyspark.ml import Pipeline
from xgboost.spark import SparkXGBRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# --------------------------
# XGBoost regressor via xgboost.spark. num_workers=1 means training runs as a
# single task, so this is not actually distributed.
# --------------------------
xgb = SparkXGBRegressor(
    features_col="scaledFeatures",
    label_col=target,
    objective="reg:squarederror",
    tree_method="hist",
    num_workers=1,
    # Starting values; max_depth, n_estimators, learning_rate and subsample are
    # overridden by the grid below.
    n_estimators=100,
    max_depth=6,
    learning_rate=0.1,
    subsample=0.8,
    colsample_bytree=0.8,
)

# Same preprocessing stages as the Random Forest pipeline
xgb_pipeline = Pipeline(stages=[assembler, scaler, xgb])

# --------------------------
# Hyper-parameter grid: 2 x 2 x 2 x 2 = 16 candidates
# --------------------------
grid_builder = ParamGridBuilder()
for grid_param, grid_values in (
    (xgb.max_depth, [4, 6]),
    (xgb.n_estimators, [50, 100]),
    (xgb.learning_rate, [0.05, 0.1]),
    (xgb.subsample, [0.7, 0.8]),
):
    grid_builder.addGrid(grid_param, grid_values)
param_grid = grid_builder.build()

# Model selection by validation R2
evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="r2")

tvs = TrainValidationSplit(
    estimator=xgb_pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    trainRatio=0.8,
    parallelism=4,  # fit up to 4 candidate models concurrently
    seed=42,
)

# --------------------------
# Fit
# --------------------------
xgb_tvs_model = tvs.fit(train_df)

2025-11-12 03:09:02,314 INFO XGBoost-PySpark: _fit Running xgboost-3.1.1 on 1 workers with
	booster params: {'colsample_bytree': 0.8, 'device': 'cpu', 'learning_rate': 0.1, 'max_depth': 4, 'objective': 'reg:squarederror', 'subsample': 0.8, 'tree_method': 'hist', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 50}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2025-11-12 03:09:02,314 INFO XGBoost-PySpark: _fit Running xgboost-3.1.1 on 1 workers with
	booster params: {'colsample_bytree': 0.8, 'device': 'cpu', 'learning_rate': 0.1, 'max_depth': 4, 'objective': 'reg:squarederror', 'subsample': 0.8, 'tree_method': 'hist', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 50}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2025-11-12 03:09:02,320 INFO XGBoost-PySpark: _fit Running xgboost-3.1.1 on 1 workers with
	booster params: {'colsample_bytree': 0.8, 'device': 'cpu', 'learning_rate': 0.1, 'max_depth': 4, 'objective': 'r

In [16]:
# The tuned XGBoost stage is the last stage of the winning PipelineModel
best_xgb = xgb_tvs_model.bestModel.stages[-1]
tuned_params = ["max_depth", "n_estimators", "learning_rate", "subsample"]
print("Best XGBoost Parameters (from grid search):")
best_values = {
    param_name: best_xgb.getOrDefault(best_xgb.getParam(param_name))
    for param_name in tuned_params
}
for p, value in best_values.items():
    print(f" - {p}: {value}")

Best XGBoost Parameters (from grid search):
 - max_depth: 4
 - n_estimators: 100
 - learning_rate: 0.1
 - subsample: 0.7


In [17]:
# --------------------------
# Evaluate the tuned XGBoost pipeline; TrainValidationSplitModel.transform
# delegates to its bestModel.
# --------------------------
train_pred = xgb_tvs_model.transform(train_df)
test_pred = xgb_tvs_model.transform(test_df)

metrics = ['r2', 'mae', 'rmse']
for metric in metrics:
    evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName=metric)
    metric_label = metric.upper()
    print(f"{metric_label} Train:", round(evaluator.evaluate(train_pred), 4))
    print(f"{metric_label} Test :", round(evaluator.evaluate(test_pred), 4))
    print("----------")

R2 Train: 0.9987
R2 Test : 0.9905
----------
MAE Train: 33.9499
MAE Test : 49.7622
----------
RMSE Train: 58.4547
RMSE Test : 161.3474
----------


In [12]:
import mlflow
import mlflow.spark

mlflow.set_tracking_uri("http://mlflow:5000")
mlflow.set_experiment("XGBoost_Regression")

# NOTE(review): this duplicates the fuller XGBoost logging into the
# "Customer_Spending_Prediction" experiment; consider removing one of the two.

# R2 scores are computed from the model's own predictions rather than typed in by
# hand, so the logged numbers always correspond to the logged model.
r2_evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="r2")

with mlflow.start_run():
    # Log the full PipelineModel (assembler + scaler + XGB), not only the final XGB
    # stage: the bare stage expects a precomputed "scaledFeatures" column.
    mlflow.spark.log_model(xgb_tvs_model.bestModel, "xgb_model")
    mlflow.log_metric("r2_train", round(r2_evaluator.evaluate(xgb_tvs_model.transform(train_df)), 4))
    mlflow.log_metric("r2_test", round(r2_evaluator.evaluate(xgb_tvs_model.transform(test_df)), 4))



In [18]:
mlflow.set_tracking_uri("http://mlflow:5000")
mlflow.set_experiment("Customer_Spending_Prediction")

# Predictions of the tuned XGBoost pipeline (lazy DataFrames; each evaluate() runs a job)
xgb_train_pred = xgb_tvs_model.transform(train_df)
xgb_test_pred = xgb_tvs_model.transform(test_df)

# Train/test scores keyed "<metric>_<split>"
metrics_dict = {}
for metric in metrics:  # ['r2', 'mae', 'rmse']
    evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName=metric)
    metrics_dict[f"{metric}_train"] = round(evaluator.evaluate(xgb_train_pred), 4)
    metrics_dict[f"{metric}_test"] = round(evaluator.evaluate(xgb_test_pred), 4)

with mlflow.start_run(run_name="XGBoost_Model"):
    # Model type plus the tuned hyper-parameters read from the best XGB stage
    mlflow.log_param("model", "SparkXGBRegressor")
    for param_name in ("max_depth", "n_estimators", "learning_rate", "subsample"):
        mlflow.log_param(param_name, best_xgb.getOrDefault(best_xgb.getParam(param_name)))

    for metric_key, metric_value in metrics_dict.items():
        mlflow.log_metric(metric_key, metric_value)
        print(f"{metric_key}: {metric_value}")

    # The complete fitted PipelineModel (assembler + scaler + XGB)
    mlflow.spark.log_model(xgb_tvs_model.bestModel, "xgb_model")

    print("Logged XGBoost Spark model to MLflow")


r2_train: 0.9987
r2_test: 0.9905
mae_train: 33.9499
mae_test: 49.7622
rmse_train: 58.4547
rmse_test: 161.3474
Logged XGBoost Spark model to MLflow


In [None]:
# Register the tuned XGBoost pipeline in the MLflow Model Registry as "XGB_Model".
# - Logs the full PipelineModel (assembler + scaler + XGB) so the registered model
#   can score raw feature columns; the bare XGB stage expects a precomputed
#   "scaledFeatures" column that callers would not have.
# - Uses an explicit run that is closed afterwards. With no active run, MLflow's
#   fluent logging calls start one implicitly and leave it open for later calls.
with mlflow.start_run(run_name="XGBoost_Registered_Model"):
    mlflow.spark.log_model(
        xgb_tvs_model.bestModel,
        artifact_path="xgb_model",
        registered_model_name="XGB_Model"
    )
