# Assignment 3
## [Intelligent Systems 2018-2](https://fagonzalezo.github.io/is-2018-2/)

## Markov Decision Processes and Reinforcement Learning
### Deadline: *Thursday December 13th*

**Names:**




## 1. The Optimal Parking Problem

In the optimal parking problem (OPP), the goal is to find a parking spot that is as close as possible to a store so that the driver minimizes the distance that he/she has to walk. Parking spots are numbered $1,\dots,N$. $1$ corresponds to the closest spot to the store and $N$ to the farthest. 

![parking image](parking.jpg)

The driver starts at the front of the $N$-th parking spot. At the front of each parking spot $i$ the driver can execute two actions `park` or `continue`. The driver does not know in advance which parking spots are free or not. The probability of any parking spot to be free is $p$. When the action `park` is executed, if the parking spot is free the car will be parked at the $i$-th parking spot and this will generate a reward of $-i$ (equivalent to a cost of $i$) and the process will end. If the $i$-th parking spot is occupied and the `park` action is executed or if just the `continue` action is executed, the car will move forward towards the $i-1$ parking spot. This does not generate any reward. If the driver arrives to the parking spot $1$ and cannot park, the process will end generating a reward of $-M$ (equivalent to a cost of $M$).

### 1.1 (0.5) OPP Markov decision problem

Model the OPP as a Markov decision problem (MDP). Draw the MDP as a graph showing clearly the states, the transitions with the corresponding probabilities and rewards. 

### 1.2 (0.5) Expected reward

Build a function to calculate the expected reward at each parking spot for a particular instance of the OPP.

In [249]:
def expected_rewards(N, M, p):
    '''
    N: number of parking spots
    M: cost of not being able to park (positive value)
    p: probability of a parking spot being free
    policy: a list of actions to execute at each step. 
            'P' indicates 'park'. 'C' indicates 'continue'
            
    Returns: a dictionary with the expected rewards at each parking spot.
    '''
    result = {i:0 for i in range(1,N+1)}
    ### your code here
    return result


### 1.3 (0.5) Policy evaluation

Build a function to calculate the expected reward at each parking spot for a particular instance of the OPP and for a given policy. The policy is specified by indicating which action to take at each parking spot.

In [131]:
def eval_policy(N, M, p, policy):
    '''
    N: number of parking spots
    M: cost of not being able to park (positive value)
    p: probability of a parking spot being free
    policy: a dictionary with the corresponding actions for each parking spot.
            'P' indicates 'park', 'C' indicates 'continue'. For instance for N = 3
            this could be a posible policy:
               {1:'P', 2:'P', 3:'C'}
            
    Returns: a dictionary with the expected rewards at each parking spot.
    '''
    result = {i:0 for i in range(1,N+1)}
    ### your code here
    return result

### 1.4 (0.5) Optimal policy

Build a function to calculate the policy that maximizes the reward at each parking spot for a particular instance of the OPP. 

In [142]:
def optimal_policy(N, M, p):
    '''
    N: number of parking spots
    M: cost of not being able to park (positive value)
    p: probability of a parking spot being free
            
    Returns: a dictionary with the actions of the optimal policy for each parking spot.
            'P' indicates 'park', 'C' indicates 'continue'.
    '''
    result = {i:'' for i in range(1,N+1)}
    ### your code here
    return result


### 1.5 (0.5) Minimum cost

If the cost of not parking, $M$, is small it may make sense to execute `continue` at the farthest parking spots. On the other hand, if the cost of not parking is large enough, it may be good idea to park as soon as possible, i.e., to execute park in all the parking spots.  Find a formula in terms of $N$ and $p$ for the minimum value of $M$ such that the optimal policy is to execute `park` at all parking spots. Clearly explain the derivation of the formula.

## 2. Learning to play Snake

The objective is to develop a reinforcement learning algorithm that learns to play the Snake game.

<img src="https://cloud.githubusercontent.com/assets/2750531/24808769/cc825424-1bc5-11e7-816f-7320f7bda2cf.gif" alt="Snake snapshot" width="320"/>

We will use as a basis the code in this [project](https://github.com/YuriyGuts/snake-ai-reinforcement) from [Yuriy Guts](https://github.com/YuriyGuts). The first step would be to clone that project and start looking at the code. This project builds a Deep Q-learning agent that learns to play the Snake game. It is important to notice that we do not want to reproduce this part of the project, but rather use it as a basis for building two simpler agents: one based on pure Q-learning and another one based on approximated Q-learning. 


### 2.1 (1.0) Q-learning agent

Implement a Q-learning agent that learns to play Snake. During learning, the agent must keep a table for the Q function, i.e. a table that keeps track of different combinations of states and actions. Here a state corresponds to a posible configuration of the environment. Because of these, we will need to work with small environments. The suggestion is to create smaller environments ($5 \times 5$ or $6 \times 6$) following the examples in the [levels folder](https://github.com/YuriyGuts/snake-ai-reinforcement/tree/master/snakeai/levels).

You have to create a new class that extends from the `AgentBase` class:


In [None]:
class QAgent(AgentBase):
    """ Represents an intelligent agent for the Snake environment. """
    
    def begin_episode(self):
        """ Reset the agent for a new episode. """
        pass

    def act(self, observation, reward):
        """
        Choose the next action to take.
        Args:
            observation: observable state for the current timestep. 
            reward: reward received at the beginning of the current timestep.
        Returns:
            The index of the action to take next.
        """
        return None

    def end_episode(self):
        """ Notify the agent that the episode has ended. """
        pass
    
    def train(self, env, num_episodes=1000, discount_factor=0.9, alpha=0.05, k=10):
        """
        Train the agent to perform well in the given Snake environment using Q learning.
        
        Args:
            env:
                an instance of Snake environment.
            num_episodes (int):
                the number of episodes to run during the training.
            discount_factor (float):
                discount factor (gamma) for computing the value function.
            alpha (float): 
                moving average parameter.
            k (int): 
                exploration function parameter.
        """
        pass

The class will include additional methods, in particular methods for training and housekeeping of internal variables. You can take a look to the `DeepQNetworkAgent` [class](https://github.com/YuriyGuts/snake-ai-reinforcement/blob/master/snakeai/agent/dqn.py) to see how to interact with the environment during training.  You should report the results of the training process and include a detailed analysis of these results. Also, you should be able to save the parameters of a trained agent and load them in order to show how it performs in different environments. Look at the functionality of the project that allows to load pretrained agents.

### 2.2 (1.0) Approximate Q-learning agent

Implement an approximate Q-learning agent that learns to play Snake. The agent must calculate the $Q$ function as a linear combination of features extracted from the environment:

$$ Q(s,a) = w_1f_1(s,a)+w_2f_2(s,a)\dots w_nf_n(s,a)$$

The agent must calculate these features based on the current state of the board which can be accessed through the observations from the environment. You should try different features and discuss them.

You have to create a new class that extends from the `AgentBase` class:

In [None]:
class ApproximateQAgent(AgentBase):
    """ Represents an intelligent agent for the Snake environment. """
    
    def begin_episode(self):
        """ Reset the agent for a new episode. """
        pass

    def act(self, observation, reward):
        """
        Choose the next action to take.
        Args:
            observation: observable state for the current timestep. 
            reward: reward received at the beginning of the current timestep.
        Returns:
            The index of the action to take next.
        """
        return None

    def end_episode(self):
        """ Notify the agent that the episode has ended. """
        pass
    
    def train(self, env, num_episodes=1000, discount_factor=0.9, alpha=0.05, k=10):
        """
        Train the agent to perform well in the given Snake environment using Q learning.
        
        Args:
            env:
                an instance of Snake environment.
            num_episodes (int):
                the number of episodes to run during the training.
            discount_factor (float):
                discount factor (gamma) for computing the value function.
            alpha (float): 
                moving average parameter.
            k (int): 
                exploration function parameter.
        """
        pass
    
    def features(self, observation):
        """
        Calculate features from an observation of the environment.
        
        Args:
            observation: observable state for the current timestep. 
        
        Returns:
            A list/array of feature values.

The class will include additional methods, in particular methods for training and housekeeping of internal variables. You can take a look to the `DeepQNetworkAgent` [class](https://github.com/YuriyGuts/snake-ai-reinforcement/blob/master/snakeai/agent/dqn.py) to see how to interact with the environment during training.  You should report the results of the training process and include a detailed analysis of these results. Also, you should be able to save the parameters of a trained agent and load them in order to show how it performs in different environments. Look at the functionality of the project that allows to load pretrained agents.

### 2.3 (0.5) Evaluating the performance of different learning methods

Make an analysis of the performance of the different learning agents: Q-lerning, Approximate Q-learnging and Deep Reinforcement Learning (using the pretrained models available in the [release page](https://github.com/YuriyGuts/snake-ai-reinforcement/releases)). The performance of the models must be measured in terms of the average maximum length of the snake.

## Submission

The assignment must be submitted as a compressed file containing this notebook as well as all supporting files through the following Dropbox [file request](https://www.dropbox.com/request/ctSp1nsjXNcQKBXVDXfu), before midnight of the deadline date. The file must be named as is-assign3-unalusername1-unalusername2-unalusername3.zip, where unalusername is the user name assigned by the university (include the usernames of all the members of the group).

### Test functions

You can use the following functions to test your solutions.

In [238]:
import hashlib

def round_dict(dictionary, digits):
    return {key:round(value, digits) for key, value in dictionary.items()}

def hash_list(lista):
    string = str(lista)
    hash_object = hashlib.sha1(string.encode('latin_1'))
    hex_dig = hash_object.hexdigest()
    return hex_dig

def hash_dict(dictionary):
    string = str([(key,dictionary[key]) 
                  for key in sorted(dictionary.keys())])
    hash_object = hashlib.sha1(string.encode('latin_1'))
    hex_dig = hash_object.hexdigest()
    return hex_dig

def compare(vals1, vals2, error):
    for i, val in enumerate(vals1):
        if abs(vals2[i] - val) > error:
            return False
    return True

def compare_dicts(d1, d2, error):
    if d1.keys() != d2.keys():
        return False
    for i in d1:
        if abs(d1[i] - d2[i]) > error:
            print(d1[i],d2[i])
            return False
    return True

def compare_dicts_d(d1, d2, error):
    if d1.keys() != d2.keys():
        return False
    for i in d1:
        if d1[i] != d2[i]:
            return False
    return True

def rand_policy(N, seed):
    random.seed(seed)
    policy = random.choices(['P', 'C'], k=N)
    return {i:policy[i-1] for i in range(1, N + 1)}

def test1():
    # case 1
    N, M, p = 10, 20, 0.05
    answer = {1: -19.05, 2: -18.1975, 3: -17.437625, 4: -16.765744, 5: -16.177457, 6: -15.668584, 7: -15.235155, 8: -14.873397, 9: -14.579727, 10: -14.350741}
    result = expected_rewards(N, M, p)
    if not compare_dicts(answer, result, 0.001):
        return False
    # case 2
    N, M, p = 10, 100, 0.5
    answer = {1: -50.5, 2: -26.25, 3: -14.625, 4: -9.3125, 5: -7.15625, 6: -6.578125, 7: -6.578125, 8: -6.578125, 9: -6.578125, 10: -6.578125}
    result = expected_rewards(N, M, p)
    if not compare_dicts(answer, result, 0.001):
        return False

    # case 3
    N, M, p = 50, 10000, 0.1
    answer = "02047eaf324745e70fd84be50c79b8303e2d8b34"
    result = hash_dict(round_dict(expected_rewards(N, M, p), 4))
    if not answer == result:
        print(answer, result)
        return False
    
    # case 4
    N, M, p = 100, 200, 0.05
    answer = "a6d6934abea3631e95484057c40d28638d2bf51b"
    result = hash_dict(round_dict(expected_rewards(N, M, p), 4))
    if not answer == result:
        print(answer, result)
        return False

    return True


def test2():
    # case 1
    N, M, p = 10, 20, 0.05
    pol = rand_policy(N, 124)
    answer = {1: -20.0, 2: -20.0, 3: -19.15, 4: -19.15, 5: -18.4425, 6: -18.4425, 7: -17.870375, 8: -17.376856, 9: -17.376856, 10: -17.376856}
    result = eval_policy(N, M, p, pol)
    if not compare_dicts(answer, result, 0.001):
        return False
    
    # case 2
    N, M, p = 10, 100, 0.5
    pol = rand_policy(N, 123)
    answer = {1: -50.5, 2: -26.25, 3: -14.625, 4: -9.3125, 5: -9.3125, 6: -7.65625, 7: -7.65625, 8: -7.828125, 9: -7.828125, 10: -8.914062}
    result = eval_policy(N, M, p, pol)
    if not compare_dicts(answer, result, 0.001):
        return False

    # case 3
    N, M, p = 50, 10000, 0.1
    pol = rand_policy(N, 125)
    answer = "184ea94fb88be2b8b64362b8a1bcc564d9f06acb"
    result = hash_dict( round_dict(eval_policy(N, M, p, pol), 4))
    if not answer == result:
        return False

    # case 4
    N, M, p = 100, 200, 0.05
    pol = rand_policy(N, 126)
    answer =  "b08946cb4b27ce82526f68c1c02b0b6ad6ab50a8"
    result = hash_dict( round_dict(eval_policy(N, M, p, pol), 4))
    if not answer == result:
        return False
    
    return True



def test3():
    # case 1
    N, M, p = 10, 20, 0.05
    answer = {1: 'P', 2: 'P', 3: 'P', 4: 'P', 5: 'P', 6: 'P', 7: 'P', 8: 'P', 9: 'P', 10: 'P'}
    result = optimal_policy(N, M, p)
    if not compare_dicts_d(answer, result, 0.001):
        return False

    # case 2
    N, M, p = 10, 100, 0.5
    answer = {1: 'P', 2: 'P', 3: 'P', 4: 'P', 5: 'P', 6: 'P', 7: 'C', 8: 'C', 9: 'C', 10: 'C'}
    result = optimal_policy(N, M, p)
    if not compare_dicts_d(answer, result, 0.001):
        return False

    # case 3
    N, M, p = 50, 10000, 0.1
    answer = "2057cfda7eb7b171be964137648d85a6b4f4314f"
    result = hash_dict(optimal_policy(N, M, p))
    if not answer == result:
        return False

    # case 4
    N, M, p = 100, 200, 0.05
    answer = "f707b5fdbd0f29a4550b4730c7594aa61b373e17"
    result = hash_dict(optimal_policy(N, M, p))
    if not answer == result:
        return False

    return True

def evaluate():
    score = 0 
    for test in [test1, test2, test3]:
        if test():
            score += 1
    return score

print ("Score: ", evaluate(), "/ 3")

Score:  3 / 3
