In [1]:
from tuneta.tune_ta import TuneTA
import pandas as pd
import numpy as np

import talib

PATH_READ_TRAIN = '123181_train_raw.csv'
PATH_READ_TEST = '123181_test_raw.csv'

# Columns removed from the raw frames before any target/feature engineering.
_DROPPED_COLUMNS = ['InstrumentID', 'TradingDay', 'PreClosePrice']


def _load_raw(path):
    """Read one raw CSV and apply the preprocessing shared by train and test.

    Steps: index on 'Timestamp', drop ``_DROPPED_COLUMNS``, parse the index
    to datetimes, and add an 'Avg' column computed by TA-Lib's AVGPRICE from
    the Open/High/Low/Close columns.

    Parameters
    ----------
    path : str
        CSV file with at least Timestamp, Open, High, Low and Close columns.

    Returns
    -------
    pandas.DataFrame
        The preprocessed frame, indexed by a DatetimeIndex.
    """
    frame = pd.read_csv(path, index_col='Timestamp')
    # errors='ignore': a file that lacks one of these columns still loads.
    frame = frame.drop(_DROPPED_COLUMNS, axis=1, errors='ignore')
    frame.index = pd.to_datetime(frame.index)
    frame['Avg'] = talib.AVGPRICE(frame['Open'], frame['High'], frame['Low'], frame['Close'])
    return frame


# Both splits go through the same loader so that the later dropna() and the
# index-based merges see the same columns and the same index type on each.
df = _load_raw(PATH_READ_TRAIN)
df_test = _load_raw(PATH_READ_TEST)


print(df.columns)

Index(['Open', 'High', 'Low', 'Volume', 'Turnover', 'Close', 'Avg'], dtype='object')


In [2]:
def target(df, column, period):
    """Forward relative change of ``column`` over the next ``period`` rows.

    For ``period == 1`` this is the plain next-row return
    ``(x[t+1] - x[t]) / x[t]``. For ``period > 1`` the next-row value is
    replaced by the mean of the following ``period`` rows
    (``x[t+1] .. x[t+period]``).

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing ``column``.
    column : str
        Name of the price column to build the target from.
    period : int
        Look-ahead horizon in rows; must be >= 1.

    Returns
    -------
    pandas.Series
        Relative change aligned on ``df``'s index. The trailing ``period``
        rows are NaN because their future window is incomplete.

    Raises
    ------
    ValueError
        If ``period`` is smaller than 1 (previously this silently returned
        ``None``, which then became an empty column when assigned).
    """
    if period < 1:
        raise ValueError('period must be >= 1, got %r' % (period,))

    current = df[column]
    if period == 1:
        future = current.shift(-1)
    else:
        # rolling mean at row t+period covers rows t+1..t+period; shifting it
        # back by `period` aligns that forward-looking mean with row t.
        future = current.rolling(window=period).mean().shift(-1 * period)
    return (future - current) / current

In [3]:
def class_target(value):
    """Map a numeric value to its direction: 1 if positive, -1 if negative, else 0.

    Anything that is neither greater nor less than zero (zero itself, NaN)
    yields 0.
    """
    if value > 0:
        return 1
    return -1 if value < 0 else 0

In [4]:
def TBtarget(df):
    """Attach a 15-minute triple-barrier label to every row of ``df``.

    For each row, a vertical barrier is placed 15 minutes after its
    'Timestamp', and upper/lower barriers are placed at +/- 0.4 times an
    exponentially weighted std of 1-row close returns. The label is the sign
    of the close-to-close return between the row and the first barrier
    touched.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain 'Timestamp', 'High', 'Low' and 'Close' columns.
        NOTE: the frame is modified in place: a '15min_range' column is added
        and 'Timestamp' is converted to datetime.

    Returns
    -------
    pandas.DataFrame
        ``df`` inner-merged on 'Timestamp' with a new 'TB_15min_Label' column
        (np.sign of the return; NaN where no close price exists at the
        first-touch time).

    Side effects
    ------------
    Prints the first rows of the intermediate first-touch table.
    """
    # High-low range over the trailing 15 rows (kept in the returned frame).
    df['15min_range'] = df['High'].rolling(15).max() - df['Low'].rolling(15).min()

    def getDailyVol(data, span=30):
        """Exponentially weighted std (given span) of 1-row simple close returns."""
        df = data.assign(Return = lambda x: data['Close'] / data['Close'].shift(1)-1)
        sigma = df['Return'].ewm(span=span).std()
        return sigma

    vol = getDailyVol(data=df)
    df['Timestamp'] = pd.to_datetime(df['Timestamp'])

    # One event per row: start time, vertical barrier (VB) and volatility.
    events = df[['Timestamp']].copy(deep=True)
    events['VB'] = df['Timestamp'] + pd.Timedelta(minutes=15)
    events['Vol'] = vol

    def TBL(df, events, width):
        """Find, per event, the first time the upper ('ut') / lower ('dt') barrier is crossed.

        ``width`` is ``[up, down]``; a non-positive entry disables that
        barrier (its level becomes NaN, so no comparison can succeed).
        Adds 'UB' and 'DB' columns to ``events`` as a side effect.
        """
        res = events[['Timestamp', 'VB']].copy(deep=True)

        if width[0] > 0: events['UB'] = width[0]*events['Vol']
        else: events['UB'] = np.nan

        if width[1] > 0: events['DB'] = -width[1]*events['Vol']
        else: events['DB'] = np.nan

        for col,date,vb in res.itertuples():
            # Rows strictly between the event time and its vertical barrier.
            df0 = df[(df['Timestamp'] > date) & (df['Timestamp'] < vb)].copy(deep=True)
            # Return of each of those rows relative to the close at the event time.
            df0['Return'] = df0['Close'] / df.loc[df['Timestamp'] == date, 'Close'].iloc[0]-1

            idx = (res['Timestamp'] == date)

            # Earliest crossing time; NaT when the barrier is never crossed.
            res.loc[idx, 'ut'] = df0.loc[df0['Return'] > events.loc[idx,'UB'].iloc[0], 'Timestamp'].min()
            res.loc[idx, 'dt'] = df0.loc[df0['Return'] < events.loc[idx,'DB'].iloc[0], 'Timestamp'].min()

        return res

    def get_first_touch(df, events, width):
        """Add 'First': the earliest of the vertical, upper and lower barrier times."""
        res = TBL(df, events, width)
        res['First'] = res[['VB', 'ut', 'dt']].dropna(how='all').min(axis=1)
        return res

    # Upper/lower barrier widths are vol(span) * coefficient [up, down].
    result = get_first_touch(df,events,width = [0.4,0.4])

    # Fixed: `result.head` (no call) printed the bound-method repr rather
    # than the first rows.
    print(result.head())

    def get_label(df,result):
        """Turn first-touch times into labels: sign of close(First) / close(Timestamp) - 1."""
        result = result.dropna(subset=['First'])
        outcome = result[['Timestamp']].copy(deep=True)

        # NOTE(review): both merges return a fresh 0..n-1 index while `outcome`
        # keeps `result`'s index, and the assignment below aligns on index.
        # This presumably relies on `df` having a default RangeIndex with
        # unique timestamps; confirm against the caller.
        price_t0 = pd.merge(result,df,on=['Timestamp'],how='left')['Close']
        price_t1 = pd.merge(result,df,left_on=['First'], right_on=['Timestamp'], how = 'left')['Close']

        outcome['Return'] = price_t1/price_t0-1
        outcome['TB_15min_Label'] = np.sign(outcome['Return'].dropna())

        outcome = outcome.drop(['Return'], axis = 1)

        return outcome

    outcome = get_label(df,result)

    return pd.merge(df,outcome, on=['Timestamp'],how='inner')

In [5]:

def _with_targets(frame):
    """Add the 1-row targets to ``frame``, filter it, and attach the class label.

    The two regression targets are written onto the frame that is passed in;
    rows whose close target is exactly 0 and rows containing any NaN are then
    removed, and the direction class of the 'Avg' target is added to the
    filtered result.
    """
    for price_col in ('Close', 'Avg'):
        frame['Target_%s_1min' % price_col] = target(frame, price_col, 1)

    has_move = frame['Target_Close_1min'] != 0
    kept = frame[has_move].dropna()
    kept['Target_Class_Avg_1min'] = kept['Target_Avg_1min'].apply(class_target)
    return kept


df = _with_targets(df)
df_test = _with_targets(df_test)

In [6]:
# Raw input columns handed to the indicator search, same for both splits.
FEATURE_COLUMNS = ['Open', 'High', 'Low', 'Volume', 'Turnover', 'Close']
LABEL_COLUMN = 'Target_Class_Avg_1min'

train_X = df[FEATURE_COLUMNS]
train_y = df[LABEL_COLUMN]

test_X = df_test[FEATURE_COLUMNS]
test_y = df_test[LABEL_COLUMN]

In [7]:
tt = TuneTA(n_jobs=8, verbose=True)

fit_options = dict(
    # Indicators to optimise.
    indicators=['tta'],
    # Parameter range(s) to search; a single (4, 40) period range here.
    ranges=[(4, 40)],
    # At most 300 trials per range when searching for the best parameters.
    trials=300,
    # Stop searching a range after 50 consecutive trials without improvement.
    early_stop=50,
)

tt.fit(train_X, train_y, **fit_options)

In [None]:
# Ceiling on pairwise correlation between the features kept by prune().
MAX_INTER_CORRELATION = .7

tt.prune(max_inter_correlation=MAX_INTER_CORRELATION)

# Report both the target correlation and the feature inter-correlation.
report_options = {'target_corr': True, 'features_corr': True}
tt.report(**report_options)

In [None]:
# Apply the fitted indicators to each split, train first and then test.
features_train, features_test = map(tt.transform, (train_X, test_X))

In [None]:
def _drop_incomplete_rows(features):
    """Return ``features`` without NaN rows, printing the row count before and after."""
    print('Number of rows before cleaning: %d'%features.shape[0])
    cleaned = features.dropna()
    print('Number of rows after cleaning: %d'%cleaned.shape[0])
    return cleaned


features_train = _drop_incomplete_rows(features_train)
features_test = _drop_incomplete_rows(features_test)

In [None]:
# Left-join each label series onto its feature frame by index, so every
# surviving feature row is kept.
index_left_join = dict(how='left', left_index=True, right_index=True)

train_result = features_train.merge(train_y, **index_left_join)
test_result = features_test.merge(test_y, **index_left_join)

In [None]:
# Write each feature+label table to its own CSV, train first.
for labelled_frame, out_path in (
    (train_result, 'train_withF_Avg_1min.csv'),
    (test_result, 'test_withF_Avg_1min.csv'),
):
    labelled_frame.to_csv(out_path)