# Import packages


In [67]:
import numpy as np
import pylab as pl
from numpy import fft
import pandas as pd
import matplotlib.pyplot as plt
import yfinance as yf
import datetime
from dateutil.relativedelta import relativedelta


In [68]:
def fourier_transfer_function(
    n_harm, stock_name, date_data_start, date_transfer_start, date_transfer_end):

    # get data_stock's infomation
    date_data_end = date_transfer_start
    data_stock = yf.Ticker(stock_name).history(
        start=date_data_start, end=date_data_end)['Close']
    array_data = np.array(data_stock)
    n_data = array_data.size
    time_data = np.arange(0, n_data)

    # detrend data
    # find linear trend in data
    Polynomial = np.polyfit(time_data, array_data, 1)
    data_notrend = array_data - Polynomial[0] * time_data    # detrended x

    # fft process
    data_freqdom = fft.fft(data_notrend, n=n_data)
    frequence = fft.fftfreq(n_data)
    f_positive = frequence[np.where(frequence > 0)]
    data_freqdom_positive = data_freqdom[np.where(frequence > 0)]

    # sort indexes
    indexes = list(range(f_positive.size))      # frequencies
    # sort method 1
    # indexes.sort(key = lambda i: np.absolute(frequence[i]))     # sort indexes by frequency, lower -> higher
    # sort method 2 :
    # sort indexes by amplitudes, lower -> higher
    indexes.sort(key=lambda i: np.absolute(data_freqdom[i]))
    indexes.reverse()       # sort indexes by amplitudes, higher -> lower

    # get data_all_time'size
    data_all_time = yf.Ticker(stock_name).history(
        start=date_data_start, end=date_transfer_end)['Close']
    time_transfer = np.arange(0, data_all_time.size)
    mixed_harmonic = np.zeros(data_all_time.size)

    # mix harmonics
    for i in indexes[:n_harm]:
        ampli = np.absolute(data_freqdom_positive[i]) / n_data     # amplitude
        phase = np.angle(data_freqdom_positive[i])      # phase
        harmonic = ampli * \
            np.cos(2 * np.pi * f_positive[i] * time_transfer + phase)
        mixed_harmonic += harmonic

    transferred_signal = pd.DataFrame(
        {'Close': mixed_harmonic}, index=data_all_time.index)
    return transferred_signal


In [69]:
# fourier_transfer_function(20, "^GSPC", '2021-01-01', '2022-01-01', '2022-02-01')


In [70]:
def find_pv_function(pv_range, data):
    pd.options.mode.chained_assignment = None
    pv = data['Close']
    data['peaks'] = pd.Series(dtype='float64')
    data['valleys'] = pd.Series(dtype='float64')
    peaks = data['peaks']
    valleys = data['valleys']
    for idx in range(0, len(pv)):
        if idx < pv_range :
            if pv[idx] == pv.iloc[0:pv_range*2+1].max():
                peaks.iloc[idx] = pv[idx]
            if pv[idx] == pv.iloc[0:pv_range*2+1].min():
                valleys.iloc[idx] = pv[idx]
        if pv[idx] == pv.iloc[idx-pv_range:idx+pv_range].max():
            peaks.iloc[idx] = pv[idx]
        if pv[idx] == pv.iloc[idx-pv_range:idx+pv_range].min():
            valleys.iloc[idx] = pv[idx]
    return peaks, valleys


In [71]:
data = yf.Ticker("^GSPC").history(start='2021-01-01', end='2022-01-01')
# data['Close'][0] = 5000
pv_range = 2
find_pv_function(pv_range, data)


(Date
 2020-12-31            NaN
 2021-01-04            NaN
 2021-01-05            NaN
 2021-01-06            NaN
 2021-01-07            NaN
                  ...     
 2021-12-27    4791.189941
 2021-12-28            NaN
 2021-12-29    4793.060059
 2021-12-30            NaN
 2021-12-31            NaN
 Name: peaks, Length: 253, dtype: float64,
 Date
 2020-12-31            NaN
 2021-01-04    3700.649902
 2021-01-05            NaN
 2021-01-06            NaN
 2021-01-07            NaN
                  ...     
 2021-12-27            NaN
 2021-12-28            NaN
 2021-12-29            NaN
 2021-12-30            NaN
 2021-12-31    4766.180176
 Name: valleys, Length: 253, dtype: float64)

In [72]:
def find_pv_delay_function(data, transferred_signal):
    if (data.index[0] == transferred_signal.index[0] and data.index[-1] == transferred_signal.index[-1]):
        p_data = pd.DataFrame(
            {'peaks': data['peaks'], 'count': range(len(data))})
        p_data = p_data.drop(p_data[p_data['peaks'].isna()].index)
        p_data_list = list(p_data['count'])
        p_transferred = pd.DataFrame(
            {'peaks': transferred_signal['peaks'], 'count': range(len(transferred_signal))})
        p_transferred = p_transferred.drop(
            p_transferred[p_transferred['peaks'].isna()].index)
        p_transferred_list = list(p_transferred['count'])
        p_list = []
        for i in range(0, len(p_transferred_list)):
            temp = []
            temp_abs = []
            temp_2 = []
            for j in range(0, len(p_data_list)):
                temp.append((p_data_list[j] - p_transferred_list[i]))
                temp_abs.append(abs(p_data_list[j] - p_transferred_list[i]))
            for k in range(0, len(temp_abs)):
                if temp_abs[k] == min(temp_abs):
                    temp_2 = temp[k]
            p_list.append(temp_2)
        p_transferred['delay'] = p_list

        v_data = pd.DataFrame(
            {'valleys': data['valleys'], 'count': range(len(data))})
        v_data = v_data.drop(v_data[v_data['valleys'].isna()].index)
        v_data_list = list(v_data['count'])
        v_transferred = pd.DataFrame(
            {'valleys': transferred_signal['valleys'], 'count': range(len(transferred_signal))})
        v_transferred = v_transferred.drop(
            v_transferred[v_transferred['valleys'].isna()].index)
        v_transferred_list = list(v_transferred['count'])
        v_list = []
        for i in range(0, len(v_transferred_list)):
            temp = []
            temp_abs = []
            temp_2 = []
            for j in range(0, len(v_data_list)):
                temp.append((v_data_list[j] - v_transferred_list[i]))
                temp_abs.append(abs(v_data_list[j] - v_transferred_list[i]))
            for k in range(0, len(temp_abs)):
                if temp_abs[k] == min(temp_abs):
                    temp_2 = temp[k]
            v_list.append(temp_2)
        v_transferred['delay'] = v_list
        return p_transferred['delay'], v_transferred['delay']
    else:
        print('error : data = ', data.index,
              'transferred_signal = ', transferred_signal.index)


In [73]:
# data = yf.Ticker("^GSPC").history(start='2021-01-01', end='2021-01-30')
# transferred_signal = fourier_transfer_function(20, "^GSPC", '2021-01-01', '2021-01-15', '2021-01-30')
# pv_range = 2
# transferred_signal['peaks'] = find_pv_function(pv_range, transferred_signal)[0]
# transferred_signal['valleys'] = find_pv_function(pv_range, transferred_signal)[1]
# data['peaks'] = find_pv_function(pv_range, data)[0]
# data['valleys'] = find_pv_function(pv_range, data)[1]
# transferred_signal['peaks_delay'] = find_pv_delay_function(data, transferred_signal)[0]
# transferred_signal['valleys_delay'] = find_pv_delay_function(data, transferred_signal)[1]
# print(data)
# print(transferred_signal)


In [74]:
def draw_plot(data, transferred_signal):
    fig, axes = plt.subplots(2, 1, figsize=(15, 8))
    axes[0].plot(data.index, data['Close'],
                 c='gray', label='data', linewidth=3)
    axes[1].plot(transferred_signal.index, transferred_signal['Close'],
                 c='gray', label='Predict', linewidth=3)
    try:
        axes[0].plot(data.index, data['peaks'], '^',
                     c='royalblue', label='peaks')
        axes[0].plot(data.index, data['valleys'], 'v',
                     c='orangered', label='valleys')
        axes[1].plot(transferred_signal.index, transferred_signal['peaks'],
                     '^', c='royalblue', label='peaks')
        axes[1].plot(transferred_signal.index, transferred_signal['valleys'], 'v',
                     c='orangered', label='valleys')
    except:
        pass
    try:
        for i, label in enumerate(transferred_signal['peaks_delay']):
            axes[1].annotate(label, (transferred_signal['peaks'].index[i],
                             transferred_signal['peaks'][i]), fontsize=14)
        for i, label in enumerate(transferred_signal['valleys_delay']):
            axes[1].annotate(label, (transferred_signal['valleys'].index[i],
                             transferred_signal['valleys'][i]), fontsize=14)
    except:
        pass
    axes[0].set_ylabel("price", fontsize=14)
    axes[0].grid(True)
    axes[1].grid(True)
    axes[1].set_ylabel("amplitude", fontsize=14)
    axes[0].legend()
    axes[1].legend()
    plt.show()
    return


# error function

兩種 fit_method 計算 fit_error，得 error_p,error_v。
get_fit_error_function 為 error_p 及 error_v 取平均。
best_fit 為取最接近零的 error 之 harmonic。
slide_error 為預測之第一個轉折 delay。
final_error 為 slide_error 取絕對值後取平均。


In [75]:
def get_fit_error_function(transferred_signal, fit_method):
    temp_p = transferred_signal.drop(
        transferred_signal[transferred_signal['peaks_delay'].isna()].index)
    temp_v = transferred_signal.drop(
        transferred_signal[transferred_signal['valleys_delay'].isna()].index)
    if fit_method == 'mean':
        error_p = temp_p['peaks_delay'].mean()
        error_v = temp_v['valleys_delay'].mean()
    elif fit_method == 'abs':
        error_p = abs(transferred_signal['peaks_delay']).mean()
        error_v = abs(transferred_signal['valleys_delay']).mean()
    else:
        return 'wrong fit_method'
    error = (error_p + error_v)/2
    return error


In [76]:
# data = yf.Ticker("^GSPC").history(start='2020-01-01', end='2021-02-01')
# data2 = fourier_transfer_function(19, "^GSPC", '2020-01-01', '2021-01-01', '2021-02-01')
# pv_range = 2
# data2['peaks'] = find_pv_function(pv_range, data2)[0]
# data2['valleys'] = find_pv_function(pv_range, data2)[1]
# data['peaks'] = find_pv_function(pv_range, data)[0]
# data['valleys'] = find_pv_function(pv_range, data)[1]
# data2['peaks_delay'] = find_pv_delay_function(
#     data[data.index <= '2021-01-01'], data2[data2.index <= '2021-01-01'])[0]
# data2['valleys_delay'] = find_pv_delay_function(
#     data[data.index <= '2021-01-01'], data2[data2.index <= '2021-01-01'])[1]
# print('abs =', get_fit_error_function(data2, 'abs'))
# print('mean =', get_fit_error_function(data2, 'mean'))
# print('positive =', get_fit_error_function(data2, 'positive'))


In [77]:
# data2['peaks_delay'] = find_pv_delay_function(data, data2)[0]
# data2['valleys_delay'] = find_pv_delay_function(data, data2)[1]
# draw_plot(data, data2)


In [78]:
def get_first_delay_function(transferred_signal):
    temp = pd.DataFrame()
    temp['peaks_delay'] = transferred_signal['peaks_delay']
    temp['valleys_delay'] = transferred_signal['valleys_delay']
    temp = temp.dropna(how='all')
    if np.isnan(temp['peaks_delay'].iloc[0]) == False:
        Date = temp['peaks_delay'].index[0]
        delay = temp['peaks_delay'].iloc[0]
        pv = 'peaks'
    else:
        Date = temp['valleys_delay'].index[0]
        delay = temp['valleys_delay'].iloc[0]
        pv = 'valleys'
    return Date, delay, pv


In [79]:
# data = yf.Ticker("^GSPC").history(start='2020-01-01', end='2021-01-01')
# data2 = fourier_transfer_function(20, "^GSPC", '2020-01-01', '2021-01-01', '2021-02-01')
# pv_range = 2
# data2['peaks'] = find_pv_function(pv_range, data2)[0]
# data2['valleys'] = find_pv_function(pv_range, data2)[1]
# data['peaks'] = find_pv_function(pv_range, data)[0]
# data['valleys'] = find_pv_function(pv_range, data)[1]
# data2['peaks_delay'] = find_pv_delay_function(
#     data[data.index <= '2021-01-01'], data2[data2.index <= '2021-01-01'])[0]
# data2['valleys_delay'] = find_pv_delay_function(
#     data[data.index <= '2021-01-01'], data2[data2.index <= '2021-01-01'])[1]
# get_first_delay_function(data2)


In [80]:
def single_task(
    stock_name, date_data_start, date_predict_start, 
    date_predict_end, n_harm, pv_range, fit_method):

    data = yf.Ticker(stock_name).history(start=date_data_start, end=date_predict_end)
    transferred_signal = fourier_transfer_function(n_harm, stock_name, date_data_start, date_predict_start, date_predict_end)
    data['peaks'] = find_pv_function(pv_range, data)[0]
    data['valleys'] = find_pv_function(pv_range, data)[1]
    transferred_signal['peaks'] = find_pv_function(pv_range, transferred_signal)[0]
    transferred_signal['valleys'] = find_pv_function(pv_range, transferred_signal)[1]
    transferred_signal['peaks_delay'] = find_pv_delay_function(
        data[data.index <= date_predict_start], transferred_signal[transferred_signal.index <= date_predict_start])[0]
    transferred_signal['valleys_delay'] = find_pv_delay_function(
        data[data.index <= date_predict_start], transferred_signal[transferred_signal.index <= date_predict_start])[1]
    error = get_fit_error_function(transferred_signal, fit_method)
    transferred_signal['peaks_delay'] = find_pv_delay_function(data, transferred_signal)[0]
    transferred_signal['valleys_delay'] = find_pv_delay_function(data, transferred_signal)[1]
    return transferred_signal, error


In [81]:
# a = single_task("^GSPC", '2021-01-01', '2022-01-01',
#                     '2022-02-01', 20, 2, 'abs')
# data = a[0]
# transferred_signal = a[1]
# error = a[2]
# draw_plot_1(data, transferred_signal)
# print(error)


In [82]:
def fit_error_task(
    stock_name, date_data_start, date_predict_start, date_predict_end, 
    n_harm_lower_limit, n_harm_upper_limit, pv_range, fit_method):

    errors = pd.Series(dtype='float64')
    for i in range(n_harm_lower_limit, n_harm_upper_limit+1):
        temp = single_task(stock_name, date_data_start, date_predict_start,
                           date_predict_end, i, pv_range, fit_method)
        errors = pd.concat([errors, pd.Series(temp[1])])
    errors = errors.reset_index(drop=True)
    errors = errors.abs()
    best_fit = errors.idxmin() + n_harm_lower_limit
    transferred_signal, errors = single_task(stock_name, date_data_start, date_predict_start,
                                    date_predict_end, best_fit, pv_range, fit_method)
    return transferred_signal, errors, best_fit


In [83]:
# a = fit_error_task(
#     "^GSPC", '2021-06-01', '2022-07-01', '2022-02-01', 15, 17, 2, 'mean')
# a


In [84]:
def main_function(
    stock_name, date_predict_start, data_range, slide_range, n_slide, pv_range, 
    n_harm_lower_limit, n_harm_upper_limit, fit_method):
    
    date_predict_start = datetime.datetime.strptime(date_predict_start, '%Y-%m-%d') # ex.'2021-01-01'
    result_table = pd.DataFrame(
        columns=['Start_Date', 'Target_Date', 'delay', 'pv', 'error', 'best_fit'])
    for i in range(n_slide):
        date_data_start = date_predict_start - relativedelta(months=+data_range) # ex.'2020-07-01'
        date_predict_end = date_predict_start + relativedelta(months=+data_range) # ex.'2021-07-01'
        transferred_signal, error, best_fit = fit_error_task(
            stock_name, date_data_start, date_predict_start, date_predict_end, n_harm_lower_limit, n_harm_upper_limit, pv_range, fit_method)
        transferred_signal = transferred_signal.drop(
            transferred_signal[transferred_signal.index < date_predict_start].index)
        result_table.loc[i, 'error'] = round(error, 2)
        result_table.loc[i, 'best_fit'] = best_fit
        result_table.loc[i, 'Start_Date'] = date_predict_start # ex.'2021-01-01'
        result_table.loc[i, 'Target_Date'], result_table.loc[i,'delay'], result_table.loc[i, 'pv'] = get_first_delay_function(transferred_signal)
        date_data_start = date_data_start + relativedelta(weeks=+slide_range) # ex.'2020-07-15'
        date_predict_start = date_predict_start + relativedelta(weeks=+slide_range) # ex.'2021-01-15'
    final_error = round(
        sum([abs(ele) for ele in result_table['delay']]) / len(result_table['delay']), 2)
    return final_error, result_table


In [85]:
def draw_plot_result_table(data, final_error, result_table):
    fig, axes = plt.subplots(2, 1, figsize=(15, 8))
    axes[0].plot(data.index, data['Close'], 'gray', label='data', linewidth=3)
    axes[0].plot(data.index, data['peaks'], '^', c='royalblue', label='peaks')
    axes[0].plot(data.index, data['valleys'], 'v',c='orangered', label='valleys')
    for i, label in enumerate(result_table['delay']):
        if result_table['pv'][i] == 'peaks':
            axes[1].plot(result_table['Target_Date'][i],result_table['delay'][i], '*',
                         c='royalblue', label='peaks')
            axes[1].annotate(label, (result_table['Target_Date'][i],
                                     result_table['delay'][i]), fontsize=14)
        else:
            axes[1].plot(result_table['Target_Date'][i], result_table['delay'][i], '*', 
                         c='orangered', label='valleys')
            axes[1].annotate(label, (result_table['Target_Date'][i],
                             result_table['delay'][i]), fontsize=14)
    axes[0].set_ylabel("Stock price", fontsize=14)
    axes[0].grid(True)
    axes[1].grid(True)
    axes[1].set_ylabel("delay", fontsize=14)
    axes[0].set_xlim(data.index[0], data.index[-1])
    axes[1].set_xlim(data.index[0], data.index[-1])
    plt.show()
    return


In [86]:
# slide_abs_test = main_function(
#     stock_name="^GSPC", date_predict_start='2021-01-01', data_range=6,
#     slide_range=2, n_slide=4, pv_range=2, n_harm_lower_limit=20, n_harm_upper_limit=20, fit_method='abs'
# )
# print(slide_abs_test[0])
# print(slide_abs_test[1])
# data = yf.Ticker("^GSPC").history(start='2021-01-01', end='2021-12-31')
# data['peaks'] = find_pv_function(2, data)[0]
# data['valleys'] = find_pv_function(2, data)[1]
# draw_plot_result_table(data, slide_abs_test[0], slide_abs_test[1])

In [87]:
# slide_mean_test = main_function(
#     stock_name="^GSPC", main_function='2021-01-01', data_range=6,
#     slide_range=2, n_slide=4, pv_range=2, n_harm_lower_limit=20, n_harm_upper_limit=20, fit_method='mean'
# )
# print(slide_mean_test[0])
# print(slide_mean_test[1])
# data = yf.Ticker("^GSPC").history(start='2021-01-01', end='2021-12-31')
# data['peaks'] = find_pv_function(2, data)[0]
# data['valleys'] = find_pv_function(2, data)[1]
# draw_plot_result_table(data, slide_mean_test[0], slide_mean_test[1])

In [88]:
slide_mean_test = main_function(
    stock_name="^GSPC", date_predict_start='2021-01-01', data_range=6,
    slide_range=2, n_slide=24, pv_range=2, n_harm_lower_limit=20, n_harm_upper_limit=40, fit_method='mean'
)
print(slide_mean_test[0])
print(slide_mean_test[1])


1.67
             Start_Date          Target_Date delay       pv error best_fit
0   2021-01-01 00:00:00  2021-01-06 00:00:00  -2.0  valleys   0.0       20
1   2021-01-15 00:00:00  2021-01-20 00:00:00   2.0  valleys  0.01       39
2   2021-01-29 00:00:00  2021-01-29 00:00:00   0.0  valleys  0.01       37
3   2021-02-12 00:00:00  2021-02-18 00:00:00  -3.0    peaks   0.0       24
4   2021-02-26 00:00:00  2021-03-03 00:00:00   2.0    peaks  0.02       39
5   2021-03-12 00:00:00  2021-03-16 00:00:00   1.0    peaks -0.01       24
6   2021-03-26 00:00:00  2021-03-30 00:00:00  -2.0    peaks  0.02       31
7   2021-04-09 00:00:00  2021-04-12 00:00:00   2.0  valleys   0.0       29
8   2021-04-23 00:00:00  2021-05-03 00:00:00   1.0  valleys  0.04       38
9   2021-05-07 00:00:00  2021-05-10 00:00:00   2.0  valleys   0.0       39
10  2021-05-21 00:00:00  2021-05-27 00:00:00   1.0    peaks  0.01       23
11  2021-06-04 00:00:00  2021-06-09 00:00:00   3.0    peaks  0.01       20
12  2021-06-18 00:00

In [89]:
# slide_abs_test = main_function(
#     stock_name="^GSPC",date_predict_start='2021-01-01',data_range=6,
#     slide_range=2,n_slide=2,pv_range=2,n_harm_lower_limit=20,n_harm_upper_limit=40,fit_method='abs'
#     )
# print(slide_abs_test[0])
# print(slide_abs_test[1])


In [90]:
# slide_mean_test = main_function(
#     stock_name="^GSPC",date_predict_start='2021-01-01',data_range=6,
#     slide_range=2,n_slide=24,pv_range=2,n_harm_lower_limit=20,n_harm_upper_limit=40,fit_method='mean'
#     )
# print(slide_mean_test[0])
# print(slide_mean_test[1])


In [91]:
# slide_abs_test_2 = main_function(
#     stock_name="^GSPC",date_predict_start='2021-01-01',data_range=6,
#     slide_range=1,n_slide=48,pv_range=2,n_harm_lower_limit=20,n_harm_upper_limit=40,fit_method='abs'
#     )
# print(slide_abs_test_2[0])
# print(slide_abs_test_2[1])


In [92]:
# slide_mean_test_2 = main_function(
#     stock_name="^GSPC",date_predict_start='2021-01-01',data_range=6,
#     slide_range=1,n_slide=48,pv_range=2,n_harm_lower_limit=20,n_harm_upper_limit=40,fit_method='mean'
#     )
# print(slide_mean_test_2[0])
# print(slide_mean_test_2[1])


In [93]:
# slide_abs_test_3 = main_function(
#     stock_name="^GSPC",date_predict_start='2021-01-01',data_range=6,
#     slide_range=2,n_slide=24,pv_range=2,n_harm_lower_limit=0,n_harm_upper_limit=60,fit_method='abs'
#     )
# print(slide_abs_test_3[0])
# print(slide_abs_test_3[1])


In [94]:
# slide_mean_test_3 = main_function(
#     stock_name="^GSPC",date_predict_start='2021-01-01',data_range=6,
#     slide_range=2,n_slide=24,pv_range=2,n_harm_lower_limit=0,n_harm_upper_limit=60,fit_method='mean'
#     )
# print(slide_mean_test_3[0])
# print(slide_mean_test_3[1])


In [95]:
# slide_abs_test_4 = main_function(
#     stock_name="^GSPC",date_predict_start='2021-01-01',data_range=6,
#     slide_range=2,n_slide=24,pv_range=2,n_harm_lower_limit=0,n_harm_upper_limit=40,fit_method='abs'
#     )
# print(slide_abs_test_4[0])
# print(slide_abs_test_4[1])


In [96]:
# slide_mean_test_4 = main_function(
#     stock_name="^GSPC",date_predict_start='2021-01-01',data_range=6,
#     slide_range=2,n_slide=24,pv_range=2,n_harm_lower_limit=0,n_harm_upper_limit=40,fit_method='mean'
#     )
# print(slide_mean_test_4[0])
# print(slide_mean_test_4[1])
