In [1]:
import numpy as np
import pandas as pd
from numpy import fft
import plotly.graph_objects as go
from plotly.subplots import make_subplots

from warnings import filterwarnings

filterwarnings('ignore')

from modules.data_fetcher import download_historical_data

In [7]:
# Trading pair to download.
SYMBOL = 'BTC-USDT'
# NOTE(review): `index` is not referenced by any cell visible here — confirm
# whether a later cell still needs it before removing.
index = 100

# Fetch the '1hour' history and keep only the 1000 most recent rows.
df = download_historical_data(SYMBOL, '1hour').tail(1000)
print(df.shape)
df.head()

(1000, 7)


Unnamed: 0_level_0,Timestamp,Open,Close,High,Low,Amount,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2023-01-16 08:00:00,1673852000.0,21149.4,21108.7,21156.8,21068.5,180.707264,3812605.0
2023-01-16 09:00:00,1673856000.0,21108.7,20743.8,21112.9,20646.0,720.239123,14989500.0
2023-01-16 10:00:00,1673860000.0,20747.4,20825.4,20837.0,20719.8,153.966254,3200464.0
2023-01-16 11:00:00,1673863000.0,20825.5,20846.4,20898.7,20800.1,163.292583,3403934.0
2023-01-16 12:00:00,1673867000.0,20846.6,20816.1,20857.2,20782.2,158.413103,3298596.0


In [8]:
def fourrier_extrapolation(
    data_to_predict: np.ndarray,
    n_predict: int,
    has_trend: bool = True,
    n_harm: int = 50,
) -> np.ndarray:
    """Extrapolate a 1-D series with a truncated Fourier model.

    The series is optionally detrended with the slope of a least-squares
    straight-line fit, transformed with the FFT, and rebuilt as a sum of
    cosines from the lowest-frequency components only. The cosines are
    evaluated past the end of the input to produce the extrapolation, and
    the linear trend is added back afterwards.

    Parameters
    ----------
    data_to_predict : array-like
        One-dimensional sequence of observed values (ndarray, list,
        pandas Series, ...). Must not be empty.
    n_predict : int
        Number of extra samples to generate after the observed ones.
    has_trend : bool, default True
        If true, remove a fitted linear slope before the FFT and re-apply
        it to the reconstructed signal.
    n_harm : int, default 50
        Number of harmonics kept in the model. The DC term plus the
        positive and negative frequency of each harmonic are used, i.e. at
        most ``1 + 2 * n_harm`` FFT bins.

    Returns
    -------
    numpy.ndarray
        Array of length ``len(data_to_predict) + n_predict``: the model's
        reconstruction of the input followed by the extrapolated samples.

    Raises
    ------
    ValueError
        If ``data_to_predict`` is empty or ``n_harm`` is negative.
    """
    series = np.asarray(data_to_predict)
    n = series.size
    if n == 0:
        raise ValueError("data_to_predict must contain at least one sample")
    if n_harm < 0:
        raise ValueError("n_harm must be non-negative")

    t_observed = np.arange(0, n)
    if has_trend:
        # Only the slope is removed; the intercept stays in the signal and
        # is carried by the zero-frequency (DC) FFT bin.
        slope = np.polyfit(t_observed, series, 1)[0]
        detrended = series - slope * t_observed
    else:
        slope = None
        detrended = series

    x_freqdom = fft.fft(detrended)  # signal in the frequency domain
    f = fft.fftfreq(n)  # frequency of each FFT bin, in cycles per sample

    # Order bins from lowest to highest |frequency|. A stable sort keeps
    # each +f bin directly ahead of its -f mirror, so the slice below
    # always takes complete conjugate pairs after the DC term.
    selected = np.argsort(np.absolute(f), kind="stable")[: 1 + n_harm * 2]

    t = np.arange(0, n + n_predict)
    restored_sig = np.zeros(t.size)
    for i in selected:
        ampli = np.absolute(x_freqdom[i]) / n  # amplitude
        phase = np.angle(x_freqdom[i])  # phase
        restored_sig += ampli * np.cos(2 * np.pi * f[i] * t + phase)

    if has_trend:
        return restored_sig + slope * t
    return restored_sig

In [11]:

# Number of trailing samples held out for comparison with the forecast;
# the same count is used as the extrapolation horizon.
pred_count = 20
n_predict = pred_count

# Positional split of the Close column: everything but the last
# `pred_count` rows is used for fitting, the remainder for checking.
test_data = df["Close"].iloc[-pred_count:]
train_data = df["Close"].iloc[:-pred_count]

# The extrapolation routine works on a plain NumPy array.
x = train_data.to_numpy(copy=True)

extrapolation = fourrier_extrapolation(x, n_predict, has_trend=True)

# p.line(np.arange(0, x.size), x, legend_label="Train data", line_width=2,)
# p.line(np.arange(0, extrapolation.size), extrapolation,legend_label="Extrapolation", line_width=2, color='orange')
# p.line(np.arange(x.size, extrapolation.size), test_data,legend_label="Test data", line_width=2, color='black')


In [12]:
# Single-panel figure: the grid is 1x1, so only one subplot title applies.
fig = make_subplots(rows=1, cols=1, subplot_titles=("Historical price",))

# (legend name, x positions, y values) for each curve. The x axis is the
# sample position: train data starts at 0, the extrapolation covers the
# train window plus the forecast horizon, and the held-out test data is
# aligned with the forecast part.
trace_specs = [
    ("History (train data)", np.arange(0, x.size), x),
    (
        "Extrapolation (prediction)",
        np.arange(0, extrapolation.size),
        extrapolation,
    ),
    (
        "History (test data)",
        np.arange(x.size, extrapolation.size),
        test_data,
    ),
]
for trace_name, trace_x, trace_y in trace_specs:
    fig.add_trace(
        go.Scatter(name=trace_name, x=trace_x, y=trace_y),
        row=1,
        col=1,
    )

# Kept as the cell's last expression so the notebook renders the figure.
fig.update_layout(
    xaxis_rangeslider_visible=False,
    showlegend=True,
    title_text="Fourrier extrapolation (prediction)",
    xaxis_title_text="Sample index",
    yaxis_title_text="Close price",
)