# 제어 문제

In [2]:
#!hide
from collections import defaultdict

import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt


class MCPrediction:
    def __init__(self, env, gamma=0.9):
        self.gamma = gamma
        self.env = env
        self.V = np.zeros(env.observation_space.n)
        self.returns = defaultdict(list)

    def select_action(self, policy, state):
        return policy[state]
    
    def generate_episode(self, policy):
        episode = []
        state, info = self.env.reset()
        done = False
        while not done:
            action = self.select_action(policy, state)
            next_state, reward, done, _, _ = self.env.step(action)
            episode.append((state, action, reward, next_state, done))
            state = next_state
        return episode

    def update_value(self, episode):
        G = 0
        for t in reversed(range(len(episode))):
            state, action, reward, next_state, done = episode[t]
            G = self.gamma * G + reward
            if state not in [x[0] for x in episode[:t]]:
                self.returns[state].append(G)
                self.V[state] = np.mean(self.returns[state])

    def evaluate_policy(self, policy, num_episodes=1000):
        nS = self.env.observation_space.n
        self.V_track = np.zeros((nS, num_episodes))
        for e in range(num_episodes):
            episode = self.generate_episode(policy)
            self.update_value(episode)
            self.V_track[:, e] = self.V.copy()
        return self.V, self.V_track

class SlipperyWalk(gym.Env):
    def __init__(self, length):
        super().__init__()

        self.action_space = gym.spaces.Discrete(2)
        self.observation_space = gym.spaces.Discrete(length)

        self.start_state = length // 2  # 2로 나눈 몫
        self.state = self.start_state
        self.done = False
        self.truncated = False
        self.length = length
        self.P = self.set_transition()

    def set_transition(self):
        P = {}
        transition_probs = {'MOVE': 1/2, 'STAY': 1/3, 'OPPOSITE': 1/6}
        goal = self.length - 1

        for s in range(self.length):
            P[s] = {}
            for a in range(2):
                if s == 0:
                    P[s][a] = [(1.0, s, 0, True)]  # 확률, 상태, 보상, 종료 여부
                elif s == goal:
                    P[s][a] = [(1.0, s, 1, True)]
                else:
                    transitions = []
                    for transition, prob in transition_probs.items():
                        if transition == 'MOVE':
                            next_state = s + 1 if a == 1 else s - 1
                        elif transition == 'STAY':
                            next_state = s
                        elif transition == 'OPPOSITE':
                            next_state = s - 1 if a == 1 else s + 1
                        next_state = max(0, min(goal, next_state))   # 범위 제한 표현
                        reward = 1 if next_state == goal else 0      # 삼항 연산자
                        done = next_state == 0 or next_state == goal
                        transitions.append((prob, next_state, reward, done))
                    P[s][a] = transitions

        return P

    def reset(self):
        self.state = self.start_state
        self.done = False
        self.truncated = False
        self.steps = 0
        return self.state, {}

    def step(self, action):
        transitions = self.P[self.state][action]
        probs = np.array([t[0] for t in transitions])
        i = np.random.choice(np.arange(len(transitions)), p=probs)
        _, self.state, reward, self.done = transitions[i]
        self.steps += 1
        self.truncated = self.steps >= 100
        return self.state, reward, self.done, self.truncated, {}

    def render(self, mode='human'):
        grid = ['X'] + ([' '] * (self.length - 2)) + ['O']  # 리스트 연산
        grid[self.state] = 'A'
        print('|' + '|'.join(grid) + '|') # 문자열 연산

    @property
    def unwrapped(self):
        return self

def policy_evaluation(pi, P, gamma=1.0, theta=1e-10):
    nS = len(P)  # 상태의 수
    prev_V = np.zeros(nS)  # ns와 크기가 같은 영(0)벡터
    while True:
        V = np.zeros_like(prev_V)  # prev_V와 모양이 같은 영벡터
        for s in range(nS):
            for prob, next_state, reward, done in P[s][pi[s]]:
                rtrn = reward + gamma * prev_V[next_state] * (not done)
                V[s] += prob * rtrn
        if np.max(np.abs(prev_V - V)) < theta: # 기존 V와 차이가 작으면 중단
            break
        prev_V = V.copy()
    return V

def rmse(x, y):
    return np.sqrt(np.mean((x - y)**2))

def plot_value_track(V_track, V_true, env):
    colors = plt.cm.viridis(np.linspace(0, 1, env.length - 2))

    for i in range(env.length - 2):
        j = i + 1
        plt.plot(V_track[j], color=colors[i])
        plt.hlines(V_true[j], 0, 1000, colors=colors[i], linestyles='dashed')
        plt.text(0, V_true[j], f'{i}')

## On-Policy

### MC 제어

In [3]:
class MCControl(MCPrediction):
    def __init__(self, env, gamma=0.9, epsilon=0.1):
        super().__init__(env, gamma)
        self.epsilon = epsilon  # Exploration rate
        self.Q = np.zeros((env.observation_space.n, env.action_space.n))
        self.returns = defaultdict(list)

    def select_action(self, policy, state):
        if np.random.rand() < self.epsilon:
            return self.env.action_space.sample()
        else:
            return np.argmax(self.Q[state])

    def update_value(self, episode):
        G = 0
        for t in reversed(range(len(episode))):
            state, action, reward, next_state, done = episode[t]
            G = self.gamma * G + reward
            if (state, action) not in [(x[0], x[1]) for x in episode[:t]]:
                self.returns[(state, action)].append(G)
                self.Q[state][action] = np.mean(self.returns[(state, action)])

    def improve_policy(self):
        policy = {}
        for state in range(self.env.observation_space.n):
            policy[state] = np.argmax(self.Q[state])
        return policy

    def evaluate_policy(self, policy, num_episodes=1000):
        nS = self.env.observation_space.n
        nA = self.env.action_space.n
        self.Q_track = np.zeros((nS, nA, num_episodes))
        for e in range(num_episodes):
            episode = self.generate_episode(policy)
            self.update_value(episode)
            self.Q_track[:, :, e] = self.Q.copy()
        return self.Q, self.Q_track

    def control(self, policy, num_episodes=1000):
        for _ in range(num_episodes):
            episode = self.generate_episode()
            self.update_value(episode)
        return self.improve_policy()

실험

In [6]:
env = SlipperyWalk(9)
agent = MCControl(env)
LEFT, RIGHT = 0, 1
pi = {s: LEFT for s in range(env.observation_space.n)}
agent.evaluate_policy(pi)
new_pi = agent.improve_policy()
new_pi


{0: 0, 1: 0, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 0}

### SARSA

In [5]:
class SARSA(MCControl):
    def __init__(self, env, gamma=0.9, alpha=0.1, epsilon=0.1):
        super().__init__(env, gamma, epsilon)
        self.alpha = alpha

    def generate_episode(self, policy):
        episode = []
        state, info = self.env.reset()
        action = self.select_action(policy, state)
        done = False
        while not done:
            next_state, reward, done, _, _ = self.env.step(action)
            next_action = self.select_action(policy, next_state)
            episode.append((state, action, reward, next_state, next_action, done))
            state, action = next_state, next_action
        return episode

    def update_value(self, episode):
        for t in range(len(episode)):
            state, action, reward, next_state, next_action, done = episode[t]
            td_target = reward + (self.gamma * self.Q[next_state][next_action] * (not done))
            td_error = td_target - self.Q[state][action]
            self.Q[state][action] += self.alpha * td_error

    def control(self, policy, num_episodes=1000):
        for _ in range(num_episodes):
            episode = self.generate_episode(policy)
            self.update_value(episode)
        return self.improve_policy()

In [7]:
agent = SARSA(env)
agent.evaluate_policy(pi)
new_pi = agent.improve_policy()
new_pi

{0: 0, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 0}

## Off-Policy

### Q-Learning

In [9]:
class QLearning(SARSA):
    def generate_episode(self, policy):
        episode = []
        state, info = self.env.reset()
        done = False
        while not done:
            action = self.select_action(policy, state)
            next_state, reward, done, _, _ = self.env.step(action)
            episode.append((state, action, reward, next_state, done))
            state = next_state
        return episode
    
    def update_value(self, episode):
        for t in range(len(episode)):
            state, action, reward, next_state, done = episode[t]
            td_target = reward + (self.gamma * np.max(self.Q[next_state]) * (not done))
            td_error = td_target - self.Q[state][action]
            self.Q[state][action] += self.alpha * td_error

In [11]:
agent = QLearning(env)
agent.evaluate_policy(pi)
new_pi = agent.improve_policy()
new_pi

{0: 0, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 0}

### 이중 Q-학습

In [12]:
class DoubleQLearning(QLearning):
    def __init__(self, env, gamma=0.9, alpha=0.1, epsilon=0.1):
        super().__init__(env, gamma, alpha, epsilon)
        self.QA = np.zeros((env.observation_space.n, env.action_space.n))
        self.QB = np.zeros((env.observation_space.n, env.action_space.n))

    def update_value(self, episode):
        for t in range(len(episode)):
            state, action, reward, next_state, done = episode[t]
            if np.random.rand() < 0.5:
                Q1, Q2 = self.QA, self.QB
            else:
                Q1, Q2 = self.QB, self.QA

            best_next_action = np.argmax(Q1[next_state])
            td_target = reward + (self.gamma * Q2[next_state][best_next_action] * (not done))
            td_error = td_target - Q1[state][action]
            Q1[state][action] += self.alpha * td_error
        self.Q = (self.QA + self.QB) / 2

In [13]:
agent = DoubleQLearning(env)
agent.evaluate_policy(pi)
new_pi = agent.improve_policy()
new_pi

{0: 0, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 0}

### SARSA(λ)

In [14]:
class SARSALambda(SARSA):
    def __init__(self, env, gamma=0.9, alpha=0.1, epsilon=0.1, lambd=0.9):
        super().__init__(env, gamma, alpha, epsilon)
        self.lambd = lambd
        self.E = np.zeros((env.observation_space.n, env.action_space.n))

    def update_value(self, episode):        
        self.E.fill(0)
        for t in range(len(episode)):
            state, action, reward, next_state, next_action, done = episode[t]
            td_target = reward + (self.gamma * self.Q[next_state][next_action] * (not done))
            td_error = td_target - self.Q[state][action]
            self.E[state][action] += 1
            for s in range(self.env.observation_space.n):
                for a in range(self.env.action_space.n):
                    self.Q[s][a] += self.alpha * td_error * self.E[s][a]
                    if next_action is not None:
                        self.E[s][a] *= self.gamma * self.lambd

In [15]:
agent = SARSALambda(env)
agent.evaluate_policy(pi)
new_pi = agent.improve_policy()
new_pi

{0: 0, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 0}

### 왓킨스의 Q(λ)

In [17]:
class WatkinsQLambda(SARSA):
    def __init__(self, env, gamma=0.9, alpha=0.1, epsilon=0.1, lambd=0.9):
        super().__init__(env, gamma, alpha, epsilon)
        self.lambd = lambd
        self.E = np.zeros((env.observation_space.n, env.action_space.n))

    def update_value(self, episode):
        self.E.fill(0)
        for t in range(len(episode)):
            state, action, reward, next_state, next_action, done = episode[t]
            best_next_action = np.argmax(self.Q[next_state])
            td_target = reward + (self.gamma * self.Q[next_state][best_next_action] * (not done))
            td_error = td_target - self.Q[state][action]
            self.E[state][action] += 1

            for s in range(self.env.observation_space.n):
                for a in range(self.env.action_space.n):
                    self.Q[s][a] += self.alpha * td_error * self.E[s][a]
                    if best_next_action == next_action:
                        self.E[s][a] *= self.gamma * self.lambd
                    else:
                        self.E[s][a] = 0



In [20]:
agent = WatkinsQLambda(env)
agent.evaluate_policy(pi)
new_pi = agent.improve_policy()
new_pi

{0: 0, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 0}

## 모형기반 강화학습

### Dyna-Q

In [21]:
import random

class DynaQ(QLearning):
    def __init__(self, env, gamma=0.9, alpha=0.1, epsilon=0.1, n_planning_steps=5):
        super().__init__(env, gamma, alpha, epsilon)
        self.n_planning_steps = n_planning_steps
        self.model = defaultdict(lambda: defaultdict(lambda: (0, 0, 0)))

    def update_model(self, state, action, reward, next_state, done):
        self.model[state][action] = (reward, next_state, done)

    def planning_step(self):
        for _ in range(self.n_planning_steps):
            if len(self.model) == 0:
                continue
            # 상태, 행동을 무작위로 샘플링
            state = random.choice(list(self.model.keys()))
            action = random.choice(list(self.model[state].keys()))
            reward, next_state, done = self.model[state][action]

            td_target = reward + (self.gamma * np.max(self.Q[next_state]) * (not done))
            td_error = td_target - self.Q[state][action]
            self.Q[state][action] += self.alpha * td_error

    def update_value(self, episode):
        for t in range(len(episode)):
            state, action, reward, next_state, done = episode[t]
            td_target = reward + (self.gamma * np.max(self.Q[next_state]) * (not done))
            td_error = td_target - self.Q[state][action]
            self.Q[state][action] += self.alpha * td_error
            self.update_model(state, action, reward, next_state, done)
            self.planning_step()

    def control(self, policy, num_episodes=1000):
        for _ in range(num_episodes):
            episode = self.generate_episode(policy)
            self.update_value(episode)
        return self.improve_policy()


In [25]:
agent = DynaQ(env)
agent.evaluate_policy(pi)
new_pi = agent.improve_policy()
new_pi

{0: 0, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 0}

### Trajectory Sampling

In [26]:
class DynaQTrajectory(DynaQ):
    def __init__(self, env, gamma=0.9, alpha=0.1, epsilon=0.1, n_planning_steps=5, 
                 trajectory_length=5):
        super().__init__(env, gamma, alpha, epsilon, n_planning_steps)
        self.trajectory_length = trajectory_length

    def sample_trajectory(self, start_state, start_action):
        trajectory = []
        state = start_state
        action = start_action
        for _ in range(self.trajectory_length):
            if state not in self.model or action not in self.model[state]:
                break
            reward, next_state, done = self.model[state][action]
            trajectory.append((state, action, reward, next_state, done))
            if done:
                break
            state = next_state
            action = np.argmax(self.Q[state])
        return trajectory

    def planning_step(self):
        for _ in range(self.n_planning_steps):
            if len(self.model) == 0:
                continue
            start_state = random.choice(list(self.model.keys()))
            start_action = random.choice(list(self.model[start_state].keys()))
            trajectory = self.sample_trajectory(start_state, start_action)
            for t in range(len(trajectory)):
                state, action, reward, next_state, done = trajectory[t]
                td_target = reward + (self.gamma * np.max(self.Q[next_state]) * (not done))
                td_error = td_target - self.Q[state][action]
                self.Q[state][action] += self.alpha * td_error

In [27]:
agent = DynaQTrajectory(env)
agent.evaluate_policy(pi)
new_pi = agent.improve_policy()
new_pi

{0: 0, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 0}

## 퀴즈

<iframe src="https://tally.so/embed/wgZDV1?alignLeft=1&hideTitle=1&transparentBackground=1&dynamicHeight=1" loading="lazy" width="100%" height="1600" frameborder="0" marginheight="0" marginwidth="0" title="[RL] 제어 문제"></iframe>