https://developers.sber.ru/portal/products/lightautoml

In [None]:
!pip install -U lightautoml

In [None]:
# Standard libs
import os
from typing import Tuple

# Installed libraries
import joblib
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import mean_absolute_error, mean_squared_error

# LAMA
from lightautoml.tasks import Task
from lightautoml.addons.autots.base import AutoTS
from lightautoml.dataset.roles import DatetimeRole
# from lightautoml.automl.base import AutoML
# from lightautoml.ml_algo.boost_cb import BoostCB
# from lightautoml.ml_algo.linear_sklearn import LinearLBFGS
# from lightautoml.pipelines.features.lgb_pipeline import LGBSeqSimpleFeatures
# from lightautoml.pipelines.features.linear_pipeline import LinearTrendFeatures
# from lightautoml.pipelines.ml.base import MLPipeline
# from lightautoml.reader.base import DictToPandasSeqReader
# from lightautoml.automl.blend import WeightedBlender
# from lightautoml.ml_algo.random_forest import RandomForestSklearn

# Disable warnings
# NOTE: filterwarnings("ignore") with no category silences every warning for
# the rest of the session, including deprecation notices from the libraries
# imported above; narrow the filter if a real problem needs diagnosing.

import warnings
warnings.filterwarnings("ignore")

In [None]:

class LamaTSA:
    """Forecast the sales count of one item in one store with LightAutoML's AutoTS.

    The instance only stores the resolved store/item identifiers and the
    forecast horizon in days; data frames and fitted models are passed in
    and out of the methods explicitly.
    """

    def __init__(self, store_id, item_id, period='w'):
        """
        Resolve the identifiers and the forecast horizon.

        :param store_id: store number (int or numeric string, expanded to
            ``STORE_<n>``) or a full store identifier, used as given
        :param item_id: item number (int or numeric string, expanded to
            ``<store_id>_<n>``) or a full item identifier, used as given
        :param str period: horizon code: "w" (7 days), "m" (30 days) or
            "q" (90 days)
        :raises ValueError: if ``period`` is not one of the supported codes
        """

        print("preprocessing loaded data...")
        self.store_id = f"STORE_{store_id}" if isinstance(store_id, int) or store_id.isnumeric() else store_id
        self.item_id = f"{self.store_id}_{item_id}" if isinstance(item_id, int) or item_id.isnumeric() else item_id

        # The default used to be '7', which is not a key of this table, so
        # constructing the class without an explicit period always raised.
        periods = {"w": 7, "m": 30, "q": 90}
        self.period = periods.get(period)
        if self.period is None:
            raise ValueError(f'Incorrect period value: {period}\nSupported values: "w" - week, "m" - month, "q" - quarter')

        print("LamaTSA init complete!")

    def load_data(self, sales, dates, prices) -> pd.DataFrame:
        """
        Load the three CSV files, join them and preprocess the result.

        Sales are joined to dates on ``date_id`` and then to prices on
        ``store_id``, ``item_id`` and ``wm_yr_wk``; both joins use the
        pandas default (inner) merge.

        :param str sales: path to the sales CSV file
        :param str dates: path to the dates CSV file
        :param str prices: path to the prices CSV file
        :return: merged frame after :meth:`preprocess_data`
        """

        sales = pd.read_csv(sales)
        dates = pd.read_csv(dates)
        prices = pd.read_csv(prices)

        df = pd.merge(sales, dates, on='date_id')
        df = pd.merge(df, prices, on=['store_id', 'item_id', 'wm_yr_wk'])

        df = self.preprocess_data(df)
        return df

    def preprocess_data(self, df) -> pd.DataFrame:
        """
        Reduce a merged frame to the selected store's ``item_id``/``date``/``cnt`` rows.

        The ``date`` column of the frame passed in is converted to datetime
        in place. Rows are sorted by date, and everything before the first
        row with a non-zero ``cnt`` is dropped.

        :param pd.DataFrame df: frame with at least ``store_id``, ``item_id``,
            ``date`` and ``cnt`` columns
        :return: filtered frame with a fresh 0..n-1 index
        :raises ValueError: if the frame has no rows for the selected store
        """
        df['date'] = pd.to_datetime(df['date'])
        df = df.loc[df['store_id'] == self.store_id, ['item_id', 'date', 'cnt']]
        if df.empty:
            # Without this guard the idxmax() below fails on the empty
            # selection with a message that does not mention the store.
            raise ValueError(f"no rows found for store_id {self.store_id!r}")
        df = df.sort_values(by=['date'])

        # Drop the leading zeros: idxmax() on the boolean mask is the label of
        # the first non-zero row. The cut is made over all items of the store,
        # not per item; if every cnt is zero nothing is dropped.
        df = df.loc[df.cnt.ne(0).idxmax():]

        print(f"data shape: {df.shape}\nselected class count: {df.loc[df.item_id == self.item_id].shape[0]}")
        return df.reset_index(drop=True)

    def split_train_test(self, df) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        (Optional) split a frame into train/test parts for simple backtesting.

        The boundary is the date of the selected item's ``period``-th row
        from the end; rows of every item on or after that date go to the
        test part.

        :param pd.DataFrame df: frame with ``item_id`` and ``date`` columns,
            assumed to be ordered by date
        :return: ``(train, test)`` copies of the two parts
        :raises IndexError: if the selected item has fewer than ``period`` rows
        """

        item_dates = df[df['item_id'] == self.item_id]['date'].values
        test_start = item_dates[-self.period]
        train = df[df['date'] < test_start].copy()
        test = df[df['date'] >= test_start].copy()
        return train, test

    def define_training_task(self):
        """
        Build an unfitted ``AutoTS`` model configured for the stored horizon.

        :return: ``AutoTS`` instance for a "multi:reg" task with MAE as both
            metric and loss
        """

        # define task
        # "multi:reg": ["mae", "mse"]
        task = Task("multi:reg", greater_is_better=False, metric="mae", loss="mae")

        # configure model
        # NOTE(review): n_target is presumably the number of steps to forecast
        # and history the look-back window; confirm against the AutoTS docs.
        seq_params = {
            "seq0": {
                "case": "next_values",
                "params": {
                    "n_target": self.period,
                    "history": self.period,
                    "step": 1,
                    "from_last": True,
                    "test_last": True
                }
            }
        }

        transformers_params = {
            "lag_features": self.period,
            "lag_time_features": self.period,
        }

        # Only "trend" is set below. Other trend keys that were tried here
        # before (trend_type, decompose_period, rolling_size, ...) are not
        # passed.
        automl = AutoTS(
            task,
            reader_params={
                "seq_params": seq_params
            },
            time_series_trend_params={
                "trend": False,  # detrend before main use
            },
            time_series_pipeline_params=transformers_params
        )
        return automl

    def train_model(self, train_dataset, verbose=4):
        """
        Fit a new AutoTS model on the selected item's rows.

        :param pd.DataFrame train_dataset: frame with ``item_id``, ``date``
            and ``cnt`` columns
        :param int verbose: verbosity level forwarded to ``fit_predict``
        :return: the fitted model
        """
        # keep only the selected item's series
        univariate_train = train_dataset[train_dataset['item_id'] == self.item_id].drop("item_id", axis=1)

        # define roles
        univariate_roles = {
           "target": 'cnt',
           DatetimeRole(seasonality=('d', 'm', 'wd'), base_date=True): 'date',  # + y ?
        }

        # train model; the in-sample predictions returned by fit_predict are not used
        model = self.define_training_task()
        model.fit_predict(univariate_train, univariate_roles, verbose=verbose)

        return model

    def eval_model(self, model, train_dataset, test_dataset):
        """
        Compute MAE on the test rows and plot history, actuals and forecast.

        The model predicts from the selected item's training rows, and the
        forecast is compared with the item's first ``period`` test rows.

        :param model: fitted model exposing ``predict``
        :param pd.DataFrame train_dataset: frame with ``item_id``, ``date``
            and ``cnt`` columns
        :param pd.DataFrame test_dataset: frame with the same columns
        :return: fig, mae
        """
        train_dataset = train_dataset[train_dataset['item_id'] == self.item_id].drop("item_id", axis=1)
        test_dataset = test_dataset[(test_dataset['item_id'] == self.item_id)].drop("item_id", axis=1)[:self.period]

        print(train_dataset)
        fcst, _ = model.predict(train_dataset)

        mae = mean_absolute_error(test_dataset.cnt.values, fcst)
        print(f"MAE: {mae}")

        # plot predictions
        fig = plt.figure(figsize=(13, 5))
        # show at most five forecast lengths of history
        history_len = min(len(train_dataset), len(fcst)*5)
        plt.plot(
            train_dataset['date'][-history_len:],
            train_dataset['cnt'][-history_len:],
            c="#003865",
            label="train"
        )
        plt.plot(
            test_dataset['date'],
            test_dataset['cnt'],
            c="#EF5B0C",
            label="test",
            marker="o",
            markersize=4
        )
        plt.plot(
            test_dataset['date'],
            fcst,
            c="#3CCF4E",
            label="forecast",
            marker="o",
            markersize=4
        )

        plt.xlabel("Date")
        plt.ylabel("Value")
        plt.title(f"Train, test and forecasts of LightAutoML for product_id {self.item_id}")
        plt.legend()
        return fig, mae

    def save_model(self, model, path) -> str:
        """
        Save the model to disk with joblib.

        :param model: model to persist
        :param str path: destination file path
        :return: path
        """
        joblib.dump(model, path)
        print(f"Model saved in {path}")
        return path

    def load_model(self, path):
        """
        Load a model previously written by :meth:`save_model`.

        :param str path: path to the model file
        :return: the loaded model, or ``0`` if loading failed (the error is
            printed); callers should check for that sentinel before use
        :raises ValueError: if ``path`` does not exist
        """
        if not os.path.exists(path):
            raise ValueError("path to model not found!")

        # SECURITY: joblib.load unpickles the file, which can execute arbitrary
        # code; only load model files from a trusted source.
        try:
            model = joblib.load(path)
            print(f'model {path} loaded successfully')
        except Exception as e:
            # Kept for backward compatibility: failure is reported by the
            # 0 sentinel rather than by an exception.
            print(f'Error loading model: {e}')
            return 0
        return model

    def predict(self, model, pred_df) -> list:
        """
        Run model inference on the selected item's rows.

        :param model: fitted model exposing ``predict``
        :param pred_df: DataFrame with an ``item_id`` column; a ``cnt``
            column is optional
        :return: the forecast as returned by ``model.predict``
            (NOTE(review): presumably an array rather than a list; confirm)
        """
        pred = pred_df[pred_df['item_id'] == self.item_id].drop("item_id", axis=1)
        fcst, _ = model.predict(pred)

        print(fcst, "\n")
        # MAE is only a diagnostic. It needs actual values aligned one-to-one
        # with the forecast, and must not lose the forecast when they are absent.
        if "cnt" in pred.columns and len(pred) == len(fcst):
            print(f"MAE: {mean_absolute_error(pred.cnt.values, fcst)}")
        else:
            print("MAE: skipped (no 'cnt' values aligned with the forecast)")
        return fcst
        

In [None]:
# Forecaster for the selected store and item; period is one of w / m / q.
i = LamaTSA(store_id='2', item_id='586', period='m')

# The training and test CSV triples are loaded through the same call.
train_files = {
    "sales": "data/shop_sales.csv",
    "dates": "data/shop_sales_dates.csv",
    "prices": "data/shop_sales_prices.csv",
}
train_df = i.load_data(**train_files)

test_files = {
    "sales": "data/shop_sales_test.csv",
    "dates": "data/shop_sales_dates_test.csv",
    "prices": "data/shop_sales_prices_test.csv",
}
test_df = i.load_data(**test_files)


In [None]:
# Display the preprocessed training frame as the cell output.
train_df

In [None]:
# Fit AutoTS on the selected item's rows of the training frame.
model = i.train_model(train_df)

In [None]:
# Persist the fitted model with joblib.
# NOTE: assumes the models/ directory already exists — TODO confirm.
i.save_model(model, 'models/model_f_2.pkl')

In [None]:
# Reload the saved model; load_model returns 0 instead of raising when
# unpickling fails, so a failed load only surfaces at the next use of `model`.
model = i.load_model('models/model_f_2.pkl')

In [None]:
# Forecast from the training rows, score against the test rows (MAE) and plot.
fig, mae = i.eval_model(model, train_df, test_df)