In [None]:
from pathlib import Path

import matplotlib.pyplot as plt
import pandas as pd

import sys

In [None]:
"""Data Loader.

The functions in this script perform data load and a time series split.

Usage:
    Either run the whole pipeline (see src/main.py) or
    import the functions.
"""


from pathlib import Path
from typing import List, Tuple

import numpy as np
import pandas as pd


def load_data(path_to_file: Path) -> pd.DataFrame:
    """Read the main csv file and index it by its parsed ``time`` column.

    Parameters
    -------
    path_to_file : Path
                Location of the main csv file.

    Returns
    -------
    df : pd.DataFrame
            File contents with ``time`` parsed to datetime and moved
            from the columns into the index.
    """
    frame = pd.read_csv(path_to_file)

    # pop() takes the raw string column out of the frame and hands it
    # back, so parsing and removal happen in one step
    timestamps = pd.to_datetime(frame.pop("time"))
    frame.index = timestamps

    return frame


def time_split(
    df: pd.DataFrame, n_folds: int = 6, test_size: int = 9
) -> List[Tuple[np.ndarray, np.ndarray]]:
    """Creates an extending time series split for data.

    The last ``n_folds * test_size`` rows are cut into consecutive test
    windows of ``test_size`` rows. Each fold trains on every row that
    precedes its test window, so the training set grows by ``test_size``
    rows from one fold to the next.

    Parameters
    -------
    df : pd.DataFrame
        Data as a dataframe; only its length is used.
    n_folds : int, optional
        Number of time series folds, default is 6.
    test_size : int, optional
        Number of rows in one test set, default is 9.

    Returns
    -------
    all_splits : List[Tuple[np.ndarray, np.ndarray]]
                Positional train and test row indices per fold.

    Raises
    -------
    ValueError
        If ``df`` has fewer than ``n_folds * test_size`` rows.
    """
    # The return annotation uses plain ``np.ndarray``: subscripting it is
    # evaluated when the function is defined and is rejected by some
    # NumPy versions.
    first_test_start = len(df) - n_folds * test_size
    if first_test_start < 0:
        # a negative start would produce negative test positions, which
        # .iloc silently interprets as counting from the end
        raise ValueError(
            f"need at least {n_folds * test_size} rows for {n_folds} folds "
            f"of {test_size} rows, got {len(df)}"
        )

    all_splits = []
    for fold in range(n_folds):
        test_start = first_test_start + fold * test_size
        train_ids = np.arange(0, test_start)
        test_ids = np.arange(test_start, test_start + test_size)
        all_splits.append((train_ids, test_ids))

    return all_splits

In [None]:
# Data folder, given relative to the working directory (two levels up)
DATA_DIR = Path(
    "..",
    "..",
    "hfactory_magic_folders",
    "plastic_cost_prediction",
    "data",
)
# Name of the csv file inside DATA_DIR that is loaded as the main dataset
MAIN_FILE = "PA6_cleaned_dataset.csv"

In [None]:
# Load the main dataset once; later cells derive their frames from `df`
df = load_data(DATA_DIR.joinpath(MAIN_FILE))
df

In [None]:
# NOTE: this cell redefines `time_split`, which the data-loader cell earlier
# in the notebook already defines; whichever cell ran last wins in the kernel.
def time_split(
    df: pd.DataFrame, n_folds: int = 6, test_size: int = 9
) -> List[Tuple[np.ndarray, np.ndarray]]:
    """Creates an extending time series split for data.

    The last ``n_folds * test_size`` rows are cut into consecutive test
    windows of ``test_size`` rows. Each fold trains on every row that
    precedes its test window, so the training set grows by ``test_size``
    rows from one fold to the next.

    Parameters
    -------
    df : pd.DataFrame
        Data as a dataframe; only its length is used.
    n_folds : int, optional
        Number of time series folds, default is 6.
    test_size : int, optional
        Number of rows in one test set, default is 9.

    Returns
    -------
    all_splits : List[Tuple[np.ndarray, np.ndarray]]
                Positional train and test row indices per fold.

    Raises
    -------
    ValueError
        If ``df`` has fewer than ``n_folds * test_size`` rows.
    """
    # The return annotation uses plain ``np.ndarray``: subscripting it is
    # evaluated when the function is defined and is rejected by some
    # NumPy versions.
    first_test_start = len(df) - n_folds * test_size
    if first_test_start < 0:
        # a negative start would produce negative test positions, which
        # .iloc silently interprets as counting from the end
        raise ValueError(
            f"need at least {n_folds * test_size} rows for {n_folds} folds "
            f"of {test_size} rows, got {len(df)}"
        )

    all_splits = []
    for fold in range(n_folds):
        test_start = first_test_start + fold * test_size
        train_ids = np.arange(0, test_start)
        test_ids = np.arange(test_start, test_start + test_size)
        all_splits.append((train_ids, test_ids))

    return all_splits

### Check for stationarity

In [None]:
from statsmodels.tsa.stattools import adfuller

def adf_test(df, significance=0.05):
    """Run the Augmented Dickey-Fuller test on every column and print a report.

    For each column the missing values are dropped, ``adfuller`` is run
    with ``autolag='AIC'``, and the test statistic, p-value, lag count,
    observation count and critical values are printed, followed by a
    verdict. A final line lists the columns that were not found
    stationary.

    Parameters
    ----------
    df : pd.DataFrame
        Frame whose columns are tested one by one.
    significance : float, optional
        A column is reported as stationary when its p-value is less than
        or equal to this threshold, default is 0.05.

    Returns
    -------
    None
        The report is printed, nothing is returned.
    """
    labels = ['ADF test statistic', 'p-value', '# lags used', '# observations']
    non_stationary_columns = []

    for column in df.columns:
        print(f'\nAugmented Dickey-Fuller Test for Column: {column}')
        result = adfuller(df[column].dropna(), autolag='AIC')

        # the first four entries are scalars, the fifth maps level -> critical value
        out = pd.Series(result[0:4], index=labels)
        for key, val in result[4].items():
            out[f'critical value ({key})'] = val
        print(out.to_string())

        if result[1] <= significance:
            print("Strong evidence against the null hypothesis")
            print("Reject the null hypothesis")
            print("Data has no unit root and is stationary")
        else:
            non_stationary_columns.append(column)
            print("Weak evidence against the null hypothesis")
            print("Fail to reject the null hypothesis")
            print("Data has a unit root and is non-stationary")

    if not non_stationary_columns:
        print("\nAll columns are stationary.")
    else:
        # column labels are not guaranteed to be strings, so convert before joining
        names = ', '.join(map(str, non_stationary_columns))
        print(f"\nNon-stationary columns: {names}")


In [None]:
# First difference of every column, then the stationarity report on the result
df_1diff = df.diff(periods=1)
adf_test(df_1diff)

In [None]:
# Difference the first differences once more and repeat the stationarity report
df_2diff = df_1diff.diff(periods=1)
adf_test(df_2diff)

In [None]:
# Third round of differencing, followed by the same stationarity report
df_3diff = df_2diff.diff(periods=1)
adf_test(df_3diff)

In [None]:
# Keep only complete rows; every diff() leaves its first row undefined
df_3diff = df_3diff.dropna()
df_3diff


In [None]:
# VAR was used here without ever being imported anywhere in the notebook
from statsmodels.tsa.api import VAR

spl = time_split(df_3diff)

# Only the last fold is used from here on (the previous loop over all folds
# simply overwrote `train`/`test` until the last one remained); it is the
# fold with the longest training window.
train_idx, test_idx = spl[-1]
train = df_3diff.iloc[train_idx]
test = df_3diff.iloc[test_idx]

# Fit one VAR per candidate lag order and print its information criteria
for i in range(1, 11):
    model = VAR(train)
    results = model.fit(i)
    print('Order =', i)
    print('AIC: ', results.aic)
    print('BIC: ', results.bic)
    print()


In [None]:
# Fit the VAR with a lag order of 5 and show the full estimation summary
model_fit = model.fit(maxlags=5)
model_fit.summary()

### Durbin-Watson statistic

In [None]:
from statsmodels.stats.stattools import durbin_watson
# One Durbin-Watson value per residual series of the fitted model
out = durbin_watson(model_fit.resid)

# values are matched to the column names purely by position
for name, stat in zip(df.columns, out):
    print(name, ':', round(stat, 2))

### Forecasting

In [None]:
# k_ar is read from the fitted model; the next cell uses it to decide how many
# trailing rows are fed into the forecast
lag_order = model_fit.k_ar
print(lag_order)

In [None]:
# Seed the forecast with the last `lag_order` rows of `train`, the differenced
# data the model was fitted on (the name used here before was never defined).
forecast_input = train.values[-lag_order:]
forecast_input

In [None]:
# Forecast one step per row of the held-out window; `nobs` was referenced
# without being defined anywhere in the notebook.
nobs = len(test)
fc = model_fit.forecast(y=forecast_input, steps=nobs)

# The forecasts are third differences, hence the '_3d' suffix; they are
# labelled with the dates of the held-out rows.
df_forecast = pd.DataFrame(fc, index=test.index, columns=df.columns + '_3d')
df_forecast

In [None]:
def invert_transformation(df_train, df_forecast):
    """Turn forecasts of third differences back into level forecasts.

    For every column ``c`` of ``df_train`` the forecast frame must hold a
    column ``c_3d`` with forecast third differences. Three cumulative sums
    undo the differencing, each one anchored on the matching last observed
    value of the training series:

    * ``c_2d``       = last observed second difference + cumsum(``c_3d``)
    * ``c_1d``       = last observed first difference  + cumsum(``c_2d``)
    * ``c_forecast`` = last observed level             + cumsum(``c_1d``)

    Parameters
    ----------
    df_train : pd.DataFrame
        Undifferenced training data. At least three rows are needed,
        otherwise the positional lookups raise ``IndexError``.
    df_forecast : pd.DataFrame
        Forecast third differences, one ``<column>_3d`` column per
        training column. It is not modified.

    Returns
    -------
    df_fc : pd.DataFrame
        Copy of ``df_forecast`` with ``_2d``, ``_1d`` and ``_forecast``
        columns added for every training column.
    """
    df_fc = df_forecast.copy()

    for col in df_train.columns:
        history = df_train[col]
        prefix = str(col)

        # anchors: the last level and the last first/second differences
        # that can be formed from the end of the training series
        last_level = history.iloc[-1]
        last_diff_1 = history.iloc[-1] - history.iloc[-2]
        last_diff_2 = last_diff_1 - (history.iloc[-2] - history.iloc[-3])

        # roll back the third, second and first difference in turn
        df_fc[prefix + '_2d'] = last_diff_2 + df_fc[prefix + '_3d'].cumsum()
        df_fc[prefix + '_1d'] = last_diff_1 + df_fc[prefix + '_2d'].cumsum()
        df_fc[prefix + '_forecast'] = last_level + df_fc[prefix + '_1d'].cumsum()

    return df_fc

In [None]:
# `train` is a slice of the thrice-differenced frame, but the inversion needs
# undifferenced values as anchors, so pass the rows of `df` for the same dates.
df_results = invert_transformation(df.loc[train.index], df_forecast)

In [None]:
# Forecast table: the '_3d' inputs plus the '_2d', '_1d' and '_forecast'
# columns added by invert_transformation
df_results

In [None]:
# Two panels per row; round up so an odd number of columns keeps its last panel
n_rows = (len(df.columns) + 1) // 2
fig, axes = plt.subplots(nrows=n_rows, ncols=2, dpi=150, figsize=(20,20))
for col, ax in zip(df.columns, axes.flatten()):
    df_results[col+'_forecast'].plot(legend=True, ax=ax).autoscale(axis='x',tight=True)
    # Actuals are taken from the undifferenced frame over the forecast dates,
    # so both lines are in the same units (this also removes the dependency
    # on `nobs`, which was never defined).
    df.loc[df_results.index, col].plot(legend=True, ax=ax);
    ax.set_title(col + ": Forecast vs Actuals")
    ax.xaxis.set_ticks_position('none')
    ax.yaxis.set_ticks_position('none')
    ax.spines["top"].set_alpha(0)
    ax.tick_params(labelsize=6)

plt.tight_layout();

In [None]:
# Every column except "iNATGAS", restricted to complete rows. The column mask
# is built from `df` itself (`test_df` was never defined), and dropna() returns
# a new frame instead of mutating a slice of `df` in place.
first_try_df = df.loc[:, df.columns != "iNATGAS"].dropna()
first_try_df

In [None]:
import numpy as np
import pandas as pd
from statsmodels.tsa.statespace.varmax import VARMAX
from sklearn.model_selection import ParameterGrid


def varmax_grid_search(train_df, order_range, seasonal_order_range):
    """
    Perform a grid search to find the best parameters for VARMAX model based on AIC.

    Every combination of an (p, q) order and a (P, D, Q, s) seasonal order is
    built and fitted; the combination with the lowest AIC wins. A combination
    that cannot be built or fitted is reported on stdout and skipped.

    Parameters:
    - train_df: pd.DataFrame, the training data
    - order_range: tuple, inclusive (min, max) bounds applied to both p and q
    - seasonal_order_range: tuple, inclusive (min, max) bounds applied to each
      of P, D and Q; s always runs over 1..12

    Returns:
    - best_params: dict with the keys "order" and "seasonal_order" for the
      lowest AIC, or None if no combination could be fitted
    """
    # NOTE(review): the documented VARMAX constructor lists no
    # `seasonal_order` argument - confirm it has any effect before paying
    # for the seasonal part of the grid.

    order_values = range(order_range[0], order_range[1] + 1)
    seasonal_values = range(
        seasonal_order_range[0], seasonal_order_range[1] + 1
    )

    # Create a grid of parameter combinations
    param_grid = {
        "order": [(p, q) for p in order_values for q in order_values],
        "seasonal_order": [
            (P, D, Q, s)
            for P in seasonal_values
            for D in seasonal_values
            for Q in seasonal_values
            for s in range(1, 13)
        ],
    }

    best_aic = np.inf
    best_params = None

    for params in ParameterGrid(param_grid):
        try:
            # Building the model can fail as well as fitting it, so both
            # happen inside the try: one bad combination must not abort
            # the whole search.
            model = VARMAX(
                train_df,
                order=params["order"],
                seasonal_order=params["seasonal_order"],
                trend="c",
            )
            model_fitted = model.fit(disp=False)

            # Calculate AIC
            current_aic = model_fitted.aic

            # Update best parameters if the current AIC is lower
            if current_aic < best_aic:
                best_aic = current_aic
                best_params = params

        except Exception as e:
            print(f"Error fitting model with parameters {params}: {e}")

    return best_params

In [None]:
# Inclusive bounds: p and q each run over 1..3, as do P, D and Q.
# Together with s in 1..12 this is 9 * 324 = 2916 candidate models.
order_range = (1, 3)
seasonal_order_range = (1, 3)

best_params = varmax_grid_search(
    train_df=first_try_df,
    order_range=order_range,
    seasonal_order_range=seasonal_order_range,
)
print("Best Parameters:", best_params)