## get data

In [3]:
# Restart the runtime first, then import the libraries this notebook needs.
import pandas as pd

# Source table for the whole notebook; the path is relative to the notebook's working directory.
DATA_PATH = "data/normalized_without_outliers_25_55000.csv"
df = pd.read_csv(DATA_PATH)
df.shape

(55673, 78)

In [16]:
from types import ModuleType
from typing import Tuple

import matplotlib.pyplot as plt
import numpy as np

from tsmoothie.utils_func import sim_randomwalk
from tsmoothie.smoother import *

In [13]:
# Build X: the usual variant, i.e. only the price columns, without norm_floor_price.
import numpy as np


# Label-based column slice: both end columns are included.
price_frame = df.loc[:, "norm_price_1":"norm_price_25"]
X = price_frame.values
X.shape

(55673, 25)

## test different models

## smoothing functions

In [82]:
def apply_exponential_smoother(prices: list[float], plt: ModuleType, should_log_data: bool, should_plot: bool = False) -> tuple[ModuleType, list[float]]:
    """Smooth one price series with ``ExponentialSmoother(window_len=2, alpha=0.1)``.

    Args:
        prices: One price series (the notebook passes a NumPy row of ``X``).
        plt: Plotting module (``matplotlib.pyplot`` in this notebook). It is
            only used when ``should_plot`` is true and is returned unchanged.
        should_log_data: When true, print the smoothed series.
        should_plot: When true, draw the smoothed series with ``plt.plot``.

    Returns:
        ``(plt, smoothed)`` where ``smoothed`` is ``smoother.smooth_data[0]``.

    NOTE(review): the comparison cell in this notebook trims smoother outputs
    to 23 points for 25-point inputs, which suggests this windowed smoother
    returns fewer points than it receives -- confirm against the tsmoothie docs.
    """
    smoother = ExponentialSmoother(window_len=2, alpha=0.1)
    smoother.smooth(prices)
    smoothed = smoother.smooth_data[0]

    # Only the smoothed series is returned or drawn, so no confidence
    # intervals are computed here.
    if should_log_data:
        print(f"smoth data for ExponentialSmoother is {smoothed}")

    if should_plot:
        plt.plot(smoothed, linewidth=3, label="ExponentialSmoother", alpha=0.3)

    return plt, smoothed



In [83]:
def apply_convolution_smoother(prices: list[float], plt: ModuleType, should_log_data: bool, should_plot: bool = False) -> tuple[ModuleType, list[float]]:
    """Smooth one price series with ``ConvolutionSmoother(window_len=4, window_type='ones')``.

    Args:
        prices: One price series (the notebook passes a NumPy row of ``X``).
        plt: Plotting module (``matplotlib.pyplot`` in this notebook). It is
            only used when ``should_plot`` is true and is returned unchanged.
        should_log_data: When true, print the smoothed series.
        should_plot: When true, draw the smoothed series with ``plt.plot``.

    Returns:
        ``(plt, smoothed)`` where ``smoothed`` is ``smoother.smooth_data[0]``.
    """
    smoother = ConvolutionSmoother(window_len=4, window_type='ones')
    smoother.smooth(prices)
    smoothed = smoother.smooth_data[0]

    # Only the smoothed series is returned or drawn, so no confidence
    # intervals are computed here.
    if should_log_data:
        print(f"smoth data for ConvolutionSmoother is {smoothed}")

    if should_plot:
        plt.plot(smoothed, linewidth=3, label="ConvolutionSmoother", alpha=0.3)

    return plt, smoothed


In [84]:
def apply_spectral_smoother(prices: list[float], plt: ModuleType, should_log_data: bool, should_plot: bool = False) -> tuple[ModuleType, list[float]]:
    """Smooth one price series with ``SpectralSmoother(smooth_fraction=0.3, pad_len=20)``.

    Args:
        prices: One price series (the notebook passes a NumPy row of ``X``).
        plt: Plotting module (``matplotlib.pyplot`` in this notebook). It is
            only used when ``should_plot`` is true and is returned unchanged.
        should_log_data: When true, print the smoothed series.
        should_plot: When true, draw the smoothed series with ``plt.plot``.

    Returns:
        ``(plt, smoothed)`` where ``smoothed`` is ``smoother.smooth_data[0]``.
    """
    smoother = SpectralSmoother(smooth_fraction=0.3, pad_len=20)
    smoother.smooth(prices)
    smoothed = smoother.smooth_data[0]

    # Only the smoothed series is returned or drawn, so no confidence
    # intervals are computed here.
    if should_log_data:
        print(f"smoth data for SpectralSmoother is {smoothed}")

    if should_plot:
        plt.plot(smoothed, linewidth=3, label="SpectralSmoother", alpha=0.3)

    return plt, smoothed

In [85]:
def apply_polynomial_smoother(prices: list[float], plt: ModuleType, should_log_data: bool, should_plot: bool = False) -> tuple[ModuleType, list[float]]:
    """Smooth one price series with ``PolynomialSmoother(degree=6)``.

    Args:
        prices: One price series (the notebook passes a NumPy row of ``X``).
        plt: Plotting module (``matplotlib.pyplot`` in this notebook). It is
            only used when ``should_plot`` is true and is returned unchanged.
        should_log_data: When true, print the smoothed series.
        should_plot: When true, draw the smoothed series with ``plt.plot``.

    Returns:
        ``(plt, smoothed)`` where ``smoothed`` is ``smoother.smooth_data[0]``.
    """
    smoother = PolynomialSmoother(degree=6)
    smoother.smooth(prices)
    smoothed = smoother.smooth_data[0]

    # Only the smoothed series is returned or drawn, so no prediction
    # intervals are computed here.
    if should_log_data:
        print(f"smoth data for PolynomialSmoother is {smoothed}")

    if should_plot:
        plt.plot(smoothed, linewidth=3, label="PolynomialSmoother", alpha=0.3)

    return plt, smoothed

In [86]:
def apply_spline_smoother(prices: list[float], plt: ModuleType, should_log_data: bool, should_plot: bool = False) -> tuple[ModuleType, list[float]]:
    """Smooth one price series with ``SplineSmoother(n_knots=6, spline_type='natural_cubic_spline')``.

    Args:
        prices: One price series (the notebook passes a NumPy row of ``X``).
        plt: Plotting module (``matplotlib.pyplot`` in this notebook). It is
            only used when ``should_plot`` is true and is returned unchanged.
        should_log_data: When true, print the smoothed series.
        should_plot: When true, draw the smoothed series with ``plt.plot``.

    Returns:
        ``(plt, smoothed)`` where ``smoothed`` is ``smoother.smooth_data[0]``.
    """
    smoother = SplineSmoother(n_knots=6, spline_type='natural_cubic_spline')
    smoother.smooth(prices)
    smoothed = smoother.smooth_data[0]

    # Only the smoothed series is returned or drawn, so no prediction
    # intervals are computed here.
    if should_log_data:
        print(f"smoth data for SplineSmoother is {smoothed}")

    if should_plot:
        plt.plot(smoothed, linewidth=3, label="SplineSmoother", alpha=0.3)

    return plt, smoothed

In [87]:
def apply_gaussian_smoother(prices: list[float], plt: ModuleType, should_log_data: bool, should_plot: bool = False) -> tuple[ModuleType, list[float]]:
    """Smooth one price series with ``GaussianSmoother(n_knots=5, sigma=0.7)``.

    Args:
        prices: One price series (the notebook passes a NumPy row of ``X``).
        plt: Plotting module (``matplotlib.pyplot`` in this notebook). It is
            only used when ``should_plot`` is true and is returned unchanged.
        should_log_data: When true, print the smoothed series.
        should_plot: When true, draw the smoothed series with ``plt.plot``.

    Returns:
        ``(plt, smoothed)`` where ``smoothed`` is ``smoother.smooth_data[0]``.
    """
    smoother = GaussianSmoother(n_knots=5, sigma=0.7)
    smoother.smooth(prices)
    smoothed = smoother.smooth_data[0]

    # Only the smoothed series is returned or drawn, so no prediction
    # intervals are computed here.
    if should_log_data:
        print(f"smoth data for GaussianSmoother is {smoothed}")

    if should_plot:
        plt.plot(smoothed, linewidth=3, label="GaussianSmoother", alpha=0.3)

    return plt, smoothed

In [88]:
def apply_binner_smoother(prices: list[float], plt: ModuleType, should_log_data: bool, should_plot: bool = False) -> tuple[ModuleType, list[float]]:
    """Smooth one price series with ``BinnerSmoother(n_knots=6)``.

    Args:
        prices: One price series (the notebook passes a NumPy row of ``X``).
        plt: Plotting module (``matplotlib.pyplot`` in this notebook). It is
            only used when ``should_plot`` is true and is returned unchanged.
        should_log_data: When true, print the smoothed series.
        should_plot: When true, draw the smoothed series with ``plt.plot``.

    Returns:
        ``(plt, smoothed)`` where ``smoothed`` is ``smoother.smooth_data[0]``.
    """
    smoother = BinnerSmoother(n_knots=6)
    smoother.smooth(prices)
    smoothed = smoother.smooth_data[0]

    # Only the smoothed series is returned or drawn, so no prediction
    # intervals are computed here.
    if should_log_data:
        print(f"smoth data for BinnerSmoother is {smoothed}")

    if should_plot:
        plt.plot(smoothed, linewidth=3, label="BinnerSmoother", alpha=0.3)

    return plt, smoothed

In [89]:
def apply_lowess_smoother(prices: list[float], plt: ModuleType, should_log_data: bool, should_plot: bool = False) -> tuple[ModuleType, list[float]]:
    """Smooth one price series with ``LowessSmoother(smooth_fraction=0.2, iterations=1)``.

    Args:
        prices: One price series (the notebook passes a NumPy row of ``X``).
        plt: Plotting module (``matplotlib.pyplot`` in this notebook). It is
            only used when ``should_plot`` is true and is returned unchanged.
        should_log_data: When true, print the smoothed series.
        should_plot: When true, draw the smoothed series with ``plt.plot``.

    Returns:
        ``(plt, smoothed)`` where ``smoothed`` is ``smoother.smooth_data[0]``.
    """
    smoother = LowessSmoother(smooth_fraction=0.2, iterations=1)
    smoother.smooth(prices)
    smoothed = smoother.smooth_data[0]

    # Only the smoothed series is returned or drawn, so no prediction
    # intervals are computed here.
    if should_log_data:
        print(f"smoth data for LowessSmoother is {smoothed}")

    if should_plot:
        plt.plot(smoothed, linewidth=3, label="LowessSmoother", alpha=0.3)

    return plt, smoothed

In [90]:
def apply_kalman_smoother(prices: list[float], plt: ModuleType, should_log_data: bool, should_plot: bool = False) -> tuple[ModuleType, list[float]]:
    """Smooth one price series with a level+trend ``KalmanSmoother``.

    The smoother is built with ``component='level_trend'`` and a noise of 0.1
    for both the level and the trend component.

    Args:
        prices: One price series (the notebook passes a NumPy row of ``X``).
        plt: Plotting module (``matplotlib.pyplot`` in this notebook). It is
            only used when ``should_plot`` is true and is returned unchanged.
        should_log_data: When true, print the smoothed series.
        should_plot: When true, draw the smoothed series with ``plt.plot``.

    Returns:
        ``(plt, smoothed)`` where ``smoothed`` is ``smoother.smooth_data[0]``.
    """
    smoother = KalmanSmoother(component='level_trend',
                              component_noise={'level': 0.1, 'trend': 0.1})
    smoother.smooth(prices)
    smoothed = smoother.smooth_data[0]

    # Only the smoothed series is returned or drawn, so no Kalman
    # intervals are computed here.
    if should_log_data:
        print(f"smoth data for KalmanSmoother is {smoothed}")

    if should_plot:
        plt.plot(smoothed, linewidth=3, label="KalmanSmoother", alpha=0.3)

    return plt, smoothed

## test

In [None]:
# Smoothers compared in this cell; each one returns (plt, smoothed_series).
smoother_functions = (
    apply_exponential_smoother,
    apply_convolution_smoother,
    apply_spectral_smoother,
    apply_polynomial_smoother,
    apply_spline_smoother,
    apply_gaussian_smoother,
    apply_binner_smoother,
    apply_lowess_smoother,
    apply_kalman_smoother,
)

should_log_data = False

for i in range(5):
    prices = X[i]

    fig = plt.figure(figsize=(15, 7))
    plt.plot(prices, label="prices")

    # One smoothed series per smoother, in the order listed above.
    smoothed_predicts_list = [
        apply_smoother(prices, plt, should_log_data=should_log_data)[1]
        for apply_smoother in smoother_functions
    ]

    # Not every smoother returns the full number of points (some are window
    # based), so every output is cut to the shortest one before stacking.
    # NOTE(review): cutting from the start assumes the shortened outputs line
    # up with the beginning of the series; if a windowed smoother drops its
    # leading points instead, its values are shifted -- confirm in the tsmoothie docs.
    common_len = min(len(module_predict) for module_predict in smoothed_predicts_list)
    smoothed_predicts_list = [module_predict[:common_len] for module_predict in smoothed_predicts_list]
    # Convert the list to a numpy array for easier manipulation
    smoothed_data_array = np.array(smoothed_predicts_list)

    # Per time step: drop the two smallest and the two largest smoother
    # values, then average what is left.
    mean_of_predicts_without_extremes = np.mean(np.partition(smoothed_data_array, (2, -2), axis=0)[2:-2], axis=0)

    print(mean_of_predicts_without_extremes)

    plt.scatter(range(len(mean_of_predicts_without_extremes)), mean_of_predicts_without_extremes,
                color="orange", label="trimmed mean of smoothers")

    plt.legend()
    plt.show()

    # Second figure: the prices coloured by their label.
    fig = plt.figure(figsize=(15, 7))

    # The mean is shorter than the raw series when windowed smoothers are
    # used, so the prices are cut to the same length before comparing;
    # comparing the full series would fail on mismatched shapes.
    shorted_prices = prices[:common_len]
    labels = np.where(shorted_prices > mean_of_predicts_without_extremes, 0, 1)
    plt.scatter(range(len(shorted_prices)), shorted_prices, c=labels, cmap='coolwarm', label='Price vs. Mean Predict')

    plt.plot(prices)
    plt.plot(range(len(mean_of_predicts_without_extremes)), mean_of_predicts_without_extremes, color="orange")
    plt.show()



## predictions using the trimmed mean of all smoothers

In [91]:
import matplotlib.pyplot as plt


def get_smoothed_mean(prices, plt=None, verbose=True):
    """Return the trimmed mean of eight smoothers applied to one price series.

    Every smoother listed below is run on ``prices``; at each time step the
    two smallest and the two largest smoothed values are discarded and the
    remaining ones are averaged.

    Args:
        prices: One price series (array-like).
        plt: Plotting module forwarded to the smoothers. They are called with
            plotting disabled here, so it is never used and may be omitted.
        verbose: When true (the default), print the resulting mean.

    Returns:
        NumPy array with one averaged value per time step.
    """
    should_log_data = False

    # apply_exponential_smoother is deliberately not part of this list.
    smoother_functions = (
        apply_convolution_smoother,
        apply_spectral_smoother,
        apply_polynomial_smoother,
        apply_spline_smoother,
        apply_gaussian_smoother,
        apply_binner_smoother,
        apply_lowess_smoother,
        apply_kalman_smoother,
    )

    # One smoothed series per smoother (8 rows, one value per time step).
    smoothed_predicts_list = [
        apply_smoother(prices, plt, should_log_data=should_log_data)[1]
        for apply_smoother in smoother_functions
    ]
    # Convert the list to a numpy array for easier manipulation
    smoothed_data_array = np.array(smoothed_predicts_list)

    # Partitioning along the smoother axis puts the two smallest values in
    # rows 0-1 and the two largest in the last two rows; the slice drops them.
    mean_of_predicts_without_extremes = np.mean(np.partition(smoothed_data_array, (2, -2), axis=0)[2:-2], axis=0)

    if verbose:
        print(mean_of_predicts_without_extremes)

    return mean_of_predicts_without_extremes


# Smoke test on the first series: the function prints the trimmed mean itself
# and the cell then echoes the returned array, so the values show up twice below.
get_smoothed_mean(X[0], plt)

[0.50089971 0.51017449 0.50718555 0.49595508 0.49989598 0.50840977
 0.49992591 0.47913775 0.46027338 0.45042794 0.46463385 0.49928297
 0.53615577 0.55584898 0.55355681 0.50636199 0.40842192 0.34481784
 0.30883537 0.33089754 0.37267252 0.43598375 0.52784883 0.57187619
 0.58938577]


array([0.50089971, 0.51017449, 0.50718555, 0.49595508, 0.49989598,
       0.50840977, 0.49992591, 0.47913775, 0.46027338, 0.45042794,
       0.46463385, 0.49928297, 0.53615577, 0.55584898, 0.55355681,
       0.50636199, 0.40842192, 0.34481784, 0.30883537, 0.33089754,
       0.37267252, 0.43598375, 0.52784883, 0.57187619, 0.58938577])

In [92]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
import ruptures as rpt
from pydantic import BaseModel


class Config(BaseModel):
    """Settings read by ``segment_timeseries_with_regression_and_full_detrended``."""

    # Change-point detector: "pelt" selects rpt.Pelt, "binseq" selects rpt.Binseg;
    # any other value makes the segmentation function raise ValueError.
    model_type: str = "pelt"  # pelt or binseq
    # Passed as ``model=`` to the ruptures detector.
    model_compute_error_type: str = "l1"
    # Passed as ``min_size=`` to the ruptures detector.
    min_size: int = 3
    # Passed as ``pen=`` to the detector's ``predict`` call.
    penalty: float = 0.2
    # Required (no default). NOTE(review): not read by any code visible in this notebook.
    threshold: float
    # Required (no default). Multiplied by a segment's standard deviation to
    # obtain that segment's labeling threshold.
    threshold_detrend_coeff: float


def segment_timeseries_with_regression_and_full_detrended(ts_i, norm_prices, config: Config, should_plot=False):
    """Label every point of one price series, segment by segment.

    The series is split at the change points found by ``ruptures``. Inside
    each segment a point gets label 0 when the smoothed mean returned by
    ``get_smoothed_mean`` exceeds the price by more than
    ``std(segment) * config.threshold_detrend_coeff``, and label 1 otherwise.

    Args:
        ts_i: Identifier of the series; only used in the plot title.
        norm_prices: Price series (the notebook passes a 1-D NumPy array).
        config: Detector type, cost model, minimum segment size, penalty and
            threshold coefficient.
        should_plot: When true, draw each segment, its detrended version, the
            smoothed mean and the shaded segment areas on one figure.

    Returns:
        List of int labels (0 or 1), one per point of ``norm_prices``.

    Raises:
        ValueError: If ``config.model_type`` is neither "pelt" nor "binseq".
    """
    if config.model_type == "pelt":
        algo = rpt.Pelt(model=config.model_compute_error_type, min_size=config.min_size, jump=1).fit(norm_prices)
    elif config.model_type == "binseq":
        algo = rpt.Binseg(model=config.model_compute_error_type, min_size=config.min_size).fit(norm_prices)
    else:
        raise ValueError("Не смог распознать тип модели")

    # The list of segment end indices is turned into (start, end) pairs, with
    # the first segment starting at 0.
    breakpoints = algo.predict(pen=config.penalty)
    segment_bounds = list(zip([0] + breakpoints[:-1], breakpoints))
    labels = np.zeros(len(norm_prices), dtype=int)

    if should_plot:
        plt.figure(figsize=(18, 8))

    for start, end in segment_bounds:
        segment = norm_prices[start:end]

        smoothed_predictions = get_smoothed_mean(segment, plt)

        # The threshold scales with the volatility of the segment itself.
        dynamic_threshold = np.std(segment) * config.threshold_detrend_coeff
        labels[start:end] = np.where(smoothed_predictions - segment > dynamic_threshold, 0, 1)

        if should_plot:
            x = np.arange(start, end)
            x_column = x.reshape(-1, 1)

            # The linear trend is only needed to draw the detrended segment,
            # so it is fitted on the plotting path only.
            trend = LinearRegression().fit(x_column, segment).predict(x_column)
            detrended_segment = segment - trend
            # Shift the detrended values so that their minimum sits at zero.
            detrended_segment += abs(min(detrended_segment))

            plt.plot(x, smoothed_predictions, color="orange", label='Сглаженное среднее')
            plt.plot(x, segment, label='Исходный сегмент', alpha=0.5)
            plt.plot(x, detrended_segment, label='Детрендированный сегмент')

    if should_plot:
        # Alternate two background colours so neighbouring segments are distinguishable.
        colors = ['lightblue', 'lightpink']
        for index, (start, end) in enumerate(segment_bounds):
            plt.fill_between(np.arange(start, end), norm_prices[start:end], color=colors[index % 2], alpha=0.2)
        plt.title(f"{ts_i} -- config: {config}")
        plt.legend()
        plt.show()

    return labels.tolist()


In [93]:
from sklearn.metrics import accuracy_score

# Inclusive range of row indices that are segmented and plotted.
index_for_plots = (1, 5)

config_first_model = dict(
    model_type="pelt",  # pelt or binseq
    model_compute_error_type="l1",
    min_size=6,
    penalty=0.7,
    threshold=0.07,
    threshold_detrend_coeff=0.4,
)

config: Config = Config(**config_first_model)

y_true = []
y_pred = []

first_row, last_row = index_for_plots
price_column_names = [f'norm_price_{j}' for j in range(1, 26)]

for i in range(len(df)):
    # Stop at the first row past the requested range.
    if i > last_row:
        break
    if i >= first_row:
        norm_prices = np.array([df.at[i, column_name] for column_name in price_column_names])

        predicted_labels = segment_timeseries_with_regression_and_full_detrended(
            i, norm_prices, config, should_plot=True
        )
        y_pred.append(predicted_labels)


# Accuracy evaluation, kept disabled: y_true is never filled in this cell.
# accuracies = [accuracy_score(y_true[i], y_pred[i]) for i in range(len(y_pred))]

# mean_accuracy = np.mean(accuracies)
# std_deviation = np.std(accuracies)
# print(mean_accuracy, std_deviation)

# accuracies_with_i = [(accuracy_score(y_true[i], y_pred[i]), i) for i in range(len(y_pred))]
# sorted_accuracies = sorted(accuracies_with_i, key=lambda x: x[0], reverse=True)
# print(sorted_accuracies[:10])
# print(sorted_accuracies[-10:])


TypeError: get_smoothed_mean() missing 1 required positional argument: 'plt'

<Figure size 1800x800 with 0 Axes>