# 價值股策略

In [1]:
import pandas as pd
import numpy as np
import pymongo
from typing import Union
import datetime
from tqdm import tqdm
import matplotlib.pyplot as plt


### Class預期
1. 關於資料的讀取 - 最後都要組成單檔股票為一個sheet來處理
    * 抓取資料後存到dict? 或是存成numpy? 也可以存在pandas(每個資料一個sheet) - 以存到numpy為主要測試方向(快很多)
    * 確定numpy沒辦法用標籤對齊，可先用pandas組成資料並依index/columns對齊，最後轉成numpy再來運算(避免資料錯位造成的錯誤)
2. 關於資料的改動
    * 需要用到的資料最後變成每個資料一個sheet，捨棄不需要用的資料省下記憶體與增加速度 - 要記得思考要如何同時跟其他股票在同一時間判斷進出場(資金控管需求)
3. 關於回測
    * 關於回測的預期: 回傳損益(return)、部位(position)以及交易紀錄(transactions)；其中return與position的index=datetime, columns=symbols+cash

In [22]:
class Base:
    """Hold the MongoDB client and apply process-wide display settings.

    Instantiating the class mutates global pandas / matplotlib / tqdm
    configuration through ``_config``.
    """

    def __init__(self, client: pymongo.MongoClient):
        """Apply the display configuration and keep ``client`` for later queries.

        Args:
            client: MongoDB client stored as ``self.client``.
        """
        self._config()
        self.client = client


    def _config(self):
        """Configure plotting backend, tqdm progress bars, fonts and number display."""
        pd.options.plotting.backend = "matplotlib"
        # Registers ``progress_apply`` & co. on pandas objects.
        tqdm.pandas(desc="progress-bar")
        # Font used for Chinese labels in plots.
        plt.rcParams['font.family'] = ['Microsoft JhengHei']
        # Draw minus signs with the ASCII hyphen instead of U+2212
        # (presumably because the font above lacks that glyph).
        plt.rcParams['axes.unicode_minus'] = False
        pd.set_option('display.max_rows', 200)
        # Single source of truth for float rendering: thousands separator and
        # four decimals. An earlier '%.3f' setting was removed because it was
        # overwritten by this line before it could ever take effect.
        pd.options.display.float_format = '{:,.4f}'.format

class DataCenter(Base):
    """Load raw datasets from MongoDB and stage aligned factor arrays.

    Attributes:
        data: dict mapping a dataset name to the DataFrame loaded for it.
        factors: dict mapping a factor name to its values as an ndarray.
        factors_idx: dict mapping a factor name to the index it was built from.
        factors_col: dict mapping a factor name to the columns it was built from.
    """

    def __init__(self, client: pymongo.MongoClient):
        super().__init__(client)
        self.data = {}
        self.factors = {}
        self.factors_idx = {}
        self.factors_col = {}

    def get_from_mongo(self, elements: Union[str, list], db:str='Fields',
     start: Union[datetime.datetime, None]=None, 
     end: Union[datetime.datetime, None]=None):
        """Load one or more collections of ``self.client[db]`` into ``self.data``.

        Every collection is read with ``find`` (``_id`` excluded) into a
        DataFrame. When the result has a '日期' (date) column, that column
        becomes the index.

        Args:
            elements: A collection name, or a list of collection names.
            db: Name of the database to read from.
            start: Exclusive lower bound on '日期'; ``None`` means unbounded.
            end: Exclusive upper bound on '日期'; ``None`` means unbounded.
        """
        # Only add the bounds that were actually supplied, so a missing bound
        # never turns into a comparison against ``None``.
        date_range = {}
        if start is not None:
            date_range['$gt'] = start
        if end is not None:
            date_range['$lt'] = end
        query = {'日期': date_range} if date_range else {}

        single = isinstance(elements, str)
        names = [elements] if single else elements
        # Historical log format: no trailing period on date-filtered loads.
        suffix = '' if start is not None else '.'
        for name in names:
            frame = pd.DataFrame(self.client[db][name].find(query, {'_id': 0}))
            # Guarded so empty results or tables without dates do not raise.
            if '日期' in frame.columns:
                frame = frame.set_index('日期')
            self.data[name] = frame
            # Shapes are only reported for list input, as before.
            if not single:
                print(f'Data {name} has shape {frame.shape}{suffix}')

    def set_factor(self, dataname: str, data: pd.DataFrame, check: bool=False):
        """Register ``data`` as a factor, storing its values, index and columns.

        All factors are expected to share one shape. Backtesting is meant to
        read from ``self.factors``, kept separate from ``self.data`` (P&L is
        still meant to use the closing prices in ``self.data``).

        Args:
            dataname: Key under which the factor is stored.
            data: Frame whose ``values`` / ``index`` / ``columns`` are stored.
            check: When True, compare every registered factor's shape with the
                first registered factor and drop the first one that differs.

        Returns:
            ``None`` normally. When ``check`` finds a mismatch, a message
            naming the factor that was removed.
        """
        self.factors_idx[dataname] = data.index
        self.factors_col[dataname] = data.columns
        self.factors[dataname] = data.values
        print(f'Factors {dataname} has shape {self.factors[dataname].shape}.')
        if check:
            d = next(iter(self.factors.values())).shape
            # Iterate over a snapshot because a mismatch removes an entry.
            for k, v in list(self.factors.items()):
                if v.shape != d:
                    # Drop the factor from all three registries so they stay in sync.
                    for registry in (self.factors, self.factors_idx, self.factors_col):
                        registry.pop(k, None)
                    return f'{k} not have same shape, It should have {d}, but it get {v.shape}, del {k} already.'

    def set_factor_in_financial(self, base: str='EPS_Q', target: Union[str, list]='還原收盤價'):
        """Sample datasets at the dates of ``base`` and register them as factors.

        For each target dataset, the rows whose index is nearest to each index
        value of ``self.data[base]`` are selected and passed to ``set_factor``.

        Args:
            base: Dataset whose index supplies the sampling dates.
            target: One dataset name, or a list of dataset names, to sample.
        """
        base_index = self.data[base].index
        targets = [target] if isinstance(target, str) else target
        for t in targets:
            target_data = self.data[t]
            # NOTE(review): 'nearest' may pick a row dated before OR after the
            # base date - confirm this is the intended alignment.
            positions = target_data.index.get_indexer(base_index, method='nearest')
            self.set_factor(t, target_data.iloc[positions])

    def creat_to_daily(self, df: pd.DataFrame, base: str='還原收盤價', real_daily: bool=False):
        """Spread the values of ``df`` onto the index of a base dataset by forward fill.

        Args:
            df: Values to convert to the base dataset's frequency.
            base: Dataset in ``self.data`` providing target columns and index.
            real_daily: When True, return only the base dataset's own rows and
                columns; otherwise keep the union of both indexes.

        Returns:
            The forward-filled frame.
        """
        base_frame = self.data[base]
        n = pd.DataFrame(columns=base_frame.columns, index=base_frame.index.union(df.index))
        for i in df.index:
            for c in df.columns:
                # Labels are looked up as strings - presumably the base columns
                # are string tickers; TODO confirm.
                n.at[i, str(c)] = df.at[i, c]
        # ``ffill`` replaces the deprecated ``fillna(method='ffill')``.
        n = n.ffill()
        if real_daily:
            return n.loc[base_frame.index, base_frame.columns]
        return n

class Analysis(DataCenter):
    """Placeholder layer for result analysis; adds nothing to DataCenter yet."""

class BackTesting(Analysis):
    """Derive long/short entry signals from the arrays staged in ``self.factors``."""

    @staticmethod
    def _lag_one_row(signal: np.ndarray) -> np.ndarray:
        """Return ``signal`` moved down one row, with the first row set to False.

        ndarray replacement for ``DataFrame.shift(1)``; NumPy arrays have no
        ``shift`` method. The vacated first row means "no signal".
        """
        lagged = np.zeros_like(signal, dtype=bool)
        lagged[1:] = signal[:-1]
        return lagged

    def st(self):
        """Compute the value-strategy signals and store them in ``self.data``.

        Reads the factor arrays registered in ``self.factors`` and writes two
        boolean arrays:

        * ``self.data['long']``: P/E < 15, P/B < 1.5, turnover above the row
          mean, price above MA60 and MA120, EPS_Q > 0, revenue factor > 0,
          dividend yield > 4.
        * ``self.data['short']``: the mirrored conditions (yield < 2), lagged
          by one row.
        """
        factors = self.factors
        price = factors['還原收盤價']
        turnover = factors['成交金額(千)']
        # Liquidity filter: each row is compared with that row's own NaN-aware
        # mean; ``keepdims`` makes the mean broadcast per row.
        liquid = turnover > np.nanmean(turnover, axis=1, keepdims=True)

        self.data['long'] = (factors['本益比(近四季)'] < 15) &\
            (factors['股價淨值比'] < 1.5) &\
            liquid &\
            (price > factors['MA60']) &\
            (price > factors['MA120']) &\
            (factors['EPS_Q'] > 0) &\
            (factors['營業收入淨額_Q'] > 0) &\
            (factors['殖利率'] > 4)
        # NOTE(review): the long signal is left unlagged, as in the previous
        # version where its shift was commented out, while the short signal is
        # lagged below - confirm whether both legs should be lagged.

        short = (factors['本益比(近四季)'] > 15) &\
            (factors['股價淨值比'] > 1.5) &\
            liquid &\
            (price < factors['MA60']) &\
            (price < factors['MA120']) &\
            (factors['EPS_Q'] < 0) &\
            (factors['營業收入淨額_Q'] < 0) &\
            (factors['殖利率'] < 2)
        self.data['short'] = self._lag_one_row(short)

class Trader(BackTesting):
    """Top-level entry class; currently inherits everything from BackTesting unchanged."""

        

In [23]:
if __name__ == '__main__':
    # Workflow:
    #   1. Load every dataset with get_from_mongo.
    #   2. Compute derived series and store them in td.data
    #      (open question: should everything be stored as bool?).
    #   3. Register the fully prepared datasets in td.factors
    #      (expected to hold ndarrays).
    #   4. Run all backtests in the backtesting class.
    #   5. Build an analysis class to analyse the results (pyfolio-like).
    client = pymongo.MongoClient()
    # Instantiate the class.
    td = Trader(client)

    # Step 1: load the raw datasets.
    data_list = ['本益比(近四季)', '股價淨值比', '成交金額(千)', '還原收盤價', 'EPS_Q', '營業收入淨額_Q', '殖利率', '收盤價_指數']
    start = datetime.datetime(2004, 1, 1)
    end = datetime.datetime.today()
    td.get_from_mongo(data_list, 'Fields', start, end)
    data2_list = ['TWA00', '指數彙編', '指數名稱轉換']
    td.get_from_mongo(data2_list, 'Index')

    # Step 2: derived series.
    # The goal is to trade on quarterly data, so the date one day AFTER each
    # quarterly report is received has to be identified first.
    # Alternatively, drop the financial-statement data and trade on daily data
    # only - both variants can be tested separately.
    # Moving averages of the adjusted close.
    td.data.update({
        label: td.data['還原收盤價'].rolling(window).mean()
        for label, window in (('MA60', 60), ('MA120', 120))
    })
    # Rolling means that replace the raw daily series in place.
    td.data.update({
        label: td.data[label].rolling(window).mean()
        for label, window in (('本益比(近四季)', 120), ('成交金額(千)', 60), ('殖利率', 120))
    })
    # Period-over-period change of the 4-row rolling sum for the quarterly series.
    td.data.update({
        label: td.data[label].rolling(4).sum().pct_change()
        for label in ('EPS_Q', '營業收入淨額_Q')
    })

    # Step 3: sample everything at the EPS_Q dates and register as factors.
    factor_list = ['MA60', 'MA120', '本益比(近四季)', '成交金額(千)', '殖利率', 'EPS_Q', '營業收入淨額_Q', '股價淨值比', '還原收盤價']
    td.set_factor_in_financial(base='EPS_Q', target=factor_list)
    # Step 4

Data 本益比(近四季) has shape (4666, 2211)
Data 股價淨值比 has shape (4666, 2211)
Data 成交金額(千) has shape (4666, 2211)
Data 還原收盤價 has shape (4666, 2211)
Data EPS_Q has shape (76, 2211)
Data 營業收入淨額_Q has shape (76, 2211)
Data 殖利率 has shape (4666, 2211)
Data 收盤價_指數 has shape (4666, 59)
Data TWA00 has shape (6044, 34).
Data 指數彙編 has shape (2211, 8).
Data 指數名稱轉換 has shape (1, 22).
Factors MA60 has shape (76, 2211).
Factors MA120 has shape (76, 2211).
Factors 本益比(近四季) has shape (76, 2211).
Factors 成交金額(千) has shape (76, 2211).
Factors 殖利率 has shape (76, 2211).
Factors EPS_Q has shape (76, 2211).
Factors 營業收入淨額_Q has shape (76, 2211).
Factors 股價淨值比 has shape (76, 2211).
Factors 還原收盤價 has shape (76, 2211).


In [38]:
# Alternative check: (td.factors['還原收盤價'] > td.factors['MA60'])
# Shape of the liquidity mask: every row of the turnover factor compared with
# that row's own NaN-aware mean (keepdims lets the mean broadcast per row).
np.greater(
    td.factors['成交金額(千)'],
    np.nanmean(td.factors['成交金額(千)'], axis=1, keepdims=True),
).shape



  (td.factors['成交金額(千)'].T > np.nanmean(td.factors['成交金額(千)'], axis=1)).T.shape


42      1240
87      1416
99      1435
101     1437
154     1516
186     1580
189     1584
325     2062
409     2348
555     2496
616     2614
618     2616
627     2640
734     2904
862     3171
902     3284
958     3426
1025    3557
1065    3629
1191    4430
1207    4529
1213    4536
1216    4541
1226    4556
1228    4558
1233    4564
1283    4806
1392    5284
1462    5450
1478    5481
1515    5530
1527    5604
1545    5871
1578    6101
1639    6165
1653    6179
1673    6199
1753    6294
1786    6464
1841    6578
1843    6581
1850    6592
1853    6596
1861    6616
1862    6624
1863    6625
1864    6629
1866    6641
1872    6655
1901    6721
1912    6754
1918    6768
1928    6803
1931    6807
2022    8222
2038    8341
2039    8342
2041    8354
2047    8390
2048    8401
2050    8404
2054    8411
2057    8418
2060    8422
2063    8426
2069    8435
2071    8437
2073    8440
2074    8442
2076    8444
2077    8446
2078    8450
2082    8463
2083    8464
2084    8466
2085    8467
2087    8473

In [44]:
# Projection for the query below: one inclusion flag per stock code whose
# '2022指數彙編分類' value is '傳產-其他', plus the date field and minus '_id'.
da = {
    **dict.fromkeys(
        td.data['指數彙編'].loc[
            td.data['指數彙編']['2022指數彙編分類'] == '傳產-其他', '股票代號'
        ].astype(str).values,
        1,
    ),
    '_id': 0,
    '日期': 1,
}
# Rows of 'Fields.收盤價' dated on or after 2020-01-01, limited to the projected fields.
df = pd.DataFrame(
    td.client['Fields']['收盤價'].find(
        {'日期': {'$gte': datetime.datetime(2020, 1, 1)}},
        da,
    )
)

In [50]:
# Prefix every existing column label with the field name '收盤價' as a first level.
df.columns = pd.MultiIndex.from_tuples([('收盤價', col) for col in df.columns])

In [54]:
# Scratch check: size of a {symbol: 1} mapping built from the symbol list.
symbols = ['2330']
len(dict.fromkeys(symbols, 1))

1

## 會用到的資料列表

In [None]:
# Datasets loaded directly from the database.
['本益比(近四季)', '股價淨值比', '成交金額(千)', '還原收盤價', 'EPS_Q', '營業收入淨額_Q', '殖利率', '收盤價_指數']
# Series that have to be computed.
['MA60', 'MA120']
['PE閥值', 'PB閥值', '殖利率閥值']
# EPS and net operating revenue showed no significant effect.
# Stored in db['Index'].
['TWA00', '指數彙編', '指數名稱轉換']

# Original strategy (reference copy: `self` and `param` are not defined in
# this cell, so it is not runnable as-is).
# Long leg: thresholds l1..l5, liquidity at or above the row median, price
# above both moving averages; the signal is shifted by one row.
self.data['long'] = (self.data['本益比(近四季)'] < param['l1'])&\
    (self.data['股價淨值比'] < param['l2'])&\
    (self.data['成交金額(千)'].ge(self.data['成交金額(千)'].median(axis=1), axis=0))&\
    (self.data['還原收盤價'] > self.data['MA60'])&\
    (self.data['還原收盤價'] > self.data['MA120'])&\
    (self.data['EPS_daily'] > param['l3'])&\
    (self.data['營業收入淨額_daily'] > param['l4'])&\
    (self.data['殖利率'] > param['l5'])
self.data['long'] = self.data['long'].shift(1)

# Short leg: mirrored conditions with thresholds s1..s5.
self.data['short'] = (self.data['本益比(近四季)'] > param['s1'])&\
    (self.data['股價淨值比'] > param['s2'])&\
    (self.data['成交金額(千)'].ge(self.data['成交金額(千)'].median(axis=1), axis=0))&\
    (self.data['還原收盤價'] < self.data['MA60'])&\
    (self.data['還原收盤價'] < self.data['MA120'])&\
    (self.data['EPS_daily'] < param['s3'])&\
    (self.data['營業收入淨額_daily'] < param['s4'])&\
    (self.data['殖利率'] < param['s5'])  # was param['s1'], the P/E threshold; s5 mirrors l5 on the long leg
self.data['short'] = self.data['short'].shift(1)

### load data