In [1]:
import pygame
import random
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from collections import deque

pygame 2.5.2 (SDL 2.28.3, Python 3.9.16)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
class ReplayBuffer:
    """Bounded FIFO store of (state, action, reward, next_state, done) tuples.

    Transitions live in a ``deque`` created with ``maxlen=capacity``, so once
    the buffer is full each new transition pushes out the oldest one.
    """

    def __init__(self, capacity):
        """Create an empty buffer that retains at most ``capacity`` transitions."""
        self.buffer = deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        """Append a single transition to the right-hand end of the buffer."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            A 5-tuple ``(state, action, reward, next_state, done)`` of NumPy
            arrays, each stacking the matching field of the drawn transitions.

        Raises:
            ValueError: propagated from ``random.sample`` when ``batch_size``
                exceeds the number of stored transitions.
        """
        picked = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one column per field.
        columns = [np.array(column) for column in zip(*picked)]
        states, actions, rewards, next_states, dones = columns
        return states, actions, rewards, next_states, dones

In [3]:
def create_q_model(state_shape, action_space):
    """Build a small fully connected Q-network with the Keras functional API.

    Args:
        state_shape: shape tuple passed to ``layers.Input`` (batch axis excluded).
        action_space: number of discrete actions; the output layer has one
            linear unit per action.

    Returns:
        An uncompiled ``models.Model`` mapping a state to one Q-value per action.
    """
    net_input = layers.Input(shape=state_shape)

    # Two identical 64-unit ReLU hidden layers stacked on the input.
    hidden = net_input
    for _ in range(2):
        hidden = layers.Dense(64, activation='relu')(hidden)

    # Linear head: Q-values are unbounded regression targets.
    q_out = layers.Dense(action_space, activation='linear')(hidden)
    return models.Model(inputs=net_input, outputs=q_out)

In [4]:
class DQNAgent:
    """Epsilon-greedy DQN agent with a target network and experience replay.

    The agent owns two networks built by ``create_q_model``: ``model`` (trained
    on every ``replay`` call) and ``target_model`` (only changed by
    ``update_target_network``), plus a ``ReplayBuffer`` of past transitions.
    """

    def __init__(self, state_shape, action_space):
        """Create the networks, replay buffer, and training hyperparameters.

        Args:
            state_shape: shape tuple of a single observation.
            action_space: number of discrete actions.
        """
        self.state_shape = state_shape
        self.action_space = action_space
        self.model = create_q_model(state_shape, action_space)
        self.target_model = create_q_model(state_shape, action_space)
        # The two networks are initialised independently; copy the online
        # weights so bootstrap targets are consistent from the first update
        # even if the caller has not yet synced them.
        self.update_target_network()
        self.replay_buffer = ReplayBuffer(10000)
        self.epsilon = 1.0          # current exploration probability
        self.epsilon_min = 0.1      # floor for epsilon
        self.epsilon_decay = 0.995  # multiplicative decay applied per training step
        self.batch_size = 64
        self.gamma = 0.99           # discount factor
        self.optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
        self.loss_function = tf.keras.losses.MeanSquaredError()

    def update_target_network(self):
        """Copy the online network's weights into the target network."""
        self.target_model.set_weights(self.model.get_weights())

    def act(self, state):
        """Choose an action for ``state`` with an epsilon-greedy policy.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the action with the highest predicted Q-value.

        Returns:
            The chosen action index as a plain Python ``int``.
        """
        if np.random.rand() < self.epsilon:
            return random.randint(0, self.action_space - 1)
        # Cast explicitly so the network always sees float32 input.
        state_tensor = tf.convert_to_tensor(np.asarray(state, dtype=np.float32))
        state_tensor = tf.expand_dims(state_tensor, 0)
        q_values = self.model(state_tensor, training=False)
        # int(...) keeps the return type identical to the exploratory branch.
        return int(tf.argmax(q_values[0]).numpy())

    def replay(self):
        """Run one gradient step on a random minibatch from the replay buffer.

        Does nothing until the buffer holds at least ``batch_size`` transitions.
        After the update, ``epsilon`` is decayed and clamped at ``epsilon_min``.
        """
        if len(self.replay_buffer.buffer) < self.batch_size:
            return
        states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size)

        # Normalise dtypes up front instead of relying on implicit
        # numpy/TensorFlow conversions inside the arithmetic below.
        states = np.asarray(states, dtype=np.float32)
        next_states = np.asarray(next_states, dtype=np.float32)
        rewards = np.asarray(rewards, dtype=np.float32)
        actions = np.asarray(actions, dtype=np.int32)
        not_done = 1.0 - np.asarray(dones, dtype=np.float32)

        # Call the model directly: Model.predict is meant for large batch
        # inference and adds per-call overhead when used on one small batch
        # inside a training loop.
        future_rewards = self.target_model(next_states, training=False)
        # Bellman target; terminal transitions contribute only their reward.
        updated_q_values = rewards + self.gamma * tf.reduce_max(future_rewards, axis=1) * not_done
        masks = tf.one_hot(actions, self.action_space)
        with tf.GradientTape() as tape:
            q_values = self.model(states)
            # Keep only the Q-value of the action that was actually taken.
            q_action = tf.reduce_sum(tf.multiply(q_values, masks), axis=1)
            loss = self.loss_function(updated_q_values, q_action)
        grads = tape.gradient(loss, self.model.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))
        if self.epsilon > self.epsilon_min:
            # Clamp so the last decay step cannot drop below the floor.
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

In [5]:
# --- Pygame window and frame pacing ---
pygame.init()
SCREEN_WIDTH = 800
SCREEN_HEIGHT = 600
screen = pygame.display.set_mode((SCREEN_WIDTH, SCREEN_HEIGHT))
clock = pygame.time.Clock()
FPS = 30  # frame cap passed to clock.tick() in the game loop

# --- RGB colours ---
BLACK = (0, 0, 0)
WHITE = (255, 255, 255)
RED = (255, 0, 0)

# --- Player (square that moves horizontally) ---
player_size = 50
player_velocity = 5   # pixels moved per frame

# --- Falling obstacles ---
obstacle_width = 70
obstacle_height = 50
obstacle_speed = 5    # pixels fallen per frame
# Each frame spawns an obstacle when random.randint(0, obstacle_frequency) == 1.
obstacle_frequency = 20

# --- Agent ---
# Observation has four features (player x, obstacle x, obstacle y, obstacle
# speed); the two actions are 0 = move left and 1 = move right.
state_shape = (4,)
action_space = 2
agent = DQNAgent(state_shape, action_space)


# Reward shaping and target-network sync cadence used by the loop below.
COLLISION_REWARD = -10
SURVIVAL_REWARD = 1
TARGET_UPDATE_INTERVAL = 20  # environment steps between target-network syncs

# The player is pinned to a fixed row near the bottom of the window.
PLAYER_Y = SCREEN_HEIGHT - player_size - 10


def _observe(player_x, obstacles):
    """Return the 4-feature observation for the current frame.

    Features are (player x, nearest obstacle x, nearest obstacle y, obstacle
    speed), all divided by SCREEN_WIDTH. Obstacles fall towards the player at
    the bottom, so the nearest one is the one with the largest y that has not
    yet dropped below the player's bottom edge. With no such obstacle the
    obstacle features default to (0, 0).
    """
    player_bottom = PLAYER_Y + player_size
    threats = [ob for ob in obstacles if ob[1] < player_bottom]
    nearest = max(threats, key=lambda ob: ob[1]) if threats else [0, 0]
    return np.array([player_x, nearest[0], nearest[1], obstacle_speed]) / SCREEN_WIDTH


running = True
try:
    # Outer loop: one iteration per episode, until the window is closed.
    while running:
        player_x = SCREEN_WIDTH // 2 - player_size // 2
        obstacles = []
        score = 0
        total_steps = 0
        game_active = True

        # Inner loop: one iteration per frame / environment step.
        while game_active:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    running = False
                    game_active = False

            if not running:
                break

            screen.fill(BLACK)
            state = _observe(player_x, obstacles)
            action = agent.act(state)

            # Apply the action, keeping the player inside the window.
            if action == 0 and player_x > 0:
                player_x -= player_velocity
            elif action == 1 and player_x < SCREEN_WIDTH - player_size:
                player_x += player_velocity

            if random.randint(0, obstacle_frequency) == 1:
                obstacles.append([random.randint(0, SCREEN_WIDTH - obstacle_width), 0 - obstacle_height])

            player_rect = pygame.Rect(player_x, PLAYER_Y, player_size, player_size)

            # Advance obstacles; iterate backwards so pop(i) is safe.
            collision = False
            for i in range(len(obstacles) - 1, -1, -1):
                obstacles[i][1] += obstacle_speed
                if obstacles[i][1] > SCREEN_HEIGHT:
                    # Obstacle left the screen without hitting the player.
                    obstacles.pop(i)
                    score += 1
                elif player_rect.colliderect(pygame.Rect(obstacles[i][0], obstacles[i][1], obstacle_width, obstacle_height)):
                    collision = True
                    break

            pygame.draw.rect(screen, WHITE, player_rect)
            for obstacle in obstacles:
                pygame.draw.rect(screen, RED, (obstacle[0], obstacle[1], obstacle_width, obstacle_height))

            pygame.display.flip()
            clock.tick(FPS)

            # Observe the world *after* the step so the stored transition is
            # (s, a, r, s') rather than (s, a, r, s).
            next_state = _observe(player_x, obstacles)
            reward = COLLISION_REWARD if collision else SURVIVAL_REWARD
            done = collision

            agent.replay_buffer.add(state, action, reward, next_state, done)
            agent.replay()

            # total_steps restarts each episode, so this also syncs the
            # target network on the first step of every episode.
            if total_steps % TARGET_UPDATE_INTERVAL == 0:
                agent.update_target_network()

            total_steps += 1

            if done:
                print(f"Game Over! Score: {score}")
                game_active = False
finally:
    # Always release the window, including on an exception or kernel interrupt.
    pygame.quit()

Game Over! Score: 3
Game Over! Score: 0
Game Over! Score: 16
Game Over! Score: 3
