Импортируем библиотеки

In [1]:
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from ipywidgets import interact
import ipywidgets as widgets
from ipywidgets import VBox
from scipy.interpolate import interp1d, lagrange, CubicSpline, interp1d, pchip
from scipy.interpolate import BarycentricInterpolator, Akima1DInterpolator
from sklearn.metrics import mean_squared_error, mean_absolute_error
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.api import VAR
import warnings
from statsmodels.tools.sm_exceptions import ConvergenceWarning
warnings.simplefilter('ignore', ConvergenceWarning)
warnings.simplefilter('ignore', UserWarning)
warnings.simplefilter(action='ignore', category=FutureWarning)

Создаём Pandas DataFrame из файла IBM.csv

In [2]:
file_path = 'MSFT.csv'
df = pd.read_csv(file_path)
df['Date'] = pd.to_datetime(df['Date'])
df.set_index('Date', inplace=True)
df["Volume"] = df["Volume"] / max(df["Volume"])

Отрисуем график стоимости акций

In [3]:
trace1 = go.Scatter(x=df.index, y=df['Volume'], mode="lines+markers", name='Исходные данные')
fig1 = go.FigureWidget(layout_yaxis_range=[0,np.max(df['Volume'])*1.1])
fig1.add_trace(trace1)
fig1.update_xaxes(title='Дата')
fig1.update_yaxes(title='Стоимость')
fig1

FigureWidget({
    'data': [{'mode': 'lines+markers',
              'name': 'Исходные данные',
              'type': 'scatter',
              'uid': '6d78d849-b6e2-4620-ae64-2099be4f7741',
              'x': array([datetime.datetime(2022, 9, 6, 0, 0),
                          datetime.datetime(2022, 9, 7, 0, 0),
                          datetime.datetime(2022, 9, 8, 0, 0), ...,
                          datetime.datetime(2023, 8, 31, 0, 0),
                          datetime.datetime(2023, 9, 1, 0, 0),
                          datetime.datetime(2023, 9, 5, 0, 0)], dtype=object),
              'y': array([0.24770853, 0.28021068, 0.235998  , ..., 0.30674084, 0.17341293,
                          0.21531904])}],
    'layout': {'template': '...',
               'xaxis': {'title': {'text': 'Дата'}},
               'yaxis': {'range': [0, 1.1], 'title': {'text': 'Стоимость'}}}
})

### ARIMA

In [4]:

# Прогнозирование с использованием ARIMA
order = (5, 1, 1)  # параметры модели ARIMA (p, d, q)
model = ARIMA(df['Volume'], order=order)
result = model.fit()

forecast_steps = 10 
forecast = result.get_forecast(steps=forecast_steps)

forecast_values = forecast.predicted_mean
# Визуализация результатов
trace2 = go.Scatter(
    x=pd.date_range(start=df.index[-1], periods=forecast_steps + 1, freq='B')[1:],
    y=forecast_values,
    mode='markers',
    name='ARIMA'
)

fig2 = go.FigureWidget(data=[trace1, trace2])
fig2.update_layout(xaxis_title='Дата', yaxis_title='Стоимость')

def redraw_arima(**args):
    p, d, q, end, forecast_n = args.values()
    
    order = (p, d, q)  # параметры модели ARIMA (p, d, q)
    model = ARIMA(df['Volume'][:end], order=order)
    result = model.fit()

    forecast = result.get_forecast(steps=forecast_n)
    
    forecast_values = forecast.predicted_mean
    
    mse_arima = mean_squared_error(df['Volume'][end:end+forecast_n], forecast_values)
    mae_arima = mean_absolute_error(df['Volume'][end:end+forecast_n], forecast_values)
    
    fig2["data"][1].x = df.index[end:end+forecast_n]
    fig2["data"][1].y = forecast_values
    
    print(f'MSE (ARIMA): {mse_arima}')
    print(f'MAE (ARIMA): {mae_arima}')


interact(
    redraw_arima, 
    int1=widgets.IntSlider(value=5, min=0, max=10, step=1,description='p:'), 
    int2=widgets.IntSlider(value=1, min=0, max=10, step=1,description='d:'),
    int3=widgets.IntSlider(value=1, min=0, max=10, step=1,description='q:'), 
    int4=widgets.IntSlider(value=1, min=1, max=len(df.index), step=1,description='end:'), 
    int5=widgets.IntSlider(value=1, min=1, max=100, step=1,description='forecast_n:')
)

# Вывод результатов
fig2

interactive(children=(IntSlider(value=5, description='p:', max=10), IntSlider(value=1, description='d:', max=1…

FigureWidget({
    'data': [{'mode': 'lines+markers',
              'name': 'Исходные данные',
              'type': 'scatter',
              'uid': '7c91fef1-b4dd-4a6a-aba2-6fff2c80cdf1',
              'x': array([datetime.datetime(2022, 9, 6, 0, 0),
                          datetime.datetime(2022, 9, 7, 0, 0),
                          datetime.datetime(2022, 9, 8, 0, 0), ...,
                          datetime.datetime(2023, 8, 31, 0, 0),
                          datetime.datetime(2023, 9, 1, 0, 0),
                          datetime.datetime(2023, 9, 5, 0, 0)], dtype=object),
              'y': array([0.24770853, 0.28021068, 0.235998  , ..., 0.30674084, 0.17341293,
                          0.21531904])},
             {'mode': 'markers',
              'name': 'ARIMA',
              'type': 'scatter',
              'uid': 'e3b63325-f24d-4f90-a3ae-85e4bdaee7dd',
              'x': array([datetime.datetime(2022, 9, 7, 0, 0)], dtype=object),
              'y': array([0.24770853])}],


### VAR

In [9]:
order = 2  # порядок VAR-модели
model_var = VAR(df)
result_var = model_var.fit(order)

forecast_steps = 10 
lag_order = result_var.k_ar
forecast = result_var.forecast(df.values[-lag_order:], steps=forecast_steps)

# Визуализация результатов
trace2 = go.Scatter(
    x=pd.date_range(start=df.index[-1], periods=forecast_steps + 1, freq='B')[1:],
    y=forecast[:, -1],
    mode='markers',
    name='VAR'
)

fig3 = go.FigureWidget(data=[trace1, trace2])
fig3.update_layout(xaxis_title='Дата', yaxis_title='Стоимость')

def redraw_var(**args):
    order, end, forecast_n = args.values()

    model_var = VAR(df)
    result_var = model_var.fit(order)
    forecast = result_var.forecast(model_var.y[:end], steps=forecast_n)
    forecast_values = forecast[:,-1]
    
    mse_arima = mean_squared_error(df['Volume'][end:end+forecast_n], forecast_values)
    mae_arima = mean_absolute_error(df['Volume'][end:end+forecast_n], forecast_values)
    
    fig3["data"][1].x = df.index[end:end+forecast_n]
    fig3["data"][1].y = forecast_values
    
    print(f'MSE (ARIMA): {mse_arima}')
    print(f'MAE (ARIMA): {mae_arima}')


interact(
    redraw_var, 
    int1=widgets.IntSlider(value=5, min=0, max=10, step=1,description='p:'), 
    int2=widgets.IntSlider(value=1, min=1, max=len(df.index), step=1,description='end:'), 
    int3=widgets.IntSlider(value=1, min=1, max=100, step=1,description='forecast_n:')
)

# Вывод результатов
fig3

interactive(children=(IntSlider(value=5, description='p:', max=10), IntSlider(value=1, description='end:', max…

FigureWidget({
    'data': [{'mode': 'lines+markers',
              'name': 'Исходные данные',
              'type': 'scatter',
              'uid': 'cc900ff3-ebfe-4225-8b34-a0fbe35d2402',
              'x': array([datetime.datetime(2022, 9, 6, 0, 0),
                          datetime.datetime(2022, 9, 7, 0, 0),
                          datetime.datetime(2022, 9, 8, 0, 0), ...,
                          datetime.datetime(2023, 8, 31, 0, 0),
                          datetime.datetime(2023, 9, 1, 0, 0),
                          datetime.datetime(2023, 9, 5, 0, 0)], dtype=object),
              'y': array([0.24770853, 0.28021068, 0.235998  , ..., 0.30674084, 0.17341293,
                          0.21531904])},
             {'mode': 'markers',
              'name': 'VAR',
              'type': 'scatter',
              'uid': '60b26bfb-68f0-4346-985a-5a937fe2bd12',
              'x': array([datetime.datetime(2023, 9, 6, 0, 0),
                          datetime.datetime(2023, 9, 7, 0, 0

### Сплайн интерполяция

In [None]:
# Построение линейной регрессии
interp = CubicSpline(x, y)
y_cubic = interp(x_smooth)
# Построение графика
trace2 = go.Scatter(
    x=x_smooth,
    y=y_cubic,
    mode='markers',
    name='Сплайн интерполяция'
)
trace3 = go.Scatter(x=x, y=y, mode="markers", name='Выколотые')
fig4 = go.FigureWidget(data=[trace1, trace2, trace3])
fig4.update_layout(xaxis_title='Дата', yaxis_title='Стоимость')

def redraw_cubic(**args):
    result = np.zeros_like(x, dtype=bool)
    for arg in args.values():
        result[arg[0]:arg[1]] = True

    x_changed = x[result]
    interp = CubicSpline(x_changed, y[x_changed])
    y_cubic = interp(x_smooth)
    fig4["data"][1].y = y_cubic
    fig4["data"][2].x = x_changed
    fig4["data"][2].y = y[x_changed]
    mask = np.ones(y.size, dtype=bool)
    mask[x_changed] = False
    print(np.sum(np.power(np.array(y[mask]) - np.array(y_cubic[mask]), 2)))

interact(
    redraw_cubic, 
    int1=widgets.IntRangeSlider(value=[x_1, x_2], min=x_1, max=x_2, step=1,description='int1:'), 
    int2=widgets.IntRangeSlider(value=[x_2, x_3], min=x_2, max=x_3, step=1,description='int2:'),
    int3=widgets.IntRangeSlider(value=[x_3, x_4], min=x_3, max=x_4, step=1,description='int3:'), 
    int4=widgets.IntRangeSlider(value=[x_4, x_5], min=x_4, max=x_5, step=1,description='int4:'), 
    int5=widgets.IntRangeSlider(value=[x_5, x_6], min=x_5, max=x_6, step=1,description='int5:'), 
)
fig4

### Интерполяция Ньютона

In [None]:
def divided_diff(x, y):
    n = len(y)
    coef = np.zeros([n, n])
    coef[:,0] = y
    for j in range(1,n):
        for i in range(n-j):
            coef[i][j] = (coef[i+1][j-1] - coef[i][j-1]) / (x[i+j]-x[i])     
    return coef

def newton_poly(coef, x_data, x):
    n = len(x_data) - 1 
    p = coef[n]
    for k in range(1,n+1):
        p = coef[n-k] + (x -x_data[n-k])*p
    return p

a_s = divided_diff(x, y)[0, :]

y_newton = newton_poly(a_s, x, x_smooth)

trace2 = go.Scatter(
    x=x_smooth,
    y=y_newton,
    mode='markers',
    name='Интерполяция Ньютона'
)

fig5 = go.FigureWidget(data=[trace1, trace2])
fig5.update_layout(xaxis_title='Дата', yaxis_title='Стоимость')

def redraw_cubic(**args):
    result = np.zeros_like(x, dtype=bool)
    for arg in args.values():
        result[arg[0]:arg[1]] = True

    x_changed = x[result]
    y_changed = y[result]
    a_s = divided_diff(x_changed, y_changed)[0, :]
    y_newton = newton_poly(a_s, x_changed, x_smooth)
    fig5["data"][1].y = y_newton

interact(
    redraw_cubic, 
    int1=widgets.IntRangeSlider(value=[x_1, x_2], min=x_1, max=x_2, step=1,description='int1:'), 
    int2=widgets.IntRangeSlider(value=[x_2, x_3], min=x_2, max=x_3, step=1,description='int2:'),
    int3=widgets.IntRangeSlider(value=[x_3, x_4], min=x_3, max=x_4, step=1,description='int3:'), 
    int4=widgets.IntRangeSlider(value=[x_4, x_5], min=x_4, max=x_5, step=1,description='int4:'), 
    int5=widgets.IntRangeSlider(value=[x_5, x_6], min=x_5, max=x_6, step=1,description='int5:'), 
)
fig5