# Download Libraries and Frameworks

In [None]:
!pip install flappy-bird-gymnasium
!pip install gymnasium
!pip install gym pyvirtualdisplay > /dev/null 2>&1
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1
!apt-get update > /dev/null 2>&1
!apt-get install cmake > /dev/null 2>&1
!pip install --upgrade setuptools 2>&1
!pip install ez_setup > /dev/null 2>&1
!pip install gym[atari] > /dev/null 2>&1
!sudo apt-get install xvfb
!pip install swig
!pip install gym[box2d]
!pip install pygame

# Import Libraries

In [None]:
import numpy as np
import gymnasium
import flappy_bird_gymnasium
import pygame
import matplotlib.pyplot as plt
from IPython import display as ipythondisplay
import glob
import io
import base64
from gym import logger as gymlogger
from gym.wrappers.record_video import RecordVideo
from IPython.display import HTML
from tqdm import tqdm

# Display video

In [None]:
from pyvirtualdisplay import Display
# Create a pyvirtualdisplay Display (visible=0, 1400x900) and start it.
# Presumably this gives headless hosts an off-screen display to render into —
# confirm it is still required when the environments use render_mode="rgb_array".
display = Display(visible=0, size=(1400, 900))
display.start()

"""
Utility functions to enable video recording of gym environment and displaying it
To enable video, just do "env = wrap_env(env)"
"""

def show_video():
  """Embed the most recently written MP4 from ``video/`` in the cell output.

  Looks for ``video/*.mp4`` relative to the current working directory. If at
  least one file exists, the newest one (by modification time) is read,
  base64-encoded and displayed as an inline HTML ``<video>`` element. If none
  exists, the message "Could not find video" is printed instead.

  Returns:
    None.
  """
  import os  # local import: only needed here to pick the newest recording

  mp4list = glob.glob('video/*.mp4')
  if not mp4list:
    print("Could not find video")
    return

  # glob gives no ordering guarantee, so choose the newest file explicitly
  # rather than whichever one happens to be listed first.
  mp4 = max(mp4list, key=os.path.getmtime)

  # Read-only binary mode inside a context manager: the handle is always
  # closed and the recording can never be modified by accident.
  with open(mp4, 'rb') as video_file:
    encoded = base64.b64encode(video_file.read())

  ipythondisplay.display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))

def wrap_env(env):
  """Return ``env`` wrapped in ``RecordVideo`` so every episode is recorded.

  Recordings are written under ``./video``; the episode trigger answers True
  for every episode number.
  """
  def record_every_episode(episode_number):
    return True

  return RecordVideo(env, './video', episode_trigger=record_every_episode)

# Random agent

In [None]:
# Play one episode with uniformly random actions and show the recording.
env_v = wrap_env(gymnasium.make("FlappyBird-v0", render_mode="rgb_array", use_lidar=False))
# reset() returns an (observation, info) pair, so unpack it instead of binding
# the whole tuple to `observation`.
observation, info = env_v.reset()

# An episode ends when the environment reports either termination or
# truncation; checking only `terminated` could loop forever on a truncated run.
terminated = truncated = False
while not (terminated or truncated):
    action = env_v.action_space.sample()
    observation, reward, terminated, truncated, info = env_v.step(action)

# Close the wrapper before reading the recording back.
env_v.close()
show_video()

# define and test environment

## initialize environment

In [None]:
# Environment instance shared by the inspection cells, the Q-table sizing,
# the epsilon-greedy policy and the training loop below (not video-wrapped).
env = gymnasium.make("FlappyBird-v0", render_mode="rgb_array", use_lidar=False)

## action space dimension

In [None]:
# Show the action space; its size (`.n`) is the number of actions per state in the Q-table.
env.action_space

## observation space or state space dimension

In [None]:
# Show the observation space; the feature-reduction helpers below read
# entries 0-5 and 9-11 of each observation.
env.observation_space

## reset and see a frame of environment
- use env.step(action) for interaction with environment

In [None]:
# Start a fresh episode, take a single step with action 1 and print the tuple
# step() returns (unpacked elsewhere in this notebook as
# observation, reward, terminated, truncated, info).
env.reset()
print(env.step(1))
# Last expression of the cell: whatever render() returns is shown as the output.
env.render()

In [None]:
# Take one more step in the same episode, this time with action 0, and print
# the returned tuple for comparison with the previous cell.
a = env.step(0)
print(a)
# Last expression of the cell: whatever render() returns is shown as the output.
env.render()


# discretizing state space

## Set the number of discretization levels

In [None]:
# Number of bin edges per feature passed to create_bins. The discretized state
# has 7 features, so the number of distinct states grows as NUM_BINS**7.
NUM_BINS = 51

## Reduce the observation dimension by combining related entries

In [None]:
# The raw observation has too many entries to discretize directly: the number
# of table cells grows exponentially with the number of features. The state is
# therefore reduced to 7 features, replacing each pair of pipe values by its
# mean, and every feature is binned on the same [-1, 1] grid.
def create_bins(num_bins):
    """Return the bin edges for the 7 reduced state features.

    Args:
        num_bins: number of evenly spaced edges per feature.

    Returns:
        A ``(7, num_bins)`` array. Every row is ``np.linspace(-1.0, 1.0, num_bins)``;
        the rows correspond, in order, to: horizontal pipe, mean pipe,
        next horizontal pipe, next mean pipe, bird vertical position,
        bird vertical velocity, rotation.
    """
    feature_count = 7
    return np.array([np.linspace(-1.0, 1.0, num_bins) for _ in range(feature_count)])

In [None]:
# Bin edges (one row per reduced feature) reused by every discretize_states call below.
bins = create_bins(NUM_BINS)

In [None]:
# Map a continuous feature vector onto integer bin indices.
def discretize_states(states, bins):
    """Convert each feature value into the index of the bin it falls in.

    Args:
        states: sequence of feature values; ``states[i]`` is binned with ``bins[i]``.
        bins: indexable collection of increasing bin-edge arrays, one per feature.

    Returns:
        A tuple of Python ints, one per feature, each in ``[0, len(bins[i]) - 1]``.
        Index 0 means "below the first edge"; values at or above the last edge
        are folded into the top index so the result can always index an axis of
        length ``len(bins[i])``.
    """
    binned_states = []
    for i, state in enumerate(states):
        edges = bins[i]
        # np.digitize returns len(edges) for values >= the last edge. Clamp
        # against the real number of edges rather than a hard-coded 51, so
        # the function stays correct for any bin count.
        top_index = len(edges) - 1
        binned_states.append(min(int(np.digitize(state, edges)), top_index))

    return tuple(binned_states)

# Modified State

In [None]:
# Reduce the value returned by env.reset() to the 7 features that get
# discretized (this runs before discretization, not after).
def modified_state(state):
  """Build the 7-feature state from a ``reset()`` result.

  Args:
    state: a sequence whose first element is the raw observation
      (at least 12 entries are read: indices 0-5 and 9-11).

  Returns:
    A list ``[o[0], mean(o[1], o[2]), o[3], mean(o[4], o[5]), o[9], o[10], o[11]]``
    where ``o`` is ``state[0]``.
  """
  obs = state[0]
  return [
      obs[0],
      (obs[2] + obs[1]) / 2,
      obs[3],
      (obs[5] + obs[4]) / 2,
      obs[9],
      obs[10],
      obs[11],
  ]

In [None]:
# Reduce a raw observation (as returned by env.step()) to the 7 features that
# get discretized (this runs before discretization, not after).
def modified_state2(state):
  """Build the 7-feature state from a raw observation.

  Args:
    state: the raw observation itself
      (at least 12 entries are read: indices 0-5 and 9-11).

  Returns:
    A list ``[s[0], mean(s[1], s[2]), s[3], mean(s[4], s[5]), s[9], s[10], s[11]]``.
  """
  pipe_mean = (state[2] + state[1]) / 2
  next_pipe_mean = (state[5] + state[4]) / 2
  return [state[0], pipe_mean, state[3], next_pipe_mean, state[9], state[10], state[11]]

# define Q-Table

In [None]:
# define Q-table
# A dense array with seven NUM_BINS-long state axes holds NUM_BINS**7 cells per
# action. With 51 bins that is ~9e11 float64 values (about 7 TB) per action,
# which cannot be allocated. The table is therefore stored sparsely: only
# states that have actually been written get a row, all others read as zero.
class SparseQTable:
    """Dictionary-backed Q-table that is indexed like the dense array it replaces.

    Supported keys (``D`` = number of state dimensions):
      * a ``D``-tuple ``state``          -> 1-D array of the action values of that state
      * a ``D+1``-tuple ``state + (a,)`` -> the scalar value of action ``a`` in that state

    Reading an unseen state returns zeros without storing anything. Unlike a
    dense array, the zeros returned for an unseen state are a fresh array, so
    writes must go through item assignment (``Q[key] = value``).
    """

    def __init__(self, n_state_dims, n_actions):
        self.n_state_dims = n_state_dims
        self.n_actions = int(n_actions)
        self._rows = {}  # state tuple -> np.ndarray of shape (n_actions,)

    def _split(self, key):
        """Split ``key`` into ``(state, action)``; ``action`` is None for a state-only key."""
        key = tuple(key)
        if len(key) == self.n_state_dims:
            return key, None
        if len(key) == self.n_state_dims + 1:
            return key[:-1], key[-1]
        raise IndexError(
            f"expected {self.n_state_dims} or {self.n_state_dims + 1} indices, got {len(key)}"
        )

    def __getitem__(self, key):
        state, action = self._split(key)
        row = self._rows.get(state)
        if action is None:
            return np.zeros(self.n_actions) if row is None else row
        return 0.0 if row is None else row[action]

    def __setitem__(self, key, value):
        state, action = self._split(key)
        row = self._rows.get(state)
        if row is None:
            row = self._rows[state] = np.zeros(self.n_actions)
        if action is None:
            row[:] = value
        else:
            row[action] = value

    def __len__(self):
        """Number of states that currently have a stored row."""
        return len(self._rows)


# 7 state dimensions = the 7 reduced features produced by the modified_state helpers.
Q = SparseQTable(n_state_dims=7, n_actions=env.action_space.n)

# Define parameters

In [None]:
# define parameters
# Learning rate used for the first training episode; the training loop then
# replaces it with reduce_alpha(episode) after every episode.
alpha = 0.9
# Discount factor applied to the best next-state value in the Q update
# (1 means future rewards are not discounted).
GAMMA = 1
# Exploration probability used for the first training episode; the training
# loop then replaces it with reduce_epsilon(episode) after every episode.
epsilon = 1
# Number of episodes run by the Q-learning loop.
train_episodes = 30_000

# Q-Learning Equation

The Q-learning algorithm updates the action-value function $Q(s, a)$ using the following equation:

$$Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha \left[ r_{t+1} + \gamma \max_{a} Q(s_{t+1}, a) - Q(s_t, a_t) \right]$$


Where:
- Q(sₜ, aₜ) = current Q-value for state sₜ and action aₜ
- α (alpha) = learning rate (0 < α ≤ 1)
- γ (gamma) = discount factor (0 ≤ γ ≤ 1; the code in this notebook sets γ = 1)
- rₜ₊₁ = immediate reward after action aₜ
- maxₐ Q(sₜ₊₁, a) = maximum future value for next state sₜ₊₁
- sₜ₊₁ = next state after action aₜ

| Component           | Symbol | Description                          |
|---------------------|--------|--------------------------------------|
| Current Q-value     | Q(s,a) | Value for current state-action pair  |
| Learning rate       | α      | How quickly new info overrides old   |
| Discount factor     | γ      | Importance of future rewards         |
| Immediate reward    | r      | Reward after taking action           |
| Max future Q-value  | maxQ   | Best estimated value of next state   |

# Define the policy and the alpha / epsilon decay functions





In [None]:
def policy(epsilon, Q, state):
    """Epsilon-greedy action selection.

    Draws one uniform random number; if it is greater than ``epsilon`` the
    action with the largest value in ``Q[state]`` is returned, otherwise an
    action is sampled from the global ``env``'s action space.

    Args:
        epsilon: exploration threshold compared against the random draw.
        Q: table indexable by ``state`` that yields the action values.
        state: index of the current (discretized) state.

    Returns:
        The chosen action.
    """
    exploit = np.random.random() > epsilon
    if not exploit:
        return env.action_space.sample()
    return np.argmax(Q[state])

In [None]:
def reduce_alpha(episode, decay_episodes=10_000):
    """Exponentially decayed learning rate for a given episode.

    Args:
        episode: episode index (0 gives a learning rate of 1).
        decay_episodes: number of episodes after which the rate has dropped by
            a factor of e. Defaults to 10_000, the value previously hard-coded.

    Returns:
        ``exp(-episode / decay_episodes)``.
    """
    return np.exp(-(episode / decay_episodes))

In [None]:
def reduce_epsilon(episode, decay_episodes=1_000):
    """Exponentially decayed exploration rate for a given episode.

    Args:
        episode: episode index (0 gives an exploration rate of 1).
        decay_episodes: number of episodes after which the rate has dropped by
            a factor of e. Defaults to 1_000, the value previously hard-coded.

    Returns:
        ``exp(-episode / decay_episodes)``.
    """
    return np.exp(-(episode / decay_episodes))

In [None]:
# # @title Q-Learning Parameter Explorer { run: "auto" }

# import numpy as np
# import matplotlib.pyplot as plt
# from ipywidgets import interact, FloatSlider

# # Q-learning parameters with interactive sliders
# alpha = FloatSlider(value=0.1, min=0.01, max=1.0, step=0.01, description='Learning Rate (α):')
# gamma = FloatSlider(value=0.9, min=0.0, max=0.99, step=0.01, description='Discount (γ):')
# epsilon = FloatSlider(value=0.1, min=0.0, max=1.0, step=0.01, description='Exploration (ε):')

# def q_learning_update(current_q, reward, max_future_q, alpha, gamma):
#     """Visualize the Q-learning update equation"""
#     new_q = current_q + alpha * (reward + gamma * max_future_q - current_q)

#     # Create visualization
#     plt.figure(figsize=(10, 6))
#     bars = plt.bar(['Current Q', 'Immediate Reward', 'Max Future Q', 'Updated Q'],
#                    [current_q, reward, max_future_q, new_q],
#                    color=['blue', 'green', 'orange', 'red'])

#     plt.title('Q-Learning Update Visualization', fontsize=14)
#     plt.ylabel('Value', fontsize=12)
#     plt.xticks(fontsize=10)

#     # Add value labels on top of bars
#     for bar in bars:
#         height = bar.get_height()
#         plt.text(bar.get_x() + bar.get_width()/2., height,
#                  f'{height:.2f}',
#                  ha='center', va='bottom')

#     # Display the equation with current values
#     equation = f"Q(s,a) ← {current_q:.2f} + {alpha:.2f} × ({reward:.2f} + {gamma:.2f}×{max_future_q:.2f} - {current_q:.2f}) = {new_q:.2f}"
#     plt.text(0.5, -0.3, equation, ha='center', va='center', transform=plt.gca().transAxes, fontsize=12, bbox=dict(facecolor='yellow', alpha=0.5))

#     plt.ylim(0, max(current_q, reward, max_future_q, new_q) * 1.2)
#     plt.show()

# # Create interactive widget
# interact(q_learning_update,
#          current_q=FloatSlider(value=5.0, min=0, max=10, step=0.1, description='Current Q:'),
#          reward=FloatSlider(value=3.0, min=-5, max=10, step=0.5, description='Reward:'),
#          max_future_q=FloatSlider(value=7.0, min=0, max=10, step=0.5, description='Max Future Q:'),
#          alpha=alpha,
#          gamma=gamma);

# Q-Learning Algorithm

In [None]:
# Tabular Q-learning over `train_episodes` episodes.
training_rewards = []        # total reward of every finished episode
training_rewards_mean = []   # mean reward of each completed logging window
epsilons = []                # exploration rate recorded after every episode
log_interval = 1000
for episode in tqdm(range(train_episodes)):
    # reset() yields (observation, info); modified_state reads the observation
    # from index 0 of that pair.
    state = env.reset()
    state = discretize_states(modified_state(state), bins)
    total_training_reward = 0
    # The episode is over on termination OR truncation; watching only
    # `terminated` would never leave the loop on a truncated episode.
    terminated = truncated = False
    while not (terminated or truncated):
        action = policy(epsilon, Q, state)
        new_state, reward, terminated, truncated, info = env.step(action)
        new_state = discretize_states(modified_state2(new_state), bins)
        q_index = state + (action,)
        # A terminal transition has no future: bootstrapping from Q[new_state]
        # there would leak value from non-terminal visits to the same
        # discretized cell (and, with GAMMA = 1, would never be damped).
        # Truncation is not terminal, so it still bootstraps.
        future_value = 0.0 if terminated else np.max(Q[new_state])
        Q[q_index] = Q[q_index] + alpha * (reward + GAMMA * future_value - Q[q_index])
        total_training_reward += reward
        state = new_state
    if episode % log_interval == 0:
        # Mean over the previous window. At episode 0 there is no history yet,
        # so report nan directly instead of calling np.mean on an empty list
        # (which emits a RuntimeWarning).
        recent_rewards = training_rewards[-log_interval:]
        mean_rewards = np.mean(recent_rewards) if recent_rewards else float("nan")
        log = f"""
            episode {episode}
            -----------------------------
            Mean Reward = {mean_rewards}
            Epsilon = {epsilon}
            Alpha = {alpha}
            -----------------------------
        """
        print(log)
        if episode != 0:
            training_rewards_mean.append(mean_rewards)
    # Schedules for the NEXT episode.
    alpha = reduce_alpha(episode)
    epsilon = reduce_epsilon(episode)
    training_rewards.append(total_training_reward)
    epsilons.append(epsilon)


# Plot the Convergence

In [None]:
# Plot the total reward of every recorded episode.
# The x-axis is derived from the data itself, so the plot still works when
# training was interrupted or train_episodes was changed after the run.
x = range(len(training_rewards))
fig, ax = plt.subplots()
ax.plot(x, training_rewards)
ax.set(title="Q-learning training reward", xlabel="Episode", ylabel="Total reward")
plt.show()

# Test Q*-Table

In [None]:
# Run one recorded episode acting greedily with respect to the learned Q-table.
env_v = wrap_env(gymnasium.make("FlappyBird-v0", render_mode="rgb_array", use_lidar=False))
# reset() yields (observation, info); modified_state reads index 0 of that pair.
state = env_v.reset()
state = discretize_states(modified_state(state), bins)
done = False
while not done:
    action = np.argmax(Q[state])
    # step() returns (observation, reward, terminated, truncated, info) in that
    # order; name the values accordingly and stop on either end condition.
    new_state, reward, terminated, truncated, info = env_v.step(action)
    done = terminated or truncated
    state = discretize_states(modified_state2(new_state), bins)

# Close the wrapper before reading the recording back.
env_v.close()
show_video()
