In [2]:
import pandas as pd
import numpy as np
import gym
import stable_baselines3
from gym import spaces
from stable_baselines3 import PPO,DQN,A2C
from stable_baselines3.common.env_util import make_vec_env
import plotly.graph_objects as go
import mplfinance as mpf
import ta
import random

In [117]:
class stablebaselineEnv(gym.Env):
    """Futures-style trading environment over a random slice of candle data.

    Each episode draws a random window of ``window_size + test_window_size``
    consecutive rows from ``df``. The first ``window_size`` rows are what the
    agent can see at the start; the remaining ``test_window_size`` rows are
    walked one row per ``step()``.

    Action codes   : 0 = Long, 1 = Short, 2 = Close, 3 = Hold
    Position codes : 0 = Long, 1 = Short, 2 = None (``self.position`` may also
                     be the Python value ``None`` before the first trade)

    Every order uses the minimum order size (``min_order``). Required margin
    example: (0.002 * 68000) / 1 = 136, (0.002 * 68000) / 2 = 68.

    Parameters
    ----------
    df : pandas.DataFrame
        Candle data. ``step()`` reads the ``Open`` and ``Close`` columns and
        ``render()`` additionally reads ``High`` and ``Low``.
    window_size : int
        Number of rows in the chart part of the observation.
    test_window_size : int
        Number of rows the agent steps through in one episode.
    usdt_balance : float
        Starting USDT balance, restored on every ``reset()``.
    btc_size : float
        Initial position size (only used at construction; ``reset()`` uses 0).
    leverage : float
        Leverage used to compute the required margin.
    """

    def __init__(self, df, window_size, test_window_size, usdt_balance=1000, btc_size=0, leverage=1):
        super(stablebaselineEnv, self).__init__()
        self.df = df
        self.window_size = window_size
        self.test_window_size = test_window_size

        self.action_space = spaces.Discrete(4)  # 0: Long, 1: Short, 2: Close, 3: Hold
        self.observation_space = self._build_observation_space()
        self.chart_data = spaces.Box(low=0, high=np.inf, shape=(df.shape[0], df.shape[1]), dtype=np.float32)

        # Settings that stay fixed for the lifetime of the env (not touched by reset()).
        self.initial_usdt_balance = usdt_balance  # starting USDT balance
        self.min_order = 0.002                    # minimum order size
        self.fee = 0.0005                         # trading fee rate
        self.leverage = leverage                  # leverage

        # Per-episode state (re-initialised by reset()).
        self._start_episode()
        self._reset_account(btc_size=btc_size)

        # TODO: extend render() (visualisation).

    @staticmethod
    def _numeric_columns(frame):
        """Return only the numeric columns of ``frame``.

        Non-numeric columns (for example a string ``Date`` column) cannot be
        represented in the float32 ``chart_data`` observation.
        """
        return frame.select_dtypes(include=[np.number])

    def _build_observation_space(self):
        """Build the Dict observation space from instance state only."""
        n_features = self._numeric_columns(self.df).shape[1]

        def scalar_box(low):
            return spaces.Box(low=low, high=np.inf, shape=(1,), dtype=np.float32)

        return spaces.Dict({
            "chart_data": spaces.Box(low=0, high=np.inf, shape=(self.window_size, n_features), dtype=np.float32),
            "position": spaces.Discrete(3),       # position {0: Long, 1: Short, 2: None}
            "current_price": scalar_box(0),       # current price
            "avg_price": scalar_box(0),           # average entry price
            "pnl": scalar_box(-np.inf),           # unrealised PnL
            "total_pnl": scalar_box(-np.inf),     # cumulative realised PnL
            "usdt_balance": scalar_box(0),        # USDT balance
            "margin": scalar_box(0),              # margin in use
            "size": scalar_box(0),                # position size
            "total_balance": scalar_box(0),       # total assets
        })

    def _start_episode(self):
        """Draw a fresh random slice and rewind the step cursor to its start."""
        self.slice_df, self.observation_df, self.train_df = self.generate_random_data_slice(
            self.df, self.window_size, self.test_window_size)
        self.current_index = self.window_size
        # Row at `current_index`: the first row after the initial observation window.
        self.current_step = self.slice_df.iloc[self.window_size]
        self.start_step = self.window_size
        self.current_price = None

    def _reset_account(self, btc_size=0):
        """Restore balances, position bookkeeping and the action log."""
        self.usdt_balance = self.initial_usdt_balance  # USDT balance
        self.btc_size = btc_size      # position size
        self.margin = 0               # position margin
        self.position = None          # position {0: Long, 1: Short, 2: None}
        self.order_price = 0          # notional value of the latest order
        self.last_size_value = 0      # cumulative entry cost (used for the average price)
        self.current_avg_price = 0    # current average entry price
        self.pnl = 0                  # unrealised PnL
        self.closing_pnl = 0          # PnL realised by the latest close
        self.total_pnl = 0            # cumulative realised PnL
        self.total_fee = 0            # cumulative fees (accumulated as a negative number)
        self.total_balance = 0        # total assets
        self.action_history = pd.DataFrame(columns=['action'])

    def _get_observation(self):
        """Assemble the observation dict described by ``observation_space``."""
        def scalar(value):
            # `current_price` is None until the first step(); report it as 0.
            return np.array([0.0 if value is None else value], dtype=np.float32)

        return {
            "chart_data": self._numeric_columns(self.observation_df).to_numpy(dtype=np.float32),
            # `self.position` may be None before the first trade; code 2 means "no position".
            "position": 2 if self.position is None else int(self.position),
            "current_price": scalar(self.current_price),
            "avg_price": scalar(self.current_avg_price),
            "pnl": scalar(self.pnl),
            "total_pnl": scalar(self.total_pnl),
            "usdt_balance": scalar(self.usdt_balance),
            "margin": scalar(self.margin),
            "size": scalar(self.btc_size),
            "total_balance": scalar(self.total_balance),
        }

    def reset(self):
        """Start a new episode on a new random slice and return the first observation."""
        self._start_episode()
        self._reset_account()
        return self._get_observation()

    # Hook for configurable order sizing later (only the min_order part needs to change).
    def cac_order_size(self):
        """Return the size of the next order (currently always ``min_order``)."""
        order_size = self.min_order
        return order_size

    def _unrealized_pnl(self, position):
        """Unrealised PnL of the open size at ``current_price`` for ``position`` (0 long, else short)."""
        direction = 1 if position == 0 else -1
        # NOTE(review): margin is already divided by leverage, so multiplying the
        # PnL by leverage as well may double-count it - confirm the intended model.
        return direction * (self.current_price - self.current_avg_price) * self.btc_size * self.leverage

    def act_check(self, action):
        """Map the requested action to the action that can actually be executed.

        Long/Short need more free funds than the required margin (when
        reversing, the margin and PnL of the open position count as funds).
        Close needs an open position. Anything that cannot be executed,
        including unknown action codes, becomes Hold (3).
        """
        required_margin = (self.cac_order_size() * self.current_price) / self.leverage
        has_position = self.position == 0 or self.position == 1

        if action == 0 or action == 1:
            if not has_position or self.position == action:
                available = self.usdt_balance
            else:
                # Reversing: closing the current position frees its margin and PnL.
                available = self.usdt_balance + self.margin + self.pnl
            return action if available > required_margin else 3

        if action == 2:
            return 2 if has_position else 3

        # Hold, or an unknown code: never return None to the caller.
        return 3

    def open_position(self, action):
        """Open a position, or add to it, in the direction ``action`` at ``current_price``."""
        order_size = self.cac_order_size()
        required_margin = (order_size * self.current_price) / self.leverage
        open_fee = order_size * self.current_price * self.fee

        self.usdt_balance -= required_margin + open_fee
        self.btc_size += order_size
        self.margin += required_margin

        self.order_price = order_size * self.current_price
        self.current_avg_price = (self.order_price + self.last_size_value) / self.btc_size
        # Carry the cumulative entry cost forward. Valuing the whole position at
        # the current price here would corrupt the average on later add-on orders.
        self.last_size_value = self.btc_size * self.current_avg_price

        self.pnl = self._unrealized_pnl(action)

        self.total_fee -= open_fee
        self.total_balance = self.usdt_balance + self.margin
        self.position = action

    def close_position(self):
        """Close the whole open position at ``current_price`` and realise its PnL."""
        closing_fee = self.btc_size * self.current_price * self.fee
        closing_pnl = self._unrealized_pnl(self.position)

        self.usdt_balance += self.margin + closing_pnl - closing_fee
        self.total_fee -= closing_fee
        self.total_pnl += closing_pnl
        self.closing_pnl = closing_pnl

        self.btc_size = 0
        self.margin = 0
        self.pnl = 0
        self.last_size_value = 0

        self.total_balance = self.usdt_balance + self.margin
        self.position = 2

    def act(self, action):
        """Execute ``action`` after validation and return the action actually taken."""
        action = self.act_check(action)
        has_position = self.position == 0 or self.position == 1

        if action == 0 or action == 1:  # Long or Short
            if has_position and self.position != action:
                # Opposite direction: close first, then open the new side.
                self.close_position()
            self.open_position(action)

        elif action == 2:  # Close
            if has_position:
                self.close_position()

        else:  # Hold
            if has_position:
                self.pnl = self._unrealized_pnl(self.position)
            self.total_balance = self.usdt_balance + self.margin
        return action

    # The order size is always the minimum order size for now. Even at the
    # minimum (0.002) with 1x leverage the margin is roughly 136 USDT.
    # Once sizes grow, unrealised losses also have to be taken into account.

    def step(self, action):
        """Advance the episode by one row.

        Returns the 8-tuple ``(obs, reward, done, action, action_history,
        slice_df, observation_df, train_df)``, where ``action`` is the action
        actually executed.
        """
        bar = self.slice_df.iloc[self.current_index]
        # The fill price is drawn uniformly between the bar's open and close.
        self.current_price = random.uniform(bar['Open'], bar['Close'])

        action = self.act(action)  # execute the action
        action_row = pd.DataFrame({'action': [action]}, index=[self.slice_df.index[self.current_index]])
        if self.action_history.empty:
            # Avoid concatenating onto an empty frame on the first step.
            self.action_history = action_row
        else:
            self.action_history = pd.concat([self.action_history, action_row])

        # NOTE(review): the reward is still a placeholder - nothing in this file
        # specifies a reward signal, so it is left for the author to define.
        reward = None

        self.current_index += 1  # move the cursor to the next row
        self.current_step = self.slice_df.iloc[self.current_index]
        self.observation_df = self.next_observation()
        obs = self._get_observation()

        # The episode ends once the cursor reaches the last row of the slice.
        done = self.current_index >= (self.test_window_size + self.window_size) - 1

        return obs, reward, done, action, self.action_history, self.slice_df, self.observation_df, self.train_df

    def render(self, render_mode=None):
        """Show the slice as a candlestick chart with action markers (only for ``"human"``)."""
        if render_mode != "human":
            return

        candle = go.Candlestick(open=self.slice_df['Open'], high=self.slice_df['High'], low=self.slice_df['Low'], close=self.slice_df['Close'],
                                increasing_line_color='rgb(38, 166, 154)', increasing_fillcolor='rgb(38, 166, 154)',
                                decreasing_line_color='rgb(239, 83, 80)', decreasing_fillcolor='rgb(239, 83, 80)', yaxis='y2')
        fig = go.Figure(data=[candle])

        # action code -> (legend name, price column, price offset, marker symbol, colour)
        marker_styles = {
            0: ('Long', 'Low', 0.997, 'triangle-up', 'red'),       # red up-triangle below the bar
            1: ('Short', 'High', 1.003, 'triangle-down', 'blue'),  # blue down-triangle above the bar
            2: ('Close', 'High', 1.003, 'circle', 'green'),        # green circle above the bar
        }
        points = {code: ([], []) for code in marker_styles}
        for index, row in self.action_history.iterrows():
            code = row['action']
            if code not in marker_styles or index not in self.slice_df.index:
                continue
            _, column, offset, _, _ = marker_styles[code]
            xs, ys = points[code]
            xs.append(self.slice_df.index.get_loc(index))  # x position = row position in the slice
            ys.append(self.slice_df.loc[index, column] * offset)

        # One trace per action type. The markers sit at price levels, so they are
        # bound to the same axis as the candles (y2).
        for code, (name, _, _, symbol, color) in marker_styles.items():
            xs, ys = points[code]
            if xs:
                fig.add_trace(go.Scatter(x=xs, y=ys, mode='markers', name=name, yaxis='y2',
                                         marker=dict(symbol=symbol, color=color, size=20)))

        font = 'Verdana'
        line_color = "rgb(255, 183, 77)"

        # Vertical guide lines and labels for the episode start and the current row.
        for x_pos, label, arrow_dx in ((self.start_step, "Start", -20), (self.current_index, "Now", 20)):
            fig.add_shape(type="line", x0=x_pos, y0=0, x1=x_pos, y1=1, xref='x', yref='paper',
                          line=dict(color=line_color, width=1))
            fig.add_annotation(x=x_pos, y=1, text=label, showarrow=True, arrowhead=1, xref="x", yref="paper",
                               arrowcolor=line_color, arrowsize=1.1, arrowwidth=2, ax=arrow_dx, ay=-30,
                               font=dict(family=font, size=12, color=line_color), align="center")

        fig.update_layout(
            height=600,
            width=1000,
            plot_bgcolor='rgb(13, 14, 20)',
            xaxis=dict(domain=[0, 1]),
            yaxis=dict(title='Net Worth', side='right', overlaying='y2'),
            yaxis2=dict(title='Price', side='left'),
            title='RL 차트',
            template='plotly_dark'
        )

        fig.show()

    @staticmethod
    def generate_random_data_slice(data, window_size, test_window_size):
        """Cut a random window of ``window_size + test_window_size`` rows out of ``data``.

        Returns ``(slice_df, observation_df, train_df)``: the whole window, its
        first ``window_size`` rows, and its remaining ``test_window_size`` rows.

        Raises ``ValueError`` when ``data`` has fewer rows than the window needs.
        """
        max_start_index = len(data) - window_size - test_window_size
        if max_start_index < 0:
            raise ValueError("데이터 크기가 너무 작아 분할할 수 없습니다.")

        # randint's upper bound is exclusive; +1 keeps the last valid start
        # reachable and lets data of exactly the required length through.
        start_index = np.random.randint(0, max_start_index + 1)
        end_index = start_index + window_size
        test_end_index = end_index + test_window_size

        slice_df = data[start_index:test_end_index]
        observation_df = data[start_index:end_index]
        train_df = data[end_index:test_end_index]

        return slice_df, observation_df, train_df

    def next_observation(self):
        """Return the ``window_size`` rows that end just before ``current_index``."""
        window_start = self.current_index - self.window_size
        observation_df = self.slice_df.iloc[window_start:self.current_index]

        return observation_df


In [124]:
# Build the trading environment from the Binance candle CSV.
# Constructor: stablebaselineEnv(df, window_size, test_window_size,
#                                usdt_balance=1000, btc_size=0, leverage=1)
# NOTE(review): the CSV path below is an absolute local path - it only
# resolves on the original author's machine.
df = pd.read_csv(r'C:\Users\user\Desktop\123244\binance_data-123.csv')

window_size = 100        # rows of chart the agent can see (size of the obs chart data)
test_window_size = 300   # rows the agent cannot see up front and has to step through
usdt_balance = 1000
btc_size = 0
leverage = 1

env = stablebaselineEnv(
    df=df,
    window_size=window_size,
    test_window_size=test_window_size,
    usdt_balance=usdt_balance,
    btc_size=btc_size,
    leverage=leverage,
)
initial_obs = env.reset()

In [126]:
action = 0  # 0 = Long

# step() returns an 8-tuple; its last four items are the action history and
# the slice / observation / train frames of the current episode.
step_result = env.step(action)
obs, reward, done, action, a, b, c, d = step_result

env.render(render_mode="human")
(b, c, d)

(                  Date      Open      High       Low     Close    Volume
 1156  2022-01-01 10:16  47055.15  47059.28  47023.84  47024.24  11.79985
 1157  2022-01-01 10:17  47023.84  47065.23  47023.19  47039.99   8.64690
 1158  2022-01-01 10:18  47039.99  47050.00  47015.07  47030.32  13.64860
 1159  2022-01-01 10:19  47030.32  47073.47  47030.31  47055.60   5.23456
 1160  2022-01-01 10:20  47055.60  47062.75  47027.80  47038.82  13.51410
 ...                ...       ...       ...       ...       ...       ...
 1551  2022-01-01 16:51  47351.37  47379.00  47345.67  47365.79  15.30315
 1552  2022-01-01 16:52  47365.78  47365.79  47301.67  47325.98  19.61359
 1553  2022-01-01 16:53  47325.98  47326.19  47286.02  47308.22  22.31236
 1554  2022-01-01 16:54  47308.99  47319.01  47269.23  47269.24  12.23504
 1555  2022-01-01 16:55  47269.24  47293.44  47253.47  47288.00  22.44721
 
 [400 rows x 6 columns],
                   Date      Open      High       Low     Close    Volume
 1158  2022

In [None]:
class testEnv():
    """Small harness for trying out the candlestick rendering on a whole DataFrame.

    Parameters
    ----------
    df : pandas.DataFrame
        Candle data with open/high/low/close columns. The column names are
        matched case-insensitively, so both ``open`` and ``Open`` work.
    current_index : int
        x position of the vertical marker line drawn by ``render()``.
    """

    def __init__(self, df, current_index=1000):
        self.df = df
        self.slice_df = self.df
        self.current_index = current_index

    def _ohlc_column(self, name):
        """Return the column of ``slice_df`` whose name equals ``name`` ignoring case.

        An exact match wins; otherwise the first case-insensitive match is
        used. Raises ``KeyError`` when no column matches.
        """
        columns = list(self.slice_df.columns)
        if name in columns:
            return name
        wanted = name.lower()
        for column in columns:
            if str(column).lower() == wanted:
                return column
        raise KeyError(name)

    def render(self, render_mode=None):
        """Show a candlestick chart with a marker line at ``current_index`` (only for ``"human"``)."""
        if render_mode != "human":
            return

        # Resolve the price columns first so 'Open'/'open' style differences
        # between data files do not break the chart.
        prices = {key: self.slice_df[self._ohlc_column(key)] for key in ('open', 'high', 'low', 'close')}
        candle = go.Candlestick(open=prices['open'], high=prices['high'], low=prices['low'], close=prices['close'],
                                increasing_line_color='red', decreasing_line_color='blue', yaxis='y2')
        fig = go.Figure(data=[candle])

        # Vertical line and label at current_index.
        fig.add_shape(type="line", x0=self.current_index, y0=0, x1=self.current_index, y1=1, xref='x', yref='paper', line=dict(color="white", width=1))
        fig.add_annotation(x=self.current_index, y=1, text="Start_Step", showarrow=True, arrowhead=1, xref="x", yref="paper", arrowcolor="white", arrowsize=2, arrowwidth=2, ax=20, ay=-30, font=dict(family="Courier New, monospace", size=12, color="#ffffff"), align="center")

        fig.update_layout(
            xaxis=dict(domain=[0, 1]),
            yaxis=dict(title='Net Worth', side='right', overlaying='y2'),
            yaxis2=dict(title='Price', side='left'),
            title='Candlestick',
            template='plotly_dark'
        )

        fig.show()

In [None]:
# Wrap the full candle frame in the rendering test harness.
test_env = testEnv(df=df)

In [None]:
# render() only displays the figure; it has no return statement, so `a` is None.
a = test_env.render(
    render_mode="human",
)