In [1]:
import asyncio
import nest_asyncio
import motor.motor_asyncio
from datetime import date, datetime, timedelta
from collections import defaultdict
import pprint
import sys
import pandas as pd
import numpy as np
import ta
import os
import matplotlib.pyplot as plt
import talib
from ipywidgets import IntProgress
from IPython.display import display

nest_asyncio.apply()
%matplotlib inline

sys.path.insert(0, os.path.abspath('../'))
import abupy
from abupy.UtilBu.ABuDateUtil import str_to_datetime

NumExpr defaulting to 8 threads.


In [2]:
client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
db = client['symbol-db']
loop = asyncio.get_event_loop()

In [3]:
async def do_find_symbol_correlation(symbol, S_endDate, S_startDate):
    pipeline = [
        {"$match":{ "$expr": {"$and": [
            { "$lte" : ["$date", S_endDate ]},
            { "$gte" : ["$date", S_startDate ]},
            { "$eq" : ["$symbol", symbol]}
            ]}
        }},
        {"$project": {"_id": 0 } } # no _id to next pipeline
    ]
    collect = db['StockHisData'].aggregate(pipeline, allowDiskUse=True)
    results = []
    async for cursor in collect: # clean cursor ptr and store to list
        results.append(cursor)
    df = pd.DataFrame(results)
    try:
        df = df.set_index('date').dropna().sort_index()
        df = df.drop(columns=['symbol'])
    except Exception as e:
        pass
    return df

In [4]:
async def do_find_trader_correlation(trader, symbol, B_endDate, B_startDate):    
    pipeline = [
        {"$match": { "$expr": {"$and": [
                        { "$lte": ["$date", B_endDate ] },
                        { "$gte": ["$date", B_startDate ] },
                        { "$eq": ["$symbol", symbol]}
                        ]}
                   }},

        { "$addFields": {
            "ctrader": 
                { "$filter": {
                    "input": "$traders",
                    "as": "ttrader",
                    "cond": { "$eq" : ["$$ttrader.tdname", trader]}
                }}
        }},
        
        { "$addFields": {
            "put": 
                { "$map": {
                    "input": "$ctrader",
                    "as": "ttrader",
                    "in": "$$ttrader.put"
                }},
            "call": 
                { "$map": {
                    "input": "$ctrader",
                    "as": "ttrader",
                    "in": "$$ttrader.call"
                }},
            "weight": 
                { "$map": {
                    "input": "$ctrader",
                    "as": "ttrader",
                    "in": "$$ttrader.weight"
                }},
            
        }},
        
        { "$project": { 
            "_id": 0,
            "put": { "$cond": [{"$size": "$put" }, { "$arrayElemAt": [ "$put", -1 ] }, 0 ] },
            "call": { "$cond": [{"$size": "$put" }, { "$arrayElemAt": [ "$call", -1 ] }, 0 ] },
            "weight": { "$cond": [{"$size": "$weight" }, { "$arrayElemAt": [ "$weight", -1 ] }, 0 ] },
            "date": "$date"
        }}
    ]
    
    collect = db['StockHisTrader'].aggregate(pipeline,allowDiskUse=True)
    results = []
    async for cursor in collect: # clean cursor ptr and store to list
        results.append(cursor)
    df = pd.DataFrame(results)
    try:
        df = df.set_index('date').dropna().sort_index()
    except Exception as e:
        pass
    return df

In [5]:
async def do_find_margin_correlation(symbol, M_endDate, M_startDate):
    pipeline = [
        {"$match":{ "$expr": {"$and": [
            { "$lte" : ["$date", M_endDate ]},
            { "$gte" : ["$date", M_startDate ]},
            { "$eq" : ["$symbol", symbol]}
            ]}
        }},
        {"$project": {"_id": 0,
                "MCall": "$MagPurchBalance",
                "SPut": "$ShortSaleBalance",
                "SDivM": "$ShortDivMagRatio",
                "date": "$date"
        } } # no _id to next pipeline
    ]
    collect = db['StockHisMargin'].aggregate(pipeline, allowDiskUse=True)
    results = []
    async for cursor in collect: # clean cursor ptr and store to list
        results.append(cursor)
    df = pd.DataFrame(results)
    try:
        df = df.set_index('date').dropna().sort_index()
    except Exception as e:
        pass
    return df

In [6]:
async def do_find_Institution_correlation(symbol, I_endDate, I_startDate):
    pipeline = [
        {"$match":{ "$expr": {"$and": [
            { "$lte" : ["$date", I_endDate ]},
            { "$gte" : ["$date", I_startDate ]},
            { "$eq" : ["$symbol", symbol]}
            ]}
        }},
        {"$project": {"_id": 0,
                "ForV": "$ForeignInvestor",
                "InvV": "$InvestmentTrust",
                "DelV": "$DealerSelf",
                "date": "$date"
        } } # no _id to next pipeline
    ]
    collect = db['StockInstitutionInvest'].aggregate(pipeline, allowDiskUse=True)
    results = []
    async for cursor in collect: # clean cursor ptr and store to list
        results.append(cursor)
    df = pd.DataFrame(results)
    try:
        df = df.set_index('date').dropna().sort_index()
    except Exception as e:
        pass
    return df

In [7]:
symbol = '6756'
yy, mm, dd = date.today().isoformat().split("-")[0:3]
cdate = "{0}-{1}-{2}".format(yy, mm, dd)
#cdate = '2021-06-18'
today = str_to_datetime(cdate)

TEST_MODE = True
B_endDate = today
B_startDate = B_endDate - timedelta(days=60)
S_endDate = today
S_startDate = S_endDate - timedelta(days=120)
M_endDate = today
M_startDate = M_endDate - timedelta(days=30) 
I_endDate = today
I_startDate = I_endDate - timedelta(days=30)

In [8]:
# filter rank[:10], 全市場,買家對個股成交比重 
async def do_find_max_calls_weight_symbol():
    # test find max count of call symbols at sample time window
    pipeline = [
        {"$match": { "date": { "$lte": B_endDate} } },
        {"$match": { "date": { "$gte": B_startDate} } },
        {"$unwind": "$traders" },
        
        {"$group": { "_id": { "tdname": "$traders.tdname", "symbol": "$symbol"}, 
                    "sum_puts":  { "$sum": "$traders.put" }, 
                    "sum_calls": { "$sum": "$traders.call" },
                    "sum_weights": { "$sum": "$traders.weight" },
                    "max_weight": { "$max": "$traders.weight" },
                    "sample_days": { "$sum": 1}
                   }  },
        
        {"$project": {"_id": 0,
                      "avg_weights": {"$round" : [{"$divide": ["$sum_weights", "$sample_days"]}, 1]}, 
                      "max_weight": "$max_weight",
                      "sample_days": "$sample_days",
                      "sum_net_puts": {"$subtract": ["$sum_puts", "$sum_calls"]}, 
                      "sum_net_calls": {"$subtract": ["$sum_calls", "$sum_puts"]},                    
                      "sum_puts": "$sum_puts", 
                      "sum_calls": "$sum_calls",
                      "sum_all": {"$add": ["$sum_puts", "$sum_calls"]},
                      "symbol": "$_id.symbol", "tdname": "$_id.tdname"
                     } },
             
        {"$sort": { "sum_net_calls": -1, "max_weight": -1, "sample_days": -1,  "sum_calls": -1, "sum_puts": 1 } },
        #{"$limit": 100 }
        ]
    collect = db['StockHisTrader'].aggregate(pipeline, allowDiskUse=True)
    results = []
    async for cursor in collect: # clean cursor ptr and store to list
        results.append(cursor)
    df = pd.DataFrame(results)
    return df

In [9]:
df = loop.run_until_complete(do_find_max_calls_weight_symbol())     
df = df[df['symbol'] == symbol]
df[:30]

Unnamed: 0,avg_weights,max_weight,sample_days,sum_net_puts,sum_net_calls,sum_puts,sum_calls,sum_all,symbol,tdname
6041,5.4,14.49,10,-566.0,566.0,26.0,592.0,618.0,6756,第一金-台中
6255,5.4,10.85,14,-548.0,548.0,598.0,1146.0,1744.0,6756,統一
7430,3.0,6.82,9,-470.0,470.0,193.0,663.0,856.0,6756,永豐金證券
8473,4.6,9.84,4,-407.0,407.0,7.0,414.0,421.0,6756,群益金鼎-忠孝
10843,3.6,7.14,5,-310.0,310.0,8.0,318.0,326.0,6756,元大-信義
...,...,...,...,...,...,...,...,...,...,...
55635,1.5,2.01,10,-44.0,44.0,0.0,44.0,44.0,6756,大慶-蘆洲
56211,2.5,4.12,5,-43.0,43.0,24.0,67.0,91.0,6756,元大-西門
57156,1.6,1.92,3,-42.0,42.0,12.0,54.0,66.0,6756,大展
57361,0.9,0.92,1,-42.0,42.0,1.0,43.0,44.0,6756,土銀


In [10]:
df = loop.run_until_complete(do_find_max_calls_weight_symbol())     
df = df[df['symbol'] == symbol]
df[:30]

Unnamed: 0,avg_weights,max_weight,sample_days,sum_net_puts,sum_net_calls,sum_puts,sum_calls,sum_all,symbol,tdname
6041,5.4,14.49,10,-566.0,566.0,26.0,592.0,618.0,6756,第一金-台中
6255,5.4,10.85,14,-548.0,548.0,598.0,1146.0,1744.0,6756,統一
7430,3.0,6.82,9,-470.0,470.0,193.0,663.0,856.0,6756,永豐金證券
8473,4.6,9.84,4,-407.0,407.0,7.0,414.0,421.0,6756,群益金鼎-忠孝
10843,3.6,7.14,5,-310.0,310.0,8.0,318.0,326.0,6756,元大-信義
...,...,...,...,...,...,...,...,...,...,...
55635,1.5,2.01,10,-44.0,44.0,0.0,44.0,44.0,6756,大慶-蘆洲
56211,2.5,4.12,5,-43.0,43.0,24.0,67.0,91.0,6756,元大-西門
57156,1.6,1.92,3,-42.0,42.0,12.0,54.0,66.0,6756,大展
57361,0.9,0.92,1,-42.0,42.0,1.0,43.0,44.0,6756,土銀


In [11]:
async def get_symbol_top_traders(symbol, startD, endD):
    pipeline = [
        # add lookup hisstock filter min/max close and margin rate, total_calls/total_volume = total_weight
        {"$match":{ "$expr": {"$and": [
                { "$lte": ["$date", endD ] },
                { "$gte": ["$date", startD ] },
                { "$eq": ["$symbol", symbol ] }
            ]}}},
        {"$unwind": "$traders" },
        {"$group": { "_id": { "tdname": "$traders.tdname", "symbol": "$symbol"}, 
                    "sum_puts": { "$sum": "$traders.put" }, 
                    "sum_calls": { "$sum": "$traders.call" },
                    "sample_days": { "$sum": 1 }
                   }  },
        {"$project": {"_id": 0,
                      "sample_days": "$sample_days",
                      "sum_net_puts": {"$subtract": ["$sum_puts", "$sum_calls"]}, 
                      "sum_net_calls": {"$subtract": ["$sum_calls", "$sum_puts"]},
                      "sum_all": {"$add": ["$sum_puts", "$sum_calls"]},
                      "symbol": "$_id.symbol", "tdname": "$_id.tdname"
                     } },
        {"$sort": { "sum_net_calls": -1, "sum_net_puts": 1 } },
        {"$limit": 10 }
        ]
    collect = db['StockHisTrader'].aggregate(pipeline)  
    tops = []
    async for cursor in collect: # 
        tops.append(cursor['tdname'])
    return tops

In [12]:
top_calls= loop.run_until_complete(get_symbol_top_traders(symbol, B_startDate, B_endDate))
top_calls[:7]

['第一金-台中', '統一', '永豐金證券', '群益金鼎-忠孝', '元大-信義', '統一-內湖', '台新證券']

In [13]:
async def get_symbol_top_trader_df(symbol, tdname, startD, endD):
    pipeline = [
        # add lookup hisstock filter min/max close and margin rate, total_calls/total_volume = total_weight
        {"$match":{ "$expr": {"$and": [
                { "$lte": ["$date", endD ] },
                { "$gte": ["$date", startD ] },
                { "$eq": ["$symbol", symbol ] }
            ]}}},
        {"$unwind": "$traders" },
                {"$match":{ "$expr": {"$and": [
                { "$eq": ["$traders.tdname", tdname ] }
            ]}}},
        {"$project" : { "_id": 0, "date": "$date", "put": "$traders.put", "call": "$traders.call" } }
        ]
    collect = db['StockHisTrader'].aggregate(pipeline)  
    items = await collect.to_list(100)
    df = pd.DataFrame(items)
    df = df.set_index('date')
    return df

In [14]:
tdfs = []
for i, top_call in enumerate(top_calls[:7]):
    tdf = loop.run_until_complete(get_symbol_top_trader_df(symbol, top_call, B_startDate, B_endDate))
    tdf = tdf.rename(columns={"put": "Top{0}_put".format(i), "call": "Top{0}_call".format(i)})
    tdfs.append(tdf)
    
df2 = pd.concat(tdfs, axis=1).fillna(0)
df2

Unnamed: 0_level_0,Top0_put,Top0_call,Top1_put,Top1_call,Top2_put,Top2_call,Top3_put,Top3_call,Top4_put,Top4_call,Top5_put,Top5_call,Top6_put,Top6_call
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1
2021-05-19,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,8.0
2021-05-20,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,4.0,14.0
2021-05-21,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,16.0,5.0
2021-05-24,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,12.0,0.0,4.0,1.0
2021-05-25,2.0,22.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,5.0,0.0,5.0,9.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2021-07-01,0.0,0.0,0.0,0.0,0.0,0.0,0.0,30.0,0.0,0.0,0.0,15.0,2.0,9.0
2021-07-05,0.0,0.0,21.0,56.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,30.0,0.0,0.0
2021-07-06,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,4.0
2021-07-07,0.0,0.0,0.0,23.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,3.0,13.0


In [15]:
async def do_find_top_trader_correlation(symbol, B_endDate, B_startDate):
    top_calls= loop.run_until_complete(get_symbol_top_traders(symbol, B_startDate, B_endDate))
    tdfs = []
    for i, top_call in enumerate(top_calls[:7]):
        tdf = loop.run_until_complete(get_symbol_top_trader_df(symbol, top_call, B_startDate, B_endDate))
        tdf = tdf.rename(columns={"put": "Top{0}_put".format(i), "call": "Top{0}_call".format(i)})
        tdfs.append(tdf)
    
    df2 = pd.concat(tdfs, axis=1).fillna(0)
    return df2

In [16]:
import pandas as pd
from ta.utils import dropna
from ta.volatility import BollingerBands
from ta.trend import EMAIndicator

In [17]:
def wrap_ta_bband(df): 
    indicator_bb = BollingerBands(close=df["C"], window=20, window_dev=2)
    # Add Bollinger Bands features
    df['bb_bbm'] = indicator_bb.bollinger_mavg()
    df['bb_bbh'] = indicator_bb.bollinger_hband()
    df['bb_bbl'] = indicator_bb.bollinger_lband()
    return df

def wrap_ta_ema(df):
    ma_5 = EMAIndicator(close=df["C"], window=5)
    ma_10= EMAIndicator(close=df["C"], window=10)
    ma_20= EMAIndicator(close=df["C"], window=20)
    df['ma_5']  = ma_5.ema_indicator()
    df['ma_10'] = ma_10.ema_indicator()
    df['ma_20'] = ma_20.ema_indicator()
    return df

In [54]:
def scan_up_trend_strategy0(symbol):
    """
    constraint:
    ex: 
    [D0(t-1), D1(t), D2(t+1), D3(t+2)]
    [D0(t-2), D1(t-1),  D2(跳空漲), D3(t+1)]
    
    # 1.filter 每個F[0:4].close.all() 不能低於 MA[5] 
    # 2 filter MA[5] > MA[10] 
    """
    df0 = loop.run_until_complete(do_find_symbol_correlation(symbol, S_endDate, S_startDate))
    df1 = loop.run_until_complete(do_find_top_trader_correlation(symbol, B_endDate, B_startDate))
    df2 = loop.run_until_complete(do_find_margin_correlation(symbol, M_endDate, M_startDate))
    df3 = loop.run_until_complete(do_find_Institution_correlation(symbol, I_endDate, I_startDate))

    #MA(5), MA(10), MA(20), BBANDH, BBANDM, BBANDL
    #BBANDH(趨勢下) BBANDL(趨勢上) 壓縮通道 -> 起漲點
    df0 = wrap_ta_bband(df0)
    df0 = wrap_ta_ema(df0)
    dfx = pd.concat([df0, df1, df2, df3],  axis=1, join="outer")
    dfx['prec'] = dfx['C'].shift()
    dfx['diff'] = dfx['O'] / dfx['prec']
    
    if TEST_MODE:
        train, test = dfx.iloc[:-5], dfx.iloc[-5:]
        warmup, dfx = train.iloc[:-4], train.iloc[-4:]
    else:
        warmup, dfx = dfx.iloc[:-4], dfx.iloc[-4:]
        
    cnds = [
        (dfx['C'] > dfx['ma_5']* 0.98).all(),
        (dfx['ma_5'] > dfx['ma_10']).all(),
        (dfx['C'] > dfx['bb_bbh'] * 0.98).all(),
        (dfx['C'] > dfx['O']).sum() >= 3,
        (dfx['C'] > dfx['O'] * 1.05).any(),
        (dfx['C'] < dfx['O'] * 0.96).sum() == 0
        (dfx['C'] >= C_MIN).all(),
        (dfx['C'] <= C_MAX).all()
    ]
        
    if sum([cnd for cnd in cnds]) == len(cnds):
        dfx = dfx.iloc[[-1]]
        dfx.insert(loc=0, column='symbol', value=[symbol])
        if TEST_MODE:
            dfx.insert(loc=1, column='Test_E', value=[test['C'].max() - dfx.iloc[-1]['C']])
        return dfx
    return pd.DataFrame()  


In [55]:
symbol = '6756'
yy, mm, dd = date.today().isoformat().split("-")[0:3]
cdate = "{0}-{1}-{2}".format(yy, mm, dd)
cdate = '2021-07-01'
today = str_to_datetime(cdate)

TEST_MODE = True
C_MIN, C_MAX = 10, 1000 #close Min:Max

B_endDate = today
B_startDate = B_endDate - timedelta(days=60)
S_endDate = today
S_startDate = S_endDate - timedelta(days=120)
M_endDate = today
M_startDate = M_endDate - timedelta(days=30) 
I_endDate = today
I_startDate = I_endDate - timedelta(days=30)


In [56]:
symbols = abupy.ABuMarket.all_symbol()

In [57]:
from ipywidgets import IntProgress
from IPython.display import display

#symbols = ['2465']
#symbols = symbols[:100]
p0 = IntProgress(max=len(symbols))
p0.description = "[scan] {} up-trend strategy0".format(S_endDate)
display(p0)

IntProgress(value=0, description='[scan] 2021-07-01 00:00:00 up-trend strategy0', max=2260)

In [58]:
# run trend strategy2
dfups = pd.DataFrame()
for i, symbol in enumerate(symbols):
    try:
        p0.value = i
        dfup = scan_up_trend_strategy0(symbol)
    except Exception as e:
        #print (e)
        continue
        
    dfups = dfups.append(dfup).drop_duplicates()

In [59]:
dfups = dfups.sort_values(by=['date'], ascending=False)
dfups[:20]

KeyError: 'date'