In [None]:
import pandas as pd
import datetime as dt
from dateutil.relativedelta import relativedelta
import math
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from sklearn.linear_model import LinearRegression

In [3]:
# AIRLINES

df = pd.read_csv('airline-passengers.csv')
ts = df['Passengers']
time = df['Month']
timereg = [*range(1, len(time) + 1)]

In [4]:
import statsmodels.api as sm

y = np.array(ts)
X = np.array([[1] * len(timereg), timereg]).T

model = sm.OLS(exog = X, endog = y).fit()
trend = model.predict(X)
errors = y - trend

In [5]:
def fourier_alpha(x):
    x = np.array(x)
    T = len(x)
    t = np.arange(1, T + 1)
    jc = np.arange(0, T // 2 + 1)

    def coef(j_):
        if not (j_ == 0 | ((j_ == jc[-1]) & (T % 2 == 0))):
            return 2 / T
        return 1 / T

    alpha = []
    for j in jc:
        alpha.append((x * np.vectorize(math.cos)(2 * math.pi * (j / T) * t)).sum() * coef(j))

    alpha = np.array(alpha)
    fc = np.array([i / T for i in jc])

    return fc, alpha

def fourier_beta(x):
    x = np.array(x)
    T = len(x)
    t = np.arange(1, T + 1)
    js = np.append(np.arange(0, (T - 1) // 2 + 1), 0)

    def coef(j_):
        return 2 / T

    beta = []
    for j in js:
        beta.append((x * np.vectorize(math.sin)(2 * math.pi * (j / T) * t)).sum() * coef(j))
    beta = np.array(beta)

    fs = np.array([i / T for i in js])

    return fs, beta

def fourier_R(x):
    R2 = fourier_alpha(x)[1] ** 2 + fourier_beta(x)[1] ** 2
    R = R2 ** (1 / 2)

    return R

def selective_spectrum(x):
    T = len(x)
    p_f = (T / 2) * (fourier_R(x) ** 2)

    return p_f

In [None]:
pd.DataFrame({'linear_freq': fourier_alpha(errors)[0],
              'selective_spectrum': selective_spectrum(errors)})\
    .to_csv('spec_df.csv', index = False)

In [None]:
fig = px.line(x = fourier_alpha(errors)[0],
              y = selective_spectrum(errors),
              # x = spec_df['linear_freq'],
              # y = spec_df['selective_spectrum'],
              title = "Spectrogramm of the errors",
              color_discrete_sequence = ['#5FD2B5'],
              labels = {
                     "x": "Linear frequency",
                     "y": "Selective spectrum"})

fig['data'][0]['line']['width'] = 2

fig.show()

In [7]:
def get_harmonic(x, n_harm):
    if n_harm < 1 or n_harm > len(ts) // 2 + 1:
        return [0 for _ in range(len(ts))]

    T = len(x)

    jc = np.arange(0, T // 2 + 1)
    js = np.append(np.arange(0, (T - 1) // 2 + 1), 0)

    alpha = fourier_alpha(x)[1]
    beta = fourier_beta(x)[1]

    p_f = selective_spectrum(x)
    all = np.array([jc, alpha, js, beta, p_f]).T
    all = all[all[:, -1].argsort()[::-1]]
    all = all[:n_harm, :]

    g = []
    for t in np.arange(1, len(x) + 1):
        g.append(
            (all[:, 1] * np.vectorize(math.cos)(2 * math.pi * (all[:, 0] / T) * t)).sum()
            + (all[:, 3] * np.vectorize(math.sin)(2 * math.pi * (all[:, 2] / T) * t)).sum()
        )

    return g

In [8]:
all_df = pd.DataFrame()

for n in range(0, len(ts) // 2 + 1):
    all_df = all_df.append(
    pd.DataFrame({'harm': [i + j for i, j in
                           zip([*trend], get_harmonic(errors, n_harm = n))],  # get_harmonic(ts, n_harm = n)
                  'ts': ts,
                  'time': time,
                  'n_harm': [n] * len(ts)}),
        ignore_index = True,
        sort = False)

In [9]:
fig = px.line(all_df,   # px.scatter
              x = "time",
              y = "harm",
              animation_frame = "n_harm",
              range_y = [min(all_df['ts']) * 0.9, max(all_df['ts']) * 1.1],
              color_discrete_sequence = ['#5FD2B5'])   # px.colors.qualitative.Dark2

fig2 = go.Figure(fig.add_traces(
    data = px.line(all_df,
                   x = "time",
                   y = "ts",
                   animation_frame = "n_harm",
                   range_y = [min(all_df['ts']) * 0.9, max(all_df['ts']) * 1.1],
                   color_discrete_sequence = ['#F03C79'])._data))

fig2['data'][0]['line']['width'] = 4
fig2['data'][1]['line']['width'] = 2.5

fig2.update_layout(title = "Fourier analysis of the time series",
                   xaxis_title = "Time",
                   yaxis_title = "Time Series",
                   width = 1000,
                   height = 500)

fig2.show()

In [10]:
N = len(ts) // 2 + 1

fig = go.Figure()

for n in range(0, N):
    if n == 0:
        fig.add_trace(
            go.Scatter(
                visible = False,
                line = dict(color = "#F03C79", width = 3),
                name = "Series",
                x = time,
                y = ts))

    fig.add_trace(
        go.Scatter(
            visible = False,
            line = dict(color = "#5FD2B5", width = 4),
            name = f"Harmonic = {n}",
            x = time,
            y = [i + j for i, j in
                           zip([*trend], get_harmonic(errors, n_harm = n))]))

# Make 10th trace visible
fig.data[0].visible = True

# Create and add slider
steps = []
for i in range(len(fig.data)):
    step = dict(method="update",
                args=[{"visible": [False] * len(fig.data)},
                      {"title": "Slider switched to step: " + str(i)}])
    step["args"][0]["visible"][0] = True    # Toggle first trace to "visible"
    step["args"][0]["visible"][i] = True
    steps.append(step)

sliders = [dict(
    active = 0,
    currentvalue = {"prefix": "Number of harmonics: "},
    pad={"t": N},
    steps = steps[1:]
)]

fig.update_layout(
    sliders = sliders,
    title = "Fourier analysis of the time series",
    xaxis_title = "Time",
    yaxis_title = "Time Series",
    width = 1000,
    height = 500
)

fig.show()