In [52]:
import datetime
import multiprocessing as mp
import os
import random
import re
import warnings
from functools import partial

import matplotlib.patches as mpathes
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy
import seaborn as sns
import sklearn
from matplotlib import ticker
from numba import jit
from pandarallel import pandarallel
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm

# Silence every warning for the whole session.
# NOTE(review): this also hides pandas SettingWithCopy / FutureWarning messages.
warnings.filterwarnings("ignore")

# Plot font and how many DataFrame columns to render.
plt.rcParams.update({'font.family': ['DejaVu Sans']})
pd.options.display.max_columns = 33

# Start pandarallel's worker pool (its log below reports the worker count).
pandarallel.initialize()

INFO: Pandarallel will run on 128 workers.
INFO: Pandarallel will use Memory file system to transfer data between the main process and workers.


In [53]:
from PqiDataSdk import PqiDataSdk

# Data SDK client used by every ticker / calendar / minute-bar fetch below.
ds = PqiDataSdk(pool_type="mt", size=1, user="zyding")

In [3]:
# Universe and trading calendar for the full sample period.
tickers = ds.get_ticker_list(date='all')
start_date = '20170101'
end_date = '20221125'
lst_trade_date = ds.get_trade_dates(start_date=start_date, end_date=end_date)
# `Timestamp.date` is a method: it must be called, otherwise the index holds
# bound-method objects instead of `datetime.date` values.
DTIndex_trade_date = pd.to_datetime(lst_trade_date).map(lambda ts: ts.date())

In [40]:
%%time
# One-day sample fetch of minute bars for every ticker, to gauge the cost.
data_hf = ds.get_mins_history(tickers=tickers,
                              day_type='trade',
                              price_mode='after',
                              start_date='20221123',
                              end_date='20221123')

CPU times: user 4.71 s, sys: 3.14 s, total: 7.86 s
Wall time: 5.93 s


In [36]:
# Column name -> reducer applied to each time bucket by process_mins_history.
# Insertion order is kept identical to the column order used downstream.
dict_feature_process = dict(
    TradeValue=np.sum,   # summed over the bucket
    Twap=np.mean,
    TwapAp1=np.mean,
    TwapBp1=np.mean,
    UpLimit=np.max,
    DownLimit=np.max,
)

In [37]:
def process_mins_history(df, mins=30, feature_process=dict_feature_process):
    """Aggregate one ticker's minute bars into ``mins``-row time buckets.

    Rows with ``Abnormal != 0`` are dropped, the rest are grouped by
    ``df.index // mins`` and each column named in ``feature_process`` is
    reduced with its mapped function.

    Parameters
    ----------
    df : pandas.DataFrame
        Minute bars with an ``Abnormal`` column and every key of
        ``feature_process`` as a column. Assumes the index is an integer
        minute counter within the session -- TODO confirm against the SDK.
    mins : int, default 30
        Number of index steps (minutes) per bucket.
    feature_process : dict
        Column name -> reducer (e.g. ``np.sum``).

    Returns
    -------
    pandas.Series
        Values indexed by ``(feature, bucket)`` where ``bucket`` is the
        ``index // mins`` label. A bucket with no valid rows is simply absent;
        later buckets keep their own labels. On any failure the best-effort
        fallback described in the ``except`` branch is returned instead.
    """
    features = list(feature_process)
    try:
        df = df[df.Abnormal == 0]
        # `assign` returns a new frame: no chained assignment on the filtered slice.
        bucketed = df.assign(group=df.index // mins)
        # Keep the group labels as the index (no reset_index): numbering buckets
        # by position would mislabel every bucket after a missing one.
        agg = (bucketed.groupby('group')
               .agg(feature_process)[features]
               .rename_axis(index=None))  # stacked level stays unnamed
        return agg.T.stack()
    except Exception:
        # NOTE(review): fallback kept as-is. `pd.DataFrame(data=df, index=[1], ...)`
        # reindexes `df` to the single row labelled 1 (raw, unaggregated values,
        # or all-NaN if that label is absent) -- confirm this is the intended
        # output for a ticker-day that failed to aggregate.
        return pd.DataFrame(index=[1], columns=features, data=df).T.stack()

In [47]:
# Trade dates whose processing failed (save_mins_data appends to it).
# NOTE(review): appends made inside mp.Pool worker processes do not reach this list.
err = list()

In [48]:
def save_mins_data(date, out_dir='/mnt/ceph/low_freq_team/low_fre_alpha/zy_shared/gru/temp'):
    """Fetch, bucket and save one trade date of minute bars for every ticker.

    Uses the module-level ``ds`` client, ``tickers`` and
    ``process_mins_history``. It is mapped over trade dates with a
    ``multiprocessing.Pool``.

    Parameters
    ----------
    date : str
        Trade date in the SDK's format (e.g. ``'20221123'``).
    out_dir : str
        Directory that receives ``{date}.parquet``.

    Returns
    -------
    str or None
        ``f'{date}_saved'`` on success; ``None`` if any step failed. On failure
        the error is printed to stderr and ``date`` is appended to ``err``.
    """
    import sys

    try:
        # The fetch is inside the try so one bad date is recorded as a failure
        # instead of raising out of the worker and aborting the whole imap run.
        data_hf = ds.get_mins_history(tickers=tickers,
                                      start_date=date,
                                      end_date=date,
                                      day_type='trade',
                                      price_mode='after')
        # Judging from the joined output, the outer key is the ticker and the
        # inner key the date -- TODO confirm. Empty frames (and tickers with no
        # entries at all) are skipped rather than failing the whole day.
        dict_temp = {(i, j): process_mins_history(frame)
                     for i, frames in data_hf.items()
                     for j, frame in frames.items()
                     if not frame.empty}
        df_temp = pd.DataFrame(dict_temp).T.stack().dropna()
        df_temp.to_parquet(os.path.join(out_dir, f'{date}.parquet'))
        return f'{date}_saved'
    except Exception as exc:
        print(f'save_mins_data({date}) failed: {exc!r}', file=sys.stderr)
        # Only effective in-process: a Pool worker appends to its own copy of `err`.
        err.append(date)
        return None

In [None]:
with mp.Pool(processes=64) as pool:
    lst_res = list(tqdm(pool.imap(save_mins_data, lst_trade_date), total=len(lst_trade_date)))

# save_mins_data appends to `err` inside the worker processes, which never reaches
# this (parent) process; rebuild the failure list from the results (None == failed).
err = [date for date, res in zip(lst_trade_date, lst_res) if res is None]

In [91]:
def join_mins_data(factor_path):
    """Concatenate every ``.parquet`` file in ``factor_path`` into one frame.

    Files are read in sorted filename order. The first two index levels of
    each file are swapped before concatenation; judging from the displayed
    result, this turns (ticker, date, bar) into (date, ticker, bar) -- confirm.

    Parameters
    ----------
    factor_path : str
        Directory holding the per-date parquet files (trailing slash optional).

    Returns
    -------
    pandas.DataFrame
        The concatenated frame.

    Raises
    ------
    ValueError
        If ``factor_path`` contains no ``.parquet`` files.
    """
    # os.path.join works with or without a trailing slash on factor_path.
    files = sorted(os.path.join(factor_path, name)
                   for name in os.listdir(factor_path)
                   if name.endswith('.parquet'))
    if not files:
        raise ValueError(f'no .parquet files found in {factor_path!r}')
    return pd.concat(pd.read_parquet(path).swaplevel(0, 1) for path in tqdm(files))

In [92]:
# Stack all per-date parquet files written by the pool run into one frame.
df_joined = join_mins_data(factor_path="/mnt/ceph/low_freq_team/low_fre_alpha/zy_shared/gru/temp/")

100%|██████████| 1434/1434 [01:27<00:00, 16.32it/s]


In [93]:
# Preview the concatenated frame. Judging from the output below, rows appear to be
# indexed by (trade date, ticker, bar number) with one column per aggregated feature.
df_joined

Unnamed: 0,Unnamed: 1,Unnamed: 2,DownLimit,TradeValue,Twap,TwapAp1,TwapBp1,UpLimit
20170103,000001,0,0.000,139338502.880,955.591,956.116,955.066,0.000
20170103,000001,1,0.000,64971606.400,960.107,960.631,959.583,0.000
20170103,000001,2,0.000,34188274.900,959.926,960.451,959.402,0.000
20170103,000001,3,0.000,27468460.460,958.420,958.949,957.890,0.000
20170103,000001,4,0.000,20721239.030,957.697,958.225,957.169,0.000
...,...,...,...,...,...,...,...,...
20221125,689009,3,0.000,2889973.040,32.457,32.479,32.435,0.000
20221125,689009,4,0.000,4147415.640,32.436,32.454,32.419,0.000
20221125,689009,5,0.000,2581847.900,32.466,32.479,32.453,0.000
20221125,689009,6,0.000,12048170.440,32.541,32.549,32.532,0.000


In [99]:
# Reindex onto the Cartesian product of all index-level values so every
# combination gets a row; combinations absent from df_joined become all-NaN.
df_res = df_joined.reindex(
    index=pd.MultiIndex.from_product(df_joined.index.levels),
    columns=df_joined.columns,
)

In [100]:
# Preview the panel reindexed onto the full product of index levels
# (missing combinations are NaN rows).
df_res

Unnamed: 0,Unnamed: 1,Unnamed: 2,DownLimit,TradeValue,Twap,TwapAp1,TwapBp1,UpLimit
20170103,000001,0,0.000,139338502.880,955.591,956.116,955.066,0.000
20170103,000001,1,0.000,64971606.400,960.107,960.631,959.583,0.000
20170103,000001,2,0.000,34188274.900,959.926,960.451,959.402,0.000
20170103,000001,3,0.000,27468460.460,958.420,958.949,957.890,0.000
20170103,000001,4,0.000,20721239.030,957.697,958.225,957.169,0.000
...,...,...,...,...,...,...,...,...
20221125,689009,3,0.000,2889973.040,32.457,32.479,32.435,0.000
20221125,689009,4,0.000,4147415.640,32.436,32.454,32.419,0.000
20221125,689009,5,0.000,2581847.900,32.466,32.479,32.453,0.000
20221125,689009,6,0.000,12048170.440,32.541,32.549,32.532,0.000


In [101]:
# Persist the reindexed (full level-product) panel for the downstream model.
df_res.to_parquet(
    "/mnt/ceph/low_freq_team/low_fre_alpha/zy_shared/gru/data/hfData_30min.parquet"
)