In [1]:
import numpy as np
import matplotlib.pyplot as plt 
from pprint import pprint
from tqdm import tqdm 
from random import choice

In [2]:
actions= ['up', 'down', 'left', 'right']
vectors= {'up': np.array([-1, 0]),
          'right': np.array([0, 1]),
          'down': np.array([1, 0]),
          'left': np.array([0, -1])
          }

#Global variables
num_rows= 3
num_cols= 3
num_actions= len(actions)
initial_state= np.array([0, 0])
final_state= np.array([2,2])
gamma=0.7
episode_length=20
num_episodes=1000


- **actions**: list of possible actions
- **vectors**: dictionary for mapping each direction to its vector

### Rewards
- **rewards**: the array of given rewards taken from the exo paper
- **reward_function**: a dict for maping a actual state and the action taken by the agent to their respective reward {state: {direction: reward}}\
e.g.: 
```python
{(0, 0): {'up': -1,
          'down': 2/3}...
          ...}
```
from state `(0, 0)` if the agent moves `up` it gets `-1`

In [3]:
rewards=np.array([
    [[-1., 2/3, -1., 1.], [-1., 2., 1/2, 3/2], [-1., 5/2, 1/2, -1.]],
    [[1/3, 3/2, -1., 4/3.], [1/4, 3., 1/3, 3/2], [1/4, 7/2, 1., -1.]],
    [[1/2, -1., -1., 3/2], [4/5, -1., 1., 3.], [1/2, -1., 4/5, -1.]],
    ])
reward_function= {(i//num_rows, i%num_cols): {actions[j]: rewards[i//num_rows, i%num_cols, j] for j in range(num_actions)} for i in range(num_rows*num_cols)}
pprint(reward_function)

{(0, 0): {'down': 0.6666666666666666, 'left': -1.0, 'right': 1.0, 'up': -1.0},
 (0, 1): {'down': 2.0, 'left': 0.5, 'right': 1.5, 'up': -1.0},
 (0, 2): {'down': 2.5, 'left': 0.5, 'right': -1.0, 'up': -1.0},
 (1, 0): {'down': 1.5,
          'left': -1.0,
          'right': 1.3333333333333333,
          'up': 0.3333333333333333},
 (1, 1): {'down': 3.0, 'left': 0.3333333333333333, 'right': 1.5, 'up': 0.25},
 (1, 2): {'down': 3.5, 'left': 1.0, 'right': -1.0, 'up': 0.25},
 (2, 0): {'down': -1.0, 'left': -1.0, 'right': 1.5, 'up': 0.5},
 (2, 1): {'down': -1.0, 'left': 1.0, 'right': 3.0, 'up': 0.8},
 (2, 2): {'down': -1.0, 'left': 0.8, 'right': -1.0, 'up': 0.5}}


### Policies:
the policies follow a pattern where each policy is a dict mapping the state to the possible actions then we pick one randomly\
{state: possible actions} e.g:
```python
{(0, 0) : ['right'] ...}
```
if the agent is in state `(0, 0)` it will go `right`

In [4]:
policy_1= {(i, j): ['down'] if i != 2 else ['right'] for i in range(num_rows) for j in range(num_cols)}
policy_3= {(i, j): actions for i in range(num_rows) for j in range(num_cols)}
policy_2= {
    (0, 0): ['right', 'down'],
    (0, 1): ['down'],
    (0, 2): ['down', 'left'],
    (1, 0): ['right'],
    (1, 1): actions,
    (1, 2): ['left'],
    (2, 0): ['up', 'right'],
    (2, 1): ['up'],
    (2, 2): ['up', 'left']
}
print('policy_1:')
pprint(policy_1)
print('\npolicy_2:')
pprint(policy_2)
print('\npolicy_3:')
pprint(policy_3)

policy_1:
{(0, 0): ['down'],
 (0, 1): ['down'],
 (0, 2): ['down'],
 (1, 0): ['down'],
 (1, 1): ['down'],
 (1, 2): ['down'],
 (2, 0): ['right'],
 (2, 1): ['right'],
 (2, 2): ['right']}

policy_2:
{(0, 0): ['right', 'down'],
 (0, 1): ['down'],
 (0, 2): ['down', 'left'],
 (1, 0): ['right'],
 (1, 1): ['up', 'down', 'left', 'right'],
 (1, 2): ['left'],
 (2, 0): ['up', 'right'],
 (2, 1): ['up'],
 (2, 2): ['up', 'left']}

policy_3:
{(0, 0): ['up', 'down', 'left', 'right'],
 (0, 1): ['up', 'down', 'left', 'right'],
 (0, 2): ['up', 'down', 'left', 'right'],
 (1, 0): ['up', 'down', 'left', 'right'],
 (1, 1): ['up', 'down', 'left', 'right'],
 (1, 2): ['up', 'down', 'left', 'right'],
 (2, 0): ['up', 'down', 'left', 'right'],
 (2, 1): ['up', 'down', 'left', 'right'],
 (2, 2): ['up', 'down', 'left', 'right']}


### **Agent `Class`**: 
to instanciate an Agent with :
- **attributs**:
  - **policy**: how to act
  - **initial_state**: from where to start
  - **vectors**: how it can move
  - **reward_function**: how much it gets
- **methods**:
  - **act(verbose)**: put the agent into work phase
    - verbose: is for printing some logs about the agent
    - returns: the reward and the action


In [5]:
class Agent:

    def __init__(self, initial_state: np.ndarray, policy: dict, vectors: dict, reward_function: dict):
        self.state= initial_state
        self.policy= policy
        self. vectors= vectors
        self.reward_function= reward_function
    
    def act(self, verbose: int= 0):
        action= choice(self.policy[tuple(self.state)]) # choosing randomly and uniformely from possible actions
        reward= self.__get_reward(tuple(self.state), action)# get the associated reward for the action taken

        if verbose:
            log= f"""                   current state: {tuple(self.state)}
                        choosen action: {action}
                        reward: {reward}
                """
            print(log)
        
        if reward == -1:
            return reward, action
        
        # update agent's state
        self.state= self.state + self.vectors[action]
        if verbose:
            print(f'new state= {self.state}')
        return reward, action

    def __get_reward(self, state: tuple, action: str):
        reward= self.reward_function[state][action]
        return reward



### Calculate discount reward:
is a function that returns the discount reward given a list of rewards [t, t+1, t+2, ..]\
disount reward<sub>t</sub>= gamma * R<sub>t+1</sub> + gamma<sup>2</sup> * R<sub>t+2</sub> + gamma<sup>3</sup> * R<sub>t+3</sub> ...

In [6]:
def calculate_discount_reward(rewards, gamma= 0.5):
    discount_reward = 0
    for i, reward in enumerate(rewards):
        discount_reward += reward * gamma**i
    return discount_reward

### Monte Carlo simulation 
is the function that returns the discount reawrd of the rewards of the episodes of simulation
- if the agent arrives to destination (2, 2) it will stop from iterating 
- returns the discount reward

In [7]:
def monte_carlo_simulation(initial_state: tuple= initial_state, destination: tuple= final_state, policy: dict= policy_1, vectors: dict= vectors, reward_function: dict= reward_function, gamma: float= gamma, num_episodes: int= num_episodes, episode_length: int= episode_length, verbose: int= 0):
    
    agent= Agent(initial_state=initial_state, policy=policy, vectors=vectors, reward_function= reward_function)
    rewards= []
    agent_path= []
    arrived= False

    for i in tqdm(range(num_episodes)):
        if np.array_equal(agent.state, np.array(destination)):
                arrived= True
                break
        episode_rewards= []

        for j in range(episode_length):
            if np.array_equal(agent.state, np.array(destination)):
                arrived= True
                break
            else:
                reward, action= agent.act(verbose)
                episode_rewards.append(reward)
                agent_path.append(action)

        
        rewards.append(calculate_discount_reward(episode_rewards, gamma))

    if arrived:   
        print(f'arrived to destination in the episode {i} and step {j} ') 
    else:
        print("didn't arrive")

    return calculate_discount_reward(rewards), agent_path
    


In [9]:
# a function to run the simulation
def run_simulation(policies: list, initial_state: tuple= initial_state, destination: tuple= final_state, vectors: dict= vectors, reward_function: dict= reward_function, gamma: float= gamma, num_episodes: int= num_episodes, episode_length: int= episode_length, verbose: int= 0):
    Vs= []
    for policy in policies:
        Vs.append(monte_carlo_simulation(initial_state= initial_state, destination= destination, policy= policy, vectors= vectors, reward_function= reward_function, gamma= gamma, num_episodes= num_episodes, episode_length= episode_length, verbose= verbose)[0])
    return Vs

In [10]:
dr, path= monte_carlo_simulation(policy=policy_1)
print()
print(f'the discount reward is: {dr}')
print('the agent\'s path:')
print(' --> '.join(path))

  0%|          | 1/1000 [00:00<00:04, 223.36it/s]

arrived to destination in the episode 1 and step 4 

the discount reward is: 3.480666666666666
the agent's path:
down --> down --> right --> right





In [11]:
Vs= run_simulation(policies= [policy_1, policy_2, policy_3])
Vs

  0%|          | 1/1000 [00:00<00:00, 1840.41it/s]


arrived to destination in the episode 1 and step 4 


100%|██████████| 1000/1000 [00:00<00:00, 2456.58it/s]


didn't arrive


  0%|          | 2/1000 [00:00<00:00, 1090.42it/s]

arrived to destination in the episode 2 and step 17 





[3.480666666666666, 8.388361071424578, -0.26654527489276497]