# Notebook 기본 세팅

In [1]:
# Constant 선언

# 프로젝트 루트 디렉토리를 식별하기 위한 마커 파일 이름
ROOT_MARKER = "pyproject.toml"

# 한글 표시를 위한 나눔바른고딕 폰트 파일 이름
# matplotlib 의 font_manager 에 실제 폰트 파일의 위치를 넣어주어야 한다.
KOREAN_FONT_FILE = "NanumBarunGothic.ttf"

# matplotlib 에서는 font-family 의 이름으로 font 를 설정한다.
# 그래서 font 파일 그 자체가 아니라, 그 파일의 family 이름을 적어준다.
KOREAN_FONT_FAMILY = "NanumBarunGothic"

# 참고
# Font Family 와 Font File 의 차이는,
# Font Family 는 비슷한 디자인 특성을 공유하는 글꼴 그룹을 의미한다.
#
# 예를 들어 '나눔바른고딕' 폰트 패밀리는 일반(Regular), 굵게(Bold), 기울임(Italic) 등 여러 스타일을 포함할 수 있다.
# 반면, 폰트 파일(.ttf, .otf 등)은 이러한 폰트의 하나의 스타일이 저장된 실제 파일이다.
#
# 이 프로젝트에서는 폰트 용량을 줄이기 위해 일반(Regular) 인 NanumBarunGothic.ttf 만 사용한다.

In [2]:
# 프로젝트 root 를 sys.path 에 추가해서 import 구문을 사용하기 쉽게
from pathlib import Path


def find_project_root() -> Path:
    """
    pyproject.toml 파일을 기준으로 루트 디렉토리를 찾는다.
    :return: Path: 프로젝트 루트 디렉토리 경로
    """

    current_path = Path().resolve()

    while current_path != current_path.parent:
        if (current_path / ROOT_MARKER).exists():
            return current_path

        current_path = current_path.parent

    raise FileNotFoundError("프로젝트 루트 디렉토리를 찾을 수 없습니다.")


ROOT_DIR = find_project_root()

In [None]:
# matplotlib 의 한글 font 설정
import matplotlib.font_manager as fm
import matplotlib.pyplot as plt


FONTS_DATA_DIR = ROOT_DIR / "notebooks" / "fonts"


def setup_korean_font():
    font_path = FONTS_DATA_DIR / KOREAN_FONT_FILE
    fm.fontManager.addfont(font_path)

    # 폰트 설정
    plt.rcParams["font.family"] = KOREAN_FONT_FAMILY
    plt.rcParams["axes.unicode_minus"] = False


setup_korean_font()

# ver.0

## (1) 코드흐름

[Step 1] 데이터 분할(split_data)
- S3에서 feature 데이터 로딩
- 학습/검증/테스트 셋으로 분할하여 JSON 반환

[Step 2] 모델 훈련(train)
- 모델 config 리스트 생성 (모델명, 파라미터)
- 각 모델 학습
- 학습된 모델 `.pkl`로 저장
- config에 저장 경로 추가하여 리턴

[Step 3] 모델 평가(evaluate)
- 모든 모델을 검증셋(X_val, y_val)으로 평가
- wandb에 metric log (val_f1, val_accuracy 등)
- best_f1 기준으로 가장 좋은 모델 선정
- best 모델 `.pkl` 경로와 run_name 리턴

[Step 4] 테스트(test)
- best 모델을 테스트셋(X_test, y_test)으로 평가
- wandb에 test metric log
- 실서비스 전 모델 최종 성능 검증

[Step 5] 아티팩트 등록(register_artifacts)
- 모든 모델 `.pkl`을 W&B Artifact로 업로드
- type="model", name=run_name 형식

## (2) 모델 분석결과

- **목표**: 다양한 분류 모델의 성능을 비교하여 최적의 날씨 예측 모델 선정
- **데이터셋**: 테스트셋 기준 평가
- **지표**: `accuracy`, `f1`, `precision`, `recall`, `balanced_accuracy`

### 평균 Test 성능 기준 Top 모델

| 순위 | 모델 이름 | 평균 점수 |
|------|-----------|------------|
| 1 | DecisionTreeClassifier (max_depth=10) | **1.0000** |
| 1 | DecisionTreeClassifier (max_depth=5) | **1.0000** |
| 3 | XGBClassifier (n_estimators=200, max_depth=5) | 0.99998 |
| 3 | XGBClassifier (n_estimators=100, max_depth=3) | 0.99998 |
| 3 | GradientBoostingClassifier (n_estimators=200, learning_rate=0.05) | 0.99998 |
| 3 | GradientBoostingClassifier (n_estimators=100, learning_rate=0.1) | 0.99998 |
| 7 | RandomForestClassifier (n_estimators=200, max_depth=10) | 0.99974 |
| 8 | RandomForestClassifier (n_estimators=100, max_depth=5) | 0.97087 |

### 상대적으로 성능이 낮은 모델

| 모델 이름 | 평균 점수 |
|-----------|------------|
| GaussianNB | 0.6852 |
| KNeighborsClassifier (n_neighbors=5) | 0.4211 |
| KNeighborsClassifier (n_neighbors=3) | 0.4135 |
| LogisticRegression (C=1.0) | 0.3046 |
| LogisticRegression (C=0.1) | 0.2898 |
| LinearSVC (C=1.0 / 0.5) | 0.1675 |

### 모델 선택 전략

| 전략 | 설명 |
|------|------|
| **단일 모델 서빙** | `DecisionTreeClassifier (max_depth=10)` 추천. 해석 용이하고 예측 속도 빠름. |
| **앙상블 모델** | XGB, GBDT, RF 모델 3~5개를 soft voting 또는 평균 확률 방식으로 앙상블 |
| **FastAPI 연동용 추천** | 성능 + 추론 속도 + 라이브러리 안정성 고려 시 `XGBClassifier` 최우선 추천 |


## (3) automated_pipeline.py

In [None]:
from datetime import datetime
from io import StringIO

import joblib
import pandas as pd
import wandb
from config.default_args import get_dynamic_default_args
from config.keys import KEY_FEATURE_DATASET_STORAGE_KEY
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, balanced_accuracy_score, f1_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import LinearSVC
from sklearn.tree import DecisionTreeClassifier
from xgboost import XGBClassifier

from airflow.decorators import dag, task
from airflow.models import Variable
from src.libs.storage import Storage
from src.utils.log import get_logger


default_args = get_dynamic_default_args()
_logger = get_logger("weather_automated_pipeline")


@dag(
    dag_id="weather_automated_pipeline",
    start_date=datetime(2025, 6, 1),
    schedule="@daily",
    catchup=False,
    tags=["weather", "ml-modeling"],
    default_args=default_args,
)
def automated_pipeline_dag():
    storage = Storage.create()

    def get_model_configs():
        model_registry = {
            "LogisticRegression": LogisticRegression,
            "RandomForestClassifier": RandomForestClassifier,
            "GradientBoostingClassifier": GradientBoostingClassifier,
            "LinearSVC": LinearSVC,
            "GaussianNB": GaussianNB,
            "KNeighborsClassifier": KNeighborsClassifier,
            "DecisionTreeClassifier": DecisionTreeClassifier,
            "XGBClassifier": XGBClassifier,
        }

        param_grid = {
            "LogisticRegression": [
                {"C": 0.1, "max_iter": 100},
                {"C": 1.0, "max_iter": 200},
            ],
            "RandomForestClassifier": [
                {"n_estimators": 100, "max_depth": 5},
                {"n_estimators": 200, "max_depth": 10},
            ],
            "GradientBoostingClassifier": [
                {"n_estimators": 100, "learning_rate": 0.1},
                {"n_estimators": 200, "learning_rate": 0.05},
            ],
            "LinearSVC": [
                {"C": 1.0, "max_iter": 1000},
                {"C": 0.5, "max_iter": 1000},
            ],
            "GaussianNB": [{}],
            "KNeighborsClassifier": [
                {"n_neighbors": 3},
                {"n_neighbors": 5},
            ],
            "DecisionTreeClassifier": [
                {"max_depth": 5},
                {"max_depth": 10},
            ],
            "XGBClassifier": [
                {"n_estimators": 100, "max_depth": 3, "use_label_encoder": False, "eval_metric": "mlogloss"},
                {"n_estimators": 200, "max_depth": 5, "use_label_encoder": False, "eval_metric": "mlogloss"},
            ],
        }

        config_list = []
        for model_name, model_class in model_registry.items():
            for param in param_grid[model_name]:
                config_list.append({"name": model_name, "model_class_str": model_class.__name__, "params": param})

        return config_list

    def evaluate_model(model, X, y, prefix="val"):
        preds = model.predict(X)
        return {
            f"{prefix}_accuracy": accuracy_score(y, preds),
            f"{prefix}_f1": f1_score(y, preds, average="macro"),
            f"{prefix}_precision": precision_score(y, preds, average="macro", zero_division=0),
            f"{prefix}_recall": recall_score(y, preds, average="macro", zero_division=0),
            f"{prefix}_balanced_accuracy": balanced_accuracy_score(y, preds),
        }

    @task
    def split_data():
        feature_storage_key = Variable.get(KEY_FEATURE_DATASET_STORAGE_KEY)
        features = storage.read_as_dataframe(feature_storage_key)
        X = features.drop(columns=["weather"])
        y = features["weather"]
        X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.4, random_state=42)
        X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)
        return {
            "X_train": X_train.to_json(orient="split"),
            "y_train": y_train.tolist(),
            "X_val": X_val.to_json(orient="split"),
            "y_val": y_val.tolist(),
            "X_test": X_test.to_json(orient="split"),
            "y_test": y_test.tolist(),
        }

    @task
    def train(data_dict: dict):
        configs = get_model_configs()
        X_train = pd.read_json(StringIO(data_dict["X_train"]), orient="split")
        y_train = pd.Series(data_dict["y_train"])

        for config in configs:
            model_cls = eval(config["model_class_str"])
            model = model_cls(**config["params"])
            model.fit(X_train, y_train)
            model_path = f"{config['name']}_{str(config['params']).replace(' ', '').replace(':', '-')}.pkl"
            joblib.dump(model, model_path)
            config["model_path"] = model_path
        return configs

    @task
    def evaluate(configs: list, data_dict: dict):
        X_val = pd.read_json(StringIO(data_dict["X_val"]), orient="split")
        y_val = pd.Series(data_dict["y_val"])

        best_model_path = ""
        best_f1 = -1
        best_run_name = ""

        for config in configs:
            model = joblib.load(config["model_path"])
            model_name = config["name"]
            params = config["params"]
            run_name = f"{model_name}_{'_'.join([f'{k}={v}' for k, v in params.items()])}".replace("=", "-")

            run = wandb.init(
                project="weather-classification", name=run_name, config={"model": model_name, **params}, reinit=True
            )
            val_metrics = evaluate_model(model, X_val, y_val)
            wandb.log(val_metrics)

            if val_metrics["val_f1"] > best_f1:
                best_f1 = val_metrics["val_f1"]
                best_model_path = config["model_path"]
                best_run_name = run_name

            wandb.save(config["model_path"])
            wandb.finish()

        return {"best_model_path": best_model_path, "best_run_name": best_run_name, "all_configs": configs}

    @task
    def test(meta: dict, data_dict: dict):
        X_test = pd.read_json(StringIO(data_dict["X_test"]), orient="split")
        y_test = pd.Series(data_dict["y_test"])

        for config in meta["all_configs"]:
            model = joblib.load(config["model_path"])
            model_name = config["name"]
            params = config["params"]
            run_name = f"{model_name}_{'_'.join([f'{k}={v}' for k, v in params.items()])}".replace("=", "-") + "_test"

            run = wandb.init(project="weather-classification", name=run_name, job_type="test-eval", reinit=True)
            test_metrics = evaluate_model(model, X_test, y_test, prefix="test")
            wandb.log(test_metrics)
            wandb.finish()

    @task
    def register_artifacts(meta: dict):
        for config in meta["all_configs"]:
            run_name = f"{config['name']}_{'_'.join([f'{k}={v}' for k, v in config['params'].items()])}".replace(
                "=", "-"
            )
            artifact = wandb.Artifact(run_name, type="model")
            artifact.add_file(config["model_path"])
            run = wandb.init(
                project="weather-classification", name=f"{run_name}_artifact", job_type="register", reinit=True
            )
            wandb.log_artifact(artifact)
            run.finish()

    data = split_data()
    model_info = train(data)
    eval_meta = evaluate(model_info, data)
    test(eval_meta, data)
    register_artifacts(eval_meta)


automated_pipeline_dag()

# ver.1

## (1) 코드흐름

[Step 1] 데이터 분할 (`split_data`)
- S3에서 feature 데이터 로딩 (`Storage.read_as_dataframe`)
- `weather` 컬럼을 타겟으로 학습/검증/테스트 셋 분할  
  - Train: 60%, Validation: 20%, Test: 20%
- 각 셋을 JSON 형식으로 반환

[Step 2] 모델 훈련 (`train`)
- 사용 모델:
  - `DecisionTreeClassifier(max_depth=10)`
  - `XGBClassifier`
  - `Soft Voting 앙상블` (XGB + GBDT + RF)
- 각 모델 학습
- 학습된 모델을 `.pkl`로 저장하고, config에 경로 추가하여 리턴

[Step 3] 모델 평가 (`evaluate`)
- 모든 모델을 검증셋 `(X_val, y_val)`으로 평가
- 주요 평가 지표:
  - `val_accuracy`, `val_f1`, `val_precision`, `val_recall`, `val_balanced_accuracy`
- 각 모델의 metric을 wandb에 기록
- `val_f1` 기준으로 가장 좋은 모델을 선정하여 `.pkl` 경로와 `run_name` 리턴

[Step 4] 테스트 (`test`)
- 선택된 모든 모델을 테스트셋 `(X_test, y_test)`으로 평가
- 주요 평가 지표:
  - `test_accuracy`, `test_f1`, `test_precision`, `test_recall`, `test_balanced_accuracy`
- wandb에 각 모델의 test metric 기록  
- 실제 배포 전 최종 성능 검증 단계

[Step 5] 아티팩트 등록 (`register_artifacts`)
- 모든 모델 `.pkl` 파일을 W&B Artifact로 업로드
  - Artifact 타입: `"model"`
  - 이름: 각 모델의 `run_name` 기준
- 이후 FastAPI API 서버 등에서 `wandb.Artifact`를 통해 불러올 수 있도록 저장

## (2) 모델 분석 결과

| 모델 | val_f1 | test_f1 | 특징 |
|------|--------|---------|------|
| `DecisionTreeClassifier(max_depth=10)` | 1.000 | 1.000 | 단일 모델, 해석 용이 |
| `XGBClassifier` | 1.000 | 1.000 | 성능 우수, FastAPI 적합 |
| `Soft Voting 앙상블` | 1.000 | 0.99998 | 성능 유사, 추론 시간 약간 증가 |


> ⚠ **Note**: 모든 모델이 `f1_score=1.0`에 가까움 → 데이터 누수 또는 클래스 불균형 여부 추가 검토 필요


## (3) automated_pipeline.py

In [None]:
from datetime import datetime

from config.default_args import KEY_FEATURE_DATASET_STORAGE_KEY, get_dynamic_default_args
from sklearn.ensemble import VotingClassifier

from airflow.decorators import dag
from src.utils.log import get_logger


default_args = get_dynamic_default_args()
_logger = get_logger("weather_automated_pipeline")


@dag(
    dag_id="weather_automated_pipeline",
    start_date=datetime(2025, 6, 1),
    schedule="@daily",
    catchup=False,
    tags=["weather", "ml-modeling"],
    default_args=default_args,
)
def automated_pipeline_dag():
    storage = Storage.create()

    def evaluate_model(model, X, y, prefix="val"):
        preds = model.predict(X)
        return {
            f"{prefix}_accuracy": accuracy_score(y, preds),
            f"{prefix}_f1": f1_score(y, preds, average="macro"),
            f"{prefix}_precision": precision_score(y, preds, average="macro", zero_division=0),
            f"{prefix}_recall": recall_score(y, preds, average="macro", zero_division=0),
            f"{prefix}_balanced_accuracy": balanced_accuracy_score(y, preds),
        }

    @task
    def split_data():
        feature_storage_key = Variable.get(KEY_FEATURE_DATASET_STORAGE_KEY)
        features = storage.read_as_dataframe(feature_storage_key)
        X = features.drop(columns=["weather"])
        y = features["weather"]
        X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.4, random_state=42)
        X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)
        return {
            "X_train": X_train.to_json(orient="split"),
            "y_train": y_train.tolist(),
            "X_val": X_val.to_json(orient="split"),
            "y_val": y_val.tolist(),
            "X_test": X_test.to_json(orient="split"),
            "y_test": y_test.tolist(),
        }

    @task
    def train(data_dict: dict):
        X_train = pd.read_json(StringIO(data_dict["X_train"]), orient="split")
        y_train = pd.Series(data_dict["y_train"])

        models = {
            "decision_tree": DecisionTreeClassifier(max_depth=10),
            "xgb_classifier": XGBClassifier(
                n_estimators=200, max_depth=5, use_label_encoder=False, eval_metric="mlogloss"
            ),
            "ensemble_soft": VotingClassifier(
                estimators=[
                    (
                        "xgb",
                        XGBClassifier(n_estimators=100, max_depth=3, use_label_encoder=False, eval_metric="mlogloss"),
                    ),
                    ("rf", RandomForestClassifier(n_estimators=100, max_depth=5)),
                    ("gbdt", GradientBoostingClassifier(n_estimators=100, learning_rate=0.1)),
                ],
                voting="soft",
            ),
        }

        model_paths = {}
        for name, model in models.items():
            model.fit(X_train, y_train)
            path = f"{name}.pkl"
            joblib.dump(model, path)
            model_paths[name] = path
        return model_paths

    @task
    def evaluate(model_paths: dict, data_dict: dict):
        X_val = pd.read_json(StringIO(data_dict["X_val"]), orient="split")
        y_val = pd.Series(data_dict["y_val"])
        X_test = pd.read_json(StringIO(data_dict["X_test"]), orient="split")
        y_test = pd.Series(data_dict["y_test"])

        best_model_name = ""
        best_model_path = ""
        best_f1 = -1

        for name, path in model_paths.items():
            model = joblib.load(path)

            run = wandb.init(project="weather-classification", name=name, config={"model": name}, reinit=True)

            val_metrics = evaluate_model(model, X_val, y_val, prefix="val")
            test_metrics = evaluate_model(model, X_test, y_test, prefix="test")

            wandb.log({**val_metrics, **test_metrics})
            wandb.save(path)
            wandb.finish()

            if val_metrics["val_f1"] > best_f1:
                best_f1 = val_metrics["val_f1"]
                best_model_name = name
                best_model_path = path

        return {"name": best_model_name, "path": best_model_path}

    @task
    def register_best_artifact(meta: dict):
        artifact = wandb.Artifact(meta["name"], type="model")
        artifact.add_file(meta["path"])
        run = wandb.init(
            project="weather-classification", name=f"{meta['name']}_artifact", job_type="register", reinit=True
        )
        wandb.log_artifact(artifact)
        run.finish()

    data = split_data()
    model_paths = train(data)
    best_meta = evaluate(model_paths, data)
    register_best_artifact(best_meta)


automated_pipeline_dag()