In [None]:
import huggingface_hub
import pandas as pd
import numpy as np

In [None]:
# Map each dataset split to its CSV file inside the dataset repository.
splits = {'train': 'train_ts.csv', 'validation': 'val_ts.csv', 'test': 'test_ts.csv'}
# Read the three splits in order (train, validation, test).
# NOTE(review): the hf:// protocol is presumably what the huggingface_hub import is for -- confirm.
df_train, df_validation, df_test = (
    pd.read_csv("hf://datasets/Creatorin/solarpower/" + splits[split_name])
    for split_name in ("train", "validation", "test")
)

In [None]:
# Stack the three splits back into a single frame.
df = pd.concat([df_train, df_validation, df_test])
# read_csv was called without index_col, so every split carries a default integer
# index; passing that to to_datetime would read the row numbers as epoch
# nanoseconds. Promote the timestamp column to the index first when it exists.
# NOTE(review): 'Datum' is assumed to be the timestamp column (the forecast frames
# in this notebook name their index 'Datum') -- confirm against the CSV header.
if 'Datum' in df.columns:
    df = df.set_index('Datum')
# Copy so that re-indexing the series never touches the column held inside df.
ts = df["Leistung"].copy()
ts.index = pd.to_datetime(ts.index, utc=True)

In [None]:
# Interpolation
# Fill missing values using the time distance between observations, then pin the
# series to the frequency inferred from its index. interpolate() leaves the index
# untouched, so inferring inside the chain sees the same timestamps as before.
ts = (
    ts.interpolate(method="time")
      .pipe(lambda filled: filled.asfreq(pd.infer_freq(filled.index)))
)

In [None]:
# Normalize (z-score)
# NOTE(review): mean and standard deviation are taken over the whole series, not
# only the training period, despite the *_train names -- confirm this is intended.
avg_train = ts.mean()
dev_train = ts.std()
ts_series = ts.sub(avg_train).div(dev_train)

In [None]:
# Remove trend: first-order differencing. The first value has no predecessor and
# becomes NaN, so it is dropped together with any other NaN in the result.
# Re-running this cell differences the already differenced series again.
ts_series = ts_series.diff().dropna()

In [None]:
# remove increasing volatility

def remove_volatility(ts):
    """
    Rescale a series by its yearly volatility and remove the monthly mean.

    Each observation is divided by the standard deviation of all observations
    from the same calendar year; afterwards the mean of all observations that
    share the same calendar month (pooled over all years) is subtracted.

    Args:
        ts (pd.Series): Series indexed by a DatetimeIndex.

    Returns:
        pd.Series: The adjusted series without NaN entries. Observations from a
                   year whose standard deviation is undefined (e.g. a single
                   observation) turn into NaN and are therefore dropped.
    """
    # transform() broadcasts the per-group statistic back onto every observation,
    # which avoids one Python-level lookup per timestamp.
    annual_vol_per_obs = ts.groupby(ts.index.year).transform("std")
    ts_corrected_variance = ts / annual_vol_per_obs

    monthly_mean_per_obs = ts_corrected_variance.groupby(
        ts_corrected_variance.index.month
    ).transform("mean")

    return (ts_corrected_variance - monthly_mean_per_obs).dropna()

In [None]:
# Overwrites `ts` (until now the interpolated raw series) with the normalized,
# differenced and volatility-adjusted series; the cells below work on this version.
ts = remove_volatility(ts_series)

# Model Class, Backtesting, Metrics

In [None]:
class TimeSeriesPredictionModel():
    """
    Time series prediction model implementation

    Wraps two kinds of model classes behind one train/forecast interface. A class
    whose constructor declares an ``endog`` parameter is treated as univariate:
    it is built with the training series and fitted via ``fit()``. Every other
    class is built from ``model_params`` alone and fitted via ``fit(X, y)``.

    Parameters
    ----------
        model_class : class
            Choice of regressor
        model_params : dict
            Definition of model specific tuning parameters

    Functions
    ----------
        init: Initialize model with given parameters
        train : Train chosen model
        forecast : Apply trained model to prediction period and generate forecast DataFrame
    """
    def __init__(self, model_class, model_params: dict) -> None:
        """Initialize a new instance of time_series_prediction_model."""
        self.model_class = model_class
        self.model_params = model_params
        self.model = None
        self.is_univariate = self._accepts_endog(model_class)

    @staticmethod
    def _accepts_endog(model_class) -> bool:
        """Return True when ``model_class.__init__`` declares an ``endog`` parameter."""
        # Imported locally so the class does not depend on another cell's imports.
        import inspect

        try:
            parameters = inspect.signature(model_class.__init__).parameters
        except (TypeError, ValueError):
            # Some constructors (e.g. implemented in C) expose no signature;
            # treat those as feature-based models.
            return False
        # Only real parameters count, not local variables of the constructor.
        return 'endog' in parameters

    def train(self, X_train: pd.DataFrame = None, y_train: pd.Series = None, train_series: pd.Series = None) -> None:
        """
        Train chosen model.

        Args:
            X_train (pd.DataFrame): Training features (feature-based models only).
            y_train (pd.Series): Training target (feature-based models only).
            train_series (pd.Series): Training series (univariate models only).

        Raises:
            ValueError: If the inputs required by the model kind are missing.
        """
        if self.is_univariate:
            if train_series is None:
                raise ValueError("train_series must be provided for univariate models")
            self.train_series = train_series
            self.model = self.model_class(endog=self.train_series, **self.model_params)
            # Keep the object returned by fit(), which is what predict() is called on later.
            self.model = self.model.fit()
        else:
            if X_train is None or y_train is None:
                raise ValueError("X_train and y_train must be provided for multivariate models")
            self.X_train = X_train
            self.y_train = y_train
            self.model = self.model_class(**self.model_params)
            self.model.fit(self.X_train, self.y_train)

    def forecast(self, X_test: pd.DataFrame = None, start_date: str = None, end_date: str = None) -> pd.DataFrame:
        """
        Apply trained model to prediction period and generate forecast DataFrame.

        Args:
            X_test (pd.DataFrame): Features to predict on (feature-based models only).
            start_date (str): First timestamp to predict (univariate models only).
            end_date (str): Last timestamp to predict (univariate models only).

        Returns:
            pd.DataFrame: For univariate models a frame with a single 'Forecast'
                          column; otherwise the predictions indexed like X_test,
                          with the index named 'Datum'.

        Raises:
            RuntimeError: If train() has not been called yet.
            ValueError: If the inputs required by the model kind are missing.
        """
        if self.model is None:
            raise RuntimeError("train must be called before forecast")

        if self.is_univariate:
            if start_date is None or end_date is None:
                raise ValueError("start_date and end_date must be provided for univariate models")
            start_date = pd.to_datetime(start_date)
            end_date = pd.to_datetime(end_date)
            forecast = self.model.predict(start=start_date, end=end_date, typ='levels')

            if isinstance(forecast, pd.Series):
                # A named Series passed to DataFrame(..., columns=['Forecast']) is
                # matched by name and would yield an all-NaN column; rename instead.
                forecast_df = forecast.rename('Forecast').to_frame()
            else:
                forecast_df = pd.DataFrame(forecast, columns=['Forecast'])
        else:
            if X_test is None:
                raise ValueError("X_test must be provided for multivariate models")
            self.X_test = X_test
            # rename() returns a new Index, so the caller's X_test index keeps its name.
            forecast_df = pd.DataFrame(self.model.predict(self.X_test),
                                       index=self.X_test.index.rename('Datum'))
        return forecast_df

In [None]:
# Backtesting with sliding window

def backtesting(X_train: pd.DataFrame, y_train: pd.DataFrame,
                X_test: pd.DataFrame, y_test: pd.DataFrame,
                model: "TimeSeriesPredictionModel", prediction_step_size: int=96):
    """
    Perform rolling forecast backtesting for a time series prediction model using
    specified train and test datasets, and a given model.

    The test data is walked in consecutive windows of prediction_step_size rows
    (the final window may be shorter). For every window the model is retrained on
    the current training data and asked to forecast the window. Afterwards the
    window is appended to the training data and the same number of rows is removed
    from its start, so the training window slides forward at constant length.

    Args:
        X_train (pd.DataFrame): Training feature dataset.
        y_train (pd.DataFrame): Training target dataset.
        X_test (pd.DataFrame): Testing feature dataset.
        y_test (pd.DataFrame): Testing target dataset.
        model (TimeSeriesPredictionModel): The model used for time series forecasting.
        prediction_step_size (int): The number of time steps to predict at each iteration.

    Returns:
        pd.DataFrame: A DataFrame with two float columns 'Original' and 'Predictions',
                      containing the actual values from y_test and the predictions
                      made by the model, respectively.

    Raises:
        ValueError: If prediction_step_size is not positive.
    """
    if prediction_step_size <= 0:
        raise ValueError("prediction_step_size must be a positive integer")

    # initializing output df (float dtype so the predictions are not stored as objects)
    predictions = pd.DataFrame(index=y_test.index, columns=['Original', 'Predictions'], dtype=float)
    predictions['Original'] = y_test

    # Step through the whole test set; stopping at len(X_test) - prediction_step_size
    # would leave the last window without a forecast.
    for i in range(0, len(X_test), prediction_step_size):
        end_idx = i + prediction_step_size
        X_window = X_test.iloc[i:end_idx]
        y_window = y_test.iloc[i:end_idx]
        forecast_index = X_window.index

        # fit model and predict
        model.train(X_train, y_train)
        forecast = model.forecast(X_window)
        # Flatten the single-column forecast before writing it into one column.
        predictions.loc[forecast_index, 'Predictions'] = forecast.to_numpy().ravel()

        print(f'Finished Forecast for {forecast_index[-1].date()}')

        # Slide the training window: drop as many of the oldest rows (by position)
        # as are appended from the window that was just forecast.
        n_new = len(X_window)
        X_train = pd.concat([X_train.iloc[n_new:], X_window])
        y_train = pd.concat([y_train.iloc[n_new:], y_window])

    return predictions

In [None]:
# Metrics

from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error, mean_squared_error, r2_score, root_mean_squared_error

def evaluation(y_true, y_pred):
    """
    Score predictions against the observed values with five regression metrics.

    The metrics are computed with the scikit-learn functions imported in this
    cell, in the order in which they are returned.

    Args:
        y_true (array-like): True values for the target variable.
        y_pred (array-like): Predicted values generated by the model.

    Returns:
        tuple: (mae, mape, mse, r2, rmse), i.e. mean absolute error, mean absolute
               percentage error, mean squared error, R^2 score and root mean
               squared error.
    """
    metric_functions = (
        mean_absolute_error,
        mean_absolute_percentage_error,
        mean_squared_error,
        r2_score,
        root_mean_squared_error,
    )
    mae, mape, mse, r2, rmse = (metric(y_true, y_pred) for metric in metric_functions)
    return mae, mape, mse, r2, rmse

# Univariate Data Preprocessing

In [None]:
import matplotlib.pyplot as plt
from statsmodels.graphics import tsaplots
from statsmodels.tsa.stattools import acf

In [None]:
# Autocorrelation plots
x_label = 'Lags'
y_label = 'Autocorrelation'

# One panel per horizon: panel title and the number of lags shown in it.
plot_titles = ['2 Days (48 h)', '2 Weeks (336)', '2 Months (1487 h)', '2 Years (17520 h)']
lags = [48, 336, 1487, 17520]

fig, axes = plt.subplots(2, 2, figsize=(12, 10))
all_autocorr_values = {}

for ax, panel_title, n_lags in zip(axes.flatten(), plot_titles, lags):
    tsaplots.plot_acf(ts, ax=ax, lags=n_lags)
    ax.set_title(panel_title)
    ax.set_xlabel(x_label)
    ax.set_ylabel(y_label)

    # Autocorrelation coefficients for lags 0..n_lags
    autocorr_values = acf(ts, nlags=n_lags)

    # Collect the coefficient of every lag >= 1 across all panels (lag 0 is skipped)
    for lag, coefficient in enumerate(autocorr_values[1:], start=1):
        all_autocorr_values.setdefault(lag, []).append(coefficient)

plt.tight_layout()
plt.show()

# Average the coefficients collected per lag
combined_autocorr_values = {lag: np.mean(values) for lag, values in all_autocorr_values.items()}

# Rank lags by combined autocorrelation (highest first) and keep lags of at least 24
sorted_lags = sorted(combined_autocorr_values.items(), key=lambda pair: pair[1], reverse=True)
sorted_lags_filtered = [(lag, value) for lag, value in sorted_lags if lag >= 24]
top_5_combined_lags = sorted_lags_filtered[:5]
least_3_combined_lags = sorted_lags_filtered[-3:]

print("Top 5 lags with highest combined autocorrelation values:")
for lag, value in top_5_combined_lags:
    print(f"Lag {lag}: {value:.4f}")

In [None]:
# Data preparation for univariate TimeSeriesPredictionModel
lags = [24, 48, 72, 96, 120]

# Feature frame: the series itself plus one column per lag, each holding the
# series shifted down by that many rows (the first rows of a lag column are NaN).
data = pd.DataFrame({
    'Original': ts,
    **{f'{lag}_Lag': ts.shift(lag) for lag in lags},
})

In [None]:
# Data train-test split
# Chronological split by timestamp label: everything up to the end of 2022 for
# training, calendar year 2023 for validation, 2024 onwards for testing.
train_df = data.loc[:'2022-12-31 23:00+00:00']
valuation_df = data.loc['2023-01-01 00:00+00:00':'2023-12-31 23:00+00:00']
test_df = data.loc['2024-01-01 00:00+00:00':]

# Target = 'Original' (kept as a one-column DataFrame), features = all other columns.
y_train, X_train = train_df[['Original']], train_df.drop(columns='Original')
y_valuation, X_valuation = valuation_df[['Original']], valuation_df.drop(columns='Original')
y_test, X_test = test_df[['Original']], test_df.drop(columns='Original')

# Univariate Models

## Naive Model: Moving Average

In [None]:
# Moving average model
def moving_average(data: pd.DataFrame, window_size: int=3, shift_size: int=24):
    """
    Rolling mean of `data`, delayed by `shift_size` rows.

    Args:
        data: Series or DataFrame to smooth.
        window_size: Number of consecutive rows averaged per value.
        shift_size: Number of rows by which the rolling mean is shifted down.

    Returns:
        Object of the same shape as `data`; the leading rows that lack a full
        window or were introduced by the shift are NaN.
    """
    return data.rolling(window=window_size).mean().shift(shift_size)

In [None]:
# Plot Naive Model Predictions
naive_model = moving_average(ts)

# Restrict the observed series and the baseline to the same start date.
test_date_start = '2024-01-01 00:00+00:00'
test_ts = ts.loc[test_date_start:]
naive_model_print = naive_model.loc[test_date_start:]

ax = plt.figure(figsize=(12, 6)).gca()
ax.plot(test_ts.index, test_ts, label='Original')
ax.plot(naive_model_print.index, naive_model_print, label='Moving average', linestyle='--')
ax.legend()
ax.set_title('Moving Average')
ax.set_xlabel('Date')
ax.set_ylabel('Time Series')
plt.show()

In [None]:
# Detail Plot Naive Model Predictions
naive_model = moving_average(ts)

# Same plot as for the full period, zoomed in on the data from 2024-04-24 onwards.
# Note that this re-binds test_ts and naive_model_print to the shorter period.
test_date_start = '2024-04-24 00:00+00:00'
test_ts = ts.loc[test_date_start:]
naive_model_print = naive_model.loc[test_date_start:]

ax = plt.figure(figsize=(12, 6)).gca()
ax.plot(test_ts.index, test_ts, label='Original')
ax.plot(naive_model_print.index, naive_model_print, label='Moving average', linestyle='--')
ax.legend()
ax.set_title('Moving Average')
ax.set_xlabel('Date')
ax.set_ylabel('Time Series')
plt.show()

In [None]:
# Metrics Naive Model
# Score the baseline from the start of the test period. The detail-plot cell
# re-slices `test_ts` and `naive_model_print` from 2024-04-24, so reusing those
# names here would evaluate the baseline on a different period than the other models.
naive_eval_start = '2024-01-01 00:00+00:00'
mae, mape, mse, r2, rmse = evaluation(ts[naive_eval_start:], naive_model[naive_eval_start:])

print(f'Model: Naive Moving Average \n Mean absolute error: {mae}\n Mean absolute percentage error: {mape} \n Mean squared error: {mse} \n r2_score: {r2} \n Root mean squared error: {rmse}')

## Random Forest

In [None]:
from sklearn.ensemble import RandomForestRegressor

In [None]:
# Initializing random forest regressor as instance of TimeSeriesPredictionModel
# random_state pins the forest's bootstrap/feature sampling so that a
# Restart & Run All reproduces the same predictions and metrics.
rdnf = TimeSeriesPredictionModel(
    RandomForestRegressor,
    {'n_estimators': 150, 'criterion': 'squared_error', 'max_depth': 10, 'random_state': 42},
)

In [None]:
# Rolling backtest of the random forest on the test set; rows that did not
# receive a forecast are removed.
rdnf_pred = backtesting(X_train, y_train, X_test, y_test, rdnf).dropna()

In [None]:
# Plot Random Forest Predictions
test_date_start = '2024-01-01 00:00+00:00'
test_ts = ts.loc[test_date_start:]

# Observed series from the start date against the backtested forecasts.
ax = plt.figure(figsize=(12, 6)).gca()
ax.plot(test_ts.index, test_ts, label='Original')
ax.plot(rdnf_pred.index, rdnf_pred['Predictions'], label='Random Forest', linestyle='--')
ax.legend()
ax.set_title('Random Forest')
ax.set_xlabel('Date')
ax.set_ylabel('Time Series')
plt.show()

In [None]:
# Detail Plot Random Forest Predictions
test_date_start = '2024-04-24 00:00+00:00'
test_date_end = '2024-04-30 23:00:00+00:00'

# Cut both series to the detail window once instead of re-slicing per argument.
observed_detail = test_ts[test_date_start:test_date_end]
forecast_detail = rdnf_pred[test_date_start:test_date_end]

plt.figure(figsize=(12, 6))
plt.plot(observed_detail.index, observed_detail, label='Original')
plt.plot(forecast_detail.index, forecast_detail['Predictions'], label='Random Forest', linestyle='--')
plt.legend()
plt.title('Random Forest')
plt.xlabel('Date')
plt.ylabel('Time Series')
plt.show()

In [None]:
# Metrics Random Forest: scored on the rows of rdnf_pred, i.e. only on test
# timestamps that actually received a forecast during backtesting.
mae, mape, mse, r2, rmse = evaluation(rdnf_pred['Original'], rdnf_pred['Predictions'])

print(f'Model: Random Forest \n Mean absolute error: {mae}\n Mean absolute percentage error: {mape} \n Mean squared error: {mse} \n r2_score: {r2} \n Root mean squared error: {rmse}')

In [None]:
# Metrics comparison: random forest with default settings vs. optimized hyperparameters
# NOTE(review): the default-settings numbers are hard-coded, presumably copied
# from an earlier run -- confirm they are still current.
index = ['mae', 'mse', 'r2', 'rmse']
rdn_forest_default = [0.85, 2.54, 0.83, 1.59]
# Uses the metric variables left behind by the most recently run metrics cell.
rdn_forest_optimized = [mae, mse, r2, rmse]
df = pd.DataFrame(
    dict(zip(['Rdn Forest', 'Rdn Forest optimized'],
             [rdn_forest_default, rdn_forest_optimized])),
    index=index,
)
ax = df.plot.bar(rot=0)

## CatBoost

In [None]:
%pip install catboost

In [None]:
from catboost import CatBoostRegressor

# Initializing CatBoost regressor as instance of TimeSeriesPredictionModel
# (20 boosting iterations, learning rate 0.25, tree depth 16).
cboost = TimeSeriesPredictionModel(CatBoostRegressor, {'iterations': 20, 'learning_rate': 0.25, 'depth': 16})

In [None]:

# Rolling backtest of CatBoost on the test set; rows that did not receive a
# forecast are removed.
cboost_pred = backtesting(X_train, y_train, X_test, y_test, cboost).dropna()

In [None]:
# Plot CatBoost Predictions
test_date_start = '2024-01-01 00:00+00:00'
test_ts = ts[test_date_start:]

plt.figure(figsize=(12, 6))
plt.plot(test_ts.index, test_ts, label='Original')
# Plot the CatBoost forecasts; the random forest predictions were drawn here by mistake.
plt.plot(cboost_pred.index, cboost_pred['Predictions'], label='CatBoost', linestyle='--')
plt.legend()
plt.title('CatBoost')
plt.xlabel('Date')
plt.ylabel('Time Series')
plt.show()

In [None]:
# Detail Plot CatBoost Predictions
test_date_start = '2024-04-27 00:00+00:00'
test_date_end = '2024-05-03 23:00:00+00:00'

# Cut both series to the detail window once instead of re-slicing per argument.
observed_detail = test_ts[test_date_start:test_date_end]
forecast_detail = cboost_pred[test_date_start:test_date_end]

plt.figure(figsize=(12, 6))
plt.plot(observed_detail.index, observed_detail, label='Original')
plt.plot(forecast_detail.index, forecast_detail['Predictions'], label='CatBoost', linestyle='--')
plt.legend()
plt.title('CatBoost')
plt.xlabel('Date')
plt.ylabel('Time Series')
plt.show()

In [None]:
# Metrics CatBoost: scored on the rows of cboost_pred, i.e. only on test
# timestamps that actually received a forecast during backtesting.
mae, mape, mse, r2, rmse = evaluation(cboost_pred['Original'], cboost_pred['Predictions'])

print(f'Model: CatBoost \n Mean absolute error: {mae}\n Mean absolute percentage error: {mape} \n Mean squared error: {mse} \n r2_score: {r2} \n Root mean squared error: {rmse}')

# Hyperparameter Optimization for Univariate Models

In [None]:
import random
from sklearn.model_selection import train_test_split

def random_search_optimization(model_class, param_grid, train_series=None, X_train=None, y_train=None,
                               X_val=None, y_val=None, n_iter=10, scoring_function=None,
                               random_state=None):
    """
    Perform random search optimization on model parameters.

    In every iteration one value per parameter is drawn at random (with
    replacement across iterations), a TimeSeriesPredictionModel is trained with
    that combination, and the combination with the highest score is kept.

    When train_series is given, the model is trained on it and scored on its
    prediction over the time span of the training series itself. Otherwise the
    model is trained on X_train / y_train and scored on X_val / y_val.

    Parameters
    ----------
    model_class : class
        Choice of model class (e.g., ARIMA, LinearRegression)
    param_grid : dict
        Dictionary with parameter names as keys and lists of parameter settings to try as values
    train_series : pd.Series, optional
        Training data for univariate models
    X_train : pd.DataFrame, optional
        Training features for multivariate models
    y_train : pd.Series, optional
        Training target for multivariate models
    X_val : pd.DataFrame, optional
        Validation features for multivariate models
    y_val : pd.Series, optional
        Validation target for multivariate models
    n_iter : int
        Number of parameter settings that are sampled
    scoring_function : function
        Function to evaluate model performance, called as
        scoring_function(true, predicted); should return a single score where
        higher is better
    random_state : int, optional
        Seed for the parameter sampling. When None, the global state of the
        `random` module is used.

    Returns
    -------
    best_params : dict
        Best parameter combination found (None if no iteration beat -inf)
    best_score : float
        Best score obtained

    Raises
    ------
    ValueError
        If scoring_function is not provided.
    """
    if scoring_function is None:
        raise ValueError("scoring_function must be provided")

    # A dedicated generator makes the search reproducible without reseeding the
    # global `random` module.
    rng = random if random_state is None else random.Random(random_state)

    def sample_params(param_grid):
        return {key: rng.choice(values) for key, values in param_grid.items()}

    best_score = float('-inf')
    best_params = None

    for i in range(n_iter):
        print(i)
        params = sample_params(param_grid)
        model = TimeSeriesPredictionModel(model_class, params)

        if train_series is not None:
            model.train(train_series=train_series)
            # forecast() takes a date range for univariate models (it has no
            # `steps` argument), so predict over the span of the training series.
            forecast = model.forecast(start_date=train_series.index[0],
                                      end_date=train_series.index[-1])
            # tail() also behaves for an empty forecast, where [-0:] would
            # return the whole series.
            score = scoring_function(train_series.tail(len(forecast)), forecast)
        else:
            model.train(X_train=X_train, y_train=y_train)
            forecast = model.forecast(X_test=X_val)
            score = scoring_function(y_val, forecast)

        if score > best_score:
            best_score = score
            best_params = params

    return best_params, best_score

In [None]:
# Model Optimization - Example usage
from sklearn.metrics import mean_squared_error

# Define parameter grid
# Each key is passed to the model class as a keyword argument; each list holds
# the candidate values the random search draws from.
param_grid = {
    'n_estimators': [50, 100, 150], 
    'criterion': ['squared_error'], 
    'max_depth': [5, 10, 20]}
    # Alternative grid entries (same parameter names as the CatBoost model above):
    # 'iterations': [20, 25, 30],
    # 'learning_rate': [0.25, 0.5, 0.75],
    # 'depth': [5, 10, 16]}

# Example scoring function
def my_scoring_function(true, pred):
    """Return the negated mean squared error, so that a higher score is better."""
    squared_error = mean_squared_error(true, pred)
    return -squared_error

# Perform random search optimization
# Feature-based search: train on the training split, score on the validation
# split. Ten parameter combinations are drawn from param_grid.
best_params, best_score = random_search_optimization(
    RandomForestRegressor, 
    param_grid, 
    X_train= X_train, 
    y_train= y_train, 
    X_val = X_valuation,
    y_val= y_valuation,
    n_iter=10, 
    scoring_function=my_scoring_function
)

# best_score is the negated mean squared error of the best combination.
print("Best Parameters:", best_params)
print("Best Score:", best_score)