In [None]:
!pip install optuna
!pip install -U xgboost
!pip install optuna-integration[xgboost]

In [None]:
import pandas as pd
import numpy as np
import os
import joblib
import optuna
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OrdinalEncoder, StandardScaler
from sklearn.metrics import mean_squared_error
from xgboost import XGBRegressor
from sklearn.cluster import KMeans
from google.colab import drive

# Mount Google Drive so the input CSV and the output directory are reachable
drive.mount('/content/drive')

# Paths: input CSV and the directory that receives the saved artifacts
file_path = ''
model_dir = ''
os.makedirs(model_dir, exist_ok=True)

# Load the raw data
df = pd.read_csv(file_path)

# Category -> group mapping, written group-first so each group's members
# can be read on a single line
group_map = {
    category: group
    for group, categories in [
        ("Group_A", ["식품", "유아동", "여행/문화"]),
        ("Group_B", ["자동차"]),
        ("Group_C", ["서비스/렌탈"]),
        ("Group_D", ["컴퓨터/IT"]),
        ("Group_E", ["건강"]),
        ("Group_F", ["가전/디지털", "가구/인테리어"]),
        ("Group_G", ["생활/주방", "기타"]),
        ("Group_H", ["의류/패션"]),
        ("Group_I", ["화장품/뷰티"]),
        ("Group_J", ["스포츠/레저"]),
    ]
    for category in categories
}
# Categories that are missing from the mapping fall back to Group_G
mapped_group = df['통합카테고리1'].map(group_map)
df['Group'] = mapped_group.fillna('Group_G')

# Missing values: drop rows lacking the group or the target, then fill
# missing precipitation with the column mean
required_cols = ['Group', '시청자수']
df = df.dropna(subset=required_cols)
rain_col = '강수량(mm)'
rain_mean = df[rain_col].mean()
df[rain_col] = df[rain_col].fillna(rain_mean)

# 증강 함수 정의
def augment_group(df_group, n_samples=300, n_clusters=3, noise_std=0.05):
    """Generate synthetic rows for one group by jittering rows drawn per K-means cluster.

    The int/float columns of ``df_group`` (excluding the target '시청자수')
    are mean-imputed, standardized and clustered with K-means. For every
    cluster, ``n_samples`` rows are drawn with replacement, Gaussian noise is
    added to the numeric feature columns (in raw, unscaled units), and the
    target is overwritten with that cluster's mean '시청자수'.

    Args:
        df_group: Rows belonging to a single group. Not modified.
        n_samples: Number of synthetic rows to create per cluster.
        n_clusters: Requested number of K-means clusters; capped at the
            number of rows so small groups do not make KMeans raise.
        noise_std: Standard deviation of the Gaussian noise.

    Returns:
        A DataFrame holding only the synthetic rows (the original rows are
        not included). It keeps the index labels of the sampled rows and
        carries an extra 'cluster' column with the K-means label.

    Raises:
        KeyError: If ``df_group`` is non-empty and has no '시청자수' column.
    """
    import warnings

    df_group = df_group.copy()

    if df_group.empty:
        # Nothing to sample from: return an empty frame with the same columns
        # (plus 'cluster') instead of crashing inside the scaler.
        warnings.warn("augment_group received an empty group; no rows generated")
        df_group['cluster'] = np.empty(0, dtype=np.int32)
        return df_group

    numeric = df_group.select_dtypes(include=['int', 'float']).drop(
        columns=['시청자수'], errors='ignore'
    )
    feature_cols = numeric.columns.tolist()
    # Mean-impute for clustering only; a column that is entirely NaN in this
    # group has no mean, so fall back to 0 because KMeans rejects NaN input.
    numeric = numeric.fillna(numeric.mean()).fillna(0.0)
    scaled = StandardScaler().fit_transform(numeric)

    # KMeans raises when n_clusters exceeds the number of samples.
    k = max(1, min(n_clusters, len(df_group)))
    df_group['cluster'] = KMeans(n_clusters=k, random_state=42).fit_predict(scaled)
    # NOTE(review): 'cluster' is returned with the synthetic rows. The caller
    # concatenates them with the real data, where this column is NaN, and it
    # then ends up in the feature list - confirm that is intended.

    synthetic = []
    for cluster_id in df_group['cluster'].unique():
        base = df_group[df_group['cluster'] == cluster_id]
        # Draw all rows for this cluster at once instead of one sample(1)
        # call per synthetic row.
        picked = base.sample(n_samples, replace=True).copy()
        noise = np.random.normal(0, noise_std, size=(n_samples, len(feature_cols)))
        picked[feature_cols] = picked[feature_cols].to_numpy(dtype=float) + noise
        # Every synthetic row in a cluster gets the cluster's mean target.
        picked['시청자수'] = base['시청자수'].mean()
        synthetic.append(picked)
    return pd.concat(synthetic, axis=0)

# Apply augmentation to the selected groups
augment_targets = ['Group_B', 'Group_D', 'Group_C', 'Group_J']
# Build every synthetic part first and concatenate once, rather than
# re-copying the growing frame on each loop iteration.
augmented_parts = [augment_group(df[df['Group'] == g]) for g in augment_targets]
df_aug = pd.concat([df, *augmented_parts], axis=0)
df_aug.reset_index(drop=True, inplace=True)

# Ordinal-encode every object-dtype column and persist the fitted encoder.
# NOTE(review): if 'Group' is object dtype it is encoded here as well, so any
# later use of df_aug['Group'] sees numeric codes, not 'Group_A'... labels -
# confirm downstream consumers expect that.
obj_cols = df_aug.select_dtypes(include='object').columns.tolist()
encoder = OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1)
df_aug[obj_cols] = encoder.fit_transform(df_aug[obj_cols])
joblib.dump(encoder, os.path.join(model_dir, "encoder_total_model.pkl"))

# Feature list: every column except the target and the time-related columns
exclude_cols = ['시청자수', '방송일자', '방송시작시간', '방송종료시간']
features = [c for c in df_aug.columns if c not in exclude_cols]
# Column names contain non-ASCII text, so fix the encoding explicitly instead
# of relying on the platform default.
with open(os.path.join(model_dir, "feature_list.txt"), "w", encoding="utf-8") as f:
    f.writelines(f"{col}\n" for col in features)

# Train one tuned model per group.
# Settings that are not tuned. study.best_params only contains the values
# suggested through `trial`, so these must be re-applied to the final model;
# otherwise it is trained without the seed used during tuning.
fixed_params = {'random_state': 42, 'n_jobs': -1}

for group_name in sorted(df_aug['Group'].unique()):
    df_g = df_aug[df_aug['Group'] == group_name].copy()
    X, y = df_g[features], df_g['시청자수']
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

    def objective(trial):
        """Return the validation RMSE of an XGBRegressor built from trial-suggested params."""
        params = {
            'n_estimators': trial.suggest_int('n_estimators', 200, 600),
            'max_depth': trial.suggest_int('max_depth', 3, 8),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.2),
            'subsample': trial.suggest_float('subsample', 0.7, 1.0),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
            **fixed_params,
        }
        model = XGBRegressor(**params)
        # eval_set is only monitored; no early stopping is configured here.
        model.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=False)
        preds = model.predict(X_val)
        return np.sqrt(mean_squared_error(y_val, preds))

    print(f"[INFO] {group_name} 모델 튜닝 시작...")
    study = optuna.create_study(direction='minimize')
    study.optimize(objective, n_trials=30)

    # Refit with the best suggested values plus the fixed settings.
    final_model = XGBRegressor(**study.best_params, **fixed_params)
    final_model.fit(X_train, y_train)
    joblib.dump(final_model, f"{model_dir}/model_{group_name}.pkl")
    print(f" {group_name} 모델 저장 완료!")

# Train the overall model: use every row of df_aug, split 80/20
y_all = df_aug['시청자수']
X_all = df_aug[features]
X_tr, X_vl, y_tr, y_vl = train_test_split(
    X_all,
    y_all,
    random_state=42,
    test_size=0.2,
)

def objective_total(trial):
    """Optuna objective for the overall model.

    Fits an XGBRegressor with trial-suggested hyperparameters on
    (X_tr, y_tr) and returns its RMSE on (X_vl, y_vl).
    """
    # Suggestions are made in the same order as the keys below.
    search_params = {
        'n_estimators': trial.suggest_int('n_estimators', 100, 500),
        'max_depth': trial.suggest_int('max_depth', 3, 10),
        'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
        'subsample': trial.suggest_float('subsample', 0.6, 1.0),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
    }
    regressor = XGBRegressor(random_state=42, n_jobs=-1, **search_params)
    regressor.fit(X_tr, y_tr)
    mse = mean_squared_error(y_vl, regressor.predict(X_vl))
    return np.sqrt(mse)

print("모델 튜닝 시작")
study_total = optuna.create_study(direction='minimize')
study_total.optimize(objective_total, n_trials=30)

# best_params only holds the values suggested through `trial`; re-apply the
# fixed settings used during tuning so the saved model is trained with the
# same seed and parallelism.
final_model_total = XGBRegressor(**study_total.best_params, random_state=42, n_jobs=-1)
final_model_total.fit(X_tr, y_tr)
joblib.dump(final_model_total, f"{model_dir}/model_total.pkl")

print("\n 전체 학습 완료")
