В данном ноутбуке находятся примеры работы с классом Dataloader

In [1]:
import pandas as pd
import numpy as np
import matplotlib as plt

import logging

from sklearn.linear_model import ElasticNet, HuberRegressor, Lasso, \
                                RANSACRegressor, Ridge, TheilSenRegressor
from sklearn.ensemble import RandomForestRegressor
from catboost import CatBoostRegressor
from statsmodels.tsa.api import Holt, SimpleExpSmoothing, ExponentialSmoothing, SARIMAX
import statsmodels
from prophet import Prophet
from prophet.serialize import model_to_json

from validation_system.dataloader import DataLoader

In [2]:
def weighted_mean(x: pd.Series, weights_coeffs: list) -> pd.Series:
    return (x * pd.Series(weights_coeffs, index=x.index)).sum() / len(x.index)

In [3]:
def seasonal_coefficients(ts: pd.Series, timestamps: pd.Series, time_step: str, timestamps_res: pd.DatetimeIndex) -> tuple[dict, pd.Series]:
    """
    Вычисление сезонных коэффициентов на основе ряда.
    :param ts: Временной ряд;
    :param timestamps: Метка времени;
    :param time_step: Шаг времени;
    :param timestamps_res: Временные метки, для которых нужно получить коэффициенты
    :return: Словарь с сезонными коэффициентами, временные метки
    """
    mean_sales = np.mean(ts.values)
    marks_res = None
    marks = None
    match time_step:
        case 'YS':
            marks = timestamps.dt.year
            marks_res = timestamps_res.year
        case 'QS':
            marks = timestamps.dt.quarter
            marks_res = timestamps_res.quarter
        case 'MS':
            marks = timestamps.dt.month
            marks_res = timestamps_res.month
        case 'W' | 'W-MON':
            marks = (timestamps.dt.day - 1 + (
                    timestamps - pd.to_timedelta(timestamps.dt.day - 1, unit='d')).dt.dayofweek) // 7 + 1
            marks_res = (timestamps_res.day - 1 + (
                    timestamps_res - pd.to_timedelta(timestamps_res.day - 1, unit='d')).dayofweek) // 7 + 1
        case 'D':
            marks = timestamps.dt.dayofweek
            marks_res = timestamps_res.dayofweek
    dict_seasonality = {}
    df = pd.DataFrame({'ds': marks, 'y': ts})
    for timestamp in marks.unique():
        values = df[df.ds == timestamp].y
        dict_seasonality[timestamp] = np.mean(values / mean_sales)
    for timestamp in set(marks_res.unique()) - set(marks.unique()):
        dict_seasonality[timestamp] = dict_seasonality.get(timestamp, 1)
    return dict_seasonality, marks_res

def calculate_weights_coeffs(n: int, weights_type: str, weights_coeffs: list) -> list:
    """
    Вычисление коэффициента весов на основе заданного типа весов и количества элементов
    :param n: Количество эламентов;
    :param weights_type: Тип весов;
    :param weights_coeffs: Изначальные коэффициенты весов;
    :return: Коэффициенты весов
    """
    if weights_type == 'custom':
        return weights_coeffs
    elif weights_type == 'new':
        reverse = False
    elif weights_type == 'old':
        reverse = True
    else:
        logging.exception(f'Передан несуществующий режим "{weights_type}". '
                          f'Необходимо выбирать режимы: "new", "old", "custom". '
                          f'Для данного пересечения будет по умолчанию будет выбран метод "new".')
        reverse = False
    a1 = 0.001
    d = (2 - 2 * a1 * n) / (n * (n - 1))
    res = [a1 + d * i for i in range(0, n)]
    res.sort(reverse=reverse)
    return res


In [4]:
def const(ts: pd.Series, n_predict: int, type: str) -> tuple[pd.Series, float]:
    """
    Прогноз - константа
    :param ts: Временной ряд;
    :param n_predict: Количество предсказаний;
    :param type: Тип константы;
    :return: Фрейм с предсказанием
    """
    n = len(ts.index)
    value = None
    match (type):
        case 'Moda':
            value = ts.mode()[0]
        case 'Mean':
            value = ts.mean()
        case 'Min':
            value = ts.min()
        case 'Max':
            value = ts.max()
        case 'Median':
            value = ts.median()

    ts_res = pd.Series(value, index=range(n + n_predict), dtype='float64')
    return ts_res, value


In [5]:
def rol_mean_fit(timestamps: pd.Series, time_step: str, ts: pd.Series, n_predict: int, window_size: int,
                 weights_coeffs: list, weights_type: str, sample: pd.DataFrame):
    """
    Создает скользящее среднее, формирует список компонентов

    :param ts: Временной ряд
    :param n_predict: Количество периодов для прогнозирования
    :param window_size: Размер окна
    :param weights_coeffs: Весовые коэффциенты
    :param weights_type: Тип весов
    :param sample: Датафрейм с признаками (не используется);
    :return: Прогноз
    """
    weights_coeffs = calculate_weights_coeffs(window_size, weights_type, weights_coeffs)
    ts_res = ts.copy()
    ts_base = None
    for i in range(n_predict):
        ts_res[len(ts_res.index)] = np.nan
        rol = ts_res.fillna(0).rolling(window_size)
        if i == 0:
            ts_base = rol.apply(lambda x: weighted_mean(x, weights_coeffs)).shift(1)[:-1]
            ts_base[:window_size] = ts[:window_size].values
        ts_res = ts_res.where(pd.notna, other=rol.apply(lambda x: weighted_mean(x, weights_coeffs)).shift(1))
    ts_res.loc[ts_base.index] = ts_base.values
    return ts_res


In [6]:
def croston_tsb_fit(timestamps: pd.Series, ts: pd.Series, alpha: float, beta: float, n_predict: int,
                    time_step: str, sample: pd.DataFrame) -> tuple[pd.Series, dict]:
    """
    Прогноз - метод Кростона.
    :param timestamps: Временные метки;
    :param ts: Временной ряд;
    :param alpha: Параметр сглаживания для уровня;
    :param beta: Параметр сглаживания для вероятности;
    :param n_predict: Количество предсказаний;
    :param time_step: Шаг прогноза;
    :param sample: Датафрейм с признаками (не используется);
    :return: Фрейм с предсказанием
    """
    d = np.array(ts)
    cols = len(d)
    d = np.append(d, [np.nan] * n_predict)

    # Уровень(a), Вероятность(p), Прогноз(f)
    a, p, f = np.full((3, cols + n_predict + 1), np.nan)

    # Инициализация
    first_occurrence = np.argmax(d[:cols] > 0)
    a[0] = d[first_occurrence]
    p[0] = 1 / (1 + first_occurrence)
    f[0] = p[0] * a[0]

    # Заполнение матриц уровня и вероятности, прогноз
    for t in range(cols):
        if d[t] > 0:
            a[t + 1] = alpha * d[t] + (1 - alpha) * a[t]
            p[t + 1] = beta * 1 + (1 - beta) * p[t]
        else:
            a[t + 1] = a[t]
            p[t + 1] = (1 - beta) * p[t]
        f[t + 1] = p[t + 1] * a[t + 1]
    a[cols + 1:cols + n_predict + 1] = a[cols]
    p[cols + 1:cols + n_predict + 1] = p[cols]
    f[cols + 1:cols + n_predict + 1] = f[cols]

    ts_res = pd.Series(index=range(cols + n_predict), dtype='float64')
    ts_res.loc[ts_res.index] = f[1:]
    timestamps_res = pd.date_range(start=timestamps.iloc[0], freq=time_step, periods=len(ts_res))
    dict_seasonality, marks_res = seasonal_coefficients(ts, timestamps, time_step, timestamps_res)
    df = pd.DataFrame({'y_pred': ts_res, 'indexes': marks_res})
    df.loc[cols + 1:cols + n_predict + 1, 'y_pred'] = df.iloc[cols + 1:cols + n_predict + 1].apply(
        lambda x: x.y_pred * dict_seasonality[x.indexes], axis=1)
    return df.y_pred.reset_index(drop=True)  # , {'alpha': alpha, 'beta': beta}


Пример загрузки данных от optimacros

In [7]:
dataloader = DataLoader("om_correct.csv")
models, hyperparams = dataloader.load_hyperparams_from_optimacros()
print(f"Models: {models}")
print(f"Hyperparams: {hyperparams}")

Models: ['croston_tsb', 'elastic_net', 'exp_smoothing', 'holt', 'holt_winters', 'huber', 'lasso', 'polynomial', 'ransac', 'ridge', 'rol_mean', 'theil_sen', 'const', 'catboost', 'sarima', 'prophet', 'random_forest', 'symfit_fourier_fft']
Hyperparams: [{'croston_tsb_min_alpha': 0, 'croston_tsb_max_alpha': 1, 'croston_tsb_min_beta': 0, 'croston_tsb_max_beta': 1, 'croston_tsb_step': 0.1}, {'elastic_net_min_alpha': 0, 'elastic_net_max_alpha': 1, 'elastic_net_max_l1': 1, 'elastic_net_min_l1': 0, 'elastic_net_step': 0.05}, {'exp_smoothing_min_alpha': 0, 'exp_smoothing_max_alpha': 1, 'exp_smoothing_step': 0.1}, {'holt_min_alpha': 0, 'holt_max_alpha': 1, 'holt_min_beta': 0, 'holt_max_beta': 1, 'holt_step': 0.1}, {'holt_winters_min_seasonality': 2, 'holt_winters_max_seasonality': 7, 'holt_winters_trend_types': ['add', 'mul', None], 'holt_winters_seasonal_types': ['add', 'mul', None]}, {'huber_min_degrees': 1, 'huber_max_degrees': 5}, {'lasso_min_alpha': 0, 'lasso_max_alpha': 1, 'lasso_step': 0.0

---

In [8]:
def generate_regression_data(n_samples=100, n_features=5):
    """Генерирует фейковые данные для задач регрессии."""
    X = np.random.rand(n_samples, n_features) * 10
    # Простое линейное отношение с шумом
    true_coef = np.array([1.5, -0.8, 0.5, 2.1, -1.2][:n_features])
    y = X @ true_coef + np.random.normal(0, 2, n_samples)
    return pd.DataFrame(X, columns=[f'feature_{i+1}' for i in range(n_features)]), pd.Series(y, name='target')

def generate_time_series_data(n_points=100, trend_strength=0.5, seasonality_period=12, seasonality_strength=5):
    """Генерирует фейковые данные временного ряда с трендом и сезонностью."""
    dates = pd.date_range(start='2020-01-01', periods=n_points, freq='MS')
    time_index = np.arange(n_points)

    # ТРЕНД
    trend = trend_strength * time_index

    # СЕЗОННОСТЬ
    seasonality = seasonality_strength * np.sin(2 * np.pi * time_index / seasonality_period)

    # ШУМ
    noise = np.random.normal(0, 1.5, n_points)

    # ОБЩИЙ РЯД
    y = trend + seasonality + noise

    df = pd.DataFrame({'ds': dates, 'y': y}) # 'ds' и 'y' - обязательные названия для Prophet
    return df

print("Функции генерации данных определены.")

# Генерируем данные для регрессии
X_reg, y_reg = generate_regression_data()
print(f"Сгенерированы данные для регрессии: X_reg.shape={X_reg.shape}, y_reg.shape={y_reg.shape}")

# Генерируем данные для временных рядов
df_ts = generate_time_series_data()
print(f"Сгенерированы данные для временных рядов: df_ts.shape={df_ts.shape}")


Функции генерации данных определены.
Сгенерированы данные для регрессии: X_reg.shape=(100, 5), y_reg.shape=(100,)
Сгенерированы данные для временных рядов: df_ts.shape=(100, 2)


In [9]:
elastic_net = ElasticNet(random_state=42)
try:
    elastic_net.fit(X_reg, y_reg)
    y_pred_en = elastic_net.predict(X_reg.head(5))
    print("  ElasticNet успешно обучена.")
except Exception as e:
    print(f"  Ошибка при обучении ElasticNet: {e}")

huber = HuberRegressor()
try:
    huber.fit(X_reg, y_reg)
    y_pred_huber = huber.predict(X_reg.head(5))
    print("  HuberRegressor успешно обучена.")
except Exception as e:
    print(f"  Ошибка при обучении HuberRegressor: {e}")

lasso = Lasso(random_state=42)
try:
    lasso.fit(X_reg, y_reg)
    y_pred_lasso = lasso.predict(X_reg.head(5))
    print("  Lasso успешно обучена.")
except Exception as e:
    print(f"  Ошибка при обучении Lasso: {e}")

ransac = RANSACRegressor(random_state=42)
try:
    ransac.fit(X_reg, y_reg)
    y_pred_ransac = ransac.predict(X_reg.head(5))
    print("  RANSACRegressor успешно обучена.")
except Exception as e:
    print(f"  Ошибка при обучении RANSACRegressor: {e}")

ridge = Ridge(random_state=42)
try:
    ridge.fit(X_reg, y_reg)
    y_pred_ridge = ridge.predict(X_reg.head(5))
    print("  Ridge успешно обучена.")
except Exception as e:
    print(f"  Ошибка при обучении Ridge: {e}")

theil_sen = TheilSenRegressor(random_state=42)
try:
    theil_sen.fit(X_reg, y_reg)
    y_pred_theil_sen = theil_sen.predict(X_reg.head(5))
    print("  TheilSenRegressor успешно обучена.")
except Exception as e:
    print(f"  Ошибка при обучении TheilSenRegressor: {e}")

random_forest = RandomForestRegressor(random_state=42, n_estimators=50)
try:
    random_forest.fit(X_reg, y_reg)
    y_pred_rf = random_forest.predict(X_reg.head(5))
    print("  RandomForestRegressor успешно обучена.")
except Exception as e:
    print(f"  Ошибка при обучении RandomForestRegressor: {e}")

catboost = CatBoostRegressor(random_state=42, verbose=0, iterations=50)
try:
    catboost.fit(X_reg, y_reg)
    y_pred_cat = catboost.predict(X_reg.head(5))
    print("  CatBoostRegressor успешно обучена.")
except Exception as e:
    print(f"  Ошибка при обучении CatBoostRegressor: {e}")

y_ts = df_ts.set_index('ds')['y']

models_ts_statsmodels = {
    "SimpleExpSmoothing": SimpleExpSmoothing(y_ts, initialization_method="estimated"),
    "Holt": Holt(y_ts, initialization_method="estimated"),
    "ExponentialSmoothing": ExponentialSmoothing(y_ts, seasonal_periods=12, trend='add', seasonal='add', initialization_method="estimated")
}

simpexpsmoth = SimpleExpSmoothing(y_ts, initialization_method="estimated")
try:
    model_fit_simpexpsmoth = simpexpsmoth.fit()
    forecast_steps = 5
    forecast_simpexpsmoth = model_fit_simpexpsmoth.forecast(forecast_steps)
    print("  SimpleExpSmoothing успешно обучена.")
except Exception as e:
    print(f"  Ошибка при обучении SimpleExpSmoothing: {e}")

holt = Holt(y_ts, initialization_method="estimated")
try:
    model_fit_holt = holt.fit()
    forecast_steps = 5
    forecast_holt = model_fit_holt.forecast(forecast_steps)
    print("  Holt успешно обучена.")
except Exception as e:
    print(f"  Ошибка при обучении Holt: {e}")

expsmoothing = ExponentialSmoothing(y_ts, seasonal_periods=12, trend='add', seasonal='add', initialization_method="estimated")
try:
    model_fit_expsmoothing = expsmoothing.fit()
    forecast_steps = 5
    forecast_expsmoothing = model_fit_expsmoothing.forecast(forecast_steps)
    print("  ExponentialSmoothing успешно обучена.")
except Exception as e:
    print(f"  Ошибка при обучении ExponentialSmoothing: {e}")

sarimax = SARIMAX(y_ts, order=(1, 1, 1), seasonal_order=(1, 1, 1, 12), enforce_stationarity=False, enforce_invertibility=False)
try:
    model_fit_sarimax = sarimax.fit(disp=False)
    forecast_steps = 5
    forecast_sarimax = model_fit_sarimax.forecast(forecast_steps)
    print("  SARIMAX успешно обучена.")
except Exception as e:
    print(f"  Ошибка при обучении SARIMAX: {e}")

model_prophet = Prophet()
try:
    model_prophet.fit(df_ts)
    future = model_prophet.make_future_dataframe(periods=5, freq='MS')
    forecast_prophet = model_prophet.predict(future)
    print("  Prophet успешно обучена.")
except Exception as e:
    print(f"  Ошибка при обучении Prophet: {e}")

  ElasticNet успешно обучена.
  HuberRegressor успешно обучена.
  Lasso успешно обучена.
  RANSACRegressor успешно обучена.
  Ridge успешно обучена.
  TheilSenRegressor успешно обучена.
  RandomForestRegressor успешно обучена.
  CatBoostRegressor успешно обучена.
  SimpleExpSmoothing успешно обучена.
  Holt успешно обучена.


  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)


  ExponentialSmoothing успешно обучена.


13:48:15 - cmdstanpy - INFO - Chain [1] start processing
13:48:15 - cmdstanpy - INFO - Chain [1] done processing


  SARIMAX успешно обучена.
  Prophet успешно обучена.


---

Пример получения гиперпараметров моделей

In [10]:
sklearn_models = [elastic_net, huber, lasso, ransac, ridge, theil_sen, random_forest, catboost]
statsmodels_models = [simpexpsmoth, holt, expsmoothing]
statsmodels_models_fit = [model_fit_simpexpsmoth, model_fit_holt, model_fit_expsmoothing]
prophet_models = [model_prophet]
for model in sklearn_models:
    hyperparams = dataloader.get_params(model)
    print(f"Гиперпараметры модели {model.__class__.__name__}: {hyperparams}")
for model, model_fit in zip(statsmodels_models, statsmodels_models_fit):
    hyperparams = dataloader.get_params(model, model_fit)
    print(f"Гиперпараметры модели {model.__class__.__name__}: {hyperparams}")
for model in prophet_models:
    hyperparams = dataloader.get_params(model)
    print(f"Гиперпараметры модели {model.__class__.__name__}: {hyperparams}")

Гиперпараметры модели ElasticNet: {'Model': 'elastic_net', 'Params': {'alpha': 1.0, 'copy_X': True, 'fit_intercept': True, 'l1_ratio': 0.5, 'max_iter': 1000, 'positive': False, 'precompute': False, 'random_state': 42, 'selection': 'cyclic', 'tol': 0.0001, 'warm_start': False}}
Гиперпараметры модели HuberRegressor: {'Model': 'huber', 'Params': {'alpha': 0.0001, 'epsilon': 1.35, 'fit_intercept': True, 'max_iter': 100, 'tol': 1e-05, 'warm_start': False}}
Гиперпараметры модели Lasso: {'Model': 'elastic_net', 'Params': {'alpha': 1.0, 'copy_X': True, 'fit_intercept': True, 'max_iter': 1000, 'positive': False, 'precompute': False, 'random_state': 42, 'selection': 'cyclic', 'tol': 0.0001, 'warm_start': False}}
Гиперпараметры модели RANSACRegressor: {'Model': 'ransac', 'Params': {'estimator': None, 'is_data_valid': None, 'is_model_valid': None, 'loss': 'absolute_error', 'max_skips': inf, 'max_trials': 100, 'min_samples': None, 'random_state': 42, 'residual_threshold': None, 'stop_n_inliers': in

Пример сохранения библиотечных моделей

In [11]:
for model in sklearn_models:
    dataloader.save_trained_model(model, f"saved")
for model in statsmodels_models:
    dataloader.save_trained_model(model, f"saved")
for model in prophet_models:
    dataloader.save_trained_model(model, f"saved")