In [1]:
%reload_ext autoreload
%autoreload 2
import os
import time
import datetime
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim
from torch.utils.data import DataLoader, TensorDataset
import random
import multiprocessing
import matplotlib.pyplot as plt
import pyarrow.parquet as pq
from model import mlp
def init_seeds(seed):
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    random.seed(seed)
## 随机数种子的设置
init_seeds(seed=666)
device = torch.device("cuda:0")

In [2]:
data_path = '/data/local_data/shared/102/intern_data_yzhou/zy4_parquet'
mean_std_path = '/mnt/beegfs/strategy_intern/zzdai_intern/for_wpxu/data_processed/mean_std_all.pkl'
feas = pd.read_pickle('/mnt/beegfs/strategy_intern/zzdai_intern/for_wpxu/feas353.pkl')
feas = [i for i in feas if 'x_' in i]
feas = [i for i in feas if 'x_1663' not in i]
start_date = '20170101'
end_date = '20200531'
mean_std_df = pd.read_pickle(mean_std_path)
means = mean_std_df.loc[:, 'mean'].astype(np.float32)
stds = mean_std_df.loc[:, 'std']
stds = stds.apply(lambda x: max(1e-10, x)).astype(np.float32)
y_cols = ['m1_ts_y_60twap_2n1open_Wgted_fullmarket_ex', 
          'm1_ts_y_60twap_2n2open_Wgted_fullmarket_ex', 
          'm1_ts_y_60twap_2n3open_Wgted_fullmarket_ex']
y_name = y_cols[0]
def process_data_mp(data_path, feas, means, stds, y_cols, y_name, filters, date_start, date_end):
    files = f'{data_path}/{date_start}.parquet'
    dataset = pq.ParquetDataset(files, use_legacy_dataset=False, filters=filters)
    _tmp_data = dataset.read(columns=['date', 'TimeStamp', 'ticker', 'm1_ts_eod_rank_group']+feas+y_cols).to_pandas()
    _tmp_data.replace([-np.inf, np.inf], np.nan, inplace=True)
    _tmp_data['y1'] = _tmp_data.groupby(['date','TimeStamp', 'm1_ts_eod_rank_group'])[y_name].rank(axis=0, pct=True)
    _tmp_data[feas] = (_tmp_data[feas] - means.loc[feas])/stds.loc[feas]
    _tmp_data[feas] = _tmp_data[feas].fillna(0)
    _tmp_data[feas] = _tmp_data[feas].astype(np.float32)
    _tmp_data = _tmp_data.dropna()
    print(date_start, 'finished')
    return _tmp_data

zt_filter = ('m1_ts_z_tag_up_limit','=',0.0)
dt_filter = ('m1_ts_z_tag_down_limit','=',0.0)
yna_filter = (y_name,'!=',np.nan)
filters = [zt_filter, dt_filter, yna_filter]
file_list = sorted(os.listdir(data_path))
date_all = [i.split('.')[0] for i in file_list]
date_list = [date for date in date_all if date>=start_date and date<=end_date]
mul_dfs = []
data_fea = []
pool = multiprocessing.Pool(32)
for i, dt in enumerate(date_list):
    mul_dfs.append(pool.apply_async(process_data_mp, (data_path, feas, means, stds, y_cols, y_name, filters, dt, dt)))
for item in mul_dfs:
    data_fea.append(item.get())
pool.close()
data_fea = pd.concat(data_fea, ignore_index=True)
data_fea = data_fea.sort_values(by=['date', 'TimeStamp', 'ticker'], ascending=True).reset_index(drop=True)
data_fea

20170206 finished
20170110 finished
20170118 finished
20170123 finished
20170116 finished
20170203 finished
20170119 finished
20170209 finished
20170207 finished
20170103 finished
20170217 finished
20170104 2017011720170216finished  
finishedfinished

20170210 finished
20170125 finished
20170126 finished
20170112 finished
20170208 finished
20170124 finished
20170213 finished
20170214 finished
20170120 finished
20170109 finished20170220 
finished
20170105 finished
20170106 20170111finished 
finished
2017022220170221  finishedfinished

20170215 finished
20170113 finished
20170223 finished
20170224 finished
20170227 finished
20170228 finished
20170301 finished
20170302 finished
20170303 finished
20170306 finished
20170307 finished
20170308 finished
20170309 finished
20170310 finished
20170313 finished
20170314 finished
20170315 finished
20170316 finished
20170317 finished
20170320 finished
20170321 finished
20170322 finished
20170323 finished
20170324 finished
20170327 finished
20170328 f

Unnamed: 0,date,TimeStamp,ticker,m1_ts_eod_rank_group,x_297,x_1185,x_2034,x_963,x_34,x_24,...,x_2079,x_669,x_1313,x_1370,x_2169,x_884,m1_ts_y_60twap_2n1open_Wgted_fullmarket_ex,m1_ts_y_60twap_2n2open_Wgted_fullmarket_ex,m1_ts_y_60twap_2n3open_Wgted_fullmarket_ex,y1
0,20170103,93500000,000001,9,0.000000,0.000000,2.244516,-0.257979,0.085926,0.000000,...,-0.507487,0.125797,-0.067372,0.082170,-0.000025,-1.691355,-0.006202,-0.015217,-0.013787,0.389313
1,20170103,93500000,000002,9,0.000000,0.000000,2.244516,-1.172740,0.085926,0.000000,...,-1.172024,0.125797,-0.067372,0.082170,-0.000025,-1.691355,-0.011067,-0.004409,-0.012301,0.160305
2,20170103,93500000,000004,2,0.000000,0.000000,2.244516,0.011106,0.085926,0.000000,...,0.489318,0.125797,-0.067372,0.082170,-0.000025,-1.691355,-0.007151,-0.005595,-0.014103,0.247148
3,20170103,93500000,000005,5,0.000000,0.000000,2.244516,-1.172740,0.085926,0.000000,...,-1.172024,0.125797,-0.067372,0.082170,-0.000025,-1.691355,0.005877,0.006150,0.014996,0.809160
4,20170103,93500000,000006,8,0.000000,0.000000,2.244516,-0.396293,0.085926,0.000000,...,0.655452,0.125797,-0.067372,0.082170,-0.000025,-1.691355,-0.005115,-0.005131,0.009620,0.381679
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
101542504,20200529,144500000,688388,3,-0.659620,-0.133213,-0.441238,0.333571,0.018242,1.638251,...,-0.341353,0.061437,-0.174547,0.557318,-0.000025,1.697092,-0.007443,0.011940,-0.004652,0.300366
101542505,20200529,144500000,688389,0,3.399152,2.876295,-0.441238,-1.353063,0.018242,0.876064,...,1.652258,0.061437,0.779824,0.557318,-0.000025,1.697092,0.004475,-0.013188,0.017150,0.745520
101542506,20200529,144500000,688396,7,0.000000,0.000000,-0.441238,-0.782245,0.018242,0.360456,...,1.319989,0.061437,-0.449481,0.557318,-0.000025,1.697092,0.025772,0.050579,0.084653,0.892086
101542507,20200529,144500000,688398,0,0.716784,0.136792,-0.441238,1.233602,0.018242,0.437993,...,1.153855,0.061437,-0.993084,0.557318,-0.000025,1.697092,-0.008992,-0.024924,-0.037442,0.200717


In [3]:
data_train = data_fea.iloc[:int(0.95 * len(data_fea))].reset_index(drop=True)
data_valid = data_fea.iloc[int(0.95 * len(data_fea)):].reset_index(drop=True)
del data_fea
print(f"train:{data_train.iloc[0].date}, {data_train.iloc[-1].date}")
print(f"valid:{data_valid.iloc[0].date}, {data_valid.iloc[-1].date}")
y_train = data_train[['date', "TimeStamp", y_name]].copy()
y_valid = data_valid[['date', "TimeStamp", y_name]].copy()
x_valid_tensor, y_valid_tensor = torch.tensor(np.array(data_valid[feas]), dtype=torch.float32), torch.tensor(np.array(data_valid['y1']), dtype=torch.float32)
dataset_valid = TensorDataset(x_valid_tensor, y_valid_tensor)
dataloader_valid  = DataLoader(dataset=dataset_valid, batch_size=2**14, shuffle=False, num_workers=4)

train:20170103, 20200330
valid:20200330, 20200529


In [4]:
def my_metric(y_pred, y_label, perc=3, y_name='m1_ts_y_60twap_2n1open_Wgted_fullmarket_ex'):
    y_test = y_label.copy()
    y_test['y_pred'] = y_pred                     
    y_test['y_pred_bin'] = y_test.groupby(['date', "TimeStamp"])['y_pred'].transform(lambda x: x>=np.percentile(x, 100-perc))
    y_metric = y_test.loc[y_test['y_pred_bin'].values, :].copy()
    rtn = y_metric.groupby(['date', "TimeStamp"])[y_name].apply(np.mean).groupby(['date']).mean()    
    return np.sum(rtn)

In [5]:
def train(itrs, net, dataloader_train, optim, device, dataloader_valid, y_valid, experiment_path, lr_schedule_values=None, scheduler=None, interval=0, ema_net=None, regularization_strength=1, metric_type='sharpe', loss_type='mse'):
    l_loss = []
    metrics = []
    net.train()
    best_metric = -1e10 # 用来记录验证集最优的metric
    global_steps = 0 # 用来记录迭代次数
    criterion = torch.nn.MSELoss(reduction='mean')
    for epoch_i in range(1000):
        n = 0
        start = time.time()
        l = 0
        for batch_i, (x_batch, y_batch) in enumerate(dataloader_train):
            global_steps += 1  # global training iteration
            if global_steps > itrs:
                break
            x_batch  = x_batch.to(device)
            y_batch = y_batch.to(device)
            output = net(x_batch)
            loss = criterion(y_batch.view(-1), output.view(-1))
                
            loss.backward() # 反向传播
            optim.step() # 参数更新
            optim.zero_grad()
            l += loss.item() * x_batch.shape[0]
            n += x_batch.shape[0]
            ## epoch内计算验证集的metric
            if dataloader_valid is not None and global_steps % interval == 0 and global_steps>=600:
                metric = valid(net, dataloader_valid, device, y_valid, metric_type)
                if metric > best_metric:
                    best_metric = metric
                    torch.save(net.state_dict(), os.path.join(experiment_path, 'best.pth'))
                print(f"epoch:{epoch_i}, batch:{batch_i}, loss:{loss.item()}")
                print(f"itrs:{global_steps}, metric:{metric}")
        if global_steps > itrs:
            break
        if scheduler is not None:
            scheduler.step()
        l_loss.append(l/n)
        print(f'loss:{l/n}') 
        print(f'cost:{time.time() - start}s') 
        
    
def valid(net, dataloader_valid, device, y_valid, metric_type='rtn', loss_type='mse'):
    y_pred = torch.Tensor()
    l = 0
    metric = 0
    n = 0
    net.eval()  # 推理的时候需要将model的模式设置为eval，固定住BN层的均值方差
    print("valid dataloader: ")
    ll = []
    if loss_type == 'mse':
        criterion = torch.nn.MSELoss(reduction='mean')
    with torch.no_grad():
        for batch_i, (x_batch, y_batch) in enumerate(dataloader_valid):
            bs = x_batch.shape[0]
            n += bs
            x_batch  = x_batch.to(device)
            y_batch = y_batch.to(device)
            output = net(x_batch)
            loss = criterion(y_batch.view(-1), output.view(-1))
            l += loss.item() * bs
            y_pred = torch.cat((y_pred, output.reshape(-1).to('cpu')), dim=0)
            if batch_i % 100 == 0 or batch_i == len(dataloader_valid)-1:
                print(f"{batch_i}/{len(dataloader_valid)}")
        print(f'valid loss:{l/n}')
        metric = my_metric(y_pred.detach().numpy(), y_valid)
    net.train()
    return metric

def get_parameter_groups(model, weight_decay=0.0):
    parameter_group_names = {}
    parameter_group_vars = {}

    for name, param in model.named_parameters():
        if not param.requires_grad:
            continue  # frozen weights
        ## bias，BN参数不施加正则
        if len(param.shape) == 1 or name.endswith(".bias"):
            group_name = "no_decay"
            this_weight_decay = 0.
        else:
            group_name = "decay"
            this_weight_decay = weight_decay
        
        if group_name not in parameter_group_names:
            parameter_group_names[group_name] = {
                "weight_decay": this_weight_decay,
                "params": [],
            }
            parameter_group_vars[group_name] = {
                "weight_decay": this_weight_decay,
                "params": [],
            }
        parameter_group_vars[group_name]["params"].append(param)
        parameter_group_names[group_name]["params"].append(name)
    return list(parameter_group_vars.values())  


In [6]:
save_path = '/mnt/beegfs/strategy_intern/zzdai_intern/for_wpxu/nn_results/mlp_baseline'
for i in range(5):
    model_type = 'mlp' 
    wd = 1e-8
    itrs = 2000
    num_workers = 4
    batch_size = 2**16
    ## 创建存放模型的experiment_path
    experiment_path = os.path.join(save_path, model_type+'_'+str(i))
    if not os.path.exists(experiment_path):
        os.makedirs(experiment_path)
    ## 根据model_type进行模型的初始化
    net = eval(model_type)(input_size=len(feas))
    net = net.to(device)
    data_tmp = data_train.sample(frac=0.80, axis=0).sort_index().reset_index(drop=True)
    x_train_tensor, y_train_tensor = torch.tensor(np.array(data_tmp[feas]), dtype=torch.float32), torch.tensor(np.array(data_tmp['y1']), dtype=torch.float32)
    dataset_train = TensorDataset(x_train_tensor, y_train_tensor)
    dataloader_train = DataLoader(dataset=dataset_train, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    print('load done!')
    
    parameters = get_parameter_groups(net, weight_decay=wd) # 对模型进行分组正则
    opt_args = dict(lr=3e-2, weight_decay=wd) # 优化器参数
    optim = torch.optim.Adam(parameters, **opt_args) # 优化器初始化
    scheduler = torch.optim.lr_scheduler.ExponentialLR(optim, gamma=0.9, verbose=True) # 优化器的schedule
    train(itrs, net, dataloader_train, optim, device, dataloader_valid, y_valid, experiment_path, scheduler=scheduler, interval=50)

load done!
Adjusting learning rate of group 0 to 3.0000e-02.
Adjusting learning rate of group 1 to 3.0000e-02.
valid dataloader: 
0/310
100/310
200/310
300/310
309/310
valid loss:0.08165034925084405
epoch:0, batch:599, loss:0.0791986808180809
itrs:600, metric:0.09679195641319388
valid dataloader: 
0/310
100/310
200/310
300/310
309/310
valid loss:0.0813148568360165
epoch:0, batch:649, loss:0.07929404079914093
itrs:650, metric:0.11565698495453237
valid dataloader: 
0/310
100/310
200/310
300/310
309/310
valid loss:0.0812236293827595
epoch:0, batch:699, loss:0.07813281565904617
itrs:700, metric:0.10654773762257592
valid dataloader: 
0/310
100/310
200/310
300/310
309/310
valid loss:0.08122950270331472
epoch:0, batch:749, loss:0.07852187752723694
itrs:750, metric:0.11117532188860019
valid dataloader: 
0/310
100/310
200/310
300/310
309/310
valid loss:0.08121043403749974
epoch:0, batch:799, loss:0.07816293835639954
itrs:800, metric:0.11997165976123243
valid dataloader: 
0/310
100/310
200/310
3