# 2-4: FDS 비용 최적화 + pytest

FDS 비용 함수와 pytest 테스트를 구현합니다.

## 학습 목표
1. FDS 비용 함수 (FN vs FP 비용 차이)
2. 비용 기반 Threshold 최적화
3. pytest 테스트 작성 및 로컬 실행

## 예상 시간
- 약 1시간

## 선수 조건
- 2-3 Evidently 드리프트 완료
- `pip install pytest`

In [None]:
# 패키지 임포트
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, roc_auc_score, recall_score
import joblib
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')

# 한글 폰트
plt.rcParams['font.family'] = 'Malgun Gothic'
plt.rcParams['axes.unicode_minus'] = False

print("패키지 로드 완료!")

---
## 1. FDS 비용 함수

### 1.1 왜 비용 함수가 필요한가?

FDS에서 **오류의 비용이 다릅니다**:

| 오류 유형 | 의미 | 비용 |
|----------|------|------|
| **FN (False Negative)** | 사기를 놓침 | **100만원** (피해액) |
| **FP (False Positive)** | 정상을 사기로 오탐 | **5만원** (검토 비용) |

```
FN이 FP보다 20배 비쌈!
→ 단순 Accuracy보다 비용 기반 최적화 필요
```

In [None]:
# 비용 함수 구현

def calculate_business_cost(y_true, y_prob, threshold=0.18,
                            fn_cost=1_000_000, fp_cost=50_000):
    """
    FDS 비용 계산
    
    Parameters:
    -----------
    y_true : array-like
        실제 라벨 (0: 정상, 1: 사기)
    y_prob : array-like
        예측 확률 (0~1)
    threshold : float
        분류 기준 (기본 0.18)
    fn_cost : int
        FN 비용 (놓친 사기, 기본 100만원)
    fp_cost : int
        FP 비용 (오탐, 기본 5만원)
    
    Returns:
    --------
    dict : 비용 상세 정보
    """
    y_true = np.array(y_true)
    y_prob = np.array(y_prob)
    y_pred = (y_prob >= threshold).astype(int)
    
    # Confusion Matrix
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    
    # 비용 계산
    total_fn_cost = fn * fn_cost
    total_fp_cost = fp * fp_cost
    total_cost = total_fn_cost + total_fp_cost
    
    # 절감액 (사기 차단)
    savings = tp * fn_cost
    net_savings = savings - total_cost
    
    return {
        "threshold": threshold,
        "fn_count": int(fn),
        "fp_count": int(fp),
        "tp_count": int(tp),
        "tn_count": int(tn),
        "fn_cost": total_fn_cost,
        "fp_cost": total_fp_cost,
        "total_cost": total_cost,
        "savings": savings,
        "net_savings": net_savings,
        "recall": tp / (tp + fn) if (tp + fn) > 0 else 0,
    }

print("비용 함수 정의 완료!")

### 1.2 비용 함수 테스트 (시뮬레이션)

In [None]:
# 시뮬레이션 데이터
np.random.seed(42)

# 10,000건 거래, 3% 사기
n_samples = 10000
fraud_ratio = 0.03

y_true_sim = np.random.choice([0, 1], size=n_samples, p=[1-fraud_ratio, fraud_ratio])

# 가상의 모델 예측 (사기에 높은 확률, 정상에 낮은 확률)
y_prob_sim = np.where(
    y_true_sim == 1,
    np.random.beta(5, 2, n_samples),  # 사기: 높은 확률
    np.random.beta(2, 8, n_samples)   # 정상: 낮은 확률
)

print(f"전체 거래: {n_samples:,}건")
print(f"실제 사기: {y_true_sim.sum():,}건 ({y_true_sim.mean()*100:.1f}%)")

In [None]:
# 기본 threshold (0.18)로 비용 계산
cost_result = calculate_business_cost(y_true_sim, y_prob_sim, threshold=0.18)

print("=" * 50)
print("  FDS 비용 분석 (threshold = 0.18)")
print("=" * 50)
print(f"\nConfusion Matrix:")
print(f"  TP (사기 차단): {cost_result['tp_count']:,}건")
print(f"  TN (정상 승인): {cost_result['tn_count']:,}건")
print(f"  FP (오탐):      {cost_result['fp_count']:,}건")
print(f"  FN (놓친 사기): {cost_result['fn_count']:,}건")
print(f"\n비용 분석:")
print(f"  FN 비용 (놓친 사기): {cost_result['fn_cost']:,}원")
print(f"  FP 비용 (오탐):      {cost_result['fp_cost']:,}원")
print(f"  총 비용:             {cost_result['total_cost']:,}원")
print(f"\n  사기 차단 절감액:    {cost_result['savings']:,}원")
print(f"  순 절감액:           {cost_result['net_savings']:,}원")
print(f"\n  Recall (탐지율):    {cost_result['recall']*100:.1f}%")

In [None]:
# 체크포인트 1
assert "total_cost" in cost_result, "비용 함수에 total_cost 필요"
assert "net_savings" in cost_result, "비용 함수에 net_savings 필요"
assert cost_result["recall"] > 0, "Recall이 0보다 커야 함"

print("체크포인트 1 통과! 비용 함수 구현 완료")

---
## 2. Threshold 최적화

### 2.1 비용 최소화 Threshold 찾기

In [None]:
def find_optimal_threshold(y_true, y_prob, fn_cost=1_000_000, fp_cost=50_000):
    """
    비용 함수를 최소화하는 threshold 찾기
    
    Returns:
    --------
    tuple : (최적 threshold, 최소 비용, 전체 결과 리스트)
    """
    results = []
    best_threshold = 0.5
    best_cost = float('inf')
    
    for threshold in np.arange(0.05, 0.95, 0.01):
        cost_info = calculate_business_cost(
            y_true, y_prob, threshold, fn_cost, fp_cost
        )
        results.append(cost_info)
        
        if cost_info['total_cost'] < best_cost:
            best_cost = cost_info['total_cost']
            best_threshold = threshold
    
    return best_threshold, best_cost, results

print("Threshold 최적화 함수 정의 완료!")

In [None]:
# Threshold 최적화 실행
optimal_threshold, min_cost, all_results = find_optimal_threshold(
    y_true_sim, y_prob_sim
)

print(f"최적 Threshold: {optimal_threshold:.2f}")
print(f"최소 비용: {min_cost:,}원")

# 최적 threshold에서의 상세 결과
optimal_result = calculate_business_cost(y_true_sim, y_prob_sim, optimal_threshold)
print(f"최적 Recall: {optimal_result['recall']*100:.1f}%")

### 2.2 Threshold vs Cost 시각화

In [None]:
# 시각화
thresholds = [r['threshold'] for r in all_results]
total_costs = [r['total_cost'] for r in all_results]
fn_costs = [r['fn_cost'] for r in all_results]
fp_costs = [r['fp_cost'] for r in all_results]
recalls = [r['recall'] for r in all_results]

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# 그래프 1: Threshold vs 비용
ax1 = axes[0]
ax1.plot(thresholds, np.array(total_costs)/1e6, 'b-', linewidth=2, label='총 비용')
ax1.plot(thresholds, np.array(fn_costs)/1e6, 'r--', linewidth=1.5, label='FN 비용 (놓친 사기)')
ax1.plot(thresholds, np.array(fp_costs)/1e6, 'g--', linewidth=1.5, label='FP 비용 (오탐)')
ax1.axvline(optimal_threshold, color='orange', linestyle=':', linewidth=2, 
            label=f'최적 Threshold ({optimal_threshold:.2f})')
ax1.set_xlabel('Threshold', fontsize=12)
ax1.set_ylabel('비용 (백만원)', fontsize=12)
ax1.set_title('Threshold vs 비용', fontsize=14)
ax1.legend()
ax1.grid(True, alpha=0.3)

# 그래프 2: Threshold vs Recall
ax2 = axes[1]
ax2.plot(thresholds, np.array(recalls)*100, 'b-', linewidth=2)
ax2.axvline(optimal_threshold, color='orange', linestyle=':', linewidth=2,
            label=f'최적 Threshold ({optimal_threshold:.2f})')
ax2.axhline(optimal_result['recall']*100, color='red', linestyle='--', linewidth=1,
            label=f'최적 Recall ({optimal_result["recall"]*100:.1f}%)')
ax2.set_xlabel('Threshold', fontsize=12)
ax2.set_ylabel('Recall (%)', fontsize=12)
ax2.set_title('Threshold vs Recall', fontsize=14)
ax2.legend()
ax2.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"\n핵심 인사이트:")
print(f"  - Threshold 낮추면: FN 감소(좋음) but FP 증가(나쁨)")
print(f"  - Threshold 높이면: FP 감소(좋음) but FN 증가(나쁨)")
print(f"  - 최적점: 두 비용의 합이 최소가 되는 지점")

In [None]:
# 체크포인트 2
assert 0.05 <= optimal_threshold <= 0.95, "Threshold 범위 이상"
assert min_cost < float('inf'), "최소 비용 계산 실패"

print("체크포인트 2 통과! Threshold 최적화 완료")

---
## 3. 실제 모델에 적용

### 3.1 모델 로드 및 비용 분석

In [None]:
# 실제 모델 및 데이터 로드
MODEL_PATH = Path("../../models/xgb_model.joblib")
DATA_PATH = Path("../../data/processed")

if MODEL_PATH.exists():
    model = joblib.load(MODEL_PATH)
    print(f"모델 로드 완료: {MODEL_PATH}")
    
    # 테스트 데이터 로드 시도
    test_file = DATA_PATH / "test_features.csv"
    if test_file.exists():
        test_df = pd.read_csv(test_file)
        if 'isFraud' in test_df.columns:
            y_test = test_df['isFraud']
            X_test = test_df.drop('isFraud', axis=1)
            
            # 예측
            y_prob = model.predict_proba(X_test)[:, 1]
            
            # 비용 최적화
            opt_thresh, opt_cost, _ = find_optimal_threshold(y_test, y_prob)
            
            print(f"\n실제 모델 비용 분석:")
            print(f"  테스트 데이터: {len(y_test):,}건")
            print(f"  최적 Threshold: {opt_thresh:.2f}")
            print(f"  최소 비용: {opt_cost:,}원")
        else:
            print("테스트 데이터에 isFraud 컬럼 없음")
    else:
        print(f"테스트 데이터 없음: {test_file}")
        print("시뮬레이션 데이터로 대체합니다.")
else:
    print(f"모델 파일 없음: {MODEL_PATH}")
    print("시뮬레이션 데이터로 진행합니다.")

---
## 4. pytest 테스트

### 4.1 왜 테스트가 필요한가?

```
테스트 없이 배포하면:
1. 모델 파일이 없어도 몰름
2. 추론이 깨져도 몰름
3. 성능이 떨어져도 몰름

pytest로 자동 검증:
1. 모델 파일 존재 확인
2. 기본 추론 테스트
3. 비용 함수 테스트
```

### 4.2 테스트 파일 구조

In [None]:
# tests/test_model.py 내용 확인
test_code = '''
# tests/test_model.py

import pytest
import numpy as np
import joblib
from pathlib import Path

# 프로젝트 루트 기준 경로
PROJECT_ROOT = Path(__file__).parent.parent
MODEL_PATH = PROJECT_ROOT / "models" / "xgb_model.joblib"


class TestModel:
    """FDS 모델 테스트"""
    
    def test_model_file_exists(self):
        """모델 파일 존재 확인"""
        assert MODEL_PATH.exists(), f"모델 파일 없음: {MODEL_PATH}"
    
    def test_model_can_load(self):
        """모델 로드 가능 확인"""
        if not MODEL_PATH.exists():
            pytest.skip("모델 파일 없음")
        
        model = joblib.load(MODEL_PATH)
        assert model is not None
    
    def test_model_prediction(self):
        """기본 추론 테스트"""
        if not MODEL_PATH.exists():
            pytest.skip("모델 파일 없음")
        
        model = joblib.load(MODEL_PATH)
        
        # 더미 데이터 (피처 수는 모델에 맞게 조정 필요)
        n_features = model.n_features_in_
        X_dummy = np.random.rand(10, n_features)
        
        # 예측
        proba = model.predict_proba(X_dummy)
        
        # 검증
        assert proba.shape == (10, 2), "출력 shape 오류"
        assert np.all((proba >= 0) & (proba <= 1)), "확률 범위 오류"


class TestCostFunction:
    """비용 함수 테스트"""
    
    def test_cost_calculation(self):
        """비용 계산 테스트"""
        from sklearn.metrics import confusion_matrix
        
        # 간단한 예시
        y_true = np.array([0, 0, 1, 1, 1])
        y_prob = np.array([0.1, 0.2, 0.6, 0.7, 0.3])  # 1개 FN (마지막)
        threshold = 0.5
        
        y_pred = (y_prob >= threshold).astype(int)
        tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
        
        # 비용 계산
        fn_cost = fn * 1_000_000
        fp_cost = fp * 50_000
        total_cost = fn_cost + fp_cost
        
        # 검증
        assert fn == 1, f"FN 계산 오류: {fn}"
        assert fp == 0, f"FP 계산 오류: {fp}"
        assert total_cost == 1_000_000, f"총 비용 계산 오류: {total_cost}"
'''

print(test_code)

### 4.3 pytest 실행

In [None]:
# pytest 실행 (로컬에서)
# 이 셀은 tests/test_model.py 파일이 있어야 동작합니다.

import subprocess
import sys

# 프로젝트 루트로 이동하여 pytest 실행
project_root = Path("../..").resolve()
tests_dir = project_root / "tests"

if tests_dir.exists() and any(tests_dir.glob("test_*.py")):
    print(f"pytest 실행: {tests_dir}")
    result = subprocess.run(
        [sys.executable, "-m", "pytest", str(tests_dir), "-v"],
        capture_output=True,
        text=True,
        cwd=str(project_root)
    )
    print(result.stdout)
    if result.stderr:
        print("STDERR:", result.stderr)
else:
    print(f"tests/ 디렉토리가 없거나 테스트 파일이 없습니다.")
    print(f"테스트 파일을 먼저 생성하세요: tests/test_model.py")

---
## 5. 면접 포인트

### Q1: "FDS에서 비용 최적화는 어떻게 했나요?"

**답변:**
> FN(놓친 사기)과 FP(오탐)의 비용이 다릅니다.
> - FN: 평균 피해액 **100만원** 손실
> - FP: 검토 비용 **5만원**
> 
> 이 비용 함수로 최적 Threshold를 도출했습니다.
> 단순 Accuracy나 F1 대신 **비용 기반 최적화**가 비즈니스에 더 적합합니다.

---

### Q2: "Threshold는 어떻게 정했나요?"

**답변:**
> 비용 함수 `Cost = FN × 100만 + FP × 5만`을 최소화하는 threshold를 탐색했습니다.
> - Threshold 낮으면: FN 감소(좋음), FP 증가(나쁨)
> - Threshold 높으면: FP 감소(좋음), FN 증가(나쁨)
> - **최적점**: 두 비용의 합이 최소가 되는 지점

---

### Q3: "테스트는 어떻게 했나요?"

**답변:**
> pytest로 자동화된 테스트를 작성했습니다.
> - 모델 파일 존재 확인
> - 기본 추론 테스트 (shape, 확률 범위)
> - 비용 함수 로직 테스트

---
## 6. 최종 요약

In [None]:
print("=" * 60)
print("  2-4 완료: FDS 비용 최적화 + pytest")
print("=" * 60)
print()
print("배운 것:")
print()
print("1. FDS 비용 함수")
print("   - FN(놓친 사기): 100만원")
print("   - FP(오탐): 5만원")
print("   - FN이 FP보다 20배 비쌈!")
print()
print("2. Threshold 최적화")
print("   - 비용 최소화 threshold 탐색")
print("   - Recall과 비용의 Trade-off 시각화")
print()
print("3. pytest 테스트")
print("   - 모델 파일 존재 확인")
print("   - 기본 추론 테스트")
print("   - 비용 함수 테스트")
print()
print("=" * 60)
print("다음: 2-5 A/B 테스트로!")
print("=" * 60)

### 학습 체크리스트

| 항목 | 이해도 |
|------|--------|
| FN vs FP 비용 차이 | |
| 비용 함수 구현 | |
| Threshold 최적화 | |
| pytest 테스트 작성 | |