<a href="https://colab.research.google.com/github/erika0915/pattern-recognition/blob/main/05_27_%ED%95%99%EC%8A%B5%EB%AA%A8%EB%8D%B8%EA%B0%9C%EC%84%A0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Mount Google Drive at /content/drive so the CSV files stored under MyDrive
# can be read by the data-loading cell (Colab-specific step).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install catboost

Collecting catboost
  Downloading catboost-1.2.8-cp311-cp311-manylinux2014_x86_64.whl.metadata (1.2 kB)
Downloading catboost-1.2.8-cp311-cp311-manylinux2014_x86_64.whl (99.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m99.2/99.2 MB[0m [31m9.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: catboost
Successfully installed catboost-1.2.8


### 주요 라이브러리 import


In [None]:
import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.model_selection import train_test_split
import numpy as np
from sklearn.ensemble import VotingClassifier
from lightgbm import LGBMClassifier
from xgboost import XGBClassifier
from catboost import CatBoostClassifier
from imblearn.over_sampling import SMOTE
from sklearn.metrics import f1_score, accuracy_score, roc_auc_score
from sklearn.ensemble import StackingClassifier
from sklearn.linear_model import LogisticRegression

### 데이터 전처리


In [None]:
# Read the train/test CSVs from the mounted Drive folder.
# 'id' and 'shares' are removed from the training frame immediately;
# the test frame is loaded unchanged.
test_df = pd.read_csv('/content/drive/MyDrive/패턴인식/test.csv')
train_df = pd.read_csv('/content/drive/MyDrive/패턴인식/train.csv').drop(
    columns=['id', 'shares']
)

# Outlier handling for numeric features: a log transform compresses heavy tails.
def log1p(x):
    """Return ``log(1 + x)`` element-wise after clamping negative inputs to 0.

    Args:
        x: Scalar or array-like (NumPy array, pandas Series/DataFrame).

    Returns:
        ``np.log1p`` of the clamped input; negative inputs therefore map to 0.0.
    """
    non_negative = np.maximum(x, 0)
    return np.log1p(non_negative)

# Columns that receive the log transform.
log_transform_cols = [
    'num_hrefs',
    'num_self_hrefs',
    'num_imgs',
    'num_videos',
    'self_reference_avg_shares',
    'kw_min_min',
    'kw_max_min',
    'kw_avg_min',
    'kw_min_max',
    'kw_max_max',
    'kw_avg_max',
    'kw_min_avg',
    'kw_max_avg',
    'kw_avg_avg',
]

# Transform, in place, whichever of the listed columns each frame contains;
# columns missing from a frame are skipped silently.
for frame in (train_df, test_df):
    for col in log_transform_cols:
        if col not in frame.columns:
            continue
        frame[col] = log1p(frame[col])

# Missing-value handling: numeric columns are filled with the median and
# categorical columns with the mode. Both statistics are computed on the
# training features only and reused for the test frame.
y = train_df['y']
X = train_df.drop(columns=['y'])

# Column groups are taken from X (target already removed), so 'y' can never
# end up in the numeric feature list.
numeric_cols = X.select_dtypes(include=['float64', 'int64']).columns.tolist()
categorical_cols = X.select_dtypes(include='object').columns.tolist()

train_median = X[numeric_cols].median()
# mode() can return several rows when there are ties; the first row is used.
train_mode = X[categorical_cols].mode().iloc[0]

X[numeric_cols] = X[numeric_cols].fillna(train_median)
X[categorical_cols] = X[categorical_cols].fillna(train_mode)

test_df[numeric_cols] = test_df[numeric_cols].fillna(train_median)
test_df[categorical_cols] = test_df[categorical_cols].fillna(train_mode)

# Standardise the numeric features. The scaler is fitted on the training
# features and the same fitted scaler is applied to the test frame.
scaler = StandardScaler()
scaler.fit(X[numeric_cols])
X_train_scaled = pd.DataFrame(scaler.transform(X[numeric_cols]), columns=numeric_cols)
X_test_scaled = pd.DataFrame(scaler.transform(test_df[numeric_cols]), columns=numeric_cols)

# One-hot encode the categorical features; categories unseen during fit are
# ignored (encoded as all zeros) instead of raising.
encoder = OneHotEncoder(handle_unknown='ignore', sparse_output=False)
encoder.fit(X[categorical_cols])
X_train_cat = pd.DataFrame(
    encoder.transform(X[categorical_cols]),
    columns=encoder.get_feature_names_out(),
)
X_test_cat = pd.DataFrame(
    encoder.transform(test_df[categorical_cols]),
    columns=encoder.get_feature_names_out(),
)

# Zero-fill any one-hot column present in train but absent from test, then
# put the test columns in the same order as the training columns.
missing_cols = {name for name in X_train_cat.columns if name not in X_test_cat.columns}
for col in missing_cols:
    X_test_cat[col] = 0
X_test_cat = X_test_cat.loc[:, X_train_cat.columns]

# Final feature matrices: scaled numeric block followed by the one-hot block.
X_train_final = pd.concat(
    [part.reset_index(drop=True) for part in (X_train_scaled, X_train_cat)],
    axis=1,
)
X_test_final = pd.concat(
    [part.reset_index(drop=True) for part in (X_test_scaled, X_test_cat)],
    axis=1,
)

# Keep the test ids aside; they are needed when the predictions are saved.
test_id = test_df['id'].copy()

# Hold out 20% of the training rows for validation. Stratifying on y keeps
# the class ratio the same in both parts.
split_parts = train_test_split(
    X_train_final,
    y,
    test_size=0.2,
    stratify=y,
    random_state=42,
)
X_train_split, X_val_split, y_train_split, y_val_split = split_parts

### 모델 학습

In [None]:
# Feature-engineering helper.
def add_engineered_features(df):
    """Return a copy of ``df`` extended with seven derived columns.

    The input frame is not modified. Every derived column is computed from
    the original columns only (none depends on another derived column).

    Args:
        df: DataFrame containing ``num_keywords``, ``n_tokens_content``,
            ``num_imgs``, ``num_self_hrefs``, ``num_hrefs``,
            ``n_non_stop_unique_tokens``, ``n_non_stop_words``,
            ``kw_max_max``, ``kw_avg_avg`` and ``kw_min_min``.

    Returns:
        A new DataFrame with the derived columns appended after the
        existing ones.
    """
    out = df.copy()
    derived = {
        'keyword_ratio': (df['num_keywords'] + 1) / (df['n_tokens_content'] + 1),
        'token_density': (df['n_tokens_content'] + 1) / (df['num_imgs'] + 1),
        'href_self_ratio': (df['num_self_hrefs'] + 1) / (df['num_hrefs'] + 1),
        # The small epsilon on both sides avoids an exact 0/0.
        'stopword_diversity': (df['n_non_stop_unique_tokens'] + 1e-5) / (df['n_non_stop_words'] + 1e-5),
        'keyword_score_max': df['kw_max_max'] / (df['num_keywords'] + 1),
        'keyword_score_avg': df['kw_avg_avg'] / (df['num_keywords'] + 1),
        'keyword_contrast': df['kw_max_max'] - df['kw_min_min'],
    }
    for name, values in derived.items():
        out[name] = values
    return out

# Build the derived features from the imputed, log-transformed but *unscaled*
# frames (`X`, `test_df`) instead of from the standardised splits. After
# StandardScaler every numeric input is a z-score, so denominators such as
# `num_keywords + 1` can be close to zero or negative, which makes the ratio
# features explode or flip sign.
# The split frames carry positional indices into the training rows (the final
# matrices were built with reset_index), so iloc selects the matching raw rows.
_raw_train = add_engineered_features(X.iloc[X_train_split.index.to_numpy()])
_raw_val = add_engineered_features(X.iloc[X_val_split.index.to_numpy()])
_raw_test = add_engineered_features(test_df.reset_index(drop=True))
_engineered_cols = [c for c in _raw_train.columns if c not in X.columns]

# Standardise the derived columns with statistics from the training split
# only, so they sit on the same scale as the other inputs without using
# validation/test information.
_eng_scaler = StandardScaler().fit(_raw_train[_engineered_cols])


def _attach_engineered(scaled_df, raw_df):
    """Append the standardised derived columns of ``raw_df`` to ``scaled_df``.

    Rows are matched by position; the result keeps ``scaled_df``'s index and
    column order, with the derived columns added at the end.
    """
    scaled_eng = pd.DataFrame(
        _eng_scaler.transform(raw_df[_engineered_cols]),
        columns=_engineered_cols,
        index=scaled_df.index,
    )
    return pd.concat([scaled_df, scaled_eng], axis=1)


# Training, validation and test matrices with the derived features attached.
X_train = _attach_engineered(X_train_split, _raw_train)
X_val = _attach_engineered(X_val_split, _raw_val)
X_test = _attach_engineered(X_test_final, _raw_test)

In [None]:
# Counter class imbalance by SMOTE oversampling. Only the training split is
# resampled; the validation split is left as it is.
sm = SMOTE(random_state=42)
X_resampled, y_resampled = sm.fit_resample(X=X_train, y=y_train_split)

# Base learners: three gradient-boosting models with the same number of trees
# and learning rate, each seeded for reproducibility.
lgbm = LGBMClassifier(n_estimators=300, max_depth=7, learning_rate=0.05, random_state=42)
# `use_label_encoder` is not passed: it is deprecated in XGBoost and only
# produces an "unused parameter" warning on current releases.
xgb = XGBClassifier(n_estimators=300, max_depth=6, learning_rate=0.05, eval_metric='logloss', random_state=42)
cat = CatBoostClassifier(n_estimators=300, max_depth=6, learning_rate=0.05, verbose=0, random_state=42)

# Meta model: a smaller, shallower XGBoost classifier.
meta_model = XGBClassifier(
    n_estimators=100,
    max_depth=3,
    learning_rate=0.1,
    eval_metric='logloss',
    random_state=42
)

# Stacking ensemble. passthrough=True feeds the original features to the meta
# model together with the base learners' predictions.
stacking_model = StackingClassifier(
    estimators=[('lgbm', lgbm), ('xgb', xgb), ('cat', cat)],
    final_estimator=meta_model,
    passthrough=True,
    n_jobs=-1
)

# Fit the stacking ensemble on the oversampled training data.
stacking_model.fit(X_resampled, y_resampled)

# Put the validation and test columns in exactly the order used for training.
X_test = X_test.loc[:, X_resampled.columns]
X_val = X_val.loc[:, X_resampled.columns]

# Predicted positive-class probabilities on the held-out validation split.
val_probs = stacking_model.predict_proba(X_val)[:, 1]

# Choose the decision threshold that maximises F1 on the validation split.
# Searching on X_resampled would score the model on the very (SMOTE-augmented)
# rows it was fitted on, so the threshold would be tuned to in-sample
# probabilities rather than to unseen data.
thresholds = np.linspace(0.1, 0.9, 81)
f1_scores = [(t, f1_score(y_val_split, (val_probs >= t).astype(int))) for t in thresholds]
# max() keeps the first maximum, i.e. the lowest threshold among ties.
best_threshold, best_f1 = max(f1_scores, key=lambda pair: pair[1])

# Validation predictions at the selected threshold.
val_preds = (val_probs >= best_threshold).astype(int)

# Validation metrics. NOTE: accuracy and F1 are measured on the same split the
# threshold was selected on, so they are optimistic; AUC does not depend on
# the threshold.
acc = accuracy_score(y_val_split, val_preds)
f1 = f1_score(y_val_split, val_preds)
auc = roc_auc_score(y_val_split, val_probs)
avg = (acc + f1 + auc) / 3

# Report the results.
print(f"\n[Validation 최적 임계값]: {best_threshold:.2f} (F1: {best_f1:.4f})")
print("[Validation 성능]")
print(f" - Accuracy: {acc:.4f}")
print(f" - F1 Score:  {f1:.4f}")
print(f" - AUC:       {auc:.4f}")
print(f" - 평균 점수:  {avg:.4f}")

### 테스트 세트 예측 클래스 분포 확인

In [None]:
# Predicted positive-class probabilities for the test set.
test_probs = stacking_model.predict_proba(X_test)[:, 1]

# Convert the probabilities to 0/1 labels with the selected threshold.
test_preds = (test_probs >= best_threshold).astype(int)

# Show how many test rows fall into each predicted class.
print("\n▶ 테스트 세트 예측 클래스 분포:")
print(pd.Series(test_preds).value_counts().to_string())


In [None]:
# Assemble the submission table (id, predicted label, predicted probability)
# and write it to /content/prediction.csv without the index column.
prediction = pd.DataFrame({'id': test_id})
prediction['y_predict'] = test_preds
prediction['y_prob'] = test_probs
prediction.to_csv('/content/prediction.csv', index=False)