In [1]:
!pip install gym
!pip install tensorflow
!pip install utils
!pip install pyvirtualdisplay
!pip install flappy-bird-gymnasium



### Importing Packages 

In [2]:
import time
from collections import deque, namedtuple
from matplotlib import pyplot as plt

import gym
import numpy as np
import tensorflow as tf
import utils
import random
import flappy_bird_gymnasium
import gymnasium

from pyvirtualdisplay import Display
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.losses import MSE
from tensorflow.keras.optimizers import Adam

### The Flappy Bird Environment
- Action Space
  - do nothing = 0
  - Flap = 1
- Observation Space
  - the last pipe's horizontal position
  - the last top pipe's vertical position
  - the last bottom pipe's vertical position
  - the next pipe's horizontal position
  - the next top pipe's vertical position
  - the next bottom pipe's vertical position
  - the next next pipe's horizontal position
  - the next next top pipe's vertical position
  - the next next bottom pipe's vertical position
  - player's vertical position
  - player's vertical velocity
  - player's rotation
- Rewards
  - +1 every time you pass a pipe
  - +0.1 for each frame where you don't collide against the top and bottom bounds
  - -1 for dying

### Load the environment

In [3]:
env = gymnasium.make("FlappyBird-v0", render_mode="human")

state_size = env.observation_space.shape[0]
num_actions = env.action_space.n
print('State Shape:', state_size)
print('Number of actions:', num_actions)

State Shape: 12
Number of actions: 2


In [4]:
# initial_state = env.reset()

# # Select an action
# action = 0

# print("Initial State:", initial_state[0])
# print("Next State:", env.step(action)[0])
# print("Reward:", env.step(action)[1])
# print("Episode Terminated:", env.step(action)[2])
# print("_:", env.step(action)[2])
# print("Current Score:", env.step(action)[4])

### Training the agent

In [5]:
# Hyperparameters
MEMORY_SIZE = 100_000     # size of memory buffer
GAMMA = 0.995             # discount factor
ALPHA = 1e-3              # learning rate  
NUM_STEPS_FOR_UPDATE = 1  # perform a learning update every C time steps

In [6]:
# Helper functions
SEED = 0              # seed for pseudo-random number generator
MINIBATCH_SIZE = 64   # mini-batch size
TAU = 1e-3            # soft update parameter
E_DECAY = 0.9       # ε decay rate for ε-greedy policy
E_MIN = 0.01          # minimum ε value for ε-greedy policy

random.seed(SEED)

def get_action(model, state, num_actions, epsilon=0):
    # if random.random() > epsilon:
    #     return np.argmax(q_values.numpy()[0])
    # else:
    #     return random.choice([0, 1])
    if np.random.rand() <= epsilon:
        return np.random.choice(num_actions)
    else:
        q_values = model.predict(state)
        return np.argmax(q_values[0])

def check_update_conditions(t, num_steps_upd, memory_buffer):
    if (t + 1) % num_steps_upd == 0 and len(memory_buffer) > MINIBATCH_SIZE:
        return True
    else:
        return False

def get_experiences(memory_buffer):
    experiences = random.sample(memory_buffer, k=MINIBATCH_SIZE)
    states = tf.convert_to_tensor(np.array([e.state for e in experiences if e is not None]),dtype=tf.float32)
    actions = tf.convert_to_tensor(np.array([e.action for e in experiences if e is not None]), dtype=tf.float32)
    rewards = tf.convert_to_tensor(np.array([e.reward for e in experiences if e is not None]), dtype=tf.float32)
    next_states = tf.convert_to_tensor(np.array([e.next_state for e in experiences if e is not None]),dtype=tf.float32)
    done_vals = tf.convert_to_tensor(np.array([e.done for e in experiences if e is not None]).astype(np.uint8),
                                     dtype=tf.float32)
    return (states, actions, rewards, next_states, done_vals)

def update_target_network(q_network, target_q_network):
    for target_weights, q_net_weights in zip(target_q_network.weights, q_network.weights):
        target_weights.assign(TAU * q_net_weights + (1.0 - TAU) * target_weights)

def get_new_eps(epsilon):
    return max(E_MIN, E_DECAY*epsilon)

In [7]:
# Store experiences as named tuples
experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])

In [8]:
# Create the Q-Network
q_network = Sequential([
    Input(shape=state_size),                      
    Dense(units=64, activation='relu'),            
    Dense(units=64, activation='relu'),            
    Dense(units=num_actions, activation='linear'),
    ])

# Create the target Q^-Network
target_q_network = Sequential([
    Input(shape=state_size),                      
    Dense(units=64, activation='relu'),            
    Dense(units=64, activation='relu'),            
    Dense(units=num_actions, activation='linear'),
    ])

optimizer = Adam(learning_rate=ALPHA)



In [9]:
def compute_loss(experiences, gamma, q_network, target_q_network):
    """ 
    Calculates the loss.
    
    Args:
      experiences: (tuple) tuple of ["state", "action", "reward", "next_state", "done"] namedtuples
      gamma: (float) The discount factor.
      q_network: (tf.keras.Sequential) Keras model for predicting the q_values
      target_q_network: (tf.keras.Sequential) Karas model for predicting the targets
          
    Returns:
      loss: (TensorFlow Tensor(shape=(0,), dtype=int32)) the Mean-Squared Error between
            the y targets and the Q(s,a) values.
    """
    
    # Unpack the mini-batch of experience tuples
    states, actions, rewards, next_states, done_vals = experiences
    
    # Compute max Q^(s,a)
    max_qsa = tf.reduce_max(target_q_network(next_states), axis=-1)
    
    # Set y = R if episode terminates, otherwise set y = R + γ max Q^(s,a).
    y_targets = rewards + (gamma * max_qsa * (1 - done_vals))
    
    # Get the q_values
    q_values = q_network(states)
    q_values = tf.gather_nd(q_values, tf.stack([tf.range(q_values.shape[0]),
                                                tf.cast(actions, tf.int32)], axis=1))
        
    # Compute the loss
    loss = MSE(y_targets, q_values)
    
    return loss

In [10]:
@tf.function
def agent_learn(experiences, gamma):
    """
    Updates the weights of the Q networks.
    
    Args:
      experiences: (tuple) tuple of ["state", "action", "reward", "next_state", "done"] namedtuples
      gamma: (float) The discount factor.
    
    """
    
    # Calculate the loss
    with tf.GradientTape() as tape:
        loss = compute_loss(experiences, gamma, q_network, target_q_network)

    # Get the gradients of the loss with respect to the weights.
    gradients = tape.gradient(loss, q_network.trainable_variables)
    
    # Update the weights of the q_network.
    optimizer.apply_gradients(zip(gradients, q_network.trainable_variables))

    # update the weights of target q_network
    update_target_network(q_network, target_q_network)

In [11]:
start = time.time()

num_episodes = 2000
max_num_timesteps = 1000

total_point_history = []

num_p_av = 100    # number of total points to use for averaging
epsilon = 1.0     # initial ε value for ε-greedy policy

# Create a memory buffer D with capacity N
memory_buffer = deque(maxlen=MEMORY_SIZE)

# Set the target network weights equal to the Q-Network weights
target_q_network.set_weights(q_network.get_weights())


for i in range(20):
    
    # Reset the environment to the initial state and get the initial state
    state = env.reset()[0]
    total_points = env.reset()[1]['score']

    # max_num_timesteps = 1000. This means that the episode will automatically terminate if the episode hasn't terminated after 1000 time steps.
    for t in range(100):
    
        # From the current state S choose an action A using an ε-greedy policy
        state_qn = np.expand_dims(state, axis=0)  # state needs to be the right shape for the q_network
        q_values = q_network(state_qn)
        print(q_values)
        action = get_action(q_network, state, num_actions, epsilon)
        print(action)
         
        # Take action A and receive reward R and the next state S'
        next_state, reward, done, _, current_score = env.step(action)
        
        # Store experience tuple (S,A,R,S') in the memory buffer.
        # We store the done variable as well for convenience.
        memory_buffer.append(experience(state, action, reward, next_state, done))
        
        # Only update the network every NUM_STEPS_FOR_UPDATE time steps.
        update = check_update_conditions(t, NUM_STEPS_FOR_UPDATE, memory_buffer)
        print(update)
        if update:
            # Sample random mini-batch of experience tuples (S,A,R,S') from D
            experiences = get_experiences(memory_buffer)
            
            # Set the y targets, perform a gradient descent step,
            # and update the network weights.
            agent_learn(experiences, GAMMA)

        state = next_state.copy()
        total_points += reward
        
        if done:
            break

    total_point_history.append(total_points)
    av_latest_points = np.mean(total_point_history[-num_p_av:])

    # Update the ε value
    epsilon = get_new_eps(epsilon)

    print(f"\rEpisode {i+1} | Total point average of the last {num_p_av} episodes: {av_latest_points:.2f}", end="")

    if (i+1) % num_p_av == 0:
        print(f"\rEpisode {i+1} | Total point average of the last {num_p_av} episodes: {av_latest_points:.2f}")

    # We will consider that the environment is solved if we get an
    # average of 200 points in the last 100 episodes.
    if av_latest_points >= 10000:
        print(f"\n\nEnvironment solved in {i+1} episodes!")
        q_network.save('flappy_bird.h5')
        break

tot_time = time.time() - start
print(f"\nTotal Runtime: {tot_time:.2f} s ({(tot_time/60):.2f} min)")       

tf.Tensor([[ 0.57821953 -0.04987197]], shape=(1, 2), dtype=float32)
0
False
tf.Tensor([[ 0.57072735 -0.04308057]], shape=(1, 2), dtype=float32)
1
False
tf.Tensor([[ 0.58511865 -0.04958145]], shape=(1, 2), dtype=float32)
0
False
tf.Tensor([[ 0.57699734 -0.04223641]], shape=(1, 2), dtype=float32)
1
False
tf.Tensor([[ 0.5920201  -0.04929708]], shape=(1, 2), dtype=float32)
0
False
tf.Tensor([[ 0.58328754 -0.04141028]], shape=(1, 2), dtype=float32)
1
False
tf.Tensor([[ 0.59892154 -0.04901268]], shape=(1, 2), dtype=float32)
1
False
tf.Tensor([[ 0.60257524 -0.04886209]], shape=(1, 2), dtype=float32)
0
False
tf.Tensor([[ 0.5940773  -0.04130581]], shape=(1, 2), dtype=float32)
1
False
tf.Tensor([[ 0.6097695  -0.04899026]], shape=(1, 2), dtype=float32)
1
False
tf.Tensor([[ 0.6137218  -0.04925984]], shape=(1, 2), dtype=float32)
1
False
tf.Tensor([[ 0.6176738  -0.04952943]], shape=(1, 2), dtype=float32)
0
False
tf.Tensor([[ 0.6094464  -0.04235423]], shape=(1, 2), dtype=float32)
0
False
tf.Tensor([[

ValueError: in user code:

    File "/opt/homebrew/lib/python3.11/site-packages/keras/src/engine/training.py", line 2440, in predict_function  *
        return step_function(self, iterator)
    File "/opt/homebrew/lib/python3.11/site-packages/keras/src/engine/training.py", line 2425, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "/opt/homebrew/lib/python3.11/site-packages/keras/src/engine/training.py", line 2413, in run_step  **
        outputs = model.predict_step(data)
    File "/opt/homebrew/lib/python3.11/site-packages/keras/src/engine/training.py", line 2381, in predict_step
        return self(x, training=False)
    File "/opt/homebrew/lib/python3.11/site-packages/keras/src/utils/traceback_utils.py", line 70, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "/opt/homebrew/lib/python3.11/site-packages/keras/src/engine/input_spec.py", line 253, in assert_input_compatibility
        raise ValueError(

    ValueError: Exception encountered when calling layer 'sequential' (type Sequential).
    
    Input 0 of layer "dense" is incompatible with the layer: expected min_ndim=2, found ndim=1. Full shape received: (None,)
    
    Call arguments received by layer 'sequential' (type Sequential):
      • inputs=tf.Tensor(shape=(None,), dtype=float32)
      • training=False
      • mask=None


In [None]:
# import numpy as np
# import tensorflow as tf
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import Dense
# from tensorflow.keras.optimizers import Adam
# import gym

# # Create Flappy Bird environment (you may need to install the gym-flappy-bird package)
# env = gymnasium.make('FlappyBird-v0')

# # Define constants
# state_size = 12  # Adjust based on your state representation
# action_size = env.action_space.n
# learning_rate = 0.001
# gamma = 0.95
# epsilon = 1.0
# epsilon_decay = 0.995
# min_epsilon = 0.01
# batch_size = 32
# memory_size = 100000

# # Build the Deep Q-Learning model
# model = Sequential()
# model.add(Dense(24, input_dim=state_size, activation='relu'))
# model.add(Dense(24, activation='relu'))
# model.add(Dense(action_size, activation='linear'))
# model.compile(loss='mse', optimizer=Adam(lr=learning_rate))

# # Experience replay buffer
# class ReplayBuffer:
#     def __init__(self, size):
#         self.memory = []
#         self.size = size

#     def add(self, experience):
#         self.memory.append(experience)
#         if len(self.memory) > self.size:
#             self.memory.pop(0)

#     def sample(self, batch_size):
#         if len(self.memory) == 0:
#             return []
#         else:
#             indices = np.random.choice(len(self.memory), batch_size, replace=True)
#             return [self.memory[i] for i in indices]

# replay_buffer = ReplayBuffer(memory_size)

# # Epsilon-greedy strategy
# def epsilon_greedy(model, state, epsilon):
#     if np.random.rand() <= epsilon:
#         return np.random.choice(action_size)
#     else:
#         q_values = model.predict(state)
#         return np.argmax(q_values[0])

# # Update Q-values using experience replay
# def update_q_values(model, replay_buffer, batch_size, gamma):
#     minibatch = replay_buffer.sample(batch_size)
#     for state, action, reward, next_state, done in minibatch:
#         target = reward
#         if not done:
#             target = reward + gamma * np.amax(model.predict(next_state)[0])
#         q_values = model.predict(state)
#         q_values[0][action] = target
#         model.fit(state, q_values, epochs=1, verbose=0)

# # Deep Q-Learning algorithm
# def deep_q_learning(model, env, episodes):
#     global epsilon
#     for episode in range(episodes):
#         state = env.reset()[0]
#         state = np.reshape(state, [1, state_size])

#         for time in range(500):  # Adjust the maximum number of time steps as needed
#             # Choose an action using epsilon-greedy strategy
#             action = epsilon_greedy(model, state, epsilon)

#             # Take the chosen action and observe the next state and reward
#             next_state, reward, done, _, score = env.step(action)
#             next_state = np.reshape(next_state, [1, state_size])

#             # Store the experience in the replay buffer
#             replay_buffer.add((state, action, reward, next_state, done))

#             # Update the Q-values using experience replay
#             update_q_values(model, replay_buffer, batch_size, gamma)

#             # Update the current state
#             state = next_state

#             if done:
#                 break

#         # Decay epsilon
#         epsilon = max(min_epsilon, epsilon * epsilon_decay)

# # Train the model
# episodes = 1000  # Adjust the number of episodes as needed
# deep_q_learning(model, env, episodes)

# # Test the trained model
# total_reward = 0
# state = env.reset()
# state = np.reshape(state, [1, state_size])

# for _ in range(500):  # Adjust the maximum number of time steps as needed
#     action = np.argmax(model.predict(state)[0])
#     next_state, reward, done, _ = env.step(action)
#     total_reward += reward
#     next_state = np.reshape(next_state, [1, state_size])
#     state = next_state

#     if done:
#         break

# print("Total Reward:", total_reward)