In [88]:

# --- Kaggle / Headless environment fixes (inserted) ---
import os

# Use dummy SDL drivers so pygame works in a headless environment.
# setdefault() keeps any driver the user has already exported.
os.environ.setdefault("SDL_VIDEODRIVER", "dummy")
os.environ.setdefault("SDL_AUDIODRIVER", "dummy")

# Put checkpoints in /kaggle/working for persistence on Kaggle. When that
# directory cannot be created (e.g. running locally without write access to
# "/"), fall back to the current working directory instead of crashing.
DEFAULT_CHECKPOINT_DIR = "/kaggle/working"
try:
    os.makedirs(DEFAULT_CHECKPOINT_DIR, exist_ok=True)
except OSError as exc:
    DEFAULT_CHECKPOINT_DIR = os.getcwd()
    print(f"Checkpoint dir fallback: {DEFAULT_CHECKPOINT_DIR} ({exc})")

# Torch device selection (uses the GPU when one is available).
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")


def resolve_checkpoint_path(filename="checkpoint.pth"):
    """Return the path of ``filename`` inside ``DEFAULT_CHECKPOINT_DIR``.

    Args:
        filename: Checkpoint file name. An absolute path is returned
            unchanged, because ``os.path.join`` discards earlier components
            when a later one is absolute.

    Returns:
        The joined path as a string.
    """
    return os.path.join(DEFAULT_CHECKPOINT_DIR, filename)
# End header


Using device: cuda


In [89]:

# Safe pygame display initialization (headless-friendly)
def safe_set_mode(*args, **kwargs):
    """Create the pygame display, degrading gracefully when none is available.

    All arguments are forwarded to ``pygame.display.set_mode``. If that call
    fails, the display module is initialised explicitly and the call is
    retried once. If the retry also fails, a warning is printed and an
    off-screen ``pygame.Surface`` of the requested size is returned instead.

    Returns:
        The display surface, a plain ``pygame.Surface`` fallback, or ``None``
        when no usable size (first positional argument or ``size=`` keyword)
        was supplied.
    """
    import pygame
    try:
        # Delegate to pygame itself; calling safe_set_mode here would
        # recurse without bound.
        return pygame.display.set_mode(*args, **kwargs)
    except Exception as e:
        try:
            # Try initializing display explicitly and retry
            pygame.display.init()
            return pygame.display.set_mode(*args, **kwargs)
        except Exception as e2:
            print("Warning: pygame display not available (headless). Using dummy surface. Errors:", e, e2)
            # Return a dummy Surface of requested size if possible
            size = args[0] if len(args) >= 1 else kwargs.get("size")
            if isinstance(size, (tuple, list)) and len(size) >= 2:
                w, h = size[0], size[1]
                return pygame.Surface((w, h))
            return None


In [90]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque
import math

In [91]:
import os
# Unconditionally force SDL's dummy video driver; unlike setdefault(), this
# overwrites any SDL_VIDEODRIVER value already present in the environment.
os.environ["SDL_VIDEODRIVER"] = "dummy"  

In [92]:
# --- Training hyperparameters ---
EPI_NUMS = 4000          # default number of training episodes
# Resolve the checkpoint location once; wrapping the helper in itself only
# worked because os.path.join discards earlier parts for an absolute path.
CHECKPOINT_PATH = resolve_checkpoint_path("checkpoint.pth")
BUFFER_SIZE = 50000      # replay buffer capacity (transitions)
BATCH_SIZE = 64          # minibatch size per gradient step
GAMMA = 0.99             # discount factor in the TD target
LR = 1e-4                # Adam learning rate
TAU = 0.02               # soft (Polyak) target-update coefficient
EPSILON_DECAY = 20000    # env steps over which epsilon is annealed linearly
EPSILON_START = 1.0      # initial exploration rate
EPSILON_FINAL = 0.05     # exploration floor after the decay
WARMUP_STEPS = 5000      # random transitions collected before learning
MAX_STEPS = 2000         # presumably a per-episode step cap — TODO confirm where it is enforced

In [93]:
class ReplayBuffer:
    """Fixed-capacity FIFO store of (state, action, reward, next_state, done)."""

    def __init__(self, capacity):
        # A bounded deque silently evicts the oldest transition when full.
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one transition tuple to the buffer."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            A 5-tuple of tensors ``(state, action, reward, next_state, done)``;
            ``action`` is a LongTensor, the others are FloatTensors.

        Raises:
            ValueError: from ``random.sample`` when ``batch_size`` exceeds the
                number of stored transitions.
        """
        transitions = random.sample(self.buffer, batch_size)
        # Transpose the list of tuples into one array per field.
        columns = [np.array(field) for field in zip(*transitions)]
        states, actions, rewards, next_states, dones = columns
        states_t = torch.FloatTensor(states)
        actions_t = torch.LongTensor(actions)
        rewards_t = torch.FloatTensor(rewards)
        next_states_t = torch.FloatTensor(next_states)
        dones_t = torch.FloatTensor(dones)
        return (states_t, actions_t, rewards_t, next_states_t, dones_t)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)

In [94]:
class DQN(nn.Module):
    """Fully connected Q-network: input -> 128 -> 128 -> one value per action."""

    def __init__(self, input_dim, output_dim):
        super().__init__()
        hidden_units = 128
        # Layer order (and therefore the ``net.N`` parameter names) must stay
        # fixed so that previously saved state dicts remain loadable.
        stack = [
            nn.Linear(input_dim, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, output_dim),
        ]
        self.net = nn.Sequential(*stack)

    def forward(self, x):
        """Map a batch of inputs to a batch of per-action Q-values."""
        q_values = self.net(x)
        return q_values

In [95]:
class Agent:
    """DQN agent with a soft-updated target network.

    Holds a policy network, a target network initialised as a copy of it,
    an Adam optimiser over the policy parameters and a Smooth-L1 loss.
    """

    def __init__(self, state_dim=4, n_actions=2):
        """Build both networks on the GPU when available, otherwise on the CPU.

        Args:
            state_dim: Length of the observation vector.
            n_actions: Number of discrete actions.
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.n_actions = n_actions
        self.policy_net = DQN(state_dim, n_actions).to(self.device)
        self.target_net = DQN(state_dim, n_actions).to(self.device)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=LR)
        self.loss_fn = nn.SmoothL1Loss()

    def act(self, state, epsilon):
        """Choose an action epsilon-greedily.

        Args:
            state: Observation accepted by ``torch.FloatTensor``.
            epsilon: Probability of picking a uniformly random action.

        Returns:
            The chosen action index as a Python int.
        """
        if random.random() < epsilon:
            return random.randint(0, self.n_actions - 1)
        state_v = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        # Action selection never backpropagates, so skip building the
        # autograd graph for this forward pass.
        with torch.no_grad():
            q_vals = self.policy_net(state_v)
        return q_vals.argmax().item()

    def update(self, buffer, batch_size):
        """Run one gradient step on a sampled minibatch, then soft-update the target.

        Args:
            buffer: Object exposing ``__len__`` and ``sample(batch_size)`` that
                returns ``(state, action, reward, next_state, done)`` tensors.
            batch_size: Number of transitions to sample.

        Returns:
            The scalar loss as a float, or ``None`` when the buffer holds
            fewer than ``batch_size`` transitions.
        """
        if len(buffer) < batch_size:
            return None
        state, action, reward, next_state, done = buffer.sample(batch_size)
        state, action, reward, next_state, done = (
            state.to(self.device),
            action.to(self.device),
            reward.to(self.device),
            next_state.to(self.device),
            done.to(self.device),
        )

        # Q(s, a) for the actions that were actually taken.
        q_values = self.policy_net(state).gather(1, action.unsqueeze(1)).squeeze(1)

        # TD target; (1 - done) removes the bootstrap term on terminal
        # transitions. Computed without gradient tracking.
        with torch.no_grad():
            next_q_values = self.target_net(next_state).max(1)[0]
            expected_q = reward + GAMMA * next_q_values * (1 - done)

        loss = self.loss_fn(q_values, expected_q)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Soft target update (Polyak averaging)
        with torch.no_grad():
            for target_param, param in zip(self.target_net.parameters(), self.policy_net.parameters()):
                target_param.copy_(TAU * param + (1.0 - TAU) * target_param)

        return loss.item()

    def save(self, path):
        """Save policy, target networks and optimizer state"""
        checkpoint = {
            'policy_state_dict': self.policy_net.state_dict(),
            'target_state_dict': self.target_net.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
        }
        torch.save(checkpoint, path)

    def load(self, path):
        """Load a checkpoint written by ``save``.

        Args:
            path: File to read; tensors are mapped onto ``self.device``.

        Raises:
            KeyError: If the checkpoint has no ``'policy_state_dict'`` entry.
        """
        # NOTE(review): torch.load unpickles the file; only load checkpoints
        # from a trusted source.
        checkpoint = torch.load(path, map_location=self.device)

        if 'policy_state_dict' in checkpoint:
            self.policy_net.load_state_dict(checkpoint['policy_state_dict'])
        else:
            raise KeyError("Checkpoint missing 'policy_state_dict'")

        if 'target_state_dict' in checkpoint:
            self.target_net.load_state_dict(checkpoint['target_state_dict'])
        else:
            # fallback: copy policy_net
            self.target_net.load_state_dict(self.policy_net.state_dict())

        if 'optimizer_state_dict' in checkpoint:
            self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        else:
            print("Warning: optimizer state not found in checkpoint, skipping.")


In [96]:

class FlappyBirdEnv:
    """Compact, headless environment optimized for training.
    Observation: [dy_norm, vel_norm, pipe_dist_norm, gap_y_norm]
    Action: 0 = noop, 1 = flap

    ``pipes`` is a list of ``[x, gap_y]`` pairs ordered left to right, where
    ``gap_y`` is the top edge of the opening. ``scored_pipes`` holds the
    ``id()`` of every live pipe the bird has already passed.
    """
    def __init__(self, difficulty='normal', seed=None):
        """Configure the game and reset it to the initial state.

        Args:
            difficulty: One of ``'easy'``, ``'normal'`` or ``'hard'``.
            seed: When not ``None``, seeds the global ``random`` and
                ``numpy.random`` generators.

        Raises:
            ValueError: If ``difficulty`` is not a known preset.
        """
        if seed is not None:
            random.seed(seed)
            np.random.seed(seed)

        presets = {
            'easy':  {'PIPE_GAP':220,'PIPE_SPACING':300,'SCROLL_SPEED':2},
            'normal':{'PIPE_GAP':180,'PIPE_SPACING':280,'SCROLL_SPEED':3},
            'hard':  {'PIPE_GAP':150,'PIPE_SPACING':260,'SCROLL_SPEED':3}
        }
        if difficulty not in presets:
            raise ValueError('difficulty must be one of: %s' % list(presets.keys()))

        p = presets[difficulty]
        self.PIPE_GAP = p['PIPE_GAP']
        self.PIPE_SPACING = p['PIPE_SPACING']
        self.SCROLL_SPEED = p['SCROLL_SPEED']

        # Physics
        self.GRAVITY = 0.5
        self.FLAP_VEL = -9.0
        self.MAX_VEL = 12.0

        # Simple constants (no assets)
        self.SCREEN_WIDTH = 288
        self.SCREEN_HEIGHT = 512
        self.GROUND_HEIGHT = 112
        self.pipe_width = 52
        self.INIT_PIPE_OFFSET = 100
        self.MIN_GAP_Y = 50

        # Reward params
        self.LIVING_REWARD = 0.1
        self.SCORE_REWARD = 10.0
        self.DEATH_PENALTY = -7
        self.VERTICAL_WEIGHT = 0.3
        self.VELOCITY_WEIGHT = 0.1
        self.CENTER_BONUS_MULT = 5.0
        self.APPROACHING_THRESHOLD = 0.3
        self.APPROACHING_MULTIPLIER = 2.0

        self.reset()

    def _random_gap_y(self):
        # Top edge of a gap, kept MIN_GAP_Y away from both screen edges.
        return random.randint(self.MIN_GAP_Y, self.SCREEN_HEIGHT - self.MIN_GAP_Y - self.PIPE_GAP)

    def reset(self):
        """Start a new episode and return the initial observation."""
        self.bird_x = 80
        self.bird_y = self.SCREEN_HEIGHT//2
        self.bird_vel = 0.0
        self.bird_radius = 12

        self.bg_x = 0
        self.base_x = 0

        self.pipes = []
        start_x = self.SCREEN_WIDTH + self.INIT_PIPE_OFFSET
        for i in range(3):
            gap_y = self._random_gap_y()
            self.pipes.append([start_x + i*self.PIPE_SPACING, gap_y])

        self.done = False
        self.score = 0
        self.scored_pipes = set()
        return self._get_state()

    def step(self, action):
        """Advance the simulation by one frame.

        Args:
            action: ``1`` flaps; any other value does nothing.

        Returns:
            ``(observation, reward, done, info)`` where ``info`` is an empty
            dict. On collision the reward is ``DEATH_PENALTY`` and ``done``
            is ``True``.
        """
        if action == 1:
            self.bird_vel = self.FLAP_VEL

        self.bird_vel += self.GRAVITY
        self.bird_vel = max(-self.MAX_VEL, min(self.MAX_VEL, self.bird_vel))
        self.bird_y += self.bird_vel

        # Move pipes
        for p in self.pipes:
            p[0] -= self.SCROLL_SPEED

        if len(self.pipes)>0 and (self.pipes[0][0] + self.pipe_width) < 0:
            retired = self.pipes.pop(0)
            # Forget the retired pipe's id while we still hold a reference.
            # CPython may hand the same id to the replacement list created
            # below, which would otherwise be treated as already scored.
            self.scored_pipes.discard(id(retired))
            last_x = self.pipes[-1][0] if self.pipes else retired[0]
            new_x = last_x + self.PIPE_SPACING
            new_gap_y = self._random_gap_y()
            self.pipes.append([new_x, new_gap_y])

        if self._check_collision():
            self.done = True
            return self._get_state(), self.DEATH_PENALTY, True, {}

        just_scored = False
        for pipe in self.pipes:
            px, gy = pipe
            if (px + self.pipe_width) < self.bird_x and id(pipe) not in self.scored_pipes:
                if gy < self.bird_y < gy + self.PIPE_GAP:
                    self.score += 1
                    just_scored = True
                # Mark as handled either way so each pipe is evaluated once.
                self.scored_pipes.add(id(pipe))

        dy_norm, vel_norm, dx_norm = self._get_normalized_values()
        if just_scored:
            reward = self.SCORE_REWARD + max(0, (1.0 - abs(dy_norm)) * self.CENTER_BONUS_MULT)
        else:
            reward = self.LIVING_REWARD - abs(dy_norm)*self.VERTICAL_WEIGHT - abs(vel_norm)*self.VELOCITY_WEIGHT
            # The shaped reward is scaled when the next pipe is close.
            if dx_norm < self.APPROACHING_THRESHOLD:
                reward *= self.APPROACHING_MULTIPLIER

        return self._get_state(), reward, self.done, {}

    def _check_collision(self):
        """Return True if the bird touches the ceiling, the ground or a pipe."""
        GROUND_Y = self.SCREEN_HEIGHT - self.GROUND_HEIGHT
        if self.bird_y - self.bird_radius <= 0 or self.bird_y + self.bird_radius >= GROUND_Y:
            return True
        for px, gy in self.pipes:
            # Horizontal overlap between the bird's bounding box and the pipe.
            if (self.bird_x + self.bird_radius) > px and (self.bird_x - self.bird_radius) < (px + self.pipe_width):
                if self.bird_y - self.bird_radius < gy or self.bird_y + self.bird_radius > gy + self.PIPE_GAP:
                    return True
        return False

    def _get_next_pipe(self):
        """Return ``(x, gap_y)`` of the first pipe whose right edge is not behind the bird."""
        for px, gy in self.pipes:
            if px + self.pipe_width >= self.bird_x:
                return px, gy
        return self.pipes[0]

    def _get_normalized_values(self):
        """Return ``(dy_norm, vel_norm, dx_norm)`` relative to the next pipe."""
        next_px, next_gy = self._get_next_pipe()
        gap_center = next_gy + self.PIPE_GAP/2
        dy_norm = (gap_center - self.bird_y)/float(self.PIPE_GAP)
        vel_norm = self.bird_vel/float(self.MAX_VEL)
        dx_norm = (next_px - self.bird_x)/float(self.SCREEN_WIDTH)
        return dy_norm, vel_norm, dx_norm

    def _get_state(self):
        """Return the observation as a float32 array of length 4."""
        dy_norm, vel_norm, pipe_dist_norm = self._get_normalized_values()
        _, next_gy = self._get_next_pipe()
        gap_y_norm = next_gy/float(self.SCREEN_HEIGHT)
        return np.array([dy_norm, vel_norm, pipe_dist_norm, gap_y_norm], dtype=np.float32)



In [97]:
def train_loop(num_episodes=EPI_NUMS, resume=False, difficulty="normal"):
    """Train a DQN agent on ``FlappyBirdEnv`` and checkpoint it periodically.

    The replay buffer is first filled with ``WARMUP_STEPS`` random
    transitions. Each episode then runs epsilon-greedy with a linear decay
    over ``EPSILON_DECAY`` environment steps and performs one gradient step
    per environment step. A progress line is printed and a checkpoint saved
    every 50 episodes, and once more when training ends.

    Args:
        num_episodes: Number of training episodes.
        resume: When True and ``CHECKPOINT_PATH`` exists, load it first.
        difficulty: Preset name forwarded to ``FlappyBirdEnv``.

    Returns:
        None.
    """
    env = FlappyBirdEnv(difficulty=difficulty)
    agent = Agent()
    buffer = ReplayBuffer(BUFFER_SIZE)

    if resume and os.path.exists(CHECKPOINT_PATH):
        print("Loading checkpoint...")
        agent.load(CHECKPOINT_PATH)
        print("Loaded model.")

    total_steps = 0
    all_scores = []
    # Only the most recent 100 losses are ever reported, so keep just those
    # instead of one float per environment step for the whole run.
    recent_losses = deque(maxlen=100)

    # Warmup
    print(f"Collecting {WARMUP_STEPS} random transitions for warmup...")
    state = env.reset()
    for _ in range(WARMUP_STEPS):
        action = random.randint(0, 1)
        next_state, reward, done, _ = env.step(action)
        buffer.push(state, action, reward, next_state, float(done))
        state = env.reset() if done else next_state
    print(f"Warmup finished. Replay buffer size = {len(buffer)}")

    epsilon = EPSILON_START
    for ep in range(1, num_episodes + 1):
        state = env.reset()
        ep_reward = 0.0

        # Cap the episode at MAX_STEPS so a policy that never crashes cannot
        # stall training inside a single episode. A capped episode is not
        # terminal, so `done` stays False in the stored transition and the
        # TD target still bootstraps.
        for _ in range(MAX_STEPS):
            epsilon = EPSILON_FINAL + (EPSILON_START - EPSILON_FINAL) * max(0, (1 - total_steps / EPSILON_DECAY))
            action = agent.act(state, epsilon)
            next_state, reward, done, _ = env.step(action)
            buffer.push(state, action, reward, next_state, float(done))

            loss = agent.update(buffer, batch_size=BATCH_SIZE)
            if loss is not None:
                recent_losses.append(loss)

            state = next_state
            ep_reward += reward
            total_steps += 1

            if done:
                break

        all_scores.append(env.score)
        if ep % 50 == 0:
            avg_score = np.mean(all_scores[-50:])
            avg_loss = np.mean(list(recent_losses)) if len(recent_losses) > 0 else 0.0
            print(f"Ep {ep:4d} | Steps {total_steps:6d} | Score {env.score:3d} | "
                  f"EpReward {ep_reward:.2f} | Eps {epsilon:.3f} | "
                  f"AvgScore50 {avg_score:.2f} | AvgLoss100 {avg_loss:.4f}")
            agent.save(CHECKPOINT_PATH)

    agent.save(CHECKPOINT_PATH)
    print("✅ Training finished. Model saved to", CHECKPOINT_PATH)

In [98]:
# Start a fresh training run; with resume=False no existing checkpoint is loaded.
train_loop(num_episodes=EPI_NUMS, difficulty="normal", resume=False)

Collecting 5000 random transitions for warmup...
Warmup finished. Replay buffer size = 5000
Ep   50 | Steps   1626 | Score   0 | EpReward -10.04 | Eps 0.923 | AvgScore50 0.00 | AvgLoss100 0.0742
Ep  100 | Steps   3274 | Score   0 | EpReward -14.99 | Eps 0.845 | AvgScore50 0.00 | AvgLoss100 0.2067
Ep  150 | Steps   4938 | Score   0 | EpReward -10.35 | Eps 0.765 | AvgScore50 0.00 | AvgLoss100 0.1835
Ep  200 | Steps   6629 | Score   0 | EpReward -13.35 | Eps 0.685 | AvgScore50 0.00 | AvgLoss100 0.1104
Ep  250 | Steps   8396 | Score   0 | EpReward -9.26 | Eps 0.601 | AvgScore50 0.00 | AvgLoss100 0.0715
Ep  300 | Steps  10264 | Score   0 | EpReward -9.22 | Eps 0.513 | AvgScore50 0.00 | AvgLoss100 0.0550
Ep  350 | Steps  12108 | Score   0 | EpReward -9.03 | Eps 0.425 | AvgScore50 0.00 | AvgLoss100 0.0566
Ep  400 | Steps  14209 | Score   0 | EpReward -13.54 | Eps 0.325 | AvgScore50 0.00 | AvgLoss100 0.0360
Ep  450 | Steps  16723 | Score   0 | EpReward -14.46 | Eps 0.206 | AvgScore50 0.00 | Av