In [None]:
%pip install pygame

In [None]:
from dataclasses import dataclass

import numpy as np

import pygame
import random
import keras


# Logical resolution of the playfield in pixels; the desktop window is this size multiplied by SCREEN_FACTOR.
SCREEN_HEIGHT, SCREEN_WIDTH = (160, 192)
SCREEN_CENTER_X, SCREEN_CENTER_Y = SCREEN_WIDTH // 2, SCREEN_HEIGHT // 2
SCREEN_FACTOR = 4
WINDOW_TITLE = 'DeepPong'
WINDOW_ICON_FILEPATH = 'assets/favicon.ico'

# Scoring: the game stops being "in progress" once a player reaches SCORE_TO_WIN.
SCORE_TO_WIN = 9
SCORE_PADDING = 10
SCORE_SIZE = 24
SCORE_FONT_FILEPATH = 'assets/font.ttf'
SCORE_SOUND_FILEPATH = 'assets/score.wav'
BOUNCE_SOUND_FILEPATH = 'assets/bounce.wav'

# Center divider dashes and the horizontal gap between each goal line and its paddle.
DASH_LENGTH = 4
DASH_WIDTH = 2
GAP_LENGTH = 8
GOAL_PADDING = 8

# Paddle dimensions in pixels and speed in pixels per second (speeds are multiplied by dt in seconds).
PADDLE_HEIGHT, PADDLE_WIDTH = (16, 2)
PADDLE_SPEED = 60

# The ball is a BALL_SIZE x BALL_SIZE square.
BALL_SIZE = 2
BALL_SPEED = 60

# RGB colors.
BLACK = (0, 0, 0)
WHITE = (255, 255, 255)

# Audio and timing. Frame times are clamped to MAX_DT before being used to advance the simulation.
SOUND_VOLUME = 0.25
MAX_FPS = 30
FPS_PADDING = 10
FPS_SIZE = 16
IDEAL_DT = 1 / MAX_FPS
MAX_DT = IDEAL_DT * 1.5
MILLISECONDS_PER_SECOND = 1000

# Where the selected models and the generated dataset are stored.
DENSE_MODEL_FILEPATH = 'dense.keras'
RNN_MODEL_FILEPATH = 'rnn.keras'
CNN_MODEL_FILEPATH = 'cnn.keras'
DATA_DIRECTORY_NAME = 'data'
SPREADSHEET_FILEPATH = f'{DATA_DIRECTORY_NAME}/data.csv'

# Dataset layout: every (p1, p2) score pair except the one where both players sit at SCORE_TO_WIN.
SCORE_COMBINATIONS = [(p1, p2) for p1 in range(SCORE_TO_WIN + 1) for p2 in range(SCORE_TO_WIN + 1) if not (p1 == SCORE_TO_WIN and p2 == SCORE_TO_WIN)]
STATES_PER_SCORE = 3  # Random game states generated per score pair.
TIMESTEPS_PER_STATE = 2  # Consecutive frames simulated (and recorded) per random state.
FEATURES_PER_STATE = 10  # Length of the state vector returned by Pong.capture().

# Seed passed to train_test_split.
RANDOM_SEED = 42


def clamp(x: float, low: float, high: float):
    '''Restricts x to the closed interval [low, high].'''
    capped = min(x, high)  # Apply the upper bound first...
    return max(low, capped)  # ...then the lower bound, which wins if the bounds are inverted.


def colliding(x1: float, y1: float, w1: float, h1: float, x2: float, y2: float, w2: float, h2: float) -> bool:
    '''Reports whether two rectangles overlap.

    Each rectangle is given by its top left corner (x, y) and its size (w, h), with y growing upwards,
    so a rectangle spans [x, x + w] horizontally and [y - h, y] vertically. Rectangles that merely
    touch along an edge are not considered overlapping.
    '''
    # No overlap if one rectangle lies entirely to the side of the other.
    if x1 + w1 <= x2 or x1 >= x2 + w2:
        return False

    # No overlap if one rectangle lies entirely above or below the other.
    if y1 - h1 >= y2 or y1 <= y2 - h2:
        return False

    return True


@dataclass(eq=False)
class Paddle:
    '''A player's paddle: its position, the keys that move it and the player's score.

    eq=False stops the dataclass from generating __eq__, so paddles compare by identity.
    '''
    x: float  # Left edge of the paddle.
    y: float  # Top edge of the paddle, measured upwards from the bottom of the screen.
    up: int  # pygame key code that moves the paddle up.
    down: int  # pygame key code that moves the paddle down.
    score: int = 0  # Goals scored by the player owning this paddle.


class Ball:
    '''The ball, spawned at the center of the screen with a randomized velocity.'''

    def __init__(self):
        half = BALL_SIZE / 2

        # (x, y) is the top left corner, with y measured upwards, so this centers the square.
        self.x = SCREEN_CENTER_X - half
        self.y = SCREEN_CENTER_Y + half

        # Full horizontal speed towards a random side, and a random share of it vertically.
        direction = (-1) ** random.randint(0, 1)
        self.vx = BALL_SPEED * direction
        self.vy = BALL_SPEED * random.uniform(-1, 1)


class Pong:
    '''Pong game state, simulation step and desktop window loop.

    Positions are in logical pixels. x grows to the right and y grows upwards from the bottom of the
    screen; an object's (x, y) is its top left corner. Player 1 defends the left goal and player 2
    the right one.
    '''

    def __init__(self, dataset_generation_mode=False):
        '''Initializes pygame (if needed), the renderers and the sounds, then resets the game.

        dataset_generation_mode: when True, only the rasterized renderer is created and no sounds
            are loaded, so neither the trained model files nor the sound assets are required.
        '''
        if not pygame.get_init():
            pygame.init()

        # Despite its name, this font is only used to draw the FPS counter (see refresh).
        self.score_font = pygame.font.Font(SCORE_FONT_FILEPATH, FPS_SIZE)

        self.current_renderer = 0
        self.renderers = [RasterizedRenderer(self)]
        self.sounds = []

        if not dataset_generation_mode:
            self.renderers.extend([DenseModelRenderer(self, DENSE_MODEL_FILEPATH), RecurrentRenderer(self, RNN_MODEL_FILEPATH), ConvolutionalRenderer(self, CNN_MODEL_FILEPATH)])
            self.bounce_sound = pygame.mixer.Sound(BOUNCE_SOUND_FILEPATH)
            self.score_sound = pygame.mixer.Sound(SCORE_SOUND_FILEPATH)
            self.sounds = [self.bounce_sound, self.score_sound]
            # NOTE(review): assuming freshly loaded sounds report a positive volume, this first
            # toggle silences them, i.e. the game starts muted until M is pressed. Confirm intent.
            self.mute()
        else:
            self.bounce_sound = None
            self.score_sound = None

        self.clock = pygame.time.Clock()
        self.restart()

    def mute(self):
        '''Toggles sound for the game.'''
        for sound in self.sounds:
            volume = 0 if sound.get_volume() > 0 else SOUND_VOLUME
            sound.set_volume(volume)

    def update(self, dt, overrides=None):
        '''Updates the positions of all the game objects based on the amount of time that has passed since last frame (dt).

        dt: elapsed time in seconds.
        overrides: optional mapping of pygame key code -> pressed flag that takes precedence over
            the real keyboard state (used to simulate player input).
        '''
        if self.paused:
            return

        # A fresh dict per call instead of a shared mutable default argument.
        if overrides is None:
            overrides = {}

        keys = pygame.key.get_pressed()

        # Move the paddles.
        for paddle in self.paddles:
            # +1 when only "up" is held, -1 when only "down" is held, 0 otherwise.
            direction = overrides.get(paddle.up, keys[paddle.up]) - overrides.get(paddle.down, keys[paddle.down])
            paddle.y += direction * PADDLE_SPEED * dt
            paddle.y = clamp(paddle.y, PADDLE_HEIGHT, SCREEN_HEIGHT)

        # Update the ball's position.
        self.ball.y += self.ball.vy * dt
        self.ball.x += self.ball.vx * dt

        bounced = False
        scored = False

        # Vertical bounce.
        if self.ball.y >= SCREEN_HEIGHT:
            # Mirror the overshoot back below the ceiling.
            self.ball.y = SCREEN_HEIGHT - (self.ball.y - SCREEN_HEIGHT)
            self.ball.vy *= -1
            bounced = True
        elif self.ball.y - BALL_SIZE <= 0:
            # Put the ball's bottom edge back on the floor.
            self.ball.y += abs(self.ball.y - BALL_SIZE)
            self.ball.vy *= -1
            bounced = True

        # Check for goals if the game is in progress.
        if self.playing:
            for paddle in self.paddles:
                if colliding(paddle.x, paddle.y, PADDLE_WIDTH, PADDLE_HEIGHT, self.ball.x, self.ball.y, BALL_SIZE, BALL_SIZE):
                    bounced = True
                    self.ball.vx *= -1

                    # Push the ball out of the paddle, towards the middle of the field.
                    if paddle is self.p1:
                        self.ball.x += paddle.x + PADDLE_WIDTH - self.ball.x
                    else:
                        self.ball.x -= self.ball.x + BALL_SIZE - paddle.x

                    # The further from the paddle's center the ball hits, the steeper it leaves.
                    offset = (self.ball.y - BALL_SIZE / 2) - (paddle.y - PADDLE_HEIGHT / 2)
                    standardized = clamp(offset / (PADDLE_HEIGHT / 2), -1, 1)
                    self.ball.vy = BALL_SPEED * standardized

            # Check to see if someone scored. The new ball is served away from the scorer.
            if self.ball.x <= 0:
                scored = True
                self.p2.score += 1
                self.ball = Ball()
                self.ball.vx = abs(self.ball.vx)
            elif self.ball.x + BALL_SIZE >= SCREEN_WIDTH:
                scored = True
                self.p1.score += 1
                self.ball = Ball()
                self.ball.vx = -abs(self.ball.vx)

        # Otherwise the game is over: let the ball bounce freely against the goals without the paddles.
        elif self.ball.x <= 0:
            bounced = True
            self.ball.vx *= -1
            self.ball.x = abs(self.ball.x)
        elif self.ball.x + BALL_SIZE >= SCREEN_WIDTH:
            bounced = True
            self.ball.vx *= -1
            self.ball.x = SCREEN_WIDTH - (self.ball.x + BALL_SIZE - SCREEN_WIDTH)

        if bounced and self.bounce_sound is not None:
            self.bounce_sound.play()

        if scored and self.score_sound is not None:
            self.score_sound.play()

        # Check to see if the game is over (every frame to make generating the dataset slightly easier).
        # >= rather than == so that a score assigned from outside and then incremented past
        # SCORE_TO_WIN still ends the game.
        if max(self.p1.score, self.p2.score) >= SCORE_TO_WIN:
            self.playing = False

    def restart(self):
        '''Restarts the game entirely.

        Scores, paddles and ball are recreated; the game comes back paused with the FPS counter hidden.
        '''
        self.ball = Ball()
        self.p1 = Paddle(GOAL_PADDING, (SCREEN_HEIGHT + PADDLE_HEIGHT) / 2, up=pygame.K_w, down=pygame.K_s)
        self.p2 = Paddle(SCREEN_WIDTH - (GOAL_PADDING + PADDLE_WIDTH), (SCREEN_HEIGHT + PADDLE_HEIGHT) / 2, up=pygame.K_UP, down=pygame.K_DOWN)
        self.paddles = [self.p1, self.p2]
        self.playing = True
        self.paused = True
        self.show_fps = False

    def show(self):
        '''Reveals the game in a desktop window.'''
        pygame.display.set_caption(WINDOW_TITLE)
        icon = pygame.image.load(WINDOW_ICON_FILEPATH)
        pygame.display.set_icon(icon)
        self.screen = pygame.display.set_mode((SCREEN_WIDTH * SCREEN_FACTOR, SCREEN_HEIGHT * SCREEN_FACTOR))

    def refresh(self):
        '''Renders and displays the next frame.

        Requires the window to exist (see show), since the frame is scaled to the window size.
        '''
        renderer = self.renderers[self.current_renderer]
        surface = renderer.render()
        surface = pygame.transform.scale(surface, self.screen.get_size())
        self.screen.blit(surface, (0, 0))

        if self.show_fps:
            fps = self.score_font.render(str(round(self.clock.get_fps())), False, WHITE)
            self.screen.blit(fps, (FPS_PADDING, FPS_PADDING))

        pygame.display.flip()

    def capture(self, screenshot=True):
        '''Returns the normalized game state and current frame.

        The state is a 1-D array of 10 values: both paddles' positions, the ball's position and
        velocity, and both scores, each divided by the matching screen dimension, BALL_SPEED or
        SCORE_TO_WIN. When screenshot is True a (state, surface) tuple is returned, where surface
        is a copy of the frame drawn by the first renderer; otherwise only the state is returned.
        '''
        state = np.array([
            self.p1.x / SCREEN_WIDTH, self.p1.y / SCREEN_HEIGHT,
            self.p2.x / SCREEN_WIDTH, self.p2.y / SCREEN_HEIGHT,
            self.ball.x / SCREEN_WIDTH, self.ball.y / SCREEN_HEIGHT, self.ball.vx / BALL_SPEED, self.ball.vy / BALL_SPEED,
            self.p1.score / SCORE_TO_WIN, self.p2.score / SCORE_TO_WIN,
        ])

        return (state, self.renderers[0].render().copy()) if screenshot else state

    def tick(self):
        '''Ticks the in-game clock.

        Limits the frame rate to MAX_FPS and returns the elapsed time in seconds, clamped to
        [0, MAX_DT] so that a slow frame cannot make the simulation jump too far.
        '''
        milliseconds = self.clock.tick(MAX_FPS)
        dt = milliseconds / MILLISECONDS_PER_SECOND
        return clamp(dt, 0, MAX_DT)

    def run(self):
        '''Runs the game.

        Opens the window and loops until the window is closed or Escape is pressed.
        Keys: Space pauses/resumes, R restarts, M toggles sound, F toggles the FPS counter and
        the number keys starting at 1 select a renderer. pygame is always shut down on exit,
        even if an exception escapes the loop.
        '''
        self.show()

        running = True
        dt = 0.0

        try:
            while running:
                for event in pygame.event.get():
                    if event.type == pygame.QUIT:
                        running = False
                    elif event.type == pygame.KEYDOWN:
                        if event.key == pygame.K_ESCAPE:
                            running = False
                        elif event.key == pygame.K_SPACE:
                            self.paused = not self.paused
                        elif event.key == pygame.K_r:
                            self.restart()
                        elif event.key == pygame.K_m:
                            self.mute()
                        elif pygame.K_1 <= event.key < pygame.K_1 + len(self.renderers):
                            self.current_renderer = event.key - pygame.K_1
                        elif event.key == pygame.K_f:
                            self.show_fps = not self.show_fps

                self.update(dt)
                self.refresh()

                dt = self.tick()
        finally:
            # Close the window even when update/refresh raised (e.g. a model prediction failed).
            pygame.quit()


class Renderer:
    '''Base class for everything that can draw a frame of a Pong game.'''

    def __init__(self, pong):
        # Off-screen surface at the game's logical resolution.
        size = (SCREEN_WIDTH, SCREEN_HEIGHT)
        self.frame = pygame.Surface(size)
        self.pong = pong

    def render(self):
        '''Returns a surface showing the current state of the game; subclasses must override this.'''
        raise NotImplementedError


class RasterizedRenderer(Renderer):
    '''Draws the game directly with pygame primitives.'''

    def __init__(self, pong):
        super().__init__(pong)
        self.font = pygame.font.Font(SCORE_FONT_FILEPATH, SCORE_SIZE)

    def render(self, copy=False):
        '''Draws the current frame and returns it.

        The same surface is reused on every call; pass copy=True to get an independent copy.
        '''
        game = self.pong
        canvas = self.frame
        canvas.fill(BLACK)

        # Draw each score centered in its own half of the screen (left first, then right).
        for paddle, center_x in ((game.p1, SCREEN_WIDTH // 4), (game.p2, 3 * SCREEN_WIDTH // 4)):
            label = self.font.render(str(paddle.score), False, WHITE)
            canvas.blit(label, (center_x - (label.get_width() // 2), SCORE_PADDING))

        # Draw the paddles, but only while the game hasn't ended.
        if game.playing:
            for paddle in game.paddles:
                # Game y grows upwards while surface y grows downwards, hence the flip.
                pygame.draw.rect(canvas, WHITE, pygame.Rect(paddle.x, SCREEN_HEIGHT - paddle.y, PADDLE_WIDTH, PADDLE_HEIGHT))

        # Draw the ball.
        ball = game.ball
        pygame.draw.rect(canvas, WHITE, pygame.Rect(ball.x, SCREEN_HEIGHT - ball.y, BALL_SIZE, BALL_SIZE))

        # Draw the dashed center divider.
        divider_x = SCREEN_CENTER_X - DASH_WIDTH / 2
        period = DASH_LENGTH + GAP_LENGTH
        for top in range(0, SCREEN_HEIGHT, period):
            pygame.draw.rect(canvas, WHITE, pygame.Rect(divider_x, top, DASH_WIDTH, DASH_LENGTH))

        if copy:
            return canvas.copy()

        return canvas


class DeepLearningRenderer(Renderer):
    '''Base class for renderers whose frames are predicted by a Keras model.'''

    def __init__(self, pong, model):
        '''model is whatever keras.models.load_model accepts; Pong passes a .keras file path.'''
        super().__init__(pong)
        self.model = keras.models.load_model(model)

    def render(self):
        '''Turns the model's prediction into a grayscale pygame surface.'''
        # One intensity per pixel, laid out row by row.
        pixels = self.predict().reshape((SCREEN_HEIGHT, SCREEN_WIDTH))
        grey = (pixels * 255).astype(np.uint8)

        # Repeat the single channel for R, G and B.
        rgb = np.stack([grey, grey, grey], axis=-1)

        # The array is (height, width, 3); swap the first two axes before handing it to pygame.
        return pygame.surfarray.make_surface(rgb.swapaxes(0, 1))

    def predict(self):
        '''Returns the predicted pixel intensities for the current game state; subclasses must override this.'''
        raise NotImplementedError


class DenseModelRenderer(DeepLearningRenderer):
    '''Predicts each frame from the current game state alone.'''

    def predict(self):
        features = self.pong.capture(screenshot=False)
        batch = features.reshape(1, -1)  # A batch containing a single state vector.
        return self.model.predict(batch, verbose=0)


class RecurrentRenderer(DeepLearningRenderer):
    '''Predicts each frame from a sliding window of the most recent game states.'''

    def __init__(self, pong, model):
        super().__init__(pong, model)
        # The window starts out as all zeros and fills up as frames are predicted.
        self.states = np.zeros((1, TIMESTEPS_PER_STATE, FEATURES_PER_STATE))

    def predict(self):
        latest = self.pong.capture(screenshot=False).reshape(1, -1)

        # Slide the window one step (dropping the oldest state) and append the newest one.
        window = self.states[0]  # A view, so writes land in self.states.
        window[:-1] = window[1:]
        window[-1] = latest

        return self.model.predict(self.states, verbose=0)


class ConvolutionalRenderer(DeepLearningRenderer):
    '''Predicts each frame from a sliding window of recent game states treated as a one-channel image.'''

    def __init__(self, pong, model):
        super().__init__(pong, model)
        # The window starts out as all zeros and fills up as frames are predicted.
        self.states = np.zeros((1, TIMESTEPS_PER_STATE, FEATURES_PER_STATE))

    def predict(self):
        latest = self.pong.capture(screenshot=False).reshape(1, -1)

        # Slide the window one step (dropping the oldest state) and append the newest one.
        window = self.states[0]  # A view, so writes land in self.states.
        window[:-1] = window[1:]
        window[-1] = latest

        # Add the trailing channel axis expected by the model.
        batch = self.states.reshape(1, TIMESTEPS_PER_STATE, FEATURES_PER_STATE, 1)
        return self.model.predict(batch, verbose=0)


In [None]:
import os
import pandas as pd
import tensorflow as tf


def generate_dataset(save_to_disk=False):
    '''Simulates short bursts of Pong and records (state, screenshot) pairs.

    For every score pair in SCORE_COMBINATIONS, STATES_PER_SCORE random game states are created and
    each one is simulated for TIMESTEPS_PER_STATE frames, with both paddles chasing the ball.
    Frames of the same burst are recorded consecutively.

    save_to_disk: when True, every screenshot is saved as '<index>.png' in DATA_DIRECTORY_NAME and
        the states are written to SPREADSHEET_FILEPATH.

    Returns (states, screenshots): one row per recorded frame. states holds the normalized state
    vector returned by Pong.capture, screenshots the flattened frame (row by row) scaled to [0, 1].
    '''
    pong = Pong(dataset_generation_mode=True)
    columns = ['left_x', 'left_y', 'right_x', 'right_y', 'ball_x', 'ball_y', 'ball_vx', 'ball_vy', 'left_score', 'right_score']

    # Rows are collected in a list and turned into a DataFrame once at the end; growing a
    # DataFrame with df.loc[len(df)] copies it on every insertion.
    records = []
    screenshots = []
    i = 0

    # Make sure the image directory exists.
    if save_to_disk:
        os.makedirs(DATA_DIRECTORY_NAME, exist_ok=True)

    for p1_score, p2_score in SCORE_COMBINATIONS:
        for _ in range(STATES_PER_SCORE):
            pong.restart()  # Start a new phase of the game.

            # Generate a random state.
            pong.p1.y = random.uniform(PADDLE_HEIGHT, SCREEN_HEIGHT)
            pong.p1.score = p1_score

            pong.p2.y = random.uniform(PADDLE_HEIGHT, SCREEN_HEIGHT)
            pong.p2.score = p2_score

            pong.ball.x = random.uniform(GOAL_PADDING, SCREEN_WIDTH - GOAL_PADDING)
            pong.ball.y = random.uniform(BALL_SIZE, SCREEN_HEIGHT)

            overrides = {}
            pong.paused = False  # restart() leaves the game paused, which would make update() a no-op.

            # Simulate the game for a couple timesteps.
            for _ in range(TIMESTEPS_PER_STATE):
                pong.update(IDEAL_DT, overrides)
                state, screenshot = pong.capture()

                if save_to_disk:
                    pygame.image.save(screenshot, f'{DATA_DIRECTORY_NAME}/{i}.png')

                # Create a new record.
                records.append(state)
                pixels = pygame.surfarray.array3d(screenshot).astype(np.uint8)
                pixels = np.transpose(pixels, (1, 0, 2))  # (height, width, 3)
                pixels = pixels[..., 0] / 255.0  # or 1 or 2, since R = G = B
                screenshots.append(pixels.reshape(-1))

                # Simulate player input that simply chases the ball.
                overrides = {}

                for paddle in pong.paddles:
                    if pong.ball.y >= paddle.y:  # Ball is above the paddle, move up.
                        overrides[paddle.up] = True
                    elif pong.ball.y - BALL_SIZE <= paddle.y - PADDLE_HEIGHT:  # Ball is below the paddle, move down.
                        overrides[paddle.down] = True

                i += 1

    df = pd.DataFrame(records, columns=columns)

    if save_to_disk:
        df.to_csv(SPREADSHEET_FILEPATH, index=False, float_format='%.20f')

    # Convert to numpy arrays.
    states = df.to_numpy()
    screenshots = np.stack(screenshots)

    return states, screenshots


def load_dataset():
    '''Loads a dataset previously written by generate_dataset(save_to_disk=True).

    Returns (states, screenshots): the rows of SPREADSHEET_FILEPATH and, for row i, the flattened
    single-channel pixels of '<i>.png' in DATA_DIRECTORY_NAME scaled to [0, 1].
    '''
    df = pd.read_csv(SPREADSHEET_FILEPATH)  # Read in the state data.

    # Load in all the screenshots. Only the row positions are needed, so iterate over a range
    # instead of materializing every row with iterrows().
    screenshots = []

    for i in range(len(df)):
        path = f'{DATA_DIRECTORY_NAME}/{i}.png'
        encoded = tf.io.read_file(path)
        pixels = tf.image.decode_png(encoded, channels=1).numpy() / 255
        # Flatten in numpy directly rather than converting back to a tensor first.
        screenshots.append(pixels.reshape(-1))

    # Convert to numpy arrays.
    states = df.to_numpy()
    screenshots = np.stack(screenshots)

    return states, screenshots

In [None]:
from itertools import product


def generate_configs(options):
    '''Expands a mapping of option name -> candidate values into every possible combination.

    Returns a list of dicts, one per element of the cartesian product of the candidate lists,
    each mapping every option name to one of its candidates.
    '''
    names = list(options)
    configs = []

    for combination in product(*options.values()):
        configs.append(dict(zip(names, combination)))

    return configs


def stringify(config):
    '''Builds a compact name from a config's values: each value as text without spaces, joined by "__".'''
    parts = []

    for option in config.values():
        parts.append(str(option).replace(' ', ''))

    return '__'.join(parts)

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense


# Number of pixels in one frame; the output layer of every generated model has this many units.
PIXELS = SCREEN_WIDTH * SCREEN_HEIGHT


def generate_dense_models(options):
    '''Builds one compiled fully-connected model per combination of the given options.

    options maps 'neurons' (hidden layer sizes), 'activation' (hidden layer activation) and
    'optimizer' to lists of candidates. Returns a list of (model, name) tuples.
    '''
    def build(config):
        # Hidden layers first, then one sigmoid unit per output pixel.
        hidden = [Dense(units, activation=config['activation']) for units in config['neurons']]
        network = Sequential(hidden + [Dense(PIXELS, activation='sigmoid')])
        network.compile(optimizer=config['optimizer'], loss='binary_crossentropy')
        return network, 'dense__' + stringify(config)

    return [build(config) for config in generate_configs(options)]

In [None]:
import time

from sklearn import metrics
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tqdm.notebook import tqdm


def train_and_evaulate(models, X_train, y_train, X_test, y_test):
    '''Trains every model, then measures its F1 score on the test set and its prediction latency.

    models: iterable of (model, name) tuples; the best checkpoint of each model is written to
        'training/<name>.keras'.
    X_train, y_train: training inputs and target pixel intensities.
    X_test, y_test: held-out data, used both as validation data during training and for scoring.

    Returns a list of (name, score, latency) tuples, where score is the weighted F1 score of the
    rounded predictions and latency the average number of seconds per single-sample prediction.
    '''
    latency_runs = 10
    results = []

    for model, name in tqdm(models, desc='Training Models'):
        model_filepath = f'training/{name}.keras'
        checkpointer = ModelCheckpoint(filepath=model_filepath, verbose=0, save_best_only=True)
        # NOTE(review): with patience equal to the number of epochs this can never stop training early.
        stopper = EarlyStopping(patience=10, verbose=1)

        # Train the model.
        model.fit(X_train, y_train, epochs=10, verbose=0, validation_data=(X_test, y_test), callbacks=[checkpointer, stopper])
        model.load_weights(model_filepath)  # Swap to the best version.

        # Evaluate the model's performance. Predictions are rounded to 0/1 pixels first.
        predictions = model.predict(X_test, verbose=0)
        predictions = np.round(predictions)
        # f1_score expects (y_true, y_pred); the weighted average is weighted by the support in y_true.
        score = metrics.f1_score(y_test, predictions, average="weighted", zero_division=0)

        # Measure the model's latency. The first call is a warm-up and is deliberately left
        # out of the timed section, so that the average really covers latency_runs predictions.
        _ = model.predict(X_train[:1], verbose=0)

        start = time.perf_counter()

        for i in range(latency_runs):
            _ = model.predict(X_train[i:i+1], verbose=0)

        latency = (time.perf_counter() - start) / latency_runs

        results.append((name, score, latency))

    return results

In [None]:
# X holds one normalized state vector per simulated frame, y the matching flattened screenshot.
# NOTE(review): generate_dataset draws from the global `random` module, which is not seeded in the
# visible cells, so X and y differ between runs. Consider calling random.seed(RANDOM_SEED) first.
X, y = generate_dataset()

In [None]:
from sklearn.model_selection import train_test_split


# Hold out 20% of the frames; the held-out part is used for validation and for scoring.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=RANDOM_SEED)

# Every combination of these options is trained.
dense_options = {
    'neurons': [[], [128], [128, 256], [128, 256, 512], [128, 256, 512, 1024], [256, 512, 1024, 2048], [128, 256, 512, 1024, 4096]],
    'activation': [None, 'relu', 'tanh', 'sigmoid'],
    'optimizer': ['adam', 'adamw', 'sgd']
}

dense_models = generate_dense_models(dense_options)
results = train_and_evaulate(dense_models, X_train, y_train, X_test, y_test)

# Each result is (name, F1 score, latency). A higher F1 score is better, so sort in descending
# order to make results[0] the best model and results[-1] the worst one.
results.sort(key=lambda result: result[1], reverse=True)
best_loss_model, worst_loss_model = results[0], results[-1]

In [None]:
import shutil


def select_model(model, destination):
    '''Copies a trained model's checkpoint out of the training directory.

    model: a result tuple whose first element is the model's name.
    destination: path the checkpoint 'training/<name>.keras' is copied to (metadata included).
    '''
    name = model[0]
    shutil.copy2(f'training/{name}.keras', destination)

In [None]:
# Copy the checkpoint of the model bound to best_loss_model to the path the game's dense renderer loads.
select_model(best_loss_model, DENSE_MODEL_FILEPATH)

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM


def generate_rnn_models(options):
    '''Builds one compiled LSTM model per combination of the given options.

    options maps 'bottom_layers' (sizes of the sequence-returning LSTM layers) and 'top_layers'
    (sizes of the dense layers after the final LSTM) to lists of candidates.
    Returns a list of (model, name) tuples.
    '''
    models = []

    for config in generate_configs(options):
        # LSTM layers that pass the whole sequence on to the next layer.
        stacked = [LSTM(units, activation='tanh', dropout=0.1, return_sequences=True) for units in config['bottom_layers']]

        # The final LSTM layer only outputs its last step.
        summary = LSTM(64, activation='tanh', dropout=0.1)

        hidden = [Dense(units, activation='relu') for units in config['top_layers']]

        # One sigmoid unit per output pixel.
        output = Dense(PIXELS, activation='sigmoid')

        network = Sequential([*stacked, summary, *hidden, output])
        network.compile(optimizer='adamw', loss='binary_crossentropy')

        models.append((network, 'rnn__' + stringify(config)))

    return models

In [None]:
# One sample per random state generated by generate_dataset.
samples = STATES_PER_SCORE * len(SCORE_COMBINATIONS)
timesteps = TIMESTEPS_PER_STATE
features = FEATURES_PER_STATE

# generate_dataset records the frames of each state consecutively, so a plain reshape groups them
# into sequences. Only the screenshot of the last frame of each sequence is kept as the target.
# NOTE(review): this cell rebinds X and y, so it is not idempotent: running it a second time
# without regenerating the dataset fails when y is reshaped.
X = X.reshape(samples, timesteps, features)
y = y.reshape(samples, timesteps, PIXELS)[:, -1, :]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=RANDOM_SEED)

In [None]:
# Every combination of these options is trained.
rnn_options = {
    'bottom_layers': [[64]],
    'top_layers': [[], [128, 512]],
}

rnn_models = generate_rnn_models(rnn_options)
results = train_and_evaulate(rnn_models, X_train, y_train, X_test, y_test)

# Each result is (name, F1 score, latency). A higher F1 score is better, so sort in descending
# order to make results[0] the best model and results[-1] the worst one.
results.sort(key=lambda result: result[1], reverse=True)
best_loss_model, worst_loss_model = results[0], results[-1]

In [None]:
# Copy the checkpoint of the model bound to best_loss_model to the path the game's recurrent renderer loads.
select_model(best_loss_model, RNN_MODEL_FILEPATH)

In [None]:
# One sample per random state generated by generate_dataset.
samples = STATES_PER_SCORE * len(SCORE_COMBINATIONS)
timesteps = TIMESTEPS_PER_STATE
features = FEATURES_PER_STATE

# Add a trailing channel axis so that every sample is a (timesteps, features, 1) image.
# y is not rebound here: it is reused as left by the cell that last assigned it.
X = X.reshape(samples, timesteps, features, 1)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=RANDOM_SEED)

In [None]:
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten


def generate_cnn_models(options):
    '''Builds one compiled convolutional model per combination of the given options.

    options maps 'bottom_layers' (filter counts of the convolution + pooling stages) and
    'top_layers' (sizes of the dense layers after flattening) to lists of candidates.
    Returns a list of (model, name) tuples.
    '''
    models = []

    for config in generate_configs(options):
        layers = []

        # Each stage is a 3x3 convolution followed by 2x2 max pooling.
        for neurons in config['bottom_layers']:
            layers.append(Conv2D(neurons, (3, 3), activation='relu', padding='same'))
            layers.append(MaxPooling2D(pool_size=(2, 2), padding='same'))

        layers.append(Flatten())

        for neurons in config['top_layers']:
            layers.append(Dense(neurons, activation='relu'))

        # One sigmoid unit per output pixel.
        layers.append(Dense(PIXELS, activation='sigmoid'))

        model = Sequential(layers)
        model.compile(optimizer='adamw', loss='binary_crossentropy')
        # The name doubles as the checkpoint file name, so it needs its own prefix: with 'rnn__'
        # a CNN would overwrite the checkpoint of the RNN trained with the same options.
        name = 'cnn__' + stringify(config)

        models.append((model, name))

    return models

In [None]:
# Every combination of these options is trained.
cnn_options = {
    'bottom_layers': [[64]],
    'top_layers': [[], [128, 512]],
}

cnn_models = generate_cnn_models(cnn_options)
results = train_and_evaulate(cnn_models, X_train, y_train, X_test, y_test)

# Each result is (name, F1 score, latency). A higher F1 score is better, so sort in descending
# order to make results[0] the best model and results[-1] the worst one.
results.sort(key=lambda result: result[1], reverse=True)
best_loss_model, worst_loss_model = results[0], results[-1]

In [None]:
# Copy the checkpoint of the model bound to best_loss_model to the path the game's convolutional renderer loads.
select_model(best_loss_model, CNN_MODEL_FILEPATH)

In [None]:
import traceback


# Play the game. Errors are printed instead of raised so that the cell itself always completes.
try:
    pong = Pong()
    pong.show()
    pong.run()
except Exception:
    traceback.print_exc()
finally:
    # Make sure the window is closed even if the game crashed; calling quit more than once is safe.
    pygame.quit()