<a href="https://colab.research.google.com/github/SangLee5661/New-Folder2/blob/main/%EC%8B%A0%EC%9A%A9%EC%9C%84%ED%97%98%EB%8F%84%EB%B6%84%EC%84%9D_%ED%95%99%EC%8A%B5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, roc_auc_score, accuracy_score
from sklearn.preprocessing import LabelEncoder
from imblearn.over_sampling import SMOTE
import warnings
warnings.filterwarnings('ignore')

In [2]:
# Google Drive 마운트 및 데이터 로드
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# 데이터 로드
train_path = '/content/drive/MyDrive/card_train.csv'
test_path = '/content/drive/MyDrive/card_test.csv'

train_df = pd.read_csv(train_path)
test_df = pd.read_csv(test_path)

print("데이터 로드 완료:")
print(f"Train: {train_df.shape}, Test: {test_df.shape}")
print("\nTrain Segment 분포:")
print(train_df['Segment'].value_counts(dropna=False))

데이터 로드 완료:
Train: (70560, 738), Test: (1440, 737)

Train Segment 분포:
Segment
E    56505
D    10270
C     3753
A       28
B        4
Name: count, dtype: int64


In [5]:
# 유효 레이블 필터링 (train 데이터만)
valid_labels = ['A', 'B', 'C', 'D', 'E']
train_df = train_df[train_df['Segment'].isin(valid_labels)]
print(f"\n유효 레이블 필터링 후 Train: {train_df.shape}")
print("필터링 후 Segment 분포:")
print(train_df['Segment'].value_counts())


유효 레이블 필터링 후 Train: (70560, 738)
필터링 후 Segment 분포:
Segment
E    56505
D    10270
C     3753
A       28
B        4
Name: count, dtype: int64


In [6]:
# 전처리 함수 정의
def preprocess_data(df, is_train=True):
    """데이터 전처리 통합 함수"""
    # 불필요한 컬럼 제거
    drop_cols = ['ID', 'Unnamed: 0.1', 'Segment.1']
    if is_train:
        X = df.drop(columns=drop_cols + ['Segment'], errors='ignore')
        y = df['Segment'] if 'Segment' in df.columns else None
    else:
        X = df.drop(columns=drop_cols, errors='ignore')
        y = None

    return X, y

In [7]:
# 데이터 전처리
X_full, y_full = preprocess_data(train_df, is_train=True)
X_final_test, _ = preprocess_data(test_df, is_train=False)

print(f"\n전처리 후 형태:")
print(f"X_full: {X_full.shape}, y_full: {len(y_full)}")
print(f"X_final_test: {X_final_test.shape}")

# Train 데이터를 8:2로 분할 (내부 검증용)
X_train, X_val, y_train, y_val = train_test_split(
    X_full, y_full, test_size=0.2, random_state=42, stratify=y_full
)

print(f"\n8:2 분할 결과:")
print(f"Train: {X_train.shape}, Validation: {X_val.shape}")
print("\nTrain set 등급 분포:")
print(pd.Series(y_train).value_counts().sort_index())
print("Validation set 등급 분포:")
print(pd.Series(y_val).value_counts().sort_index())


전처리 후 형태:
X_full: (70560, 734), y_full: 70560
X_final_test: (1440, 735)

8:2 분할 결과:
Train: (56448, 734), Validation: (14112, 734)

Train set 등급 분포:
Segment
A       22
B        3
C     3003
D     8216
E    45204
Name: count, dtype: int64
Validation set 등급 분포:
Segment
A        6
B        1
C      750
D     2054
E    11301
Name: count, dtype: int64


In [8]:
# 피처 엔지니어링 및 전처리 클래스
class FeatureProcessor:
    def __init__(self):
        self.encoders = {}
        self.fill_values = {}
        self.feature_names = None

    def fit_transform(self, X_train):
        """학습 데이터로 fit하고 transform"""
        X_processed = X_train.copy()

        # 범주형 변수 인코딩
        categorical_cols = X_processed.select_dtypes(include=['object']).columns
        for col in categorical_cols:
            le = LabelEncoder()
            X_processed[col] = le.fit_transform(X_processed[col].astype(str))
            self.encoders[col] = le

        # 결측치 처리를 위한 중앙값 저장
        numeric_cols = X_processed.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            self.fill_values[col] = X_processed[col].median()

        # 결측치 및 무한값 처리
        X_processed = self._handle_missing_values(X_processed)

        # 피처 엔지니어링
        X_processed = self._create_features(X_processed)

        self.feature_names = X_processed.columns.tolist()
        return X_processed

    def transform(self, X_test):
        """학습된 변환을 테스트 데이터에 적용"""
        X_processed = X_test.copy()

        # 범주형 변수 인코딩 (fit된 encoder 사용)
        for col, encoder in self.encoders.items():
            if col in X_processed.columns:
                # 새로운 범주 처리
                X_processed[col] = X_processed[col].astype(str)
                unknown_mask = ~X_processed[col].isin(encoder.classes_)
                if unknown_mask.any():
                    # 가장 빈번한 클래스로 대체
                    most_frequent = encoder.classes_[0]
                    X_processed.loc[unknown_mask, col] = most_frequent
                X_processed[col] = encoder.transform(X_processed[col])

        # 결측치 처리
        X_processed = self._handle_missing_values(X_processed)

        # 피처 엔지니어링
        X_processed = self._create_features(X_processed)

        # 학습 시와 동일한 컬럼만 유지
        missing_cols = set(self.feature_names) - set(X_processed.columns)
        extra_cols = set(X_processed.columns) - set(self.feature_names)

        # 누락된 컬럼은 0으로 채움
        for col in missing_cols:
            X_processed[col] = 0

        # 추가 컬럼 제거
        X_processed = X_processed.drop(columns=list(extra_cols), errors='ignore')

        # 컬럼 순서 맞춤
        X_processed = X_processed[self.feature_names]

        return X_processed

    def _handle_missing_values(self, X):
        """결측치 및 무한값 처리"""
        X = X.replace([np.inf, -np.inf], np.nan)

        for col in X.columns:
            if col in self.fill_values:
                X[col] = X[col].fillna(self.fill_values[col])
            else:
                if X[col].dtype in ['int64', 'float64']:
                    X[col] = X[col].fillna(X[col].median())
                else:
                    X[col] = X[col].fillna(X[col].mode()[0] if not X[col].mode().empty else 0)

        return X

    def _create_features(self, X):
        """피처 엔지니어링"""
        X_new = X.copy()

        # 사용률 피처 생성
        usage_cols = [col for col in X.columns if any(p in col.lower() for p in ['사용금액', '사용액', 'usage', 'amount'])]
        limit_cols = [col for col in X.columns if any(p in col.lower() for p in ['한도금액', '한도액', 'limit'])]

        created_features = 0
        for usage_col in usage_cols[:5]:  # 상위 5개만
            for limit_col in limit_cols[:3]:  # 상위 3개만
                if any(period in usage_col and period in limit_col for period in ['R3M', 'R6M', 'R12M']):
                    ratio_name = f"사용률_{usage_col.split('_')[-1] if '_' in usage_col else created_features}"
                    safe_limit = X[limit_col].replace(0, 1)
                    X_new[ratio_name] = np.clip(X[usage_col] / safe_limit, 0, 5)  # 이상치 제한
                    created_features += 1
                    if created_features >= 10:  # 최대 10개 피처
                        break
            if created_features >= 10:
                break

        # 거래 빈도 피처
        count_cols = [col for col in X.columns if any(p in col.lower() for p in ['건수', 'count', '횟수'])]
        for i, count_col in enumerate(count_cols[:5]):
            if 'R3M' in count_col or '3개월' in count_col:
                freq_name = f"월평균빈도_{i}"
                X_new[freq_name] = X[count_col] / 3

        return X_new

In [18]:
# 개선된 클래스 불균형 처리 함수
def handle_class_imbalance(X_train, y_train, strategy='auto'):
    """클래스 불균형 문제를 다양한 방법으로 해결"""

    class_counts = pd.Series(y_train).value_counts().sort_index()
    min_count = class_counts.min()
    max_count = class_counts.max()

    print("원본 클래스 분포:")
    for cls, count in class_counts.items():
        print(f"  {cls}: {count}개 ({count/len(y_train)*100:.1f}%)")

    print(f"\n불균형 비율: {max_count/min_count:.1f}:1")

    # 전략 1: 극소수 클래스 제거 후 SMOTE
    if strategy == 'auto' or strategy == 'remove_minority':
        # 5개 미만인 클래스는 제거
        valid_classes = class_counts[class_counts >= 5].index

        if len(valid_classes) < len(class_counts):
            print(f"극소수 클래스 제거: {set(class_counts.index) - set(valid_classes)}")
            mask = pd.Series(y_train).isin(valid_classes)
            X_train_filtered = X_train[mask]
            y_train_filtered = pd.Series(y_train)[mask].values

            # 필터링 후 SMOTE 시도
            return apply_smote_or_weights(X_train_filtered, y_train_filtered)

    # 전략 2: 클래스 가중치만 사용
    if strategy == 'auto' or strategy == 'weights_only':
        print("SMOTE 대신 클래스 가중치 사용")
        return X_train, y_train, True  # use_class_weight=True

    # 전략 3: 수동 오버샘플링
    if strategy == 'manual_oversample':
        return manual_oversample(X_train, y_train)

    return X_train, y_train, True

def apply_smote_or_weights(X_train, y_train):
    """SMOTE 적용 또는 클래스 가중치 사용"""
    class_counts = pd.Series(y_train).value_counts()
    min_count = class_counts.min()

    if min_count >= 5:
        try:
            k_neighbors = min(3, min_count - 1)  # k_neighbors 줄임
            smote = SMOTE(random_state=42, k_neighbors=k_neighbors)
            X_balanced, y_balanced = smote.fit_resample(X_train, y_train)

            print(f"✅ SMOTE 적용 완료 (k_neighbors={k_neighbors})")
            print("SMOTE 적용 후 클래스 분포:")
            balanced_counts = pd.Series(y_balanced).value_counts().sort_index()
            for cls, count in balanced_counts.items():
                print(f"  {cls}: {count}개")

            return X_balanced, y_balanced, False  # use_class_weight=False
        except Exception as e:
            print(f"❌ SMOTE 실패: {str(e)}")

    print("SMOTE 적용 불가 → 클래스 가중치 사용")
    return X_train, y_train, True  # use_class_weight=True

def manual_oversample(X_train, y_train, target_ratio=0.3):
    """수동 오버샘플링 (복제 방식)"""
    class_counts = pd.Series(y_train).value_counts()
    max_count = class_counts.max()
    target_min_count = max(int(max_count * target_ratio), 10)

    X_resampled = []
    y_resampled = []

    for cls in class_counts.index:
        cls_mask = pd.Series(y_train) == cls
        cls_X = X_train[cls_mask]
        cls_y = y_train[cls_mask]

        current_count = len(cls_y)
        if current_count < target_min_count:
            # 복제로 샘플 증가
            n_copies = target_min_count // current_count
            n_extra = target_min_count % current_count

            # 기본 복제
            for _ in range(n_copies):
                X_resampled.append(cls_X)
                y_resampled.extend(cls_y)

            # 추가 샘플 (랜덤 선택)
            if n_extra > 0:
                extra_idx = np.random.choice(len(cls_X), n_extra, replace=False)
                X_resampled.append(cls_X.iloc[extra_idx])
                y_resampled.extend(cls_y[extra_idx])
        else:
            X_resampled.append(cls_X)
            y_resampled.extend(cls_y)

    X_final = pd.concat(X_resampled, ignore_index=True)
    y_final = np.array(y_resampled)

    print("수동 오버샘플링 완료:")
    final_counts = pd.Series(y_final).value_counts().sort_index()
    for cls, count in final_counts.items():
        print(f"  {cls}: {count}개")

    return X_final, y_final, False

In [15]:
# 피처 처리 및 모델 학습
print(f"\n{'='*50}")
print("=== 피처 처리 및 모델 학습 ===")

# 피처 프로세서 학습
processor = FeatureProcessor()
X_train_processed = processor.fit_transform(X_train)
X_val_processed = processor.transform(X_val)

print(f"피처 처리 완료:")
print(f"처리된 Train: {X_train_processed.shape}")
print(f"처리된 Validation: {X_val_processed.shape}")


=== 피처 처리 및 모델 학습 ===
피처 처리 완료:
처리된 Train: (56448, 736)
처리된 Validation: (14112, 736)


In [20]:
# 클래스 불균형 처리 (학습 데이터에만!)
print("클래스 불균형 처리 시작...")
X_train_balanced, y_train_balanced, use_class_weight = handle_class_imbalance(
    X_train_processed, y_train, strategy='auto'
)

클래스 불균형 처리 시작...
원본 클래스 분포:
  A: 22개 (0.0%)
  B: 3개 (0.0%)
  C: 3003개 (5.3%)
  D: 8216개 (14.6%)
  E: 45204개 (80.1%)

불균형 비율: 15068.0:1
극소수 클래스 제거: {'B'}
✅ SMOTE 적용 완료 (k_neighbors=3)
SMOTE 적용 후 클래스 분포:
  A: 45204개
  C: 45204개
  D: 45204개
  E: 45204개


In [23]:
# 모델 학습 및 비교 (클래스 가중치 고려)
models = {}
if use_class_weight:
    print("클래스 가중치 적용 모델 사용")
    models = {
        'RandomForest': RandomForestClassifier(n_estimators=100, random_state=42, class_weight='balanced'),
        'GradientBoosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
        'LogisticRegression': LogisticRegression(random_state=42, max_iter=1000, class_weight='balanced')
    }
else:
    print("일반 모델 사용 (데이터 균형 처리됨)")
    models = {
        'RandomForest': RandomForestClassifier(n_estimators=100, random_state=42),
        'GradientBoosting': GradientBoostingClassifier(n_estimators=100, random_state=42),
        'LogisticRegression': LogisticRegression(random_state=42, max_iter=1000)
    }

print(f"\n=== 모델 성능 비교 ===")
best_model = None
best_score = 0
best_name = ""

model_results = {}

for name, model in models.items():
    try:
        # 학습
        model.fit(X_train_balanced, y_train_balanced)

        # 검증
        y_val_pred = model.predict(X_val_processed)
        accuracy = accuracy_score(y_val, y_val_pred)

        # 교차 검증
        cv_scores = cross_val_score(model, X_train_balanced, y_train_balanced, cv=3, scoring='accuracy')
        cv_mean = cv_scores.mean()

        print(f"{name:20s}: 검증 정확도 {accuracy:.3f}, CV 평균 {cv_mean:.3f}")

        model_results[name] = {
            'model': model,
            'val_accuracy': accuracy,
            'cv_accuracy': cv_mean
        }

        if accuracy > best_score:
            best_score = accuracy
            best_model = model
            best_name = name

    except Exception as e:
        print(f"{name} 학습 실패: {str(e)}")

일반 모델 사용 (데이터 균형 처리됨)

=== 모델 성능 비교 ===
RandomForest        : 검증 정확도 0.870, CV 평균 0.955
GradientBoosting    : 검증 정확도 0.868, CV 평균 0.882
LogisticRegression  : 검증 정확도 0.761, CV 평균 0.766


In [22]:
# 최적 모델 선택 및 상세 평가
if best_model is not None:
    print(f"\n🏆 최적 모델: {best_name} (검증 정확도: {best_score:.3f})")

    # 상세 분류 보고서
    y_val_pred = best_model.predict(X_val_processed)
    print(f"\n=== 상세 분류 성능 ===")
    print(classification_report(y_val, y_val_pred))

    # 등급별 정확도
    print("\n등급별 예측 정확도:")
    for grade in sorted(set(y_val)):
        grade_mask = np.array(y_val) == grade
        if grade_mask.sum() > 0:
            grade_accuracy = accuracy_score(
                np.array(y_val)[grade_mask],
                y_val_pred[grade_mask]
            )
            print(f"{grade}등급: {grade_accuracy:.3f} ({grade_mask.sum()}개 샘플)")

    # 피처 중요도 (RandomForest인 경우)
    if hasattr(best_model, 'feature_importances_'):
        print(f"\n=== 상위 20개 중요 피처 ===")
        feature_importance = pd.DataFrame({
            'feature': X_train_processed.columns,
            'importance': best_model.feature_importances_
        }).sort_values('importance', ascending=False)

        for i, (_, row) in enumerate(feature_importance.head(20).iterrows()):
            print(f"{i+1:2d}. {row['feature']:35s} {row['importance']:.4f}")

    # 최종 테스트 데이터 예측 준비
    print(f"\n=== 최종 예측 준비 ===")
    X_final_test_processed = processor.transform(X_final_test)
    print(f"최종 테스트 데이터 처리 완료: {X_final_test_processed.shape}")

    # 최종 예측
    final_predictions = best_model.predict(X_final_test_processed)
    prediction_proba = best_model.predict_proba(X_final_test_processed)

    print("최종 예측 등급 분포:")
    print(pd.Series(final_predictions).value_counts().sort_index())

    # 예측 결과 저장을 위한 DataFrame 생성
    results_df = pd.DataFrame({
        'ID': test_df['ID'],
        'Predicted_Segment': final_predictions
    })

    # 예측 확률도 추가 (각 등급별)
    prob_cols = [f'Prob_{cls}' for cls in best_model.classes_]
    for i, col in enumerate(prob_cols):
        results_df[col] = prediction_proba[:, i]

    print(f"\n예측 결과 샘플:")
    print(results_df.head())

    # 결과 저장 (선택사항)
    # results_df.to_csv('/content/drive/MyDrive/card_predictions.csv', index=False)
    # print("예측 결과가 저장되었습니다.")

else:
    print("❌ 모든 모델 학습에 실패했습니다.")

# 학습 검증 요약
print(f"\n{'='*50}")
print("=== 학습 검증 요약 ===")
print(f"✅ 데이터 분할: Train 80% ({len(y_train)}개), Validation 20% ({len(y_val)}개)")
if use_class_weight:
    print(f"✅ 클래스 불균형 처리: 가중치 적용 (극소수 클래스 처리)")
else:
    print(f"✅ 클래스 불균형 처리: 데이터 증강 완료 ({len(y_train_balanced)}개)")
print(f"✅ 피처 처리: {X_train_processed.shape[1]}개 피처 (파생 피처 포함)")
if best_model:
    print(f"✅ 모델 선택: {best_name} (검증 정확도 {best_score:.3f})")
    print(f"✅ 최종 예측: {len(final_predictions)}개 테스트 샘플 예측 완료")

print("\n클래스별 처리 결과:")
final_class_counts = pd.Series(y_train_balanced).value_counts().sort_index()
for cls, count in final_class_counts.items():
    original_count = pd.Series(y_train).value_counts().get(cls, 0)
    print(f"  {cls}: {original_count} → {count} ({'증강' if count > original_count else '유지'})")

print("\n권장사항:")
if use_class_weight:
    print("1. 극소수 클래스가 제거되었으므로 전체 클래스 예측 성능 확인 필요")
    print("2. 더 많은 데이터 수집으로 클래스 불균형 근본 해결 권장")
else:
    print("1. 데이터 증강이 완료되어 균형잡힌 학습 가능")
print("3. 하이퍼파라미터 튜닝으로 성능 개선 가능")
print("4. 앙상블 모델 적용 고려")


🏆 최적 모델: RandomForest (검증 정확도: 0.870)

=== 상세 분류 성능 ===
              precision    recall  f1-score   support

           A       0.00      0.00      0.00         6
           B       0.00      0.00      0.00         1
           C       0.65      0.60      0.62       750
           D       0.58      0.66      0.62      2054
           E       0.95      0.93      0.94     11301

    accuracy                           0.87     14112
   macro avg       0.44      0.44      0.44     14112
weighted avg       0.88      0.87      0.87     14112


등급별 예측 정확도:
A등급: 0.000 (6개 샘플)
B등급: 0.000 (1개 샘플)
C등급: 0.596 (750개 샘플)
D등급: 0.661 (2054개 샘플)
E등급: 0.927 (11301개 샘플)

=== 상위 20개 중요 피처 ===
 1. 정상청구원금_B0M                          0.0383
 2. 청구금액_R6M                            0.0328
 3. 정상청구원금_B5M                          0.0279
 4. 청구금액_R3M                            0.0267
 5. 카드이용한도금액                            0.0221
 6. 카드이용한도금액_B2M                        0.0204
 7. 정상청구원금_B2M                   