In [6]:
import sys
import os
!{sys.executable} -m pip install pydataxm plotly pyarrow



In [7]:
import datetime as dt
import pandas as pd
import numpy as np
import re
import warnings
import matplotlib.pyplot as plt
import numpy as np
import json
from pydataxm import *
from pydataxm.pydatasimem import ReadSIMEM, CatalogSIMEM
from datetime import datetime as dt
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error
from sklearn.preprocessing import StandardScaler
from prophet import Prophet
from prophet.utilities import regressor_coefficients
import plotly.graph_objects as go
import plotly.express as px
from json import load
from pathlib import Path
import pickle
import pyarrow.parquet as pq
warnings.filterwarnings('ignore')

In [8]:
class ThermalGenerationPipeline:
    """
    A pipeline for fetching, processing, modeling, and forecasting thermal generation data.

    The pipeline consists of several stages:
    1. Data Acquisition - Fetch data from APIs and external sources
    2. Data Processing - Clean and transform the raw data
    3. Feature Engineering - Create additional features
    4. Modeling - Train and evaluate forecasting models
    5. Forecasting - Generate out-of-sample predictions
    6. Visualization - Create plots and visualizations
    """

    def __init__(self):
        """Initialize the pipeline with default settings"""
        self.data = None
        self.processed_data = None
        self.model = None
        self.scaler = None
        self.y_mean = None
        self.y_std = None
        self.results = None
        self.model = self.load_serialized_model('serialized_prophet')


    def fetch_data(self, list_date, metric_Id_list, metric_Id_list_tar, entity_cov, entity_tar):
        """
        Fetch data from various sources including APIs and external datasets.

        Args:
            list_date: List containing start and end dates [start_date, end_date]
            metric_Id_list: List of metric IDs for covariates
            metric_Id_list_tar: Metric ID for target variable
            entity_cov: Entity for covariates
            entity_tar: Entity for target variable

        Returns:
            List of raw DataFrames
        """
        df_thermalgen = []

        # Fetch covariates data from API
        objetoAPI = pydataxm.ReadDB()
        for metric_id in metric_Id_list:
            df = objetoAPI.request_data(
                metric_id,
                entity_cov,
                dt.date(list_date[0]),
                dt.date(list_date[1]))
            df_thermalgen.append(df)

        # Fetch thermal generation data
        dataset_gen = 'E17D25'  # Real Generation from API
        simem_gen = ReadSIMEM(dataset_gen, list_date[0], list_date[1])
        df_tar = simem_gen.main()
        df_thermalgen.append(df_tar)

        # Fetch SST data for ONI index
        url = "https://psl.noaa.gov/data/correlation/nina3.anom.data"
        df_sst = self._process_sst_data(url, list_date)
        df_thermalgen.append(df_sst)

        self.data = df_thermalgen
        return df_thermalgen

    def _process_sst_data(self, url, date_range):
        """Process SST data from NOAA"""
        df_sst = pd.read_csv(url, delim_whitespace=True, skiprows=1, header=None)
        df_sst_clean = df_sst.iloc[2:-3,:]
        df_sst_clean.columns = ['Year', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12']

        # Reshape and clean data
        df_sst_clean_stack = df_sst_clean.set_index(['Year'])[df_sst_clean.columns[1:]].stack().reset_index()
        df_sst_clean_stack.columns = ['Year', 'Month', 'ONI Anomaly']
        df_sst_clean_stack['Date'] = pd.to_datetime(df_sst_clean_stack[['Year', 'Month']].assign(DAY=1))
        df_sst_clean_stack.drop(columns=['Year', 'Month'], inplace=True)
        df_sst_clean_stack.set_index('Date', inplace=True)
        df_sst_clean_stack['ONI Anomaly'] = pd.to_numeric(df_sst_clean_stack['ONI Anomaly'], errors='coerce')

        # Filter by date range and resample
        df_sst_clean_stack = df_sst_clean_stack[
            (df_sst_clean_stack.index >= date_range[0]) &
            (df_sst_clean_stack.index <= date_range[1])
        ]
        df_sst_clean_stack_daily = df_sst_clean_stack.resample('D').ffill()

        # Extend with 30 days of forward fill
        new_dates = pd.date_range(
            start=df_sst_clean_stack_daily.index[-1] + pd.Timedelta(days=1),
            periods=30,
            freq='D'
        )
        df_sst_clean_stack_daily = df_sst_clean_stack_daily.reindex(
            df_sst_clean_stack_daily.index.union(new_dates)
        ).ffill()

        return df_sst_clean_stack_daily

    def process_data(self):
        """
        Process and transform the raw data into a format suitable for modeling.

        Returns:
            Tuple of (daily_data, monthly_data) DataFrames
        """
        if self.data is None:
            raise ValueError("No data available. Run fetch_data() first.")

        # Unpack the raw data
        df_volumen_ener = self.data[0]
        df_apor_ener = self.data[1]
        df_porc_vol_util_ = self.data[2]
        df_genreal = self.data[3]
        df_ONI = self.data[4]

        # Process generation data
        df_genreal_tipo = df_genreal.groupby(['Fecha','TipoGeneracion'])['GeneracionRealEstimada'].sum().reset_index()
        Gigafactor_gene = 1e6  # Conversion factor from kWh to Gwh
        df_genreal_tipo['GeneracionRealEstimada'] = df_genreal_tipo['GeneracionRealEstimada'].div(Gigafactor_gene)

        # Pivot and rename columns
        df_genreal_tipo = df_genreal_tipo.pivot_table(
            columns='TipoGeneracion',
            index='Fecha',
            values='GeneracionRealEstimada',
            fill_value=0
        ).reset_index()
        df_genreal_tipo['Fecha'] = pd.to_datetime(df_genreal_tipo['Fecha'], format='%Y-%m-%d')
        df_genreal_tipo.set_index('Fecha', inplace=True)
        df_genreal_tipo = df_genreal_tipo[['Termica']]
        df_genreal_tipo.rename(columns={'Termica':'TERMICA'}, inplace=True)

        # Process volume data
        df_volumen_ener.rename(columns={'Value':'Volume in Energy'}, inplace=True)
        Gigafactor_volu = 1e6
        df_volumen_ener['Volume in Energy'] = df_volumen_ener['Volume in Energy'].div(Gigafactor_volu)
        df_volumen_ener.drop(columns=['Id'], inplace=True)
        df_volumen_ener.set_index('Date', inplace=True)

        # Process apor energy data
        df_apor_ener.rename(columns={'Value':'AporEnergia'}, inplace=True)
        Gigafactor_apor = 1e6
        df_apor_ener['AporEnergia'] = df_apor_ener['AporEnergia'].div(Gigafactor_apor)
        df_apor_ener.drop(columns=['Id'], inplace=True)
        df_apor_ener.set_index('Date', inplace=True)

        # Process volume utilization data
        df_porc_vol_util_.rename(columns={'Value':'PorcVoluUtilDiario'}, inplace=True)
        df_porc_vol_util_.drop(columns=['Id'], inplace=True)
        df_porc_vol_util_.set_index('Date', inplace=True)

        # Combine all data
        df_thermalgen_cov = pd.concat([
            df_genreal_tipo,
            df_volumen_ener,
            df_ONI,
            df_apor_ener,
            df_porc_vol_util_
        ], axis=1)

        # Create monthly aggregated data
        df_thermalcovariates_monthly = df_thermalgen_cov.resample('M').agg({
            'TERMICA': 'sum',
            'Volume in Energy': 'sum',
            'ONI Anomaly': 'max',
            'AporEnergia': 'sum',
            'PorcVoluUtilDiario': 'mean'
        })

        # Reset indices and rename columns
        df_thermalgen_cov.reset_index(inplace=True)
        df_thermalgen_cov.rename(columns={'index':'Fecha'}, inplace=True)
        df_thermalcovariates_monthly.reset_index(inplace=True)
        df_thermalcovariates_monthly.rename(columns={'index':'Fecha'}, inplace=True)

        self.processed_data = (df_thermalgen_cov, df_thermalcovariates_monthly)
        return self.processed_data

    def preprocess_for_modeling(self, df, target_col='TERMICA'):
        """
        Prepare data for modeling by scaling and renaming columns.

        Args:
            df: Input DataFrame
            target_col: Name of the target column

        Returns:
            Tuple of (processed DataFrame, scaler object)
        """
        # Store original scale info for inverse transform later
        self.y_mean = df[target_col].mean()
        self.y_std = df[target_col].std()

        # Rename columns for Prophet
        df = df.rename(columns={
            'Fecha': 'ds',
            target_col: 'y',
            'Volume in Energy': 'exog2',
            'ONI Anomaly': 'exog3',
            'AporEnergia': 'exog4',
            'PorcVoluUtilDiario': 'exog6',
        })

        # Scale features
        self.scaler = StandardScaler()
        numeric_cols = ['y'] + [f'exog{i}' for i in range(1, 8) if f'exog{i}' in df.columns]
        df[numeric_cols] = self.scaler.fit_transform(df[numeric_cols])

        return df, self.scaler, self.y_mean, self.y_std

    def train_model(self, train_df, seasonality_mode='additive',
                   weekly_seasonality=False, yearly_seasonality=True):
        """
        Train a Prophet model with optional regressors.

        Args:
            train_df: Training DataFrame
            seasonality_mode: 'additive' or 'multiplicative'
            weekly_seasonality: Whether to include weekly seasonality
            yearly_seasonality: Whether to include yearly seasonality

        Returns:
            Trained Prophet model
        """
        regressors = [f'exog{i}' for i in range(1, 8) if f'exog{i}' in train_df.columns]

        self.model = Prophet(
            seasonality_mode=seasonality_mode,
            weekly_seasonality=weekly_seasonality,
            yearly_seasonality=yearly_seasonality
        )

        # Add regressors
        for reg in regressors:
            self.model.add_regressor(reg)

        self.model.fit(train_df)
        return self.model

    def evaluate_model(self, model, test_df):
        """
        Evaluate the model on test data.

        Args:
            model: Trained Prophet model
            test_df: Test DataFrame

        Returns:
            Tuple of (forecast DataFrame, metrics dictionary)
        """
        regressors = [f'exog{i}' for i in range(1, 8) if f'exog{i}' in test_df.columns]

        # Prepare future dataframe with regressors
        future = test_df[['ds'] + regressors].copy()

        # Make predictions
        forecast = model.predict(future)

        # Get actual and predicted values
        y_true = test_df['y'].values
        y_pred = forecast['yhat'].values

        # Calculate metrics
        metrics = {
            'MAE': mean_absolute_error(y_true, y_pred),
            'MSE': mean_squared_error(y_true, y_pred),
            'RMSE': np.sqrt(mean_squared_error(y_true, y_pred))
        }

        return forecast, metrics

    def add_lagged_regressors(self, df, regressor_col, lags):
        """
        Add lagged regressors to the DataFrame.

        Args:
            df: Input DataFrame
            regressor_col: Name of the regressor column
            lags: List of lag values

        Returns:
            Modified DataFrame
        """
        for exog in regressor_col:
          for lag in lags:
            df[f'{exog}_lag_t-{lag}'] = df[exog].shift(lag)
        return df

    def mod_df(self, df, delay, number_of_lags, final_lag):
        delay = 1
        number_of_lags = 3
        final_lag = delay + number_of_lags
        lags = np.arange(delay,final_lag)
        new_column = 'y_exog'
        df[new_column] = df['y']
        exog_lagged = ['exog2', 'exog3', 'exog4', 'exog6', 'y_exog']
        df = self.add_lagged_regressors(df, exog_lagged, lags)
        df.dropna(inplace=True)
        df_noexog = df.drop(columns= exog_lagged)
        return df_noexog


    def rolling_forecast(self, df, horizon=3, train_size=0.7):
        """
        Perform rolling window forecasting.

        Args:
            df: Input DataFrame
            horizon: Number of periods to forecast ahead
            train_size: Proportion of data to use for initial training

        Returns:
            Dictionary containing forecast results
        """
        # Split data into train and test sets
        train_size = int(train_size * len(df))
        train = df.iloc[:train_size]
        test = df.iloc[train_size:]

        # Initialize storage for results
        all_predictions = []
        all_true_values = []
        forecast_dates = []
        horizon_labels = []

        # Iterate through the test set in steps of 'horizon'
        for i in range(0, len(test), horizon):
            # Current training data (all past + up to current test point)
            current_train = pd.concat([train, test.iloc[:i]], axis=0)

            # Test data for the next 'horizon' steps
            current_test = test.iloc[i:i + horizon]
            if current_test.empty:
                break

            # Create Prophet Model
            model = Prophet(
                  yearly_seasonality=True,
                  weekly_seasonality=False,
                  daily_seasonality=False,
                  )
            #model = self.train_model(current_train)

             # Add all exogenous regressors
            regressor_cols = [col for col in current_train.columns if col not in ["ds", "y"]]
            for col in regressor_cols:
              model.add_regressor(col)
            # Fit model
            model.fit(current_train)

            future = model.make_future_dataframe(
                periods=len(current_test),
                freq='M',
                include_history=False
            )

            # Add exogenous variables
            future_regressors = current_test[regressor_cols + ['ds']].copy()
            future = future.merge(future_regressors, on="ds", how="left")

            # Predict
            forecast = model.predict(future)

            # Store results
            all_predictions.extend(forecast["yhat"].values)
            all_true_values.extend(current_test["y"].values)
            forecast_dates.extend(current_test["ds"].values)
            horizon_labels.extend([f"Month {h + 1}" for h in range(len(current_test))])

            # Serializing the last model trained
            #if save_model:
            #  with open(save_model, 'wb') as f:
            #      pickle.dump(model, f)
            #  print(f"Figure saved to {save_model}")

        self.results = {
            "predictions": all_predictions,
            "true_values": all_true_values,
            "forecast_dates": forecast_dates,
            "horizon_labels": horizon_labels,
            "model": model,  # Returns the last trained model
            'test_data': test
        }

        return self.results

    def create_forecast_results_df(self):
        """
        Create a DataFrame from rolling forecast results.

        Returns:
            DataFrame with forecast results
        """
        if self.results is None:
            raise ValueError("No results available. Run rolling_forecast() first.")

        return pd.DataFrame({
            "ds": self.results["forecast_dates"],
            "true": self.results["true_values"],
            "predicted": self.results["predictions"],
            "horizon": self.results["horizon_labels"]
        })

    def calculate_metrics(self, model, results):
        """
        Calculate evaluation metrics for forecast results.

        Args:
            results: DataFrame with forecast results
            model: Optional model for calculating information criteria
            end_date_mape: Optional end date for MAPE calculation

        Returns:
            Dictionary of metrics
        """
        if results.empty:
            print("No predictions were made.")
            return None

        # Rescale values to original scale
        true_rescaled = results["true"] * self.y_std + self.y_mean
        pred_rescaled = results["predicted"] * self.y_std + self.y_mean

        # Calculate basic metrics
        metrics = {
            "rmse": np.sqrt(mean_squared_error(true_rescaled, pred_rescaled)),
            "mae": mean_absolute_error(true_rescaled, pred_rescaled)
        }

        # Calculate MAPE
        #if end_date_mape:
        #    mask = results['ds'] < end_date_mape
        #    true_subset = results["true"].loc[mask]
        #    pred_subset = results["predicted"].loc[mask]
        #    metrics["mape"] = mean_absolute_error(true_subset, pred_subset) / np.abs(true_subset).mean()
        #else:
        metrics["mape"] = np.mean(np.abs((true_rescaled -pred_rescaled) / true_rescaled)) * 100

        # Calculate information criteria if model is provided
        if model:
            residuals = results["true"] - results["predicted"]
            n = len(residuals)
            ssr = np.sum(residuals**2)
            sigma2 = ssr / n  # MLE Estimator Variance

            # Log-likelihood (normal distribution)
            llf = -n/2 * np.log(2*np.pi) - n/2 * np.log(sigma2) - ssr/(2*sigma2)

            # Count parameters
            k = (len(model.params['k']) +        # growth parameters
                 len(model.params['m']) +        # offset parameters
                 len(model.params['sigma_obs']) + # observation noise
                 (len(model.seasonalities) * 2) + # fourier terms for each seasonality
                 len(model.extra_regressors))

            metrics["aic"] = -2 * llf + 2 * k
            metrics["bic"] = -2 * llf + k * np.log(n)

        return metrics

    def visualize_metrics(self, metrics, figsize=(8, 3), title="Model Performance Metrics"):
        """
        Visualize evaluation metrics in a table.

        Args:
            metrics: Dictionary of metrics
            figsize: Figure size
            title: Plot title

        Returns:
            matplotlib Figure object
        """
        if not metrics:
            return None, None

        # Create visualization
        fig, ax = plt.subplots(figsize=figsize)
        ax.axis('off')
        ax.axis('tight')

        # Prepare table data
        metric_names = []
        metric_values = []
        metric_formats = {
            'rmse': '{:,.2f}',
            'mae': '{:,.2f}',
            'mape': '{:.2f}%',
            'aic': '{:,.1f}',
            'bic': '{:,.1f}'
        }

        for metric, value in metrics.items():
            metric_names.append(metric.upper())
            if metric in metric_formats:
                metric_values.append(metric_formats[metric].format(value))
            else:
                metric_values.append(f"{value:.4f}")

        # Create table
        table = ax.table(
            cellText=list(zip(metric_names, metric_values)),
            colLabels=['Métrica de Desempeño', 'Valor'],
            loc='center',
            cellLoc='center'
        )

        # Style table
        table.auto_set_font_size(False)
        table.set_fontsize(12)
        table.scale(1, 1.5)

        # Header styling
        for j in range(2):
            table[0, j].set_facecolor('#40466e')
            table[0, j].set_text_props(color='white', weight='bold')

        # Alternate row colors
        for i in range(len(metrics)):
            color = '#f7f7f7' if i % 2 == 0 else 'white'
            for j in range(2):
                table[i+1, j].set_facecolor(color)

        # Add title
        ax.set_title(title, fontsize=14, pad=20, weight='bold')
        plt.tight_layout()

        return fig

    def plot_forecast_comparison(self, results, test_data, horizon=3, title=None):
        """
        Plot forecast vs actual values.

        Args:
            results: Forecast results DataFrame
            test_data: Test data DataFrame
            horizon: Forecast horizon
            title: Plot title

        Returns:
            plotly Figure object
        """
        if title is None:
            title = f"Multi-Step Forecast ({horizon}-Month Horizon) Thermal Generation"

        # Color mapping for different horizons
        color_map = {
            f"Month {h}": {
                "line": color,
                "fill": f"rgba({rgb},{0.2})"
            }
            for h, (color, rgb) in enumerate(zip(
                ["green", "orange", "red"],
                ["0,255,0", "255,165,0", "255,0,0"]
            ), 1)
        }

        fig = go.Figure()

        # Add true values line
        fig.add_trace(go.Scatter(
            x=test_data["ds"],
            y=self.rescale_values(test_data["y"]),
            mode="lines",
            name="Valores Medidos - Generación Eléctrica tipo Térmica",
            line=dict(color="blue", width=2)
        ))

        # Add predictions for each horizon
        for horizon_label in [f"Month {h}" for h in range(1, horizon + 1)]:
            horizon_data = results[results["horizon"] == horizon_label]
            fig.add_trace(go.Scatter(
                x=horizon_data["ds"],
                y=self.rescale_values(horizon_data["predicted"]),
                mode="markers",
                name=horizon_label,
                marker=dict(color=color_map[horizon_label]['line'], size=8),
                hovertemplate=(
                    "<b>Date</b>: %{x}<br>"
                    "<b>Predicted</b>: %{y:.2f}<br>"
                    f"<b>Horizon</b>: {horizon_label}"
                )
            ))

        # Add vertical lines for forecast origins
        for i in range(0, len(test_data), horizon):
            fig.add_vline(
                x=test_data["ds"].iloc[i],
                line_width=1,
                line_dash="dash",
                line_color="gray"
            )

        # Update layout
        fig.update_layout(
            title=title,
            xaxis_title="Date",
            yaxis_title="Energy [GWh]",
            hovermode="x unified",
            legend_title="Legend",
            template="plotly_white"
        )

        return fig

    def rescale_values(self, series):
        """Rescale standardized values back to original scale."""
        return series * self.y_std + self.y_mean

    def make_prediction(self, start_date = None):
        if start_date is None:
          prediction_date  = dt.now()
          prediction_date = prediction_date.strftime("%Y-%m-%d")
        else:
          prediction_date = start_date
          prediction_date = pd.to_datetime(prediction_date)
          prediction_date = prediction_date.strftime("%Y-%m-%d")

        #self.model = self.load_serialized_model('serialized_prophet')
        raw_data = self.fetch_data(list_date, metric_Id_list, metric_Id_list_tar, entity_cov, entity_tar)
        _, monthly_data = self.process_data()
        monthly = self.load_and_concatenate('monthly_data.parquet', monthly_data, 'Fecha', True)
        modeling_data, _ , _ , _ = self.preprocess_for_modeling(monthly)
        delay = 1
        number_of_lags = 3
        final_lag = delay + number_of_lags
        mod_modeling_data = self.mod_df(modeling_data, delay,number_of_lags, final_lag)
        # Generate out-of-sample forecast
        forecast_data, _ = self.generate_out_of_sample_forecast(self.model, mod_modeling_data, prediction_date)
        return forecast_data, monthly, modeling_data

    def forecast_data_rescaled(self,forecast_data, columns, y_mean, y_std):
        forecast_data_rescale = forecast_data.copy()
        for col in columns:
          forecast_data_rescale[col] = forecast_data[col] * y_std + y_mean
        return forecast_data_rescale

    def save_forecast_data(self, forecast_data, columns, output_file):
        """
        Save forecast data to a file.

        Args:
            forecast_data: Forecast DataFrame
            file_path: Path to save file
        """
        #columns_to_save = ['ds', 'trend', 'yhat_lower', 'yhat_upper']
        columns_to_save = columns
        selected_data = forecast_data[columns_to_save]
        #data_dict = selected_data.to_dict(orient='records')
        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        #forecast_data.to_csv(file_path, index=False)
        try:
            selected_data.to_csv(
                output_path,
                index= True,
                date_format="%Y-%m-%d"
            )
        except IOError as e:
            raise IOError(f"Failed to write CSV file: {e}")

    def generate_out_of_sample_forecast(self, model, test_df, date_backtesting, future_periods=3):
        """
        Generate out-of-sample forecast with exogenous variables.

        Args:
            test_df: Test DataFrame
            future_periods: Number of periods to forecast
            current_date: Current for backtesting

        Returns:
            Tuple of (forecast DataFrame, exogenous data DataFrame)
        """
        date_backtesting = pd.to_datetime(date_backtesting)
        # Get last date from historical data
        if test_df['ds'].iloc[-1] < date_backtesting:
            #last_rows_set = test_df.iloc[-2:]
            last_date = test_df['ds'].iloc[-1]


            # Create block matrix for exogenous variables
            row_input_list = np.array(test_df.iloc[-1][2:]).reshape(5,3)
            result_matrix = self._create_block_matrix(row_input_list, num_blocks=5)

            # Create future exogenous DataFrame
            dates = pd.date_range(start=last_date, periods=3, freq='M')
            df_future_exog = pd.DataFrame(
                result_matrix,
                columns=test_df.columns[2:],  # Assuming exog columns start from index 2
                index=dates
            )

            # Interpolate missing values
            df_future_exog_interpolate = df_future_exog.interpolate(method='linear', limit_direction='forward')
            df_future_exog_interpolate.reset_index(inplace=True)
            df_future_exog_interpolate.rename(columns={'index': 'ds'}, inplace=True)
            forecast = model.predict(df_future_exog_interpolate)
        else:
            target_date = date_backtesting
            idx_pos = test_df[test_df['ds'] == target_date].index[0]
            test_df = test_df.drop(columns= 'y')
            df_future_exog_interpolate = test_df.iloc[idx_pos-3:idx_pos]

            #previous_3_rows = test_df.iloc[idx_pos-3:idx_pos]
            #df_future_exog_interpolate = test_df[test_df['ds'] > target_date].tail(3)
            forecast = model.predict(df_future_exog_interpolate)


        return forecast, df_future_exog_interpolate

    def _create_block_matrix(self, row_list, num_blocks=5):
        """
        Creates a 3x(3*num_blocks) matrix by concatenating Toeplitz blocks.
        """
        if num_blocks > len(row_list):
            print(f"Warning: num_blocks ({num_blocks}) exceeds the number of provided row lists ({len(row_list)}). Using {len(row_list)} blocks.")
            num_blocks = len(row_list)

        # Create first block
        first_row = row_list[0]
        toeplitz_block = self._create_toeplitz(first_row)
        block_size = len(first_row)
        mask = np.tri(block_size, k=-1, dtype=bool)  # Lower triangular (excluding diagonal)
        upper_block = np.where(~mask, toeplitz_block, np.nan)

        # Concatenate blocks horizontally
        full_matrix = upper_block.copy()
        for i in range(1,num_blocks):
            current_row = row_list[i]
            if len(current_row) != block_size:
                raise ValueError(f"Row list at index {i} has inconsistent length. Expected {block_size}, got {len(current_row)}.")
            new_block = self._create_toeplitz(current_row)
            new_block = np.where(~mask, new_block, np.nan)
            full_matrix = np.hstack((full_matrix, new_block))

        return full_matrix

    def _create_toeplitz(self, row_list):
        """
        Constructs a Toeplitz matrix from the given list representing the first row.
        """
        num_cols = len(row_list)
        num_rows = num_cols  # For a square Toeplitz matrix
        toeplitz_matrix = np.zeros((num_rows, num_cols), dtype=row_list[0].__class__)

        for i in range(num_rows):
            for j in range(num_cols):
                if j-i >= 0:
                    toeplitz_matrix[i, j] = row_list[j-i]
                elif i-j < num_cols:
                    toeplitz_matrix[i, j] = row_list[i-j]
                else:
                    toeplitz_matrix[i, j] = np.nan
        return toeplitz_matrix

    def _predict_out_of_sample(self, model, last_train_date, exogenous_data, future_periods, freq):
        """
        Generate out-of-sample forecasts using a trained Prophet model.
        """
        # Create future dataframe
        future = model.make_future_dataframe(
            periods=future_periods,
            freq=freq,
            include_history=False
        )

        # Merge exogenous data
        future = future.merge(exogenous_data, on='ds', how='left')

        # Generate forecast
        forecast = model.predict(future)

        return forecast

    def plot_time_series_with_forecast(self, test_data, forecast_data,
                                     actual_series_name="Generación Térmica SIMEM",
                                     forecast_series_name="Predicción Fuera de la Muestra",
                                     confidence_band_name="Intervalo de Confianza",
                                     title="Serie de Tiempo para Generación Térmica Eléctrica + Predicción Fuera de la Muestra",
                                     xaxis_title="Fecha",
                                     yaxis_title="Energía [GWh]",
                                     confidence_band_color="rgba(0,100,80,0.2)",
                                     font_family="Times New Roman, serif",
                                     font_size=25,
                                     template="plotly_white",
                                     forecast_line_style="dot"
                                    ):
        """
        Plot time series data with forecast and confidence interval.
        """
        fig = go.Figure()

        # Add actual values trace
        fig.add_trace(go.Scatter(
            x=test_data['ds'].iloc[:-1],
            y=self.rescale_values(test_data['y'].iloc[:-1]),
            mode='lines+markers',
            line=dict(color='black', width=3),
            name=actual_series_name
        ))

        # Add forecast trace
        fig.add_trace(go.Scatter(
            x=forecast_data['ds'],
            y=self.rescale_values(forecast_data['yhat']),
            mode='lines+markers',
            name=forecast_series_name,
            line=dict(dash=forecast_line_style)
        ))

        # Add confidence interval band
        fig.add_trace(go.Scatter(
            x=np.concatenate([forecast_data['ds'], forecast_data['ds'][::-1]]),
            y=np.concatenate([
                self.rescale_values(forecast_data['yhat_upper']),
                self.rescale_values(forecast_data['yhat_lower'][::-1])
            ]),
            fill='toself',
            fillcolor=confidence_band_color,
            line=dict(color='rgba(255, 255, 255, 0)'),
            name=confidence_band_name
        ))

        # Update layout
        fig.update_layout(
            title=title,
            xaxis_title=xaxis_title,
            yaxis_title=yaxis_title,
            font=dict(family=font_family, size=font_size),
            template=template
        )
        return fig

    def save_data(self, df, file_path):
        """
        Save a DataFrame to NPZ file.

        Args:
            df: DataFrame to save
            file_path: Path to save file
        """
        np.savez(
            file_path,
            index=df.index.values,
            **{col: df[col].values for col in df.columns}
        )

    def load_data(self, file_path):
        """
        Load a DataFrame from NPZ file.

        Args:
            file_path: Path to NPZ file

        Returns:
            Loaded DataFrame
        """
        with np.load(file_path, allow_pickle=True) as data:
            df = pd.DataFrame(
                {col: data[col] for col in data.files if col != 'index'},
                index=data['index'] if 'index' in data.files else None
            )
        return df

    def generate_descriptive_stats(self, data, periods):
        """
        Generate descriptive statistics for specified time periods

        Args:
            data: pandas DataFrame with datetime index
            periods: list of tuples with (period_name, years)

        Returns:
            Dictionary of DataFrames with descriptive statistics
        """
        #data = data.set_index('Fecha',inplace=True)
        results = {}
        #current_date = data.index.max()  # Most recent date in data
        current_date = data.index.max()

        for period_name, years in periods:
            # Calculate cutoff date
            cutoff = current_date - pd.DateOffset(years=years)

            # Filter data for the period
            period_data = data.loc[cutoff:current_date]

            # Generate descriptive stats
            stats = period_data.describe(percentiles=[.25, .5, .75])

            # Add additional statistics if needed
            stats.loc['skew'] = period_data.skew()
            stats.loc['kurtosis'] = period_data.kurtosis()
            stats.loc['count_null'] = period_data.isnull().sum()

            results[f"{period_name} ({years} year)"] = stats

        return results

    def save_to_parquet_by_date_range( self, df, output_path, start_date, end_date):
        self.df = df.copy()
        df_filtered = self.df.copy()
        if start_date:
            start_dt = start_date
            df_filtered = df_filtered[df_filtered['Fecha'] >= start_dt]

        if end_date:
            end_dt = pd.to_datetime(end_date)
            df_filtered = df_filtered[df_filtered['Fecha'] <= end_dt]

        # Save to Parquet
        df_filtered.to_parquet(
            output_path,
            #partition_cols=partition_cols,
        )
        print(f"Successfully saved {len(df_filtered)} records to {output_path}")


    def load_and_concatenate( self, full_path, next_df , date_column, ensure_continuity):

        self.date_column = date_column

        # Construct full path if base_path is set
        #full_path = file_path if self.base_path is None else f"{self.base_path}/{file_path}"

        # Load the Parquet file
        current_df = pd.read_parquet(full_path)

        # Ensure date column exists and is datetime
        if date_column not in current_df.columns:
            raise ValueError(f"Date column '{date_column}' not found in Parquet file")

        current_df[date_column] = pd.to_datetime(current_df[date_column])

        # If no next_df provided, just return the loaded DataFrame
        if next_df is None:
            self.df = current_df
            return current_df

        # Validate next_df structure matches current_df
        if set(current_df.columns) != set(next_df.columns):
            raise ValueError("DataFrames must have identical column structures")

        # Ensure date column in next_df is datetime
        #next_df[date_column] = pd.to_datetime(next_df[date_column])

        # Check for datetime continuity if required
        if ensure_continuity:
            current_max_date = current_df[date_column].max()
            next_min_date = next_df[date_column].min()

            if next_min_date <= current_max_date:
                raise ValueError(
                    f"Date continuity violation: Next DataFrame starts at {next_min_date} "
                    f"which is before/equal to current DataFrame's max date {current_max_date}"
                )

        # Concatenate while preserving original column order
        concatenated = pd.concat(
            [current_df, next_df],
            axis=0,
            ignore_index=True
        )

        # Restore original column order
        concatenated = concatenated[current_df.columns]

        # Update instance DataFrame
        self.df = concatenated

        return concatenated

    @staticmethod
    def load_serialized_figure(filepath):
        """Load a pickled Plotly figure"""
        with open(filepath, 'rb') as f:
            fig = pickle.load(f)
        return fig
    @staticmethod
    def load_serialized_model(filepath):
        """Load a pickled Prophet model"""
        with open(filepath, 'rb') as f:
            model = pickle.load(f)
        return model
    @staticmethod
    def load_json_metrics (filepath):
      df = pd.read_json(filepath, orient='columns')
      return df

In [9]:
# Initialize pipeline
pipeline = ThermalGenerationPipeline()

# Fetch data - from the API
list_date = [dt.fromisoformat('2025-02-01'), dt.fromisoformat('2025-05-31')]
metric_Id_list = ['VoluUtilDiarEner','AporEner','PorcVoluUtilDiar']
metric_Id_list_tar = ['Gene']
entity_cov = 'Sistema'
entity_tar = 'Recurso'


# Make the forecast - Preprocess-Concatenate
#date_backtesting = '2015-02-28'
#out_of_sample_forecast, monthly_data, test_df = pipeline.make_prediction(start_date = date_backtesting)
out_of_sample_forecast, monthly_data, test_df = pipeline.make_prediction()

# Rescale the forecast according to the  standarization method
forecast_data_rescaled = pipeline.forecast_data_rescaled(out_of_sample_forecast,['yhat', 'yhat_lower', 'yhat_upper'],
                                                         pipeline.y_mean, pipeline.y_std)

# Save the forecast and the measured data into a csv file
pipeline.save_forecast_data(forecast_data_rescaled,['ds', 'yhat', 'yhat_lower', 'yhat_upper'], 'forecast_data.csv')
pipeline.save_forecast_data(monthly_data,monthly_data.columns,'measured_data.csv')


# Load the test set metrics
metrics = pipeline.load_json_metrics('metrics_test.json')

# Plot the forecast
forecast_fig = pipeline.plot_time_series_with_forecast(
    test_df,
    out_of_sample_forecast,
)


# Periods to analyze 1-year, 2-year, 5-year.
periods_to_analyze = [
    ('1 año', 1),
    ('2 años', 2),
    ('5 años', 5)
]

# Generate the reports
monthly_data.set_index('Fecha',inplace=True)
statistical_reports = pipeline.generate_descriptive_stats(monthly_data, periods_to_analyze)

# Access the reports
for period_name, report in statistical_reports.items():
    print(f"\n{period_name.upper()} Estadística Descriptiva de la Variables del Modelo")
    print(report)
    print("\n" + "="*50 + "\n")

****************************************************************************************************
Initializing object
The object has been initialized with the dataset: "Generación Real y Programada en las Plantas de Generación"
****************************************************************************************************
Inicio consulta sincronica
Creacion url: 0.001445770263671875
Extraccion de registros: 13.153762340545654
End of data extracting process
****************************************************************************************************

1 AÑO (1 YEAR) Estadística Descriptiva de la Variables del Modelo
                TERMICA  Volume in Energy  ONI Anomaly   AporEnergia  \
count         13.000000         13.000000    13.000000     13.000000   
mean        1483.809031     297323.423077    -0.131538   6783.620246   
std          672.307479      43688.448805     0.212165   2756.561922   
min          717.301294     204915.420900    -0.390000   3701.567300   
25%

In [10]:
forecast_fig.show()