In [10]:
import glob
import os

import ray
import pandas as pd
import numpy as np
from tqdm import tqdm

# Locate the project root by finding the project directory name inside the
# current working directory, then make it the working directory so the
# relative data paths below resolve no matter where the kernel was started.
work_path = os.path.abspath(os.getcwd())
project_name = "saturday7_KRX_HFT"
_name_pos = work_path.find(project_name)
if _name_pos == -1:
    # str.find returns -1 on a miss; slicing with that value would silently
    # produce an unrelated prefix of work_path instead of the project root.
    raise RuntimeError(
        f"'{project_name}' not found in working directory '{work_path}'; "
        "start the kernel from inside the project tree."
    )
project_path = work_path[:_name_pos + len(project_name)]
os.chdir(project_path)

# Data directories, relative to the project root set above.
RAW_DIR = './data/krx_raw'
MST_DIR = './data/krx_mst'

In [11]:
# Start the local Ray runtime, or reuse it if this cell is re-run.
# object_store_memory is passed as an integer byte count, which is the type
# ray.init documents for it (the previous value 1e+9 was a float).
ray.init(
    ignore_reinit_error=True,
    object_store_memory=10**9,
    num_cpus=os.cpu_count(),
    include_dashboard=True,
)

2023-09-21 15:10:19,540	INFO worker.py:1476 -- Calling ray.init() again after it has already been called.


0,1
Python version:,3.11.4
Ray version:,2.7.0
Dashboard:,http://127.0.0.1:8265


In [12]:
def read_file(date_, filename):
    """Load one parquet file from the raw-data tree.

    Args:
        date_: Name of the sub-directory under ``RAW_DIR`` that holds the file.
        filename: File name inside that sub-directory.

    Returns:
        The ``pandas.DataFrame`` produced by ``pd.read_parquet``.
    """
    return pd.read_parquet(os.path.join(RAW_DIR, date_, filename))


def pad_with_zeros(arr, size=10):
    """Return ``arr`` truncated or right-padded with zeros to exactly ``size`` items.

    Args:
        arr: One-dimensional sequence or array.
        size: Target length. Defaults to 10, the fixed length this function
            used before the parameter existed.

    Returns:
        A new ``numpy.ndarray`` of length ``size``. When the input already has
        ``size`` or more items, only the first ``size`` are returned and the
        input dtype is kept. When padding, the appended zeros are float64, so
        NumPy's normal type promotion applies (integer input comes back as
        float64); callers that need integers must cast afterwards.

    Raises:
        ValueError: If ``size`` is negative.
    """
    if size < 0:
        raise ValueError(f"size must be non-negative, got {size}")

    if len(arr) >= size:
        # Already long enough: keep only the first `size` items.
        return np.array(arr[:size])

    padding = np.zeros(size - len(arr))
    return np.concatenate((np.array(arr), padding))


def a3_b6_to_c7(a3, b6):
    """Stack the A3 and B6 frames into a single frame ordered by time.

    Each input's index is first turned into an ``index`` column. After
    stacking, rows are sorted ascending by ``time`` and then by that ``index``
    value. The helper column is then removed and the result receives a fresh
    0..n-1 index. Neither input frame is modified.

    Args:
        a3: DataFrame with a ``time`` column and an unnamed index.
        b6: DataFrame with a ``time`` column and an unnamed index.

    Returns:
        The combined, time-ordered DataFrame. Columns present in only one
        input are NaN on the rows that came from the other input.
    """
    stacked = pd.concat([a3.reset_index(), b6.reset_index()], axis=0)
    ordered = stacked.sort_values(by=['time', 'index'], ascending=[True, True])
    return ordered.drop(columns=['index']).reset_index(drop=True)


def _apply_tick_to_side(prices, qtys, tick_price, tick_qty):
    """Subtract ``tick_qty`` from the level of ``qtys`` priced at ``tick_price``.

    ``qtys`` is modified in place, so the caller passes a copy. If the level's
    remaining quantity is zero or negative, that level and every level before
    it are dropped from both arrays.

    Returns:
        The ``(prices, qtys)`` pair to use from this point on.
    """
    level = np.where(prices == tick_price)[0][0]
    qtys[level] -= tick_qty
    if qtys[level] <= 0:
        return prices[level + 1:], qtys[level + 1:]
    return prices, qtys


def c7_to_g7(df_c7):
    """Turn the merged C7 frame into one G7 row per A3 row.

    Rows are read positionally from ``itertuples()``: field 0 is the index,
    fields 1-4 become ``time``, ``tick_price``, ``tick_qty`` and
    ``tick_direction``, and fields 5-8 are taken as ``ask_price``,
    ``ask_qty``, ``bid_price`` and ``bid_qty``.

    A row whose field 2 is NaN is treated as B6 data and only remembered as
    the current book. Any other row is treated as A3 data: its quantity is
    subtracted from the matching ask level (or, failing that, the matching
    bid level) of the remembered book, and the adjusted book is emitted
    alongside the A3 fields. The emitted row then becomes the remembered
    book, so consecutive A3 rows keep adjusting the same book until the next
    B6 row replaces it. A3 rows that appear before any B6 row are skipped.

    Args:
        df_c7: Frame whose column order matches the positional layout above.

    Returns:
        DataFrame with columns ``time``, ``tick_price``, ``tick_qty``,
        ``tick_direction``, ``ask_price``, ``ask_qty``, ``bid_price`` and
        ``bid_qty``. The four book columns hold int64 arrays produced by
        ``pad_with_zeros``.
    """
    book_row = None  # last B6 row, or the last emitted row once an A3 row was processed
    rows_out = []

    for row in df_c7.itertuples():
        if np.isnan(row[2]):  # B6 data.
            book_row = row
            continue
        if book_row is None:  # A3 data with no book seen yet.
            continue

        # A3 data.
        tick_price = row[2]
        tick_qty = row[3]

        ask_price = book_row[5]
        bid_price = book_row[7]
        # Work on copies so the remembered book's quantities stay untouched.
        ask_qty_change = book_row[6].copy()
        bid_qty_change = book_row[8].copy()

        if tick_price in ask_price:
            ask_price, ask_qty_change = _apply_tick_to_side(
                ask_price, ask_qty_change, tick_price, tick_qty)
        elif tick_price in bid_price:
            bid_price, bid_qty_change = _apply_tick_to_side(
                bid_price, bid_qty_change, tick_price, tick_qty)

        snapshot = (
            row[0], row[1], tick_price, tick_qty, row[4],
            pad_with_zeros(ask_price).astype(np.int64),
            pad_with_zeros(ask_qty_change).astype(np.int64),
            pad_with_zeros(bid_price).astype(np.int64),
            pad_with_zeros(bid_qty_change).astype(np.int64),
        )
        rows_out.append(snapshot)
        book_row = snapshot

    g7_columns = ['idx', 'time', 'tick_price', 'tick_qty', 'tick_direction',
                  'ask_price', 'ask_qty', 'bid_price', 'bid_qty']
    return pd.DataFrame(rows_out, columns=g7_columns).drop(['idx'], axis=1)

In [13]:
@ray.remote
def create_g7(ymd, isin):
    """Ray task: build and store the G7 parquet file for one (date, ISIN) pair.

    Reads ``{isin}_A3@Seoul.parquet`` and ``{isin}_B6@Seoul.parquet`` from the
    ``ymd`` sub-directory of ``RAW_DIR``, merges them with ``a3_b6_to_c7``,
    converts the result with ``c7_to_g7`` and writes it next to the inputs as
    ``{isin}_G7@Seoul.parquet`` using zstd compression.

    Args:
        ymd: Sub-directory name under ``RAW_DIR``.
        isin: Prefix shared by the input and output file names.

    Returns:
        None. The only effect is the written parquet file.
    """
    a3_frame = read_file(ymd, f"{isin}_A3@Seoul.parquet")
    b6_frame = read_file(ymd, f"{isin}_B6@Seoul.parquet")

    merged = a3_b6_to_c7(a3_frame, b6_frame)
    out_path = f"{RAW_DIR}/{ymd}/{isin}_G7@Seoul.parquet"
    c7_to_g7(merged).to_parquet(out_path, compression='zstd')

In [14]:
# Load the master table; its nsYMD and nsISIN columns drive the G7 build below.
mst_file = f"{MST_DIR}/krx_mst.parquet"
df_krx_mst: pd.DataFrame = pd.read_parquet(mst_file)

In [16]:
# Run create_g7 in parallel with Ray: submit one task per master-table row and
# block until all of them have finished. The two needed columns are zipped
# directly instead of using iterrows(), which builds a Series for every row
# and does not preserve dtypes across the row.
ray.get([
    create_g7.remote(ymd, isin)
    for ymd, isin in zip(df_krx_mst['nsYMD'], df_krx_mst['nsISIN'])
])

[None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None]