<a href="https://colab.research.google.com/github/Zasegor/fourier-series-prediction/blob/main/bitcoin_prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Bitcoin

In [1]:
import math
import numpy as np
import pandas as pd
from scipy.signal import butter, filtfilt
from statsmodels.tsa.stattools import acf
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.seasonal import seasonal_decompose
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error
import plotly.graph_objects as go


In [2]:
class BitcoinFourierForecaster:
    """
    Forecast the next period of a daily Bitcoin price series with a cosine Fourier series.

    The training series is log-transformed and detrended, cut into ``m`` consecutive
    periods of ``l`` observations, and every period is expanded into ``l`` cosine-series
    coefficients. Each coefficient is then extrapolated one step ahead across the
    periods (ARIMA or delay-embedding regression), and the extrapolated coefficients
    are evaluated to produce the next period.

    Args:
        l (int): Length of one period (in days).
        m (int): Number of periods used for fitting.
        p (int): Delay size for the 'delay' method or AR order for the 'arima' method
            (the AR order actually passed to ARIMA is capped at 2).
        method (str): Forecasting method ('delay' or 'arima'), case-insensitive.
        reg_lambda (float): Diagonal regularization added to ill-conditioned linear systems.
        fs (float): Sampling frequency of the series (1 for daily data, i.e. one
            sample per day); the band-pass cut-offs are expressed relative to it.

    Raises:
        ValueError: If ``method`` is unknown or any of ``l``, ``m``, ``p`` is below 1.
    """

    def __init__(self, l: int = 180, m: int = 5, p: int = 1, method: str = "arima", reg_lambda: float = 1e-6, fs: float = 1):
        self.l = l
        self.m = m
        self.p = p
        self.method = method.lower()
        self.reg_lambda = reg_lambda
        self.fs = fs
        self.matrix = None       # (m, l) array of detrended periods, set by fit()
        self.new_coefs = None    # extrapolated Fourier coefficients, set by fit()
        self.prediction = None   # price-scale forecast, set by predict()
        self.trend = None        # trend of the log series, set by preprocess_series()

        if self.method not in ["delay", "arima"]:
            raise ValueError("Method must be 'delay' or 'arima'")
        if self.l < 1 or self.m < 1 or self.p < 1:
            raise ValueError("l, m, and p must be positive integers")

    def preprocess_series(self, series: pd.Series) -> pd.Series:
        """
        Preprocesses the series: log-transform, smoothing, and detrending.

        Zeros are treated as missing values and filled from the neighbouring
        observations before taking the logarithm. The trend is estimated on a
        band-pass filtered copy of the log series and stored in ``self.trend``.

        Args:
            series: Input time series (pandas Series).

        Returns:
            Log series minus the estimated trend (pandas Series, same index).
        """
        # Forward-fill gaps; back-fill covers zeros at the very start, which a
        # forward fill alone would leave as NaN and poison the filter below.
        cleaned = series.replace(0, np.nan).ffill().bfill()
        series = np.log(cleaned)

        nyquist = 0.5 * self.fs
        lowcut = 0.001 / nyquist
        highcut = 0.05 / nyquist
        b, a = butter(4, [lowcut, highcut], btype='band')
        smoothed = filtfilt(b, a, series.to_numpy())

        # seasonal_decompose needs a period of at least 2; short ``l`` would
        # otherwise yield 0 or 1 here.
        period = max(2, int(self.l / 10))
        result = seasonal_decompose(smoothed, period=period, model='additive', extrapolate_trend='freq')
        self.trend = np.asarray(result.trend)
        detrended = series - self.trend

        return pd.Series(detrended, index=series.index)

    def _cos(self, k: int, t: int, l: int) -> float:
        """Computes the cosine term cos(pi * k * t / l) of the Fourier series."""
        return math.cos(math.pi * k * t / l)

    def _get_matrix_and_vector(self, period_i: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
        """
        Constructs the square system whose solution is the period's Fourier coefficients.

        For a period with ``n`` samples the matrix is ``n x n``: a constant column of
        0.5 followed by the columns cos(pi * k * t / (n - 1)) for k = 1 .. n - 1.

        Raises:
            ValueError: If the period has fewer than 2 samples.
        """
        if len(period_i) < 2:
            raise ValueError("period_i must have at least 2 elements")

        l = len(period_i) - 1
        t = np.arange(l + 1)
        k = np.arange(1, l + 1)

        cos_values = np.cos(np.pi * k[:, None] * t / l)
        matrix = np.hstack([0.5 * np.ones((l + 1, 1)), cos_values.T])
        y = period_i.copy()

        return matrix, y

    def _solve_system(self, M: np.ndarray, b: np.ndarray) -> np.ndarray:
        """
        Solves ``M x = b``, regularizing the diagonal when ``M`` is ill-conditioned.

        ``b`` may be a vector or a matrix with one right-hand side per column.
        """
        if np.linalg.cond(M) > 1e6:
            M_reg = M + self.reg_lambda * np.eye(M.shape[0])
            return np.linalg.solve(M_reg, b)
        return np.linalg.solve(M, b)

    def _get_matrix_from_series(self, series: pd.Series) -> np.ndarray:
        """
        Converts the time series into an ``(m, l)`` matrix of period observations.

        The most recent ``m * l`` observations are used, so the forecast period
        directly follows the end of the series (the same point the trend level
        in ``predict`` is taken from).

        Raises:
            ValueError: If the series holds fewer than ``m * l`` observations.
        """
        expected_length = self.m * self.l
        if len(series) < expected_length:
            raise ValueError(f"Series length {len(series)} is less than required {expected_length}")

        values = np.asarray(series)
        tail = values[len(values) - expected_length:]
        return tail.reshape(self.m, self.l)

    def _get_delay_matrix(self, input_vector: np.ndarray) -> np.ndarray:
        """
        Constructs the delay matrix used by the 'delay' method.

        The oldest ``len(input_vector) % p`` values are dropped and the rest is cut
        into consecutive chunks of ``p`` values; each chunk becomes one column, so
        the result has shape ``(p, len(input_vector) // p)``.
        """
        values = np.asarray(input_vector)
        remainder = values.shape[0] % self.p
        values = values[remainder:]
        n_chunks = values.shape[0] // self.p

        return values.reshape(n_chunks, self.p).T.copy()

    def _find_nearest(self, row: np.ndarray) -> set:
        """
        Finds the indices of the entries closest to the last entry of ``row``.

        Up to ``2 * p + 1`` indices are returned; the last entry itself is excluded.
        """
        neighbors_cnt = 2 * self.p + 1
        distances = np.abs(row[:-1] - row[-1])

        return set(np.argsort(distances)[:neighbors_cnt])

    def _predict_by_one_step(self, input_vector: np.ndarray) -> float:
        """
        Predicts one step ahead using the configured method.

        'arima' fits an AR model of order ``min(p, 2)`` and falls back to the last
        observed value when the input is shorter than 3 or the fit fails.
        'delay' fits a linear regression from the nearest delay vectors to the
        value that followed each of them and applies it to the latest delay
        vector; it falls back to the last observed value when fewer than two
        delay vectors can be formed.
        """
        if self.method == "arima":
            if len(input_vector) < 3:
                return input_vector[-1]
            try:
                model = ARIMA(input_vector, order=(min(self.p, 2), 0, 0))
                model_fit = model.fit()
                return model_fit.forecast(steps=1)[0]
            except Exception as exc:
                # Best-effort fallback is kept, but no longer silent.
                import warnings

                warnings.warn(
                    f"ARIMA fit failed ({exc!r}); falling back to the last observed value",
                    RuntimeWarning,
                    stacklevel=2,
                )
                return input_vector[-1]

        # delay method
        delay_matrix = self._get_delay_matrix(input_vector)
        if delay_matrix.shape[1] < 2:
            # No (delay vector, following value) pair exists to regress on.
            return input_vector[-1]

        # Sorted so the regression rows have a deterministic order.
        neighbor_idx = np.array(sorted(self._find_nearest(delay_matrix[-1, :])), dtype=int)

        targets = delay_matrix[0, neighbor_idx + 1]
        design = np.column_stack([np.ones(neighbor_idx.shape[0]), delay_matrix[:, neighbor_idx].T])

        # Least squares via lstsq: unlike inverting X.T @ X it also copes with
        # collinear columns or fewer neighbours than parameters.
        coef, *_ = np.linalg.lstsq(design, targets, rcond=None)
        latest = np.concatenate(([1.0], delay_matrix[:, -1]))
        return float(latest @ coef)

    def _get_new_fourier_coefs(self, periods: np.ndarray) -> list:
        """
        Computes the Fourier coefficients for the next period.

        Args:
            periods: ``(m, l)`` array with one period per row.

        Returns:
            List with one extrapolated coefficient per Fourier term.
        """
        periods = np.asarray(periods, dtype=float)

        # The system matrix depends only on the period length, so build it once
        # and solve for all periods together (one right-hand side per column).
        design, _ = self._get_matrix_and_vector(periods[0])
        coefs_for_all_periods = self._solve_system(design, periods.T).T

        return [
            self._predict_by_one_step(coefs_for_all_periods[:, i])
            for i in range(coefs_for_all_periods.shape[1])
        ]

    def _predict_next_period(self) -> list:
        """Evaluates the cosine series with ``self.new_coefs`` at t = 0 .. l - 1."""
        coefs = np.asarray(self.new_coefs, dtype=float)
        t = np.arange(self.l)
        k = np.arange(1, coefs.shape[0])

        basis = np.cos(np.pi * np.outer(t, k) / (self.l - 1))
        return (coefs[0] / 2 + basis @ coefs[1:]).tolist()

    def fit(self, series: pd.Series) -> 'BitcoinFourierForecaster':
        """
        Fits the model to the input time series.

        Args:
            series: Input time series (pandas Series) with at least ``m * l`` values.

        Returns:
            Self for method chaining.
        """
        processed_series = self.preprocess_series(series)
        self.matrix = self._get_matrix_from_series(processed_series)
        self.new_coefs = self._get_new_fourier_coefs(self.matrix)
        return self

    def predict(self, index: pd.Index = None) -> pd.Series:
        """
        Predicts the next period.

        The detrended forecast is shifted by the last trend value and mapped back
        to the price scale with ``exp``.

        Args:
            index: Index for the predicted series (optional). Defaults to ``l``
                daily timestamps starting now.

        Returns:
            Predicted series for the next period (pandas Series).

        Raises:
            ValueError: If the model has not been fitted.
        """
        if self.new_coefs is None:
            raise ValueError("Model must be fitted before prediction")

        detrended_forecast = np.array(self._predict_next_period())
        trend_last = np.asarray(self.trend)[-1] if self.trend is not None else 0
        self.prediction = np.exp(detrended_forecast + trend_last)

        if index is None:
            index = pd.date_range(start=pd.Timestamp.now(), periods=self.l, freq='D')
        return pd.Series(self.prediction[:len(index)], index=index)

    def _align_with_test(self, test_series: pd.Series, end_date: str = None) -> tuple[pd.Series, pd.Series]:
        """Returns equally long (prediction, actual) series, optionally cut at ``end_date``."""
        horizon = min(self.l, len(test_series))
        test = test_series.iloc[:horizon]
        pred = pd.Series(self.prediction[:horizon], index=test.index)

        if end_date:
            # Same mask for both series so their lengths always agree.
            keep = test.index <= pd.Timestamp(end_date)
            pred, test = pred[keep], test[keep]
        return pred, test

    def evaluate(self, test_series: pd.Series, end_date: str = None) -> tuple[float, float]:
        """
        Evaluates the prediction against a test series.

        Args:
            test_series: Test time series (pandas Series).
            end_date: End date for evaluation (optional).

        Returns:
            Tuple of (MAE rounded to 2 decimals, MAPE rounded to 3 decimals).

        Raises:
            ValueError: If ``predict`` has not been called yet.
        """
        if self.prediction is None:
            raise ValueError("Model must be fitted and predicted before evaluation")

        pred, test = self._align_with_test(test_series, end_date)

        mae = round(mean_absolute_error(test, pred), 2)
        mape = round(mean_absolute_percentage_error(test, pred), 3)
        return mae, mape

    def plot(self, test_series: pd.Series, end_date: str = None) -> "go.Figure":
        """
        Plots the predicted and test series using Plotly.

        Args:
            test_series: Test time series (pandas Series).
            end_date: End date for plotting (optional).

        Returns:
            Plotly Figure object.

        Raises:
            ValueError: If ``predict`` has not been called yet.
        """
        if self.prediction is None:
            raise ValueError("Model must be fitted and predicted before plotting")

        pred, test = self._align_with_test(test_series, end_date)

        fig = go.Figure()
        fig.add_scatter(
            x=pred.index,
            y=pred.values,
            line={'width': 2},
            name="Predicted Prices"
        )
        fig.add_scatter(
            x=test.index,
            y=test.values,
            line={'width': 2},
            name="Actual Prices"
        )
        fig.update_layout(
            title="Bitcoin Price Forecast vs Actual (Daily)",
            xaxis_title="Date",
            yaxis_title="Price (USD)",
            template="plotly_white"
        )
        return fig

In [3]:
# Read the raw price CSV from the Colab working directory into a DataFrame.
bit_data = pd.read_csv(filepath_or_buffer='/content/btcusd_1-min_data.csv')

FileNotFoundError: [Errno 2] No such file or directory: '/content/btcusd_1-min_data.csv'

In [None]:
# Keep rows whose 'Timestamp' is at or after 1356998400
# (Unix seconds for 2013-01-01 00:00:00 UTC) and record how many remain.
df = bit_data.loc[bit_data['Timestamp'] >= 1356998400]
dfl = df.shape[0]
dfl

In [None]:
# Minute-resolution timestamps starting 2013-01-01: 182 * 25 days of 24 * 60 minutes each.
date = pd.date_range(
    start='2013-01-01',
    periods=182 * 25 * 24 * 60,
    freq='min',
)
datel = date.size
datel

In [None]:
# Drop rows at or beyond the end of the minute range built above
# (start timestamp plus `datel` minutes, expressed in seconds).
df = df.loc[df['Timestamp'] < 1356998400 + 60 * datel]
df.shape[0]

In [None]:
# Index every row by its own Unix-second 'Timestamp' instead of assigning a
# pre-built run of consecutive minutes by position: positional assignment
# mislabels rows whenever a minute is missing from the data and fails outright
# when the row count differs from the slice length. `assign` also avoids
# writing into the filtered frame in place.
df = df.assign(datetime=pd.to_datetime(df['Timestamp'], unit='s')).set_index('datetime')

In [None]:
# Daily mean of the 'Close' column; days without data inherit the previous
# day's value. `.ffill()` replaces the deprecated `fillna(method='ffill')`.
daily_series = df['Close'].resample('D').mean().ffill()

# 44 periods of 91 days for training, followed by one 91-day test period.
l = 91
m = 44
train_size = l*m
train_series = daily_series[:train_size]
test_series = daily_series[train_size:train_size+l]

forecaster = BitcoinFourierForecaster(l=l, m=m, p=5, method="arima")
forecaster.fit(train_series)

In [None]:
# Forecast one period of `l` days, dated from the first test observation.
test_pred = forecaster.predict(
    index=pd.date_range(start=test_series.index[0], periods=l, freq='D')
)

# Score the forecast against the held-out period and show the comparison plot.
mae, mape = forecaster.evaluate(test_series)
print(f"MAE: {mae}, MAPE: {mape}")

fig = forecaster.plot(test_series)
fig.show()

In [None]:
# One line trace each for the training data, the held-out data and the forecast.
fig = go.Figure(
    data=[
        go.Scatter(x=series.index, y=series.values, line={'width': 2}, name=label)
        for label, series in (
            ("Train", train_series),
            ("Test", test_series),
            ("Pred", test_pred),
        )
    ]
)

# NOTE(review): `scene` looks like Plotly's 3-D axis container; these are 2-D
# traces, so the axis titles below may have no visible effect - confirm.
fig.update_layout(
    title='',
    scene={'xaxis_title': '', 'yaxis_title': ''},
    showlegend=True,
    height=600,
)

In [None]:
# Second experiment: 30-day periods.
# m = 4004 // 30 - 1 = 132, so train_size = 3960. The training slice skips the
# first 30 * 110 + 1 daily values, leaving 660 values = 22 periods (m - 110).
l = 30
m = (44 * 13 * 7) // 30 - 1
train_size = l * m
train_series = daily_series.iloc[30 * 110 + 1:train_size + 1]
test_series = daily_series.iloc[train_size + 1:train_size + l + 1]

forecaster = BitcoinFourierForecaster(l=l, m=m - 110, p=5, method="arima")
forecaster.fit(train_series)

In [None]:
# Forecast one period of `l` days, dated from the first test observation.
test_pred = forecaster.predict(
    index=pd.date_range(start=test_series.index[0], periods=l, freq='D')
)

# Score the forecast against the held-out period and show the comparison plot.
mae, mape = forecaster.evaluate(test_series)
print(f"MAE: {mae}, MAPE: {mape}")

fig = forecaster.plot(test_series)
fig.show()

In [None]:
# One line trace each for the training data, the held-out data and the forecast.
fig = go.Figure(
    data=[
        go.Scatter(x=series.index, y=series.values, line={'width': 2}, name=label)
        for label, series in (
            ("Train", train_series),
            ("Test", test_series),
            ("Pred", test_pred),
        )
    ]
)

# NOTE(review): `scene` looks like Plotly's 3-D axis container; these are 2-D
# traces, so the axis titles below may have no visible effect - confirm.
fig.update_layout(
    title='BitCoin',
    scene={'xaxis_title': '', 'yaxis_title': ''},
    showlegend=True,
    height=600,
)