Hier entsteht der Source-Code für meine Bachelorarbeit.

Datenbankverbindungsinformationen (die Datenbank ist nur aus dem Netz der FH Aachen erreichbar)

# Installation

In [None]:
!pip install pyarrow --upgrade
!pip install statsforecast
!pip install etelemetry 
!pip install dask --upgrade
!pip install xgboost
!pip install mlforecast
!pip install chronos-forecasting

!pip install psycopg2-binary

!pip install 'plotly[kaleido]' --upgrade

!pip install gputil
!pip install memory_profiler

!pip install jupyterlab_execute_time

# Imports

In [71]:
# Notebook configuration.
# country_code: suffix of the cached dataset file (combined_data_<country_code>.pkl) and of output names.
country_code = 'DE_fixed'
# country_code_2: country used in the database queries; also part of the final_data_2 file name and scenario_string.
country_code_2 = 'DE' # only used for pulling data from database
# scenario_number: train/test split selected in choose_scenario (1-4).
scenario_number = 4

In [None]:
import os
import datetime
from math import floor, sqrt, ceil
import numpy as np
import psycopg2
import matplotlib.pyplot as plt
import plotly.express as px
import plotly.io as pio
from plotly.subplots import make_subplots
import plotly.graph_objects as go
from plotly.graph_objs import Figure
import pandas as pd
from pandas import DataFrame

from sklearn.model_selection import train_test_split

from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.seasonal import seasonal_decompose
from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA, MSTL, ARIMA, AutoTBATS
from statsforecast.arima import arima_string
from coreforecast.seasonal import find_season_length

from pathlib import Path

from xgboost import XGBRegressor
from mlforecast import MLForecast

from chronos import BaseChronosPipeline, Chronos2Pipeline

import psutil
import time
import GPUtil
import gc
from IPython.core.magic import register_cell_magic, register_line_magic
from memory_profiler import memory_usage
%load_ext memory_profiler

In [73]:
@register_cell_magic
def gpu_memory_profiling(line, cell):
    """Cell magic ``%%gpu_memory_profiling``: run the cell and report GPU memory before/after.

    The cell is executed in the notebook's user namespace, so variables it assigns stay
    available afterwards (a plain ``exec(cell)`` inside this function would store them in the
    function's local scope, where they are lost). ``line`` is ignored.
    VRAM is the sum of ``memoryUsed`` over all GPUs reported by GPUtil.
    """
    from IPython import get_ipython  # local import: only needed when the magic runs

    shell = get_ipython()
    start_vram = sum(gpu.memoryUsed for gpu in GPUtil.getGPUs())
    try:
        # transform_cell turns IPython syntax (magics, !shell) into plain Python first.
        exec(shell.transform_cell(cell), shell.user_ns)
    finally:
        # Report even if the cell raised.
        end_vram = sum(gpu.memoryUsed for gpu in GPUtil.getGPUs())
        print(f"Total VRAM Used: Start: {start_vram} MB End: {end_vram} MB")

In [74]:
def get_current_gpu():
    """Return the summed ``memoryUsed`` value (MB) of all GPUs reported by GPUtil."""
    total_used = 0
    for device in GPUtil.getGPUs():
        total_used += device.memoryUsed
    return total_used

In [75]:
@register_cell_magic
def resource_profiler(line, cell):
    """Cell magic ``%%resource_profiler``: run the cell and report time, CPU, RAM and VRAM usage.

    - Execution time is measured with ``time.perf_counter`` (monotonic, high resolution).
    - ``psutil.cpu_percent(interval=None)`` reports utilisation since its previous call.
      The call before the cell therefore only resets the reference point, and the call after
      the cell gives the average system-wide CPU utilisation *during* the cell.
      (Subtracting the two readings, as done before, had no meaning.)
    - Memory is the change of system-wide used RAM in MB, so other processes influence it.
    - VRAM is the summed ``memoryUsed`` of all GPUs reported by GPUtil (MB).

    The cell runs in the notebook's user namespace so that its assignments persist
    (``exec(cell)`` inside this function would lose them). ``line`` is ignored.
    """
    from IPython import get_ipython  # local import: only needed when the magic runs

    shell = get_ipython()
    psutil.cpu_percent(interval=None)  # reset psutil's CPU-time reference point
    start_memory = psutil.virtual_memory().used / (1024 ** 2)
    start_vram = sum(gpu.memoryUsed for gpu in GPUtil.getGPUs())
    start_time = time.perf_counter()
    try:
        # transform_cell turns IPython syntax (magics, !shell) into plain Python first.
        exec(shell.transform_cell(cell), shell.user_ns)
    finally:
        # Collect and print the measurements even if the cell raised.
        elapsed = time.perf_counter() - start_time
        cpu_during_cell = psutil.cpu_percent(interval=None)
        end_memory = psutil.virtual_memory().used / (1024 ** 2)
        end_vram = sum(gpu.memoryUsed for gpu in GPUtil.getGPUs())

        print(f"Execution Time: {elapsed:.4f} seconds")
        print(f"CPU Usage: {cpu_during_cell:.2f}%")
        print(f"Memory Used: {end_memory - start_memory:.2f} MB")
        print(f"Total VRAM Used: Start: {start_vram} MB End: {end_vram} MB")

# Datenbank init

In [None]:
# Connection parameters for psycopg2.connect. They are read from environment variables so that
# real credentials never have to be written into the notebook; "xxx" stays as a placeholder default.
db_params = {
    "host": os.environ.get("DB_HOST", "xxx"),
    "database": os.environ.get("DB_NAME", "xxx"),
    "user": os.environ.get("DB_USER", "xxx"),
    "password": os.environ.get("DB_PASSWORD", "xxx"),
    "port": os.environ.get("DB_PORT", "xxx"),
}

Instrat: Gas-, Kohle- und CO₂-Zertifikatspreise (EU-Emissionshandel) <br>
Weather ECMWF : Wetterdaten (Wind, Sonne, ...) <br>
Entsoe : Stromdaten (Last und Preis) <br>

Bei den länderspezifischen Abfragen für Deutschland werden die Werte vorher in Kleinbuchstaben umgewandelt, damit mögliche Fehleinträge (z. B. abweichende Groß-/Kleinschreibung) in der Datenbank berücksichtigt werden.

In [None]:

# SQL statement per dataset; the keys are looked up by the data-loading cell.
# Country filters compare lower-cased values so that inconsistent spelling in the database still matches.
# NOTE(review): the day-ahead price query matches the prefix '<country>%' (so e.g. 'DE_LU' also matches),
# while weather, generation and load match the code exactly - confirm this difference is intended.
db_queries = dict(
    # Instrat: gas, coal and EU-ETS prices
    instrat_pl_gas_price="SELECT * FROM instrat_pl.gas_price;",
    instrat_pl_cole_price="SELECT * FROM instrat_pl.coal_price;",
    instrate_pl_ets_price="SELECT * FROM instrat_pl.eu_ets",
    # ECMWF weather data
    weather_de=f"SELECT * FROM weather.ecmwf_eu WHERE lower(nuts_id) LIKE lower('{country_code_2}');",
    # ENTSO-E: installed generation capacity, load and day-ahead prices
    entsoe_generation_de=f"SELECT * FROM entsoe.query_installed_generation_capacity WHERE lower(country) like lower('{country_code_2}');",
    entsoe_load_de=f"SELECT * FROM entsoe.query_load WHERE lower(country) LIKE lower('{country_code_2}');",
    entsoe_price_de=f"SELECT * FROM entsoe.query_day_ahead_prices WHERE lower(country) LIKE lower('{country_code_2}%');",
)

Funktion, um über eine Datenbankverbindung eine Abfrage abzuschicken und die Daten anschließend in einen DataFrame zu laden. <br>
@params {conn: Datenbankverbindung, query: SQL-Abfrage} <br>
@returns {df: Daten der SQL Abfrage in ein Dataframe geladen} <br>

In [None]:
def get_dataframe_from_databasequery(conn, query):
    """Execute ``query`` on ``conn`` and return the result set as a DataFrame.

    :param conn: open DB-API connection (psycopg2 in this notebook); its cursor is always closed
    :param query: SQL statement to execute
    :return: DataFrame whose columns are named after the result columns; an empty DataFrame
             if the statement produces no result set
    :raises psycopg2.Error: printed and re-raised, so the caller can stop and clean up
    """
    # Use the connection passed in (previously the global ``connection`` was used by mistake).
    cursor = conn.cursor()
    try:
        cursor.execute(query)
        if cursor.description is None:
            # Statement without a result set: there is nothing to fetch.
            return pd.DataFrame()
        column_names = [desc[0] for desc in cursor.description]
        data = cursor.fetchall()
    except psycopg2.Error as error:
        # psycopg2.errors is a module, not an exception class; psycopg2.Error is the base class
        # of all psycopg2 exceptions. Re-raise instead of continuing with a failed query.
        print(f"Error fetching data: {error}")
        raise
    finally:
        cursor.close()
    return pd.DataFrame(data=data, columns=column_names)

Hauptteil: Hier holen wir uns zunächst alle Daten aus der Datenbank und laden sie in separate Dataframes. <br>
Das Laden aller Daten dauert ca. 3 Min. (ggf. länger, je nach Internetgeschwindigkeit: Die 3 Min wurden mit einer 1-Gbit/s-Verbindung erreicht. Es handelt sich um ca. 5 GB an Daten; bei 25 Mbit/s dauert es ca. 13 Min.)

In [None]:
# Load all raw tables from the database; the connection is closed in every case.
connection = None  # defined up front so that the finally-block also works when connect() fails
try:
    # Database Connection
    connection = psycopg2.connect(**db_params)

    # entsoe (Power)
    df_price_de = get_dataframe_from_databasequery(connection, db_queries.get("entsoe_price_de"))
    df_generation_de = get_dataframe_from_databasequery(connection, db_queries.get("entsoe_generation_de"))
    df_load_de = get_dataframe_from_databasequery(connection, db_queries.get("entsoe_load_de"))

    # instrat (gas, coal and EU-ETS prices)
    df_gas_price = get_dataframe_from_databasequery(connection, db_queries.get("instrat_pl_gas_price"))
    df_cole_price = get_dataframe_from_databasequery(connection, db_queries.get("instrat_pl_cole_price"))
    df_ets_price = get_dataframe_from_databasequery(connection, db_queries.get("instrate_pl_ets_price"))

    # Weather
    df_weather_de = get_dataframe_from_databasequery(connection, db_queries.get('weather_de'))

except Exception as error:
    # psycopg2.errors is a module and must not be used in an except tuple (that raised TypeError).
    # Report and re-raise so that later cells do not run on missing DataFrames.
    print(f"Error connecting to the database: {error}")
    raise
finally:
    if connection is not None:
        connection.close()
        print("Connection to database closed.")


# Strom
Verarbeitung des Preises <br>
1. Erst alle exakten Duplikate entfernen (diese haben keinen Mehrwert für die Daten)
2. anschließend den Datensatz neu indexieren mit dem Datum
3. die 'country'-Spalte entfernen (sie hat jetzt keinen Nutzen mehr)
4. zuletzt noch die Preisspalte von '0' in 'price' umbenennen

In [None]:
def _prepare_price(price_raw: DataFrame) -> DataFrame:
    """Turn raw ENTSO-E day-ahead prices into an hourly, tz-naive DataFrame with column 'price'."""
    price = (
        price_raw
        # Exact duplicate rows (including the timestamp). This must run before set_index:
        # drop_duplicates ignores the index, so running it afterwards also removed legitimate
        # hours that merely had the same price as another hour.
        .drop_duplicates()
        .set_index('index')
        .tz_localize(None)
        .drop(columns='country')
        .rename(columns={'0': 'price'})
    )
    # Hourly mean (handles sub-hourly values), gaps filled by time-based interpolation.
    return price.resample('h').mean().interpolate('time').sort_index()


def _prepare_generation(generation_raw: DataFrame) -> DataFrame:
    """Average generation data per hour and country, drop unused columns, back-fill to hourly."""
    grouped = (
        generation_raw
        .groupby([pd.Grouper(key='index', freq='h'), 'country'])
        .mean()
        .reset_index()
        .drop(columns=['country', 'fossil_oil_shale', 'fossil_peat', 'marine'])
        .set_index('index')
        # Same tz handling as price/load (no-op for tz-naive data) so that the later merge works.
        .tz_localize(None)
    )
    return grouped.resample('h').bfill().sort_index()


def _prepare_load(load_raw: DataFrame) -> DataFrame:
    """Average load per hour and country, sum over countries, interpolate to a gap-free hourly series."""
    per_country = (
        load_raw
        .drop_duplicates()
        .groupby([pd.Grouper(key='index', freq='h'), 'country'])
        .mean()
        .reset_index()
        .drop(columns='country')
    )
    load = per_country.groupby(pd.Grouper(key='index', freq='h')).sum()
    return load.tz_localize(None).resample('h').interpolate('time').sort_index()


def verarbeitung_strom(price: DataFrame, generation: DataFrame, load: DataFrame):
    """Clean the ENTSO-E price, generation and load tables into hourly DataFrames.

    The inputs are not modified, so the calling cell can be re-run with the raw frames.
    (``pd.Grouper`` is used without the deprecated ``axis`` argument.)

    :param price: raw day-ahead prices (columns 'index', 'country', '0')
    :param generation: raw generation data (columns 'index', 'country', 'fossil_oil_shale',
                       'fossil_peat', 'marine', ...)
    :param load: raw load data (columns 'index', 'country', ...)
    :return: (price, generation, load), each indexed by hourly timestamps
    """
    return _prepare_price(price), _prepare_generation(generation), _prepare_load(load)

In [None]:
# Clean the three ENTSO-E tables; the cleaned frames replace the raw ones under the same names.
# NOTE: re-running this cell alone feeds already-cleaned frames back in - re-run the loading cell first.
df_price_de, df_generation_de, df_load_de = verarbeitung_strom(df_price_de, df_generation_de, df_load_de)

In [None]:
# Inspect the cleaned hourly load data.
df_load_de

# Emissionen

In [None]:
def verarbeitung_emissionen(gas: DataFrame, cole: DataFrame, ets: DataFrame):
    """Clean the Instrat gas, coal ("cole") and EU-ETS price tables into hourly series.

    Each table is de-duplicated, indexed by its 'date' column, reduced to its price column(s)
    and upsampled to hourly resolution with time-based linear interpolation.
    The inputs are not modified, so the calling cell can be re-run with the raw frames.

    :param gas: raw gas prices (columns incl. 'indeks', 'date', 'price_pln_per_mwh', 'volume',
                'price_eur_per_mwh', 'price_eur_per_kwh')
    :param cole: raw coal prices (columns incl. 'date', 'pscmi1_pln_per_gj', 'pscmi1_pln_per_t',
                 'steam_coal_eur_per_gj', 'steam_coal_eur_per_t', 'price_eur_per_kwh')
    :param ets: raw EU-ETS prices (columns incl. 'indeks', 'date', 'volume')
    :return: (gas, cole, ets) as hourly DataFrames; the gas and coal price columns are renamed
             to 'price_gas_eur_per_kwh' and 'price_cole_eur_per_kwh'
    """
    gas_hourly = (
        gas
        # 'indeks' is dropped first: while it is present, duplicated rows are not detected.
        .drop(columns='indeks')
        .drop_duplicates()
        .set_index('date')
        .drop(columns=['price_pln_per_mwh', 'volume', 'price_eur_per_mwh'])
        .resample('h').interpolate('time')
        .rename(columns={'price_eur_per_kwh': 'price_gas_eur_per_kwh'})
        .sort_index()
    )

    cole_hourly = (
        cole
        .drop_duplicates()
        .set_index('date')
        .drop(columns=['pscmi1_pln_per_gj', 'pscmi1_pln_per_t', 'steam_coal_eur_per_gj', 'steam_coal_eur_per_t'])
        .resample('h').interpolate('time')
        .rename(columns={'price_eur_per_kwh': 'price_cole_eur_per_kwh'})
        .sort_index()
    )

    ets_hourly = (
        ets
        .drop(columns='indeks')
        # Previously the result of drop_duplicates() was discarded, so duplicates were kept.
        .drop_duplicates()
        .set_index('date')
        .drop(columns='volume')
        .resample('h').interpolate('time')
        .sort_index()
    )

    return gas_hourly, cole_hourly, ets_hourly

In [None]:
# Clean the Instrat price tables; the cleaned frames replace the raw ones under the same names.
df_gas_price, df_cole_price, df_ets_price = verarbeitung_emissionen(df_gas_price, df_cole_price, df_ets_price)

# Wetter

In [None]:
def verarbeitung_wetter(weather: DataFrame):
    """Clean the ECMWF weather table into an hourly DataFrame indexed by 'time'.

    Exact duplicate rows are removed, the location columns ('latitude', 'longitude', 'nuts_id')
    are dropped and the data is resampled to hourly values with time-based interpolation.
    The input is not modified (previously it was changed in place, which made re-running the
    calling cell fail).

    :param weather: raw weather data with a 'time' column
    :return: hourly weather DataFrame
    """
    return (
        weather
        .drop_duplicates()
        .set_index('time')
        .drop(columns=['latitude', 'longitude', 'nuts_id'])
        .resample('h').interpolate('time')
        .sort_index()
    )

In [None]:
# Clean the weather table; the result replaces the raw frame under the same name.
df_weather_de = verarbeitung_wetter(df_weather_de)

In [None]:
# Line plot of the complete hourly load history.
df_load_de.plot()

In [None]:
# Zoom into 9-13 Sep 2025, around the load values inspected in the next cell.
df_load_de.plot()
plt.xlim(pd.Timestamp('2025-09-09 00:00:00'), pd.Timestamp('2025-09-13 13:00:00'))

In [None]:
# Show the timestamp of the maximum load and the load values around 2025-09-11 13:00.
# The inner quotes differ from the f-string quotes: reusing the same quote type inside an
# f-string is only valid from Python 3.12 (PEP 701) and is a SyntaxError on older kernels.
print(f"Maximaler Wert am Datum: {df_load_de['actual_load'].idxmax()}\n")
print(f"Wert davor: {df_load_de.loc['2025-09-11 12:00:00']}\n")
print(f"Wert: {df_load_de.loc['2025-09-11 13:00:00']}\n")
print(f"Wert danach: {df_load_de.loc['2025-09-11 14:00:00']}")

# Zusammenfassung der einzelnen Datensätze

Aufgrund der Beschaffenheit unserer Datensätze können wir diesen Wert vorläufig ignorieren, da wir die Daten im weiteren Verlauf zusammenfassen und dafür die größte gemeinsame Zeitreihe (vom 2016-01-02 00:00:00 bis 2024-10-01 00:00:00) verwenden. Der Wert liegt außerhalb dieser Zeitreihe und beeinflusst unsere Rechnungen im Folgenden daher nicht.

In [None]:
def converge_dataframes(gas, cole, ets, weather, generation, load, price):
    """Join all hourly datasets on their timestamp index and return the result sorted by time.

    ``pd.merge`` defaults to an inner join, so only timestamps present in every input remain.
    Columns keep the order gas, cole, ets, weather, generation, load, price.
    """
    combined = gas
    for frame in (cole, ets, weather, generation, load, price):
        combined = pd.merge(combined, frame, left_index=True, right_index=True)
    combined.sort_index(inplace=True)
    return combined

In [None]:
# Combine all cleaned datasets into one hourly DataFrame (only the common time range remains).
final_data = converge_dataframes(df_gas_price, df_cole_price, df_ets_price, df_weather_de, df_generation_de, df_load_de, df_price_de)

In [None]:
# Inspect the combined dataset.
final_data

# Zwischenergebnis
Save the data to a pickle file and read it from there. This way the file can be updated if needed without re-querying the database.

In [76]:
# File name of the cached combined dataset for the configured country_code.
current_name = f'combined_data_{country_code}.pkl'

In [77]:
def save_data(final_data: DataFrame, name: str):
    """Pickle ``final_data`` to the file ``name`` in the current working directory."""
    target = Path().resolve() / name
    final_data.to_pickle(target)


In [78]:
def read_data(name: str):
    """Load the pickled DataFrame ``name`` from the current working directory, sorted by index.

    NOTE: unpickling can execute arbitrary code - only load files this notebook wrote itself.
    """
    frame = pd.read_pickle(Path().resolve() / name)
    return frame.sort_index()

In [79]:
#final_data['price_gas_eur_per_kwh'] = final_data['price_gas_eur_per_kwh'] * 1000
#final_data['price_cole_eur_per_kwh'] = final_data['price_cole_eur_per_kwh'] * 1000
#final_data.rename(columns={'price_gas_eur_per_kwh': 'price_gas_eur_per_mwh', 'price_cole_eur_per_kwh':'price_cole_eur_per_mwh'}, inplace=True)
#final_data = final_data.loc[:'2024-06-01']
#final_data.dropna(axis=1, how='all', inplace=True)
#final_data.fillna(0, inplace=True)

In [80]:
#save_data(final_data, current_name)

In [None]:
# Load the cached combined dataset instead of querying the database again.
final_data = read_data(current_name)
# NOTE(review): final_data_2 is only referenced by a commented-out line in the scenario cell;
# this line fails on a fresh run if that pickle file does not exist.
final_data_2 = read_data(f'combined_data_{country_code_2}_mit_stromkapazitaet.pkl')
#final_data[final_data <= 0] = 0.000001
final_data

In [82]:
def choose_scenario(complete_data: DataFrame, scenario : int = 1):
    """Split the combined data into a training and a testing set for a numbered scenario.

    Scenarios:
      1: train up to 2018-12-31, test on the calendar year 2019
      2: train up to 2020-12-31, test on 2021
      3: chronological 80/20 split (no shuffling); the test set is cut to its first 1024 rows
      4: train up to 2022-12-31, test on 2023

    :param complete_data: time-indexed dataset; it is not modified (a sorted copy is used)
    :param scenario: scenario number (1-4)
    :return: (training, testing, prediction_length) with prediction_length == len(testing)
    :raises ValueError: for an unknown scenario number (previously an UnboundLocalError)
    """
    data = complete_data.sort_index()
    # Test year of the calendar-year scenarios; training covers everything before it.
    yearly_splits = {1: 2019, 2: 2021, 4: 2023}
    if scenario in yearly_splits:
        test_year = yearly_splits[scenario]
        training = data.loc[:f'{test_year - 1}-12-31']
        testing = data.loc[f'{test_year}-01-01':f'{test_year}-12-31']
    elif scenario == 3:
        training, testing = train_test_split(data, test_size=0.2, shuffle=False)
        testing = testing.head(1024)
    else:
        raise ValueError(f'Unknown scenario {scenario!r}; expected 1, 2, 3 or 4')
    return training, testing, len(testing)

In [None]:

# Split according to the configured scenario; prediction_length == len(test_set).
train_set, test_set, prediction_length = choose_scenario(final_data, scenario_number)
#_, test_set, _ = choose_scenario(final_data_2, scenario_number)
# Default file name used by plotting_save() for the result figure.
scenario_string :str = f'scenario_{scenario_number}_{country_code}_{country_code_2}'
test_set.shape

In [84]:
# mussten gelöscht werden, da sie auch im test set bei frankreich nicht enthalten sind
#train_set.drop(columns=['fossil_brown_coal/lignite', 'other_renewable', 'fossil_coal-derived_gas'], 
#               inplace=True)

In [None]:
# Columns available as target ('price') and exogenous features.
train_set.columns

# Functions

In [86]:
def data_preparation_statsforecast(training: DataFrame, testing: DataFrame):
    '''
    Convert train/test data into the long format used by StatsForecast / MLForecast.

    The index is moved into a column (named 'index' for an unnamed or 'index'-named index),
    which is renamed to 'ds'; a constant series id column unique_id = '1' is appended.
    The inputs are not modified.

    :param training: Training Dataset with target column 'price'
    :param testing: Testing Dataset with the same columns
    :return: (train, test); train has the target renamed to 'y', test has the target removed
    '''
    train_long = (
        training.copy()
        .reset_index()
        .rename(columns={'index': 'ds', 'price': 'y'})
        .assign(unique_id='1')
    )
    test_long = (
        testing.copy()
        .reset_index()
        .rename(columns={'index': 'ds'})
        .assign(unique_id='1')
        .drop(columns=['price'])
    )
    return train_long, test_long

In [87]:
def auto_fitting_the_data_mstl_sarimax(training: DataFrame, freq: str, season_length: int = None, multi_season: bool = False, multi_season_length: list[int] = None):
    '''
    Fit either an AutoARIMA (seasonal, stepwise search) or an MSTL model with an AutoARIMA
    trend forecaster.

    :param training: training data in StatsForecast long format
    :param freq: frequency of the data (here 'h' for hourly)
    :param season_length: season length in steps for AutoARIMA (e.g. 24 = daily season of hourly data)
    :param multi_season: if True, fit MSTL with the seasons in ``multi_season_length``
    :param multi_season_length: list of season lengths, e.g. [24, 24*7, 24*14]
    :return: the fitted StatsForecast object
    '''
    if not multi_season:
        # Stepwise search; approximation=True would make fitting faster.
        chosen_model = AutoARIMA(season_length=season_length, trace=True, seasonal=True, stepwise=True)
    else:
        chosen_model = MSTL(season_length=multi_season_length, trend_forecaster=AutoARIMA(trace=True))
    forecaster = StatsForecast(models=[chosen_model], freq=freq, n_jobs=-1)
    forecaster.fit(df=training)
    return forecaster

In [88]:
def fitting_the_data_sarimax(training: DataFrame, freq: str, order: tuple[int, int, int], seasonal_order : tuple[int, int, int], season_length: int):
    """Fit a (S)ARIMA model with fixed orders via StatsForecast.

    :param training: training data in StatsForecast long format
    :param freq: frequency of the data (e.g. 'h')
    :param order: non-seasonal order (p, d, q)
    :param seasonal_order: seasonal order (P, D, Q)
    :param season_length: steps per season (e.g. 24 for a daily season of hourly data)
    :return: the fitted StatsForecast object
    """
    fixed_arima = ARIMA(order=order, seasonal_order=seasonal_order, season_length=season_length)
    forecaster = StatsForecast(models=[fixed_arima], freq=freq, n_jobs=-1)
    forecaster.fit(df=training)
    return forecaster


In [89]:
def fitting_the_data_tbats(training: DataFrame, season_length: list[int], freq: str):
    """Fit an AutoTBATS model with the given (multiple) season lengths.

    :param training: training data in StatsForecast long format
    :param season_length: list of season lengths in steps, e.g. [24, 24*7, 24*14]
    :param freq: frequency of the data (e.g. 'h')
    :return: the fitted StatsForecast object
    """
    forecaster = StatsForecast(models=[AutoTBATS(season_length=season_length)], freq=freq, n_jobs=-1)
    forecaster.fit(training)
    return forecaster

In [90]:
def forecasting(sf: StatsForecast, test: DataFrame, h: int, level: list[int] = None):
    '''
    Forecast ``h`` steps with an already fitted model (StatsForecast or MLForecast).

    :param sf: fitted forecasting object providing ``predict(X_df=..., h=..., level=...)``
    :param test: future exogenous features (the target column must already be removed)
    :param h: number of time steps to predict
    :param level: confidence levels of the prediction intervals, e.g. [90]
    :return: pandas DataFrame with the predicted values
    '''
    future_exog = test.head(h)  # only the first h rows of future features are needed
    return sf.predict(X_df=future_exog, h=h, level=level)

In [91]:
# Result figure: one row per model. The placeholder subplot titles are replaced by plotting().
# The title list is derived from the row count (previously 6 titles were given for 4 rows).
FIGURE_ROWS = 4
fig = make_subplots(rows=FIGURE_ROWS, cols=1,
                    shared_xaxes=False,
                    shared_yaxes=True,
                    subplot_titles=tuple(str(row) for row in range(1, FIGURE_ROWS + 1)),
                    x_title='Datum', y_title='Preis [€/MWh]')

# Line colours per data kind, used by plotting().
colors = {
    'train': 'cornflowerblue',
    'test': 'forestgreen',
    'predict': 'rgba(136, 2, 212, 0.5)'
}

In [92]:
def plot_single_dataframe(complete_data: DataFrame, name: str, saving: bool = False):
    """Plot the 'price' column of ``complete_data`` over its index and optionally save it.

    :param complete_data: DataFrame with a 'price' column and a time index
    :param name: file name (without extension) for ``pictures/<name>.pdf``
    :param saving: if True, export the figure to ``pictures/<name>.pdf`` before showing it
    """
    fig = px.line()
    fig.add_scatter(x=complete_data.index, y=complete_data.price, mode='lines', name='Preis', line=dict(color='cornflowerblue', width=2))
    fig.update_layout(
        xaxis_title='Datum',
        yaxis_title='Preis [€/MWh]',
        )
    if saving:
        # Provides the Chrome installation that Kaleido uses for static image export.
        pio.get_chrome()
        # write_image fails if the target folder does not exist yet.
        Path('pictures').mkdir(parents=True, exist_ok=True)
        fig.write_image(f'pictures/{name}.pdf')
    fig.show()

In [93]:
fig.update_layout(showlegend=True, height=1000, width=1000)


def plotting(model_name: str, train: DataFrame, annotation: str = None, test: DataFrame = None, predict: DataFrame = None, row: int = 1, col: int = 1, fig: Figure = fig):
    """Add train/test/prediction 'price' traces to subplot (row, col) of ``fig`` and set its title.

    :param model_name: model name shown in the subplot title
    :param train: training data with a 'price' column
    :param annotation: text appended to the title, e.g. the MAE/RMSE string; omitted if None
    :param test: optional test data with a 'price' column
    :param predict: optional prediction with a 'price' column
    :param row: subplot row (1-based)
    :param col: subplot column (1-based)
    :param fig: target figure; defaults to the global result figure bound at definition time
    """
    # Only the first row adds legend entries, otherwise every row would repeat them.
    showlegend_value = row == 1
    traces = (
        (train, 'train', 'Train Data'),
        (test, 'test', 'Test Data'),
        (predict, 'predict', 'Prediction Data'),
    )
    for frame, color_key, label in traces:
        if frame is None:
            continue
        fig.add_trace(go.Scatter(x=frame.index, y=frame.price, mode='lines', line=dict(color=colors[color_key]),
                                 name=label, showlegend=showlegend_value), row=row, col=col)

    # Subplot titles are the first annotations of the figure; the index assumes a single column.
    title = model_name if annotation is None else f'{model_name} : {annotation}'
    fig.layout.annotations[row-1].update(text=title, font=dict(size=14), xanchor='left', x=0)


def plotting_save(fig:Figure = fig, filename:str = scenario_string):
    """Export ``fig`` as ``pictures/<filename>.pdf``.

    Both defaults are bound when this cell runs (global result figure, current scenario_string).
    """
    pio.get_chrome()  # provides the Chrome installation Kaleido uses for static export
    Path('pictures').mkdir(parents=True, exist_ok=True)  # write_image fails on a missing folder
    fig.write_image(f'pictures/{filename}.pdf')

In [94]:
def cleanup_prediction_statsforecast(prediction: DataFrame):
    """Normalise a forecast frame in place: index it by 'ds' and call the point forecast 'price'.

    The point-forecast column is named after the model (MSTL, AutoARIMA, ARIMA, AutoTBATS or
    XGBRegressor); other columns such as interval bounds keep their names. The 'ds' column is
    kept as well. Returns None.
    """
    model_columns = ('MSTL', 'AutoARIMA', 'ARIMA', 'AutoTBATS', 'XGBRegressor')
    prediction.set_index('ds', drop=False, inplace=True)
    prediction.rename(columns=dict.fromkeys(model_columns, 'price'), inplace=True)

In [95]:
def verification_mae_rmse(test: DataFrame, prediction: DataFrame):
    """Return 'MAE: <mae> || RMSE: <rmse>' (rounded to 3 decimals) for a prediction.

    The first ``len(prediction)`` rows of ``test`` are compared with ``prediction``; both
    'price' columns are aligned on their index, so the prediction must use the same timestamps.

    :param test: test data with a 'price' column
    :param prediction: prediction with a 'price' column
    :raises ValueError: if no timestamp has values in both frames (previously 'nan' was
                        returned silently, e.g. after an index mismatch)
    """
    actual = test.head(len(prediction))['price']
    residuals = (actual - prediction['price']).dropna()
    if residuals.empty:
        raise ValueError('prediction and test set share no timestamps with values; cannot compute MAE/RMSE')
    mae = residuals.abs().mean()
    rmse = sqrt((residuals ** 2).mean())
    return f'MAE: {round(mae, 3)} || RMSE: {round(rmse, 3)}'

In [None]:
# Plot the complete price series and save it as pictures/final_data_<country_code>.pdf.
plot_single_dataframe(final_data, f'final_data_{country_code}', True)

# SARIMAX
Klassisches statistisches Modell

## Grafiken

In [None]:
#plotting("Strompreis Train Set", train_set)

In [None]:
#plotting('Strompreis Test Set', test_set)

In [None]:
# kopiert von geeksforgekks 'https://www.geeksforgeeks.org/machine-learning/sarima-seasonal-autoregressive-integrated-moving-average/'
#def check_stationarity(timeseries):
#    result = adfuller(timeseries, autolag='AIC')
#    p_value = result[1]
#    print(f'ADF Statistic: {result[0]}')
#    print(f'p-value: {p_value}')
#    print('Stationary' if p_value < 0.05 else 'Non-Stationary')


#check_stationarity(train_set['price'])

In [None]:
#decomposition = seasonal_decompose(train_set.price, model='additive')
#decomposition.plot()
#time_delta = 50
#plt.xlim(pd.Timestamp(train_set.index.max() - datetime.timedelta(time_delta) ), pd.Timestamp(test_set.index.min() + datetime.timedelta(time_delta)))
#plt.show()


In [None]:
# Inspect the combined dataset before model training.
final_data

Die `auto_arima`-Methode aus pmdarima kann nicht verwendet werden, da sie den Arbeitsspeicher (RAM) nicht wieder freigibt.

## Training

In [None]:
# (disabled) memory profiling of this cell
##%%memit
# VRAM is read before and after the whole pipeline, so the statement order matters here.
start_vram = get_current_gpu()
#setup
sarima_train, sarima_test = data_preparation_statsforecast(train_set, test_set)
#training: AutoARIMA with a daily season (24 hourly steps)
sf_sarima = auto_fitting_the_data_mstl_sarimax(training=sarima_train, freq='h', season_length=24) # takes approx. 12 min
#forecasting: future exogenous features come from sarima_test, 90 % prediction interval
prediction_sarima = forecasting(sf=sf_sarima, test=sarima_test, h=prediction_length, level=[90])
cleanup_prediction_statsforecast(prediction_sarima)
print(f'VRAM Start : {start_vram} End: {get_current_gpu()} ')

In [None]:
# Show the ARIMA specification selected by AutoARIMA (first series, first model).
arima_string(sf_sarima.fitted_[0, 0].model_)

In [None]:

#setup
#sarima_train, sarima_test = data_preparation_statsforecast(train_set, test_set)
#training order=[5, 0, 3]
#sf_sarima = fitting_the_data_sarimax(training=sarima_train, freq='h', order=[3, 0, 1], seasonal_order=[2, 1, 0], season_length=24) # ca 12 min zum laden
#forecasting
#prediction_sarima = forecasting(sf=sf_sarima, test=sarima_test, h=1024, level=[90])
#cleanup_prediction_statsforecast(prediction_sarima)


In [None]:
# (disabled) memory profiling of this cell
##%%memit
# VRAM is read before and after the whole pipeline, so the statement order matters here.
start_vram = get_current_gpu()
#setup
sarima_train, sarima_test = data_preparation_statsforecast(train_set, test_set)
#training: MSTL with daily, weekly and two-weekly seasons (hourly data), AutoARIMA as trend forecaster
sf_mstl = auto_fitting_the_data_mstl_sarimax(training=sarima_train, freq='h', multi_season=True, multi_season_length=[24, 24*7, 24*14]) # takes approx. 20 min, MAE approx. 45
#forecasting
prediction_mstl = forecasting(sf=sf_mstl, test=sarima_test, h=prediction_length, level=[90])
cleanup_prediction_statsforecast(prediction_mstl)
print(f'VRAM Start : {start_vram} End: {get_current_gpu()} ')


In [None]:
#sf_mstl.fitted_[0, 0].model_.tail(24*14).plot(subplots=True, grid=True)
#plt.tight_layout()
#plt.show()


## Plotting

In [None]:
# Row 1: SARIMA result, titled with the selected ARIMA specification and its MAE/RMSE.
plotting(train=train_set, annotation=verification_mae_rmse(test_set,prediction_sarima),
         test=test_set, predict=prediction_sarima, model_name=f'{arima_string(sf_sarima.fitted_[0, 0].model_)}',row=1)

In [None]:

# Row 2: MSTL result with its MAE/RMSE.
plotting(train=train_set, annotation=verification_mae_rmse(test_set,prediction_mstl), test=test_set,
         predict=prediction_mstl,model_name= "MSTL 24, 24*7, 24*14 mit Regression", row=2)


# TBATS
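Statistisches Modell (TBATS: trigonometrische Saisonalität, Box-Cox-Transformation, ARMA-Fehler, Trend- und Saisonkomponenten)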

## Training

Wir können an dieser Stelle die gleiche Vorbereitung wie bei SARIMAX verwenden, da beide Modelle aus der gleichen Bibliothek stammen und die benötigten DataFrames gleich aufgebaut sind.

In [None]:
# (disabled) memory profiling of this cell
##%%memit
# VRAM is read before and after the whole pipeline, so the statement order matters here.
start_vram = get_current_gpu()
#setup (same long format as for SARIMAX)
training_set_tbats, testing_set_tbats = data_preparation_statsforecast(train_set, test_set)
#training: AutoTBATS with daily, weekly and two-weekly seasons of hourly data
trained_model_tbats = fitting_the_data_tbats(training_set_tbats, [24, 24 * 7, 24*14], 'h')
#forecasting
prediction_tbats = forecasting(sf=trained_model_tbats, test=testing_set_tbats, h=prediction_length)
cleanup_prediction_statsforecast(prediction_tbats)
print(f'VRAM Start : {start_vram} End: {get_current_gpu()} ')

## Plotting

In [98]:
# TBATS result with its MAE/RMSE.
# NOTE(review): row=1 is also used by the SARIMA plot, so the TBATS traces are drawn into the same
# subplot and its title replaces the SARIMA title (the figure has only 4 rows) - confirm intended.
plotting(train=train_set, annotation=verification_mae_rmse(test_set, prediction_tbats), test=test_set,
         predict=prediction_tbats, model_name='AutoTBATS', row=1)

# XGBOOST
ML-Modell

## Training

In [99]:
# Convert columns to the best possible (nullable) dtypes.
# NOTE(review): train_set/test_set were already split from final_data earlier, so this does not
# change the inputs of the models below - only the dtypes display in the next cell sees it.
final_data = final_data.convert_dtypes()

In [None]:
# Inspect the column dtypes.
final_data.dtypes

In [None]:
# (disabled) memory profiling of this cell
##%%memit
# VRAM is read before and after the whole pipeline, so the statement order matters here.
start_vram = get_current_gpu()
# Setup: same long format as for StatsForecast; lags of 24 h and 168 h (one day / one week back)
xgboost_train, xgboost_test = data_preparation_statsforecast(train_set, test_set)
mlf = MLForecast(models=[XGBRegressor()], freq='h', lags=[24, 24*7])
# Training: static_features=[] marks all extra columns as dynamic exogenous features,
# so their future values must be passed as X_df when predicting.
mlf.fit(xgboost_train, static_features=[])
#Forecasting
prediction_xgboost = forecasting(sf=mlf, test=xgboost_test, h=prediction_length)
cleanup_prediction_statsforecast(prediction_xgboost)
print(f'VRAM Start : {start_vram} End: {get_current_gpu()} ')

## Plotting

In [102]:
# XGBoost result with its MAE/RMSE.
# NOTE(review): row=2 is also used by the MSTL plot, so both are drawn into the same subplot and
# this title replaces the MSTL title - confirm intended.
plotting(train=train_set, annotation=verification_mae_rmse(test_set, prediction_xgboost), 
         test=test_set, predict=prediction_xgboost, model_name='XGBRegressor XGBoost', row=2)


# CHRONOS 2
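Vortrainiertes Transformer-Foundation-Modell für Zeitreihen (die Architektur ist an Sprachmodelle angelehnt, es ist aber kein LLM)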

## Functions

In [103]:
def data_preparation_chronos(training: DataFrame, testing: DataFrame, id: str, use_finetunedmodel: bool = False, finetunedmodel_name: str = None, device_map: str = "cuda"):
    """Load a Chronos-2 pipeline and prepare train/test frames for ``predict_df``.

    Both frames get a constant series identifier column 'id', and their index is moved into a
    column (named 'index' for an unnamed or 'index'-named index). The target column 'price' is
    removed from the test frame. The inputs are not modified.

    :param training: training data with a 'price' column
    :param testing: test data with the same columns
    :param id: series identifier written to the 'id' column (the name is kept for
               compatibility although it shadows the builtin ``id``)
    :param use_finetunedmodel: load the local checkpoint
        ``chronos-2-finetuned/<finetunedmodel_name>/finetuned-ckpt`` instead of "amazon/chronos-2"
    :param finetunedmodel_name: directory name of the fine-tuned checkpoint
    :param device_map: device passed to ``from_pretrained`` (default "cuda"; e.g. "cpu" without GPU)
    :return: (pipeline, train_df, test_df)
    :raises ValueError: if ``use_finetunedmodel`` is set without ``finetunedmodel_name``
    """
    if use_finetunedmodel:
        if not finetunedmodel_name:
            raise ValueError('finetunedmodel_name is required when use_finetunedmodel=True')
        model_path = f'chronos-2-finetuned/{finetunedmodel_name}/finetuned-ckpt'
    else:
        model_path = "amazon/chronos-2"
    pipeline_data_prep = BaseChronosPipeline.from_pretrained(model_path, device_map=device_map)

    chronos_train_df = training.copy()
    chronos_train_df['id'] = id
    chronos_train_df = chronos_train_df.reset_index()

    chronos_test_df = testing.copy()
    chronos_test_df['id'] = id
    chronos_test_df = chronos_test_df.reset_index().drop(columns='price')

    return pipeline_data_prep, chronos_train_df, chronos_test_df

In [104]:
def forecasting_chronos(pipeline: Chronos2Pipeline, training: DataFrame, future: DataFrame, prediction_length: int, target: str):
    """Forecast ``prediction_length`` steps of ``target`` with a Chronos-2 pipeline.

    :param pipeline: loaded (optionally fine-tuned) Chronos-2 pipeline
    :param training: history with columns 'id', 'index' (timestamp), the target and covariates
    :param future: future covariates with columns 'id' and 'index'; only the first
                   ``prediction_length`` rows are used
    :param prediction_length: number of steps to forecast
    :param target: name of the target column
    :return: DataFrame returned by ``predict_df`` with the 0.1, 0.5 and 0.9 quantiles
    """
    future_window = future.head(prediction_length)
    return pipeline.predict_df(
        training,
        future_df=future_window,
        prediction_length=prediction_length,
        quantile_levels=[0.1, 0.5, 0.9],
        id_column='id',
        timestamp_column='index',
        target=target,
    )


In [105]:
def split_test_into_chunks(test : DataFrame, chunk_size: int):
    """Split ``test`` row-wise into consecutive copies of at most ``chunk_size`` rows each."""
    pieces = []
    for offset in range(0, len(test), chunk_size):
        pieces.append(test.iloc[offset:offset + chunk_size].copy())
    return pieces

In [106]:
def for_loop_chronos(training: DataFrame, testing: DataFrame, finetuned_pipeline: Chronos2Pipeline, chunks_size: int = 1024):
    """Forecast ``testing`` chunk by chunk, feeding each chunk's predictions back as history.

    Each chunk of at most ``chunks_size`` rows is forecast from the current history. The chunk,
    with its predicted 'price', is then appended to the history for the next chunk.

    :param training: history in the layout produced by data_preparation_chronos
    :param testing: future covariates in the same layout (without 'price')
    :param finetuned_pipeline: Chronos-2 pipeline used for all chunks (fine-tuned or not)
    :param chunks_size: maximum number of steps per forecast call
    :return: concatenated forecasts indexed by timestamp, with the point forecast in 'price';
             an empty DataFrame if ``testing`` is empty
    :raises ValueError: if a forecast does not contain exactly one row per chunk row
    """
    history = training.copy()
    chunk_predictions = []
    for chunk in split_test_into_chunks(testing, chunks_size):
        chunk_forecast = forecasting_chronos(finetuned_pipeline, history, chunk, len(chunk), 'price')
        chunk_forecast.set_index('index', inplace=True)
        chunk_forecast.rename(columns={'predictions': 'price'}, inplace=True)
        chunk_predictions.append(chunk_forecast)

        if len(chunk_forecast) != len(chunk):
            raise ValueError(f'expected {len(chunk)} forecast rows, got {len(chunk_forecast)}')
        # The chunk has an integer index while the forecast is indexed by timestamp. Assigning
        # the Series directly aligned on the index and wrote only NaN into the history.
        # Assign positionally instead (assumes one row per timestamp in chronological order,
        # single series id).
        chunk['price'] = chunk_forecast['price'].to_numpy()
        history = pd.concat([history, chunk])

    # Concatenate once at the end instead of growing a DataFrame inside the loop.
    if not chunk_predictions:
        return pd.DataFrame()
    return pd.concat(chunk_predictions)

## Finetuning

In [None]:
# Run the garbage collector to release memory from the previous models before Chronos-2.
gc.collect()

In [None]:
# All feature columns except the target 'price' are used as covariates for Chronos-2.
known_covariates_values = list(train_set.columns.values)
known_covariates_values.remove('price')
known_covariates_values


In [None]:
# (disabled) memory profiling of this cell
##%%memit
# VRAM is read before and after the whole pipeline, so the statement order matters here.
start_vram = get_current_gpu()

# Load the base Chronos-2 model and prepare the data (series id 'DE').
pipeline, chronos_train, chronos_test = data_preparation_chronos(train_set, test_set, 'DE')

# One fine-tuning input per series id: target values plus all covariates as past covariates.
train_inputs = []
for item_id, group in chronos_train.groupby("id"):  # item_id itself is not used
    train_inputs.append({
        "target": group['price'].values,
        "past_covariates": {col: group[col].values for col in known_covariates_values},
        # NOTE(review): presumably None marks these covariates as known in the future without
        # supplying values here - confirm against the Chronos-2 fit() documentation.
        "future_covariates": {col: None for col in known_covariates_values},
    })

# Full fine-tuning of all weights.
finetuned_pipeline = pipeline.fit(
    finetune_mode='full',
    inputs=train_inputs,
    prediction_length=1024, # 1024 hourly steps = approx. 42.7 days (about 6 weeks), not 2 weeks
    num_steps=750,
    learning_rate=1e-5,
    batch_size=32,
    logging_steps=100,
)

# NOTE(review): this forecasts the whole test period (prediction_length steps) in one call, while
# fine-tuning used 1024 steps - see the chunked for-loop variant below.
energy_price_predict_chronos_df = forecasting_chronos(finetuned_pipeline, chronos_train, chronos_test, prediction_length, 'price')
energy_price_predict_chronos_df.set_index('index', inplace=True)
energy_price_predict_chronos_df.rename(columns={'predictions':'price'}, inplace=True)
print(f'VRAM Start : {start_vram} End: {get_current_gpu()} ')


## Vorhersage ohne Finetuning (Zero-Shot)

In [None]:
# (disabled) memory profiling of this cell
##%%memit
# VRAM is read before and after the whole pipeline, so the statement order matters here.
start_vram = get_current_gpu()
# Zero-shot forecast with the base "amazon/chronos-2" model (no fine-tuning).
pipeline_untrained, chronos_train, chronos_test = data_preparation_chronos(train_set, test_set, 'DE')
energy_price_predict_chronos_untrained_df = forecasting_chronos(pipeline_untrained, chronos_train, chronos_test, prediction_length, 'price')
energy_price_predict_chronos_untrained_df.set_index('index', inplace=True)
energy_price_predict_chronos_untrained_df.rename(columns={'predictions':'price'}, inplace=True)
print(f'VRAM Start : {start_vram} End: {get_current_gpu()} ')

## Vorhersage-Schleife (chunkweise, Vorhersagen werden als Historie weiterverwendet)

In [111]:
# Chunked forecast (1024 steps per call) with the fine-tuned pipeline.
energy_price_predict_chronos_df_for_loop = for_loop_chronos(chronos_train, chronos_test, finetuned_pipeline, 1024)

In [112]:
# Chunked forecast (1024 steps per call) with the base pipeline without fine-tuning.
energy_price_predict_chronos_untrained_df_for_loop = for_loop_chronos(chronos_train, chronos_test, pipeline_untrained, 1024)

## Plotting

In [113]:
# Row 3: fine-tuned Chronos-2 (single forecast call) with its MAE/RMSE.
plotting(model_name='Chronos 2', annotation=verification_mae_rmse(test_set, energy_price_predict_chronos_df),
         train=train_set, test=test_set, predict=energy_price_predict_chronos_df, row=3)

In [114]:
#plotting(model_name='Chronos 2 for-loop', annotation=verification_mae_rmse(test_set, energy_price_predict_chronos_df_for_loop),train=train_set, test=test_set, predict=energy_price_predict_chronos_df_for_loop)

In [115]:

# Row 4: Chronos-2 without fine-tuning (zero-shot) with its MAE/RMSE.
plotting(model_name='Chronos 2 untrained', annotation=verification_mae_rmse(test_set, energy_price_predict_chronos_untrained_df),
         train=train_set, test=test_set, predict=energy_price_predict_chronos_untrained_df, row=4)

In [116]:
#plotting(model_name='Chronos 2 untrained for-loop', annotation=verification_mae_rmse(test_set, energy_price_predict_chronos_untrained_df_for_loop),train=train_set, test=test_set, predict=energy_price_predict_chronos_untrained_df_for_loop)

In [117]:
# Export the combined result figure to pictures/<scenario_string>.pdf (default arguments).
plotting_save()