## 0. 실버 Silver 레이어의 개념

Oracle AI Data Platform의 메달리온 구조에서 실버 레이어는 브론즈에 저장된 원본 데이터를 분석과 모델링에 사용할 수 있도록 정제하는 단계입니다.

브론즈 데이터는 원본 보관 목적이기 때문에 컬럼이 많고 문자열 표현이 제각각이며 결측치나 이상값이 포함될 수 있습니다.  
실버 레이어에서는 이러한 데이터를 깨끗하고 일관된 구조로 다듬어 바로 모델 학습에 사용할 수 있도록 준비합니다.

---

## 실버 레이어의 핵심 목적

- 브론즈 데이터를 실제 분석이나 머신러닝에 적합한 형태로 변환  
- 범주형 컬럼을 숫자 기반 표현으로 변환  
- 불필요한 컬럼 제거  
- 데이터 품질을 확보하여 이후 골드 단계에서 바로 예측에 활용할 수 있는 기반 마련  

---

## 브론즈 → 실버 단계가 필요한 이유

| 브론즈 데이터 문제점            | 실버 단계에서의 처리 방향                   |
| ------------------------------ | ------------------------------------------- |
| 값 표현이 제각각이고 통일되지 않음 | 문자열 정리와 일관된 값으로 변환              |
| 범주형 문자열이 많아 모델이 처리 불가 | 인덱싱, 원핫 인코딩 등 숫자형 표현으로 변환     |
| 결측치나 이상값이 일부 존재         | 필터링 또는 대체 처리로 모델 안정성 확보        |
| 컬럼이 너무 많거나 불필요한 정보 포함 | 모델에 필요한 컬럼만 선택해 데이터 구조 단순화 |

실버 레이어는 분석과 모델링을 위한 표준 데이터셋을 만드는 단계이며 골드 레이어에서 예측 결과를 생성하기 위한 핵심 기반이 됩니다.

---

## 0-1. 이 노트북에서 수행하는 전체 흐름

이 노트북은 실버 레이어 데이터를 기반으로 고객 이탈 예측 모델을 학습하는 전체 과정을 포함합니다.

1. 실버 카탈로그에서 정제된 Telco 고객 데이터를 불러오기  
2. 범주형 컬럼 인덱싱과 원핫 인코딩 적용  
3. 수치형 컬럼과 함께 Feature Vector 생성  
4. 로지스틱 회귀 모델 구성  
5. Spark ML Pipeline으로 전체 학습 과정 묶기  
6. 데이터를 학습용과 테스트용으로 분리  
7. 모델 학습 수행  
8. 테스트 데이터로 ROC AUC 평가  
9. 모델 저장 및 재사용  

이 과정을 통해 실버 데이터를 기반으로 완전한 머신러닝 학습 파이프라인을 구축하게 됩니다.

## 1. 실습 파라미터 정의

실버 레이어의 카탈로그/스키마 정보를 설정합니다.
워크플로우에서 실행할 경우, 파라미터로도 받을 수 있도록 구성했습니다.

In [1]:
# Catalog / Schema / Table 설정
silver_catalog = "demo_telco_churn_silver"
silver_schema  = "telco_churn_historical"
silver_table   = "telco_custchurn_history_par"   # SHOW TABLES에서 확인한 실제 테이블 이름

## 2. 라이브러리 import & Spark 세션 생성

이 노트북은 PySpark 기반으로 실행됩니다.
Spark 세션을 생성하면 실습 준비가 끝납니다.

In [1]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression

# Initialize Spark session
spark = SparkSession.builder.appName("CustomerChurn").getOrCreate()

## 3. Silver 카탈로그에서 Telco Churn 데이터 로드

Bronze → Silver 레이어로 ETL이 끝난 정제된 데이터를 불러옵니다.
- 실버 데이터는 스키마가 잘 정리되어 있고 사용할 수 있는 상태입니다.
- 불필요한 컬럼(year)은 제거합니다.
- 로딩 후 스키마 구조를 출력해 확인합니다.

In [1]:
# Read Customer Churn History data from Silver Catalog
data = spark.read.table(f"{silver_catalog}.{silver_schema}.{silver_table}")

# Remove 'year' column (not used in model training)
data = data.drop("year")

# Inspect schema
data.printSchema()

root
 |-- customerid: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- seniorcitizen: integer (nullable = true)
 |-- partner: string (nullable = true)
 |-- dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- phoneservice: string (nullable = true)
 |-- multiplelines: string (nullable = true)
 |-- internetservice: string (nullable = true)
 |-- onlinesecurity: string (nullable = true)
 |-- onlinebackup: string (nullable = true)
 |-- deviceprotection: string (nullable = true)
 |-- techsupport: string (nullable = true)
 |-- streamingtv: string (nullable = true)
 |-- streamingmovies: string (nullable = true)
 |-- contract: string (nullable = true)
 |-- paperlessbilling: string (nullable = true)
 |-- paymentmethod: string (nullable = true)
 |-- monthlycharges: double (nullable = true)
 |-- totalcharges: double (nullable = true)
 |-- churn: string (nullable = true)



## 4. 데이터 전처리

### 실습 목표
- 범주형(categorical) 컬럼 변환
- 수치형(numeric) 컬럼 확인
- 머신러닝 모델이 학습할 수 있도록 피처 엔지니어링 수행

## 4-1. 범주형 컬럼 변환

Spark ML에서는 범주형 컬럼을 그대로 사용할 수 없기 때문에 StringIndexer → OneHotEncoder 과정을 적용합니다.

In [1]:
# Process categorical and numeric attributes.
# Identify categorical columns (exclude numeric ones)
categoricalCols = [c for c in data.columns if data.schema[c].dataType == "string"]

# String Indexing
indexers = [
    StringIndexer(inputCol=col, outputCol=col + "_IDX")
    for col in categoricalCols
]

# OneHot Encoding
encoders = [
    OneHotEncoder(inputCols=[col + "_IDX"], outputCols=[col + "_OHE"])
    for col in categoricalCols
]

## 4-2. Feature Vector 생성

모든 피처를 하나의 벡터로 합쳐야 Spark ML이 처리할 수 있습니다.
VectorAssembler가 그 역할을 합니다.

In [1]:
# Assemble features into a single vector
assemblerInputs = [col + "_OHE" for col in categoricalCols] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

# Index target label
labelIndexer = StringIndexer(inputCol="churn", outputCol="label")

## 5. 모델 학습

Telco Churn 예측에서는 로지스틱 회귀(LogisticRegression)를 사용합니다.
- 이진 분류-binary classification에 적합
- 빠르고 실무에서 매우 많이 사용하는 모델

In [1]:
# Train the ML Model
# Logistic Regression Model
lr = LogisticRegression(featuresCol="features", labelCol="label")

## 6. ML 파이프라인 구성

Spark ML의 강력한 기능인 Pipeline을 사용하여 전처리 → 인코딩 → 피처 생성 → 모델 흐름을 하나의 파이프라인으로 묶습니다.
실무에서는 파이프라인객체를 모델 저장/배포에 그대로 활용합니다.

In [1]:
# Build pipeline

pipeline = Pipeline(
    stages=indexers + encoders + [assembler, labelIndexer, lr]
)

## 7. 모델 학습 실행

이제 데이터에 대해 전체 파이프라인을 fit합니다.

In [1]:
# Clean data (remove missing or invalid TotalCharges)
data = data.filter(col("TotalCharges").isNotNull())

# Split data
train, test = data.randomSplit([0.7, 0.3], seed=42)

# Fit model
model = pipeline.fit(train)

# 8. 모델 평가(Evaluation)

모델이 학습된 후에는 테스트 데이터에 대해 예측을 수행하고, 예측 결과가 실제 레이블을 얼마나 잘 맞추는지 평가해야 합니다.

여기서는 분류 모델에서 가장 널리 사용하는 평가 지표인 **ROC-AUC(Area&nbsp;Under&nbsp;ROC&nbsp;Curve)** 를 활용합니다.

- ROC-AUC 값이 1에 가까울수록 모델이 우수함  
- 0.5 수준이면 랜덤 추측과 동일  
- Telco Churn 같은 고객 이탈 예측에서는 매우 중요한 지표


In [1]:
# 테스트 데이터에 대해 모델 예측 수행
predictions = model.transform(test)

# Binary Classification Evaluator 생성
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

# ROC-AUC 계산
roc_auc = evaluator.evaluate(predictions)
print(f"Test ROC-AUC = {roc_auc:.3f}")

Test ROC-AUC = 0.803


모델이 테스트 데이터에 대해 계산한 예측 결과를 직접 확인해보는 단계입니다. 
이 과정은 모델이 예측을 어떻게 내리고 있는지, 그리고 확률 값이 어떻게 분포하는지 직관적으로 이해하는 데 도움이 됩니다.

Spark의 예측 결과 DataFrame predictions 에는 다음과 같은 주요 컬럼이 포함됩니다:

- Churn: 실제 고객 이탈 여부. 예: 0은 잔존, 1은 이탈
- prediction: 모델이 예측한 이탈 여부. 예: 0 또는 1
- probability: 모델이 계산한 이탈 확률. 예: [0.78, 0.22]

아래 코드는 이 세 가지 컬럼만 선택해서 예측 샘플을 10개 보여줍니다.

In [1]:
# Show sample predictions
predictions.select("Churn", "prediction", "probability").show(10, truncate=False)

+-----+----------+-----------------------------------------+
|Churn|prediction|probability                              |
+-----+----------+-----------------------------------------+
|No   |0.0       |[0.5955903579216851,0.40440964207831487] |
|No   |0.0       |[0.8122339992369273,0.1877660007630727]  |
|No   |0.0       |[0.9201335229637126,0.07986647703628735] |
|No   |0.0       |[0.9708539492458342,0.029146050754165764]|
|No   |0.0       |[0.9280859396468588,0.07191406035314118] |
|No   |0.0       |[0.8580050730666826,0.1419949269333174]  |
|No   |1.0       |[0.28014028459322393,0.7198597154067761] |
|No   |0.0       |[0.9685509803225579,0.03144901967744207] |
|No   |0.0       |[0.7577966071818952,0.24220339281810477] |
|Yes  |0.0       |[0.6732906407854523,0.3267093592145477]  |
+-----+----------+-----------------------------------------+
only showing top 10 rows



## 9. 모델 저장 단계

모델 학습이 완료되면, 이후 재학습 없이 바로 예측에 활용하거나 다른 노트북에서 불러오기 위해 모델을 저장할 수 있습니다.  

Spark의 PipelineModel 저장 기능을 사용하면 전체 전처리 과정과 모델이 하나의 객체로 저장됩니다.  
이 방식은 운영 환경에서 안정적으로 재사용할 수 있다는 장점이 있습니다.  

아래 코드는 학습된 모델을 지정된 경로에 저장하는 예입니다.  
기존 동일 경로에 모델이 있을 경우에는 overwrite 옵션으로 덮어쓰기 방식으로 저장됩니다.

In [1]:
#Save Model
model.write().overwrite().save("/Workspace/demo_telco_churn_bronze_to_silver/telco_customer_churn_ml_model")

## 10. 저장된 모델 불러오기 단계

학습이 완료된 모델은 지정된 경로에 저장되어 있으며, 다른 노트북이나 후속 처리 단계에서 재사용할 수 있습니다.  
Spark의 PipelineModel.load 기능을 사용하면 저장된 모델을 그대로 메모리에 불러올 수 있습니다.

아래 코드는 앞서 저장한 모델을 동일한 Workspace 경로에서 읽어오는 예입니다.  
로드된 모델은 학습 당시의 전처리 과정과 파이프라인 구성을 그대로 유지합니다.

In [1]:
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("/Workspace/demo_telco_churn_bronze_to_silver/telco_customer_churn_ml_model")