In [2]:
import numpy as np
import pandas as pd
import talib 
from talib import abstract
from joblib import delayed,Parallel,cpu_count
# Number of CPUs reported by joblib; only referenced by the (currently
# commented-out) joblib.Parallel variants further down in the notebook.
cpu_nums=cpu_count()
# Window length, in rows, of the rolling mean/std used to normalise the factor panel.
slice_windows=30

In [2]:
# Pull the rows of a single asset (Asset_ID == 1) out of the full training
# file and cache them in their own CSV, so later cells can reload just that slice.
train_data = pd.read_csv('train.csv')
asset1 = train_data.loc[train_data['Asset_ID'].eq(1)]
asset1.to_csv('asset1.csv', index=False)
# The full table is not needed any more; release it.
del train_data

In [3]:
# Reload the cached single-asset table, add an `amount` column and switch the
# OHLCV column names to lower case (presumably the names talib's abstract
# functions look up by default -- confirm against the talib docs).
train_data = (
    pd.read_csv('asset1.csv')
    # NOTE(review): amount is Count * VWAP as in the original notebook; traded
    # amount is normally Volume * VWAP -- confirm which one is intended. The
    # column is not read anywhere else in the visible cells.
    .assign(amount=lambda frame: frame['Count'] * frame['VWAP'])
    .rename(columns={
        'Close': 'close',
        'Open': 'open',
        'High': 'high',
        'Low': 'low',
        'Volume': 'volume',
    })
)

# Names of the talib functions to evaluate, in the order their outputs will
# appear as columns of the factor panel.
ta_factors = [
    'HT_DCPERIOD', 'HT_DCPHASE', 'HT_PHASOR', 'HT_SINE', 'HT_TRENDMODE',
    'ADD', 'DIV',
    'APO', 'BOP', 'CCI', 'CMO', 'MACD', 'MFI', 'MOM', 'PPO', 'ROC', 'RSI',
    'TRIX', 'WILLR',
    'BBANDS', 'DEMA', 'EMA', 'HT_TRENDLINE', 'KAMA', 'MA', 'MAMA',
    'MIDPOINT', 'MIDPRICE', 'SAR', 'SAREXT', 'SMA', 'T3', 'TEMA', 'TRIMA',
    'WMA',
    'AVGPRICE', 'MEDPRICE', 'TYPPRICE', 'WCLPRICE',
    'LINEARREG', 'LINEARREG_ANGLE', 'LINEARREG_INTERCEPT', 'LINEARREG_SLOPE',
    'STDDEV', 'TSF',
    'TRANGE', 'AD', 'ADOSC', 'OBV',
]

def generate_talib_factor(df,ta_factor):
    """
    Evaluate one talib function on ``df`` with the function's default parameters.

    Parameters
    ----------
    df
        Input handed unchanged to ``abstract.Function(ta_factor)``.
    ta_factor : str
        Name of the talib function to evaluate (e.g. ``'RSI'``).

    Returns
    -------
    pandas.DataFrame
        * one-dimensional result  -> a single column named ``ta_factor``;
        * two-dimensional result  -> one column per output, keeping the labels
          of the result (converted to ``str``).
    """
    target_fun=abstract.Function(ta_factor)
    ta_values=target_fun(df)
    # Some talib functions return several outputs (e.g. MACD), others a single
    # series; normalise both cases to a DataFrame.
    if len(ta_values.shape)==1:
        # to_frame(name=...) relabels the data. Building the frame with
        # pd.DataFrame(series, columns=[...]) instead would *select* by label
        # and silently yield an all-NaN column if the series already carried
        # a different name.
        return pd.Series(ta_values).to_frame(name=f'{ta_factor}')
    # Non-inplace rename: only guarantees string column labels and never
    # touches the object returned by talib.
    return pd.DataFrame(ta_values).rename(columns=str)

# Evaluate every talib factor sequentially and put the results side by side.
# A joblib-parallel variant of the same step, kept for reference:
#   pd.concat(Parallel(cpu_nums)(delayed(generate_talib_factor)(train_data,factor) for factor in ta_factors),axis=1)
raw_factors=pd.concat([generate_talib_factor(train_data,name) for name in ta_factors],axis=1)

# Rolling z-score: subtract the rolling mean and divide by the rolling std,
# both taken over the last `slice_windows` rows of each column.
# NOTE(review): a window with zero std divides by zero here and leaves
# NaN/inf in the panel -- confirm that downstream code copes with that.
rolling_view=raw_factors.rolling(slice_windows)
panel=(raw_factors-rolling_view.mean())/rolling_view.std()
# Release the un-normalised intermediates; only `panel` is used afterwards.
del raw_factors,rolling_view

# Append the label and the asset id; the last two columns are Target and Asset_ID.
panel=pd.concat([panel,train_data[['Target','Asset_ID']]],axis=1)

In [4]:
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
import pytorch_lightning as pl
import torch.multiprocessing
from joblib import Parallel, delayed
import numpy as np
from tqdm import tqdm

class GruDataset(Dataset):
    """
    Sliding-window dataset built from a feature panel, one asset at a time.

    The panel is grouped by its ``Asset_ID`` column and every group is cut
    into windows of ``seq_len`` consecutive rows. Each sample is ``(x, y)``:

    * ``x`` -- float32 array with ``seq_len`` rows holding every column except
      the last two;
    * ``y`` -- float32 value of the second-to-last column on the final row of
      the window.

    The panel is therefore expected to end with the target column followed by
    ``Asset_ID``, and every column must be convertible to float32.
    """
    def __init__(self,dataset, seq_len,train=True,up_threshold=0.2,down_threshold=0.0005):
        """
        Parameters
        ----------
        dataset : pandas.DataFrame
            Feature panel; must contain an ``Asset_ID`` column.
        seq_len : int
            Number of consecutive rows per window.
        train : bool
            If True, keep only windows whose target satisfies
            ``down_threshold < abs(y) < up_threshold``; if False keep every window.
        up_threshold, down_threshold : float
            Exclusive bounds on ``abs(y)`` used by the training filter. The
            defaults reproduce the previously hard-coded values.
        """
        def judge(df):
            """Cut one asset's rows into windows; returns [x_list, y_list, count]."""
            # astype() already returns a fresh float32 array (saves memory
            # compared with float64), so no separate copy of df is needed.
            values=df.values.astype(np.float32)
            n_windows=values.shape[0]-seq_len+1
            x_list=[]
            y_list=[]
            # A group shorter than seq_len gives an empty range and no samples.
            for idx in tqdm(range(0,n_windows)):
                y=values[idx+seq_len-1,-2]
                # Training only: skip targets that are (almost) flat or extreme.
                # A NaN target fails the comparison and is skipped as well.
                if train and not (down_threshold<abs(y)<up_threshold):
                    continue
                # Basic slicing: x is a view into `values`, not a copy.
                x_list.append(values[idx:idx+seq_len,:-2])
                y_list.append(y)
            return [x_list,y_list,len(x_list)]

        # Groups are processed sequentially, so windows never span two assets.
        self.final_set=[judge(df) for name,df in dataset.groupby('Asset_ID')]
        self.length=sum(group[2] for group in self.final_set)
        self.dataset=dataset
        # Flatten the per-asset lists so samples can be addressed by one index.
        self._x=[x for group in self.final_set for x in group[0]]
        self._y=[y for group in self.final_set for y in group[1]]

    def __len__(self):
        """Total number of retained windows across all assets."""
        return self.length

    def __getitem__(self, idx):
        """Return ``(x, y)`` for ``idx``; a slice returns a pair of lists
        (``data_split`` relies on this)."""
        x=self._x[idx]
        y=self._y[idx]
        return x,y


In [8]:
#example: build 20-row windows from the panel, then show one sample and the sample count
gd=GruDataset(panel,20)
sample_x,sample_y=gd[100]
print("X: \n\n",sample_x,"Y: \n\n",sample_y,"\n\n")
print("length \n\n",len(gd))

1956263


100%|██████████| 1956263/1956263 [00:05<00:00, 375601.54it/s]


X: 

 [[ 1.3843755  -0.8948697  -0.6982844  ... -1.7272546  -0.8865081
  -0.7968843 ]
 [ 1.2581758  -0.84471047 -0.88629806 ... -1.6411021  -0.5775061
  -0.9466304 ]
 [ 1.2455662  -0.78142214 -0.730725   ... -1.5954736  -0.40727997
  -1.0813993 ]
 ...
 [-0.85551566  3.0285854   2.3037703  ... -1.1901703   0.6843566
  -0.6732313 ]
 [-1.1853576   3.1191325   1.851343   ... -1.167127    0.8078065
  -0.48269978]
 [-1.4924889   3.2350836   1.8776377  ... -1.1309848   1.0269927
  -0.2911204 ]] Y: 

 -0.0020541039
length 

 1286016


In [None]:

# A temporary Dataset, used while cutting the samples into train and validation parts
class TempDataset(Dataset):
    """Pair two index-aligned containers so that item ``i`` is ``(x[i], y[i])``."""

    def __init__(self, x, y):
        self.x, self.y = x, y

    def __len__(self):
        # The length is taken from ``x`` alone.
        return len(self.x)

    def __getitem__(self, idx):
        features = self.x[idx]
        target = self.y[idx]
        return features, target

def data_split(df,batch_size=512,train_portion=0.75,seq_len=2400):
    """Split samples into a train part and a validation part and wrap each in a DataLoader.

    Parameters
    ----------
    df
        Sized container whose slices behave like ``(x_items, y_items)``:
        element 0 of a slice is used as the inputs and element 1 as the targets.
    batch_size : int
        Batch size of both loaders.
    train_portion : float
        Fraction of ``len(df)`` (taken from the front) assigned to training;
        the remainder becomes the validation part.
    seq_len : int
        Not used by this function.

    Returns
    -------
    (train_dataloader, valid_dataloader)
        Both loaders shuffle and drop the last incomplete batch.
    """
    cut=int(len(df)*train_portion)
    head=df[:cut]
    tail=df[cut:]
    df_train=TempDataset(head[0],head[1])
    df_valid=TempDataset(tail[0],tail[1])
    # NOTE(review): the validation loader also shuffles and drops its last
    # partial batch -- confirm that this is intended.
    loader_options=dict(batch_size=batch_size,shuffle=True,drop_last=True)
    train_dataloader=DataLoader(df_train,**loader_options)
    valid_dataloader=DataLoader(df_valid,**loader_options)
    return train_dataloader,valid_dataloader


class GRUNN(pl.LightningModule):
    """
    Two-layer GRU regressor: the GRU output at the last time step is mapped
    to a single value by a linear layer.
    """
    def __init__(self,config ,feature_size:int=52):
        """
        Parameters
        ----------
        config : mapping
            Must provide ``'lr'`` (Adam learning rate), ``'l2'`` (Adam weight
            decay) and ``'dropout'`` (dropout between the GRU layers).
        feature_size : int
            Number of input features per time step.
        """
        super().__init__()
        self.feature_size=feature_size
        self.lr=config['lr']
        self.l2=config['l2']
        self.dropout=config['dropout']
        # Created but not applied in forward(); kept so the module's
        # parameters / state_dict stay unchanged.
        self.layer_norm = nn.LayerNorm(feature_size)
        # Hidden size is two thirds of the input size (rounded down).
        hidden_size=int(feature_size*2/3)
        self.gru = nn.GRU(feature_size,hidden_size, 2, batch_first=True, dropout=self.dropout)
        self.linear = nn.Linear(hidden_size, 1) #dense

    def forward(self, x):
        """Map ``x`` of shape (batch, seq, feature) to predictions of shape (batch, 1)."""
        out, _ = self.gru(x)  # returns (output, h_n); only the per-step output is used
        out = self.linear(out[:, -1])  # last time step only
        return out

    def configure_optimizers(self):
        """Adam over all parameters, with ``l2`` applied as weight decay."""
        optimizer = torch.optim.Adam(self.parameters(), lr=self.lr,weight_decay=self.l2)
        return optimizer

    def training_step(self, train_batch, batch_idx):
        """Compute and log the MSE between prediction and target for one batch."""
        x, y = train_batch
        y_pred = self(x)
        # The model emits (batch, 1) while scalar targets are collated to
        # (batch,). Without aligning them MSELoss broadcasts both to
        # (batch, batch) and averages over every prediction/target pairing,
        # which is not the per-sample error.
        y = y.reshape(y_pred.shape).to(y_pred.dtype)
        loss_fun = nn.MSELoss()
        loss = loss_fun(y_pred, y)
        self.log('train/loss', loss.item())
        return loss

    def predict_step(self, batch, batch_idx: int , dataloader_idx: int = None):
        """Return ``[prediction, target]`` for one batch."""
        x,y=batch
        return [self(x),y]

