# 13wk-2: (강화학습) – Bandit 환경 설계 및 풀이, 4x4 Grid World 게임설명

## 1. Imports

In [1]:
import gymnasium as gym
#---#
import numpy as np
import collections
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import IPython

## 2. 주요 코드 등

`-` 클래스로 Agent와 Environment 구현

```Python
class Bandit:
    """Two-button bandit environment with fixed, deterministic payouts."""

    def __init__(self):
        # Most recent payout; stays None until step() is called.
        self.reward = None

    def step(self, action):
        """Record and return the payout for ``action``.

        Environments usually hand back (state, reward, terminated); this one
        only produces a reward.  Action 0 pays 1, every other action pays 10.
        """
        self.reward = 1 if action == 0 else 10
        return self.reward


class Agent:
    """Explore-then-exploit agent for the bandit game.

    The agent acts randomly for its first 20 steps and afterwards always
    picks the action with the highest estimated mean reward.
    """

    def __init__(self):
        self.n_experience = 0  # number of transitions saved so far

        # Latest action / reward, filled in by act() and by the training loop.
        self.action = None
        self.reward = None
        # Bounded histories: only the most recent 500 entries are kept.
        self.actions = collections.deque(maxlen=500)
        self.rewards = collections.deque(maxlen=500)

        self.action_space = [0, 1]
        self.q_table = None  # set by learn()

    def act(self):
        """Choose the next action and store it in ``self.action``.

        With two equally likely actions and 20 random steps, the chance that
        one action is never tried is 2 * 0.5**20 (about 1.9e-6).  learn()
        marks such an action with NaN; ``argmax`` returns the position of the
        first NaN, so the untried action is the next one selected.
        """
        if self.n_experience < 20:
            self.action = np.random.choice(self.action_space)
        else:
            self.action = self.q_table.argmax()

    def save_experience(self):
        """Append the current (action, reward) pair to the histories."""
        self.actions.append(self.action)
        self.rewards.append(self.reward)
        self.n_experience += 1

    def learn(self):
        """Set ``self.q_table`` to the mean observed reward of each action.

        Entry ``i`` belongs to ``self.action_space[i]``.  An action without
        any recorded sample gets NaN; the mean of an empty slice is never
        taken, so no RuntimeWarning is emitted during the early steps.
        """
        actions = np.array(self.actions)
        rewards = np.array(self.rewards)

        q_values = []
        for candidate in self.action_space:
            observed = rewards[actions == candidate]
            q_values.append(observed.mean() if observed.size else np.nan)

        self.q_table = np.array(q_values)
```

`-` 학습

```Python
env = Bandit()
player = Agent()

for _ in range(100):
    # Step 1: the agent picks a button.
    player.act()

    # Step 2: the environment turns the action into a reward.
    player.reward = env.step(player.action)

    # Step 3: store the transition, then re-estimate the action values.
    player.save_experience()
    player.learn()

    # Stopping rule: needs at least 20 transitions before it is evaluated.
    if player.n_experience < 20:
        continue
    # Cleared once the mean of the 20 most recent rewards exceeds 9.5.
    if np.array(player.rewards)[-20:].mean() > 9.5:
        print("---게임 클리어---")
        break
```

`-` GridWorld 이동경로 시각화

```Python
def show(states) -> None:
    """Animate a path on the 4x4 board as a moving red dot.

    Parameters
    ----------
    states : sequence of (s1, s2) pairs
        Positions to display, one animation frame per entry.  ``s1`` is
        drawn on the vertical axis and ``s2`` on the horizontal axis.
        The sequence is only read, never modified.

    The animation is rendered inline as HTML through IPython; nothing is
    returned.
    """
    fig = plt.Figure()
    ax = fig.subplots()
    ax.matshow(np.zeros([4, 4]), cmap='bwr', alpha=0.0)
    sc = ax.scatter(0, 0, color='red', s=500)
    ax.text(0, 0, 'start', ha='center', va='center')
    ax.text(3, 3, 'end', ha='center', va='center')
    # Minor ticks sit on the cell borders so the grid lines outline each cell.
    ax.set_xticks(np.arange(-.5, 4, 1), minor=True)
    ax.set_yticks(np.arange(-.5, 4, 1), minor=True)
    ax.grid(which='minor', color='black', linestyle='-', linewidth=2)

    def _clamp(coord):
        # Off-board coordinates are pulled back by half a cell (4 -> 3.5,
        # -1 -> -0.5); coordinates within 0..3 are returned unchanged.
        if coord < 0:
            return coord + 0.5
        if coord > 3:
            return coord - 0.5
        return coord

    def update(t):
        # Pure function of states[t]: the caller's list is left untouched, so
        # drawing the same frame repeatedly always gives the same position.
        s1, s2 = states[t]
        sc.set_offsets([_clamp(s2), _clamp(s1)])

    ani = FuncAnimation(fig, update, frames=len(states))
    IPython.display.display(IPython.display.HTML(ani.to_jshtml()))
```

`-` 4x4 GridWorld Environment

* 핵심 함수 : `gym.spaces.MultiDiscrete()` (상태 공간), `gym.spaces.Discrete()` (행동 공간)

```Python
class GridWorld:
    """4x4 grid world that starts at (0, 0) and has its goal at (3, 3).

    Rewards: +100 for reaching (3, 3), -1 for any other cell inside the grid,
    -10 for leaving the grid.  Reaching the goal or leaving the grid ends the
    episode.
    """

    def __init__(self):
        # Action id -> displacement added to the (s1, s2) state.
        self.a2d = {
            0: np.array([0, 1]),
            1: np.array([0, -1]),
            2: np.array([1, 0]),
            3: np.array([-1, 0]),
        }

        self.state = np.array([0, 0])
        self.state_space = gym.spaces.MultiDiscrete([4, 4])
        self.reward = None
        self.terminated = False

    def reset(self):
        """Put the agent back at (0, 0) and return a copy of that state."""
        self.state = np.array([0, 0])
        self.reward = None
        self.terminated = False

        # A copy is returned so the caller's array and the internal state
        # cannot modify each other.
        return self.state.copy()

    def step(self, action):
        """Apply ``action`` and return ``(state, reward, terminated)``.

        The returned state is a fresh array; arrays returned by earlier calls
        keep their values.
        """
        # Build a new array instead of ``+=`` so previously returned states
        # are not changed in place.
        self.state = self.state + self.a2d[action]
        s1, s2 = self.state

        if (s1 == 3) and (s2 == 3):
            # Goal reached.
            self.reward = 100
            self.terminated = True

        elif self.state in self.state_space:
            # Still on the board.
            self.reward = -1
            self.terminated = False

        else:
            # Left the board.
            self.reward = -10
            self.terminated = True

        return self.state.copy(), self.reward, self.terminated
```

## 3. Bandit 환경 설계 및 풀이

### **A. 대충 개념만 실습**

In [2]:
# The two available actions (buttons 0 and 1).
action_space = [0,1] 
# Bounded histories: each deque keeps at most the 500 most recent entries.
actions_deque = collections.deque(maxlen=500)
rewards_deque =  collections.deque(maxlen=500)
#---#

In [3]:
for _ in range(10):
    # Explore: pick one of the available actions at random.
    action = np.random.choice(action_space)
    # Action 1 pays 10, anything else pays 1.
    reward = 10 if action == 1 else 1
    actions_deque.append(action)
    rewards_deque.append(reward)

In [4]:
actions_deque

deque([np.int64(1),
       np.int64(1),
       np.int64(0),
       np.int64(0),
       np.int64(1),
       np.int64(1),
       np.int64(1),
       np.int64(0),
       np.int64(1),
       np.int64(1)],
      maxlen=500)

In [5]:
rewards_deque

deque([10, 10, 1, 1, 10, 10, 10, 1, 10, 10], maxlen=500)

In [6]:
actions_numpy = np.array(actions_deque)
rewards_numpy = np.array(rewards_deque)

In [7]:
# Value estimate of each action = mean of the rewards recorded for that action.
q0 = rewards_numpy[actions_numpy == 0].mean()
q1 = rewards_numpy[actions_numpy == 1].mean()
# q_table[i] holds the estimate for action i.
q_table = np.array([q0,q1])
q_table

array([ 1., 10.])

In [8]:
action = q_table.argmax()

In [9]:
for _ in range(5):
    # Exploit: take the action with the highest current estimate.
    action = q_table.argmax()
    reward = 10 if action == 1 else 1
    actions_deque.append(action)
    rewards_deque.append(reward)
    # Re-estimate the per-action mean reward from the full history.
    actions_numpy = np.array(actions_deque)
    rewards_numpy = np.array(rewards_deque)
    q0, q1 = (rewards_numpy[actions_numpy == a].mean() for a in (0, 1))
    q_table = np.array([q0, q1])

In [10]:
actions_numpy

array([1, 1, 0, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1])

In [11]:
rewards_numpy

array([10, 10,  1,  1, 10, 10, 10,  1, 10, 10, 10, 10, 10, 10, 10])

### **B. 클래스를 이용한 구현**

```Python
class Bandit:
    """Bandit environment: action 0 pays 1, any other action pays 10."""

    def __init__(self):
        # Last payout handed out; None before the first step.
        self.reward = None

    def step(self, action):
        """Store the payout for ``action`` on the instance and return it."""
        payout = 10
        if action == 0:
            payout = 1
        self.reward = payout
        return self.reward
```

```Python
class Agent:
    """Skeleton of the bandit agent; every method is a placeholder."""
    def __init__(self):
        pass 
    def act(self):
        # Fewer than 20 experiences --> random action
        # Otherwise --> action = q_table.argmax()
        pass 
    def save_experience(self):
        # Store the data
        pass 
    def learn(self):
        # Update q_table
        pass
```

------------------------------------------------------------------------

In [None]:
class Bandit:
    """Deterministic two-payout bandit environment."""

    def __init__(self):
        # Holds the reward of the latest step (None until then).
        self.reward = None

    def step(self, action):
        """Return 1 for action 0 and 10 otherwise, remembering the value."""
        if action == 0:
            self.reward = 1
            return self.reward
        self.reward = 10
        return self.reward

In [None]:
class Agent:
    """Bandit agent: 20 random steps, then greedy on estimated mean reward."""

    def __init__(self):
        self.n_experience = 0  # number of transitions saved so far

        self.action = None
        self.reward = None
        # Bounded histories: only the most recent 500 entries are kept.
        self.actions = collections.deque(maxlen=500)
        self.rewards = collections.deque(maxlen=500)

        self.action_space = [0, 1]
        self.q_table = None  # stays None until learn() has 20 transitions

    def act(self):
        """Choose an action, store it in ``self.action`` and print it."""
        if self.n_experience < 20:
            self.action = np.random.choice(self.action_space)
        else:
            self.action = self.q_table.argmax()

        print(f"버튼 {self.action} 누름")

    def save_experience(self):
        """Append the current (action, reward) pair to the histories."""
        self.actions.append(self.action)
        self.rewards.append(self.reward)
        self.n_experience += 1

    def learn(self):
        """Update ``self.q_table`` once at least 20 transitions are stored.

        ``q_table[0]`` is the mean reward of action 0 and ``q_table[1]`` the
        mean reward of every other action.  A group without samples gets NaN
        instead of the mean of an empty slice, which would raise a
        RuntimeWarning; ``argmax`` returns the first NaN position, so act()
        tries that action next.
        """
        if self.n_experience < 20:
            return

        actions = np.array(self.actions)
        rewards = np.array(self.rewards)
        zero_rewards = rewards[actions == 0]
        other_rewards = rewards[actions != 0]
        q0 = zero_rewards.mean() if zero_rewards.size else np.nan
        q1 = other_rewards.mean() if other_rewards.size else np.nan

        self.q_table = np.array([q0, q1])

In [None]:
env = Bandit()
player = Agent()

In [None]:
player.act()

`-` 기본 템플릿

In [None]:
for _ in range(100):
    # (1) The agent chooses an action.
    player.act()

    # (2) The environment converts that action into a reward.
    player.reward = env.step(player.action)

    # (3) The agent records the transition and updates its estimates.
    player.save_experience()
    player.learn()

    # Stop once 20+ transitions exist and the last 20 rewards average > 9.5.
    if player.n_experience >= 20:
        if np.array(player.rewards)[-20:].mean() > 9.5:
            print("---게임 클리어---")
            break

## 4. 예비학습 : `gym.spaces`

`-` 예시 1 : 샘플링 및 집합 기능 포함

In [None]:
action_space = gym.spaces.Discrete(4)
action_space

In [None]:
[action_space.sample() for _ in range(5)]

In [None]:
0 in action_space

In [None]:
4 in action_space

`-` 예시2 : n차원

In [None]:
state_space = gym.spaces.MultiDiscrete([4, 4])
state_space

In [None]:
[state_space.sample() for _ in range(5)]

In [None]:
[0, 1] in state_space

In [None]:
np.array([3, 3]) in state_space

In [None]:
np.array([3, 4]) in state_space

## 5. 4x4 Grid World 게임 설명

### **A. 게임 설명**

`-` 4x4 그리드 월드에서 상하좌우로 움직이는 에이전트가 목표점에 도달하도록 하는 게임

`-` GridWorld에서 사용되는 주요변수

    1. State : 각 격자 셀이 하나의 상태이며, 에이전트는 이러한 상태 중 하나에 있을 수 있음
    2. Action : 에이전트는 현재상태에서 다음상태로 이동하기 위해 상/하/좌/우 중 하나의 행동을 취할 수 있음
    3. Reward : 에이전트가 현재상태에서 특정 action을 하면 얻어지는 보상.
    4. Terminated : 하나의 에피소드가 종료되었음을 나타내는 상태.

`-` 에이전트 환경

* 에이전트 행동 : 상하좌우로 이동 - 4개의 행동 == `[0, 1, 2, 3]`
* 환경은 보상을 줌 : `-1, -10, +100`
  * `-1` : 격자 안에 에이전트가 있음 `&` 에이전트의 위치가 `(3, 3)`이 아님
  * `+100` : 에이전트의 위치가 `(3, 3)`
  * `-10` : 에이전트가 격자안에 있지 않음
* 에이전트 (Action) <---> 환경 (State, Reward, Terminated)

### **B. 시각화**

In [None]:
def show(states):
    """Animate a path on the 4x4 board as a moving red dot.

    ``states`` is a sequence of (s1, s2) pairs, one animation frame per
    entry; ``s1`` is drawn on the vertical axis and ``s2`` on the horizontal
    axis.  The sequence is only read, never modified.  The animation is
    displayed inline as HTML through IPython and nothing is returned.
    """
    fig = plt.Figure()
    ax = fig.subplots()
    ax.matshow(np.zeros([4, 4]), cmap='bwr', alpha=0.0)
    sc = ax.scatter(0, 0, color='red', s=500)
    ax.text(0, 0, 'start', ha='center', va='center')
    ax.text(3, 3, 'end', ha='center', va='center')
    # Minor ticks sit on the cell borders so the grid lines outline each cell.
    ax.set_xticks(np.arange(-.5, 4, 1), minor=True)
    ax.set_yticks(np.arange(-.5, 4, 1), minor=True)
    ax.grid(which='minor', color='black', linestyle='-', linewidth=2)

    def _clamp(coord):
        # Off-board coordinates are pulled back by half a cell (4 -> 3.5,
        # -1 -> -0.5); coordinates within 0..3 are returned unchanged.
        if coord < 0:
            return coord + 0.5
        if coord > 3:
            return coord - 0.5
        return coord

    def update(t):
        # Reads states[t] without writing back, so drawing the same frame
        # more than once always yields the same dot position.
        s1, s2 = states[t]
        sc.set_offsets([_clamp(s2), _clamp(s1)])

    ani = FuncAnimation(fig, update, frames=len(states))
    IPython.display.display(IPython.display.HTML(ani.to_jshtml()))

In [None]:
show([[0,0],[1,0],[2,0],[3,0],[4,0]]) # show 사용방법

## 6. 4x4 Grid World 환경 구현

In [None]:
action_space = gym.spaces.Discrete(4)
action_space.sample()

In [None]:
state_space = gym.spaces.MultiDiscrete([4, 4])
state_space.sample()

In [None]:
state = np.array([1, 1])
state += np.array([0, 1])
state

In [None]:
class GridWorld:
    """4x4 grid world with a per-step log line.

    The agent starts at (0, 0).  Reaching (3, 3) gives +100 and ends the
    episode, any other cell inside the grid gives -1, and leaving the grid
    gives -10 and ends the episode.
    """

    def __init__(self):
        # Action id -> displacement added to the (s1, s2) state.
        self.a2d = {
            0: np.array([0, 1]),
            1: np.array([0, -1]),
            2: np.array([1, 0]),
            3: np.array([-1, 0]),
        }

        self.state = np.array([0, 0])
        self.state_space = gym.spaces.MultiDiscrete([4, 4])
        self.reward = None
        self.terminated = False

    def reset(self):
        """Put the agent back at (0, 0) and return a copy of that state."""
        self.state = np.array([0, 0])
        self.reward = None
        self.terminated = False

        # A copy is returned so the caller's array and the internal state
        # cannot modify each other.
        return self.state.copy()

    def step(self, action):
        """Apply ``action``, print the transition, return the outcome.

        Returns ``(state, reward, terminated)``; the state is a fresh array,
        so arrays returned by earlier calls keep their values.
        """
        previous = self.state
        # Build a new array instead of ``+=`` so previously returned states
        # (and ``previous``) are not changed in place.
        self.state = previous + self.a2d[action]
        s1, s2 = self.state

        if (s1 == 3) and (s2 == 3):
            # Goal reached.
            self.reward = 100
            self.terminated = True

        elif self.state in self.state_space:
            # Still on the board; ``terminated`` keeps its current value.
            self.reward = -1

        else:
            # Left the board.
            self.reward = -10
            self.terminated = True

        print(
            f"action = {action}\t"
            f"state = {previous} -> {self.state}\t"
            f"reward = {self.reward}\t"
            f"termiated = {self.terminated}"
        )

        return self.state.copy(), self.reward, self.terminated

In [49]:
env = GridWorld()

In [59]:
action_space = gym.spaces.Discrete(4)

for _ in range(50):
    # Random walk: sample an action and let the environment apply it.
    action = action_space.sample()
    state, reward, terminated = env.step(action)

    if not terminated:
        continue
    # Episode over: return the environment to its start state and stop.
    env.reset()
    break

action = 2	state = [0 0] -> [1 0]	reward = -1	termiated = False
action = 3	state = [1 0] -> [0 0]	reward = -1	termiated = False
action = 0	state = [0 0] -> [0 1]	reward = -1	termiated = False
action = 1	state = [0 1] -> [0 0]	reward = -1	termiated = False
action = 0	state = [0 0] -> [0 1]	reward = -1	termiated = False
action = 2	state = [0 1] -> [1 1]	reward = -1	termiated = False
action = 1	state = [1 1] -> [1 0]	reward = -1	termiated = False
action = 0	state = [1 0] -> [1 1]	reward = -1	termiated = False
action = 0	state = [1 1] -> [1 2]	reward = -1	termiated = False
action = 1	state = [1 2] -> [1 1]	reward = -1	termiated = False
action = 0	state = [1 1] -> [1 2]	reward = -1	termiated = False
action = 3	state = [1 2] -> [0 2]	reward = -1	termiated = False
action = 1	state = [0 2] -> [0 1]	reward = -1	termiated = False
action = 1	state = [0 1] -> [0 0]	reward = -1	termiated = False
action = 0	state = [0 0] -> [0 1]	reward = -1	termiated = False
action = 0	state = [0 1] -> [0 2]	reward

## 7. "에이전트 <-> 환경" 상호작용 구현

`-` 우리가 구현하고 싶은 기능

* `.act()` : 액션을 결정 -> 일단 어려우니 여기선 랜덤 액션
* `.save_experience()` : 데이터를 저장 -> 해당 과정에 초점
* `.learn()` : 데이터에서의 학습 -> 어차피 랜덤이니 여기선 pass

In [None]:
class RandomAgent:
    """Agent that samples random actions and records every transition."""

    def __init__(self):
        self.action_space = gym.spaces.Discrete(4)
        self.n_experience = 0

        # Latest transition: (state, action, reward, next_state, terminated).
        self.state = np.array([0, 0])
        self.action = None
        self.reward = None
        self.next_state = None
        self.terminated = None

        # One bounded history (500 most recent entries) per transition field.
        for history in ("states", "actions", "rewards",
                        "next_states", "terminations"):
            setattr(self, history, collections.deque(maxlen=500))

    def act(self):
        """Draw an action from the action space into ``self.action``."""
        self.action = self.action_space.sample()

    def save_experience(self):
        """Append the latest transition to the histories."""
        # The two states are stored as copies, so later changes to the
        # original arrays do not alter what was recorded.
        self.states.append(self.state.copy())
        self.actions.append(self.action)
        self.rewards.append(self.reward)
        self.next_states.append(self.next_state.copy())
        self.terminations.append(self.terminated)

        self.n_experience = self.n_experience + 1

    def learn(self):
        """No-op: actions are random, so there is nothing to learn."""
        return None

In [None]:
player = RandomAgent()
env = GridWorld()

In [None]:
scores = []

# range(1, 20) yields episode numbers 1..19, i.e. at most 19 episodes.
for e in range(1, 20):
    score = 0
    player.state = env.reset()

    # Each episode is capped at 50 steps.
    for t in range(50):
        # Step 1: the agent chooses an action.
        player.act()
        # Step 2: the environment returns next_state, reward, terminated.
        player.next_state, player.reward, player.terminated = env.step(player.action)
        # Step 3: the agent stores the transition and learns (a no-op here).
        player.save_experience()
        player.learn()
        # Step 4: advance to the next iteration.
        player.state = player.next_state
        score += player.reward
        if not player.terminated:
            continue
        print(f"---에피소드{e}종료---")
        break

    scores.append(score)

    # A positive episode score ends the whole run.
    if score > 0:
        break

In [None]:
scores[-1]

In [None]:
# Start cell followed by the 20 most recently recorded next-states.
# NOTE(review): player.next_states accumulates across episodes, so these 20
# entries are not limited to the final episode -- confirm that is intended.
paths = [np.array([0,0])]+ list(player.next_states)[-20:]
show(paths)