In [None]:
import torch
import torchvision.datasets as datasets # for Mist
import torchvision.transforms as transforms # Transformations we can perform on our dataset for augmentation
from torch import optim # For optimizers like SGD, Adam, etc.
from torch import nn # To inherit our neural network
from torch.utils.data import DataLoader # For management of the dataset (batches)
from tqdm import tqdm # For nice progress bar!
from torch.optim.lr_scheduler import StepLR
from torch.optim.lr_scheduler import CosineAnnealingLR
import numpy as np
import flappy_bird_gymnasium as flappy_bird
import gymnasium as gym
import random
from collections import deque, namedtuple
from preprocessing.preprocessing import preprocess_frame
from PIL import Image
from gymnasium.wrappers import RecordVideo
from static.static_variables import make_dirs, CHECKPOINTS_DIR, VIDEOS_DIR, PHOTOS_DIR
import os
import hashlib


In [41]:
make_dirs()

NameError: name 'make_dirs' is not defined

In [None]:
if torch.cuda.is_available():
    device = "cuda:0"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

device = torch.device(device)
print("Device: ", device)

Device:  mps


In [None]:
# Environment
frame_skip = 3

# Training
data_type = torch.float32
batch_size = 64
T_iterations = 10000   # very important

# Replay buffer
replay_buffer_size = 150_000

# DQN
alpha = 1e-4
gamma = 0.99

# Target network (soft update)
target_network_incorporation_rate = 0.005

# Epsilon-greedy
e_start = 1.0
e_end = 0.05
k_epsilon = -1 / T_iterations * np.log(e_end / e_start)


In [None]:
class DQN_CNN(nn.Module):
    def __init__(self):
        super(DQN_CNN, self).__init__()

        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

         #w 82 - 7 + 2 * 3 / 2 + 1 = 41

         #w maxpool
         #w 41 / 2 = 20 out

         #h 136 - 7 + 2 * 3 / 2 + 1 = 68

        #h maxpool
        #h 68 / 2 = 34 out
        self.conv1 = nn.Conv2d(
            in_channels=frame_skip,
            out_channels=16,
            kernel_size=7,
            stride=2,
            padding=3,
        )


        #w maxpool
        #w 20 / 2 = 10 out

        #h maxpool
        #h 34 / 2 = 17 out
        self.conv2 = nn.Conv2d(
            in_channels=16,
            out_channels=32,
            kernel_size=3,
            stride=1,
            padding=1,
        )


        self.fc1 = nn.Linear(32 * 10 * 17, 128)
        self.fc2 = nn.Linear(128, 2)  # Assuming 2 actions: flap or not flap

        conv_layers = nn.Sequential(
            self.conv1,
            nn.ReLU(),
            self.pool,
            self.conv2,
            nn.ReLU(),
            self.pool,
        )

        linear_layers = nn.Sequential(
            self.fc1,
            nn.LayerNorm(128),
            nn.ReLU(),
            self.fc2    
        )

        self.layers = nn.Sequential(
            conv_layers,
            nn.Flatten(),
            linear_layers
        )

    def forward(self, x):
        return self.layers(x)





In [None]:
class ReplayBuffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = []
        self.position = 0

    def push(self, state, action, reward, next_state, done):
        if len(self.buffer) < self.capacity:
            self.buffer.append(None)
        self.buffer[self.position] = (state, action, reward, next_state, done)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = map(np.stack, zip(*batch))
        return state, action, reward, next_state, done

    def __len__(self):
        return len(self.buffer)

In [None]:

env = gym.make("FlappyBird-v0", render_mode="rgb_array")

policy_net = DQN_CNN().to(device)
target_net = DQN_CNN().to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=alpha)
memory = ReplayBuffer(replay_buffer_size)

global_step = 0
frame_id = 0
best_reward = -float("inf")

for episode in range(T_iterations):
    env.reset()

    # ---------- initialize frame stack ----------
    frame_stack = deque(maxlen=frame_skip)
    frame = preprocess_frame(env.render())

    Image.fromarray(frame).save(f"{PHOTOS_DIR}/frame_{frame_id:05d}.png")
    frame_id += 1

    for _ in range(frame_skip):
        frame_stack.append(frame)

    done = False
    episode_reward = 0

    while not done:
        global_step += 1

        # ---------- epsilon decay ----------
        epsilon = e_end + (e_start - e_end) * np.exp(-k_epsilon * global_step)

        state = np.stack(frame_stack, axis=0)  # (skip_frames, H, W)
        state_tensor = torch.tensor(
            state, dtype=torch.float32
        ).unsqueeze(0).to(device)
        #print("State tensor shape: ", state_tensor.shape)
        # ---------- epsilon-greedy ----------
        if random.random() < epsilon:
            action = env.action_space.sample()
        else:
            with torch.no_grad():
                q_values = policy_net(state_tensor)
                action = q_values.argmax(dim=1).item()

        # ---------- frame skipping ----------
        total_reward = 0
        for _ in range(frame_skip):
            _, reward, terminated, truncated, _ = env.step(action)
            total_reward += reward
            done = terminated or truncated
            if done:
                break

        # ---------- get next rendered frame ----------
        next_frame = preprocess_frame(env.render())
        frame_stack.append(next_frame)
        next_state = np.stack(frame_stack, axis=0)

        memory.push(state, action, total_reward, next_state, done)
        episode_reward += total_reward

        # ---------- learning ----------
        if len(memory) >= batch_size:
            states, actions, rewards, next_states, dones = memory.sample(batch_size)

            states = torch.tensor(states, dtype=torch.float32).to(device)
            actions = torch.tensor(actions, dtype=torch.long).unsqueeze(1).to(device)
            rewards = torch.tensor(rewards, dtype=torch.float32).unsqueeze(1).to(device)
            next_states = torch.tensor(next_states, dtype=torch.float32).to(device)
            dones = torch.tensor(dones, dtype=torch.float32).unsqueeze(1).to(device)

            q_values = policy_net(states).gather(1, actions)

            with torch.no_grad():
                next_q = target_net(next_states).max(1, keepdim=True)[0]
                target_q = rewards + gamma * next_q * (1 - dones)

            loss = nn.MSELoss()(q_values, target_q)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # ---------- soft target update ----------
            for target_param, policy_param in zip(
                target_net.parameters(), policy_net.parameters()
            ):
                target_param.data.copy_(
                    target_network_incorporation_rate * policy_param.data
                    + (1.0 - target_network_incorporation_rate) * target_param.data
                )
    best_reward = max(best_reward, episode_reward)
    print(
        f"Episode {episode} | Reward: {episode_reward:.2f} | Epsilon: {epsilon:.4f} | Best Reward: {best_reward:.2f}"
    )

env.close()


FileNotFoundError: [Errno 2] No such file or directory: 'photos/frame_00000.png'

In [None]:

def save_model(
    policy_net,
    target_net,
    optimizer,
    filepath,
    global_step=None,
    best_reward=None,
):
    os.makedirs(os.path.dirname(filepath), exist_ok=True)

    checkpoint = {
        "policy_net_state_dict": policy_net.state_dict(),
        "target_net_state_dict": target_net.state_dict(),
        "optimizer_state_dict": optimizer.state_dict(),
        "global_step": global_step,
        "best_reward": best_reward,
    }

    torch.save(checkpoint, filepath)
    print(f"âœ… Model saved to {filepath}")


In [None]:
save_model(
    policy_net,
    target_net,
    optimizer,
    filepath="checkpoints/flappy_dqn.pt",
    global_step=global_step,
    best_reward=2,
)


âœ… Model saved to checkpoints/flappy_dqn.pt


In [None]:
def load_model(
    policy_net,
    target_net,
    optimizer,
    filepath,
    device,
):
    checkpoint = torch.load(filepath, map_location=device)

    policy_net.load_state_dict(checkpoint["policy_net_state_dict"])
    target_net.load_state_dict(checkpoint["target_net_state_dict"])
    optimizer.load_state_dict(checkpoint["optimizer_state_dict"])

    policy_net.to(device)
    target_net.to(device)

    global_step = checkpoint.get("global_step", 0)
    best_reward = checkpoint.get("best_reward", None)

    print(f"âœ… Model loaded from {filepath}")

    return global_step, best_reward


In [None]:
policy_net = DQN_CNN().to(device)
target_net = DQN_CNN().to(device)
optimizer = optim.Adam(policy_net.parameters(), lr=alpha)

global_step, best_reward = load_model(
    policy_net,
    target_net,
    optimizer,
    filepath="checkpoints/flappy_dqn.pt",
    device=device,
)


âœ… Model loaded from checkpoints/flappy_dqn.pt


In [None]:
def record_trained_agent_video(
    model_path="checkpoints/flappy_dqn.pt",
    video_dir="videos",
    env_name="FlappyBird-v0",
    frame_skip=4,
    device="cpu",
):
    os.makedirs(video_dir, exist_ok=True)

    # ---------- environment with video recording ----------
    env = gym.make(env_name, render_mode="rgb_array")
    env = RecordVideo(
        env,
        video_folder=video_dir,
        episode_trigger=lambda episode_id: True,  # record first episode
        name_prefix="flappy_dqn",
    )

    # ---------- load model ----------
    policy_net = DQN_CNN().to(device)
    checkpoint = torch.load(model_path, map_location=device)
    policy_net.load_state_dict(checkpoint["policy_net_state_dict"])
    policy_net.eval()

    # ---------- reset ----------
    env.reset()
    frame_stack = deque(maxlen=frame_skip)

    frame = preprocess_frame(env.render())
    for _ in range(frame_skip):
        frame_stack.append(frame)

    done = False
    total_reward = 0

    # ---------- play episode ----------
    while not done:
        state = np.stack(frame_stack, axis=0)

        with torch.no_grad():
            state_t = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)
            action = policy_net(state_t).argmax(dim=1).item()

        reward_sum = 0
        for _ in range(frame_skip):
            _, reward, terminated, truncated, _ = env.step(action)
            reward_sum += reward
            done = terminated or truncated
            if done:
                break

        frame = preprocess_frame(env.render())
        frame_stack.append(frame)
        total_reward += reward_sum

    env.close()
    print(f"ðŸŽ¥ Video saved in '{video_dir}/' | Reward: {total_reward:.2f}")




In [None]:
record_trained_agent_video(
    model_path=f"{CHECKPOINTS_DIR}/flappy_dqn_{hashlib.md5(random.randbytes(64)).hexdigest()}.pt",
    video_dir=VIDEOS_DIR,
    frame_skip=frame_skip,
    device=device,
)


SyntaxError: f-string: expecting '}' (1053249624.py, line 2)