In [1]:
import polars as pl
import numpy as np
import datetime
from datetime import timedelta

In [2]:
begin_hist = datetime.datetime(
    day=1,
    month=1,
    year=2024
)
end_hist = datetime.datetime(
    day=31,
    month=12,
    year=2024
)
begin_recent = datetime.datetime(
    day=1,
    month=1,
    year=2025
)
end_recent = datetime.datetime(
    day=30,
    month=1,
    year=2025
)

In [None]:
user_path = "processed_data/processed_user.parquet"
item_path = "processed_data/processed_item.parquet"
purchase_path = "processed_data/processed_purchase.parquet"
base_path = "train_data/base.parquet"

user_df = pl.scan_parquet(user_path)
item_df = pl.scan_parquet(item_path)
purchase_df_hist = pl.scan_parquet(purchase_path).filter(pl.col('datetime').is_between(begin_hist, end_hist))
base_df = pl.scan_parquet(base_path)

# =============================================================================
# HYPER-PARAMETERS
# =============================================================================

In [3]:
# =============================================
# FEATURE 2: Recency Decay Score
# =============================================
BRAND_LAMBDA = 0.023
TYPE_LAMBDA = 0.015

# =============================================
# FEATURE 4: Item Popularity
# =============================================
POPULARITY_WINDOW_30D = 30
POPULARITY_WINDOW_90D = 90

# =============================================
# FEATURE 5: Baby Age Prediction
# =============================================
SIGMA_AGE_END_HIST = 4.0
SIGMA_AGE_MIDPOINT = 4.0

# =============================================
# FEATURE 6: Price Compatibility
# =============================================
PRICE_EPSILON = 1e-3
PRICE_CAPACITY_MULTIPLIER = 1.05

# =============================================
# FEATURE 8: Global Co-purchase
# =============================================
CO_COUNT_THRESHOLD = 9
AFFINITY_THRESHOLD = 0.1
RECENT_CONTEXT_DAYS = 90

# =============================================================================
# FEATURE 1: Tần suất mua món hàng (Customer-Item Frequency)
# =============================================================================

In [5]:
feat_item_frequency = (
    purchase_df_hist
    .group_by(['customer_id', 'item_id'])
    .agg(
        pl.len().alias('feat1_customer_item_freq')
    )
)

# =============================================================================
# FEATURE 2: Recency Decay Score (Suy hao theo thời gian)
# =============================================================================

In [6]:
# Lấy thông tin Brand và item_type cho Purchase DF (History)
purchase_enriched_df = purchase_df_hist.join(
    item_df.select(["item_id", "brand", "item_type"]),
    on="item_id",
    how="left"
)

# Cấp độ BRAND: Lần cuối user mua BẤT KỲ món nào của Brand này là khi nào?
last_brand_buy = purchase_enriched_df.group_by(["customer_id", "brand"]).agg(
    pl.col("date").max().alias("last_brand_date")
)

# Cấp độ ITEM_TYPE: Lần cuối user mua BẤT KỲ món nào của item_type này là khi nào?
last_type_buy = purchase_enriched_df.group_by(["customer_id", "item_type"]).agg(
    pl.col("date").max().alias("last_type_date")
)

# Tạo feature table với brand và item_type decay scores
# Cần join customer_id với item_id thông qua brand và item_type
feat_recency_decay = (
    base_df.select(["customer_id", "item_id"])
    .join(
        item_df.select(["item_id", "brand", "item_type"]),
        on="item_id",
        how="left"
    )
    .join(last_brand_buy, on=["customer_id", "brand"], how="left")
    .join(last_type_buy, on=["customer_id", "item_type"], how="left")
    .with_columns([
        # Tính số ngày trôi qua (Days Diff)
        (pl.lit(begin_recent) - pl.col("last_brand_date")).dt.total_days().alias("days_since_brand"),
        (pl.lit(begin_recent) - pl.col("last_type_date")).dt.total_days().alias("days_since_type"),
    ])
    .with_columns([
        # Score = e^(-lambda * t)
        # Nếu days_since_brand là null -> Score = 0

        # 1. BRAND MEMORY (Decay) (lambda = 0.023)
        pl.when(pl.col("days_since_brand").is_not_null())
        .then((-BRAND_LAMBDA * pl.col("days_since_brand")).exp())
        .otherwise(0.0).alias("feat2_brand_affinity"),

        # 2. TYPE MEMORY (Decay) (lambda = 0.01)
        # Lambda có thể nhỏ hơn Brand một chút vì nhu cầu Type (vd: Tã) bền vững hơn sở thích Brand
        pl.when(pl.col("days_since_type").is_not_null())
        .then((-TYPE_LAMBDA * pl.col("days_since_type")).exp()) 
        .otherwise(0.0).alias("feat2_type_affinity"),
    ])
    .select(["customer_id", "item_id", "feat2_brand_affinity", "feat2_type_affinity"])
)

# =============================================================================
# FEATURE 3: Window-Based Urgency (Độ khẩn cấp theo khung thời gian)
# =============================================================================

In [7]:
# 1. Tính toán thống kê cấp cá nhân (level 1)
user_level_stats = (
    purchase_df_hist
    .sort(["customer_id", "item_id", "datetime"])
    .group_by(["customer_id", "item_id"])
    .agg([
        pl.min("datetime").alias("first_buy_date"),
        pl.max("datetime").alias("last_buy_date"),
        pl.sum("quantity").alias("total_qty"),
        pl.last("quantity").alias("last_qty"),
        pl.len().alias("transaction_count")
    ])
    .with_columns([
        (pl.col("last_buy_date") - pl.col("first_buy_date")).dt.total_days().alias("duration_days"),
        # Tổng lượng hàng đã tiêu thụ (trừ lần mua cuối vì vừa mua nên chưa dùng)
        (pl.col("total_qty") - pl.col("last_qty")).alias("consumed_qty")
    ])
    .with_columns([
        # Mỗi sản phẩm tốn bao nhiêu ngày để tiêu thụ?
        # Chỉ tính nếu mua >= 2 lần và đã tiêu thụ (đã có pattern)
        pl.when((pl.col("transaction_count") > 1) & (pl.col("consumed_qty") > 0))
          .then(pl.col("duration_days") / pl.col("consumed_qty"))
          .otherwise(None)
          .alias("user_cycle")
    ])
)

# 2. Tính toán thống kê cấp sản phẩm (level 2)
item_level_stats = (
    user_level_stats
    .filter(pl.col("user_cycle").is_not_null())
    .group_by("item_id")
    .agg([
        pl.median("user_cycle").alias("item_global_cycle")
    ])
)

# 3. Tính toán thống kê cấp danh mục (level 3)
cat_level_stats = (
    user_level_stats
    .join(
        item_df.select(["item_id", "category_l1"]),
        on="item_id"
    )
    .filter(pl.col("user_cycle").is_not_null())
    .group_by("category_l1")
    .agg([
        pl.median("user_cycle").alias("cat_global_cycle")
    ])
)

# 4. Hợp nhất và tính urgency
# Tính trung điểm của cửa sổ
window_center = begin_recent + (end_recent - begin_recent)/2

feat_urgency = (
    user_level_stats
    .join(
        item_df.select(["item_id", "category_l1"]),
        on="item_id",
        how="left"
    )
    .join(
        item_level_stats,
        on="item_id",
        how="left",
    )
    .join(
        cat_level_stats,
        on="category_l1",
        how="left"
    )
    .with_columns([
        pl.coalesce([
            pl.col("user_cycle"),
            pl.col("item_global_cycle"),
            pl.col("cat_global_cycle"),
            pl.lit(20.0)
        ]).alias("estimated_cycle_per_unit")
    ])
    .with_columns([
        # Ngày dự kiến mua tiếp theo
        (pl.col("last_buy_date") + pl.duration(days=pl.col("estimated_cycle_per_unit")*pl.col("last_qty"))).alias("next_expected_date")
    ])
    .with_columns([
        # Bước 1: Tính khoảng cách tuyệt đối (Temporary column)
        (pl.col("next_expected_date") - pl.lit(window_center)).dt.total_days().abs().alias("temp_dist_abs"),
        
        # Có rơi vào khoảng (begin_recent, end_recent) hay không
        pl.when(
            pl.col("next_expected_date").is_between(begin_recent, end_recent)
        ).then(1).otherwise(0).alias("feat3_is_in_window")
    ])
    .with_columns([
        # Bước 2: ĐẢO NGƯỢC THÀNH SCORE (Càng gần càng cao)
        # Công thức: 1 / (Distance + 1)
        # Distance = 0 -> Score = 1.0 (Rất gấp)
        # Distance = 30 -> Score = 0.03 (Không gấp)
        (1.0 / (pl.col("temp_dist_abs") + 1.0)).alias("feat3_urgency_score")
    ])
    .select(["customer_id", "item_id", "feat3_urgency_score", "feat3_is_in_window"])
)

# =============================================================================
# FEATURE 4: Item Popularity
# =============================================================================

In [8]:
# 1. Xác định khung thời gian
cutoff_30d = end_hist - timedelta(days=POPULARITY_WINDOW_30D)
cutoff_90d = end_hist - timedelta(days=POPULARITY_WINDOW_90D)

# 2. Tính toán chỉ số thô từ lịch sử giao dịch
item_raw_stats = (
    purchase_df_hist
    .group_by("item_id")
    .agg([
        pl.col("quantity").sum().alias("pop_qty"),
        pl.col("customer_id").n_unique().alias("pop_user"),

        # Sức mua 30 ngày gần nhất
        pl.col("quantity")
        .filter(pl.col("datetime") >= cutoff_30d)
        .sum().fill_null(0).alias("pop_30d"),

        # Sức mua 90 ngày gần nhất
        pl.col("quantity")
        .filter(pl.col("datetime") >= cutoff_90d)
        .sum().fill_null(0).alias("pop_90d"),
    ])
)

# 3. Join với bảng Item để lấy thông tin category_l1
item_with_cat = item_raw_stats.join(
    item_df.select(["item_id", "category_l1"]),
    on="item_id",
    how="left"
)

# 4. Tính toán các features
feat_item_popularity = (
    item_with_cat
    .with_columns([
        # So sánh 30 ngày gần nhất với trung bình 3 tháng qua (trend)
        (pl.col("pop_30d") / ((pl.col("pop_90d") / 3) + 1e-6)).alias("feat4_pop_trend"),

        # Xếp hạng so với toàn bộ sản phẩm
        (
            pl.col("pop_user").rank(method="dense", descending=False)
            / pl.col("pop_user").len()
        ).alias("feat4_pop_global_rank"),

        # Xếp hạng so với các món hàng cùng category_l1
        (
            pl.col("pop_user")
            .rank(method="dense", descending=False)
            .over("category_l1") # partition theo category_l1
            / pl.col("item_id").count().over("category_l1")
        ).alias("feat4_pop_category_rank"),

        # Chuẩn hóa
        pl.col("pop_30d").log1p().alias("feat4_pop_30d_log"),
    ])
    .select([
        "item_id",
        "feat4_pop_30d_log",
        "feat4_pop_trend",
        "feat4_pop_category_rank",
        "feat4_pop_global_rank",
    ])
)

# =============================================================================
# FEATURE 5: Baby Age Prediction & Age Alignment Score
# =============================================================================

In [9]:
# --- 1. CẤU HÌNH ---
PREDICTION_WINDOW_DAYS = (end_recent - begin_recent).days
MIDPOINT_OFFSET = PREDICTION_WINDOW_DAYS // 2
SIGMA = 6.0 

regex_step1 = r"step\s*1"
regex_mom = r"\bmom\b"

# --- 2. HÀM TÁCH AGE MIN - MAX ---
def extract_age_range_expr(col_name):
    # Trích xuất list số tháng
    months_list = (
        pl.col(col_name).str.to_lowercase()
        .str.extract_all(r"(\d+)\s*([ym]?)")
        .list.eval(
            pl.element()
            .str.replace("y", "*12").str.replace("m", "*1")
            .str.replace_all(r"^(\d+)$", r"${1}*1")
        )
        .list.eval(pl.element().str.split("*"))
        .list.eval(
            pl.element().list.get(0).cast(pl.Float64) * pl.element().list.get(1).cast(pl.Float64)
        )
    )
    
    # Trả về Struct min/max
    # Nếu là Mẹ/Bầu -> Gán 0.0 cho cả min và max
    return (
        pl.when(pl.col(col_name).str.to_lowercase().str.contains("mẹ|bầu"))
        .then(
            pl.struct(min_month=pl.lit(0.0), max_month=pl.lit(0.0))
        )
        .otherwise(
            pl.struct(
                min_month=months_list.list.min(),
                max_month=months_list.list.max()
            )
        )
    )

item_convert_df = item_df.with_columns(
    extract_age_range_expr("age_group").alias("age_range")
).unnest("age_range") # Bung ra thành cột min_month, max_month

# --- 3. DỰ ĐOÁN TUỔI EM BÉ THEO min_month ---
merged_df = (
    purchase_df_hist
    .select(["customer_id", "item_id", "datetime"])
    .join(
        item_convert_df.select(["item_id", "category", "category_l1", "min_month"]), 
        on="item_id", 
        how="inner"
    )
    .with_columns(
        (pl.col("datetime") - pl.duration(days=pl.col("min_month").fill_null(0)*30))
        .alias("estimated_birthday")
    )
)

user_age_features = merged_df.group_by("customer_id").agg([
    # Ngày mua step 1 đầu tiên
    pl.col("datetime").filter(
        pl.col("category").str.to_lowercase().str.contains(regex_step1),
        pl.col("category_l1").str.to_lowercase().str.contains("sữa")
    ).min().alias("first_date_buy_step1"),
    
    # Ngày mua sữa bầu cuối cùng
    pl.col("datetime").filter(
        pl.col("category").str.to_lowercase().str.contains(regex_mom),
        pl.col("category_l1").str.to_lowercase().str.contains("sữa")
    ).max().alias("last_date_buy_milk4mom"),

    # Median ngày sinh từ tất cả sản phẩm đã mua (có age_group)
    pl.col("estimated_birthday").median().alias("estimated_birthday_median")
])

# Tính tuổi tương lai (kể từ end_hist)
user_age_predicted = (
    user_age_features
    .with_columns([
        # Mua step 1 lần đầu là lúc mới sinh
        ((pl.lit(end_hist) - pl.col("first_date_buy_step1")).dt.total_days() / 30.0)
        .alias("age_by_step1"),
        
        # Mua sữa bầu lần cuối là lúc sinh
        ((pl.lit(end_hist) - pl.col("last_date_buy_milk4mom")).dt.total_days() / 30.0)
        .alias("age_by_milk4mom"),

        # Tuổi theo sinh nhật dự kiến
        ((pl.lit(end_hist) - pl.col("estimated_birthday_median")).dt.total_days() / 30.0)
        .alias("age_by_est"),
    ])
    .with_columns([
        pl.coalesce([
            pl.col("age_by_milk4mom"),
            pl.col("age_by_step1"),
            pl.col("age_by_est"),
            pl.lit(-1.0)
        ]).alias("predicted_baby_age")
    ])
    .select(["customer_id", "predicted_baby_age"])
)

In [10]:
# --- 4. TÍNH SCORE (SO SÁNH TUỔI VỚI KHOẢNG [MIN, MAX]) ---

# Hàm tính score
def calculate_range_score(user_age_col, item_min_col, item_max_col, sigma):
    return pl.when(
        (user_age_col >= pl.col(item_min_col)) & 
        (user_age_col <= pl.col(item_max_col))
    ).then(1.0).otherwise(
        np.exp(
            -(
                pl.min_horizontal([
                    (user_age_col - pl.col(item_min_col)).abs(),
                    (user_age_col - pl.col(item_max_col)).abs()
                ]) ** 2
            ) / (2 * sigma**2)
        )
    )

# Không biết chắc tuổi em bé HIỆN TẠI là bao nhiêu nên lấy lúc end_hist và sau đó tại midpoint
OFFSET_IN_MONTHS = MIDPOINT_OFFSET / 30.0

feat_baby_age = (
    base_df.select(["customer_id", "item_id"])
    .join(
        item_convert_df.select(["item_id", "min_month", "max_month"]),
        on="item_id",
        how="left"
    )
    .join(
        user_age_predicted,
        on="customer_id",
        how="left"
    )
    .with_columns([
        # 1. SCORE TẠI END_HIST: Dùng đúng tuổi gốc (feat_baby_age_predicted)
        calculate_range_score(
            user_age_col=pl.col("predicted_baby_age"), 
            item_min_col="min_month", 
            item_max_col="max_month", 
            sigma=SIGMA_AGE_END_HIST
        ).alias("feat5_score_age_end_hist"),

        # 2. SCORE TẠI MIDPOINT: Lấy tuổi gốc + Offset
        calculate_range_score(
            user_age_col=(pl.col("predicted_baby_age") + OFFSET_IN_MONTHS), 
            item_min_col="min_month", 
            item_max_col="max_month", 
            sigma=SIGMA_AGE_MIDPOINT
        ).alias("feat5_score_age_midpoint")
    ])
    .select(["customer_id", "item_id", "feat5_score_age_end_hist", "feat5_score_age_midpoint"])
)

# =============================================================================
# FEATURE 6: Price Compatibility
# =============================================================================

In [11]:
# Thống kê chi tiêu của user
user_spending_features = purchase_df_hist.group_by("customer_id").agg([
    pl.col("price").mean().alias("user_avg_spend"),
    pl.col("price").std().alias("user_std_spend"),
    pl.col("price").max().alias("user_max_spend")
])

# Thống kê giá theo category
category_price_stats = item_df.group_by("category_l1").agg([
    pl.col("price").mean().alias("cat_avg_price"),
    pl.col("price").std().alias("cat_price_std")
])

# Tính vị thế giá của item trong category
item_price_features = (
    item_df.select(["item_id", "price", "category_l1"])
    .join(
        category_price_stats,
        on="category_l1",
        how="left"
    )
    .with_columns([
        # Feature vị thế giá trong category (giá đó là rẻ hay mắc theo từng category)
        ((pl.col("price") - pl.col("cat_avg_price")) / (pl.col("cat_price_std") + PRICE_EPSILON))
        .alias("item_price_rel_score")
    ])
    .select(["item_id", "price", "item_price_rel_score"])
)

# Tính global stats để fill null
stats = purchase_df_hist.select([
    pl.col("price").mean().alias("mean"),
    pl.col("price").std().alias("std")
]).collect()
global_avg = stats["mean"][0]
global_std = stats["std"][0]

In [12]:
# Tạo feature table price compatibility
feat_price_compatibility = (
    base_df.select(["customer_id", "item_id"])
    .join(
        item_price_features,
        on="item_id",
        how="left"
    )
    .join(
        user_spending_features,
        on="customer_id",
        how="left"
    )
    .with_columns([
        pl.col("user_avg_spend").fill_null(global_avg),
        pl.col("user_std_spend").fill_null(global_std),
        pl.col("user_max_spend").fill_null(global_avg + global_std),
        pl.col("item_price_rel_score").fill_null(0)
    ])
    .with_columns([
        # Price compatibility: Giá sản phẩm so với khả năng chi tiêu của user
        ((pl.col("price") - pl.col("user_avg_spend")) / (pl.col("user_std_spend") + PRICE_EPSILON))
        .alias("feat6_price_compatibility"),

        # Flag: Vượt quá khả năng chi tiêu của user
        (pl.col("price") > (pl.col("user_max_spend") * PRICE_CAPACITY_MULTIPLIER)).cast(pl.Int8)
        .alias("feat6_is_above_user_capacity")
    ])
    .select(["customer_id", "item_id", "feat6_price_compatibility", "feat6_is_above_user_capacity"])
)

# =============================================================================
# FEATURE 7: Brand Loyalty
# =============================================================================

In [13]:
joined_brand_df = purchase_df_hist.join(
    item_df.select(["item_id", "brand"]),
    on="item_id",
)

# Độ thân thuộc giữa brand và user
user_brand_affinity = (
    joined_brand_df
    .group_by(["customer_id", "brand"])
    .agg(
        pl.len().alias("count_user_buy_brand")
    )
    .with_columns([
        # Tổng số lần mua hàng của từng user
        pl.col("count_user_buy_brand").sum().over("customer_id").alias("total_transaction_user")
    ])
    .with_columns([
        # Tỉ lệ thân thuộc, yêu thích
        (pl.col("count_user_buy_brand") / pl.col("total_transaction_user")).alias("feat7_user_brand_affinity")
    ])
    .select(["customer_id", "brand", "feat7_user_brand_affinity"])
)

# Độ xịn của brand (độ hot, phổ biến, tỉ lệ giữ chân)
brand_global_stats = (
    joined_brand_df
    .group_by("brand")
    .agg([
        pl.col("customer_id").n_unique().alias("total_unique_users"),
        pl.col("customer_id")
        .filter(pl.col("customer_id").is_duplicated())
        .n_unique()
        .alias("total_retained_users"),

        pl.len().alias("total_transaction_brand")
    ])
    .with_columns([
        # Tỉ lệ giữ chân
        (pl.col("total_retained_users") / pl.col("total_unique_users"))
        .fill_nan(0).alias("feat7_brand_repeat_rate"),

        # Xếp hạng brand theo lượt mua
        (pl.col("total_transaction_brand").rank(method="dense", descending=True))
        .alias("feat7_brand_rank")
    ])
    .select(["brand", "feat7_brand_repeat_rate", "feat7_brand_rank", "total_unique_users"])
)

In [14]:
feat_brand_loyalty = (
    base_df.select(["customer_id", "item_id"])
    .join(
        item_df.select(["item_id", "brand"]),
        on="item_id",
        how="left"
    )
    .join(
        brand_global_stats,
        on="brand",
        how="left"
    )
    .join(
        user_brand_affinity,
        on=["customer_id", "brand"],
        how="left"
    )
    .with_columns([
        # 1. Fill null cho các chỉ số cơ bản
        pl.col("feat7_brand_repeat_rate").fill_null(0),
        pl.col("feat7_user_brand_affinity").fill_null(0),
        
        # 2. Xử lý Rank: Fill 999 cho Null (Coi như rank tệ nhất)
        pl.col("feat7_brand_rank").fill_null(999) 
    ])
    .with_columns([
        # 3. TẠO CỘT MỚI: RANK SCORE (NGHỊCH ĐẢO)
        # Công thức: 1 / (Rank)
        # Rank 1 (Top 1) --> 1.0 (Điểm tối đa)
        # Rank 10 --> 0.1
        # Rank 977 --> 0.001 (Rất nhỏ)
        # Rank 999 (Null) --> 0.001 (Rất nhỏ)
        (1.0 / pl.col("feat7_brand_rank")).alias("feat7_brand_rank_inv")
    ])
    .select([
        "customer_id", 
        "item_id", 
        "feat7_brand_repeat_rate", 
        "feat7_brand_rank_inv",      # Dùng cột mới này
        "feat7_user_brand_affinity"
    ])
)

# =============================================================================
# FEATURE 8: Global Co-purchase (Luật kết hợp toàn cục)
# =============================================================================

In [15]:
# Tạo "Giỏ hàng" (Basket): Gom user + date
# Dùng date thay vì datetime vì một ngày dù một user mua nhiều lần cũng có thể coi là mua một phiên -> giảm phức tạp
baskets = (
    purchase_df_hist
    .select(["customer_id", "date", "item_id"])
    .unique() # Tránh trùng (Ồ tôi lỡ mua thiếu nên tôi đặt thêm)
)

# Đếm số lần xuất hiện của từng item (Support)
item_support = (
    baskets
    .group_by("item_id")
    .agg(pl.len().alias("count_source"))
)


# Tìm từ điển luật kết hợp
# Self-join để tìm các cặp đi cùng nhau (Co-occurrence)
global_rules = (
    baskets
    .join(baskets, on=["customer_id", "date"], suffix="_target")
    .filter(pl.col("item_id") != pl.col("item_id_target")) # Loại bỏ tự kết hợp A-A
    .group_by(["item_id", "item_id_target"])
    .agg(
        pl.len().alias("co_count")
    )
    # LỌC NHIỄU: Chỉ giữ các quy luật xuất hiện chung phổ biến
    .filter(pl.col("co_count") >= CO_COUNT_THRESHOLD) 
)

# Tính Score: P(Target | Source) = count(A&B) / count(A)
final_global_rules = (
    global_rules
    .join(item_support, on="item_id")
    .with_columns(
        (pl.col("co_count") / pl.col("count_source")).alias("affinity_score")
    )
    # Chỉ giữ lại các score có ý nghĩa để giảm kích thước bảng lookup
    .filter(pl.col("affinity_score") >= AFFINITY_THRESHOLD)
    .select(["item_id", "item_id_target", "affinity_score"])
)

# Lấy "bối cảnh" của User: Trong những ngày gần đây User đã mua những gì?
# Đây là input để tra cứu vào bảng luật bên trên
user_recent_context = (
    purchase_df_hist
    .filter(pl.col("datetime") >= (end_hist - timedelta(days=RECENT_CONTEXT_DAYS)))
    .select(["customer_id", "item_id"])
    .unique()
    .rename({"item_id": "item_source"}) # Đổi tên để tránh nhầm với item đích
)

feat_co_purchase = (
    base_df.select(["customer_id", "item_id"])
    # 2.1: Mở rộng User ra thành danh sách các món họ đã mua gần đây (Source)
    .join(user_recent_context, on="customer_id", how="inner")
    # 2.2: So khớp Source -> Target bằng bảng luật Global
    .join(
        final_global_rules,
        left_on=["item_source", "item_id"],    # item_source khớp lịch sử, item_id khớp target
        right_on=["item_id", "item_id_target"], 
        how="inner"
    )
    
    # 2.3: Tổng hợp điểm số
    # Ví dụ: User đã mua A, B. Cả A và B đều gợi ý ra Target T.
    # Score = Max(Score(A->T), Score(B->T)) hoặc Sum
    .group_by(["customer_id", "item_id"])
    .agg([
        pl.max("affinity_score").alias("feat8_co_purchase_max"), # Mối quan hệ mạnh nhất
        pl.sum("affinity_score").alias("feat8_co_purchase_sum"), # Tổng cường độ quan hệ
        pl.len().alias("feat8_co_purchase_count")                # Bao nhiêu món trong lịch sử ủng hộ target này
    ])
)

# =============================================================================
# JOIN TẤT CẢ FEATURES VÀO BASE_DF
# =============================================================================

In [16]:
# Join tất cả features vào base_df theo thứ tự
base_df = (
    base_df
    # Feature 1: Item Frequency
    .join(
        feat_item_frequency,
        on=['customer_id', 'item_id'],
        how='left'
    )
    .with_columns(
        pl.col('feat1_customer_item_freq').fill_null(0)
    )
    
    # Feature 2: Recency Decay (Brand & Type Affinity)
    .join(
        feat_recency_decay,
        on=['customer_id', 'item_id'],
        how='left'
    )
    .with_columns([
        pl.col('feat2_brand_affinity').fill_null(0),
        pl.col('feat2_type_affinity').fill_null(0)
    ])
    
    # Feature 3: Urgency (Window-Based)
    .join(
        feat_urgency,
        on=['customer_id', 'item_id'],
        how='left'
    )
    .with_columns([
        # FILL NULL BẰNG 0 (Thay vì 999)
        # Nghĩa là: Không có lịch sử mua -> Urgency Score = 0 (Thấp nhất)
        pl.col('feat3_urgency_score').fill_null(0.0), 
        
        # is_in_window thì vẫn fill 0 như cũ
        pl.col('feat3_is_in_window').fill_null(0)
    ])
    
    # Feature 4: Item Popularity
    .join(
        feat_item_popularity,
        on='item_id',
        how='left'
    )
    .with_columns([
        pl.col('feat4_pop_30d_log').fill_null(0),
        pl.col('feat4_pop_trend').fill_null(0),
        pl.col('feat4_pop_category_rank').fill_null(0.5),
        pl.col('feat4_pop_global_rank').fill_null(0.5)
    ])
    
    # Feature 5: Baby Age Alignment
    .join(
        feat_baby_age,
        on=['customer_id', 'item_id'],
        how='left'
    )
    .with_columns([
        pl.col('feat5_score_age_end_hist').fill_null(0),
        pl.col('feat5_score_age_midpoint').fill_null(0)
    ])
    
    # Feature 6: Price Compatibility
    .join(
        feat_price_compatibility,
        on=['customer_id', 'item_id'],
        how='left'
    )
    .with_columns([
        pl.col('feat6_price_compatibility').fill_null(0),
        pl.col('feat6_is_above_user_capacity').fill_null(0)
    ])
    
    # Feature 7: Brand Loyalty
    .join(
        feat_brand_loyalty,
        on=['customer_id', 'item_id'],
        how='left'
    )
    .with_columns([
        pl.col('feat7_brand_repeat_rate').fill_null(0),
        
        # Fill null cho cột nghịch đảo bằng 0 (Thấp nhất)
        pl.col('feat7_brand_rank_inv').fill_null(0.0), 
        
        pl.col('feat7_user_brand_affinity').fill_null(0)
    ])

    # Feature 8: Co-purchase
    .join(
        feat_co_purchase,
        on=['customer_id', 'item_id'],
        how='left'
    )
    .with_columns(
        pl.col('feat8_co_purchase_max').fill_null(0),
        pl.col('feat8_co_purchase_sum').fill_null(0),
        pl.col('feat8_co_purchase_count').fill_null(0),
    )
)

print("✅ Base DataFrame với tất cả features đã sẵn sàng!")
print(f"Columns: {base_df.collect_schema().names()}")

✅ Base DataFrame với tất cả features đã sẵn sàng!
Columns: ['customer_id', 'item_id', 'Y', 'feat1_customer_item_freq', 'feat2_brand_affinity', 'feat2_type_affinity', 'feat3_urgency_score', 'feat3_is_in_window', 'feat4_pop_30d_log', 'feat4_pop_trend', 'feat4_pop_category_rank', 'feat4_pop_global_rank', 'feat5_score_age_end_hist', 'feat5_score_age_midpoint', 'feat6_price_compatibility', 'feat6_is_above_user_capacity', 'feat7_brand_repeat_rate', 'feat7_brand_rank_inv', 'feat7_user_brand_affinity', 'feat8_co_purchase_max', 'feat8_co_purchase_sum', 'feat8_co_purchase_count']


# =============================================================================
# COLLECT & VERIFY
# =============================================================================

In [None]:
base_df.sink_parquet(f"train_data/train.parquet")