In [2]:
from statsmodels.tsa.stattools import coint as st_coint
import numpy as np
import tushare as ts
import talib as ta
import multiprocessing as mp

  from pandas.core import datetools


In [3]:
RQ = False
COINT_TYPE = ["c", "ct", "ctt", "nc"]
TICKER_1 = "601398.xshg"
TICKER_2 = "601318.xshg"
START_DATE = "2017-01-01"
END_DATE = "2018-01-01"
FREQUENCY = "1d"

In [None]:
while True:
    try:
        basics = ts.get_stock_basics()
        break
    except:
        pass

stock_lst = basics[basics["industry"] == "互联网"].index.tolist()

arg_lst = []
for idx, t1 in enumerate(stock_lst):
    for t2 in stock_lst[idx:]:
        if t1 == t2: continue
        arg_lst.append((t1+".xshg", t2+".xshg", START_DATE, END_DATE, FREQUENCY, (50, 1, -1, 0.1, -0.1)))

#pool = mp.Pool(8)
#rst = pool.map(multi_test_pairs, arg_lst)
#pool.close()
#pool.join()

rst = [multi_test_pairs(i) for i in arg_lst]
rst.sort(key=lambda x: x[2], reverse=True)
rst

In [5]:
test_pairs(TICKER_1, TICKER_2, START_DATE, END_DATE, FREQUENCY, (50, 1, -1, 0.1, -0.1))

Testing 601398.xshg 601318.xshg




('601398.xshg', '601318.xshg', 1.5453993551264418)

In [4]:
# ticker type follows RQ type

if RQ:
    def my_get_price(ticker_lst, start_date, end_date, frequency):
        return get_price(ticker_lst, start_date=start_date, end_date=end_date, fields=["close"], frequency=frequency).values
else:
    def my_get_price(ticker_lst, start_date, end_date, frequency):
        return np.vstack((
            ts.get_k_data(ticker_lst[0][:-5], start=start_date, end=end_date, ktype="D")["close"].values, 
            ts.get_k_data(ticker_lst[1][:-5], start=start_date, end=end_date, ktype="D")["close"].values
        )).T

    
def diff_1(ar):
    return np.hstack((np.array(np.nan), np.diff(ar)))


def gen_coint_ar(price_ar, coint_period, coint_type=COINT_TYPE, alpha=0.05):
    tmp_ar = np.ones((len(price_ar), 1)) * np.nan
    
    for idx in range(len(price_ar)):
        if idx < coint_period:
            continue
        
        for cidx, ctype in enumerate(coint_type):
            _, pvalue, _ = st_coint(
                price_ar[idx - coint_period : idx, 0], 
                price_ar[idx - coint_period : idx, 1],
                ctype,
            )
            
            if pvalue <= alpha:
                tmp_ar[idx] = 1
                break
            elif cidx == len(COINT_TYPE) - 1:
                tmp_ar[idx] = 0
    return tmp_ar
        

def gen_delta_ar(price_ar, coint_period):
    dt_ar = np.ones((len(price_ar), 1)) * np.nan
    resid_ar = np.ones((len(price_ar), 1)) * np.nan
    
    delta_y1 = diff_1(price_ar[:, 0])
    delta_y2 = diff_1(price_ar[:, 1])
    
    for idx in range(len(price_ar)):
        if idx < coint_period:
            continue
        
        this_delta_y1 = delta_y1[idx - coint_period : idx]
        this_delta_y2 = delta_y2[idx - coint_period : idx]
    
        # regress using matrix
        A = np.vstack((np.ones_like(this_delta_y1), this_delta_y1)).T
        b = this_delta_y2.T
        x = np.linalg.inv((A.T.dot(A))).dot(A.T).dot(b)
        
        dt_ar[idx] = x[1]
        resid_ar[idx] = delta_y2[idx] - x[0] * 1 - x[1] * delta_y1[idx]
        

    return dt_ar, resid_ar


def cast_to_trading_position(price_ar, args):
    coint_period, upper_in_bar_rate, lower_in_bar_rate, upper_close_bar_rate, lower_close_bar_rate = args
    
    can_trade = np.zeros((len(price_ar), 1))
    trading_signal = np.zeros_like(price_ar)
    
    # test coint
    coint_ar = gen_coint_ar(price_ar, coint_period, COINT_TYPE)
    
    # regression to find delta
    delta_ar, resid_ar = gen_delta_ar(price_ar, coint_period)
    delta_ar = np.nan_to_num(delta_ar)
    resid_ar.shape = (len(resid_ar), )
    std_ar = ta.STDDEV(resid_ar, timeperiod=5, nbdev=1)
    
    upper_in_bar = std_ar * upper_in_bar_rate
    lower_in_bar = std_ar * lower_in_bar_rate
    
    upper_close_bar = std_ar * upper_close_bar_rate
    lower_close_bar = std_ar * lower_close_bar_rate
    
    t_in_upper_bar = resid_ar < upper_in_bar
    t_in_lower_bar = resid_ar > lower_in_bar
    
    signal_ar = np.ones((len(price_ar), 1)) * np.nan
    
    for i in range(len(t_in_upper_bar)):
        if i == 0: continue
        
        if resid_ar[i] > upper_in_bar[i] and resid_ar[i - 1] <= upper_in_bar[i - 1]: # sell open
            signal_ar[i] = -1
        elif resid_ar[i] > upper_close_bar[i] and resid_ar[i - 1] <= upper_close_bar[i - 1]: # sell close
            signal_ar[i] = 0
        elif resid_ar[i] > lower_in_bar[i] and resid_ar[i - 1] <= lower_in_bar[i - 1]: # buy open
            signal_ar[i] = 1
        elif resid_ar[i] > lower_close_bar[i] and resid_ar[i - 1] <= lower_close_bar[i - 1]: # buy close
            signal_ar[i] = 0
        else: # do nothing
            signal_ar[i] = signal_ar[i - 1]
    
    signal_ar = np.nan_to_num((signal_ar * coint_ar))
        
    return np.hstack((signal_ar, -signal_ar * delta_ar)) / (1 + np.abs(delta_ar))


def test_pairs(ticker_1:str, ticker_2:str, start_date:str, end_date:str, frequency:str, args:tuple):
    print("Testing", ticker_1, ticker_2)
    try:
        price_ar = my_get_price(
            [ticker_1, ticker_2], 
            start_date=start_date, 
            end_date=end_date, 
            frequency=frequency 
        )
        position = cast_to_trading_position(price_ar, args)
    except:
        return (ticker_1, ticker_2, 0.0)
    return_multiplier = np.vstack((diff_1(price_ar[:, 0]), diff_1(price_ar[:, 1]))).T + 1
    return_multiplier = np.nan_to_num(return_multiplier)
    return (ticker_1, ticker_2, return_multiplier.T.dot(position).trace())


def multi_test_pairs(args):
    return test_pairs(*args)


In [6]:
ar = np.array([1,2])

In [8]:
dir(ar)

['T',
 '__abs__',
 '__add__',
 '__and__',
 '__array__',
 '__array_finalize__',
 '__array_interface__',
 '__array_prepare__',
 '__array_priority__',
 '__array_struct__',
 '__array_ufunc__',
 '__array_wrap__',
 '__bool__',
 '__class__',
 '__complex__',
 '__contains__',
 '__copy__',
 '__deepcopy__',
 '__delattr__',
 '__delitem__',
 '__dir__',
 '__divmod__',
 '__doc__',
 '__eq__',
 '__float__',
 '__floordiv__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getitem__',
 '__gt__',
 '__hash__',
 '__iadd__',
 '__iand__',
 '__ifloordiv__',
 '__ilshift__',
 '__imatmul__',
 '__imod__',
 '__imul__',
 '__index__',
 '__init__',
 '__init_subclass__',
 '__int__',
 '__invert__',
 '__ior__',
 '__ipow__',
 '__irshift__',
 '__isub__',
 '__iter__',
 '__itruediv__',
 '__ixor__',
 '__le__',
 '__len__',
 '__lshift__',
 '__lt__',
 '__matmul__',
 '__mod__',
 '__mul__',
 '__ne__',
 '__neg__',
 '__new__',
 '__or__',
 '__pos__',
 '__pow__',
 '__radd__',
 '__rand__',
 '__rdivmod__',
 '__reduce__',
 '__reduce_e

In [None]:
import multiprocessing
import time

def say(msg):
    print(msg)
    time.sleep(3)
    print("end")

pool = multiprocessing.Pool(3)
for i in range(3):
    pool.apply_async(say, args=(i,))
pool.close()
pool.join()