In [None]:
import gymnasium as gym
from stable_baselines3.common.utils import set_random_seed
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv

from typing import Optional
import gymnasium as gym

from typing import Optional, List, Dict

import numpy as np
import pandas as pd
import numpy as np
import torch

import const 

In [None]:
from typing import Callable

def make_env(env_id: str, rank: int, seed: int = 0) -> Callable:
    """
    Build a zero-argument environment factory for a multiprocessed vec env.

    ``set_random_seed(seed)`` is called once, when this function itself is
    called. The environment is only created later, when the returned callable
    is invoked.

    :param env_id: (str) the environment ID handed to ``gym.make``
    :param rank: (int) index of the subprocess; added to ``seed`` when the
        freshly created environment is reset
    :param seed: (int) the base seed for the RNG
    :return: (Callable) function that creates the environment, resets it with
        ``seed + rank`` and returns it
    """

    def _build_env() -> gym.Env:
        # Offsetting the base seed by the rank gives every worker its own seed.
        environment = gym.make(env_id)
        environment.reset(seed=seed + rank)
        return environment

    set_random_seed(seed)
    return _build_env

In [None]:
# gym.Env를 상속해서 새로운 환경을 만들어야 한다.
class GridWorldEnv(gym.Env):
    """
    Work-in-progress portfolio-allocation environment.

    Design notes from the original author:
      * The action space is meant to adjust the weight of each of the n
        available portfolio assets.
      * Taking an action is meant to yield the differential Sharpe ratio
        (Sharpe ratio: risk-adjusted return).

    Current state: ``reset`` and ``step`` still implement the grid-world
    placeholder, so the public ``observation_space`` / ``action_space`` are the
    grid-world ones. The planned portfolio spaces are kept on
    ``portfolio_observation_space`` / ``portfolio_action_space`` until the
    portfolio dynamics are implemented.
    """

    def __init__(self,
                 lookback_T: int,
                 asset_definition: Dict[str, str],
                 market_df: pd.DataFrame,
                 size: int = 5,
                 ):
        """
        :param lookback_T: length of the look-back window (last axis of the
            state tensors and of the planned portfolio observation)
        :param asset_definition: mapping whose keys name the securities; only
            the keys and the number of entries are used here
        :param market_df: market data; only its row count is used here, as the
            number of business days
        :param size: side length of the square placeholder grid; must be >= 2
        :raises ValueError: if ``size`` is smaller than 2
        """
        # With a 1x1 grid the target can never differ from the agent, so the
        # sampling loop in `reset` would never terminate.
        if size < 2:
            raise ValueError(f"size must be at least 2, got {size}")
        self.size = size

        # some definitions of assets
        self.idx2asset = dict(enumerate(asset_definition))
        self.lookback_T = lookback_T  # TODO
        self.business_days = len(market_df)
        self.num_securities = len(asset_definition)  # TODO
        self.num_all_asset = self.num_securities + 1  # including cash

        # [S_1, S_2, ..., S_n]: keep the full state history so it is easy to
        # debug later. The extra asset slot handles cash.
        self.overall_state = torch.zeros(
            (self.business_days, self.num_all_asset, self.lookback_T)
        )

        self.portfolio = torch.zeros((self.business_days, self.num_all_asset))

        # Planned portfolio spaces. gym.spaces.Box needs a NumPy dtype (a torch
        # dtype is rejected). They are stored under separate names because the
        # grid-world placeholder below defines the spaces that `reset`/`step`
        # actually honour.
        self.portfolio_observation_space = gym.spaces.Box(
            low=-np.inf,
            high=np.inf,
            shape=(self.num_all_asset, self.lookback_T),
            dtype=np.float32,
        )
        self.portfolio_action_space = gym.spaces.Box(
            low=0,
            high=1,
            shape=(self.num_all_asset,),
            dtype=np.float32,
        )

        ### example (grid-world placeholder)
        # Define the agent and target location; randomly chosen in `reset` and updated in `step`
        self._agent_location = np.array([-1, -1], dtype=np.int32)
        self._target_location = np.array([-1, -1], dtype=np.int32)
        # Observations are dictionaries with the agent's and the target's location.
        # Each location is encoded as an element of {0, ..., `size`-1}^2
        self.observation_space = gym.spaces.Dict(
            {
                "agent": gym.spaces.Box(0, size - 1, shape=(2,), dtype=int),
                "target": gym.spaces.Box(0, size - 1, shape=(2,), dtype=int),
            }
        )
        # We have 4 actions, corresponding to "right", "up", "left", "down"
        self.action_space = gym.spaces.Discrete(4)
        # Dictionary maps the abstract actions to the directions on the grid
        self._action_to_direction = {
            0: np.array([1, 0]),  # right
            1: np.array([0, 1]),  # up
            2: np.array([-1, 0]),  # left
            3: np.array([0, -1]),  # down
        }
        ###

    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None):
        """
        Start a new episode and return ``(observation, info)``.

        TODO (original note): the overall observation should be defined
        properly here. For now this draws random, distinct agent and target
        cells on the placeholder grid.

        :param seed: optional seed forwarded to ``gym.Env.reset``
        :param options: accepted for API compatibility; not used
        """
        # We need the following line to seed self.np_random
        super().reset(seed=seed)

        # Choose the agent's location uniformly at random
        self._agent_location = self.np_random.integers(0, self.size, size=2, dtype=int)

        # We will sample the target's location randomly until it does not coincide with the agent's location
        self._target_location = self._agent_location
        while np.array_equal(self._target_location, self._agent_location):
            self._target_location = self.np_random.integers(
                0, self.size, size=2, dtype=int
            )

        observation = self._get_obs()
        info = self._get_info()

        return observation, info

    def _get_obs(self):
        """Return the current observation: the agent's and the target's location."""
        return {"agent": self._agent_location, "target": self._target_location}

    def _get_info(self):
        """
        Return auxiliary information for the current step.

        Original note: this can probably be used for internal analysis.
        Currently always an empty dict.
        """
        return {}

    def step(self, action):
        """
        Apply ``action`` and return ``(observation, reward, terminated, truncated, info)``.

        Original design notes: the action should probably become the allocation
        ratio of each portfolio asset; what taking an action should produce is
        still an open question. For now this moves the agent on the
        placeholder grid.

        :param action: element of {0, 1, 2, 3} selecting right/up/left/down
        """
        # Map the action (element of {0,1,2,3}) to the direction we walk in
        direction = self._action_to_direction[action]
        # We use `np.clip` to make sure we don't leave the grid bounds
        self._agent_location = np.clip(
            self._agent_location + direction, 0, self.size - 1
        )

        # An environment is completed if and only if the agent has reached the target
        terminated = np.array_equal(self._agent_location, self._target_location)
        truncated = False

        # Sparse reward: 1 only on the step that reaches the target.
        reward = 1 if terminated else 0
        observation = self._get_obs()

        info = self._get_info()

        return observation, reward, terminated, truncated, info

NameError: name 'gym' is not defined

In [None]:
# Multiprocess environment setup for using PPO.

env_id = "CartPole-v1"
num_cpu = 4  # how many worker processes to launch
# Build one environment factory per worker (the rank is the worker index),
# then wrap them all in a subprocess-backed vectorized environment.
env = SubprocVecEnv(
    [make_env(env_id, rank=worker_idx) for worker_idx in range(num_cpu)]
)