# Predict the power production using sunshine duration

## Init, Load

In [None]:
import os
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
import seaborn as sns

from src.config import DATA_RAW_DIR, POWER_DWD_WEATHER_FILENAME

In [None]:
# Read the raw CSV named in the project config. The file is semicolon-separated,
# and only the columns used further down in this notebook are loaded.
raw_csv_path = os.path.join(DATA_RAW_DIR, POWER_DWD_WEATHER_FILENAME)
needed_columns = [
    "installation",
    "timestamp",
    "sol_prod",
    "sunshine_duration",
    "radiation_global",
]
df_raw = pd.read_csv(raw_csv_path, sep=";", usecols=needed_columns)

In [None]:
# Derive a calendar-day key for every reading. `tz_convert(None)` converts the
# tz-aware timestamps to UTC and drops the timezone, so "date" is the UTC day.
# NOTE(review): this requires "timestamp" to parse as tz-aware values - confirm
# against the raw file.
parsed_timestamps = pd.to_datetime(df_raw["timestamp"])
df_raw["date"] = parsed_timestamps.dt.tz_convert(None).dt.normalize()

# Sum the measurements per installation and day.
daily_totals = df_raw.groupby(["installation", "date"])[
    ["sunshine_duration", "radiation_global", "sol_prod"]
].sum()

# Flatten the group keys back into columns, order chronologically and discard
# the installation label, which is not used as a feature.
df_doy = (
    daily_totals.reset_index()
    .sort_values("date")
    .drop(columns=["installation"])
)

## Split

In [None]:
# df_doy was sorted by date without resetting its index, so its index labels are
# not guaranteed to be in row order. Give features and target the SAME fresh
# 0..n-1 index so that row i of X always belongs to row i of y.
# (drop=True also avoids carrying the old index along as a stray "index" column.)
df_split = df_doy.reset_index(drop=True)
y = df_split["sol_prod"]
X = df_split.drop(columns=["sol_prod"])

# last year as holdout
# NOTE(review): 365 rows only equal one year if there is exactly one row per
# day; with several installations per date the holdout covers less - confirm.
holdout_rows = 365
split_point = len(X) - holdout_rows
if split_point <= 0:
    raise ValueError(
        f"Need more than {holdout_rows} rows to keep a holdout year, got {len(X)}."
    )

# Positional, end-exclusive slicing: the first `split_point` rows train the
# model and exactly the last `holdout_rows` rows are held out. Label-based
# `.loc` slicing is end-inclusive and previously left only 364 test rows.
X_train, X_test = X.iloc[:split_point], X.iloc[split_point:]
y_train, y_test = y.iloc[:split_point], y.iloc[split_point:]

## Transformers

In [None]:
from sklearn.preprocessing import FunctionTransformer


def _day_of_year_angle(x):
    """
    Map datetimes to their position within the year, expressed as an angle.

    Parameters
    ----------
    x : pandas.Series or pandas.DataFrame
        Datetime data (tz-naive or tz-aware). Every column must be datetime-like.

    Returns
    -------
    pandas.DataFrame
        Same index and column labels as the input; each value is
        ``dayofyear / days_in_year * 2 * pi`` where ``days_in_year`` is 366 for
        leap years and 365 otherwise.

    Raises
    ------
    ValueError
        If ``x`` is neither a Series nor a DataFrame, or holds non-datetime data.
    """
    if isinstance(x, pd.Series):
        frame = x.to_frame()
    elif isinstance(x, pd.DataFrame):
        frame = x
    else:
        frame = None

    # is_datetime64_any_dtype also accepts tz-aware dtypes, which
    # np.issubdtype cannot interpret.
    if frame is None or not all(
        pd.api.types.is_datetime64_any_dtype(dtype) for dtype in frame.dtypes
    ):
        raise ValueError(
            "Input must be a pandas Series with datetime64 dtype"
            " or a DataFrame with datetime64 columns."
        )

    # Work on a copy so the caller's data is never modified; the column labels
    # are kept unchanged on purpose.
    angles = frame.copy()
    for col in frame.columns:
        stamps = frame[col]
        days_in_year = np.where(stamps.dt.is_leap_year, 366, 365)
        angles[col] = stamps.dt.dayofyear / days_in_year * 2 * np.pi
    return angles


def _day_of_year_trig(x, trig_function):
    """Apply ``sin`` or ``cos`` (chosen by ``trig_function``) to the day-of-year angle of ``x``."""
    if trig_function == "sin":
        return np.sin(_day_of_year_angle(x))
    if trig_function == "cos":
        return np.cos(_day_of_year_angle(x))
    raise ValueError("trig_function must be 'sin' or 'cos'")


def _day_of_year_trig_feature_names(transformer, input_features=None):
    """
    Build output feature names for the day-of-year transformer.

    Called by FunctionTransformer with the transformer itself as first argument,
    so the configured trig function is read from its ``kw_args``.
    """
    trig_function = transformer.kw_args["trig_function"]
    if input_features is None:
        return np.array([f"doy_{trig_function}"])
    return np.array([f"{c}_doy_{trig_function}" for c in input_features])


def day_of_year_trig_transformer(trig_function):
    """
    Create a FunctionTransformer that applies a trigonometric transformation (sin or cos)
    to the day of year extracted from a datetime Series or DataFrame.

    The day of year is divided by the length of its year (365, or 366 in leap
    years) and scaled to ``[0, 2*pi]`` before the trigonometric function is
    applied, giving a cyclic encoding of the date.

    The transformer only references module-level functions (no closures), so
    ``pickle`` can serialise it by name; those functions must be importable
    wherever the pickle is loaded.

    Parameters
    ----------
    trig_function : str
        The trigonometric function to use, either "sin" or "cos".

    Returns
    -------
    FunctionTransformer
        A scikit-learn FunctionTransformer that transforms datetime features into their
        corresponding trigonometric representation of the day of year.

    Raises
    ------
    ValueError
        If ``trig_function`` is not "sin" or "cos" (raised immediately, not
        first at transform time).

    Example
    -------
    >>> transformer = day_of_year_trig_transformer("sin")
    >>> dates = pd.DataFrame({"date": pd.to_datetime(["2022-01-01", "2022-06-30"])})
    >>> transformer.fit_transform(dates).to_numpy().round(4)
    array([[0.0172],
           [0.0258]])
    """
    # Fail fast on a bad configuration instead of during fit/transform.
    if trig_function not in ("sin", "cos"):
        raise ValueError("trig_function must be 'sin' or 'cos'")

    return FunctionTransformer(
        _day_of_year_trig,
        feature_names_out=_day_of_year_trig_feature_names,
        kw_args={"trig_function": trig_function},
    )

## Training

In [None]:
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import MinMaxScaler
from sklearn.pipeline import Pipeline
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_absolute_error

### Pipeline

In [None]:
# Columns that enter the model as scaled numeric features.
num_cols = ["sunshine_duration"]

# Numeric branch: rescale with MinMaxScaler.
num_pipe = Pipeline([("scale", MinMaxScaler())])

# Seasonal branches: encode the day of year of "date" as a sine and a cosine.
doy_sin_pipe = Pipeline([("trig", day_of_year_trig_transformer("sin"))])
doy_cos_pipe = Pipeline([("trig", day_of_year_trig_transformer("cos"))])

# Route each column group to its branch; any column not listed is dropped.
preprocessor = ColumnTransformer(
    [
        ("num", num_pipe, num_cols),
        ("sin", doy_sin_pipe, ["date"]),
        ("cos", doy_cos_pipe, ["date"]),
    ],
    remainder="drop",
)
# Have the preprocessor emit pandas DataFrames rather than bare arrays.
preprocessor.set_output(transform="pandas")

### Fit

In [None]:
# Ridge regression on top of the preprocessing step. `reg` keeps its own name
# because it is referenced again after fitting (evaluation and export).
reg = Ridge(alpha=1.0)
model = Pipeline([("prep", preprocessor), ("reg", reg)])
model.fit(X_train, y_train)

## Predict

In [None]:
# Predict production for the held-out rows with the fitted pipeline
# (preprocessing is applied to X_test by the pipeline itself).
y_pred = model.predict(X_test)

## Evaluation

In [None]:
from src.model_evaluation.regressor_evaluation import evaluate_regressor
from datetime import datetime

# Score the holdout predictions with the project's evaluation helper; the
# returned mapping is also used later for naming and exporting the run.
results = evaluate_regressor(
    regressor=reg,
    y_true=y_test,
    y_pred=y_pred,
    timestamp=datetime.now(),
    model_purpose="predict",
    special_features="sunshine,feateng-doytrig",
)

# Print the known metrics in a fixed order, skipping any the helper did not return.
print("Evaluation Results:")
metric_names = ("MAE", "MSE", "RMSE", "MAPE", "MedAE", "R2", "ExplainedVar")
for metric in metric_names:
    if metric in results:
        print(f"  {metric}: {results[metric]:.4f}")

## Save Model And Results

In [None]:
import pickle
import json
import os

from src.config import MODELS_DIR

model_name = results["model_name"]

# All artifacts are written to <MODELS_DIR>/<model_name>/<model_name>.<suffix>.
folder = os.path.join(MODELS_DIR, model_name)
os.makedirs(folder, exist_ok=True)
filename = os.path.join(folder, model_name)

# Pickling is disabled because of issues with pickling custom transformers;
# only the configuration and the results are saved.

# with open(f"{filename}.model.pkl", "wb") as f:
#     pickle.dump(reg, f)

# with open(f"{filename}.pipeline.pkl", "wb") as f:
#     pickle.dump(preprocessor, f)

# Human-readable repr of the regressor.
with open(f"{filename}.model.txt", "w") as model_txt:
    model_txt.write(str(reg))

# Regressor hyper-parameters as JSON.
with open(f"{filename}.model_params.json", "w") as params_json:
    json.dump(reg.get_params(), params_json, indent=2)

# Preprocessor parameters as plain text (they contain estimator objects,
# so they are stored via their string form rather than as JSON).
with open(f"{filename}.pipeline_params.txt", "w") as pipeline_txt:
    pipeline_txt.write(str(preprocessor.get_params()))

# Evaluation results as JSON.
with open(f"{filename}.results.json", "w") as results_json:
    json.dump(results, results_json, indent=2)
