# 파이프라인과 실무 (Pipeline & Practice)

sklearn의 Pipeline과 ColumnTransformer를 사용하여 전처리와 모델링을 하나의 워크플로우로 통합하는 방법을 학습합니다.

**학습 목표:**
- Pipeline의 필요성과 장점 이해
- ColumnTransformer로 다양한 타입의 특성 처리
- 커스텀 Transformer 작성
- Pipeline과 GridSearchCV 결합
- 모델 저장 및 배포

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.preprocessing import StandardScaler, MinMaxScaler, OneHotEncoder
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.datasets import load_iris, load_breast_cancer

import warnings
warnings.filterwarnings('ignore')

## 1. Pipeline 기초

### Pipeline을 사용하지 않을 때의 문제점

1. **데이터 누수 (Data Leakage)**: 테스트 데이터 정보가 학습에 반영될 위험
2. **코드 복잡성**: 여러 단계를 수동으로 관리해야 함
3. **재현성 문제**: 순서 실수, 파라미터 불일치 가능성

### Pipeline의 장점

1. 코드 간소화
2. 데이터 누수 방지
3. 교차 검증과 완벽 통합
4. 하이퍼파라미터 튜닝 용이
5. 모델 저장/배포 편리

In [None]:
# 데이터 로드
iris = load_iris()
X_train, X_test, y_train, y_test = train_test_split(
    iris.data, iris.target, test_size=0.2, random_state=42
)

# Pipeline 생성 (명시적 이름)
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA(n_components=2)),
    ('classifier', LogisticRegression())
])

# 학습 및 예측
pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)
score = pipeline.score(X_test, y_test)

print(f"Pipeline 정확도: {score:.4f}")

# make_pipeline (자동 이름 생성)
pipeline_auto = make_pipeline(
    StandardScaler(),
    PCA(n_components=2),
    LogisticRegression()
)

pipeline_auto.fit(X_train, y_train)
print(f"make_pipeline 정확도: {pipeline_auto.score(X_test, y_test):.4f}")

### Pipeline 단계 접근하기

In [None]:
# 단계 이름 확인
print("Pipeline 단계:")
for name, step in pipeline.named_steps.items():
    print(f"  {name}: {type(step).__name__}")

# 특정 단계 접근
print(f"\nPCA 설명된 분산: {pipeline.named_steps['pca'].explained_variance_ratio_}")
print(f"로지스틱 회귀 계수 형상: {pipeline.named_steps['classifier'].coef_.shape}")

# 중간 단계 결과 얻기
X_scaled = pipeline.named_steps['scaler'].transform(X_test)
X_pca = pipeline.named_steps['pca'].transform(X_scaled)
print(f"\n스케일링 후 형상: {X_scaled.shape}")
print(f"PCA 후 형상: {X_pca.shape}")

## 2. ColumnTransformer - 다양한 타입의 특성 처리

실제 데이터에서는 수치형과 범주형 특성이 혼재되어 있습니다. ColumnTransformer를 사용하면 각 타입에 맞는 전처리를 적용할 수 있습니다.

In [None]:
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OrdinalEncoder
from sklearn.ensemble import RandomForestClassifier

# 샘플 데이터 생성
data = {
    'age': [25, 32, 47, 51, 62, 28, 35, 42, 55, 60],
    'income': [50000, 60000, 80000, 120000, 95000, 55000, 70000, 85000, 110000, 100000],
    'gender': ['M', 'F', 'M', 'F', 'M', 'F', 'M', 'F', 'M', 'F'],
    'education': ['Bachelor', 'Master', 'PhD', 'Bachelor', 'Master', 
                  'Bachelor', 'PhD', 'Master', 'PhD', 'Bachelor'],
    'purchased': [0, 1, 1, 1, 0, 0, 1, 1, 1, 0]
}
df = pd.DataFrame(data)

X = df.drop('purchased', axis=1)
y = df['purchased']

print("데이터 타입:")
print(X.dtypes)
print("\n데이터 샘플:")
print(X.head())

In [None]:
# 특성 분류
numeric_features = ['age', 'income']
categorical_features = ['gender', 'education']

# ColumnTransformer 정의
preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), numeric_features),
        ('cat', OneHotEncoder(drop='first', sparse_output=False), categorical_features)
    ],
    remainder='passthrough'  # 나머지 특성 처리: 'drop' 또는 'passthrough'
)

# 변환
X_transformed = preprocessor.fit_transform(X)

print(f"원본 형상: {X.shape}")
print(f"변환 후 형상: {X_transformed.shape}")

# 변환된 특성 이름
feature_names = (
    numeric_features +
    list(preprocessor.named_transformers_['cat'].get_feature_names_out(categorical_features))
)
print(f"\n특성 이름: {feature_names}")

### Pipeline + ColumnTransformer 결합

In [None]:
# 전체 파이프라인
full_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])

# 학습
full_pipeline.fit(X, y)

# 새로운 데이터로 예측
new_data = pd.DataFrame({
    'age': [30, 45],
    'income': [70000, 90000],
    'gender': ['F', 'M'],
    'education': ['Master', 'PhD']
})

predictions = full_pipeline.predict(new_data)
print(f"예측 결과: {predictions}")

## 3. 결측치 처리를 포함한 복잡한 파이프라인

In [None]:
from sklearn.impute import SimpleImputer

# 결측치가 있는 데이터 생성
data_missing = {
    'age': [25, np.nan, 47, 51, 62, 28, np.nan, 42, 55, 60],
    'income': [50000, 60000, np.nan, 120000, 95000, np.nan, 70000, 85000, 110000, 100000],
    'gender': ['M', 'F', 'M', None, 'M', 'F', 'M', None, 'M', 'F'],
    'education': ['Bachelor', 'Master', 'PhD', 'Bachelor', None, 
                  'Bachelor', 'PhD', 'Master', None, 'Bachelor'],
    'purchased': [0, 1, 1, 1, 0, 0, 1, 1, 1, 0]
}
df_missing = pd.DataFrame(data_missing)
X_missing = df_missing.drop('purchased', axis=1)
y_missing = df_missing['purchased']

print("결측치 개수:")
print(X_missing.isnull().sum())

In [None]:
# 수치형 파이프라인 (결측치 처리 포함)
numeric_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

# 범주형 파이프라인 (결측치 처리 포함)
categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('encoder', OneHotEncoder(drop='first', sparse_output=False, handle_unknown='ignore'))
])

# ColumnTransformer
preprocessor_full = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ]
)

# 전체 파이프라인
complete_pipeline = Pipeline([
    ('preprocessor', preprocessor_full),
    ('classifier', RandomForestClassifier(random_state=42))
])

complete_pipeline.fit(X_missing, y_missing)
print("결측치 포함 파이프라인 학습 완료")
print(f"학습 정확도: {complete_pipeline.score(X_missing, y_missing):.4f}")

## 4. Pipeline과 교차 검증 및 하이퍼파라미터 튜닝

In [None]:
# 실제 데이터셋으로 실습
cancer = load_breast_cancer()
X, y = cancer.data, cancer.target

# 파이프라인 정의
pipeline_cv = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000))
])

# 교차 검증 (올바른 방법: 각 폴드에서 스케일러가 학습 데이터만으로 fit)
scores = cross_val_score(pipeline_cv, X, y, cv=5, scoring='accuracy')

print("교차 검증 결과:")
print(f"  각 폴드: {scores}")
print(f"  평균: {scores.mean():.4f} (+/- {scores.std():.4f})")

### GridSearchCV로 하이퍼파라미터 튜닝

Pipeline에서 하이퍼파라미터 이름은 `step__parameter` 형식을 사용합니다.

In [None]:
# 파라미터 그리드 (step__parameter 형식)
param_grid = {
    'scaler': [StandardScaler(), MinMaxScaler()],
    'classifier__C': [0.1, 1, 10],
    'classifier__penalty': ['l1', 'l2'],
    'classifier__solver': ['liblinear']
}

# Grid Search
grid_search = GridSearchCV(
    pipeline_cv,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    verbose=1
)

grid_search.fit(X, y)

print("\nGrid Search 결과:")
print(f"  최적 파라미터: {grid_search.best_params_}")
print(f"  최적 점수: {grid_search.best_score_:.4f}")

### 여러 모델 비교

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC

# 여러 모델을 위한 파이프라인
pipeline_multi = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression())  # placeholder
])

# 모델별 다른 파라미터
param_grid_multi = [
    {
        'classifier': [LogisticRegression(max_iter=1000)],
        'classifier__C': [0.1, 1, 10]
    },
    {
        'classifier': [RandomForestClassifier(random_state=42)],
        'classifier__n_estimators': [50, 100],
        'classifier__max_depth': [None, 5, 10]
    },
    {
        'classifier': [SVC()],
        'classifier__C': [0.1, 1],
        'classifier__kernel': ['rbf', 'linear']
    }
]

grid_search_multi = GridSearchCV(
    pipeline_multi,
    param_grid_multi,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    verbose=1
)

grid_search_multi.fit(X, y)

print("\n여러 모델 비교 결과:")
print(f"  최적 모델: {type(grid_search_multi.best_params_['classifier']).__name__}")
print(f"  최적 파라미터: {grid_search_multi.best_params_}")
print(f"  최적 점수: {grid_search_multi.best_score_:.4f}")

## 5. 모델 저장과 로드

학습된 파이프라인을 저장하고 나중에 다시 사용할 수 있습니다.

In [None]:
import joblib
import pickle
import sklearn
from datetime import datetime

# 최적 모델
best_pipeline = grid_search.best_estimator_

# 1. joblib 저장 (권장)
joblib.dump(best_pipeline, 'best_model.joblib')
print("모델 저장 완료: best_model.joblib")

# 모델 로드
loaded_model = joblib.load('best_model.joblib')

# 테스트
X_test_sample = X[:5]
predictions = loaded_model.predict(X_test_sample)
print(f"로드된 모델 예측: {predictions}")

In [None]:
# 2. pickle 저장
with open('model.pkl', 'wb') as f:
    pickle.dump(best_pipeline, f)

# pickle 로드
with open('model.pkl', 'rb') as f:
    loaded_model_pkl = pickle.load(f)

print("pickle 모델 예측:", loaded_model_pkl.predict(X[:3]))

### 메타데이터와 함께 저장 (권장)

In [None]:
# 메타데이터와 함께 저장
model_metadata = {
    'model': best_pipeline,
    'sklearn_version': sklearn.__version__,
    'training_date': datetime.now().isoformat(),
    'feature_names': list(cancer.feature_names),
    'target_names': list(cancer.target_names),
    'cv_score': grid_search.best_score_,
    'best_params': grid_search.best_params_
}

joblib.dump(model_metadata, 'model_with_metadata.joblib')

# 로드 및 검증
loaded_metadata = joblib.load('model_with_metadata.joblib')
print("모델 메타데이터:")
print(f"  학습 날짜: {loaded_metadata['training_date']}")
print(f"  sklearn 버전: {loaded_metadata['sklearn_version']}")
print(f"  CV 점수: {loaded_metadata['cv_score']:.4f}")
print(f"  최적 파라미터: {loaded_metadata['best_params']}")

## 6. 커스텀 Transformer 작성

sklearn의 BaseEstimator와 TransformerMixin을 상속하여 자신만의 Transformer를 만들 수 있습니다.

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin

class OutlierRemover(BaseEstimator, TransformerMixin):
    """이상치를 경계값으로 대체하는 트랜스포머"""
    
    def __init__(self, threshold=3):
        self.threshold = threshold
        self.mean_ = None
        self.std_ = None
    
    def fit(self, X, y=None):
        self.mean_ = np.mean(X, axis=0)
        self.std_ = np.std(X, axis=0)
        return self
    
    def transform(self, X):
        X = np.array(X)
        z_scores = np.abs((X - self.mean_) / (self.std_ + 1e-10))
        # 이상치를 경계값으로 대체
        X_clipped = np.where(z_scores > self.threshold,
                             self.mean_ + self.threshold * self.std_ * np.sign(X - self.mean_),
                             X)
        return X_clipped


class FeatureSelector(BaseEstimator, TransformerMixin):
    """특성 선택 트랜스포머"""
    
    def __init__(self, feature_indices=None):
        self.feature_indices = feature_indices
    
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        X = np.array(X)
        if self.feature_indices is not None:
            return X[:, self.feature_indices]
        return X


# 커스텀 트랜스포머 사용
custom_pipeline = Pipeline([
    ('outlier', OutlierRemover(threshold=3)),
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000))
])

scores = cross_val_score(custom_pipeline, X, y, cv=5)
print(f"커스텀 트랜스포머 CV 점수: {scores.mean():.4f} (+/- {scores.std():.4f})")

## 7. 실전 템플릿 - 분류 문제용 파이프라인 생성 함수

In [None]:
from sklearn.compose import make_column_selector

def create_classification_pipeline(model, numeric_features=None, categorical_features=None):
    """
    분류 문제용 파이프라인 생성 함수
    
    Parameters:
    -----------
    model : sklearn estimator
        분류 모델
    numeric_features : list, optional
        수치형 특성 이름 리스트
    categorical_features : list, optional
        범주형 특성 이름 리스트
    
    Returns:
    --------
    pipeline : Pipeline
        전처리 + 모델 파이프라인
    """
    
    # 수치형 특성 파이프라인
    numeric_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])
    
    # 범주형 특성 파이프라인
    categorical_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('encoder', OneHotEncoder(drop='first', sparse_output=False, handle_unknown='ignore'))
    ])
    
    # ColumnTransformer
    if numeric_features is None and categorical_features is None:
        # 자동 감지
        preprocessor = ColumnTransformer(
            transformers=[
                ('num', numeric_transformer, make_column_selector(dtype_include=np.number)),
                ('cat', categorical_transformer, make_column_selector(dtype_include=object))
            ]
        )
    else:
        preprocessor = ColumnTransformer(
            transformers=[
                ('num', numeric_transformer, numeric_features or []),
                ('cat', categorical_transformer, categorical_features or [])
            ]
        )
    
    # 전체 파이프라인
    pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', model)
    ])
    
    return pipeline


# 사용 예시
from sklearn.ensemble import GradientBoostingClassifier

pipeline_template = create_classification_pipeline(
    GradientBoostingClassifier(random_state=42),
    numeric_features=['age', 'income'],
    categorical_features=['gender', 'education']
)

print("분류 파이프라인 템플릿 생성 완료")
print(pipeline_template)

## 8. 배포를 위한 모델 래퍼 클래스

In [None]:
class ModelWrapper:
    """배포용 모델 래퍼"""
    
    def __init__(self, model_path):
        self.model = joblib.load(model_path)
        self.feature_names = None
    
    def set_feature_names(self, names):
        """특성 이름 설정"""
        self.feature_names = names
    
    def predict(self, input_data):
        """딕셔너리 또는 DataFrame 입력 처리"""
        if isinstance(input_data, dict):
            input_data = pd.DataFrame([input_data])
        
        if self.feature_names:
            input_data = input_data[self.feature_names]
        
        return self.model.predict(input_data)
    
    def predict_proba(self, input_data):
        """확률 예측"""
        if isinstance(input_data, dict):
            input_data = pd.DataFrame([input_data])
        
        if self.feature_names:
            input_data = input_data[self.feature_names]
        
        return self.model.predict_proba(input_data)


# 사용 예시
# wrapper = ModelWrapper('best_model.joblib')
# wrapper.set_feature_names(cancer.feature_names)
# prediction = wrapper.predict(X[0:1])
# print(f"예측 결과: {prediction}")

## 요약 및 Best Practices

### Pipeline 사용 시 장점

1. **데이터 누수 방지**: 교차 검증 시 각 폴드에서 전처리가 학습 데이터만으로 수행됨
2. **코드 간소화**: 여러 단계를 하나의 객체로 관리
3. **재현성**: 모든 전처리 단계가 저장되어 동일한 처리 보장
4. **배포 용이**: 하나의 파일로 전체 워크플로우 저장 가능

### 하이퍼파라미터 명명 규칙

```python
# 형식: step_name__parameter_name
'classifier__C'  # classifier 단계의 C 파라미터
'preprocessor__num__scaler__with_mean'  # 중첩된 파라미터
```

### 모델 저장 방법 비교

| 방법 | 장점 | 단점 |
|------|------|------|
| joblib | 대용량 NumPy 배열 효율적 처리 | sklearn 전용 |
| pickle | 파이썬 표준 라이브러리 | 대용량에서 느림 |
| ONNX | 프레임워크 독립적, 다양한 언어 지원 | 변환 작업 필요 |

### 실무 체크리스트

- [ ] 항상 Pipeline 사용하여 데이터 누수 방지
- [ ] ColumnTransformer로 수치형/범주형 전처리 분리
- [ ] 모델 저장 시 메타데이터 포함 (버전, 날짜, 성능 등)
- [ ] 입력 검증 함수 작성
- [ ] 커스텀 Transformer는 BaseEstimator, TransformerMixin 상속
- [ ] GridSearchCV로 전체 파이프라인 튜닝