In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import Ridge, Lasso
from sklearn.preprocessing import StandardScaler, LabelEncoder, PolynomialFeatures
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.feature_selection import SelectFromModel, RFE
from sklearn.model_selection import cross_val_score, TimeSeriesSplit
import xgboost as xgb
import lightgbm as lgb
import optuna
import warnings
import os
import joblib
warnings.filterwarnings('ignore')

# 한글 폰트 설정
plt.rcParams['font.family'] = 'Malgun Gothic'
plt.rcParams['axes.unicode_minus'] = False

In [None]:
class EnhancedFeatureEngineering:
    """고도화된 특성 공학"""
    
    def __init__(self):
        self.scalers = {}
        self.encoders = {}
        self.selected_features = None
    # 매서드 1 . 극한 기상 상태 이진 변수 생성     
    def create_extreme_weather_features(self, df):
        """극한 기상 상태 이진 변수 생성"""
        print("🌪️ 극한 기상 상태 변수 생성 중...")
        
        # 온도 기반 극한 상태
        if 'ta' in df.columns:
            temp_q05 = df['ta'].quantile(0.05)  # 한파
            temp_q95 = df['ta'].quantile(0.95)  # 폭염
            
            df['extreme_cold'] = (df['ta'] <= temp_q05).astype(int)
            df['extreme_heat'] = (df['ta'] >= temp_q95).astype(int)
            df['moderate_temp'] = ((df['ta'] > temp_q05) & (df['ta'] < temp_q95)).astype(int)
            
            print(f"  한파 기준: ≤{temp_q05:.1f}°C ({df['extreme_cold'].sum():,}개)")
            print(f"  폭염 기준: ≥{temp_q95:.1f}°C ({df['extreme_heat'].sum():,}개)")
        
        # 강수 기반 극한 상태
        if 'rn_hr1' in df.columns:
            df['no_rain'] = (df['rn_hr1'] == 0).astype(int)
            df['light_rain'] = ((df['rn_hr1'] > 0) & (df['rn_hr1'] <= 2)).astype(int)
            df['heavy_rain'] = (df['rn_hr1'] > 10).astype(int)
            df['extreme_rain'] = (df['rn_hr1'] > 30).astype(int)
            
            print(f"  폭우 기준: >10mm ({df['heavy_rain'].sum():,}개)")
            print(f"  극한 강수: >30mm ({df['extreme_rain'].sum():,}개)")
        
        # 풍속 기반 극한 상태
        if 'ws' in df.columns:
            wind_q90 = df['ws'].quantile(0.90)
            wind_q95 = df['ws'].quantile(0.95)
            
            df['calm_wind'] = (df['ws'] <= 1.0).astype(int)
            df['strong_wind'] = (df['ws'] >= wind_q90).astype(int)
            df['extreme_wind'] = (df['ws'] >= wind_q95).astype(int)
            
            print(f"  강풍 기준: ≥{wind_q90:.1f}m/s ({df['strong_wind'].sum():,}개)")
            print(f"  극한 풍속: ≥{wind_q95:.1f}m/s ({df['extreme_wind'].sum():,}개)")
        
        # 습도 기반 극한 상태
        if 'hm' in df.columns:
            humidity_q05 = df['hm'].quantile(0.05)
            humidity_q95 = df['hm'].quantile(0.95)
            
            df['extreme_dry'] = (df['hm'] <= humidity_q05).astype(int)
            df['extreme_humid'] = (df['hm'] >= humidity_q95).astype(int)
            
            print(f"  극건조: ≤{humidity_q05:.1f}% ({df['extreme_dry'].sum():,}개)")
            print(f"  극습함: ≥{humidity_q95:.1f}% ({df['extreme_humid'].sum():,}개)")
        
        # 복합 극한 상태
        df['extreme_weather_any'] = (
            df.get('extreme_cold', 0) | df.get('extreme_heat', 0) |
            df.get('heavy_rain', 0) | df.get('extreme_wind', 0) |
            df.get('extreme_dry', 0) | df.get('extreme_humid', 0)
        ).astype(int)
        
        print(f"  전체 극한 기상: {df['extreme_weather_any'].sum():,}개 ({df['extreme_weather_any'].mean()*100:.1f}%)")
        
        return df
    
    def create_detailed_time_features(self, df):
        """세분화된 시간대/계절 범주형 변수"""
        print("⏰ 세분화된 시간 특성 생성 중...")
        
        # 시간 관련 기본 특성
        df['datetime'] = pd.to_datetime(df['tm'], format='%Y%m%d%H')
        df['hour'] = df['datetime'].dt.hour
        df['dayofweek'] = df['datetime'].dt.dayofweek
        df['month'] = df['datetime'].dt.month
        df['day'] = df['datetime'].dt.day
        df['week_of_year'] = df['datetime'].dt.isocalendar().week
        
        # 세분화된 출퇴근 시간대
        def get_detailed_time_period(hour):
            if hour in [6, 7]:
                return 'early_morning_rush'
            elif hour in [8, 9]:
                return 'morning_rush_peak'
            elif hour == 10:
                return 'morning_rush_end'
            elif hour in [11, 12, 13, 14]:
                return 'daytime'
            elif hour in [15, 16]:
                return 'afternoon_start'
            elif hour in [17, 18]:
                return 'evening_rush_start'
            elif hour in [19, 20]:
                return 'evening_rush_peak'
            elif hour == 21:
                return 'evening_rush_end'
            elif hour in [22, 23]:
                return 'night'
            else:  # 0-5시
                return 'late_night'
        
        df['time_period'] = df['hour'].apply(get_detailed_time_period)
        
        # 세분화된 계절
        def get_detailed_season(month):
            if month in [12, 1, 2]:
                return 'winter'
            elif month in [3, 4]:
                return 'spring_early'
            elif month == 5:
                return 'spring_late'
            elif month in [6, 7]:
                return 'summer_early'
            elif month == 8:
                return 'summer_peak'
            elif month in [9, 10]:
                return 'autumn_early'
            else:  # 11월
                return 'autumn_late'
        
        df['detailed_season'] = df['month'].apply(get_detailed_season)
        
        # 주말/평일 세분화
        df['day_type'] = df['dayofweek'].apply(
            lambda x: 'weekend' if x >= 5 else 'weekday'
        )
        
        # 월요일/금요일 효과
        df['is_monday'] = (df['dayofweek'] == 0).astype(int)
        df['is_friday'] = (df['dayofweek'] == 4).astype(int)
        
        # 순환적 시간 특성 (기존 + 추가)
        df['hour_sin'] = np.sin(2 * np.pi * df['hour'] / 24)
        df['hour_cos'] = np.cos(2 * np.pi * df['hour'] / 24)
        df['day_sin'] = np.sin(2 * np.pi * df['dayofweek'] / 7)
        df['day_cos'] = np.cos(2 * np.pi * df['dayofweek'] / 7)
        df['month_sin'] = np.sin(2 * np.pi * df['month'] / 12)
        df['month_cos'] = np.cos(2 * np.pi * df['month'] / 12)
        df['week_sin'] = np.sin(2 * np.pi * df['week_of_year'] / 52)
        df['week_cos'] = np.cos(2 * np.pi * df['week_of_year'] / 52)
        
        print(f"  세분화된 시간대: {df['time_period'].nunique()}개 카테고리")
        print(f"  세분화된 계절: {df['detailed_season'].nunique()}개 카테고리")
        
        return df
    
    def create_interaction_features(self, df):
        """상호작용 항 생성"""
        print("🔗 상호작용 특성 생성 중...")
        
        # 기상 변수 간 상호작용
        if 'ta' in df.columns and 'hm' in df.columns:
            # 체감온도 (온도 × 습도)
            df['apparent_temp'] = df['ta'] * (1 + df['hm'] / 100)
            df['temp_humidity_interaction'] = df['ta'] * df['hm']
            print("  온도 × 습도 상호작용 생성")
        
        if 'ta' in df.columns and 'ws' in df.columns:
            # 풍속에 의한 체감온도
            df['wind_chill'] = df['ta'] - df['ws'] * 2
            df['temp_wind_interaction'] = df['ta'] * df['ws']
            print("  온도 × 풍속 상호작용 생성")
        
        if 'rn_hr1' in df.columns and 'ws' in df.columns:
            # 비바람 효과
            df['rain_wind_interaction'] = df['rn_hr1'] * df['ws']
            print("  강수 × 풍속 상호작용 생성")
        
        if 'rn_hr1' in df.columns and 'hm' in df.columns:
            # 습도-강수 상호작용
            df['rain_humidity_interaction'] = df['rn_hr1'] * df['hm']
            print("  강수 × 습도 상호작용 생성")
        
        # 시간-기상 상호작용
        if 'ta' in df.columns:
            df['temp_hour_interaction'] = df['ta'] * df['hour']
            df['temp_season_interaction'] = df['ta'] * df['month']
            print("  온도 × 시간 상호작용 생성")
        
        # 극한 기상과 시간 상호작용
        if 'extreme_weather_any' in df.columns:
            df['extreme_weather_rush'] = df['extreme_weather_any'] * (
                df['time_period'].isin(['morning_rush_peak', 'evening_rush_peak']).astype(int)
            )
            print("  극한기상 × 출퇴근시간 상호작용 생성")
        
        return df
    
    def create_lag_features(self, df, target_col='congestion'):
        """시차 특성 생성"""
        print("📈 시차 특성 생성 중...")
        
        # 시간 정렬
        df = df.sort_values(['station_name', 'tm'])
        
        # 혼잡도 시차 특성
        for lag in [1, 2, 3, 24, 48]:  # 1,2,3시간 전, 1일전, 2일전
            df[f'{target_col}_lag_{lag}'] = df.groupby('station_name')[target_col].shift(lag)
        
        # 기상 변수 시차 특성 (1일 전)
        weather_vars = ['ta', 'ws', 'rn_hr1', 'hm','si']
        for var in weather_vars:
            if var in df.columns:
                df[f'{var}_lag_24'] = df[var].shift(24)
        
        # 이동평균 특성
        for window in [3, 6, 12, 24]:  # 3,6,12,24시간 이동평균
            df[f'{target_col}_ma_{window}'] = df.groupby('station_name')[target_col].rolling(
                window=window, min_periods=1
            ).mean().reset_index(0, drop=True)
        
        print(f"  시차 특성: {5 + len(weather_vars)}개")
        print(f"  이동평균 특성: 4개")
        
        return df

In [None]:
class EnhancedSubwayModel:
    """고도화된 지하철 혼잡도 예측 모델"""
    
    def __init__(self, use_optuna=True, n_trials=100):
        self.feature_engineer = EnhancedFeatureEngineering()
        self.models = {}
        self.best_model = None
        self.feature_importance = None
        self.use_optuna = use_optuna
        self.n_trials = n_trials
        self.results = {}
        
    def load_and_preprocess_data(self, train_years=['21'], test_year='23', sample_size=5000000):
        """데이터 로드 및 전처리"""
        print("🚀 고도화된 데이터 전처리 시작")
        print("=" * 60)
        
        try:
            base_dir = os.path.dirname(os.path.abspath(__file__))
            data_dir = os.path.abspath(os.path.join(base_dir, '..', '데이터'))
            
            # 훈련 데이터 로드
            train_dfs = []
            for year in train_years:
                file_path = os.path.join(data_dir, f'train_subway{year}.csv')
                print(f"20{year}년 데이터 로드 중...")
                df = pd.read_csv(file_path, encoding='cp949', nrows=sample_size)
                df.columns = [col.replace(f'train_subway{year}.', '') for col in df.columns]
                train_dfs.append(df)
            
            self.train_data = pd.concat(train_dfs).reset_index(drop=True)
            
            # 테스트 데이터 로드
            test_file = os.path.join(data_dir, f'train_subway{test_year}.csv')
            print(f"20{test_year}년 검증 데이터 로드 중...")
            self.test_data = pd.read_csv(test_file, encoding='cp949', nrows=sample_size)
            self.test_data.columns = [col.replace(f'train_subway{test_year}.', '') for col in self.test_data.columns]
            
            # 특성 공학 적용
            print("\n🔧 고도화된 특성 공학 적용 중...")
            self.train_data = self._apply_feature_engineering(self.train_data)
            self.test_data = self._apply_feature_engineering(self.test_data)
            
            print(f"\n✅ 데이터 준비 완료!")
            print(f"  훈련 데이터: {len(self.train_data):,}개")
            print(f"  검증 데이터: {len(self.test_data):,}개")
            print(f"  특성 수: {self.train_data.shape[1]}개")
            
            return True
            
        except Exception as e:
            print(f"❌ 데이터 로드 실패: {str(e)}")
            return False
    
    def _apply_feature_engineering(self, df):
        """특성 공학 파이프라인 적용"""
        print("📊 결측치 처리 전 상태:")
        print(f"  전체 결측치: {df.isnull().sum().sum():,}개")
        
        # 0. 기상 데이터 특수 결측치 값 처리
        print("\n🔧 기상 데이터 특수 결측치 값 처리...")
        weather_vars = ['ta', 'ws', 'rn_hr1', 'hm']
        special_missing_values = [-99, -9999, 999, -999, 9999, -88, -77]
        
        for var in weather_vars:
            if var in df.columns:
                original_missing = df[var].isnull().sum()
                
                # 특수 결측치 값들을 NaN으로 변환
                for missing_val in special_missing_values:
                    special_count = (df[var] == missing_val).sum()
                    if special_count > 0:
                        print(f"  {var}: {special_count}개의 {missing_val} 값을 NaN으로 변환")
                        df[var] = df[var].replace(missing_val, np.nan)
                
                new_missing = df[var].isnull().sum()
                if new_missing != original_missing:
                    print(f"  {var}: 결측치 {original_missing} → {new_missing}개")
        
        # 비상식적인 값들도 체크 (온도가 -50도 이하나 60도 이상 등)
        if 'ta' in df.columns:
            extreme_temp = ((df['ta'] < -50) | (df['ta'] > 60)) & df['ta'].notna()
            if extreme_temp.sum() > 0:
                print(f"  ta: {extreme_temp.sum()}개의 극한 온도값을 NaN으로 변환")
                df.loc[extreme_temp, 'ta'] = np.nan
        
        if 'hm' in df.columns:
            extreme_hum = ((df['hm'] < 0) | (df['hm'] > 100)) & df['hm'].notna()
            if extreme_hum.sum() > 0:
                print(f"  hm: {extreme_hum.sum()}개의 극한 습도값을 NaN으로 변환")
                df.loc[extreme_hum, 'hm'] = np.nan
        
        if 'ws' in df.columns:
            extreme_wind = (df['ws'] < 0) & df['ws'].notna()
            if extreme_wind.sum() > 0:
                print(f"  ws: {extreme_wind.sum()}개의 음수 풍속값을 NaN으로 변환")
                df.loc[extreme_wind, 'ws'] = np.nan
        
        if 'rn_hr1' in df.columns:
            extreme_rain = (df['rn_hr1'] < 0) & df['rn_hr1'].notna()
            if extreme_rain.sum() > 0:
                print(f"  rn_hr1: {extreme_rain.sum()}개의 음수 강수량을 NaN으로 변환")
                df.loc[extreme_rain, 'rn_hr1'] = np.nan
        
        print(f"특수값 처리 후 총 결측치: {df.isnull().sum().sum():,}개")
        
        # 1. 극한 기상 특성
        df = self.feature_engineer.create_extreme_weather_features(df)
        
        # 2. 세분화된 시간 특성
        df = self.feature_engineer.create_detailed_time_features(df)
        
        # 3. 상호작용 특성
        df = self.feature_engineer.create_interaction_features(df)
        
        # 4. 시차 특성 (혼잡도가 있는 경우만)
        if 'congestion' in df.columns:
            df = self.feature_engineer.create_lag_features(df)
        
        # 5. 체계적인 결측치 처리
        print("\n🔧 체계적인 결측치 처리 시작...")
        
        # 5-1. 기상 변수 결측치 처리 (시계열 특성 고려)
        if 'datetime' in df.columns:
            df = df.sort_values(['station_name', 'datetime']).reset_index(drop=True)
            
            for var in weather_vars:
                if var in df.columns:
                    missing_before = df[var].isnull().sum()
                    if missing_before > 0:
                        # Forward fill -> Backward fill -> Median
                        df[var] = df.groupby('station_name')[var].fillna(method='ffill').fillna(method='bfill')
                        df[var] = df[var].fillna(df[var].median())
                        missing_after = df[var].isnull().sum()
                        print(f"  {var}: {missing_before} → {missing_after} 결측치 처리")
        
        # 5-2. 시차 특성 결측치 처리 (기상 변수만, 혼잡도 시차는 제외)
        lag_cols = [col for col in df.columns if ('lag_' in col or '_ma_' in col) and 'congestion' not in col]
        for col in lag_cols:
            missing_before = df[col].isnull().sum()
            if missing_before > 0:
                # 기상 시차는 원본 변수 값으로
                if '_lag_' in col:
                    base_var = col.split('_lag_')[0]
                elif '_ma_' in col:
                    base_var = col.split('_ma_')[0]
                else:
                    base_var = None
                
                if base_var and base_var in df.columns:
                    df[col] = df[col].fillna(df[base_var])
                else:
                    df[col] = df[col].fillna(0)
                
                missing_after = df[col].isnull().sum()
                if missing_before > 0:
                    print(f"  {col}: {missing_before} → {missing_after} 기상시차 결측치 처리")
        
        # 5-3. 범주형 변수 결측치 처리
        categorical_vars = ['time_period', 'detailed_season', 'day_type']
        for var in categorical_vars:
            if var in df.columns:
                missing_before = df[var].isnull().sum()
                if missing_before > 0:
                    # 최빈값으로 채우기
                    mode_value = df[var].mode()
                    if len(mode_value) > 0:
                        df[var] = df[var].fillna(mode_value[0])
                    missing_after = df[var].isnull().sum()
                    print(f"  {var}: {missing_before} → {missing_after} 범주형 결측치 처리")
        
        # 5-4. 이진 변수 결측치 처리 (극한 기상 등)
        binary_vars = [col for col in df.columns if col.startswith(('extreme_', 'is_', 'no_', 'light_', 'heavy_', 'strong_', 'calm_'))]
        for var in binary_vars:
            missing_before = df[var].isnull().sum()
            if missing_before > 0:
                df[var] = df[var].fillna(0)  # 이진 변수는 0으로
                missing_after = df[var].isnull().sum()
                if missing_before > 0:
                    print(f"  {var}: {missing_before} → {missing_after} 이진 결측치 처리")
        
        # 5-5. 상호작용 특성 결측치 처리
        interaction_vars = [col for col in df.columns if 'interaction' in col or 'apparent_temp' in col or 'wind_chill' in col]
        for var in interaction_vars:
            missing_before = df[var].isnull().sum()
            if missing_before > 0:
                df[var] = df[var].fillna(df[var].median())
                missing_after = df[var].isnull().sum()
                if missing_before > 0:
                    print(f"  {var}: {missing_before} → {missing_after} 상호작용 결측치 처리")
        
        # 5-6. 숫자형 변수 최종 처리 (median)
        numeric_columns = df.select_dtypes(include=[np.number]).columns
        for col in numeric_columns:
            if df[col].isnull().sum() > 0:
                missing_before = df[col].isnull().sum()
                df[col] = df[col].fillna(df[col].median())
                missing_after = df[col].isnull().sum()
                if missing_before > 0:
                    print(f"  {col}: {missing_before} → {missing_after} 기타 숫자형 결측치 처리")
        
        # 5-7. 최종 결측치 확인
        final_missing = df.isnull().sum().sum()
        print(f"\n✅ 결측치 처리 완료: {final_missing}개 남음")
        
        if final_missing > 0:
            print("⚠️ 남은 결측치가 있는 컬럼:")
            missing_cols = df.columns[df.isnull().any()].tolist()
            for col in missing_cols:
                missing_count = df[col].isnull().sum()
                missing_pct = missing_count / len(df) * 100
                print(f"  {col}: {missing_count}개 ({missing_pct:.1f}%)")
        
        return df
    
    def prepare_features(self):
        """최종 특성 준비"""
        print("\n🎯 최종 특성 준비 중...")
        
        # 공통 역 필터링
        train_stations = set(self.train_data['station_name'].unique())
        test_stations = set(self.test_data['station_name'].unique())
        common_stations = train_stations & test_stations
        
        print(f"공통 역: {len(common_stations)}개")
        
        self.train_data = self.train_data[self.train_data['station_name'].isin(common_stations)]
        self.test_data = self.test_data[self.test_data['station_name'].isin(common_stations)]
        
        # 역 인코딩
        le_station = LabelEncoder()
        le_station.fit(sorted(common_stations))
        self.train_data['station_encoded'] = le_station.transform(self.train_data['station_name'])
        self.test_data['station_encoded'] = le_station.transform(self.test_data['station_name'])
        
        # 범주형 변수 인코딩
        categorical_cols = ['time_period', 'detailed_season', 'day_type']
        for col in categorical_cols:
            if col in self.train_data.columns:
                le = LabelEncoder()
                # 훈련 데이터와 테스트 데이터의 모든 값으로 fit
                combined_values = pd.concat([self.train_data[col], self.test_data[col]]).unique()
                le.fit(combined_values)
                self.train_data[f'{col}_encoded'] = le.transform(self.train_data[col])
                self.test_data[f'{col}_encoded'] = le.transform(self.test_data[col])
        
        # 명시적으로 제외할 컬럼들 정의
        exclude_cols = [
            'tm', 'datetime', 'station_name', 'congestion',
            'time_period', 'detailed_season', 'day_type'  # 인코딩된 버전을 사용하므로 원본 제외
        ]
        
        # 숫자형 컬럼만 선택 (더 안전한 방법)
        numeric_cols = self.train_data.select_dtypes(include=[np.number]).columns.tolist()
        
        # 특성 컬럼 선택: 숫자형이면서 제외 목록에 없고, 테스트 데이터에도 있는 컬럼들
        feature_cols = [col for col in numeric_cols 
                       if col not in exclude_cols and col in self.test_data.columns]
        
        print(f"전체 숫자형 컬럼: {len(numeric_cols)}개")
        print(f"제외된 컬럼: {len([col for col in numeric_cols if col in exclude_cols])}개")
        
        # 결측치가 많은 특성 제거
        missing_threshold = 0.5
        features_to_remove = []
        for col in feature_cols.copy():
            train_missing = self.train_data[col].isnull().mean()
            test_missing = self.test_data[col].isnull().mean()
            if train_missing > missing_threshold or test_missing > missing_threshold:
                features_to_remove.append(col)
                feature_cols.remove(col)
                print(f"제거: {col} (훈련 결측치 {train_missing:.1%}, 테스트 결측치 {test_missing:.1%})")
        
        # 데이터 타입 확인 및 안전성 검증
        print(f"\n데이터 타입 검증:")
        for col in feature_cols[:5]:  # 처음 5개만 확인
            train_dtype = self.train_data[col].dtype
            test_dtype = self.test_data[col].dtype
            print(f"  {col}: 훈련={train_dtype}, 테스트={test_dtype}")
        
        # 최종 특성 데이터 생성
        X_train = self.train_data[feature_cols].copy()
        y_train = self.train_data['congestion'].copy()
        X_test = self.test_data[feature_cols].copy()
        y_test = self.test_data['congestion'].copy()
        
        # 문자열이 섞여있는지 최종 확인
        for col in feature_cols:
            if X_train[col].dtype == 'object':
                print(f"⚠️ 경고: {col}이 문자열 타입입니다. 샘플: {X_train[col].head().tolist()}")
                # 문자열 컬럼이면 제거
                feature_cols.remove(col)
                X_train = X_train.drop(columns=[col])
                X_test = X_test.drop(columns=[col])
        
        # LightGBM 호환성을 위한 컬럼명 정리
        print(f"\n🔧 LightGBM 호환성을 위한 컬럼명 정리 중...")
        def clean_feature_name(name):
            """특수문자를 안전한 문자로 치환"""
            # 특수문자들을 안전한 문자로 치환
            replacements = {
                '%': 'pct',
                '(': '_',
                ')': '_',
                '[': '_',
                ']': '_',
                ':': '_',
                ' ': '_',
                '-': '_',
                '/': '_',
                '.': '_',
                ',': '_',
                '&': 'and',
                '+': 'plus',
                '*': 'mult',
                '=': 'eq',
                '<': 'lt',
                '>': 'gt',
                '!': 'not',
                '?': 'q',
                '@': 'at',
                '#': 'hash',
                '$': 'dollar'
            }
            
            cleaned_name = name
            for old_char, new_char in replacements.items():
                cleaned_name = cleaned_name.replace(old_char, new_char)
            
            # 연속된 언더스코어 정리
            while '__' in cleaned_name:
                cleaned_name = cleaned_name.replace('__', '_')
            
            # 시작과 끝의 언더스코어 제거
            cleaned_name = cleaned_name.strip('_')
            
            return cleaned_name
        
        # 컬럼명 정리 및 변경사항 추적
        original_feature_cols = feature_cols.copy()
        cleaned_feature_cols = [clean_feature_name(col) for col in feature_cols]
        
        # 변경된 컬럼명이 있는지 확인
        changes_made = False
        for original, cleaned in zip(original_feature_cols, cleaned_feature_cols):
            if original != cleaned:
                if not changes_made:
                    print("  컬럼명 변경 사항:")
                    changes_made = True
                print(f"    {original} → {cleaned}")
        
        if not changes_made:
            print("  ✅ 모든 컬럼명이 이미 안전함")
        
        # DataFrame 컬럼명 변경
        column_mapping = dict(zip(original_feature_cols, cleaned_feature_cols))
        X_train = X_train.rename(columns=column_mapping)
        X_test = X_test.rename(columns=column_mapping)
        feature_cols = cleaned_feature_cols
        
        print(f"\n최종 특성 수: {len(feature_cols)}개")
        print(f"특성 종류: 기본시간, 극한기상, 상호작용, 기상시차, 인코딩")
        print(f"훈련 데이터 형태: {X_train.shape}")
        print(f"테스트 데이터 형태: {X_test.shape}")
        
        return X_train, y_train, X_test, y_test, feature_cols
    
    def hyperparameter_tuning(self, X_train, y_train, model_type='xgboost'):
        """Optuna를 사용한 하이퍼파라미터 튜닝"""
        print(f"\n🔍 {model_type} 하이퍼파라미터 튜닝 시작...")
        
        def objective(trial):
            if model_type == 'xgboost':
                params = {
                    'objective': 'reg:squarederror',
                    'n_estimators': trial.suggest_int('n_estimators', 100, 1000),
                    'max_depth': trial.suggest_int('max_depth', 3, 12),
                    'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
                    'subsample': trial.suggest_float('subsample', 0.6, 1.0),
                    'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
                    'reg_alpha': trial.suggest_float('reg_alpha', 0, 10),
                    'reg_lambda': trial.suggest_float('reg_lambda', 0, 10),
                    'random_state': 42
                }
                model = xgb.XGBRegressor(**params)
                
            elif model_type == 'lightgbm':
                params = {
                    'objective': 'regression',
                    'n_estimators': trial.suggest_int('n_estimators', 100, 1000),
                    'max_depth': trial.suggest_int('max_depth', 3, 12),
                    'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
                    'feature_fraction': trial.suggest_float('feature_fraction', 0.6, 1.0),
                    'bagging_fraction': trial.suggest_float('bagging_fraction', 0.6, 1.0),
                    'reg_alpha': trial.suggest_float('reg_alpha', 0, 10),
                    'reg_lambda': trial.suggest_float('reg_lambda', 0, 10),
                    'random_state': 42,
                    'verbosity': -1
                }
                model = lgb.LGBMRegressor(**params)
            
            # 시계열 교차검증
            tscv = TimeSeriesSplit(n_splits=3)
            cv_scores = cross_val_score(model, X_train, y_train, cv=tscv, 
                                      scoring='neg_mean_absolute_error', n_jobs=-1)
            return cv_scores.mean()
        
        # Optuna 스터디
        study = optuna.create_study(direction='maximize', 
                                  sampler=optuna.samplers.TPESampler(seed=42))
        study.optimize(objective, n_trials=self.n_trials, show_progress_bar=True)
        
        print(f"최적 파라미터: {study.best_params}")
        print(f"최적 CV 점수: {study.best_value:.4f}")
        
        return study.best_params
    
    def feature_selection(self, X_train, y_train, X_test, feature_cols, method='simple'):
        """빠른 특성 선택"""
        print(f"\n🎯 빠른 특성 선택 ({method}) 중...")
        
        if method == 'simple' or len(feature_cols) < 20:
            # 간단한 방법: 분산이 너무 낮은 특성만 제거
            from sklearn.feature_selection import VarianceThreshold
            
            # 분산 임계값으로 특성 선택 (매우 빠름)
            selector = VarianceThreshold(threshold=0.01)
            X_train_transformed = selector.fit_transform(X_train)
            X_test_transformed = selector.transform(X_test)
            
            selected_features = [feature_cols[i] for i, selected in enumerate(selector.get_support()) if selected]
            
            print(f"분산 기반 선택: {len(selected_features)}개 (전체 {len(feature_cols)}개 중)")
            
            X_train_selected = pd.DataFrame(X_train_transformed, columns=selected_features, index=X_train.index)
            X_test_selected = pd.DataFrame(X_test_transformed, columns=selected_features, index=X_test.index)
            
        else:
            # 기존 중요도 기반 방법 (더 느림)
            rf = RandomForestRegressor(n_estimators=20, random_state=42, n_jobs=1)  # 더 빠르게
            rf.fit(X_train, y_train)
            
            importances = rf.feature_importances_
            feature_importance_df = pd.DataFrame({
                'feature': feature_cols,
                'importance': importances
            }).sort_values('importance', ascending=False)
            
            threshold = 0.005  # 임계값 완화
            selected_features = feature_importance_df[
                feature_importance_df['importance'] >= threshold
            ]['feature'].tolist()
            
            print(f"중요도 기반 선택: {len(selected_features)}개")
            
            X_train_selected = X_train[selected_features]
            X_test_selected = X_test[selected_features]
        
        self.selected_features = selected_features
        return X_train_selected, X_test_selected, selected_features
    
    def train_enhanced_models(self, X_train, y_train, X_test, y_test):
        """고도화된 모델들 훈련 (XGBoost, LightGBM만)"""
        print("\n🚀 고도화된 모델 훈련 시작")
        print("=" * 50)
        
        models_to_train = {
            'Enhanced_XGBoost': 'xgboost',
            'Enhanced_LightGBM': 'lightgbm'
        }
        
        for model_name, model_type in models_to_train.items():
            print(f"\n🔧 {model_name} 훈련 중...")
            
            # 하이퍼파라미터 튜닝
            if self.use_optuna:
                best_params = self.hyperparameter_tuning(X_train, y_train, model_type)
            else:
                # 기본 파라미터 사용
                if model_type == 'xgboost':
                    best_params = {'n_estimators': 200, 'max_depth': 6, 'learning_rate': 0.1}
                elif model_type == 'lightgbm':
                    best_params = {'n_estimators': 200, 'max_depth': 6, 'learning_rate': 0.1}
            
            # 모델 생성 및 훈련
            if model_type == 'xgboost':
                model = xgb.XGBRegressor(**best_params)
            elif model_type == 'lightgbm':
                model = lgb.LGBMRegressor(**best_params)
            
            model.fit(X_train, y_train)
            
            # 예측 및 평가
            y_pred = model.predict(X_test)
            
            mae = mean_absolute_error(y_test, y_pred)
            rmse = np.sqrt(mean_squared_error(y_test, y_pred))
            r2 = r2_score(y_test, y_pred)
            
            self.models[model_name] = model
            self.results[model_name] = {
                'mae': mae,
                'rmse': rmse,
                'r2': r2,
                'params': best_params
            }
            
            print(f"  MAE: {mae:.3f}")
            print(f"  RMSE: {rmse:.3f}")
            print(f"  R²: {r2:.3f}")
        
        # 최고 성능 모델 선택
        best_model_name = min(self.results.keys(), key=lambda x: self.results[x]['mae'])
        self.best_model = self.models[best_model_name]
        
        print(f"\n🏆 최고 성능 모델: {best_model_name}")
        print(f"  MAE: {self.results[best_model_name]['mae']:.3f}")
        print(f"  RMSE: {self.results[best_model_name]['rmse']:.3f}")
        print(f"  R²: {self.results[best_model_name]['r2']:.3f}")
        
        return self.results
    
    def analyze_feature_importance(self):
        """특성 중요도 분석"""
        print("\n📊 특성 중요도 분석")
        print("=" * 40)
        
        if self.best_model is None:
            print("❌ 훈련된 모델이 없습니다.")
            return
        
        # 특성 중요도 추출
        if hasattr(self.best_model, 'feature_importances_'):
            importances = self.best_model.feature_importances_
        else:
            print("❌ 모델이 특성 중요도를 지원하지 않습니다.")
            return
        
        # 중요도 데이터프레임 생성
        importance_df = pd.DataFrame({
            'feature': self.selected_features,
            'importance': importances
        }).sort_values('importance', ascending=False)
        
        self.feature_importance = importance_df
        
        # 상위 20개 특성 출력
        print("상위 20개 중요 특성:")
        for i, row in importance_df.head(20).iterrows():
            print(f"  {row['feature']}: {row['importance']:.4f}")
        
        return importance_df
    
    def save_model(self, filename='enhanced_subway_model.pkl'):
        """모델 저장"""
        model_data = {
            'best_model': self.best_model,
            'feature_engineer': self.feature_engineer,
            'selected_features': self.selected_features,
            'results': self.results,
            'feature_importance': self.feature_importance
        }
        
        joblib.dump(model_data, filename)
        print(f"✅ 모델 저장 완료: {filename}")

In [None]:
def train_model():
    print("🚀 고도화된 지하철 혼잡도 예측 모델 (학습)")
    print("=" * 60)
    
    # 모델 초기화 (빠른 테스트용 설정)
    model = EnhancedSubwayModel(use_optuna=True, n_trials=10)  # 10회로 줄임
    
    # 1. 데이터 로드 및 전처리 (작은 샘플)
    if not model.load_and_preprocess_data(train_years=['21'], test_year='23', sample_size=5000000):
        return None
    
    # 2. 특성 준비
    X_train, y_train, X_test, y_test, feature_cols = model.prepare_features()
    
    # 3. 빠른 특성 선택
    X_train_selected, X_test_selected, selected_features = model.feature_selection(
        X_train, y_train, X_test, feature_cols, method='simple'
    )
    
    # 4. 모델 훈련
    results = model.train_enhanced_models(X_train_selected, y_train, X_test_selected, y_test)
    
    # 5. 특성 중요도 분석
    model.analyze_feature_importance()
    
    # 6. 모델 저장 - 시각화에 필요한 모든 데이터 포함
    model_data = {
        'best_model': model.best_model,
        'feature_engineer': model.feature_engineer,
        'selected_features': selected_features,
        'results': model.results,
        'feature_importance': model.feature_importance,
        # 시각화를 위한 전체 데이터
        'test_data': {
            'X_test': X_test_selected,
            'y_test': y_test,
            'test_data_full': model.test_data  # 시간대별 분석용
        }
    }

    try:
        joblib.dump(model_data, '../result/enhanced_subway_model_fast.pkl')
        print("✅ 모델 저장 완료: ../result/enhanced_subway_model_fast.pkl")
    except:
        print("⚠️ 모델 저장 경로 문제 - 현재 폴더에 저장")
        joblib.dump(model_data, 'enhanced_subway_model_fast.pkl')
        print("✅ 모델 저장 완료: enhanced_subway_model_fast.pkl")
    
    print(f"\n🎉 모델 학습 및 저장 완료!")
    return model

In [None]:
trained_model = train_model()