In [None]:
!pip install -q feast==0.52.0 pandas==2.2.2 numpy==2.0.2 scikit-learn==1.6.1

### Imports

In [None]:
import shutil, warnings
import numpy as np
import pandas as pd
from pathlib import Path
from datetime import datetime, timezone, timedelta

from feast import FeatureStore, Entity, Field, FileSource, FeatureView, FeatureService
from feast.types import Int64, Float32
from feast.value_type import ValueType

from sklearn.metrics import classification_report, roc_auc_score, f1_score
from sklearn.preprocessing import LabelEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer

warnings.filterwarnings("ignore", category=DeprecationWarning)

### Feast Repo Setup

In [None]:
REPO_DIR = Path("feast_telco_repo").absolute()

# Start each run from an empty repo so registry / online-store files from a
# previous execution cannot influence this one.
if REPO_DIR.exists():
    shutil.rmtree(REPO_DIR)
(REPO_DIR / "data").mkdir(parents=True, exist_ok=True)

# Minimal local Feast config: file-based offline store, SQLite online store,
# both the registry and the online DB living inside REPO_DIR.
_feature_store_yaml_lines = [
    "project: telco_churn",
    f"registry: {REPO_DIR / 'registry.db'}",
    "provider: local",
    "offline_store:",
    "  type: file",
    "online_store:",
    "  type: sqlite",
    f"  path: {REPO_DIR / 'online_store.db'}",
    "entity_key_serialization_version: 3",
]
(REPO_DIR / "feature_store.yaml").write_text("\n".join(_feature_store_yaml_lines) + "\n")

### Fetch and Clean Dataset

In [None]:
URL = "https://raw.githubusercontent.com/aiplanethub/Datasets/master/WA_Fn-UseC_-Telco-Customer-Churn.csv"

# Load the raw CSV and rename the ID column to the snake_case join key used by Feast.
df = pd.read_csv(URL).rename(columns={"customerID": "customer_id"}).copy()

# Force the numeric columns to numbers; anything unparseable becomes NaN.
for numeric_col in ("TotalCharges", "MonthlyCharges", "tenure"):
    df[numeric_col] = pd.to_numeric(df[numeric_col], errors="coerce")

# Rows missing the key, a numeric feature, or the target are unusable.
required_cols = ["customer_id", "tenure", "MonthlyCharges", "TotalCharges", "Churn"]
df = df.dropna(subset=required_cols)

# Binary target: 1 when Churn is "yes" (case/whitespace-insensitive), else 0.
churn_normalized = df["Churn"].astype(str).str.strip().str.lower()
df["label"] = churn_normalized.eq("yes").astype(int)

In [None]:
# Continuous columns, stored as Float32 in Feast and standardised for the model.
numerical_features = ["tenure", "MonthlyCharges", "TotalCharges"]

# Discrete columns, label-encoded for Feast and one-hot encoded for the model.
categorical_features = """
    gender SeniorCitizen Partner Dependents PhoneService MultipleLines
    InternetService OnlineSecurity OnlineBackup DeviceProtection TechSupport
    StreamingTV StreamingMovies Contract PaperlessBilling PaymentMethod
""".split()

# Canonical column order used everywhere downstream: numerics, then categoricals.
all_features = [*numerical_features, *categorical_features]

### Simulate Feature & Label Timestamps

In [None]:
HORIZON_DAYS = 30
label_horizon = timedelta(days=HORIZON_DAYS)

# Simulation window covers the last 150 days. Feature times are drawn only from
# the part of the window that still leaves a full label horizon before "now".
rng_end = datetime.now(timezone.utc)
rng_start = rng_end - timedelta(days=150)
latest_feature_time = rng_end - label_horizon
feat_span_sec = (latest_feature_time - rng_start).total_seconds()


def _random_feature_time() -> datetime:
    """Return rng_start plus a uniform whole-second offset within the feature window (global NumPy RNG)."""
    return rng_start + timedelta(seconds=int(np.random.rand() * feat_span_sec))


np.random.seed(42)
df["feature_ts"] = [_random_feature_time() for _ in range(len(df))]
# Each label is observed exactly one horizon after its features.
df["label_ts"] = df["feature_ts"] + label_horizon
# Simulated ingestion lag: a row lands in the store 5 minutes after its event time.
df["created_at"] = df["feature_ts"] + timedelta(minutes=5)

### Label-Encode Categorical Features for Feast Storage

In [None]:
# The feature view declares the categorical columns as Int64, so each category
# is mapped to an integer code. LabelEncoder assigns codes in sorted order of
# the unique (string) values it sees.
#
# The fitted encoders are kept in `label_encoders` (column name -> encoder) so
# the exact same mapping can be applied to new customers before they are
# written to / looked up in the store, and so integer codes read back from Feast
# can be decoded with `label_encoders[col].inverse_transform(...)`.
df_feast = df.copy()
label_encoders: dict[str, LabelEncoder] = {}
for col in categorical_features:
    encoder = LabelEncoder()
    # Cast to str first so numeric-typed columns are encoded the same way as text columns.
    df_feast[col] = encoder.fit_transform(df_feast[col].astype(str))
    label_encoders[col] = encoder

### Save Feature + Label Datasets

In [None]:
features_path = REPO_DIR / "data" / "telco_features.parquet"
entity_label_path = REPO_DIR / "data" / "entity_labels.parquet"

# Feature table for the FileSource: key, features, event time and ingestion time.
# Numeric columns are cast to float32 so the parquet matches the Float32 schema
# declared for them on the feature view. Without the cast, offline retrieval
# (training) returns the raw pandas dtypes (e.g. int64/float64) while the online
# store (serving) returns float32 values — a silent training/serving skew.
feature_table = (
    df_feast[["customer_id", *all_features, "feature_ts", "created_at"]]
    .astype({col: "float32" for col in numerical_features})
    .rename(columns={"feature_ts": "event_timestamp"})
)
feature_table.to_parquet(features_path, index=False)

# Entity dataframe for training: one row per label, stamped with the label time
# so Feast can join features point-in-time correctly.
df[["customer_id", "label_ts", "label"]].rename(
    columns={"label_ts": "event_timestamp"}
).to_parquet(entity_label_path, index=False)

### Define Feast Entity, Source, and FeatureView

In [None]:
# Entity keyed on the string customer ID.
customer = Entity(
    name="customer_id",
    join_keys=["customer_id"],
    value_type=ValueType.STRING,
)

# Offline source over the parquet at `features_path`. Feast uses `created_at`
# to pick between rows sharing the same event timestamp (guards against
# late / backfilled data).
source = FileSource(
    path=str(features_path),
    timestamp_field="event_timestamp",
    created_timestamp_column="created_at",
)

# Numeric features are Float32; the label-encoded categoricals are Int64.
_numeric_cols = set(numerical_features)
schema = [Field(name=col, dtype=Float32 if col in _numeric_cols else Int64) for col in all_features]

customer_stats = FeatureView(
    name="customer_stats",
    source=source,
    entities=[customer],
    schema=schema,
    ttl=timedelta(days=365),
    online=True,
)

# Register the entity and view in the repo's registry.
store = FeatureStore(repo_path=str(REPO_DIR))
store.apply([customer, customer_stats])

### Retrieve Point-in-Time-Correct Training Features

In [None]:
# The label table is the entity dataframe: for every (customer_id, event_timestamp)
# row Feast joins the latest feature values at or before that timestamp (within the TTL).
entity_df = pd.read_parquet(entity_label_path)
feature_refs = [f"customer_stats:{name}" for name in all_features]
retrieval_job = store.get_historical_features(entity_df=entity_df, features=feature_refs)

# Discard rows without a matching feature row (or without a label).
training_df = retrieval_job.to_df().dropna(subset=[*all_features, "label"])
training_df["label"] = training_df["label"].astype(int)

### Time-Based Train/Validation/Test Split (Leakage-Safe)

In [None]:
# Chronological split on the label timestamp: roughly the oldest 70% for training,
# the next 15% for validation and the newest 15% for test, so no later rows
# inform the model or the threshold chosen on validation.
ts = training_df["event_timestamp"]
q_train, q_val = ts.quantile(0.70), ts.quantile(0.85)

in_train = ts <= q_train
in_val = (ts > q_train) & (ts <= q_val)
in_test = ts > q_val

train_df, val_df, test_df = (training_df[mask].copy() for mask in (in_train, in_val, in_test))

X_tr, y_tr = train_df[all_features], train_df["label"]
X_va, y_va = val_df[all_features], val_df["label"]
X_te, y_te = test_df[all_features], test_df["label"]

### ML Preprocessing Pipeline

In [None]:
# Numeric columns: fill gaps with the training mean, then standardise.
numeric_pipeline = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="mean")),
    ("scaler", StandardScaler()),
])

# Categorical codes: fill gaps with the mode, then one-hot encode (dense output);
# categories unseen during fit are encoded as all zeros instead of raising.
categorical_pipeline = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
])

preprocessor = ColumnTransformer(transformers=[
    ("num", numeric_pipeline, numerical_features),
    ("cat", categorical_pipeline, categorical_features),
])

# Class-balanced logistic regression on top of the preprocessing.
pipeline = Pipeline(steps=[
    ("pre", preprocessor),
    ("clf", LogisticRegression(class_weight="balanced", max_iter=5000)),
])

In [None]:
# Fit imputers, scaler, one-hot encoder and classifier on the training split only,
# so validation/test data never influence the learned preprocessing statistics.
pipeline.fit(X_tr, y_tr)

### Threshold Tuning

In [None]:
y_prob_va = pipeline.predict_proba(X_va)[:, 1]
thresholds = np.linspace(0.0, 1.0, 201)

# F1 on the validation split for every candidate cut-off.
val_f1_scores = [
    f1_score(y_va, (y_prob_va >= cutoff).astype(int), zero_division=0)
    for cutoff in thresholds
]
# argmax returns the first maximum, so ties resolve to the lowest threshold.
best_idx = int(np.argmax(val_f1_scores))
best_t, best_f1 = thresholds[best_idx], val_f1_scores[best_idx]

In [None]:
# Apply the validation-chosen threshold to the untouched test split.
y_prob_te = pipeline.predict_proba(X_te)[:, 1]
y_pred_te = (y_prob_te >= best_t).astype(int)

# The threshold grid steps by 0.005, so three decimals are needed to report the
# exact cut-off that was applied (two decimals would round e.g. 0.315 away).
print(f"\nChosen threshold (from val): {best_t:.3f}")
print(f"Test ROC AUC:   {roc_auc_score(y_te, y_prob_te):.4f}")
print("\n=== Test Classification Report ===")
print(classification_report(y_te, y_pred_te, digits=4))

### Materialize to Online Store & Online Feature Lookup (Serving Simulation)

In [None]:
# Copy feature rows from the offline files into the SQLite online store up to "now".
store.materialize_incremental(end_date=datetime.now(timezone.utc))

# Bundle the view's features under one service name for serving lookups.
svc = FeatureService(name="customer_stats_service", features=[customer_stats])
store.apply([svc])

# Five reproducibly sampled customers from the training data.
sample_ids = (
    training_df["customer_id"]
    .drop_duplicates()
    .sample(5, random_state=42)
    .tolist()
)
entity_rows = [{"customer_id": cid} for cid in sample_ids]
online = store.get_online_features(features=svc, entity_rows=entity_rows).to_dict()

# `online` maps each returned column name to a list aligned with entity_rows.
print("\n=== Online features sample ===")
for position, cid in enumerate(sample_ids):
    row = {name: values[position] for name, values in online.items()}
    print(f"{cid}: {row}")