# Inleveropgave 2: Model-Free Prediction and Control

### Sources
- https://towardsdatascience.com/reinforcement-learning-rl-101-with-python-e1aa0d37d43b

In [1]:
from typing import Tuple, List
from collections import defaultdict
from utils import show_utility

import random as rd
import numpy as np
import matplotlib.pyplot as plt

In [2]:
class Maze:
    def __init__(self, R: np.ndarray, end: List[Tuple[int, int]], start: Tuple[int, int]):
        """The init function.
        
        args:
            R (np.ndarray): The array with the rewards per state.
            end (List[Tuple[int, int]]): A list with the positions of the end states.
        """
        self.R = R
        self.end_states = end
        self.start_state = start
        self.states = np.arange(self.R.size).reshape(self.R.shape)
    
    def get_next_action_positions(self, pos: Tuple[int, int]):
        """asdf.
        
        args:
            pos (Tuple[int, int]): .
        """
        row, col = pos

        up = (row - 1, col) if row - 1 >= 0 else pos
        right = (row, col + 1) if col + 1 < self.R.shape[1] else pos
        left = (row, col -1) if col - 1 >= 0 else pos
        down = (row + 1, col) if row + 1 < self.R.shape[0] else pos
        return up, right, left, down
    
    def get_random_position(self):
        """Returns a random position in the environment."""
        return (rd.randint(0, self.R.shape[1] - 1), rd.randint(0, self.R.shape[0] - 1))

In [3]:
def value_iteration(env: Maze, end_states: List[Tuple[int, int]], p_action: float = 0.7, discount: float = 0.9, threshold: float = 0.0001):
    
    utility = np.zeros(env.R.shape)
    
    
    
#     def __calc_action_value(self, action, other_actions) -> float:
#         """Calculates the action value of a certain action with the succesfullness probability of the actions."""
#         total_value = self.p_action * (self.R[action] + (self.discount * self.utility[action]))
        
#         dist_chance = (1 - self.p_action) / len(other_actions)
        
#         for noise_action in other_actions:
#             total_value += dist_chance * (self.R[noise_action] + (self.discount * self.utility[noise_action]))
#         return total_value
    
#     def get_greedy_policy(self, current_pos: Tuple[int, int]) -> List[int]:
#         """Gets the best policy based on greediness for each state.
        
#         args:
#             current_pos (Tuple[int, int]): The state for which the policy should be calculated.

#         returns:
#             List[int]: The actions to take based on the calculated policy.
#         """
#         actions = self.__get_action_positions(current_pos)
#         action_values = []
#         for index, action in enumerate(actions):
#             noise_actions = actions[:index] + actions[index+1:]
#             action_values.append(self.__calc_action_value(action, noise_actions))

#         # get the index of the max elements 
#         max_elem = max(action_values)
#         return [1 if i == max_elem else 0 for i in action_values]
    
#     def value_iteration(self):
#         """Value iteration method."""
#         delta = np.inf
#         # get all positions in the grid
#         positions = [(i,j) for i in range(self.R.shape[0]) for j in range(self.R.shape[1])]
        
#         while delta > self.threshold:
#             delta = 0
#             new_utility = np.zeros(self.utility.shape)
#             for pos in positions:
#                 # check if we are evaluating an end state
#                 if pos in self.end_states:
#                     # current position is an end-state so value is 0
#                     continue

#                 # save the current value
#                 value = self.utility[pos]
#                 # get the next positions of all the actions that can be taken on the current positions
#                 actions = self.__get_action_positions(pos)
#                 action_values = []
#                 for index, action in enumerate(actions):
#                     noise_actions = actions[:index] + actions[index+1:]
#                     action_values.append(self.__calc_action_value(action, noise_actions))
                
#                 # select the action with the highest utility
#                 highest_utility = max(action_values)
#                 new_utility[pos] = highest_utility
#                 # update the delta
#                 delta = max(delta, abs(value - highest_utility))
            
#             self.utility = copy.deepcopy(new_utility)

In [4]:
start_state = (3, 2)
terminal_states = [(0, 3), (3, 0)]

rewards = np.array([[-1, -1, -1, 40],
                    [-1, -1, -10, -10],
                    [-1, -1, -1, -1],
                    [10, -2, -1, -1]])

# initialize the Maze
maze = Maze(rewards, terminal_states, start_state)

## Model-Free Prediction
### Monte-Carlo Policy Evaluation

In [5]:
def generate_episode_random(env: Maze):
    """"""
    steps = []  # holds Tuples with the states, actions and rewards
    pos = env.get_random_position()
    
    # break if the chosen state is a terminal state
    while pos not in env.end_states:

        next_actions = env.get_next_action_positions(pos)
        # choose a random action and gather
        action = rd.choice(next_actions)
        
        reward = env.R[action]
        steps.append((pos, action, reward))
        # update the pos to the taken action
        pos = action
        
    # save the latest pos with all extra data
    steps.append((pos, (), 0))

    return steps

In [6]:
def generate_episode_optimal(env: Maze):
    """"""
    steps = []  # holds Tuples with the states, actions and rewards
    pos = env.get_random_position()
    
    # break if the chosen state is a terminal state
    while pos not in env.end_states:

        next_actions = env.get_next_action_positions(pos)
        # choose a random action and gather
        action = rd.choice(next_actions)
        
        reward = env.R[action]
        steps.append((pos, action, reward))
        # update the pos to the taken action
        pos = action
        
    # save the latest pos with all extra data
    steps.append((pos, (), 0))

    return steps

In [28]:
def monte_carlo_policy_evaluation(env: Maze, policy: str = "random",
                              discount: float = 0.9, n_episodes: int = 1000):
    """"""
    values = np.zeros(env.R.shape)
    state_returns = defaultdict(list)

    for _ in range(n_episodes):
        # generate a new episode with a certain policy
        if policy == "random":
            # random policy
            episode = generate_episode_random(env)
        elif policy == "optimal":
            # optimal policy
            episode = [((), (), 10)]

        G = 0
        visited_states = []
        # looping over each step and 
        for pos, action, reward in episode[::-1]:
            G = discount * G + reward
            
            if pos not in visited_states:
                # update the the current state with the new return
                state_returns[pos].append(G)
                # calculate the average value
                values[pos] = np.mean(state_returns[pos])
                # update visited states
                visited_states.append(pos)
    
    return values

In [29]:
random_values1 = monte_carlo_policy_evaluation(maze, policy="random", discount=1.0)
show_utility(random_values1)

-------------------------------------
| 0.42   | 5.25   | 15.02  | 0.0    | 
-------------------------------------
| 3.8    | 6.17   | 14.03  | 19.18  | 
-------------------------------------
| 4.65   | 5.37   | 4.57   | 3.8    | 
-------------------------------------
| 0.0    | 3.84   | -1.05  | -4.3   | 
-------------------------------------


In [30]:
random_values2 = monte_carlo_policy_evaluation(maze, policy="random", discount=0.9)
show_utility(random_values2)

-------------------------------------
| 0.12   | 5.78   | 17.88  | 0.0    | 
-------------------------------------
| 0.01   | 0.73   | 9.41   | 19.32  | 
-------------------------------------
| 3.56   | 0.95   | -0.88  | -0.05  | 
-------------------------------------
| 0.0    | 4.2    | -1.08  | -3.45  | 
-------------------------------------


In [22]:
optim_values1 = monte_carlo_policy_evaluation(maze, policy="optimal", discount=1.0)
show_utility(optim_values1)

-------------------------------------
| -720.0 | 1486.0 | 6015.0 | 0.0    | 
-------------------------------------
| -373.0 | 1971.0 | 7535.0 | 7823.0 | 
-------------------------------------
| 1392.0 | 1187.0 | 1782.0 | 837.0  | 
-------------------------------------
| 0.0    | 1132.0 | -789.0 | -1905.0 | 
-------------------------------------


In [23]:
optim_values1 = monte_carlo_policy_evaluation(maze, policy="optimal", discount=0.9)
show_utility(optim_values1)

-------------------------------------
| 10000.0 | 10000.0 | 10000.0 | 10000.0 | 
-------------------------------------
| 10000.0 | 10000.0 | 10000.0 | 10000.0 | 
-------------------------------------
| 10000.0 | 10000.0 | 10000.0 | 10000.0 | 
-------------------------------------
| 10000.0 | 10000.0 | 10000.0 | 10000.0 | 
-------------------------------------


### Temporal Difference Learning

In [9]:
def temporal_difference_learning(env: Maze, policy: str = "optimal",
                                 discount: float = 0.9, n_episodes: int = 100):
    """"""
    ...

## Model-Free Control