In [6]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import seaborn as sns
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from statsmodels.tsa.stattools import adfuller
import pmdarima as pm
from prophet import Prophet
from statsmodels.graphics.tsaplots import plot_acf
from statsmodels.tsa.seasonal import seasonal_decompose
from sklearn.metrics import mean_squared_error, mean_absolute_error, mean_absolute_percentage_error
from joblib import Parallel, delayed
import os
import logging
import time
from datetime import datetime
import warnings
import uuid

# Configuration
warnings.filterwarnings("ignore")

CONFIG = {
    'forecast_horizon': 12,
    'seasonal_periods': 12,
    'min_data_length': 24,
    'img_dir': 'classical_model_results',
    'results_file': 'classical_model_results/classical_model_results.csv',
    'n_jobs': -1,  # Có thể giới hạn, ví dụ: 4, nếu máy có ít CPU
    'outlier_threshold': 3,
    'max_diff': 3,
    'lag_features': list(range(1, 13)),
    'rolling_windows': [3, 6, 12],
    'correlation_threshold': 0.2,
    'dpi': 150,  # Giảm DPI để tăng tốc lưu biểu đồ
}

# Setup logging
def setup_logging(img_dir):
    os.makedirs(img_dir, exist_ok=True)
    logging.basicConfig(
        filename=f'{img_dir}/classical_models_log.txt',
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    return logging.getLogger(__name__)

def detect_outliers(series, threshold=CONFIG['outlier_threshold']):
    logger = logging.getLogger(__name__)
    z_scores = np.abs((series - series.mean()) / series.std())
    outliers = z_scores > threshold
    series_clean = series.copy()
    series_clean[outliers] = series.mean()
    logger.info(f"Detected {outliers.sum()} outliers")
    return series_clean

# Feature Engineering
def create_features(data, target, config=CONFIG):
    logger = logging.getLogger(__name__)
    logger.info(f"Creating features for {target}, data shape: {data.shape}")
    
    if len(data) < config['min_data_length']:
        raise ValueError(f"Data too short: {len(data)} rows")
    if not isinstance(data.index, pd.DatetimeIndex):
        raise ValueError("DataFrame index must be DatetimeIndex")
    
    if target not in data.columns:
        raise ValueError(f"Target column {target} not found in data")
    
    df = data.copy()
    required_cols = [target]
    
    # Handle missing/infinite values once
    if df.isna().any().any() or np.isinf(df).any().any():
        logger.warning("Data contains NaN or Inf, filling...")
        df = df.fillna(method='ffill').fillna(df.mean(numeric_only=True))
    
    # Vectorized lag and rolling features
    lag_features = {f'{target}_lag_{lag}': df[target].shift(lag) for lag in config['lag_features']}
    rolling_features = {
        f'{target}_roll_{stat}_{window}': getattr(df[target].rolling(window=window), stat)()
        for window in config['rolling_windows']
        for stat in ['mean', 'std']
    }
    df = df.assign(**lag_features, **rolling_features)
    
    # Add seasonal features
    df = df.assign(
        month=df.index.month,
        month_sin=np.sin(2 * np.pi * df.index.month / config['seasonal_periods']),
        month_cos=np.cos(2 * np.pi * df.index.month / config['seasonal_periods']),
        quarter=df.index.quarter
    )
    df = pd.get_dummies(df, columns=['month'], prefix='month')
    
    # Remove low-correlation features
    try:
        correlations = df.corr(numeric_only=True)[target].drop(required_cols, errors='ignore')
        low_corr_cols = correlations[abs(correlations) < config['correlation_threshold']].index
        if low_corr_cols.any():
            logger.info(f"Dropping {len(low_corr_cols)} low-correlation features: {list(low_corr_cols)}")
            df = df.drop(columns=low_corr_cols)
        else:
            logger.info("No features dropped due to low correlation")
    except Exception as e:
        logger.error(f"Error calculating correlations: {str(e)}")
    
    return df

# Stationarity Check
def check_stationarity(series, name, max_diff=CONFIG['max_diff']):
    logger = logging.getLogger(__name__)
    series_clean = series.dropna().replace([np.inf, -np.inf], np.nan).dropna()
    if len(series_clean) < 2:
        logger.error(f"{name}: Data too short after cleaning!")
        return 0
    
    d = 0
    while d <= max_diff:
        result = adfuller(series_clean)
        if result[1] < 0.05:
            logger.info(f"{name} stationary at d={d}")
            return d
        if d == max_diff:
            logger.warning(f"{name} not stationary after {max_diff} differencing")
            return d
        series_clean = series_clean.diff().dropna()
        if len(series_clean) < 2:
            logger.warning(f"{name}: Data too short after differencing {d+1}")
            return d
        d += 1
    return d

# Model Evaluation Metrics
def calculate_metrics(actual, predicted):
    logger = logging.getLogger(__name__)
    actual = np.array(actual, dtype=float)
    predicted = np.array(predicted, dtype=float)
    valid_mask = ~np.isnan(actual) & ~np.isnan(predicted) & ~np.isinf(actual) & ~np.isinf(predicted)
    actual = actual[valid_mask]
    predicted = predicted[valid_mask]
    
    if len(actual) == 0:
        logger.warning("No valid data for metrics calculation!")
        return np.nan, np.nan, np.nan, np.nan, np.nan, np.nan
    
    rmse = np.sqrt(mean_squared_error(actual, predicted))
    mae = mean_absolute_error(actual, predicted)
    mape = mean_absolute_percentage_error(actual, predicted) * 100 if np.all(np.abs(actual) > 1e-8) else np.nan
    smape = 100 * np.mean(2 * np.abs(predicted - actual) / (np.abs(actual) + np.abs(predicted)))
    norm_mape = mape / np.mean(np.abs(actual)) if not np.isnan(mape) else np.nan
    directional_acc = np.mean((np.diff(actual) * np.diff(predicted)) > 0) * 100 if len(actual) > 1 else np.nan
    return rmse, mae, mape, smape, norm_mape, directional_acc

# Visualization Functions
def plot_decomposition(series, period, filename, img_dir=CONFIG['img_dir']):
    logger = logging.getLogger(__name__)
    if series.isna().any():
        logger.warning(f"Series has {series.isna().sum()} NaN values, filling...")
        series = series.fillna(method='ffill').fillna(series.mean())
    
    try:
        decomposition = seasonal_decompose(series, period=period, model='additive')
        fig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 1, figsize=(12, 8))
        ax1.plot(series.index, series, label='Original'); ax1.legend(loc='upper left')
        ax2.plot(series.index, decomposition.trend, label='Trend'); ax2.legend(loc='upper left')
        ax3.plot(series.index, decomposition.seasonal, label='Seasonal'); ax3.legend(loc='upper left')
        ax4.plot(series.index, decomposition.resid, label='Residual'); ax4.legend(loc='upper left')
        plt.tight_layout()
        plt.savefig(os.path.join(img_dir, filename), dpi=CONFIG['dpi'])
        logger.info(f"Saved decomposition plot: {filename}")
    except Exception as e:
        logger.error(f"Error saving decomposition plot: {str(e)}")
    finally:
        plt.close()

def plot_forecast(historical, test, forecast, forecast_index, title, ylabel, filename, confidence_intervals=None, img_dir=CONFIG['img_dir']):
    logger = logging.getLogger(__name__)
    fig, ax = plt.subplots(figsize=(12, 6))
    ax.plot(historical.index, historical, label='Historical', color='blue')
    ax.plot(test.index, test, label='Actual (Test)', color='green')
    ax.plot(forecast_index, forecast, label='Forecast', color='orange', linestyle='--', linewidth=2)
    if confidence_intervals:
        ax.fill_between(forecast_index, confidence_intervals[0], confidence_intervals[1], 
                        color='orange', alpha=0.2, label='95% CI')
    ax.set_title(title); ax.set_xlabel('Time'); ax.set_ylabel(ylabel); ax.legend(); ax.grid(True)
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
    ax.xaxis.set_major_locator(mdates.MonthLocator(interval=3))
    plt.xticks(rotation=45); plt.tight_layout()
    try:
        plt.savefig(os.path.join(img_dir, filename), dpi=CONFIG['dpi'])
        logger.info(f"Saved plot: {filename}")
    except Exception as e:
        logger.error(f"Error saving plot: {str(e)}")
    plt.close()

def plot_comparison_forecasts(historical, test, forecasts, forecast_index, title, ylabel, filename, metrics=None, img_dir=CONFIG['img_dir']):
    logger = logging.getLogger(__name__)
    fig, ax = plt.subplots(figsize=(14, 8))
    ax.plot(historical.index, historical, label='Historical', color='blue')
    ax.plot(test.index, test, label='Actual (Test)', color='green')
    colors = sns.color_palette("husl", len(forecasts))
    for (model_name, forecast), color in zip(forecasts.items(), colors):
        rmse = metrics.get(model_name, {}).get('RMSE', np.nan) if metrics else np.nan
        if forecast is None or pd.isna(rmse):
            continue
        ax.plot(forecast_index, forecast, label=f'Forecast {model_name} (RMSE: {rmse:.4f})', 
                linestyle='--', color=color)
    ax.set_title(title); ax.set_xlabel('Time'); ax.set_ylabel(ylabel); ax.legend(); ax.grid(True)
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
    ax.xaxis.set_major_locator(mdates.MonthLocator(interval=3))
    plt.xticks(rotation=45); plt.tight_layout()
    try:
        plt.savefig(os.path.join(img_dir, filename), dpi=CONFIG['dpi'])
        logger.info(f"Saved comparison plot: {filename}")
    except Exception as e:
        logger.error(f"Error saving comparison plot: {str(e)}")
    plt.close()

def plot_metrics_bar(metrics_df, filename, img_dir=CONFIG['img_dir']):
    logger = logging.getLogger(__name__)
    fig, axes = plt.subplots(2, 3, figsize=(12, 8))
    metrics = ['RMSE', 'MAE', 'MAPE', 'sMAPE', 'NormMAPE', 'DirAcc']
    for i, (metric, ax) in enumerate(zip(metrics, axes.flatten())):
        sns.barplot(x='Model', y=metric, data=metrics_df, ax=ax)
        ax.set_title(f'{metric} Comparison')
        ax.tick_params(axis='x', rotation=45)
    plt.tight_layout()
    try:
        plt.savefig(os.path.join(img_dir, filename), dpi=CONFIG['dpi'])
        logger.info(f"Saved metrics bar plot: {filename}")
    except Exception as e:
        logger.error(f"Error saving metrics bar plot: {str(e)}")
    plt.close()

def plot_residual_acf(residuals, title, filename, img_dir=CONFIG['img_dir']):
    logger = logging.getLogger(__name__)
    if residuals is None or len(residuals) < 2:
        logger.warning(f"Skipping ACF: Insufficient residual data - {title}")
        return
    fig, ax = plt.subplots(figsize=(5, 3))
    max_lags = min(20, len(residuals) - 1)
    if max_lags < 1:
        return
    try:
        plot_acf(residuals, lags=max_lags, title=title, ax=ax)
        plt.tight_layout()
        plt.savefig(os.path.join(img_dir, filename), dpi=CONFIG['dpi'])
        logger.info(f"Saved ACF plot: {filename}")
    except Exception as e:
        logger.error(f"Error saving ACF plot: {str(e)}")
    plt.close()

# Model Functions
def run_exponential_smoothing(train, test, forecast_index, seasonal_periods=CONFIG['seasonal_periods']):
    logger = logging.getLogger(__name__)
    start_time = time.time()
    try:
        model = ExponentialSmoothing(train, trend='add', seasonal='add', 
                                  seasonal_periods=seasonal_periods).fit(optimized=True)
        forecast = model.forecast(CONFIG['forecast_horizon'])
        residuals = train - model.fittedvalues
        forecast = pd.Series(forecast.values, index=forecast_index)
        resid_std = np.std(residuals)
        ci_lower = forecast - 1.96 * resid_std
        ci_upper = forecast + 1.96 * resid_std
        metrics = calculate_metrics(test, forecast)
        logger.info(f"Exponential Smoothing: RMSE={metrics[0]:.4f}, Time={time.time() - start_time:.2f}s")
        return forecast, residuals, metrics, (ci_lower, ci_upper)
    except Exception as e:
        logger.error(f"Error Exponential Smoothing: {str(e)}")
        return None, None, (np.nan, np.nan, np.nan, np.nan, np.nan, np.nan), None

def run_sarima(train, test, forecast_index, seasonal_periods=CONFIG['seasonal_periods']):
    logger = logging.getLogger(__name__)
    start_time = time.time()
    try:
        d = check_stationarity(train, "SARIMA")
        try:
            model = pm.auto_arima(train, start_p=0, start_q=0, max_p=5, max_q=5, d=d, max_d=1,
                                seasonal=True, m=seasonal_periods, start_P=0, start_Q=0, max_P=3, max_Q=3, max_D=3,
                                stepwise=True, trace=False, error_action='ignore', suppress_warnings=True,
                                information_criterion='aic', maxiter=30)
        except:
            logger.warning("auto_arima failed, using SARIMA(1,1,1)(1,1,1,12)")
            model = pm.ARIMA(order=(1,1,1), seasonal_order=(1,1,1,seasonal_periods), 
                           suppress_warnings=True).fit(train)
        forecast = model.predict(n_periods=CONFIG['forecast_horizon'])
        residuals = train - model.predict_in_sample()
        forecast = pd.Series(forecast, index=forecast_index)
        metrics = calculate_metrics(test, forecast)
        logger.info(f"SARIMA: RMSE={metrics[0]:.4f}, Time={time.time() - start_time:.2f}s")
        return forecast, residuals, metrics, None
    except Exception as e:
        logger.error(f"Error SARIMA: {str(e)}")
        return None, None, (np.nan, np.nan, np.nan, np.nan, np.nan, np.nan), None

def run_prophet(train, test, forecast_index):
    logger = logging.getLogger(__name__)
    start_time = time.time()
    try:
        df_train = pd.DataFrame({'ds': train.index, 'y': train.values})
        model = Prophet(yearly_seasonality=True, weekly_seasonality=False, daily_seasonality=False,
                       changepoint_prior_scale=0.01, n_changepoints=10, seasonality_prior_scale=10.0).fit(df_train)
        future = pd.DataFrame({'ds': forecast_index})
        forecast = model.predict(future)
        forecast_series = pd.Series(forecast['yhat'].values, index=forecast_index)
        residuals = train - model.predict(df_train)['yhat']
        ci_lower = pd.Series(forecast['yhat_lower'].values, index=forecast_index)
        ci_upper = pd.Series(forecast['yhat_upper'].values, index=forecast_index)
        metrics = calculate_metrics(test, forecast_series)
        logger.info(f"Prophet: RMSE={metrics[0]:.4f}, Time={time.time() - start_time:.2f}s")
        return forecast_series, residuals, metrics, (ci_lower, ci_upper)
    except Exception as e:
        logger.error(f"Error Prophet: {str(e)}")
        return None, None, (np.nan, np.nan, np.nan, np.nan, np.nan, np.nan), None

# Model Execution Pipeline
def run_model_for_target(target, train, test, forecast_index, model_name, model_func, params, config=CONFIG):
    logger = logging.getLogger(__name__)
    logger.info(f"Running {model_name} for {target}")
    start_time = time.time()
    try:
        forecast, residuals, (rmse, mae, mape, smape, norm_mape, dir_acc), ci = model_func(
            train[target], test[target], forecast_index, **params
        )
        if forecast is None or pd.isna(rmse):
            logger.error(f"{model_name} for {target} failed to produce valid forecast or RMSE")
            return None
        
        plot_forecast(
            train[target][-36:], test[target], forecast, forecast_index,
            f'{model_name} Forecast for {target}', target, f'{target}_{model_name}_forecast.png', ci
        )
        if residuals is not None:
            plot_residual_acf(
                residuals.dropna(), f'ACF of Residuals - {model_name} ({target})',
                f'{target}_{model_name}_acf.png'
            )
        
        logger.info(f"Completed {model_name} for {target} in {time.time() - start_time:.2f}s")
        return {
            'Target': target,
            'Model': model_name,
            'RMSE': rmse,
            'MAE': mae,
            'MAPE': mape,
            'sMAPE': smape,
            'NormMAPE': norm_mape,
            'DirAcc': dir_acc,
            'Forecast': forecast,
            'Residuals': residuals,
            'CI': ci
        }
    except Exception as e:
        logger.error(f"Error running {model_name} for {target}: {str(e)}")
        return None

# Main Execution
def main():
    logger = setup_logging(CONFIG['img_dir'])
    try:
        # Load dữ liệu với usecols để giảm bộ nhớ
        cpi_data = pd.read_csv('data/cpi.csv', usecols=['t', 'cpi'])
        cpi_data.rename(columns={'t': 'date'}, inplace=True)
        cpi_data['time'] = pd.to_datetime(cpi_data['date'], format='%b-%y')
        cpi_data.set_index('time', inplace=True)
        cpi_data['cpi_mom'] = cpi_data['cpi'].pct_change() * 100
        cpi_data = cpi_data[['cpi_mom']].dropna()  # Loại bỏ NaN ngay sau khi tạo cpi_mom

        exog_data = pd.read_csv('data/exog_data.csv')
        exog_data['Ngày'] = pd.to_datetime(exog_data['Ngày'])
        exog_data.set_index('Ngày', inplace=True)

        # Join và xử lý NaN một lần
        data = cpi_data.join(exog_data, how='inner')
        if data.isna().any().any():
            logger.warning("Data contains NaN after join, filling...")
            data = data.fillna(method='ffill').fillna(data.mean(numeric_only=True))

        # Tạo features
        data_features = create_features(data, 'cpi_mom')
        
        # Split data
        train_size = len(data) - CONFIG['forecast_horizon']
        train, test = data_features[:train_size], data_features[train_size:]
        forecast_index = pd.date_range(start=test.index[0], periods=CONFIG['forecast_horizon'], freq='MS')
        
        # Plot decomposition
        plot_decomposition(data['cpi_mom'], period=CONFIG['seasonal_periods'], 
                         filename='cpi_mom_decomposition.png')
        
        # Define models
        models = {
            'Exponential Smoothing': (run_exponential_smoothing, {}),
            'SARIMA': (run_sarima, {}),
            'Prophet': (run_prophet, {})
        }
        
        # Run models in parallel
        results = []
        forecasts_mom = {}
        metrics_mom = {}
        logger.info("Running models for cpi_mom")
        
        tasks = [
            delayed(run_model_for_target)(
                'cpi_mom', train, test, forecast_index, model_name, model_func, params
            ) for model_name, (model_func, params) in models.items()
        ]
        model_results = Parallel(n_jobs=CONFIG['n_jobs'], verbose=1, backend='loky')(tasks)
        
        # Process results
        for result in model_results:
            if result is not None:
                results.append({
                    'Target': result['Target'],
                    'Model': result['Model'],
                    'RMSE': result['RMSE'],
                    'MAE': result['MAE'],
                    'MAPE': result['MAPE'],
                    'sMAPE': result['sMAPE'],
                    'NormMAPE': result['NormMAPE'],
                    'DirAcc': result['DirAcc']
                })
                forecasts_mom[result['Model']] = result['Forecast']
                metrics_mom[result['Model']] = {'RMSE': result['RMSE']}
        
        # Generate comparison plots and save results
        if forecasts_mom:
            plot_comparison_forecasts(
                train['cpi_mom'][-36:], test['cpi_mom'], forecasts_mom, forecast_index,
                'Comparison of Forecasts for cpi_mom', 'cpi_mom', 
                'cpi_mom_model_comparison.png', metrics=metrics_mom
            )
        
        results_df = pd.DataFrame(results)
        if not results_df.empty:
            print(results_df)
            results_df.to_csv(CONFIG['results_file'], index=False)
            logger.info(f"Results saved to {CONFIG['results_file']}")
            plot_metrics_bar(results_df, 'cpi_mom_metrics_comparison.png')
        
        if forecasts_mom:
            combined_forecast = pd.DataFrame({'Date': forecast_index})
            for model_name, forecast in forecasts_mom.items():
                combined_forecast[f'{model_name}_cpi_mom'] = forecast
            combined_forecast.to_csv(f'{CONFIG["img_dir"]}/combined_forecast_cpi_mom.csv', index=False)
            logger.info(f"Combined forecasts saved to {CONFIG['img_dir']}/combined_forecast_cpi_mom.csv")
            
    except Exception as e:
        logger.error(f"Main program error: {str(e)}")
        raise

if __name__ == "__main__":
    main()

[Parallel(n_jobs=-1)]: Using backend LokyBackend with 16 concurrent workers.
[Parallel(n_jobs=-1)]: Done 3 out of 3 | elapsed:  3.2min finished


    Target                  Model      RMSE       MAE        MAPE       sMAPE  \
0  cpi_mom  Exponential Smoothing  0.319763  0.285126  256.581561  140.962257   
1  cpi_mom                 SARIMA  0.325483  0.286272  252.902878  137.211902   
2  cpi_mom                Prophet  0.296743  0.241665  226.573355  116.287847   

     NormMAPE     DirAcc  
0  752.574411  36.363636  
1  741.784539  36.363636  
2  664.557923  45.454545  
