# Zpětnovazební učení

V tomto cvičení budeme pracovat s knihovnou Gymnasium (udržovaný následovník knihovny Gym od OpenAI) - https://gymnasium.farama.org/, což je open source rozhraní určené pro úkoly zpětnovazebního učení. Jeho hlavní výhodou je, že implementace různých typů algoritmů pro zpětnovazební učení je v něm vcelku jednoduchá. Popis základních funkcí gymnasia se nachází v kódu níž.

Dnešní úkol bude naimplementovat agenta, který se učí chovat v nějakém prostředí (konkrétně v MountainCar) pomocí Q-učení.

Q-učení je způsob, kdy se agent učí svou strategii, jak se chovat v daném prostředí, pomocí zpětné vazby, kterou od prostředí za své chování dostává. Na rozdíl od hladového agenta (který jen v každém stavu vybírá nový stav na základě akce, co maximalizuje jeho užitek), bere v potaz to, že mezi stavy existují vztahy, které jsou dány Bellmanovými rovnicemi.

Nyní se tedy podíváme na příklad autíčka, které se snaží dostat do cíle, ale pohybuje se náhodně.

In [None]:
import utils

import gymnasium
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm

import pygame

Ukážeme si, jak si vytvořit jednoduché prostředí *MountainCar*: https://gymnasium.farama.org/environments/classic_control/mountain_car/

Cílem je, aby se autíčko dostalo z údolí až nahoru k vlaječce. V této základní verzi je zde v každém stavu náhodně zvolena akce pro pohyb.

(V následujícím kódu je zobrazování nastaveno tak, aby nám otevřelo prostředí v novém okně a v tom nám ukázalo běh agenta.)

In [None]:
env = gymnasium.make("MountainCar-v0", render_mode="human")

# Let's see, how the observation and action spaces of this environment look like
print("Observation space:", env.observation_space)
print("Observation space - low:", env.observation_space.low)
print("Observation space - high:", env.observation_space.high)
print("Action space:", env.action_space)

terminated, truncated = False, False
state, info = env.reset()
while not terminated and not truncated:
    state, reward, terminated, truncated, info = env.step(env.action_space.sample()) # Take a random action
env.close()

Pokud bychom si prostředí chtěli vyzkoušet sami, dá se udělat třeba něco takového (není to sice zcela odladěné (hlavně nedržte víc kláves najednou), ale funguje to dostatečně):

(A ano, poběží to dokola, dokud to nevypnete...)

In [None]:
# Initialize Pygame
pygame.init()

# Create a Pygame window
screen = pygame.display.set_mode((640,  480))

# Create the environment
env = gymnasium.make("MountainCar-v0", render_mode="human")

# Initialize variables
env_terminated, done = False, False
R = 0
action_to_perform = 1

# Reset the environment
state = env.reset()

# Define the mapping from keyboard keys to actions
key_to_action = {
    pygame.K_LEFT:   0,  # Move cart left
    pygame.K_RIGHT:   2  # Move cart right
}

# Main loop
while not env_terminated:
    # Process Pygame events
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            env_terminated = True
            break
            
        elif event.type == pygame.KEYDOWN:
            if event.key in key_to_action:
                action = key_to_action[event.key]
                action_to_perform = action

        elif event.type == pygame.KEYUP:
            action_to_perform = 1 # Don't do anything

    if not env_terminated and not done:
        state, reward, terminated, truncated, info = env.step(action_to_perform)
        done = terminated or truncated
        R += reward

    if done:
        print("Return:", R)
        state = env.reset()
        done = False
        R = 0

    # Update the display
    pygame.display.flip()

# Clean up
env.close()
pygame.quit()

Vraťme se zpět k agentům. Zkusíme do kódu výše připsat obecnou třídu pro agenta, který se v prostředí chová náhodně. Lze ho později použít jako základ pro zpětnovazebního agenta. Stav agenta je pozice a rychlost, akce může být akcelerace vlevo, vpravo a nebo nicnedělání. Budeme opakovat několik iterací pro "trénovaní", kdy každá iterace for cyklu je jedna hra s novým náhodným začátkem a ve while cyklu se trénují přechody mezi stavy agenta.

In [None]:
# General class for a (random) agent
class RandomAgent:
    def __init__(self, action_space):
        self.action_space = action_space
    
    def act(self, state):
        return self.action_space.sample()
    
    def train(self, state, action, reward, next_state, terminated):
        pass
    

env = gymnasium.make("MountainCar-v0")
agent = RandomAgent(env.action_space)

# Training loop (the agent trains for the duration of hundred episodes)
print("Training the agent...")
total_returns = []
for _ in range(100):
    observation, _ = env.reset()   
    done = False
    R = 0. # Cummulative reward (otherwise known as return) - just for logging purposes
    
    while not done:
        action = agent.act(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        agent.train(state, action, reward, next_state, terminated)
        
        state = next_state
        done = terminated or truncated
        R += reward
        
    total_returns.append(R)

# Let's show a plot of the learning progression
plt.plot(utils.moving_average(total_returns, 10))
plt.show()

# Evaluation
print("Evaluating the trained agent...")

# We pass only the name of the environment (and eventual arguments), not the environment itself, because we don't want to render
# anything during the training, but we want to do so during the test phase. This we achieve by creating a new environment
# for the testing / simulation with properly set render_mode argument (which we do inside the simulate function).
print("Obtained returns:", utils.simulate(agent, "MountainCar-v0", steps=200, episodes=1))

env.close()

## Úkol na cvičení

Zkuste si místo náhodného agenta naprogramovat třídu agenta, který se učí chovat v prostředí MountainCar pomocí Q-učení. Dejte pozor na to, že prostředí vrací jako stav spojité hodnoty, takže je třeba si z nich nějak udělat prostředí diskrétní (tedy s konečným množstvím stavů).

Níže je návrh obrysů takového agenta, který můžete případně použít.

In [None]:
class QAgent:
    def __init__(self, action_space, epsilon=0.1, alpha=0.1, gamma=0.9):
        self.action_space = action_space
        
        self.epsilon = epsilon
        self.alpha = alpha
        self.gamma = gamma

        num_bins_position = 11 # subintervals for the position
        num_bins_velocity = 6 # subintervals for the velocity

        mean_pos = -0.5
        std_pos = 0.3
        min_pos = -1.2
        max_pos = 0.6

        low_prob = norm.cdf(min_pos, loc=mean_pos, scale=std_pos)
        high_prob = norm.cdf(max_pos, loc=mean_pos, scale=std_pos)

        # use normal distribution for bins position with expected value at the bottom of the valley and standard deviation such that the bins are not too wide
        percentiles = np.linspace(low_prob, high_prob, num_bins_position + 1)
        self.bins_position = norm.ppf(percentiles, loc=mean_pos, scale=std_pos)

        self.bins_velocity = np.linspace(-0.07, 0.07, num_bins_velocity + 1)

        self.Q = np.zeros((num_bins_position, num_bins_velocity, 3)) # Q[position, velocity, action]
        
        self.greedy = False
        
        
    def _discretize(self, state):
        state_position = np.digitize(state[0], self.bins_position) - 1
        state_velocity = np.digitize(state[1], self.bins_velocity) - 1
        return (state_position, state_velocity)
    
    
    def act(self, state):
        current_state = self._discretize(state)
        
        if not self.greedy and np.random.random() < self.epsilon:
            chosen_action = self.action_space.sample()
        else:
            state_qs = self.Q[current_state[0], current_state[1], :]
            max_q = np.max(state_qs)
            # break ties randomly
            best_actions = np.where(state_qs == max_q)[0]
            chosen_action = np.random.choice(best_actions)

        return chosen_action
        
        
    def train(self, state, action, reward, next_state, terminated):
        state, next_state = self._discretize(state), self._discretize(next_state)
        
        if not terminated:
            rest_of_the_episode_estimation = np.max(self.Q[next_state[0], next_state[1], :])
        else:
            rest_of_the_episode_estimation = 0.
        
        self.Q[state[0], state[1], action] += \
            self.alpha * (reward + self.gamma * rest_of_the_episode_estimation - self.Q[state[0], state[1], action])

In [None]:
env = gymnasium.make("MountainCar-v0")
agent = QAgent(env.action_space, epsilon=1, alpha=0.1, gamma=0.9)
decay = 0.9

# Training
print("Training the agent...")
agent.greedy = False

total_returns = []
for _ in range(1000):
    state, _ = env.reset()
    done = False
    R = 0. # return
    
    while not done:
        action = agent.act(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        agent.train(state, action, reward, next_state, terminated)
        
        state = next_state
        done = terminated or truncated
        R += reward
    
    agent.epsilon *= decay # decay epsilon
    total_returns.append(R)

env.close()

plt.plot(utils.moving_average(total_returns, 10))
plt.show()

# Evaluation
print("Evaluating the trained agent...")
agent.greedy = True
print("Obtained returns:", utils.simulate(agent, "MountainCar-v0", steps=200, episodes=1))