## Kaggle Playground S6E1 - Predicting Student Test Scores

이 노트북은 **재현 가능한 EDA + 전처리 + 교차검증 + 앙상블(블렌딩) + 제출 파일 생성**까지 한 번에 수행합니다.

- **Target**: `exam_score`
- **Metric(로컬 검증)**: RMSE (낮을수록 좋음)

> 주의: Kaggle Public/Private LB 점수는 로컬 CV와 차이가 날 수 있으므로, 최종 모델 및 블렌딩 가중치 선택은 LB 점수가 아니라 **OOF RMSE 기준**으로 진행합니다.


In [None]:
# 필수 라이브러리
import os
import warnings
from pathlib import Path

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import KFold
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, StandardScaler

from sklearn.linear_model import Ridge
from sklearn.ensemble import HistGradientBoostingRegressor

# Suppress every Python warning for the rest of the session.
warnings.filterwarnings('ignore')

# Reproducibility: a single seed constant, also used to seed NumPy's global RNG.
RANDOM_STATE = 42
np.random.seed(RANDOM_STATE)

# Matplotlib Korean font setup: choose a font family per operating system.
import platform

system_name = platform.system()
# Windows and macOS have dedicated entries; every other platform
# (Linux, e.g. Colab or Docker) falls back to NanumGothic.
_font_by_system = {'Windows': 'Malgun Gothic', 'Darwin': 'AppleGothic'}
plt.rc('font', family=_font_by_system.get(system_name, 'NanumGothic'))
# NOTE(review): presumably set so minus signs still render with the fonts
# above, which may lack the Unicode minus glyph -- confirm.
plt.rc('axes', unicode_minus=False)

# Input file locations, relative to the current working directory.
DATA_DIR = Path('Predicting Student Test Scores/data')
TRAIN_PATH, TEST_PATH, SAMPLE_SUB_PATH = (
    DATA_DIR / file_name
    for file_name in ('train.csv', 'test.csv', 'sample_submission.csv')
)

# Trailing expression: echoes the three paths as the cell output.
TRAIN_PATH, TEST_PATH, SAMPLE_SUB_PATH

In [None]:
# Load the three competition CSVs. A missing file is re-raised as a
# FileNotFoundError whose message lists all three expected paths, chained
# to the original error.
try:
    train_df = pd.read_csv(TRAIN_PATH)
    test_df = pd.read_csv(TEST_PATH)
    sample_sub = pd.read_csv(SAMPLE_SUB_PATH)
except FileNotFoundError as missing_file:
    _load_error = (
        f"파일을 찾을 수 없습니다. 경로를 확인하세요.\n- TRAIN: {TRAIN_PATH}\n- TEST: {TEST_PATH}\n- SAMPLE: {SAMPLE_SUB_PATH}"
    )
    raise FileNotFoundError(_load_error) from missing_file

print('train:', train_df.shape, 'test:', test_df.shape)

# Preview the first rows of each table, in load order.
for _frame in (train_df, test_df, sample_sub):
    display(_frame.head())

In [None]:
# Basic data-quality checks: dtypes, missing values, duplicate rows.
TARGET_COL = 'exam_score'
ID_COL = 'id'

# The target must exist in train and must be absent from test
# (its presence in test would indicate possible leakage).
assert TARGET_COL in train_df.columns, '타겟 컬럼이 train에 없습니다.'
assert TARGET_COL not in test_df.columns, 'test에 타겟이 들어있습니다(누수 가능)!'

print('--- Dtypes ---')
display(train_df.dtypes)

_splits = (('train', train_df), ('test', test_df))

# Per-column fraction of missing values, largest first.
for _split_name, _split_df in _splits:
    print(f'--- Missing values ({_split_name}) ---')
    _missing_rate = _split_df.isna().mean().sort_values(ascending=False)
    display(_missing_rate.to_frame('missing_rate'))

# Number of fully duplicated rows in each split.
print('--- Duplicates ---')
for _split_name, _split_df in _splits:
    print(f'{_split_name} duplicate rows:', _split_df.duplicated().sum())

# Distribution of the target: 50-bin histogram with a KDE overlay.
plt.figure(figsize=(8, 4))
sns.histplot(train_df[TARGET_COL], bins=50, kde=True)
plt.title('타겟(exam_score) 분포')
plt.xlabel('exam_score')
plt.ylabel('count')
plt.show()

In [None]:
# Quick EDA: numeric / categorical summaries.
#
# Both the target and the row identifier are kept out of the feature set.
# Previously only the target was excluded, so the id column (assumed to be
# an arbitrary row label) was passed to the models and to the correlation
# heatmap as if it were a numeric predictor.
_non_feature_cols = (TARGET_COL, ID_COL)
feature_cols = [c for c in train_df.columns if c not in _non_feature_cols]

X = train_df[feature_cols].copy()
y = train_df[TARGET_COL].copy()
X_test = test_df[feature_cols].copy()

# Split columns by dtype: object-typed columns are treated as categorical,
# everything else as numeric.
categorical_cols = X.select_dtypes(include=['object']).columns.tolist()
numeric_cols = [c for c in X.columns if c not in categorical_cols]

print('numeric_cols:', numeric_cols)
print('categorical_cols:', categorical_cols)

print('\n--- Numeric summary ---')
display(train_df[numeric_cols + [TARGET_COL]].describe().T)

print('\n--- Categorical cardinality ---')
display(train_df[categorical_cols].nunique().sort_values(ascending=False).to_frame('n_unique'))

# Correlation between numeric features and the target, computed on a
# random sample of at most 50,000 rows to keep it fast.
sample_df = train_df.sample(n=min(50_000, len(train_df)), random_state=RANDOM_STATE)
plt.figure(figsize=(10, 6))
sns.heatmap(sample_df[numeric_cols + [TARGET_COL]].corr(), annot=False, cmap='coolwarm', center=0)
plt.title('수치형 상관관계(샘플)')
plt.show()

In [None]:
# Preprocessing pipelines
# - Linear model: OneHotEncoder, then StandardScaler(with_mean=False)
#   because the encoded matrix may be sparse.
# - Tree model: OrdinalEncoder with unseen categories mapped to -1,
#   which is fast and memory-efficient.

# Numeric columns: median imputation only.
numeric_transformer_basic = Pipeline([('imputer', SimpleImputer(strategy='median'))])

# Categorical columns, one-hot variant: mode imputation, then sparse one-hot
# encoding that ignores categories not seen during fit.
categorical_transformer_ohe = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=True)),
])

# Categorical columns, ordinal variant: mode imputation, then integer codes
# with -1 reserved for categories not seen during fit.
categorical_transformer_ord = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('ordinal', OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1)),
])

# Column-wise preprocessors; columns not listed are dropped.
preprocess_ohe = ColumnTransformer(
    [
        ('num', numeric_transformer_basic, numeric_cols),
        ('cat', categorical_transformer_ohe, categorical_cols),
    ],
    remainder='drop',
    sparse_threshold=0.3,
)
preprocess_ord = ColumnTransformer(
    [
        ('num', numeric_transformer_basic, numeric_cols),
        ('cat', categorical_transformer_ord, categorical_cols),
    ],
    remainder='drop',
)

# Model definitions.
ridge_model = Ridge(alpha=2.0, random_state=RANDOM_STATE)

_hgb_params = {
    'loss': 'squared_error',
    'learning_rate': 0.06,
    'max_depth': None,
    'max_leaf_nodes': 63,
    'min_samples_leaf': 40,
    'l2_regularization': 0.0,
    'max_bins': 255,
    'random_state': RANDOM_STATE,
}
hgb_model = HistGradientBoostingRegressor(**_hgb_params)

# Full pipelines: preprocessing followed by the estimator.
model_ridge = Pipeline([
    ('preprocess', preprocess_ohe),
    ('scaler', StandardScaler(with_mean=False)),
    ('model', ridge_model),
])
model_hgb = Pipeline([
    ('preprocess', preprocess_ord),
    ('model', hgb_model),
])

# Trailing expression: shows both pipelines as the cell output.
model_ridge, model_hgb

In [None]:
# CV 유틸리티

def rmse(y_true, y_pred):
    """Return the root mean squared error of ``y_pred`` against ``y_true`` as a float."""
    mse = mean_squared_error(y_true, y_pred)
    return float(np.sqrt(mse))


def train_cv_oof(model, X, y, X_test, n_splits=5, random_state=RANDOM_STATE):
    """Generate out-of-fold and test predictions with shuffled K-fold CV.

    For every fold the given ``model`` is fitted in place on the training
    split, scored on the validation split, and used to predict ``X_test``.
    Per-fold RMSE, the mean/std of the fold scores and the overall OOF RMSE
    are printed.

    Args:
        model: Estimator exposing ``fit(X, y)`` and ``predict(X)``. The same
            object is refitted each fold, so it is left fitted on the last
            fold's training split.
        X: Training features; must support ``.iloc`` row indexing.
        y: Training target; must support ``.iloc`` row indexing.
        X_test: Test features passed to ``model.predict``.
        n_splits: Number of folds.
        random_state: Seed for the fold shuffling.

    Returns:
        Tuple ``(oof_pred, test_pred, scores)``: the out-of-fold predictions
        (one per row of ``X``), the test predictions averaged over folds,
        and the list of per-fold RMSE values.
    """
    splitter = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)

    oof_pred = np.zeros(len(X), dtype=np.float64)
    test_pred = np.zeros(len(X_test), dtype=np.float64)
    fold_scores = []

    for fold_no, (train_rows, valid_rows) in enumerate(splitter.split(X), start=1):
        # Fit on the training split of this fold.
        model.fit(X.iloc[train_rows], y.iloc[train_rows])

        # Predict the held-out rows and store them at their original positions.
        valid_pred = model.predict(X.iloc[valid_rows])
        oof_pred[valid_rows] = valid_pred

        score = rmse(y.iloc[valid_rows], valid_pred)
        fold_scores.append(score)
        print(f'[fold {fold_no}] RMSE: {score:.5f}')

        # Each fold contributes 1/n_splits of the final test prediction.
        test_pred += model.predict(X_test) / n_splits

    mean_score = np.mean(fold_scores)
    std_score = np.std(fold_scores)
    print(f'CV RMSE mean: {mean_score:.5f}  std: {std_score:.5f}')
    print(f'OOF RMSE: {rmse(y, oof_pred):.5f}')
    return oof_pred, test_pred, fold_scores


In [None]:
# Train both models with 5-fold CV and collect their OOF / test predictions.
print('=== Ridge(OHE) ===')
ridge_oof, ridge_test, ridge_scores = train_cv_oof(
    model_ridge, X, y, X_test, n_splits=5
)

print('\n=== HistGB(Ordinal) ===')
hgb_oof, hgb_test, hgb_scores = train_cv_oof(
    model_hgb, X, y, X_test, n_splits=5
)

# Side-by-side comparison of the overall OOF RMSE, best model first.
results_df = pd.DataFrame(
    [
        {'model': 'Ridge(OHE)', 'oof_rmse': rmse(y, ridge_oof)},
        {'model': 'HistGB(Ordinal)', 'oof_rmse': rmse(y, hgb_oof)},
    ]
)
display(results_df.sort_values('oof_rmse'))

In [None]:
# OOF-based optimal blending (two models).
# The weight w that minimises MSE is obtained in closed form for
#   pred = w * pred_a + (1 - w) * pred_b

def optimal_blend_weight(y_true, pred_a, pred_b):
    """Return the MSE-optimal weight of ``pred_a`` in a two-model blend.

    The blend is ``w * pred_a + (1 - w) * pred_b``. Minimising the squared
    error against ``y_true`` gives
    ``w = <y_true - pred_b, pred_a - pred_b> / ||pred_a - pred_b||^2``,
    which is then clipped to ``[0, 1]``.

    All inputs are converted to flat float arrays first, so lists, NumPy
    arrays and pandas Series are accepted and are always combined by
    position (never by pandas index label).

    Args:
        y_true: Ground-truth values.
        pred_a: Predictions of the first model.
        pred_b: Predictions of the second model.

    Returns:
        The weight for ``pred_a`` as a float in ``[0, 1]``; ``0.5`` when the
        two prediction vectors are identical (or empty), since any weight
        gives the same blend in that case.

    Raises:
        ValueError: If the three inputs do not have the same number of
            elements.
    """
    y_true = np.asarray(y_true, dtype=np.float64).ravel()
    pred_a = np.asarray(pred_a, dtype=np.float64).ravel()
    pred_b = np.asarray(pred_b, dtype=np.float64).ravel()

    if not (y_true.shape == pred_a.shape == pred_b.shape):
        raise ValueError(
            'y_true, pred_a and pred_b must have the same length: '
            f'{y_true.shape[0]}, {pred_a.shape[0]}, {pred_b.shape[0]}'
        )

    diff = pred_a - pred_b
    denom = np.dot(diff, diff)
    if denom == 0:
        # Identical predictions: the weight is undetermined, use an even split.
        return 0.5

    w = np.dot(y_true - pred_b, diff) / denom
    # Clip to [0, 1] to guard against overfitting / unstable weights.
    return float(np.clip(w, 0.0, 1.0))

# Weight of the Ridge model in the blend, fitted on the OOF predictions.
w_ridge = optimal_blend_weight(y.values, ridge_oof, hgb_oof)
print(f'Optimal weight for Ridge(OHE): {w_ridge:.4f}  (HistGB weight: {1-w_ridge:.4f})')

# Apply the same weights to the OOF and to the test predictions.
blend_oof = w_ridge * ridge_oof + (1 - w_ridge) * hgb_oof
blend_test = w_ridge * ridge_test + (1 - w_ridge) * hgb_test

_oof_by_model = (
    ('Ridge', ridge_oof),
    ('HistGB', hgb_oof),
    ('Blend', blend_oof),
)

# OOF RMSE of each single model and of the blend.
for _label, _oof in _oof_by_model:
    print(f'{_label} OOF RMSE:', rmse(y, _oof))

# Sanity check: compare the distributions of the OOF predictions.
plt.figure(figsize=(10, 4))
for _label, _oof in _oof_by_model:
    sns.kdeplot(_oof, label=f'{_label} OOF', lw=2)
plt.title('OOF 예측 분포 비교')
plt.xlabel('pred')
plt.ylabel('density')
plt.legend()
plt.show()

In [None]:
# Build the submission file.
sub = sample_sub.copy()

# Trust the column name from sample_submission (it differs between competitions).
# An explicit raise is used instead of assert so the check survives `python -O`.
pred_col = [c for c in sub.columns if c != ID_COL]
if len(pred_col) != 1:
    raise ValueError(f'sample_submission의 예측 컬럼이 1개가 아닙니다: {pred_col}')
pred_col = pred_col[0]

# blend_test is ordered like test_df. Before assigning it, make sure the rows
# of sample_submission refer to the same ids in the same order; otherwise the
# predictions would silently be attached to the wrong ids.
submission_pred = np.asarray(blend_test, dtype=np.float64)
if len(submission_pred) != len(sub):
    raise ValueError(
        f'prediction count ({len(submission_pred)}) does not match '
        f'sample_submission rows ({len(sub)})'
    )

if ID_COL in sub.columns and ID_COL in test_df.columns:
    sub_ids = sub[ID_COL].to_numpy()
    test_ids = test_df[ID_COL].to_numpy()
    if not np.array_equal(sub_ids, test_ids):
        # Same ids in a different order can be realigned; anything else is an error.
        pred_by_id = pd.Series(submission_pred, index=test_ids)
        if not pred_by_id.index.is_unique:
            raise ValueError('test ids are not unique; cannot align predictions by id')
        if not np.isin(sub_ids, test_ids).all():
            raise ValueError('sample_submission contains ids that are missing from test')
        submission_pred = pred_by_id.reindex(sub_ids).to_numpy()

sub[pred_col] = submission_pred

# Output location (parent directory is created if needed).
out_path = Path('Predicting Student Test Scores/submission.csv')
out_path.parent.mkdir(parents=True, exist_ok=True)
sub.to_csv(out_path, index=False)

print('saved:', out_path)
display(sub.head())

# Quick check: duplicated ids and final shape of the submission.
print('id duplicates in submission:', sub[ID_COL].duplicated().sum())
print('submission shape:', sub.shape)

## (선택) Streamlit으로 빠른 EDA 대시보드

원하면 아래 경로에 간단한 Streamlit 앱을 만들어, 컬럼별 분포/결측/상관관계를 인터랙티브하게 확인할 수 있습니다.

- 파일 예시: `Predicting Student Test Scores/streamlit_app.py`
- 실행: `streamlit run "Predicting Student Test Scores/streamlit_app.py"`
