# MA線交易策略驗證
Author：余慶龍  
Establish Date：2021.7.10  
Last Modified Date：2021.9.10

## Table of Contents
1. 讀取檔案模塊
2. 指標與交易策略
3. 回測模塊
4. 執行回測並搜集資料
5. 統計結果

## 1. 讀取檔案模塊

In [12]:
"""
把csv或excel檔案轉換成dataframe
以供後續的pandas操作
"""

import pandas as pd
import numpy as np
from datetime import datetime

def readStock_file(file, filetype='csv'):


    if filetype == 'excel':
        df = pd.read_excel(file, engine='openpyxl', parse_dates=True, header=None)
    else:
        df = pd.read_csv(file)

    # 取代原本的 column 名稱
    # 檔案的日期與開高低收需要照這個順序
    colume_name = ['Date', 'Open', 'High', 'Low', 'Close', 'Volume']
    df.columns = colume_name

    # 用日期這一行當做 df 的索引
    df = df.set_index('Date')

    # 把日期轉成 datetime的格式(從string)
    df.index = pd.to_datetime(df.index)

    # 照日期排序並把空資料轉成numpy的nan
    df = df.sort_index()
    df = df.replace(r'^\s*-$', np.nan, regex=True)


    for col in df.columns:
        if(col=='Date'):
            continue;
        df[col] = np.array([float(x) for x in df[col]])

    return df

def random_sample(ticker_list_file,  column = '證券代碼', times = 10):
    

    ticker_list_df = pd.read_csv(ticker_list_file, encoding='utf8', usecols=[column])
    ticker_list = ticker_list_df[column].to_list()
    
    sample = np.random.choice(ticker_list, times)
    sample_list = [i.replace(' ', '') for i in sample.tolist()]

    return sample_list

def file_list_with_directory(file_list, path):
    
    files = [path+i+'.csv' for i in file_list]
    return files

def get_dataframe(data_file, days=0):
    ticker = pd.read_csv(data_file)

    # change the name of columns
    ticker.columns = ['Date', 'Open', 'High', 'Low', 'Close', 'Volume']

    # set the column "Date" to index
    ticker = ticker.set_index('Date')

    # set index column("Date") from string to datetime.datetime
    ticker.index = pd.to_datetime(ticker.index)

    # sort index column("Date") chronologically
    ticker = ticker.sort_index()
    
    if(days):
        ticker = ticker.tail(days)
    return ticker

## 2. 指標與交易策略

In [28]:
import mplfinance as mpf

# moving window of moving average indicator
# suhc as N = 30
MOVING_WINDOW = 30


def simple_price_MA(ticker, plot=False):
    # make a copy from ticker
    dw = ticker

    # calculate moving average by pandas.DataFrame.rolling()
    dw['ma'] = dw['Close'].rolling(MOVING_WINDOW).mean()

    # initialize 'buy' & 'sell' column by filling with 0
    dw['buy'] = np.zeros(ticker.shape[0])
    dw['sell'] = np.zeros(ticker.shape[0])



    # the beginning data of MA is nan
    # so we skip those data
    for i in range(MOVING_WINDOW, dw.shape[0]):

        if(dw['Close'][i] > dw['Open'][i]):

            # Close price crossover the MA indicator upward
            if (((dw.iloc[i]['Close'] > dw.iloc[i]['ma']) and
                 (dw.iloc[i-1]['Close'] < dw.iloc[i-1]['ma'])) and
                 (dw.iloc[i]['Close'] > dw.iloc[i-1]['Close'])):

                # dw['buy'][i] = 1
                #   is a simple version of this line of code
                #   however "chained indexing" should be avoided
                #   while we are setting value.
                #   checkout reference to see more.
                dw.loc[(dw.index[i], 'buy')] = 1

        # Close price crossover the MA indicator downward
        if(dw['Close'][i] < dw['Open'][i]):
            if (((dw.iloc[i]['Close'] < dw.iloc[i]['ma']) and
                 (dw.iloc[i-1]['Close'] > dw.iloc[i-1]['ma'])) and
                 (dw.iloc[i]['Close'] < dw.iloc[i-1]['Close'])):

                dw.loc[(dw.index[i], 'sell')] = 1
                
    if plot:
        plots = []

        # the position in graph
        buy, sell = [], []

        # adjust the position to avoid signals block the lines
        for index, row in dw.iterrows():
            buy.append(row['Close']*0.985 if row['buy'] else np.nan)
            sell.append(row['Close']*1.01 if row['sell'] else np.nan)

        # np.isnan(sell) determine the value is nan or not
        # count_nonzero(np.isnan(sell)) counts how many nan we have
        # if nan amount equal the length of array, we don't have sell record
        if not np.count_nonzero(np.isnan(sell)) == len(sell):
            plots.append(mpf.make_addplot(sell , type = 'scatter', color = 'red', marker = 'v', markersize = 100))

        if not np.count_nonzero(np.isnan(buy)) == len(buy):
            plots.append(mpf.make_addplot(buy , type = 'scatter', color = '#cfc01d', marker = '^', markersize = 200))

        mpf.plot(dw, 
                    addplot = plots,
                    type='candle',       # candlestick chart of stock price
                    mav = MOVING_WINDOW, # mav is moving average window
                    volume=True,         # plots trading volume as well
                    figscale=2,          # make graph twice bigger
                    style='yahoo')       # color scheme using 'yahoo' style

    return dw

def crossovers_MA(ticker, plot=False):
    # make a copy from ticker
    dw = ticker

    MOVING_WINDOW_1 = 5
    MOVING_WINDOW_2 = 30
    # calculate moving average by pandas.DataFrame.rolling()
    dw['ma_5'] = dw['Close'].rolling(MOVING_WINDOW_1).mean()
    dw['ma_30'] = dw['Close'].rolling(MOVING_WINDOW_2).mean()

    # initialize 'buy' & 'sell' column by filling with 0
    dw['buy'] = np.zeros(ticker.shape[0])
    dw['sell'] = np.zeros(ticker.shape[0])



    # the beginning data of MA is nan
    # so we skip those data
    for i in range(MOVING_WINDOW_2, dw.shape[0]):

        if(dw['Close'][i] > dw['Open'][i]):

            # Close price crossover the MA indicator upward
            if (((dw.iloc[i]['ma_5'] > dw.iloc[i]['ma_30']) and
                 (dw.iloc[i-1]['ma_5'] < dw.iloc[i-1]['ma_30'])) and
                 (dw.iloc[i]['ma_5'] > dw.iloc[i-1]['ma_5'])):

                # dw['buy'][i] = 1
                #   is a simple version of this line of code
                #   however "chained indexing" should be avoided
                #   while we are setting value.
                #   checkout reference to see more.
                dw.loc[(dw.index[i], 'buy')] = 1

        # Close price crossover the MA indicator downward
        if(dw['Close'][i] < dw['Open'][i]):
            if (((dw.iloc[i]['ma_5'] < dw.iloc[i]['ma_30']) and
                 (dw.iloc[i-1]['ma_5'] > dw.iloc[i-1]['ma_30'])) and
                 (dw.iloc[i]['ma_5'] < dw.iloc[i-1]['ma_5'])):

                dw.loc[(dw.index[i], 'sell')] = 1
                
    if plot:
        plots = []

        # the position in graph
        buy, sell = [], []

        # adjust the position to avoid signals block the lines
        for index, row in dw.iterrows():
            buy.append(row['Close']*0.985 if row['buy'] else np.nan)
            sell.append(row['Close']*1.01 if row['sell'] else np.nan)

        # np.isnan(sell) determine the value is nan or not
        # count_nonzero(np.isnan(sell)) counts how many nan we have
        # if nan amount equal the length of array, we don't have sell record
        if not np.count_nonzero(np.isnan(sell)) == len(sell):
            plots.append(mpf.make_addplot(sell , type = 'scatter', color = 'red', marker = 'v', markersize = 100))

        if not np.count_nonzero(np.isnan(buy)) == len(buy):
            plots.append(mpf.make_addplot(buy , type = 'scatter', color = '#cfc01d', marker = '^', markersize = 200))

        mpf.plot(dw, 
                    addplot = plots,
                    type='candle',       # candlestick chart of stock price
                    mav = (MOVING_WINDOW_1, MOVING_WINDOW_2), # mav is moving average window
                    volume=True,         # plots trading volume as well
                    figscale=2,          # make graph twice bigger
                    style='yahoo')       # color scheme using 'yahoo' style

    return dw

def corss_volume(ticker, plot=False):
    # make a copy from ticker
    dw = ticker

    MOVING_WINDOW_1 = 5
    MOVING_WINDOW_2 = 30
    # calculate moving average by pandas.DataFrame.rolling()
    data['ma_5'] = data['Close'].rolling(MOVING_WINDOW_1).mean()
    data['ma_30'] = data['Close'].rolling(MOVING_WINDOW_2).mean()
    data['vol_30'] = data['Volume'].rolling(MOVING_WINDOW_1).mean()

    # initialize 'buy' & 'sell' column by filling with 0
    data['buy'] = np.zeros(ticker.shape[0])
    data['sell'] = np.zeros(ticker.shape[0])


    sell_buy_status = 0
    # the beginning data of MA is nan
    # so we skip those data
    for i in range(MOVING_WINDOW_2, data.shape[0]):

        if(data['Close'][i] > data['Open'][i]):

            #print(data['Volume'][i])
            # Close price crossover the MA indicator upward
            if (((data.iloc[i]['ma_5'] > data.iloc[i]['ma_30']) and
                 (data.iloc[i-1]['ma_5'] < data.iloc[i-1]['ma_30'])) and
                 (data.iloc[i]['ma_5'] > data.iloc[i-1]['ma_5'])):

                # data['buy'][i] = 1
                #   is a simple version of this line of code
                #   however "chained indexing" should be avoided
                #   while we are setting value.
                #   checkout reference to see more.
                data.loc[(data.index[i], 'buy')] = 1

        # Close price crossover the MA indicator downward
        if(data['Close'][i] < data['Open'][i] and
           data['Volume'][i] > data['vol_30'][i]):
            if (((data.iloc[i]['ma_5'] < data.iloc[i]['ma_30']) and
                 (data.iloc[i-1]['ma_5'] > data.iloc[i-1]['ma_30'])) and
                 (data.iloc[i]['ma_5'] < data.iloc[i-1]['ma_5'])):

                data.loc[(data.index[i], 'sell')] = 1


                
    if plot:
        plots = []

        # the position in graph
        buy, sell = [], []

        # adjust the position to avoid signals block the lines
        for index, row in dw.iterrows():
            buy.append(row['Close']*0.985 if row['buy'] else np.nan)
            sell.append(row['Close']*1.01 if row['sell'] else np.nan)

        # np.isnan(sell) determine the value is nan or not
        # count_nonzero(np.isnan(sell)) counts how many nan we have
        # if nan amount equal the length of array, we don't have sell record
        if not np.count_nonzero(np.isnan(sell)) == len(sell):
            plots.append(mpf.make_addplot(sell , type = 'scatter', color = 'red', marker = 'v', markersize = 100))

        if not np.count_nonzero(np.isnan(buy)) == len(buy):
            plots.append(mpf.make_addplot(buy , type = 'scatter', color = '#cfc01d', marker = '^', markersize = 200))

        mpf.plot(dw, 
                    addplot = plots,
                    type='candle',       # candlestick chart of stock price
                    mav = (MOVING_WINDOW_1, MOVING_WINDOW_2), # mav is moving average window
                    volume=True,         # plots trading volume as well
                    figscale=2,          # make graph twice bigger
                    style='yahoo')       # color scheme using 'yahoo' style

    return dw



## 3. 回測模塊

In [29]:
from termcolor import colored
def backtesting(data):
    # how many shares we hold, and the money balance
    have_shares , balance = 0, 0

    # multiple buy and 1 sell as "a trade"
    # record trading times and how many trade are making money
    make_money, trade_times = 0, 0
    
    if np.count_nonzero(data['buy']) == 0:
        return np.nan, np.nan
    
    # we use geometic average
    # this is products of return rate
    return_rate = 1
    for index, row in data.iterrows():

        if(row['buy'] == 1):
            balance -= row['Close']
            have_shares += 1
            print(colored('\tbuy ', 'green'),'|', colored(index.strftime("%Y-%m-%d"), 'green'), '|', colored(row['Close'], 'green'))

        # sell signal or it reach ends
        elif((data.loc[index]['sell'] == 1 or index == data.index.values[-1])and
        # and we have some shares
              have_shares != 0):
            print(colored('\tsell', 'red'),'|', colored(index.strftime("%Y-%m-%d"), 'red'), '|', colored(row['Close'],'red'), "*" , have_shares)
            

            principal = -balance
            balance += row['Close'] * have_shares

            # to calculate accuracy
            trade_times += 1
            if(balance > 0):
                make_money += 1

            # plus 1 avoiding get too small
            return_rate *= 1+(balance / principal)

            # reset balance & shares
            balance = 0
            have_shares = 0


    return_rate = (return_rate-1) * 100.0
    
    accuracy = 0.0
    if(trade_times):
        accuracy = make_money/ trade_times * 100

    return return_rate, accuracy

## 4. 執行回測並搜集資料

In [259]:
random_choice_num = 10
data_directory = '../../parse_data/data/splited/'
ticker_name_file=  '../../parse_data/data/ticker_name.csv'
TIME_INTERVAL = 600

strategy_list = [simple_price_MA, crossovers_MA,corss_volume]
dictionary_rr = {}
dictionary_acc = {}

for data_file in file_list_with_directory(random_sample(ticker_list_file = ticker_name_file, times = random_choice_num), data_directory):
    
    slim_file_name = (data_file.split('/')[-1]).split('.')[0]

    print(data_file)
    ticker = get_dataframe(data_file, days=TIME_INTERVAL)
    
    strategy_rr= []
    strategy_acc = []
    for i in range(len(strategy_list)):
        
        data = ticker.copy(deep=True)
        
        # initialize 'buy' & 'sell' column by filling with 0
        data['buy'] = np.zeros(ticker.shape[0])
        data['sell'] = np.zeros(ticker.shape[0])

        data = strategy_list[i](data)
        return_rate, acc = backtesting(data)
        
        if return_rate != np.nan:
            return_rate = (round(return_rate, 1))
            acc = (round(acc, 3))
        
        print('\treturn rate: {:>5.1f}% | acc: {:>5.1f}% | {}'.format(return_rate, acc, colored(strategy_list[i].__name__,  'red', attrs=['bold'])))
        strategy_rr.append((return_rate))
        strategy_acc.append((acc))
        print()
    

    #print(strategy_rr)
    #print(strategy_acc)
    
    dictionary_rr[slim_file_name] = strategy_rr
    dictionary_acc[slim_file_name] = strategy_acc



../../parse_data/data/splited/1475.csv
[32m	buy [0m | [32m2019-01-17[0m | [32m11.1[0m
[31m	sell[0m | [31m2019-01-18[0m | [31m9.99[0m * 1
[32m	buy [0m | [32m2019-02-11[0m | [32m10.6[0m
[32m	buy [0m | [32m2019-02-15[0m | [32m10.9[0m
[31m	sell[0m | [31m2019-03-05[0m | [31m10.3[0m * 2
[32m	buy [0m | [32m2019-05-03[0m | [32m7.68[0m
[31m	sell[0m | [31m2019-05-07[0m | [31m7.26[0m * 1
[32m	buy [0m | [32m2019-06-13[0m | [32m7.08[0m
[32m	buy [0m | [32m2019-06-21[0m | [32m6.75[0m
[32m	buy [0m | [32m2019-11-13[0m | [32m4.89[0m
[32m	buy [0m | [32m2019-12-05[0m | [32m4.88[0m
[31m	sell[0m | [31m2019-12-10[0m | [31m4.64[0m * 4
[32m	buy [0m | [32m2020-02-25[0m | [32m10.35[0m
[31m	sell[0m | [31m2020-02-27[0m | [31m9.59[0m * 1
[32m	buy [0m | [32m2020-04-14[0m | [32m7.27[0m
[32m	buy [0m | [32m2020-06-12[0m | [32m15.1[0m
[32m	buy [0m | [32m2020-08-04[0m | [32m17.55[0m
[31m	sell[0m | [31m2020-10-13[0m |

## 5. 統計結果

In [261]:
ticker_target = '0050'
df_target = readStock_file('../../parse_data/data/splited/{}.csv'.format(ticker_target))
df_target = df_target.tail(TIME_INTERVAL)
print('Compared target', colored(ticker_target, 'magenta'), ':', end=' ')
print(colored(str(round(((df_target['Close'][-1]/df_target['Close'][0])-1) * 100, 3)) +'%' , 'magenta',attrs=['bold']))
print()

extract_pattern = re.compile("(.+)%")

strategy_names = [i.__name__ for i in strategy_list]

dataframe_return_rate = pd.DataFrame.from_dict(dictionary_rr, orient='index',columns = strategy_names)
dataframe_return_rate_avg = dataframe_return_rate.mean(axis = 0, skipna = True).apply(lambda x : round(x, 3))
dataframe_return_rate_avg = (pd.DataFrame(dataframe_return_rate_avg, columns=['AVG']).transpose())
dataframe_return_rate = dataframe_return_rate.append(dataframe_return_rate_avg)

dataframe_acc = pd.DataFrame.from_dict(dictionary_acc ,orient='index',columns = strategy_names)
dataframe_acc_avg = dataframe_acc.mean(axis = 0, skipna = True).apply(lambda x : round(x, 3))
dataframe_acc_avg = (pd.DataFrame(dataframe_acc_avg, columns=['AVG']).transpose())
dataframe_acc = dataframe_acc.append(dataframe_acc_avg)

pattern = re.compile("(.+)%")
def adding_percent(df):

    for col in df.columns:
        df[col] = df[col].apply(lambda x : str(x) + '%')
    return df

def style_return(df):
    style = df.style.apply(lambda x : ['color:green' if ('nan' not in i and eval(pattern.match(i)[1]) > 1) else '' for i in x])
    style = style.apply(lambda x : ['color:red' if ('nan' not in i and -1.0 > eval(pattern.match(i)[1])) else '' for i in x])
    
    return style
def style_win(df):
    style = df.style.apply(lambda x : ['color:red' if ('nan' not in i  and eval(pattern.match(i)[1]) < 50) else '' for i in x])
    style = style.apply(lambda x : ['color:green' if ('nan' not in i and eval(pattern.match(i)[1]) > 50) else '' for i in x])

    return style




def style_avg(style):
    def custom_avg_row(row):

        # Underline + bold the AVG
        colors = ['background-color: \'\'']*len(row.index)
        colors[-1] = 'font-weight: bold;text-decoration: underline;'
        return colors
    
    style = style.apply(custom_avg_row, axis=0)
    
    # change header color
    headers = {
    'selector': 'th:not(.index_name)',
    'props': [('background-color', '#fff9c7')]
    }
    style = style.set_table_styles([headers])
    
    return style
 

from IPython.display import display

dataframe_return_rate = adding_percent(dataframe_return_rate)
adding_percent(dataframe_acc)
print('return rate:')
display(style_avg(style_return(dataframe_return_rate)))

print()
print('accuracy:')
display(style_avg(style_win(dataframe_acc)))



Compared target [35m0050[0m : [1m[35m60.995%[0m

return rate:


Unnamed: 0,simple_price_MA,crossovers_MA,corss_volume
1475,60.8%,174.0%,174.0%
4737,55.4%,-14.4%,14.6%
1583,-19.9%,-11.3%,-8.0%
1315,15.4%,140.1%,63.3%
2412,-8.1%,-3.2%,-2.6%
4904,-8.4%,-10.4%,-9.7%
2316,83.9%,76.1%,85.4%
1452,14.7%,9.1%,-1.3%
2606,-6.2%,4.2%,20.4%
1210,59.4%,35.2%,56.6%



accuracy:


Unnamed: 0,simple_price_MA,crossovers_MA,corss_volume
1475,33.333%,75.0%,75.0%
4737,25.0%,40.0%,100.0%
1583,14.286%,33.333%,40.0%
1315,20.0%,80.0%,66.667%
2412,14.286%,25.0%,0.0%
4904,19.048%,14.286%,0.0%
2316,25.0%,50.0%,50.0%
1452,31.25%,40.0%,66.667%
2606,19.231%,50.0%,50.0%
1210,38.095%,100.0%,100.0%
