In [165]:
import numpy as np
import plotly.graph_objects as go

import DataRetriever as dr

# Load the attribute list and the raw data through the project-local DataRetriever helper.
RETRIEVER = dr.DataRetriever()
CON_ATTRIBUTES = RETRIEVER.get_attributes(file_name='consuming_attributes.pkl')

# Keep only the CON_ATTRIBUTES columns, add them up row by row (axis=1) and
# floor negative totals at zero.
# NOTE(review): presumably get_data returns a pandas DataFrame -- confirm in DataRetriever.
DATA = (
    RETRIEVER.get_data(file_name='All-Subsystems-hour-Year2.pkl')[CON_ATTRIBUTES]
    .sum(axis=1)
    .clip(lower=0)
)

# The first 80% of the observations are used for training, the remainder for testing.
TRAIN_SIZE = int(0.8 * DATA.size)

TRAIN = DATA[0:TRAIN_SIZE].to_numpy()
TEST = DATA[TRAIN_SIZE: DATA.size].to_numpy()

# Zero-based positions within each split.
TRAIN_INDEX = list(range(len(TRAIN)))
TEST_INDEX = list(range(len(TEST)))

In [166]:
def fourierExtrapolation(data: np.ndarray, number_of_predictions: int, n_sinusoids: int) -> np.ndarray:
    """
    Predict {number_of_predictions} observations after the index data.size of {data}, using {n_sinusoids} sinusoids.

    The series is linearly detrended, transformed with a real FFT, and rebuilt (and continued past its end)
    from the constant (zero-frequency) term plus the {n_sinusoids} sinusoids with the largest amplitudes.
    Finally the linear trend is added back.

    :param data: The data on which to train the model. Corresponds to {_x = x_0, x_1, ... x_(n-1)} in the theory.
        Must be one-dimensional and non-empty.
    :param number_of_predictions: The amount of predictions to output. Corresponds to {x_((n-1)+1), x_((n-1)+2), ..., x_((n-1)+{number_of_predictions})}
    :param n_sinusoids: The amount of sinusoids on which to base the predictions. I.e. the data probably contains many sinusoids, but we only wish to make predictions based on the {n_sinusoids} sinusoids with the largest amplitudes. The constant term is always included and is not counted.
    :return: A numpy array of length data.size + number_of_predictions, containing the reconstructed original data + predictions
    :raises ValueError: If {data} is empty or not one-dimensional, or if {n_sinusoids} is negative.
    """
    series = np.asarray(data, dtype=float)
    if series.ndim != 1 or series.size == 0:
        raise ValueError("data must be a non-empty one-dimensional array")
    if n_sinusoids < 0:
        raise ValueError("n_sinusoids must be non-negative")

    data_size = series.size  # n
    train_index = np.arange(data_size)  # sum-limits (0) to (n-1)

    # Fit a linear regression line to the data and remove its slope. The intercept is deliberately
    # left in: it ends up in the zero-frequency term, which is always part of the reconstruction.
    # A single observation has no slope to estimate.
    slope = np.polyfit(x=train_index, y=series, deg=1)[0] if data_size > 1 else 0.0
    x_data_detrended = series - slope * train_index  # The series of numbers x = {x_0, x_1, ...}

    # The input is real, so the two-sided spectrum consists of complex-conjugate pairs. Working on the
    # one-sided spectrum keeps every pair together: one bin here is exactly one real sinusoid.
    X_frequency_domain = np.fft.rfft(x_data_detrended)  # complex numbers X = {X_0, X_1, ..., X_(n//2)}
    frequencies = np.fft.rfftfreq(data_size)  # cycles per sample: {0, 1/n, 2/n, ...}

    # Each bin stands for itself and its conjugate partner (weight 2), except the zero-frequency bin
    # and, for even n, the Nyquist bin, which have no partner (weight 1).
    pair_weights = np.full(X_frequency_domain.size, 2.0)
    pair_weights[0] = 1.0
    if data_size % 2 == 0:
        pair_weights[-1] = 1.0
    amplitudes = pair_weights * np.absolute(X_frequency_domain) / data_size
    phases = np.angle(X_frequency_domain)

    # Rank the non-constant bins by amplitude, largest first; always keep bin 0 (the constant term).
    strongest = 1 + np.argsort(-amplitudes[1:], kind="stable")[:n_sinusoids]
    selected = np.concatenate(([0], strongest))

    # sample_index = {0, 1, ..., n-1, n, n+1, ..., (n-1)+number_of_predictions}
    sample_index = np.arange(data_size + number_of_predictions)
    x_restored_sig = np.zeros(sample_index.size)  # Receives x reconstructed from its Fourier Transform

    # Accumulate one sinusoid at a time so memory stays proportional to the output length.
    for k in selected:
        x_restored_sig += amplitudes[k] * np.cos(2 * np.pi * frequencies[k] * sample_index + phases[k])

    # Add the linear trend back, extended over the prediction horizon.
    return x_restored_sig + slope * sample_index

In [167]:
# Fit on the training split and extrapolate across the whole test split.
x = TRAIN
n_predict = TEST.size
n_harmonics = 100

predictions = fourierExtrapolation(
    data=x,
    number_of_predictions=n_predict,
    n_sinusoids=n_harmonics,
)

# fourierExtrapolation returns the reconstructed training range followed by the forecast, so the
# prediction trace spans the full series; the test trace is shifted to start where training ends.
fig = go.Figure(data=[
    go.Scatter(x=np.arange(DATA.size), y=predictions, name='Prediction', mode='lines'),
    go.Scatter(x=np.arange(TRAIN.size), y=TRAIN, name='Training Data', mode='lines'),
    go.Scatter(x=np.arange(TRAIN.size, DATA.size), y=TEST, name='Test Data', mode='lines'),
])

fig.show()