In [11]:
import os
import warnings

import numpy as np
import pandas as pd
warnings.filterwarnings("ignore")

# CONFIG
# Raw training event log. The default is the historical machine-specific
# location; set the CHURN_TRAIN_PATH environment variable to run the notebook
# on another machine without editing this cell.
TRAIN_PATH = os.environ.get(
    "CHURN_TRAIN_PATH",
    "/Users/alexandre/Desktop/X/Python for Data Science/Projet Final Churn/train.parquet",
)
# Destination of the snapshot feature table (override with CHURN_OUTPUT_PATH).
OUTPUT_PATH = os.environ.get("CHURN_OUTPUT_PATH", "train_features_v2.parquet")

HORIZON_DAYS = 10          # label window: churn within 10 days after T0
BUFFER_DAYS  = 10          # first snapshot is taken after 10 days of history
SNAPSHOT_FREQ = "7D"       # spacing between consecutive snapshot dates

WINDOWS_DAYS = [3, 7, 14, 30]   # look-back activity windows (days) ending at T0
# Pages whose per-user counts and recency are turned into features.
KEY_PAGES = [
    "Thumbs Up", "Thumbs Down", "Roll Advert", "Error",
    "Upgrade", "Downgrade", "Add to Playlist", "Cancel"
]

In [12]:
print("Loading of train dataset...")
df = pd.read_parquet(TRAIN_PATH)

# Parse timestamps (ms → datetime); values that cannot be parsed become NaT.
df["ts"] = pd.to_datetime(df["ts"], unit="ms", errors="coerce")
# Calendar day of each event, used later for "active days" counts.
df["date"] = df["ts"].dt.date

# Sanity checks
print(f"Train shape: {df.shape}")
print(f"ts min/max: {df['ts'].min()} {df['ts'].max()}")
print(f"Colonnes: {list(df.columns)}")

Loading of train dataset...
Train shape: (17499636, 20)
ts min/max: 2018-10-01 00:00:01 2018-11-20 00:00:00
Colonnes: ['status', 'gender', 'firstName', 'level', 'lastName', 'userId', 'ts', 'auth', 'page', 'sessionId', 'location', 'itemInSession', 'userAgent', 'method', 'length', 'song', 'artist', 'time', 'registration', 'date']


In [13]:
# Build snapshot dates (T0)
min_ts, max_ts = df["ts"].min(), df["ts"].max()

# The first snapshot waits for BUFFER_DAYS of history; the last one leaves a
# full HORIZON_DAYS label window before the final observed event.
start_T0 = min_ts + pd.Timedelta(days=BUFFER_DAYS)
end_T0 = max_ts - pd.Timedelta(days=HORIZON_DAYS)

T0_list = pd.date_range(start_T0, end_T0, freq=SNAPSHOT_FREQ)

print(f"Nb snapshots: {len(T0_list)}")
print(f"T0_list: {list(T0_list)}")

Nb snapshots: 5
T0_list: [Timestamp('2018-10-11 00:00:01'), Timestamp('2018-10-18 00:00:01'), Timestamp('2018-10-25 00:00:01'), Timestamp('2018-11-01 00:00:01'), Timestamp('2018-11-08 00:00:01')]


In [14]:
# Helpers used to build user-level features at each snapshot date
# (shared logic for train + test)
def _is_song_event(d: pd.DataFrame) -> pd.Series:
    """Return a boolean mask for rows that correspond to a song listening event."""
    if "page" in d.columns:
        return d["page"].eq("NextSong")
    if "length" in d.columns:
        return d["length"].notna()
    return pd.Series(False, index=d.index)

def _detect_os(user_agent) -> str:
    """Rough OS detection from userAgent string (kept intentionally simple)."""
    if pd.isna(user_agent):
        return "Unknown"
    ua = str(user_agent)
    if ("Mac" in ua) or ("iPhone" in ua) or ("iPad" in ua):
        return "Apple"
    if "Windows" in ua:
        return "Windows"
    if "Linux" in ua:
        return "Linux"
    return "Other"

def finalize_features(df: pd.DataFrame, exclude_cols=None) -> pd.DataFrame:
    """
    Final cleanup step applied consistently on both train snapshots and test matrix.

    Steps, applied to a copy of ``df`` (the input is not modified):
    - one-hot encode object/category columns (except excluded), with an
      explicit NaN indicator column per encoded feature
    - bool -> int for every non-excluded bool column, including the indicator
      columns produced by the one-hot step
    - replace +/-inf with NaN and fill numeric NaN with 0

    Parameters
    ----------
    df : feature table to clean.
    exclude_cols : optional iterable of column names left out of the one-hot
        encoding and the bool -> int conversion.

    Returns
    -------
    A new DataFrame with the cleaned features.
    """
    exclude_cols = set(exclude_cols or [])
    out = df.copy()

    # one-hot for categoricals (only for non-excluded columns)
    cat_cols = [c for c in out.select_dtypes(include=["object", "category"]).columns if c not in exclude_cols]
    if cat_cols:
        out = pd.get_dummies(out, columns=cat_cols, dummy_na=True)

    # bool -> int AFTER the one-hot step: get_dummies returns bool indicator
    # columns on recent pandas versions, and converting beforehand would leave
    # them as bool, contradicting the contract above.
    for c in out.select_dtypes(include=["bool"]).columns:
        if c not in exclude_cols:
            out[c] = out[c].astype(int)

    # numeric cleaning only (avoid touching datetime like snapshot_time)
    num_cols = out.select_dtypes(include=[np.number]).columns.tolist()
    if num_cols:
        out[num_cols] = out[num_cols].replace([np.inf, -np.inf], np.nan).fillna(0)

    return out

def build_window_stats(obs: pd.DataFrame, T0: pd.Timestamp, window_days: int, suffix: str) -> pd.DataFrame:
    """Per-user activity statistics over the last ``window_days`` days before T0.

    Only a lower bound (``ts >= T0 - window_days``) is applied here. Every
    feature column is renamed to ``<name>_<suffix>``; missing values are set
    to 0. If no event falls in the window, only a ``userId`` column with the
    users of ``obs`` is returned.
    """
    window_start = T0 - pd.Timedelta(days=window_days)
    recent = obs.loc[obs["ts"] >= window_start].copy()
    if recent.empty:
        return pd.DataFrame({"userId": obs["userId"].unique()})

    recent_songs = recent.loc[_is_song_event(recent)].copy()

    # Activity counts; fall back to a raw event count when a column is absent.
    fallback = ("ts", "count")
    session_spec = ("sessionId", "nunique") if "sessionId" in recent.columns else fallback
    day_spec = ("date", "nunique") if "date" in recent.columns else fallback
    stats = (
        recent.groupby("userId")
        .agg(events_count=("ts", "count"), sessions=session_spec, active_days=day_spec)
        .reset_index()
    )

    # Song-only aggregates: (source column, aggregation, feature name, default if column absent).
    song_features = (
        ("length", "sum", "listen_time", 0.0),
        ("artist", "nunique", "uniq_artists", 0),
        ("song", "nunique", "uniq_songs", 0),
    )
    for source_col, how, feature, default in song_features:
        if source_col not in recent_songs.columns:
            stats[feature] = default
            continue
        per_user = recent_songs.groupby("userId")[source_col].agg(how).reset_index(name=feature)
        stats = stats.merge(per_user, on="userId", how="left")

    # NB: here events_count = all events, not only songs. Kept as-is for consistency with the pipeline.
    stats["listen_per_active_day"] = stats["listen_time"] / (stats["active_days"] + 1e-6)
    stats["songs_per_session"] = stats["events_count"] / (stats["sessions"] + 1e-6)

    stats = stats.rename(columns=lambda c: c if c == "userId" else f"{c}_{suffix}")
    return stats.fillna(0)

def add_page_counts(obs: pd.DataFrame, users) -> pd.DataFrame:
    """Counts of key actions/pages per user.

    Parameters
    ----------
    obs : event log with at least ``userId`` and ``ts`` columns; ``page`` is
        optional.
    users : user ids, used only to build the fallback frame when ``obs`` has
        no ``page`` column.

    Returns
    -------
    One row per user present in the pivot of ``obs``, with one count column
    per entry of ``KEY_PAGES`` plus the derived ``satisfaction_ratio``,
    ``ad_events`` and ``error_events`` columns. A key page that never occurs
    in ``obs`` yields an all-zero column rather than a missing one, so the
    schema does not depend on the data of a particular snapshot.
    If ``obs`` has no ``page`` column, only ``userId`` is returned.
    """
    if "page" not in obs.columns:
        return pd.DataFrame({"userId": users})

    page_counts = pd.pivot_table(
        obs, index="userId", columns="page", values="ts", aggfunc="count", fill_value=0
    )

    # Keep exactly the key pages, in KEY_PAGES order; pages absent from this
    # slice of the log become explicit zero counts.
    out = page_counts.reindex(columns=KEY_PAGES, fill_value=0).reset_index()

    if ("Thumbs Up" in out.columns) and ("Thumbs Down" in out.columns):
        # +1 in the denominator avoids division by zero for users with no thumbs down.
        out["satisfaction_ratio"] = out["Thumbs Up"] / (out["Thumbs Down"] + 1)
    if "Roll Advert" in out.columns:
        out["ad_events"] = out["Roll Advert"]
    if "Error" in out.columns:
        out["error_events"] = out["Error"]

    return out

def add_recency_features(obs: pd.DataFrame, T0: pd.Timestamp, users) -> pd.DataFrame:
    """Days elapsed between T0 and each user's latest event on every key page.

    Produces one ``recency_<page>`` column per entry of ``KEY_PAGES`` (spaces
    replaced by underscores, lower-cased). Users with no such event, or any
    user when ``obs`` has no ``page`` column, receive the sentinel 999.
    """
    feats = pd.DataFrame({"userId": users})
    has_page = "page" in obs.columns

    for page in KEY_PAGES:
        col = "recency_" + page.replace(" ", "_").lower()
        if not has_page:
            feats[col] = 999
            continue

        # Latest timestamp of this page per user, converted to fractional days before T0.
        last_seen = obs.loc[obs["page"] == page].groupby("userId")["ts"].max()
        days_since = ((T0 - last_seen).dt.total_seconds() / 86400.0).rename(col).reset_index()

        feats = feats.merge(days_since, on="userId", how="left")
        feats[col] = feats[col].fillna(999)

    return feats

def add_session_stats(obs: pd.DataFrame) -> pd.DataFrame:
    """Summarise each user's sessions: event count, duration and listening time.

    Events are first aggregated per (userId, sessionId), then the mean and
    standard deviation (plus the max for listening time) are taken across each
    user's sessions. Undefined statistics (e.g. std over a single session) are
    returned as 0. Without a ``sessionId`` column only ``userId`` is returned.
    """
    if "sessionId" not in obs.columns:
        return pd.DataFrame({"userId": obs["userId"].unique()})

    # Listening time is the summed `length`; fall back to an event count if absent.
    listen_spec = ("length", "sum") if "length" in obs.columns else ("ts", "count")
    per_session = (
        obs.groupby(["userId", "sessionId"])
        .agg(
            sess_events=("ts", "count"),
            sess_start=("ts", "min"),
            sess_end=("ts", "max"),
            sess_listen=listen_spec,
        )
        .reset_index()
    )

    span = per_session["sess_end"] - per_session["sess_start"]
    per_session["sess_duration_min"] = span.dt.total_seconds() / 60.0

    # Output prefix -> per-session source column; mean and std for each.
    sources = (
        ("sess_events", "sess_events"),
        ("sess_duration", "sess_duration_min"),
        ("sess_listen", "sess_listen"),
    )
    named_aggs = {}
    for prefix, source_col in sources:
        for stat in ("mean", "std"):
            named_aggs[f"{prefix}_{stat}"] = (source_col, stat)
    named_aggs["sess_listen_max"] = ("sess_listen", "max")

    per_user = per_session.groupby("userId").agg(**named_aggs).reset_index()
    return per_user.fillna(0)

def add_level_features(obs: pd.DataFrame, users) -> pd.DataFrame:
    """Subscription level features (last level, number of changes).

    Parameters
    ----------
    obs : event log with ``userId``, ``ts`` and (optionally) ``level``.
    users : user ids defining the rows of the result.

    Returns
    -------
    One row per entry of ``users`` with:
    - ``level_last``: the user's last non-null level in timestamp order;
    - ``level_changes``: number of events whose level differs from the user's
      previous event. NOTE: a user's first event has no predecessor and is
      counted as a difference, so a user who never switched level gets 1.
      This offset is kept on purpose so feature values stay unchanged.
    Users absent from ``obs`` get NaN in both columns. Without a ``level``
    column only ``userId`` is returned.
    """
    if "level" not in obs.columns:
        return pd.DataFrame({"userId": users})

    # Sort the (potentially very large) log once and reuse it for both features.
    ordered = obs.sort_values("ts")
    level_by_user = ordered.groupby("userId")["level"]

    last_level = level_by_user.last().reset_index(name="level_last")

    # Vectorised equivalent of a per-user `(s != s.shift(1)).sum()`.
    previous_level = level_by_user.shift(1)
    transitions = pd.DataFrame({
        "userId": ordered["userId"].to_numpy(),
        "changed": (ordered["level"] != previous_level).to_numpy().astype(int),
    })
    changes = transitions.groupby("userId")["changed"].sum().reset_index(name="level_changes")

    out = pd.DataFrame({"userId": users}).merge(last_level, on="userId", how="left").merge(changes, on="userId", how="left")
    return out

def add_demo_features(obs: pd.DataFrame, users) -> pd.DataFrame:
    """Attach each user's latest non-null demographic value (currently gender).

    Returns one row per entry of ``users``; a ``<col>_last`` column is added
    for every supported demographic column present in ``obs``.
    """
    demo = pd.DataFrame({"userId": users})

    available = [name for name in ("gender",) if name in obs.columns]
    if not available:
        return demo

    chronological = obs.sort_values("ts")
    for name in available:
        latest = chronological.groupby("userId")[name].last().reset_index(name=f"{name}_last")
        demo = demo.merge(latest, on="userId", how="left")
    return demo


In [15]:

# compute_snapshot (TRAIN)
def compute_snapshot_train(df_all: pd.DataFrame, T0) -> pd.DataFrame:
    """Build the training rows for one snapshot date T0.

    Features are computed from events with ``ts <= T0``; the label is derived
    from events in the window ``(T0, T0 + HORIZON_DAYS]``.

    Parameters
    ----------
    df_all : full event log; must contain ``userId`` and ``ts``. The columns
        ``page``, ``date``, ``sessionId``, ``length``, ``artist``, ``song``,
        ``registration``, ``userAgent``, ``level`` and ``gender`` are used
        when present.
    T0 : snapshot time, converted with ``pd.Timestamp``.

    Returns
    -------
    One row per user observed at or before T0 (excluding users who already
    have a "Cancellation Confirmation" event by T0), with ``userId``,
    ``target`` (1 if a "Cancellation Confirmation" occurs in the label
    window), the engineered features and ``snapshot_time`` (= T0), after
    ``finalize_features`` has been applied.
    """
    T0 = pd.Timestamp(T0)
    # Observation period: everything known at snapshot time.
    obs = df_all[df_all["ts"] <= T0].copy()

    # future window, used only to build the label
    future = df_all[(df_all["ts"] > T0) & (df_all["ts"] <= T0 + pd.Timedelta(days=HORIZON_DAYS))].copy()

    # remove past churners: users who already cancelled cannot churn again
    if "page" in obs.columns:
        past_churners = obs[obs["page"] == "Cancellation Confirmation"]["userId"].unique()
        obs = obs[~obs["userId"].isin(past_churners)]

    # Population of the snapshot: remaining users with at least one observed event.
    users = obs["userId"].unique()

    # target (churn in future window)
    churn_future = []
    if "page" in future.columns:
        churn_future = future[future["page"] == "Cancellation Confirmation"]["userId"].unique()

    target_df = pd.DataFrame({"userId": users})
    target_df["target"] = target_df["userId"].isin(churn_future).astype(int)

    # song mask for listening stats
    song_mask = _is_song_event(obs)
    obs_songs = obs[song_mask].copy()

    # Global core: activity totals over the whole observation period
    global_feats = obs.groupby("userId").agg(
        n_active_days=("date","nunique") if "date" in obs.columns else ("ts","count"),
        n_sessions=("sessionId","nunique") if "sessionId" in obs.columns else ("ts","count"),
        n_events=("ts","count"),
    ).reset_index()

    # Song-based totals; users without any song event get NaN from the left merge
    # (numeric NaN is set to 0 by finalize_features at the end).
    if "length" in obs_songs.columns:
        lt = obs_songs.groupby("userId")["length"].sum().reset_index(name="total_listening_time")
        global_feats = global_feats.merge(lt, on="userId", how="left")
    else:
        global_feats["total_listening_time"] = 0.0

    if "artist" in obs_songs.columns:
        ua = obs_songs.groupby("userId")["artist"].nunique().reset_index(name="uniq_artists_global")
        global_feats = global_feats.merge(ua, on="userId", how="left")
    else:
        global_feats["uniq_artists_global"] = 0

    if "song" in obs_songs.columns:
        us = obs_songs.groupby("userId")["song"].nunique().reset_index(name="uniq_songs_global")
        global_feats = global_feats.merge(us, on="userId", how="left")
    else:
        global_feats["uniq_songs_global"] = 0

    # recency/account age
    last_ts = obs.groupby("userId")["ts"].max().reset_index(name="last_ts")
    global_feats = global_feats.merge(last_ts, on="userId", how="left")

    if "registration" in obs.columns:
        # registration is parsed as epoch milliseconds, like ts
        reg = obs.groupby("userId")["registration"].min().reset_index(name="registration_ts")
        reg["registration_ts"] = pd.to_datetime(reg["registration_ts"], unit="ms", errors="coerce")
        global_feats = global_feats.merge(reg, on="userId", how="left")
        global_feats["account_age_days"] = (T0 - global_feats["registration_ts"]).dt.total_seconds() / 86400.0
    else:
        global_feats["account_age_days"] = np.nan

    global_feats["recency_days"] = (T0 - global_feats["last_ts"]).dt.total_seconds() / 86400.0
    # +1 keeps the denominator non-zero when the account age is 0 or unknown.
    global_feats["avg_daily_listen"] = global_feats["total_listening_time"] / (global_feats["account_age_days"].fillna(0) + 1)

    # ratios (1e-6 guards against division by zero)
    global_feats["sessions_per_day"] = global_feats["n_sessions"] / (global_feats["n_active_days"] + 1e-6)
    global_feats["events_per_session"] = global_feats["n_events"] / (global_feats["n_sessions"] + 1e-6)
    global_feats["uniq_songs_per_day"] = global_feats["uniq_songs_global"] / (global_feats["n_active_days"] + 1e-6)

    # drop raw timestamps to avoid datetime issues downstream
    global_feats = global_feats.drop(columns=[c for c in ["last_ts","registration_ts"] if c in global_feats.columns])

    # windows: one set of suffixed activity stats per look-back length in WINDOWS_DAYS
    windows_df = pd.DataFrame({"userId": users})
    for w in WINDOWS_DAYS:
        windows_df = windows_df.merge(build_window_stats(obs, T0, w, f"{w}d"), on="userId", how="left")
    windows_df = windows_df.fillna(0)

    # Short-window vs 14-day listening ratios (+1 avoids division by zero).
    if "listen_time_7d" in windows_df.columns and "listen_time_14d" in windows_df.columns:
        windows_df["ratio_listen_7d_14d"] = windows_df["listen_time_7d"] / (windows_df["listen_time_14d"] + 1)
    if "listen_time_3d" in windows_df.columns and "listen_time_14d" in windows_df.columns:
        windows_df["ratio_listen_3d_14d"] = windows_df["listen_time_3d"] / (windows_df["listen_time_14d"] + 1)

    # behavior + recency pages
    behavior_df = add_page_counts(obs, users)
    recency_df  = add_recency_features(obs, T0, users)

    # trend: recent 14d / global
    recent = obs[obs["ts"] >= (T0 - pd.Timedelta(days=14))].copy()
    if not recent.empty and "length" in recent.columns:
        recent_songs = recent[_is_song_event(recent)]
        recent_listen = recent_songs.groupby("userId")["length"].sum().reset_index(name="listen_time_recent_14d")
    else:
        recent_listen = pd.DataFrame({"userId": users, "listen_time_recent_14d": 0.0})

    trends = pd.DataFrame({"userId": users}).merge(recent_listen, on="userId", how="left").fillna(0)
    trends = trends.merge(global_feats[["userId","avg_daily_listen"]], on="userId", how="left").fillna(0)
    trends["avg_daily_listen_recent_14d"] = trends["listen_time_recent_14d"] / 14.0
    # Ratio of the recent daily listening rate to the global one.
    trends["trend_listening"] = trends["avg_daily_listen_recent_14d"] / (trends["avg_daily_listen"] + 1e-6)
    trends = trends[["userId","trend_listening"]]

    # session stats
    session_stats = add_session_stats(obs)

    # tech OS: one-hot of the OS family detected from each user's last userAgent
    if "userAgent" in obs.columns:
        last_agent = obs.sort_values("ts").groupby("userId")["userAgent"].last().reset_index()
        last_agent["os_type"] = last_agent["userAgent"].apply(_detect_os)
        tech = pd.get_dummies(last_agent[["userId","os_type"]], columns=["os_type"], prefix="os")
    else:
        tech = pd.DataFrame({"userId": users})

    # level + demo
    level_df = add_level_features(obs, users)
    demo_df  = add_demo_features(obs, users)

    # merge every feature family onto the target frame (one row per user)
    snap = (target_df
        .merge(global_feats, on="userId", how="left")
        .merge(windows_df, on="userId", how="left")
        .merge(behavior_df, on="userId", how="left")
        .merge(recency_df, on="userId", how="left")
        .merge(trends, on="userId", how="left")
        .merge(session_stats, on="userId", how="left")
        .merge(tech, on="userId", how="left")
        .merge(level_df, on="userId", how="left")
        .merge(demo_df, on="userId", how="left")
    )

    snap["snapshot_time"] = T0

    # single consistent finalization step; identifier, time and label columns are left untouched
    snap = finalize_features(snap, exclude_cols=["userId", "snapshot_time", "target"])

    return snap

print(" compute_snapshot_train ready.")

 compute_snapshot_train ready.


In [16]:
# Build all snapshots and export training table
snapshots = []
for t0 in T0_list:
    print("Building snapshot", t0)
    snapshots.append(compute_snapshot_train(df, t0))

final = pd.concat(snapshots, ignore_index=True)

# Each snapshot is one-hot encoded on its own, so a category or page that is
# absent from one snapshot produces a column that exists only in the others;
# pd.concat then pads the gap with NaN. A missing indicator/count means "none",
# so fill it with 0 to keep the stacked table free of NaN features.
id_cols = {"userId", "snapshot_time", "target"}
misaligned_cols = [c for c in final.columns if c not in id_cols and final[c].isna().any()]
for c in misaligned_cols:
    final[c] = final[c].fillna(0).astype(float)
if misaligned_cols:
    print("Columns missing from some snapshots (filled with 0):", misaligned_cols)

print("Final dataset:", final.shape)
print("Unique snapshot_time:", final["snapshot_time"].nunique())
print("target rate:", final["target"].mean())

final.to_parquet(OUTPUT_PATH, index=False)
print(" Saved ->", OUTPUT_PATH)

Building snapshot 2018-10-11 00:00:01
Building snapshot 2018-10-18 00:00:01
Building snapshot 2018-10-25 00:00:01
Building snapshot 2018-11-01 00:00:01
Building snapshot 2018-11-08 00:00:01
Final dataset: (75863, 86)
Unique snapshot_time: 5
target rate: 0.05157982152037225
 Saved -> train_features_v2.parquet
