In [None]:
!pip install git+https://gitlab-research.centralesupelec.fr/stergios.christodoulidis/text-flappy-bird-gym.git
# This notebook was run in Google Colab because text-flappy-bird-gym conflicts with the existing gym version.

In [None]:
import os, sys
import gymnasium as gym
import time
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict
import text_flappy_bird_gym


In [None]:
# Fix the seed before anything stochastic is created: the agents below draw
# their exploration noise and initial Q-values from np.random.
seed = 420
np.random.seed(seed=seed)

env = gym.make('TextFlappyBird-v0', height=15, width=20, pipe_gap=4)
# Gymnasium's reset() returns an (observation, info) pair; unpack it so that
# `obs` holds the observation itself rather than the whole tuple.
obs, info = env.reset()
# The probing cell below uses env.action_space.sample(), which has its own RNG.
env.action_space.seed(seed)

In [None]:
# Sanity check: take random actions and stop as soon as env.step returns an
# observation that is not contained in env.observation_space (or after 1000
# steps). Such observations are surprising and prevent initialising a
# dictionary keyed on the declared observation space.
temp = (0, 0)
count = 0

while env.observation_space.contains(temp) and count < 1000:
    action = env.action_space.sample()
    temp, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated
    print(temp)
    count += 1
    if done:
        # Stepping an environment after the episode has ended is undefined in
        # the Gymnasium API, so start a new episode. `temp` is deliberately not
        # overwritten: the terminal observation is still checked by the loop.
        env.reset()
print('count', count)


In [None]:
# Show the spaces; the sizes below are hard-coded from this output
# (they could instead be derived from the environment).
print(env.observation_space, env.action_space)

action_dim = 2        # size of the last Q-table axis
state_off = 11        # the agents add (state_off - 1) to the second observation component
state_dim = (14, 22)  # sizes of the first two Q-table axes

Below is the SARSA($\lambda$) agent.

In [None]:
class LambdaSarsaAgent:
    """Tabular SARSA(lambda) agent with an epsilon-greedy behaviour policy.

    A state is a pair ``(s0, s1)``; Q-values and eligibility traces are stored
    in arrays indexed as ``[s0, s1 + offset, action]`` with
    ``offset = state_off - 1``.
    """

    def __init__(self,state_dim, state_off, actions_dim, alpha=0.02, gamma=0.99, epsilon=0.1, lambda_val=0.1):
        """Create the Q-table and trace table.

        :param state_dim: sizes of the two state axes of the tables.
        :param state_off: shift for the second state component (stored minus one).
        :param actions_dim: number of discrete actions.
        :param alpha: learning rate.
        :param gamma: discount factor.
        :param epsilon: exploration probability of the epsilon-greedy policy.
        :param lambda_val: eligibility-trace decay.
        """
        self.actions_dim = actions_dim
        self.state_dim = state_dim
        # NOTE(review): with this "-1", a second component equal to -state_off
        # maps to index -1, which NumPy wraps to the last row -- confirm intended.
        self.offset = state_off - 1
        # Size the tables from the constructor argument, not from a notebook
        # global, so the class works with any action count.
        table_shape = (state_dim[0], state_dim[1], actions_dim)
        self.q_table = np.random.uniform(low=-1, high=1, size=table_shape)
        self.e_trace = np.zeros(table_shape)
        self.alpha = alpha  # Learning rate
        self.gamma = gamma  # Discount factor
        self.epsilon = epsilon  # Exploration rate for epsilon-greedy policy
        self.lambda_val = lambda_val  # Eligibility trace decay

    def _index(self, state, action):
        """Return the (row, column, action) index of a state-action pair."""
        return state[0], state[1] + self.offset, action

    def get_epsilon_greedy_action(self, state):
        """Select an action using an epsilon-greedy policy."""
        best_action = self.get_greedy_action(state)

        if np.random.uniform(0, 1) < self.epsilon:
            return np.random.choice(self.actions_dim)  # Random action (explore)
        return best_action  # Greedy action (exploit)

    def get_greedy_action(self, state):
        """Select the best action (greedy policy, no exploration)."""
        return np.argmax(self.q_table[state[0], state[1] + self.offset])

    def update(self, state, action, reward, next_state, next_action, done):
        """Apply one SARSA(lambda) backup for the transition (s, a, r, s', a').

        ``done`` marks a terminal transition: no bootstrap term is used and
        ``next_action`` may be ``None``.
        """
        current = self._index(state, action)
        # TD error: r + gamma * Q(s', a') - Q(s, a). The -Q(s, a) term must be
        # applied on terminal transitions too; only the bootstrap term vanishes.
        if done:
            bootstrap = 0.0
        else:
            bootstrap = self.gamma * self.q_table[self._index(next_state, next_action)]
        delta = reward + bootstrap - self.q_table[current]

        self.e_trace[current] += 1  # accumulating trace

        self.q_table += self.alpha * delta * self.e_trace
        self.e_trace *= self.gamma * self.lambda_val

    def train(self,env,episodes=20000,verbose=False):
        """Train with the epsilon-greedy policy and return the per-episode total rewards.

        :param env: environment whose ``reset()`` returns ``(obs, info)`` and whose
            ``step()`` returns ``(obs, reward, terminated, truncated, info)``.
        :param episodes: number of training episodes.
        :param verbose: if true, print the episode index every 1000 episodes.
        """
        episode_reward = []
        for episode in range(episodes):
            if verbose and episode % 1000 == 0:
                print(episode)
            # Traces belong to a single episode: clear whatever the previous
            # episode left behind before starting a new one.
            self.reset_traces()
            state = env.reset()[0]
            action = self.get_epsilon_greedy_action(state)  # we train w/ epsilon greedy policy
            done = False
            total_reward = 0

            while not done:
                next_state, reward, terminated, truncated, info = env.step(action)
                # A truncated episode also ends the loop, but only a true
                # termination removes the bootstrap term.
                done = terminated or truncated
                next_action = None if terminated else self.get_epsilon_greedy_action(next_state)
                self.update(state, action, reward, next_state, next_action, terminated)
                state, action = next_state, next_action
                total_reward += reward

            episode_reward.append(total_reward)

        return episode_reward

    def reset_traces(self):
        """Zero all eligibility traces in place."""
        self.e_trace *= 0

In [None]:
class MonteCarloAgent:
    """Tabular first-visit Monte Carlo control agent with an epsilon-greedy policy.

    A state is a pair ``(s0, s1)``; Q-values are stored in an array indexed as
    ``[s0, s1 + offset, action]`` with ``offset = state_offset - 1``.
    """

    def __init__(self, state_space,state_offset, action_dim, alpha=0.01,gamma=0.99, epsilon=0.1):
        """Create the Q-table.

        :param state_space: sizes of the two state axes of the Q-table.
        :param state_offset: shift for the second state component (stored minus one).
        :param action_dim: number of discrete actions.
        :param alpha: step size of the incremental update.
        :param gamma: discount factor.
        :param epsilon: exploration probability.
        """
        self.state_space = state_space
        self.offset = state_offset - 1
        self.action_space = action_dim
        self.gamma = gamma  # Discount factor
        self.epsilon = epsilon  # Exploration probability
        self.alpha = alpha

        # Initialize the Q-table with random values in [-1, 1)
        self.q_table = np.random.uniform(low=-1,high=1,size=(self.state_space[0], self.state_space[1], self.action_space))

    def get_epsilon_greedy_action(self, state):
        """Select an action using an epsilon-greedy policy."""
        best_action = self.get_greedy_action(state)

        if np.random.uniform(0, 1) < self.epsilon:
            return np.random.choice(self.action_space)  # Random action (explore)
        return best_action  # Greedy action (exploit)

    def get_greedy_action(self, state):
        """Select the best action (greedy policy, no exploration)."""
        return np.argmax(self.q_table[state[0], state[1] + self.offset])

    def update(self, episode_data):
        """First-visit Monte Carlo update after an episode.

        :param episode_data: list of ``(state, action, reward)`` tuples in
            chronological order.

        Each (state, action) pair is updated once, towards the discounted
        return that follows its FIRST occurrence in the episode.
        """
        # Record the earliest time step of every pair. Marking pairs as
        # "visited" while walking backwards would select the last visit instead.
        first_visit = {}
        for t, (state, action, _) in enumerate(episode_data):
            first_visit.setdefault((tuple(state), action), t)

        G = 0  # Return accumulated from the end of the episode
        for t in reversed(range(len(episode_data))):
            state, action, reward = episode_data[t]
            G = reward + self.gamma * G  # Discounted return from step t

            if first_visit[(tuple(state), action)] == t:
                idx = (state[0], state[1] + self.offset, action)
                self.q_table[idx] += self.alpha * (G - self.q_table[idx])

    def train(self,env, episodes=20000,verbose=False):
        """Train agent using Monte Carlo Control and return per-episode total rewards.

        :param env: environment whose ``reset()`` returns ``(obs, info)`` and whose
            ``step()`` returns ``(obs, reward, terminated, truncated, info)``.
        :param episodes: number of training episodes.
        :param verbose: if true, print the episode index every 1000 episodes.
        """
        rewards = []
        for episode in range(episodes):

            if verbose and episode % 1000 == 0:
                print(episode)
            state = env.reset()[0]  # Reset environment
            episode_data = []
            total_reward = 0

            done = False
            while not done:
                action = self.get_epsilon_greedy_action(state)

                next_state, reward, terminated, truncated, info = env.step(action)
                # Stop on either signal so a truncated episode cannot loop forever.
                done = terminated or truncated

                episode_data.append((state, action, reward))
                total_reward += reward
                state = next_state

            self.update(episode_data)  # Update Q-values
            rewards.append(total_reward)

        print("Training complete!")
        return rewards



In [None]:
# Train a SARSA(lambda) agent with the class's default hyper-parameters and
# keep the per-episode rewards for plotting.
Sarsa = LambdaSarsaAgent(state_dim=state_dim, state_off=state_off, actions_dim=action_dim)

sarsa_rewards = Sarsa.train(env=env, verbose=True)

In [None]:
def exponential_moving_average(data, alpha=0.05):
    """Return the exponential moving average of a sequence as a NumPy array.

    The first output equals ``data[0]``; each later output is
    ``alpha * data[i] + (1 - alpha) * previous_output``.

    :param data: sequence of numbers (list or 1-D array).
    :param alpha: smoothing factor; larger values follow the data more closely.
    :return: array with the same length as ``data`` (empty for empty input).
    """
    # Empty input has no first element to seed the average with.
    if len(data) == 0:
        return np.array([])

    ema = [data[0]]
    for value in data[1:]:
        ema.append(alpha * value + (1 - alpha) * ema[-1])
    return np.array(ema)



In [None]:
from scipy.ndimage import uniform_filter1d #different way to smooth out
from scipy.signal import savgol_filter

# The EMA looked best visually. Only every 100th point is drawn so the curve
# is less cluttered, but it is plotted against the real episode index;
# plotting the strided array alone would show episode 100*k at x = k.
sarsa_ema = exponential_moving_average(sarsa_rewards)
sarsa_episodes = np.arange(len(sarsa_ema))
plt.plot(sarsa_episodes[::100], sarsa_ema[::100])
legend_text = f"Max: {np.max(sarsa_rewards)}\nMean: {np.mean(sarsa_rewards):.2f}\nStd: {np.std(sarsa_rewards):.2f}"
plt.legend([legend_text], loc='lower right', fontsize=10, frameon=True)
plt.title("Smoothed Sarsa reward over training")
plt.xlabel("Episode")
plt.ylabel("Length of Episode")
plt.savefig('SARSAtrainig.png')

In [None]:
# Train a Monte Carlo agent with the class's default hyper-parameters and
# keep the per-episode rewards for plotting.
mcagent = MonteCarloAgent(state_dim, state_off, action_dim)
mc_rewards = mcagent.train(env=env, verbose=True)

In [None]:
# The EMA looked best visually. Only every 100th point is drawn, but it is
# plotted against the real episode index; plotting the strided array alone
# would show episode 100*k at x = k.
mc_ema = exponential_moving_average(mc_rewards)
mc_episodes = np.arange(len(mc_ema))
plt.plot(mc_episodes[::100], mc_ema[::100])
legend_text = f"Max: {np.max(mc_rewards)}\nMean: {np.mean(mc_rewards):.2f}\nStd: {np.std(mc_rewards):.2f}"
plt.legend([legend_text], loc='lower right', fontsize=10, frameon=True)
plt.title("Smoothed MC reward over training")
plt.xlabel("Episode")
plt.ylabel("Length of Episode")
plt.savefig('MCtrainig.png')

Play function

In [None]:
def play(agent, env_param=(15,20,4), runs=10,verbose=False, max_steps=2000): #(15,20,4) is the env param on which both agents are trained
    """Play `runs` episodes with the agent's greedy policy and return the mean total reward.

    :param agent: object exposing ``get_greedy_action(state)``.
    :param env_param: ``(height, width, pipe_gap)`` used to build the environment.
    :param runs: number of episodes to average over.
    :param verbose: if true, print the index of each run.
    :param max_steps: cap on the number of steps per episode, so an agent that
        never dies still finishes.
    :return: mean of the per-episode total rewards.
    """
    total_rewards = []
    h, w, pg = env_param
    env = gym.make('TextFlappyBird-v0', height = h, width = w, pipe_gap = pg)
    print(env.observation_space)
    try:
        for i in range(runs):
            if verbose:
                print(i)

            state = env.reset()[0]  # Reset environment
            episode_reward = 0

            for _ in range(max_steps):
                action = agent.get_greedy_action(state)  # we play with greedy policy
                state, reward, terminated, truncated, info = env.step(action)
                episode_reward += reward
                # Either signal ends the episode.
                if terminated or truncated:
                    break

            total_rewards.append(episode_reward)  # Store reward for this run
    finally:
        # A fresh environment is created on every call; release it even if
        # the agent raises (e.g. on a state outside its Q-table).
        env.close()

    avg_reward = np.mean(total_rewards)  # Compute average reward over runs
    print(f"Average Reward over {runs} runs: {avg_reward:.2f}")

    return avg_reward


Here we test how the agents perform on different environment parameters.

In [None]:
import itertools

# Environment settings to evaluate on. Changing the environment changes the
# observation-space parameters, so the values are hand-picked to keep the
# space from becoming too big.
hs = [15]
ws = [15, 20]
pgs = [2, 4, 6, 8]

# Average greedy-play reward of the SARSA agent, keyed by (height, width, pipe_gap).
sarsa_play_logs = {}
for h, w, pg in itertools.product(hs, ws, pgs):
    avg_reward = play(Sarsa, (h, w, pg))
    sarsa_play_logs[(h, w, pg)] = avg_reward

In [None]:
import itertools

# Same environment grid as used for the SARSA agent.
hs = [15]
ws = [15, 20]
pgs = [2, 4, 6, 8]

# Average greedy-play reward of the Monte Carlo agent, keyed by
# (height, width, pipe_gap).
mc_play_logs = {}
for (h, w, pg) in itertools.product(hs, ws, pgs):
    avg_reward = play(mcagent, (h, w, pg))
    # Store in the MC dictionary; writing to sarsa_play_logs here would
    # overwrite the SARSA results and leave mc_play_logs empty.
    mc_play_logs[(h, w, pg)] = avg_reward

We plot the Q values heatmaps for both agents


In [None]:

import seaborn as sns

def plot_side_by_side_heatmaps(array, name,offset=0):
    """Plot one Q-value heatmap per action, side by side, then save and show the figure.

    :param array: 3-D array indexed as ``[x, y, action]`` (e.g. an agent's ``q_table``).
    :param name: label inserted in the output file name.
    :param offset: subtracted from the y index to produce the y tick labels.
    """
    n_actions = array.shape[2]
    y_labels = np.arange(array.shape[1]) - offset
    # squeeze=False keeps `axes` two-dimensional even for a single action.
    fig, axes = plt.subplots(1, n_actions, figsize=(6 * n_actions, 6), squeeze=False)

    for i, ax in enumerate(axes[0]):
        # Hand the labels to seaborn directly: it then draws exactly one tick
        # per row, whereas relabelling afterwards with set_yticklabels breaks
        # if seaborn decided to thin out the automatic ticks.
        sns.heatmap(array[:, :, i].T, cmap="coolwarm", linewidths=0.5,
                    yticklabels=y_labels, ax=ax)
        ax.set_title(f"Q_values heatmap for actions {i}")
        ax.set_xlabel("x")
        ax.set_ylabel("y")

    plt.tight_layout()
    # Save the figure to the current working directory before showing it.
    plt.savefig(f'{name} agent Q value heatmap on default env.png',dpi=300)
    plt.show()



In [None]:
# Q-value heatmaps of the trained SARSA agent, with y ticks shifted back by the agent's offset.
plot_side_by_side_heatmaps(array=Sarsa.q_table, name='sarsa', offset=Sarsa.offset)

In [None]:
# Q-value heatmaps of the trained Monte Carlo agent, with y ticks shifted back by the agent's offset.
plot_side_by_side_heatmaps(array=mcagent.q_table, name='mcagent', offset=mcagent.offset)

Now we conduct parameter sweeps.

In [None]:
# Recreate the training environment for the sweeps and show its spaces;
# the sizes below are hard-coded from this output.
env = gym.make('TextFlappyBird-v0', height=15, width=20, pipe_gap=4)
print(env.observation_space, env.action_space)

action_dim = 2        # size of the last Q-table axis
state_off = 11        # the agents add (state_off - 1) to the second observation component
state_dim = (14, 22)  # sizes of the first two Q-table axes

In [None]:
def plot_heatmap(results, xticks, yticks,xlabel,ylabel,title):
    """
    Draw an annotated heatmap of a 2D results grid, save it as '<title>.png' and show it.

    :param results: 2D array of performance values; rows are drawn along the
                    Y-axis and columns along the X-axis.
    :param xticks: Tick labels for the X-axis (one per column of results).
    :param yticks: Tick labels for the Y-axis (one per row of results).
    :param xlabel: X-axis label.
    :param ylabel: Y-axis label.
    :param title: Figure title, also used as the output file name.
    """
    fig, ax = plt.subplots(figsize=(8, 6))
    sns.heatmap(
        results,
        xticklabels=xticks,
        yticklabels=yticks,
        annot=True,
        cmap="coolwarm",
        fmt=".2f",
        ax=ax,
    )

    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    ax.set_title(title)
    fig.savefig(f'{title}.png',dpi=300)
    plt.show()



Parameters sweep for Sarsa agent

In [None]:

lambdas = [0.1, 0.3, 0.5]
# NOTE(review): the values are not in increasing order -- 0.5 may be a typo
# for 0.05; confirm before interpreting the heatmap.
alphas = [0.01, 0.5, 0.1]

# res[i, j] = average greedy-play reward for lambda = lambdas[i], alpha = alphas[j]
res = np.zeros((len(lambdas), len(alphas)))
for i, lbd in enumerate(lambdas):
    for j, alpha in enumerate(alphas):
        print(i, j)
        sarsaagent = LambdaSarsaAgent(state_dim, state_off, action_dim, alpha=alpha, lambda_val=lbd)
        sarsaagent.train(env)
        playrewards = play(sarsaagent)
        res[i, j] = playrewards

# Rows of `res` (lambdas) are drawn on the y axis and columns (alphas) on the
# x axis, so the tick lists and labels must be passed in that order.
plot_heatmap(res, alphas, lambdas, 'alphas', 'lambdas', ' Sarsa Average greedy play time alpha')

In [None]:

lambdas = [0.1, 0.3, 0.5]
epsilons = [0.05, 0.1, 0.3]

# res[i, j] = average greedy-play reward for lambda = lambdas[i], epsilon = epsilons[j]
res = np.zeros((len(lambdas), len(epsilons)))
for i, lbd in enumerate(lambdas):
    for j, epsilon in enumerate(epsilons):
        print(i, j)
        # Vary the exploration rate; passing these values as `alpha` would
        # sweep the learning rate instead and leave epsilon at its default.
        sarsaagent = LambdaSarsaAgent(state_dim, state_off, action_dim, epsilon=epsilon, lambda_val=lbd)
        sarsaagent.train(env)
        playrewards = play(sarsaagent)
        res[i, j] = playrewards

# Rows of `res` (lambdas) are drawn on the y axis and columns (epsilons) on
# the x axis; the ticks must come from `epsilons`, not the earlier `alphas`.
plot_heatmap(res, epsilons, lambdas, 'epsilons', 'lambdas', ' Sarsa Average greedy play time epsilon')

In [None]:

alphas = [0.01, 0.1, 0.3, 0.5]
epsilons = [0.05, 0.1, 0.2, 0.3]

# res[i, j] = average greedy-play reward for alpha = alphas[i], epsilon = epsilons[j]
res = np.zeros((len(alphas), len(epsilons)))
for i, alpha in enumerate(alphas):
    for j, epsilon in enumerate(epsilons):
        print(i, j)
        # NOTE(review): this rebinds the global `mcagent` trained earlier, so
        # re-running earlier cells afterwards will use the last sweep agent.
        mcagent = MonteCarloAgent(state_space=state_dim, state_offset=state_off, action_dim=action_dim, alpha=alpha, epsilon=epsilon)
        mcagent.train(env)
        playrewards = play(mcagent)
        res[i, j] = playrewards

# Rows of `res` (alphas) are drawn on the y axis and columns (epsilons) on the
# x axis, so the tick lists and labels must be passed in that order.
plot_heatmap(res, epsilons, alphas, 'epsilons', 'alphas', ' MC Average greedy play time')