In [1]:
import ray
import pandas as pd
import numpy as np
from tqdm import tqdm
import time
import os
import glob
import warnings
warnings.filterwarnings('ignore')

output_folder = './data/min_new3'
path = './data/data_5m'
all_files = glob.glob(path + "/*.csv")


ray.shutdown()
ray.init(num_cpus=4,num_gpus=0)   #使用多少核

@ray.remote
def my_factor(filename):       #读入的数据文件做怎么样的处理
    df=pd.read_csv(filename)

    df['datetime'] = pd.to_datetime(df['datetime'])
    
    # 拆分date_time列为date和time两列
    df['date'] = df['datetime'].dt.strftime('%Y-%m-%d')
    df['time'] = df['datetime'].dt.strftime('%H:%M:%S')
    date = df['date'].iloc[0]      #日期存储

    df['vwap'] = df['money'] / df['volume']
    
    result_df = df.copy()
    
    
    #日内信息辅助日频基础字段生成
    tmp = result_df.groupby('code')['volume'].mean()
    result_1 = tmp.to_frame().reset_index().rename(columns={'volume':'volume_mean'})
    
    tmp = result_df.groupby('code')['volume'].std()
    result_2 = tmp.to_frame().reset_index().rename(columns={'volume':'volume_std'})
    result = pd.merge(result_1, result_2, on='code', how='inner')   #拼接
    
#     tmp = result_df[(result_df['close'] > result_df['open'])].groupby('code')['volume'].sum()
#     result_3 = tmp.to_frame().reset_index().rename(columns={'volume':'pro_vol_sum'})
#     result = pd.merge(result, result_3, on='code', how='inner')
    
#     df = result_df.copy()
#     df['close_diff'] = df.groupby('code')['close'].diff()
#     tmp = df[(df['close_diff'] > 0)].groupby('code')['volume'].sum()
#     result_4 = tmp.to_frame().reset_index().rename(columns={'volume':'gap_vol_sum'})
#     result = pd.merge(result, result_4, on='code', how='inner') 
    
#     tmp = result_df.groupby('code').apply(lambda x : x['close'].corr(x['volume']))
#     result_1 = tmp.to_frame().reset_index().rename(columns={0:'close_mean'})
    
    df = result.copy()
    
    # 创建文件夹（如果不存在）
    os.makedirs(output_folder, exist_ok=True)
    
    final = df.loc[:, ['code','volume_mean', 'volume_std']]
    filename = os.path.join(output_folder, f'{date}.csv')
    final = final.sort_values(by='code', ascending=True)
    final.to_csv(filename, index=False)
    return 


s=time.time()
res = []

for file_name in tqdm(all_files):
    res.append(my_factor.remote(file_name))    # 在循环中，对于每个文件名，将调用远程函数my_factor，并将返回的结果
                                               # 添加到res列表中。远程函数的调用通过.remote()方法实现。
results = ray.get(res)
e=time.time()
print(f'耗费{e-s}秒')

#重复运行必须要把ray关掉
ray.shutdown()

2024-03-30 21:46:37,603	INFO worker.py:1621 -- Started a local Ray instance.
100%|███████████████████████████████████████| 243/243 [00:00<00:00, 2695.65it/s]


耗费52.96462106704712秒


In [2]:
#新生成字段与原始日频字段合成

daily_path = './data/data_daily'
min_path = './data/min_new2'
output_folder = './data/new_daily_vol2'
ray.shutdown()



#初始化ray
ray.init(num_cpus=4,num_gpus=0)
@ray.remote
def my_factor(filename):
    filename_min = min_path + '/' + filename   #读取分钟数据生成的字段
    filename_daily = daily_path + '/' + filename   #读取原始的日频数据
    df_min = pd.read_csv(filename_min)
    df_daily = pd.read_csv(filename_daily)
    date = df_daily['date'].values[0]
    merged = pd.merge(df_min, df_daily, on='code', how='inner')    #拼接
    
    final = merged.copy()
    
    # 创建文件夹（如果不存在）
    os.makedirs(output_folder, exist_ok=True)
    
    filename = os.path.join(output_folder, f'{date}.csv')
    final = final.sort_values(by='code', ascending=True)
    final.to_csv(filename, index=False)
    return 


s=time.time()
res = []

for file_name in tqdm(all_files):
    file_name = os.path.basename(file_name)
    res.append(my_factor.remote(file_name))
results = ray.get(res)
e=time.time()
print(f'耗费{e-s}秒')

#重复运行必须要把ray关掉
ray.shutdown()

2024-01-06 22:57:33,265	INFO worker.py:1621 -- Started a local Ray instance.
100%|██████████████████████████████████████| 243/243 [00:00<00:00, 15883.55it/s]


耗费2.8039021492004395秒


In [4]:
ray.shutdown()

In [3]:
import pandas as pd
df = pd.read_csv('./data/new_daily4/2021-12-15.csv')
df

Unnamed: 0,code,close_mean,vwap_std,pro_vol_sum,gap_vol_sum,date,open,close,low,high,volume,money,turnover_ratio
0,000001.XSHE,1849.847769,4.054275,6.727851e+05,6.170381e+05,2021-12-15,2131.30,2136.17,2117.91,2149.56,1063619.0,2.269821e+09,0.6671
1,000002.XSHE,2554.080690,3.192147,4.353553e+05,4.398232e+05,2021-12-15,2935.73,2965.98,2934.22,2990.18,354655.0,1.052615e+09,0.5520
2,000004.XSHE,112.761494,0.379710,2.507827e+05,2.690373e+05,2021-12-15,142.81,141.47,141.18,145.35,379803.0,5.417071e+07,2.4310
3,000005.XSHE,21.869521,0.073154,2.748589e+05,2.667512e+05,2021-12-15,22.59,22.70,22.39,23.00,712955.0,1.615941e+07,0.6889
4,000006.XSHE,296.491527,0.464191,2.155356e+05,2.062865e+05,2021-12-15,208.20,210.17,207.71,213.12,229647.0,4.848737e+07,0.8373
...,...,...,...,...,...,...,...,...,...,...,...,...,...
4538,688799.XSHG,38.665208,0.141192,3.161290e+05,2.996560e+05,2021-12-15,38.46,38.55,38.38,39.02,455068.0,1.760346e+07,2.3790
4539,688800.XSHG,135.429375,4.212039,6.903930e+05,6.756340e+05,2021-12-15,127.34,139.80,123.86,142.33,1201663.0,1.623055e+08,5.4693
4540,688819.XSHG,45.289854,0.208871,1.504816e+06,1.391951e+06,2021-12-15,44.83,45.06,44.67,45.96,3470740.0,1.570406e+08,3.3154
4541,688981.XSHG,54.672500,0.344675,9.787657e+06,8.653190e+06,2021-12-15,54.88,54.13,54.01,55.26,28907438.0,1.575159e+09,1.5450
