In [1]:
import pandas as pd
import numpy as np

import packtest

In [2]:
# Load the raw market data, using the "KST" column as the row index, and
# discard the "Unnamed: 0" column (presumably a saved positional index --
# confirm against the CSV).
raw_data = pd.read_csv("Data-QuantTrading.csv", index_col="KST").drop(
    columns="Unnamed: 0"
)
# Convert the index labels to timestamps so the date/time accessors work.
raw_data.index = pd.to_datetime(raw_data.index)

# Find the weekdays whose data is incomplete and has to be removed by hand.
# A day is treated as complete only if it contains a bar stamped 15:20-15:29,
# which this notebook uses as the day's closing bar.
tmp_data = raw_data.resample('D').last()

# Calendar days (as midnight timestamps) that do contain a closing bar.
# Working on the whole index at once avoids dropping one label per row, which
# raised KeyError as soon as a day had more than one bar inside the window.
bar_times = raw_data.index
is_closing_bar = (bar_times.hour == 15) & (bar_times.minute // 10 == 2)
days_with_close = bar_times[is_closing_bar].normalize().unique()

# Keep only the days that lack a closing bar and are not Saturday (5) or
# Sunday (6); what remains are the problematic trading days.
missing_close = ~tmp_data.index.isin(days_with_close)
is_weekday = tmp_data.index.weekday < 5
tmp_data = tmp_data[missing_close & is_weekday]

problematic_days = tmp_data.index

# Remove every bar that falls on a Saturday, a Sunday, or one of the days
# listed in problematic_days.
# A single boolean mask replaces the row-by-row drop loop, which rebuilt the
# whole DataFrame for every removed row and compared each row against every
# problematic day.
bar_days = raw_data.index.normalize()  # calendar day of each bar
on_weekend = raw_data.index.weekday >= 5  # 5 = Saturday, 6 = Sunday
on_problem_day = bar_days.isin(problematic_days.normalize())

pp_data = raw_data[~(on_weekend | on_problem_day)].copy()

In [None]:
# Split the cleaned data into one single-column DataFrame per field.
# NOTE(review): `open` shadows the Python builtin of the same name for the
# rest of the notebook; the name is kept because other cells may rely on it.
vwap = pp_data["vwap"].to_frame()
open = pp_data["open"].to_frame()
high = pp_data["high"].to_frame()
low = pp_data["low"].to_frame()
close = pp_data["close"].to_frame()
volume = pp_data["volume"].to_frame()
ticks = pp_data["ticks"].to_frame()

# Quote data: best bid/ask prices and their sizes.
bid = pp_data["bid"].to_frame()
bid_size = pp_data["bid_size"].to_frame()
ask = pp_data["ask"].to_frame()
ask_size = pp_data["ask_size"].to_frame()

# Spread between ask and bid for every bar.
spread = pp_data["ask"] - pp_data['bid']
bid_ask_premium = spread.to_frame()

# Daily close: the "close" of the bar stamped 15:20-15:29 on each day.
# The index is taken from the selected bars themselves, so every value is
# paired with its own calendar day instead of relying on a separately built
# index having the same length and order.
bar_times = pp_data.index
is_closing_bar = (bar_times.hour == 15) & (bar_times.minute // 10 == 2)
close_bars = pp_data.loc[is_closing_bar, "close"]
daily_close = pd.DataFrame(
    close_bars.to_numpy(),
    index=close_bars.index.normalize(),
    columns=["daily_close"],
)
# If a day has several bars inside the window, keep the latest one.
daily_close = daily_close[~daily_close.index.duplicated(keep="last")]

# Daily open: the "open" of the bar stamped 09:00-09:09 on each day.
is_opening_bar = (bar_times.hour == 9) & (bar_times.minute // 10 == 0)
open_bars = pp_data.loc[is_opening_bar, "open"]
daily_open = pd.DataFrame(
    open_bars.to_numpy(),
    index=open_bars.index.normalize(),
    columns=["daily_open"],
)
# If a day has several bars inside the window, keep the earliest one.
daily_open = daily_open[~daily_open.index.duplicated(keep="first")]
# Share the daily_close index; a day without an opening bar becomes NaN
# rather than shifting every later value onto the wrong date.
daily_open = daily_open.reindex(daily_close.index)

# Daily high / low: extreme "high" and "low" values among the bars between
# 09:00 and 15:30 of every day present in daily_close. Both are gathered in a
# single pass over the days, slicing each session once.
session_start = "{year}-{month}-{day} 09:00"
session_end = "{year}-{month}-{day} 15:30"

session_highs = []
session_lows = []
for trade_day in daily_close.index:
    date_parts = dict(year=trade_day.year, month=trade_day.month, day=trade_day.day)
    first_label = session_start.format(**date_parts)
    last_label = session_end.format(**date_parts)
    session_highs.append(pp_data.loc[first_label:last_label]["high"].max())
    session_lows.append(pp_data.loc[first_label:last_label]["low"].min())

daily_high = pd.DataFrame(session_highs, index=daily_close.index, columns=["daily_high"])
daily_low = pd.DataFrame(session_lows, index=daily_close.index, columns=["daily_low"])

In [None]:
class MyTest(packtest.Packtesting):
    """Strategy comparing each column's latest close with its mean close."""

    def create_packet(self, current_time):
        """Collect the inputs that ``create_signal`` needs for ``current_time``.

        Returns a dict with two entries: "close", obtained from
        ``pack_get_data_expanding(current_time, "close")``, and "date",
        obtained from ``pack_get_now(current_time)``.
        """
        return {
            "close": self.pack_get_data_expanding(current_time, "close"),
            "date": self.pack_get_now(current_time),
        }

    @staticmethod
    def create_signal(packet):
        """Return one signal value per column of ``packet["close"]``.

        ``packet["close"]`` is indexed as a 2-D array; presumably rows are
        time steps and columns are stocks -- confirm against packtest.
        The result holds 100 where the last row's value is greater than that
        column's mean over all rows in the packet, and -1 elsewhere.
        """
        closes = packet["close"]
        n_columns = closes.shape[1]
        # NOTE(review): the original called this value "ma20", but it averages
        # every row in the packet, not a 20-row window -- confirm the intent.
        column_means = np.array(
            [np.mean(closes[:, column]) for column in range(n_columns)]
        )
        latest_close = closes[-1, :]
        return np.where(latest_close > column_means, 100, -1)

In [None]:
# Relabel the single column of every quote frame to "close", presumably so
# they line up with the "close" data posted below -- confirm against packtest.
for quote_frame in (bid, ask, bid_size, ask_size):
    quote_frame.columns = ["close"]

# NOTE(review): this rebinds the name `packtest` from the imported module to
# the MyTest instance, so re-running the class-definition cell afterwards can
# no longer reach the module through that name. The name is kept because later
# cells may rely on it.
packtest = MyTest(
    init_cash=100000000,
    bid_price=bid,
    ask_price=ask,
    bid_size=bid_size,
    ask_size=ask_size,
)
# Register the close prices under the key "close".
packtest.post_data("close", close)