In [1]:
import numpy as np
import gymnasium as gym
import pandas as pd
from collections import defaultdict
import random
import flappy_bird_gymnasium

# Gymnasium environment id; passed to gym.make() further down in this notebook.
env_name = "FlappyBird-v0"

In [2]:
import time

# Demo: play one episode with uniformly random actions and render every frame.
env = gym.make("FlappyBird-v0")

try:
    obs, info = env.reset()
    while True:
        # Next action:
        # (feed the observation to your agent here)
        action = env.action_space.sample()

        # Processing: Gymnasium's step() returns a 5-tuple; both `terminated`
        # and `truncated` mark the end of the episode.
        obs, reward, terminated, truncated, info = env.step(action)

        # Rendering the game:
        # (remove these two lines during training)
        # NOTE(review): Gymnasium selects the render mode when the environment is
        # created; no render_mode is passed to gym.make() above - confirm that
        # render() actually draws anything in this configuration.
        env.render()
        time.sleep(1 / 30)  # FPS

        # Stop as soon as the episode is over; stepping a finished episode
        # without calling reset() first is not supported by Gymnasium.
        if terminated or truncated:
            break
finally:
    # Release the environment's resources even if the loop raised.
    env.close()

## MC Control Implementation


In [4]:
class MCControl:
    """Implements tabular first-visit Monte Carlo control with an epsilon-greedy policy.

    States are used directly as indices into the policy and as keys of the
    action-value table, so they must be integers in ``range(num_states)``;
    actions must be integers in ``range(num_actions)``.

    The environment is driven through the Gymnasium API: ``reset()`` returns
    ``(observation, info)`` and ``step(action)`` returns
    ``(observation, reward, terminated, truncated, info)``.
    """

    def __init__(self, env, num_states, num_actions, epsilon, gamma):
        """Parameters
        ----------
        env:         environment object exposing Gymnasium-style reset() and step()
        num_states:  integer, number of states in the environment
        num_actions: integer, number of possible actions
        epsilon:     float, the epsilon parameter used for exploration
        gamma:       float, discount factor
        """
        self.env = env
        self.num_states = num_states
        self.num_actions = num_actions
        self.epsilon = epsilon
        self.gamma = gamma

    def run_mc_control(self, num_episodes, verbose=True):
        """Performs Monte Carlo control task.

        Parameters
        ----------
        num_episodes: integer, number of episodes to run to train RL agent
        verbose:      bool, print a message once training has finished

        Returns
        ----------
        self.Q:              nested dictionary {state: {action: q value}}, final action value function
        final_policy:        greedy policy (one action per state) derived from self.Q
        rewards_per_episode: numpy array with the undiscounted sum of rewards of each episode
        episode_len:         numpy array with the number of recorded steps of each episode
        """
        self.init_agent()

        rewards_per_episode = np.zeros(num_episodes)
        episode_len = np.zeros(num_episodes, dtype=int)

        for episode in range(num_episodes):
            state_action_reward = self.generate_episode(self.policy)
            G = self.calculate_returns(state_action_reward)
            self.evaluate_policy(G)
            self.improve_policy()

            # Logging rewards and episode length (one slot per episode)
            rewards_per_episode[episode] = sum(
                reward for _, _, reward in state_action_reward
            )
            episode_len[episode] = len(state_action_reward)

        # Once training is finished, calculate final policy using argmax approach
        final_policy = self.argmax(self.Q, self.policy)

        if verbose:
            print(f"Finished training RL agent for {num_episodes} episodes!")

        return self.Q, final_policy, rewards_per_episode, episode_len

    def init_agent(self):
        """Initializes RL agent components:
        self.policy:      numpy array of length self.num_states, the action to take at a given state
        self.Q:           nested dictionary {state: {action: q value}}, action value function
        self.visit_count: nested dictionary {state: {action: count}}, number of episodes in which
                          the state and action pair was visited
        """
        # Random initial policy, sized from the instance (not from notebook globals).
        self.policy = np.random.choice(self.num_actions, self.num_states)

        self.Q = {}
        self.visit_count = {}

        for state in range(self.num_states):
            self.Q[state] = {}
            self.visit_count[state] = {}
            for action in range(self.num_actions):
                self.Q[state][action] = 0
                self.visit_count[state][action] = 0

    def generate_episode(self, policy):
        """Generates episode given current policy.

        Parameters
        ----------
        policy: sequence of integers of length self.num_states, the action to take at a given state

        Returns
        ----------
        state_action_reward: list of tuple (state, action, reward); the reward of an entry is the
                             one received on arriving in its state, and the final entry has
                             action None
        """
        state, _info = self.env.reset()
        action = policy[state]

        # Nothing has been earned before the first action, hence the placeholder reward 0.
        state_action_reward = [(state, action, 0)]
        while True:
            state, reward, terminated, truncated, _info = self.env.step(action)
            # Both flags end the episode; no action is chosen in the last state.
            if terminated or truncated:
                state_action_reward.append((state, None, reward))
                break
            action = policy[state]
            state_action_reward.append((state, action, reward))

        return state_action_reward

    def calculate_returns(self, state_action_reward):
        """Calculates the first-visit discounted return of every (s, a) pair in the episode.

        The return of a pair is the discounted sum of the rewards received *after* it was
        visited, with the discount counted from that visit: r_{t+1} + gamma * r_{t+2} + ...

        Parameters
        ----------
        state_action_reward: list of tuple (state, action, reward)

        Returns
        ----------
        G: nested dictionary {state: {action: return}}, contains the return for every pair of
           (s, a) appearing in the episode (the final state appears with action None and return 0)
        """
        G = {}
        future_return = 0.0
        last_index = len(state_action_reward) - 1

        # Walk the episode backwards so each return is built from the one after it.
        # Earlier visits are processed later and overwrite later visits, which
        # leaves the first-visit return in the table.
        for index in range(last_index, -1, -1):
            state, action, _ = state_action_reward[index]
            if index < last_index:
                # The reward for acting at `index` is stored on the following entry.
                next_reward = state_action_reward[index + 1][2]
                future_return = next_reward + self.gamma * future_return
            G.setdefault(state, {})[action] = future_return

        return G

    def evaluate_policy(self, G):
        """Evaluates current policy using incremental mean and updates action value function self.Q.

        Parameters
        ----------
        G: nested dictionary {state: {action: return}}, as produced by calculate_returns
        """
        for state, action_returns in G.items():
            for action, episode_return in action_returns.items():
                # Only the final entry (action None) is skipped; action 0 is a
                # valid action and must be updated too.
                if action is None:
                    continue
                self.visit_count[state][action] += 1
                self.Q[state][action] += (
                    episode_return - self.Q[state][action]
                ) / self.visit_count[state][action]

    def improve_policy(self):
        """Improves and updates current policy self.policy using epsilon greedy approach."""
        self.policy = self.argmax(self.Q, self.policy)
        for state in range(self.num_states):
            self.policy[state] = self.get_epsilon_greedy_action(self.policy[state])

    def argmax(self, Q, policy):
        """
        Finds and returns greedy policy.

        Parameters
        ----------
        Q: nested dictionary {state: {action: q value}}, action value function
        policy: sequence of integers of length self.num_states containing last actions per state
                (left unmodified)

        Returns
        ----------
        next_policy: copy of `policy` in which every state holds the action with the highest
                     value; ties go to the action listed first in Q[state]
        """
        # Work on a copy so the caller's policy is not silently overwritten.
        next_policy = policy.copy()

        for state in range(self.num_states):
            best_action = None
            best_value = float("-inf")
            for action, value in Q[state].items():
                if value > best_value:
                    best_value = value
                    best_action = action
            next_policy[state] = best_action

        return next_policy

    def get_epsilon_greedy_action(self, greedy_action):
        """Returns next action using epsilon greedy approach.

        Parameters
        ----------
        greedy_action: integer, greedy action (action with a maximum Q value)

        Returns
        ----------
        next_action: integer, the greedy action with probability 1 - epsilon, otherwise an
                     action drawn uniformly from all actions
        """
        prob = np.random.random()

        if prob < 1 - self.epsilon:
            return greedy_action

        return np.random.randint(0, self.num_actions)

## Test


In [5]:
# Hyper-parameters for the Monte Carlo agent: exploration rate, discount
# factor and number of training episodes.
n_episodes = 10000
gamma = 0.9
epsilon = 0.4

# Seed NumPy's global random generator so repeated runs draw the same numbers.
np.random.seed(1)

In [7]:
# Build the environment named by `env_name` and size the tabular agent from its spaces.
env = gym.make(env_name)

# NOTE(review): `.n` is read from both spaces below. The traceback recorded under this
# cell reports that the observation space is a `Box` without that attribute, so
# execution stops here - presumably this cell needs a discrete-state environment; confirm.
num_states = env.observation_space.n
num_actions = env.action_space.n


mc_model = MCControl(env, num_states, num_actions, epsilon, gamma)

# Fixed policy with one action per state (16 entries), used to roll out a single episode.
policy = np.array([1, 1, 1, 1, 0, 0, 2, 2, 3, 3, 1, 1, 2, 2, 3, 3])
res = mc_model.generate_episode(policy)

# Expected (state, action, reward) trajectory; the last entry carries action None.
# NOTE(review): an exact match requires reproducible environment transitions -
# TODO confirm how the environment is seeded.
assert res == [
    (0, 1, 0),
    (4, 0, 0.0),
    (4, 0, 0.0),
    (8, 3, 0.0),
    (8, 3, 0.0),
    (4, 0, 0.0),
    (0, 1, 0.0),
    (4, 0, 0.0),
    (4, 0, 0.0),
    (4, 0, 0.0),
    (8, 3, 0.0),
    (4, 0, 0.0),
    (0, 1, 0.0),
    (4, 0, 0.0),
    (8, 3, 0.0),
    (8, 3, 0.0),
    (8, 3, 0.0),
    (8, 3, 0.0),
    (4, 0, 0.0),
    (8, 3, 0.0),
    (8, 3, 0.0),
    (8, 3, 0.0),
    (9, 3, 0.0),
    (5, None, 0.0),
]

AttributeError: 'Box' object has no attribute 'n'