# KuaiRec Data Preparation (Clean, Burst-Aware, LightGBM-Ready)

This notebook rebuilds the entire data-prep pipeline end-to-end:

1. Load raw CSVs and construct sessions (1h gap)  
2. Build labels for 4 heads (`complete`, `long`, `rewatch`, `neg`)  
3. Merge **user** and **item** static features  
4. Add **caption category** features  
5. Create **context** + **session-structure** features  
6. Build **history** features (leakage-safe)  
7. Detect **bursts** and apply **prior resets** at the starts of bursts 2 & 3  
8. Clean redundancies and save outputs
9. Create empirical category distribution for each user's session

## Imports & Config

In [1]:
from pathlib import Path
import pandas as pd
import numpy as np
import csv

# Base path for your CSVs
BASE = Path("/Users/haozhangao/Desktop/RecSys Research/KuaiRec 2.0/data")

# Output dir
OUT = BASE / "prepared"
OUT.mkdir(parents=True, exist_ok=True)

pd.set_option("display.max_columns", 120)

## 1) Load `big_matrix.csv` and build sessions (1h gap)

In [2]:
# Load full interaction matrix
big_path = BASE / "big_matrix.csv"
df = pd.read_csv(big_path, low_memory=False)

# Normalize key columns
for col in ["user_id", "video_id"]:
    if col in df.columns:
        df[col] = pd.to_numeric(df[col], errors="coerce").astype("Int64")

# Timestamp → ts (auto-detect ms vs s)
if "timestamp" not in df.columns:
    raise KeyError("Expected 'timestamp' in big_matrix.csv")
ts_unit = "ms" if pd.to_numeric(df["timestamp"], errors="coerce").max() > 10**11 else "s"
df["ts"] = pd.to_datetime(pd.to_numeric(df["timestamp"], errors="coerce"), unit=ts_unit, errors="coerce")

# Sort and sessionize (1-hour gap)
df.sort_values(["user_id", "ts"], inplace=True, kind="mergesort")
gap = df.groupby("user_id")["ts"].diff().dt.total_seconds().fillna(np.inf)
df["session"] = (gap > 3600).groupby(df["user_id"]).cumsum().astype("Int64")

# Within-session order
df["sess_rank"] = df.groupby(["user_id","session"]).cumcount().astype("int32") + 1
df["sess_len"]  = df.groupby(["user_id","session"])["video_id"].transform("size").astype("int32")
df["sess_rank_frac"] = (df["sess_rank"] / df["sess_len"].clip(lower=1)).astype("float32")

print("Loaded interactions:", df.shape)
df.head(3)

Loaded interactions: (12530806, 13)


Unnamed: 0,user_id,video_id,play_duration,video_duration,time,date,timestamp,watch_ratio,ts,session,sess_rank,sess_len,sess_rank_frac
0,0,3649,13838,10867,2020-07-05 00:08:23.438,20200705,1593879000.0,1.273397,2020-07-04 16:08:23.437999964,1,1,6,0.166667
1,0,9598,13665,10984,2020-07-05 00:13:41.297,20200705,1593879000.0,1.244082,2020-07-04 16:13:41.296999931,1,2,6,0.333333
2,0,5262,851,7908,2020-07-05 00:16:06.687,20200705,1593879000.0,0.107613,2020-07-04 16:16:06.687000036,1,3,6,0.5


## 2) Create 4 head labels

In [3]:
# If watch_ratio missing, compute it
if "watch_ratio" not in df.columns:
    if "play_duration" in df.columns and "video_duration" in df.columns:
        df["watch_ratio"] = pd.to_numeric(df["play_duration"], errors="coerce") / pd.to_numeric(df["video_duration"], errors="coerce")
    else:
        raise KeyError("Need 'watch_ratio' or both 'play_duration' and 'video_duration'.")

# Determine duration unit (ms vs s) from 'video_duration' if present, else assume ms if values large
vd_col = None
for c in ["video_duration", "video_duration_x", "video_duration_y"]:
    if c in df.columns:
        vd_col = c
        break

if vd_col is None:
    # fallback: infer from play_duration
    ref = pd.to_numeric(df["play_duration"], errors="coerce").dropna()
    dur_is_ms = (ref.median() > 1000.0) if not ref.empty else True
else:
    ref = pd.to_numeric(df[vd_col], errors="coerce").dropna()
    dur_is_ms = (ref.median() > 1000.0) if not ref.empty else True

scale = 1000 if dur_is_ms else 1
twenty_s = 20 * scale
two_s = 2 * scale
unit_str = "milliseconds" if dur_is_ms else "seconds"

# Clean rows for safe labels
good = (
    df["watch_ratio"].replace([np.inf, -np.inf], np.nan).notna() &
    pd.to_numeric(df["play_duration"], errors="coerce").notna()
)
if vd_col is not None:
    good &= pd.to_numeric(df[vd_col], errors="coerce").notna() & (pd.to_numeric(df[vd_col], errors="coerce") > 0)

if (~good).any():
    print(f"Dropping {(~good).sum()} rows with invalid durations/ratios for label construction")
    df = df.loc[good].copy()

# Labels
df["y_complete"] = (df["watch_ratio"] >= 0.4).astype("int8")
if vd_col is not None:
    dur_cut = np.minimum(twenty_s, 0.8 * pd.to_numeric(df[vd_col], errors="coerce").to_numpy())
else:
    dur_cut = np.full(len(df), twenty_s, dtype="float64")
df["y_long"] = (pd.to_numeric(df["play_duration"], errors="coerce").to_numpy() >= dur_cut).astype("int8")
df["y_rewatch"] = (df["watch_ratio"] >= 2.0).astype("int8")
df["y_neg"] = (pd.to_numeric(df["play_duration"], errors="coerce") < two_s).astype("int8")

pos_rates = {k: round(float(df[k].mean()), 4) for k in ["y_complete","y_long","y_rewatch","y_neg"]}
print(f"Durations interpreted as {unit_str}. Positive rates:", pos_rates)

Durations interpreted as milliseconds. Positive rates: {'y_complete': 0.6995, 'y_long': 0.457, 'y_rewatch': 0.0747, 'y_neg': 0.1251}


## 3) Merge user static features (`user_features.csv`)

In [4]:
user_path = BASE / "user_features.csv"
hdr = pd.read_csv(user_path, nrows=0).columns.tolist()

LOW_CARD_CATS = ["user_active_degree","follow_user_num_range","fans_user_num_range","friend_user_num_range","register_days_range"]
BIN_FLAGS = ["is_lowactive_period","is_live_streamer","is_video_author"]
NUMS = ["follow_user_num","fans_user_num","friend_user_num","register_days"]
ONEHOTS = [f"onehot_feat{i}" for i in range(18)]

usecols = ["user_id"] + [c for c in (LOW_CARD_CATS + BIN_FLAGS + NUMS + ONEHOTS) if c in hdr]
uf = pd.read_csv(user_path, usecols=usecols, low_memory=False)

# Type cast
for c in BIN_FLAGS:
    if c in uf.columns:
        uf[c] = pd.to_numeric(uf[c], errors="coerce").fillna(0).astype("int8").clip(0,1)

for c in LOW_CARD_CATS + ONEHOTS:
    if c in uf.columns:
        uf[c] = uf[c].astype("string").fillna("unknown").replace({"": "unknown"}).astype("category")

for c in NUMS:
    if c in uf.columns:
        uf[c] = pd.to_numeric(uf[c], errors="coerce").astype("float32")
        uf[f"{c}_log1p"] = np.log1p(uf[c].clip(lower=0)).astype("float32")

# Prefix
rename_map = {c: f"u_{c}" for c in uf.columns if c != "user_id"}
uf.rename(columns=rename_map, inplace=True)

# Merge
before = set(df.columns)
df = df.merge(uf, on="user_id", how="left")
print("Added user columns:", len(df.columns) - len(before))

Added user columns: 34


## 4) Merge item static features (`item_daily_features.csv`)

In [5]:
item_path = BASE / "item_daily_features.csv"
ihdr = pd.read_csv(item_path, nrows=0).columns.tolist()

want = ["video_id","author_id","video_type","upload_dt","upload_type","visible_status","video_duration","video_width","video_height","music_id","video_tag_id","video_tag_name"]
usecols = [c for c in want if c in ihdr]
it = pd.read_csv(item_path, usecols=usecols, low_memory=False)

# Earliest upload per video_id
if "upload_dt" in it.columns:
    if pd.api.types.is_numeric_dtype(it["upload_dt"]):
        unit = "ms" if it["upload_dt"].max() > 10**11 else "s"
        up = pd.to_datetime(pd.to_numeric(it["upload_dt"], errors="coerce"), unit=unit, errors="coerce")
    else:
        up = pd.to_datetime(it["upload_dt"], errors="coerce")
    up_df = pd.DataFrame({"video_id": it["video_id"], "i_upload_ts": up})
    up_df = up_df.dropna().groupby("video_id", as_index=False)["i_upload_ts"].min()
else:
    up_df = pd.DataFrame({"video_id": it["video_id"].drop_duplicates(), "i_upload_ts": pd.NaT})

# Collapse static fields
agg = {c: "first" for c in it.columns if c not in ["video_id","upload_dt"]}
per_video = it.groupby("video_id", as_index=False).agg(agg).merge(up_df, on="video_id", how="left")

# Numeric transforms
if "video_duration" in per_video.columns:
    med = pd.to_numeric(per_video["video_duration"], errors="coerce").dropna().median()
    dur_is_ms_it = (med > 1000.0) if pd.notna(med) else True
    per_video["i_video_duration_s"] = (pd.to_numeric(per_video["video_duration"], errors="coerce") / (1000.0 if dur_is_ms_it else 1.0)).astype("float32")
else:
    per_video["i_video_duration_s"] = np.nan

if {"video_width","video_height"}.issubset(per_video.columns):
    per_video["i_aspect_ratio"] = (
        pd.to_numeric(per_video["video_width"], errors="coerce") / pd.to_numeric(per_video["video_height"], errors="coerce")
    ).astype("float32")
else:
    per_video["i_aspect_ratio"] = np.float32(np.nan)

# Categoricals
for c in ["author_id","video_type","upload_type","visible_status","music_id","video_tag_id","video_tag_name"]:
    if c in per_video.columns:
        per_video[c] = per_video[c].astype("string").fillna("unknown").replace({"": "unknown"}).astype("category")
        per_video.rename(columns={c: f"i_{c}"}, inplace=True)

# Merge to df
before = set(df.columns)
df = df.merge(per_video.rename(columns={"video_id":"_vid"}), left_on="video_id", right_on="_vid", how="left")
if "_vid" in df.columns:
    df.drop(columns=["_vid"], inplace=True)

# Age since upload at interaction time
if "i_upload_ts" in df.columns:
    df["i_age_since_upload_days"] = ((df["ts"] - df["i_upload_ts"]).dt.total_seconds() / 86400.0).astype("float32")
else:
    df["i_age_since_upload_days"] = np.nan

print("Added item columns:", len(df.columns) - len(before))

Added item columns: 14


## 5) Add caption/category (`kuairec_caption_category.csv`)

In [6]:
cap_path = BASE / "kuairec_caption_category.csv"

# Robust stream parse with Python csv to avoid parser errors
VID_ALIASES = ["video_id", "item_id"]
CAT_ID_ALIASES = ["first_level_category_id","category_id_1","top_category_id"]
CAT_NAME_ALIASES = ["first_level_category_name","category_name_1","top_category_name"]

mapping = {}
with open(cap_path, "r", encoding="utf-8", errors="ignore", newline="") as f:
    reader = csv.reader(f)
    header = next(reader)
    def find_idx(cands):
        for c in cands:
            if c in header: return header.index(c), c
        return None, None
    vid_idx, vid_name = find_idx(VID_ALIASES)
    cid_idx, _ = find_idx(CAT_ID_ALIASES)
    cnm_idx, _ = find_idx(CAT_NAME_ALIASES)
    if vid_idx is None: raise KeyError("No video_id/item_id in caption file.")
    for row in reader:
        if not row or len(row) <= vid_idx: continue
        vid = (row[vid_idx] or "").strip()
        if not vid: continue
        cat_id = (row[cid_idx].strip() if (cid_idx is not None and len(row)>cid_idx) else "unknown")
        cat_nm = (row[cnm_idx].strip() if (cnm_idx is not None and len(row)>cnm_idx) else "unknown")
        if vid not in mapping:
            mapping[vid] = (cat_id or "unknown", cat_nm or "unknown")

cat = pd.DataFrame([(k, v[0], v[1]) for k,v in mapping.items()],
                   columns=["video_id_key","i_top_category_id","i_top_category_name"])
cat["video_id_key"] = cat["video_id_key"].astype("string")
cat["i_top_category_id"] = cat["i_top_category_id"].astype("string").replace({"": "unknown"}).astype("category")
cat["i_top_category_name"] = cat["i_top_category_name"].astype("string").replace({"": "unknown"}).astype("category")

df["video_id_key"] = df["video_id"].astype("string").str.strip()
before = set(df.columns)
df = df.merge(cat, on="video_id_key", how="left")
df.drop(columns=["video_id_key"], inplace=True)
print("Added caption/category columns:", len(df.columns) - len(before))

Added caption/category columns: 1


## 6) Context/time features

In [7]:
secs = (df["ts"].dt.hour * 3600 + df["ts"].dt.minute * 60 + df["ts"].dt.second).astype("float32")
frac_day = secs / np.float32(24*3600)
df["ctx_hour_sin"] = np.sin(2*np.pi*frac_day).astype("float32")
df["ctx_hour_cos"] = np.cos(2*np.pi*frac_day).astype("float32")
df["ctx_dow"] = df["ts"].dt.dayofweek.astype("int8")
print("Context features added.")

Context features added.


## 7) History features (session-level and per-row; leakage-safe)

In [8]:
# Ensure sorted
df.sort_values(["user_id","session","ts"], inplace=True, kind="mergesort")

# Per-session aggregates
sess = (
    df.groupby(["user_id","session"], as_index=False)
      .agg(
          sess_start=("ts","min"),
          sess_end=("ts","max"),
          sess_len=("video_id","size"),
          sess_wr_mean=("watch_ratio","mean"),
          sess_wr_var=("watch_ratio","var"),
          sess_complete_rate=("y_complete","mean"),
      )
      .sort_values(["user_id","session"], kind="mergesort")
      .reset_index(drop=True)
)
# Session index
sess["sess_index"] = sess.groupby("user_id").cumcount().astype("int32") + 1

# Previous-session length & gap
g_user = sess.groupby("user_id", sort=False)
prev_start = g_user["sess_start"].shift(1)
prev_end   = g_user["sess_end"].shift(1)
sess["prev_session_length_min"] = ((prev_end - prev_start).dt.total_seconds()/60.0).astype("float32")
sess["inter_session_gap_hours"] = ((sess["sess_start"] - prev_end).dt.total_seconds()/3600.0).astype("float32")

# Rolling histories (t-1)
sess["hist_last3_complete_rate"] = (
    g_user["sess_complete_rate"].transform(lambda s: s.shift(1).rolling(3, min_periods=1).mean())
).astype("float32")
sess["hist_last10_complete_rate"] = (
    g_user["sess_complete_rate"].transform(lambda s: s.shift(1).rolling(10, min_periods=1).mean())
).astype("float32")
sess["hist_last3_wr_mean"] = (
    g_user["sess_wr_mean"].transform(lambda s: s.shift(1).rolling(3, min_periods=1).mean())
).astype("float32")
sess["hist_last3_wr_var"] = (
    g_user["sess_wr_mean"].transform(lambda s: s.shift(1).rolling(3, min_periods=2).var())
).astype("float32")

# EMAs up to t-1 (lambda=0.9 default for overall)
lam = 0.9; alpha = 1.0 - lam
sess["hist_ema_complete"] = (
    g_user["sess_complete_rate"].transform(lambda s: s.shift(1).ewm(alpha=alpha, adjust=False).mean())
).astype("float32")
sess["hist_ema_wr_mean"] = (
    g_user["sess_wr_mean"].transform(lambda s: s.shift(1).ewm(alpha=alpha, adjust=False).mean())
).astype("float32")

# Within-session fatigue slope for previous session
def _slope_wr_vs_rank(grp: pd.DataFrame) -> float:
    x = grp["sess_rank"].to_numpy(dtype="float64")
    y = grp["watch_ratio"].to_numpy(dtype="float64")
    n = x.size
    if n < 2: return np.nan
    xm, ym = x.mean(), y.mean()
    den = np.sum((x - xm)**2)
    if den <= 0: return np.nan
    return float(np.sum((x - xm)*(y - ym)) / den)

sl = (
    df.groupby(["user_id","session"], group_keys=False)
      .apply(lambda g: pd.Series({"within_sess_wr_slope": _slope_wr_vs_rank(g)}))
      .reset_index()
)
sess = sess.merge(sl, on=["user_id","session"], how="left")
sess["hist_prev_within_sess_wr_slope"] = sess.groupby("user_id")["within_sess_wr_slope"].shift(1).astype("float32")

# Merge back to df
hist_cols = ["sess_index","prev_session_length_min","inter_session_gap_hours",
             "hist_last3_complete_rate","hist_last10_complete_rate",
             "hist_last3_wr_mean","hist_last3_wr_var",
             "hist_ema_complete","hist_ema_wr_mean","hist_prev_within_sess_wr_slope"]
df = df.merge(sess[["user_id","session"] + hist_cols], on=["user_id","session"], how="left")
print("History features merged.")

  .apply(lambda g: pd.Series({"within_sess_wr_slope": _slope_wr_vs_rank(g)}))


History features merged.


In [9]:
# --- Author recency: time since user i last saw creator j (t-1) ---
# Requires: df with ['user_id','i_author_id','ts'] and already sorted by user + ts

# Make sure creator id exists and is stable
if "i_author_id" not in df.columns:
    raise KeyError("i_author_id not found. Merge item_daily_features first.")
df["i_author_id"] = df["i_author_id"].astype("string").fillna("unknown").astype("category")

# Previous seen time per (user, author), using t-1 via shift
prev_seen = df.groupby(["user_id", "i_author_id"], sort=False)["ts"].shift(1)

# Recency in days (NaN if never seen this creator before)
df["hist_author_recency_days"] = (
    (df["ts"] - prev_seen).dt.total_seconds() / 86400.0
).astype("float32")

# (Optional) a compressed version for tree models
df["hist_author_recency_log1p"] = np.log1p(df["hist_author_recency_days"]).astype("float32")

print("Added:", ["hist_author_recency_days", "hist_author_recency_log1p"])
df[["user_id","i_author_id","ts","hist_author_recency_days"]].head()


  prev_seen = df.groupby(["user_id", "i_author_id"], sort=False)["ts"].shift(1)


Added: ['hist_author_recency_days', 'hist_author_recency_log1p']


Unnamed: 0,user_id,i_author_id,ts,hist_author_recency_days
0,0,7136,2020-07-04 16:08:23.437999964,
1,0,2661,2020-07-04 16:13:41.296999931,
2,0,1854,2020-07-04 16:16:06.687000036,
3,0,5415,2020-07-04 16:20:26.792000055,
4,0,395,2020-07-04 16:43:05.128000020,


## 8) Per-category completion EMA + entropy (λ=0.95; leakage-safe)

In [10]:
cat_col = "i_top_category_id" if "i_top_category_id" in df.columns else (
          "i_top_category_name" if "i_top_category_name" in df.columns else None)
if cat_col is None:
    raise KeyError("Need i_top_category_id or i_top_category_name for category history.")

df.sort_values(["user_id","ts"], inplace=True, kind="mergesort")
lam = 0.95; alpha = 1.0 - lam

def _per_user_cat_ema(g: pd.DataFrame) -> pd.DataFrame:
    cat_ema = {}
    n = len(g)
    ema_cur = np.full(n, np.nan, dtype="float32")
    ent_arr = np.full(n, np.nan, dtype="float32")
    cats = g[cat_col].astype("string").fillna("unknown").to_numpy()
    ys = g["y_complete"].astype("float32").to_numpy()
    for i,(c,y) in enumerate(zip(cats, ys)):
        prev = cat_ema.get(c, np.nan)
        ema_cur[i] = np.float32(prev) if prev == prev else np.nan
        if cat_ema:
            vals = np.array(list(cat_ema.values()), dtype="float64")
            s = vals.sum()
            ent_arr[i] = np.float32(-np.sum((vals/s)*np.log(np.clip(vals/s,1e-12,1.0)))) if s>0 else np.nan
        else:
            ent_arr[i] = np.nan
        # update
        if prev != prev:
            cat_ema[c] = float(y)
        else:
            cat_ema[c] = (1.0 - alpha) * float(prev) + alpha * float(y)
    out = g.copy()
    out["hist_user_cat_complete_ema"] = ema_cur
    out["hist_cat_entropy"] = ent_arr
    return out

df = df.groupby("user_id", group_keys=False).apply(_per_user_cat_ema).reset_index(drop=True)
print("Category history features added.")

  df = df.groupby("user_id", group_keys=False).apply(_per_user_cat_ema).reset_index(drop=True)


Category history features added.


## 9) Detect bursts (daily activity runs)

In [11]:
# --- Burst detection (fixed) ---
# Idea:
# 1) Aggregate daily interaction counts.
# 2) Mark an "active day" if count >= threshold (thr).
# 3) Take the sequence of active-day dates only.
# 4) Whenever the gap between consecutive active days > GAP_BETWEEN_ACTIVE, start a new burst.

GAP_BETWEEN_ACTIVE = 3   # days between *active* days to start a new burst
# You can tighten/loosen this. Larger -> fewer, longer bursts.

# Daily counts
daily = (
    df.set_index("ts")
      .groupby(pd.Grouper(freq="D"))["user_id"]
      .size()
      .rename("n")
      .reset_index()
)

# Threshold for "active day"
med = daily.loc[daily["n"] > 0, "n"].median()
thr = max(100, int(0.10 * med)) if pd.notna(med) else 100
# ↑ Knob #1: 'thr' filters out low-activity days. Increase if you want stricter bursts.

daily["is_active"] = daily["n"] >= thr

# Sequence of active-day dates only
active_dates = (
    daily.loc[daily["is_active"], "ts"]
         .dt.normalize()
         .sort_values()
         .reset_index(drop=True)
)

if active_dates.empty:
    raise RuntimeError("No active days detected — raise 'thr' or check timestamps.")

# Split whenever the gap between *active* days is large
gaps = active_dates.diff().dt.days.fillna(0).astype("int32")
burst_group = (gaps > GAP_BETWEEN_ACTIVE).cumsum()

# Start/end per burst (fixed: no .to_series())
grp = active_dates.groupby(burst_group)
bursts = pd.DataFrame({
    "start": grp.min(),
    "end":   grp.max(),
})
bursts["days"] = (bursts["end"] - bursts["start"]).dt.days + 1
bursts.index = np.arange(1, len(bursts) + 1)  # burst_id 1..B

print("Detected bursts:")
display(bursts)

# Assign burst_id to every row using closed-left intervals [start, end+1)
intervals = pd.IntervalIndex.from_arrays(bursts["start"],
                                         bursts["end"] + pd.Timedelta(days=1),
                                         closed="left")
row_dates = df["ts"].dt.normalize()

idx = intervals.get_indexer(row_dates)      # NumPy int array; -1 means "no interval"
s = pd.Series(idx, index=df.index)          # make it a pandas Series
df["burst_id"] = s.where(s >= 0).add(1).astype("Int64")  # 0.. -> 1..; -1 -> <NA>

print("burst_id distribution:")
print(df["burst_id"].value_counts(dropna=False).sort_index())


Detected bursts:


Unnamed: 0,start,end,days
1,2020-07-04,2020-07-12,9
2,2020-07-31,2020-08-10,11
3,2020-08-26,2020-09-05,11


burst_id distribution:
burst_id
1       2191830
2       7007529
3       3329672
<NA>       1775
Name: count, dtype: Int64


## 10) Prior resets at starts of bursts 2 & 3 (EMA features only)

In [12]:
if "burst_id" not in df.columns:
    raise RuntimeError("burst_id missing — run burst detection first.")

# Compute priors before each burst start
priors = {}
for b, row in bursts.iterrows():
    start = row["start"]
    hist_mask = df["ts"] < start
    if hist_mask.any():
        g_prior = float(df.loc[hist_mask, "y_complete"].mean())
        c_prior = df.loc[hist_mask].groupby(cat_col)["y_complete"].mean().to_dict()
    else:
        g_prior = float(df["y_complete"].mean())
        c_prior = {}
    priors[int(b)] = {"global": g_prior, "per_cat": c_prior}

# Rebuild EMA features for bursts 2 & 3 only
target_bursts = [b for b in priors.keys() if b in (2,3)]
mask_b = df["burst_id"].isin(target_bursts)

def _apply_ema_with_priors(group: pd.DataFrame) -> pd.DataFrame:
    b = int(group.name[1])
    g_prior = priors[b]["global"]
    c_prior = priors[b]["per_cat"]
    g = group.sort_values("ts").copy()

    y = g["y_complete"].to_numpy(dtype="float32")
    cats = g[cat_col].astype("string").fillna("unknown").to_numpy()

    out_overall = np.full(len(g), np.nan, dtype="float32")
    out_cat = np.full(len(g), np.nan, dtype="float32")
    out_ent = np.full(len(g), np.nan, dtype="float32")

    ema_overall = np.nan
    ema_cat = {}

    lam = 0.95; alpha = 1.0 - lam
    for i,(c, yi) in enumerate(zip(cats, y)):
        out_overall[i] = ema_overall if ema_overall == ema_overall else np.float32(g_prior)
        if c in ema_cat and ema_cat[c] == ema_cat[c]:
            out_cat[i] = np.float32(ema_cat[c])
        else:
            out_cat[i] = np.float32(c_prior.get(c, g_prior))
        if ema_cat:
            vals = np.array(list(ema_cat.values()), dtype="float64")
            s = vals.sum()
            out_ent[i] = np.float32(-np.sum((vals/s)*np.log(np.clip(vals/s,1e-12,1.0)))) if s>0 else np.nan
        else:
            out_ent[i] = np.nan
        # update to t
        prev_overall = out_overall[i]
        ema_overall = (1.0 - alpha) * float(prev_overall) + alpha * float(yi)
        prev_cat = out_cat[i]
        ema_cat[c] = (1.0 - alpha) * float(prev_cat) + alpha * float(yi)

    g["hist_ema_complete"] = out_overall
    g["hist_user_cat_complete_ema"] = out_cat
    g["hist_cat_entropy"] = out_ent
    return g

df_reset = (
    df.loc[mask_b].groupby(["user_id","burst_id"], group_keys=False).apply(_apply_ema_with_priors)
)
cols_upd = ["hist_ema_complete","hist_user_cat_complete_ema","hist_cat_entropy"]
df.loc[df_reset.index, cols_upd] = df_reset[cols_upd].values

print("Re-initialized EMA features at the starts of bursts 2 & 3 using time-truncated priors.")

  c_prior = df.loc[hist_mask].groupby(cat_col)["y_complete"].mean().to_dict()
  df.loc[mask_b].groupby(["user_id","burst_id"], group_keys=False).apply(_apply_ema_with_priors)


Re-initialized EMA features at the starts of bursts 2 & 3 using time-truncated priors.


## 11) Clean redundant columns

In [13]:
to_drop = []
# raw duration duplicates or helpers
for c in ["video_duration_x","video_duration_y","time","date"]:
    if c in df.columns: to_drop.append(c)
# upload_ts (age is derived)
if "i_age_since_upload_days" in df.columns and "i_upload_ts" in df.columns:
    to_drop.append("i_upload_ts")
# keep IDs; drop names (adjust if you prefer)
if "i_video_tag_id" in df.columns and "i_video_tag_name" in df.columns:
    to_drop.append("i_video_tag_name")
if "i_top_category_id" in df.columns and "i_top_category_name" in df.columns:
    to_drop.append("i_top_category_name")

to_drop = sorted(set(to_drop))
df.drop(columns=[c for c in to_drop if c in df.columns], inplace=True, errors="ignore")
print(f"Dropped {len(to_drop)} redundant columns:", to_drop)
print("Now have", df.shape[1], "columns.")

Dropped 7 redundant columns: ['date', 'i_top_category_name', 'i_upload_ts', 'i_video_tag_name', 'time', 'video_duration_x', 'video_duration_y']
Now have 78 columns.


In [14]:
print(len(df.columns), "columns")
list(df.columns)

78 columns


['user_id',
 'video_id',
 'play_duration',
 'timestamp',
 'watch_ratio',
 'ts',
 'session',
 'sess_rank',
 'sess_len',
 'sess_rank_frac',
 'y_complete',
 'y_long',
 'y_rewatch',
 'y_neg',
 'u_user_active_degree',
 'u_is_lowactive_period',
 'u_is_live_streamer',
 'u_is_video_author',
 'u_follow_user_num',
 'u_follow_user_num_range',
 'u_fans_user_num',
 'u_fans_user_num_range',
 'u_friend_user_num',
 'u_friend_user_num_range',
 'u_register_days',
 'u_register_days_range',
 'u_onehot_feat0',
 'u_onehot_feat1',
 'u_onehot_feat2',
 'u_onehot_feat3',
 'u_onehot_feat4',
 'u_onehot_feat5',
 'u_onehot_feat6',
 'u_onehot_feat7',
 'u_onehot_feat8',
 'u_onehot_feat9',
 'u_onehot_feat10',
 'u_onehot_feat11',
 'u_onehot_feat12',
 'u_onehot_feat13',
 'u_onehot_feat14',
 'u_onehot_feat15',
 'u_onehot_feat16',
 'u_onehot_feat17',
 'u_follow_user_num_log1p',
 'u_fans_user_num_log1p',
 'u_friend_user_num_log1p',
 'u_register_days_log1p',
 'i_author_id',
 'i_video_type',
 'i_upload_type',
 'i_visible_sta

## 12) Save outputs

In [15]:
parquet_path = OUT / "features_full.parquet"
csv_path = OUT / "features_full.csv.gz"

df.to_parquet(parquet_path, index=False)
df.to_csv(csv_path, index=False, compression="gzip")

print("Saved:")
print("-", parquet_path)
print("-", csv_path)
print("Rows:", len(df), "Cols:", len(df.columns))

Saved:
- /Users/haozhangao/Desktop/RecSys Research/KuaiRec 2.0/data/prepared/features_full.parquet
- /Users/haozhangao/Desktop/RecSys Research/KuaiRec 2.0/data/prepared/features_full.csv.gz
Rows: 12530806 Cols: 78


## 13) Create empirical category distribuion and save

In [16]:
# === Empirical category distribution per (user, session) ===
# Assumes df has: ['user_id','session','ts', 'i_top_category_id' or 'i_top_category_name']

import numpy as np
import pandas as pd

# 1) Pick the category column (ID preferred, fallback to name)
cat_col = None
for c in ["i_top_category_id", "i_top_category_name"]:
    if c in df.columns:
        cat_col = c
        break
if cat_col is None:
    raise KeyError("No category column found. Expected i_top_category_id or i_top_category_name in df.")

# 2) Treat each row as an exposure; drop missing category
expo = df.loc[df[cat_col].notna(), ["user_id","session",cat_col,"ts"]].copy()
expo.sort_values(["user_id","session","ts"], inplace=True, kind="mergesort")

# 3) Counts per (user, session, category)
g = (expo.groupby(["user_id","session",cat_col], as_index=False)
         .size()
         .rename(columns={"size":"n_exposed"}))

# 4) Totals per (user, session)
tot = (g.groupby(["user_id","session"], as_index=False)["n_exposed"]
         .sum()
         .rename(columns={"n_exposed":"n_total"}))

# 5) Join + raw probabilities
emp = g.merge(tot, on=["user_id","session"], how="left")
emp["p_empirical_cat"] = (emp["n_exposed"] / emp["n_total"]).astype("float32")

# 5.5) Dirichlet/Laplace smoothing per session
eta = 1e-2  # try 1e-3 .. 1e-1
K = (emp.groupby(["user_id","session"], as_index=False)[cat_col]
       .nunique()
       .rename(columns={cat_col:"K_session"}))
emp = emp.merge(K, on=["user_id","session"], how="left")
emp["p_empirical_cat_sm"] = (
    (emp["n_exposed"] + eta) / (emp["n_total"] + eta * emp["K_session"])
).astype("float32")

# 6) Quality flag + entropy (use smoothed probs by default)
min_exposures = 3
emp["enough_expo"] = emp["n_total"] >= min_exposures

def _entropy(ps):
    p = np.asarray(ps, dtype="float64")
    p = p[(p > 0) & np.isfinite(p)]
    if p.size == 0:
        return np.nan
    return float(-np.sum(p * np.log(p)))

prob_col = "p_empirical_cat_sm"  # switch to "p_empirical_cat" for raw
sess_entropy = (emp.groupby(["user_id","session"])[prob_col]
                  .apply(_entropy)
                  .reset_index()
                  .rename(columns={prob_col:"empirical_cat_entropy"}))
emp = emp.merge(sess_entropy, on=["user_id","session"], how="left")

# 7) Optional wide pivot (one column per category)
num_cats = int(emp[cat_col].nunique())
emp_wide = None
if num_cats <= 200:  # adjust threshold if needed
    emp_wide = (emp.pivot_table(index=["user_id","session"],
                                columns=cat_col,
                                values=prob_col,
                                fill_value=0.0)
                  .reset_index())

# 8) Save artifacts
OUT_PREP = BASE / "prepared"
OUT_PREP.mkdir(parents=True, exist_ok=True)

emp_path = OUT_PREP / "empirical_category_distribution.csv"
emp.to_csv(emp_path, index=False)

if emp_wide is not None:
    emp_wide_path = OUT_PREP / "empirical_category_distribution_wide.csv"
    emp_wide.to_csv(emp_wide_path, index=False)
    print(f"Saved wide distribution to: {emp_wide_path}")

# 9) Quick sanity checks & preview
#   - per-session probs (smoothed) should sum ~1
chk = (emp.groupby(["user_id","session"])["p_empirical_cat_sm"].sum()
         .reset_index(drop=True).head(5).round(6).tolist())
print("Example per-session sum(p_empirical_cat_sm):", chk)
print(f"Saved empirical per-session category distribution to: {emp_path}")
display(emp.head(100))


  g = (expo.groupby(["user_id","session",cat_col], as_index=False)
  emp_wide = (emp.pivot_table(index=["user_id","session"],


Saved wide distribution to: /Users/haozhangao/Desktop/RecSys Research/KuaiRec 2.0/data/prepared/empirical_category_distribution_wide.csv
Example per-session sum(p_empirical_cat_sm): [1.0, 1.0, 1.0, 1.0, 1.0]
Saved empirical per-session category distribution to: /Users/haozhangao/Desktop/RecSys Research/KuaiRec 2.0/data/prepared/empirical_category_distribution.csv


Unnamed: 0,user_id,session,i_top_category_id,n_exposed,n_total,p_empirical_cat,K_session,p_empirical_cat_sm,enough_expo,empirical_cat_entropy
0,0,1,-124,0,6,0.0,40,0.001563,False,1.592372
1,0,1,1,0,6,0.0,40,0.001563,False,1.592372
2,0,1,10,0,6,0.0,40,0.001563,False,1.592372
3,0,1,11,0,6,0.0,40,0.001563,False,1.592372
4,0,1,12,0,6,0.0,40,0.001563,False,1.592372
...,...,...,...,...,...,...,...,...,...,...
95,0,3,22,0,1,0.0,40,0.007143,False,1.612163
96,0,3,23,0,1,0.0,40,0.007143,False,1.612163
97,0,3,24,0,1,0.0,40,0.007143,False,1.612163
98,0,3,25,0,1,0.0,40,0.007143,False,1.612163


### Example: empirical distribution of the 1st user's 1st session

In [21]:
# Empirical category distribution for FIRST user's FIRST session

# 1) find first user & their first session
df_srt = df.sort_values(["user_id","sess_index","ts"], kind="mergesort")
u0 = df_srt["user_id"].iloc[0]
s0 = df_srt.loc[df_srt["user_id"] == u0, "session"].min()+10

# 2) pick a category column
cat_col = next((c for c in ["i_top_category_id","i_top_category_name"] if c in df.columns), None)
if cat_col is None:
    raise KeyError("No category column found (looked for i_top_category_* or i_video_tag_*).")

# 3) slice the session & compute distribution
sess_df = (df[(df["user_id"] == u0) & (df["session"] == s0)]
             .sort_values("ts", kind="mergesort"))
tab = (sess_df.groupby(cat_col, as_index=False).size()
                  .rename(columns={"size":"n"}))
tab["p_empirical_cat"] = (tab["n"] / tab["n"].sum()).astype("float32")

# (optional) entropy of the session’s category mix
p = tab["p_empirical_cat"].to_numpy(dtype="float64")
entropy = float(-np.sum(p[p>0] * np.log(p[p>0])))

print(f"User {u0} — Session {s0} | exposures = {len(sess_df)} | entropy = {entropy:0.3f}")
display(tab.sort_values("p_empirical_cat", ascending=False))


User 0 — Session 11 | exposures = 59 | entropy = 2.801


  tab = (sess_df.groupby(cat_col, as_index=False).size()


Unnamed: 0,i_top_category_id,n,p_empirical_cat
21,28,11,0.186441
27,34,6,0.101695
34,5,5,0.084746
37,8,4,0.067797
3,11,4,0.067797
2,10,4,0.067797
35,6,3,0.050847
18,25,3,0.050847
26,33,2,0.033898
25,32,2,0.033898
