**Holiday features from** [Enefit Estonian Holidays LB=65.79](https://www.kaggle.com/code/albansteff/enefit-estonian-holidays-lb-65-79)🙏


### What's new?
The main idea of this notebook is to predict the difference between the `target` and the last available `target_48h`, instead of predicting the `target` directly.

In [None]:
import warnings

warnings.filterwarnings("ignore")

import time
import os
import gc
import pickle
import datetime

import numpy as np
import pandas as pd
import polars as pl
import plotly.express as px

from sklearn.ensemble import VotingRegressor
from sklearn.metrics import mean_absolute_error
import lightgbm as lgb

import matplotlib.pyplot as plt
import holidays

In [None]:
# Wall-clock time at which the notebook started running
notebook_starttime = time.time()

In [None]:
# Returns how long the notebook has been running so far
def p_time():
    """Return the elapsed run time as a zero-padded string such as '00042 sec:'.

    The elapsed time is measured from the module-level ``notebook_starttime``
    and rounded to whole seconds.
    """
    elapsed_seconds = round(time.time() - notebook_starttime)
    return f"{elapsed_seconds:05d} sec:"

In [None]:
# Set is_local to True when working locally; set it to False when submitting
#is_local = False
is_local = True

# Set is_gpu to True to run on a GPU; set it to False to run on the CPU
# is_gpu = False
is_gpu = True

# Set is_tuning to True when running the Optuna hyperparameter search
is_tuning = False

# Start date of the model training data
training_start_date = 'datetime >= "2022-01-01 00:00:00"'

# Time at which re-training starts. Models start being re-trained from scratch
# once the prediction date reaches the period that counts towards the score.
scor_start_time = pd.to_datetime('2023-06-01')

# For local runs: the last data_block_id of the training set.
# Everything from the next data_block_id to the end is the test set.
train_end_data_block_id = 500
# train_end_data_block_id = 600

# Set to False to keep the notebook run-time control enabled
# (models are not re-trained after 8 hours 30 minutes).
# Set to True to keep re-training the models even after that limit.
is_disable_run_time_limit = True
# is_disable_run_time_limit = False

# How often, in submission cycles (days), the models are re-trained on the old data plus the newly added data.
# For example, to re-train every 30 cycles (after every 30 predicted days) set this to 30.
learn_again_period = 2

## Подготовка данных

### Запись тестовых и тренировочных csv файлов

In [None]:
# Root directory of the competition input data. It is defined here because the
# submission branch below, as well as the split helpers, read it.
root = "/kaggle/input/predict-energy-behavior-of-prosumers"

if is_local:
    # Local run: the train and test csv files live in local directories,
    # which are created when missing.
    train_path = 'train'
    os.makedirs(train_path, exist_ok=True)
    # Directory the test csv files are written to
    test_path = 'example_test_files'
    os.makedirs(test_path, exist_ok=True)
else:
    # Submission run: train directly from the competition input directory
    train_path = root

In [None]:
# Splits a table into a training part and a test part
def split_train_test(filename):
    """Split one source csv by ``data_block_id`` and write both parts to disk.

    The table is read from ``root``. Rows with ``data_block_id`` greater than
    ``train_end_data_block_id`` are written to ``test_path``; the remaining
    rows are written to ``train_path``. For ``train.csv`` the test part keeps
    only rows whose ``target`` is not null.

    Args:
        filename: Name of the csv file, used for the input and both outputs.

    Returns:
        The training part of the table as a pandas DataFrame.
    """
    df = pd.read_csv(os.path.join(root, filename))

    # Write the test part
    test_df = df[df["data_block_id"] > train_end_data_block_id]
    if filename == "train.csv":
        # Keep only the rows where target is present
        test_df = test_df[test_df["target"].notnull()]

    test_df.to_csv(os.path.join(test_path, filename), index=False)

    # Write the training part
    train_df = df[df["data_block_id"] <= train_end_data_block_id]
    train_df.to_csv(os.path.join(train_path, filename), index=False)

    # Hand the training part back to the caller, as the description promises
    return train_df

# Brings the test tables to the same shape as the ones served in a real submission
def test_dfs_tune():
    """Build revealed_targets.csv, test.csv and sample_submission.csv in ``test_path``.

    ``revealed_targets.csv`` is derived from the full ``train.csv`` under
    ``root``; ``test.csv`` and ``sample_submission.csv`` are derived from the
    already split ``train.csv`` stored in ``test_path``.
    """
    # revealed_targets.csv: targets shifted forward by two data blocks
    revealed = pd.read_csv(os.path.join(root, "train.csv"))
    first_block = train_end_data_block_id - 2
    revealed = revealed[revealed["data_block_id"] > first_block]
    revealed["data_block_id"] += 2
    revealed = revealed[revealed["target"].notnull()]
    revealed.to_csv(os.path.join(test_path, 'revealed_targets.csv'), index=False)

    # test.csv: the test rows without the target column
    test_frame = pd.read_csv(os.path.join(test_path, "train.csv"))
    test_frame = test_frame.rename(columns={'datetime': 'prediction_datetime'})
    test_frame = test_frame.drop(columns='target')
    test_frame['currently_scored'] = False
    test_frame.to_csv(os.path.join(test_path, 'test.csv'), index=False)

    # sample_submission.csv: row ids with a zero placeholder target
    sample = test_frame[['row_id', 'data_block_id']].copy()
    sample['target'] = 0
    sample.to_csv(os.path.join(test_path, 'sample_submission.csv'), index=False)

# Runs the whole train/test split of the source files
def make_split():
    """Split every source table and prepare the test-side tables.

    Besides the tables split by ``split_train_test``, the weather-station
    mapping file is copied into ``train_path`` unchanged, because
    ``DataStorage`` reads it from there and none of the split steps write it.
    """
    import shutil

    # csv files that are split into a training and a test part
    csv_names = ["train.csv", "client.csv", "gas_prices.csv", "electricity_prices.csv", "forecast_weather.csv", "historical_weather.csv"]
    for csv_name in csv_names:
        split_train_test(csv_name)

    # The mapping table is not one of the split tables: copy it as is.
    mapping_name = "weather_station_to_county_mapping.csv"
    mapping_src = os.path.join(root, mapping_name)
    mapping_dst = os.path.join(train_path, mapping_name)
    # Skip the copy when source and destination are the same file
    if os.path.abspath(mapping_src) != os.path.abspath(mapping_dst):
        shutil.copyfile(mapping_src, mapping_dst)

    # Finish the test tables
    test_dfs_tune()

In [None]:
%%time

# Create the csv files with the training and test tables
if is_local:
    # Creation of the test files is disabled for now: they already exist locally
    # make_split()
    pass

# Classes

### DataStorage

In [None]:
# Root directory of the competition input data
root = "/kaggle/input/predict-energy-behavior-of-prosumers"
if not is_local:
    # Submission run: read the training tables straight from the input directory
    train_path = root
else:
    # Local run: use local directories and create them when they are missing
    train_path = 'train'
    test_path = 'example_test_files'
    for directory in (train_path, test_path):
        if not os.path.exists(directory):
            os.makedirs(directory)

In [None]:
class DataStorage:
    """Holds the competition tables as polars DataFrames.

    The tables are read from ``train_path`` on construction and can later be
    extended with newly revealed rows through ``update_with_new_data``.
    """

    # Columns read from train.csv
    data_cols = [
        "target", "county", "is_business", "product_type",
        "is_consumption", "datetime", "row_id",
    ]

    # Same columns as data_cols in a different order; used when new rows are
    # appended to df_data.
    df_data_cols = [
        "county", "is_business", "product_type", "target",
        "is_consumption", "datetime", "row_id",
    ]

    # Columns read from client.csv
    client_cols = [
        "product_type", "county", "eic_count",
        "installed_capacity", "is_business", "date",
    ]

    # Columns read from gas_prices.csv and electricity_prices.csv
    gas_prices_cols = ["forecast_date", "lowest_price_per_mwh", "highest_price_per_mwh"]
    electricity_prices_cols = ["forecast_date", "euros_per_mwh"]

    # Columns read from forecast_weather.csv
    forecast_weather_cols = [
        "latitude", "longitude", "hours_ahead",
        "temperature", "dewpoint",
        "cloudcover_high", "cloudcover_low", "cloudcover_mid", "cloudcover_total",
        "10_metre_u_wind_component", "10_metre_v_wind_component",
        "forecast_datetime",
        "direct_solar_radiation", "surface_solar_radiation_downwards",
        "snowfall", "total_precipitation",
    ]

    # Columns read from historical_weather.csv
    historical_weather_cols = [
        "datetime", "temperature", "dewpoint", "rain", "snowfall",
        "surface_pressure",
        "cloudcover_total", "cloudcover_low", "cloudcover_mid", "cloudcover_high",
        "windspeed_10m", "winddirection_10m",
        "shortwave_radiation", "direct_solar_radiation", "diffuse_radiation",
        "latitude", "longitude",
    ]

    # Columns read from weather_station_to_county_mapping.csv
    location_cols = ["longitude", "latitude", "county"]

    # Columns of the target table derived from df_data
    target_cols = [
        "target", "county", "is_business", "product_type",
        "is_consumption", "datetime",
    ]

    def __init__(self):
        """Read every table from ``train_path`` and remember its schema."""

        def load(file_name, columns):
            # Read one csv from train_path, keeping only the requested columns
            return pl.read_csv(
                os.path.join(train_path, file_name),
                columns=columns,
                try_parse_dates=True,
            )

        self.df_data = load("train.csv", self.data_cols)
        self.df_client = load("client.csv", self.client_cols)
        self.df_gas_prices = load("gas_prices.csv", self.gas_prices_cols)
        self.df_electricity_prices = load(
            "electricity_prices.csv", self.electricity_prices_cols
        )
        self.df_forecast_weather = load(
            "forecast_weather.csv", self.forecast_weather_cols
        )
        self.df_historical_weather = load(
            "historical_weather.csv", self.historical_weather_cols
        )
        self.df_weather_station_to_county_mapping = load(
            "weather_station_to_county_mapping.csv", self.location_cols
        )

        # Keep only the rows from 2022-01-01 onwards
        first_kept_moment = pd.to_datetime("2022-01-01")
        self.df_data = self.df_data.filter(pl.col("datetime") >= first_kept_moment)
        self.df_target = self.df_data.select(self.target_cols)

        # Schemas are reused later to convert incoming pandas frames
        self.schema_data = self.df_data.schema
        self.schema_client = self.df_client.schema
        self.schema_gas_prices = self.df_gas_prices.schema
        self.schema_electricity_prices = self.df_electricity_prices.schema
        self.schema_forecast_weather = self.df_forecast_weather.schema
        self.schema_historical_weather = self.df_historical_weather.schema
        self.schema_target = self.df_target.schema

        # Coordinates are cast to Float32, the same type the weather tables
        # are cast to before being joined with this mapping.
        float32 = pl.datatypes.Float32
        mapping = self.df_weather_station_to_county_mapping
        self.df_weather_station_to_county_mapping = mapping.with_columns(
            pl.col("latitude").cast(float32),
            pl.col("longitude").cast(float32),
        )

    def update_with_new_data(
        self,
        df_new_client,
        df_new_gas_prices,
        df_new_electricity_prices,
        df_new_forecast_weather,
        df_new_historical_weather,
        df_new_target,
    ):
        """Append new pandas tables to the stored ones and drop duplicate keys.

        Every incoming frame is first converted to polars with the schema of
        the matching stored table; only then are the stored tables replaced.
        """

        def convert(frame, columns, schema):
            # Select the known columns and convert to polars with a fixed schema
            return pl.from_pandas(frame[columns], schema_overrides=schema)

        client_new = convert(df_new_client, self.client_cols, self.schema_client)
        gas_new = convert(
            df_new_gas_prices, self.gas_prices_cols, self.schema_gas_prices
        )
        electricity_new = convert(
            df_new_electricity_prices,
            self.electricity_prices_cols,
            self.schema_electricity_prices,
        )
        forecast_new = convert(
            df_new_forecast_weather,
            self.forecast_weather_cols,
            self.schema_forecast_weather,
        )
        historical_new = convert(
            df_new_historical_weather,
            self.historical_weather_cols,
            self.schema_historical_weather,
        )
        data_new = convert(df_new_target, self.df_data_cols, self.schema_data)
        target_new = convert(df_new_target, self.target_cols, self.schema_target)

        self.df_client = pl.concat([self.df_client, client_new]).unique(
            ["date", "county", "is_business", "product_type"]
        )
        self.df_gas_prices = pl.concat([self.df_gas_prices, gas_new]).unique(
            ["forecast_date"]
        )
        self.df_electricity_prices = pl.concat(
            [self.df_electricity_prices, electricity_new]
        ).unique(["forecast_date"])
        self.df_forecast_weather = pl.concat(
            [self.df_forecast_weather, forecast_new]
        ).unique(["forecast_datetime", "latitude", "longitude", "hours_ahead"])
        self.df_historical_weather = pl.concat(
            [self.df_historical_weather, historical_new]
        ).unique(["datetime", "latitude", "longitude"])
        self.df_target = pl.concat([self.df_target, target_new]).unique(
            ["datetime", "county", "is_business", "product_type", "is_consumption"]
        )
        self.df_data = pl.concat([self.df_data, data_new]).unique(
            ["datetime", "county", "is_business", "product_type", "is_consumption"]
        )

    def preprocess_test(self, df_test):
        """Convert a pandas test frame to polars.

        ``prediction_datetime`` is renamed to ``datetime`` and every column of
        ``data_cols`` except the first one (``target``) is kept.
        """
        renamed = df_test.rename(columns={"prediction_datetime": "datetime"})
        kept_columns = self.data_cols[1:]
        return pl.from_pandas(renamed[kept_columns], schema_overrides=self.schema_data)


### FeaturesGenerator

In [None]:
class FeaturesGenerator:
    """Builds the model feature table from the tables held by a data storage.

    ``generate_features`` is the entry point; the ``_add_*`` helpers each take
    and return a polars DataFrame and are applied in a fixed order.
    """

    def __init__(self, data_storage):
        """Keep a reference to the storage and precompute the holiday dates.

        Args:
            data_storage: Object exposing the ``df_*`` polars tables read by
                the ``_add_*`` helpers.
        """
        self.data_storage = data_storage
        # Dates of Estonian holidays for the years 2021-2025
        self.estonian_holidays = list(
            holidays.country_holidays("EE", years=range(2021, 2026)).keys()
        )

    def _add_general_features(self, df_features):
        """Add calendar parts, the segment key and cyclic day/hour encodings."""
        df_features = (
            df_features.with_columns(
                pl.col("datetime").dt.ordinal_day().alias("dayofyear"),
                pl.col("datetime").dt.hour().alias("hour"),
                pl.col("datetime").dt.day().alias("day"),
                pl.col("datetime").dt.weekday().alias("weekday"),
                pl.col("datetime").dt.month().alias("month"),
                pl.col("datetime").dt.year().alias("year"),
            )
            .with_columns(
                # One string key per county/is_business/product_type/is_consumption
                pl.concat_str(
                    "county",
                    "is_business",
                    "product_type",
                    "is_consumption",
                    separator="_",
                ).alias("segment"),
            )
            .with_columns(
                (np.pi * pl.col("dayofyear") / 183).sin().alias("sin(dayofyear)"),
                (np.pi * pl.col("dayofyear") / 183).cos().alias("cos(dayofyear)"),
                (np.pi * pl.col("hour") / 12).sin().alias("sin(hour)"),
                (np.pi * pl.col("hour") / 12).cos().alias("cos(hour)"),
            )
        )
        return df_features

    def _add_client_features(self, df_features):
        """Left-join the client table, with its ``date`` shifted forward by 2 days."""
        df_client = self.data_storage.df_client

        df_features = df_features.join(
            df_client.with_columns(
                (pl.col("date") + pl.duration(days=2)).cast(pl.Date)
            ),
            on=["county", "is_business", "product_type", "date"],
            how="left",
        )
        return df_features

    def is_country_holiday(self, row):
        """Return True when the row's year/month/day is an Estonian holiday.

        Args:
            row: Mapping with integer ``year``, ``month`` and ``day`` entries.
        """
        return (
            datetime.date(row["year"], row["month"], row["day"])
            in self.estonian_holidays
        )

    def _add_holidays_features(self, df_features):
        """Add the boolean ``is_country_holiday`` column."""
        df_features = df_features.with_columns(
            pl.struct(["year", "month", "day"])
            .apply(self.is_country_holiday)
            .alias("is_country_holiday")
        )
        return df_features

    def _add_forecast_weather_features(self, df_features):
        """Join forecast-weather means, overall and per county, at lags 0h and 168h.

        Only forecasts with ``hours_ahead`` between 22 and 45 inclusive are used.
        """
        df_forecast_weather = self.data_storage.df_forecast_weather
        df_weather_station_to_county_mapping = (
            self.data_storage.df_weather_station_to_county_mapping
        )

        df_forecast_weather = (
            df_forecast_weather.rename({"forecast_datetime": "datetime"})
            # Both comparisons are parenthesised: `&` binds tighter than `<=`,
            # so without the parentheses the upper bound is never applied.
            .filter((pl.col("hours_ahead") >= 22) & (pl.col("hours_ahead") <= 45))
            .drop("hours_ahead")
            .with_columns(
                pl.col("latitude").cast(pl.datatypes.Float32),
                pl.col("longitude").cast(pl.datatypes.Float32),
            )
            .join(
                df_weather_station_to_county_mapping,
                how="left",
                on=["longitude", "latitude"],
            )
            .drop("longitude", "latitude")
        )

        # Mean over all grid points for each datetime
        df_forecast_weather_date = (
            df_forecast_weather.group_by("datetime").mean().drop("county")
        )

        # Mean per county for each datetime (grid points without a county are skipped)
        df_forecast_weather_local = (
            df_forecast_weather.filter(pl.col("county").is_not_null())
            .group_by("county", "datetime")
            .mean()
        )

        for hours_lag in [0, 7 * 24]:
            df_features = df_features.join(
                df_forecast_weather_date.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ),
                on="datetime",
                how="left",
                suffix=f"_forecast_{hours_lag}h",
            )
            df_features = df_features.join(
                df_forecast_weather_local.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ),
                on=["county", "datetime"],
                how="left",
                suffix=f"_forecast_local_{hours_lag}h",
            )

        return df_features

    def _add_historical_weather_features(self, df_features):
        """Join historical-weather means at lags of 48h and 168h, plus a partial 24h lag.

        The 24h lag uses only the overall means, and only source rows whose
        hour is at most 10.
        """
        df_historical_weather = self.data_storage.df_historical_weather
        df_weather_station_to_county_mapping = (
            self.data_storage.df_weather_station_to_county_mapping
        )

        df_historical_weather = (
            df_historical_weather.with_columns(
                pl.col("latitude").cast(pl.datatypes.Float32),
                pl.col("longitude").cast(pl.datatypes.Float32),
            )
            .join(
                df_weather_station_to_county_mapping,
                how="left",
                on=["longitude", "latitude"],
            )
            .drop("longitude", "latitude")
        )

        # Mean over all stations for each datetime
        df_historical_weather_date = (
            df_historical_weather.group_by("datetime").mean().drop("county")
        )

        # Mean per county for each datetime
        df_historical_weather_local = (
            df_historical_weather.filter(pl.col("county").is_not_null())
            .group_by("county", "datetime")
            .mean()
        )

        for hours_lag in [2 * 24, 7 * 24]:
            df_features = df_features.join(
                df_historical_weather_date.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ),
                on="datetime",
                how="left",
                suffix=f"_historical_{hours_lag}h",
            )
            df_features = df_features.join(
                df_historical_weather_local.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ),
                on=["county", "datetime"],
                how="left",
                suffix=f"_historical_local_{hours_lag}h",
            )

        for hours_lag in [1 * 24]:
            df_features = df_features.join(
                df_historical_weather_date.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag),
                    # Hour of the original (unshifted) observation
                    pl.col("datetime").dt.hour().alias("hour"),
                )
                .filter(pl.col("hour") <= 10)
                .drop("hour"),
                on="datetime",
                how="left",
                suffix=f"_historical_{hours_lag}h",
            )

        return df_features

    def _add_target_features(self, df_features):
        """Add lagged targets, lagged aggregated targets, their statistics and ratios."""
        df_target = self.data_storage.df_target

        # Target summed over product types
        df_target_all_type_sum = (
            df_target.group_by(["datetime", "county", "is_business", "is_consumption"])
            .sum()
            .drop("product_type")
        )

        # Target summed over product types and counties
        df_target_all_county_type_sum = (
            df_target.group_by(["datetime", "is_business", "is_consumption"])
            .sum()
            .drop("product_type", "county")
        )

        # Per-segment target lagged by 2 to 14 whole days
        for hours_lag in [days * 24 for days in range(2, 15)]:
            df_features = df_features.join(
                df_target.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ).rename({"target": f"target_{hours_lag}h"}),
                on=[
                    "county",
                    "is_business",
                    "product_type",
                    "is_consumption",
                    "datetime",
                ],
                how="left",
            )

        for hours_lag in [2 * 24, 3 * 24, 7 * 24, 14 * 24]:
            df_features = df_features.join(
                df_target_all_type_sum.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ).rename({"target": f"target_all_type_sum_{hours_lag}h"}),
                on=["county", "is_business", "is_consumption", "datetime"],
                how="left",
            )

            df_features = df_features.join(
                df_target_all_county_type_sum.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ).rename({"target": f"target_all_county_type_sum_{hours_lag}h"}),
                on=["is_business", "is_consumption", "datetime"],
                how="left",
                suffix=f"_all_county_type_sum_{hours_lag}h",
            )

        # Row-wise mean and standard deviation of the 2- to 5-day lags
        cols_for_stats = [
            f"target_{hours_lag}h" for hours_lag in [2 * 24, 3 * 24, 4 * 24, 5 * 24]
        ]
        df_features = df_features.with_columns(
            df_features.select(cols_for_stats).mean(axis=1).alias(f"target_mean"),
            df_features.select(cols_for_stats)
            .transpose()
            .std()
            .transpose()
            .to_series()
            .alias(f"target_std"),
        )

        for target_prefix, lag_nominator, lag_denomonator in [
            ("target", 24 * 7, 24 * 14),
            ("target", 24 * 2, 24 * 9),
            ("target", 24 * 3, 24 * 10),
            ("target", 24 * 2, 24 * 3),
            ("target_all_type_sum", 24 * 2, 24 * 3),
            ("target_all_type_sum", 24 * 7, 24 * 14),
            ("target_all_county_type_sum", 24 * 2, 24 * 3),
            ("target_all_county_type_sum", 24 * 7, 24 * 14),
        ]:
            df_features = df_features.with_columns(
                (
                    pl.col(f"{target_prefix}_{lag_nominator}h")
                    # 1e-3 is added to the denominator of every ratio
                    / (pl.col(f"{target_prefix}_{lag_denomonator}h") + 1e-3)
                ).alias(f"{target_prefix}_ratio_{lag_nominator}_{lag_denomonator}")
            )

        return df_features

    def _reduce_memory_usage(self, df_features):
        """Cast every Float64 column to Float32."""
        df_features = df_features.with_columns(pl.col(pl.Float64).cast(pl.Float32))
        return df_features

    def _drop_columns(self, df_features):
        """Drop the helper columns ``date``, ``datetime``, ``hour`` and ``dayofyear``."""
        df_features = df_features.drop(
            "date", "datetime", "hour", "dayofyear"
        )
        return df_features

    def _to_pandas(self, df_features, y):
        """Convert to pandas, index by ``row_id`` and mark the categorical columns.

        Args:
            df_features: polars feature table.
            y: polars frame with the ``target`` column, or None.
        """
        cat_cols = [
            "county",
            "is_business",
            "product_type",
            "is_consumption",
            "segment",
        ]

        if y is not None:
            df_features = pd.concat([df_features.to_pandas(), y.to_pandas()], axis=1)
        else:
            df_features = df_features.to_pandas()

        df_features = df_features.set_index("row_id")
        df_features[cat_cols] = df_features[cat_cols].astype("category")

        return df_features

    def generate_features(self, df_prediction_items):
        """Build the pandas feature table for the given rows.

        Args:
            df_prediction_items: polars frame of rows to featurise. When it
                has a ``target`` column, that column is split off first and
                attached again to the final pandas frame.

        Returns:
            pandas DataFrame indexed by ``row_id``.
        """
        if "target" in df_prediction_items.columns:
            df_prediction_items, y = (
                df_prediction_items.drop("target"),
                df_prediction_items.select("target"),
            )
        else:
            y = None

        df_features = df_prediction_items.with_columns(
            pl.col("datetime").cast(pl.Date).alias("date"),
        )

        # The order matters: the holiday step reads year/month/day added by
        # the general step, and the drop step removes the helper columns last.
        for add_features in [
            self._add_general_features,
            self._add_client_features,
            self._add_forecast_weather_features,
            self._add_historical_weather_features,
            self._add_target_features,
            self._add_holidays_features,
            self._reduce_memory_usage,
            self._drop_columns,
        ]:
            df_features = add_features(df_features)

        df_features = self._to_pandas(df_features, y)

        return df_features


### Model

In [None]:
class Model:
    """Pair of LightGBM voting ensembles, one for consumption and one for production.

    Both ensembles are trained on ``target - target_48h`` (a missing
    ``target_48h`` counts as 0), and predictions add ``target_48h`` back
    before being clipped at zero.
    """

    def __init__(self):
        """Create both ensembles; each has six regressors with seeds 42 to 47."""
        self.model_parameters = dict(
            device='gpu',
            n_estimators=1656,
            verbose=-1,
            objective='l2',
            num_leaves=41,
            learning_rate=0.044516384496327305,
            colsample_bytree=0.8375548988212456,
            colsample_bynode=0.5667409811919698,
            reg_alpha=3.1759770693392193,
            reg_lambda=11.018602402122836,
            min_child_samples=5,
            max_depth=-1,
            max_bin=58,
        )

        def build_ensemble(prefix):
            # One regressor per seed, all sharing the same hyperparameters
            members = [
                (
                    f"{prefix}_lgb_{seed}",
                    lgb.LGBMRegressor(**self.model_parameters, random_state=seed),
                )
                for seed in range(42, 48)
            ]
            return VotingRegressor(members)

        self.model_consumption = build_ensemble("consumption")
        self.model_production = build_ensemble("production")

    def fit(self, df_train_features):
        """Fit the consumption ensemble, then the production ensemble.

        Args:
            df_train_features: pandas frame with ``is_consumption``,
                ``target`` and ``target_48h`` columns plus the features.
        """
        for flag, estimator in (
            (1, self.model_consumption),
            (0, self.model_production),
        ):
            subset = df_train_features[df_train_features["is_consumption"] == flag]
            # Learn the change relative to the value observed 48 hours earlier
            residual = subset["target"] - subset["target_48h"].fillna(0)
            estimator.fit(X=subset.drop(columns=["target"]), y=residual)

    def predict(self, df_features):
        """Return non-negative predictions aligned with the rows of ``df_features``.

        Rows whose ``is_consumption`` is neither 1 nor 0 keep the value 0.
        """
        predictions = np.zeros(len(df_features))

        for flag, estimator in (
            (1, self.model_consumption),
            (0, self.model_production),
        ):
            selector = df_features["is_consumption"] == flag
            subset = df_features[selector]
            baseline = subset["target_48h"].fillna(0).values
            predictions[selector.values] = np.clip(
                baseline + estimator.predict(subset), 0, np.inf
            )

        return predictions


# Initialisation

In [None]:
# Read all tables into the storage and bind the feature generator to it
data_storage = DataStorage()
features_generator = FeaturesGenerator(data_storage=data_storage)

# Feature Generation

In [None]:
# Feature generation for the training data is currently disabled: the code is
# kept inside a string literal, so this cell does nothing.
'''
%%time
df_train_features = features_generator.generate_features(data_storage.df_data)
df_train_features = df_train_features[df_train_features['target'].notnull()]
'''
pass

In [None]:
# df_train_features

# Train Model

In [None]:
%%time
# Create the (still unfitted) model wrapper
model = Model()
# Fitting and pickling are currently disabled: the code is kept inside a string literal
'''
model.fit(df_train_features)

pickle.dump(model, open('model.pkl', 'wb'))
'''
pass

# Submit API

In [None]:
# When running locally instead of submitting to Kaggle, a different file name
# is used, because writing to submission.csv fails there for lack of permissions.
submission_name = 'submission_loc.csv' if is_local else 'submission.csv'

### Содержимое public_timeseries_testing_util.py

С необходимыми правками. Решил не импортировать его, а вставить прямо тут: так удобнее переносить на Kaggle.

In [None]:
'''
An unlocked version of the timeseries API intended for testing alternate inputs.
Mirrors the production timeseries API in the crucial respects, but won't be as fast.

ONLY works afer the first three variables in MockAPI.__init__ are populated.
'''

from typing import Sequence, Tuple


class MockApi:
    def __init__(self):
        '''
        Configure the mock API.

        Attributes set here:
            input_paths: two or more paths to the csv files to be served.
            group_id_column: the column that identifies which rows are served together.
                One step of iter_test serves all rows of all dataframes that carry
                the current group ID value.
            export_group_id_column: if true, the served dataframes include the
                group_id_column values.
        '''
        self.input_paths: Sequence[str] = [
            'example_test_files/test.csv',
            'example_test_files/revealed_targets.csv',
            'example_test_files/client.csv',
            'example_test_files/historical_weather.csv',
            'example_test_files/forecast_weather.csv',
            'example_test_files/electricity_prices.csv',
            'example_test_files/gas_prices.csv',
            'example_test_files/sample_submission.csv',
        ]
        self.group_id_column: str = 'data_block_id'
        self.export_group_id_column: bool = False
        # iter_test needs at least two dataframes, such as test and sample_submission
        assert len(self.input_paths) >= 2

        self._status = 'initialized'
        self.predictions = []

    def iter_test(self) -> Tuple[pd.DataFrame]:
        '''
        Load every dataframe listed in self.input_paths and yield, group by group,
        a tuple with the rows of each dataframe that belong to the current
        self.group_id_column value. After the last group the collected
        predictions are written to the file named by submission_name.
        '''
        if self._status != 'initialized':
            raise Exception('WARNING: the real API can only iterate over `iter_test()` once.')

        loaded = [pd.read_csv(path, low_memory=False) for path in self.input_paths]
        # Groups are served in the order they first appear in the first file
        group_order = loaded[0][self.group_id_column].drop_duplicates().tolist()
        indexed = [frame.set_index(self.group_id_column) for frame in loaded]

        for group_id in group_order:
            self._status = 'prediction_needed'
            batch = []
            for frame in indexed:
                piece = frame.loc[group_id].copy()
                # A single matching row comes back from .loc as a Series:
                # rebuild it as a one-row dataframe
                if not isinstance(piece, pd.DataFrame):
                    piece = pd.DataFrame(
                        dict(zip(piece.index.values, piece.values)),
                        index=[group_id],
                    )
                    piece.index.name = self.group_id_column
                piece = piece.reset_index(drop=not self.export_group_id_column)
                batch.append(piece)
            yield tuple(batch)

            # Keep yielding None until predict() has been called for this group
            while self._status != 'prediction_received':
                print('You must call `predict()` successfully before you can continue with `iter_test()`', flush=True)
                yield None

        with open(submission_name, 'w') as f_open:
            pd.concat(self.predictions).to_csv(f_open, index=False)
        self._status = 'finished'

    def predict(self, user_predictions: pd.DataFrame):
        '''
        Store the user's predictions for the current group and unlock iter_test.
        '''
        status = self._status
        if status == 'finished':
            raise Exception('You have already made predictions for the full test set.')
        if status != 'prediction_needed':
            raise Exception('You must get the next test sample from `iter_test()` first.')
        if not isinstance(user_predictions, pd.DataFrame):
            raise Exception('You must provide a DataFrame.')

        self.predictions.append(user_predictions)
        self._status = 'prediction_received'


def make_env():
    """Create and return a new MockApi instance."""
    mock_api = MockApi()
    return mock_api


In [None]:
if not is_local:
    # Submission run: load the original competition library
    import enefit
    env = enefit.make_env()
else:
    # Local run: the mock API can replay a submission over a large number of
    # iterations, not only the four iterations (4 days) of the standard
    # simulation on Kaggle
    env = make_env()

iter_test = env.iter_test()

### Функция для скора

In [None]:
# Computes the score of the predictions made so far.
# Returns the `compare` dataframe that sets the predictions against the true targets.
def calc_score():
    """Display MAE summaries for the collected predictions and return the comparison frame.

    Reads the predictions stored in `env.predictions` and the true targets from
    `revealed_targets.csv` under `test_path`, then shows a one-row table with the
    MAE over all rows, over rows with `data_block_id > 600`, and over those rows
    split by `is_consumption` (0 and 1). The three "> 600" cells are '-' while
    no such block has been reached yet.

    Returns:
        DataFrame with columns `data_block_id`, `is_consumption`, `target`,
        `predict`, `abs_err` (|predict - target|) and `err` (predict - target).
    """
    def subset_mae(rows):
        # MAE between the true and predicted columns of a slice of `compare`
        return mean_absolute_error(rows['target'], rows['predict'])

    # Predictions accumulated by the environment
    # (alternative source: pd.read_csv(submission_name))
    submission = pd.concat(env.predictions)

    # True values
    truth = pd.read_csv(os.path.join(test_path, "revealed_targets.csv"))
    # presumably re-aligns each revealed target with the block it was predicted in — TODO confirm
    truth['data_block_id'] -= 2
    truth = truth[truth["data_block_id"] > train_end_data_block_id]
    # Keep only as many true rows as predictions have been made so far
    # (the two frames are matched by position)
    truth = truth.iloc[:len(submission)]

    mae = mean_absolute_error(truth['target'], submission['target'])

    # Data for analysing how the prediction error changes as we move away
    # from the end of training
    compare = truth[['data_block_id', 'is_consumption', 'target']].copy()
    compare['predict'] = submission['target'].values
    signed_err = compare['predict'] - compare['target']
    compare['abs_err'] = abs(signed_err).values
    compare['err'] = signed_err.values

    if compare['data_block_id'].max() > 600:
        late = compare[compare['data_block_id'] > 600]
        late_cons_0 = late[late['is_consumption'] == 0]
        late_cons_1 = late[late['is_consumption'] == 1]

        # MAE for data_block_id > 600
        mae_600 = subset_mae(late)
        # MAE for data_block_id > 600 and is_consumption == 0
        mae_600_cons_0 = subset_mae(late_cons_0)
        # MAE for data_block_id > 600 and is_consumption == 1
        mae_600_cons_1 = subset_mae(late_cons_1)
    else:
        # data_block_id > 600 has not been reached yet: show placeholders
        mae_600 = mae_600_cons_0 = mae_600_cons_1 = '-'

    summary = {
        '(ALL)': mae,
        '(> 600)': mae_600,
        '(> 600, is_cons==0)': mae_600_cons_0,
        '(> 600, is_cons==1)': mae_600_cons_1
    }
    mae_df = pd.DataFrame(summary, index=['MAE'])

    # Round the numbers to three decimal places and turn them into strings
    mae_df = mae_df.round(3).astype(str)

    display(mae_df)
    return compare

In [None]:
%%time
# Main loop: for every batch handed out by the environment, store the newly
# revealed data, retrain the model when the schedule allows it, and submit
# predictions for the batch.
count = 0
for (
    df_test,
    df_new_target,
    df_new_client,
    df_new_historical_weather,
    df_new_forecast_weather,
    df_new_electricity_prices,
    df_new_gas_prices,
    df_sample_prediction
) in iter_test:
    iteration_start_time = time.time()
    print(p_time(), '*************** Iteration: ', count, '***************')

    if is_local:
        # When running locally, convert the date columns explicitly.
        # On Kaggle (and maybe on Linux) they already arrive converted, but
        # locally on Windows they do not, and the code fails with an error.
        for frame_to_fix, datetime_column in (
            (df_test, 'prediction_datetime'),
            (df_new_client, 'date'),
            (df_new_gas_prices, 'origin_date'),
            (df_new_gas_prices, 'forecast_date'),
            (df_new_electricity_prices, 'origin_date'),
            (df_new_electricity_prices, 'forecast_date'),
            (df_new_forecast_weather, 'origin_datetime'),
            (df_new_forecast_weather, 'forecast_datetime'),
            (df_new_historical_weather, 'datetime'),
            (df_new_target, 'datetime'),
        ):
            frame_to_fix[datetime_column] = pd.to_datetime(frame_to_fix[datetime_column])

    data_storage.update_with_new_data(
        df_new_client=df_new_client,
        df_new_gas_prices=df_new_gas_prices,
        df_new_electricity_prices=df_new_electricity_prices,
        df_new_forecast_weather=df_new_forecast_weather,
        df_new_historical_weather=df_new_historical_weather,
        df_new_target=df_new_target
    )

    # Outside local runs every iteration is a retraining candidate. Locally,
    # retraining is considered only on the first iteration and from iteration
    # 100 onwards (author's note: the interest is in how the model behaves two
    # months after training). To consider every local iteration, replace
    # `count >= 100` with `count >= 0`.
    if (not is_local) or (count == 0) or (count >= 100):
        cur_time = time.time()
        # Retrain only while the notebook has run for less than 8 h 30 min,
        # unless the run-time limit is disabled.
        if ((cur_time - notebook_starttime) < (8*60*60 + 60*30)) or is_disable_run_time_limit:
            if (count % learn_again_period) == 0:
                # The raw test frame carries its timestamps in
                # 'prediction_datetime'; read the value once so that the check
                # and the log message below always use the same column.
                latest_prediction_time = df_test['prediction_datetime'].max()
                if (latest_prediction_time >= scor_start_time) or (count == 0) or is_local:
                    print(p_time(), 'Формируем признаки')
                    df_train_features = features_generator.generate_features(data_storage.df_data)
                    df_train_features = df_train_features[df_train_features['target'].notnull()]
                    print('df_train_features shape', df_train_features.shape[0])
                    print(p_time(), 'Тренируем модель')
                    model.fit(df_train_features)
                else:
                    print('Не тренеруем модель', latest_prediction_time, 'не достигла даты начала тренировки:', scor_start_time)

        else:
            print('Не тренеруем модель, превышено время выполнения ноутбука:', (cur_time - notebook_starttime))

    print(p_time(), 'Делаем предсказание')
    df_test = data_storage.preprocess_test(df_test)

    df_test_features = features_generator.generate_features(df_test)
    df_sample_prediction["target"] = model.predict(df_test_features)

    env.predict(df_sample_prediction)
    if is_local:
        # Show the current score in its different breakdowns
        compare = calc_score()

    count += 1
    print(p_time(), 'Iteration run time:', round(time.time() - iteration_start_time))
    print('')
    print('________________________________________________')
    print('')

## Анализ предсказания

### Подсчет скора

In [None]:
# Local runs only: show the MAE table once more and keep the comparison
# frame in the global `compare` for the plots below.
if is_local:
    compare = calc_score()

### График MAE по дням предсказания

In [None]:
# Plots the errors averaged per day (more precisely, per prediction data block,
# which is roughly equivalent to a day)
def print_err(err_name, err_lable, err_title):
    """Plot the per-`data_block_id` mean of one error column of the global `compare` frame.

    Bars show the mean of `err_name` within each `data_block_id`; an orange line
    overlays the rolling mean of those bar values over up to 30 consecutive
    blocks. If there is nothing to plot, a short notice is printed and no
    figure is created.

    Args:
        err_name: column of `compare` to average and plot (e.g. 'abs_err' or 'err').
        err_lable: text used for the bar legend entry and the y-axis label.
        err_title: chart title.
    """
    # Group by data_block_id (i.e. by day) and average every column per group
    grouped_compare = compare.groupby('data_block_id').mean().reset_index()
    block_means = grouped_compare[err_name]

    # `compare` can be empty (e.g. after filtering to blocks that have not been
    # reached yet); the tick computation below needs at least one value.
    if block_means.notna().sum() == 0:
        print('print_err: no data to plot for', err_name)
        return

    # Rolling mean of the per-block values
    grouped_compare['rolling_mean'] = block_means.rolling(window=30, min_periods=1).mean()

    # Plot the per-block means and their rolling mean
    plt.figure(figsize=(10, 8))
    plt.bar(grouped_compare['data_block_id'], block_means, label=err_lable)
    plt.plot(grouped_compare['data_block_id'],
             grouped_compare['rolling_mean'],
             label='Rolling Mean (window=30)',
             color='orange',
             linestyle='-', linewidth=2)
    plt.xlabel('data_block_id')
    plt.ylabel(err_lable)
    plt.title(err_title)
    plt.legend()

    # Y ticks every 10 units. Matplotlib widens the axis to show every tick, so
    # the first tick is derived from the data: 0 for non-negative errors,
    # otherwise the multiple of 10 at or below the smallest bar.
    tick_start = min(0.0, np.floor(block_means.min() / 10) * 10)
    tick_positions_y = np.arange(tick_start, block_means.max() + 1, 10)
    plt.yticks(tick_positions_y)
    plt.grid(True)
    plt.show()

In [None]:
# Local runs only: chart the per-block mean of the 'abs_err' column of `compare`
if is_local:
    print_err(err_name='abs_err',
              err_lable='Mean Absolute Error',
              err_title='Mean Absolute Error by data_block_id')

In [None]:
if is_local:
    # Narrow the global `compare` to blocks after 600 before plotting.
    # NOTE: the global stays filtered afterwards; rerun calc_score() to get
    # the full frame back.
    blocks_after_600 = compare["data_block_id"] > 600
    compare = compare[blocks_after_600]
    # Chart the per-block mean of the signed error (predict - target)
    print_err(
        err_name='err',
        err_lable='Mean Error (predict - target)',
        err_title='Mean Error by data_block_id',
    )