In [13]:
from pykrx import stock
import sys
import pandas as pd


def labellingD0(d0) -> str:
    '''
    D0 시점의 각 봉에 대한 라벨링 25가지
    '''
    openP = d0['open']
    highP = d0['high']
    lowP = d0['low']
    closeP = d0['close']

    # 장대 양봉
    if closeP >= 1.1*openP:
        if closeP < highP:
            if openP > lowP:
                return "P15"
            if highP - 2*closeP + openP >= 0:
                return "P14"
            return "P13"
        if openP > lowP:
            return "P11"
        return "P10"

    # 짧은 양봉
    elif closeP >= 1.005*openP:
        if closeP < highP:
            if openP > lowP:
                return "P05"
            if highP - 3*closeP + 2*openP >= 0:
                return "P04"
            return "P03"
        if openP > lowP:
            if 2*highP - 3*openP + lowP >= 0:
                return "P01"
            return "P02"
        return "P00"

    # 보합
    elif closeP >= openP:
        end_min = closeP - lowP
        Max_end = highP - closeP
        if end_min > Max_end*3:
            return "K01"
        elif end_min*3 < Max_end:
            return "K02"
        return "K00"

    # 짧은 음봉
    elif closeP >= 0.9*openP:
        if openP < highP:
            if closeP > lowP:
                return "M05"
            if highP - 3*openP + 2*closeP < 0:
                return "M03"
            return "M04"
        if closeP > lowP:
            if 3*closeP - lowP - 2*openP < 0:
                return "M01"
            return "M02"
        return "M00"

    # 장대 음봉
    else:
        if closeP > lowP:
            if openP < highP:
                return "M15"
            return "M11"
        if openP < highP:
            if highP - 2*openP + closeP >= 0:
                return "M14"
            return "M13"
        return "M10"


def labellingD1(d10) -> str:
    '''
    D1 5가지 x D0 25가지
    '''
    temp = d10.iloc[0]
    openP = temp['open']
    # highP = temp['high']
    # lowP = temp['low']
    closeP = temp['close']

    # 장대 양봉
    if closeP >= 1.1*openP:
        res = "P10"

    # 짧은 양봉
    elif closeP >= 1.005*openP:
        res = "P00"

    # 보합
    elif closeP >= openP:
        res = "K00"

    # 짧은 음봉
    elif closeP >= 0.9*openP:
        res = "M00"

    # 장대 음봉
    else:
        res = "M10"

    return res + labellingD0(d10.iloc[1])


def labellingD2(d210):
    '''
    D2D1 12가지 x D0 25가지
    '''
    d2_openP = d210.iloc[0]['open']
    d2_closeP = d210.iloc[0]['close']
    d1_openP = d210.iloc[1]['open']
    d1_closeP = d210.iloc[1]['close']

    d21_max = max(d2_openP, d2_closeP, d1_openP, d1_closeP)
    d21_avg = (d21_max + min(d2_openP, d2_closeP, d1_openP, d1_closeP))/2
    if d21_max/d21_avg <= 1.005:
        res = "S04"
    elif d2_openP <= d2_closeP:  # D2 양봉
        if d1_openP <= d1_closeP:
            res = "P10"
        elif d2_openP >= d1_closeP:
            res = "S07"
        elif d2_closeP > d1_openP:
            res = "S06"
        elif d2_closeP >= d1_closeP:
            res = "S03"
        else:
            res = "S05"
    elif d2_openP >= d2_closeP:  # D2 음봉
        if d1_openP >= d1_closeP:
            res = "M10"
        elif d2_closeP < d1_openP:
            res = "S01"
        elif d2_openP <= d1_closeP:
            res = "S02"
        elif d2_closeP >= d1_closeP:
            res = "S08"
        else:
            res = "S00"
    else:
        res = "S09"

    return res + labellingD0(d210.iloc[2])


def write_stockData_to_csv():
    '''
    패터닝된 pd.DataFrame을 리턴하면서 csv 파일로 저장
    '''
    sys.stdout.write("[Labelling Test]\n불러올 기업명을 입력하시오: ")
    comName = sys.stdin.readline().rstrip()
    stockCode = pd.read_csv("resources/stockcode.csv")

    try:
        stockCode = str(int(stockCode[stockCode['회사명'] == comName]['종목코드']))
    except:
        sys.stdout.write("유효하지 않은 입력입니다. \n")
        return -1

    return get_stockData_using_stockCode(stockCode)


def get_stockData_using_stockCode(stockCode, today):
    print("Loading stock data from KRX...")
    stockCode = str(stockCode)
    stockCode = "0"*(6-len(stockCode)) + stockCode

    stockData = stock.get_market_ohlcv_by_date("20120101", today, stockCode)
    comName = stockData.columns.name
    stockData.columns = pd.Index(
        ["open", "high", "low", "close", "volume"], name=comName)

    stockData['pattern1'] = None
    for i in range(len(stockData)):
        stockData['pattern1'].values[i] = labellingD0(stockData.iloc[i])

    stockData['pattern2'] = None
    for i in range(1, len(stockData)):
        stockData['pattern2'].values[i] = labellingD1(stockData.iloc[i-1:i+1])

    stockData['pattern3'] = None
    for i in range(2, len(stockData)):
        stockData['pattern3'].values[i] = labellingD2(stockData.iloc[i-2:i+1])

    # stockData.to_csv(f"resources/{stockData.columns.name}.csv")
    print(f"{comName}({stockCode})의 주식 데이터를 가져오는 데에 성공했습니다 (기간: 20120101~{today})")
    return stockData, stockCode


In [27]:
import pandas as pd
from mlxtend.preprocessing import TransactionEncoder
from mlxtend.frequent_patterns import apriori
from random import sample
#from modules.pattern_labelling import get_stockData_using_stockCode


def stockpred_apriori(stock_code=None, weight=0.05, min_P_score=50, N_items=3, last_date=None):
    if last_date == None:
        # today에 현재 시간을 불러옵니다.
        today = pd.Timestamp.now()
        today = str(today.year)+str(today.month)+str(today.day)
    else:
        today = str(last_date)
    stock_pred_df = pd.DataFrame({'P_score': [], 'predict' : [],'real': [],'stock_name' : [], 'stock_code': []})
    stock_pred_df.columns.name = today
    '''
    n-items serial association rule analysis를 통해 다음 주식 패턴을 예측합니다.

    [parameter]
    - stock_code    종목코드를 int 또는 list 형태로 입력, 미입력 시 무작위로 샘플링한 10개의 종목으로 테스트합니다.
    - weight        시간에 따른 가중치를 입력합니다. 미입력 시 0.05로 적용됩니다.
    - min_P_score   입력된 값(0 ~ 100) 미만의 P_score를 보이는 결과는 보여주지 않습니다. 미입력 시 50으로 적용됩니다.
    - N_items       연관 규칙 시 장바구니에 묶을 item 수를 정합니다. 미입력 시 3개씩 묶습니다.
    - last_date      예측하고 싶은 마지막 시점을 "YYYYMMDD" 형식으로 입력합니다. 미입력 시 오늘 날짜로 입력됩니다.
    '''
    if stock_code == None:
        stock_list = pd.read_csv('../resources/stockcode.csv')
        stock_sample = sample(list(stock_list['종목코드']), 5)
    elif stock_code == "all":
        stock_list = pd.read_csv('../resources/stockcode.csv')
        stock_sample = stock_list['종목코드']
    elif type(stock_code) == int:
        stock_sample = [stock_code]
    elif type(stock_code) == list:
        stock_sample = stock_code.copy()
    else:
        raise TypeError("stock_code에 잘못된 parameter type이 입력되었습니다.")

    if N_items > 9 or N_items < 2:
        raise TypeError("N_items에는 2 이상 9 이하의 정수가 입력되어야 합니다.")

    for stock_code in stock_sample:
        stockData, stock_code = get_stockData_using_stockCode(stock_code, today=today)

        bong_list = []
        for i in range(0, len(stockData) - N_items):
            pattern_list = []
            for n in range(N_items):
                pattern_list.append(f"0{n+1}" + stockData["pattern1"][i+n])
            bong_list.append(pattern_list)

        D = []
        for i in range(1, N_items):
            D.append(f"0{i}" + stockData['pattern1'][-N_items-1+i])

        A_pr = {'P_score': 0, 'real': stockData['pattern1'][-1], 'stock_name': stockData.columns.name, 'stock_code': stock_code}
        A = {'P0': 0, 'P1': 0, 'M0': 0, 'M1': 0, 'K0': 0}
        print("연관 분석 시작... ", end="")
        for bong_order in range(len(bong_list)):
            date_weight = bong_order//10
            current_bong = bong_list[bong_order].copy()
            next_bong = current_bong.pop()
            if D == current_bong:
                next_bong = next_bong[2:4]
                if "P0" == next_bong:
                    A["P0"] += 1+(weight*date_weight)
                elif "P1" == next_bong:
                    A["P1"] += 1+(weight*date_weight)
                elif "M0" == next_bong:
                    A["M0"] += 1+(weight*date_weight)
                elif "M1" == next_bong:
                    A["M1"] += 1+(weight*date_weight)
                elif "K0" == next_bong:
                    A["K0"] += 1+(weight*date_weight)
        print("완료")
        if (A['P0']+A['P1']+A['M0']+A['M1']+A['K0']) == 0:
            print("[Failed] 연관 규칙이 발견되지 않았습니다.")
        else:
            A_pr['P_score'] = round(((A['P0']+A['P1']) / (A['P0']+A['P1']+A['M0']+A['M1']+A['K0']))*100, 2)
            #if A['P_score'] >= min_P_score:
            print(A_pr)
            print(A)
        print()
        predict = max(A.keys(), key = (lambda k : A[k]))
        A_pred_df = pd.DataFrame({'P_score': [A_pr["P_score"]], 'predict' : [predict],'real':[ A_pr["real"][0:2]],'stock_name' : [A_pr['stock_name']], 'stock_code': [A_pr['stock_code']]})
        #print(A_pred_df)
        stock_pred_df = stock_pred_df.append(A_pred_df)
        #print(stock_pred_df)
    stock_pred_df = stock_pred_df.set_index('stock_name')
    return stock_pred_df


In [30]:
from sklearn.metrics import confusion_matrix, classification_report
sample_df = stockpred_apriori(stock_code="all", weight=0.05, min_P_score=50, N_items=3, last_date="20201022")
print(sample_df)
print(confusion_matrix(sample_df["predict"], sample_df["real"]))
print(classification_report(sample_df["predict"], sample_df["real"]))

[[3 0]
 [2 0]]
              precision    recall  f1-score   support

          M0       0.60      1.00      0.75         3
          P0       0.00      0.00      0.00         2

    accuracy                           0.60         5
   macro avg       0.30      0.50      0.37         5
weighted avg       0.36      0.60      0.45         5



  _warn_prf(average, modifier, msg_start, len(result))
