In [1]:
%load_ext autoreload
%autoreload 2

import logging
import os
import ta
import numpy as np
import pandas as pd

wd = os.path.abspath("__file__").replace("/__file__", "").replace("notebooks", "")
os.chdir(wd)

from datetime import datetime, timedelta, date
from logging.handlers import TimedRotatingFileHandler
from src.utils import get_jinja_yaml_conf, create_db_engine, Clickhouse_client, Postgres_connect
from tqdm.auto import tqdm

now = datetime.now()

In [2]:
conf = get_jinja_yaml_conf('./conf/logging.yml', './conf/data.yml')
tqdm.pandas()

# logger 설정
stream = logging.StreamHandler()
# stream.setLevel(logging.DEBUG)
logger = logging.getLogger('main')
logging.basicConfig(level=eval(conf['logging']['level']),
    format=conf['logging']['format'],
    handlers = [TimedRotatingFileHandler(filename =  conf['logging']['file_name'],
                                when=conf['logging']['when'],
                                interval=conf['logging']['interval'],
                                backupCount=conf['logging']['backupCount']), 
                                   stream]
                )

In [3]:
# Only for notebooks
import re

os.environ['_ts'] = datetime.astimezone(datetime.now()).strftime('%Y-%m-%d %H:%M:%S %z')

with open('./conf/credentials', "r") as file:
    # 각 라인 읽기
    for line in file:
        # 주석(#) 또는 빈 줄은 무시
        if line.strip() == '' or line.startswith('#'):
            continue

        # 각 라인을 '='를 기준으로 key와 value로 분리
        key, value = line.strip().split('=', 1)

        # $ENV 형식의 환경변수가 있을 경우 해당 값을 가져와서 설정
        env_var_pattern = re.compile(r'\$(\w+)')
        matches = env_var_pattern.findall(value)
        for match in matches:
            value = value.replace(f"${match}", os.environ.get(match, "")).replace('"', '')

        # 환경변수로 설정
        os.environ[key] = value

os.environ['full_save'] = 'false'

In [4]:
# DB 설정
engine = create_db_engine(os.environ)
postgres_conn = Postgres_connect(engine)
click_conn = Clickhouse_client(user_name = os.environ['CLICK_USER'], password = os.environ['CLICK_PW'])
full_save = True if click_conn.get_count('stocks', 'daily_market') == 0 else os.environ['full_save'].lower() == 'true'

2024-08-27 09:11:16,852 (utils.py 60) INFO ::: Connect to 172.20.10.3. DB_NAME is stocks
2024-08-27 09:11:16,858 (utils.py 396) INFO ::: sql execute: SELECT COUNT(*) FROM stocks.daily_market FINAL


In [5]:
# 마켓 정보 가져오기
market_info = postgres_conn.get_data(conf['idx_market']['database'], conf['idx_market']['table'], 
                                columns = '*',
                                  orderby_cols = ['기준일자', '계열구분', '지수명']).rename(columns = {'대비': '전일대비', '등락률': '수익률', '상장시가총액': '시가총액'})
    

market_dates = market_info['기준일자'].sort_values().unique()

# 처리할 날짜 설정
latest_market_date = pd.to_datetime(
                        click_conn.get_maxmin_col(conf['daily_market']['database'], conf['daily_market']['table'], 
                            column = '기준일자', is_min = False)[0]
    ).date()


upload_date = market_dates[0] if full_save else latest_market_date + timedelta(days = 1)
# if upload_date > market_dates[-1]:
#     logger.info("Latest market information is uploaded already.")
#     return 

start_idx = np.where(market_dates >= upload_date)[0][0] - max(conf['agg_days'])
start_date = market_dates[0] if start_idx < 0 else market_dates[start_idx]

logger.info(f"Upload from date: {upload_date}. For preprocessing, load date from {start_date}.")



2024-08-27 09:11:22,915 (utils.py 396) INFO ::: sql execute: SELECT MAX(`기준일자`) 
                                FROM stocks.daily_market 


IndexError: index 0 is out of bounds for axis 0 with size 0

### full_save 관련

In [18]:
# 이평선 관련
logger.info("Calculates market additional indicator.")
group_info = market_info.groupby(['계열구분', '지수명'])
for day in conf['agg_days']:
    logger.info(f"aggregating process of {day}days starts!")
    ## 종가
    # 이평
    market_info[f'종가_이평{day}일'] = group_info['종가'].rolling(window=day).mean().reset_index(level = [0, 1]).iloc[:, -1]
    # 괴리율
    market_info[f'종가_이평{day}일_괴리율'] = market_info['종가'] / market_info[f'종가_이평{day}일'] * 100

    ## 거래량
    # 이평
    market_info[f'거래량_이평{day}일'] = group_info['거래량'].rolling(window=day).mean().reset_index(level = [0, 1]).iloc[:, -1]
    # 합
    market_info[f'거래량_합{day}일'] = group_info['거래량'].rolling(window=day).sum().reset_index(level = [0, 1]).iloc[:, -1]
    
    ## 시총이평
    market_info[f'시총_이평{day}일'] = group_info['시가총액'].rolling(window=day).mean().reset_index(level = [0, 1]).iloc[:, -1]
    # 거래대금이평
    market_info[f'거래대금_이평{day}일'] = group_info['거래대금'].rolling(window=day).mean().reset_index(level = [0, 1]).iloc[:, -1]
    # 거래대금 합
    market_info[f'거래대금_합{day}일'] = group_info['거래대금'].rolling(window=day).sum().reset_index(level = [0, 1]).iloc[:, -1]
    
    ## 수익률
    # 이평
    market_info[f"수익률{day}일"] = group_info['수익률'].rolling(day).progress_apply(lambda x: ((1+x/100).prod() - 1) * 100, raw = True).reset_index(level = [0, 1]).iloc[:, -1]
    # 변동성
    market_info[f'수익률_변동성{day}일'] = group_info['수익률'].rolling(window=day).std().reset_index(level = [0, 1]).iloc[:, -1]
    # 이평 연율화
    market_info[f'수익률{day}일_연율화'] = ((1 + market_info[f"수익률{day}일"] / 100) ** (240 / 720) - 1) * 100
    # 변동성 연율화
    market_info[f'수익률_변동성{day}일_연율화'] = market_info[f'수익률_변동성{day}일'] / np.sqrt(240)
    # sr 연율화
    market_info[f'SR_{day}일_연율화'] =  market_info[f'수익률{day}일_연율화'] / market_info[f'수익률_변동성{day}일_연율화']

# n일 최고/최저가
for day in conf['high_low_days']:
    market_info[f'최고가{day}일'] = group_info['고가'].rolling(window=day).max().reset_index(level = [0, 1]).iloc[:, -1]
    market_info[f'최저가{day}일'] = group_info['저가'].rolling(window=day).min().reset_index(level = [0, 1]).iloc[:, -1]
    # 괴리율
    market_info[f'최고가{day}일_괴리율'] = market_info['종가'] / market_info[f'최고가{day}일'] * 100
    market_info[f'최저가{day}일_괴리율'] = market_info['종가'] / market_info[f'최저가{day}일'] * 100


# 거래량
market_info[f'거래량1일_증가율'] = market_info.apply(lambda x: np.nan if x['거래량_이평20일'] == 0 else x['거래량'] / x['거래량_이평20일'], axis = 1)
market_info[f'거래량5일_증가율'] = market_info.apply(lambda x: np.nan if x['거래량_이평20일'] == 0 else x['거래량_이평5일'] / x['거래량_이평20일'], axis = 1)

# 거래대금시총비율
market_info[f'거래대금_시총비율_1일'] = market_info[f'거래대금'] / market_info[f'시가총액'] * 100
market_info[f'거래대금_시총비율_5일'] = market_info[f'거래대금_이평5일'] / market_info[f'시총_이평5일'] * 100

# 거래대금증가율
market_info[f'거래대금_전일대비_증가율'] = market_info[f'거래대금'] / group_info['거래대금'].shift(1) * 100
market_info[f'거래대금_5일이평대비_증가율'] = market_info[f'거래대금'] / market_info[f'거래대금_이평5일'] * 100

# 기술적 지표 생성
market_info['MACD'] = group_info.progress_apply(lambda x: ta.trend.macd(close = x['종가'], window_slow = 26, window_fast = 12)).reset_index(level = [0, 1]).iloc[:, -1]
market_info['MACD_signal'] = group_info.progress_apply(lambda x: ta.trend.macd_signal(close = x['종가'], window_slow = 26, window_fast = 12, window_sign = 9)).reset_index(level = [0, 1]).iloc[:, -1]
market_info['MACD_diff_signal'] = market_info['MACD'] - market_info['MACD_signal']
market_info['RSI'] = group_info.progress_apply(lambda x: ta.momentum.rsi(close = x['종가'], window = 14)).reset_index(level = [0, 1]).iloc[:, -1]

# Stochastic
market_info['fastK'] = group_info.progress_apply(lambda x: ta.momentum.stoch(high = x['고가'], low = x['저가'], close = x['종가'], window=14, smooth_window=1)).reset_index(drop = True)
market_info['fastD'] = market_info.groupby(['계열구분', '지수명'])['fastK'].rolling(window=3).mean().reset_index(level = [0, 1]).iloc[:, -1]
market_info['slowK'] = market_info['fastD'].copy()
market_info['slowD'] = market_info.groupby(['계열구분', '지수명'])['slowK'].rolling(window=3).mean().reset_index(level = [0, 1]).iloc[:, -1]


# 볼린저밴드
market_info['mavg'] = group_info.progress_apply(lambda x: ta.volatility.bollinger_mavg(close = x['종가'], window = 20)).reset_index(level = [0, 1]).iloc[:, -1]
market_info['up'] = group_info.progress_apply(lambda x: ta.volatility.bollinger_hband(close = x['종가'], window = 20, window_dev = 2)).reset_index(level = [0, 1]).iloc[:, -1]
market_info['dn'] = group_info.progress_apply(lambda x: ta.volatility.bollinger_lband(close = x['종가'], window = 20, window_dev = 2)).reset_index(level = [0, 1]).iloc[:, -1]

# 골든 데드
for cross_name, cross_cols in conf['tech_signal'].items():
    market_info[cross_name] = 0
    left = market_info.groupby(['계열구분', '지수명'])[cross_cols[0]]
    right = market_info.groupby(['계열구분', '지수명'])[cross_cols[1]]
    market_info.loc[(left.shift(1) <= right.shift(1)) & (left.shift(0) > right.shift(0)), cross_name] = 1 
    market_info.loc[(left.shift(1) >= right.shift(1)) & (left.shift(0) < right.shift(0)), cross_name] = -1
    market_info.loc[market_info[cross_cols[0]].isnull() | market_info[cross_cols[1]].isnull(), cross_name] = np.nan

market_info['Bband_Cross'] = 0
left = market_info.groupby(['계열구분', '지수명'])['종가']
right = market_info.groupby(['계열구분', '지수명'])['up']
market_info.loc[(left.shift(1) <= right.shift(1)) & (left.shift(0) > right.shift(0)) & (market_info['up'] > market_info['mavg'] * 1.15), 'Bband_Cross'] = 1
right = market_info.groupby(['계열구분', '지수명'])['dn']
market_info.loc[(left.shift(1) >= right.shift(1)) & (left.shift(0) < right.shift(0)) & (market_info['dn'] > market_info['mavg'] * 1.15), 'Bband_Cross'] = -1
market_info.loc[market_info['up'].isnull() | market_info['dn'].isnull() | market_info['mavg'].isnull(), 'Bband_Cross'] = np.nan

# 그외 전처리
market_info['_ts'] = os.environ['_ts']
market_info = market_info[market_info['기준일자'] >= upload_date].copy()

2024-08-21 22:51:13,027 (3291581262.py 2) INFO ::: Calculates market additional indicator.
2024-08-21 22:51:13,028 (3291581262.py 5) INFO ::: aggregating process of 5days starts!


0it [00:00, ?it/s]

2024-08-21 22:51:16,363 (3291581262.py 5) INFO ::: aggregating process of 20days starts!


0it [00:00, ?it/s]

2024-08-21 22:51:19,695 (3291581262.py 5) INFO ::: aggregating process of 60days starts!


0it [00:00, ?it/s]

2024-08-21 22:51:23,133 (3291581262.py 5) INFO ::: aggregating process of 120days starts!


0it [00:00, ?it/s]

2024-08-21 22:51:26,452 (3291581262.py 5) INFO ::: aggregating process of 240days starts!


0it [00:00, ?it/s]

2024-08-21 22:51:29,758 (3291581262.py 5) INFO ::: aggregating process of 720days starts!


0it [00:00, ?it/s]

  0%|          | 0/351 [00:00<?, ?it/s]

  0%|          | 0/351 [00:00<?, ?it/s]

  0%|          | 0/351 [00:00<?, ?it/s]

  0%|          | 0/351 [00:00<?, ?it/s]

  0%|          | 0/351 [00:00<?, ?it/s]

  0%|          | 0/351 [00:00<?, ?it/s]

  0%|          | 0/351 [00:00<?, ?it/s]

  market_info['Bband_Cross'] = 0


Unnamed: 0,_ts,기준일자,계열구분,지수명,종가,전일대비,수익률,시가,고가,저가,...,fastD,slowK,slowD,mavg,up,dn,MACD_Cross,Stoch_Fast_Cross,Stoch_Slow_Cross,Bband_Cross
0,2024-08-21 22:51:03 +0900,2010-01-04,KOSDAQ,IT H/W,504.75,22.02,4.56,487.15,504.75,487.15,...,,,,,,,,,,
1,2024-08-21 22:51:03 +0900,2010-01-04,KOSDAQ,IT S/W & SVC,1652.15,73.78,4.67,1596.00,1652.15,1595.77,...,,,,,,,,,,
2,2024-08-21 22:51:03 +0900,2010-01-04,KOSDAQ,IT부품,908.40,43.10,4.98,871.89,908.40,871.59,...,,,,,,,,,,
3,2024-08-21 22:51:03 +0900,2010-01-04,KOSDAQ,건설,94.41,-0.27,-0.29,95.60,95.64,94.16,...,,,,,,,,,,
4,2024-08-21 22:51:03 +0900,2010-01-04,KOSDAQ,금속,4010.20,36.86,0.93,3965.25,4011.09,3959.84,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
729848,2024-08-21 22:51:03 +0900,2024-08-20,전략지수,코스피200 롱 100% 코스닥150 숏 50% 선물지수,1326.75,8.66,0.66,1331.88,1334.33,1324.22,...,61.024787,61.024787,59.839971,1318.9910,1381.165496,1256.816504,0.0,-1.0,0.0,0.0
729849,2024-08-21 22:51:03 +0900,2024-08-20,전략지수,코스피200 코스닥150 고정비중 3:7 지수,1492.82,14.64,0.99,1494.18,1496.84,1485.88,...,59.594163,59.594163,54.313824,1471.4870,1580.361903,1362.612097,0.0,0.0,0.0,0.0
729850,2024-08-21 22:51:03 +0900,2024-08-20,전략지수,코스피200 코스닥150 고정비중 5:5 레버리지 지수,1038.81,19.84,1.95,1040.83,1044.00,1030.12,...,55.297341,55.297341,49.037547,1022.5155,1185.482197,859.548803,0.0,0.0,0.0,0.0
729851,2024-08-21 22:51:03 +0900,2024-08-20,전략지수,코스피200 코스닥150 고정비중 5:5 지수,1577.20,15.28,0.98,1578.75,1581.18,1570.54,...,68.636100,68.636100,59.079655,1555.0580,1670.932416,1439.183584,0.0,0.0,0.0,0.0


In [19]:
# 데이터 업로드
click_conn.df_insert(market_info, conf['daily_market']['database'], conf['daily_market']['table'])

2024-08-21 22:51:49,920 (utils.py 326) INFO ::: df insert to db starts!, schema: stocks, table: daily_market.
2024-08-21 22:51:49,920 (utils.py 284) INFO ::: data processing is started!
2024-08-21 22:51:49,920 (utils.py 396) INFO ::: sql execute: DESCRIBE TABLE stocks.daily_market
2024-08-21 22:55:34,946 (utils.py 396) INFO ::: sql execute: SELECT sorting_key FROM system.tables WHERE name = 'daily_market' AND database = 'stocks'
2024-08-21 22:55:34,949 (utils.py 305) INFO ::: data processing is finished.
2024-08-21 22:56:09,448 (utils.py 337) INFO ::: data insert is processing (729853/729853).
2024-08-21 22:56:09,449 (utils.py 339) INFO ::: data insert is finished.


729853