<a href="https://colab.research.google.com/github/cs-pub-ro/ML/blob/master/lab/lab6/Laborator_6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Învățare Automată
# Învățare prin Recompensă - rezolvarea proceselor de decizie Markov prin tehnici de programare dinamică (Value Iteration, Policy Iteration)
### Autori:
* Tudor Berariu - 2018
* Alexandru Sorici - 2020

## 1. Scopul laboratorului

Scopul laboratorului îl reprezintă înțelegerea conceptelor de proces markov de decizie (MDP), politică, valoare de stare, precum și implementarea unor metode de programare dinamică pentru rezolvarea problemei de control a unui MDP.

În cadrul laboratorului veți:
- implementa algoritmul de iterare a politicilor
- implementa algoritmul de iterare a valorilor de stare

## 2. Workspace setup

### Câteva bibioteci de care vom avea nevoie

In [104]:
import sys
import os.path
from argparse import ArgumentParser
from copy import copy
from random import choice

from typing import Dict, List, Tuple

### Definirea unui labirint

In [105]:
class Maze:

    NORTH, EAST, SOUTH, WEST = 0, 1, 2, 3  # actions

    DYNAMICS = {  # the stochastic effects of actions
        NORTH: {(0, -1): 0.1, (-1, 0): .8, (0, 1): .1},
        EAST: {(-1, 0): 0.1, (0, 1): .8, (1, 0): .1},
        SOUTH: {(0, 1): 0.1, (1, 0): .8, (0, -1): .1},
        WEST: {(1, 0): 0.1, (0, -1): .8, (-1, 0): .1},
    }

    WALL, EMPTY = "x", " "

    VISUALS = {
        (0, 0, 1, 1): "\N{BOX DRAWINGS HEAVY DOWN AND RIGHT}",
        (1, 0, 0, 1): "\N{BOX DRAWINGS HEAVY DOWN AND LEFT}",
        (1, 0, 1, 0): "\N{BOX DRAWINGS HEAVY HORIZONTAL}",
        (0, 1, 1, 0): "\N{BOX DRAWINGS HEAVY UP AND RIGHT}",
        (1, 1, 0, 0): "\N{BOX DRAWINGS HEAVY UP AND LEFT}",
        (0, 1, 0, 1): "\N{BOX DRAWINGS HEAVY VERTICAL}",
        (1, 1, 1, 1): "\N{BOX DRAWINGS HEAVY VERTICAL AND HORIZONTAL}",
        (1, 1, 1, 0): "\N{BOX DRAWINGS HEAVY UP AND HORIZONTAL}",
        (1, 1, 0, 1): "\N{BOX DRAWINGS HEAVY VERTICAL AND LEFT}",
        (1, 0, 1, 1): "\N{BOX DRAWINGS HEAVY DOWN AND HORIZONTAL}",
        (0, 1, 1, 1): "\N{BOX DRAWINGS HEAVY VERTICAL AND RIGHT}",
        (1, 0, 0, 0): "\N{BOX DRAWINGS HEAVY LEFT}",
        (0, 1, 0, 0): "\N{BOX DRAWINGS HEAVY UP}",
        (0, 0, 1, 0): "\N{BOX DRAWINGS HEAVY RIGHT}",
        (0, 0, 0, 1): "\N{BOX DRAWINGS HEAVY DOWN}",
        (0, 0, 0, 0): "\N{BOX DRAWINGS HEAVY VERTICAL AND HORIZONTAL}",
        WEST: "\N{LEFTWARDS ARROW}",
        NORTH: "\N{UPWARDS ARROW}",
        EAST: "\N{RIGHTWARDS ARROW}",
        SOUTH: "\N{DOWNWARDS ARROW}",
    }

    def __init__(self, map_name):
        self._rewards, self._cells = {}, []
        with open(os.path.join("maps", map_name), "r") as map_file:
            for line in map_file.readlines():
                if ":" in line:
                    name, value = line.strip().split(":")
                    self._rewards[name] = float(value)
                else:
                    self._cells.append(list(line.strip()))
        self._states = [(i, j) for i, row in enumerate(self._cells)
                        for j, cell in enumerate(row) if cell != Maze.WALL]

    @property
    def actions(self):
        return [Maze.NORTH, Maze.EAST, Maze.SOUTH, Maze.WEST]

    @property
    def states(self):
        return copy(self._states)

    def is_final(self, state):
        row, col = state
        return self._cells[row][col] != Maze.EMPTY

    def effects(self, state, action):
        if self.is_final(state):
            return []
        row, col = state
        next_states = {}
        for (d_row, d_col), prob in Maze.DYNAMICS[action].items():
            next_row, next_col = row + d_row, col + d_col
            if self._cells[next_row][next_col] == Maze.WALL:
                next_row, next_col = row, col
            if (next_row, next_col) in next_states:
                prev_prob, _ = next_states[(next_row, next_col)]
                prob += prev_prob
            cell = self._cells[next_row][next_col]
            reward = self._rewards["default" if cell == Maze.EMPTY else cell]
            next_states[(next_row, next_col)] = (prob, reward)
        return [(s, p, r) for s, (p, r) in next_states.items()]

    def print_policy(self, policy):
        last_row = []
        height = len(self._cells)

        for row, row_cells in enumerate(self._cells):
            width = len(row_cells)
            for col, cell in enumerate(row_cells):
                if cell == Maze.WALL:
                    north, south, west, east = 0, 0, 0, 0
                    if last_row and len(last_row) > col:
                        north = last_row[col] == Maze.WALL
                    if row + 1 < height:
                        south = self._cells[row + 1][col] == Maze.WALL
                    if col > 0:
                        west = row_cells[col - 1] == Maze.WALL
                    if col + 1 < width:
                        east = row_cells[col + 1] == Maze.WALL
                    sys.stdout.write(Maze.VISUALS[(west, north, east, south)])
                elif self.is_final((row, col)):
                    sys.stdout.write(cell)
                else:
                    action = policy[(row, col)]
                    sys.stdout.write(Maze.VISUALS[action])
            sys.stdout.write("\n")
            last_row = row_cells
        sys.stdout.flush()

    def print_values(self, v):
        for r, row_cells in enumerate(self._cells):
            print(" | ".join(["%5.2f" % v[(r, c)] if cell == Maze.EMPTY else "     "
                              for c, cell in enumerate(row_cells)]))

## Cerințe

In [106]:
MAP_NAME = 'complex'  #@param ['simple', 'complex', 'be_careful', 'suffer']
gamma = 0.9 #@param {type: "slider", min: 0.0, max: 1.0, step: 0.1}
max_delta = 1e-8 #@param {type:"float"}.

### Cerința 1
Implementați funcția **policy_iteration** pentru calculul politicii optime și a tabelului de utilitate așteptată pentru fiecare stare (celulă din grid) a labirintului. 

In [107]:
import numpy as np

def get_value(game, state, action, value_dict, gamma):
    # calculate value for state
    val = 0
    for next_state, prob, reward in game.effects(state, action):
        val += prob * (reward + gamma * value_dict[next_state])

    return val

def policy_iteration(game: Maze) -> Tuple[Dict[Tuple[int, int], int], Dict[Tuple[int, int], float]]:
    v = {s: 0 for s in game.states}
    policy = {s: choice(game.actions)
              for s in game.states if not game.is_final(s)}
    done = False

    while not done:
        done = True
        while True:
            delta = 0
            
            #calculate expected value
            for s in game.states:
                if game.is_final(s):
                    continue
                old_value = v[s]
                v[s] = get_value(game, s, policy[s], v, gamma)
                delta = max(delta, abs(old_value - v[s]))
            if delta < max_delta:
                break
        
        #calculate policy
        for s in game.states:
            if game.is_final(s):
                continue
            old_action = policy[s]
            best_value = -np.inf

            #argmax in the future
            for a in game.actions:
                action_value = get_value(game, s, a, v, gamma)
                if action_value > best_value:
                    best_value = action_value
                    policy[s] = a

            if old_action != policy[s]:
                done = False

    return policy, v

### Cerința 2
Implementați funcția **value_iteration** pentru calculul politicii optime și a tabelului de utilitate așteptată pentru fiecare stare (celulă din grid) a labirintului. 

In [108]:
def value_iteration(game: Maze) -> Tuple[Dict[Tuple[int, int], int], Dict[Tuple[int, int], float]]:
    v = {s: 0 for s in game.states}
    policy = {s: choice(game.actions) for s in game.states if not game.is_final(s)}

    done = False
    while not done:
        done = True  
        delta = 0
        for s in game.states:
            if game.is_final(s):
                continue
            old_value = v[s]
            v[s] = max(get_value(game, s, action, v, gamma) for action in game.actions)
            delta = max(delta, abs(old_value - v[s]))
        if delta < max_delta:
                break

        for s in game.states:
            if game.is_final(s):
                continue
            best_action = None
            best_value = -np.inf
        
            for action in game.actions:
                action_value = get_value(game, s, action, v, gamma)
                if action_value > best_value:
                    best_value = action_value
                    best_action = action

            if policy[s] != best_action:
                policy[s] = best_action
                done = False 

    return policy, v


## Evaluare

In [109]:
#@title
    
game = Maze(MAP_NAME)

print("Policy iteration:")
policy, v = policy_iteration(game)
game.print_values(v)
game.print_policy(policy)

print("Value iteration:")
policy, v = value_iteration(game)
game.print_values(v)
game.print_policy(policy)

Policy iteration:
      |       |       |       |       |       |       |       |       |       |       |       |      
      |  0.00 |  0.01 |  0.01 |  0.01 |  0.01 |  0.01 |  0.02 |  0.02 |  0.02 |  0.02 |  0.03 |       |      
      |       |       |       |       |       |       |       |       |       |       |  0.03 |  0.03 |       |       |       |       |       |      
      |       |  0.98 |  0.86 |  0.75 |  0.66 |  0.57 |       |       |       |       |  0.03 |  0.04 |  0.05 |  0.05 |  0.06 |  0.07 |  0.08 |      
      |       |       |       |       |       |  0.50 |       |       |       |       |       |       |       |       |       |       |  0.09 |      
      |       |       |       |       |       |  0.44 |  0.38 |  0.34 |  0.29 |  0.26 |  0.23 |  0.20 |  0.18 |  0.15 |  0.14 |  0.12 |  0.10 |      
      |       |       |       |       |       |       |       |       |       |       |       |       |       |       |       |       |       |      
┏━━━━━━━━━━━┓
┃↑→→→→