In [None]:
import torch
import math
import torch.nn as nn
import numpy as np
import pandas as pd
from glob import glob
from tqdm import tqdm
import random
from copy import deepcopy

import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# --- Data locations -------------------------------------------------------
BASE_DATA_DIR = 'stock_data'
NATION = 'kr' # 'kr', 'us'
MARKET = 'kosdaq' # 'kospi', 'kosdaq', 'nasdaq', 'nyse'
# Glob pattern matching one refined CSV per stock for the chosen market.
DATA_DIR = f'{BASE_DATA_DIR}/refined_data/{NATION}/{MARKET}/*.csv'
# Sector / market-cap table for the chosen market.
SECTOR_DIR = f'{BASE_DATA_DIR}/{NATION}/{MARKET}/modified_{MARKET}_top500_sector_mcap.csv'

# --- Reproducibility ------------------------------------------------------
SEED=42
# Apply the seed to every RNG the notebook imports; previously SEED was
# declared but never used, so it had no effect on any random draw.
torch.manual_seed(SEED)
random.seed(SEED)
np.random.seed(SEED)

In [None]:
# Only rows strictly after this date are kept. The comparison is done on the
# raw 'Date' strings -- assumes they are ISO 'YYYY-MM-DD' text; TODO confirm.
START_DATE = "2022-12-31"

item_list = sorted(glob(DATA_DIR))
if not item_list:
    # Fail here with a clear message instead of an IndexError further down.
    raise FileNotFoundError(f"No CSV files matched {DATA_DIR!r}")

items = []
for item_dir in tqdm(item_list):
    item = pd.read_csv(item_dir)
    items.append(item[item['Date'] > START_DATE].values)

# Every stock must contribute the same, non-zero number of rows; otherwise
# np.array() cannot build the rectangular (n_stock, n_step, n_column) array
# that the slicing below relies on.
row_counts = {len(rows) for rows in items}
if len(row_counts) != 1 or 0 in row_counts:
    raise ValueError(
        f"Expected one common, non-zero row count after {START_DATE}, "
        f"got {sorted(row_counts)}"
    )

items = np.array(items)
# Column 0 holds the dates (taken from the first file); the remaining columns
# are the numeric features.
date_list = items[0, :, 0]
values = items[:, :, 1:].astype(np.float32)

In [None]:
class StockTradingEnv:
    """Back-testing environment that trades several stocks over a fixed history.

    The environment slides a window of ``window_size`` steps over ``data`` and
    exposes the windowed features, the shares held and the cash balance as the
    observation. Feature index 3 of the newest row in the window is used as
    the trade/valuation price (presumably the close column -- confirm against
    the CSV column order).
    """

    def __init__(self, data, initial_investment=10000000):
        """Store the price history and start a fresh episode.

        Args:
            data: array of shape ``(n_stock, n_step, n_feature)``.
            initial_investment: cash available at the start of each episode.
        """
        self.stock_price_history = data  # shape: (n_stock, n_step, n_feature)
        self.n_stock, self.n_step, self.n_feature = self.stock_price_history.shape
        self.window_size = 10
        self.initial_investment = initial_investment

        self.current_step = None
        self.stock_owned = None
        self.cash_in_hand = None

        # Flattened window + one cash scalar + one holding count per stock.
        self.state_dim = (
            self.n_stock * self.window_size * self.n_feature + 1 + self.n_stock
        )
        self.reset()

    def reset(self):
        """Rewind to the first window with no holdings and return the observation."""
        self.current_step = 0
        self.stock_owned = np.zeros(self.n_stock)
        self.cash_in_hand = self.initial_investment
        self._load_window()
        return self._get_obs()

    def step(self, action, amount):
        """Advance one step, execute the requested trades and score the result.

        Args:
            action: per-stock action codes; 0 = sell, 2 = buy, anything else
                is treated as hold.
            amount: per-stock number of shares to sell or buy.

        Returns:
            ``(obs, reward, done, info)`` where ``reward`` is the change in
            total portfolio value across the step and ``info`` carries the
            new total under ``"current_total_value"``.
        """
        prev_total_value = self._get_total_value()

        self.current_step += 1
        # Once the window would run past the history the episode ends and the
        # last full window is shown again.
        done = self.current_step + self.window_size > self.n_step
        self._load_window(clamp_to_end=done)

        # Trades are filled at the price of the window that was just loaded.
        self._trade(action, amount)
        current_total_value = self._get_total_value()

        reward = current_total_value - prev_total_value
        info = {"current_total_value": current_total_value}
        return self._get_obs(), reward, done, info

    def _load_window(self, clamp_to_end=False):
        """Refresh ``stock_state`` / ``stock_price`` for the current step."""
        if clamp_to_end:
            window = self.stock_price_history[:, -self.window_size :, :]
        else:
            window = self.stock_price_history[
                :, self.current_step : self.current_step + self.window_size, :
            ]
        self.stock_state = window
        self.stock_price = window[:, :, 3]

    def _get_obs(self):
        """Return the observation dict with keys ``state``, ``owned``, ``cash``."""
        obs = dict()
        obs["state"] = self.stock_state
        obs["owned"] = self.stock_owned
        obs["cash"] = self.cash_in_hand
        return obs

    def _get_total_value(self):
        """Return cash plus holdings valued at the newest price in the window."""
        return self.stock_owned.dot(self.stock_price[:, -1]) + self.cash_in_hand

    def _trade(self, action, amount):
        """Apply all sell orders, then all buy orders, at the newest price.

        Orders that cannot be filled in full (selling more than is held,
        buying more than the cash covers, non-positive sizes) are skipped.
        Sells are NOT gated on the cash balance: a fully invested portfolio
        must still be able to liquidate positions, otherwise it can never
        trade again once cash reaches zero.
        """
        latest_price = self.stock_price[:, -1]

        # Sells first so their proceeds can fund buys in the same step.
        for i, (action_type, num_stock) in enumerate(zip(action, amount)):
            if action_type == 0 and 0 < num_stock <= self.stock_owned[i]:
                self.stock_owned[i] -= num_stock
                self.cash_in_hand += latest_price[i] * num_stock

        for i, (action_type, num_stock) in enumerate(zip(action, amount)):
            if action_type != 2 or num_stock <= 0:
                continue
            total_cost = latest_price[i] * num_stock
            # total_cost > 0 rejects zero/negative prices; the cash check
            # rejects every buy while cash is zero or negative.
            if total_cost > 0 and self.cash_in_hand >= total_cost:
                self.stock_owned[i] += num_stock
                self.cash_in_hand -= total_cost

In [None]:
from models.models.dqn import DQN

# Network dimensions must match the checkpoint: 500 stocks x 10-step window
# x 5 features in, one output slot per stock.
policy_net = DQN(state=500 * 10 * 5, n_actions=500)
# map_location='cpu' lets a checkpoint saved on a GPU load on a CPU-only
# machine; the rollout below converts model outputs with .numpy(), which
# requires CPU tensors anyway.
checkpoint = torch.load(
    './lightning_logs/version_52/checkpoints/epoch=2454-step=27005.ckpt',
    map_location='cpu',
)

# Keep only the policy network's weights and strip the 'policy_net.' prefix
# they carry in the checkpoint, without mutating the checkpoint itself.
prefix = 'policy_net.'
state_dict = {
    key[len(prefix):]: tensor
    for key, tensor in checkpoint['state_dict'].items()
    if key.startswith(prefix)
}
if not state_dict:
    # With strict=False an empty dict would "load" silently and leave the
    # network at its random initialisation.
    raise KeyError(f"No parameters prefixed with {prefix!r} found in checkpoint")

load_result = policy_net.load_state_dict(state_dict, strict=False)
# Inference only from here on: switch off training-time behaviour such as
# dropout or batch-norm statistic updates, if the model has any.
policy_net.eval()

# Displayed as the cell output so missing/unexpected keys are visible.
load_result

In [None]:
def select_action(model, state):
    """Run ``model`` on one observation and return its two outputs as NumPy arrays.

    Args:
        model: callable taking a float32 tensor and returning a pair of
            tensors ``(action, amount)``.
        state: observation dict; only ``state['state']`` is read.

    Returns:
        Tuple ``(action, amount)`` of NumPy arrays detached from autograd.
    """
    features = state['state'].astype(np.float32)
    action_out, amount_out = model(torch.tensor(features))
    return tuple(out.detach().numpy() for out in (action_out, amount_out))

In [None]:
import torch
import torch.nn.functional as F

# Starting capital for the back-test.
bugget = 1e7
# Build the environment from the price array (values) and the starting capital.
env = StockTradingEnv(initial_investment=bugget, data=values)
# Put the environment in its initial state and take the first observation.
state = env.reset()

# Per-step histories filled in during the rollout, in order:
# reward, total asset value, shares held, actions taken, cash balance.
reward_list, asset_list, owned_list, action_list, cash_in_hand_list = (
    [] for _ in range(5)
)


# Buy-side sizing: split the available cash across the candidate stocks in
# proportion to a softmax over the model's scores.
def softmax_allocation(amounts, cash_available, prices):
    """Turn per-stock scores into whole-share buy quantities.

    Args:
        amounts: 1-D array of raw scores, one per candidate stock.
        cash_available: total cash that may be spent; values <= 0 buy nothing.
        prices: 1-D array of current prices aligned with ``amounts``.

    Returns:
        float64 array shaped like ``prices`` holding whole-number share
        counts. Entries whose price is not positive or whose score is not
        finite are 0.

    The arithmetic is done in float64. In float32, cash above 2**24 cannot be
    represented exactly and may round UP, producing an order that costs more
    than ``cash_available`` and is then rejected in full by the environment.
    """
    prices = np.asarray(prices, dtype=np.float64)
    scores = np.asarray(amounts, dtype=np.float64)
    buy_amounts = np.zeros(prices.shape, dtype=np.float64)

    # Only stocks with a positive price and a usable score take part.
    valid_mask = (prices > 0) & np.isfinite(scores)
    spendable = max(float(cash_available), 0.0)
    if spendable == 0.0 or not valid_mask.any():
        return buy_amounts

    # Softmax with the maximum subtracted first so exp() cannot overflow.
    logits = scores[valid_mask]
    weights = np.exp(logits - logits.max())
    weights /= weights.sum()

    # Cash share per stock -> whole shares (rounded down).
    buy_amounts[valid_mask] = np.floor(weights * spendable / prices[valid_mask])
    return buy_amounts

# Roll the trained policy through the environment until the history runs out.
while True:
    # Ask the policy for per-stock action logits and raw trade sizes for the
    # current observation.
    action, amount = select_action(policy_net, state)

    # One discrete action per stock: 0 = sell, 1 = hold, 2 = buy.
    action = action.reshape(1, env.n_stock, 3).argmax(2)[0]
    raw_amount = amount[0]

    # Prices the order sizes are computed against (newest row of the window).
    current_prices = env.stock_price[:, -1]
    buy_mask = action == 2
    sell_mask = action == 0

    # Start from "trade nothing" so hold entries stay at 0. The masks are
    # tested directly: the earlier `action.sum() > 0` check was False when
    # every stock said sell (code 0), which silently cancelled all sells.
    amount = np.zeros_like(action)

    if buy_mask.any():
        # Buys: split the available cash across the buy candidates via softmax.
        amount[buy_mask] = softmax_allocation(
            raw_amount[buy_mask], env.cash_in_hand, current_prices[buy_mask]
        )

    if sell_mask.any():
        # Sells: never offer more shares than are currently held.
        amount[sell_mask] = np.minimum(
            env.stock_owned[sell_mask], raw_amount[sell_mask].astype(int)
        )

    # Advance the environment by one step with the final orders.
    state, reward, done, info = env.step(action, amount)
    # Record the outcome of this step.
    owned_list.append(deepcopy(env.stock_owned))
    action_list.append(action)
    reward_list.append(reward)
    asset_list.append(info['current_total_value'])
    cash_in_hand_list.append(state['cash'])

    if done:
        break

In [None]:
# Total portfolio value (cash + holdings) reported by the last env.step() call.
info['current_total_value']

In [None]:
fig, ax = plt.subplots(figsize=(30, 10))
ax.plot(asset_list, label='asset')
ax.plot(reward_list, label='reward')
ax.plot(cash_in_hand_list, label='cash_in_hand')

# Point i of each curve is recorded after step i + 1, when the environment
# values the portfolio at the newest row of its window, i.e. date index
# i + window_size (clamped to the last date for the final, repeated window).
# Labelling with date_list[i] directly would show dates window_size rows early.
n_points = len(asset_list)
date_index = np.minimum(
    np.arange(n_points) + env.window_size, len(date_list) - 1
)
# About a dozen evenly spaced ticks, whatever the episode length.
tick_positions = np.linspace(0, n_points - 1, num=min(12, n_points), dtype=int)
tick_labels = np.array(date_list)[date_index[tick_positions]]
ax.set_xticks(tick_positions)
ax.set_xticklabels(tick_labels, rotation=45)

# Reference line at the starting capital.
ax.axhline(bugget, color='red')
ax.set_xlabel('date')
ax.set_ylabel('value')
ax.set_title(f"{NATION}_{MARKET}", fontsize=20)
ax.legend()
plt.show()