# RL HOMEWORK: MARKOV DECISION PROCESSES

*Denise Landini - 1938388*






For this homework I chose SpaceInvaders-v0 as the environment. 

I have taken into account all the specifications required by the homework. 

In particular:
*   The state space is a grid of 9 rows and 7 columns, which contains:
 - 3 rows of enemies alternating with empty rows (4 enemies per row, so 12 enemies in total);
 - 2 further empty rows;
 - 1 row with 3 defenders;
 - 1 row (the last one) with the agent. 
  
  (More details are reported later.)

*   The environment is fully observable;
*   The enemies and the defenders are static;
*   The agent is the only one who can perform actions;
*   The agent can perform 3 discrete actions: 
 - go_right;
 - go_left;
 - shoot (at enemies or defenders).
*   I randomly generated 5 actions for the agent;
*   I use the SARSA algorithm;
*   The reward is designed so that the agent is rewarded when a shot hits and penalized when a shot misses.

I define:
1.   INITIAL STATE: all enemies and all defenders are present, and the agent is in the first cell of the last row;
2.   GOAL STATE: the agent wins the game when all the enemies are dead. 
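
The reward design described above can be sketched as a small function. This is only an illustration, assuming the values that appear in the code of this notebook (a step cost of 1, +10 for a shot that hits, -10 for a shot that misses). The name `step_reward` and the constants are hypothetical and are not part of the `Environment` class.

```python
from typing import Optional

STEP_COST = 1.0        # assumed: same value as step_cost in the notebook
HIT_BONUS = 10.0       # assumed: bonus when a shot removes a cell
MISS_PENALTY = -10.0   # assumed: penalty when a shot hits nothing


def step_reward(shot_hit: Optional[bool]) -> float:
    """Reward for one step; shot_hit is None when the agent only moved."""
    reward = -STEP_COST
    if shot_hit is True:
        reward += HIT_BONUS
    elif shot_hit is False:
        reward += MISS_PENALTY
    return reward


print(step_reward(None))   # move only: -1.0
print(step_reward(True))   # shot that hits: 9.0
print(step_reward(False))  # shot that misses: -11.0
```

Under these assumptions, every step costs 1, so a hit yields a net reward of 9 and a miss a net reward of -11.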



# IMPORTS

In [1]:
from typing import Dict, List, Optional, Tuple

import dataclasses
import numpy as np

# CLASS TRANSITION

In [2]:
@dataclasses.dataclass
class Transition:
    state: Tuple[int, int]
    action: str
    next_state: Tuple[int, int]
    reward: float
    termination: bool


# CLASS ENVIRONMENT

In [3]:
class Environment:
    _states: np.ndarray # the map (grid of cells)
    _rewards: np.ndarray # cost of each cell
    _action_semantics: List[str] # names of the actions the agent can perform
    _init_state: Tuple[int, int] # initial position of the agent
    _current_state: Tuple[int, int] # current position of the agent
    _obstacle: Optional[List[Tuple[int, int]]] # positions of the obstacles (enemies)
    _defence: Optional[List[Tuple[int, int]]]  # positions of the defences
    _transition_probabilities: np.ndarray # probabilities of the action offsets +1, 0, -1

    def __init__(self, 
                 rows: int,
                 cols: int,
                 step_cost: float,
                 defence: Optional[List[Tuple[int, int]]] = None,
                 obstacle: Optional[List[Tuple[int, int]]] = None) -> None:
        self._states = np.zeros((rows, cols)) 
        self._shoot_reward = 0
        
        obstacle = [] if obstacle is None else obstacle
        defence = [] if defence is None else defence
        
        self._obstacle = obstacle
        self._defence = defence
        
        self._rewards = -step_cost*np.ones((rows, cols))
               
        # The actions that the agent can perform are: go left, go right and shoot (at an enemy or a defence)
        self._action_semantics = ['go_left', 'go_right', 'shoot']
        # the initial position of the agent is the lower-left cell
        self._init_state = (rows - 1, 0)  
        self._current_state = self._init_state
       
        for r, c in obstacle:
            self._states[r, c] = -1
        for r, c in defence:
            self._states[r, c] = 2
          
        # the intended action is executed with probability 0.8; with probability 0.1 each,
        # one of the other two actions is executed instead (offsets +1, 0, -1 on the action index)
        self._transition_probabilities = np.array([0.1, 0.8, 0.1])
                                                  
    @property
    def actions(self) -> List[str]:
        return self._action_semantics
    
    @property
    def current_state(self) -> Tuple[int, int]:
        return self._current_state
    
    @property
    def reward(self) -> float:
        r, c = self._current_state
        if self._shoot_reward != 0:
            add_reward = self._shoot_reward
            self._shoot_reward = 0
        else:
            add_reward = 0
        return self._rewards[r, c] + add_reward
    
    @property
    def termination(self) -> bool:
        # goal state: all the enemies are dead
        for i, j in self._obstacle:
            if self._states[i, j] != 0:
                return False
        return True
       
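    def render(self) -> None:
        # print the grid: yellow = enemy, red = defender, green = agent
        r, c = self._current_state
        # use the size of the grid itself instead of the global rows/cols
        n_rows, n_cols = self._states.shape
        for i in range(n_rows):
            row_str = ""
            for j in range(n_cols):
                if self._states[i, j] == -1:
                    row_str += "\033[43m \033[0m"
                elif self._states[i, j] == 2:
                    row_str += "\033[41m \033[0m"
                elif i == r and j == c:
                    row_str += "\033[42m \033[0m"
                else:
                    row_str += " "
            print(row_str)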
        
    def _transition(self, state: Tuple[int, int], a: int) -> Tuple[int, int]:
        # moving left or right changes the agent's column; shooting leaves the agent in place
        max_cols = self._states.shape[1]
        n_actions = len(self._action_semantics)
        # wrap the (possibly shifted) action index into the range [0, n_actions)
        a = a % n_actions
        if a == 0 and state[1] > 0:
            # the agent goes left: decrease the column by 1
            return (state[0], state[1] - 1)
        elif a == 1 and state[1] < max_cols - 1:
            # the agent goes right: increase the column by 1
            return (state[0], state[1] + 1)
        elif a == 2:
            # the agent shoots: remove the first non-empty cell (enemy or defence) above it
            for i in range(state[0]):
                if self._states[state[0] - 1 - i, state[1]] != 0:
                    self._states[state[0] - 1 - i, state[1]] = 0
                    self._shoot_reward = 10
                    return state
            # nothing was hit: the shot is penalized
            self._shoot_reward = -10
        return state
            
    def step(self, action: str) -> Transition:
        a_idx = self._action_semantics.index(action)
        # shift the action index by +1, 0 or -1 according to the transition probabilities
        chosen_action = a_idx + np.random.choice([1, 0, -1], p=self._transition_probabilities)
        prev_state = self._current_state
        self._current_state = self._transition(self._current_state, chosen_action)
        return Transition(state=prev_state,
                          action=action,
                          next_state=self._current_state,
                          reward=self.reward,
                          termination=self.termination)
    
    def reset(self) -> None: 
        # place the agent in the bottom-left cell (the initial state)
        self._current_state = self._init_state
        # restore all enemies and all defenders
        for r, c in self._obstacle:
            self._states[r, c] = -1
        for r, c in self._defence:
            self._states[r, c] = 2
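
The stochastic action selection in `step` can be illustrated in isolation. The sketch below assumes the same action order and the same offset probabilities as the class above; the helper name `executed_action` is hypothetical, and it uses `np.random.default_rng` instead of the global `np.random` functions only to make the example reproducible.

```python
import numpy as np

ACTIONS = ['go_left', 'go_right', 'shoot']  # same order as in Environment


def executed_action(intended: str, rng: np.random.Generator) -> str:
    """Sample the action that is actually executed for an intended action."""
    idx = ACTIONS.index(intended)
    # offset 0 keeps the intended action (p = 0.8);
    # offsets +1 and -1 slip to one of the other two actions (p = 0.1 each)
    offset = int(rng.choice([1, 0, -1], p=[0.1, 0.8, 0.1]))
    return ACTIONS[(idx + offset) % len(ACTIONS)]


rng = np.random.default_rng(0)
samples = [executed_action('shoot', rng) for _ in range(10_000)]
freq = {a: samples.count(a) / len(samples) for a in ACTIONS}
print(freq)  # roughly 0.8 for 'shoot' and 0.1 for each of the other actions
```

This is why a requested `go_left` can occasionally move the agent to the right, or a requested `shoot` can result in a move.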

# SPACE INVADERS ENVIRONMENT

In [4]:
# Environment:
# the enemies (obstacle) are yellow: in this case we have 12 static enemies
# the defenders (defence) are red: in this case we have 3 static defenders
# the agent is green: in this case we have 1 dynamic agent

obstacle = [(0,0), (0,2), (0,4), (0,6),
            (2,0), (2,2), (2,4), (2,6),
            (4,0), (4,2), (4,4), (4,6)]
defence = [(7,1), (7,3), (7,5)]
rows = 9
cols = 7
step_cost = 1
space_invaders_environment = Environment(rows, cols, step_cost, defence, obstacle)
print("Space invaders environment:\n")
space_invaders_environment.render()
space_invaders_environment.reset()

Space invaders environment:

[43m [0m [43m [0m [43m [0m [43m [0m
       
[43m [0m [43m [0m [43m [0m [43m [0m
       
[43m [0m [43m [0m [43m [0m [43m [0m
       
       
 [41m [0m [41m [0m [41m [0m 
[42m [0m      


# SHOW THE THREE ACTIONS THAT THE AGENT CAN DO

In [5]:
print("The agent can go right:\n")
space_invaders_environment.step("go_right")
space_invaders_environment.render()

print("\n\nThe agent can go left:\n ")
space_invaders_environment.step("go_left")
space_invaders_environment.render()

print("\n\nThe agent can shoot:")
space_invaders_environment.step("shoot")
space_invaders_environment.render()

space_invaders_environment.reset()

The agent can go right:

[43m [0m [43m [0m [43m [0m [43m [0m
       
[43m [0m [43m [0m [43m [0m [43m [0m
       
[43m [0m [43m [0m [43m [0m [43m [0m
       
       
 [41m [0m [41m [0m [41m [0m 
 [42m [0m     


The agent can go left:
 
[43m [0m [43m [0m [43m [0m [43m [0m
       
[43m [0m [43m [0m [43m [0m [43m [0m
       
[43m [0m [43m [0m [43m [0m [43m [0m
       
       
 [41m [0m [41m [0m [41m [0m 
[42m [0m      


The agent can shoot:
[43m [0m [43m [0m [43m [0m [43m [0m
       
[43m [0m [43m [0m [43m [0m [43m [0m
       
  [43m [0m [43m [0m [43m [0m
       
       
 [41m [0m [41m [0m [41m [0m 
[42m [0m      


# 5 RANDOM ACTIONS

In [6]:
# I generated 5 random actions 
print("Initial state:\n")
space_invaders_environment.render()
print("\n")
for i in range(5):
    print("Number:", i)
    a = np.random.choice(space_invaders_environment.actions)
    print("Action:", a)
    out = space_invaders_environment.step(a)
    print(out)
    space_invaders_environment.render()
    print("\n")
space_invaders_environment.reset()


Initial state:

[43m [0m [43m [0m [43m [0m [43m [0m
       
[43m [0m [43m [0m [43m [0m [43m [0m
       
[43m [0m [43m [0m [43m [0m [43m [0m
       
       
 [41m [0m [41m [0m [41m [0m 
[42m [0m      


Number: 0
Action: go_left
Transition(state=(8, 0), action='go_left', next_state=(8, 0), reward=-1.0, termination=False)
[43m [0m [43m [0m [43m [0m [43m [0m
       
[43m [0m [43m [0m [43m [0m [43m [0m
       
[43m [0m [43m [0m [43m [0m [43m [0m
       
       
 [41m [0m [41m [0m [41m [0m 
[42m [0m      


Number: 1
Action: go_left
Transition(state=(8, 0), action='go_left', next_state=(8, 1), reward=-1.0, termination=False)
[43m [0m [43m [0m [43m [0m [43m [0m
       
[43m [0m [43m [0m [43m [0m [43m [0m
       
[43m [0m [43m [0m [43m [0m [43m [0m
       
       
 [41m [0m [41m [0m [41m [0m 
 [42m [0m     


Number: 2
Action: go_right
Transition(state=(8, 1), action='go_right', next_state=(8, 1), 

# SARSA 
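
As a reference for the cell below, a single tabular SARSA step updates Q(s, a) towards r + gamma * Q(s', a'), where a' is the action that the policy selects in the next state s'. The following is a minimal, self-contained sketch of that update together with an epsilon-greedy choice. The function names `epsilon_greedy` and `sarsa_update` and the toy Q-table are hypothetical and are not the code used in this homework.

```python
from typing import Dict, List

import numpy as np


def epsilon_greedy(q_row: List[float], eps: float, rng: np.random.Generator) -> int:
    """Return a random action index with probability eps, else the greedy one."""
    if rng.random() < eps:
        return int(rng.integers(len(q_row)))
    return int(np.argmax(q_row))


def sarsa_update(Q: Dict[int, List[float]], s: int, a: int, r: float,
                 s_next: int, a_next: int, done: bool,
                 alpha: float, gamma: float) -> None:
    """One on-policy TD update: the target uses the action chosen in s_next."""
    target = r + (0.0 if done else gamma * Q[s_next][a_next])
    Q[s][a] += alpha * (target - Q[s][a])


# toy Q-table with two states and three actions (illustrative values only)
Q = {0: [0.0, 0.0, 0.0], 1: [0.0, 0.0, 5.0]}
rng = np.random.default_rng(0)
a_next = epsilon_greedy(Q[1], eps=0.0, rng=rng)  # eps = 0 gives the greedy action
sarsa_update(Q, s=0, a=2, r=9.0, s_next=1, a_next=a_next, done=False,
             alpha=0.1, gamma=1.0)
print(Q[0][2])  # moved a fraction alpha of the way from 0 towards the target 14
```

The important detail is that the bootstrap term is read at the next state and at the action selected there, which is what makes the method on-policy.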

In [7]:
# SARSA algorithm:
def sum_val() -> int:
    # number of enemies that are still alive on the map
    return int(np.sum(space_invaders_environment._states == -1))

def policy(eps: float, actions: List[str]) -> Dict[int, str]:
    # epsilon-greedy policy over all states in Q; eps is the exploration probability
    pi = {i: actions[np.argmax(Q[i])] if np.random.rand() >= eps
          else actions[np.random.randint(len(actions))] for i in Q.keys()}
    return pi

def preprocess_state(state: Tuple[int, int]) -> int:
    # a state is identified by the agent position and the number of enemies still alive
    key: Tuple[Tuple[int, int], int] = (state, sum_val())
    if key not in visited_states:
        visited_states[key] = len(visited_states)
        
    return visited_states[key]

state_space_size = rows*cols*sum_val()
actions = space_invaders_environment.actions
Q: Dict[int, List[float]] = {i: [0. for a in actions] for i in range(state_space_size)}
max_episodes = 2000
# the agent usually reaches the goal within 500 steps, but this depends on the random actions
max_episode_length = 500
visited_states = {}
alpha = 1e-3
epsilon = 0.1
gamma = 1

for episode in range(max_episodes):
    if (episode) % 100 == 0:
        print(f'Episode: {episode}')
        
    space_invaders_environment.reset()
    pi = policy(0. if episode == max_episodes - 1 else epsilon, actions)
    
    for _ in range(max_episode_length):
        state = preprocess_state(space_invaders_environment._current_state)
        action = pi[state]
        transition = space_invaders_environment.step(action)
        action_idx = actions.index(action)
        next_state = preprocess_state(transition.next_state)
        # SARSA is on-policy: the next action is the one the policy picks in the next state
        next_action = pi[next_state]
        next_action_idx = actions.index(next_action)
        target = transition.reward + (1 - int(transition.termination))*gamma*Q[next_state][next_action_idx]
        Q[state][action_idx] = Q[state][action_idx] + alpha*(target - Q[state][action_idx])
        
        if episode == max_episodes - 1:
            print(transition)
            space_invaders_environment.render()
                
        if transition.termination:
            break

Episode: 0
Episode: 100
Episode: 200
Episode: 300
Episode: 400
Episode: 500
Episode: 600
Episode: 700
Episode: 800
Episode: 900
Episode: 1000
Episode: 1100
Episode: 1200
Episode: 1300
Episode: 1400
Episode: 1500
Episode: 1600
Episode: 1700
Episode: 1800
Episode: 1900
Transition(state=(8, 0), action='shoot', next_state=(8, 0), reward=-1.0, termination=False)
[43m [0m [43m [0m [43m [0m [43m [0m
       
[43m [0m [43m [0m [43m [0m [43m [0m
       
[43m [0m [43m [0m [43m [0m [43m [0m
       
       
 [41m [0m [41m [0m [41m [0m 
[42m [0m      
Transition(state=(8, 0), action='shoot', next_state=(8, 0), reward=9.0, termination=False)
[43m [0m [43m [0m [43m [0m [43m [0m
       
[43m [0m [43m [0m [43m [0m [43m [0m
       
  [43m [0m [43m [0m [43m [0m
       
       
 [41m [0m [41m [0m [41m [0m 
[42m [0m      
Transition(state=(8, 0), action='shoot', next_state=(8, 0), reward=9.0, termination=False)
[43m [0m [43m [0m [43m [0m [