Emilien Biré

# Imports and utils

In [None]:
from collections import defaultdict
import os
import sys
import time

import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from mpl_toolkits.mplot3d import Axes3D


import text_flappy_bird_gym

In [None]:
# Seed NumPy's global random generator so that the np.random.choice draws used
# by the Monte Carlo agent are repeatable from run to run.
np.random.seed(12)

In [None]:
def plot_q_function(Q_dict):
    """Show the greedy value of every stored state as a 3D surface.

    The first two components of each key of ``Q_dict`` give the horizontal
    coordinates; the height is the largest action value stored under that key.
    """
    states = list(Q_dict.keys())
    x = [state[0] for state in states]
    y = [state[1] for state in states]
    q_values = np.array([np.max(action_values) for action_values in Q_dict.values()])

    fig = plt.figure(figsize=(8, 6))
    ax = fig.add_subplot(111, projection='3d')

    # Triangulated surface through the scattered (x, y, value) points.
    ax.plot_trisurf(x, y, q_values, cmap='viridis', edgecolor='none')

    # Title and axis captions.
    ax.set_title('Q-value Surface')
    ax.set_xlabel('X')
    ax.set_ylabel('Y')
    ax.set_zlabel('Q-values')

    plt.show()

# The environment

In [None]:
# Build the environment shared by every training run in this notebook, reset it
# once, and display its observation and action spaces.
env = gym.make(
    'TextFlappyBird-v0',
    height=15,
    width=20,
    pipe_gap=4,
)
obs, _ = env.reset()
print(env.observation_space, env.action_space, sep="\n")

# MC control agent

I took the liberty of copying the code from TP4, since it uses the same framework.

In [None]:
def generate_episode_from_Q(env, Q, epsilon, nA):
    """Roll out one episode with the epsilon-greedy policy derived from ``Q``.

    Args:
        env: environment exposing ``reset()``, ``action_space.sample()`` and a
            ``step(action)`` that returns the five-tuple
            ``(observation, reward, terminated, truncated, info)``.
        Q: mapping from state to an array of ``nA`` action values.
        epsilon: exploration probability handed to ``get_probs``.
        nA: number of actions.

    Returns:
        The episode as a list of ``(state, action, reward)`` tuples, one per
        environment step.
    """
    episode = []
    state, _ = env.reset()
    while True:
        if state in Q:
            action = np.random.choice(np.arange(nA), p=get_probs(Q[state], epsilon, nA))
        else:
            # State never updated so far: let the environment pick an action.
            action = env.action_space.sample()
        # Take a step in the environment.
        next_state, reward, terminated, truncated, _ = env.step(action)
        episode.append((state, action, reward))
        # Stop on truncation as well as termination; a finished episode must
        # not be stepped again.
        if terminated or truncated:
            break
        state = next_state
    return episode

def get_probs(Q_s, epsilon, nA):
    """Return the epsilon-greedy action distribution for a single state.

    Every one of the ``nA`` actions receives ``epsilon / nA``; the first
    maximiser of ``Q_s`` is instead assigned ``1 - epsilon + epsilon / nA``.
    """
    exploration_share = epsilon / nA
    probabilities = np.full(nA, exploration_share)
    probabilities[np.argmax(Q_s)] = 1 - epsilon + exploration_share
    return probabilities

def update_Q(env, episode, Q, alpha, gamma):
    """Apply an every-visit, constant-step-size Monte Carlo update to ``Q``.

    For each step ``t`` of the episode the discounted return
    ``G_t = r_t + gamma * r_{t+1} + gamma**2 * r_{t+2} + ...`` is computed and
    ``Q[state_t][action_t]`` is moved towards it by a fraction ``alpha``.
    Steps are processed in episode order, so a state-action pair visited
    several times is updated once per visit.

    Args:
        env: unused; kept so existing call sites keep working.
        episode: list of ``(state, action, reward)`` tuples.
        Q: mapping from state to an indexable, mutable array of action values
            (modified in place).
        alpha: step size.
        gamma: discount factor.

    Returns:
        The same ``Q`` object that was passed in.
    """
    if not episode:
        # Nothing to learn from; zip(*episode) below would fail to unpack.
        return Q

    states, actions, rewards = zip(*episode)

    # One backward pass yields every return in O(T) instead of recomputing a
    # discounted sum for each step.
    returns = []
    running_return = 0.0
    for reward in reversed(rewards):
        running_return = reward + gamma * running_return
        returns.append(running_return)
    returns.reverse()

    for state, action, g in zip(states, actions, returns):
        old_Q = Q[state][action]
        Q[state][action] = old_Q + alpha * (g - old_Q)
    return Q

In [None]:
def evaluate_MCC(env, Q, num_episodes=500, max_reward=500):
    """Evaluate a trained Monte Carlo control agent by acting greedily.

    Args:
        env: environment exposing ``reset()``, ``action_space.sample()`` and a
            ``step(action)`` that returns the five-tuple
            ``(observation, reward, terminated, truncated, info)``.
        Q: either an action-value table (state -> array of action values) or
            a greedy policy (state -> integer action). Both are accepted
            because callers hold both objects; an integer entry is used as
            the action directly instead of being passed through ``argmax``.
        num_episodes: number of evaluation episodes to run.
        max_reward: an episode is cut short once its cumulative reward
            exceeds this value.

    Returns:
        A list with the cumulative reward of each evaluation episode.

    The environment is not closed here: it belongs to the caller, and the
    training loop keeps stepping it after each evaluation.
    """
    tot_rewards = []
    for _episode in range(num_episodes):
        state, _ = env.reset()
        episode_reward = 0
        while True:
            if state in Q:
                entry = Q[state]
                if isinstance(entry, (int, np.integer)):
                    # Policy dict: the entry already is the greedy action.
                    action = int(entry)
                else:
                    action = int(np.argmax(entry))
            else:
                # State never seen during training: fall back to a random action.
                action = env.action_space.sample()
            state, reward, terminated, truncated, _ = env.step(action)
            episode_reward += reward
            # Stop when the episode ends or the reward cap is exceeded.
            if terminated or truncated or episode_reward > max_reward:
                tot_rewards.append(episode_reward)
                break

    return tot_rewards

In [None]:
def mc_control(env, num_episodes, alpha, gamma=1.0, epsilon = 0.1, epsilon_end=0.01, plot = False):
    """Train a tabular agent with constant-alpha Monte Carlo control.

    Args:
        env: environment to train on; ``env.action_space.n`` gives the number
            of actions.
        num_episodes: number of training episodes.
        alpha: step size of the value update.
        gamma: discount factor.
        epsilon: exploration probability, held constant for the whole run.
        epsilon_end: currently unused (no epsilon schedule is applied); kept
            so existing call sites keep working.
        plot: when true, the Q function is plotted at episode 1000, at the
            midpoint and at the last episode.

    Returns:
        ``(policy, Q, performances)`` where ``policy`` maps each visited state
        to its greedy action, ``Q`` is the action-value table and
        ``performances`` maps ``str(episode_index)`` (every 100 episodes) to
        the list of rewards returned by ``evaluate_MCC``.
    """
    nA = env.action_space.n
    # Unseen states start with all-zero action values.
    Q = defaultdict(lambda: np.zeros(nA))
    performances = {}
    plot_checkpoints = (1000, num_episodes // 2, num_episodes)

    for i_episode in range(1, num_episodes + 1):
        # Every 100 episodes: report progress and record an evaluation of the
        # current table (before this iteration's training episode).
        if i_episode % 100 == 0:
            print("\rEpisode {}/{}. Eps={}".format(i_episode, num_episodes, epsilon), end="")
            performances[str(i_episode)] = evaluate_MCC(env, Q)
            sys.stdout.flush()

        if plot and i_episode in plot_checkpoints:
            plot_q_function(Q)

        # Generate one episode with the epsilon-greedy policy, then learn from it.
        episode = generate_episode_from_Q(env, Q, epsilon, nA)
        Q = update_Q(env, episode, Q, alpha, gamma)

    # Greedy policy with respect to the final action-value estimates.
    policy = {state: np.argmax(action_values) for state, action_values in Q.items()}
    return policy, Q, performances

## Evaluating alpha (step size)

In [None]:
# Train one Monte Carlo agent per step size and overlay the mean evaluation
# reward recorded at each checkpoint.
for a in [0.001, 0.003, 0.005, 0.01]:
    print("Alpha=", a)
    policy, Q, training_perfs = mc_control(env, 30000, a, epsilon=0.2)

    # Checkpoint keys are stored as strings; turn them back into episode counts.
    training_steps = [int(checkpoint) for checkpoint in training_perfs]
    mean_rewards = [np.array(rewards).mean() for rewards in training_perfs.values()]
    std_rewards = [np.array(rewards).std() for rewards in training_perfs.values()]

    plt.plot(training_steps, mean_rewards, label="Alpha=" + str(a))

plt.legend()
plt.grid("on")
plt.show()

## Evaluating epsilon


In [None]:
# Train one Monte Carlo agent per exploration rate (step size fixed at 0.004)
# and overlay the mean evaluation reward recorded at each checkpoint.
for eps in [0.01, 0.05, 0.1, 0.2]:
    print("Eps=", eps)
    policy, Q, training_perfs = mc_control(env, 30000, alpha=0.004, epsilon=eps)

    # Checkpoint keys are stored as strings; turn them back into episode counts.
    training_steps = [int(checkpoint) for checkpoint in training_perfs]
    mean_rewards = [np.array(rewards).mean() for rewards in training_perfs.values()]
    std_rewards = [np.array(rewards).std() for rewards in training_perfs.values()]

    plt.plot(training_steps, mean_rewards, label="Eps=" + str(eps))

plt.legend()
plt.grid("on")
plt.show()

## Best MCC

In [None]:
# Train the final Monte Carlo agent, plotting its Q function along the way,
# then plot its learning curve.
# NOTE(review): alpha=0.05 lies outside the 0.001-0.01 range explored in the
# step-size sweep above - confirm this value is intended.
best_mcc_policy, Q, mcc_best_performances = mc_control(env, 30000, alpha = 0.05, epsilon = 0.05, plot = True)
training_steps = [int(k) for k in mcc_best_performances.keys()]
mean_rewards = []
std_rewards = []
for rewards in mcc_best_performances.values():
    mean_rewards.append(np.array(rewards).mean())
    std_rewards.append(np.array(rewards).std())

# The curve needs a label: plt.legend() has nothing to show otherwise.
plt.plot(training_steps, mean_rewards, label="MCC")

plt.legend()
plt.grid(True)
plt.show()

# SARSA

In [None]:
def evaluate_SARSA(env, agent, num_episodes=500, max_reward=500):
    """Evaluate a trained SARSA agent without updating its value estimates.

    Args:
        env: environment exposing ``reset()`` and a ``step(action)`` that
            returns the five-tuple
            ``(observation, reward, terminated, truncated, info)``.
        agent: object providing ``agent_start(state)`` and
            ``agent_step(reward, state, training=False)``.
        num_episodes: number of evaluation episodes to run.
        max_reward: an episode is cut short once its cumulative reward
            exceeds this value.

    Returns:
        A list with the cumulative reward of each evaluation episode.

    The environment is not closed here: it belongs to the caller, and the
    training loop keeps stepping it after each evaluation.

    NOTE(review): with the ``SarsaAgent`` defined in this file,
    ``training=False`` only disables the value update; action selection stays
    epsilon-greedy, so evaluation still explores with the agent's epsilon.
    """
    tot_rewards = []

    for _episode in range(num_episodes):
        state, _ = env.reset()
        action = agent.agent_start(state)
        episode_reward = 0
        while True:
            state, reward, terminated, truncated, _ = env.step(action)
            episode_reward += reward
            # Checked after every step, including the first one, so that a
            # finished episode is never stepped again.
            if terminated or truncated or episode_reward > max_reward:
                break
            action = agent.agent_step(reward, state, training=False)
        tot_rewards.append(episode_reward)

    return tot_rewards

In [None]:
class SarsaAgent():
    """Tabular epsilon-greedy agent.

    The value update bootstraps from the expectation of the next state's
    action values under the epsilon-greedy policy (the Expected SARSA form)
    rather than from the value of the single action that was sampled.
    """

    def agent_init(self, agent_init_info):
        """Setup for the agent called when the experiment first starts.

        Args:
        agent_init_info (dict), the parameters used to initialize the agent. The dictionary contains:
        {
            num_states (int): The number of states (stored, not used by the agent itself),
            num_actions (int): The number of actions,
            epsilon (float): The epsilon parameter for exploration,
            epsilon_end (float): Final epsilon of a schedule (stored; no schedule is currently applied),
            step_size (float): The step-size,
            discount (float): The discount factor,
            seed (int): Seed of the agent's private random generator,
        }

        """
        # Store the parameters provided in agent_init_info.
        self.num_actions = agent_init_info["num_actions"]
        self.num_states = agent_init_info["num_states"]
        self.epsilon = agent_init_info["epsilon"]
        self.epsilon_end = agent_init_info["epsilon_end"]
        self.step_size = agent_init_info["step_size"]
        self.discount = agent_init_info["discount"]
        self.rand_generator = np.random.RandomState(agent_init_info["seed"])

        # Action-value table; states are added on first access with all-zero values.
        self.q = defaultdict(lambda: np.zeros(self.num_actions))
        self.current_eps = self.epsilon

    def update_eps(self, iteration, max_iteration):
        """Refresh ``current_eps`` for the given training iteration.

        No decay is applied: ``current_eps`` is simply reset to ``epsilon``.

        NOTE(review): action selection and the value update read
        ``self.epsilon``, not ``self.current_eps``; a schedule added here
        would have no effect until they are switched over.
        """
        self.current_eps = self.epsilon

    def _select_action(self, q_values):
        """Draw an epsilon-greedy action for the given action values."""
        if self.rand_generator.rand() < self.epsilon:
            return self.rand_generator.randint(self.num_actions)
        return self.argmax(q_values)

    def _expected_value(self, q_values):
        """Expectation of ``q_values`` under the epsilon-greedy policy.

        Each action gets probability ``epsilon / num_actions``; the first
        maximiser gets ``1 - epsilon + epsilon / num_actions``.
        """
        uniform = self.epsilon / self.num_actions
        probs = np.ones(self.num_actions) * uniform
        probs[np.argmax(q_values)] = 1 - self.epsilon + uniform
        return sum(p * q for p, q in zip(probs, q_values))

    def agent_start(self, state):
        """The first method called when the episode starts, called after
        the environment starts.
        Args:
            state: the initial state returned by the environment (used as a
                key of the action-value table).
        Returns:
            action (int): the first action the agent takes.
        """
        action = self._select_action(self.q[state])
        self.prev_state = state
        self.prev_action = action
        return action

    def agent_step(self, reward, state, training = True):
        """A step taken by the agent.
        Args:
            reward (float): the reward received for taking the last action taken
            state: the state the agent ended up in after the last step.
            training (bool): when false, the action-value table is not
                updated; the action is still chosen epsilon-greedily.
        Returns:
            action (int): the action the agent is taking.
        """
        next_q = self.q[state]
        # Choose the action first so the random draws happen before the update.
        action = self._select_action(next_q)

        if training:
            # The target is computed before the table is written, which
            # matters when the previous and current state coincide.
            target = reward + self.discount * self._expected_value(next_q)
            prev_q = self.q[self.prev_state]
            prev_q[self.prev_action] = prev_q[self.prev_action] \
                + self.step_size * (target - prev_q[self.prev_action])

        self.prev_state = state
        self.prev_action = action
        return action

    def agent_end(self, reward):
        """Run when the agent terminates.
        Args:
            reward (float): the reward the agent received for entering the
                terminal state.
        """
        # Terminal transition: there is no next state to bootstrap from.
        prev_q = self.q[self.prev_state]
        prev_q[self.prev_action] = prev_q[self.prev_action] \
            + self.step_size * (reward - prev_q[self.prev_action])

    def argmax(self, q_values):
        """argmax with random tie-breaking
        Args:
            q_values (Numpy array): the array of action-values
        Returns:
            action (int): an action with the highest value
        """
        top = float("-inf")
        ties = []

        for action, value in enumerate(q_values):
            if value > top:
                # Strictly better: restart the list of candidates.
                top = value
                ties = [action]
            elif value == top:
                ties.append(action)

        return self.rand_generator.choice(ties)

In [None]:
def sarsa(env, num_episodes, agent_init_infos, plot=False):
    """Train a ``SarsaAgent`` on ``env`` and record periodic evaluations.

    Args:
        env: environment exposing ``reset()`` and a ``step(action)`` that
            returns the five-tuple
            ``(observation, reward, terminated, truncated, info)``.
        num_episodes: number of training episodes.
        agent_init_infos: dictionary forwarded to ``SarsaAgent.agent_init``.
        plot: when true, the agent's Q function is plotted at episode 1000,
            at the midpoint and at the last episode.

    Returns:
        ``(agent, performances)`` where ``performances`` maps
        ``str(episode_index)`` (every 100 episodes) to the list of rewards
        returned by ``evaluate_SARSA``.
    """
    agent = SarsaAgent()
    agent.agent_init(agent_init_infos)
    performances = {}
    plot_checkpoints = (1000, num_episodes // 2, num_episodes)

    for i_episode in range(1, num_episodes + 1):
        agent.update_eps(i_episode, num_episodes)

        # Every 100 episodes: report progress and record an evaluation of the
        # agent (before this iteration's training episode).
        if i_episode % 100 == 0:
            print("\rEpisode {}/{}.".format(i_episode, num_episodes), end="")
            performances[str(i_episode)] = evaluate_SARSA(env, agent)
            sys.stdout.flush()
        if plot and i_episode in plot_checkpoints:
            plot_q_function(agent.q)

        state, _ = env.reset()
        action = agent.agent_start(state)
        episode_reward = 0
        while True:
            state, reward, terminated, truncated, _ = env.step(action)
            episode_reward += reward
            # Checked after every step, including the first one. Truncation
            # and the 500-reward cap are handled like termination: the last
            # transition is learned without bootstrapping.
            if terminated or truncated or episode_reward > 500:
                agent.agent_end(reward)
                break
            action = agent.agent_step(reward, state, training=True)

    return agent, performances

## Evaluating for eps

In [None]:
# Train one SARSA agent per exploration rate (step size fixed at 0.1) and
# overlay the mean evaluation reward recorded at each checkpoint.
for eps in [0.01, 0.05, 0.1, 0.2]:
    print("Eps=", eps)
    agent_config = {
        "num_actions": env.action_space.n,
        "num_states": env.observation_space[0].n * env.observation_space[1].n,
        "epsilon": eps,
        "epsilon_end": 0,
        "step_size": 0.1,
        "discount": 1.0,
        "seed": 0,
    }
    agent, performances = sarsa(env, 30000, agent_config)

    # Checkpoint keys are stored as strings; turn them back into episode counts.
    training_steps = [int(checkpoint) for checkpoint in performances]
    mean_rewards = [np.array(rewards).mean() for rewards in performances.values()]
    std_rewards = [np.array(rewards).std() for rewards in performances.values()]

    plt.plot(training_steps, mean_rewards, label="Eps=" + str(eps))
plt.legend()
plt.grid("on")
plt.show()

## Evaluating Alpha (step size)

In [None]:
# Train one SARSA agent per step size (epsilon fixed at 0.01) and overlay the
# mean evaluation reward recorded at each checkpoint.
for step in [0.05, 0.1, 0.2, 0.3]:
    print("Step size=", step)
    agent_config = {
        "num_actions": env.action_space.n,
        "num_states": env.observation_space[0].n * env.observation_space[1].n,
        "epsilon": 0.01,
        "epsilon_end": 0,
        "step_size": step,
        "discount": 1.0,
        "seed": 0,
    }
    agent, performances = sarsa(env, 30000, agent_config)

    # Checkpoint keys are stored as strings; turn them back into episode counts.
    training_steps = [int(checkpoint) for checkpoint in performances]
    mean_rewards = [np.array(rewards).mean() for rewards in performances.values()]
    std_rewards = [np.array(rewards).std() for rewards in performances.values()]

    plt.plot(training_steps, mean_rewards, label="Step=" + str(step))
plt.legend()
plt.grid("on")
plt.show()

In [None]:
# Train the final SARSA agent (epsilon 0.01, step size 0.3), plotting its Q
# function along the way, then plot its learning curve.
best_sarsa_config = {
    "num_actions": env.action_space.n,
    "num_states": env.observation_space[0].n * env.observation_space[1].n,
    "epsilon": 0.01,
    "epsilon_end": 0,
    "step_size": 0.3,
    "discount": 1.0,
    "seed": 0,
}
best_sarsa_agent, sarsa_best_performances = sarsa(env, 30000, best_sarsa_config, plot=True)

# Checkpoint keys are stored as strings; turn them back into episode counts.
training_steps = [int(checkpoint) for checkpoint in sarsa_best_performances]
mean_rewards = [np.array(rewards).mean() for rewards in sarsa_best_performances.values()]
std_rewards = [np.array(rewards).std() for rewards in sarsa_best_performances.values()]

plt.plot(training_steps, mean_rewards)

In [None]:
def plot_training_performances(training_perfs_list, labels):
    """Overlay the learning curves of several training runs.

    Args:
        training_perfs_list: one dictionary per run, mapping a checkpoint
            (an episode index stored as a string) to the list of rewards
            measured at that checkpoint.
        labels: legend label of each run, in the same order.

    For each run the mean reward per checkpoint is plotted against the
    checkpoint's episode index; the figure is then shown.
    """
    for training_perfs, label in zip(training_perfs_list, labels):
        training_steps = [int(checkpoint) for checkpoint in training_perfs]
        mean_rewards = [np.array(rewards).mean() for rewards in training_perfs.values()]

        plt.plot(training_steps, mean_rewards, label=label)

    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.show()

# Comparing best MCC and best SARSA

In [None]:
# Overlay the learning curves (mean evaluation reward per checkpoint) of the
# final SARSA and Monte Carlo agents.
plot_training_performances([sarsa_best_performances, mcc_best_performances], ["SARSA", "MCC"])

# Different Environments

The goal here is to test how our best models behave in environments of different sizes (height, width, and pipe gap).

In [None]:
# Pipe gaps to evaluate with the training screen size.
pipe_gaps = [1,2,3,4,5,6]
# Screen heights and widths, paired element by element when evaluated
# (heights[i] goes with widths[i]).
# NOTE(review): `env_hight` is a misspelling of "height"; the name is kept
# because the evaluation cell reads it.
env_hight = [9,15,20,30]
env_width = [12,20,30,50]

## Modifying the screen size

In [None]:
# Evaluate both trained agents on different screen sizes (pipe gap fixed at 4)
# and plot the mean reward obtained for each (height, width) pair.
scores_sarsa = []
scores_mcc = []
config = []
for h, w in zip(env_hight, env_width):
    print(h, w)
    env_hw = gym.make('TextFlappyBird-v0', height=h, width=w, pipe_gap=4)
    scores_sarsa.append(np.mean(evaluate_SARSA(env_hw, best_sarsa_agent)))
    # Pass the action-value table returned together with best_mcc_policy:
    # evaluate_MCC is written for per-state action values, not for the
    # per-state actions stored in the policy dict.
    scores_mcc.append(np.mean(evaluate_MCC(env_hw, Q)))
    config.append(str((h, w)))

plt.plot(config, scores_sarsa, label="SARSA", marker="o")
plt.plot(config, scores_mcc, label="MCC", marker="o")
plt.grid(True)
plt.legend()


## Modifying the pipe gap

In [None]:
# Evaluate both trained agents for each pipe gap (training screen size) and
# plot the mean reward on a logarithmic scale.
scores_sarsa = []
scores_mcc = []
config = []
for p_g in pipe_gaps:
    env_pg = gym.make('TextFlappyBird-v0', height=15, width=20, pipe_gap=p_g)
    scores_sarsa.append(np.mean(evaluate_SARSA(env_pg, best_sarsa_agent)))
    # Pass the action-value table returned together with best_mcc_policy:
    # evaluate_MCC is written for per-state action values, not for the
    # per-state actions stored in the policy dict.
    scores_mcc.append(np.mean(evaluate_MCC(env_pg, Q)))
    config.append(p_g)

plt.plot(config, scores_sarsa, label="SARSA", marker="o")
plt.plot(config, scores_mcc, label="MCC", marker="o")
plt.grid(True)
plt.yscale("log")
plt.legend()