# 14wk-2: 강화학습 (3) – 4x4 Grid World (`AgentGreedy`,`AgentExplorer`)

최규빈  
2024-06-06

<a href="https://colab.research.google.com/github/guebin/DL2024/blob/main/posts/14wk-2.ipynb"><img src="https://colab.research.google.com/assets/colab-badge.svg" style="text-align: left"></a>

# 1. 강의영상

In [1]:
# {{<video https://youtu.be/playlist?list=PLQqh36zP38-zHvVuJ92xfdypwHwDFgg8k&si=iI4IhthblTsJTmIv >}}

# 2. Imports

In [2]:
#!pip install gymnasium
#---#
import gymnasium as gym
#---#
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import IPython

# 3. `Env`, `AgentRandom`

In [3]:
action_to_direction = {
    0 : np.array([1, 0]), # row+, down
    1 : np.array([0, 1]), # col+, right
    2 : np.array([-1 ,0]), # row-, up
    3 : np.array([0, -1]) # col-, left
}
action_to_direction2 = {0: 'down', 1: 'right', 2: 'up', 3: 'left'} # 당장쓰진 않지만 하는김에 

In [4]:
class GridWorld:
    def __init__(self):
        self.state_space = gym.spaces.MultiDiscrete([4,4])
        self.action_space = gym.spaces.Discrete(4) 
        self._action_to_direction = {
            0 : np.array([1, 0]), # row+, down
            1 : np.array([0, 1]), # col+, right
            2 : np.array([-1 ,0]), # row-, up
            3 : np.array([0, -1]) # col-, left
        }
        self.reset()
    def step(self,action):
        direction = self._action_to_direction[action]
        self.state = self.state + direction
        if np.array_equal(self.state,np.array([3,3])): 
            reward = 100 
            self.terminated = True
        elif self.state not in state_space:
            reward = -10
            self.terminated = True
        else:
            reward = -1 
        return self.state, reward, self.terminated
    def reset(self):
        self.state = np.array([0,0])
        self.terminated = False   
        return self.state 

In [5]:
class AgentRandom: 
    def __init__(self,env):
        #--# spaces 
        self.action_space = env.action_space
        self.state_space = env.state_space
        #--# replay buffer 
        self.action = None 
        self.actions = [] 
        self.current_state =  None 
        self.current_states = [] 
        self.reward = None 
        self.rewards = [] 
        self.next_state =  None 
        self.next_states = [] 
        self.terminated = None 
        self.terminations = []
        #--# other information
        self.n_episodes = 0         
        self.n_experiences = 0
        self.score = 0        
        self.playtimes = [] 
        self.scores = []    
    def act(self):
        self.action = self.action_space.sample()
    def learn(self):
        pass 
    def save_experience(self):
        self.current_states.append(self.current_state)        
        self.actions.append(self.action)
        self.rewards.append(self.reward)  
        self.next_states.append(self.next_state)
        self.terminations.append(self.terminated)
        #--#
        self.n_experiences = self.n_experiences + 1 
        self.score = self.score + self.reward

# 4. `AgentGreedy`

## A. 환경의 이해

`-` 무작위로 10000판을 진행해보자.

In [102]:
for _ in range(20000):
    # Step1: 에피소드 준비 
    agent.current_state = env.reset()
    agent.terminated = False 
    agent.score = 0 
    # Step2: 에프소드 진행 
    for t in range(50):
        # step1: 행동
        agent.act() 
        # step2: 보상 
        agent.next_state, agent.reward, agent.terminated = env.step(agent.action)
        # step3: 저장 & 학습 
        agent.save_experience() 
        agent.learn()
        # step4: 다음 스텝준비 
        agent.current_state = agent.next_state 
        if agent.terminated: break 
    # Step3: 다음에피소드 준비 
    agent.scores.append(agent.score) 
    agent.playtimes.append(t+1)
    agent.n_episodes = agent.n_episodes + 1 

In [103]:
agent.n_experiences

`-` 데이터관찰

In [104]:
print(f"에이전트: 현재상태/행동 = {agent.current_states[0]} / {agent.actions[0]}")
print(f"환경: 보상/다음상태 = {agent.rewards[0]} / {agent.next_states[0]}")

에이전트: 현재상태/행동 = [0 0] / 1
환경: 보상/다음상태 = -1 / [0 1]

In [105]:
print(f"에이전트: 현재상태/행동 = {agent.current_states[1]} / {agent.actions[1]}")
print(f"환경: 보상/다음상태 = {agent.rewards[1]} / {agent.next_states[1]}")

에이전트: 현재상태/행동 = [0 1] / 1
환경: 보상/다음상태 = -1 / [0 2]

In [106]:
print(f"에이전트: 현재상태/행동 = {agent.current_states[2]} / {agent.actions[2]}")
print(f"환경: 보상/다음상태 = {agent.rewards[2]} / {agent.next_states[2]}")

에이전트: 현재상태/행동 = [0 2] / 3
환경: 보상/다음상태 = -1 / [0 1]

In [107]:
print(f"에이전트: 현재상태/행동 = {agent.current_states[3]} / {agent.actions[3]}")
print(f"환경: 보상/다음상태 = {agent.rewards[3]} / {agent.next_states[3]}")

에이전트: 현재상태/행동 = [0 1] / 0
환경: 보상/다음상태 = -1 / [1 1]

In [108]:
print(f"에이전트: 현재상태/행동 = {agent.current_states[4]} / {agent.actions[4]}")
print(f"환경: 보상/다음상태 = {agent.rewards[4]} / {agent.next_states[4]}")

에이전트: 현재상태/행동 = [1 1] / 3
환경: 보상/다음상태 = -1 / [1 0]

`-` 환경을 이해하기 위한 기록 (1)

In [109]:
q_table = np.zeros([4,4,4])
count = np.zeros([4,4,4])
for i in range(agent.n_experiences):
    s1,s2 = agent.current_states[i] 
    a = agent.actions[i] 
    r = agent.rewards[i] 
    q_table[s1,s2,a] = q_table[s1,s2,a] + r
    count[s1,s2,a] = count[s1,s2,a] + 1 

In [110]:
count[count == 0] = 0.01 
q_table = q_table/count

In [111]:
q_table[:,:,3]

In [112]:
for a in range(4):
    print(
        f"action = {a}/{action_to_direction2[a]}\n" 
        f"action-value function = \n {q_table[:,:,a].round(3)}\n" 
)

action = 0/down
action-value function = 
 [[ -1.  -1.  -1.  -1.]
 [ -1.  -1.  -1.  -1.]
 [ -1.  -1.  -1. 100.]
 [-10. -10. -10.   0.]]

action = 1/right
action-value function = 
 [[ -1.  -1.  -1. -10.]
 [ -1.  -1.  -1. -10.]
 [ -1.  -1.  -1. -10.]
 [ -1.  -1. 100.   0.]]

action = 2/up
action-value function = 
 [[-10. -10. -10. -10.]
 [ -1.  -1.  -1.  -1.]
 [ -1.  -1.  -1.  -1.]
 [ -1.  -1.  -1.   0.]]

action = 3/left
action-value function = 
 [[-10.  -1.  -1.  -1.]
 [-10.  -1.  -1.  -1.]
 [-10.  -1.  -1.  -1.]
 [-10.  -1.  -1.   0.]]


`-` 환경을 이해하기 위한 기록 (2)

In [113]:
q_table = np.zeros([4,4,4])
for i in range(agent.n_experiences):
    s1,s2 = agent.current_states[i]
    a = agent.actions[i]
    r = agent.rewards[i]
    q_hat = q_table[s1,s2,a] # 우리가 환경을 이해하고 있는 값, 우리가 풀어낸 답 
    q = r # 실제 답 
    diff = q - q_hat # 실제답과 풀이한값의 차이 = 오차피드백값 
    q_table[s1,s2,a] = q_hat + 0.05 * diff ## 새로운답 = 원래답 + 오차피드백값 

In [114]:
q_table[:,:,0]

In [115]:
for a in range(4):
    print(
        f"action = {a}/{action_to_direction2[a]}\n" 
        f"action-value function = \n {q_table[:,:,a].round(2)}\n" 
)

action = 0/down
action-value function = 
 [[ -1.  -1.  -1.  -1.]
 [ -1.  -1.  -1.  -1.]
 [ -1.  -1.  -1. 100.]
 [-10. -10. -10.   0.]]

action = 1/right
action-value function = 
 [[ -1.  -1.  -1. -10.]
 [ -1.  -1.  -1. -10.]
 [ -1.  -1.  -1. -10.]
 [ -1.  -1. 100.   0.]]

action = 2/up
action-value function = 
 [[-10. -10. -10. -10.]
 [ -1.  -1.  -1.  -1.]
 [ -1.  -1.  -1.  -1.]
 [ -1.  -1.  -1.   0.]]

action = 3/left
action-value function = 
 [[-10.  -1.  -1.  -1.]
 [-10.  -1.  -1.  -1.]
 [-10.  -1.  -1.  -1.]
 [-10.  -1.  -1.   0.]]


## B. 환경의 깊은 이해

`-` action=1 일때 각 state의 가치 (=기대보상)

In [208]:
q_table[:,:,1]

`-` 분석1

In [209]:
q_table[3,2,1]

-   상태 (3,2)에서 행동 1을 하게되면 100의 보상을 얻으므로 기대보상값은
    100근처 –\> 합리적임

`-` 분석2

In [210]:
q_table[3,1,1]

-   상태 (3,1)에서 행동 1을 하게되면 -1 의 보상을 얻으므로 기대보상값은
    -1 근처 –\> 합리적일까??

`-` 비판: 분석2는 합리적인것 처럼 보이지만 data를 분석한 뒤에는 그다지
합리적이지 못함.

`-` 상황상상

-   빈 종이를 줌
-   빈 종이에는 0 또는 1을 쓸 수 있음 (action = 0 혹은 1)
-   0을 쓸때와 1을 쓸때 보상이 다름
-   무수히 많은 데이터를 분석해보니, 0을 쓰면 0원을 주고 1을 쓰면
    10만원을 보상을 준다는 것을 “알게 되었음”
-   이때 빈 종이의 가치는 5만원인가? 10만원인가? –\> 10만원아니야?

`-` 직관: 생각해보니 현재 $s=(3,1)$ $a=1$에서 추정된(esitated) 값은
`q_table[3,1,1]` $\approx$ -1 이지만[1], 현실적으로는 “실제보상(-1)과
잠재적보상(100)”을 동시에 고려해야 하는게 합리적임

[1] 즉 next_state가 가지는 잠재적값어치는 고려되어있지 않음

In [48]:
q_hat = q_table[3,1,1]
q_hat

In [49]:
q = (-1) + 0.99 * 100 
q

-   여기에서 0.99는 “미래에 받을 보상이 현재에 비해 얼마나 중요한지를
    결정하는 가중치” 이다.
-   1에 가까울수록 미래에 받을 보상을 매우 중시한다는 의미 (즉 빈종이
    $\approx$ 십만원 으로 생각한다는 의미)
-   0.99는 보통 $\gamma$라는 기호로 표기하며 `discount rate`이라고
    표현한다. (외우세여)

`-` 즉 $q(s,a)$는 모든 $s$, $a$에 대하여

$$q(s,a) \approx \text{reward}(s,a) + 0.99 \times \max_{a}q(s',a)$$

가 성립한다면 $q(s,a)$는 타당하게 추정된 것이라 볼 수 있다. 물론 수식을
좀 더 엄밀하게 쓰면 (terminated, not-terminated 로 나누어 쓰면) 아래와
같다.

$$q(s,a) \approx \begin{cases}  \text{reward}(s,a) + 0.99 \times \max_{a}q(s',a) & \text{not terminated} \\ \text{reward}(s,a) & \text{terminated} \end{cases}$$

> **Note**
>
> 대충 설명하면서 넘어갔지만 이 수식을 **벨만방정식**이라고 부른다.
> (외우세여) 위의 식은 강화학습에서 가장 중요한 식이며 원래 버전은
> 아래와 같다.
>
> $$Q^\star(s,a) = R(s,a) +\gamma\sum_{s'}P(s'|s,a)\max_{a}Q(s',a)$$
>
> 여기에서 $P(s'|s,a)$ 는 상태 $s \in {\cal S}$에서 행동
> $a \in {\cal A}$를 했을때 $s'$에 있을 확률이다. 이러한 확률은
> “바람,소용돌이” 등의 외부의 확률적인 요소가 있는 환경에서 의미가
> 있으며 우리의 예제에서는 의미가 없다.

In [84]:
q_table = np.zeros([4,4,4])
for i in range(agent.n_experiences):
    s1,s2 = agent.current_states[i]
    ss1,ss2 = agent.next_states[i]
    a = agent.actions[i]
    q_hat = q_table[s1,s2,a] 
    if agent.terminations[i]:
        q = agent.rewards[i]
    else:
        future_reward = q_table[ss1,ss2,:].max()
        q = agent.rewards[i] + 0.99 * future_reward
    diff = q - q_hat
    q_table[s1,s2,a] = q_hat + 0.05 * diff 

In [85]:
for a in range(4):
    print(
        f"action = {a}/{action_to_direction2[a]}\n" 
        f"action-value function = \n {q_table[:,:,a].round(2)}\n" 
)

action = 0/down
action-value function = 
 [[-0.47 -0.14 -0.05  0.  ]
 [-0.1  -0.05  0.   -0.05]
 [-0.05 -0.05  0.    5.  ]
 [ 0.   -0.5   0.    0.  ]]

action = 1/right
action-value function = 
 [[-0.61 -0.14  0.    0.  ]
 [-0.1  -0.05 -0.1  -0.5 ]
 [-0.1  -0.1   0.    0.  ]
 [ 0.    0.    0.    0.  ]]

action = 2/up
action-value function = 
 [[-3.02 -3.02  0.    0.  ]
 [-0.17 -0.1   0.    0.  ]
 [ 0.    0.   -0.05  0.  ]
 [-0.05  0.    0.    0.  ]]

action = 3/left
action-value function = 
 [[-2.26 -0.39 -0.1   0.  ]
 [-3.02 -0.1  -0.05  0.  ]
 [-0.5  -0.05 -0.05  0.  ]
 [ 0.    0.    0.    0.  ]]


## C. 행동 전략 수립

`-` 상태 (0,0)에 있다고 가정해보자.

In [86]:
q_table[0,0,:]

-   행동 0 혹은 행동 1을 하는게 유리하다. // 행동 2,3을 하면 망한다.

`-` 상태 (2,3)에 있다고 가정해보자.

In [87]:
q_table[2,3,:]

-   행동 0을 하는게 유리함.

`-` 상태 (3,2)에 있다고 가정해보자.

In [88]:
q_table[3,2,:]

-   행동1을 하는게 유리함

`-` 각 상태에서 최적은 action은 아래와 같다.

In [89]:
q_table[0,0,:].argmax()

In [90]:
q_table[2,3,:].argmax()

In [91]:
q_table[3,2,:].argmax()

`-` 전략(=정책)을 정리해보자.

In [92]:
policy = np.array(['?????']*16).reshape(4,4)
policy

In [187]:
for s1 in range(4):
    for s2 in range(4):
        policy[s1,s2] = action_to_direction2[q_table[s1,s2,:].argmax()]
policy

In [188]:
q_table.max(axis=-1)

## D. 에이전트 클래스 설계

In [146]:
class AgentGreedy(AgentRandom):
    def __init__(self,env):
        super().__init__(env)
        self.q_table = np.zeros([4,4,4]) 
    def learn(self):
        s1,s2 = self.current_state
        ss2,ss2 = self.next_state
        a = self.action 
        q_hat = self.q_table[s1,s2,a] 
        if self.terminated:
            q = self.reward
        else:
            future_reward = q_table[ss1,ss2,:].max()
            q = self.reward + 0.99 * future_reward 
        diff = q - q_hat 
        self.q_table[s1,s2,a] = q_hat + 0.05 * diff 
    def act(self):
        if self.n_experiences < 3000: 
            self.action = self.action_space.sample() 
        else:
            s1,s2 = self.current_state 
            self.action = self.q_table[s1,s2,:].argmax()

## E. 환경과 상호작용

In [172]:
env = GridWorld() 
agent = AgentGreedy(env) 
for _ in range(5000):
    # Step1: 에피소드 준비 
    agent.current_state = env.reset()
    agent.terminated = False 
    agent.score = 0 
    # Step2: 에프소드 진행 
    for t in range(50):
        # step1: 행동
        agent.act() 
        # step2: 보상 
        agent.next_state, agent.reward, agent.terminated = env.step(agent.action)
        # step3: 저장 & 학습 
        agent.save_experience() 
        agent.learn()
        # step4: 다음 스텝준비 
        agent.current_state = agent.next_state 
        if agent.terminated: break 
    # Step3: 다음에피소드 준비 
    agent.scores.append(agent.score) 
    agent.playtimes.append(t+1)
    agent.n_episodes = agent.n_episodes + 1
    if (agent.n_episodes % 500) ==0:
        print(
            f"Epsiode: {agent.n_episodes} \t"
            f"Score: {np.mean(agent.scores[-100:])} \t"
            f"Playtime: {np.mean(agent.playtimes[-100:])}\t"
        )     

Epsiode: 500    Score: -10.64   Playtime: 2.74  
Epsiode: 1000   Score: 87.27    Playtime: 12.63 
Epsiode: 1500   Score: 90.66    Playtime: 10.34 
Epsiode: 2000   Score: 94.0     Playtime: 7.0   
Epsiode: 2500   Score: 94.0     Playtime: 7.0   
Epsiode: 3000   Score: 95.0     Playtime: 6.0   
Epsiode: 3500   Score: 95.0     Playtime: 6.0   
Epsiode: 4000   Score: 95.0     Playtime: 6.0   
Epsiode: 4500   Score: 95.0     Playtime: 6.0   
Epsiode: 5000   Score: 95.0     Playtime: 6.0   

## F. 상호작용결과 시각화

In [173]:
states = [np.array([0,0])] + agent.next_states[-agent.playtimes[-1]:] 
show(states)

# 5. `AgentExplorer`

## A. 클래스 설계

In [175]:
class AgentExplorer(AgentGreedy):
    def __init__(self,env):
        super().__init__(env)
        self.eps = 0 
    def act(self):
        if np.random.rand() < self.eps:
            self.action = self.action_space.sample() 
        else:
            super().act()

## B. 환경과 상호작용

In [184]:
env = GridWorld() 
agent = AgentExplorer(env) 
agent.eps = 1
for _ in range(5000):
    # Step1: 에피소드 준비 
    agent.current_state = env.reset()
    agent.terminated = False 
    agent.score = 0 
    # Step2: 에프소드 진행 
    for t in range(50):
        # step1: 행동
        agent.act() 
        # step2: 보상 
        agent.next_state, agent.reward, agent.terminated = env.step(agent.action)
        # step3: 저장 & 학습 
        agent.save_experience() 
        agent.learn()
        # step4: 다음 스텝준비 
        agent.current_state = agent.next_state 
        if agent.terminated: break 
    # Step3: 다음에피소드 준비 
    agent.scores.append(agent.score) 
    agent.playtimes.append(t+1)
    agent.n_episodes = agent.n_episodes + 1
    agent.eps = agent.eps * 0.999
    #--#
    if (agent.n_episodes % 500) ==0:
        print(
            f"Epsiode: {agent.n_episodes} \t"
            f"Score: {np.mean(agent.scores[-100:])} \t"
            f"Playtime: {np.mean(agent.playtimes[-100:])}\t"
            f"Epsilon: {agent.eps : .2f}"
        )   

Epsiode: 500    Score: -10.28   Playtime: 3.48  Epsilon:  0.61
Epsiode: 1000   Score: -1.61    Playtime: 4.71  Epsilon:  0.37
Epsiode: 1500   Score: 38.27    Playtime: 8.83  Epsilon:  0.22
Epsiode: 2000   Score: 59.41    Playtime: 9.69  Epsilon:  0.14
Epsiode: 2500   Score: 82.61    Playtime: 7.39  Epsilon:  0.08
Epsiode: 3000   Score: 80.89    Playtime: 6.91  Epsilon:  0.05
Epsiode: 3500   Score: 92.8     Playtime: 6.0   Epsilon:  0.03
Epsiode: 4000   Score: 92.71    Playtime: 6.09  Epsilon:  0.02
Epsiode: 4500   Score: 93.85    Playtime: 6.05  Epsilon:  0.01
Epsiode: 5000   Score: 92.86    Playtime: 5.94  Epsilon:  0.01

## C. 상호작용 결과 시각화

In [185]:
states = [np.array([0,0])] + agent.next_states[-agent.playtimes[-1]:] 
show(states)