# Grid World
__Суммарное количество баллов: 10__

__Решение отправлять на `ml.course.practice@gmail.com`__

__Тема письма: `[HSE][ML][HW10] <ФИ>`, где вместо `<ФИ>` указаны фамилия и имя__

В этом задании вам предстоит реализовать несколько алгоритмов классического обучения с подкреплением для решения нескольких вариантов задачи GridWorld. 

In [None]:
!pip install gym[all] pyvirtualdisplay > /dev/null 2>&1
!pip install tqdm > /dev/null 2>&1
!pip install pillow
!apt update > /dev/null 2>&1
!apt install -y xvfb python-opengl ffmpeg > /dev/null 2>&1
!wget https://noto-website-2.storage.googleapis.com/pkgs/NotoMono-hinted.zip > /dev/null 2>&1
!unzip NotoMono-hinted.zip > /dev/null 2>&1
!mv NotoMono-Regular.ttf /usr/share/fonts/truetype/ > /dev/null 2>&1

In [None]:
import time
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw, ImageFont
import copy
import random
import numpy as np
from collections import deque
import random

%matplotlib inline

### Grid World
В этом задании Вам придется работать со средой Grid World. Мир представляет из себя ограниченную сетку, по которой может перемещаться агент. Некоторые клетки могут быть заблокированы, другие будут приводить к завершению путешествия агента с различным исходом.

#### Взаимодействие со средой
`get_actions(state)` возвращает список доступных в состонии `state` действий.

`get_states()` возвращает все состояния, в которых когда-либо может быть агент.

`get_transition(state, action)` возвращает состояние, в котором окажется агент если совершит действие `action` в состоянии `state`.

`get_reward(state, action)` возвращает награду, которую получит агент если совершит действие `action` в состоянии `state`.

`step(action)` - метод, позволяющий совершить действие в среде. На вход принимает действия из набора доступных действий, на выход возвращает `state, reward, done`. Награда является вещественным числом, done может принимать значения `True` или `False`

`reset()` - метод, позволяющий вернуть среду к изначальному состоянию. Обычно используется после того, как done принял значение `True`.

In [None]:
class GridWorld:
    def __init__(self, grid):
        self._height = len(grid)
        self._width = len(grid[0])
        self._max_reward = -1e9
        self._min_reward = 1e9
        for i in range(self._height):
            for j in range(self._width):
                if grid[i][j] == 'S':
                    self._start_state = (i, j)
                if not isinstance(grid[i][j], str):
                    self._max_reward = max(grid[i][j], self._max_reward)
                    self._min_reward = min(grid[i][j], self._min_reward)
        self._state = self._start_state
        self._grid = grid
    
    def step(self, action):
        reward = self.get_reward(self._state, action)
        next_state = self.get_transition(self._state, action)
        self._state = next_state
        done = next_state[0] < 0 or next_state[1] < 0
        return next_state, reward, done
    
    def reset(self):
        self._state = self._start_state
        return self._state
    
    def _get_info(self, x, y):
        reward = self._grid[y][x]
        if isinstance(reward, str):
            reward = 0.0
            is_final = False
        else:
            is_final = True
        is_block = self._grid[y][x] == "#"
        return reward, is_final, is_block
    
    def get_actions(self, state):
        return "left", "right", "top", "bottom"
    
    def get_states(self):
        states = []
        for i in range(self._height):
            for j in range(self._width):
                if self._grid[i][j] != '#':
                    states.append((i, j))
        return tuple(states)
    
    def get_transition(self, state, action):
        y, x = state
        if (y < 0) or (x < 0):
            return state
        if not isinstance(self._grid[y][x], str):
            return -1, -1
        if self._grid[y][x] == '#':
            return state
        if action == 'left' and x > 0 and self._grid[y][x - 1] != '#':
            return y, x - 1
        if action == 'right' and x < self._width - 1 and self._grid[y][x + 1] != '#':
            return y, x + 1
        if action == 'top' and y > 0 and self._grid[y - 1][x] != '#':
            return y - 1, x
        if action == 'bottom' and y < self._height - 1 and self._grid[y + 1][x] != '#':
            return y + 1, x
        return state
    
    def get_reward(self, state, action):
        next_state = self.get_transition(state, action)
        if state[0] < 0 or state[1] < 0:
            return 0.0
        if next_state[0] == state[0] and next_state[1] == state[1]:
            return 0.0
        if isinstance(self._grid[state[0]][state[1]], str):
            return 0.0
        return self._grid[state[0]][state[1]]

### ТЕХНИЧЕСКАЯ ЧАСТЬ
Тут лежит код для отображения картинок

In [None]:
class GridworldRanderer:
    def __init__(self, gridworld, cell_px=128, border_px=32, font_size=18):
        self.gridworld = gridworld
        self.cell_px = cell_px
        self.width = self.gridworld._width
        self.height = self.gridworld._height
        self.border_px = border_px 
        self.image = Image.new('RGB', (self.gridworld._width * cell_px + 2 * border_px,
                                      self.gridworld._height * cell_px + 2 * border_px), 
                              (0, 0, 0))
        self.drawer = ImageDraw.Draw(self.image)
        self.font = ImageFont.truetype("NotoMono-Regular.ttf", font_size)
        
    def _render_borders(self, is_q=False):
        self.drawer.line([(self.border_px, self.border_px), 
                          (self.border_px + self.width * self.cell_px, self.border_px), 
                          (self.border_px + self.width * self.cell_px, self.border_px + self.height * self.cell_px), 
                          (self.border_px, self.border_px + self.height * self.cell_px), 
                          (self.border_px, self.border_px)], fill=(255, 255, 255), width=6)
        for i in range(self.width - 1):
            self.drawer.line([(self.border_px + (i + 1) * self.cell_px, self.border_px), 
                              (self.border_px + (i + 1) * self.cell_px, self.border_px + self.height * self.cell_px)], 
                             fill=(255, 255, 255), width=4)
        for i in range(self.height - 1):
            self.drawer.line([(self.border_px, self.border_px + (i + 1) * self.cell_px), 
                              (self.border_px + self.width * self.cell_px, self.border_px + (i + 1) * self.cell_px)], 
                             fill=(255, 255, 255), width=4)
        for i in range(self.width):
            for j in range(self.height):
                reward, is_final, is_block = self.gridworld._get_info(i, j)
                if is_final:
                    self.drawer.line([(self.border_px + (i) * self.cell_px + 8, self.border_px + (j) * self.cell_px + 8), 
                                      (self.border_px + (i + 1) * self.cell_px - 8, self.border_px + (j) * self.cell_px + 8), 
                                      (self.border_px + (i + 1) * self.cell_px - 8, self.border_px + (j + 1) * self.cell_px - 8), 
                                      (self.border_px + (i) * self.cell_px + 8, self.border_px + (j + 1) * self.cell_px - 8), 
                                      (self.border_px + (i) * self.cell_px + 8, self.border_px + (j) * self.cell_px + 8)], fill=(255, 255, 255), width=2)
                elif is_q and not is_block:
                    self.drawer.line([
                        (self.border_px + i * self.cell_px, self.border_px + j * self.cell_px),
                        (self.border_px + (i + 1) * self.cell_px, self.border_px + (j + 1) * self.cell_px)], 
                        fill=(255, 255, 255), width=2)
                    self.drawer.line([
                        (self.border_px + i * self.cell_px, self.border_px + (j + 1) * self.cell_px),
                        (self.border_px + (i + 1) * self.cell_px, self.border_px + j * self.cell_px)], 
                        fill=(255, 255, 255), width=2)
        
    def _clear(self):
        self.image = Image.new('RGB', (self.gridworld._width * self.cell_px + 2 * self.border_px,
                                      self.gridworld._height * self.cell_px + 2 * self.border_px), 
                              (0, 0, 0))
        self.drawer = ImageDraw.Draw(self.image)
        self.drawer.font = self.font
    
    def _render_value_squere(self, x, y, v, min_v, max_v, is_final):
        if v == 0:
            color = (0, 0, 0)
        elif v > 0:
            color = (0, int(255 * min(abs(v) / (abs(max_v) + 1e-8), 1.0)), 0)
        else:
            color = (int(255 * min(abs(v) / (abs(min_v) + 1e-8), 1.0)), 0, 0)
        self.drawer.rectangle([self.border_px + x * self.cell_px, self.border_px + y * self.cell_px,
                              self.border_px + (x + 1) * self.cell_px, self.border_px + (y + 1) * self.cell_px],
                             fill=color)
        s = f"{v:.2f}"
        tx, ty = self.drawer.textsize(s)
        self.drawer.text((
            self.border_px + x * self.cell_px + self.cell_px // 2 - tx // 2,
            self.border_px + y * self.cell_px + self.cell_px // 2 - ty // 2
        ), s, fill=(255, 255, 255), stroke_width=1, stroke_fill=(0, 0, 0))
    
    def _render_q_squere(self, x, y, qs, min_v, max_v, is_final):
        actions = [("top", 0), ("right", 1), ("bottom", 2), ("left", 3)]
        for k, a in actions:
            v = qs[k]
            if v == 0:
                color = (0, 0, 0)
            elif v > 0:
                color = (0, int(255 * min(abs(v) / (abs(max_v) + 1e-8), 1.0)), 0)
            else:
                color = (int(255 * min(abs(v) / (abs(min_v) + 1e-8), 1.0)), 0 , 0)
            self.drawer.polygon([
                self.border_px + (x + (a + 1) // 2 % 2) * self.cell_px, self.border_px + (y + (a + 1) // 2 % 2) * self.cell_px,
                self.border_px + x * self.cell_px + self.cell_px // 2, self.border_px + y * self.cell_px  + self.cell_px // 2,
                self.border_px + (x + (a // 2 + 1) % 2) * self.cell_px, self.border_px + (y + (a // 2) % 2) * self.cell_px
            ], fill=color)
            dx = 0.5 + 0.25 * (a % 2) * (1 - 2 * (a // 2))
            dy = 0.5 + 0.25 * ((a + 1) % 2) * (2 * (a // 2) - 1)
            tpx = self.border_px + int((x + dx) * self.cell_px)
            tpy = self.border_px + int((y + dy) * self.cell_px)
            s = f"{v:.2f}"
            tx, ty = self.drawer.textsize(s)
            self.drawer.text((tpx - tx // 2, tpy - ty // 2), s, 
                             fill=(255, 255, 255), stroke_width=0, stroke_fill=(0, 0, 0))
            
    def render_rewards(self):
        self._clear()
        max_v = self.gridworld._max_reward
        min_v = self.gridworld._min_reward
        for i in range(self.width):
            for j in range(self.height):
                reward, is_final, is_block = self.gridworld._get_info(i, j)
                if is_block: 
                    continue
                self._render_value_squere(i, j, reward, min_v, max_v, is_final)
        self._render_borders()
        return np.array(self.image)
    
    def render_value(self, agent):
        self._clear()
        max_v = self.gridworld._max_reward
        min_v = self.gridworld._min_reward
        for i in range(self.width):
            for j in range(self.height):
                reward, is_final, is_block = self.gridworld._get_info(i, j)
                if is_block: 
                    continue
                if is_final:
                    self._render_value_squere(i, j, reward, min_v, max_v, is_final)
                else:
                    value = agent.get_value((j, i))
                    self._render_value_squere(i, j, value, min_v, max_v, is_final)
        self._render_borders()
        return np.array(self.image)
    
    def render_q(self, agent):
        self._clear()
        max_v = self.gridworld._max_reward
        min_v = self.gridworld._min_reward
        for i in range(self.width):
            for j in range(self.height):
                reward, is_final, is_block = self.gridworld._get_info(i, j)
                if is_block: 
                    continue
                if is_final:
                    self._render_value_squere(i, j, reward, min_v, max_v, is_final)
                else:
                    qs = {}
                    for a in self.gridworld.get_actions((i, j)):
                        qs[a] = agent.get_q((j, i), a)
                    self._render_q_squere(i, j,qs, min_v, max_v, is_final)
        self._render_borders(True)
        return np.array(self.image)

In [None]:
def _show_image(img, title=None, figsize=(9, 9)):
    fig = plt.figure(figsize=figsize)
    plt.xticks([])
    plt.yticks([])
    plt.title(title)
    plt.tight_layout()
    plt.imshow(img)
    plt.show()

def visualise(gridworld, agent):
    randerer = GridworldRanderer(gridworld)
    _show_image(randerer.render_rewards(), "Environment")
    _show_image(randerer.render_value(agent), "Value function")
    _show_image(randerer.render_q(agent), "Q function")

### Различные миры
В этой части определены различные миры. Каждый из них так или иначе раскрывает возможные проблемы алгоритмов обучения с подкреплением.

In [None]:
classic_grid = [
    [" ", " ", " ", 1.0],
    [" ", "#", " ", -1.0],
    ["S", " ", " ", " "]
]
exploration_grid = [
    [" ", " ", " ", 1.0],
    [" ", "#", " ", 2.0],
    ["S", " ", " ", " "]
]
cliff_grid = [
    [" ", " ", " ", " ", " "],
    [" ", "#", "#", "#", " "],
    ["S", " ", " ", " ", 1.0],
    [-1.0, -1.0, -1.0, -1.0, -1.0]
]
cliff_exploration_grid = [
    [" ", " ", " ", " ", " ", " ", " "],
    [" ", "#", "#", "#", "#", "#", " "],
    [" ", " ", " ", " ", " ", 1.0, " "],
    [" ", "#", "#", "#", "#", "#", 8.0],
    ["S", " ", " ", " ", " ", " ", 1.0],
    [-1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0]
]
hard_exploration_grid = [
    [" ", " ", " ", " ", " ", " ", " "],
    [" ", "#", "#", "#", "#", "#", " "],
    [" ", " ", " ", " ", " ", 3.0, " "],
    [" ", "#", "#", "#", "#", "#", " "],
    [" ", " ", " ", " ", " ", 2.0, " "],
    [" ", "#", "#", "#", "#", "#", " "],
    ["S", " ", " ", " ", " ", 1.0, 10.0],
]

### Общий интерфейс для агента
`get_action(state)` возвращает состояние, которое хочет совершить агент в состоянии `state`

`get_value(state)` возвращает значение Value-function для состояния `state`

`get_q(state, action)` возвращает значение Q-function для состояния `state` и действия `action`

In [None]:
class Agent:
    def get_action(self, state):
        pass
    
    def get_value(self, state):
        pass
    
    def get_q(self, state, action):
        pass

### Задание 1 (4 балла)
Реализуйте алгоритм value iteration. Обучение должно проходить в методе `__init__` в течении `iterations` итераций. Каждая итерация включает в себя проход по всем возможным состояниям среды. Во время обучения нужно учитывать learning rate `alpha` и коэффициент дисконтирования `gamma`.

_Hint:_ используйте методы `gridworld.get_reward`, `gridworld.get_transition`, `gridworld.get_actions` и `gridworld.get_states`

In [None]:
class ValueIteration(Agent):
    def __init__(self, gridworld, iterations=100, alpha=0.9, gamma=0.8):
        self.gamma = gamma
        self.gridworld = gridworld
        for i in range(iterations):
            pass # TODO: обновить приближение value function
    
    def get_action(self, state):
        # TODO: реализовать выбор лучшего действия
        return None
    
    def get_value(self, state):
        # TODO: реализовать подсчет value function для state
        return None
    
    def get_q(self, state, action):
        # TODO: реализовать подсчет q-function для пары (state, action)
        return None

#### Посмотрим, что получилось
Для проверки алгоритма можно использовать различные варианты среды, полный список находится выше.

In [None]:
gridworld = GridWorld(hard_exploration_grid)
agent = ValueIteration(gridworld)
visualise(gridworld, agent)

### Задание 2 (4 балла)
Реализуйте алгоритм Q-learning. Процесс обучения реализован вне класса, поэтому достаточно реализовать метод `update` для пересчета значений Q-function. Во время обнвления нужно учитывать learning rate `alpha` и коэффициент дисконтирования `gamma`.

В этом задании использовать `gridworld.get_reward`, `gridworld.get_transition` и `gridworld.get_states` нельзя (`gridworld.get_actions` можно)

In [None]:
class QLearning(Agent):
    def __init__(self, gridworld, alpha=0.9, gamma=0.8):
        self.gamma = gamma
        self.alpha = alpha
        self.gridworld = gridworld

    def update(self, state, action, next_state, reward, done):
        pass # TODO: реализовать обновление значений q-function
        
    def get_action(self, state):
        # TODO: реализовать выбор лучшего действия
        return None 
    
    def get_value(self, state):
        # TODO: реализовать подсчет value function для state
        return None
    
    def get_q(self, state, action):
        # TODO: реализовать подсчет q-function для пары (state, action)
        return None

Имплементация epsilon-greedy алгоритма. Просто добавляем случайные действия с вероятностью epsilon.

In [None]:
class EpsilonGreedy(QLearning):
    def __init__(self, gridworld, alpha=0.5, gamma=0.9, eps=0.1):
        self.eps = eps
        super().__init__(gridworld, alpha=alpha, gamma=gamma)
    
    def get_action(self, state):
        if self.eps > random.random():
            return random.choice(gridworld.get_actions(state))
        else:
            return super().get_action(state)

Процесс обучения Q-learning. Сначала обучаем агента в течении `timesteps` шагов среды, после чего тестируем его на `test_episodes` эпизодах с ограничением не более `test_max_steps` шагов на каждый. Возвращает количство завершившихся эпизодов во время тестирвания и сумму полученных наград.

In [None]:
def train(gridworld, agent, train_method, timesteps, test_episodes=100, test_max_steps=100):
    train_method(gridworld, agent, timesteps)
    time = 0
    done_count = 0
    sum_reward = 0
    for ep in range(test_episodes):
        done = False
        sr = 0.
        state = gridworld.reset()
        for time in range(test_max_steps):
            action = agent.get_action(state)
            state, reward, done = gridworld.step(action)
            sr += reward
            if done:
                done_count += 1
                sum_reward += sr
                break
    return done_count, sum_reward

def train_qlearning(gridworld, agent, timesteps):
    done = False
    state = gridworld.reset()
    for t in range(timesteps):
        if done:
            state = gridworld.reset()
            done = False
        action = agent.get_action(state)
        next_state, reward, done = gridworld.step(action)
        agent.update(state, action, next_state, reward, done)
        state = next_state

Посмотрим на зависимость результата от $\epsilon$. Можно посмотреть также и на разные среды

In [None]:
gridworld = GridWorld(classic_grid)
for eps in (0.0, 0.1, 0.25, 0.5, 0.75):
    agent = EpsilonGreedy(gridworld, eps=eps)
    done_count, sum_reward = train(gridworld, agent, train_qlearning, 1000000)
    if done_count != 0:
        sum_reward /= done_count
    print(f"Epsilon: {eps} | Done count: {done_count} | Avg reward {sum_reward}")

In [None]:
gridworld = GridWorld(classic_grid)
agent = EpsilonGreedy(gridworld, eps=0.2)
done_count, sum_reward = train(gridworld, agent, train_qlearning, 1000000)
if done_count != 0:
    sum_reward /= done_count
print(f"Done count: {done_count} | Avg reward {sum_reward}")
visualise(gridworld, agent)

In [None]:
gridworld = GridWorld(cliff_grid)
for eps in (0.0, 0.1, 0.25, 0.5, 0.75):
    agent = EpsilonGreedy(gridworld, eps=eps)
    done_count, sum_reward = train(gridworld, agent, train_qlearning, 1000000)
    if done_count != 0:
        sum_reward /= done_count
    print(f"Epsilon: {eps} | Done count: {done_count} | Avg reward {sum_reward}")

In [None]:
gridworld = GridWorld(cliff_grid)
agent = EpsilonGreedy(gridworld, eps=0.2)
done_count, sum_reward = train(gridworld, agent, train_qlearning, 1000000)
if done_count != 0:
    sum_reward /= done_count
print(f"Done count: {done_count} | Avg reward {sum_reward}")
visualise(gridworld, agent)

Кажется, что в Cliff World у Q-learning действительно возникли проблемы, поэтому

### Задание 3 (2 балла)
Реализуйте алгоритм SARSA. Для этого Вам придется изменить метод `update`, а также реализовать процедуру `train_sarsa` аналогично процедуре `train_qlearning`.

В этом задании использовать `gridworld.get_reward`, `gridworld.get_transition` и `gridworld.get_states` нельзя (`gridworld.get_actions` можно)

_Hint:_ сейчас становится критичным подбор learning rate

In [None]:
class SARSA(EpsilonGreedy):
    def update(self, state, action, next_state, next_action, reward, done):
        pass # TODO: implement

def train_sarsa(gridworld, agent, timesteps):
    # TODO: Это просто копипаста метода train_qlearning. Его нужно адаптировать для обучения алгоритма SARSA
    done = False
    state = gridworld.reset()
    for t in range(timesteps):
        if done:
            state = gridworld.reset()
            done = False
        action = agent.get_action(state)
        next_state, reward, done = gridworld.step(action)
        agent.update(state, action, next_state, reward, done)
        state = next_state

In [None]:
gridworld = GridWorld(cliff_grid)
for eps in (0.0, 0.1, 0.25, 0.5, 0.75):
    agent = SARSA(gridworld, eps=eps, alpha=0.2)
    done_count, sum_reward = train(gridworld, agent, train_sarsa, 1000000)
    if done_count != 0:
        sum_reward /= done_count
    print(f"Epsilon: {eps} | Done count: {done_count} | Avg reward {sum_reward}")

In [None]:
gridworld = GridWorld(cliff_grid)
agent = SARSA(gridworld, eps=0.2, alpha=0.2)
done_count, sum_reward = train(gridworld, agent, train_sarsa, 1000000)
if done_count != 0:
    sum_reward /= done_count
print(f"Done count: {done_count} | Avg reward {sum_reward}")
visualise(gridworld, agent)

In [None]:
gridworld = GridWorld(cliff_exploration_grid)
for eps in (0.1, 0.2, 0.3, 0.5, 0.7):
    agent = SARSA(gridworld, eps=eps, alpha=0.2)
    done_count, sum_reward = train(gridworld, agent, train_sarsa, 1000000)
    if done_count != 0:
        sum_reward /= done_count
    print(f"Epsilon: {eps} | Done count: {done_count} | Avg reward {sum_reward}")

In [None]:
gridworld = GridWorld(cliff_exploration_grid)
agent = SARSA(gridworld, eps=0.2, alpha=0.2)
done_count, sum_reward = train(gridworld, agent, train_sarsa, 1000000)
if done_count != 0:
    sum_reward /= done_count
print(f"Done count: {done_count} | Avg reward {sum_reward}")
visualise(gridworld, agent)