In [15]:
import flappy_bird_gymnasium
import gymnasium
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque

GAMMA = 0.99
LEARNING_RATE = 0.001
BATCH_SIZE = 64
REPLAY_BUFFER_SIZE = 100000
TARGET_UPDATE_FREQ = 1000
FRAME_SKIP = 4

EPSILON_START = 1.0
EPSILON_END = 0.01
DECAY_RATE = 375000

In [16]:
class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, output_dim)
        )

    def forward(self, x):
        return self.fc(x)

In [17]:
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return (
            np.array(states, dtype=np.float32),
            np.array(actions),
            np.array(rewards, dtype=np.float32),
            np.array(next_states, dtype=np.float32),
            np.array(dones, dtype=np.float32),
        )

    def __len__(self):
        return len(self.buffer)

In [18]:
def epsilon_greedy_policy(env, state, model, epsilon):
    if random.random() < epsilon:
        return env.action_space.sample()
    else:
        with torch.no_grad():
            state = torch.FloatTensor(state).view(1, -1)
            q_values = model(state)
            return q_values.max(1)[1].item()
        
def train(model, target_model, optimizer, replay_buffer):
    if len(replay_buffer) < BATCH_SIZE:
        return
    
    states, actions, rewards, next_states, dones = replay_buffer.sample(BATCH_SIZE)

    states = torch.FloatTensor(states)
    actions = torch.LongTensor(actions)
    rewards = torch.FloatTensor(rewards)
    next_states = torch.FloatTensor(next_states)
    dones = torch.FloatTensor(dones)

    # Current Q-values:
    q_values = model(states).gather(1, actions.view(-1, 1)).view(-1)

    # Next Q-values:
    next_actions = model(next_states).max(1)[1].view(-1, 1)

    next_q_values = target_model(next_states).gather(1, next_actions).view(-1)
    next_q_values[dones.bool()] = 0.0


    expected_q_values = rewards + GAMMA * next_q_values * (1 - dones)

    loss = nn.SmoothL1Loss()(q_values, expected_q_values.detach())
    
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    

In [19]:
env = gymnasium.make("FlappyBird-v0", render_mode="human", use_lidar=True)

obs, _ = env.reset()
obs = np.clip(np.array(obs, dtype=np.float32), 0.0, 1.0)

input_dim = len(obs)
output_dim = env.action_space.n

model = DQN(input_dim, output_dim)
target_model = DQN(input_dim, output_dim)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
target_model = target_model.to(device)

target_model.load_state_dict(model.state_dict())

optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
replay_buffer = ReplayBuffer(REPLAY_BUFFER_SIZE)

epsilon = EPSILON_START
step = 0
total_time_alive = 0

for epoch in range(9000):
    obs, _ = env.reset()

    done = False
    total_reward = 0
    frame_skip_counter = 0
    time_alive = 0

    while not done:
        if frame_skip_counter % FRAME_SKIP == 0:
            action = epsilon_greedy_policy(env, obs, model, epsilon)

        next_obs, reward, done, _, _ = env.step(action)
        replay_buffer.push(obs, action, reward, next_obs, done)

        obs = np.clip(np.array(next_obs, dtype=np.float32), 0.0, 1.0)
        total_reward += np.clip(reward, -1, 1)

        time_alive += 1
        total_time_alive += 1

        reward += 0.15

        train(model, target_model, optimizer, replay_buffer)

        if step % TARGET_UPDATE_FREQ == 0:
            target_model.load_state_dict(model.state_dict())

        epsilon = max(EPSILON_END, EPSILON_START - step / DECAY_RATE)
        step += 1
        frame_skip_counter += 1

    average_time_alive = total_time_alive / (epoch + 1)
    print(f"Epoch {epoch+1}: Total Reward: {total_reward}, Time Alive: {time_alive}, Average Time Alive: {average_time_alive:.2f}, Epsilon: {epsilon:.4f}")

env.close()


Epoch 1: Total Reward: -0.8999999999999986, Time Alive: 50, Average Time Alive: 50.00, Epsilon: 0.9999
Epoch 2: Total Reward: -2.699999999999998, Time Alive: 50, Average Time Alive: 50.00, Epsilon: 0.9997
Epoch 3: Total Reward: -6.299999999999999, Time Alive: 50, Average Time Alive: 50.00, Epsilon: 0.9996
Epoch 4: Total Reward: -0.8999999999999986, Time Alive: 50, Average Time Alive: 50.00, Epsilon: 0.9995
Epoch 5: Total Reward: -0.8999999999999986, Time Alive: 50, Average Time Alive: 50.00, Epsilon: 0.9993
Epoch 6: Total Reward: -0.8999999999999986, Time Alive: 50, Average Time Alive: 50.00, Epsilon: 0.9992
Epoch 7: Total Reward: -8.7, Time Alive: 50, Average Time Alive: 50.00, Epsilon: 0.9991
Epoch 8: Total Reward: -6.299999999999999, Time Alive: 50, Average Time Alive: 50.00, Epsilon: 0.9989
Epoch 9: Total Reward: -5.699999999999998, Time Alive: 50, Average Time Alive: 50.00, Epsilon: 0.9988
Epoch 10: Total Reward: -8.7, Time Alive: 50, Average Time Alive: 50.00, Epsilon: 0.9987
Epo

In [20]:
torch.save(model.state_dict(), "flappybird_dqn.pth")

In [None]:
def greedy_policy(state, model):
    with torch.no_grad():
        state = torch.FloatTensor(state).unsqueeze(0)
        q_values = model(state)
        return q_values.max(1)[1].item()


def evaluate_agent(env, model, num_episodes=100):
    total_rewards = []
    total_time_alive = 0 
    for episode in range(num_episodes):
        obs, _ = env.reset()
        obs = np.array(obs, dtype=np.float32)

        done = False
        total_reward = 0
        time_alive = 0

        while not done:
            action = greedy_policy(obs, model)
            next_obs, reward, done, _, _ = env.step(action)
            obs = np.array(next_obs, dtype=np.float32)
            total_reward += reward

            time_alive += 1 

        total_time_alive += time_alive 
        total_rewards.append(total_reward)
        print(f"Episode {episode + 1}: Total Reward: {total_reward}, Time Alive: {time_alive}")

    avg_reward = np.mean(total_rewards)
    max_reward = np.max(total_rewards)
    avg_time_alive = total_time_alive / num_episodes

    print(f"\nEvaluation over {num_episodes} episodes:")
    print(f"Average Reward: {avg_reward:.2f}")
    print(f"Max Reward: {max_reward}")
    print(f"Average Time Alive: {avg_time_alive:.2f}")

model.load_state_dict(torch.load("flappybird_dqn.pth"))
env = gymnasium.make("FlappyBird-v0", render_mode="human", use_lidar=True)
evaluate_agent(env, model, num_episodes=10)


  model.load_state_dict(torch.load("flappybird_dqn.pth"))


Episode 1: Total Reward: 4.199999999999999, Time Alive: 62
Episode 2: Total Reward: 4.199999999999999, Time Alive: 62
Episode 3: Total Reward: 1.5000000000000009, Time Alive: 56
Episode 4: Total Reward: -0.8999999999999986, Time Alive: 50
Episode 5: Total Reward: -1.0999999999999988, Time Alive: 60
Episode 6: Total Reward: 4.299999999999995, Time Alive: 63
Episode 7: Total Reward: 4.299999999999995, Time Alive: 63
Episode 8: Total Reward: 4.299999999999995, Time Alive: 63
Episode 9: Total Reward: 4.199999999999999, Time Alive: 62
Episode 10: Total Reward: 2.599999999999998, Time Alive: 55

Evaluation over 10 episodes:
Average Reward: 2.76
Max Reward: 4.299999999999995
Average Time Alive: 59.60


: 