In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from sklearn.preprocessing import RobustScaler, StandardScaler

- Distribution visualization
- Missing value analysis
- Correlation analysis with automatic selection of correlation method
- Smart normalization based on outlier presence
- Outlier visualization
- Comprehensive summary report generation

In [None]:
class UniversalEDA:
    """
    Exploratory-data-analysis helper for the numeric columns of a DataFrame.

    Provides distribution plots, a missing-value summary, a correlation
    matrix that picks Pearson or Spearman per pair, IQR-based outlier plots,
    outlier-aware scaling and a combined summary report.
    """

    def __init__(self, df):
        """
        Initialize the EDA class with a DataFrame

        Parameters:
        -----------
        df : pandas.DataFrame
            DataFrame to analyze
        """
        self.df = df
        self.numeric_columns = df.select_dtypes(include=['int64', 'float64']).columns

    def _iqr_bounds(self, col):
        """Return the (lower, upper) 1.5 * IQR fences of column ``col``."""
        q1 = self.df[col].quantile(0.25)
        q3 = self.df[col].quantile(0.75)
        spread = 1.5 * (q3 - q1)
        return q1 - spread, q3 + spread

    def _count_outliers(self, col):
        """Return how many values of column ``col`` fall outside the IQR fences."""
        lower, upper = self._iqr_bounds(col)
        values = self.df[col]
        return int(((values < lower) | (values > upper)).sum())

    def plot_distributions(self, figsize=None):
        """
        Visualize the distribution of numerical variables
        - Histogram
        - Box plot
        - Q-Q plot for normality check

        Parameters:
        -----------
        figsize : tuple, optional
            Figure size; defaults to (15, 5 * number of numeric columns).

        Returns:
        --------
        fig : matplotlib.figure.Figure

        Raises:
        -------
        ValueError
            If the DataFrame has no numeric columns.
        """
        n_cols = len(self.numeric_columns)
        if n_cols == 0:
            raise ValueError("No numeric columns to plot")

        # The default depends on the instance, so it has to be resolved here:
        # `self` does not exist when the signature itself is evaluated.
        if figsize is None:
            figsize = (15, 5 * n_cols)

        # squeeze=False keeps `axes` two-dimensional even for a single column
        fig, axes = plt.subplots(n_cols, 3, figsize=figsize, squeeze=False)

        for idx, col in enumerate(self.numeric_columns):
            # Histogram
            sns.histplot(data=self.df, x=col, ax=axes[idx, 0])
            axes[idx, 0].set_title(f'{col} Distribution')

            # Box plot
            sns.boxplot(data=self.df, y=col, ax=axes[idx, 1])
            axes[idx, 1].set_title(f'{col} Boxplot')

            # Q-Q plot
            stats.probplot(self.df[col].dropna(), dist="norm", plot=axes[idx, 2])
            axes[idx, 2].set_title(f'{col} Q-Q Plot')

        plt.tight_layout()
        return fig

    def check_missing_values(self):
        """
        Analyze missing values
        - Count and percentage of missing values for each variable
        - Return rows containing missing values
        """
        missing_count = self.df.isnull().sum()
        missing_percent = (missing_count / len(self.df)) * 100
        missing_summary = pd.DataFrame({
            'Missing Count': missing_count,
            'Missing Percent': missing_percent
        }).sort_values('Missing Count', ascending=False)

        print("\n=== Missing Value Summary ===")
        print(missing_summary[missing_summary['Missing Count'] > 0])

        print("\n=== Rows with Missing Values ===")
        return self.df[self.df.isnull().any(axis=1)]

    def correlation_analysis(self, save_path=None):
        """
        Analyze correlations between variables
        - Use Pearson or Spearman correlation based on normality test

        Parameters:
        -----------
        save_path : str, optional
            If given, the heatmap is written to this path before it is shown.

        Returns:
        --------
        correlation_matrix : pandas.DataFrame
            Symmetric float matrix of pairwise correlations
        method_matrix : pandas.DataFrame
            'Pearson' or 'Spearman' for every pair
        """
        cols = list(self.numeric_columns)

        # Shapiro-Wilk once per column instead of once per pair
        is_normal = {}
        for col in cols:
            _, p_val = stats.shapiro(self.df[col].dropna())
            is_normal[col] = p_val > 0.05

        correlation_matrix = pd.DataFrame(index=cols, columns=cols, dtype=float)
        method_matrix = pd.DataFrame(index=cols, columns=cols)

        for i, col1 in enumerate(cols):
            for col2 in cols[i:]:
                # If both variables are normally distributed (p > 0.05), use Pearson
                use_pearson = is_normal[col1] and is_normal[col2]
                method = 'Pearson' if use_pearson else 'Spearman'

                if col1 == col2:
                    corr = 1.0
                else:
                    # Drop rows pairwise so both series stay aligned and
                    # equally long; dropping NaNs per column would pair up
                    # values from different rows.
                    pair = self.df[[col1, col2]].dropna()
                    corr_func = stats.pearsonr if use_pearson else stats.spearmanr
                    corr, _ = corr_func(pair[col1], pair[col2])

                correlation_matrix.loc[col1, col2] = corr
                correlation_matrix.loc[col2, col1] = corr
                method_matrix.loc[col1, col2] = method
                method_matrix.loc[col2, col1] = method

        # Plot correlation heatmap
        fig = plt.figure(figsize=(10, 8))
        sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0)
        plt.title('Correlation Matrix')
        if save_path is not None:
            # Save before show(): some backends discard the figure on show()
            fig.savefig(save_path)
        plt.show()

        print("\n=== Correlation Method Used ===")
        print(method_matrix)

        return correlation_matrix, method_matrix

    def normalize_data(self, columns=None):
        """
        Normalize data using appropriate scaling method
        - RobustScaler for data with outliers
        - StandardScaler for data without outliers

        Parameters:
        -----------
        columns : list
            List of columns to normalize (None for all numeric variables)

        Returns:
        --------
        normalized_df : pandas.DataFrame
            Normalized DataFrame
        scalers : dict
            Dictionary of scaler objects used for each column
        """
        if columns is None:
            columns = self.numeric_columns

        normalized_df = self.df.copy()
        scalers = {}

        for col in columns:
            # Check outliers using IQR method
            outliers = self._count_outliers(col)

            # Use RobustScaler if outliers are at least 1% of the data
            if outliers / len(self.df) >= 0.01:
                scaler = RobustScaler()
                print(f"{col}: Using RobustScaler (Found {outliers} outliers)")
            else:
                scaler = StandardScaler()
                print(f"{col}: Using StandardScaler")

            # fit_transform returns an (n, 1) array; flatten it to one column
            normalized_df[col] = scaler.fit_transform(self.df[[col]]).ravel()
            scalers[col] = scaler

        return normalized_df, scalers

    def plot_outliers(self, columns=None):
        """
        Visualize outliers using box plots

        Parameters:
        -----------
        columns : list
            List of columns to visualize (None for all numeric variables)

        Returns:
        --------
        fig : matplotlib.figure.Figure

        Raises:
        -------
        ValueError
            If there are no columns to plot.
        """
        if columns is None:
            columns = self.numeric_columns

        n_cols = len(columns)
        if n_cols == 0:
            raise ValueError("No numeric columns to plot")

        fig, axes = plt.subplots(n_cols, 1, figsize=(10, 5*n_cols), squeeze=False)

        for idx, col in enumerate(columns):
            ax = axes[idx, 0]

            # Create boxplot
            sns.boxplot(data=self.df, y=col, ax=ax)

            # Add text with outlier information
            outliers = self._count_outliers(col)
            ax.set_title(f'{col} Outliers: {outliers} points')

        plt.tight_layout()
        return fig

    def generate_summary_report(self):
        """
        Generate comprehensive data summary report
        - Basic information
        - Numeric summary
        - Missing values analysis
        - Distribution plots
        - Correlation analysis
        - Outlier visualization
        """
        print("=== Data Summary Report ===")
        print("\nBasic Information:")
        print(f"- Total Rows: {len(self.df)}")
        print(f"- Total Columns: {len(self.df.columns)}")
        print(f"- Numeric Columns: {len(self.numeric_columns)}")
        print(f"- Non-numeric Columns: {len(self.df.columns) - len(self.numeric_columns)}")

        print("\nNumeric Columns Summary:")
        print(self.df[self.numeric_columns].describe())

        print("\nMissing Values Summary:")
        self.check_missing_values()

        # Generate and save all plots. Each figure is saved through its own
        # handle, so the file holds that plot and not whatever figure
        # happens to be current.
        distributions_fig = self.plot_distributions()
        distributions_fig.savefig('distributions.png')

        self.correlation_analysis(save_path='correlation.png')

        outliers_fig = self.plot_outliers()
        outliers_fig.savefig('outliers.png')

        print("\nPlots have been saved as:")
        print("- distributions.png")
        print("- correlation.png")
        print("- outliers.png")

In [None]:
# Usage Example:
"""
# Initialize the class with a DataFrame
eda = UniversalEDA(df)

# Generate comprehensive summary report
eda.generate_summary_report()

# Or perform specific analyses
eda.plot_distributions()  # Check distributions
eda.check_missing_values()  # Analyze missing values
eda.correlation_analysis()  # Analyze correlations
eda.plot_outliers()  # Visualize outliers

# Normalize data
normalized_df, scalers = eda.normalize_data()
"""

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import shapiro, spearmanr, pearsonr
from sklearn.preprocessing import StandardScaler, RobustScaler

# 1. Plot distribution of numerical features
def plot_distribution(df):
    """Show one histogram (with KDE overlay) per int64/float64 column of ``df``."""
    numeric_part = df.select_dtypes(include=['int64', 'float64'])
    for name in numeric_part.columns:
        plt.figure(figsize=(8, 4))
        sns.histplot(df[name], kde=True, bins=30, color='blue')
        # Label the figure before handing it to the backend
        plt.title(f'Distribution of {name}')
        plt.xlabel(name)
        plt.ylabel('Frequency')
        plt.show()

# 2. Missing Value Analysis
def missing_value_analysis(df):
    """Print per-column missing counts and display the rows that contain any."""
    null_mask = df.isnull()
    per_column = null_mask.sum()
    # Only columns that actually have gaps are reported
    per_column = per_column[per_column > 0]

    print("Missing Values Per Column:")
    print(per_column)

    if not per_column.sum() > 0:
        return

    print("\nRows with Missing Values:")
    display(df[null_mask.any(axis=1)])

# 3. Compute Correlation (Pearson if normal, Spearman otherwise)
def calculate_correlation(df):
    """
    Compute and plot a correlation matrix of the int64/float64 columns.

    A pair uses Pearson when both columns pass the Shapiro-Wilk normality
    test (p > 0.05) and Spearman otherwise, so the matrix is symmetric.
    Missing values are dropped pairwise so the two series stay aligned.
    The diagonal is set to 1.0.

    Parameters:
    df (pandas.DataFrame): Input dataframe

    Returns:
    pandas.DataFrame: Float correlation matrix indexed by column name
    """
    num_cols = list(df.select_dtypes(include=['int64', 'float64']).columns)

    # Normality is a property of a single column: test each one once
    is_normal = {}
    for col in num_cols:
        _, p = shapiro(df[col].dropna())
        is_normal[col] = p > 0.05

    correlation_matrix = pd.DataFrame(index=num_cols, columns=num_cols, dtype=float)

    for i, col1 in enumerate(num_cols):
        correlation_matrix.loc[col1, col1] = 1.0
        for col2 in num_cols[i + 1:]:
            # Dropping NaNs per column would shift rows against each other
            # (or leave different lengths); drop incomplete rows instead.
            pair = df[[col1, col2]].dropna()
            if is_normal[col1] and is_normal[col2]:  # Normally distributed
                corr, _ = pearsonr(pair[col1], pair[col2])
            else:  # Not normally distributed
                corr, _ = spearmanr(pair[col1], pair[col2])
            correlation_matrix.loc[col1, col2] = corr
            correlation_matrix.loc[col2, col1] = corr

    plt.figure(figsize=(10, 6))
    sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', fmt=".2f")
    plt.title("Correlation Matrix")
    plt.show()

    return correlation_matrix

# 4. Detect Outliers
def detect_outliers(df, method="zscore", threshold=3):
    """
    Find outlier rows for every int64/float64 column of ``df``.

    Parameters:
    df (pandas.DataFrame): Input dataframe
    method (str): "zscore" (|z| > threshold) or "iqr" (outside 1.5 * IQR fences)
    threshold (float): z-score cut-off; only used when method is "zscore"

    Returns:
    dict: column name -> DataFrame of the rows flagged for that column;
        columns without outliers are omitted

    Raises:
    ValueError: If ``method`` is not "zscore" or "iqr"
    """
    # Reject unknown methods up front; otherwise no mask would ever be built
    if method not in ("zscore", "iqr"):
        raise ValueError(f"Unknown method {method!r}; expected 'zscore' or 'iqr'")

    num_cols = df.select_dtypes(include=['int64', 'float64']).columns
    outlier_dict = {}

    for col in num_cols:
        values = df[col]
        if method == "zscore":
            z_scores = (values - values.mean()) / values.std()
            mask = np.abs(z_scores) > threshold
        else:
            q1 = values.quantile(0.25)
            q3 = values.quantile(0.75)
            iqr = q3 - q1
            mask = (values < (q1 - 1.5 * iqr)) | (values > (q3 + 1.5 * iqr))

        if mask.any():
            outlier_dict[col] = df[mask]

    return outlier_dict

# 5. Normalize Data
def normalize_data(df):
    """
    Scale every int64/float64 column of a copy of ``df``.

    A column with z-score outliers (threshold 3) is scaled with RobustScaler,
    any other column with StandardScaler.

    Returns:
    tuple: (scaled copy of df, dict mapping column name -> fitted scaler)
    """
    scaled = df.copy()
    fitted = {}

    for name in df.select_dtypes(include=['int64', 'float64']).columns:
        single = df[[name]]
        # detect_outliers returns an empty dict when nothing was flagged
        flagged = detect_outliers(single, method="zscore", threshold=3)

        if flagged:
            scaler = RobustScaler()
            print(f"Using RobustScaler for {name} (outliers detected)")
        else:
            scaler = StandardScaler()
            print(f"Using StandardScaler for {name} (no outliers detected)")

        scaled[name] = scaler.fit_transform(single)
        fitted[name] = scaler

    return scaled, fitted


In [None]:
# Fill missing values in `column_name` with the rolling mean over the trailing
# `window` rows; min_periods=1 lets a window with a single valid observation
# still produce a value. `df`, `column_name` and `window` must already be
# defined by the caller.
df[column_name] = df[column_name].fillna(df[column_name].rolling(window, min_periods=1).mean())

In [None]:
### Time-Based Features:
# Basic time features (requires df['Date'] to be a datetime column)
df['year'] = df['Date'].dt.year
df['month'] = df['Date'].dt.month
df['day'] = df['Date'].dt.day
df['day_of_week'] = df['Date'].dt.dayofweek
df['week_of_year'] = df['Date'].dt.isocalendar().week
df['quarter'] = df['Date'].dt.quarter

# Seasonal features
df['is_winter'] = df['month'].isin([12, 1, 2])
df['is_summer'] = df['month'].isin([6, 7, 8])
# 'Winter' labels two bins (Jan-Feb and Dec). pd.cut rejects duplicate labels
# for an ordered categorical, so the result has to be unordered.
df['season'] = pd.cut(df['month'],
                      bins=[0, 2, 5, 8, 11, 12],
                      labels=['Winter', 'Spring', 'Summer', 'Fall', 'Winter'],
                      ordered=False)

# Holiday features
from holidays import US
# A bare US() has no dates in it until a year is requested, so testing
# membership with isin() against it would always be False. Build the calendar
# for the years present in the data and compare on normalized timestamps.
holidays_us = US(years=df['Date'].dt.year.dropna().astype(int).unique().tolist())
df['is_holiday'] = df['Date'].dt.normalize().isin(pd.to_datetime(list(holidays_us.keys())))
df['is_weekend'] = df['day_of_week'].isin([5, 6])

### Temperature-Related Features:
_temp = df['actual_temp']

# Gap between the observed and the forecasted temperature
df['temp_diff'] = _temp - df['forecasted_average_temp']

# Temperature lagged by 1, 2 and 7 rows
for _lag in (1, 2, 7):
    df[f'temp_lag_{_lag}d'] = _temp.shift(_lag)

# Rolling 7-row temperature statistics
_temp_week = _temp.rolling(window=7)
df['temp_rolling_mean_7d'] = _temp_week.mean()
df['temp_rolling_std_7d'] = _temp_week.std()

# Temperature change and relative change versus the previous row
# NOTE: the rate is inf (or NaN for 0/0) wherever the previous temperature is 0
df['temp_change_1d'] = _temp - df['temp_lag_1d']
df['temp_change_rate'] = df['temp_change_1d'] / df['temp_lag_1d']


### Gas Demand Historical Features:
_demand = df['actual_gas_demand']

# Demand lagged by 1, 2, 7 and 30 rows
for _lag in (1, 2, 7, 30):
    df[f'demand_lag_{_lag}d'] = _demand.shift(_lag)

# Rolling 7-row demand statistics
_demand_week = _demand.rolling(window=7)
df['demand_rolling_mean_7d'] = _demand_week.mean()
df['demand_rolling_std_7d'] = _demand_week.std()
df['demand_rolling_max_7d'] = _demand_week.max()


### HDD and Wind-Related Features:
# One-row lag and rolling 7-row mean for HDD, then for wind
for _prefix, _source in (('hdd', 'HDD'), ('wind', 'avg_wind')):
    df[f'{_prefix}_lag_1d'] = df[_source].shift(1)
    df[f'{_prefix}_rolling_mean_7d'] = df[_source].rolling(window=7).mean()

# Interaction features
df['hdd_wind_interaction'] = df['HDD'] * df['avg_wind']


### Forecast Error Features:
# Absolute and relative forecast error for demand, then for temperature
# NOTE: the relative error is inf (or NaN for 0/0) wherever the forecast is 0
for _name, _actual, _forecast in (
    ('demand', 'actual_gas_demand', 'forecasted_gas_demand'),
    ('temp', 'actual_temp', 'forecasted_average_temp'),
):
    df[f'{_name}_forecast_error'] = df[_actual] - df[_forecast]
    df[f'{_name}_forecast_error_pct'] = df[f'{_name}_forecast_error'] / df[_forecast]

### Cyclical Features (to better capture periodicity):
# Sine/cosine encoding so the last and first value of a cycle end up adjacent
for _name, _period in (('month', 12), ('day_of_week', 7)):
    _angle = 2 * np.pi * df[_name] / _period
    df[f'{_name}_sin'] = np.sin(_angle)
    df[f'{_name}_cos'] = np.cos(_angle)


### Additional Interaction Features:
# Products of pairs of important variables
for _name, _left, _right in (
    ('temp_hdd_interaction', 'actual_temp', 'HDD'),
    ('wind_temp_interaction', 'avg_wind', 'actual_temp'),
    ('month_hdd_interaction', 'month', 'HDD'),
):
    df[_name] = df[_left] * df[_right]

### Important considerations:

- Handle missing values created by lag features appropriately
- Scale or normalize features if needed (though tree-based models like XGBoost and LightGBM can handle different scales)
- Consider removing highly correlated features
- Use feature importance from initial models to select most relevant features

In [None]:
# Forward fill for time series data
# (DataFrame.ffill() replaces the deprecated fillna(method='ffill') spelling)
df = df.ffill()

# Or drop rows with missing values if at the start of the dataset
df = df.dropna()

In [None]:
#Feature selection using correlation:
def remove_highly_correlated_features(df, threshold=0.95, method='pearson'):
    """
    Drop features whose absolute correlation with an earlier column exceeds ``threshold``.

    Only numeric/boolean columns take part in the correlation; all other
    columns are passed through untouched.

    Parameters:
    df (pandas.DataFrame): Input dataframe
    threshold (float): Correlation threshold for removal (default: 0.95)
    method (str): Correlation method ('pearson', 'spearman', or 'kendall')

    Returns:
    pandas.DataFrame: Copy of ``df`` without the highly correlated features
    """
    # Restrict to numeric data explicitly: recent pandas versions raise when
    # corr() meets a column it cannot convert to float.
    numeric_df = df.select_dtypes(include=['number', 'bool'])
    corr_matrix = numeric_df.corr(method=method).abs()

    # Keep only the strict upper triangle so every pair is looked at once
    upper_mask = np.triu(np.ones(corr_matrix.shape, dtype=bool), k=1)
    upper = corr_matrix.where(upper_mask)

    # A column is dropped when it correlates too strongly with any column before it
    to_drop = [column for column in upper.columns if (upper[column] > threshold).any()]

    print(f"Features to drop: {to_drop}")
    return df.drop(columns=to_drop)

# Usage example:
df_features = remove_highly_correlated_features(df, threshold=0.95, method='pearson')

In [None]:
def remove_highly_correlated_features(df, threshold=0.95, method='pearson'):
    """
    Remove highly correlated features and return both cleaned dataframe and list of removed features

    Only numeric/boolean columns take part in the correlation; all other
    columns are passed through untouched.

    Parameters:
    df (pandas.DataFrame): Input dataframe
    threshold (float): Correlation threshold for removal (default: 0.95)
    method (str): Correlation method ('pearson', 'spearman', or 'kendall')

    Returns:
    tuple: (cleaned_df, dropped_features, correlation_details)
        - cleaned_df: DataFrame with highly correlated features removed
        - dropped_features: List of removed feature names
        - correlation_details: DataFrame containing details of dropped correlations
    """
    # Restrict to numeric data explicitly: recent pandas versions raise when
    # corr() meets a column it cannot convert to float.
    numeric_df = df.select_dtypes(include=['number', 'bool'])
    corr_matrix = numeric_df.corr(method=method).abs()

    # Keep only the strict upper triangle so every pair is looked at once
    upper_mask = np.triu(np.ones(corr_matrix.shape, dtype=bool), k=1)
    upper = corr_matrix.where(upper_mask)

    to_drop = []
    detail_rows = []
    for column in upper.columns:
        # Earlier columns this one is too strongly correlated with
        high_corr = upper[column][upper[column] > threshold]
        if high_corr.empty:
            continue

        to_drop.append(column)
        detail_rows.extend(
            {
                'dropped_feature': column,
                'correlated_with': idx,
                'correlation': corr,
            }
            for idx, corr in high_corr.items()
        )

    # Explicit columns keep the frame's schema stable when nothing is dropped
    correlation_details = pd.DataFrame(
        detail_rows,
        columns=['dropped_feature', 'correlated_with', 'correlation'],
    )

    # Remove features
    cleaned_df = df.drop(columns=to_drop)

    return cleaned_df, to_drop, correlation_details

# Usage example:
cleaned_df, dropped_features, corr_details = remove_highly_correlated_features(df, threshold=0.95, method='pearson')

# Check the results
print("\nDropped Features:", dropped_features)
print("\nCorrelation Details:")
print(corr_details)

## XGBoost

In [None]:
import xgboost as xgb
from sklearn.model_selection import train_test_split, TimeSeriesSplit
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

def calculate_mpae(y_true, y_pred, epsilon=0.5):
    """
    Return the mean absolute percentage error of ``y_pred`` against ``y_true``.

    Actual values that are exactly zero are replaced by ``epsilon`` before
    the division so the ratio stays finite.
    """
    actual = np.array(y_true)
    predicted = np.array(y_pred)

    # Swap exact zeros in the denominator for epsilon
    safe_actual = np.where(actual == 0, epsilon, actual)

    relative_error = (safe_actual - predicted) / safe_actual
    return np.mean(np.abs(relative_error) * 100)

def train_evaluate_xgboost(X, y, random_state=42):
    """
    Train and evaluate XGBoost model

    The data is split chronologically (last 20% as the test set), the model
    is trained with early stopping, and metrics, feature importance and an
    actual-vs-predicted plot are produced.

    Parameters:
    X (pd.DataFrame): Feature matrix
    y (pd.Series): Target variable
    random_state (int): Random seed for reproducibility

    Returns:
    dict: 'model', 'predictions', 'metrics', 'feature_importance', 'test_index'
    """
    # Split data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, shuffle=False  # No shuffle for time series data
    )

    # Initialize XGBoost model. eval_metric and early_stopping_rounds are
    # constructor arguments: current XGBoost releases no longer accept them
    # in fit().
    xgb_model = xgb.XGBRegressor(
        n_estimators=1000,
        learning_rate=0.01,
        max_depth=7,
        min_child_weight=1,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=random_state,
        n_jobs=-1,
        eval_metric='rmse',
        early_stopping_rounds=50
    )

    # Train model with early stopping.
    # NOTE: the stopping round is chosen on the test set, so the metrics
    # reported below are optimistic.
    eval_set = [(X_train, y_train), (X_test, y_test)]
    xgb_model.fit(
        X_train, y_train,
        eval_set=eval_set,
        verbose=100
    )

    # Make predictions
    y_pred = xgb_model.predict(X_test)

    # Calculate metrics
    metrics = {
        'rmse': np.sqrt(mean_squared_error(y_test, y_pred)),
        'mae': mean_absolute_error(y_test, y_pred),
        'r2': r2_score(y_test, y_pred),
        'mpae': calculate_mpae(y_test, y_pred, epsilon=0.5)
    }

    # Get feature importance
    feature_importance = pd.DataFrame({
        'feature': X_train.columns,
        'importance': xgb_model.feature_importances_
    }).sort_values('importance', ascending=False)

    # Print model performance
    print("\n=== Model Performance ===")
    print(f"RMSE: {metrics['rmse']:.4f}")
    print(f"MAE: {metrics['mae']:.4f}")
    print(f"R2: {metrics['r2']:.4f}")
    print(f"MPAE: {metrics['mpae']:.4f}%")

    # Plot feature importance
    plt.figure(figsize=(10, 6))
    sns.barplot(
        data=feature_importance.head(10),
        x='importance',
        y='feature'
    )
    plt.title('XGBoost Top 10 Feature Importance')
    plt.tight_layout()
    plt.show()

    # Plot actual vs predicted; the dashed diagonal marks a perfect prediction
    plt.figure(figsize=(10, 6))
    plt.scatter(y_test, y_pred, alpha=0.5)
    plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--', lw=2)
    plt.xlabel('Actual')
    plt.ylabel('Predicted')
    plt.title('XGBoost: Actual vs Predicted')
    plt.tight_layout()
    plt.show()

    results = {
        'model': xgb_model,
        'predictions': y_pred,
        'metrics': metrics,
        'feature_importance': feature_importance,
        'test_index': X_test.index
    }

    return results

# 하이퍼파라미터 튜닝을 위한 함수
def tune_xgboost(X_train, y_train):
    """
    Tune XGBoost hyperparameters using GridSearchCV

    The search uses 5-fold TimeSeriesSplit cross-validation and scores
    candidates by negative RMSE.

    Parameters:
    X_train (pd.DataFrame): Training features
    y_train (pd.Series): Training target

    Returns:
    dict: Best parameter combination found by the search
    """
    # Imported here because this cell's import block does not pull it in
    from sklearn.model_selection import GridSearchCV

    param_grid = {
        'max_depth': [3, 5, 7],
        'learning_rate': [0.01, 0.1],
        'n_estimators': [100, 500, 1000],
        'min_child_weight': [1, 3, 5],
        'subsample': [0.6, 0.8, 1.0],
        'colsample_bytree': [0.6, 0.8, 1.0]
    }

    xgb_model = xgb.XGBRegressor(random_state=42)
    grid_search = GridSearchCV(
        estimator=xgb_model,
        param_grid=param_grid,
        cv=TimeSeriesSplit(n_splits=5),
        scoring='neg_root_mean_squared_error',
        n_jobs=-1,
        verbose=2
    )

    grid_search.fit(X_train, y_train)
    print("\nBest parameters:", grid_search.best_params_)
    return grid_search.best_params_

# 모델 실행
def run_xgboost_modeling(df, target_col, feature_cols):
    """
    Select the feature and target columns from ``df`` and run the XGBoost
    train/evaluate routine on them, returning its results.
    """
    return train_evaluate_xgboost(df[feature_cols], df[target_col])

# 사용 예시:
# feature_cols = [col for col in df.columns if col != target_col]
# results = run_xgboost_modeling(df, target_col='actual_gas_demand', feature_cols=feature_cols)

# 모델 저장
# results['model'].save_model('xgb_model.json')

## Light GBM

In [None]:
import lightgbm as lgb
from sklearn.model_selection import train_test_split, TimeSeriesSplit
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

def calculate_mpae(y_true, y_pred, epsilon=0.5):
    """
    Return the mean absolute percentage error of ``y_pred`` against ``y_true``.

    Actual values that are exactly zero are replaced by ``epsilon`` before
    the division so the ratio stays finite.
    """
    actual = np.array(y_true)
    predicted = np.array(y_pred)

    # Swap exact zeros in the denominator for epsilon
    safe_actual = np.where(actual == 0, epsilon, actual)

    relative_error = (safe_actual - predicted) / safe_actual
    return np.mean(np.abs(relative_error) * 100)

def train_evaluate_lightgbm(X, y, random_state=42):
    """
    Train and evaluate LightGBM model

    The data is split chronologically (last 20% as the test set), the model
    is trained with early stopping, and metrics, feature importance and an
    actual-vs-predicted plot are produced.

    Parameters:
    X (pd.DataFrame): Feature matrix
    y (pd.Series): Target variable
    random_state (int): Random seed for reproducibility

    Returns:
    dict: 'model', 'predictions', 'metrics', 'feature_importance', 'test_index'
    """
    # Split data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, shuffle=False  # No shuffle for time series data
    )

    # Initialize LightGBM model
    lgb_model = lgb.LGBMRegressor(
        n_estimators=1000,
        learning_rate=0.01,
        num_leaves=31,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=random_state,
        n_jobs=-1
    )

    # Train model with early stopping. Early stopping and periodic logging
    # are passed as callbacks: current LightGBM releases no longer accept
    # early_stopping_rounds / verbose in fit().
    # NOTE: the stopping round is chosen on the test set, so the metrics
    # reported below are optimistic.
    eval_set = [(X_test, y_test)]
    lgb_model.fit(
        X_train, y_train,
        eval_set=eval_set,
        eval_metric='rmse',
        callbacks=[
            lgb.early_stopping(stopping_rounds=50),
            lgb.log_evaluation(period=100)
        ]
    )

    # Make predictions
    y_pred = lgb_model.predict(X_test)

    # Calculate metrics
    metrics = {
        'rmse': np.sqrt(mean_squared_error(y_test, y_pred)),
        'mae': mean_absolute_error(y_test, y_pred),
        'r2': r2_score(y_test, y_pred),
        'mpae': calculate_mpae(y_test, y_pred, epsilon=0.5)
    }

    # Get feature importance
    feature_importance = pd.DataFrame({
        'feature': X_train.columns,
        'importance': lgb_model.feature_importances_
    }).sort_values('importance', ascending=False)

    # Print model performance
    print("\n=== Model Performance ===")
    print(f"RMSE: {metrics['rmse']:.4f}")
    print(f"MAE: {metrics['mae']:.4f}")
    print(f"R2: {metrics['r2']:.4f}")
    print(f"MPAE: {metrics['mpae']:.4f}%")

    # Plot feature importance
    plt.figure(figsize=(10, 6))
    sns.barplot(
        data=feature_importance.head(10),
        x='importance',
        y='feature'
    )
    plt.title('LightGBM Top 10 Feature Importance')
    plt.tight_layout()
    plt.show()

    # Plot actual vs predicted; the dashed diagonal marks a perfect prediction
    plt.figure(figsize=(10, 6))
    plt.scatter(y_test, y_pred, alpha=0.5)
    plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--', lw=2)
    plt.xlabel('Actual')
    plt.ylabel('Predicted')
    plt.title('LightGBM: Actual vs Predicted')
    plt.tight_layout()
    plt.show()

    results = {
        'model': lgb_model,
        'predictions': y_pred,
        'metrics': metrics,
        'feature_importance': feature_importance,
        'test_index': X_test.index
    }

    return results

# 하이퍼파라미터 튜닝을 위한 함수
def tune_lightgbm(X_train, y_train):
    """
    Tune LightGBM hyperparameters using GridSearchCV

    The search uses 5-fold TimeSeriesSplit cross-validation and scores
    candidates by negative RMSE.

    Parameters:
    X_train (pd.DataFrame): Training features
    y_train (pd.Series): Training target

    Returns:
    dict: Best parameter combination found by the search
    """
    # Imported here because this cell's import block does not pull it in
    from sklearn.model_selection import GridSearchCV

    param_grid = {
        'num_leaves': [31, 62, 127],
        'learning_rate': [0.01, 0.1],
        'n_estimators': [100, 500, 1000],
        'subsample': [0.6, 0.8, 1.0],
        'colsample_bytree': [0.6, 0.8, 1.0],
        'min_child_samples': [20, 50, 100]
    }

    lgb_model = lgb.LGBMRegressor(random_state=42)
    grid_search = GridSearchCV(
        estimator=lgb_model,
        param_grid=param_grid,
        cv=TimeSeriesSplit(n_splits=5),
        scoring='neg_root_mean_squared_error',
        n_jobs=-1,
        verbose=2
    )

    grid_search.fit(X_train, y_train)
    print("\nBest parameters:", grid_search.best_params_)
    return grid_search.best_params_

# 모델 실행
def run_lightgbm_modeling(df, target_col, feature_cols):
    """
    Select the feature and target columns from ``df`` and run the LightGBM
    train/evaluate routine on them, returning its results.
    """
    return train_evaluate_lightgbm(df[feature_cols], df[target_col])

# 사용 예시:
# feature_cols = [col for col in df.columns if col != target_col]
# results = run_lightgbm_modeling(df, target_col='actual_gas_demand', feature_cols=feature_cols)

# 모델 저장
# results['model'].save_model('lgb_model.txt')

In [None]:
import xgboost as xgb
import lightgbm as lgb
from sklearn.model_selection import train_test_split, TimeSeriesSplit
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import numpy as np

def train_evaluate_models(X, y, random_state=42):
    """
    Train and evaluate XGBoost and LightGBM models

    The data is split chronologically (last 20% as the test set) and both
    models are trained with early stopping on that test set.

    Parameters:
    X (pd.DataFrame): Feature matrix
    y (pd.Series): Target variable
    random_state (int): Random seed for reproducibility

    Returns:
    dict: Dictionary containing trained models and their performance metrics
        ('xgb_model', 'lgb_model', 'xgb_predictions', 'lgb_predictions',
        'metrics', 'feature_importance', 'test_index')
    """
    # Split data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, shuffle=False  # No shuffle for time series data
    )

    # Initialize models. For XGBoost, eval_metric and early_stopping_rounds
    # are constructor arguments: current releases no longer accept them in
    # fit().
    xgb_model = xgb.XGBRegressor(
        n_estimators=1000,
        learning_rate=0.01,
        max_depth=7,
        min_child_weight=1,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=random_state,
        n_jobs=-1,
        eval_metric='rmse',
        early_stopping_rounds=50
    )

    lgb_model = lgb.LGBMRegressor(
        n_estimators=1000,
        learning_rate=0.01,
        num_leaves=31,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=random_state,
        n_jobs=-1
    )

    # Train models with early stopping.
    # NOTE: the stopping round is chosen on the test set, so the metrics
    # reported below are optimistic.
    eval_set_xgb = [(X_train, y_train), (X_test, y_test)]
    eval_set_lgb = [(X_test, y_test)]

    # Train XGBoost
    xgb_model.fit(
        X_train, y_train,
        eval_set=eval_set_xgb,
        verbose=100
    )

    # Train LightGBM. Early stopping and periodic logging are callbacks:
    # current LightGBM releases no longer accept early_stopping_rounds /
    # verbose in fit().
    lgb_model.fit(
        X_train, y_train,
        eval_set=eval_set_lgb,
        eval_metric='rmse',
        callbacks=[
            lgb.early_stopping(stopping_rounds=50),
            lgb.log_evaluation(period=100)
        ]
    )

    # Make predictions
    xgb_pred = xgb_model.predict(X_test)
    lgb_pred = lgb_model.predict(X_test)

    # Calculate metrics, with keys prefixed by the model name
    def calculate_metrics(y_true, y_pred, model_name):
        return {
            f'{model_name}_rmse': np.sqrt(mean_squared_error(y_true, y_pred)),
            f'{model_name}_mae': mean_absolute_error(y_true, y_pred),
            f'{model_name}_r2': r2_score(y_true, y_pred)
        }

    # Get metrics for both models
    xgb_metrics = calculate_metrics(y_test, xgb_pred, 'xgb')
    lgb_metrics = calculate_metrics(y_test, lgb_pred, 'lgb')

    # Get feature importance
    xgb_importance = pd.DataFrame({
        'feature': X_train.columns,
        'importance': xgb_model.feature_importances_
    }).sort_values('importance', ascending=False)

    lgb_importance = pd.DataFrame({
        'feature': X_train.columns,
        'importance': lgb_model.feature_importances_
    }).sort_values('importance', ascending=False)

    # Combine all results
    results = {
        'xgb_model': xgb_model,
        'lgb_model': lgb_model,
        'xgb_predictions': xgb_pred,
        'lgb_predictions': lgb_pred,
        'metrics': {**xgb_metrics, **lgb_metrics},
        'feature_importance': {
            'xgb': xgb_importance,
            'lgb': lgb_importance
        },
        'test_index': X_test.index
    }

    return results

# 모델 학습 및 평가 실행
def run_modeling(df, target_col, feature_cols):
    """
    Train both models, print their metrics and plot the diagnostics.

    Parameters:
    df (pd.DataFrame): Complete dataset
    target_col (str): Name of target column
    feature_cols (list): List of feature column names

    Returns:
    dict: The results dictionary produced by train_evaluate_models
    """
    X = df[feature_cols]
    y = df[target_col]

    results = train_evaluate_models(X, y)
    scores = results['metrics']

    # (result-key prefix, metrics heading, importance title, scatter title)
    model_specs = (
        ('xgb', "\nXGBoost Metrics:",
         'XGBoost Top 10 Feature Importance', 'XGBoost: Actual vs Predicted'),
        ('lgb', "\nLightGBM Metrics:",
         'LightGBM Top 10 Feature Importance', 'LightGBM: Actual vs Predicted'),
    )

    # Metrics summary
    print("\n=== Model Performance ===")
    for prefix, heading, _, _ in model_specs:
        print(heading)
        print(f"RMSE: {scores[f'{prefix}_rmse']:.4f}")
        print(f"MAE: {scores[f'{prefix}_mae']:.4f}")
        print(f"R2: {scores[f'{prefix}_r2']:.4f}")

    # Side-by-side top-10 feature importance
    plt.figure(figsize=(12, 6))
    for position, (prefix, _, importance_title, _) in enumerate(model_specs, start=1):
        plt.subplot(1, 2, position)
        sns.barplot(
            data=results['feature_importance'][prefix].head(10),
            x='importance',
            y='feature'
        )
        plt.title(importance_title)
    plt.tight_layout()
    plt.show()

    # Side-by-side actual vs predicted; the dashed diagonal spans the full
    # range of y
    plt.figure(figsize=(15, 6))
    for position, (prefix, _, _, scatter_title) in enumerate(model_specs, start=1):
        plt.subplot(1, 2, position)
        plt.scatter(y[results['test_index']], results[f'{prefix}_predictions'], alpha=0.5)
        plt.plot([y.min(), y.max()], [y.min(), y.max()], 'r--', lw=2)
        plt.xlabel('Actual')
        plt.ylabel('Predicted')
        plt.title(scatter_title)
    plt.tight_layout()
    plt.show()

    return results

# 사용 예시:
# feature_cols = [col for col in df.columns if col != target_col]
# results = run_modeling(df, target_col='actual_gas_demand', feature_cols=feature_cols)

# 모델 저장
# results['xgb_model'].save_model('xgb_model.json')
# results['lgb_model'].save_model('lgb_model.txt')

In [None]:
from sklearn.model_selection import GridSearchCV

def tune_xgboost(X_train, y_train):
    """Grid-search XGBoost regressor hyper-parameters and return the best set.

    The grid holds 3 * 2 * 3 * 3 * 3 * 3 = 486 combinations, each scored
    with a 5-split ``TimeSeriesSplit`` (2,430 cross-validation fits) using
    negative RMSE, so expect a long run time.

    Parameters
    ----------
    X_train : array-like
        Training features, passed straight to ``GridSearchCV.fit``.
    y_train : array-like
        Training targets.

    Returns
    -------
    dict
        ``best_params_`` of the fitted search.
    """
    search_space = dict(
        max_depth=[3, 5, 7],
        learning_rate=[0.01, 0.1],
        n_estimators=[100, 500, 1000],
        min_child_weight=[1, 3, 5],
        subsample=[0.6, 0.8, 1.0],
        colsample_bytree=[0.6, 0.8, 1.0],
    )

    searcher = GridSearchCV(
        estimator=xgb.XGBRegressor(random_state=42),
        param_grid=search_space,
        cv=TimeSeriesSplit(n_splits=5),
        scoring='neg_root_mean_squared_error',
        n_jobs=-1,
        verbose=2,
    )

    # fit() returns the search object itself, so the call can be chained.
    return searcher.fit(X_train, y_train).best_params_

# Prophet

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler

# Load the data
df = pd.read_csv("gas_demand_data.csv")

# Parse the date column and put the rows in chronological order
df['Date'] = pd.to_datetime(df['Date'])
df = df.sort_values('Date')

# Reshape into the column layout Prophet requires: 'ds' (date) and 'y' (target)
prophet_df = df[['Date', 'daily actual gas demand']].rename(columns={'Date': 'ds', 'daily actual gas demand': 'y'})


FileNotFoundError: [Errno 2] No such file or directory: 'gas_demand_data.csv'

In [None]:
from prophet import Prophet

# Create the Prophet model (default settings) and fit it on prophet_df
prophet_model = Prophet()
prophet_model.fit(prophet_df)

# Forecast horizon: extend the frame 6 periods (6 days ahead, per the original note)
future = prophet_model.make_future_dataframe(periods=6)
forecast = prophet_model.predict(future)

# Plot the forecast
prophet_model.plot(forecast)
plt.show()

# Inspect the last rows of the prediction and its uncertainty bounds
forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail()


In [None]:
import pandas as pd
from prophet import Prophet
from sklearn.metrics import mean_absolute_percentage_error

# 1. Load and prepare the data
data = pd.read_csv('your_dataset.csv')  # load the dataset
data['Date'] = pd.to_datetime(data['Date'])  # parse the date column
# Prophet requires 'ds'/'y' column names. Sort chronologically so the rows
# line up with the date-ordered frames built from the fitted model below.
df = (
    data.rename(columns={'Date': 'ds', 'daily actual gas demand': 'y'})
    .sort_values('ds')
    .reset_index(drop=True)
)

# External regressors (e.g. weather data)
extra_regressors = ['daily average actual temperature', 'daily average forecasted temperature',
                    'HDD', 'daily total temperature', 'day minimum temperature',
                    'day maximum temperature', 'daily average wind']

# 2. Configure the Prophet model
# NOTE(review): daily_seasonality models within-day cycles; with one
# observation per day it may add nothing - confirm it is intended.
model = Prophet(daily_seasonality=True, yearly_seasonality=True, weekly_seasonality=True)
for regressor in extra_regressors:
    model.add_regressor(regressor)

# 3. Fit the model
model.fit(df)

# 4. Build the prediction frame (history plus 6 future periods)
future_dates = model.make_future_dataframe(periods=6)
# Historical rows keep their observed regressor values (joined on the date).
# Previously every row, history included, was overwritten with the column
# mean, so the in-sample evaluation below was run on the wrong inputs.
future_dates = future_dates.merge(
    df[['ds'] + extra_regressors].drop_duplicates(subset='ds'),
    on='ds',
    how='left',
)
# Only the rows without observations (the future) fall back to the historical
# mean. This is a placeholder: use real weather forecasts in practice.
for regressor in extra_regressors:
    future_dates[regressor] = future_dates[regressor].fillna(df[regressor].mean())

# 5. Predict
forecast = model.predict(future_dates)
print(forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail(6))  # forecast rows

# 6. Evaluate (in-sample, for illustration only)
# Pair actuals and predictions by date instead of by position so the two
# series cannot drift out of alignment.
evaluation = (
    df[['ds', 'y']]
    .merge(forecast[['ds', 'yhat']], on='ds', how='inner')
    .dropna(subset=['y'])
)
y_true = evaluation['y']
y_pred = evaluation['yhat']
mape = mean_absolute_percentage_error(y_true, y_pred)
print(f"MAPE: {mape:.4f}")

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from prophet import Prophet
from sklearn.metrics import mean_absolute_error, mean_squared_error

# 1. Load and pre-process the data
def load_data(file_path):
    """Read a CSV file and parse its ``Date`` column as datetimes.

    Parameters
    ----------
    file_path : str or path-like
        Location of the CSV file; it must contain a ``Date`` column.

    Returns
    -------
    pandas.DataFrame
        The file contents with ``Date`` converted by ``pd.to_datetime``.
    """
    raw = pd.read_csv(file_path)
    # assign() swaps the column in place within a fresh frame.
    return raw.assign(Date=pd.to_datetime(raw['Date']))

# 2. Prepare the data for Prophet
def prepare_prophet_data(df, target_col='daily_actual_gas_demand', date_col='Date'):
    """Return a two-column frame named the way Prophet expects.

    Parameters
    ----------
    df : pandas.DataFrame
        Source data containing ``date_col`` and ``target_col``.
    target_col : str
        Column to forecast; renamed to ``'y'``.
    date_col : str
        Date column; renamed to ``'ds'``.

    Returns
    -------
    pandas.DataFrame
        New frame with columns ``ds`` and ``y``; ``df`` is not modified.
    """
    selected = df[[date_col, target_col]]
    column_names = dict(zip((date_col, target_col), ('ds', 'y')))
    return selected.rename(columns=column_names)

# 3. Attach extra variables (regressors)
def add_regressors(df, prophet_df, regressor_cols):
    """Copy regressor columns from ``df`` onto ``prophet_df``.

    Values are copied positionally (via ``.values``), so the two frames must
    have the same number of rows in the same order; their indexes are ignored.

    Parameters
    ----------
    df : pandas.DataFrame
        Source frame holding the regressor columns.
    prophet_df : pandas.DataFrame
        Frame to extend. It is modified in place.
    regressor_cols : iterable of str or None
        Column names to copy. ``None`` or an empty iterable copies nothing,
        matching how ``regressor_cols=None`` means "no regressors" elsewhere
        in this pipeline (it used to raise ``TypeError`` here).

    Returns
    -------
    pandas.DataFrame
        The same ``prophet_df`` object, for call chaining.
    """
    for col in regressor_cols or ():
        prophet_df[col] = df[col].values

    return prophet_df

# 4. Train the Prophet model
def train_prophet_model(train_df, regressor_cols=None, forecast_days=1):
    """Build a Prophet model, register any regressors and fit it.

    Parameters
    ----------
    train_df : pandas.DataFrame
        Training data passed to ``Prophet.fit``; it needs to contain every
        column named in ``regressor_cols``.
    regressor_cols : iterable of str, optional
        Extra regressors to register before fitting.
    forecast_days : int
        Accepted for signature compatibility; not used by this function.

    Returns
    -------
    Prophet
        The fitted model.
    """
    prophet_settings = {
        'yearly_seasonality': True,
        'weekly_seasonality': True,
        'daily_seasonality': False,
        # Controls how flexibly the trend may change.
        'changepoint_prior_scale': 0.05,
    }
    fitted = Prophet(**prophet_settings)

    # A missing or empty regressor list simply registers nothing.
    for name in regressor_cols or ():
        fitted.add_regressor(name)

    fitted.fit(train_df)
    return fitted

# 5. Build the frame to predict on
def make_future_dataframe(model, train_df, forecast_days, regressor_cols=None, future_regressors=None):
    """Create the prediction frame: history plus ``forecast_days`` new rows.

    The previous implementation assigned an array of ``len(train_df)`` values
    to a frame that is ``forecast_days`` rows longer, which pandas rejects
    with ``ValueError``. Historical regressor values are now joined on the
    date, and the appended rows are filled from ``future_regressors``.

    Parameters
    ----------
    model : Prophet
        Fitted model; only its ``make_future_dataframe`` method is used.
    train_df : pandas.DataFrame
        Training frame with a ``ds`` column and the regressor columns.
    forecast_days : int
        Number of periods appended after the training history.
    regressor_cols : iterable of str, optional
        Regressor columns the model was trained with.
    future_regressors : dict, optional
        Maps each regressor name to ``forecast_days`` values for the
        appended rows.

    Returns
    -------
    pandas.DataFrame
        Frame with ``ds`` and, when requested, one column per regressor.

    Raises
    ------
    ValueError
        If a regressor has no future values, or the wrong number of them.
    """
    future = model.make_future_dataframe(periods=forecast_days)

    if not regressor_cols:
        return future
    regressor_cols = list(regressor_cols)

    # Join on the date rather than on position so an unsorted train_df
    # cannot shift regressor values onto the wrong day.
    history = train_df[['ds'] + regressor_cols].copy()
    history['ds'] = pd.to_datetime(history['ds'])
    history = history.drop_duplicates(subset='ds')
    future = future.merge(history, on='ds', how='left')

    if forecast_days <= 0:
        return future

    supplied = future_regressors or {}
    horizon_rows = future.index[-forecast_days:]
    for col in regressor_cols:
        if col not in supplied:
            raise ValueError(
                f"future_regressors must provide {forecast_days} value(s) "
                f"for regressor '{col}'"
            )
        values = list(supplied[col])
        if len(values) != forecast_days:
            raise ValueError(
                f"regressor '{col}' has {len(values)} future value(s); "
                f"expected {forecast_days}"
            )
        future.loc[horizon_rows, col] = values

    return future

# 6. Predict and evaluate
def predict_and_evaluate(model, future, actual_df, forecast_days):
    """Run the forecast and print MAE / RMSE over the forecast horizon.

    The last ``forecast_days`` predictions are matched to actual values by
    date. The previous version compared them with the *last* rows of
    ``actual_df``, which are different days whenever ``actual_df`` is longer
    than the horizon.

    Parameters
    ----------
    model : Prophet
        Fitted model; only ``predict`` is called.
    future : pandas.DataFrame
        Frame to predict on.
    actual_df : pandas.DataFrame or None
        Observed values with ``ds`` and ``y`` columns. ``None`` skips the
        evaluation.
    forecast_days : int
        Number of trailing forecast rows to score.

    Returns
    -------
    pandas.DataFrame
        Columns ``ds``, ``yhat``, ``yhat_lower`` and ``yhat_upper`` for every
        row of ``future``.
    """
    forecast = model.predict(future)

    # Keep only the columns callers need.
    forecast_result = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]

    if actual_df is not None:
        horizon = forecast_result.tail(forecast_days)
        actual = actual_df[['ds', 'y']].assign(ds=lambda frame: pd.to_datetime(frame['ds']))
        scored = horizon.merge(actual, on='ds', how='inner').dropna(subset=['y', 'yhat'])

        if scored.empty:
            print("No actual values overlap the forecast horizon; skipping evaluation.")
        else:
            # Same definitions as sklearn's mean_absolute_error and
            # sqrt(mean_squared_error) for one-dimensional input.
            errors = scored['y'].to_numpy(dtype=float) - scored['yhat'].to_numpy(dtype=float)
            mae = np.mean(np.abs(errors))
            rmse = np.sqrt(np.mean(errors ** 2))

            print(f"MAE: {mae:.2f}")
            print(f"RMSE: {rmse:.2f}")

    return forecast_result

# 7. Visualise the results
def plot_results(model, forecast, actual_df=None):
    """Draw Prophet's forecast and component plots, plus an optional overlay.

    Parameters
    ----------
    model : Prophet
        Fitted model providing ``plot`` and ``plot_components``.
    forecast : pandas.DataFrame
        Forecast with ``ds``, ``yhat``, ``yhat_lower`` and ``yhat_upper``.
    actual_df : pandas.DataFrame, optional
        Observed values (``ds``, ``y``). When given, an extra figure comparing
        actual and predicted values is drawn and shown.

    Returns
    -------
    tuple
        ``(forecast_figure, components_figure)`` as returned by Prophet.
    """
    forecast_fig = model.plot(forecast)
    # Trend, seasonality and other components.
    components_fig = model.plot_components(forecast)

    if actual_df is None:
        return forecast_fig, components_fig

    # Actual vs predicted overlay with the uncertainty band shaded.
    plt.figure(figsize=(12, 6))
    forecast_dates = forecast['ds'].values
    plt.plot(forecast_dates, forecast['yhat'].values, label='Predicted')
    plt.plot(actual_df['ds'].values, actual_df['y'].values, label='Actual')
    band_style = {'alpha': 0.3, 'color': 'gray'}
    plt.fill_between(
        forecast['ds'].values,
        forecast['yhat_lower'].values,
        forecast['yhat_upper'].values,
        **band_style,
    )
    plt.legend()
    plt.title('Actual vs Predicted Gas Demand')
    plt.xlabel('Date')
    plt.ylabel('Gas Demand')
    plt.tight_layout()
    plt.show()

    return forecast_fig, components_fig

# Full pipeline
def prophet_forecast_pipeline(
    file_path,
    target_col='daily_actual_gas_demand',
    date_col='Date',
    regressor_cols=('daily_average_actual_temperature', 'HDD', 'daily_average_wind'),
    train_ratio=0.8,
    forecast_days=3,
    future_regressors=None
):
    """Load data, fit Prophet on the earliest rows and forecast the next days.

    Parameters
    ----------
    file_path : str
        CSV file to load.
    target_col : str
        Column to forecast.
    date_col : str
        Date column.
    regressor_cols : iterable of str or None
        Extra regressors. ``None`` or empty trains without regressors (this
        used to fail when the regressors were attached). The default is a
        tuple so no mutable default is shared between calls.
    train_ratio : float
        Fraction of the chronologically ordered rows used for training.
    forecast_days : int
        Number of days forecast after the training period.
    future_regressors : dict, optional
        Regressor values for the forecast days. When omitted, the values
        observed in the first ``forecast_days`` held-out rows are used, so
        the default call is a back-test on known regressors.

    Returns
    -------
    tuple
        ``(model, forecast)``.
    """
    regressor_cols = list(regressor_cols) if regressor_cols else []

    # Load the data
    df = load_data(file_path)

    # The split below is positional, so the rows must be in date order;
    # otherwise "train" and "test" are not earlier/later periods.
    df[date_col] = pd.to_datetime(df[date_col])
    df = df.sort_values(date_col).reset_index(drop=True)

    # Train/test split
    train_size = int(len(df) * train_ratio)
    train_df = df.iloc[:train_size]
    test_df = df.iloc[train_size:]

    # Prepare the Prophet frames
    prophet_train_df = prepare_prophet_data(train_df, target_col, date_col)
    prophet_test_df = prepare_prophet_data(test_df, target_col, date_col)

    if regressor_cols:
        # Attach the regressors to the training frame
        prophet_train_df = add_regressors(train_df, prophet_train_df, regressor_cols)

        # Without caller-supplied values the forecast rows would have no
        # regressor data; fall back to what was observed in the hold-out.
        if future_regressors is None:
            future_regressors = {
                col: test_df[col].head(forecast_days).tolist()
                for col in regressor_cols
            }

    # Train the model
    model = train_prophet_model(prophet_train_df, regressor_cols, forecast_days)

    # Build the frame to predict on
    future = make_future_dataframe(model, prophet_train_df, forecast_days, regressor_cols, future_regressors)

    # Predict and evaluate
    forecast = predict_and_evaluate(model, future, prophet_test_df, forecast_days)

    # Visualise
    plot_results(model, forecast, prophet_test_df)

    return model, forecast

# Example run
if __name__ == "__main__":
    # Path to the input CSV
    file_path = "gas_demand_data.csv"
    
    # Future regressor values (illustrative numbers), one entry per forecast day
    future_regressors = {
        'daily_average_actual_temperature': [10.5, 11.2, 9.8],  # expected temperature, next 3 days
        'HDD': [7.5, 6.8, 8.2],                                 # expected HDD, next 3 days
        'daily_average_wind': [12.3, 14.1, 11.5]                # expected wind speed, next 3 days
    }
    
    # Run the pipeline
    model, forecast = prophet_forecast_pipeline(
        file_path=file_path,
        forecast_days=3,
        future_regressors=future_regressors
    )
    
    # Print the last three forecast rows
    print("예측 결과:")
    print(forecast.tail(3))

# xLSTM 모델 학습 및 예측

xLSTM과 기존 LSTM의 주요 차이점은 다음과 같습니다:

지수적 메모리 메커니즘(Exponential Memory):

xLSTM은 gamma, eta, lambda_param이라는 추가 파라미터를 도입합니다.
셀 상태(cell state)를 계산할 때 지수 함수를 사용하여 메모리를 변형합니다: cx_exp = gamma * exp(-eta * |cx|) * cx


메모리 제어 파라미터:

gamma: 메모리 확장/축소 계수
eta: 메모리 변환 강도를 제어
lambda_param: 메모리 감쇠 제어 파라미터


셀 상태 업데이트 방식:

기존 LSTM: cy = forgetgate * cx + ingate * cellgate
xLSTM: cy = forgetgate * cx_exp + ingate * cellgate
업데이트된 셀 상태에 lambda_param을 곱하여 추가적인 메모리 제어: cy = cy * lambda_param



이러한 변화로 인해 xLSTM은 다음과 같은 장점을 가집니다:

장기 의존성(long-term dependencies)을 더 효과적으로 처리
그래디언트 흐름 개선
메모리 제어 메커니즘의 유연성 증가
시퀀스 데이터에서 복잡한 패턴을 더 잘 포착

In [None]:
import torch
import torch.nn as nn
import math

class xLSTMCell(nn.Module):
    """LSTM cell with an extra learnable transform on the cell state.

    Before the usual update the previous cell state is rescaled element-wise::

        cx_exp = gamma * exp(-eta * |cx|) * cx
        cy     = (forgetgate * cx_exp + ingate * cellgate) * lambda_param

    With ``gamma = 1``, ``eta = 0`` and ``lambda_param = 1`` (the initial
    values) the cell computes exactly a standard LSTM step.

    Parameters
    ----------
    input_size : int
        Number of input features.
    hidden_size : int
        Size of the hidden and cell state.
    bias : bool
        Whether to add the ``bias_ih`` / ``bias_hh`` terms.
    """

    def __init__(self, input_size, hidden_size, bias=True):
        super(xLSTMCell, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        # Stacked weights for the input, forget, cell and output gates
        self.weight_ih = nn.Parameter(torch.empty(4 * hidden_size, input_size))
        self.weight_hh = nn.Parameter(torch.empty(4 * hidden_size, hidden_size))

        # Extra memory-control parameters of this cell
        self.gamma = nn.Parameter(torch.ones(hidden_size))
        self.eta = nn.Parameter(torch.zeros(hidden_size))
        self.lambda_param = nn.Parameter(torch.ones(hidden_size))

        if bias:
            self.bias_ih = nn.Parameter(torch.empty(4 * hidden_size))
            self.bias_hh = nn.Parameter(torch.empty(4 * hidden_size))
        else:
            self.register_parameter('bias_ih', None)
            self.register_parameter('bias_hh', None)

        self.reset_parameters()

    def reset_parameters(self):
        """Initialise gate weights uniformly and memory parameters to identity.

        Gate weights and biases are drawn from
        ``U(-1/sqrt(hidden_size), 1/sqrt(hidden_size))``. ``gamma``, ``eta``
        and ``lambda_param`` are reset to 1, 0 and 1. They used to be
        overwritten by the same uniform draw, which started training with a
        cell state scaled by small random factors of either sign.
        """
        stdv = 1.0 / math.sqrt(self.hidden_size)
        with torch.no_grad():
            for param in (self.weight_ih, self.weight_hh, self.bias_ih, self.bias_hh):
                if param is not None:
                    param.uniform_(-stdv, stdv)
            self.gamma.fill_(1.0)
            self.eta.zero_()
            self.lambda_param.fill_(1.0)

    def forward(self, input, state):
        """Advance the cell by one time step.

        Parameters
        ----------
        input : torch.Tensor
            Input of shape ``(batch, input_size)``.
        state : tuple of torch.Tensor
            ``(hx, cx)``, each of shape ``(batch, hidden_size)``.

        Returns
        -------
        tuple of torch.Tensor
            ``(hy, cy)``: the new hidden and cell state.
        """
        hx, cx = state

        # linear() skips the bias when it is None.
        gates = (
            nn.functional.linear(input, self.weight_ih, self.bias_ih)
            + nn.functional.linear(hx, self.weight_hh, self.bias_hh)
        )

        # Split into the four gates
        ingate, forgetgate, cellgate, outgate = gates.chunk(4, dim=-1)

        ingate = torch.sigmoid(ingate)
        forgetgate = torch.sigmoid(forgetgate)
        cellgate = torch.tanh(cellgate)
        outgate = torch.sigmoid(outgate)

        # Rescale the previous cell state before it is carried over
        cx_exp = self.gamma * torch.exp(-self.eta * torch.abs(cx)) * cx

        cy = forgetgate * cx_exp + ingate * cellgate

        # Final element-wise scaling of the new cell state
        cy = cy * self.lambda_param

        hy = outgate * torch.tanh(cy)

        return hy, cy


class xLSTM(nn.Module):
    """Multi-layer, optionally bidirectional recurrent network of xLSTMCell.

    The constructor arguments and the ``forward`` signature follow the layout
    used by ``torch.nn.LSTM``.

    Parameters
    ----------
    input_size : int
        Number of input features.
    hidden_size : int
        Hidden state size per direction.
    num_layers : int
        Number of stacked layers.
    bias : bool
        Passed to every cell.
    batch_first : bool
        If True, input and output are ``(batch, seq_len, features)``.
    dropout : float
        Dropout applied to the input of every layer after the first; only
        active when ``num_layers > 1``.
    bidirectional : bool
        If True, each layer also runs a second cell over the reversed
        sequence and concatenates both outputs.
    """

    def __init__(self, input_size, hidden_size, num_layers=1, bias=True,
                 batch_first=False, dropout=0.0, bidirectional=False):
        super(xLSTM, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.bias = bias
        self.batch_first = batch_first
        self.dropout = dropout
        self.bidirectional = bidirectional
        self.num_directions = 2 if bidirectional else 1

        # One cell per (layer, direction), stored layer-major:
        # index = layer * num_directions + direction
        self.cell_list = nn.ModuleList()
        for layer in range(num_layers):
            for direction in range(self.num_directions):
                layer_input_size = input_size if layer == 0 else hidden_size * self.num_directions
                self.cell_list.append(xLSTMCell(layer_input_size, hidden_size, bias))

        if dropout > 0 and num_layers > 1:
            self.dropout_layer = nn.Dropout(dropout)
        else:
            self.dropout_layer = None

    def forward(self, input, hx=None):
        """
        Input: input, (h_0, c_0)
            - input: tensor of shape (seq_len, batch, input_size) or (batch, seq_len, input_size)
            - h_0: tensor of shape (num_layers * num_directions, batch, hidden_size)
            - c_0: tensor of shape (num_layers * num_directions, batch, hidden_size)
            When the state is omitted, zeros matching the dtype and device of
            ``input`` are used.
        Output: output, (h_n, c_n)
            - output: tensor of shape (seq_len, batch, hidden_size * num_directions) or (batch, seq_len, hidden_size * num_directions)
            - h_n: tensor of shape (num_layers * num_directions, batch, hidden_size)
            - c_n: tensor of shape (num_layers * num_directions, batch, hidden_size)
        """
        if self.batch_first:
            input = input.transpose(0, 1)  # work in (seq_len, batch, features)

        seq_len, batch_size, _ = input.size()

        if hx is None:
            # new_zeros inherits dtype AND device from the input; the earlier
            # torch.zeros(..., device=...) always produced float32 and failed
            # for e.g. float64 models.
            state_shape = (self.num_layers * self.num_directions, batch_size, self.hidden_size)
            hx = input.new_zeros(state_shape)
            cx = input.new_zeros(state_shape)
        else:
            hx, cx = hx

        h_n = []
        c_n = []

        layer_input = input
        for layer in range(self.num_layers):
            if layer > 0 and self.dropout_layer is not None:
                layer_input = self.dropout_layer(layer_input)

            # Forward direction
            fwd = layer * self.num_directions
            h_forward, c_forward = hx[fwd], cx[fwd]
            forward_steps = []
            for t in range(seq_len):
                h_forward, c_forward = self.cell_list[fwd](
                    layer_input[t], (h_forward, c_forward))
                forward_steps.append(h_forward)
            layer_output = torch.stack(forward_steps)
            h_n.append(h_forward)
            c_n.append(c_forward)

            # Backward direction: walk the sequence from the end, then put the
            # outputs back into chronological order before concatenating.
            if self.bidirectional:
                bwd = fwd + 1
                h_backward, c_backward = hx[bwd], cx[bwd]
                backward_steps = []
                for t in reversed(range(seq_len)):
                    h_backward, c_backward = self.cell_list[bwd](
                        layer_input[t], (h_backward, c_backward))
                    backward_steps.append(h_backward)
                backward_steps.reverse()
                layer_output = torch.cat(
                    [layer_output, torch.stack(backward_steps)], dim=2)
                h_n.append(h_backward)
                c_n.append(c_backward)

            # This layer's output feeds the next layer
            layer_input = layer_output

        h_n = torch.stack(h_n)
        c_n = torch.stack(c_n)

        output = layer_input
        if self.batch_first:
            output = output.transpose(0, 1)  # back to (batch, seq_len, features)

        return output, (h_n, c_n)

In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from sklearn.preprocessing import MinMaxScaler

# 1. Load and prepare the data
data = pd.read_csv('your_dataset.csv')
# The target ('daily actual gas demand') is deliberately last: the code below
# treats the final column as the value to predict.
features = ['daily average actual temperature', 'daily average forecasted temperature', 
            'HDD', 'daily total temperature', 'day minimum temperature', 
            'day maximum temperature', 'daily average wind', 'daily actual gas demand']
df = data[features]

# 2. Normalize the data (scaler is fitted on all selected columns, target included)
scaler = MinMaxScaler()
scaled_data = scaler.fit_transform(df)

# Build sliding-window samples (past `seq_length` rows -> next row's target)
def create_sequences(data, seq_length):
    """Turn a 2-D array into (window, next-target) training pairs.

    Parameters
    ----------
    data : numpy.ndarray
        2-D array whose last column is the target.
    seq_length : int
        Number of consecutive rows in each input window.

    Returns
    -------
    tuple of numpy.ndarray
        ``X`` holds, per sample, ``seq_length`` rows of every column except
        the last; ``y`` holds the last-column value of the row right after
        each window.
    """
    starts = range(len(data) - seq_length)
    windows = [data[start:start + seq_length, :-1] for start in starts]
    targets = [data[start + seq_length, -1] for start in starts]
    return np.array(windows), np.array(targets)

# Window length: each sample uses 7 consecutive rows as input
seq_length = 7
X, y = create_sequences(scaled_data, seq_length)
# Convert the NumPy arrays to float tensors for PyTorch
X = torch.FloatTensor(X)
y = torch.FloatTensor(y)

# 3. xLSTM cell (sLSTM-style)
class xLSTMCell(nn.Module):
    """Recurrent cell with a running normaliser state ``n``.

    Per step::

        n_t = f_t * n_prev + i_t
        c_t = (f_t * c_prev + i_t * c_tilde) / (n_t + 1e-8)
        h_t = o_t * tanh(c_t)

    Parameters
    ----------
    input_size : int
        Number of input features.
    hidden_size : int
        Size of the hidden, cell and normaliser states.
    """

    def __init__(self, input_size, hidden_size):
        super(xLSTMCell, self).__init__()
        self.hidden_size = hidden_size

        # Linear maps applied to the concatenated [input, previous hidden state]
        self.W_i = nn.Linear(input_size + hidden_size, hidden_size)  # input gate
        self.W_f = nn.Linear(input_size + hidden_size, hidden_size)  # forget gate
        self.W_c = nn.Linear(input_size + hidden_size, hidden_size)  # cell candidate
        self.W_o = nn.Linear(input_size + hidden_size, hidden_size)  # output gate
        # W_n does not take part in forward(). It stays registered so the
        # parameter list / state_dict layout is unchanged.
        self.W_n = nn.Linear(input_size + hidden_size, hidden_size)

    def forward(self, x, h_prev, c_prev, n_prev):
        """Advance one time step.

        Parameters
        ----------
        x : torch.Tensor
            Input of shape ``(batch, input_size)``.
        h_prev, c_prev, n_prev : torch.Tensor
            Previous hidden, cell and normaliser states, each
            ``(batch, hidden_size)``.

        Returns
        -------
        tuple of torch.Tensor
            ``(h_t, c_t, n_t)``.
        """
        # Concatenate input and previous hidden state along the feature axis
        combined = torch.cat((x, h_prev), dim=1)

        i_t = torch.sigmoid(self.W_i(combined))   # input gate
        f_t = torch.sigmoid(self.W_f(combined))   # forget gate
        o_t = torch.sigmoid(self.W_o(combined))   # output gate
        c_tilde = torch.tanh(self.W_c(combined))  # cell candidate
        # The former `n_tilde = sigmoid(W_n(combined))` was computed on every
        # step but never used, so that dead work has been removed.

        # Running normaliser, then the normalised cell state
        n_t = f_t * n_prev + i_t
        c_t = (f_t * c_prev + i_t * c_tilde) / (n_t + 1e-8)

        h_t = o_t * torch.tanh(c_t)

        return h_t, c_t, n_t

# xLSTM model definition
class xLSTM(nn.Module):
    """Single-layer sequence regressor built from one ``xLSTMCell``.

    The cell is unrolled over the sequence and the final hidden state is
    mapped to the output with a linear layer.

    Parameters
    ----------
    input_size : int
        Number of features per time step.
    hidden_size : int
        Size of the recurrent state.
    output_size : int
        Number of values predicted per sequence.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super(xLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.xlstm_cell = xLSTMCell(input_size, hidden_size)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Run the sequence and predict from the last hidden state.

        Parameters
        ----------
        x : torch.Tensor
            Batch-first input of shape ``(batch, seq_len, input_size)``.

        Returns
        -------
        torch.Tensor
            Output of the final linear layer, ``(batch, output_size)``.
        """
        batch_size, seq_len, _ = x.size()
        # Create the states from `x` so they share its dtype and device.
        # Plain torch.zeros() always built float32 CPU tensors and failed as
        # soon as the model ran on a GPU or in float64.
        h_t = x.new_zeros(batch_size, self.hidden_size)
        c_t = x.new_zeros(batch_size, self.hidden_size)
        n_t = x.new_full((batch_size, self.hidden_size), 1e-8)  # initial normaliser

        # Unroll over time
        for t in range(seq_len):
            h_t, c_t, n_t = self.xlstm_cell(x[:, t, :], h_t, c_t, n_t)

        # Predict from the last hidden state
        out = self.fc(h_t)
        return out

# Model configuration
input_size = len(features) - 1  # every column except gas demand
hidden_size = 64
output_size = 1
model = xLSTM(input_size, hidden_size, output_size)

# 4. Train the model
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
epochs = 100

# Each epoch is a single full-batch step over all of X.
for epoch in range(epochs):
    model.train()
    outputs = model(X)
    loss = criterion(outputs, y.unsqueeze(1))
    
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    if (epoch + 1) % 10 == 0:
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")

# 5. Predict one day ahead from the most recent window
model.eval()
with torch.no_grad():
    # Use the last `seq_length` observed rows. X[-1] stops one row short of
    # the end of the data (its target is the final observed row), so
    # predicting from it re-predicts a training target instead of the next day.
    last_sequence = torch.FloatTensor(scaled_data[-seq_length:, :-1]).unsqueeze(0)
    pred = model(last_sequence)
    # The scaler was fitted on all columns, so pad the feature slots with
    # zeros and read back only the (last) target column.
    pred_value = scaler.inverse_transform([[0]*input_size + [pred.item()]])[0][-1]
    print(f"Predicted gas demand: {pred_value}")

# 6. Evaluation (requires a separate test set)

xLSTM과 LSTM의 차이점
xLSTM은 "xLSTM: Extended Long Short-Term Memory" 논문(2024)에서 제안된 모델로, LSTM의 구조를 확장한 것입니다. 주요 차이점은 다음과 같습니다:

메모리 구조
LSTM: 단일 셀 상태(c_t)를 사용해 정보를 기억하며, 망각 게이트(f_t)로 과거 정보를 지우고 입력 게이트(i_t)로 새 정보를 추가.
xLSTM: 두 가지 변형을 제공
sLSTM: 스칼라 메모리 기반으로, 누적 메모리 가중치(n_t)를 추가해 셀 상태를 정규화. 위 코드에서 n_t를 통해 이를 반영.
mLSTM: 행렬 메모리 기반으로, 더 많은 정보를 병렬적으로 저장(여기서는 간소화로 생략).
코드 차이: n_t를 계산하고 셀 상태(c_t)를 n_t로 정규화하는 과정이 추가됨.
망각 메커니즘
LSTM: 망각 게이트(f_t)는 0~1 사이 값을 곱해 과거 정보를 단순히 스케일링.
xLSTM: sLSTM에서는 f_t가 지수 가중 이동 평균(EMA)처럼 작동하며, n_t를 통해 누적된 메모리 크기를 조정. 이로 인해 긴 시퀀스에서 정보 손실이 줄어듦.
코드 차이: n_t = f_t * n_prev + i_t로 누적 메모리를 계산.
셀 상태 정규화
LSTM: 셀 상태(c_t)는 정규화 없이 직접 업데이트(c_t = f_t * c_prev + i_t * c_tilde).
xLSTM: 셀 상태를 누적 메모리(n_t)로 나눠 정규화(c_t = (f_t * c_prev + i_t * c_tilde) / n_t). 이로 인해 값의 폭발/소실 방지.
코드 차이: c_t 계산 시 / (n_t + 1e-8) 추가.
장기 의존성
LSTM: 긴 시퀀스에서 기울기 소실 문제로 인해 장기 의존성 학습에 한계.
xLSTM: sLSTM과 mLSTM의 조합으로 더 긴 시퀀스에서 패턴을 효과적으로 학습. 특히 sLSTM의 정규화로 안정성 향상.
연산 방식
LSTM: 단순한 게이트 연산.
xLSTM: mLSTM은 행렬 연산을 추가해 병렬 처리 가능(코드에서는 생략). sLSTM은 스칼라 연산으로 효율성과 성능 균형 유지.
코드에서 반영된 차이
추가 상태(n_t): LSTM에는 없는 누적 메모리 가중치(n_t)를 계산하고, 이를 셀 상태 업데이트에 활용.
정규화: c_t를 n_t로 나누는 과정이 LSTM과 다름.
구조: xLSTMCell에서 추가적인 선형 변환(W_n)과 계산 로직이 포함됨.
참고 사항
간소화: 위 코드는 sLSTM 스타일만 반영했으며, mLSTM(행렬 메모리)까지 포함하려면 더 복잡한 구현 필요.
미래 예측: 3일, 6일 후 예측을 위해서는 last_sequence에 미래 날씨 데이터를 추가해야 함.
성능 최적화: seq_length, hidden_size, 학습률 조정 추천.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# ✅ xLSTM 모델 정의
class xLSTM(nn.Module):
    """Bidirectional ``nn.LSTM`` followed by a sigmoid-activated linear layer.

    Despite the name, the recurrent part is a standard ``torch.nn.LSTM``; the
    only addition is the ``forget_gate`` linear layer + sigmoid applied to
    the last time step before the output projection.

    Parameters
    ----------
    input_size : int
        Number of features per time step.
    hidden_size : int
        Hidden size per direction.
    num_layers : int
        Number of stacked LSTM layers.
    dropout : float
        Dropout between LSTM layers. Ignored when ``num_layers == 1``, where
        ``nn.LSTM`` has no inter-layer connection and would only warn.
    """

    def __init__(self, input_size, hidden_size, num_layers, dropout=0.2):
        super(xLSTM, self).__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size

        self.xlstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0.0,
            bidirectional=True,
        )

        # Both directions are concatenated, hence hidden_size * 2 everywhere
        self.forget_gate = nn.Linear(hidden_size * 2, hidden_size * 2)
        self.sigmoid = nn.Sigmoid()

        # Final output layer (one value per sequence)
        self.fc = nn.Linear(hidden_size * 2, 1)

    def forward(self, x):
        """Predict one value per sequence.

        Parameters
        ----------
        x : torch.Tensor
            Batch-first input of shape ``(batch, seq_len, input_size)``.

        Returns
        -------
        torch.Tensor
            Tensor of shape ``(batch, 1)``.
        """
        # nn.LSTM starts from zero states matching the input's dtype and
        # device when none are passed. The hand-built float32 h0/c0 broke
        # non-float32 models and were first allocated on the CPU.
        lstm_out, _ = self.xlstm(x)

        # NOTE(review): at the last time step the backward direction has seen
        # only one element of the sequence; consider building the summary
        # from the final hidden states instead - confirm intent.
        forget_vector = self.sigmoid(self.forget_gate(lstm_out[:, -1, :]))

        output = self.fc(forget_vector)
        return output

# Hyper-parameters
input_size = 7  # number of features
hidden_size = 128
num_layers = 3  # stacked layers
dropout = 0.3
batch_size = 16
learning_rate = 0.001
epochs = 30

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Model initialisation. The model must live on the same device as the
# batches; previously only the batches were moved to "cuda", which raises a
# device-mismatch error on the first forward pass.
model = xLSTM(input_size, hidden_size, num_layers, dropout).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(epochs):
    model.train()
    total_loss = 0
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        
        optimizer.zero_grad()
        output = model(X_batch)
        # squeeze(-1) drops only the trailing size-1 output dimension, so a
        # final batch holding a single sample keeps its batch dimension.
        # assumes y_batch is 1-D (batch,) - TODO confirm against the loader
        loss = criterion(output.squeeze(-1), y_batch)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    print(f'Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_loader)}')

# Evaluation on the test set
model.eval()
with torch.no_grad():
    y_pred = model(X_test_tensor.to(device)).squeeze(-1)
    test_loss = criterion(y_pred, y_test_tensor.to(device))
    print(f'Test Loss: {test_loss.item()}')

# Plot actual vs predicted values
import matplotlib.pyplot as plt

plt.figure(figsize=(10,5))
plt.plot(y_test_tensor.cpu().numpy(), label="Actual")
plt.plot(y_pred.cpu().numpy(), label="Predicted")
plt.legend()
plt.show()
