# Inleveropgave 2: Model-Free Prediction and Control

In [1]:
import numpy as np
from collections import defaultdict

In [54]:
class Maze():
    def __init__(self, rewards: np.array, start_pos: tuple, exit_positions: list, chance: float):
        self.rewards = rewards
        self.start_pos = start_pos
        self.exit_positions = exit_positions
        self.chance = chance

        # bind moves to constant
        self.actions = {
            "UP": (-1, 0),
            "DOWN": (1, 0),
            "LEFT": (0, -1),
            "RIGHT": (0, 1),
        }

    # Take probability into account while retrieving the next action
    def get_actual_action(self, desired_action: str):
        other_actions = list(set(self.actions.keys())- {desired_action})
        chance_left = 1 - self.chance
        chance_per_action = round(chance_left / len(other_actions), 1)
        other_probabilities = [chance_per_action for _ in range(0, len(other_actions))]

        return np.random.choice(
            [desired_action, *other_actions],
            replace=False,
            p=[self.chance, *other_probabilities]
        )
    
    # Get next position giving a certain action
    def get_next_position(self, location: tuple, action: str):
        x_dim, y_dim = self.rewards.shape
        y, x = location
        next_y, next_x = self.actions[action]
        next_x += x
        next_y += y

        # Check if position is inside grid, else stay in same place
        if (next_x >= 0 and next_x < x_dim and next_y >= 0 and next_y < y_dim):
            return (next_y, next_x)

        return location
    
    
    def value_iteration(self, theta: float = 0.001):
        n_actions = len(self.actions.keys())
        V = np.zeros(self.rewards.shape)
        while True:
            delta = 0
            for y in range(self.rewards.shape[0]):
                for x in range(self.rewards.shape[1]):  
                    location = (y, x)
                    if location in self.exit_positions:
                        continue

                    # Find best action
                    A = self.lookahead_step(location, V)
                    max_val = np.max(A)
                    delta = max(delta, np.abs(max_val - V[location]))
                    # Update value function
                    V[location] = max_val
            if delta < theta:
                break

        V = np.add(V, self.rewards)

        # Generate policy by using optimal value function
        policy = np.zeros([self.rewards.shape[0], self.rewards.shape[1], n_actions])
        for y in range(self.rewards.shape[0]):
            for x in range(self.rewards.shape[1]):
                location = (y, x)

                A = self.lookahead_step(location, V)
                best_action = np.argmax(A)
                policy[y, x, best_action] = 1.0

        return policy, V

    # Find most optimal step
    def lookahead_step(self, location: tuple, V: np.array, discount_factor: float = 1.0):
        n_actions = len(self.actions.keys())
        A = np.zeros(n_actions)
        a_idx = 0
        for action in self.actions.keys():
            next_state = self.get_next_position(location, action)
            if next_state == location:
                continue
            A[a_idx] = self.chance * (self.rewards[next_state] + discount_factor * V[next_state])
            a_idx += 1
        return A
    
    def generate_random_policy(self, theta: float = 0.001):
        n_actions = len(self.actions.keys())
        # Generate policy by using optimal value function
        policy = np.zeros([self.rewards.shape[0], self.rewards.shape[1], n_actions])
        for y in range(self.rewards.shape[0]):
            for x in range(self.rewards.shape[1]):
                location = (y, x)

                best_action = np.random.randint(0, n_actions)
                policy[y, x, best_action] = 1.0

        return policy
    
    def evaluate_policy(self, policy, n_episodes, discount_factor=1.0):
        returns_sum = np.zeros(self.rewards.shape)
        returns_count = np.zeros(self.rewards.shape)

        V = np.zeros(self.rewards.shape)

        for i_episode in range(1, n_episodes + 1):
            # Generate an episode by running the run method this returns states, actions and rewards.
            episode = self.run(policy)

            states_in_episode = episode[0]
            rewards_in_episode = episode[2]
            for curr_state in states_in_episode:
                # Get first occurance of state in this episode
                first_occurence_idx = next(i for i,state in enumerate(states_in_episode) if curr_state == state)
                # Sum up all rewards previous rewards
                G = sum([reward*(discount_factor**i) for i,reward in enumerate(rewards_in_episode[first_occurence_idx:])])
                # Calculate average return for this state over all sampled episodes
                returns_sum[curr_state] += G
                returns_count[curr_state] += 1.0
                V[curr_state] = returns_sum[curr_state] / returns_count[curr_state]

        return V
    
    def run(self, policy):
        list_actions = list(self.actions.keys())
        location = self.start_pos
        states = [location]
        actions = []
        rewards = [0]
        while True:
            if location in self.exit_positions:
                break
            desired_action = np.argmax(policy[location])
            actual_action = self.get_actual_action(list_actions[desired_action])
            location = self.get_next_position(location, actual_action)
            states.append(location)
            actions.append(actual_action)
            rewards.append(self.rewards[location])
        return states, actions, rewards

In [55]:
start_pos = (3, 2)
exit_positions = [(3, 0), (0, 3), (3, 1)]
rewards = np.array([
    [-1, -1, -1, 40],
    [-1, -1, -10, -10],
    [-1, -1, -1, -1],
    [10, -2, -1, -1],
])
chance = .7
maze = Maze(
    rewards,
    start_pos,
    exit_positions,
    chance
)

In [56]:
optimal_policy, v = maze.value_iteration()
# random_policy = maze.generate_random_policy()

In [57]:
V = maze.evaluate_policy(optimal_policy, 100)

In [58]:
print(V)

[[-31.27777778 -33.59649123 -12.72222222  40.        ]
 [-29.84516129 -29.83870968 -10.92        16.89622642]
 [ -5.11538462 -22.58823529  10.27884615  16.87962963]
 [ 10.          -2.           8.5203252   15.87096774]]
