# 01_train — End-to-end Promotion Recommendation Training

This notebook trains a two-stage system:
- Stage A: Multiclass classifier P(promo_type | basket features)
- Stage B: Per-promotion ranker (a pointwise LightGBM binary classifier) that scores the candidates recalled from `promotions.csv`

It saves all artifacts (models, preprocessors, encoders, feature lists, manifest) under `models/artifacts/` per `config/training_config.yaml`.


In [1]:
# Imports & setup
import os
import json
import hashlib
import warnings
from datetime import datetime
from pathlib import Path

import numpy as np
import pandas as pd

from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
from sklearn.model_selection import GroupKFold
from sklearn.metrics import f1_score, log_loss

import lightgbm as lgb
import joblib
import yaml

RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

# Parse the YAML training configuration (location is relative to notebooks/).
with open('../config/training_config.yaml', 'r', encoding='utf-8') as cfg_file:
    CFG = yaml.safe_load(cfg_file)

# Anchor every relative entry of CFG['paths'] at the project root, because the
# notebook's working directory is one level below it. Absolute entries are kept.
PROJECT_ROOT = Path('../').resolve()
CFG['paths'].update({
    name: str(PROJECT_ROOT / location)
    for name, location in CFG['paths'].items()
    if not Path(location).is_absolute()
})

# Create the output directories up front so later cells can write freely.
for dir_key in ('artifacts_root', 'models_dir', 'preprocessors_dir', 'metadata_dir', 'reports_dir'):
    os.makedirs(CFG['paths'][dir_key], exist_ok=True)

print('Config loaded.')


Config loaded.


In [2]:
# Utilities

def sha256_of_file(path: str) -> str:
    """Return the hex-encoded SHA-256 digest of the file at ``path``.

    The file is read in 8192-byte pieces so it never has to fit in memory.
    """
    digest = hashlib.sha256()
    with open(path, 'rb') as stream:
        while True:
            block = stream.read(8192)
            if not block:  # empty bytes object signals end of file
                break
            digest.update(block)
    return digest.hexdigest()


def ensure_datetime(df: pd.DataFrame, col: str) -> pd.Series:
    """Parse column ``col`` of ``df`` as datetimes.

    Values that cannot be parsed become ``NaT`` (``errors='coerce'``).
    ``df`` itself is not modified.
    """
    raw_values = df[col]
    parsed = pd.to_datetime(raw_values, errors='coerce')
    return parsed


def compute_basic_basket_aggregates(tx: pd.DataFrame, cfg) -> pd.DataFrame:
    """Collapse item-level transaction rows into one row per basket.

    Args:
        tx: Transaction rows. Read only; the caller's frame is not modified.
        cfg: Configuration dict; ``cfg['column_map_tx']`` maps logical names
            (``transaction_id``, ``user_id``, ``store_id``, ``timestamp``,
            ``zone``, ``province``, ``profile``, ``is_online``, and optionally
            ``price``/``qty``/``amount_total``/``qty_total``) to column names.

    Returns:
        One row per transaction id holding the first value of each descriptive
        column plus summed ``amount_total`` and ``qty_total``.

    Raises:
        KeyError: If a required mapping or mapped column is missing.
    """
    c = cfg['column_map_tx']

    def _existing(kind):
        # Prefer the configured column name, then the canonical name.
        mapped = c.get(kind)
        if mapped is not None and mapped in tx.columns:
            return tx[mapped]
        if kind in tx.columns:
            return tx[kind]
        return None

    # If amount_total/qty_total are absent, derive them from price*qty and qty.
    amount = _existing('amount_total')
    if amount is None:
        amount = tx[c['price']] * tx[c['qty']]
    qty = _existing('qty_total')
    if qty is None:
        qty = tx[c['qty']]

    # Work on a derived frame so the helper columns never leak into `tx`.
    work = tx.assign(**{'__amount': amount, '__qty': qty})

    descriptive = ('user_id', 'store_id', 'timestamp', 'zone', 'province', 'profile', 'is_online')
    spec = {c[name]: 'first' for name in descriptive}
    spec['__amount'] = 'sum'
    spec['__qty'] = 'sum'

    agg = work.groupby(c['transaction_id'], as_index=False).agg(spec)
    return agg.rename(columns={'__amount': 'amount_total', '__qty': 'qty_total'})


def add_time_features(df: pd.DataFrame, time_col: str) -> pd.DataFrame:
    """Add ``hour``, ``dayofweek``, ``month`` and ``is_weekend`` columns.

    The columns are written onto ``df`` itself, which is also returned.
    ``time_col`` is parsed with ``errors='coerce'``; ``is_weekend`` is 1 when
    ``dayofweek`` is 5 or greater, else 0.
    """
    stamps = pd.to_datetime(df[time_col], errors='coerce').dt
    for part in ('hour', 'dayofweek', 'month'):
        df[part] = getattr(stamps, part)
    df['is_weekend'] = df['dayofweek'].ge(5).astype(int)
    return df


def fit_need_state(basket_df: pd.DataFrame, feature_cols: list, cfg):
    """Fit the need-state pipeline: StandardScaler -> PCA -> KMeans.

    Missing feature values are replaced by 0.0 before scaling. The number of
    PCA components is capped at the number of feature columns.

    Returns:
        ``(scaler, pca, kmeans, X_pca, clusters)``: the three fitted
        estimators, the PCA projection of ``basket_df`` and its cluster labels.
    """
    ns_cfg = cfg['need_state']
    filled = basket_df[feature_cols].fillna(0.0)

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(filled)

    # Never request more components than there are input features.
    n_components = min(ns_cfg['pca_components'], len(feature_cols))
    pca = PCA(n_components=n_components, random_state=RANDOM_SEED)
    X_pca = pca.fit_transform(X_scaled)

    kmeans = KMeans(n_clusters=ns_cfg['kmeans_k'], random_state=RANDOM_SEED, n_init=10)
    clusters = kmeans.fit_predict(X_pca)

    return scaler, pca, kmeans, X_pca, clusters


def transform_need_state(basket_df: pd.DataFrame, feature_cols: list, scaler, pca, kmeans):
    """Apply already-fitted need-state estimators to ``basket_df``.

    Returns a DataFrame indexed like ``basket_df`` with one ``ns_pca_<i>``
    column per PCA component (numbered from 1) followed by ``ns_cluster``.
    Missing feature values are replaced by 0.0 before scaling.
    """
    filled = basket_df[feature_cols].fillna(0.0)
    projected = pca.transform(scaler.transform(filled))
    labels = kmeans.predict(projected)

    component_names = [f'ns_pca_{pos}' for pos in range(1, projected.shape[1] + 1)]
    ns = pd.DataFrame(projected, index=basket_df.index, columns=component_names)
    ns['ns_cluster'] = labels
    return ns


def build_ptype_frame(basket_df: pd.DataFrame, ns_df: pd.DataFrame, cfg):
    """Assemble the feature matrix for the promo-type (Stage-A) model.

    Base features come from ``cfg['features']['ptype']`` in the group order
    time, store, channel, basket; names absent from ``basket_df`` are skipped
    and a name listed in several groups is used only once, so the result never
    carries duplicate base columns. Need-state features (every ``ns_pca_*``
    column of ``ns_df`` plus ``ns_cluster``) are appended last.

    Returns:
        ``(X_ptype, feat_cols)``: the feature frame and its column names in
        the same order.

    Raises:
        KeyError: If a feature group is missing from the config or ``ns_df``
            has no ``ns_cluster`` column.
    """
    group_cfg = cfg['features']['ptype']

    base_cols = []
    for group in ('time', 'store', 'channel', 'basket'):
        for col in group_cfg[group]:
            if col in basket_df.columns and col not in base_cols:
                base_cols.append(col)

    ns_cols = [c for c in ns_df.columns if c.startswith('ns_pca_')] + ['ns_cluster']
    feat_cols = base_cols + ns_cols

    # Column-wise concat aligns the two frames on their index.
    X_ptype = pd.concat([basket_df[base_cols], ns_df[ns_cols]], axis=1)
    return X_ptype, feat_cols


def infer_ptype_list(y_series: pd.Series, cfg):
    """Return the list of promo-type classes.

    A non-empty ``cfg['ptype_list']`` is returned as is. Otherwise the classes
    are the distinct labels of ``y_series`` (missing values counted as
    ``'NoPromo'``) in ``value_counts`` order, with ``'NoPromo'`` appended when
    it does not occur.
    """
    configured = cfg.get('ptype_list')
    if configured:
        return configured

    labels = pd.Series(y_series).fillna('NoPromo').astype(str)
    ordered = labels.value_counts().index.tolist()
    return ordered if 'NoPromo' in ordered else ordered + ['NoPromo']


def topk_accuracy(probs: np.ndarray, y_true: np.ndarray, classes: list, k: int) -> float:
    """Fraction of samples whose true label is among the ``k`` top-scored classes.

    Args:
        probs: Score matrix; row ``i`` holds one score per entry of ``classes``.
        y_true: True labels, expressed in the same values as ``classes``.
        classes: Class labels in the column order of ``probs``.
        k: Number of highest-scoring classes to consider per sample.

    Returns:
        Hit rate in [0, 1]; 0.0 when ``y_true`` is empty.
    """
    hits = 0
    for i, label in enumerate(y_true):
        # Indices of the k largest scores, highest first.
        ranked = np.argsort(probs[i])[::-1][:k]
        hits += int(label in [classes[j] for j in ranked])
    return hits / max(1, len(y_true))



In [3]:
# 3.1 Load data & validate / map
c_tx = CFG['column_map_tx']
c_pr = CFG['column_map_promo']

# Read raw transactional rows (per item)
txraw = pd.read_csv(CFG['paths']['data_tx'])
# Ensure key columns exist
for k in ['transaction_id','timestamp','user_id','store_id','is_online']:
    if c_tx[k] not in txraw.columns:
        raise ValueError(f"Missing required tx column: {c_tx[k]}")

# Parse datetime (unparseable values become NaT)
txraw[c_tx['timestamp']] = pd.to_datetime(txraw[c_tx['timestamp']], errors='coerce')

# Read promotions
promos = pd.read_csv(CFG['paths']['data_promos'])
# Parse promo dates if present
for dcol in ['start_date','end_date']:
    if c_pr.get(dcol) and c_pr[dcol] in promos.columns:
        promos[c_pr[dcol]] = pd.to_datetime(promos[c_pr[dcol]], errors='coerce')

# Basic schema checks. Explicit raises (unlike `assert`) are not stripped under
# `python -O` and match the ValueError used for the transaction columns above.
if c_pr['promo_id'] not in promos.columns:
    raise ValueError('promotions: promo_id missing')
if c_pr['promo_type'] not in promos.columns:
    raise ValueError('promotions: promo_type missing')

print('Loaded:', txraw.shape, promos.shape)

# Aggregate to basket-level features
basket = compute_basic_basket_aggregates(txraw, CFG)

# Product set per basket for scope matching
prod_col = c_tx.get('product_id', 'product_id')
if prod_col in txraw.columns:
    prod_by_tx = (txraw.groupby(c_tx['transaction_id'])[prod_col]
                  .apply(lambda s: set(s.astype(str)))
                  .rename('basket_products'))
    basket = basket.merge(prod_by_tx, left_on=c_tx['transaction_id'], right_index=True, how='left')
else:
    # One independent empty set per row; `[set()] * n` would alias a single
    # set object across every basket.
    basket['basket_products'] = [set() for _ in range(len(basket))]

# Derive used_type from promo_id_used if available, else fallback to used_type
if c_tx.get('promo_id_used') and c_tx['promo_id_used'] in txraw.columns:
    # First non-null promo id recorded for each transaction
    used = (txraw[[c_tx['transaction_id'], c_tx['promo_id_used']]]
            .dropna().drop_duplicates(c_tx['transaction_id']))
    # One row per promo id, so the lookup merge cannot multiply basket rows
    # when promotions.csv repeats an id.
    promo_types = promos[[c_pr['promo_id'], c_pr['promo_type']]].drop_duplicates(c_pr['promo_id'])
    used = used.merge(promo_types, left_on=c_tx['promo_id_used'], right_on=c_pr['promo_id'], how='left')
    used = used[[c_tx['transaction_id'], c_pr['promo_type']]].rename(columns={c_pr['promo_type']:'used_type'})
    basket = basket.merge(used, on=c_tx['transaction_id'], how='left')
elif c_tx.get('used_type') and c_tx['used_type'] in txraw.columns:
    u = (txraw[[c_tx['transaction_id'], c_tx['used_type']]]
         .dropna().drop_duplicates(c_tx['transaction_id']))
    u = u.rename(columns={c_tx['used_type']:'used_type'})
    basket = basket.merge(u, on=c_tx['transaction_id'], how='left')
else:
    basket['used_type'] = np.nan

# Baskets without a (known) promotion are labelled 'NoPromo'
basket['used_type'] = basket['used_type'].fillna('NoPromo').astype(str)

# Time features
basket = add_time_features(basket, c_tx['timestamp'])

print('Basket shape:', basket.shape)
basket.head()


Loaded: (19178, 39) (100, 6)


  promos[c_pr[dcol]] = pd.to_datetime(promos[c_pr[dcol]], errors='coerce')
  promos[c_pr[dcol]] = pd.to_datetime(promos[c_pr[dcol]], errors='coerce')


Basket shape: (19178, 16)


Unnamed: 0,transaction_id,user_id,store_id,timestamp,stores.zone,stores.province,stores.profile,is_online,amount_total,qty_total,basket_products,used_type,hour,dayofweek,month,is_weekend
0,PMTX0000001,U0001,S028,2025-07-28 09:14:00,6,เชียงใหม่,Office,False,1330,5,{P0650},Buy 1 get 1,9,0,7,0
1,PMTX0000002,U0001,S023,2025-09-18 02:11:00,4,สุราษฎร์ธานี,Gas Station,False,348,4,{P0269},Brandday,2,3,9,0
2,PMTX0000003,U0001,S029,2025-08-19 23:34:00,8,กรุงเทพฯ,Hospital,False,516,3,{P0259},Brandday,23,1,8,0
3,PMTX0000004,U0001,S049,2025-09-11 14:48:00,8,สุราษฎร์ธานี,Factory,False,1945,5,{P0812},Flash Sale,14,3,9,0
4,PMTX0000005,U0001,S012,2025-08-22 21:42:00,9,กรุงเทพฯ,Gas Station,False,744,3,{P0079},Buy 1 get 1,21,4,8,0


In [4]:
# 3.2 Need-state + X_ptype
# Always-present basket totals plus whichever optional signals the data carries.
optional_signals = ('expected_basket_items', 'loyalty_score', 'price_elasticity')
need_features = ['amount_total', 'qty_total'] + [
    name for name in optional_signals if name in basket.columns
]

scaler, pca, kmeans, X_pca, clusters = fit_need_state(basket, need_features, CFG)
ns_df = transform_need_state(basket, need_features, scaler, pca, kmeans)

# Persist the fitted preprocessors
prep_dir = CFG['paths']['preprocessors_dir']
for fitted, filename in (
    (scaler, 'scaler_need.pkl'),
    (pca, 'pca_need.pkl'),
    (kmeans, 'kmeans_need.pkl'),
):
    joblib.dump(fitted, os.path.join(prep_dir, filename))

# Stage-A inputs: feature frame, raw labels and the class list
X_ptype, ptype_featcols = build_ptype_frame(basket, ns_df, CFG)
used_types = basket['used_type']
y_ptype = used_types.values
ptype_classes = infer_ptype_list(used_types, CFG)

print('ptype feature cols:', len(ptype_featcols))
print('ptype classes:', ptype_classes)


ptype feature cols: 10
ptype classes: ['NoPromo', 'Product_Coupon', 'Mega Sale', 'Flash Sale', 'Buy 1 get 1', 'Brandday']


In [5]:
# 3.3 Train Stage-A: P(type|X) using LightGBM
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report

le = LabelEncoder()
le.fit(ptype_classes)
n_ptype_classes = len(le.classes_)

# Align labels to encoder (labels outside ptype_classes fall back to 'NoPromo')
y_enc = le.transform(pd.Categorical(y_ptype, categories=ptype_classes, ordered=False).fillna('NoPromo').astype(str))

params = CFG['ptype_model']['params']

# Use LightGBM for classification
clf = lgb.LGBMClassifier(random_state=RANDOM_SEED, **params)
clf.fit(X_ptype, y_enc)

# predict_proba has one column per class the model saw during fit. When the
# class list contains a type that never occurs in the data, those columns no
# longer line up with le.classes_. Scatter them into the full class space
# (unseen classes get probability 0) so column i always means le.classes_[i].
proba_seen = clf.predict_proba(X_ptype)
y_proba = np.zeros((proba_seen.shape[0], n_ptype_classes), dtype=float)
y_proba[:, np.asarray(clf.classes_, dtype=int)] = proba_seen

# Metrics (train-only quick check)
macro_f1 = f1_score(y_enc, np.argmax(y_proba, axis=1), average='macro')
# Pass the label set explicitly so log_loss does not infer it from y_enc alone.
ll = log_loss(y_enc, y_proba, labels=np.arange(n_ptype_classes))
acc_topk = topk_accuracy(y_proba, le.inverse_transform(y_enc), list(le.classes_), CFG['recall']['topk_types'])

print('Stage-A macro-F1:', macro_f1)
print('Stage-A logloss:', ll)
print('Stage-A top-{} acc:'.format(CFG['recall']['topk_types']), acc_topk)

# Save model & metadata
# NOTE: ptype_classes.json lists the full label space; a consumer of the
# pickled model should map predict_proba columns through model.classes_ the
# same way as above.
joblib.dump(clf, os.path.join(CFG['paths']['models_dir'], 'ptype_model.pkl'))
with open(os.path.join(CFG['paths']['metadata_dir'], 'ptype_classes.json'), 'w', encoding='utf-8') as f:
    json.dump(list(le.classes_), f, ensure_ascii=False, indent=2)
with open(os.path.join(CFG['paths']['metadata_dir'], 'ptype_featcols.json'), 'w', encoding='utf-8') as f:
    json.dump(list(X_ptype.columns), f, ensure_ascii=False, indent=2)

encoders_meta = {
    'ptype_label_encoder': 'sklearn.preprocessing.LabelEncoder',
}
with open(os.path.join(CFG['paths']['metadata_dir'], 'encoders.json'), 'w', encoding='utf-8') as f:
    json.dump(encoders_meta, f, ensure_ascii=False, indent=2)

# Keep proba for candidate recall
basket['ptype_probs'] = list(y_proba)
basket['ptype_probs_by_type'] = basket['ptype_probs'].apply(lambda arr: {cls: float(arr[i]) for i, cls in enumerate(le.classes_)})


[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.000501 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 817
[LightGBM] [Info] Number of data points in the train set: 19178, number of used features: 9
[LightGBM] [Info] Start training from score -3.134087
[LightGBM] [Info] Start training from score -3.005057
[LightGBM] [Info] Start training from score -2.941835
[LightGBM] [Info] Start training from score -2.920329
[LightGBM] [Info] Start training from score -0.307447
[LightGBM] [Info] Start training from score -2.734628
Stage-A macro-F1: 0.5492203094521679
Stage-A logloss: 0.4646154344147693
Stage-A top-3 acc: 0.9948378350192929


In [6]:
# 3.4 Candidate recall functions

def apply_store_scope_filter(cand_df: pd.DataFrame, basket_row: pd.Series) -> pd.DataFrame:
    """Keep only candidates whose store scope admits the basket's store.

    Best-effort filter: the optional store-scope column (named by
    ``CFG['column_map_promo']['store_scope']``) holds comma-separated store
    ids. Whitespace around each id is ignored. A candidate with a missing
    scope is kept. If the scope column is not configured or not present, or
    the basket's store id cannot be looked up, ``cand_df`` is returned
    unfiltered.
    """
    scope_col = CFG['column_map_promo'].get('store_scope')
    if not scope_col or scope_col not in cand_df.columns:
        return cand_df

    try:
        store_id = str(basket_row[CFG['column_map_tx']['store_id']])
    except KeyError:
        # Store unknown: filter nothing, in keeping with the best-effort intent.
        return cand_df

    def _allows(val):
        if pd.isna(val):
            return True
        allowed = {token.strip() for token in str(val).split(',')}
        return store_id in allowed

    # An explicit boolean array is a valid mask even for an empty frame, which
    # a row-wise DataFrame.apply does not guarantee.
    mask = np.array([_allows(val) for val in cand_df[scope_col]], dtype=bool)
    return cand_df[mask]


def compute_scope_match(basket_row: pd.Series, cand_df: pd.DataFrame) -> pd.Series:
    """Score how much of each candidate's product scope is in the basket.

    Best-effort: the product-scope column (named by
    ``CFG['column_map_promo']['product_scope']``) holds comma-separated
    product ids. Whitespace around ids and empty tokens are ignored. The score
    is ``|scope ∩ basket_products| / |scope|``.

    Returns:
        Float Series indexed like ``cand_df``. All zeros when the scope column
        is not configured or present, or when ``basket_row`` has no usable
        ``basket_products`` collection (missing, NaN or empty).
    """
    prod_scope = CFG['column_map_promo'].get('product_scope')

    # A left merge can leave NaN here instead of a set; treat that as "empty".
    raw_products = basket_row.get('basket_products', set())
    if isinstance(raw_products, (set, frozenset, list, tuple)):
        basket_products = {str(item) for item in raw_products}
    else:
        basket_products = set()

    if (prod_scope is None) or (prod_scope not in cand_df.columns) or not basket_products:
        return pd.Series([0.0] * len(cand_df), index=cand_df.index)

    def score(val):
        if pd.isna(val):
            return 0.0
        scope_set = {token.strip() for token in str(val).split(',')} - {''}
        if not scope_set:
            return 0.0
        return len(scope_set & basket_products) / len(scope_set)

    return cand_df[prod_scope].apply(score).astype(float)


def recall_candidates(basket_row: pd.Series, promos_df: pd.DataFrame, ptype_probs: np.ndarray, ptype_classes: list, cfg: dict) -> pd.DataFrame:
    """Select the promotions eligible for ranking for one basket.

    Steps: pick up to ``cfg['recall']['topk_types']`` promo types whose
    probability is at least ``cfg['recall']['relevance_threshold']``; apply
    the optional channel, time-window and store-scope filters from
    ``cfg['filters']``; keep only promotions of the selected types; then add
    ``min_spend_gap`` and ``scope_match_score`` columns.

    Args:
        basket_row: One basket (needs the mapped ``is_online`` and
            ``timestamp`` fields; ``amount_total`` defaults to 0.0).
        promos_df: All promotions. Not modified.
        ptype_probs: Promo-type probabilities aligned with ``ptype_classes``.
        ptype_classes: Class labels in the order of ``ptype_probs``.
        cfg: Configuration. Column maps are read from ``cfg`` and fall back to
            the module-level ``CFG`` when ``cfg`` does not carry them.

    Returns:
        Candidate promotions with a fresh 0..n-1 index.
    """
    # Prefer the config that was passed in; fall back to the global for
    # callers that hand over a partial config.
    col_tx = cfg.get('column_map_tx') or CFG['column_map_tx']
    col_pr = cfg.get('column_map_promo') or CFG['column_map_promo']
    filters = cfg['filters']
    topk_types = cfg['recall']['topk_types']
    relevance_th = cfg['recall']['relevance_threshold']

    order = np.argsort(ptype_probs)[::-1]
    sel = [ptype_classes[i] for i in order if ptype_probs[i] >= relevance_th][:topk_types]
    ptype_set = set(sel)

    # No defensive copy needed: every step below derives a new frame.
    cand = promos_df

    # Channel filter
    if filters.get('respect_channel', False) and ('channel' in cand.columns):
        wanted_channel = 'online' if bool(basket_row[col_tx['is_online']]) else 'offline'
        cand = cand[cand['channel'].astype(str).str.lower().eq(wanted_channel)]

    # Time window (promotions with a missing start or end never match)
    if filters.get('use_time_window', True):
        ts = basket_row[col_tx['timestamp']]
        sd = col_pr.get('start_date')
        ed = col_pr.get('end_date')
        if (sd in cand.columns) and (ed in cand.columns):
            cand = cand[(cand[sd] <= ts) & (ts <= cand[ed])]

    # Store scope
    if filters.get('respect_store_scope', False):
        cand = apply_store_scope_filter(cand, basket_row)

    # Type restriction
    cand = cand[cand[col_pr['promo_type']].isin(ptype_set)]

    # Scope & spend: how far the basket is below each promo's minimum spend
    amt = float(basket_row.get('amount_total', 0.0))
    minsp = col_pr.get('min_spend')
    if minsp and (minsp in cand.columns):
        cand = cand.assign(min_spend_gap=np.maximum(0.0, cand[minsp].fillna(0) - amt))
    else:
        cand = cand.assign(min_spend_gap=0.0)

    cand = cand.assign(scope_match_score=compute_scope_match(basket_row, cand))

    return cand.reset_index(drop=True)


In [7]:
# 3.5 Build Ranking Frame

rank_rows = []
ptype_col = CFG['column_map_promo']['promo_type']
pid_col = CFG['column_map_promo']['promo_id']

# Loop invariants, resolved once instead of once per candidate
tid_col = CFG['column_map_tx']['transaction_id']
online_col = CFG['column_map_tx']['is_online']
ts_col = CFG['column_map_tx']['timestamp']
sd = CFG['column_map_promo'].get('start_date')
ed = CFG['column_map_promo'].get('end_date')
ranker_groups = CFG['features']['ranker']
base_cols = [col for col in (ranker_groups['time'] + ranker_groups['store']
                             + ranker_groups['channel'] + ranker_groups['basket'])
             if col in basket.columns]
stage_a_classes = list(le.classes_)

for idx, row in basket.iterrows():
    probs = np.array(row['ptype_probs'])
    cand = recall_candidates(row, promos, probs, stage_a_classes, CFG)
    if cand.empty:
        continue
    ts = row[ts_col]
    is_online = bool(row[online_col])
    # Basket-level values shared by every candidate of this basket:
    # base features first, then need-state features.
    shared = {col: row[col] for col in base_cols}
    shared.update({ccol: ns_df.at[idx, ccol] for ccol in ns_df.columns})
    for _, p in cand.iterrows():
        channel = str(p.get('channel', '')).lower()
        feat = {
            'transaction_id': row[tid_col],
            'promo_id': p[pid_col],
            'promo_type': p[ptype_col],
            'ptype_prob': float(row['ptype_probs_by_type'].get(p[ptype_col], 0.0)),
            'scope_match_score': float(p.get('scope_match_score', 0.0)),
            'min_spend_gap': float(p.get('min_spend_gap', 0.0)),
            'channel_match': int((is_online and channel == 'online') or ((not is_online) and channel == 'offline')),
            'within_window': int((sd in p.index and ed in p.index and pd.notna(p[sd]) and pd.notna(p[ed]) and (p[sd] <= ts) and (ts <= p[ed])))
        }
        feat.update(shared)
        # promo_type one-hot (absent types are filled with 0 below)
        feat[f"promo_type__{p[ptype_col]}"] = 1
        rank_rows.append(feat)

if not rank_rows:
    # Fail with a clear message rather than a KeyError on a column-less frame.
    raise ValueError('No promotion candidates were recalled for any basket; check the recall and filter settings.')

rank_df = pd.DataFrame(rank_rows).fillna(0)

# Label for ranker
if c_tx.get('promo_id_used') and (c_tx['promo_id_used'] in txraw.columns):
    used_pairs = (txraw[[c_tx['transaction_id'], c_tx['promo_id_used']]].dropna().drop_duplicates(c_tx['transaction_id']))
    used_map = used_pairs.set_index(c_tx['transaction_id'])[c_tx['promo_id_used']].astype(str)
    # Keys must be strings too: the lookup below is done on str(transaction_id),
    # so non-string ids would otherwise never match and every label would be 0.
    used_map.index = used_map.index.astype(str)
    used_map = used_map.to_dict()
    used_for_row = rank_df['transaction_id'].astype(str).map(used_map).fillna('')
    rank_df['target_ranker'] = (rank_df['promo_id'].astype(str) == used_for_row).astype(int)
else:
    # fallback: match by type
    used_type_map = basket.set_index(CFG['column_map_tx']['transaction_id'])['used_type'].to_dict()
    rank_df['target_ranker'] = (rank_df.apply(lambda r: str(r['promo_type']) == str(used_type_map.get(r['transaction_id'])), axis=1)).astype(int)

# Negative sampling per basket
ratio = int(CFG.get('negative_sampling_ratio', 5))
if ratio > 0:
    keep_rows = []
    for tid, grp in rank_df.groupby('transaction_id'):
        pos = grp[grp['target_ranker']==1]
        neg = grp[grp['target_ranker']==0]
        if len(pos) == 0:
            # No positive in this basket: keep at most `ratio` rows
            keep_rows.append(grp.sample(min(len(grp), ratio), random_state=RANDOM_SEED))
        else:
            take_neg = min(len(neg), ratio*len(pos))
            keep_rows.append(pd.concat([pos, neg.sample(take_neg, random_state=RANDOM_SEED)], axis=0))
    rank_df = pd.concat(keep_rows, axis=0).reset_index(drop=True)

# Persist a sample for audit. It is drawn once with a fixed seed so the audit
# file is reproducible and both output formats would contain the same rows.
sample_path = os.path.join(CFG['paths']['reports_dir'], 'ranking_frame_sample.parquet')
audit_sample = rank_df.sample(min(20000, len(rank_df)), random_state=RANDOM_SEED)
try:
    audit_sample.to_parquet(sample_path, index=False)
except Exception:
    # Best-effort fallback to csv (e.g. no parquet engine installed). Only the
    # file extension is swapped, never a directory component.
    audit_sample.to_csv(os.path.splitext(sample_path)[0] + '.csv', index=False)

print('Ranking frame:', rank_df.shape)
rank_df.head()


Ranking frame: (69491, 24)


Unnamed: 0,transaction_id,promo_id,promo_type,ptype_prob,scope_match_score,min_spend_gap,channel_match,within_window,hour,dayofweek,...,qty_total,ns_pca_1,ns_pca_2,ns_cluster,promo_type__Buy 1 get 1,promo_type__Brandday,promo_type__Flash Sale,promo_type__Mega Sale,promo_type__Product_Coupon,target_ranker
0,PMTX0000001,PR0030,Buy 1 get 1,0.275612,1.0,0.0,0,1,9,0,...,5,1.210241,-0.796527,6,1.0,0.0,0.0,0.0,0.0,1
1,PMTX0000001,PR0085,Buy 1 get 1,0.275612,0.0,0.0,0,1,9,0,...,5,1.210241,-0.796527,6,1.0,0.0,0.0,0.0,0.0,0
2,PMTX0000001,PR0021,Buy 1 get 1,0.275612,0.0,0.0,0,1,9,0,...,5,1.210241,-0.796527,6,1.0,0.0,0.0,0.0,0.0,0
3,PMTX0000001,PR0066,Buy 1 get 1,0.275612,0.0,0.0,0,1,9,0,...,5,1.210241,-0.796527,6,1.0,0.0,0.0,0.0,0.0,0
4,PMTX0000001,PR0005,Buy 1 get 1,0.275612,0.0,0.0,0,1,9,0,...,5,1.210241,-0.796527,6,1.0,0.0,0.0,0.0,0.0,0


In [8]:
# 3.6 Train Stage-B: Ranker using LightGBM
from sklearn.metrics import roc_auc_score, average_precision_score

# Identifier columns and the label are not model inputs.
exclude_cols = {'transaction_id','promo_id','promo_type','target_ranker'}
ranker_featcols = list(filter(lambda name: name not in exclude_cols, rank_df.columns))

y_rank = rank_df['target_ranker'].values
X_rank = rank_df[ranker_featcols].values

params_r = CFG['ranker_model']['params']

# Candidates are scored with a binary LightGBM classifier; the probability of
# the positive class serves as the ranking score.
ranker = lgb.LGBMClassifier(random_state=RANDOM_SEED, **params_r)
ranker.fit(X_rank, y_rank)
positive_class_col = 1
y_rank_proba = ranker.predict_proba(X_rank)[:, positive_class_col]

# Simple metrics (computed on the training rows themselves)
roc = roc_auc_score(y_rank, y_rank_proba)
pr_auc = average_precision_score(y_rank, y_rank_proba)
print('Stage-B ROC-AUC:', roc)
print('Stage-B PR-AUC:', pr_auc)

# Ranking metrics
def ndcg_at_k(scores, labels, k):
    """NDCG over the top ``k`` items with gain ``2**label - 1``.

    Items are ordered by descending ``scores``. The cutoff is reduced to the
    number of items when fewer than ``k`` exist. Returns 0.0 when the ideal
    ordering has no gain (no relevant item).
    """
    cutoff = min(k, len(labels))
    discounts = np.log2(np.arange(2, cutoff + 2))

    ranking = np.argsort(scores)[::-1][:cutoff]
    achieved = np.take(labels, ranking)
    best_possible = np.sort(labels)[::-1][:cutoff]

    dcg = ((2 ** achieved - 1) / discounts).sum()
    ideal_total = ((2 ** best_possible - 1) / discounts).sum()
    # Guard against division by zero when nothing is relevant.
    denominator = ideal_total if ideal_total > 0 else 1.0
    return float(dcg / denominator)

def group_metric(df, score_col, label_col, k_list=(1, 3, 5)):
    """Mean per-basket NDCG@k for each cutoff in ``k_list``.

    Rows of ``df`` are grouped by ``transaction_id``; each group is scored
    with ``ndcg_at_k`` and the values are averaged over all groups. A group
    without any positive label contributes 0.0, because ``ndcg_at_k`` returns
    0.0 for it.

    Args:
        df: Frame with ``transaction_id``, ``score_col`` and ``label_col``.
        score_col: Name of the score column.
        label_col: Name of the relevance-label column.
        k_list: Cutoffs to evaluate (any iterable of ints).

    Returns:
        Dict mapping ``'NDCG@<k>'`` to the mean value (0.0 if there are no
        groups).
    """
    cutoffs = list(dict.fromkeys(k_list))  # de-duplicate, keep order
    per_cutoff = {k: [] for k in cutoffs}

    # Group once and evaluate every cutoff on the same group arrays.
    for _, grp in df.groupby('transaction_id'):
        grp_scores = grp[score_col].values
        grp_labels = grp[label_col].values
        for k in cutoffs:
            per_cutoff[k].append(ndcg_at_k(grp_scores, grp_labels, k))

    return {
        f'NDCG@{k}': float(np.mean(vals) if len(vals) else 0.0)
        for k, vals in per_cutoff.items()
    }

# Attach the ranker scores to a copy of the ranking frame and evaluate per basket
rank_df_eval = rank_df.assign(score=y_rank_proba)
rank_metrics = group_metric(rank_df_eval, 'score', 'target_ranker', [1,3,5])
print('Ranking metrics:', rank_metrics)

# Save artifacts
ranker_model_path = os.path.join(CFG['paths']['models_dir'], 'ranker_model.pkl')
joblib.dump(ranker, ranker_model_path)

ranker_featcols_path = os.path.join(CFG['paths']['metadata_dir'], 'ranker_featcols.json')
with open(ranker_featcols_path, 'w', encoding='utf-8') as f:
    json.dump(ranker_featcols, f, ensure_ascii=False, indent=2)

# Save metrics
stage_a_report = {'macro_f1': float(macro_f1), 'logloss': float(ll), 'topk': float(acc_topk)}
stage_b_report = {'roc_auc': float(roc), 'pr_auc': float(pr_auc)}
stage_b_report.update(rank_metrics)

metrics_path = os.path.join(CFG['paths']['reports_dir'], 'metrics_train.json')
with open(metrics_path, 'w', encoding='utf-8') as f:
    json.dump({'stageA': stage_a_report, 'stageB': stage_b_report}, f, ensure_ascii=False, indent=2)


[LightGBM] [Info] Number of positive: 4966, number of negative: 64525
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.001575 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 1084
[LightGBM] [Info] Number of data points in the train set: 69491, number of used features: 16
[LightGBM] [Info] [binary:BoostFromScore]: pavg=0.071462 -> initscore=-2.564438
[LightGBM] [Info] Start training from score -2.564438
Stage-B ROC-AUC: 0.9999999984396024
Stage-B PR-AUC: 0.9999999594585642
Ranking metrics: {'NDCG@1': 0.35494246301193627, 'NDCG@3': 0.35494246301193627, 'NDCG@5': 0.35494246301193627}


In [9]:
# 3.7 Manifest & versioning
import sklearn  # imported here only to record its version

meta_dir = CFG['paths']['metadata_dir']
artifacts = {
    'ptype_model': os.path.join(CFG['paths']['models_dir'], 'ptype_model.pkl'),
    'ranker_model': os.path.join(CFG['paths']['models_dir'], 'ranker_model.pkl'),
    'scaler_need': os.path.join(CFG['paths']['preprocessors_dir'], 'scaler_need.pkl'),
    'pca_need': os.path.join(CFG['paths']['preprocessors_dir'], 'pca_need.pkl'),
    'kmeans_need': os.path.join(CFG['paths']['preprocessors_dir'], 'kmeans_need.pkl'),
    'ptype_classes': os.path.join(meta_dir, 'ptype_classes.json'),
    'ptype_featcols': os.path.join(meta_dir, 'ptype_featcols.json'),
    'ranker_featcols': os.path.join(meta_dir, 'ranker_featcols.json'),
    'encoders': os.path.join(meta_dir, 'encoders.json'),
}

# Record path + SHA-256 per artifact. A missing or unreadable file is still
# listed (sha256 = None) but is reported instead of being skipped silently.
manifest = {}
for k, p in artifacts.items():
    try:
        digest = sha256_of_file(p) if os.path.exists(p) else None
    except OSError:
        digest = None
    if digest is None:
        warnings.warn(f'Artifact could not be hashed (missing or unreadable): {p}')
    manifest[k] = {'path': p, 'sha256': digest}

contract = {
    'inputs': {
        'basket': ['transaction_id','timestamp','user_id','store_id','zone','province','profile','is_online','amount_total','qty_total'],
        'promotions': ['promo_id','promo_type','channel','start_date','end_date','min_spend','product_scope','store_scope']
    },
    'outputs': {
        'ranked_promos': ['transaction_id','promo_id','promo_type','score','ptype_prob','scope_match_score','reason_codes']
    }
}

version = {
    # The seed actually applied throughout this notebook is RANDOM_SEED; the
    # configured value is kept alongside it for reference (None if absent).
    'random_seed': int(RANDOM_SEED),
    'configured_random_seed': CFG.get('random_seed'),
    'libraries': {
        'pandas': pd.__version__,
        'numpy': np.__version__,
        # Versions of the libraries that produced the pickled artifacts
        'scikit-learn': sklearn.__version__,
        'lightgbm': lgb.__version__,
        'joblib': joblib.__version__
    }
}

with open(os.path.join(meta_dir, 'artifacts_manifest.json'), 'w', encoding='utf-8') as f:
    json.dump({'artifacts': manifest, 'contract': contract}, f, ensure_ascii=False, indent=2)
with open(os.path.join(meta_dir, 'version.json'), 'w', encoding='utf-8') as f:
    json.dump(version, f, ensure_ascii=False, indent=2)

print('Manifest and version saved.')


Manifest and version saved.
