# Reinforcement learning with Foolsball
- Reinforcement learning is learning to make decisions from experience.
- Games are a good testbed for RL.
 

# About Foolsball
- 5x4 playground that provides a football/foosball-like environment.
- A controllable player:
  - always spawned in the top-left corner
  - displayed as '⚽'
  - can move North, South, East or West.
  - can be controlled algorithmically
- A number of **static** opponents, each represented by 👕, that occupy certain locations on the field.
- A goalpost 🥅 that is fixed in the bottom right corner

## Goals
### Primary goal
- We want the agent to learn to reach the goalpost 

### Secondary goals
- We may want the agent to learn to be efficient in some sense, for example, take the shortest path to the goalpost. 

## Rules 
- Initial rules:
    - The ball can be moved (or at least an attempt can be made) in four directions: `['n','e','w','s']`
    - Move the ball to an unmarked position: -1 points
    - Move the ball to a position marked by a defender: -5 points
    - Try to move the ball outside the field: -1 (ball stays in the previous position)
    - Move the ball into the goal post position: +5


## Some variable definitions to make our environment look pretty

In [None]:
agent = '⚽'
opponent = '👕'
goal = '🥅'

arena = [['⚽', ' ' , '👕', ' ' ],
         [' ' , ' ' , ' ' , '👕'],
         [' ' , '👕', ' ' , ' ' ],
         [' ' , ' ' , ' ' , '👕'],
         [' ' , '👕', ' ' , '🥅']]

## Full code for the Foolsball class

In [None]:
import numpy as np

class Foolsball(object):
    def __to_state__(self,row,col):
        """Convert from indices (row,col) to integer position."""
        return row*self.n_cols + col
    
    
    def __to_indices__(self, state):
        """Convert from integer position to indices (row,col)."""
        row = state // self.n_cols
        col = state % self.n_cols
        return row,col

    def __deserialize__(self,map:list,agent:str,opponent:str, goal:str):
        """Convert a string representation of a map into a 2D numpy array
        Param map: list of lists of strings representing the player, opponents and goal.
        Param agent: string representing the agent on the map 
        Param opponent: string representing every instance of an opponent player
        Param goal: string representing the location of the goal on the map
        """
        ## Capture dimensions and map.
        self.n_rows = len(map)
        self.n_cols = len(map[0])
        self.n_states = self.n_rows * self.n_cols
        self.map = np.asarray(map)

        ## Store string representations for printing the map, etc.
        self.agent_repr = agent
        self.opponent_repr  = opponent
        self.goal_repr = goal

        ## Find initial state, the desired goal state and the state of the opponents. 
        self.init_state = None
        self.goal_state = None
        self.opponents_states = []

        for row in range(self.n_rows):
            for col in range(self.n_cols):
                if map[row][col] == agent:
                    # Store the initial state outside the map.
                    # This helps in quickly resetting the game to the initial state and
                    # also simplifies printing the map independent of the agent's state. 
                    self.init_state = self.__to_state__(row,col)
                    self.map[row,col] = ' ' 

                elif map[row][col] == opponent:
                    self.opponents_states.append(self.__to_state__(row,col))

                elif map[row][col] == goal:
                    self.goal_state = self.__to_state__(row,col)

        assert self.init_state is not None, f"Map {map} does not specify an agent {agent} location"
        assert self.goal_state is not None,  f"Map {map} does not specify a goal {goal} location"
        assert self.opponents_states,  f"Map {map} does not specify any opponents {opponent} location"

        return self.init_state
    
    
    def __init__(self,map,agent,opponent,goal):
        """Spawn the world, create variables to track state and actions."""
        # We just need to track the location of the agent (the ball)
        # Everything else is static and so a potential algorithm doesn't 
        # have to look at it. The variable `done` flags terminal states.
        self.state = self.__deserialize__(map,agent,opponent,goal)
        self.done = False
        self.actions = ['n','e','w','s']

        # Set up the rewards
        self.default_rewards = {'unmarked':-1, 'opponent':-5, 'outside':-1, 'goal':+5}
        self.set_rewards(self.default_rewards)
        
    def set_rewards(self,rewards):
        """Replace the reward structure; every key of the defaults must be present."""
        if not self.state == self.init_state:
            print('Warning: Setting reward while not in initial state! You may want to call reset() first.')
        for key in self.default_rewards:
            assert key in rewards, f'Key {key} missing from reward.'
        # Assign only once, after every key has been validated.
        self.rewards = rewards
            
            
    def reset(self):
        """Reset the environment to its initial state."""
        # There are really just two things we need to reset: the state, which should
        # be reset to the initial state, and the `done` flag which should be 
        # cleared to signal that we are not in a terminal state anymore, even if we 
        # were earlier. 
        self.state = self.init_state
        self.done  = False
        return self.state
    
    def __get_next_state_on_action__(self,state,action):
        """Return next state based on current state and action."""
        assert not self.__is_terminal_state__(state), f"Action {action} undefined for terminal state {state}"
        
        row, col = self.__to_indices__(state)
        action_to_index_delta = {'n':[-1,0], 'e':[0,+1], 'w':[0,-1], 's':[+1,0]}

        row_delta, col_delta = action_to_index_delta[action]
        new_row , new_col = row+row_delta, col+col_delta

        ## Return current state if next state is invalid
        if not(0<=new_row<self.n_rows) or\
        not(0<=new_col<self.n_cols):
            return state  

        ## Construct state from new row and col and return it.    
        return self.__to_state__(new_row, new_col)    
    
  
    def __get_reward_for_transition__(self,state,next_state):
        """ Return the reward based on the transition from current state to next state. """
        assert not self.__is_terminal_state__(state), f"Reward is undefined for terminal state {state}"

        ## Transition rejected due to illegal action (move)
        if next_state == state:
            reward = self.rewards['outside']

        ## Goal!
        elif next_state == self.goal_state:
            reward = self.rewards['goal']

        ## Ran into opponent. 
        elif next_state in self.opponents_states:
            reward = self.rewards['opponent']

        ## Made a safe and valid move.   
        else:
            reward = self.rewards['unmarked']

        return reward    
    
    
    def __is_terminal_state__(self, state):
        return (state == self.goal_state) or (state in self.opponents_states) 
    
      
    def step(self,action):
        """Simulate state transition based on current state and action received."""
        assert not self.done, \
        f'You cannot call step() in a terminal state({self.state}). Check the "done" flag before calling step() to avoid this.'
        next_state = self.__get_next_state_on_action__(self.state, action)

        reward = self.__get_reward_for_transition__(self.state, next_state)

        done = self.__is_terminal_state__(next_state)

        self.state, self.done = next_state, done

        return next_state, reward, done
    
    
    
    def render(self):
        """Pretty-print the environment and agent."""
        ## Create a copy of the map and change data type to accommodate
        ## 3-character strings
        _map = np.array(self.map, dtype='<U3')

        ## Mark unoccupied positions with special symbol.
        ## And add extra spacing to align all columns.
        for row in range(_map.shape[0]):
            for col in range(_map.shape[1]):
                if _map[row,col] == ' ':
                    _map[row,col] = ' + '

                elif _map[row,col] == self.opponent_repr: 
                    _map[row,col] =  self.opponent_repr + ' '

                elif _map[row,col] == self.goal_repr:
                    _map[row,col] = ' ' + self.goal_repr + ' '

        ## If current state overlaps with the goal state or one of the opponents'
        ## states, substitute a distinct marker.
        if self.state == self.goal_state:
            r,c = self.__to_indices__(self.state)
            _map[r,c] = ' 🏁 '
        elif self.state in self.opponents_states:
            r,c = self.__to_indices__(self.state)
            _map[r,c] = ' ❗ '
        else:
            r,c = self.__to_indices__(self.state)
            _map[r,c] = ' ' + self.agent_repr

        for row in range(_map.shape[0]):
            for col in range(_map.shape[1]):
                print(f' {_map[row,col]} ',end="")
            print('\n') 

        print()


# Verify the environment
Execute the two cells below and make sure that there are no runtime errors and that the rendering is correct. You should see output like this:

```
  ⚽   +   👕    +  

  +    +    +   👕  

  +   👕    +    +  

  +    +    +   👕  

  +   👕    +    🥅  

```

In [None]:
foolsball = Foolsball(arena, agent, opponent, goal)

In [None]:
foolsball.render()

# Explore the environment
- Run the next cell to play with the environment and score a few goals. 
- If there are any errors you may want to go back and update the code for the final implementation of the `Foolsball` class. 
- Make sure to run the cell containing your final implementation before retrying

In [None]:
## Move: n,s,e,w
## Reset: r
## Exit: x
while True:
    try:
        act = input('>>')

        if act in foolsball.actions:
            print(foolsball.step(act))
            print()
            foolsball.render()
        elif act == 'r':
            print(foolsball.reset())
            print()
            foolsball.render()
        elif act == 'x':
            break
        else:
            print(f'Invalid input:{act}')
    except Exception as e:
        print(e)

# Understanding the first bits of terminology.
## State 
- In RL, state refers to information about the environment and the agent.
- An RL algorithm inspects the state to decide which action to take.
- Exactly what information gets captured in `state` depends on a few factors:
  - The complexity of the environment: 
    - The number of actors, 
    - the nature of the environment, for example text or images. 
  - The complexity of the algorithm
    - A simple algorithm may only need information about the agent and its immediate surroundings.
    - A more complex algorithm may need information about the whole environment.


## Setup
- In our case we want the algorithm to only know about the location of the agent on the field. 
- We could have included information about the opponents too which would perhaps aid in the decision making but we chose not to.  

- The state therefore is a tuple: (row, col), representing the location of the agent. 
- There are 20 possible values that `state` can take on:
  - `row` can range from 0 through 4
  - `col` can range from 0 through 3

## Implementation details
- The state is actually stored as a single integer that can take on values between 0 and 19.
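
The conversion between the `(row, col)` tuple and the stored integer is plain row-major indexing. As a minimal standalone sketch (the free functions `to_state` and `to_indices` and the constant `N_COLS` are illustrative stand-ins for the class's `__to_state__`, `__to_indices__` and `n_cols`):

```python
N_COLS = 4  # assumed width of the field, matching the 5x4 arena

def to_state(row, col):
    """Row-major encoding: every full row above contributes N_COLS states."""
    return row * N_COLS + col

def to_indices(state):
    """Inverse of to_state: divmod yields (row, col) in one step."""
    return divmod(state, N_COLS)

print(to_state(0, 0))    # 0: top-left corner, where the agent spawns
print(to_state(4, 3))    # 19: bottom-right corner, where the goalpost sits
print(to_indices(14))    # (3, 2)
```

Encoding the state as one integer makes it usable directly as a row label in the tables built later in the notebook.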

## Actions
An agent performs actions in an environment.

## Setup
- Our agent can perform one kind of action: navigate up, down, right or left.
- It has 4 actions: 'n', 'e', 'w', 's'.

# Learning from experience
Any RL setup can be modeled as shown below:

![](https://encrypted-tbn0.gstatic.com/images?q=tbn%3AANd9GcTMDmrmnl_dAyjCOErHPak2gLXmQTgQnVT8gQ&usqp=CAU)

- The agent performs an action in the environment
- The state of the environment and agent change as a result
- The agent receives a reward and the updated state from the environment

## Rewards
- Reward is the signal that an agent receives after it performs an action.
- The reward structure has to be decided by us. 
- A major challenge in RL is that rewards are often sparse. 

## Setup
- In our case the reward depends on the rules of the game and our goal.
  - If the agent runs into an opponent, the game ends and the reward is negative (this penalizes the agent).
  - If the agent makes it to the goalpost, the game ends and the reward is positive.
  - If the agent takes the ball out of the field, the reward is negative.
  - If the agent makes a valid move, what should the reward be?

## Implementation
- The default reward structure in our case is  `{'unmarked':-1, 'opponent':-5, 'outside':-1, 'goal':+5}`
- This can be changed at any time by calling `set_rewards()`.
- Taking the ball to an unmarked position seems like a small step towards reaching the goalpost. Why would we then ever want to have a negative reward for this type of maneuver?

# Step 1: Understand the concept of returns
- Complete the get_return() function.
- Calculate returns for a few sample paths by running the next few cells
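
The return of a path is simply the sum of the rewards collected along it. The sketch below works on hand-written reward lists rather than on the environment itself: under the default rules, `path1` makes six moves onto unmarked squares and then scores, while `path4` makes eight such moves before scoring. The helper name `total_return` is an illustrative assumption, not part of the notebook.

```python
def total_return(rewards):
    """Undiscounted return: the plain sum of per-step rewards."""
    total = 0
    for r in rewards:
        total += r
    return total

# Rewards implied by the default rules (-1 per unmarked move, +5 for the goal).
rewards_path1 = [-1] * 6 + [5]
rewards_path4 = [-1] * 8 + [5]

print(total_return(rewards_path1))  # -1
print(total_return(rewards_path4))  # -3
```

With these rules the shorter path ends with the higher return, because every extra move costs one point.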

In [None]:
path1 = ['e','s','e','s','s','s','e']
path2 = ['s','e','e','s','s','s','e']
path3 = ['s','s','s','e','e','s','e']
path4 = ['s','s','s','s','n','e','e','s','e']

In [None]:
def get_return(path):
    foolsball.reset()
    foolsball.render()
  
    _return_ = 0
    for act in path: 
        pass  # Todo: step the environment with act and add the reward to _return_

    print(f'Return (accumulated reward): {_return_}')

In [None]:
get_return(path1)

In [None]:
get_return(path2)

In [None]:
get_return(path3)

In [None]:
get_return(path4)

# Step 2: Experiment with a different reward structure.
- Does it encourage the agent to take the shortest route?

In [None]:
## Update reward structure to: {'unmarked':0, 'opponent':-5, 'outside':-1, 'goal':+5}

In [None]:
get_return(path1)

In [None]:
get_return(path4)

# Step 3: Implement discounted returns
- Discounted return is a way to set an acceptable time horizon: rewards that arrive later count for less.

$$Discounted\ Return = R_{t_1} + \gamma*R_{t_2} + \gamma^2*R_{t_3} + ... + \gamma^{n-1}*R_{t_n}$$

where $R_{t_k}$ is the reward after step $k$ and $\gamma$ is called the discount factor.

- Complete the code below to implement discounted returns.
- The discount factor $\gamma$ is a hyperparameter (why?) often set to 0.9 😜
- Run the next few cells to see if discounting indeed has the effect we want (shorter paths have higher returns).
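
To see the formula at work without the environment, here is a small sketch over plain reward lists. It uses the reward structure from Step 2 (`'unmarked': 0`), under which a 7-move and a 9-move path to the goal have identical undiscounted returns. The function name `discounted_return` is a hypothetical helper, not the notebook's `get_discounted_return()`.

```python
def discounted_return(rewards, gamma):
    """Sum of gamma**k * reward_k, with k = 0 for the first step."""
    total = 0.0
    for k, r in enumerate(rewards):
        total += (gamma ** k) * r
    return total

# With 'unmarked': 0 and 'goal': +5, only the final step carries a reward.
short_path = [0] * 6 + [5]   # goal reached on move 7
long_path = [0] * 8 + [5]    # goal reached on move 9

# Without discounting the two paths are indistinguishable.
print(sum(short_path) == sum(long_path))  # True

# With gamma = 0.9 the later goal is worth less, so the short path wins.
print(discounted_return(short_path, 0.9) > discounted_return(long_path, 0.9))  # True
```

Setting `gamma` to 1 recovers the plain sum, which is one way to check an implementation.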

In [None]:
def get_discounted_return(path, gamma=0):
    foolsball.reset()
    foolsball.render()
    _return_ = 0
            
    print(f'Return (accumulated reward): {_return_}')

In [None]:
HYPER_PARAMS = {'gamma':0.9}

In [None]:
get_discounted_return(path1, HYPER_PARAMS['gamma'])

In [None]:
get_discounted_return(path4, HYPER_PARAMS['gamma'])

# Formalizing the problem:
- We want the agent to reach the goalpost AND attain the **highest discounted return**.
- This means making safe and efficient moves:
  - Running into opponents means game over.
  - Repeated 'outsides' mean inefficiency.
  - Long detours are also inefficient.

## The Conundrum
 - We already know how to compute the discounted return for a single path.
 - We can generate all possible paths and calculate their returns and pick a path with the highest discounted return.
 - Alas there are too many paths (4 possible decisions at each step => combinatorial explosion).
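
A quick back-of-the-envelope count shows how fast this grows. With 4 choices per step there are at most $4^n$ action sequences of length $n$; this is an upper bound, since sequences that hit a terminal state stop early. The function name below is illustrative.

```python
def count_action_sequences(n_steps, n_actions=4):
    """Upper bound on the number of distinct action sequences of a given length."""
    return n_actions ** n_steps

for n in (7, 10, 20):
    print(n, count_action_sequences(n))
# 7 16384
# 10 1048576
# 20 1099511627776
```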
 

## The "Trick"
- Even though there are too many paths, all of them are made up of a smaller number of (state,action) pairs.
- We can calculate the highest discounted return for each of the 80 (= 20 states x 4 actions) (state, action) pairs.
- To emphasize, we want to calculate the return for each (state, action) pair, not just the reward.
  - Calculating the return means peeking into the future, beyond that (state, action) pair.
- Once we have the discounted returns for every (state, action) pair, we can construct a path with the highest discounted return by repeatedly picking the (state, action) pair that has the highest discounted return. 

# Step 4: Construct a Rewards table
- As a precursor to calculating returns for every (state,action) pair, let's try to calculate the reward for every (state,action) pair.

- Understand how the code in the next two cells creates a Pandas table to store the rewards for every (state, action) pair.

- We will cheat a little by using private methods of the Foolsball class

- Use the `__get_next_state_on_action__()` and `__get_reward_for_transition__()` methods to complete the code in the third cell below.

- Run the fourth cell to view the rewards table.

- Notice that rewards for terminal states are kept undefined since no actions are allowed in those states.

In [None]:
import pandas as pd

In [None]:
REWARDS_TBL = pd.DataFrame.from_dict({s:{a:None for a in foolsball.actions} for s in range(foolsball.n_states)}, orient='index')
REWARDS_TBL

In [None]:
for state in REWARDS_TBL.index:
    if not foolsball.__is_terminal_state__(state): #Only calculate rewards for non-terminal states
        for action in REWARDS_TBL.columns:
            next_state = foolsball.__get_next_state_on_action__(state,action)
            REWARDS_TBL.loc[state, action] = foolsball.__get_reward_for_transition__(state, next_state)

In [None]:
terminal_states = foolsball.opponents_states+[foolsball.goal_state]
print(terminal_states)
REWARDS_TBL

# Step 5: Construct an empty Returns table
- Implement the function `make_returns_table()`

- The table should have one row for every state and one column for every action, just like the rewards table.

- The returns for all actions in every terminal state should be zero (why?)

- The returns for all other state, action pairs should be left undefined (`None`)

- Trying to fill up these entries will be the focus of the rest of the notebook.
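
One possible shape for such a table is sketched here with a plain dict of dicts instead of a Pandas DataFrame, so that it runs standalone. The terminal states are read off the arena defined earlier (opponents at states 2, 7, 9, 15 and 17, goal at 19). The function name `make_returns_dict` is a hypothetical stand-in for `make_returns_table()`.

```python
def make_returns_dict(states, actions, terminal_states):
    """Returns table as {state: {action: value}}.

    Terminal states get 0 for every action; all other entries start as None.
    """
    terminal = set(terminal_states)
    return {
        s: {a: (0 if s in terminal else None) for a in actions}
        for s in states
    }

actions = ['n', 'e', 'w', 's']
terminal_states = [2, 7, 9, 15, 17, 19]   # opponents + goal in the 5x4 arena
table = make_returns_dict(range(20), actions, terminal_states)

print(table[19])  # {'n': 0, 'e': 0, 'w': 0, 's': 0}
print(table[0])   # {'n': None, 'e': None, 'w': None, 's': None}
```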

In [None]:
def make_returns_table(states_list, actions_list, terminal_states):
    """Create an empty returns table."""
    return table

In [None]:
RETURNS_TBL = make_returns_table(range(foolsball.n_states), foolsball.actions, terminal_states)

In [None]:
print(terminal_states)
RETURNS_TBL

# Step 6: Use dynamic programming to fill up the returns table
- The **highest discounted return** for a **(state, action)** can be defined in terms of returns of the next state.

$$Return(state_t,action_t) = Reward(state_t,state_{t+1}) + \gamma * \max \begin{bmatrix} Return(state_{t+1}, action=='n')\\ Return(state_{t+1}, action=='e')\\ Return(state_{t+1}, action=='w')\\ Return(state_{t+1}, action=='s') \end{bmatrix}$$

- Complete the implementation of `compute_returns_v0()` below 
- Run the next cell. The code overflows the stack (Python reports this as a `RecursionError`). Why?
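
The failure is easy to reproduce on a much smaller problem. The sketch below transcribes the recursive definition directly for a hypothetical 3-state corridor (states 0, 1, 2 with a terminal goal at 2, and actions 'e' and 'w'); none of these names come from the `Foolsball` class. There is no memo table and no guard, so the recursion can revisit states it is still in the middle of evaluating.

```python
GAMMA = 0.9
N = 3  # corridor states 0, 1, 2; state 2 is the terminal goal

def next_state(s, a):
    """Move east or west; bumping into a wall leaves the state unchanged."""
    t = s + (1 if a == 'e' else -1)
    return t if 0 <= t < N else s

def reward(s, t):
    """+5 for reaching the goal, -1 for everything else."""
    return 5 if t == N - 1 else -1

def naive_return(s, a):
    """Direct transcription of the recursive definition, with no guard."""
    t = next_state(s, a)
    if t == N - 1:
        return reward(s, t)  # terminal next state: no future return
    return reward(s, t) + GAMMA * max(naive_return(t, b) for b in 'ew')

try:
    naive_return(0, 'w')
    outcome = 'finished'
except RecursionError:
    outcome = 'RecursionError'

print(outcome)  # RecursionError
```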


In [None]:
def compute_returns_v0(table,state,action, debug=False): 
    """ Recursively compute the discounted return for a (state,action) pair"""
    if table.loc[state, action] is None:
        next_state = foolsball.__get_next_state_on_action__(state, action)
        
        if debug:
            print(f'Trying to fill ({state},{action},{next_state})')

    
    else:
        if debug:
            print((state,action),f'Returning pre-computed return {table.loc[state][action]}')
    
    return table.loc[state,action]

In [None]:
RETURNS_TBL = make_returns_table(range(foolsball.n_states), foolsball.actions, terminal_states)
for a in foolsball.actions:
    compute_returns_v0(RETURNS_TBL,state=0, action=a, debug=True)


# Contd..
- The code above crashed because of infinite recursion, caused by (state, action) pairs whose next state is the same as the current state.
- We can fix this by catching this case and returning a large negative return.
- Why is the large negative return necessary?
- Make this change in the code below to implement this strategy
- Run the next two cells to see if the RETURNS table gets filled up.

In [None]:
def compute_returns_v1(table,state,action, debug=False): 
    """ Recursively compute the discounted return for a (state,action) pair"""
    if table.loc[state, action] is None:
        next_state = foolsball.__get_next_state_on_action__(state, action)
        
        if debug:
            print(f'Trying to fill ({state},{action},{next_state})')

    
    else:
        if debug:
            print((state,action),f'Returning pre-computed return {table.loc[state][action]}')
    
    return table.loc[state,action]

In [None]:
RETURNS_TBL = make_returns_table(range(foolsball.n_states), foolsball.actions, terminal_states)
for a in foolsball.actions:
    compute_returns_v1(RETURNS_TBL,state=0, action=a, debug=True)

# Contd..
- The code above crashed because of infinite mutual recursion, caused by a (state, action) pair whose (next_state, next_action) brings us back to the first state.
- We can fix this by eliminating recursion altogether and relying on the values in the table. 
- How do we get values in the table to begin with? 
- How do we make sure that the returns for all (state, action) pairs get updated?

In [None]:
def compute_returns_v2(table,state,action, debug=False):
    """ Update the discounted return for a (state,action) pair from the values already in the table (no recursion)"""
    if not foolsball.__is_terminal_state__(state):

        next_state = foolsball.__get_next_state_on_action__(state, action)
        reward = foolsball.__get_reward_for_transition__(state, next_state)
    
    return table.loc[state,action]

In [None]:
def make_returns_table(states_list, actions_list, terminal_states):
    """Create an empty returns table."""
    table = pd.DataFrame.from_dict({s:{a:0 for a in actions_list} for s in states_list}, orient='index')
    return table

In [None]:
RETURNS_TBL = make_returns_table(range(foolsball.n_states), foolsball.actions, terminal_states)
RETURNS_TBL

In [None]:
for s in range(foolsball.n_states):
    for a in foolsball.actions:
        compute_returns_v2(RETURNS_TBL,state=s, action=a, debug=True)
RETURNS_TBL

# Step 7: Let the returns stabilize (converge)
- How do the estimates of returns evolve over iterations?
- How do we know when to stop?
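
One common answer is to sweep over the whole table repeatedly and stop once no entry changes by more than a small tolerance. The sketch below illustrates this on a hypothetical 4-state corridor (terminal goal at state 3, actions 'e' and 'w') using plain dicts; the names, the tolerance of `1e-3` and the cap of 50 sweeps are assumptions chosen to mirror the cell that follows, not the notebook's own solution.

```python
GAMMA = 0.9
N = 4                     # corridor states 0..3; state 3 is the terminal goal
ACTIONS = ('e', 'w')

def next_state(s, a):
    """Move east or west; bumping into a wall leaves the state unchanged."""
    t = s + (1 if a == 'e' else -1)
    return t if 0 <= t < N else s

def reward(s, t):
    """+5 for reaching the goal, -1 otherwise (including wall bumps)."""
    return 5 if t == N - 1 else -1

# Start from all zeros; the terminal row stays at zero throughout.
table = {s: {a: 0.0 for a in ACTIONS} for s in range(N)}

for sweep in range(1, 51):
    old = {s: dict(row) for s, row in table.items()}  # snapshot before the sweep
    for s in range(N - 1):                            # skip the terminal state
        for a in ACTIONS:
            t = next_state(s, a)
            table[s][a] = reward(s, t) + GAMMA * max(table[t].values())
    # Largest absolute change of any entry during this sweep.
    delta = max(abs(table[s][a] - old[s][a]) for s in table for a in ACTIONS)
    if delta < 1e-3:
        break

print(f'stopped after {sweep} sweeps')
for s in range(N):
    print(s, {a: round(v, 3) for a, v in table[s].items()})
```

Early sweeps only know about immediate rewards; the goal's +5 spreads backwards by roughly one state per sweep until the estimates stop moving.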

In [None]:
RETURNS_TBL = make_returns_table(range(foolsball.n_states), foolsball.actions, terminal_states)
for i in range(1,50):
    RETURNS_TBL_OLD = #Todo
    for s in range(foolsball.n_states):
        for a in foolsball.actions:
            compute_returns_v2(RETURNS_TBL,state=s, action=a, debug=True)
    
    if i%5 == 0:
        print(f'\n{i} iterations')
        print(RETURNS_TBL)
    
    deltas = #Todo
    if #Todo < 1e-3:
        print(f'\nConvergence achieved at {i} iterations')
        print(RETURNS_TBL)
        break

# Step 8: Intro to Policies
- To use the returns table for making decisions, we grab the action with the highest return for each of the states in the table.
- This is called a greedy policy.
- It's a greedy policy since we take the best action at each state.
- Implement the function `greedy_policy_from_returns_tbl()`.
- Run the next two cells to output a greedy policy derived from the returns table filled earlier.
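
As a rough illustration of the idea, here is a greedy policy extracted from a small hand-written returns table for a hypothetical 4-state corridor (terminal goal at state 3, actions 'e' and 'w'). It uses plain dicts rather than the Pandas table, and the extra `terminal_states` argument is an assumption of this sketch, not part of the signature of `greedy_policy_from_returns_tbl()`.

```python
def greedy_policy(table, terminal_states):
    """For each non-terminal state, pick the action with the highest return."""
    policy = {}
    for s, row in table.items():
        if s in terminal_states:
            policy[s] = None                   # nothing to decide in a terminal state
        else:
            policy[s] = max(row, key=row.get)  # argmax over the row's actions
    return policy

# Hand-written returns for the illustrative corridor.
returns = {
    0: {'e': 2.15, 'w': 0.935},
    1: {'e': 3.5,  'w': 0.935},
    2: {'e': 5.0,  'w': 2.15},
    3: {'e': 0.0,  'w': 0.0},   # terminal goal
}

print(greedy_policy(returns, terminal_states={3}))
# {0: 'e', 1: 'e', 2: 'e', 3: None}
```

Ties are broken in favor of whichever action comes first in the row, which is one of several reasonable choices.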

In [None]:
def greedy_policy_from_returns_tbl(table):
    policy = {s:None for s in table.index }
    #Todo
            
    return policy

In [None]:
policy0 = greedy_policy_from_returns_tbl(RETURNS_TBL)
policy0

In [None]:
def pretty_print_policy(policy):
    direction_repr = {'n':' 🡑 ', 'e':' 🡒 ', 'w':' 🡐 ', 's':' 🡓 ', None:' ⬤ '}

    for row in range(foolsball.n_rows):
        for col in range(foolsball.n_cols):
            state = foolsball.__to_state__(row, col)
            print(direction_repr[policy[state]],end='')
        print()

In [None]:
pretty_print_policy(policy0)