In [None]:
import numpy as np
import pandas as pd
# Для сравнения может потребоваться statsmodels, если разрешен для этой цели
# from statsmodels.tsa.seasonal import STL as sm_STL

# Вспомогательная функция для локальной регрессии (Loess)
# Эта реализация Loess упрощена и использует только numpy/pandas.
# Оригинальная статья STL описывает более сложный итеративный Loess,
# но для базовой реализации мы используем упрощенный подход.
def loess_1d(y, x, points, bandwidth):
    """
    Упрощенная 1D реализация Loess (Локальная регрессия).

    Args:
        y (np.ndarray): Значения зависимой переменной.
        x (np.ndarray): Значения независимой переменной (должны быть отсортированы).
        points (np.ndarray): Точки, в которых нужно оценить значения.
        bandwidth (float): Ширина полосы пропускания (доля данных, используемых для каждой локальной регрессии).

    Returns:
        np.ndarray: Оцененные значения в точках points.
    """
    n = len(x)
    if n == 0:
        return np.zeros_like(points, dtype=float)

    y = np.asarray(y)
    x = np.asarray(x)
    points = np.asarray(points)

    # Сортируем данные по x, если они еще не отсортированы (Loess требует сортировки)
    sort_indices = np.argsort(x)
    x_sorted = x[sort_indices]
    y_sorted = y[sort_indices]

    estimated_points = np.zeros_like(points, dtype=float)

    for i, p in enumerate(points):
        # Определяем окно данных для текущей точки
        distances = np.abs(x_sorted - p)
        # Количество точек в окне
        num_points_in_window = int(np.ceil(bandwidth * n))
        if num_points_in_window < 2: # Требуется минимум 2 точки для регрессии
             num_points_in_window = 2
        if num_points_in_window > n:
             num_points_in_window = n

        # Находим индексы ближайших точек
        nearest_indices = np.argsort(distances)[:num_points_in_window]
        x_window = x_sorted[nearest_indices]
        y_window = y_sorted[nearest_indices]
        dist_window = distances[nearest_indices]

        # Вычисляем веса по трикубической функции ядра
        # d = dist_window / max_dist, где max_dist - максимальное расстояние в окне
        max_dist = np.max(dist_window)
        if max_dist < 1e-9: # Избегаем деления на ноль
             weights = np.ones_like(x_window)
        else:
            d = dist_window / max_dist
            weights = (1 - d**3)**3

        # Выполняем взвешенную линейную регрессию в окне
        # Модель: y = a + b*x
        # Минимизируем sum(weights * (y - (a + b*x))^2)
        # Нормальные уравнения для взвешенной линейной регрессии:
        # [ sum(w)    sum(w*x) ] [ a ] = [ sum(w*y)    ]
        # [ sum(w*x)  sum(w*x^2)] [ b ] = [ sum(w*x*y)  ]

        W = np.diag(weights)
        X_mat = np.vstack([np.ones_like(x_window), x_window]).T
        Y_vec = y_window.reshape(-1, 1)

        # Решаем систему (X.T * W * X) * [a, b].T = (X.T * W * Y)
        XTWX = X_mat.T @ W @ X_mat
        XTWY = X_mat.T @ W @ Y_vec

        # Добавляем небольшую регуляризацию для устойчивости, если матрица плохо обусловлена
        try:
            coeffs = np.linalg.solve(XTWX, XTWY)
            a, b = coeffs.flatten()
        except np.linalg.LinAlgError:
            # Если матрица вырожденная, используем среднее взвешенное
            if np.sum(weights) < 1e-9:
                 a = np.mean(y_window) # Или 0, или другое разумное значение
                 b = 0
            else:
                a = np.sum(weights * y_window) / np.sum(weights)
                b = 0 # Не можем оценить наклон


        # Оцениваем значение в точке p
        estimated_points[i] = a + b * p

    return estimated_points


# Реализация алгоритма STL
class SimpleSTL:
    """
    Простая реализация Seasonal-Trend decomposition using Loess (STL)
    с использованием только numpy и pandas.
    Основана на описании из оригинальной статьи, но использует упрощенный Loess.
    """
    def __init__(self, seasonal_period, n_trend_bandwidth=None, n_lowpass_bandwidth=None,
                 seasonal_smoother_span=None, trend_smoother_span=None, lowpass_smoother_span=None,
                 n_inner_iterations=2, n_outer_iterations=0):
        """
        Инициализация STL.

        Args:
            seasonal_period (int): Период сезонности (например, 12 для месячных данных).
            n_trend_bandwidth (int, optional): Количество точек для Loess тренда. По умолчанию None (рассчитывается).
            n_lowpass_bandwidth (int, optional): Количество точек для Loess низкочастотного фильтра. По умолчанию None (рассчитывается).
            seasonal_smoother_span (float, optional): Доля данных для Loess сезонности. По умолчанию None (рассчитывается).
            trend_smoother_span (float, optional): Доля данных для Loess тренда. По умолчанию None (рассчитывается).
            lowpass_smoother_span (float, optional): Доля данных для Loess низкочастотного фильтра. По умолчанию None (рассчитывается).
            n_inner_iterations (int): Количество внутренних итераций. По умолчанию 2.
            n_outer_iterations (int): Количество внешних итераций (для робастности). По умолчанию 0.
        """
        self.seasonal_period = seasonal_period
        self.n_trend_bandwidth = n_trend_bandwidth
        self.n_lowpass_bandwidth = n_lowpass_bandwidth
        self.seasonal_smoother_span = seasonal_smoother_span
        self.trend_smoother_span = trend_smoother_span
        self.lowpass_smoother_span = lowpass_smoother_span
        self.n_inner_iterations = n_inner_iterations
        self.n_outer_iterations = n_outer_iterations

        self.seasonal = None
        self.trend = None
        self.residual = None
        self._data = None
        self._n = 0
        self._times = None # Временные метки или индексы

    def fit(self, data):
        """
        Выполняет декомпозицию временного ряда.

        Args:
            data (np.ndarray или pd.Series): Входной временной ряд (1D массив).
        """
        if not isinstance(data, (np.ndarray, pd.Series)):
            raise TypeError("Входные данные должны быть np.ndarray или pd.Series")
        if data.ndim != 1:
             raise ValueError("Входные данные должны быть 1D массивом")

        self._data = np.asarray(data, dtype=float)
        self._n = len(self._data)
        if self._n == 0:
            self.seasonal = np.array([])
            self.trend = np.array([])
            self.residual = np.array([])
            return

        # Используем индексы как "время" для Loess
        self._times = np.arange(self._n, dtype=float)

        # Инициализация компонент
        self.seasonal = np.zeros(self._n, dtype=float)
        self.trend = np.zeros(self._n, dtype=float)
        self.residual = self._data.copy() # Изначально остаток = исходные данные

        # Рассчитываем параметры сглаживания, если не заданы
        if self.seasonal_smoother_span is None:
             # Рекомендация из статьи: 7 для сезонности
             self.seasonal_smoother_span = 7 / self._n # Доля данных
        if self.n_trend_bandwidth is None:
             # Рекомендация из статьи: ближайшее нечетное к 1.5 * seasonal_period
             self.n_trend_bandwidth = int(np.ceil(1.5 * self.seasonal_period))
             if self.n_trend_bandwidth % 2 == 0:
                 self.n_trend_bandwidth += 1
             if self.n_trend_bandwidth < 3: self.n_trend_bandwidth = 3 # Минимум 3 точки
        if self.n_lowpass_bandwidth is None:
             # Рекомендация из статьи: ближайшее нечетное к seasonal_period
             self.n_lowpass_bandwidth = int(np.ceil(self.seasonal_period))
             if self.n_lowpass_bandwidth % 2 == 0:
                 self.n_lowpass_bandwidth += 1
             if self.n_lowpass_bandwidth < 3: self.n_lowpass_bandwidth = 3 # Минимум 3 точки

        # Преобразуем количество точек в долю для Loess (для единообразия с seasonal)
        if self.trend_smoother_span is None:
             self.trend_smoother_span = self.n_trend_bandwidth / self._n
        if self.lowpass_smoother_span is None:
             self.lowpass_smoother_span = self.n_lowpass_bandwidth / self._n

        # Внешние итерации (для робастности - здесь упрощено, веса робастности не используются)
        for outer_iter in range(self.n_outer_iterations + 1):
            # Инициализация весов робастности (в упрощенной версии они всегда 1)
            robustness_weights = np.ones(self._n, dtype=float)

            # Внутренние итерации
            for inner_iter in range(self.n_inner_iterations):
                # Шаг 1: Десезонирование
                # y_t - T_t^(k) (где T_t^(k) - тренд из предыдущей итерации, изначально 0)
                deseasonalized = self._data - self.trend

                # Шаг 2: Сглаживание сезонности
                # Для каждого подряда сезонности (например, все январи) сглаживаем deseasonalized
                seasonal_smoothed = np.zeros(self._n, dtype=float)
                for i in range(self.seasonal_period):
                    # Выбираем точки, соответствующие текущему подряду сезонности
                    period_indices = np.arange(i, self._n, self.seasonal_period)
                    if len(period_indices) > 0:
                         # Применяем Loess к подряду сезонности
                         # Используем индексы подряда как "время" для Loess
                         period_times = self._times[period_indices]
                         period_values = deseasonalized[period_indices]
                         # Точки для оценки - те же самые индексы подряда
                         smoothed_values = loess_1d(period_values, period_times, period_times, self.seasonal_smoother_span)
                         # Заполняем сглаженные значения обратно в полный массив
                         seasonal_smoothed[period_indices] = smoothed_values

                # Шаг 3: Сглаживание сглаженной сезонности (для получения временного ряда сезонности)
                # Применяем скользящее среднее по seasonal_period точкам
                # Затем центрируем его, вычитая скользящее среднее по seasonal_period точкам еще раз
                # (Это упрощенный подход, оригинальный STL использует Loess)
                # Здесь используем Loess для соответствия структуре STL
                seasonal_trend = loess_1d(seasonal_smoothed, self._times, self._times, self.trend_smoother_span)

                # Шаг 4: Временный сезонный компонент
                # S_t^(k+1) = smoothed_seasonal - seasonal_trend
                self.seasonal = seasonal_smoothed - seasonal_trend

                # Шаг 5: Десезонирование с использованием нового сезонного компонента
                # y_t - S_t^(k+1)
                detrended = self._data - self.seasonal

                # Шаг 6: Сглаживание десезонированного ряда для получения тренда
                # T_t^(k+1) = Loess(detrended)
                self.trend = loess_1d(detrended, self._times, self._times, self.trend_smoother_span)

                # Шаг 7: Вычисление остатков
                self.residual = self._data - self.seasonal - self.trend

                # (Внешние итерации используют остатки для обновления весов робастности,
                # но в этой упрощенной версии этот шаг пропущен)

        # После всех итераций компоненты seasonal, trend, residual готовы

    def get_components(self):
        """
        Возвращает разложенные компоненты временного ряда.

        Returns:
            dict: Словарь с ключами 'seasonal', 'trend', 'residual'.
        """
        return {
            'seasonal': self.seasonal,
            'trend': self.trend,
            'residual': self.residual
        }

# 2. Симуляция данных для тестирования STL

# Генерируем временной ряд с трендом, сезонностью и шумом
np.random.seed(42)
n_points = 120 # 10 лет месячных данных
seasonal_period = 12
times = np.arange(n_points)

# Тренд: линейный + небольшой изгиб
trend_component = 0.5 * times + 5 * np.sin(times / 50)

# Сезонность: синусоидальная с периодом 12
seasonal_component = 10 * np.sin(2 * np.pi * times / seasonal_period) + 5 * np.cos(2 * np.pi * times / (seasonal_period/2))

# Шум (остатки)
residual_component = np.random.normal(loc=0, scale=5, size=n_points)

# Исходный временной ряд
simulated_time_series = trend_component + seasonal_component + residual_component

# 3. Сравнение со стандартной реализацией (statsmodels, если разрешен для сравнения)

# Используем нашу реализацию
my_stl = SimpleSTL(seasonal_period=seasonal_period, n_inner_iterations=5, n_outer_iterations=0) # Увеличим внутренние итерации для лучшей сходимости
my_stl.fit(simulated_time_series)
my_components = my_stl.get_components()

print("Наша реализация STL:")
print("Сезонность:", my_components['seasonal'][:seasonal_period]) # Выводим первые несколько периодов
print("Тренд:", my_components['trend'][:10]) # Выводим первые 10 значений
print("Остатки (первые 10):", my_components['residual'][:10])

# # Используем стандартную реализацию из statsmodels
# try:
#     sm_stl = sm_STL(simulated_time_series, seasonal=seasonal_period, period=seasonal_period, robust=False) # robust=False для сравнения с нашей упрощенной версией
#     sm_result = sm_stl.fit()
#
#     print("\nРеализация statsmodels STL:")
#     print("Сезонность:", sm_result.seasonal[:seasonal_period])
#     print("Тренд:", sm_result.trend[:10])
#     print("Остатки (первые 10):", sm_result.resid[:10])
#
#     # Сравнение результатов (например, MSE)
#     mse_seasonal = np.mean((my_components['seasonal'] - sm_result.seasonal)**2)
#     mse_trend = np.mean((my_components['trend'] - sm_result.trend)**2)
#     mse_residual = np.mean((my_components['residual'] - sm_result.resid)**2)
#
#     print(f"\nСреднеквадратичная ошибка (MSE) между нашей и statsmodels реализациями:")
#     print(f"  Сезонность: {mse_seasonal:.6f}")
#     print(f"  Тренд: {mse_trend:.6f}")
#     print(f"  Остатки: {mse_residual:.6f}")
#
# except ImportError:
#     print("\nБиблиотека statsmodels не найдена. Пропуск сравнения.")
# except Exception as e:
#     print(f"\nОшибка при сравнении со statsmodels: {e}")


# Визуализация (опционально, требует matplotlib)
# import matplotlib.pyplot as plt
#
# plt.figure(figsize=(12, 8))
#
# plt.subplot(4, 1, 1)
# plt.plot(times, simulated_time_series, label='Исходный ряд')
# plt.title('Исходный временной ряд')
# plt.legend()
#
# plt.subplot(4, 1, 2)
# plt.plot(times, my_components['seasonal'], label='Наша сезонность')
# # if 'sm_result' in locals(): plt.plot(times, sm_result.seasonal, label='sm сезонность', linestyle='--')
# plt.title('Сезонная компонента')
# plt.legend()
#
# plt.subplot(4, 1, 3)
# plt.plot(times, my_components['trend'], label='Наш тренд')
# # if 'sm_result' in locals(): plt.plot(times, sm_result.trend, label='sm тренд', linestyle='--')
# plt.title('Компонента тренда')
# plt.legend()
#
# plt.subplot(4, 1, 4)
# plt.plot(times, my_components['residual'], label='Наши остатки')
# # if 'sm_result' in locals(): plt.plot(times, sm_result.resid, label='sm остатки', linestyle='--')
# plt.title('Остаточная компонента')
# plt.legend()
#
# plt.tight_layout()
# plt.show()

