In [1]:
import pandas as pd
import numpy as np
import os
import pickle
from copy import deepcopy
from datetime import datetime, timedelta
from joblib import Parallel, delayed
import random
import torch
from tqdm import tqdm
import warnings

warnings.filterwarnings("ignore")

# 请注意以下路径无需修改
# 框架相关路径
global_root_path = rf'/home/lwyxyz'
minute_fea_path = rf'{global_root_path}/Stock60sBaseDataAll/Feather'
minute_mmap_path = rf'{global_root_path}/Stock60sBaseDataAll/Mmap'
support_data_path = rf'{global_root_path}/Stock60sConfig/support_data'
# 日频原始数据
data79_root_path = rf'{global_root_path}/2.79'
stock_daily_data_path1 = rf'{data79_root_path}/tonglian_data/ohlc_fea'
stock_daily_data_path2 = rf'{data79_root_path}/tonglian_data/support_data'
stock_daily_data_path3 = rf'{data79_root_path}/update/短周期依赖数据'
# 高频原始数据
local_data_path = rf'{global_root_path}/254.35/data/LocalDataLoader/LocalData'
trans_data_path = rf'{local_data_path}/StockTransData'
order_data_path = rf'{local_data_path}/StockOrderData'
lob_data_path = rf'{global_root_path}/hft_database/nas3/sec_lobdata'


def get_all_trade_days():
    read_file = rf"{stock_daily_data_path2}/trade_days_dict.pkl"
    all_trade_days = pd.read_pickle(read_file)['trade_days']
    all_trade_days = [x.strftime('%Y%m%d') for x in all_trade_days]
    all_trade_days.sort()
    return all_trade_days


def get_trade_days(start_date, end_date):
    all_trade_days = get_all_trade_days()
    trade_days = [date for date in all_trade_days if start_date <= date <= end_date]
    return trade_days


def load_pickle(path):
    with open(path, 'rb') as f:
        return pickle.load(f)


def dump_pickle(path, data):
    directory = os.path.dirname(path)
    if not os.path.exists(directory):
        os.makedirs(directory)
    with open(path, 'wb') as f:
        pickle.dump(data, f)


def read_stock_daily_data(field):
    read_path = rf"{local_data_path}/StockDailyData"
    read_file = "%s/%s.fea" % (read_path, field)
    read_data = pd.read_feather(read_file).set_index("date")
    return read_data


def read_high_freq_data(date, name):
    if name == "StockLob":
        read_file = rf"{lob_data_path}/{date}.fea"
    else:
        read_file = "%s/%sData/%s.fea" % (local_data_path, name, date)
    read_data = pd.read_feather(read_file)
    return read_data


def read_all_support_dict():
    support_dict_list = [x for x in os.listdir(support_data_path) if "_loc_dict" in x]
    all_dict = {}
    for x in support_dict_list:
        try:
            dict_name = x.split("trade_")[1].split("_loc_dict")[0]
            all_dict[dict_name] = load_pickle(rf"{support_data_path}/{x}")
        except:
            pass
    return all_dict


def multiple_data(data_all, proc_num):
    step = len(data_all) // proc_num
    data_multiple = []
    spilt_code_list = [data_all.iloc[i * step]["code"] for i in range(proc_num)]
    spilt_num_list = list(
        data_all.loc[data_all.code.isin(spilt_code_list)].drop_duplicates(["code"], keep="last").index)
    spilt_num_list[0] = -1
    spilt_num_list += [len(data_all)]
    for i1, i2 in zip(spilt_num_list[:-1], spilt_num_list[1:]):
        data_multiple.append(data_all.loc[i1 + 1:i2])

    return data_multiple


def get_second_60s(time_10ms):
    div = 100000
    return round(round(time_10ms // div) * div)


In [2]:
# TODO 需要设定：本批字段的名称，计算的起止日期
base_data_name = rf'higherclosevolsum'
start_date = '20211231'
end_date = '20240930'

# 计算和保存每日数据
# 基础字段保存在minute_fea_path的base_data_name子目录下
# minute_fea_path这个路径会映射到当前用户的home目录下的data_share/Stock60sBaseDataAll/Feather下，是同一个路径
fea_save_path = rf'{minute_fea_path}/{base_data_name}'
os.makedirs(fea_save_path, exist_ok=True)
#os.chmod(fea_save_path, 0o755)
trade_date_list = get_trade_days(start_date, end_date)

all_support_dict = read_all_support_dict()
standard_time_list = list(all_support_dict[rf"time_60s"].keys())  # 分钟标记
standard_time_num = len(standard_time_list)  # 分钟数

In [3]:
# 对60秒聚合的数据进行进一步处理
def proc_second_data(second_data_ori):
    second_data = {}
    for field in second_data_ori.columns:
        df = second_data_ori[field].unstack(0)

        # 必须处理的部分：时间轴对齐，保证股票代码为六位
        df = df.reindex(standard_time_list)  # 标准化时间轴
        df.columns = df.columns.map(lambda x: x[:6])  # 标准化代码

        # 自由处理的部分：对价格数据进行填前值，对量和金额填0等等，和字段的计算逻辑相关
        # 例如，对价格等序列，可以填前值
        # if (field in price_field):
        #     df = df.replace(0, np.nan).ffill().fillna(today_stock_pre_close)
        # 例如，对量额等序列，可以填0
        # elif (field in volume_amount_field) :
        #     df = df.fillna(0.0)
        # else:
        #     pass

        df = df.fillna(0.0)

        second_data[field] = df.stack()

    second_data = pd.concat(second_data, axis=1)
    return second_data


def format_second_data(second_data):
    float_cols = [x for x in second_data.columns if x not in ["code", "second"]]
    # 所有字段按float32保存
    second_data[float_cols] = second_data[float_cols].astype('float32')
    # 这里需要把列名改为code+second+字段名的顺序，否则后续转存为mmap时会有异常
    second_data = second_data[["code", "second"] + float_cols]
    return second_data


In [5]:
def convert_to_microseconds(time_str):
         # 确保是九位数
        time_str = str(time_str).zfill(9)
 
         # 计算微秒数
        microseconds = (int(time_str[:2])* 3600 + int(time_str[2:4])* 60 + int(time_str[4:6])) * 1000 + int(time_str[6:])
   
        return microseconds


# 获取某一日的基础字段，以feather存储
def get_higher_close_fea(date, proc_num,multi=1):
    """
     :param date: 基础字段的日期
     :param proc_num: 计算使用的进程数
    """

    s= read_high_freq_data(date, name="StockTrans").loc[:,['code','time','tradePrice','tradeVolume']]
     # 逐笔成交字段s
    s["second"] = s["time"].map(get_second_60s)
    s=s.groupby(['code','second']).apply(lambda group:group[group['tradePrice']>group['tradePrice'].iloc[-1]]['tradeVolume'].sum())
    s=s.reset_index(name='higher_close_volsum')
    s.set_index(['code','second'],inplace=True)
    seconddt= proc_second_data(s)
            #输入 输出都有两列索引
            #second_data = second_data.reset_index().sort_values(["code", "second"]).reset_index(drop=True)
    seconddt=seconddt.reset_index()
    second_data = format_second_data(seconddt)
    second_data .to_feather(rf'{fea_save_path}/{date}.fea', compression="zstd")
    os.chmod(rf'{fea_save_path}/{date}.fea', 0o755)

In [6]:
for date in tqdm(trade_date_list):
     get_higher_close_fea(date, 40)

100%|██████████| 666/666 [75:00:42<00:00, 405.47s/it]   


In [None]:
get_higher_close_fea('20210104', 40)