In [3]:
import pandas

In [4]:
import yfinance as yf
import pandas as pd
from pandas_datareader import data
from datetime import datetime

In [5]:
yf.pdr_override() #以pandasreader常用的格式覆寫

In [6]:
target_stock = 'AAPL'  #股票代號變數

In [7]:
start_date = datetime(2014, 1, 1)
end_date = datetime(2021, 5, 30) #設定資料起訖日期

In [8]:
df = data.get_data_yahoo([target_stock], start_date, end_date) 

[*********************100%***********************]  1 of 1 completed


# 回測

In [9]:
import sys, setuptools, tokenize

In [10]:
from backtesting import Backtest, Strategy #引入回測和交易策略功能

from backtesting.lib import crossover #從lib子模組引入判斷均線交會功能
from backtesting.test import SMA #從test子模組引入繪製均線功能

import pandas as pd #引入pandas讀取股價歷史資料CSV檔



In [11]:
class SmaCross(Strategy): #交易策略命名為SmaClass，使用backtesting.py的Strategy功能
    n1 = 5 #設定第一條均線日數為5日(周線)
    n2 = 20 #設定第二條均線日數為20日(月線)，這邊的日數可自由調整

    def init(self):
        self.sma1 = self.I(SMA, self.data.Close, self.n1) #定義第一條均線為sma1，使用backtesting.py的SMA功能算繪
        self.sma2 = self.I(SMA, self.data.Close, self.n2) #定義第二條均線為sma2，使用backtesting.py的SMA功能算繪
    
    def next(self):
        if crossover(self.sma1, self.sma2): #如果周線衝上月線，表示近期是上漲的，則買入
            self.buy()
        elif crossover(self.sma2, self.sma1): #如果周線再與月線交叉，表示開始下跌了，則賣出
            self.sell()

In [12]:
#回測
df = df.interpolate()
df.index = pd.to_datetime(df.index)

In [13]:
df

Unnamed: 0_level_0,Open,High,Low,Close,Adj Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2013-12-31,19.791786,20.045713,19.785713,20.036428,17.819059,223084400
2014-01-02,19.845715,19.893929,19.715000,19.754642,17.568451,234684800
2014-01-03,19.745001,19.775000,19.301071,19.320715,17.182550,392467600
2014-01-06,19.194643,19.528570,19.057142,19.426071,17.276245,412610800
2014-01-07,19.440001,19.498571,19.211430,19.287144,17.152693,317209200
...,...,...,...,...,...,...
2021-05-24,126.010002,127.940002,125.940002,127.099998,127.099998,63092900
2021-05-25,127.820000,128.320007,126.320000,126.900002,126.900002,72009500
2021-05-26,126.959999,127.389999,126.419998,126.849998,126.849998,56575900
2021-05-27,126.440002,127.639999,125.080002,125.279999,125.279999,94625600


In [14]:
test = Backtest(df, SmaCross, cash=10000, commission=.002)

In [15]:
result = test.run()

In [16]:
print(result) # 直接print文字結果

Start                     2013-12-31 00:00:00
End                       2021-05-28 00:00:00
Duration                   2705 days 00:00:00
Exposure Time [%]                   98.338692
Equity Final [$]                 64671.021741
Equity Peak [$]                  73729.873784
Return [%]                         546.710217
Buy & Hold Return [%]              521.917229
Return (Ann.) [%]                   28.672256
Volatility (Ann.) [%]               37.282891
Sharpe Ratio                         0.769046
Sortino Ratio                        1.487584
Calmar Ratio                         0.740378
Max. Drawdown [%]                  -38.726491
Avg. Drawdown [%]                    -4.01002
Max. Drawdown Duration      721 days 00:00:00
Avg. Drawdown Duration       31 days 00:00:00
# Trades                                    1
Win Rate [%]                            100.0
Best Trade [%]                     546.845392
Worst Trade [%]                    546.845392
Avg. Trade [%]                    

In [17]:
test.plot() #將線圖網頁依照指定檔名保存



# ML測試

## load data

In [18]:
target_stock = 'AAPL'  #股票代號變數
start_date = datetime(2014, 1, 1)
end_date = datetime(2021, 5, 30) #設定資料起訖日期
df = data.get_data_yahoo([target_stock], start_date, end_date) 

[*********************100%***********************]  1 of 1 completed


## ML 產生預測資料

### 訓練資料

In [49]:
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from datetime import datetime
from datetime import timedelta
from tqdm import tqdm
sns.set()
tf.compat.v1.random.set_random_seed(1234)

Instructions for updating:
non-resource variables are not supported in the long term


#### 正規化

In [27]:
df

Unnamed: 0_level_0,Open,High,Low,Close,Adj Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2013-12-31,19.791786,20.045713,19.785713,20.036428,17.819059,223084400
2014-01-02,19.845715,19.893929,19.715000,19.754642,17.568451,234684800
2014-01-03,19.745001,19.775000,19.301071,19.320715,17.182550,392467600
2014-01-06,19.194643,19.528570,19.057142,19.426071,17.276245,412610800
2014-01-07,19.440001,19.498571,19.211430,19.287144,17.152693,317209200
...,...,...,...,...,...,...
2021-05-24,126.010002,127.940002,125.940002,127.099998,127.099998,63092900
2021-05-25,127.820000,128.320007,126.320000,126.900002,126.900002,72009500
2021-05-26,126.959999,127.389999,126.419998,126.849998,126.849998,56575900
2021-05-27,126.440002,127.639999,125.080002,125.279999,125.279999,94625600


In [28]:
minmax = MinMaxScaler().fit(df.iloc[:, 4:5].astype('float32')) # Close index
df_log = minmax.transform(df.iloc[:, 4:5].astype('float32')) # Close index
df_log = pd.DataFrame(df_log)
df_log.head()

Unnamed: 0,0
0,0.015336
1,0.01336
2,0.010318
3,0.011056
4,0.010082


#### 拆分訓練驗證資料

In [50]:

test_size = 30
simulation_size = 10

df_train = df_log.iloc[:-test_size]
df_test = df_log.iloc[-test_size:]

In [94]:
class Model:
    def __init__(
        self,
        learning_rate,
        num_layers,
        size,
        size_layer,
        output_size,
        forget_bias = 0.1,
    ):
        def lstm_cell(size_layer):
            return  tf.compat.v1.nn.rnn_cell.LSTMCell(size_layer, state_is_tuple = False)

        rnn_cells =  tf.compat.v1.nn.rnn_cell.MultiRNNCell(
            [lstm_cell(size_layer) for _ in range(num_layers)],
            state_is_tuple = False,
        )
        self.X = tf.placeholder(tf.float32, (None, None, size))
        self.Y = tf.placeholder(tf.float32, (None, output_size))
        drop = tf.keras.layers.Dropout(
            rnn_cells, output_keep_prob = forget_bias
        )
        self.hidden_layer = tf.placeholder(
            tf.float32, (None, num_layers * 2 * size_layer)
        )
        _, last_state = tf.nn.dynamic_rnn(
            drop, self.X, initial_state = self.hidden_layer, dtype = tf.float32
        )
        with tf.variable_scope('decoder', reuse = False):
            rnn_cells_dec = tf.nn.rnn_cell.MultiRNNCell(
                [lstm_cell(size_layer) for _ in range(num_layers)], state_is_tuple = False
            )
            drop_dec = tf.compat.v1.nn.rnn_cell.DropoutWrapper(
                rnn_cells_dec, output_keep_prob = forget_bias
            )
            self.outputs, self.last_state = tf.nn.dynamic_rnn(
                drop_dec, self.X, initial_state = last_state, dtype = tf.float32
            )
            
        self.logits = tf.layers.dense(self.outputs[-1], output_size)
        self.cost = tf.reduce_mean(tf.square(self.Y - self.logits))
        self.optimizer = tf.train.AdamOptimizer(learning_rate).minimize(
            self.cost
        )
        
def calculate_accuracy(real, predict):
    real = np.array(real) + 1
    predict = np.array(predict) + 1
    percentage = 1 - np.sqrt(np.mean(np.square((real - predict) / real)))
    return percentage * 100

def anchor(signal, weight):
    buffer = []
    last = signal[0]
    for i in signal:
        smoothed_val = last * weight + (1 - weight) * i
        buffer.append(smoothed_val)
        last = smoothed_val
    return buffer

In [95]:

num_layers = 1
size_layer = 128
timestamp = 5
epoch = 300
dropout_rate = 0.8
future_day = test_size
learning_rate = 0.01

In [96]:
def forecast():
    tf.reset_default_graph()
    modelnn = Model(
        learning_rate, num_layers, df_log.shape[1], size_layer, df_log.shape[1], dropout_rate
    )
    sess = tf.InteractiveSession()
    sess.run(tf.global_variables_initializer())
    date_ori = pd.to_datetime(df.iloc[:, 0]).tolist()

    pbar = tqdm(range(epoch), desc = 'train loop')
    for i in pbar:
        init_value = np.zeros((1, num_layers * 2 * size_layer))
        total_loss, total_acc = [], []
        for k in range(0, df_train.shape[0] - 1, timestamp):
            index = min(k + timestamp, df_train.shape[0] - 1)
            batch_x = np.expand_dims(
                df_train.iloc[k : index, :].values, axis = 0
            )
            batch_y = df_train.iloc[k + 1 : index + 1, :].values
            logits, last_state, _, loss = sess.run(
                [modelnn.logits, modelnn.last_state, modelnn.optimizer, modelnn.cost],
                feed_dict = {
                    modelnn.X: batch_x,
                    modelnn.Y: batch_y,
                    modelnn.hidden_layer: init_value,
                },
            )        
            init_value = last_state
            total_loss.append(loss)
            total_acc.append(calculate_accuracy(batch_y[:, 0], logits[:, 0]))
        pbar.set_postfix(cost = np.mean(total_loss), acc = np.mean(total_acc))
    
    future_day = test_size
    output_predict = np.zeros((df_train.shape[0] + future_day, df_train.shape[1]))
    output_predict[0] = df_train.iloc[0]
    upper_b = (df_train.shape[0] // timestamp) * timestamp
    init_value = np.zeros((1, num_layers * 2 * size_layer))

    for k in range(0, (df_train.shape[0] // timestamp) * timestamp, timestamp):
        out_logits, last_state = sess.run(
            [modelnn.logits, modelnn.last_state],
            feed_dict = {
                modelnn.X: np.expand_dims(
                    df_train.iloc[k : k + timestamp], axis = 0
                ),
                modelnn.hidden_layer: init_value,
            },
        )
        init_value = last_state
        output_predict[k + 1 : k + timestamp + 1] = out_logits

    if upper_b != df_train.shape[0]:
        out_logits, last_state = sess.run(
            [modelnn.logits, modelnn.last_state],
            feed_dict = {
                modelnn.X: np.expand_dims(df_train.iloc[upper_b:], axis = 0),
                modelnn.hidden_layer: init_value,
            },
        )
        output_predict[upper_b + 1 : df_train.shape[0] + 1] = out_logits
        future_day -= 1
        date_ori.append(date_ori[-1] + timedelta(days = 1))

    init_value = last_state
    for i in range(future_day):
        o = output_predict[-future_day - timestamp + i:-future_day + i]
        out_logits, last_state = sess.run(
            [modelnn.logits, modelnn.last_state],
            feed_dict = {
                modelnn.X: np.expand_dims(o, axis = 0),
                modelnn.hidden_layer: init_value,
            },
        )
        init_value = last_state
        output_predict[-future_day + i] = out_logits[-1]
        date_ori.append(date_ori[-1] + timedelta(days = 1))
    
    output_predict = minmax.inverse_transform(output_predict)
    deep_future = anchor(output_predict[:, 0], 0.3)
    
    return deep_future[-test_size:]

In [97]:

results = []
for i in range(simulation_size):
    print('simulation %d'%(i + 1))
    results.append(forecast())

simulation 1


TypeError: ('Keyword argument not understood:', 'output_keep_prob')

In [98]:
forecast()



TypeError: ('Keyword argument not understood:', 'output_keep_prob')

## 回測

In [15]:
class ml_back_test(Strategy): #交易策略命名為SmaClass，使用backtesting.py的Strategy功能
    n1 = 5 #設定第一條均線日數為5日(周線)
    n2 = 20 #設定第二條均線日數為20日(月線)，這邊的日數可自由調整

    def init(self):
        self.sma1 = self.I(SMA, self.data.Close, self.n1) #定義第一條均線為sma1，使用backtesting.py的SMA功能算繪
        self.sma2 = self.I(SMA, self.data.Close, self.n2) #定義第二條均線為sma2，使用backtesting.py的SMA功能算繪
    
    def next(self):
        if crossover(self.sma1, self.sma2): #如果周線衝上月線，表示近期是上漲的，則買入
            self.buy()
        elif crossover(self.sma2, self.sma1): #如果周線再與月線交叉，表示開始下跌了，則賣出
            self.sell()

In [25]:
ml_back_test.I

<function backtesting.backtesting.Strategy.I(self, func: Callable, *args, name=None, plot=True, overlay=None, color=None, scatter=False, **kwargs) -> numpy.ndarray>

In [26]:
ml_back_test.crossover

AttributeError: type object 'ml_back_test' has no attribute 'crossover'

In [30]:
from backtesting.test import SMA

In [31]:
SMA

<function backtesting.test.SMA(arr: pandas.core.series.Series, n: int) -> pandas.core.series.Series>