In [1]:
!pip install gymnasium
#---#
import gymnasium as gym
#---#
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import IPython

Collecting gymnasium
  Downloading gymnasium-0.29.1-py3-none-any.whl (953 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.9/953.9 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-0.29.1


In [2]:
action_to_direction = {
    0 : np.array([1, 0]), # row+, down
    1 : np.array([0, 1]), # col+, right
    2 : np.array([-1 ,0]), # row-, up
    3 : np.array([0, -1]) # col-, left
}
# 당장쓰진 않지만 하는김에
action_to_direction2 = {0: 'down', 1: 'right', 2: 'up', 3: 'left'}

In [3]:
def show(states):
    fig = plt.Figure()
    ax = fig.subplots()
    ax.matshow(np.zeros([4,4]), cmap='bwr',alpha=0.0)
    sc = ax.scatter(0, 0, color='red', s=500)
    ax.text(0, 0, 'start', ha='center', va='center')
    ax.text(3, 3, 'end', ha='center', va='center')
    # Adding grid lines to the plot
    ax.set_xticks(np.arange(-.5, 4, 1), minor=True)
    ax.set_yticks(np.arange(-.5, 4, 1), minor=True)
    ax.grid(which='minor', color='black', linestyle='-', linewidth=2)
    state_space = gym.spaces.MultiDiscrete([4,4])
    def update(t):
        if states[t] in state_space:
            s1,s2 = states[t]
            states[t] = [s2,s1]
            sc.set_offsets(states[t])
        else:
            s1,s2 = states[t]
            s1 = s1 + 0.5 if s1 < 0 else (s1 - 0.5 if s1 > 3 else s1)
            s2 = s2 + 0.5 if s2 < 0 else (s2 - 0.5 if s2 > 3 else s2)
            states[t] = [s2,s1]
            sc.set_offsets(states[t])
    ani = FuncAnimation(fig,update,frames=len(states))
    display(IPython.display.HTML(ani.to_jshtml()))

In [4]:
class GridWorld:
    def __init__(self):
        self.state_space = gym.spaces.MultiDiscrete([4,4])
        self.action_space = gym.spaces.Discrete(4)
        self._action_to_direction = {
            0 : np.array([1, 0]), # row+, down
            1 : np.array([0, 1]), # col+, right
            2 : np.array([-1 ,0]), # row-, up
            3 : np.array([0, -1]) # col-, left
        }
        self.reset()
        self.state = None
        self.reward = None
        self.termiated = None #초기치들을 정리함.
    def step(self,action):
        direction = self._action_to_direction[action]
        self.state = self.state + direction
        if np.array_equal(self.state,np.array([3,3])):
            self.reward = 100 #그냥 리워드로만 하면 클래스 내에 저장되지 않는다.
            self.terminated = True
        elif self.state not in self.state_space:
            self.reward = -10
            self.terminated = True
        else:
            self.reward = -1
        return self.state, self.reward, self.terminated
    def reset(self):
        self.state = np.array([0,0])
        self.terminated = False
        return self.state

In [5]:
class AgentRandom:
    def __init__(self,env):
        #--# define spaces
        self.action_space = env.action_space
        self.state_space = env.state_space
        #--# replay buffer
        self.action = None
        self.actions = []
        self.current_state =  None
        self.current_states = []
        self.reward = None
        self.rewards = []
        self.next_state =  None
        self.next_states = []
        self.terminated = None
        self.terminations = []
        #--# other information
        self.n_episodes = 0
        self.n_experiences = 0
        self.score = 0
        self.playtimes = []
        self.scores = []
    def act(self):
        self.action = self.action_space.sample() #수정 대상
    def learn(self):
        pass #수정 대상
    def save_experience(self):
        self.current_states.append(self.current_state)
        self.actions.append(self.action)
        self.rewards.append(self.reward)
        self.next_states.append(self.next_state)
        self.terminations.append(self.terminated)
        #--#
        self.n_experiences = self.n_experiences + 1
        self.score = self.score + self.reward

In [6]:
env = GridWorld()
agent = AgentRandom(env)
for _ in range(10000):
    # Step1: 에피소드 준비
    agent.current_state = env.reset()
    agent.terminated = False
    agent.score = 0
    # Step2: 에피소드 진행
    #50번 내로 끝나게끔 조정하고자 1~ 51로 조정
    for t in range(1,51):
        # step1: 행동
        agent.act()
        # step2: 보상
        agent.next_state, agent.reward, agent.terminated = env.step(agent.action)
        # step3: 저장 & 학습
        agent.save_experience()
        agent.learn() # 사실학습하는 함수는 dummy 함수임..
        # step4: 다음 스텝준비
        agent.current_state = agent.next_state
        if agent.terminated: break #True면 break
    # Step3: 다음에피소드 준비
    agent.scores.append(agent.score)
    agent.playtimes.append(t)
    agent.n_episodes = agent.n_episodes + 1

#### 환경을 이해하기 위한 기록(1)

In [7]:
q_table = np.zeros([4,4,4]) #qtable은 (위치1,위치2,행동) 보상점수에 매핑됨.
count = np.zeros([4,4,4])
for i in range(agent.n_experiences):
    s1,s2 = agent.current_states[i] #현재위치 언패킹
    a = agent.actions[i] #어떤 행동?
    r = agent.rewards[i] #어떤 보상!
    q_table[s1,s2,a] = q_table[s1,s2,a] + r #위치와 행동에 따른 보상점수들 합.
    count[s1,s2,a] = count[s1,s2,a] + 1 #코드를 결과만 봐서 그랬다.. 과정을 관찰하도록..

In [22]:
count #위치별 빈도.

array([[[3060., 3031., 3025., 3034.],
        [1018., 1030.,  993., 1058.],
        [ 400.,  360.,  391.,  382.],
        [ 144.,  137.,  114.,  124.]],

       [[1021., 1021., 1092., 1017.],
        [ 712.,  694.,  686.,  712.],
        [ 347.,  377.,  379.,  366.],
        [ 160.,  146.,  159.,  157.]],

       [[ 404.,  396.,  379.,  365.],
        [ 383.,  381.,  399.,  371.],
        [ 238.,  241.,  218.,  238.],
        [ 103.,   95.,  101.,  102.]],

       [[ 137.,  156.,  152.,  117.],
        [ 159.,  123.,  188.,  158.],
        [  99.,   68.,  105.,   89.],
        [   0.,    0.,    0.,    0.]]])

In [24]:
q_table/count #element wise하게 나눠줌으로써 평균 점수를 구하려 했는데.... 0/0인게 있어서 안된다.

  q_table/count


array([[[ -1.,  -1., -10., -10.],
        [ -1.,  -1., -10.,  -1.],
        [ -1.,  -1., -10.,  -1.],
        [ -1., -10., -10.,  -1.]],

       [[ -1.,  -1.,  -1., -10.],
        [ -1.,  -1.,  -1.,  -1.],
        [ -1.,  -1.,  -1.,  -1.],
        [ -1., -10.,  -1.,  -1.]],

       [[ -1.,  -1.,  -1., -10.],
        [ -1.,  -1.,  -1.,  -1.],
        [ -1.,  -1.,  -1.,  -1.],
        [100., -10.,  -1.,  -1.]],

       [[-10.,  -1.,  -1., -10.],
        [-10.,  -1.,  -1.,  -1.],
        [-10., 100.,  -1.,  -1.],
        [ nan,  nan,  nan,  nan]]])

In [25]:
count[count == 0] = 0.01 #어짜피 0될 자리라서 그냥 아무 숫자나 줘도 됨

In [26]:
count[count == 0] #이제 0인값 없음

array([], dtype=float64)

In [27]:
q_table/count #element wise하게 나눠줌으로써 평균 점수를 구하려 했는데.... 0/0인게 있어서 안된다.

array([[[ -1.,  -1., -10., -10.],
        [ -1.,  -1., -10.,  -1.],
        [ -1.,  -1., -10.,  -1.],
        [ -1., -10., -10.,  -1.]],

       [[ -1.,  -1.,  -1., -10.],
        [ -1.,  -1.,  -1.,  -1.],
        [ -1.,  -1.,  -1.,  -1.],
        [ -1., -10.,  -1.,  -1.]],

       [[ -1.,  -1.,  -1., -10.],
        [ -1.,  -1.,  -1.,  -1.],
        [ -1.,  -1.,  -1.,  -1.],
        [100., -10.,  -1.,  -1.]],

       [[-10.,  -1.,  -1., -10.],
        [-10.,  -1.,  -1.,  -1.],
        [-10., 100.,  -1.,  -1.],
        [  0.,   0.,   0.,   0.]]])

In [31]:
q_table2 = q_table/count
q_table2 #평균 보상점수도.

array([[[ -1.,  -1., -10., -10.],
        [ -1.,  -1., -10.,  -1.],
        [ -1.,  -1., -10.,  -1.],
        [ -1., -10., -10.,  -1.]],

       [[ -1.,  -1.,  -1., -10.],
        [ -1.,  -1.,  -1.,  -1.],
        [ -1.,  -1.,  -1.,  -1.],
        [ -1., -10.,  -1.,  -1.]],

       [[ -1.,  -1.,  -1., -10.],
        [ -1.,  -1.,  -1.,  -1.],
        [ -1.,  -1.,  -1.,  -1.],
        [100., -10.,  -1.,  -1.]],

       [[-10.,  -1.,  -1., -10.],
        [-10.,  -1.,  -1.,  -1.],
        [-10., 100.,  -1.,  -1.],
        [  0.,   0.,   0.,   0.]]])

In [33]:
for i in range(4):
  print(f"action = {i}/{action_to_direction[i]}")
  print(f"action-value function \n{q_table2[:,:,i]}")

action = 0/[1 0]
action-value function 
[[ -1.  -1.  -1.  -1.]
 [ -1.  -1.  -1.  -1.]
 [ -1.  -1.  -1. 100.]
 [-10. -10. -10.   0.]]
action = 1/[0 1]
action-value function 
[[ -1.  -1.  -1. -10.]
 [ -1.  -1.  -1. -10.]
 [ -1.  -1.  -1. -10.]
 [ -1.  -1. 100.   0.]]
action = 2/[-1  0]
action-value function 
[[-10. -10. -10. -10.]
 [ -1.  -1.  -1.  -1.]
 [ -1.  -1.  -1.  -1.]
 [ -1.  -1.  -1.   0.]]
action = 3/[ 0 -1]
action-value function 
[[-10.  -1.  -1.  -1.]
 [-10.  -1.  -1.  -1.]
 [-10.  -1.  -1.  -1.]
 [-10.  -1.  -1.   0.]]


#### 환경을 이해하기 위한 기록(2)

In [51]:
q_table = np.zeros([4,4,4])
for i in range(agent.n_experiences):
  s1,s2 = agent.current_states[i]
  a = agent.actions[i]
  r = agent.rewards[i]
  q_hat = q_table[s1,s2,a] #우리가 환경을 이해해서 얻은 값, 우리가 풀어낸 값. 그냥 0 들어가있음.
  q = r #실제 답.
  diff = q - q_hat #실제 값과 우리가 구한 값의 차이: 오차 피드백 값.
  q_table[s1,s2,a] = q_hat + 0.05*diff #차이를 조금조금씩 반영해서 과거의 변화를 저장하고자 함.

In [52]:
for i in range(4):
    print(f"action = {i}/{action_to_direction2[i]}")
    print(f"action-value function = \n{q_table[:,:,i].round(2)}\n")

action = 0/down
action-value function = 
[[ -1.    -1.    -1.    -1.  ]
 [ -1.    -1.    -1.    -1.  ]
 [ -1.    -1.    -1.    99.49]
 [ -9.99 -10.    -9.94   0.  ]]

action = 1/right
action-value function = 
[[-1.   -1.   -1.   -9.99]
 [-1.   -1.   -1.   -9.99]
 [-1.   -1.   -1.   -9.92]
 [-1.   -1.   96.94  0.  ]]

action = 2/up
action-value function = 
[[-10.   -10.   -10.    -9.97]
 [ -1.    -1.    -1.    -1.  ]
 [ -1.    -1.    -1.    -0.99]
 [ -1.    -1.    -1.     0.  ]]

action = 3/left
action-value function = 
[[-10.    -1.    -1.    -1.  ]
 [-10.    -1.    -1.    -1.  ]
 [-10.    -1.    -1.    -0.99]
 [ -9.98  -1.    -0.99   0.  ]]



### 환경의 깊은 이해

하나 더 따지고 넘어가보자는 것이다. 같은 보상을 받는 행위더라도, 목표지점에 가까워지거나 멀어지는 경우는 중요한 비중이 다를 것이다. 그러한 것을 반영하여 점수 체계를 만들자

In [54]:
#어떠한 보정을 할거냐면
q_hat = q_table[3,1,1]
q_hat

-0.9981802846835637

In [56]:
#[3,1,1]은.. 여기서 한번만 오른쪽으로 가면 100점의 보상을 얻을 수 있다.. 그러한 상황을 반영하자
q_hat = (-1) + 0.99*100
q_hat

98.0

0.99란? 미래에 받을 보상이 현재에 비해 얼마나 중요한지를 결정하는 가중치..\
1에 가까울 수록 미래에 받을 보상을 중요시함.\
보통 이러한 값을 `discounting rate`라고 함.

In [60]:
q_table = np.zeros([4,4,4])
for i in range(agent.n_experiences):
    s1,s2 = agent.current_states[i]
    ss1,ss2 = agent.next_states[i]
    a = agent.actions[i]
    r = agent.rewards[i] #참값.
    q_hat = q_table[s1,s2,a] # 우리가 환경을 이해해서 얻은값, 우리가 풀어낸 답
    if agent.terminations[i]: #True 일 떈 즉 죽거나 성공한 상태일 땐,
        q = r
    else:
        future_reward = q_table[ss1,ss2,:].max() #다음 상태에서 액션을 할 때 취할 수 있는 가장 큰 값. Greedy!
        q = r + 0.99 * future_reward #현재 보상값 + 다음 상태에서 얻을 수 있는 최대의 보상값의 보정값.
    diff = q - q_hat # 실제답과 풀이한값의 차이 = 오차피드백값
    q_table[s1,s2,a] = q_hat + 0.05 * diff

In [61]:
q_table

array([[[ 88.16092016,  88.19556665, -10.        , -10.        ],
        [ 90.14265488,  89.80152019, -10.        ,  86.11117619],
        [ 91.95414869,  88.67491923,  -9.99999998,  87.57088073],
        [ 92.54698035,  -9.9911257 ,  -9.97112706,  85.75413492]],

       [[ 89.78168277,  90.16020473,  86.03994306, -10.        ],
        [ 92.30861477,  92.13145916,  87.87800144,  87.84316582],
        [ 94.41452583,  94.00556653,  89.42035209,  89.62873413],
        [ 96.65715361,  -9.99440698,  86.25492786,  90.1591488 ]],

       [[ 83.75772301,  91.96667298,  87.64411612,  -9.99999993],
        [ 88.43283782,  94.59463393,  89.65578911,  89.06357403],
        [ 93.7575497 ,  96.93340729,  91.11249559,  91.52122491],
        [ 99.49238863,  -9.92348572,  90.34201205,  90.38723296]],

       [[ -9.9911257 ,  84.7103452 ,  87.45544795,  -9.97524506],
        [ -9.99712887,  91.51562341,  90.09799788,  78.67083745],
        [ -9.93767864,  96.94363541,  91.16974586,  80.7076599 ],
    

In [62]:
for i in range(4):
    print(f"action = {i}/{action_to_direction2[i]}")
    print(f"action-value function = \n{q_table[:,:,i].round(2)}\n")

action = 0/down
action-value function = 
[[ 88.16  90.14  91.95  92.55]
 [ 89.78  92.31  94.41  96.66]
 [ 83.76  88.43  93.76  99.49]
 [ -9.99 -10.    -9.94   0.  ]]

action = 1/right
action-value function = 
[[88.2  89.8  88.67 -9.99]
 [90.16 92.13 94.01 -9.99]
 [91.97 94.59 96.93 -9.92]
 [84.71 91.52 96.94  0.  ]]

action = 2/up
action-value function = 
[[-10.   -10.   -10.    -9.97]
 [ 86.04  87.88  89.42  86.25]
 [ 87.64  89.66  91.11  90.34]
 [ 87.46  90.1   91.17   0.  ]]

action = 3/left
action-value function = 
[[-10.    86.11  87.57  85.75]
 [-10.    87.84  89.63  90.16]
 [-10.    89.06  91.52  90.39]
 [ -9.98  78.67  80.71   0.  ]]



In [63]:
q_table.max(axis=-1) #각 상태에 따른 max값

array([[88.19556665, 90.14265488, 91.95414869, 92.54698035],
       [90.16020473, 92.30861477, 94.41452583, 96.65715361],
       [91.96667298, 94.59463393, 96.93340729, 99.49238863],
       [87.45544795, 91.51562341, 96.94363541,  0.        ]])

### 에이전트 클래스 설계

#### AgentGreedy

In [64]:
class AgentGreedy(AgentRandom):
    def __init__(self,env):
        super().__init__(env) #super를 취하면 AgentRandom에서 정의한 init이 내려온다 ㅋ
        #--#
        self.q_table = np.zeros([4,4,4])
    def learn(self): # q_table
        s1,s2 = self.current_state
        ss1,ss2 = self.next_state
        a = self.action
        r = self.reward
        q_hat = self.q_table[s1,s2,a] # 우리가 환경을 이해해서 얻은값, 우리가 풀어낸 답
        if self.terminated:  #이건 True와 같은 뜻임.
            q = r #죽거나 성공하는 행동이면 다음 행동이 의미가 없음.
        else:
            future_reward = self.q_table[ss1,ss2,:].max()
            q = r + 0.99 * future_reward
        diff = q - q_hat
        self.q_table[s1,s2,a] = q_hat + 0.05 * diff
    def act(self):
        if self.n_experiences < 3000:
            self.action = self.action_space.sample() #정보가 어느정도 쌓이기 전엔 Randome하게 action.
        else:
            s1,s2 = self.current_state
            self.action = self.q_table[s1,s2,:].argmax() # 그리디..

#구현방식 완전숙지 할 것..
#강화학습계의 오버피팅 상태라고 보면 될수도?

In [65]:
env = GridWorld()
agent = AgentGreedy(env)
for _ in range(3000):
    # Step1: 에피소드 준비
    agent.current_state = env.reset()
    agent.terminated = False
    agent.score = 0
    # Step2: 에피소드 진행
    for t in range(1,51):
        # step1: 행동
        agent.act()
        # step2: 보상
        agent.next_state, agent.reward, agent.terminated = env.step(agent.action)
        # step3: 저장 & 학습
        agent.save_experience()
        agent.learn()
        # step4: 다음 스텝준비
        agent.current_state = agent.next_state
        if agent.terminated: break
    # Step3: 다음에피소드 준비
    agent.scores.append(agent.score)
    agent.playtimes.append(t)
    agent.n_episodes = agent.n_episodes + 1
    #---#
    logfreq = 300 #300번 마다 상황을 보고받고싶다.
    if (agent.n_episodes % logfreq) == 0:
        print(
            f"에피소드:{agent.n_episodes}\t"
            f"경험수:{agent.n_experiences}\t" #3000부터 학습 시작.
            f"점수(에피소드):{np.mean(agent.scores[-logfreq:]):.2f}\t" #최근 300개 평균.
            f"게임시간(에피소드):{np.mean(agent.playtimes[-logfreq:]):.2f}\t" #최근 300개 평균.
        ) #학습이 이뤄지면 딱 6번만에 성공시킨다(최단 코스)

에피소드:300	경험수:945	점수(에피소드):-10.68	게임시간(에피소드):3.15	
에피소드:600	경험수:1920	점수(에피소드):-10.78	게임시간(에피소드):3.25	
에피소드:900	경험수:2875	점수(에피소드):-9.98	게임시간(에피소드):3.18	
에피소드:1200	경험수:4576	점수(에피소드):82.13	게임시간(에피소드):5.67	
에피소드:1500	경험수:6376	점수(에피소드):95.00	게임시간(에피소드):6.00	
에피소드:1800	경험수:8176	점수(에피소드):95.00	게임시간(에피소드):6.00	
에피소드:2100	경험수:9976	점수(에피소드):95.00	게임시간(에피소드):6.00	
에피소드:2400	경험수:11776	점수(에피소드):95.00	게임시간(에피소드):6.00	
에피소드:2700	경험수:13576	점수(에피소드):95.00	게임시간(에피소드):6.00	
에피소드:3000	경험수:15376	점수(에피소드):95.00	게임시간(에피소드):6.00	


In [66]:
states = [np.array([0,0])] + agent.next_states[-agent.playtimes[-1]:] #마지막 epsiode 관찰.
show(states)

#### AgentExplorer

In [67]:
class AgentExplorer(AgentGreedy):
    def __init__(self,env):
        super().__init__(env) #AgentGreedy의 init을 가져옴.
        self.eps = 0 # 이것이 0이라는 의미는 돌발행동을 안한다는 의미. 즉 AgentGreedy 와 같은 행동을 한다는 의미
    def act(self):
        if np.random.rand() < self.eps:
            self.action = self.action_space.sample() #랜덤한 action.
        else:
            super().act() #상위클래스의 act를 그대로
#그리디하지 않게 만들자는게 Explorer의 의미.

In [68]:
env = GridWorld()
agent = AgentExplorer(env)
agent.eps = 1 # 돌발행동할 확률이 100퍼
for _ in range(3000):
    # Step1: 에피소드 준비
    agent.current_state = env.reset()
    agent.terminated = False
    agent.score = 0
    # Step2: 에피소드 진행
    for t in range(1,51):
        # step1: 행동
        agent.act()
        # step2: 보상
        agent.next_state, agent.reward, agent.terminated = env.step(agent.action)
        # step3: 저장 & 학습
        agent.save_experience()
        agent.learn()
        # step4: 다음 스텝준비
        agent.current_state = agent.next_state
        if agent.terminated: break
    # Step3: 다음에피소드 준비
    agent.scores.append(agent.score)
    agent.playtimes.append(t)
    agent.n_episodes = agent.n_episodes + 1
    agent.eps = agent.eps * 0.999 #다음 에피소드에선 돌발행동확률을 좀 줄이자 ㅋ.
    #---#
    logfreq = 300
    if (agent.n_episodes % logfreq) == 0:
        print(
            f"에피소드:{agent.n_episodes}\t"
            f"점수(에피소드):{np.mean(agent.scores[-logfreq:]):.2f}\t"
            f"게임시간(에피소드):{np.mean(agent.playtimes[-logfreq:]):.2f}\t"
            f"돌발행동(에피소드):{agent.eps:.2f}"
        )

에피소드:300	점수(에피소드):-11.00	게임시간(에피소드):3.10	돌발행동(에피소드):0.74
에피소드:600	점수(에피소드):-11.30	게임시간(에피소드):3.40	돌발행동(에피소드):0.55
에피소드:900	점수(에피소드):-10.50	게임시간(에피소드):3.34	돌발행동(에피소드):0.41
에피소드:1200	점수(에피소드):43.31	게임시간(에피소드):5.63	돌발행동(에피소드):0.30
에피소드:1500	점수(에피소드):62.12	게임시간(에피소드):6.25	돌발행동(에피소드):0.22
에피소드:1800	점수(에피소드):67.14	게임시간(에피소드):5.99	돌발행동(에피소드):0.17
에피소드:2100	점수(에피소드):77.68	게임시간(에피소드):6.09	돌발행동(에피소드):0.12
에피소드:2400	점수(에피소드):81.45	게임시간(에피소드):5.99	돌발행동(에피소드):0.09
에피소드:2700	점수(에피소드):84.94	게임시간(에피소드):6.16	돌발행동(에피소드):0.07
에피소드:3000	점수(에피소드):87.16	게임시간(에피소드):6.14	돌발행동(에피소드):0.05
