# Временные ряды: Гибридная стратегия

В этой задаче вам предстоит реализовать гибридную стратегию для прогнозирования временных рядов. В качестве временного ряда будем использовать данные о дневном спросе разных товаров в магазинах, рассматриваем только 1-й товар из 1-го магазина. Чтобы посмотреть на данные локально, можете скачать их отсюда: https://www.kaggle.com/c/demand-forecasting-kernels-only.

1. Функция `read_timeseries` уже реализована. Вы можете использовать ее для локально тестирования.
2. В функции `extract_hybrid_strategy_features` необходимо реализовать извлечение фичей из временного ряда согласно гибридной схеме.
3. Функция `build_datasets` используется для генерации датасета для каждой модели в гибридной схеме.
4. Внутри функции `predict` необходимо реализовать генерацию прогноза с использованием обученных моделей. Обратите внимание, что в процессе генерации новых значений временного ряда необходимо также считать вектор фичей для следующего прогноза. Для этого стоит использовать функцию `exrtact_features`, переданную в аргументах вызова. Важно: внутри predict нужно вызывать extract_features без параметров со значениями по умолчанию (то есть нужно передавать только timeseries и model_idx).
5. Реализуйте обучение моделей в теле функции `train_models`. Можно использовать любые модели из sklearn, например, линейную регрессию или градиентный бустинг. Так же очень полезным может оказаться добавление новых фичей, можете подумать про сезонности и даты. Может быть полезно реализовать новую функцию `extract_features`, чтобы получить лучший результат.
6. Функция `score_models` используется для оценки качества обученных моделей по метрике MSE. Эта функция уже реализована, Вы можете использовать ее для локальной отладки. Баллы за итоговую модель (55 из 100) будут выставляться обратно пропорционально полученному MSE при обучении 30 моделей (`model_count=30`) - от 0 баллов за $MSE⩾40$ до 55 баллов за $MSE⩽25$.

В каждую функцию из шаблона можно добавлять произвольные аргументы с дефолтными значениями.



In [1]:
import datetime
import sklearn
import typing as tp
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

X_type = tp.NewType("X_type", np.ndarray)
X_row_type = tp.NewType("X_row_type", np.ndarray)
Y_type = tp.NewType("Y_type", np.array)
TS_type = tp.NewType("TS_type", pd.Series)
Model_type = tp.TypeVar("Model_type")


def read_timeseries(path_to_df: str = "train.csv") -> TS_type:
    """Функция для чтения данных и получения обучающей и тестовой выборок"""
    df = pd.read_csv(path_to_df)
    df = df[(df['store'] == 1) & (df['item'] == 1)]
    df["date"] = pd.to_datetime(df["date"])
    df = df.set_index("date")
    ts = df["sales"]
    train_ts = ts[:-365]
    test_ts = ts[-365:]
    return train_ts, test_ts


def extract_hybrid_strategy_features(
        timeseries: TS_type,
        model_idx: int,
        window_size: int = 7
) -> X_row_type:
    """
    Функция для получения вектора фичей согласно гибридной схеме. На вход подаётся временной ряд
    до момента T, функция выделяет из него фичи, необходимые модели под номером model_idx для
    прогноза на момент времени T
    
    Args:
        timeseries --- временной ряд до момента времени T (не включительно), pd.Series с датой 
                       в качестве индекса
        model_idx --- индекс модели, то есть номер шага прогноза, 
                      для которого нужно получить признаки, нумерация с нуля
        window_size --- количество последних значений ряда, используемых для прогноза 
                        (без учёта количества прогнозов с предыдущих этапов)

    Returns:
        Одномерный вектор фичей для модели с индексом model_idx (np.array), 
        чтобы сделать прогноз для момента времени T
    """
    if model_idx == 0:
        return timeseries[-window_size:].values
    
    feature_vector = timeseries[-(window_size + model_idx): -model_idx]
    return np.concatenate([feature_vector.values, timeseries[-model_idx:].values])


def my_extract_features(timeseries: pd.Series, model_idx: int, window_size: int = 7) -> np.ndarray:
    if model_idx == 0:
        return timeseries[-window_size:].values
    else:
        extended_window = window_size + model_idx
        return timeseries[-extended_window:-model_idx].values


def build_datasets(
        timeseries: TS_type,
        extract_features: tp.Callable[..., X_row_type],
        window_size: int,
        model_count: int
) -> tp.List[tp.Tuple[X_type, Y_type]]:
    """
    Функция для получения обучающих датасетов согласно гибридной схеме
    
    Args:
        timeseries --- временной ряд
        extract_features --- функция для генерации вектора фичей
        window_size --- количество последних значений ряда, используемых для прогноза
        model_count --- количество моделей, используемых для получения предскзаний 

    Returns:
        Список из model_count датасетов, i-й датасет используется для обучения i-й модели 
        и представляет собой пару из двумерного массива фичей и одномерного массива таргетов
    """
    datasets = []
    total_length = len(timeseries)

    for model_idx in range(model_count):
        X, Y = [], []
        # Установим правильное начало и окончание цикла для каждой модели
        start_index = 0
        end_index = total_length - window_size - model_idx  # Учитываем сдвиг на model_idx вперёд для Y

        for i in range(start_index, end_index):
            features = extract_features(timeseries[i: i + window_size], model_idx, window_size)
            X.append(features)
            Y.append(timeseries[i + window_size + model_idx])

        datasets.append((np.array(X), np.array(Y)))

    return datasets

def predict(
        timeseries: TS_type,
        models: tp.List[Model_type],
        extract_features: tp.Callable[..., X_row_type] = my_extract_features
) -> TS_type:
    """
    Функция для получения прогноза len(models) следующих значений временного ряда
    
    Args:
        timeseries --- временной ряд, по которому необходимо сделать прогноз на следующие даты
        models --- список обученных моделей, i-я модель используется для получения i-го прогноза
        extract_features --- функция для генерации вектора фичей. Если вы реализуете свою функцию 
                             извлечения фичей для конечной модели, передавайте этим аргументом.
                             Внутри функции predict функцию extract_features нужно вызывать только
                             с аргументами timeseries и model_idx, остальные должны быть со значениями
                             по умолчанию

    Returns:
        Прогноз len(models) следующих значений временного ряда
    """
    predictions = []
    for model_idx, model in enumerate(models):
        features = extract_features(timeseries, model_idx)
        prediction = model.predict([features])[0]
        predictions.append(prediction)
        # Расширяем timeseries с новыми прогнозами и индексами
        new_index = timeseries.index[-1] + datetime.timedelta(days=1)
        timeseries = timeseries.append(pd.Series([prediction], index=[new_index]))
    return pd.Series(predictions, index=[timeseries.index[-i] for i in range(len(predictions), 0, -1)])


def train_models(
        train_timeseries: TS_type,
        model_count: int
) -> tp.List[Model_type]:
    """
    Функция для получения обученных моделей
    
    Args:
        train_timeseries --- обучающий временной ряд
        model_count --- количество моделей для обучения согласно гибридной схеме.
                        Прогнозирование должно выполняться на model_count дней вперёд

    Returns:
        Список из len(datasets) обученных моделей
    """
    models = []

    datasets = build_datasets(
        train_timeseries,
        my_extract_features, 
        window_size=5,
        model_count=model_count
    )

    for X, y in datasets:
        model = LinearRegression()
        model.fit(X, y)
        models.append(model)

    assert len(models) == model_count

    return models


def score_models(
        train_ts: TS_type,
        test_ts: TS_type,
        models: tp.List[Model_type],
        predict: tp.Callable[[TS_type, tp.List[Model_type]], TS_type] = predict
):
    """
    Функция для оценки качества обученных моделей по метрике MSE
    
    Args:
        train_ts --- обучающий временной ряд
        test_ts --- тестовый временной ряд
        models --- список обученных моделей
        predict --- функция для получения прогноза временного ряда

    Returns:
        Усредненное MSE для прогноза моделей по всей тестовой выборке
    """
    predict_len = len(models)
    predictions = []
    targets = []

    for i in range(len(test_ts) - predict_len + 1):
        predictions.extend(list(predict(train_ts, models)))
        targets.extend(list(test_ts[i:i+predict_len]))
        train_ts = train_ts.append(test_ts[i:i+1])

    return sklearn.metrics.mean_squared_error(targets, predictions)

In [15]:
def test_build_datasets():
    # Загрузка данных
    train_ts, _ = read_timeseries("demand-forecasting-kernels-only/train.csv")

    # Задаем параметры для тестирования
    model_counts = [1, 3, 10 ]  # Примеры количества моделей для тестирования
    window_sizes = [5, 7, 10]  # Разные размеры окна для тестирования

    # Перебор различных комбинаций параметров
    for model_count in model_counts:
        for window_size in window_sizes:
            # Получение датасетов
            datasets = build_datasets(
                timeseries=train_ts,
                extract_features=extract_hybrid_strategy_features,
                window_size=window_size,
                model_count=model_count
            )

            # Проверка каждого датасета
            for idx, (X, Y) in enumerate(datasets):
                # Проверка размеров массивов
                assert X.shape[0] == Y.shape[0], f"Mismatch in dataset {idx}: X and Y samples count not equal ({X.shape[0]} != {Y.shape[0]})"
                # Проверка размерности X
                assert X.shape[1] == window_size, f"Mismatch in dataset {idx}: X features count not equal to window_size ({X.shape[1]} != {window_size})"
                print(f"Dataset {idx} with model_count {model_count} and window_size {window_size} passed successfully.")

    print("All tests passed successfully.")

# Вызов функции тестирования
test_build_datasets()


  Y.append(timeseries[i + window_size + model_idx])


Dataset 0 with model_count 1 and window_size 5 passed successfully.
Dataset 0 with model_count 1 and window_size 7 passed successfully.
Dataset 0 with model_count 1 and window_size 10 passed successfully.
Dataset 0 with model_count 3 and window_size 5 passed successfully.
Dataset 1 with model_count 3 and window_size 5 passed successfully.
Dataset 2 with model_count 3 and window_size 5 passed successfully.
Dataset 0 with model_count 3 and window_size 7 passed successfully.
Dataset 1 with model_count 3 and window_size 7 passed successfully.
Dataset 2 with model_count 3 and window_size 7 passed successfully.
Dataset 0 with model_count 3 and window_size 10 passed successfully.
Dataset 1 with model_count 3 and window_size 10 passed successfully.
Dataset 2 with model_count 3 and window_size 10 passed successfully.
Dataset 0 with model_count 10 and window_size 5 passed successfully.
Dataset 1 with model_count 10 and window_size 5 passed successfully.
Dataset 2 with model_count 10 and window_s

In [None]:
def test_build_datasets(train_test_ts):
    train_ts, _ = train_test_ts

    # Предположим, что у нас есть параметры для тестирования
    model_count, window_size = 3, 5

    # Получение датасетов от вашей и эталонной реализации
    p_datasets = build_datasets(
        timeseries=train_ts[-100:],
        extract_features=extract_hybrid_strategy_features,
        model_count=model_count,
        window_size=window_size
    )

    # j_datasets = jury_solution.build_datasets(
    #     timeseries=train_ts[-100:],
    #     extract_features=jury_solution.extract_hybrid_strategy_features,
    #     model_count=model_count,
    #     window_size=window_size
    # )
    # 
    # assert len(p_datasets) == len(j_datasets)


    for i, (p_d, j_d) in enumerate(zip(p_datasets, j_datasets)):
        assert len(p_d) == 2 and len(j_d) == 2, "Wrong dataset element count"
        assert p_d[0].shape == j_d[0].shape, f"Mismatch in shape of X at dataset {i}"
        assert p_d[1].shape == j_d[1].shape, f"Mismatch in shape of Y at dataset {i}"
        assert np.allclose(p_d[0], j_d[0]), f"Wrong X_{i}"
        assert np.allclose(p_d[1], j_d[1]), f"Wrong Y_{i}"

    print("All datasets match successfully.")
