# Assignment #1

## Enviroment 기초

### Gym Env를 통해 Environment 익숙해지기

#### 예시: 고속도로 게임

고속도로 게임은 n개의 차선을 달리는 고전게임입니다.
각 차선에는 차량이 있을 수 있으므로 차량이 빈 차선으로 시간 내에 순발력 있게 이동하면 됩니다.
이 게임을 gym.Env를 통해 구현해보겠습니다.


In [None]:
import gym
import numpy as np
from copy import deepcopy
from IPython.display import clear_output
from stable_baselines3.common.type_aliases import GymObs, GymStepReturn

import os, time

PROJECT_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))


class HighwayGame(gym.Env):
    def __init__(self, n_lanes: int = 3, max_steps: int = 60):
        """
        클래스의 기본적인 세팅을 초기화 합니다.
        (왠만하면) 이 클래스에서 쓰일 모든 변수는 여기서 초기화 돼야 합니다.
        Env class는 observation_space와 action_space가 필수적으로 직접 초기화 되어야 합니다.

        Parameters
        ----------
        n_lanes: int
            Number of lanes in the highway
        max_steps: int
            Maximum number of steps in the game = maximum time steps
        """
        self.n_lanes = n_lanes
        self.max_steps = max_steps
        self.road = np.load(os.path.join(PROJECT_PATH, "w1/road.npy"))  # 미리 준비한 highway 맵

        self.steps = 0
        self.current_lane = n_lanes//2      # 처음 위치는 가운데 lane

        self.observation_space = gym.spaces.Discrete(n_lanes)   # 첫번째 ... n_lanes 번째 lane에 차가 있는지 여부
        self.action_space = gym.spaces.Discrete(n_lanes)    # 첫번째 ... n_lanes 번째 lane 중 하나로 이동

        self.reset()


    def get_obs(self) -> GymObs:
        """
        현재 agent가 관측하는 상태를 반환합니다.
        """
        return self.road[min(self.steps + 1, self.max_steps - 1)]   # agent가 관측하는 바로 앞 차선 상태

    def reset(self) -> GymObs:
        """
        Env를 초기화 합니다.
        """
        self.steps = 0
        self.current_lane = self.n_lanes//2
        return self.get_obs()

    def step(self, action: int, debug=False) -> GymStepReturn:
        """
        Env의 한 step을 진행합니다.
        step은 현재 상태에서 action을 취한 후, 다음 상태, reward, done, info를 반환합니다.

        reward: float
            현재 상태에서 action을 취한 후 받는 reward
        done: bool
            현재 상태에서 action을 취한 후 게임이 끝났는지 여부
        info: dict
            현재 상태에서 action을 취한 후 추가 정보

        Parameters
        ----------
        action: int
            Agent가 선택한 action = 이동할 lane 번호
        debug: bool
            디버깅을 위한 flag

        Returns
        -------
        GymStepReturn: Tuple[GymObs, float, bool, dict]
            observation, reward, done, info
        """
        reward, done = 0, False
        info = dict()

        self.steps += 1

        # action이 유효한지 확인 후 reward 계산, 게임 종료 여부 판별
        if action < 0 or action >= self.n_lanes:
            reward = -1
            done = True
            info['msg'] = "Invalid action"

        elif self.road[self.steps, action]!=0:
            reward = -1
            done = True
            info['msg'] = "You crashed!"

        elif self.steps == self.max_steps - 1:
            reward = 500
            done = True
            info['msg'] = "Finished!"

        else:
            self.current_lane = action  # action이 유효하면 action을 반영

        if debug:
            self.render()
            print(f"Action: {action}, Reward: {reward}, Done: {done}, Info: {info}")

        return self.get_obs(), reward, done, info

    def render(self, mode="human"):
        """
        Env를 시각화 합니다.

        Parameters
        ----------
        mode: str
            시각화 모드
        """
        clear_output(wait=True)     # Jupyter 환경의 output을 지워줍니다.
        print(f"Step: {self.steps}/{self.max_steps}")
        for i, lane in enumerate(self.road[self.steps:self.steps+5]):
            repr_str = ["X" if l else " " for l in lane]   # lane에 차가 있으면 X, 없으면 " "로 표시
            if i == 0:
                repr_str[self.current_lane] = "O"   # agent의 위치는 O로 표시
            print("|"+" ".join(repr_str)+"|")

    def play(self, policy=None):
        obs = self.reset()
        done = False
        self.render()

        while not done:
            time.sleep(0.5)
            action = policy.action(obs) if policy else self.action_space.sample()
            obs, reward, done, info = self.step(action)
            self.render()
            if done:
                print(info['msg'])

#### 직접 플레이 해보기

- 내 위치 : O
- 상대 차 위치 : X
- 이동 방향 : down
- action : 이동할 lane의 번호 = (0, 1, 2)

In [None]:
class UserInputPolicy:
    @staticmethod
    def action():
        return int(input('Enter Action: '))

In [None]:
env = HighwayGame()
env.play(policy=UserInputPolicy)

## 과제 1

원하는 환경을 gym.Env를 통해 만들어 보세요!

**제약조건**
- action space는 disrete space로 구현해야 합니다.
- get_obs() 메소드를 구혆해야 합니다.

**잘 만들었는지 확인하는 방법** </br>

```
from stable_baselines3.common.env_checker import check_env

check_env(env)
```

In [None]:
# 이곳에 Env를 만들어 주세요
class Env(gym.Env):
    def __ini__(self):
        pass
    def step(self, action):
        pass


Env를 체크합니다.

In [None]:
from stable_baselines3.common.env_checker import check_env

check_env(env)

만약, 만든 Env의 reward가 적절하고, 종료조건이나 기타 에러가 없다면 직접 학습해서 자동으로 Env가 움직이도록 학습할 수 있습니다.
미리 준비된 Monte-Carlo 알고리즘으로 학습해봅니다.

**주의**
- Env의 observation_space와 action_space가 전부 discrete 해야 합니다.
- 너무 큰 space의 경우 학습이 며칠 걸릴 가능성이 높습니다.  (action space size * obs space size < 100 권장...)

In [None]:
from study.w2.MC_TD import MonteCarloAlgorithm, GreedyPolicy, ValueFunction
# from w2.MC_TD import MonteCarloAlgorithm, GreedyPolicy


vf = ValueFunction(env)
policy = GreedyPolicy(env, vf, eps=0.9)

# 아래 두 변수를 적절히 조정하면 학습에 성공할 수 있습니다.
total_steps = 5000      # 총 학습 episode 수
random_frac = 0.6       # 랜덤하게 action을 수행할 episode 수의 전체 episode 수에 대한 비율


# 랜덤성을 부여하는 변수를 현재까지 학습한 episode수에 따라 조절하는 함수
# learning rate scheduler와 비슷합니다.
def eps_schedule(i):
    if i < total_steps * random_frac:
        return 0.9
    return 0.9 ** (1 + 0.01 * i) + 0.01


mc = MonteCarloAlgorithm(env, vf, policy, gamma=0.9)
mc.learn(total_steps, eps_schedule=eps_schedule, verbose=1)

Env에 play라는 함수가 구현되어 있다면, 직접 자동으로 플레이 해볼 수 있습니다.

구현이 되어 있지 않다면 다음 코드를 Env의 play 메소드로 설정해 보세요.

```
def play(self, policy=None):
    obs = self.reset()
    done = False
    self.render()

    while not done:
        time.sleep(0.5)
        action = policy.action() if policy else self.action_space.sample()
        obs, reward, done, info = self.step(action)
        self.render()
        if done:
            print(info)
```

In [None]:
env.play(policy)        # env에 play라는 메소드가 있어야 합니다.