### TICKDB
* TRADE.TIME: 체결 자료 생성 시각
* TRADE.TIMESTAMP: 체결 자료 기록 시각
* TRADE.VOLUME_SUM: 누적 총 체결 수량
* TRADE.VOLUME_BUY_SUM: 누적 BUY 체결 수량
* TRADE.VOLUME_SELL_SUM: 누적 SELL 체결 수량 
* TRADE.SIDE: 체결 방향 (+: 매수, -: 매도, X: 정보 없음)
* TRADE.TRADE_BUY_SUM: 누적 BUY 체결 횟수
* TRADE.TRADE_SELL_SUM: 누적 SELL 체결 횟수
* TRADE.TRADE_VALUE_SUM: 누적 총 체결 금액 

* QUOTE.TIME: 호가 자료 생성 시각
* QUOTE.TIMESTAMP: 호가 자료 기록 시각

In [1]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import multiprocessing
import itertools

from tqdm import tqdm
from datetime import datetime 
from multiprocessing import Process, Pool



In [2]:
def get_mdays(holidays='/nfs/data/hdays/hdays-2018-KRX.xls',
              start='2018-01-01', end='2018-12-31'):
    """Return the trading days between ``start`` and ``end`` (inclusive).

    Trading days are business days (Mon-Fri) minus the holidays listed in
    the holiday spreadsheet.

    Args:
        holidays: Path of the holiday Excel file, or a DataFrame already
            loaded from one. Its '일자 및 요일' ("date and weekday") column
            must contain the holiday dates as 'YYYY-MM-DD...' text.
        start: First calendar day of the range (default: 2018-01-01).
        end: Last calendar day of the range (default: 2018-12-31). The
            defaults match the default 2018 holiday file.

    Returns:
        pd.DatetimeIndex of trading days.
    """
    if isinstance(holidays, pd.DataFrame):
        df_hdays = holidays
    else:
        df_hdays = pd.read_excel(holidays)
    # Raw string: '\d' in a regular string literal is an invalid escape sequence.
    hdays = df_hdays['일자 및 요일'].str.extract(r'(\d{4}-\d{2}-\d{2})', expand=False)
    # Rows without a date (blank/footer rows) yield NaN; ignore them.
    hdays = pd.to_datetime(hdays.dropna())
    bdays = pd.date_range(start, end, freq='B')
    # isin() rather than drop(): a holiday on a weekend or outside
    # [start, end] is simply ignored instead of raising KeyError.
    return bdays[~bdays.isin(hdays)]


In [3]:
# Shared progress queue. proc_tick() puts one item per processed day on it,
# and progress_listener() consumes those items until it receives None.
# Creating the Manager starts a helper server process as a side effect of this
# cell. A Manager queue proxy is presumably used (instead of
# multiprocessing.Queue) because the proxy can be pickled into the Pool task
# tuples built in preprocess_data().
m = multiprocessing.Manager()
q = m.Queue()

def proc_tick(args):
    """Collect one instrument's trades over ``mdays`` and save them as parquet.

    Runs inside a ``multiprocessing.Pool`` worker, so all inputs arrive
    packed in a single tuple.

    Args:
        args: ``(q, task_index, mdays, isin, ticker)`` where
            q: queue that receives one ``1`` per day (progress ticks);
            task_index: position of this task in the job list (unused);
            mdays: iterable of ``pd.Timestamp`` days to load;
            isin: HDF key of the trade table, e.g. ``'/TRADE_ST/KR7122630007'``;
            ticker: output name; data goes to ``/nfs/data/interim/{ticker}.parq``.

    Returns:
        Path of the parquet file that was written.

    Any day that raises while loading (e.g. the key is missing from that
    day's HDF file) is reported on stdout and skipped. If no day loads, an
    empty table with the usual columns is written instead of crashing.
    """
    q, _task_index, mdays, isin, ticker = args

    trade_list = []
    for day in mdays:
        q.put(1)  # advance the progress bar whether or not the day loads
        date = day.strftime('%Y%m%d')
        tickdb = f"/nfs/tickdb/tickdb_{date}_170000.hdf5"
        try:
            trade_list.append(_load_trades(tickdb, isin, day))
        except Exception as e:
            # Best effort: report the failing file/key and continue.
            print(tickdb, isin)
            print(e)

    if trade_list:
        df_tick = pd.concat(trade_list, ignore_index=True)
    else:
        # pd.concat([]) raises ValueError and would abort the whole pool.
        df_tick = pd.DataFrame(columns=['TIMESTAMP', 'PRICE', 'BID', 'ASK', 'V', 'DV'])
    parq_file = f'/nfs/data/interim/{ticker}.parq'
    df_tick.to_parquet(parq_file)
    return parq_file


def _load_trades(tickdb, isin, day):
    """Read one day's trades for ``isin``, keeping rows stamped 09:00-15:30 inclusive."""
    date = day.strftime('%Y%m%d')
    raw = pd.read_hdf(tickdb, mode='r', key=isin)

    trades = raw[['TIMESTAMP', 'PRICE', 'PRICE_BID', 'PRICE_ASK', 'VOLUME']].copy()
    trades.columns = ['TIMESTAMP', 'PRICE', 'BID', 'ASK', 'V']
    # TIMESTAMP is intraday text (HHMMSS + fractional seconds); prepend the
    # date and parse the whole column in one vectorized call.
    trades['TIMESTAMP'] = pd.to_datetime(date + trades['TIMESTAMP'],
                                         format='%Y%m%d%H%M%S%f')
    # int64 product: PRICE * V per trade.
    trades['DV'] = trades['PRICE'].astype('int64') * trades['V'].astype('int64')

    market_begin = day.replace(hour=9, minute=0, second=0, microsecond=0)
    market_end = day.replace(hour=15, minute=30, second=0, microsecond=0)
    # between() is inclusive on both ends, same as >= begin & <= end.
    return trades[trades['TIMESTAMP'].between(market_begin, market_end)]

def progress_listener(q, n):
    """Drive a tqdm progress bar from items placed on ``q``.

    Every item other than ``None`` advances the bar by one. ``None`` is the
    stop sentinel.

    Args:
        q: Queue shared with the workers (proc_tick puts ``1`` per day).
        n: Expected number of items; used as the bar's total.

    Returns:
        Number of items consumed before the sentinel.
    """
    count = 0
    # The context manager closes the bar so its final state is rendered and
    # the output line is released, even if q.get() raises.
    with tqdm(position=0, total=n, desc="proc_tick") as pbar:
        for _ in iter(q.get, None):
            pbar.update(1)
            count += 1
    return count

def preprocess_data(mdays, tables, processes=4):
    """Convert every table in ``tables`` to a parquet file using a process pool.

    A separate listener process draws a progress bar from the ticks that
    ``proc_tick`` puts on the module-level queue ``q``.

    Args:
        mdays: Days to load for each table (e.g. the result of ``get_mdays()``).
        tables: Mapping of HDF key -> output ticker name.
        processes: Number of pool worker processes (default 4).

    Returns:
        List of parquet paths returned by ``proc_tick``, in ``tables`` order.
    """
    print('preproc_data() started')

    listener = Process(target=progress_listener, args=(q, len(tables) * len(mdays)))
    listener.start()

    tasks = [(q, idx, mdays, isin, ticker)
             for idx, (isin, ticker) in enumerate(tables.items())]
    try:
        with Pool(processes=processes) as pool:
            results = pool.map(proc_tick, tasks)
    finally:
        # Always send the sentinel, even if a worker raised; otherwise the
        # listener blocks forever in q.get(). Join it so its output is
        # flushed before the summary below.
        q.put(None)
        listener.join()

    print(f"\n{len(results)} parq files preprocessed \n")
    print('preproc_data() finished')
    return results

    
    

In [4]:
# tickdb = f"/nfs/tickdb/tickdb_20180321_170000.hdf5"
# TRADE = pd.read_hdf(tickdb, mode='r', key='/TRADE_ST/KR7233740000')


In [5]:
# TRADE.head()

In [6]:
# Days (business days minus the listed holidays) to process for every table;
# passed to preprocess_data() below.
mdays = get_mdays()

In [7]:
# tables = {
# #     '/TRADE_ST/KR7005930003': 'TRADE_A005930',
# #     'TRADE_ST/KR7233740000': 'TRADE_A233740',
#     '/TRADE_ST/KR7122630007': 'TRADE_A122630',
# }
# preprocess_data(mdays, tables)


preproc_data() started


proc_tick:  73%|███████▎  | 178/244 [01:53<00:27,  2.36it/s]

/nfs/tickdb/tickdb_20180919_170000.hdf5 /TRADE_ST/KR7122630007
'No object named /TRADE_ST/KR7122630007 in the file'
/nfs/tickdb/tickdb_20180920_170000.hdf5 /TRADE_ST/KR7122630007
'No object named /TRADE_ST/KR7122630007 in the file'
/nfs/tickdb/tickdb_20180921_170000.hdf5 /TRADE_ST/KR7122630007
'No object named /TRADE_ST/KR7122630007 in the file'


proc_tick: 100%|██████████| 244/244 [02:37<00:00,  1.57it/s]


1 parq files preprocessed 

preproc_data() finished


In [10]:
# HDF key of each instrument's trade table -> parquet file name to write.
# Names per the ISIN notes further down this notebook.
tables = {
    '/TRADE_ST/KR7252670005': 'TRADE_A252670',  # KODEX 200 futures inverse 2X
    '/TRADE_ST/KR7233740000': 'TRADE_A233740',  # KODEX KOSDAQ150 leverage
    '/TRADE_ST/KR7251340006': 'TRADE_A251340',  # KODEX KOSDAQ150 futures inverse
    '/TRADE_ST/KR7122630007': 'TRADE_A122630',  # KODEX leverage
}

preprocess_data(mdays=mdays, tables=tables)


preproc_data() started


proc_tick:  38%|███▊      | 375/976 [01:19<02:24,  4.17it/s]

/nfs/tickdb/tickdb_20180919_170000.hdf5 /TRADE_ST/KR7252670005
'No object named /TRADE_ST/KR7252670005 in the file'
/nfs/tickdb/tickdb_20180920_170000.hdf5 /TRADE_ST/KR7252670005
'No object named /TRADE_ST/KR7252670005 in the file'
/nfs/tickdb/tickdb_20180921_170000.hdf5 /TRADE_ST/KR7252670005
'No object named /TRADE_ST/KR7252670005 in the file'


proc_tick:  58%|█████▊    | 565/976 [02:04<02:16,  3.01it/s]

/nfs/tickdb/tickdb_20180919_170000.hdf5 /TRADE_ST/KR7251340006
'No object named /TRADE_ST/KR7251340006 in the file'
/nfs/tickdb/tickdb_20180920_170000.hdf5 /TRADE_ST/KR7251340006
'No object named /TRADE_ST/KR7251340006 in the file'
/nfs/tickdb/tickdb_20180921_170000.hdf5 /TRADE_ST/KR7251340006
'No object named /TRADE_ST/KR7251340006 in the file'


proc_tick:  75%|███████▍  | 728/976 [02:54<01:07,  3.68it/s]

/nfs/tickdb/tickdb_20180919_170000.hdf5 /TRADE_ST/KR7122630007
'No object named /TRADE_ST/KR7122630007 in the file'
/nfs/tickdb/tickdb_20180920_170000.hdf5 /TRADE_ST/KR7122630007
'No object named /TRADE_ST/KR7122630007 in the file'
/nfs/tickdb/tickdb_20180921_170000.hdf5 /TRADE_ST/KR7122630007
'No object named /TRADE_ST/KR7122630007 in the file'


proc_tick:  93%|█████████▎| 910/976 [04:54<01:07,  1.02s/it]

/nfs/tickdb/tickdb_20180919_170000.hdf5 /TRADE_ST/KR7233740000
'No object named /TRADE_ST/KR7233740000 in the file'
/nfs/tickdb/tickdb_20180920_170000.hdf5 /TRADE_ST/KR7233740000
'No object named /TRADE_ST/KR7233740000 in the file'
/nfs/tickdb/tickdb_20180921_170000.hdf5 /TRADE_ST/KR7233740000
'No object named /TRADE_ST/KR7233740000 in the file'


proc_tick: 100%|██████████| 976/976 [06:53<00:00,  1.66s/it]


4 parq files preprocessed 

preproc_data() finished


In [11]:
# Load one preprocessed trade table written by preprocess_data().
TRADE_A233740 = pd.read_parquet('/nfs/data/interim/TRADE_A233740.parq')
# TRADE_A252670 = pd.read_parquet('/nfs/data/interim/TRADE_A252670.parq')
# TRADE_A122630 = pd.read_parquet('/nfs/data/interim/TRADE_A122630.parq')
# TRADE_A251340 = pd.read_parquet('/nfs/data/interim/TRADE_A251340.parq')

In [12]:
# (rows, columns) of the loaded trade table.
TRADE_A233740.shape

(11945993, 6)

In [9]:

# A122630 KODEX 레버리지(ISIN: KR7122630007)
# A252670 KODEX 200선물인버스2X(ISIN: KR7252670005)
# A233740 KODEX 코스닥150레버리지(ISIN: KR7233740000)
# A251340 KODEX 코스닥150선물인버스(ISIN: KR7251340006)
# A005930 삼성전자(ISIN:KR7005930003)

# TRADE_A252670 = pd.read_hdf(tickdb, mode='r', key='/TRADE_ST/KR7252670005')
# TRADE_A233740 = pd.read_hdf(tickdb, mode='r', key='/TRADE_ST/KR7233740000')
# TRADE_A251340 = pd.read_hdf(tickdb, mode='r', key='/TRADE_ST/KR7251340006')
# TRADE_A005930 = pd.read_hdf(tickdb, mode='r', key='/TRADE_ST/KR7005930003')

In [29]:
# TRADE_A252670 = pd.read_parquet('/nfs/data/interim/TRADE_A252670.parq')
# TRADE_A122630 = pd.read_parquet('/nfs/data/interim/TRADE_A122630.parq')
# TRADE_A233740 = pd.read_parquet('/nfs/data/interim/TRADE_A233740.parq')
# TRADE_A251340 = pd.read_parquet('/nfs/data/interim/TRADE_A251340.parq')
# TRADE_A005930 = pd.read_parquet('/nfs/data/interim/TRADE_A005930.parq')