#### **Project directory and important variables**

In [1]:
path_to_directory = "C:/Users/tiago/Desktop/DS/data_science_project"

file_tag: str = "forecast_ny_arrests"
filename: str = f"{path_to_directory}/datasets/{file_tag}.csv"
target: str = "Manhattan"
timecol: str = "Date"

# **Forecasting**
## **Training**

In [None]:
from pandas import Series
from math import sqrt
from dslabs_functions import plot_multibar_chart, FORECAST_MEASURES, HEIGHT, Axes, subplots, set_chart_labels, BarContainer, ndarray, FONT_TEXT
from numpy import arange


def series_train_test_split(data: Series, trn_pct: float = 0.90) -> tuple[Series, Series]:
    trn_size: int = int(len(data) * trn_pct)
    df_cp: Series = data.copy()
    train: Series = df_cp.iloc[:trn_size, -1]
    test: Series = df_cp.iloc[trn_size:, -1]
    return train, test

def plot_multibar_chart(
    group_labels: list,
    yvalues: dict,
    ax: Axes = None,  # type: ignore
    title: str = "",
    xlabel: str = "",
    ylabel: str = "",
    percentage: bool = False,
) -> Axes | list[Axes]:
    if ax is None:
        ax = gca()
    ax = set_chart_labels(ax=ax, title=title, xlabel=xlabel, ylabel=ylabel)
    if percentage:
        ax.set_ylim(0.0, 1.0)
    bar_labels: list = list(yvalues.keys())

    # This is the location for each bar
    index: ndarray = arange(len(group_labels))
    bar_width: float = 0.8 / len(bar_labels)
    ax.set_xticks(index + bar_width / 2, labels=group_labels)

    for i in range(len(bar_labels)):
        bar_yvalues = yvalues[bar_labels[i]]
        values: BarContainer = ax.bar(
            index + i * bar_width,
            bar_yvalues,
            width=bar_width,
            label=bar_labels[i],
        )
        format = "%.2f" if percentage else "%.2f"  # Updated format for better precision
        ax.bar_label(values, fmt=format, fontproperties=FONT_TEXT)
        if any(y < 0 for y in bar_yvalues) and percentage:
            ax.set_ylim(-1.0, 1.0)
    ax.legend(fontsize="xx-small")

    return ax

def plot_forecasting_eval(trn: Series, tst: Series, prd_trn: Series, prd_tst: Series, title: str = "") -> list[Axes]:
    ev1: dict = {
        "RMSE": [sqrt(FORECAST_MEASURES["MSE"](trn, prd_trn)), sqrt(FORECAST_MEASURES["MSE"](tst, prd_tst))],
        "MAE": [FORECAST_MEASURES["MAE"](trn, prd_trn), FORECAST_MEASURES["MAE"](tst, prd_tst)],
    }
    ev2: dict = {
        "MAPE": [FORECAST_MEASURES["MAPE"](trn, prd_trn), FORECAST_MEASURES["MAPE"](tst, prd_tst)],
        "R2": [FORECAST_MEASURES["R2"](trn, prd_trn), FORECAST_MEASURES["R2"](tst, prd_tst)],
    }

    # print(eval1, eval2)
    fig, axs = subplots(1, 2, figsize=(1.5 * HEIGHT, 0.75 * HEIGHT), squeeze=True)
    fig.suptitle(title)
    plot_multibar_chart(["train", "test"], ev1, ax=axs[0], title="Scale-dependent error", percentage=False)
    plot_multibar_chart(["train", "test"], ev2, ax=axs[1], title="Percentage error", percentage=True)

    return axs

## **Simple Average**

In [None]:
from sklearn.base import RegressorMixin
from matplotlib.pyplot import savefig


class SimpleAvgRegressor(RegressorMixin):
    def __init__(self):
        super().__init__()
        self.mean: float = 0.0
        return

    def fit(self, X: Series):
        self.mean = X.mean()
        return

    def predict(self, X: Series) -> Series:
        prd: list = len(X) * [self.mean]
        prd_series: Series = Series(prd)
        prd_series.index = X.index
        return prd_series
    

from pandas import read_csv, DataFrame, Series

data: DataFrame = read_csv(filename, index_col=timecol, sep=";", decimal=".", parse_dates=True, infer_datetime_format=True)
series: Series = data[target]

train, test = series_train_test_split(data, trn_pct=0.90)

fr_mod = SimpleAvgRegressor()
fr_mod.fit(train)
prd_trn: Series = fr_mod.predict(train)
prd_tst: Series = fr_mod.predict(test)

plot_forecasting_eval(train, test, prd_trn, prd_tst, title=f"{file_tag} - Simple Average")
savefig(f"{path_to_directory}/images/Set 1 - Forecasting results obtained with Simple Average model.png", bbox_inches='tight')

In [None]:
from dslabs_functions import PAST_COLOR, FUTURE_COLOR, PRED_FUTURE_COLOR

def plot_forecasting_series(
    trn: Series,
    tst: Series,
    prd_tst: Series,
    title: str = "",
    xlabel: str = "time",
    ylabel: str = "",
) -> list[Axes]:
    fig, ax = subplots(1, 1, squeeze=True)
    fig.suptitle(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    ax.plot(trn.index, trn.values, label="train", color=PAST_COLOR)
    ax.plot(tst.index, tst.values, label="test", color=FUTURE_COLOR)
    ax.plot(prd_tst.index, prd_tst.values, "--", label="test prediction", color=PRED_FUTURE_COLOR)
    ax.legend(prop={"size": 5})

    return ax


plot_forecasting_series(
    train,
    test,
    prd_tst,
    title=f"{file_tag} - Simple Average",
    xlabel=timecol,
    ylabel=target,
)
savefig(f"{path_to_directory}/images/Set 1 - Forecasting plots obtained with Simple Average model.png", bbox_inches='tight')

## **Persistence Model**
### Long Term

In [None]:
from pandas import Series
from sklearn.base import RegressorMixin


class PersistenceOptimistRegressor(RegressorMixin):
    def __init__(self):
        super().__init__()
        self.last: float = 0.0
        return

    def fit(self, X: Series):
        self.last = X.iloc[-1]
        # print(self.last)
        return

    def predict(self, X: Series):
        prd: list = X.shift().values.ravel()
        prd[0] = self.last
        prd_series: Series = Series(prd)
        prd_series.index = X.index
        return prd_series
    
from pandas import read_csv, DataFrame, Series
from matplotlib.pyplot import savefig

data: DataFrame = read_csv(filename, index_col=timecol, sep=";", decimal=".", parse_dates=True, infer_datetime_format=True)
series: Series = data[target]

train, test = series_train_test_split(data, trn_pct=0.90)

fr_mod = PersistenceOptimistRegressor()
fr_mod.fit(train)
prd_trn: Series = fr_mod.predict(train)
prd_tst: Series = fr_mod.predict(test)

plot_forecasting_eval(train, test, prd_trn, prd_tst, title=f"{file_tag} - Persistence Optimist")
savefig(f"{path_to_directory}/images/Set 1 - Forecasting results obtained with Persistence model (long term).png", bbox_inches='tight')

In [None]:
plot_forecasting_series(
    train,
    test,
    prd_tst,
    title=f"{file_tag} - Persistence Optimist",
    xlabel=timecol,
    ylabel=target,
)
savefig(f"{path_to_directory}/images/Set 1 - Forecasting plots obtained with Persistence model (long term).png", bbox_inches='tight')

### One-set-behind

In [None]:
class PersistenceRealistRegressor(RegressorMixin):
    def __init__(self):
        super().__init__()
        self.last = 0
        self.estimations = [0]
        self.obs_len = 0

    def fit(self, X: Series):
        for i in range(1, len(X)):
            self.estimations.append(X.iloc[i - 1])
        self.obs_len = len(self.estimations)
        self.last = X.iloc[len(X) - 1]
        prd_series: Series = Series(self.estimations)
        prd_series.index = X.index
        return prd_series

    def predict(self, X: Series):
        prd: list = len(X) * [self.last]
        prd_series: Series = Series(prd)
        prd_series.index = X.index
        return prd_series
    
fr_mod = PersistenceRealistRegressor()
fr_mod.fit(train)
prd_trn: Series = fr_mod.predict(train)
prd_tst: Series = fr_mod.predict(test)

plot_forecasting_eval(train, test, prd_trn, prd_tst, title=f"{file_tag} - Persistence Realist")
savefig(f"{path_to_directory}/images/Set 1 - Forecasting results obtained with Persistence model (one-set-behind).png", bbox_inches='tight')

In [None]:
plot_forecasting_series(
    train,
    test,
    prd_tst,
    title=f"{file_tag} - Persistence Realist",
    xlabel=timecol,
    ylabel=target,
)
savefig(f"{path_to_directory}/images/Set 1 - Forecasting plots obtained with Persistence model (one-set-behind).png", bbox_inches='tight')

## **Exponential Smoothing**

In [None]:
from pandas import read_csv, DataFrame, Series
from statsmodels.tsa.holtwinters import SimpleExpSmoothing
from dslabs_functions import FORECAST_MEASURES, DELTA_IMPROVE, set_chart_xticks, LINE_COLOR, FILL_COLOR, std, gca

def plot_line_chart(
    xvalues: list,
    yvalues: list,
    ax: Axes = None,  # type: ignore
    title: str = "",
    xlabel: str = "",
    ylabel: str = "",
    name: str = "",
    percentage: bool = False,
    show_stdev: bool = False,
) -> Axes:
    if ax is None:
        ax = gca()
    ax = set_chart_labels(ax=ax, title=title, xlabel=xlabel, ylabel=ylabel)
    ax = set_chart_xticks(xvalues, ax, percentage=percentage)
    if any(y < 0 for y in yvalues) and percentage:
            ax.set_ylim(-30.0, 1.0)
    ax.plot(xvalues, yvalues, c=LINE_COLOR, label=name)
    if show_stdev:
        stdev: float = round(std(yvalues), 3)
        y_bottom: list[float] = [(y - stdev) for y in yvalues]
        y_top: list[float] = [(y + stdev) for y in yvalues]
        ax.fill_between(xvalues, y_bottom, y_top, color=FILL_COLOR, alpha=0.2)
    return ax


def exponential_smoothing_study(train: Series, test: Series, measure: str = "R2"):
    alpha_values = [i / 10 for i in range(1, 10)]
    flag = measure == "R2" or measure == "MAPE"
    best_model = None
    best_params: dict = {"name": "Exponential Smoothing", "metric": measure, "params": ()}
    best_performance: float = -100000

    yvalues = []
    for alpha in alpha_values:
        tool = SimpleExpSmoothing(train)
        model = tool.fit(smoothing_level=alpha, optimized=False)
        prd_tst = model.forecast(steps=len(test))

        eval: float = FORECAST_MEASURES[measure](test, prd_tst)
        # print(w, eval)
        if eval > best_performance and abs(eval - best_performance) > DELTA_IMPROVE:
            best_performance: float = eval
            best_params["params"] = (alpha,)
            best_model = model
        yvalues.append(eval)

    print(f"Exponential Smoothing best with alpha={best_params['params'][0]:.0f} -> {measure}={best_performance}")
    plot_line_chart(
        alpha_values,
        yvalues,
        title=f"Exponential Smoothing ({measure})",
        xlabel="alpha",
        ylabel=measure,
        percentage=flag,
    )

    return best_model, best_params

measure: str = "R2"

data: DataFrame = read_csv(filename, index_col=timecol, sep=";", decimal=".", parse_dates=True, infer_datetime_format=True)
series: Series = data[target]
train, test = series_train_test_split(data, trn_pct=0.90)

best_model, best_params = exponential_smoothing_study(train, test, measure=measure)
savefig(f"{path_to_directory}/images/Set 1 - Forecasting study over different parameterisations of the Exponential Smoothing algorithm.png", bbox_inches='tight')

In [None]:
params = best_params["params"]
prd_trn = best_model.predict(start=0, end=len(train) - 1)
prd_tst = best_model.forecast(steps=len(test))

plot_forecasting_eval(train, test, prd_trn, prd_tst, title=f"{file_tag} - Exponential Smoothing alpha={params[0]}")
savefig(f"{path_to_directory}/images/Set 1 - Forecasting results obtained with the best parameterisation of Exponential Smoothing algorithm.png", bbox_inches='tight')

In [None]:
plot_forecasting_series(
    train,
    test,
    prd_tst,
    title=f"{file_tag} - Exponential Smoothing ",
    xlabel=timecol,
    ylabel=target,
)
savefig(f"{path_to_directory}/images/Set 1 - Forecasting plots obtained with the best parameterisation of Exponential Smoothing algorithm.png", bbox_inches='tight')

## **Rolling Mean**

In [None]:
from numpy import mean
from pandas import Series
from sklearn.base import RegressorMixin
from dslabs_functions import FORECAST_MEASURES, DELTA_IMPROVE, plot_line_chart
from pandas import read_csv, DataFrame
from matplotlib.pyplot import figure, savefig

def plot_line_chart(
    xvalues: list,
    yvalues: list,
    ax: Axes = None,  # type: ignore
    title: str = "",
    xlabel: str = "",
    ylabel: str = "",
    name: str = "",
    percentage: bool = False,
    show_stdev: bool = False,
) -> Axes:
    if ax is None:
        ax = gca()
    ax = set_chart_labels(ax=ax, title=title, xlabel=xlabel, ylabel=ylabel)
    ax = set_chart_xticks(xvalues, ax, percentage=percentage)
    if any(y < 0 for y in yvalues) and percentage:
            ax.set_ylim(-100.0, 1.0)
    ax.plot(xvalues, yvalues, c=LINE_COLOR, label=name)
    if show_stdev:
        stdev: float = round(std(yvalues), 3)
        y_bottom: list[float] = [(y - stdev) for y in yvalues]
        y_top: list[float] = [(y + stdev) for y in yvalues]
        ax.fill_between(xvalues, y_bottom, y_top, color=FILL_COLOR, alpha=0.2)
    return ax

class RollingMeanRegressor(RegressorMixin):
    def __init__(self, win: int = 3):
        super().__init__()
        self.win_size = win
        self.memory: list = []

    def fit(self, X: Series):
        self.memory = X.iloc[-self.win_size :]
        # print(self.memory)
        return

    def predict(self, X: Series):
        estimations = self.memory.tolist()
        predictions = []
        for _ in range(len(X)):
            new_value = mean(estimations[-self.win_size:])  # Use only the last `win_size` values
            estimations.append(new_value)  # Update the memory with the new value
            predictions.append(new_value)  # Store the prediction
        prd_series: Series = Series(predictions, index=X.index)  # Ensure the index matches the input
        return prd_series

    

def rolling_mean_study(train: Series, test: Series, measure: str = "R2"):
    win_size = (1, 5, 10, 20, 40, 50, 80, 160)
    flag = measure == "R2" or measure == "MAPE"
    best_model = None
    best_params: dict = {"name": "Rolling Mean", "metric": measure, "params": ()}
    best_performance: float = -100000

    yvalues = []
    for w in win_size:
        pred = RollingMeanRegressor(win=w)
        pred.fit(train)
        prd_tst = pred.predict(test)

        eval: float = FORECAST_MEASURES[measure](test, prd_tst)
        # print(w, eval)
        if eval > best_performance and abs(eval - best_performance) > DELTA_IMPROVE:
            best_performance: float = eval
            best_params["params"] = (w,)
            best_model = pred
        yvalues.append(eval)

    print(f"Rolling Mean best with win={best_params['params'][0]:.0f} -> {measure}={best_performance}")
    print(yvalues)
    plot_line_chart(
        win_size, yvalues, title=f"Rolling Mean ({measure})", xlabel="window size", ylabel=measure, percentage=flag
    )

    return best_model, best_params


data: DataFrame = read_csv(filename, index_col=timecol, sep=";", decimal=".", parse_dates=True, infer_datetime_format=True)
series: Series = data[target]

train, test = series_train_test_split(data, trn_pct=0.90)

fig = figure()
best_model, best_params = rolling_mean_study(train, test)
savefig(f"{path_to_directory}/images/Set 1 - Forecasting study over different parameterisations of the Rolling Mean algorithm.png", bbox_inches='tight')

In [None]:
params = best_params["params"]
prd_trn: Series = best_model.predict(train)
prd_tst: Series = best_model.predict(test)

plot_forecasting_eval(train, test, prd_trn, prd_tst, title=f"{file_tag} - Rolling Mean (win={params[0]})")
savefig(f"{path_to_directory}/images/Set 1 - Forecasting results obtained with the best parameterisation of Rolling Mean algorithm.png", bbox_inches='tight')

In [None]:
plot_forecasting_series(
    train,
    test,
    prd_tst,
    title=f"{file_tag} - Rolling Mean (win={params[0]})",
    xlabel=timecol,
    ylabel=target,
)
savefig(f"{path_to_directory}/images/Set 1 - Forecasting plots obtained with the best parameterisation of Rolling Mean algorithm.png", bbox_inches='tight')

## **Linear Regression**

In [None]:
from numpy import arange
from pandas import read_csv, DataFrame, Series
from matplotlib.pyplot import savefig
from sklearn.linear_model import LinearRegression

data: DataFrame = read_csv(filename, index_col=timecol, sep=";", decimal=".", parse_dates=True, infer_datetime_format=True)
series: Series = data[target]
train, test = series_train_test_split(data, trn_pct=0.90)

trnX = arange(len(train)).reshape(-1, 1)
trnY = train.to_numpy()
tstX = arange(len(train), len(data)).reshape(-1, 1)
tstY = test.to_numpy()

model = LinearRegression()
model.fit(trnX, trnY)

prd_trn: Series = Series(model.predict(trnX), index=train.index)
prd_tst: Series = Series(model.predict(tstX), index=test.index)

plot_forecasting_eval(train, test, prd_trn, prd_tst, title=f"{file_tag} - Linear Regression")
savefig(f"{path_to_directory}/images/Set 1 - Forecasting results obtained with Linear Regression model.png", bbox_inches='tight')

In [None]:
plot_forecasting_series(
    train,
    test,
    prd_tst,
    title=f"{file_tag} - Linear Regression",
    xlabel=timecol,
    ylabel=target,
)
savefig(f"{path_to_directory}/images/Set 1 - Forecasting plots obtained with Linear Regression model.png", bbox_inches='tight')

## **ARIMA**
### Only with target variable

In [None]:
from pandas import read_csv, DataFrame, Series
from statsmodels.tsa.arima.model import ARIMA

data: DataFrame = read_csv(filename, index_col=timecol, sep=";", decimal=".", parse_dates=True, infer_datetime_format=True)
series: Series = data[target]
train, test = series_train_test_split(data, trn_pct=0.90)

predictor = ARIMA(train, order=(3, 1, 2))
model = predictor.fit()
print(model.summary())

In [None]:
from matplotlib.pyplot import figure, savefig, subplots
from dslabs_functions import FORECAST_MEASURES, DELTA_IMPROVE

def plot_multiline_chart(
    xvalues: list,
    yvalues: dict,
    ax: Axes = None,  # type: ignore
    title: str = "",
    xlabel: str = "",
    ylabel: str = "",
    percentage: bool = False,
) -> Axes:
    if ax is None:
        ax = gca()
    ax = set_chart_labels(ax=ax, title=title, xlabel=xlabel, ylabel=ylabel)
    ax = set_chart_xticks(xvalues, ax=ax, percentage=percentage)
    legend: list = []
    for name, y in yvalues.items():
        ax.plot(xvalues, y)
        legend.append(name)
        if any(v < 0 for v in y) and percentage:
            ax.set_ylim(-15.0, 1.0)
    ax.legend(legend, fontsize="xx-small")
    return ax


def arima_study(train: Series, test: Series, measure: str = "R2"):
    d_values = (0, 1, 2)
    p_params = (1, 2, 3, 5, 7, 10)
    q_params = (1, 3, 5, 7)

    flag = measure == "R2" or measure == "MAPE"
    best_model = None
    best_params: dict = {"name": "ARIMA", "metric": measure, "params": ()}
    best_performance: float = -100000

    fig, axs = subplots(1, len(d_values), figsize=(len(d_values) * HEIGHT, HEIGHT))
    for i in range(len(d_values)):
        d: int = d_values[i]
        values = {}
        for q in q_params:
            yvalues = []
            for p in p_params:
                arima = ARIMA(train, order=(p, d, q))
                model = arima.fit()
                prd_tst = model.forecast(steps=len(test), signal_only=False)
                eval: float = FORECAST_MEASURES[measure](test, prd_tst)
                # print(f"ARIMA ({p}, {d}, {q})", eval)
                if eval > best_performance and abs(eval - best_performance) > DELTA_IMPROVE:
                    best_performance: float = eval
                    best_params["params"] = (p, d, q)
                    best_model = model
                yvalues.append(eval)
            values[q] = yvalues
        print(values)
        plot_multiline_chart(
            p_params, values, ax=axs[i], title=f"ARIMA d={d} ({measure})", xlabel="p", ylabel=measure, percentage=flag
        )
    print(
        f"ARIMA best results achieved with (p,d,q)=({best_params['params'][0]:.0f}, {best_params['params'][1]:.0f}, {best_params['params'][2]:.0f}) ==> measure={best_performance:.2f}"
    )

    return best_model, best_params


best_model, best_params = arima_study(train, test, measure=measure)
savefig(f"{path_to_directory}/images/Set 1 - Forecasting study over different parameterisations of the ARIMA algorithm, only with the target variable.png", bbox_inches='tight')

In [None]:
params = best_params["params"]
prd_trn = best_model.predict(start=0, end=len(train) - 1)
prd_tst = best_model.forecast(steps=len(test))

plot_forecasting_eval(
    train, test, prd_trn, prd_tst, title=f"{file_tag} - ARIMA (p={params[0]}, d={params[1]}, q={params[2]})"
)
savefig(f"{path_to_directory}/images/Set 1 - Forecasting results obtained with the best parameterisation of ARIMA algorithm, only with the target variable.png", bbox_inches='tight')

In [None]:
plot_forecasting_series(
    train,
    test,
    prd_tst,
    title=f"{file_tag} - ARIMA ",
    xlabel=timecol,
    ylabel=target,
)
savefig(f"{path_to_directory}/images/Set 1 - Forecasting plots obtained with the best parameterisation of ARIMA algorithm, only with the target variable.png", bbox_inches='tight')

### Multiple variables

In [None]:
from pandas import read_csv, DataFrame, Series
from statsmodels.tsa.arima.model import ARIMA

def series_train_test_split_exog(data: DataFrame, trn_pct: float = 0.90) -> tuple[Series, Series, DataFrame, DataFrame]:
    trn_size: int = int(len(data) * trn_pct)
    target: Series = data.iloc[:, -1]  # Assuming the target is the last column
    exog: DataFrame = data.iloc[:, :-1]  # All columns except the last one
    
    train_target: Series = target.iloc[:trn_size]
    test_target: Series = target.iloc[trn_size:]
    train_exog: DataFrame = exog.iloc[:trn_size]
    test_exog: DataFrame = exog.iloc[trn_size:]
    
    return train_target, test_target, train_exog, test_exog


data: DataFrame = read_csv(filename, index_col=timecol, sep=";", decimal=".", parse_dates=True, infer_datetime_format=True)
series: Series = data[target]
train_target, test_target, train_exog, test_exog = series_train_test_split_exog(data, trn_pct=0.90)

predictor = ARIMA(train_target, exog=train_exog, order=(3, 1, 2))
model = predictor.fit()
print(model.summary())

In [None]:
def plot_multiline_chart(
    xvalues: list,
    yvalues: dict,
    ax: Axes = None,  # type: ignore
    title: str = "",
    xlabel: str = "",
    ylabel: str = "",
    percentage: bool = False,
) -> Axes:
    if ax is None:
        ax = gca()
    ax = set_chart_labels(ax=ax, title=title, xlabel=xlabel, ylabel=ylabel)
    ax = set_chart_xticks(xvalues, ax=ax, percentage=percentage)
    legend: list = []
    for name, y in yvalues.items():
        ax.plot(xvalues, y)
        legend.append(name)
        if any(v < 0 for v in y) and percentage:
            ax.set_ylim(-16.0, 1.0)
    ax.legend(legend, fontsize="xx-small")
    return ax


def arima_study(train: Series, test: Series, train_exog: Series, test_exog: Series, measure: str = "R2"):
    d_values = (0, 1, 2)
    p_params = (1, 2, 3, 5, 7, 10)
    q_params = (1, 3, 5, 7)

    flag = measure == "R2" or measure == "MAPE"
    best_model = None
    best_params: dict = {"name": "ARIMA", "metric": measure, "params": ()}
    best_performance: float = -100000

    fig, axs = subplots(1, len(d_values), figsize=(len(d_values) * HEIGHT, HEIGHT))
    for i in range(len(d_values)):
        d: int = d_values[i]
        values = {}
        for q in q_params:
            yvalues = []
            for p in p_params:
                arima = ARIMA(train, exog=train_exog, order=(p, d, q))
                model = arima.fit()
                prd_tst = model.forecast(steps=len(test), signal_only=False, exog=test_exog)
                eval: float = FORECAST_MEASURES[measure](test, prd_tst)
                if eval > best_performance and abs(eval - best_performance) > DELTA_IMPROVE:
                    best_performance: float = eval
                    best_params["params"] = (p, d, q)
                    best_model = model
                yvalues.append(eval)
            values[q] = yvalues
        print(values)
        plot_multiline_chart(
            p_params, values, ax=axs[i], title=f"ARIMA d={d} ({measure})", xlabel="p", ylabel=measure, percentage=flag
        )
    print(
        f"ARIMA best results achieved with (p,d,q)=({best_params['params'][0]:.0f}, {best_params['params'][1]:.0f}, {best_params['params'][2]:.0f}) ==> measure={best_performance:.2f}"
    )

    return best_model, best_params


best_model, best_params = arima_study(train_target, test_target, train_exog=train_exog, test_exog=test_exog, measure=measure)
savefig(f"{path_to_directory}/images/Set 1 - Forecasting study over different parameterisations of the ARIMA algorithm with multiple variables.png", bbox_inches='tight')

In [None]:
params = best_params["params"]
prd_trn = best_model.predict(start=0, end=len(train_target) - 1, exog=train_exog)
prd_tst = best_model.forecast(steps=len(test_target), exog=test_exog)

plot_forecasting_eval(
    train_target, test_target, prd_trn, prd_tst, title=f"{file_tag} - ARIMA (p={params[0]}, d={params[1]}, q={params[2]})"
)
savefig(f"{path_to_directory}/images/Set 1 - Forecasting results obtained with the best parameterisation of ARIMA algorithm with multiple variables.png", bbox_inches='tight')

In [None]:
plot_forecasting_series(
    train,
    test,
    prd_tst,
    title=f"{file_tag} - ARIMA ",
    xlabel=timecol,
    ylabel=target,
)
savefig(f"{path_to_directory}/images/Set 1 - Forecasting plots obtained with the best parameterisation of ARIMA algorithm with multiple variables.png", bbox_inches='tight')

## **LSTMs**
### Only with target variable

In [None]:
from torch import no_grad, tensor
from torch.nn import LSTM, Linear, Module, MSELoss
from torch.optim import Adam
from torch.utils.data import DataLoader, TensorDataset


def prepare_dataset_for_lstm(series, seq_length: int = 4):
    setX: list = []
    setY: list = []
    for i in range(len(series) - seq_length):
        past = series[i : i + seq_length]
        future = series[i + 1 : i + seq_length + 1]
        setX.append(past)
        setY.append(future)
    return tensor(setX), tensor(setY)


class DS_LSTM(Module):
    def __init__(self, train, input_size: int = 1, hidden_size: int = 50, num_layers: int = 1, length: int = 4):
        super().__init__()
        self.lstm = LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
        self.linear = Linear(hidden_size, 1)
        self.optimizer = Adam(self.parameters())
        self.loss_fn = MSELoss()

        trnX, trnY = prepare_dataset_for_lstm(train, seq_length=length)
        self.loader = DataLoader(TensorDataset(trnX, trnY), shuffle=True, batch_size=len(train) // 10)

    def forward(self, x):
        x, _ = self.lstm(x)
        x = self.linear(x)
        return x

    def fit(self):
        self.train()
        for batchX, batchY in self.loader:
            y_pred = self(batchX)
            loss = self.loss_fn(y_pred, batchY)
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
        return loss

    def predict(self, X):
        with no_grad():
            y_pred = self(X)
        return y_pred[:, -1, :]
    
from pandas import read_csv, DataFrame, Series

data: DataFrame = read_csv(filename, index_col=timecol, sep=";", decimal=".", parse_dates=True, infer_datetime_format=True)
series = data[[target]].values.astype("float32")

train_size = int(len(series) * 0.90)
train, test = series[:train_size], series[train_size:]

model = DS_LSTM(train, input_size=1, hidden_size=50, num_layers=1)
loss = model.fit()
print(loss)

In [None]:
from copy import deepcopy
import torch
from matplotlib.pyplot import subplots, savefig
from dslabs_functions import FORECAST_MEASURES, DELTA_IMPROVE

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def lstm_study(train, test, nr_episodes: int = 1000, measure: str = "R2", device=None):
    if device is None:
        device = torch.device("cpu")

    sequence_size = [2, 4, 8] 
    nr_hidden_units = [25, 50, 100] 

    step: int = nr_episodes // 10
    episodes = list(range(0, nr_episodes + 1, step))
    best_model = None
    best_params: dict = {"name": "LSTM", "metric": measure, "params": ()}
    best_performance: float = -100000 

    _, axs = subplots(1, len(sequence_size), figsize=(len(sequence_size) * 6, 4))

    for i, length in enumerate(sequence_size):
        tstX, tstY = prepare_dataset_for_lstm(test, seq_length=length)
        tstX, tstY = tstX.to(device), tstY.to(device)  # Move os dados para o dispositivo

        ax = axs[i]
        ax.set_title(f"LSTM seq length={length} ({measure})")
        ax.set_xlabel("nr episodes")
        ax.set_ylabel(measure)
        for hidden in nr_hidden_units:
            yvalues = []  
            model = DS_LSTM(train, hidden_size=hidden, length=length).to(device) 

            for n in range(0, nr_episodes + 1):
                model.fit()
                if n % step == 0: 
                    prd_tst = model.predict(tstX)
                    eval = FORECAST_MEASURES[measure](test[length:].astype("float32"), prd_tst.cpu().numpy())
                    print(f"seq length={length}, hidden_units={hidden}, nr_episodes={n}, eval={eval:.4f}")

                    if eval > best_performance and abs(eval - best_performance) > DELTA_IMPROVE:
                        best_performance = eval
                        best_params["params"] = (length, hidden, n)
                        best_model = deepcopy(model)

                    yvalues.append(eval)

            ax.plot(episodes, yvalues, label=f"Hidden={hidden}")

        ax.legend()

    savefig(f"{path_to_directory}/images//Set 1 - Forecasting study over different parameterisations of LSTMs, only with the target variable.png", bbox_inches="tight")

    print(
        f"LSTM best results achieved with length={best_params['params'][0]} "
        f"hidden_units={best_params['params'][1]} and nr_episodes={best_params['params'][2]}) "
        f"==> measure={best_performance:.2f}"
    )

    return best_model, best_params

measure = "R2"
best_model, best_params = lstm_study(train, test, nr_episodes=3000, measure=measure, device=device)

In [None]:
params = best_params["params"]
best_length = params[0]

# Preparação dos dados
trnX, trnY = prepare_dataset_for_lstm(train, seq_length=best_length)
tstX, tstY = prepare_dataset_for_lstm(test, seq_length=best_length)

# Previsões do modelo
prd_trn = best_model.predict(trnX).cpu().numpy()  # Move previsões para CPU
prd_tst = best_model.predict(tstX).cpu().numpy()  # Move previsões para CPU

train_cpu = train[best_length:].astype("float32")
test_cpu = test[best_length:].astype("float32")    

plot_forecasting_eval(
    train_cpu,
    test_cpu,
    prd_trn,
    prd_tst,
    title=f"{file_tag} - LSTM (length={best_length}, hidden={params[1]}, epochs={params[2]})",
)

savefig(f"{path_to_directory}/images/Set 1 - Forecasting results obtained with the best parameterisation of LSTMs, only with the target variable.png", bbox_inches="tight")

In [None]:
series = data[[target]]
train, test = series[:train_size], series[train_size:]

pred_series: Series = Series(prd_tst.ravel(), index=test.index[best_length:])

plot_forecasting_series(
    train[best_length:],
    test[best_length:],
    pred_series,
    title=f"{file_tag} - LSTMs ",
    xlabel=timecol,
    ylabel=target,
)

savefig(f"{path_to_directory}/images/Set 1 - Forecasting plots obtained with the best parameterisation of LSTMs, only with the target variable.png", bbox_inches="tight")

### Multiple variables

In [None]:
from torch import no_grad, tensor
from torch.nn import LSTM, Linear, Module, MSELoss
from torch.optim import Adam
from torch.utils.data import DataLoader, TensorDataset


def prepare_dataset_for_lstm(data, seq_length: int = 4):
    setX: list = []
    setY: list = []
    for i in range(len(data) - seq_length):
        past = data[i : i + seq_length, :]  # All features
        future = data[i + 1 : i + seq_length + 1, 0]  # Only the target variable
        setX.append(past)
        setY.append(future)
    return tensor(setX), tensor(setY)



class DS_LSTM(Module):
    def __init__(self, train, input_size: int, hidden_size: int = 50, num_layers: int = 1, length: int = 4):
        super().__init__()
        self.lstm = LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
        self.linear = Linear(hidden_size, 1)  # Output size is 1 (target variable)
        self.optimizer = Adam(self.parameters())
        self.loss_fn = MSELoss()

        trnX, trnY = prepare_dataset_for_lstm(train, seq_length=length)
        self.loader = DataLoader(TensorDataset(trnX, trnY), shuffle=True, batch_size=len(train) // 10)

    def forward(self, x):
        x, _ = self.lstm(x)  # x: (batch, seq_length, hidden_size)
        x = self.linear(x)  # x: (batch, seq_length, 1)
        return x

    def fit(self):
        self.train()
        for batchX, batchY in self.loader:
            y_pred = self(batchX)
            loss = self.loss_fn(y_pred[:, -1, :], batchY[:, -1])  # Compare only the last step
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
        return loss.item()

    def predict(self, X):
        with no_grad():
            y_pred = self(X)
        return y_pred[:, -1, :]  # Return prediction for the last step



data: DataFrame = read_csv(filename, index_col=timecol, sep=";", decimal=".", parse_dates=True, infer_datetime_format=True)
series = data.values.astype("float32")  # Use all features

train_size = int(len(series) * 0.90)
train, test = series[:train_size], series[train_size:]

input_size = train.shape[1]  # Number of features
model = DS_LSTM(train, input_size=input_size, hidden_size=50, num_layers=1)
loss = model.fit()
print(loss)

In [None]:
from matplotlib.pyplot import subplots, savefig

def plot_multiline_chart(
    xvalues: list,
    yvalues: dict,
    ax=None,
    title: str = "",
    xlabel: str = "",
    ylabel: str = "",
    percentage: bool = False,
):
    if ax is None:
        ax = gca()
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    for name, y in yvalues.items():
        ax.plot(xvalues, y, label=name)
    ax.legend(fontsize="small")
    if percentage:
        ax.set_ylim(0, 1)
    return ax

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def lstm_study(train, test, nr_episodes: int = 1000, measure: str = "R2", device=None):
    if device is None:
        device = torch.device("cpu")

    sequence_size = [2, 4, 8] 
    nr_hidden_units = [25, 50, 100]  
    step: int = nr_episodes // 10  
    episodes = list(range(0, nr_episodes + 1, step)) 

    best_model = None
    best_params: dict = {"name": "LSTM", "metric": measure, "params": ()}
    best_performance: float = -100000 

    _, axs = subplots(1, len(sequence_size), figsize=(len(sequence_size) * 6, 4))

    for i, length in enumerate(sequence_size):
        tstX, tstY = prepare_dataset_for_lstm(test, seq_length=length)
        input_size = train.shape[1]  

        ax = axs[i]
        ax.set_title(f"LSTM seq length={length} ({measure})")
        ax.set_xlabel("Nr episodes")
        ax.set_ylabel(measure)

        values = {}
        for hidden in nr_hidden_units:
            yvalues = [] 
            model = DS_LSTM(train, input_size=input_size, hidden_size=hidden, length=length)

            for n in range(0, nr_episodes + 1):
                model.fit()
                if n % step == 0:  
                    prd_tst = model.predict(tstX)
                    eval: float = FORECAST_MEASURES[measure](test[length:, 0], prd_tst.numpy().flatten())
                    print(f"seq length={length}, hidden_units={hidden}, nr_episodes={n}, eval={eval:.4f}")

                    if eval > best_performance and abs(eval - best_performance) > DELTA_IMPROVE:
                        best_performance = eval
                        best_params["params"] = (length, hidden, n)
                        best_model = deepcopy(model)

                    yvalues.append(eval)

            values[f"Hidden={hidden}"] = yvalues

        plot_multiline_chart(
            episodes,
            values,
            ax=ax,
            title=f"LSTM seq length={length} ({measure})",
            xlabel="Nr episodes",
            ylabel=measure,
        )

    savefig(f"{path_to_directory}/images/Set 1 - Forecasting study over different parameterisations of LSTMs with multiple variables.png", bbox_inches="tight")

    print(
        f"LSTM best results achieved with length={best_params['params'][0]} "
        f"hidden_units={best_params['params'][1]} and nr_episodes={best_params['params'][2]}) "
        f"==> measure={best_performance:.2f}"
    )
    return best_model, best_params

best_model, best_params = lstm_study(train, test, nr_episodes=3000, measure="R2", device=device)

In [None]:
from matplotlib.pyplot import subplots, savefig
from math import sqrt

def plot_forecasting_eval(
    trn: ndarray, tst: ndarray, prd_trn: ndarray, prd_tst: ndarray, title: str = ""
) -> list[Axes]:

    ev1: dict = {
        "RMSE": [
            sqrt(FORECAST_MEASURES["MSE"](trn, prd_trn)),
            sqrt(FORECAST_MEASURES["MSE"](tst, prd_tst)),
        ],
        "MAE": [
            FORECAST_MEASURES["MAE"](trn, prd_trn),
            FORECAST_MEASURES["MAE"](tst, prd_tst),
        ],
    }
    ev2: dict = {
        "MAPE": [
            FORECAST_MEASURES["MAPE"](trn, prd_trn),
            FORECAST_MEASURES["MAPE"](tst, prd_tst),
        ],
        "R2": [
            FORECAST_MEASURES["R2"](trn, prd_trn),
            FORECAST_MEASURES["R2"](tst, prd_tst),
        ],
    }

    fig, axs = subplots(1, 2, figsize=(1.5 * HEIGHT, 0.75 * HEIGHT), squeeze=True)
    fig.suptitle(title)
    plot_multibar_chart(
        ["Train", "Test"],
        ev1,
        ax=axs[0],
        title="Scale-dependent error",
        percentage=False,
    )
    plot_multibar_chart(
        ["Train", "Test"],
        ev2,
        ax=axs[1],
        title="Percentage error",
        percentage=True,
    )

    return axs

params = best_params["params"]
best_length = params[0]

trnX, trnY = prepare_dataset_for_lstm(train, seq_length=best_length)
tstX, tstY = prepare_dataset_for_lstm(test, seq_length=best_length)
prd_trn = best_model.predict(trnX).cpu().numpy().squeeze() 
prd_tst = best_model.predict(tstX).cpu().numpy().squeeze() 

trn_last = train[best_length:, -1]
tst_last = test[best_length:, -1]

assert len(trn_last) == len(prd_trn), "Tamanho incompatível entre treino e previsões."
assert len(tst_last) == len(prd_tst), "Tamanho incompatível entre teste e previsões."

plot_forecasting_eval(
    trn_last,
    tst_last,
    prd_trn,
    prd_tst,
    title=f"{file_tag} - LSTM (length={best_length}, hidden={params[1]}, epochs={params[2]})",
)
savefig(f"{path_to_directory}/images/Set 1 - Forecasting results obtained with the best parameterisation of LSTMs with multiple variables.png", bbox_inches="tight")

In [None]:
series = data[[target]]
train, test = series[:train_size], series[train_size:]

pred_series = Series(prd_tst.ravel(), index=test.index[best_length:])

plot_forecasting_series(
    train[best_length:],
    test[best_length:],
    pred_series,
    title=f"{file_tag} - LSTMs",
    xlabel=timecol,
    ylabel=target,
)

savefig(f"{path_to_directory}/images/Set 1 - Forecasting plots obtained with the best parameterisation of LSTMs with multiple variables.png", bbox_inches="tight")