<a href="https://colab.research.google.com/github/DionisiusMayr/FreewayGame/blob/main/aline.almeida/a_freeway.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Useful Resources
* [Manual of the game](https://www.gamesdatabase.org/Media/SYSTEM/Atari_2600/Manual/formated/Freeway_-_1981_-_Zellers.pdf)
* [Freeway.asm source code](http://bjars.com/source/Freeway.asm) 

# 1. Description 

## 1.1. The problem addressed

- The nature of your environment

- What are the terminal states

- How is the reward function defined

- All parameters employed in your methods (discount factor, step size, etc.)

## 1.2. The MDP formulation

- How the problem was modeled

- Implementation specifics and restrictions

## 1.3. The discretization model adopted

```
      14  # Chicken Y
    , 16  # Chicken Lane Collide
    , 18  # Chicken Collision flag (with the bottom car)
    , 22  # Car X Direction
    , 23, 24, 25, 26, 27, 28, 29, 30, 31, 32  # Z Car Patterns
    , 33, 34, 35, 36, 37, 38, 39, 40, 41, 42  # Car Motion Timmers
    , 43, 44, 45, 46, 47, 48, 49, 50, 51, 52  # Car Motions
    , 87, 88  # Car Shape Ptr
    # TODO: test if this makes any difference
    , 89, 90  # Chicken Shape Ptr
    # TODO: test if this makes any difference
    , 106, 107  # Chicken Sounds
    , 108, 109, 110, 111, 112, 113, 114, 115, 116, 117  # Car X Coords
```

# 2. Implementation

## 2.1. Setup

### Baseline agent


In [1]:
## Install the dependencies:

#!pip install gym
#!pip install gym[atari]

In [2]:
## import the libraries
import gym
import time

In [3]:
## Convert hex score values to int 

def convert_score(hex_score: int) -> int:
    """Convert the score from the hex represation used in memory to base 10."""
    return (hex_score // 16) * 10 + (hex_score % 16)

In [4]:
## Baseline agent: always choose the action "up"

from abc import ABC
from abc import abstractmethod


class Agent(ABC):
    """
    Abstract class to implement agents.
    It requires an "__init__" method to set the required parameters (such
    as epsilon) and an "act" method that implements the policy of the agent.
    """
    @abstractmethod
    def __init__(self, **params):
        pass
    
    @abstractmethod
    def act(self, ob):
        pass

    
class Baseline(Agent):
    """The Baseline agent always move up, regardless of the reward received."""
    def __init__(self):
        pass
   
    def act(self, ob):
        return 1  # Always move up!

    
print('Testing agents.py...')
agent = Baseline()
print('All good!')

Testing agents.py...
All good!


In [5]:
## The Freeway enviorment 

import gym
def get_env():
    env = gym.make('Freeway-ram-v0')
    state = env.reset()
    return (env, state)

In [9]:
## Run the agent and return the scores

def run(Agent: Agent, render: bool=False, n_runs: int=1, verbose=True, colab_render:bool=False):
    scores = []  #List of each run rewards
    
    for i in range(n_runs):
        
        if colab_render == False:
            env, initial_state = get_env()
        
        else:
            env, initial_state = get_env()
            monitor_dir = os.getcwd()
            env = wrappers.Monitor(env, monitor_dir, video_callable=lambda ep_id: ep_id%1000 == 0, force=True)
            env.reset()

        agent = Agent()
        game_over = False
        action = agent.act(initial_state) 

        while not game_over:   
            ## step
            ob, reward, game_over, _ = env.step(action)
            
            ## action
            action = agent.act(ob)

        player_score = convert_score(ob[103]) #The byte 103 contains the Player 1 score.
        if verbose:
            print(f"Score #{i}: {player_score}")

        scores.append(player_score)
        env.close()
        
    return scores

In [10]:
## init the environment

env, initial_state = get_env()
print("Action Space:", env.action_space)
print("Observation Space:", env.observation_space)

Action Space: Discrete(3)
Observation Space: Box(0, 255, (128,), uint8)


The agent in this game has three possible actions:

* 0: Stay
* 1: Move forward
* 2: Move back

As a simple baseline, we are using an agent that moves always **up**.

In [11]:
## Run the baseline agent

scores = run(Baseline, render=False, n_runs=5)
scores

Score #0: 21
Score #1: 23
Score #2: 23
Score #3: 21
Score #4: 21


[21, 23, 23, 21, 21]

In [12]:
## Mean score of the baseline agent

print("Mean score:", sum(scores) / len(scores))

Mean score: 21.8


In [13]:
## Colab graphical requirements 

import numpy as np 
import pandas as pd 
import shutil
import os

!apt-get install python-opengl -y
!apt install xvfb -y
!pip install pyvirtualdisplay
!pip install https://github.com/pyglet/pyglet/archive/pyglet-1.5-maintenance.zip
!apt-get install ffmpeg -y

from pyvirtualdisplay import Display
import gym
from gym import wrappers
from gym import envs
import matplotlib.pyplot as plt

from IPython.display import clear_output
clear_output()

In [14]:
## render 1 episode of baseline strategy on colab

display = Display(visible=0,size=(1000,1000))
display.start()

run(Baseline, render=False, n_runs=1, verbose=True, colab_render=True)

from IPython.display import HTML
from base64 import b64encode
video = [v for v in os.listdir('./') if 'mp4' in v]
video.sort()
print(len(video))
vid_1 = open(video[0],'rb').read()
data_url_1 = "data:video/mp4;base64," + b64encode(vid_1).decode()
HTML("""
<video width=400 height=400 controls>
      <source src="%s" type="video/mp4">
</video>
""" % data_url_1)

Score #0: 21
1


## 2.2. Monte Carlo Control -- **tem alguma coisa errada aqui, na função act**

In [28]:
## Episodes

import time
from collections import namedtuple
from typing import List

Step = namedtuple('Step', ['state', 'action', 'reward', 'score'])
class Episode(object):
    """
    An Episode is a representation of a single run of the game.
    It contains all the steps taken: the rewards associated with each
    state-action pairs and the total score at that point.
    You can acess it in a list-like interface: `episode[10].action`
    Note: We use "reward" and "score" here because it allows us to explore 
    different reward strategies.
    """
    def __init__(self):
        self.steps = []
        self.length = 0

    def __iter__(self):
        return self.steps.__iter__()

    def __getitem__(self, i):
        return self.steps[i]

    def add_step(self, state, action, reward, score):
        step = Step(state.data.tobytes(), action, reward, score)
        self.steps.append(step)
        self.length += 1

    def print_epi(self):
        for s in self.steps:
            print(f"{s.state[:10]}...:\ta {s.action} -> r {int(s.reward)} -> s {int(s.score)}")

    def get_final_score(self):
        return max([s for _, _, _, s in self.steps])

    def get_total_reward(self):
        return sum([r for _, _, r, _ in self.steps])

    def print_final_score(self):
        final_score = self.get_final_score()
        print(f"Final Score at t = {self.length}: {int(final_score)}")


def generate_episode(env, agent: Agent, RAM_mask: List[int], render: bool=False) -> Episode:
    """Performs one run of the game and returns an Episode containing all the steps taken."""
    epi = Episode()
    game_over = False
    state = env.reset()[RAM_mask] # Select useful bytes
    action = agent.act(state)  # TODO: aren't reducing the dimensionality
                                  # of the first action, but it shouldn't
                                  # impact the final result
    score = 0
    while not game_over:
        ## step
        ob, reward, game_over, _ = env.step(action)


        ## Doesn't matter what car we hit
        ob[16] = 1 if ob[16] != 255 else 0

        ## Reduce chicken y-position
        ob[14] = ob[14] // 3

        ## The chicken is in the x-posistion ~49
        ## We don't need to represent cars far from the chicken
        for i in range(108, 118):
            if ob[i] < 20 or ob[i] > 80:
                ob[i] = 0
            else:
                ## Reduce the cars x-positions sample space
                ob[i] = ob[i] // 3

        if reward == 1:
            score += 1
        elif ob[16] == 1:  # Collision!
            reward = -1

        epi.add_step(state, action, reward, score)
        state = ob[RAM_mask]
        action = agent.act(state)  # Next action

    return epi

In [29]:
## MC episodes

from typing import List
def MonteCarloES(RAM_mask: List[int], render: bool=False):
    epi = generate_episode(env, agent, RAM_mask=RAM_mask, render=render)
    return agent.update_policy(epi)

In [30]:
## MC agent (e-greedy)

class MonteCarloControl(Agent):
    def __init__(self, gamma: float, available_actions: int, N0: float):
        self.gamma = gamma
        self.available_actions = available_actions

        self.Q = defaultdict(lambda: np.zeros(available_actions))
        
        # TODO: Are we able to use numpy arrays for `Returns`?
        self.Returns = defaultdict(lambda: defaultdict(list))
        self.pi = defaultdict(lambda: 1)
        self.N0 = N0

    def act(self, ob):
        state = ob.data.tobytes()
        visits_on_state = sum([len(v) for k, v in self.Returns[state].items()])
        epsilon = self.N0 / (self.N0 + visits_on_state)

        ## random
        if np.random.choice(np.arange(2), p=[1 - epsilon, epsilon]):
            return np.random.choice(self.available_actions)  # Explore!
        
        ## greedy
        else:
            return self.pi[state]

    def update_policy(self, episode):
        G = 0
        S = np.array([s for s, _, _, _ in episode])
        A = np.array([a for _, a, _, _ in episode])
        R = np.array([r for _, _, r, _ in episode])

        for t in reversed(range(episode.length - 1)):
            # TODO: add the action to this comment
            # TODO: According to the algorithm I should check if S_t appers in
            #  the sequence S_0, S_1, S_2, ..., S_t-1.
            G = self.gamma * G + R[t + 1]
            self.Returns[S[t]][A[t]].append(G)
            self.Q[S[t]][A[t]] = sum(self.Returns[S[t]][A[t]]) / len(self.Returns[S[t]][A[t]])  # Mean
            self.pi[S[t]] = self.Q[S[t]].argmax()

#         print(f"Pi: {len(pi):8} ", end='')#, Q: {len(Q)}, Returns: {len(Returns)}")

        return episode.get_final_score(), episode.get_total_reward()

In [36]:
from collections import defaultdict

## Getting the info we care about directly from the RAM
RAM_mask = [
      14                                                ## Chicken Y
    , 16                                                ## Chicken Lane Collide
    , 108, 109, 110, 111, 112, 113, 114, 115, 116, 117  ## Car X Coords
]


## initialize the environment
env, initial_state = get_env()


## Monte Carlo agent
agent = MonteCarloControl(gamma=0.95, available_actions=2, N0=0.5)


MonteCarloES(RAM_mask=RAM_mask, render=False)


scores = []
total_rewards = []
n_runs = 3000
for i in range(n_runs):
    render = i % 201 == 200
    score, total_reward = MonteCarloES(RAM_mask=RAM_mask, render=render)
    scores.append(score)
    total_rewards.append(total_reward)
    if i%100==0:
        print(f"Run [{i:3}] - Total reward:{total_reward:7.2f} Mean scores:{sum(scores) / len(scores):.2f} Means Scores[:-10]:{sum(scores[-10:]) / 10:5.2f} Score:{score:2} ")

Run [  0] - Total reward:  -63.00 Mean scores: 11.00 Means Scores[:-10]:  1.10 Score: 11 
Run [100] - Total reward:  -40.00 Mean scores: 11.96 Means Scores[:-10]: 12.10 Score: 12 
Run [200] - Total reward:  -42.00 Mean scores: 12.36 Means Scores[:-10]: 12.90 Score: 14 
Run [300] - Total reward:  -25.00 Mean scores: 12.55 Means Scores[:-10]: 12.20 Score: 15 
Run [400] - Total reward:  -27.00 Mean scores: 12.67 Means Scores[:-10]: 13.50 Score: 14 
Run [500] - Total reward:  -29.00 Mean scores: 12.71 Means Scores[:-10]: 13.50 Score: 14 
Run [600] - Total reward:  -28.00 Mean scores: 12.78 Means Scores[:-10]: 13.10 Score: 15 
Run [700] - Total reward:  -37.00 Mean scores: 12.83 Means Scores[:-10]: 13.70 Score: 14 
Run [800] - Total reward:  -45.00 Mean scores: 12.88 Means Scores[:-10]: 13.70 Score: 12 
Run [900] - Total reward:  -43.00 Mean scores: 12.88 Means Scores[:-10]: 13.00 Score: 12 
Run [1000] - Total reward:  -19.00 Mean scores: 12.90 Means Scores[:-10]: 13.60 Score: 14 
Run [1100

In [37]:
## render 1 episode of MC on colab

display = Display(visible=0,size=(1000,1000))
display.start()


env, initial_state = get_env()
monitor_dir = os.getcwd()
env = wrappers.Monitor(env, monitor_dir, video_callable=lambda ep_id: ep_id%1000 == 0, force=True)
env.reset()

##chose the agent
agent = agents.MonteCarloControl(gamma=0.95, available_actions=2, N0=0.5)
game_over = False
action = agent.act(initial_state)
while not game_over:
    ob, reward, game_over, _ = env.step(action)
    action = agent.act(ob)
env.close()


from IPython.display import HTML
from base64 import b64encode
video = [v for v in os.listdir('./') if 'mp4' in v]
video.sort()
print(len(video))
vid_2 = open(video[0],'rb').read()
data_url_2 = "data:video/mp4;base64," + b64encode(vid_2).decode()
HTML("""
<video width=400 height=400 controls>
      <source src="%s" type="video/mp4">
</video>
""" % data_url_2)

NameError: ignored

## 2.3. Q-learning (or some variation like DoubleQ-learning)

In [None]:
## Q-learing agent

class QLearning(Agent):
    def __init__(self, alpha: float, gamma: float, available_actions: int, N0: float):
        self.alpha = alpha
        self.gamma = gamma
        self.available_actions = available_actions
        self.N0 = N0

        self.Q = defaultdict(lambda: np.zeros(available_actions))
        self.state_visits = defaultdict(lambda: 0)

    def act(self, state):
        epsilon = self.N0 / (self.N0 + self.state_visits[state])
        self.state_visits[state] += 1

        ## Explore!
        if np.random.choice(np.arange(2), p=[1 - epsilon, epsilon]):
            return np.random.choice(self.available_actions)  
        
        ## Bias toward going forward
        elif self.Q[state].max() == 0.0:
            return 1  
        
        ## Greedy action
        else:
            return self.Q[state].argmax()  

    def update_Q(self, old_state, new_state, action, reward):
        self.Q[old_state][action] = self.Q[old_state][action] + self.alpha * (reward + (self.gamma * self.Q[new_state].max()) - self.Q[old_state][action])

In [None]:
## Getting the info we care about directly from the RAM
RAM_mask = [
      14                                                ## Chicken Y
    , 16                                                ## Chicken Lane Collide
    , 108, 109, 110, 111, 112, 113, 114, 115, 116, 117  ## Car X Coords
]


## initialize the environment
env, initial_state = get_env()


## Q learning agent
agent = agents.QLearning(alpha=0.9, gamma=0.9, available_actions=2, N0=2.5)

scores = []
total_rewards = []
n_runs = 3000
for i in range(n_runs):
    render = i % 101 == 0
    game_over = False
    state = env.reset()[RAM_mask].data.tobytes()  # Select useful bytes
    
    action = agent.act(state)  # TODO: aren't reducing the dimensionality
                               # of the *first* state, but it shouldn't
                               # impact the final result

    
    score = 0
    total_reward = 0

    while not game_over:
        if render:
            time.sleep(0.005)
            env.render()

        old_state = state
        ob, reward, game_over, _ = env.step(action)

        # Doesn't matter where we were hit
        ob[16] = 1 if ob[16] != 255 else 0

        # Reduce chicken y-position
        ob[14] = ob[14] // 3

        # The chicken is in the x-posistion ~49
        # We don't need to represent cars far from the chicken
        for b in range(108, 118):
            if ob[b] < 20 or ob[b] > 80:
                ob[b] = 0
            else:
                # Reduce the cars x-positions sample space
                ob[b] = ob[b] // 3

        if reward == 1:
            score += 1
        elif ob[16] == 1:  # Collision!
            reward = -1

        total_reward += reward
        state = ob[RAM_mask].data.tobytes()
        agent.update_Q(old_state, state, action, reward)

        action = agent.act(state)  # Next action

    scores.append(score)
    total_rewards.append(total_reward)

    print(f"Run [{i:3}] - Total reward: {total_reward:7.2f} Mean scores: {sum(scores) / len(scores):.2f} Means Scores[:-10]: {sum(scores[-10:]) / len(scores[-10:]):5.2f} Score: {score:2} ")

## 2.4. SARSA ($\lambda$)

## 2.5. Linear function approximator

# 3. Evaluation

The system must be evaluated according to the quality of the solutions found and a critical evaluation is expected on the relationship between adopted parameters x solution performance. Graphs and tables representing the evolution of the solutions are expected. Additional comparisons with the literature are welcome, although they are not mandatory.

## 3.1. Computational cost

## 3.2. Optimality

## 3.3. Influence of reward function

## 3.4. State and action space sizes

# 4. Discussion

## 4.1. The advantages and disadvantages of bootstrapping in your problem

## 4.2. How the reward function influenced the quality of the solution? Was your group able to achieve the expected policy given the reward function defined?

## 4.3. How function approximation influenced the results? What were the advantages and disadvantages of using it in your problem?