In [1]:
# !pip install yfinance -q 
import yfinance as yf 
import pandas as pd
import requests
from tqdm import tqdm
from sklearn.metrics import accuracy_score, classification_report, roc_auc_score
from sklearn.ensemble import RandomForestClassifier

In [2]:
def screener_tickers():
    '''
    以yahoo finance推薦的篩選條件做為為選股範圍
    '''
    urls = [('https://finance.yahoo.com/screener/predefined/undervalued_growth_stocks?offset=0&count=100'),
            ('https://finance.yahoo.com/screener/predefined/growth_technology_stocks?offset=0&count=100'),
            ('https://finance.yahoo.com/screener/predefined/day_gainers?offset=0&count=100'),
            ('https://finance.yahoo.com/screener/predefined/most_actives?offset=0&count=100'),
            ('https://finance.yahoo.com/screener/predefined/undervalued_large_caps?offset=0&count=100'),
            ('https://finance.yahoo.com/screener/predefined/aggressive_small_caps?offset=0&count=100'),
            ('https://finance.yahoo.com/screener/predefined/small_cap_gainers?offset=0&count=100')]
    
    screener_list = []
    for url in urls:
        soup = BeautifulSoup(requests.get(url).content)
        res = soup.find_all('a', {'class':'Fw(600) C($linkColor)'})
        data =  [res[i].string for i in range(len(res))]
        screener_list += data
    
    return list(set(screener_list))


def watchlist():
    '''
    以S&P500成份股 + ARK ETF前30大持股為篩選範圍
    '''
    web = requests.get('https://www.slickcharts.com/sp500',
                    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.129 Safari/537.36'}).text
    sp500 = pd.read_html(web)[0]['Symbol'].apply(lambda x: x.replace('.', '-')).to_list()
    ark = pd.read_html('https://cathiesark.com/ark-funds-combined/complete-holdings')[0]['Ticker'][1:31].to_list()
    res = list(set(sp500 + ark + screener_tickers()))  
    return res

sids = []
def algo(strategy = 'new'):
    '''
    old條件比較龜毛 vs new條件較簡潔
    '''
    if strategy == 'old':
        for i in tqdm(watchlist()):
            try:
                df = yf.Ticker(i).history(period = '2mo').iloc[:,:5]
                """
                1. 開盤價跳空2%以上
                2. 近10天有2天以上跳空2%
                3. 當天實體K棒漲幅3%以上
                4. 近10天有2天以上出現過漲3%的實體K棒
                5. 連續3天高過昨高且低不破昨低
                6. 近10天有6天高過昨高且低不破昨低
                7. 最新收盤價距離近10日新高不到3%
                8. 當天成交量是過去10日的1.3倍
                """
                cond1 = df["Open"] > df["Close"].shift() * 1.02
                cond2 = cond1.rolling(10).sum() >= 2
                cond3 = df["Close"] > df["Open"] * 1.03
                cond4 = cond3.rolling(10).sum() >= 2
                cond5 = ((df["High"] > df["High"].shift()) & (df["Low"] > df["Low"].shift())).rolling(3).sum() >= 3
                cond6 = ((df["High"] > df["High"].shift()) & (df["Low"] > df["Low"].shift())).rolling(10).sum() >= 6
                cond7 = df["Close"] * 1.03 > df["Close"].rolling(10).max()
                cond8 = df["Volume"] > df["Volume"].rolling(10).mean() * 1.3

                buy = pd.concat([cond1, cond2, cond3, cond4, cond5, cond6, cond7, cond8], axis=1)
                if (buy[-1:].sum(axis=1) >= 6).values[0]:
                    sids.append(i)

            except Exception as e:
                continue

    elif strategy == 'new':
        for i in tqdm(watchlist()):
            try:
                df = yf.Ticker(i).history(period = '2mo').iloc[:,:5]
                """
                1. 最新收盤價距離近月新高不到3%
                2. 當天紅K棒是過去10日平均實體K棒的2.5倍
                3. 當天成交量是過去5日的1.3倍
                4. 過去5天出現過5ma向上穿越10ma
                """
                cond1 = df["Close"] * 1.03 > df["High"].rolling(22).max()
                cond2 = (df["Close"] - df["Open"]) > abs(df["Open"] - df["Close"]).rolling(10).mean() * 2.5
                cond3 = df["Volume"] > df["Volume"].rolling(5).mean() * 1.3
                duo_ma = df["Close"].rolling(5).mean() >= df["Close"].rolling(10).mean()
                cond4 = (duo_ma == True) & ((duo_ma != duo_ma.shift()).rolling(5).sum() == 1)

                buy = pd.concat([cond1, cond2, cond3, cond4], axis=1)
                if (buy[-1:].sum(axis=1) >= 3).values[0]:
                    sids.append(i)
        
            except Exception as e:
                continue

    return sids

algo('new')

100%|██████████| 532/532 [05:14<00:00,  1.69it/s]


['O', 'FTV', 'BAX']

In [3]:
def roc_auc(sid, lookback_period):
    '''
    4個選股條件即為特徵值x
    Y籤標為隔天能否漲贏 Nasdaq (ticker: ^IXIC)
    '''
    df = yf.Ticker(sid).history(period = lookback_period).iloc[:,:5]
    cond1 = (df['Close'] * 1.03 > df['High'].rolling(22).max())
    cond2 = ((df['Close'] - df['Open']) > abs(df['Open'] - df['Close']).rolling(10).mean() * 2.5)
    cond3 = (df['Volume'] > df['Volume'].rolling(5).mean() * 1.3)
    duo_ma = df['Close'].rolling(5).mean() >= df['Close'].rolling(10).mean()
    cond4 = ((duo_ma == True) & ((duo_ma != duo_ma.shift()).rolling(5).sum() == 1))
    Y = df['Close'].pct_change().shift(-1) > yf.Ticker('^IXIC').history(period = lookback_period).iloc[:,3].pct_change().shift(-1)
    data = pd.concat([cond1, cond2, cond3, cond4, Y], axis = 1)
    data.columns = ['x1','x2','x3','x4','Y']
    
    # 已經調用過未來一天的預測，故需在原資料刪除
    data = data[:-1]
    return data


# 從候選股中擇一做ROC/AUC檢定，回測過去30個月
data = roc_auc('FTV','30mo')

# 訓練集為回測區間的前80%，測試集為回測區間的後20%
train_data, test_data = data[ :int(len(data) *0.8)], data[int(len(data) *0.8): ]

train_x = train_data.loc[:,'x1':'x4'].values
train_y = train_data['Y'].values
test_x = test_data.loc[:,'x1':'x4'].values
test_y = test_data['Y'].values

clf = RandomForestClassifier(n_estimators = 21,
                             max_depth = 10,
                             criterion = 'entropy')

model = clf.fit(train_x,train_y)  
train_acc = accuracy_score(train_y, model.predict(train_x))
test_acc = accuracy_score(test_y, model.predict(test_x))
test_roc_auc = roc_auc_score(test_y, model.predict( test_x))

result = pd.DataFrame(
    [ (train_acc, test_acc, test_roc_auc) ],
    columns= ['訓練集準度','測試集準度','測試集ROC_AUC'],
    )

result

Unnamed: 0,訓練集準度,測試集準度,測試集ROC_AUC
0,0.553571,0.484127,0.479587
