# Telco Churn — Spark Demo Notebook

Демо к репозиторию: ETL → фичи → агрегаты.  
Логика в `src/`, ноут — для быстрых проверок и обзора данных.



## Импорты и конфиг

In [14]:
import sys, yaml

from copy import deepcopy
from pathlib import Path
from src.etl import read_table, basic_clean
from src.features import add_features, group_report
from src.job import main as run_job
from src.session import build_spark

In [6]:
def find_repo_root(start: Path) -> Path:
    """Locate the repository root at or above ``start``.

    A directory counts as the root when it contains both
    ``configs/config.yaml`` and ``src``. ``start`` itself is checked first,
    then each of its parents from nearest to farthest.

    Args:
        start: Directory from which the upward search begins.

    Returns:
        The first matching directory, or ``start`` unchanged when no
        candidate matches.
    """
    candidates = (start, *start.parents)
    matches = (
        folder
        for folder in candidates
        if (folder / "configs" / "config.yaml").exists()
        and (folder / "src").exists()
    )
    # Fall back to the starting directory when nothing matched.
    return next(matches, start)

# Anchor the paths used below on the repository root found from the
# current working directory.
ROOT = find_repo_root(Path.cwd())
CONFIG = ROOT.joinpath("configs", "config.yaml")

# Add the root to sys.path only once, so modules located under it
# (e.g. the `src` directory checked by find_repo_root) can be imported.
if sys.path.count(str(ROOT)) == 0:
    sys.path.append(str(ROOT))

# Quick sanity output: the resolved root and whether the config file is there.
print("ROOT:", ROOT)
print("CONFIG exists:", CONFIG.exists())

ROOT: D:\ML\LS\MLJ\Infrastructure_for_ML\spark-analytics-starter
CONFIG exists: True


## старт Spark

In [7]:
# Parse the YAML config from the UTF-8 encoded config file.
cfg = yaml.safe_load(CONFIG.read_text(encoding="utf-8"))

# Fall back to defaults when the config omits `app_name` / `shuffle_partitions`.
app_name = cfg.get("app_name", "SparkTelcoChurn")
shuffle_partitions = cfg.get("shuffle_partitions", 8)
spark = build_spark(app_name, shuffle_partitions)

# Last expression of the cell: lets the notebook display the session object.
spark

## загрузка

In [10]:
# Resolve the configured input path against the repository root and
# normalise it to forward slashes.
input_cfg = cfg["input"]
data_path = (ROOT / input_cfg["path"]).resolve().as_posix()
print("DATA PATH:", data_path)

# Reader options come from the config; missing keys fall back to
# fmt="csv", header=True, infer_schema=True.
raw = read_table(
    spark,
    path=data_path,
    fmt=input_cfg.get("fmt", "csv"),
    header=input_cfg.get("header", True),
    infer_schema=input_cfg.get("infer_schema", True),
)

# Quick look at the loaded table: row count, schema, first five rows untruncated.
print("rows:", raw.count())
raw.printSchema()
raw.show(5, truncate=False)

DATA PATH: D:/ML/LS/MLJ/Infrastructure_for_ML/spark-analytics-starter/data/WA_Fn-UseC_-Telco-Customer-Churn.csv
rows: 7043
root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = tr

## фичи и быстрые проверки

In [12]:
# Run the cleaning step first, then pass its result to the feature step.
cleaned = basic_clean(raw)
feat = add_features(cleaned)
feat.printSchema()

# Preview a handful of columns from the resulting table (first five rows).
preview_cols = [
    "MonthlyCharges",
    "MonthlyCharges_log1p",
    "TotalCharges",
    "TotalCharges_double",
    "TenureBucket",
    "MCharges_per_Tenure",
]
feat.select(*preview_cols).show(5)

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)
 |-- TotalCharges_double: double (nullable = true)
 |-- MonthlyCharges_double: dou

## отчёт по группам

In [13]:
# Grouping columns are taken from the config and passed on as a tuple.
group_cols = tuple(cfg["report"]["group_by"])
grp = group_report(feat, group_cols)

# Show the ten rows with the largest `n`, without truncating cell values.
by_size = grp.orderBy("n", ascending=False)
by_size.show(10, truncate=False)

+--------------+---------------+-----+----+--------------------------+-------------------------+-------------------------+------------------------+-----------------------+-----------------------+
|Contract      |InternetService|Churn|n   |MonthlyCharges_double_mean|MonthlyCharges_double_std|MonthlyCharges_double_p50|TotalCharges_double_mean|TotalCharges_double_std|TotalCharges_double_p50|
+--------------+---------------+-----+----+--------------------------+-------------------------+-------------------------+------------------------+-----------------------+-----------------------+
|Month-to-month|Fiber optic    |Yes  |1162|86.47332185886413         |11.110322891071286       |85.35                    |1490.2345094664367      |1615.725205175718      |865.55                 |
|Month-to-month|Fiber optic    |No   |966 |87.68022774327133         |11.27311682973646        |87.8                     |2546.4803830227743      |1922.8267270195415     |2151.6                 |
|Month-to-month|DSL 

## запуск job из ноута

In [16]:
# Work on a deep copy so the in-memory `cfg` keeps its original paths.
tmp_cfg = deepcopy(cfg)

# Rewrite the input and output locations as absolute paths anchored at ROOT.
tmp_cfg["input"]["path"] = (ROOT / cfg["input"]["path"]).resolve().as_posix()
tmp_cfg["output"]["dir"] = (ROOT / cfg["output"]["dir"]).resolve().as_posix()

# Persist the adjusted config: the job entry point is called with a path to
# a config file, not with the dict itself.
TMP_CONFIG = ROOT / "configs" / "_tmp_config_abs.yaml"
with open(TMP_CONFIG, "w", encoding="utf-8") as f:
    yaml.safe_dump(tmp_cfg, f, allow_unicode=True)

from src.job import main as run_job
run_job(str(TMP_CONFIG))

# List only the sub-directories of artifacts/. The original pattern "*/"
# filters to directories only on Python 3.11+; older versions ignore the
# trailing separator and would list regular files as well, so the
# directory check is made explicit here.
sorted(p.as_posix() for p in (ROOT / "artifacts").glob("*") if p.is_dir())

['D:/ML/LS/MLJ/Infrastructure_for_ML/spark-analytics-starter/artifacts/features',
 'D:/ML/LS/MLJ/Infrastructure_for_ML/spark-analytics-starter/artifacts/report']

## остановка Spark

In [18]:
# Stop the Spark session that was started earlier in this notebook.
spark.stop()