# gold_churn_predictions (Daily Job Final)

이 노트북은 `dlt_gold_user_behavior_snapshot`(T-1 스냅샷)을 기반으로
하루 1회 Churn 예측을 수행하여 `gold_churn_predictions` 테이블에 **해당 날짜(event_date)만** 갱신합니다.

- 기본 실행: `score_date` 미지정 → **T-1(어제)** 기준(없으면 T-1 이하 최신 날짜로 fallback)
- 백필(backfill): `score_date=YYYY-MM-DD` 지정 → 해당 날짜만 재계산/갱신
- 저장 방식: `replaceWhere`로 **event_date=score_date** 파티션만 overwrite → 과거 데이터는 유지(누적)

> 모델 로드는 팀 기존 방식 유지: `models:/...@latest`


In [0]:
# %pip install -U mlflow xgboost scikit-learn
# dbutils.library.restartPython()

Collecting mlflow
  Downloading mlflow-3.9.0-py3-none-any.whl (9.7 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 9.7/9.7 MB 63.5 MB/s eta 0:00:00
Collecting xgboost
  Downloading xgboost-3.2.0-py3-none-manylinux_2_28_x86_64.whl (131.7 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 131.7/131.7 MB 20.2 MB/s eta 0:00:00
Collecting scikit-learn
  Downloading scikit_learn-1.7.2-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (9.7 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 9.7/9.7 MB 148.4 MB/s eta 0:00:00
Collecting Flask<4
  Downloading flask-3.1.2-py3-none-any.whl (103 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 103.3/103.3 kB 32.5 MB/s eta 0:00:00
Collecting gunicorn<24
  Downloading gunicorn-23.0.0-py3-none-any.whl (85 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 85.0/85.0 kB 34.5 MB/s eta 0:00:00
Collecting alembic!=1.10.0,<2
  Downloading alembic-1.18.4-py3-none-any.whl (263 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 263.9/263.9 kB 79.3 MB/s eta 0:

In [0]:
import mlflow.xgboost
from pyspark.sql import functions as F
import pandas as pd
import numpy as np

# =========================
# 0) Config
# =========================
CATALOG = "signalcraft_databricks"
SCHEMA  = "default"

SNAPSHOT_TABLE = f"{CATALOG}.{SCHEMA}.dlt_gold_user_behavior_snapshot"
OUT_TABLE      = f"{CATALOG}.{SCHEMA}.gold_churn_predictions"

# 모델 로드 (팀 기존 방식 유지)
mlflow.set_registry_uri("databricks-uc")
model_name = f"{CATALOG}.{SCHEMA}.churn_predictor"
model_uri  = f"models:/{model_name}@latest"   # ✅ 유지

# =========================
# 1) score_date 결정 (기본: T-1)
# =========================
dbutils.widgets.text("score_date", "")
param_date = dbutils.widgets.get("score_date").strip()

# 원칙적으로 T-1(어제) 사용
t1 = spark.sql("SELECT date_sub(current_date(), 1) AS d").first()["d"]

snapshot_df = spark.table(SNAPSHOT_TABLE)

if param_date:
    score_date = param_date
else:
    # T-1이 없을 수 있으니 "T-1 이하"에서 가장 최신 날짜로 fallback
    score_date = (snapshot_df
                  .filter(F.col("event_date") <= F.lit(t1))
                  .agg(F.max("event_date").alias("max_d"))
                  .first()["max_d"])
    if score_date is None:
        score_date = t1

print(f"📅 추론 기준일(score_date): {score_date}")

# =========================
# 2) 모델 로드
# =========================
model = mlflow.xgboost.load_model(model_uri)
print(f"✅ Model loaded: {model_uri}")

# =========================
# 3) 대상 데이터 로드 (해당 날짜 & Active)
# =========================
required_cols = {
    "user_id","event_date","is_active",
    "daily_watch_time_min","watch_time_7d_min","watch_time_30d_min",
    "active_days_7","active_days_30","days_since_last_login",
    "churn_reason","churn_risk_level"
}
missing = sorted(list(required_cols - set(snapshot_df.columns)))
if missing:
    raise ValueError(f"Snapshot table missing required columns: {missing}")

features_df = (snapshot_df
               .filter((F.col("event_date") == F.lit(score_date)) & (F.col("is_active") == 1)))

target_cnt = features_df.count()
print(f"👥 실질 추론 대상(Active): {target_cnt}명")

if target_cnt == 0:
    raise RuntimeError(f"No active users found for score_date={score_date}. Job will fail fast.")

# =========================
# 4) 피처 엔지니어링 + Pandas 변환
# =========================
inference_ready_df = (features_df
    .withColumn("watch_time_ratio", F.col("watch_time_7d_min") / (F.col("watch_time_30d_min") / 4 + F.lit(1)))
    .withColumn("active_days_ratio", F.col("active_days_7") / (F.col("active_days_30") / 4 + F.lit(1)))
)

pdf = inference_ready_df.select(
    "user_id","event_date",
    "daily_watch_time_min","watch_time_7d_min","watch_time_30d_min",
    "active_days_7","active_days_30","days_since_last_login",
    "watch_time_ratio","active_days_ratio",
    "churn_reason","churn_risk_level"
).toPandas()

# =========================
# 5) 범주형 인코딩 (학습 시 정의 순서 고정)
# =========================
from sklearn.preprocessing import LabelEncoder

# churn_reason
le_reason = LabelEncoder()
le_reason.fit(['prechurned', 'data_gap', 'onboarding_fail', 'silent_decay', 'normal'])
if pdf["churn_reason"].isna().any():
    # 운영 안정성: 결측이 있으면 원인 파악이 우선이므로 실패 처리
    raise RuntimeError("Found null churn_reason values in snapshot. Please fix upstream data.")
pdf["churn_reason_encoded"] = le_reason.transform(pdf["churn_reason"])

# churn_risk_level
le_risk = LabelEncoder()
le_risk.fit(['Active', 'Soft Churn', 'Dormant', 'Churned'])
if pdf["churn_risk_level"].isna().any():
    raise RuntimeError("Found null churn_risk_level values in snapshot. Please fix upstream data.")
pdf["risk_level_encoded"] = le_risk.transform(pdf["churn_risk_level"])

# =========================
# 6) 피처 컬럼 정의 (학습 순서와 동일해야 함)
# =========================
feature_cols = [
    'daily_watch_time_min', 'watch_time_7d_min', 'watch_time_30d_min',
    'active_days_7', 'active_days_30', 'days_since_last_login',
    'watch_time_ratio', 'active_days_ratio',
    'churn_reason_encoded', 'risk_level_encoded'
]

# 결측 처리(최소 안전장치): 숫자 결측은 0으로 채움 (필요시 팀 방식으로 변경)
pdf[feature_cols] = pdf[feature_cols].replace([np.inf, -np.inf], np.nan)
pdf[feature_cols] = pdf[feature_cols].fillna(0)

# =========================
# 7) 추론 및 probability 계산
# =========================
pdf['churn_probability'] = model.predict_proba(pdf[feature_cols])[:, 1]

# =========================
# 8) Probability Banding (팀 기존 기준 유지)
# =========================
conditions = [
    (pdf['churn_probability'] < 0.44),
    (pdf['churn_probability'] >= 0.44) & (pdf['churn_probability'] < 0.51),
    (pdf['churn_probability'] >= 0.51) & (pdf['churn_probability'] < 0.55),
    (pdf['churn_probability'] >= 0.55)
]

choices = ['Low', 'Mid', 'High', 'Critical']
pdf['probability_band'] = np.select(conditions, choices, default='Unknown')

# =========================
# 9) 결과 저장 (해당 날짜만 overwrite → 히스토리 유지)
# =========================
result_columns = ['user_id', 'event_date', 'churn_probability', 'probability_band', 'churn_reason']
result_spark_df = spark.createDataFrame(pdf[result_columns])

(result_spark_df
    .write
    .format("delta")
    .mode("overwrite")
    .option("replaceWhere", f"event_date = '{score_date}'")
    .saveAsTable(OUT_TABLE)
)

# =========================
# 10) 요약 로그
# =========================
print("✅ band distribution")
print(pdf['probability_band'].value_counts())
print(f"✅ {score_date} 기준 추론 완료! → {OUT_TABLE}")


📅 추론 기준일(score_date): 2026-02-17
✅ Model loaded: models:/signalcraft_databricks.default.churn_predictor@latest
👥 실질 추론 대상(Active): 1968명
✅ band distribution
Mid         912
Low         700
High        277
Critical     79
Name: probability_band, dtype: int64
✅ 2026-02-17 기준 추론 완료! → signalcraft_databricks.default.gold_churn_predictions


In [0]:
predictions_df = spark.table("signalcraft_databricks.default.gold_churn_predictions_bak").where("event_date = '2026-02-18'")
predictions_pdf = predictions_df.toPandas()

# 현재 활성 유저들의 실제 점수 분포 확인
predictions_stats = predictions_pdf['churn_probability'].describe(percentiles=[.25, .5, .75, .9])
print(predictions_stats)

count    1896.000000
mean        0.443554
std         0.086477
min         0.122447
25%         0.384212
50%         0.477124
75%         0.503163
90%         0.525853
max         0.712867
Name: churn_probability, dtype: float64
