In [34]:
# ================================================================
# LTV Prediction Case Study – Codeway
# Author: Ege Aktan
# ================================================================

# ================================================================
# Imports
# ================================================================

import pandas as pd
import numpy as np
from pathlib import Path
from collections import Counter

from sklearn.model_selection import train_test_split
from sklearn.calibration import CalibratedClassifierCV
import lightgbm as lgb
import xgboost as xgb


# ================================================================
# User Input Parameters
# ================================================================
# Raw event-level input file.
# NOTE(review): absolute, machine-specific path — point this at your own copy
# of the dataset before running on another machine.
FILE_PATH = Path("/Users/egeaktan/Desktop/WORK/Codeway/ltvprediction_case.parquet")
# Directory that receives the prediction file; Path() is the current working directory.
OUTPUT_PATH = Path()


# ================================================================
# Data Loading & Preprocessing
# ================================================================
df = pd.read_parquet(FILE_PATH, engine="fastparquet")

# Rows without a revenue amount count as zero revenue.
df["revenue"] = df["revenue"].fillna(0)
df["event_date"] = pd.to_datetime(df["event_date"])
df["first_event_date"] = pd.to_datetime(df["first_event_date"])
# event_time is parsed as epoch milliseconds.
df["event_time"] = pd.to_datetime(df["event_time"], unit="ms")
df["event_hour"] = df["event_time"].dt.hour

# Whole days between the user's first_event_date and the event's date.
# Computed once and stored under both column names the notebook has used.
days_since_first_event = (df["event_date"] - df["first_event_date"]).dt.days
df["tenure_days_row"] = days_since_first_event
df["days_since_install"] = days_since_first_event

df["country"] = df["country"].fillna("UNKNOWN")
# Unify the lowercase spelling with the "iOS" label.
df.loc[df["operating_system"] == "ios", "operating_system"] = "iOS"

# Users that appear with more than one distinct first_event_date are dropped
# entirely, since their install date is ambiguous.
bad_users = (
    df.groupby("user_id")["first_event_date"]
    .nunique()
    .reset_index()
    .query("first_event_date > 1")["user_id"]
)
df = df[~df["user_id"].isin(bad_users)].copy()

In [35]:
# ================================================================
# Feature Generation
# ================================================================
# Per-user event counts: one row per user, one column per event_name value.
journey_table = (
    df.pivot_table(index="user_id", columns="event_name", values="event_date",
                   aggfunc="count", fill_value=0)
    .reset_index()
)

binary_event_cols = ["auto_renew_off", "renewal", "refund", "subscribe", "free_trial"]

# An event type that never occurs in the data produces no pivot column.
# Add it as an all-zero column so the steps below cannot fail on a missing name.
for event_col in binary_event_cols:
    if event_col not in journey_table.columns:
        journey_table[event_col] = 0

# Users with any auto_renew_off / renewal / refund event get a subscribe count
# of at least 1 (presumably because those events imply an earlier subscription
# whose own event is missing from the data — confirm with the data owner).
has_subscription_event = (
    journey_table[["auto_renew_off", "renewal", "refund"]].gt(0).any(axis=1)
)
journey_table.loc[has_subscription_event, "subscribe"] = (
    journey_table.loc[has_subscription_event, "subscribe"].clip(lower=1)
)

# Same table with the five listed event columns reduced to 0/1 "ever happened" flags;
# all other event columns keep their raw counts.
journey_binary = journey_table.copy()
journey_binary[binary_event_cols] = (journey_binary[binary_event_cols] > 0).astype(int)

# Per-user date summary: earliest and latest event_date plus the (minimum)
# first_event_date, which is used as the install date.
events_by_user = df.groupby("user_id")
tenure_df = pd.DataFrame(
    {
        "app_launch_date": events_by_user["event_date"].min(),
        "last_event_date": events_by_user["event_date"].max(),
        "install_date": events_by_user["first_event_date"].min(),
    }
).reset_index()

# Whole days from install to the last observed event, and from install to the
# first observed event.
tenure_df = tenure_df.assign(
    tenure_days=lambda t: (t["last_event_date"] - t["install_date"]).dt.days,
    days_until_app_launch=lambda t: (t["app_launch_date"] - t["install_date"]).dt.days,
)

# Events ordered chronologically within each user; only the two columns
# needed for the gap computation are kept.
df_sorted = df[["user_id", "event_time"]].sort_values(["user_id", "event_time"])

# Gap to the user's previous event, in minutes (NaN for each user's first event).
gap_to_previous = df_sorted.groupby("user_id")["event_time"].diff()
df_sorted["time_diff_min"] = gap_to_previous / pd.Timedelta(minutes=1)

# Mean gap per user; users with a single event have no gaps and get NaN.
avg_inter_event = (
    df_sorted.groupby("user_id")["time_diff_min"]
    .mean()
    .rename("avg_inter_event_time")
    .reset_index()
)

# One row per user: the user's first country value and total revenue over all rows.
country_user_stats = (
    df.groupby(["user_id"]).agg(
        country=("country", "first"),
        country_user_rev=("revenue", "sum"),
    )
    .reset_index()
)

# Country-level mean and standard deviation of the per-user revenue totals.
# A country with a single user has no defined std (NaN).
country_stats = (
    country_user_stats.groupby(["country"]).agg(
        country_avg_rev=("country_user_rev", "mean"),
        country_std_rev=("country_user_rev", "std"),
    )
    .reset_index()
)

# Mean-to-std ratio (despite "var" in the column name).
# A zero std would produce +/-inf, which would later become the max() used to
# fill missing ratios; treat it as missing instead so the ratio stays finite.
nonzero_std = country_stats["country_std_rev"].replace(0, np.nan)
country_stats["country_var_rev"] = country_stats["country_avg_rev"] / nonzero_std

# Attach the country-level statistics back to every user.
country_user_stats = country_user_stats[["user_id", "country"]].merge(
    country_stats, on="country", how="left"
)

# First operating_system value recorded for each user.
user_os = df.groupby("user_id")["operating_system"].first()

# 1 when that value is iOS or iPadOS, else 0.
ios_flag = (
    user_os.isin(["iOS", "iPadOS"])
    .astype(int)
    .rename("ios_flag")
    .reset_index()
)

# Largest number of events the user generated on any single event_date.
events_per_user_day = df.groupby(["user_id", "event_date"]).size()
df_daily_max_events = (
    events_per_user_day.groupby("user_id")
    .max()
    .rename("daily_max_events")
    .reset_index()
)

# Install date per user (minimum first_event_date), coerced to datetime.
installs = (
    df.groupby("user_id")["first_event_date"].min()
      .rename("install_date")
      .reset_index()
)
installs["install_date"] = pd.to_datetime(installs["install_date"])


def _days_from_install(first_times, time_col, out_col):
    """Return ``user_id`` plus whole days from install to ``time_col``.

    ``first_times`` holds one row per user with a timestamp in ``time_col``.
    It is left-joined with the module-level ``installs`` table and the
    difference to ``install_date`` is stored, in days, under ``out_col``.
    """
    joined = first_times.merge(installs, on="user_id", how="left")
    joined[out_col] = (joined[time_col] - joined["install_date"]).dt.days
    return joined[["user_id", out_col]]


# Earliest subscription-related event per user; renewals, refunds and
# auto-renew cancellations count as well as explicit subscribe events.
is_subscription_event = df["event_name"].isin(
    ["subscribe", "auto_renew_off", "renewal", "refund"]
)
subs_first = (
    df.loc[is_subscription_event]
      .groupby("user_id")["event_time"].min()
      .rename("first_subscribe_time")
      .reset_index()
)
time_to_first_subscribe = _days_from_install(
    subs_first, "first_subscribe_time", "time_to_first_subscribe"
)

# Total revenue per user over every loaded row.
# NOTE(review): the name implies the data spans a 15-day window — confirm.
revenue_15days = (
    df.groupby("user_id")["revenue"].sum()
      .rename("revenue_15days")
      .reset_index()
)


# Earliest free_trial event per user.
trial_first = (
    df.loc[df["event_name"] == "free_trial"]
      .groupby("user_id")["event_time"].min()
      .rename("first_trial_time")
      .reset_index()
)
time_to_first_trial = _days_from_install(
    trial_first, "first_trial_time", "time_to_first_trial"
)


# Hour-of-day buckets: hours 0-7 -> Early, 8-15 -> Daytime, 16-23 -> Evening
# (pd.cut intervals are right-inclusive, hence the -1 lower edge).
bins = [-1, 7, 15, 23]
labels = ["Early", "Daytime", "Evening"]

hour_bin_dtype = pd.CategoricalDtype(categories=labels, ordered=True)
df["event_hour_bin"] = (
    pd.cut(df["event_hour"], bins=bins, labels=labels).astype(hour_bin_dtype)
)

# Share of each user's events that falls into every bucket.
hour_bin_share = df.groupby("user_id")["event_hour_bin"].value_counts(normalize=True)

# Wide layout with one ratio_<bucket> column per label; a bucket with no
# events for a user is 0.
event_hour_df = (
    hour_bin_share.unstack(fill_value=0)
    .reindex(columns=labels, fill_value=0)
    .add_prefix("ratio_")
    .reset_index()
)

# Per-user activity summary plus the prediction target.
user_agg_spec = {
    # number of distinct dates with at least one event
    "active_days": pd.NamedAgg(column="event_date", aggfunc="nunique"),
    # number of rows with a non-null event_name
    "events_total": pd.NamedAgg(column="event_name", aggfunc="count"),
    # mean of first_year_revenue over the user's rows
    # (presumably constant per user — TODO confirm)
    "first_year_revenue": pd.NamedAgg(column="first_year_revenue", aggfunc="mean"),
}
agg_user = df.groupby("user_id").agg(**user_agg_spec).reset_index()

# Whole days elapsed between first_event_date and each event's timestamp.
days_since = (df["event_time"] - df["first_event_date"]).dt.days
windows = [1, 3, 7]

# One Series per window: revenue summed over events whose day offset lies in
# [0, w], both ends included.
out = [
    df.loc[days_since.between(0, w, inclusive="both")]
      .groupby("user_id")["revenue"].sum()
      .rename(f"revenue_d{w}")
    for w in windows
]

# Side-by-side revenue_d<w> columns; a user with no event inside a window gets 0.
rev_windows = pd.concat(out, axis=1).fillna(0).reset_index()


In [36]:
# ================================================================
# Joining Features & Further Preprocessing
# ================================================================

# Feature tables to attach to the per-user event flags, in join order.
# Each one is keyed by user_id.
feature_frames = [
    tenure_df[["user_id", "tenure_days", "days_until_app_launch"]],
    agg_user,
    df_daily_max_events,
    avg_inter_event,
    time_to_first_subscribe,
    revenue_15days,
    time_to_first_trial,
    event_hour_df,
    rev_windows,
    ios_flag,
    country_user_stats,
]

# Left joins keep every user present in journey_binary; features a user lacks
# come through as NaN.
master_df = journey_binary
for feature_frame in feature_frames:
    master_df = master_df.merge(feature_frame, on="user_id", how="left")

# Binary event flags: restrict to the columns actually present, then make
# sure they are gap-free integers.
bin_cols = [
    c
    for c in ["auto_renew_off", "renewal", "refund", "subscribe", "free_trial"]
    if c in master_df.columns
]
master_df[bin_cols] = master_df[bin_cols].fillna(0).astype(int)

# Hour-of-day ratios: a missing ratio is treated as 0.
ratio_cols = [c for c in master_df.columns if c.startswith("ratio_")]
master_df[ratio_cols] = master_df[ratio_cols].fillna(0.0)

# Windowed revenue: a missing value is treated as no revenue.
window_rev_cols = ["revenue_d1", "revenue_d3", "revenue_d7"]
master_df[window_rev_cols] = master_df[window_rev_cols].fillna(0)

# Missing country ratio is filled with the largest ratio observed.
largest_country_ratio = master_df["country_var_rev"].max()
master_df["country_var_rev"] = master_df["country_var_rev"].fillna(largest_country_ratio)

# Keep only users with a non-negative target; rows where it is NaN are dropped too.
master_df = master_df[master_df["first_year_revenue"] >= 0]

In [37]:

import numpy as np, pandas as pd
from sklearn.model_selection import train_test_split
import lightgbm as lgb
from sklearn.calibration import CalibratedClassifierCV
from collections import Counter
import xgboost as xgb

# --- Prep ---------------------------------------------------------------
dfm = master_df.copy()
dfm = dfm[dfm["events_total"] > 0].copy()
# Binary target helper: 1 when the user has any first-year revenue.
dfm["payer_flag"] = (dfm["first_year_revenue"] > 0).astype(int)

# Express the revenue snapshots as a share of revenue_15days for EVERY user.
# The decision to normalise must not depend on payer_flag: that flag is derived
# from the target, so using it makes the feature encoding differ between
# payers and non-payers, which leaks the label into hold-out scoring and
# cannot be reproduced for unlabeled users.
# A zero denominator yields NaN (missing) rather than inf. For payers this
# gives the same values the regressor receives after inf is sanitised to NaN.
if "revenue_15days" in dfm.columns:
    revenue_denominator = dfm["revenue_15days"].where(dfm["revenue_15days"] != 0)
    for c in ["revenue_d1", "revenue_d3", "revenue_d7"]:
        if c in dfm.columns:
            dfm[c] = dfm[c] / revenue_denominator

# Candidate inputs for the stage-1 classifier; names absent from dfm are
# dropped, order is preserved.
clf_candidates = (
    "tenure_days", "free_trial", "paywall",
    "active_days", "events_total", "daily_max_events",
    "avg_inter_event_time", "time_to_first_trial", "ios_flag",
    "ratio_Evening", "ratio_Early", "ratio_Daytime", "country_var_rev",
)
clf_features = [name for name in clf_candidates if name in dfm.columns]

# Candidate inputs for the stage-2 revenue regressor, filtered the same way.
reg_candidates = (
    "revenue_15days", "tenure_days", "revenue_d7",
    "avg_inter_event_time", "time_to_first_subscribe",
    "revenue_d1", "ratio_Evening", "ratio_Early", "ratio_Daytime",
    "daily_max_events", "events_total", "active_days", "country_avg_rev", "ios_flag",
)
reg_features = [name for name in reg_candidates if name in dfm.columns]

def sanitize(X: pd.DataFrame) -> pd.DataFrame:
    """Return a float32 copy of ``X`` with infinities turned into NaN.

    Parameters
    ----------
    X : pd.DataFrame
        Feature columns to clean; the input frame is not modified.

    Returns
    -------
    pd.DataFrame
        Same shape and labels as ``X``. Every +inf / -inf is replaced by NaN
        first, then all columns are cast to ``np.float32``.
    """
    without_inf = X.replace(to_replace=[np.inf, -np.inf], value=np.nan)
    return without_inf.astype(np.float32)

# Clean both feature groups (inf -> NaN, float32 cast).
# NOTE(review): assigning through .loc may keep each column's existing dtype
# rather than float32, depending on the pandas version — confirm if the dtype matters.
for feature_group in (clf_features, reg_features):
    dfm.loc[:, feature_group] = sanitize(dfm[feature_group])

# 80/20 train / hold-out split on row labels, stratified on payer_flag so both
# parts keep the same payer share; fixed seed for reproducibility.
idx_tr, idx_hold = train_test_split(
    dfm.index,
    test_size=0.20,
    stratify=dfm["payer_flag"],
    random_state=42,
)
df_tr = dfm.loc[idx_tr].copy()
df_hold = dfm.loc[idx_hold].copy()

# =========================
# Stage 1: Multiclass classifier + calibration
# =========================
# Positive first-year revenues in the training split only, so the threshold
# is not derived from hold-out rows.
is_positive_rev = df_tr["first_year_revenue"] > 0
pos_rev = df_tr.loc[is_positive_rev, "first_year_revenue"]

# "Whale" threshold: 80th percentile of positive revenue, or 0.0 when the
# training split contains no payer.
if len(pos_rev) > 0:
    whale_cut = np.percentile(pos_rev, 80)
else:
    whale_cut = 0.0

def label_multiclass(r):
    """Map a first-year revenue amount to a class label.

    Returns 0 when ``r`` is zero or negative, 2 when ``r`` is at least the
    module-level ``whale_cut`` threshold, and 1 otherwise.
    """
    if r <= 0:
        return 0
    if r >= whale_cut:
        return 2
    return 1

# Three-way label for the training split: 0 / 1 / 2 as returned by label_multiclass.
df_tr["y_cls"] = df_tr["first_year_revenue"].map(label_multiclass).astype(int)

# Hold back 20% of the training split (stratified on the label) for early
# stopping and probability calibration.
Xc_tr, Xc_cal, yc_tr, yc_cal = train_test_split(
    df_tr[clf_features],
    df_tr["y_cls"],
    test_size=0.20,
    stratify=df_tr["y_cls"],
    random_state=42,
)

# Inverse-frequency class weights: total / (3 * class count).
# A class with no samples is counted as 1 to avoid dividing by zero.
cls_counts = Counter(yc_tr)
total = len(yc_tr)
class_weight = {}
for k in (0, 1, 2):
    class_weight[k] = total / (3 * cls_counts.get(k, 1))

# Stage-1 model: 3-class gradient-boosted classifier with the class weights
# computed above.
clf_params = dict(
    objective="multiclass",
    num_class=3,
    n_estimators=400,
    learning_rate=0.08,
    num_leaves=31,
    max_bin=63,
    subsample=0.8,
    colsample_bytree=0.8,
    reg_lambda=1.0,
    class_weight=class_weight,
    n_jobs=-1,
    random_state=42,
)
clf_base = lgb.LGBMClassifier(**clf_params)

# Training stops once multi_logloss on the calibration split has not improved
# for 50 rounds.
clf_base.fit(
    Xc_tr,
    yc_tr,
    eval_set=[(Xc_cal, yc_cal)],
    eval_metric="multi_logloss",
    callbacks=[lgb.early_stopping(50, verbose=False)],
)

# Class counts in the calibration split (kept for inspection).
cal_counts = Counter(yc_cal)

# Sigmoid calibration on top of the already-fitted classifier, fitted on the
# same split that served as the early-stopping evaluation set.
# NOTE(review): cv="prefit" is deprecated in recent scikit-learn releases —
# check the installed version before upgrading.
cal_clf = CalibratedClassifierCV(estimator=clf_base, method="sigmoid", cv="prefit")
cal_clf.fit(Xc_cal, yc_cal)

def _add_class_probabilities(frame):
    """Add calibrated class probabilities to ``frame`` in place.

    Writes ``p_nonpayer``, ``p_regular`` and ``p_whale`` from probability
    columns 0, 1 and 2 of ``cal_clf``, plus ``p_payer`` as the sum of the
    latter two. Returns the raw probability matrix.
    """
    proba = cal_clf.predict_proba(frame[clf_features])
    frame["p_nonpayer"] = proba[:, 0]
    frame["p_regular"] = proba[:, 1]
    frame["p_whale"] = proba[:, 2]
    frame["p_payer"] = frame["p_regular"] + frame["p_whale"]
    return proba


# Training split: these rows were (partly) used to fit and calibrate the
# classifier, so their probabilities are in-sample.
proba_tr = _add_class_probabilities(df_tr)

# Hold-out split: rows never seen by the classifier.
proba_hold = _add_class_probabilities(df_hold)


# Stage-2 inputs: the regression features plus the stage-1 whale probability.
reg_plus = [*reg_features, "p_whale"]

# The revenue regressor is trained on paying users of the training split only.
pay_tr = df_tr.loc[df_tr["first_year_revenue"] > 0].copy()


# Target: first-year revenue, capped at its 99th percentile to limit the
# influence of extreme values.
y_raw = pay_tr["first_year_revenue"].to_numpy()
cap99 = np.percentile(y_raw, 99)
y = y_raw.clip(0, cap99)

# 80/20 split of the payer rows for fitting vs. early-stopping validation.
Xr_tr, Xr_val, yr_tr, yr_val = train_test_split(
    pay_tr[reg_plus],
    y,
    test_size=0.20,
    random_state=42,
)

dtrain = xgb.DMatrix(Xr_tr, label=yr_tr)
dvalid = xgb.DMatrix(Xr_val, label=yr_val)

params = {
    # loss / metric
    "objective": "reg:squarederror",
    "eval_metric": "rmse",
    # tree growth
    "eta": 0.05,
    "max_depth": 7,
    "min_child_weight": 3,
    # row / column sampling
    "subsample": 0.9,
    "colsample_bytree": 0.9,
    # L2 / L1 regularisation
    "lambda": 1.0,
    "alpha": 0.0,
    "seed": 42,
}

# Up to 1000 boosting rounds; training stops once validation RMSE has not
# improved for 50 rounds.
bst = xgb.train(
    params,
    dtrain,
    num_boost_round=1000,
    evals=[(dvalid, "valid")],
    early_stopping_rounds=50,
    verbose_eval=False,
)

# Score the hold-out rows with the regressor.
# Training used early stopping, so the booster may contain rounds added after
# the best validation score. Pin prediction to the best iteration explicitly so
# the result does not depend on the installed XGBoost version's default.
best_rounds = (0, bst.best_iteration + 1)
X_hold_reg = xgb.DMatrix(df_hold[reg_plus])
# Predicted revenue amount given that the user pays; negatives are clipped to 0.
df_hold["amount_hat"] = np.clip(
    bst.predict(X_hold_reg, iteration_range=best_rounds), 0, None
)

# Expected LTV = P(payer) * predicted amount.
df_hold["expected_ltv"] = df_hold["p_payer"] * df_hold["amount_hat"]

[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.058793 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 554
[LightGBM] [Info] Number of data points in the train set: 1407878, number of used features: 12
[LightGBM] [Info] Start training from score -1.098612
[LightGBM] [Info] Start training from score -1.098612
[LightGBM] [Info] Start training from score -1.098612




In [38]:
# Score every user in dfm. This includes the rows the models were trained on,
# so predictions for those users are in-sample.
df_all = dfm.copy()

# Stage 1: calibrated class probabilities.
proba = cal_clf.predict_proba(df_all[clf_features])
df_all["p_nonpayer"] = proba[:, 0]
df_all["p_regular"]  = proba[:, 1]
df_all["p_whale"]    = proba[:, 2]
df_all["p_payer"]    = df_all["p_regular"] + df_all["p_whale"]

# Stage 2: predicted amount from the best early-stopping iteration, clipped at 0.
# The matrix gets its own name so the hold-out matrix from the training cell
# is not overwritten.
X_all_reg = xgb.DMatrix(df_all[reg_plus])
df_all["amount_hat"] = np.clip(
    bst.predict(X_all_reg, iteration_range=(0, bst.best_iteration + 1)), 0, None
)
df_all["expected_ltv"] = df_all["p_payer"] * df_all["amount_hat"]

# Attach the install date for the output file.
df_all = df_all.merge(tenure_df[["user_id", "install_date"]], on="user_id", how="left")

df_result = df_all[["user_id", "country", "install_date", "expected_ltv"]].rename(
    columns={"install_date": "first_event_date",
             "expected_ltv": "predicted_ltv"}
)

# Make sure the target directory exists before writing.
OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
output_file = OUTPUT_PATH / "predicted_ltv.parquet"
df_result.to_parquet(output_file, index=False, engine="fastparquet")
