# Import

In [1]:
# ======================================================
# 주가 데이터 
import FinanceDataReader as fdr
import pandas_datareader.data as pdr

# ======================================================
# TA-Lib
# import talib

# ======================================================
# basic library
import warnings

import openpyxl
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler

import time
import math
import os
import os.path
import random
import shutil
import glob

# ======================================================
# tqdm
from tqdm import tqdm

# ======================================================
# datetime
from datetime import timedelta, datetime

# ======================================================
# plotting library
import matplotlib.pyplot as plt

import plotly.graph_objects as go
import plotly.subplots as ms
import plotly.express as px

import mplfinance as fplt
from mplfinance.original_flavor import candlestick2_ohlc, volume_overlay

import matplotlib.dates as mdates
from matplotlib.dates import DateFormatter
from matplotlib.dates import MonthLocator
from PIL import Image

import seaborn as sns
import cv2
import csv

# plt.rcParams['figure.dpi'] = 150
from IPython.display import clear_output

In [2]:
kospi = pd.read_excel("./2022년_마지막거래일기준_시총.xlsx") # 2022년도 마지막 거래일 기준

  warn("Workbook contains no default style, apply openpyxl's default")


# 이미지 생성 파라미터

In [3]:
# 이미지 생성용 파라미터
seq_len = 10 # 5일당 1주
window_len = int(seq_len/2)

kospi_tickers = kospi.sort_values("시가총액", ascending=False).iloc[:150, :]# 시총 상위 100개만 활용

train_start_day = '2013-01-01'
train_end_day = '2021-01-07' 

test_start_day = '2021-01-01'
test_end_day = '2022-12-31'

# 이미지 생성 함수

In [4]:
# 이미지 생성용 함수
def plot_candles(pricing, title=None, trend_line = False, volume_bars=False, color_function=None, technicals=None):
    
    def default_color(index, open_price, close_price, low, high):
        return 'b' if open_price[index] > close_price[index] else 'r'
    color_function = color_function or default_color
    technicals = technicals or []
    open_price = pricing['Open']
    close_price = pricing['Close']
    low = pricing['Low']
    high = pricing['High']
    oc_min = pd.concat([open_price, close_price], axis=1).min(axis=1)
    oc_max = pd.concat([open_price, close_price], axis=1).max(axis=1)
    
    def plot_trendline(ax, pricing, linewidth=5):
        x = np.arange(len(pricing))
        y = pricing.values
        z = np.polyfit(x, y, 1)
        p = np.poly1d(z)
        ax.plot(x, p(x), 'g--', linewidth = linewidth)
    
    if volume_bars:
        fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True, gridspec_kw={'height_ratios': [3,1]},figsize=(7,7))
    else:
        fig, ax1 = plt.subplots(1, 1)
    if title:
        ax1.set_title(title)
    fig.tight_layout()
    x = np.arange(len(pricing))
    candle_colors = [color_function(i, open_price, close_price, low, high) for i in x]
    candles = ax1.bar(x, oc_max-oc_min, bottom=oc_min, color=candle_colors, linewidth=0)
    lines = ax1.vlines(x , low, high, color=candle_colors, linewidth=1)
    
    # 추세선 생성
    if trend_line:
        plot_trendline(ax1, pricing['Close'])
    
    ax1.xaxis.grid(True)
    ax1.yaxis.grid(True)
    ax1.xaxis.set_tick_params(which='major', length=3.0, direction='in', top='off')
    ax1.set_xticklabels([])
    ax1.set_yticklabels([])
    ax1.xaxis.set_visible(False)
    ax1.yaxis.set_visible(False)
    ax1.axis(False)

    for indicator in technicals:
        ax1.plot(x, indicator)
    
    if volume_bars:
        volume = pricing['Volume']
        volume_scale = None
        scaled_volume = volume
        if volume.max() > 1000000:
            volume_scale = 'M'
            scaled_volume = volume / 1000000
        elif volume.max() > 1000:
            volume_scale = 'K'
            scaled_volume = volume / 1000
        ax2.bar(x, scaled_volume, color=candle_colors)
        volume_title = 'Volume'
        if volume_scale:
            volume_title = 'Volume (%s)' % volume_scale
        #ax2.set_title(volume_title)
        ax2.xaxis.grid(True)
        ax2.set_xticklabels([])
        ax2.set_yticklabels([])
        ax2.set_yticklabels([])
        ax2.set_xticklabels([])
        ax2.axis(False)
    return fig    

In [5]:
def plot_trendline(pricing):
    x = np.arange(len(pricing))
    y = pricing.values
    z = np.polyfit(x, y, 1)
    slope = z[0]
    angle = np.arctan(slope) * 180 / np.pi
    return angle

# T의 캔들 차트와 방향성 확인을 위한 T기간 다음의 T/2일치 캔들 차트 (거래량 차트 X)

In [None]:
route_new = f"./img_without_vol_{seq_len}/train"

# 각 종목의 데이터를 가져와서 종목 별로 이미지를 생성하여 기간 별로 저장 (Train)
for ticker in tqdm(kospi_tickers['종목코드']):
    try:
        data = []
        stock_data = pd.DataFrame(fdr.DataReader(ticker, train_start_day, train_end_day))
        stock_data = stock_data[['Open', 'High', 'Low', 'Close', 'Volume']].astype(float)
        stock_data = stock_data.reset_index()
        stock_data_train = stock_data.copy()

        for i in range(0, len(stock_data_train) - seq_len, window_len):
            # 캔들 차트 10일치
            candlestick_data = stock_data_train.iloc[i : i + seq_len].copy() # 캔들 차트용 seq_len
            candlestick_data = candlestick_data.reset_index().drop('index', axis = 1)

            if not os.path.exists(route_new + f'/{ticker}'):
                os.makedirs(route_new + f'/{ticker}')
            fig = plot_candles(candlestick_data, trend_line=True, volume_bars=False)

            fig.savefig(route_new + f'/{ticker}' + f'/{ticker}_{i}', dpi=72)
            fig.clf()

            # 캔들 차트 10일치 뒤 방향 확인을 위한 5일치
            candlestick_data = stock_data_train.iloc[i + seq_len : i + seq_len + window_len].copy() # 캔들 차트용 seq_len
            candlestick_data = candlestick_data.reset_index().drop('index', axis = 1)

            if not os.path.exists(route_new + f'_moving/{ticker}'):
                os.makedirs(route_new + f'_moving/{ticker}')

            # 추세선 각도 계산
            angle = plot_trendline(candlestick_data['Close'])
            
            if angle > 0:
                trend = "Up"
            else:
                trend = "Down"

            fig = plot_candles(candlestick_data, trend_line=True, volume_bars=False)
            fig.savefig(route_new + f'_moving/{ticker}' + f'/{ticker}_{i}_moving', dpi=72)
            fig.clf()
            
            # 추세선 각도 라벨 저장
            with open(route_new + f'_moving/{ticker}_moving_label.txt' , 'a') as f:
                f.write(route_new + f'_moving/{ticker}' + f'/{ticker}_{i}_moving'+ ',' + trend + "\n")
     
    except:
        continue

clear_output()

In [None]:
route_new = f"./img_without_vol_{seq_len}/test"

# 각 종목의 데이터를 가져와서 종목 별로 이미지를 생성하여 기간 별로 저장 (test)
for ticker in tqdm(kospi_tickers['종목코드']):
    try:
        data = []
        stock_data = pd.DataFrame(fdr.DataReader(ticker, test_start_day, test_end_day))
        stock_data = stock_data[['Open', 'High', 'Low', 'Close', 'Volume']].astype(float)
        stock_data = stock_data.reset_index()
        stock_data_test = stock_data.copy()

        for i in range(0, len(stock_data_test) - seq_len, window_len):
            # 캔들 차트 10일치
            candlestick_data = stock_data_test.iloc[i : i + seq_len].copy() # 캔들 차트용 seq_len
            candlestick_data = candlestick_data.reset_index().drop('index', axis = 1)

            if not os.path.exists(route_new + f'/{ticker}'):
                os.makedirs(route_new + f'/{ticker}')
            fig = plot_candles(candlestick_data, trend_line=True, volume_bars=False)

            fig.savefig(route_new + f'/{ticker}' + f'/{ticker}_{i}', dpi=72)
            fig.clf()

            # 캔들 차트 10일치 뒤 방향 확인을 위한 5일치
            candlestick_data = stock_data_test.iloc[i + seq_len : i + seq_len + window_len].copy() # 캔들 차트용 seq_len
            candlestick_data = candlestick_data.reset_index().drop('index', axis = 1)

            if not os.path.exists(route_new + f'_moving/{ticker}'):
                os.makedirs(route_new + f'_moving/{ticker}')

            # 추세선 각도 계산
            angle = plot_trendline(candlestick_data['Close'])
            
            if angle > 0:
                trend = "Up"
            else:
                trend = "Down"

            fig = plot_candles(candlestick_data, trend_line=True, volume_bars=False)
            fig.savefig(route_new + f'_moving/{ticker}' + f'/{ticker}_{i}_moving', dpi=72)
            fig.clf()
            
            # 추세선 각도 라벨 저장
            with open(route_new + f'_moving/{ticker}_moving_label.txt' , 'a') as f:
                f.write(route_new + f'_moving/{ticker}' + f'/{ticker}_{i}_moving'+ ',' + trend + "\n")
     
    except:
        continue

clear_output()

# T의 캔들 차트와 방향성 확인을 위한 T기간 다음의 T/2일치 캔들 차트 (거래량 차트 O)

In [8]:
route_new = f"./img_with_vol_{seq_len}/train"

# 각 종목의 데이터를 가져와서 종목 별로 이미지를 생성하여 기간 별로 저장 (Train)
for ticker in tqdm(kospi_tickers['종목코드']):
    try:
        data = []
        stock_data = pd.DataFrame(fdr.DataReader(ticker, train_start_day, train_end_day))
        stock_data = stock_data[['Open', 'High', 'Low', 'Close', 'Volume']].astype(float)
        stock_data = stock_data.reset_index()
        stock_data_train = stock_data.copy()

        for i in range(0, len(stock_data_train) - seq_len, window_len):
            # 캔들 차트 10일치
            candlestick_data = stock_data_train.iloc[i : i + seq_len].copy() # 캔들 차트용 seq_len
            candlestick_data = candlestick_data.reset_index().drop('index', axis = 1)

            if not os.path.exists(route_new + f'/{ticker}'):
                os.makedirs(route_new + f'/{ticker}')
            fig = plot_candles(candlestick_data, trend_line=True, volume_bars=False)

            fig.savefig(route_new + f'/{ticker}' + f'/{ticker}_{i}', dpi=72)
            fig.clf()

            # 캔들 차트 10일치 뒤 방향 확인을 위한 5일치
            candlestick_data = stock_data_train.iloc[i + seq_len : i + seq_len + window_len].copy() # 캔들 차트용 seq_len
            candlestick_data = candlestick_data.reset_index().drop('index', axis = 1)

            if not os.path.exists(route_new + f'_moving/{ticker}'):
                os.makedirs(route_new + f'_moving/{ticker}')

            # 추세선 각도 계산
            angle = plot_trendline(candlestick_data['Close'])
            
            if angle > 0:
                trend = "Up"
            else:
                trend = "Down"

            fig = plot_candles(candlestick_data, trend_line=True, volume_bars=True)
            fig.savefig(route_new + f'_moving/{ticker}' + f'/{ticker}_{i}_moving', dpi=72)
            fig.clf()
            
            # 추세선 각도 라벨 저장
            with open(route_new + f'_moving/{ticker}_moving_label.txt' , 'a') as f:
                f.write(route_new + f'_moving/{ticker}' + f'/{ticker}_{i}_moving'+ ',' + trend + "\n")
     
    except:
        continue

clear_output()

In [None]:
route_new = f"./img_with_vol_{seq_len}/test"

# 각 종목의 데이터를 가져와서 종목 별로 이미지를 생성하여 기간 별로 저장 (test)
for ticker in tqdm(kospi_tickers['종목코드']):
    try:
        data = []
        stock_data = pd.DataFrame(fdr.DataReader(ticker, test_start_day, test_end_day))
        stock_data = stock_data[['Open', 'High', 'Low', 'Close', 'Volume']].astype(float)
        stock_data = stock_data.reset_index()
        stock_data_test = stock_data.copy()

        for i in range(0, len(stock_data_test) - seq_len, window_len):
            # 캔들 차트 10일치
            candlestick_data = stock_data_test.iloc[i : i + seq_len].copy() # 캔들 차트용 seq_len
            candlestick_data = candlestick_data.reset_index().drop('index', axis = 1)

            if not os.path.exists(route_new + f'/{ticker}'):
                os.makedirs(route_new + f'/{ticker}')
            fig = plot_candles(candlestick_data, trend_line=True, volume_bars=False)

            fig.savefig(route_new + f'/{ticker}' + f'/{ticker}_{i}', dpi=72)
            fig.clf()

            # 캔들 차트 10일치 뒤 방향 확인을 위한 5일치
            candlestick_data = stock_data_test.iloc[i + seq_len : i + seq_len + window_len].copy() # 캔들 차트용 seq_len
            candlestick_data = candlestick_data.reset_index().drop('index', axis = 1)

            if not os.path.exists(route_new + f'_moving/{ticker}'):
                os.makedirs(route_new + f'_moving/{ticker}')

            # 추세선 각도 계산
            angle = plot_trendline(candlestick_data['Close'])
            
            if angle > 0:
                trend = "Up"
            else:
                trend = "Down"

            fig = plot_candles(candlestick_data, trend_line=True, volume_bars=True)
            fig.savefig(route_new + f'_moving/{ticker}' + f'/{ticker}_{i}_moving', dpi=72)
            fig.clf()
            
            # 추세선 각도 라벨 저장
            with open(route_new + f'_moving/{ticker}_moving_label.txt' , 'a') as f:
                f.write(route_new + f'_moving/{ticker}' + f'/{ticker}_{i}_moving'+ ',' + trend + "\n")
     
    except:
        continue

clear_output()