In [1]:
import numpy as np
import pandas as pd
import datetime
from pmdarima import auto_arima
from prophet import Prophet
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import ParameterGrid
from sklearn.preprocessing import OneHotEncoder
from scipy import linalg

import sys
sys.path.append('C:/Users/EOKozhevnikova/Documents')
import OMA_tools
from OMA_tools.io_data.dates import Dates_Operations
from OMA_tools.forecast.preprocessing import Preprocessing

In [None]:
class Forecast_Models:
    """
        Class that agregates inside several ML models for TimeSeries Forecasting
    """
    def __init__(self, df, forecast_periods: int):
        """
            forecast_periods: the number of months that you want to forecast
            df: TimeSeries DataFrame
        """
        self.ts = df
        self.forecast_periods = forecast_periods
        
    def regression_model(self, column_name_with_date, param: str):
        """
            Regression Model
                Args:
                    column_with_date: column with date
                    param: Linear trend (linear trend) or Logistic trend (logistic trend)
                Returns:
                    New DataFrame with Forecast
        """
        data = self.df.copy()
        # Сброс индекса и добавление названий месяцев
        data.reset_index(inplace = True)
        #Предварительная кодировка в признак
        data['month_name'] = data[column_name_with_date].dt.strftime("%B") #преобразование даты ('2020-01-01') в текстовый формат типа 'Январь'

        #С текстовыми данными работать не можем => применяется OneHotEncoder кодировщик для преобразования категориальных или текстовых данных в числа
        #Числа заменяются на единицы и нули, в зависимости от того, какому столбцу какое значение присуще.
        encoder = OneHotEncoder(categories = 'auto', drop = 'first', sparse_output = False)
        encoded_months = encoder.fit_transform(data[['month_name']])#конвертация в массив закодированного столбца

        # Матрица сезонных признаков
        encoded_df_0 = pd.DataFrame(encoded_months, columns = encoder.get_feature_names_out(['month_name']))

        # Колонка с трендом (наклон)
        if param == 'linear trend':
            encoded_df_trend = pd.DataFrame({'Linear_Trend': np.arange(1, data.shape[0] + 1)})
        elif param == 'logistic trend':
            encoded_df_trend = pd.DataFrame({'Log_Trend': np.log(np.arange(1, data.shape[0] + 1))})
        else:
            raise ValueError('Неверно выбран тип тренда.')

        # Свободный член (интерсепт)
        encoded_df_free_var = pd.DataFrame({'free_variable': np.ones(data.shape[0])})

        # Итоговая матрица признаков (сезонность, тренд, интерсепт)
        encoded_df = pd.concat([encoded_df_0, encoded_df_trend, encoded_df_free_var], axis = 1)

        # Новый DataFrame для хранения спрогнозированных значений
        predicted_df = pd.DataFrame({'Date': data[column_name_with_date]})

        # Словарь для хранения коэффициентов модели для каждого столбца
        model_coefficients_dict = {}

        # Прогнозирование для каждого столбца
        for column in self.df.columns[1:-1]:  # Пропускаем столбцы column_name_with_date и "month_name"
            A = encoded_df.values
            b = data[column].values.reshape((int(encoded_df.shape[0]), 1))

            # Решаем систему уравнений с помощью метода наименьших квадратов
            k, *_ = linalg.lstsq(A, b)

            # Переводим коэффициенты в список и сохраняем в словарь
            model_coefficients = k.reshape((1, encoded_df.shape[1])).tolist()[0]
            model_coefficients_dict[column] = model_coefficients

            # Прогнозируем значения на обученных данных
            y_pred = [
                sum(np.multiply(encoded_df.iloc[i, :].tolist(), model_coefficients))
                for i in range(encoded_df.shape[0])
            ]

            # Добавляем прогнозируемые значения в новый DataFrame
            predicted_df[f'{column}'] = y_pred

        # Прогнозирование на следующий год
        # Определяем последний месяц в данных
        last_date = data[column_name_with_date].max()

        # Создаем новые даты для следующего года
        future_dates = pd.date_range(last_date + pd.DateOffset(months = 1), periods = self.forecast_periods, freq = 'MS')

        # Создаем DataFrame для будущих дат
        future_df = pd.DataFrame({column_name_with_date: future_dates})
        future_df['month_name'] = future_df[column_name_with_date].dt.strftime("%B")

        # Преобразуем названия месяцев в бинарные признаки (One-Hot Encoding)
        encoded_future_months = encoder.transform(future_df[['month_name']])
        encoded_future_df_0 = pd.DataFrame(encoded_future_months, columns = encoder.get_feature_names_out(['month_name']))

        # Тренд для новых дат
        if param == 'linear trend':
            encoded_future_df_trend = pd.DataFrame({'Linear_Trend': np.arange(len(data) + 1, len(data) + (self.forecast_periods + 1))})
        elif param == 'logistic trend':
            encoded_future_df_trend = pd.DataFrame({'Log_Trend': np.log(np.arange(len(data) + 1, len(data) + (self.forecast_periods + 1)))}) 

        # Свободный член (интерсепт)
        encoded_future_df_free_var = pd.DataFrame({'free_variable': np.ones(self.forecast_periods)})

        # Итоговая матрица признаков для будущих дат
        encoded_future_df = pd.concat([encoded_future_df_0, encoded_future_df_trend, encoded_future_df_free_var], axis = 1)

        # Новый DataFrame для хранения прогнозируемых значений на следующий год
        future_predictions = pd.DataFrame({column_name_with_date: future_df[column_name_with_date]})

        # Прогнозирование для каждого столбца на следующий год
        for column in data.columns[1:-1]:  # Пропускаем столбцы column_name_with_date и "month_name"
            model_coefficients = model_coefficients_dict[column]

            y_future_pred = [
                sum(np.multiply(encoded_future_df.iloc[i, :].tolist(), model_coefficients))
                for i in range(encoded_future_df.shape[0])
            ]

            # Добавляем прогнозируемые значения в DataFrame
            future_predictions[f'{column}'] = y_future_pred

        return future_predictions, predicted_df
        
    def auto_arima_forecast(self,
                            seasonal = True,
                            D = 1, #порядок сезонных различий
                            m = 12, #период для сезонной разницы. Например, m = 4 для квартальных данных, m = 12 для месячных данных или m = 1 для годовых (несезонных) данных. 
                            stationary = False,
                            test = 'pp', #тип теста для определения стационарности
                            information_criterion = 'hqic', #информационный критерий
                            stepwise = True, #пошаговый алгоритм для подбора оптимальных параметров модели
                            suppress_warnings = True, #удаление warnings
                            max_p = 5, max_q = 5, max_d = 1,
                            max_P = 2, max_Q = 2, max_D = 1,
                            trace = False
                            plot = False):
        """
            ARIMA MODEL
        """
        forecasts = {}

        for value in self.df.columns:
            ts = self.df[value].dropna()
            original_series = ts.copy()
            was_non_stationary = False

            #for non stationarity situation
            if not Preprocessing(ts).check_stationarity(ts):
                ts = Preprocessing(ts).make_stationary()
                was_non_stationary = True

            try:
                model = auto_arima(
                    ts,
                    seasonal,
                    D,
                    m,
                    stationary,
                    test,
                    information_criterion,
                    stepwise,
                    suppress_warnings,
                    max_p, max_q, max_d,
                    max_P, max_Q, max_D,
                    trace
                )


                model.fit(ts)
                forecast = model.predict(n_periods = forecast_periods)

                if was_non_stationary:
                    last_observation = original_series.iloc[-1]
                    forecast = inverse_difference(pd.Series(forecast), last_observation)

                forecasts[value] = forecast
                '''
                if plot:
                    plt.figure(figsize = (10, 6))
                    plt.plot(original_series.index, original_series, label = 'Historical Data', color = 'blue')
                    forecast_index = pd.date_range(
                        start = original_series.index[-1] + pd.DateOffset(months = 1),
                        periods = forecast_periods,
                        freq = 'MS'
                    )
                    plt.plot(forecast_index, forecast, label = 'Forecast', color = 'orange', linestyle = '--')
                    plt.title(f'Forecast for {value}')
                    plt.xlabel('Date')
                    plt.ylabel('Share')
                    plt.legend()
                    plt.grid(True)
                    plt.show()
                '''

            except Exception as e:
                print(f"Error processing value {value}: {e}")
                forecasts[value] = np.nan

        forecast_df = pd.DataFrame(
            forecasts,
            index = pd.date_range(start = self.df.index[-1] + pd.DateOffset(months = 1), periods = forecast_periods, freq = 'MS')
        )

        return forecast_df
    
    
    def naive_forecast_with_error(self, past_values: int, weigh_error = 0.8):
        
        """
        Функция для предсказания значений ВР на основе предыдущих значений с учетом ошибки.

        Args:
            past_values: the number of last values in DataFrame
            weigh_error: weigh of error 
        Returns:
            New dataframe with forecast values, list of historical data
        """
        forecast_results = {}
        last_n_list = []

        # Получаем последние N месяцев для каждого столбца
        last_n = self.df.tail(past_values)
        last_n_list.append(last_n)

        # Прогнозирование наивным методом с учетом ошибки
        for column in self.df.columns:
            st_data = self.df[column].copy()

            # Получаем значения за последние N месяцев
            last_values = st_data.tail(past_values)

            # Находим среднее значение
            average = last_values.mean()
            print(f"Initial average for {column}: {average}")

            # Рассчитываем ошибку и корректируем прогнозы на каждом шаге
            for i in range(past_values, 0, -1):
                error = (st_data.iloc[-i] - average)  # Рассчитываем ошибку
                average += error * weigh_error

            # Создаем новую серию со прогнозированным значением
            forecast_values = [average] * forecast_periods  # Прогнозируем на 12 месяцев вперед
            forecast_df = pd.DataFrame({column: forecast_values},
                                       index = pd.date_range(start = self.df.index[-1] + pd.DateOffset(months = 1),
                                                           periods = forecast_periods, freq = 'MS'))
            forecast_results[column] = forecast_df

        # Объединение всех столбцов в один DataFrame
        result_df = pd.concat(forecast_results.values(), axis = 1)
        return result_df, last_n_list
    
    
    def naive_forecast(self, past_values: int):
        """
        Функция для предсказания значений ВР на основе предыдущих значений.
            Args:
                df: Датафрейм
                :param n_forecast_months: Количество месяцев, на которое будет выполнен прогноз
                :param past_values: Количество предыдущих месяцев, по которым будет выполняться прогнозирование
            Returns:
                Новый датафрейм с спрогнозированными значениями, список рассатриваемых исторических значений
        """
        forecast_results = {}
        last_n_list = []

        # Получаем последние N месяцев для каждого столбца
        last_n = self.df.tail(past_values)
        last_n_list.append(last_n)

        # Прогнозирование наивным методом по среднему за past_values
        for column in self.df.columns:
            st_data = self.df[column].copy()

            # Получаем значения за past_values последних месяца
            last_values = st_data.tail(past_values)

            # Находим среднее значение
            average = last_values.mean()

            # Создаем новую серию с прогнозированным значением
            forecast_values = [average] * self.forecast_periods #количество предсказуемых месяцев
            forecast_df = pd.DataFrame({column: forecast_values},
                                       index = pd.date_range(start = self.df.index[-1] + pd.DateOffset(months = 1),
                                                           periods = self.forecast_periods, freq = 'MS'))

            forecast_results[column] = forecast_df

        # Объединение всех столбцов в один DataFrame
        result_df = pd.concat(forecast_results.values(), axis = 1)

        return result_df, last_n_list
    
    
    def prophet_forecast(self, 
                         column_name_with_date: str, 
                         seasonality_mode = ['additive', 'multiplicative'], 
                         n_changepoints = [12, 18, 24, 36],
                         changepoint_prior_scale = [0.01, 0.05, 0.1, 0.2, 0.5]):
        """
            Prophet model for TimeSeries forecasting. The code below refers to cross-validation. So you can change input parameters to get best result.
            Args:
                column_name_with_date: column with date 'Date', 'Month', 'Year' etc.
                seasonality_mode: Prophet parameter, type of seasonality mode
                n_changepoints: Prophet parameter
                changepoint_prior_scale: Prophet parameter
            Returns:
                Forecast for N months 
        """
        self.df = self.df.copy()
        param_grid = {
            'seasonality_mode': seasonality_mode,
            'n_changepoints': n_changepoints,
            'changepoint_prior_scale': changepoint_prior_scale
        }
        self.df.reset_index(inplace = True)
        if column_name_with_date not in self.df.columns:
            raise KeyError("В датафрейме отсутствует столбец с датой.")

        self.df[column_name_with_date] = pd.to_datetime(self.df[column_name_with_date], format = '%d.%m.%Y')
        self.df = self.df.sort_values(by = column_name_with_date)

        #Разбиение выборки на тренировочную и тестовые. В качестве тренировочной 70% выборки, в качестве тестовой 30%.
        date_list = list(self.df[column_name_with_date])
        train = self.df.iloc[0:np.round(len(date_list) * 0.7)]
        test = self.df.iloc[np.round(len(date_list) * 0.7)::]

        # Список временных рядов для прогнозирования (исключаем столбец с датами)
        series_list = [col for col in train.columns if col != column_name_with_date]

        # Результаты для каждого временного ряда
        results = []

        # Прогнозирование для каждого временного ряда
        for series in series_list:
            # Подготовка данных для Prophet
            series_df = train[[column_name_with_date, series]].rename(columns = {column_name_with_date: 'ds', series: 'y'})

            # Список для хранения MAPE для каждой комбинации параметров
            best_mape = float('inf')
            best_params = None

            # Перебор параметров
            for params in ParameterGrid(param_grid):
                model = Prophet(seasonality_mode = params['seasonality_mode'],
                                n_changepoints = params['n_changepoints'],
                                changepoint_prior_scale = params['changepoint_prior_scale'])

                model.fit(series_df)

                # Прогнозирование на N месяцев вперед
                future = model.make_future_dataframe(periods = self.forecast_periods, freq = 'MS')
                forecast = model.predict(future)

                # Сравнение прогноза с тестовым набором
                test_series = test[[column_name_with_date, series]].rename(columns = {column_name_with_date: 'ds', series: 'y'})

                # Отбор прогнозов на даты, совпадающие с тестовым периодом
                forecast_test_period = forecast.tail(len(test_series))

                # Вычисление MAPE
                mape = np.mean(
                    (np.abs(forecast_test_period['yhat'].values - test_series['y'].values)) / test_series['y'].values) * 100

                # Сравнение и выбор лучших параметров
                if mape < best_mape:
                    best_mape = mape
                    best_params = params

            # Сохранение лучших параметров и ошибки для ряда
            results.append((series, best_params, best_mape))
            print(f"Лучшие параметры для {series}: {best_params}, MAPE: {best_mape:.2f}")

        # Прогнозирование на N месяцев вперед для полного набора данных с использованием лучших параметров
        forecast_results = pd.DataFrame(
            {'ds': pd.date_range(self.df[column_name_with_date].min(), periods = len(self.df) + self.forecast_periods, freq = 'MS')})

        forecast_df = pd.DataFrame()
        for series, best_params, _ in results:
            # Подготовка данных для Prophet
            series_df = self.df[[column_name_with_date, series]].rename(columns = {column_name_with_date: 'ds', series: 'y'})

            # Создание модели с лучшими параметрами
            model = Prophet(seasonality_mode = best_params['seasonality_mode'],
                            n_changepoints = best_params['n_changepoints'],
                            changepoint_prior_scale = best_params['changepoint_prior_scale'])

            model.fit(series_df)

            # Прогнозирование на N месяцев вперед для полного набора данных
            future = model.make_future_dataframe(periods = self.forecast_periods, freq = 'MS')
            forecast = model.predict(future)

            if forecast_df.empty:
                forecast_df[column_name_with_date] = forecast['ds']  # Создаем индекс "ds" (дата) только один раз
            forecast_df[series] = forecast['yhat']  # Добавляем прогнозы для каждого ряда

        # Установка индекса на основе дат
        forecast_df.set_index(column_name_with_date, inplace = True)
        forecast_df = forecast_df.tail(self.forecast_periods)
        return forecast_df
    
    
    def regression_model(self, column_name_with_date, param: str):
        """
            Модель регрессии
                Args:
                    column_with_date: столбец с датой
                    param: Линейный тренд (linear trend) или Логистический тренд (logistic trend)
                Returns:
                    Новый прогнозный DataFrame
        """
        data = self.df.copy()
        # Сброс индекса и добавление названий месяцев
        data.reset_index(inplace = True)
        #Предварительная кодировка в признак
        data['month_name'] = data[column_name_with_date].dt.strftime("%B") #преобразование даты ('2020-01-01') в текстовый формат типа 'Январь'

        #С текстовыми данными работать не можем => применяется OneHotEncoder кодировщик для преобразования категориальных или текстовых данных в числа
        #Числа заменяются на единицы и нули, в зависимости от того, какому столбцу какое значение присуще.
        encoder = OneHotEncoder(categories = 'auto', drop = 'first', sparse_output = False)
        encoded_months = encoder.fit_transform(data[['month_name']])#конвертация в массив закодированного столбца

        # Матрица сезонных признаков
        encoded_df_0 = pd.DataFrame(encoded_months, columns = encoder.get_feature_names_out(['month_name']))

        # Колонка с трендом (наклон)
        if param == 'linear trend':
            encoded_df_trend = pd.DataFrame({'Linear_Trend': np.arange(1, data.shape[0] + 1)})
        elif param == 'logistic trend':
            encoded_df_trend = pd.DataFrame({'Log_Trend': np.log(np.arange(1, data.shape[0] + 1))})
        else:
            raise ValueError('Неверно выбран тип тренда.')

        # Свободный член (интерсепт)
        encoded_df_free_var = pd.DataFrame({'free_variable': np.ones(data.shape[0])})

        # Итоговая матрица признаков (сезонность, тренд, интерсепт)
        encoded_df = pd.concat([encoded_df_0, encoded_df_trend, encoded_df_free_var], axis = 1)

        # Новый DataFrame для хранения спрогнозированных значений
        predicted_df = pd.DataFrame({'Date': data[column_name_with_date]})

        # Словарь для хранения коэффициентов модели для каждого столбца
        model_coefficients_dict = {}

        # Прогнозирование для каждого столбца
        for column in self.df.columns[1:-1]:  # Пропускаем столбцы column_name_with_date и "month_name"
            A = encoded_df.values
            b = data[column].values.reshape((int(encoded_df.shape[0]), 1))

            # Решаем систему уравнений с помощью метода наименьших квадратов
            k, *_ = linalg.lstsq(A, b)

            # Переводим коэффициенты в список и сохраняем в словарь
            model_coefficients = k.reshape((1, encoded_df.shape[1])).tolist()[0]
            model_coefficients_dict[column] = model_coefficients

            # Прогнозируем значения на обученных данных
            y_pred = [
                sum(np.multiply(encoded_df.iloc[i, :].tolist(), model_coefficients))
                for i in range(encoded_df.shape[0])
            ]

            # Добавляем прогнозируемые значения в новый DataFrame
            predicted_df[f'{column}'] = y_pred

        # Прогнозирование на следующий год
        # Определяем последний месяц в данных
        last_date = data[column_name_with_date].max()

        # Создаем новые даты для следующего года
        future_dates = pd.date_range(last_date + pd.DateOffset(months = 1), periods = self.forecast_periods, freq = 'MS')

        # Создаем DataFrame для будущих дат
        future_df = pd.DataFrame({column_name_with_date: future_dates})
        future_df['month_name'] = future_df[column_name_with_date].dt.strftime("%B")

        # Преобразуем названия месяцев в бинарные признаки (One-Hot Encoding)
        encoded_future_months = encoder.transform(future_df[['month_name']])
        encoded_future_df_0 = pd.DataFrame(encoded_future_months, columns = encoder.get_feature_names_out(['month_name']))

        # Тренд для новых дат
        if param == 'linear trend':
            encoded_future_df_trend = pd.DataFrame({'Linear_Trend': np.arange(len(data) + 1, len(data) + (self.forecast_periods + 1))})
        elif param == 'logistic trend':
            encoded_future_df_trend = pd.DataFrame({'Log_Trend': np.log(np.arange(len(data) + 1, len(data) + (self.forecast_periods + 1)))}) 

        # Свободный член (интерсепт)
        encoded_future_df_free_var = pd.DataFrame({'free_variable': np.ones(self.forecast_periods)})

        # Итоговая матрица признаков для будущих дат
        encoded_future_df = pd.concat([encoded_future_df_0, encoded_future_df_trend, encoded_future_df_free_var], axis = 1)

        # Новый DataFrame для хранения прогнозируемых значений на следующий год
        future_predictions = pd.DataFrame({column_name_with_date: future_df[column_name_with_date]})

        # Прогнозирование для каждого столбца на следующий год
        for column in data.columns[1:-1]:  # Пропускаем столбцы column_name_with_date и "month_name"
            model_coefficients = model_coefficients_dict[column]

            y_future_pred = [
                sum(np.multiply(encoded_future_df.iloc[i, :].tolist(), model_coefficients))
                for i in range(encoded_future_df.shape[0])
            ]

            # Добавляем прогнозируемые значения в DataFrame
            future_predictions[f'{column}'] = y_future_pred

        return future_predictions, predicted_df
    
    
    def rolling_mean_for_three_years(self, column_name_with_date: str):
        """
        Модель скользящего среднего с сезонностью и трендом
            Тренд: скользящее среднее
            Сезонность: Фиксированная сезонность за последние три года.
            а также прогнозирования тренда и сезонности на следующие два года (24 месяца).
        Args:
            column_name_with_date: название колонки с датой
        Returns:
            Результат прогнозирования тренда и сезонности на следующие два года (24 месяца).
            DataFrame с прогнозом и список рассматриваемых исторических значений.
        """
        self.df[column_name_with_date] = pd.to_datetime(self.df[column_name_with_date], format = '%Y-%m-%d')
        
        # Находим последние три полных календарных года
        last_date = self.df.index.max()
        end_year = last_date.year
        last_month = last_date.month

        # Формируем последние три года данных
        if last_month < 12:
            last_three_years = self.df[(self.df.index >= f'{end_year - 3}-01-01') & (self.df.index < f'{end_year}-01-01')]
            last_n_list = [self.df[self.df.index.year == end_year - (i + 1)] for i in range(3)]

            # Скользящее среднее за N месяцев и детрендирование
            rolling_mean = last_three_years.rolling(window = self.forecast_periods).mean()
            detrended = last_three_years - rolling_mean

            # Выделяем сезонность
            last_detr = detrended[detrended.index.year == end_year - 1].reset_index(drop = True) #прошлый год
            penult_detr = detrended[detrended.index.year == end_year - 2].reset_index(drop = True) #позапрошлый год
            seasonality = (last_detr + penult_detr) / 2
        else:
            last_three_years = self.df[(self.df.index >= f'{end_year - 2}-01-01') & (self.df.index < f'{end_year + 1}-01-01')]

            last_n_list = [self.df[self.df.index.year == end_year - i] for i in range(3)]

            # Скользящее среднее за N месяцев и детрендирование
            rolling_mean = last_three_years.rolling(window = self.forecast_periods).mean()
            detrended = last_three_years - rolling_mean

            # Выделяем сезонность
            last_detr = detrended[detrended.index.year == end_year].reset_index(drop = True)
            penult_detr = detrended[detrended.index.year == end_year - 1].reset_index(drop = True) #прошлый год
            seasonality = (last_detr + penult_detr) / 2

        # Прогнозируем тренд (скользящее среднее)
        steps = rolling_mean.diff().dropna()
        average_step = np.abs(steps.mean())
        last_value = rolling_mean.iloc[-1]

        # Прогнозируем скользящее среднее на следующий год
        next_year_rolling_mean = [last_value + (i + 1) * average_step for i in range(self.forecast_periods)]
        next_year_dates = pd.date_range(start = last_date + pd.DateOffset(months = 1), periods = self.forecast_periods, freq = 'MS')
        next_year_rolling_mean_df = pd.DataFrame(next_year_rolling_mean, index = next_year_dates, columns = self.df.columns)

        # Прогноз сезонности (повторяем 12 месяцев шаблона)
        seasonality_repeated = np.tile(seasonality.values, (1, 1))

        # Финальный прогноз: сложение тренда и сезонности
        final_forecast = next_year_rolling_mean_df.values + seasonality_repeated
        forecast_df = pd.DataFrame(final_forecast, index = next_year_dates, columns = self.df.columns)

        return forecast_df, last_n_list
    
    def rolling_mean_periods(self):
            """
                Модель скользящего среднего с сезонностью и трендом.
                    Функция для выделения тренда и сезонности из временного ряда данных,
                    а также прогнозирования тренда и сезонности на следующий год.

                    Из временного ряда выделяются данные за последние три периода (период равен 12 месяцам). Применяется скользящее
                    среднее за 12 месяцев для устранения тренда, после чего рассчитывается остаток (detrended).Сезонность
                    рассчитывается на основе данных за последние два года. Прогнозируется сезонность и тренд на следующий год.
                    Финальный прогноз для следующего года строится как сумма прогнозируемого тренда и сезонности.

                Returns:
                    DataFrame с прогнозом, включающим прогнозируемый тренд и сезонность.

            """
            # Выделяем три последних календарных года
            last_date = self.df.index.max()
            end_year = last_date.year
            last_month = last_date.month

            if last_month < 12:
                last_three_years = self.df[(self.df.index >= f'{end_year - 3}-01-01') & (self.df.index < f'{end_year}-01-01')]
            if last_month == 12:
                last_three_years = self.df[(self.df.index >= f'{end_year - 2}-01-01') & (self.df.index < f'{end_year + 1}-01-01')]

            # Скользящее среднее за 12 месяцев
            rolling_mean = last_three_years.rolling(window = self.forecast_periods).mean()
            detrended = last_three_years - rolling_mean

            #Прогнозируем сезонность
            last_detr = detrended.iloc[-12:].reset_index(drop = True) #последний каледарный год
            penult_detr = detrended.iloc[-24:-12].reset_index(drop = True) #позапрошлый календарный [год n-2; n-1]
            seasonatily = (last_detr + penult_detr) / 2
            #print(seasonatily)

            #Прогнозируем скользящее среднее (тренд)
            steps = rolling_mean.diff().dropna()
            # Усредняем шаги
            average_step = np.abs(steps.mean())

            # Прогнозируем скользящее среднее, добавляя усреднённый шаг к последнему значению скользящего среднего
            last_value = rolling_mean.iloc[-1]

            if last_month == 12:
                months_to_forecast = 12  # Прогнозируем полный следующий год
                next_year_start = f'{end_year + 1}-01-01'
            else:
                months_to_forecast = self.forecast_periods  # Прогнозируем только оставшиеся месяцы
                next_year_start = last_date + pd.DateOffset(months = 1)

            next_year_rolling_mean = [last_value + (i + 1) * average_step for i in range(months_to_forecast)]
            next_year_dates = pd.date_range(start = last_three_years.index[-1] + pd.DateOffset(months = 1), periods = months_to_forecast, freq='MS')

            # Преобразуем результат в DataFrame
            next_year_rolling_mean_df = pd.DataFrame(next_year_rolling_mean, index = next_year_dates, columns = self.df.columns)

            # Прогноз сезонности
            seasonal_forecast = np.tile(seasonatily.values, (1, 1))

            # Финальный прогноз: сложение скользящего среднего с прогнозом сезонности
            final_forecast = next_year_rolling_mean_df + seasonal_forecast

            # Создание DataFrame с прогнозом
            forecast_df = pd.DataFrame(final_forecast, index = next_year_dates, columns = self.df.columns)
            return forecast_df
    
    
    def season_dec_with_seasonality_without_trend_for_three_years(self):
        """
        Модель сезонной декомпозиции с сезонностью, но без тренда.
        Функция для выделения тренда и сезонности из временного ряда данных,
        а также прогнозирования тренда и сезонности на следующий год.

        Из временного ряда выделяются данные за последние три года. Удаляется тренд с помощью дифференцирования
        временного ряда. Сезонность рассчитывается как среднее значение разностей за
        последние три года. Из исходного временного ряда вычитается рассчитанная сезонность, и остается тренд.
        Прогнозируются тренд и сезонность на следующий год. Финальный прогноз на следующий год строится как сумма
        прогнозируемого тренда и сезонности.

        Returns:
            DataFrame с прогнозом, список рассатриваемых исторических значений
        """
        # Выделяем три последних календарных года
        last_date = self.df.index.max()
        end_year = last_date.year
        last_month = last_date.month

        if last_month < 12:
            last_three_years = self.df[(self.df.index >= f'{end_year - 3}-01-01') & (self.df.index < f'{end_year}-01-01')]
            last_n_list = []

            for i in range(3):
                year = end_year - (i + 1)
                last_n = self.df[self.df.index.year == year]
                last_n_list.append(last_n)

        if last_month == 12:
            last_three_years = self.df[(self.df.index >= f'{end_year - 2}-01-01') & (self.df.index < f'{end_year + 1}-01-01')]
            last_n_list = []

            for i in range(3):
                year = end_year - (i)
                last_n = self.df[self.df.index.year == year]
                last_n_list.append(last_n)

        # Удаляем тренд с помощью дифференцирования
        detrended = last_three_years.diff().dropna()

        # Выделяем годы отдельно для получения сезонности
        last_year = detrended.iloc[-12:].reset_index(drop = True)
        penult_year = detrended.iloc[-24:-12].reset_index(drop = True)
        pre_penult_year = detrended.iloc[-36:-24].reset_index(drop = True)

        # Рассчитываем среднее значение сезонности
        average_season_k = (last_year + penult_year + pre_penult_year) / 3
        average_season_k = average_season_k.fillna((last_year + penult_year) / 2)

        average_season_k_4 = pd.concat([average_season_k.dropna()] * 4, ignore_index = True)
        #print(average_season_k_4)

        # Из исходного временного ряда вычитаем сезонность, и остается тренд
        trend = last_three_years.reset_index(drop = True) - average_season_k_4
        trend = trend.dropna()
        # Прогнозируем тренд с использованием линейной регрессии
        X = np.arange(len(trend)).reshape(-1, 1)  # Индексы временного ряда
        y = trend.values  # Значения тренда

        # Линейная регрессия для тренда
        model = LinearRegression()
        model.fit(X, y)

        # Прогнозируем тренд на следующий год
        next_year_indices = np.arange(len(trend), len(trend) + self.forecast_periods).reshape(-1, 1)
        next_year_trend = model.predict(next_year_indices)

        # Определяем месяц последнего известного значения
        last_date = self.df.index.max()
        last_year = last_date.year
        last_month = last_date.month

        if last_month == 12:
            months_to_forecast = 12  # Прогнозируем полный следующий год
            next_year_start = f'{last_year + 1}-01-01'
        else:
            months_to_forecast = self.forecast_periods  # Прогнозируем только оставшиеся месяцы
            next_year_start = last_date + pd.DateOffset(months = 1)

            # Создаем даты для прогноза
        next_year_dates = pd.date_range(start = next_year_start, periods = months_to_forecast, freq = 'MS')

        # Преобразуем результат в DataFrame только для месяцев, которые не покрыты данными
        next_year_trend_df = pd.DataFrame(next_year_trend[:months_to_forecast], index = next_year_dates,
                                          columns = self.df.columns)

        # Прогноз сезонности
        season_forecast_df = pd.DataFrame(average_season_k.values[:months_to_forecast], index = next_year_dates,
                                          columns = self.df.columns)

        # Финальный прогноз: сложение тренда и сезонности
        final_forecast = next_year_trend_df + season_forecast_df

        return final_forecast, last_n_list
    
    
    def season_dec_periods(df):
        """
        Модель сезонной декомпозиции с трендом без сезонности.
            Функция для выделения тренда и сезонности из временного ряда данных,
            а также прогнозирования тренда и сезонности на следующий год.

            Из временного ряда выделяются данные за последние три периода (один период равен 12 месяцам). Удаляется тренд
            с помощью дифференцирования временного ряда. Сезонность рассчитывается как среднее значение разностей за
            последние три периода. Из исходного временного ряда вычитается рассчитанная сезонность, и остается тренд.
            Прогнозируются тренд и сезонность на следующий год. Финальный прогноз на следующий год строится как сумма
            прогнозируемого тренда и сезонности.
        Returns:
            DataFrame с прогнозом
        """
        last_date = self.df.index.max()
        end_year = last_date.year
        last_month = last_date.month

        if last_month < 12:
                last_three_years = self.df[(self.df.index >= f'{end_year - 3}-01-01') & (self.df.index < f'{end_year}-01-01')]
        if last_month == 12:
                last_three_years = self.df[(self.df.index >= f'{end_year - 2}-01-01') & (self.df.index < f'{end_year + 1}-01-01')]

        # Удаляем тренд с помощью дифференцирования
        detrended = last_three_years.diff().dropna()

        # Выделяем годы отдельно для получения сезонности
        last_year = detrended.iloc[-12:].reset_index(drop = True)
        penult_year = detrended.iloc[-24:-12].reset_index(drop = True)
        pre_penult_year = detrended.iloc[-36:-24].reset_index(drop = True)

        # Рассчитываем среднее значение сезонности
        average_season_k = (last_year + penult_year + pre_penult_year) / 3
        average_season_k = average_season_k.fillna((last_year + penult_year) / 2)
        average_season_k_4 = pd.concat([average_season_k] * 4, ignore_index=True)

        # Из исходного временного ряда вычитаем сезонность, и остается тренд
        trend = last_three_years.reset_index(drop = True) - average_season_k_4
        trend = trend.dropna()

        # Прогнозируем тренд с использованием линейной регрессии
        X = np.arange(len(trend)).reshape(-1, 1)  # Индексы временного ряда
        y = trend.values  # Значения тренда

        # Линейная регрессия для тренда
        model = LinearRegression()
        model.fit(X, y)

        # Прогнозируем тренд
        average_step = (detrended.mean())
        last_value = trend.iloc[-1]
        next_year_trend = [last_value + (i + 1) * average_step for i in range(13)]
        
        #Различные варианты построение прогноза
        if last_month == 12:
            months_to_forecast = 12  # Прогнозируем полный следующий год
            next_year_start = f'{last_year + 1}-01-01'
        else:
            months_to_forecast = self.forecast_periods
            next_year_start = last_date + pd.DateOffset(months = 1)

        next_year_dates = pd.date_range(start = last_three_years.index[-1] + pd.DateOffset(months = 1), periods = months_to_forecast, freq = 'MS')

        # Преобразуем результат в DataFrame
        next_year_trend_df = pd.DataFrame(next_year_trend, index = next_year_dates, columns = df.columns)
        season_forecast_df = pd.DataFrame(average_season_k.values, index = next_year_dates, columns = df.columns)

        # Итоговый прогноз на следующий год (тренд + сезонность)
        forecast_df = season_forecast_df + next_year_trend_df

        return forecast_df
    
    def seasonality_with_trend_periods(self, years_count = 3, w_1 = 0.2, w_2 = 0.8):
        '''
        Модель сезонной декомпозиции с трендом в N лет
        Создает новый DataFrame с спрогнозированными значениями.
        Прогноз значений выполняется на основе указанного количества предыдущих
        периодов (один период равен 12 месяцев), путем выделения из этих периодов сезонности и тренда.

        Сезонность рассчитывается как среднее нормированных значений за указанные периоды.

        Для прогноза на основе одного и двух прошлых периодов, считаем, что тренда нет.
        Будущее значение тренда на основе трёх прошлых периодов рассчитывается как сумма предыдущего значения и средней
        разницы между средними значениями прериодов (где разность между средними значениями для разных периодов имеет
        разные весовые коэффициенты).

        Args:
            years_count: Количество предыдущих периодов для анализа (1, 2 или 3)
            w_1: Вес для разности между 1 и 2 периодом (по умолчанию 0.2)
            w_2: Вес для разности между 2 и 3 периодом (по умолчанию 0.8)
        Returns:
            Новый DataFrame 
        '''
        if len(self.df) < 12 * years_count:
            raise ValueError("В DataFrame недостаточно данных для указанного количества периодов.")

        yearly_means = []
        last_n_list = []
        yearly_normalized = []
        means_df = pd.DataFrame()

        # Периоды для анализа
        for i in range(years_count):
            if i == 0:
                last_n = self.df.iloc[-12:]
            elif i == 1:
                last_n = self.df.iloc[-24:-12]
            elif i == 2:
                last_n = self.df.iloc[-36:-24]

            #Добавляем значения прошлых периодов в список и для каждого периода раасчитываем среднее
            last_n_list.append(last_n)
            last_n_mean = last_n.mean()

            #Добавляем средние значения за кждый период в список
            yearly_means.append(last_n_mean)
            means_df[f'Period_{i + 1}'] = last_n_mean

            #Рассчитываем нормированные значения и добавляем в список, сбрасывая индексы
            last_n_normalized = last_n / last_n_mean
            yearly_normalized.append(last_n_normalized.reset_index(drop = True))

        #Рассчитываем среднее для нормированных значений для прогноза сезонности
        average_normalized = sum(yearly_normalized) / years_count
        #Рассчитываем среднее для всех значений для прогноза тренда
        overall_mean = sum(yearly_means) / years_count

        #Прогноз по одному или двум прошлым периодам
        if (years_count == 1) or (years_count == 2):
            lst_overall_mean = overall_mean.tolist()
            result_df = average_normalized * lst_overall_mean
        #Прогноз по трем прошлым периодам
        if years_count == 3:
            step = (means_df['Period_2'] - means_df['Period_3']) * w_1 + (
                        means_df['Period_1'] - means_df['Period_2']) * w_2
            forecast_average = step + means_df['Period_1']
            result_df = average_normalized * forecast_average

        #Обновляем индекс
        new_index = pd.date_range(start = self.df.index.max() + pd.DateOffset(months = 1), periods = self.forecast_periods, freq = 'MS')
        result_df.index = new_index

        return result_df, last_n_list
    
