In [2]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta, time
import glob
import os

#import matplotlib.pyplot as plt
#import matplotlib
#import matplotlib.dates as mdates
#from matplotlib import dates

### 전처리를 위한 전처리 코드
- A301, B901, C301, C101로 나뉘어 있는 데이터를 통합해서 각 종목별로 나눈다.
- 또한 각 종목별로 모든 날짜가 통합되어있는 데이터를 일별로 나눈다.
- 결과적으로 데이터는 종목별로 파일을 생성해 나뉘고 각 종목 파일에는 날짜별로 파싱되어 데이터가 저장된다.
- 저장 경로: 코도가 있는 폴더에서 DATA 폴더를 생성해 두면 그 안에 각 종목 파일이 생긴다.

In [None]:
# 각 종목 파일 생성
file_paths = glob.glob('./A301/*') 
file_name = []
for file_path in file_paths:
    file_name.append(file_path.split('\\')[-1].split('_')[0])

path = './DATA/'

# 해당 경로에 폴더가 없다면 폴더 생성
for i in file_name:
    if not os.path.exists(f'{path}{i}'):
        os.makedirs(f'{path}{i}')

In [None]:
# 날짜별로 분류, 필요한 컬럼만 뽑아서 저장
instances = ['A301','B901','C301', 'C101']
for instance in instances:
    file_paths = glob.glob(f'./{instance}/' + "*") 
    file_name = []
    for file_path in file_paths:
        file_name.append(file_path.split('\\')[-1])

    for i in file_name: # 종목 이름 순환
        isin = i.split('_')[0]
        if instance == 'A301':
            required_columns = ['server_time','Trading Price','Trading volume','day']
        if instance == 'B901':
            required_columns = ['server_time','Member Number 1 for Ask','Ask_Trading Volume 1','Member Number 1 for Bid','Bid_Trading Volume 1',
            'Member Number 2 for Ask','Ask_Trading Volume 2','Member Number 2 for Bid','Bid_Trading Volume 2','Member Number 3 for Ask',
            'Ask_Trading Volume 3','Member Number 3 for Bid','Bid_Trading Volume 3','Member Number 4 for Ask','Ask_Trading Volume 4','Member Number 4 for Bid',
            'Bid_Trading Volume 4','Member Number 5 for Ask','Ask_Trading Volume 5','Member Number 5 for Bid','Bid_Trading Volume 5','day']
        if instance == 'C301':
            required_columns = ['server_time','Arbitrage Ask Trust Trading Volume','Arbitrage Ask Principal Trading Volume',
            'Arbitrage Bid Trust Trading Volume','Arbitrage Bid Principal Trading Volume','Non-Arbitrage Ask Trust Trading Volume',
            'Non-Arbitrage Ask Principal Trading Volume','Non-Arbitrage Bid Trust Trading Volume','Non-Arbitrage Bid Principal Trading Volume','day']
        if instance == 'C101':
            required_columns = ['Investor Code','Accumulated Ask Trading Volume','Accumulated Bid Trading Volume','day']
        
        DF = pd.read_parquet(f'./{instance}/' + i, columns=required_columns) #실시간 체결
        day = DF['day'].unique()
            
        for j in day: #날짜 순환
            df = DF[DF['day']==j]
            df.drop('day',axis=1, inplace=True)

            df.to_parquet(f'./DATA/{isin}/{instance}_{j}.parquet', index=False, engine='pyarrow')


### 모델 학습을 위한 전처리
- 각 파일에 있는 데이터를 가져와서 전처리를 진행한다.
- 정답 데이터는 학습데이터 가장 오른쪽 컬럼에 생성된다. 
- 정답 데이터 값은 스칼라이고 같은 스칼라 값이 모든 행에 똑같이 들어간다.
- 모두 돌리는데 대략 89분이 걸린다.

In [3]:
# 각 종목명(폴더명 가져오기)
above_file_paths = glob.glob('./DATA/*')
above_file_paths
above_file_names = []
for file_path in above_file_paths:
    above_file_names.append(file_path.split('\\')[-1])

remove = ['KR7022100002','KR7086520004','KR7091990002','KR7247540008'] #비어있음
above_file_names = [x for x in above_file_names if x not in remove]

In [4]:
# 특정 경로에 있는 모든 A301 이름 모두 가져오기
file_pattern = 'A301_*.parquet'
file_paths = glob.glob(f'./DATA/KR7000270009/' + file_pattern) 
# 파일 이름만 추출, 날짜만 추출
A301S_file_name = []
dates = []
for file_path in file_paths:
    A301S_file_name.append(file_path.split('\\')[-1])  # 경로에서 파일 이름만 추출
    _file_name = file_path.split('\\')[-1]
    cleaned_name = _file_name.replace('A301_', '').replace('.parquet', '')
    dates.append(cleaned_name)

# 특정 경로에 있는 모든 B901S 이름 모두 가져오기
file_pattern = 'B901_*.parquet'
file_paths = glob.glob(f'./DATA/KR7000270009/' + file_pattern) 
B901S_file_name = []
for file_path in file_paths:
    B901S_file_name.append(file_path.split('\\')[-1])  # 경로에서 파일 이름만 추출

# 특정 경로에 있는 모든 C301S 이름 모두 가져오기
file_pattern = 'C301_*.parquet'
file_paths = glob.glob(f'./DATA/KR7000270009/' + file_pattern) 
C301S_file_name = []
for file_path in file_paths:
    C301S_file_name.append(file_path.split('\\')[-1])  # 경로에서 파일 이름만 추출

# 특정 경로에 있는 모든 C101S 이름 모두 가져오기
file_pattern = 'C101_*.parquet'
file_paths = glob.glob(f'./DATA/KR7000270009/' + file_pattern) 
C101S_file_name = []
for file_path in file_paths:
    C101S_file_name.append(file_path.split('\\')[-1])  # 경로에서 파일 이름만 추출

# 스케일러 정의
# 시간 컬럼 제외
# 절댓값의 최댓값으로 나눠주어 절댓값 중 가장 큰 값이 1 혹은 -1이 되고, 0값과 음수, 양수가 유지된다.
def scaler(df):
    df[df.columns[1:]] = df[df.columns[1:]]/abs(df[df.columns[1:]]).max()
    df = df.fillna(value=0)
    return df

In [5]:
for above_file_name in above_file_names: #모든 종목 순환
    for A,B,C,Y,date in zip(A301S_file_name, B901S_file_name, C301S_file_name, C101S_file_name, dates): #모든 날짜 순환

        dfa = pd.read_parquet(f'./DATA/{above_file_name}/' + A) #실시간 체결
        dfb = pd.read_parquet(f'./DATA/{above_file_name}/' + B) #상위거래원
        dfc = pd.read_parquet(f'./DATA/{above_file_name}/' + C) #프로그램매매
        dfy = pd.read_parquet(f'./DATA/{above_file_name}/' + Y) #장마감
        
        # 이름 변경
        korcol={
        'server_time':'시간',
        'Trading Price':'체결가격',
        'Trading volume':'거래량',}
        dfa = dfa.rename(columns=korcol)

        korcol={'server_time': '시간',
        'Member Number 1 for Ask': '1단계매도회원번호',
        'Ask_Trading Volume 1': '1단계매도체결수량',
        'Member Number 1 for Bid': '1단계매수회원번호',
        'Bid_Trading Volume 1': '1단계매수체결수량',
        'Member Number 2 for Ask': '2단계매도회원번호',
        'Ask_Trading Volume 2': '2단계매도체결수량',
        'Member Number 2 for Bid': '2단계매수회원번호',
        'Bid_Trading Volume 2': '2단계매수체결수량',
        'Member Number 3 for Ask': '3단계매도회원번호',
        'Ask_Trading Volume 3': '3단계매도체결수량',
        'Member Number 3 for Bid': '3단계매수회원번호',
        'Bid_Trading Volume 3': '3단계매수체결수량',
        'Member Number 4 for Ask': '4단계매도회원번호',
        'Ask_Trading Volume 4': '4단계매도체결수량',
        'Member Number 4 for Bid': '4단계매수회원번호',
        'Bid_Trading Volume 4': '4단계매수체결수량',
        'Member Number 5 for Ask': '5단계매도회원번호',
        'Ask_Trading Volume 5': '5단계매도체결수량',
        'Member Number 5 for Bid': '5단계매수회원번호',
        'Bid_Trading Volume 5': '5단계매수체결수량',}
        dfb = dfb.rename(columns=korcol)

        korcol={'server_time': '시간'}
        dfc = dfc.rename(columns=korcol)

        # 공백이 있으면 타입변화가 되지 않음으로 공백을 문자열 0으로 변경
        dfa.replace('     ', '0', inplace=True)
        dfb.replace('     ', '0', inplace=True)
        dfc.replace('     ', '0', inplace=True)
        dfy.replace('     ', '0', inplace=True)

        # 데이터 타입 변경
        # 시간 제외 int로 변환
        columns = dfa.columns.difference(['시간'])
        dfa[columns] = dfa[columns].astype(int)

        columns = dfb.columns.difference(['시간'])
        dfb[columns] = dfb[columns].astype(int)

        columns = dfc.columns.difference(['시간'])
        dfc[columns] = dfc[columns].astype(int)


        # 프로그램 매매 순매수 수량 구하기
        buy = dfc['Arbitrage Bid Trust Trading Volume']+dfc['Arbitrage Bid Principal Trading Volume']+dfc['Non-Arbitrage Bid Trust Trading Volume']+dfc['Non-Arbitrage Bid Principal Trading Volume']
        sell = dfc['Arbitrage Ask Trust Trading Volume']+dfc['Arbitrage Ask Principal Trading Volume']+dfc['Non-Arbitrage Ask Trust Trading Volume']+dfc['Non-Arbitrage Ask Principal Trading Volume']
        accrued_amount = buy - sell

        occurred_amount = []

        for i in range(len(accrued_amount)):
            if i == 0:    
                occurred_amount.append(accrued_amount[0])
            if i > 0:
                occurred_amount.append(accrued_amount[i] - accrued_amount[i-1])
        dfc['순매수수량'] = occurred_amount

        drop_columns = ['Arbitrage Ask Trust Trading Volume','Arbitrage Ask Principal Trading Volume','Arbitrage Bid Trust Trading Volume','Arbitrage Bid Principal Trading Volume',
            'Non-Arbitrage Ask Trust Trading Volume','Non-Arbitrage Ask Principal Trading Volume','Non-Arbitrage Bid Trust Trading Volume','Non-Arbitrage Bid Principal Trading Volume',]
        dfc = dfc.drop(drop_columns,axis=1)


        # 상위거래원 데이터는 대략 1분마다 한 번씩 올라온다
        # 하나의 증권사가 상위거래원에 뜰 만큼 거래를 하고 추가적으로 거래를 하면 추가 거래한 양이 해당 증권사의 거래수량에 누적되어 나타남
        # 그걸 그 순간의 거래량으로 바꾸는 코드
        sellbuy = ['매도', '매수']
        for z in sellbuy:
            for j in range(1,6):
                member = {}
                current_volume = []
                for i in range(len(dfb)):
                    # 첫 번째 거래는 그냥 추가
                    if i == 0: 
                        _x = dfb[f'{j}단계{z}체결수량'][i]
                    
                    # 회원번호가 이전과 같으면 현재에서 이전 거래량을 뺀다.
                    if i>0 and dfb[f'{j}단계{z}회원번호'][i] == dfb[f'{j}단계{z}회원번호'][i-1]: #&과 and는 다르다. &로 하면 안됨
                        _x = dfb[f'{j}단계{z}체결수량'][i] - dfb[f'{j}단계{z}체결수량'][i-1]
                    
                    # 회원번호가 달라졌는데 이전에 등장하지 않은 회원번호일 때.
                    if i>0 and dfb[f'{j}단계{z}회원번호'][i] != dfb[f'{j}단계{z}회원번호'][i-1] and dfb[f'{j}단계{z}회원번호'][i] not in member.keys():
                        _x = dfb[f'{j}단계{z}체결수량'][i] # 그냥 추가
                        member[dfb[f'{j}단계{z}회원번호'][i-1]] = dfb[f'{j}단계{z}체결수량'][i-1] #딕셔너리에 회원번호, 거래량 메모
                    
                    # 회원번호가 달라졌는데 이전에 등장한 회원번호일 때.
                    if i>0 and dfb[f'{j}단계{z}회원번호'][i] != dfb[f'{j}단계{z}회원번호'][i-1] and dfb[f'{j}단계{z}회원번호'][i] in member.keys():
                        _x = dfb[f'{j}단계{z}체결수량'][i] - member[dfb[f'{j}단계{z}회원번호'][i]] #같은 회원번호가 마지막으로 가졌던 거래량 빼기
                        member[dfb[f'{j}단계{z}회원번호'][i-1]] = dfb[f'{j}단계{z}체결수량'][i-1] #딕셔너리에 회원번호, 거래량 새롭게 갱신

                    current_volume.append(_x)
                dfb[f'{j}단계{z}체결수량'] = current_volume


        # 외국인 회원을 전부 1로 그외는 0으로 바꾼다.
        foreign = [
            29,33,35,36,37,38,40,41,42,43,44,45,54,58,60,61,62,67,74,75,506,513,
            516,519,520,521,523,537,538,539,611,907,908,939,942]

        for i in range(1, 6):
            _foreign_label = []
            for j in dfb[f'{i}단계매도회원번호']:
                if j in foreign:
                    _foreign_label.append(1)
                else:
                    _foreign_label.append(0)
            dfb[f'{i}단계매도회원번호'] = _foreign_label
                    
        for i in range(1, 6):
            _foreign_label = []
            for j in dfb[f'{i}단계매수회원번호']:
                if j in foreign:
                    _foreign_label.append(1)
                else:
                    _foreign_label.append(0)
            dfb[f'{i}단계매수회원번호'] = _foreign_label


        # 상위거래원 외국인 증권사와 기관 컬럼 분리
        dfb_split = pd.DataFrame(dfb['시간'])
        sellbuy = ['매도','매수']
        for j in sellbuy:
            for i in range(1,6):
                dfb_split_x = dfb[dfb[f'{i}단계{j}회원번호'] == 1][['시간',f'{i}단계{j}체결수량']]
                dfb_split_x = dfb_split_x.rename(columns={f'{i}단계{j}체결수량':f'{i}단계{j}체결수량_외국인'})
                dfb_split = pd.merge(dfb_split, dfb_split_x, on='시간', how='left')
                
                dfb_split_x = dfb[dfb[f'{i}단계{j}회원번호'] == 0][['시간',f'{i}단계{j}체결수량']]
                dfb_split_x = dfb_split_x.rename(columns={f'{i}단계{j}체결수량':f'{i}단계{j}체결수량_기관'})
                dfb_split = pd.merge(dfb_split, dfb_split_x, on='시간', how='left')
        dfb_split 


        #모든 데이터를 합치기 위한 데이터 프레임 생성
        # 시작 시간과 끝 시간 설정 
        start_time = datetime.strptime('08:30:00', '%H:%M:%S') #문자열을 시간으로
        end_time = datetime.strptime('18:00:00', '%H:%M:%S')

        # 시간 데이터 타입으로 이루어진 리스트 생성
        time_list = []
        current_time = start_time
        while current_time <= end_time:
            time_list.append(current_time.time().strftime('%H%M%S')) 
            current_time += timedelta(seconds=1)

        dft = pd.DataFrame({'시간':time_list})

        before_decrease = len(dfa)
        dfa['체결가격'] = dfa['체결가격']/100000
        # 데이터 양 줄이기
        dfa = dfa[dfa['체결가격'] * dfa['거래량'] > 10] # 거래량 X 체결가격이 100만원보다 작은 값을 개인으로 보고 제거
            # 계산 값이 너무 크면 계산값이 갑자기 마이너스가 되는 경우가 생김
        after_decrease = len(dfa)
        dfc = dfc[dfc['순매수수량']!=0] # 순매수수량이 0인 행 제거. 10월 5일분  852개


        # 데이터 프레임 합치기
        dft = pd.merge(dft, dfa[['시간','거래량', '체결가격']], on='시간', how='left')
        dft = pd.merge(dft, dfb_split , on='시간', how='left')
        dft = pd.merge(dft, dfc[['시간', '순매수수량']], on='시간', how='left')

        # '시간'컬럼 제외하고 다른 커럼의 값이 모두 Null인 경우 해당 행 제거
        dft.dropna(subset=dft.columns.difference(['시간']), how='all', inplace=True) 

        # Null을 0으로 채움
        dft.fillna(value=0, inplace=True)

        # 스케일링 #시간 컬럼 제외
        dft = scaler(dft)

        # 데이터 프레임 길이를 모두 200000으로 맞춤
        desired_length = 350000

        if len(dft) < desired_length:
            # 부족한 행 수 계산
            num_rows_to_add = desired_length - len(dft)
        else:
            print('길이:',len(dft))
            print('before decrease dfa:',before_decrease)
            print('after decrease dfa:',after_decrease)
            raise ValueError("데이터 프레임의 길이가 350000보다 크다")

        empty_df = pd.DataFrame(0, index=range(num_rows_to_add), columns=dft.columns)

        dft = pd.concat([dft,empty_df], ignore_index=True)

        #시간 컬럼 제거
        dft.drop('시간', axis=1, inplace=True)


        # 가장 마지막에 나오는 데이터 12개만 남기기
        dfy = dfy[-12:]

        # 데이터 타입 변경
        dfy[dfy.columns] = dfy[dfy.columns].astype(int)

        # 순수량 구하기
        dfy['누적매매 체결 순수량'] = dfy['Accumulated Bid Trading Volume'] - dfy['Accumulated Ask Trading Volume']

        # 필요없는 컬럼 제거
        drop_columns = ['Accumulated Ask Trading Volume','Accumulated Bid Trading Volume']
        dfy.drop(drop_columns, axis=1, inplace=True)

        foreign = dfy[dfy['Investor Code'] == 9000.0]['누적매매 체결 순수량'].item()

        dft['y'] = foreign

        # Parquet로 데이터프레임 저장 (경로 지정)
        dft.to_parquet(f'./Preprocessed_data/{above_file_name}_{date}.parquet', index=False, engine='pyarrow')

### 사용하지 않는 코드

In [None]:
# 빈 데이터 # 공휴일 # 같은 날짜의 다른 인터페이스도 비어있다.
#df = pd.read_parquet('./sam_time_c101/samsung_20230301_C101S.parquet')
#df = pd.read_parquet('./sam_time_c101/samsung_20230501_C101S.parquet')
#df = pd.read_parquet('./sam_time_c101/samsung_20230505_C101S.parquet')
#df = pd.read_parquet('./sam_time_c101/samsung_20230529_C101S.parquet')
#df = pd.read_parquet('./sam_time_c101/samsung_20230531_C101S.parquet')
#df = pd.read_parquet('./sam_time_c101/samsung_20230606_C101S.parquet')
#df = pd.read_parquet('./sam_time_c101/samsung_20230815_C101S.parquet')
#df = pd.read_parquet('./sam_time_c101/samsung_20230928_C101S.parquet')
#df = pd.read_parquet('./sam_time_c101/samsung_20230929_C101S.parquet')

In [6]:
# 파일들의 행 길이를 모두 가져온다
'''
length = []
for i in date:
    df = pd.read_parquet(f'./new data/{i}.parquet')
    length.append(len(df))

max(length)
'''

188

In [91]:
# '시간'컬럼을 datetime 타입으로 바꾸기
'''
year = 2023
month = 10
day = 5
dfa['시간'] = pd.to_datetime(dfa['시간'], format='%H%M%S') #시간 데이터 타입으로 바꾸기
dfa['시간'] = dfa['시간'].apply(lambda x: pd.to_datetime(datetime.combine(datetime(year, month, day), x.time()))) #년도, 날짜 집어넣기

dfb['시간'] = pd.to_datetime(dfb['시간'], format='%H%M%S') #시간 데이터 타입으로 바꾸기
dfb['시간'] = dfb['시간'].apply(lambda x: pd.to_datetime(datetime.combine(datetime(year, month, day), x.time()))) #년도, 날짜 집어넣기

dfc['시간'] = pd.to_datetime(dfc['시간'], format='%H%M%S') #시간 데이터 타입으로 바꾸기
dfc['시간'] = dfc['시간'].apply(lambda x: pd.to_datetime(datetime.combine(datetime(year, month, day), x.time()))) #년도, 날짜 집어넣기
'''

In [182]:
# 회원번호가 변하는 시점의 인덱스 저장
# 양수면 처음 등장한 회원번호
# 음수면 한 번 이상 등장했던 회원번호
'''
again = {}
change_idx = []
for i in range(len(dfb)):
    if i>0 and dfb['1단계매도회원번호'][i] != dfb['1단계매도회원번호'][i-1] and dfb['1단계매도회원번호'][i] not in again.keys():
        _x = i
        again[dfb['1단계매도회원번호'][i-1]] = dfb['1단계매도체결수량'][i-1]
        change_idx.append(_x)
    if i>0 and dfb['1단계매도회원번호'][i] != dfb['1단계매도회원번호'][i-1] and dfb['1단계매도회원번호'][i] in again.keys():
        _x = i *-1
        again[dfb['1단계매도회원번호'][i-1]] = dfb['1단계매도체결수량'][i-1]
        change_idx.append(_x)
change_idx
'''

[31, -38, -45, -46, -52, -59, -73, -77, -87, -372]

In [None]:
#모든 데이터를 합치기 위한 데이터 프레임 생성 #Datetime 타입
# 시작 시간과 끝 시간 설정 
'''
start_time = datetime.strptime('08:30:00', '%H:%M:%S') #문자열을 시간으로
end_time = datetime.strptime('18:00:00', '%H:%M:%S')

# 시간 데이터 타입으로 이루어진 리스트 생성
time_list = []
current_time = start_time
while current_time <= end_time:
    time_list.append(datetime.combine(datetime(year, month, day),current_time.time())) #시간을 문자열로
    current_time += timedelta(seconds=1)


dft = pd.DataFrame({'시간':time_list})
dft
'''

In [72]:
# 시간 컬럼을 인덱스로, 인덱스는 컬럼으로
'''
dft['인덱스'] = dft.index
dft.index = dft['시간']
dft = dft.drop('시간',axis=1)
'''

In [None]:
# 시각화
'''
matplotlib.rcParams['font.family'] = 'Malgun Gothic'

data = {'시간': ['140000', '141000']}
time = pd.DataFrame(data)
time['시간'] = pd.to_datetime(time['시간'], format='%H%M%S')
time['시간'] = time['시간'].apply(lambda x: pd.to_datetime(datetime.combine(datetime(year, month, day), x.time())))

start_time1 = time['시간'][0]
end_time1 = time['시간'][1]

#arange1 = (dfa['시간'] > start_time1) & (dfa['시간'] < end_time1)


fig = plt.figure(figsize=(50, 20))

x = dft[(dft.index > start_time1)&(dft.index < end_time1)]['인덱스']
y1 = dft[(dft.index > start_time1)&(dft.index < end_time1)]['거래량']/100
y2 = dft[(dft.index > start_time1)&(dft.index < end_time1)]['체결가격']/1000
y3 = dft[(dft.index > start_time1)&(dft.index < end_time1)]['순매수수량']/100
y4 = dft[(dft.index > start_time1)&(dft.index < end_time1)]['1단계매도체결수량_외국인']/100

plt.plot(x, y1, label='거래량')
plt.plot(x, y2, label='체결가격')
plt.plot(x, y3, label='순매수수량')
plt.plot(x, y4, label='1단계매도체결수량_외국인')

#dateFmt = mdates.DateFormatter('%H%M%S')
#ax.xaxis.set_major_formatter(dateFmt)

plt.fill_between(x, y1) # alpha=0.5 로 투면도 조절
plt.fill_between(x, y2) # alpha=0.5 로 투면도 조절
plt.fill_between(x, y3)
plt.fill_between(x, y4)
plt.legend()

plt.xticks(rotation=45)
#plt.grid()

#plt.xlim(0, 6)  # x축의 범위
#plt.ylim(-100, 1000)  # y축의 범위
'''