<a href="https://colab.research.google.com/github/JiUUoong/CODE/blob/main/customenv.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# 커스텀 환경
# 여태는 만들어져있는 환경 가져다씀 (FrozenLake, CartPole, Taxi, PandaReachDense)
# 내가 직접 만든 환경에서 알고리즘 학습시킬수 있게
# MDP로 그림 그려놓은 내용을 코드로 옮겨적기
# 상태가 어떻고, 어떤 액션이 들어오면, 무슨 확률로 보상 주고, 다음 상태 이동

In [None]:
# 12번 슬라이드 재활용 로봇이 상호작용할 환경 만들기

In [None]:
# +에이전트 가져다 써보기
# tfagents; rl baselines

In [None]:
!pip install tf-agents[reverb]

In [None]:
import abc                  # Abstract Base Class (추상적 피상속 클래스) -> abstractmethod
# 골뱅이abstractmethod
# 밑줄에 함수 이름 적으면
# 클래스 상속할 때 해당 함수는 반드시 구현되게 만들 수 있
import tensorflow as tf
import numpy as np

# tf-agents
from tf_agents.environments import py_environment
from tf_agents.environments import tf_environment
from tf_agents.environments import tf_py_environment
from tf_agents.environments import utils
from tf_agents.environments import wrappers
from tf_agents.environments import suite_gym    # tf_agents랑 gym 연결

from tf_agents.trajectories import time_step as ts  # (s, a, r, s')
from tf_agents.trajectories import trajectory       # time_step들의 연속
from tf_agents.specs import array_spec        # state 배열, action 배열

import random

In [None]:
class RecyclerEnv(py_environment.PyEnvironment):
  def __init__(self):
    # state, action은 어떤 shape를 가진 배열인지 정해주기
    # 예를 들어 DQN 만들 때 입력층의 폭을 env.observation_space.n
    # bound 제한된
    self._action_spec = array_spec.BoundedArraySpec(
        shape=(), dtype=np.int32, minimum=0, maximum=2, name="action"
    )
    # shape=()    # 스칼라
    # dtype       # 정수
    # 0, 1, 2의 값을 가질 수 있음: 0=search, 1=wait, 2=recharge
    self._state_spec = array_spec.BoundedArraySpec(
        shape=(1,), dtype=np.int32, minimum=0, maximum=1, name="observation"
    )
    # 배터리 기준 0=low, 1=high
    # (1,)        # 스칼라
    # 초기 상태
    self._state = 1
    # 태스크 종료 여부
    self._done = False
    # discount rate
    self._gamma = 0.99
    # 12번 슬라이드 알파, 베타 -> 배터리가 얼마나 빨리 소모되는지 반영시킬 값
    self._alpha = 0.7
    self._beta = 0.95
    # 보상
    self._reward = 0
    return

  def _reset(self):       # state = env.reset()
    self._state = 1       # 배터리 완충
    self._done = False
    self._reward = 0
    return ts.restart(np.array([self._state], dtype=np.int32))

  def _step(self, action):  # 환경이름._step(action)
    # 태스크 종료 조건 구현하는 방법
    # 누적 합산 보상 (self._reward)에 조건 걸어서 done=True로 두기
    # 트레이닝 루프에서 스텝 수 최대 제한 걸어서 알아서 루프에서 나오기

    # MDP 그림을 환경으로 구현할때
    # [action 선택지 갯수 * state 갯수]
    # state 0이고, action이 0일때
    # 0, 1, 2 ; search, wait, recharge
    if self._state == 0 and action == 0:  # 배터리 얼마 없을 때 search를 하면?
      coin = random.random()              # 0~1 사이 난수
      # 이 값이랑 베타랑 비교해서 어느 상태로 이동할지 (어떤 보상이 주어질지) 결정
      if coin < self._beta:               # 베타의 확률로
        # 상태는 안 바뀌고 그대로
        self._state = 0
        # 보상은 r_search (예. 10)
        reward = 10
        self._reward += reward
        # 아래 transition 코드는 MDP 상에서 한 화살표를 나타냄
        return ts.transition(np.array([self._state], dtype=np.int32), reward, discount=self._gamma)
      else:                               # 1-베타의 확률로 방전
        self._state = 0
        reward = -10
        self._reward += reward
        return ts.transition(np.array([self._state], dtype=np.int32), reward, discount=self._gamma)
    # state 0이고, action이 1일때
    elif self._state == 0 and action == 1:
      self._state = 0
      reward = 1
      self._reward += reward
      return ts.transition(np.array([self._state], dtype=np.int32), reward, discount=self._gamma)
    elif self._state == 0 and action == 2:    # 충전
      self._state = 1
      reward = 0
      self._reward += reward
      return ts.transition(np.array([self._state], dtype=np.int32), reward, discount=self._gamma)
    elif self._state == 1 and action == 0:
      coin = random.random()              # 0~1 사이 난수
      # 이 값이랑 베타랑 비교해서 어느 상태로 이동할지 (어떤 보상이 주어질지) 결정
      if coin < self._alpha:               # 알파의 확률로
        # 상태는 안 바뀌고 그대로
        self._state = 1
        # 보상은 r_search (예. 10)
        reward = 10
        self._reward += reward
        # 아래 transition 코드는 MDP 상에서 한 화살표를 나타냄
        return ts.transition(np.array([self._state], dtype=np.int32), reward, discount=self._gamma)
      else:                               # 1-알의 확률로 방전
        self._state = 0
        reward = 10
        self._reward += reward
        return ts.transition(np.array([self._state], dtype=np.int32), reward, discount=self._gamma)

In [None]:
_env = RecyclerEnv()        # 클래스 인스턴스
_env._action_spec

TypeError: ignored