In [38]:
# This script implements a Text Flappy Bird Reinforcement Learning setup
# It merges improved training methods with a cleaner visualization approach

# Install necessary libraries and the custom environment
!pip install gymnasium numpy matplotlib
!pip install git+https://gitlab-research.centralesupelec.fr/stergios.christodoulidis/text-flappy-bird-gym.git

import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import random
import pickle
from collections import defaultdict
import time
from IPython.display import clear_output
import warnings
warnings.filterwarnings('ignore')

# Import the custom Flappy Bird text environment
import text_flappy_bird_gym

# Seed the global generators so that runs are repeatable as far as this
# notebook controls them. The `random` module drives the agents'
# epsilon-greedy choices (`random.random()` / `random.randint()`); NumPy's
# global generator is seeded as well, although no code in this notebook draws
# from it directly.
# NOTE(review): the environment is always reset with a bare `env.reset()`, so
# it is never given a seed here. Whether pipe placement is reproducible
# depends on which generator the environment uses internally - confirm.
np.random.seed(42)
random.seed(42)

Collecting git+https://gitlab-research.centralesupelec.fr/stergios.christodoulidis/text-flappy-bird-gym.git
  Cloning https://gitlab-research.centralesupelec.fr/stergios.christodoulidis/text-flappy-bird-gym.git to /private/var/folders/jj/5zz72tmj3wd9r01_nzqg4mv40000gn/T/pip-req-build-5s7iepko
  Running command git clone --filter=blob:none --quiet https://gitlab-research.centralesupelec.fr/stergios.christodoulidis/text-flappy-bird-gym.git /private/var/folders/jj/5zz72tmj3wd9r01_nzqg4mv40000gn/T/pip-req-build-5s7iepko
  Resolved https://gitlab-research.centralesupelec.fr/stergios.christodoulidis/text-flappy-bird-gym.git to commit ca2797e9270195313423324c9d0f205f6cbb3d28
  Preparing metadata (setup.py) ... [?25ldone


In [39]:
def refine_state_view(screen):
    """
    Compress the raw screen array into a small, hashable state tuple.

    The screen is indexed as ``screen[x, y]``; cells equal to 1 are treated as
    the bird and cells equal to 2 as pipe segments.

    Args:
        screen: 2-D integer NumPy array holding the observation.

    Returns:
        A tuple of three plain Python ints
        ``(horizontal_bin, vertical_bin, bird_centered_position)``:

        * ``horizontal_bin`` - distance to the nearest pipe column strictly to
          the right of the bird. Distances 1..3 are kept as is, larger ones
          are halved (``3 + (d - 3) // 2``), and the result is capped at 8.
          It is 8 when no pipe lies ahead.
        * ``vertical_bin`` - centre of the widest vertical opening in that
          column minus the bird's y, clamped to [-4, 4]. It is 0 when no
          opening can be measured.
        * ``bird_centered_position`` - bird y relative to mid-screen, clamped
          to [-2, 2].

    If no bird cell is found, the bird is assumed to be at (0, 0).
    """
    # Locate the bird (first matching cell in row-major order).
    bird_cells = np.where(screen == 1)
    if len(bird_cells[0]) > 0:
        bird_x, bird_y = int(bird_cells[0][0]), int(bird_cells[1][0])
    else:
        bird_x, bird_y = 0, 0

    # Every return path needs this value, so compute it once.
    bird_centered_position = min(2, max(-2, bird_y - screen.shape[1] // 2))

    # Locate pipe cells and keep only those strictly ahead of the bird.
    pipe_cells = np.where(screen == 2)
    pipe_x_coords, pipe_y_coords = pipe_cells[0], pipe_cells[1]
    ahead = pipe_x_coords > bird_x
    if not ahead.any():
        # Either no pipe is visible or every pipe is already behind the bird.
        return (8, 0, bird_centered_position)

    closest_pipe_x = int(pipe_x_coords[ahead].min())

    # Bin the horizontal distance: fine-grained up close, coarse further away.
    horizontal_gap = closest_pipe_x - bird_x
    if horizontal_gap <= 3:
        horizontal_bin = horizontal_gap
    else:
        horizontal_bin = 3 + (horizontal_gap - 3) // 2
    horizontal_bin = min(8, horizontal_bin)

    # closest_pipe_x comes from pipe_x_coords, so this column has >= 1 cell.
    column_ys = np.sort(pipe_y_coords[pipe_x_coords == closest_pipe_x])
    if len(column_ys) < 2:
        # A single pipe cell gives no opening to measure. The distance is
        # still binned exactly as in the main path, so one physical distance
        # always maps to one state index.
        return (horizontal_bin, 0, bird_centered_position)

    # The opening is the widest jump between consecutive pipe cells.
    widest = int(np.argmax(np.diff(column_ys)))
    gap_bottom = int(column_ys[widest])
    gap_top = int(column_ys[widest + 1])
    gap_center = (gap_bottom + gap_top) // 2

    vertical_bin = max(-4, min(4, gap_center - bird_y))

    return (horizontal_bin, vertical_bin, bird_centered_position)


def show_game_progress(env, reward=None, episode=None, step=None,
                       total_reward=None, agent_type=None, action=None):
    """
    Clear the cell output and print the environment's current text frame.

    The frame is whatever ``env.render()`` returns at the moment of the call,
    i.e. the environment's present state. Any of the optional keyword values
    that is not None is appended below the frame on a single status line,
    separated by " | ". The call then pauses for 0.2 seconds so consecutive
    frames stay readable.

    Args:
        env: environment whose ``render()`` returns the frame as a string.
        reward: reward of the step being shown.
        episode: episode number.
        step: step number within the episode.
        total_reward: reward accumulated so far.
        agent_type: label of the agent being shown.
        action: action taken; 1 is reported as "Flap", anything else as "Idle".
    """
    from IPython.display import clear_output

    clear_output(wait=True)
    frame_text = env.render()

    # Status fields in display order; None means "not supplied".
    labelled_values = (
        ("Episode", episode),
        ("Step", step),
        ("Reward", reward),
        ("Accumulated Score", total_reward),
        ("Agent", agent_type),
    )
    status_parts = [
        f"{label}: {value}"
        for label, value in labelled_values
        if value is not None
    ]
    if action is not None:
        action_name = 'Flap' if action == 1 else 'Idle'
        status_parts.append(f"Action Taken: {action_name}")

    if status_parts:
        frame_text = frame_text + "\n" + " | ".join(status_parts)

    print(frame_text)
    print("-" * 50)
    time.sleep(0.2)  # Short pause so each frame can be read


def plot_training_progress(scores, current_episode, label=""):
    """
    Redraw the per-episode score curve, replacing the previous cell output.

    Args:
        scores: sequence of episode scores collected so far.
        current_episode: episode number shown in the plot title.
        label: series name; a legend is drawn only when it is non-empty.
    """
    clear_output(wait=True)  # Drop the previous figure before drawing a new one

    fig, ax = plt.subplots(figsize=(8, 4))
    ax.plot(scores, label=f'{label} Rewards')
    ax.set_title(f"Training Progress - Episode {current_episode}")
    ax.set_xlabel("Episode")
    ax.set_ylabel("Score")
    ax.grid(True)
    if label:
        ax.legend()
    plt.show()


In [40]:
class MonteCarloLearner:
    """
    A Monte Carlo approach to learn a policy for the Text Flappy Bird game.
    Uses first-visit MC with optimistic initialization.

    Q-values live in ``self.Q`` (state tuple -> array with one entry per
    action). States that have never been seen start at ``initial_q_val`` for
    every action, both for a fresh agent and after ``load_model``.
    """

    def __init__(self, action_space_size, discount_factor=0.99, exploration_prob=0.2, initial_q_val=15.0):
        """
        Args:
            action_space_size: number of discrete actions.
            discount_factor: gamma used when accumulating returns.
            exploration_prob: epsilon used by ``choose_action`` until
                ``adapt_exploration`` overwrites it (``train_agent`` does so
                at the start of every episode).
            initial_q_val: optimistic starting value for unseen Q entries.
        """
        self.action_space_size = action_space_size
        self.discount_factor = discount_factor
        self.exploration_prob = exploration_prob
        self.initial_q_val = initial_q_val
        self.min_epsilon = 0.001
        # Not read anywhere in this class; kept so existing attribute access works.
        self.epsilon_decay = 0.995

        # Optimistic Q-array
        self.Q = defaultdict(self._fresh_q_row)
        # return_map[state][action] -> list of first-visit returns seen so far
        self.return_map = defaultdict(lambda: defaultdict(list))

    def _fresh_q_row(self):
        """Q-values assigned to a state the first time it is looked up."""
        return np.ones(self.action_space_size) * self.initial_q_val

    def choose_action(self, state, allow_explore=True):
        """
        Select an action by epsilon-greedy strategy.

        Returns a plain Python int on both the exploring and the greedy path.
        """
        if allow_explore and random.random() < self.exploration_prob:
            return random.randint(0, self.action_space_size - 1)
        return int(np.argmax(self.Q[state]))

    def adapt_exploration(self, current_episode, total_episodes):
        """
        Adjust epsilon over time depending on training progress.

        Epsilon is 0.5 for the first 10% of episodes, 0.3 until 30%, and
        afterwards decays as ``0.2 * exp(-3 * progress)``, never dropping
        below ``self.min_epsilon``.
        """
        if current_episode < total_episodes * 0.1:
            self.exploration_prob = 0.5
        elif current_episode < total_episodes * 0.3:
            self.exploration_prob = 0.3
        else:
            self.exploration_prob = max(
                self.min_epsilon,
                0.2 * np.exp(-3.0 * current_episode / total_episodes)
            )

    def _discounted_returns(self, reward_log):
        """Return G_t for every step t of an episode, in time order."""
        running_return = 0
        backwards = []
        for r in reversed(reward_log):
            running_return = self.discount_factor * running_return + r
            backwards.append(running_return)
        # Built back-to-front; one reverse is linear, unlike repeated insert(0, ...).
        backwards.reverse()
        return backwards

    def _update_from_episode(self, state_log, action_log, reward_log):
        """Apply the first-visit MC update for one finished episode."""
        returns_list = self._discounted_returns(reward_log)

        visited_pairs = set()
        for obs, act, episode_return in zip(state_log, action_log, returns_list):
            if (obs, act) in visited_pairs:
                continue
            visited_pairs.add((obs, act))
            history = self.return_map[obs][act]
            history.append(episode_return)
            self.Q[obs][act] = np.mean(history)

    def train_agent(self, env, num_episodes=1500, max_steps=1000, plot_every=1):
        """
        Train using Monte Carlo (first-visit). Episodes are generated and
        Q-values are updated after each complete trajectory.

        Args:
            env: environment to interact with.
            num_episodes: number of training episodes.
            max_steps: step cap per episode.
            plot_every: redraw the live score plot every this many episodes
                (and always on the last one). The default of 1 redraws after
                every episode; 0 or None disables plotting.

        Returns:
            List with the total reward of every training episode.

        Side effects:
            Writes 'monte_carlo_agent_best.pkl' whenever a training episode
            beats the best total reward so far.
        """
        episode_score_log = []
        top_score = -float('inf')

        for ep in range(1, num_episodes + 1):
            # Update epsilon
            self.adapt_exploration(ep, num_episodes)

            # Generate a single trajectory (no console game printing)
            state_log, action_log, reward_log = self.create_episode_trajectory(
                env,
                max_steps=max_steps
            )

            self._update_from_episode(state_log, action_log, reward_log)

            total_reward = sum(reward_log)
            episode_score_log.append(total_reward)

            # Live plot update
            if plot_every and plot_every > 0 and (ep % plot_every == 0 or ep == num_episodes):
                plot_training_progress(episode_score_log, ep, label="MC")

            # Save model if this episode is the best so far
            if total_reward > top_score:
                top_score = total_reward
                self.save_model('monte_carlo_agent_best.pkl')
                print(f"[MC] New best score: {top_score} (Episode {ep})")

            # Print progress at some intervals or final
            if ep % 100 == 0 or ep == num_episodes:
                avg_result = self.evaluate_performance(env, n_episodes=5)
                print(f"[MC] Episode {ep}/{num_episodes}, Epsilon: {self.exploration_prob:.4f}, "
                      f"Avg Score: {avg_result:.2f}, Best: {top_score}")

        print(f"[MC] Training Complete. Highest Score: {top_score}")
        return episode_score_log

    def create_episode_trajectory(self, env, max_steps=1000):
        """
        Generate one episode with the current policy. Logs states, actions, and rewards.

        Returns:
            ``(state_log, action_log, reward_log)`` - three equally long lists
            where entry t holds the refined state the action was chosen in,
            the action, and the reward received for it.
        """
        state, _ = env.reset()
        refined_state = refine_state_view(state)

        state_log = []
        action_log = []
        reward_log = []

        done = False
        steps = 0

        while not done and steps < max_steps:
            chosen_action = self.choose_action(refined_state)
            next_state, reward, terminated, truncated, _ = env.step(chosen_action)
            done = terminated or truncated

            state_log.append(refined_state)
            action_log.append(chosen_action)
            reward_log.append(reward)

            refined_state = refine_state_view(next_state)
            steps += 1

        return state_log, action_log, reward_log

    def evaluate_performance(self, env, n_episodes=5, max_steps=1000):
        """
        Evaluate the trained policy without exploration for a few episodes.
        Returns the average score.

        Args:
            env: environment to run in.
            n_episodes: number of greedy episodes to average over.
            max_steps: step cap per evaluation episode.
        """
        scores = []
        for _ in range(n_episodes):
            state, _ = env.reset()
            refined_state = refine_state_view(state)
            done = False
            episode_score = 0
            steps = 0

            while not done and steps < max_steps:
                chosen_action = self.choose_action(refined_state, allow_explore=False)
                next_state, reward, terminated, truncated, _ = env.step(chosen_action)
                done = terminated or truncated
                refined_state = refine_state_view(next_state)
                episode_score += reward
                steps += 1

            scores.append(episode_score)

        return np.mean(scores)

    def save_model(self, filename):
        """Store the current Q-value dictionary in a pickle file."""
        with open(filename, 'wb') as f:
            pickle.dump(dict(self.Q), f)

    def load_model(self, filename):
        """
        Load Q-values from a file and overwrite the current policy.

        States missing from the file get the same optimistic default as in a
        freshly constructed agent.

        Returns:
            True once the file has been loaded. A missing or unreadable file
            raises (e.g. OSError, pickle.UnpicklingError) instead of
            returning False.
        """
        # SECURITY: pickle.load can execute arbitrary code. Only load files
        # written by save_model() from a trusted location.
        with open(filename, 'rb') as f:
            stored_q = pickle.load(f)
        self.Q = defaultdict(self._fresh_q_row, stored_q)
        return True

In [41]:
class SarsaEligibilityLearner:
    """
    A Sarsa(λ) based agent that uses eligibility traces. Offers a more
    incremental approach to training than Monte Carlo.

    Q-values live in ``self.Q`` (state tuple -> array with one entry per
    action). States that have never been seen start at ``initial_q_val`` for
    every action, both for a fresh agent and after ``load_model``.
    """

    def __init__(self, action_space_size, discount_factor=0.99, learning_rate=0.1,
                 trace_decay=0.9, exploration_prob=0.2, initial_q_val=15.0):
        """
        Args:
            action_space_size: number of discrete actions.
            discount_factor: gamma used in the TD target and trace decay.
            learning_rate: initial alpha; annealed by ``update_parameters``.
            trace_decay: initial lambda; annealed by ``update_parameters``.
            exploration_prob: epsilon used by ``choose_action`` until
                ``update_parameters`` overwrites it (``train_agent`` does so
                at the start of every episode).
            initial_q_val: optimistic starting value for unseen Q entries.
        """
        self.action_space_size = action_space_size
        self.discount_factor = discount_factor
        self.learning_rate = learning_rate
        self.initial_learning_rate = learning_rate
        self.trace_decay = trace_decay
        self.initial_trace_decay = trace_decay
        self.exploration_prob = exploration_prob
        self.initial_q_val = initial_q_val
        self.min_epsilon = 0.001
        # Not read anywhere in this class; kept so existing attribute access works.
        self.epsilon_decay = 0.997

        # Initialize Q with optimistic estimates
        self.Q = defaultdict(self._fresh_q_row)
        self.eligibility_traces = defaultdict(self._fresh_trace_row)

    def _fresh_q_row(self):
        """Q-values assigned to a state the first time it is looked up."""
        return np.ones(self.action_space_size) * self.initial_q_val

    def _fresh_trace_row(self):
        """All-zero trace row for a state without an active trace."""
        return np.zeros(self.action_space_size)

    def choose_action(self, state, allow_explore=True):
        """
        Choose an action via epsilon-greedy selection.

        Returns a plain Python int on both the exploring and the greedy path.
        """
        if allow_explore and random.random() < self.exploration_prob:
            return random.randint(0, self.action_space_size - 1)
        return int(np.argmax(self.Q[state]))

    def update_parameters(self, current_episode, total_episodes):
        """
        Dynamically modify epsilon, alpha, and lambda as training advances.

        * epsilon: 0.5 for the first 10% of episodes, 0.3 until 30%, then
          ``0.2 * exp(-3 * progress)``, floored at ``self.min_epsilon``.
        * alpha: decays linearly from the constructor's learning rate, floored
          at 0.01.
        * lambda: decays linearly from the constructor's ``trace_decay`` by
          0.4 over the whole run, floored at 0.
        """
        progress = current_episode / total_episodes

        # Adjust epsilon
        if current_episode < total_episodes * 0.1:
            self.exploration_prob = 0.5
        elif current_episode < total_episodes * 0.3:
            self.exploration_prob = 0.3
        else:
            self.exploration_prob = max(
                self.min_epsilon,
                0.2 * np.exp(-3.0 * current_episode / total_episodes)
            )

        # Adjust alpha
        self.learning_rate = max(
            0.01,
            self.initial_learning_rate * (1 - progress)
        )

        # Adjust lambda (trace decay). Anchored to the constructor value so a
        # caller-supplied trace_decay is honoured rather than replaced by 0.9.
        self.trace_decay = max(0.0, self.initial_trace_decay - 0.4 * progress)

    def reset_eligibility(self):
        """Reinitialize the eligibility traces for a fresh episode."""
        self.eligibility_traces = defaultdict(self._fresh_trace_row)

    def _apply_td_update(self, td_error):
        """
        Push one TD error through every active trace, then decay the traces.

        For each (state, action) with a positive trace:
        ``Q += alpha * td_error * trace`` followed by ``trace *= gamma * lambda``.
        Traces that fall below 0.01 are zeroed, and states whose traces are
        all zero are dropped so later sweeps do not revisit them.
        """
        step_size = self.learning_rate * td_error
        decay = self.discount_factor * self.trace_decay

        # Snapshot the keys: exhausted entries are deleted inside the loop.
        for s in list(self.eligibility_traces.keys()):
            trace = self.eligibility_traces[s]
            active = trace > 0
            self.Q[s][active] += step_size * trace[active]
            trace[active] *= decay

            # Prune very small traces
            trace[trace < 0.01] = 0
            if not trace.any():
                del self.eligibility_traces[s]

    def train_agent(self, env, num_episodes=1500, max_steps=1000, plot_every=1):
        """
        Train the Sarsa(λ) agent by interacting with the environment,
        updating Q-values and traces at each step.

        Args:
            env: environment to interact with.
            num_episodes: number of training episodes.
            max_steps: step cap per episode.
            plot_every: redraw the live score plot every this many episodes
                (and always on the last one). The default of 1 redraws after
                every episode; 0 or None disables plotting.

        Returns:
            List with the total reward of every training episode.

        Side effects:
            Writes 'sarsa_lambda_agent_best.pkl' whenever a training episode
            beats the best total reward so far.
        """
        episode_score_log = []
        top_score = -float('inf')

        for ep in range(1, num_episodes + 1):
            self.update_parameters(ep, num_episodes)
            self.reset_eligibility()

            # Initial state-action
            state, _ = env.reset()
            refined_state = refine_state_view(state)
            action = self.choose_action(refined_state)

            done = False
            total_reward = 0
            steps = 0

            while not done and steps < max_steps:
                next_state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated
                next_refined_state = refine_state_view(next_state)
                next_action = self.choose_action(next_refined_state)

                # TD error; no bootstrapping once the episode has ended
                bootstrap = 0 if done else \
                    self.discount_factor * self.Q[next_refined_state][next_action]
                td_error = reward + bootstrap - self.Q[refined_state][action]

                # Increase the trace for the current (state, action)
                self.eligibility_traces[refined_state][action] += 1

                # Update all Q values with non-zero trace
                self._apply_td_update(td_error)

                refined_state = next_refined_state
                action = next_action
                total_reward += reward
                steps += 1

            episode_score_log.append(total_reward)

            # Live update of the training plot
            if plot_every and plot_every > 0 and (ep % plot_every == 0 or ep == num_episodes):
                plot_training_progress(episode_score_log, ep, label="Sarsa(λ)")

            # Check for a best episode
            if total_reward > top_score:
                top_score = total_reward
                self.save_model('sarsa_lambda_agent_best.pkl')
                print(f"[Sarsa(λ)] New best score: {top_score} (Episode {ep})")

            # Print summary every so often
            if ep % 100 == 0 or ep == num_episodes:
                avg_result = self.evaluate_performance(env, n_episodes=5)
                print(f"[Sarsa(λ)] Episode {ep}/{num_episodes}, "
                      f"Epsilon: {self.exploration_prob:.4f}, "
                      f"Alpha: {self.learning_rate:.4f}, "
                      f"Lambda: {self.trace_decay:.2f}, "
                      f"Avg Score: {avg_result:.2f}, Best: {top_score}")

        print(f"[Sarsa(λ)] Training Complete. Highest Score: {top_score}")
        return episode_score_log

    def evaluate_performance(self, env, n_episodes=10, render=False, max_steps=1000):
        """
        Run the current policy (greedy) for a specified number of episodes,
        returning the average total reward.

        Args:
            env: environment to run in.
            n_episodes: number of greedy episodes to average over.
            render: accepted for compatibility; this method does not read it.
            max_steps: step cap per evaluation episode.
        """
        scores = []

        for _ in range(n_episodes):
            state, _ = env.reset()
            refined_state = refine_state_view(state)
            done = False
            episode_score = 0
            steps = 0

            while not done and steps < max_steps:
                action = self.choose_action(refined_state, allow_explore=False)
                next_state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated
                refined_state = refine_state_view(next_state)
                episode_score += reward
                steps += 1

            scores.append(episode_score)

        return np.mean(scores)

    def save_model(self, filename):
        """Store learned Q-values for future use."""
        with open(filename, 'wb') as f:
            pickle.dump(dict(self.Q), f)

    def load_model(self, filename):
        """
        Load previously stored Q-values and replace the current policy.

        States missing from the file get the same optimistic default as in a
        freshly constructed agent.

        Returns:
            True once the file has been loaded. A missing or unreadable file
            raises (e.g. OSError, pickle.UnpicklingError) instead of
            returning False.
        """
        # SECURITY: pickle.load can execute arbitrary code. Only load files
        # written by save_model() from a trusted location.
        with open(filename, 'rb') as f:
            stored_q = pickle.load(f)
        self.Q = defaultdict(self._fresh_q_row, stored_q)
        return True

In [42]:
def display_optimal_run(agent, env, agent_label, max_episodes=10, max_steps=1000):
    """
    Execute several greedy episodes for a given agent,
    track their cumulative rewards, and present the best-performing run.

    The text frame returned by ``env.render()`` is captured while each
    episode is being played, so the replay shows what actually happened in
    the best run. Rendering the environment after the fact would only show
    the final state of whichever episode ran last.

    Args:
        agent: object exposing ``choose_action(state, allow_explore=False)``.
        env: environment to play in; ``render()`` must return the frame text.
        agent_label: name used in the printed messages.
        max_episodes: number of greedy episodes to try.
        max_steps: step cap per episode.

    Returns:
        Total reward of the best episode (``-inf`` if no episode was run).
    """
    def replay_frame(frame_text, step, reward, total_reward, action):
        # Same layout as show_game_progress, but fed a recorded frame.
        status_line = " | ".join([
            f"Step: {step}",
            f"Reward: {reward}",
            f"Accumulated Score: {total_reward}",
            f"Agent: {agent_label}",
            f"Action Taken: {'Flap' if action == 1 else 'Idle'}",
        ])
        clear_output(wait=True)
        print(frame_text + "\n" + status_line)
        print("-" * 50)
        time.sleep(0.2)  # Slight delay for readability

    best_total = -float('inf')
    best_frames = []

    print(f"Searching for the most successful run by {agent_label} across {max_episodes} trials...")

    for episode_index in range(max_episodes):
        current_state, _ = env.reset()
        episode_frames = []
        done = False
        steps_taken = 0
        total_reward = 0

        while not done and steps_taken < max_steps:
            processed_state = refine_state_view(current_state)
            chosen_action = agent.choose_action(processed_state, allow_explore=False)
            current_state, reward, terminated, truncated, _ = env.step(chosen_action)
            done = terminated or truncated
            total_reward += reward

            # Only every other step is shown in the replay, so only those
            # frames need to be captured.
            if steps_taken % 2 == 0:
                episode_frames.append(
                    (env.render(), steps_taken + 1, reward, total_reward, chosen_action)
                )

            steps_taken += 1

        print(f"Episode {episode_index + 1} finished with reward = {total_reward}, steps = {steps_taken}")

        # Update if this run is the current best
        if total_reward > best_total:
            best_total = total_reward
            best_frames = episode_frames
            print(f"New top performance recorded with total reward {total_reward}!")

    # Replay the top-performing episode from its recorded frames
    print(f"\nReplaying the best {agent_label} episode (final reward = {best_total})...")

    for frame_text, step, reward, running_total, action in best_frames:
        replay_frame(frame_text, step, reward, running_total, action)

    print(f"The best {agent_label} run achieved a total reward of: {best_total}")
    return best_total


def visualize_learning_progress(mc_scores, sarsa_scores, window_size=20):
    """
    Provide a side-by-side view of the Monte Carlo vs. Sarsa-lambda
    training progress, complete with rolling averages for clarity.

    The figure is shown and also written to 'learning_curves.png'; a short
    text summary (mean of the last 100 episodes and peak reward per method)
    is printed afterwards.

    Args:
        mc_scores: per-episode rewards of the Monte Carlo agent.
        sarsa_scores: per-episode rewards of the Sarsa-lambda agent.
        window_size: rolling-mean window. If it exceeds the length of a
            series, half that length is used for that series (at least 1).
    """
    def smooth_data(data, w_size):
        # Simple rolling mean to reduce variance in the reward plots
        if w_size > len(data):
            w_size = len(data) // 2
        if w_size < 1:
            w_size = 1
        cumulative_sum = np.cumsum(np.insert(data, 0, 0))
        return (cumulative_sum[w_size:] - cumulative_sum[:-w_size]) / float(w_size)

    def smoothed_x(data, smoothed):
        # Align each rolling mean with the last episode of its window. The
        # offset comes from the window actually used (which smooth_data may
        # have shrunk), so x and y always have the same length.
        return range(len(data) - len(smoothed), len(data))

    def tail_mean(scores):
        return np.mean(scores[-100:]) if len(scores) else float('nan')

    def peak(scores):
        return np.max(scores) if len(scores) else float('nan')

    mc_smoothed = smooth_data(mc_scores, window_size)
    sarsa_smoothed = smooth_data(sarsa_scores, window_size)

    plt.figure(figsize=(12, 6))
    # Plot original results
    plt.plot(mc_scores, label='MC (raw)', alpha=0.4)
    plt.plot(sarsa_scores, label='Sarsa-lambda (raw)', alpha=0.4)

    # Plot smoothed curves
    plt.plot(smoothed_x(mc_scores, mc_smoothed), mc_smoothed, linewidth=2,
             label='MC (smoothed)')
    plt.plot(smoothed_x(sarsa_scores, sarsa_smoothed), sarsa_smoothed, linewidth=2,
             label='Sarsa-lambda (smoothed)')

    plt.xlabel('Episode')
    plt.ylabel('Reward')
    plt.title('MC vs. Sarsa-lambda Learning Progress')
    plt.grid(True)
    plt.legend()
    plt.savefig('learning_curves.png')
    plt.show()

    # Print out a brief summary (nan is reported for an empty series)
    print("\n=== Summary of Training Results ===")
    print(f"Monte Carlo  - Average over last 100 episodes: {tail_mean(mc_scores):.2f}")
    print(f"Sarsa-lambda - Average over last 100 episodes: {tail_mean(sarsa_scores):.2f}")
    print(f"Monte Carlo  - Peak reward: {peak(mc_scores):.2f}")
    print(f"Sarsa-lambda - Peak reward: {peak(sarsa_scores):.2f}")


def execute_experiment(total_episodes=1500, load_existing=False):
    """
    Main routine that sets up the environment, initializes both agents,
    proceeds with training or model loading, and finally contrasts their performances.

    Args:
        total_episodes: number of training episodes per agent.
        load_existing: if True, try to load both agents from
            'monte_carlo_agent_best.pkl' and 'sarsa_lambda_agent_best.pkl'.
            If either file cannot be read, both agents are trained instead.

    The environment in use is closed even when training or the demonstration
    raises.
    """
    def make_env():
        # Single place for the environment configuration used throughout.
        return gym.make('TextFlappyBird-screen-v0', height=15, width=20, pipe_gap=4)

    # Failures that mean "no usable saved model": a missing or unreadable
    # file, or a truncated or corrupt pickle. Anything else (including
    # KeyboardInterrupt) propagates.
    load_failures = (OSError, pickle.UnpicklingError, EOFError)

    env = make_env()
    try:
        # Initialize two distinct approaches
        mc_agent = MonteCarloLearner(
            action_space_size=env.action_space.n,
            discount_factor=0.99,
            exploration_prob=0.2,
            initial_q_val=15.0
        )

        sarsa_agent = SarsaEligibilityLearner(
            action_space_size=env.action_space.n,
            discount_factor=0.99,
            learning_rate=0.1,
            trace_decay=0.9,
            exploration_prob=0.2,
            initial_q_val=15.0
        )

        # If indicated, try to load previously stored policies
        if load_existing:
            try:
                mc_agent.load_model('monte_carlo_agent_best.pkl')
                print("Loaded existing Monte Carlo agent successfully.")
            except load_failures as exc:
                print("Monte Carlo model not found; new training will be conducted.")
                print(f"  Reason: {exc!r}")
                load_existing = False

            try:
                sarsa_agent.load_model('sarsa_lambda_agent_best.pkl')
                print("Loaded existing Sarsa-lambda agent successfully.")
            except load_failures as exc:
                print("Sarsa-lambda model not found; new training will be conducted.")
                print(f"  Reason: {exc!r}")
                load_existing = False

        # If models weren't loaded, run the training process
        if not load_existing:
            print("\n===== TRAINING MONTE CARLO AGENT =====")
            mc_scores = mc_agent.train_agent(env, num_episodes=total_episodes)

            env.close()
            env = make_env()

            print("\n===== TRAINING SARSA-LAMBDA AGENT =====")
            sarsa_scores = sarsa_agent.train_agent(env, num_episodes=total_episodes)

            # Compare the two learning curves
            visualize_learning_progress(mc_scores, sarsa_scores)

        print("\n===== DEMONSTRATIONS OF BEST RUNS =====")

        print("\n--- Monte Carlo Method ---")
        mc_best_run = display_optimal_run(mc_agent, env, "Monte Carlo", max_episodes=10)

        env.close()
        env = make_env()

        print("\n--- Sarsa-lambda Method ---")
        sarsa_best_run = display_optimal_run(sarsa_agent, env, "Sarsa-lambda", max_episodes=10)

        print("\n===== FINAL OUTCOME =====")
        print(f"Monte Carlo Best Score: {mc_best_run}")
        print(f"Sarsa-lambda Best Score: {sarsa_best_run}")
        print(f"Absolute Difference: {abs(mc_best_run - sarsa_best_run)}")
        if mc_best_run > sarsa_best_run:
            print("Monte Carlo outperformed Sarsa-lambda in these trials.")
        elif sarsa_best_run > mc_best_run:
            print("Sarsa-lambda outperformed Monte Carlo in these trials.")
        else:
            print("Both methods performed equally well.")
    finally:
        # Closes whichever environment is current, on success and on failure.
        env.close()


# Entry point: runs the full experiment (train or load both agents, then
# replay and compare their best greedy runs) when this code is executed as
# the main module.
if __name__ == "__main__":
    # Adjust hyperparameters and settings below
    # Number of training episodes given to each agent.
    EPISODES_TO_TRAIN = 2000
    # True: try to load the saved '*_best.pkl' Q-tables and skip training;
    # execute_experiment falls back to training if loading fails.
    LOAD_MODELS = False

    execute_experiment(total_episodes=EPISODES_TO_TRAIN, load_existing=LOAD_MODELS)

Text Flappy Bird!
Score: 99
----------------------
[         [32m|[0m          ]
[         [32m|[0m          ]
[         [32m|[0m          ]
[         [32m|[0m          ]
[         [32m|[0m          ]
[                    ]
[                    ]
[                    ]
[                    ]
[      [33m@[0m  [32m|[0m          ]
[         [32m|[0m          ]
[         [32m|[0m          ]
[         [32m|[0m          ]
[         [32m|[0m          ]
[         [32m|[0m          ]
^^^^^^^^^^^^^^^^^^^^^^
(Flap)

Step: 999 | Reward: 1 | Accumulated Score: 999 | Agent: Sarsa-lambda | Action Taken: Idle
--------------------------------------------------
The best Sarsa-lambda run achieved a total reward of: 1000

===== FINAL OUTCOME =====
Monte Carlo Best Score: 1000
Sarsa-lambda Best Score: 1000
Absolute Difference: 0
Both methods performed equally well.
