In [None]:
from IPython.display import clear_output, Image, display, HTML
from collections import deque
from typing import Deque, Dict, List, Tuple
%matplotlib inline
import matplotlib.pyplot as plt
import pygame as pygame
import numpy as np
import asyncio
import nest_asyncio
import random
import cv2
import os
import gc
import json
nest_asyncio.apply()

In [None]:
# Select SDL's "dummy" video driver; this is set before the display surface is
# created below so that SDL picks it up.
os.environ["SDL_VIDEODRIVER"] = "dummy"

# Native window resolution in pixels.
XRES, YRES = 800, 600

# The scaled resolution handed to the game is the native one divided by FACTOR.
FACTOR = 8
XRES_SCALED, YRES_SCALED = XRES // FACTOR, YRES // FACTOR

TICKS_REF = 80    # forwarded to the SpaceInvaders constructor
ACTION_SIZE = 4   # number of discrete actions the agent chooses from
FRAME_SKIP = 3    # how many game steps each chosen action is repeated for

# Surface the game draws on; update_screen captures frames from it.
SCREEN = pygame.display.set_mode((XRES, YRES))

# JSON file the per-episode statistics are written to / restored from.
STATS_FILE = 'stats_RANDOM.json'

In [None]:
from game_v2 import SpaceInvaders

In [None]:
class Agent():
    """Baseline agent that plays by choosing actions uniformly at random.

    Attributes:
        action_size: number of discrete actions; ``act`` returns an int in
            ``[0, action_size)``.
        frame_skip: number of consecutive game steps each action is applied
            for (taken from the module-level ``FRAME_SKIP``).
        epsilon: fixed at 1.0; this agent never changes it.
    """

    def __init__(self, action_size):
        self.action_size = action_size
        self.frame_skip = FRAME_SKIP
        self.epsilon = 1.0

    def act(self, state):
        """Return a random action index; ``state`` is ignored."""
        return random.randrange(self.action_size)

    def play(self, game, episode, max_episodes=1000, stats=None):
        """Choose one action and apply it for up to ``frame_skip`` game steps.

        Args:
            game: object exposing ``get_state()``, ``step(action=...)`` returning
                ``(next_state, reward, done)``, and a ``score`` attribute.
            episode: episode number recorded in ``stats`` when the game ends.
            max_episodes: unused; kept for interface compatibility.
            stats: optional dict with ``'rewards'``, ``'scores'`` and
                ``'episode'`` lists that are appended to in place.

        Returns:
            True if the game reported ``done`` during this call, else False.
        """
        # Retrieve action to play
        state = game.get_state()
        action = self.act(state)

        # Step game with frame skipping. `done` starts as False so that a
        # frame_skip of 0 returns cleanly instead of raising UnboundLocalError.
        reward = 0
        done = False
        for _ in range(self.frame_skip):
            _, step_reward, done = game.step(action=action)
            reward += step_reward
            if done:
                break

        if stats is not None:
            # One accumulated reward per call; score/episode only at game end.
            stats['rewards'].append(reward)
            if done:
                stats['scores'].append(game.score)
                stats['episode'].append(episode)

        return done

In [None]:
# Build the random agent and the game instance that draws onto SCREEN.
agent = Agent(ACTION_SIZE)
game = SpaceInvaders(SCREEN, agent, XRES_SCALED, YRES_SCALED, TICKS_REF)

# Statistics collected while playing; every series starts out empty.
stats = {series: [] for series in ('scores', 'rewards', 'episode')}

episode_count = 0

In [None]:
async def update_screen(display_handle, screen, stats, fig, ax):
    """Show the current game frame next to a plot of the average score.

    The pygame surface is captured, annotated with
    ``mean score / max score / episodes finished / agent.epsilon``, placed to
    the left of the rasterised Matplotlib figure, JPEG-encoded and displayed.

    Args:
        display_handle: handle returned by ``display(..., display_id=True)``;
            its ``display`` method receives the encoded image.
        screen: pygame surface to capture.
        stats: dict holding a ``'scores'`` list, or ``None`` when no statistics
            are being recorded (treated as an empty score history).
        fig: Matplotlib figure that owns ``ax``; it is drawn and rasterised.
        ax: axes the curves are plotted on; cleared again before returning.

    Note:
        The epsilon value in the overlay is read from the module-level
        ``agent`` variable, not from a parameter.
    """
    def batch_mean(x, w):
        """Average consecutive, non-overlapping windows of ``w`` values.

        Trailing values that do not fill a complete window are dropped.
        Returns ``(averages, count, x_ticks)``.
        """
        s_len = len(x) - len(x) % w
        s = x[0:s_len]
        s_x = np.array(np.array_split(s, len(s) // w, axis=0))
        avg = np.mean(s_x, axis=1)
        s_len = len(avg)
        x_ticks = range(w, (s_len + 1) * w, w)
        return avg, s_len, x_ticks

    def running_mean(x, w):
        """Sliding-window mean of ``x``; returns ``(averages, count, x_ticks)``.

        The window is clamped to ``len(x)``: with fewer samples than ``w``,
        ``np.convolve(..., 'valid')`` would swap its operands and return
        ``w - len(x) + 1`` values that are not means of anything.
        """
        w = max(1, min(w, len(x)))
        avg = np.convolve(x, np.ones(w), 'valid') / w
        s_len = len(avg)
        return avg, s_len, range(0, s_len, 1)

    # With stats=None (visualisation runs) there is no score history to show.
    scores = np.array(stats['scores'] if stats is not None else [])
    # Avoid NaN and a RuntimeWarning from taking the mean of an empty array.
    mean_score = np.mean(scores) if scores.size else 0.0

    view = pygame.surfarray.array3d(screen)
    # convert from (width, height, channel) to (height, width, channel)
    view = view.transpose([1, 0, 2])
    img_bgr = cv2.cvtColor(view, cv2.COLOR_RGB2BGR)

    overlay = "/".join([
        str(np.round(mean_score, decimals=2)),
        str(np.max(scores, initial=0)),
        str(len(scores)),
        str(agent.epsilon),
    ])
    # Anchor the text 10 px above the bottom edge of the captured frame.
    img_bgr = cv2.putText(img_bgr, overlay, (0, img_bgr.shape[0] - 10),
                          cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 0, 255), 2)
    img_bgr = cv2.resize(img_bgr, (500, 400), None)

    # Long histories are summarised in batches of 100 episodes, short ones
    # with a sliding window of (up to) 10 episodes.
    avg = None
    if len(scores) > 100 * 2:
        avg, s_len, x_ticks = batch_mean(scores, 100)
    elif len(scores) > 0:
        avg, s_len, x_ticks = running_mean(scores, 10)

    if avg is not None:
        ax.plot(x_ticks, avg)
        # A linear trend needs at least two points to be fitted.
        if s_len > 1:
            xs = np.arange(s_len)
            trend = np.poly1d(np.polyfit(xs, avg, 1))
            ax.plot(x_ticks, trend(xs))

    fig.suptitle("Average Score")

    # Rasterise the figure. buffer_rgba() replaces the deprecated
    # np.fromstring(canvas.tostring_rgb()) round trip and already has the
    # (height, width, 4) shape of the rendered canvas.
    fig.canvas.draw()
    fig.canvas.flush_events()
    rgba = np.asarray(fig.canvas.buffer_rgba())
    # Convert (and thereby copy) to OpenCV's default BGR before clearing the axes.
    img_sts = cv2.cvtColor(rgba, cv2.COLOR_RGBA2BGR)
    ax.cla()

    img_sts = cv2.resize(img_sts, (500, 400), None)
    img = np.hstack((img_bgr, img_sts))
    ret, img_data = cv2.imencode(".jpg", img)
    assert ret, "JPEG encoding of the display frame failed"
    clear_output(wait=True)
    display_handle.display(Image(data=img_data.tobytes()))

In [None]:
def run_game(game, agent, stats=None, episodes=-1, train=True, display_res=10, display_fig=True):
    """Play ``episodes`` games with ``agent``, optionally recording stats and rendering.

    Args:
        game: game object exposing ``reset(0)`` and ``start()``; it is passed
            on to ``agent.play``.
        agent: object exposing ``play(game, episode, stats=...)`` that returns
            True when the episode is over. Its ``training`` attribute is set
            to ``train``.
        stats: optional dict of lists handed to ``agent.play``; when given it
            is written to ``STATS_FILE`` as JSON after every episode.
        episodes: number of episodes to play. A negative value (the default)
            runs until interrupted.
        train: value stored in ``agent.training``.
        display_res: refresh the display every ``display_res`` calls to
            ``agent.play`` (and always when an episode ends).
        display_fig: set to False to skip rendering entirely.
    """
    display_handle = display(None, display_id=True)
    fig, ax = plt.subplots(figsize=(10, 8))
    plt.ion()

    # update_screen reads stats['scores']; hand it an empty history when no
    # statistics are being recorded so that visualisation runs do not crash.
    display_stats = stats if stats is not None else {'scores': []}

    display_cnt = 0
    episode = 0
    agent.training = train
    try:
        # `episode < episodes` plays exactly `episodes` games (the previous
        # `<=` comparison played one extra); a negative count means no limit.
        while episodes < 0 or episode < episodes:
            episode += 1
            game.reset(0)
            game.start()
            done = False
            while not done:
                done = agent.play(game, episode, stats=stats)
                display_cnt += 1
                if display_fig and (done or display_cnt % display_res == 0):
                    asyncio.run(update_screen(display_handle, SCREEN, display_stats, fig, ax))

            # Save stats if needed
            if stats is not None:
                with open(STATS_FILE, 'w') as fout:
                    json.dump(stats, fout)

            # Cleanup memory
            gc.collect()
    finally:
        # Release the figure even when the run is interrupted; otherwise each
        # call leaves another open figure behind.
        plt.close(fig)

In [None]:
# Restore previously saved statistics, if any, so that recording continues
# from the earlier run instead of starting empty.
if os.path.exists(STATS_FILE):
    with open(STATS_FILE, "r") as f:
        stats = json.load(f)
    # Agent.play appends to all three series; make sure a file that lacks
    # one of them does not cause a KeyError later on.
    for series in ('scores', 'rewards', 'episode'):
        stats.setdefault(series, [])
    print("restored stats from: " + STATS_FILE)

In [None]:
# Training run: statistics are recorded and saved after every episode, and the
# display is refreshed on every 20th agent step.
run_game(
    game,
    agent,
    stats=stats,
    episodes=5000,
    train=True,
    display_res=20,
    display_fig=True,
)

In [None]:
# Visualisation run: stats=None means nothing is recorded or written to
# STATS_FILE; the display is refreshed on every agent step.
run_game(
    game,
    agent,
    stats=None,
    episodes=5,
    train=False,
    display_res=1,
    display_fig=True,
)

### References

Title: Deep Reinforcement Learning to play Space Invaders
Link: https://nihit.github.io/resources/spaceinvaders.pdf
Type: Paper

Title: Frame Skipping and Pre-Processing for Deep Q-Networks on Atari 2600 Games
Link: https://danieltakeshi.github.io/2016/11/25/frame-skipping-and-preprocessing-for-deep-q-networks-on-atari-2600-games/
Type: Blog Post

Title: GuarDiqN: Playing Space Invader with Dueling Double Deep Q-Network
Link: https://github.com/lychengrex/Playing-Space-Invaders-with-Deep-Q-Networks/tree/master
Type: Source Code

Title: Improving the DQN algorithm using Double Q-Learning
Link: https://davidrpugh.github.io/stochastic-expatriate-descent/pytorch/deep-reinforcement-learning/deep-q-networks/2020/04/11/double-dqn.html
Type: Blog Post

Title: Double Q-learning
Link: https://proceedings.neurips.cc/paper_files/paper/2010/file/091d584fced301b442654dd8c23b3fc9-Paper.pdf
Type: Paper

Title: Deep reinforcement learning with pixel features in Atari Pong Game
Link: https://github.com/gznyyb/deep_reinforcement_learning_Pong/tree/master
Type: Source Code

Title: TRAIN A MARIO-PLAYING RL AGENT
Link: https://pytorch.org/tutorials/intermediate/mario_rl_tutorial.html
Type: Blog Post

Title: Playing Super Mario Bros with Deep Reinforcement Learning
Link: https://www.analyticsvidhya.com/blog/2021/06/playing-super-mario-bros-with-deep-reinforcement-learning/
Type: Blog Post

Title: Deep Reinforcement Learning with Space Invaders… Literally from scratch
Link: https://yaassinekaddi.medium.com/deep-reinforcement-learning-with-space-invaders-literally-from-scratch-part-i-10905fe04846
Type: Blog Post

Title: Asynchronous Methods for Deep Reinforcement Learning
Link: https://arxiv.org/pdf/1602.01783.pdf
Type: Paper

Title: Reinforcement-Learning-Atari-Games
Link: https://github.com/NitishMutha/Reinforcement-Learning-Atari-Games/tree/master
Type: Source Code

Title: Practical Reinforcement Learning — 02 Getting started with Q-learning
Link: https://towardsdatascience.com/practical-reinforcement-learning-02-getting-started-with-q-learning-582f63e4acd9
Type: Blog Post

Title: Playing Atari with Deep Reinforcement Learning
Link: https://www.cs.toronto.edu/~vmnih/docs/dqn.pdf
Type: Paper

Title: RL
Link: https://github.com/msachin93/RL
Type: Source Code

Title: Space Invaders challenge: a Reinforcement Learning competition
Link: https://wandb.ai/raghmura/qualcomm/reports/Space-Invaders-challenge-a-Reinforcement-Learning-competition--Vmlldzo5MzEzMg
Type: Blog Post