<a href="https://colab.research.google.com/github/GokulNC/Helicopter-Game-Reinforcement-Learning/blob/master/Train_Helicopter_DQN_RL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Step-8: The main runner code

In [0]:
import random, pygame, signal, time
import numpy as np
from tensorflow.python.keras import backend as K

# Exclusive upper bound of the episode loop below (range(starting_episode, EPISODES)).
EPISODES = 15000
# Episode number the training loop starts counting from.
starting_episode = 1
# Once replay has started, the target network is updated whenever the global
# step counter is a multiple of this value.
updateTargetNetwork = 1000
# Global count of environment steps taken across all episodes.
currentIteration = 0
# Set to True by sigint_handler; the training loop checks it after each episode.
stop_flow = False


def sigint_handler(signum, frame):
    """Ask the training loop to stop once the running episode has finished.

    Has the standard two-argument signal-handler signature; neither
    ``signum`` nor ``frame`` is inspected. The handler only announces the
    request and raises the module-level ``stop_flow`` flag.
    """
    global stop_flow
    notice = 'Going to stop the flow after the current episode terminates...'
    print(notice)
    stop_flow = True
    
# To capture Ctrl-C events and stop gracefully
# Source: https://pythonadventures.wordpress.com/2012/11/21/handle-ctrlc-in-your-script/
signal.signal(signal.SIGINT, sigint_handler)

if __name__ == "__main__":
    # Everything that needs the game, the agent or the dump file lives inside
    # this guard, so nothing below runs (or fails on an undefined `agent`)
    # when the module is merely imported.
    tf.keras.backend.clear_session()
    game = Pixelcopter(512, 512)
    p = PLE(game, fps=20, force_fps=True, display_screen=False)
    p.init()
    state_size = 7  # TODO: Don't hardcode
    action_size = 2  # Up and No-op
    agent = DQNAgent(state_size, action_size)
    #agent.load("/tmp/heli-dqn-6000.h5")
    done = False
    last_loss = 0

    # These dumps can be read by plot*.py and display the rewards/loss curve.
    # The context manager guarantees the file is closed even if training raises.
    with open('/tmp/dumps.txt', 'w') as f:
        # NOTE(review): range() excludes EPISODES itself, so the last episode
        # number is EPISODES - 1 -- confirm whether that is intended.
        for e in range(starting_episode, EPISODES):
            state = resetEnv(p)
            total_reward = 0.0
            done = False
            while True:
                currentIteration += 1
                action = agent.act(state)
                next_state, reward, done, _ = actInEnv(p, action)
                # Override the environment's reward with a fixed penalty on the
                # step that ends the episode.
                reward = reward if not done else -10
                total_reward += reward
                agent.remember(state, action, reward, next_state, done)
                state = next_state
                if done:
                    print("episode: {}/{}, score: {}, e: {:.3}"
                          .format(e, EPISODES, total_reward, agent.epsilon))
                    f.write("{},{},{}\n".format(e, total_reward, last_loss))
                    # Flush so a plotting script can follow training live
                    # instead of waiting for the write buffer to fill.
                    f.flush()
                    break
                # Start learning only once the memory holds a few batches.
                if len(agent.memory) > agent.batch_size*4:
                    last_loss = agent.replay_batch_gpu_optimized()
                    if currentIteration % updateTargetNetwork == 0:
                        agent.target_train()

            # Ctrl-C was pressed during the episode: stop before checkpointing.
            if stop_flow:
                break
            if e % 500 == 0:
                agent.save("/tmp/heli-dqn-{}.h5".format(e))
                print("Saved checkpoint!")
                # Decrease LR
                K.set_value(agent.model.optimizer.lr, agent.learning_rate/pow(1.1, e/500))

    # Reached on normal completion and on a graceful Ctrl-C stop.
    agent.save("/tmp/heli-dqn-final.h5")

Step-7: Gym-like wrappers around PLE

In [0]:
from pygame.constants import K_w, K_s

def resetEnv(ple_env):
    """Start a fresh game in ``ple_env`` and return its first state.

    Calls ``ple_env.reset_game()`` and then returns whatever
    ``getCurrentState(ple_env)`` reports for the newly reset game.
    """
    ple_env.reset_game()
    initial_state = getCurrentState(ple_env)
    return initial_state

def getCurrentState(ple_env):
    """Return the game's current state as a single-row NumPy array.

    The values of the dict returned by ``ple_env.getGameState()`` are laid
    out, in the dict's iteration order, as an array of shape
    ``(1, number_of_values)``.
    """
    features = list(ple_env.getGameState().values())
    return np.reshape(features, [1, len(features)])

# Translates the agent's action index into the pygame key sent to the game.
action_map = [K_w, K_s]


def actInEnv(ple_env, action_num):
    """Play one step in ``ple_env`` using the action at index ``action_num``.

    Returns the 4-tuple ``(state, reward, done, action)``: the state observed
    after the step, the reward returned by ``ple_env.act``, the game-over
    flag, and the pygame key that was sent.
    """
    key = action_map[action_num]
    step_reward = ple_env.act(key)
    observation = getCurrentState(ple_env)
    finished = ple_env.game_over()
    return observation, step_reward, finished, key

Step-6: The code below contains the **DQN Agent** class. ([Inspiration](https://towardsdatascience.com/reinforcement-learning-w-keras-openai-dqns-1eed3a5338c))

In [0]:
from collections import deque
from tensorflow.python.keras.models import Sequential
from tensorflow.python.keras.layers import Dense, Dropout
from tensorflow.python.keras.optimizers import Adam

class DQNAgent:
    """Deep Q-learning agent with an experience-replay memory and a target network.

    ``model`` is the online network that gets trained; ``target_model``
    supplies the bootstrap Q-values and is blended towards ``model`` by
    ``target_train``.

    Parameters
    ----------
    state_size : int
        Number of features in one state vector (input width of the network).
    action_size : int
        Number of discrete actions (output width of the network).
    """

    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.batch_size = 64
        self.memory = deque(maxlen=20000)  # oldest transitions are dropped first
        self.gamma = 0.97    # discount rate
        self.epsilon = 1.0  # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.99995
        self.learning_rate = 0.0004
        self.tau = .125  # share of the online weights mixed in by target_train
        self.train_on_TPU = False #Won't work if True
        self.model = self._build_model()
        self.target_model = self._build_model()
        if self.train_on_TPU:
            self.cpu_model = self.target_model.sync_to_cpu()

    def _build_model(self):
        """Build and compile the fully connected Q-network.

        Returns
        -------
        A compiled Keras model mapping a ``state_size`` vector to
        ``action_size`` linear Q-value outputs, trained with MSE loss.
        """
        # Neural Net for Deep-Q learning Model
        model = Sequential()
        model.add(Dense(32, input_dim=self.state_size, activation='relu'))
        for units in (48, 32, 16, 8):
            model.add(Dense(units, activation='relu'))
        model.add(Dense(self.action_size)) # default is linear activation
        if self.train_on_TPU:
            # NOTE(review): this branch needs `tf` and `os` to be in scope;
            # they are not imported alongside this class -- confirm before
            # enabling train_on_TPU.
            model = tf.contrib.tpu.keras_to_tpu_model(model,
                strategy=tf.contrib.tpu.TPUDistributionStrategy(
                tf.contrib.cluster_resolver.TPUClusterResolver(tpu='grpc://' + os.environ['COLAB_TPU_ADDR'])
            ))
            model.compile(loss='mean_squared_error',
                      optimizer=tf.train.AdamOptimizer(learning_rate=self.learning_rate))
        else:
            model.compile(loss='mse',
                      optimizer=Adam(lr=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay memory."""
        self.memory.append([state, action, reward, next_state, done])

    def act(self, state):
        """Choose an action index for ``state`` with an epsilon-greedy policy.

        With probability ``epsilon`` a random action is returned; otherwise
        the action with the highest predicted Q-value. For the two-action
        case the random draw keeps the original 25% / 75% bias towards
        index 1; for any other ``action_size`` it is uniform over all actions
        (previously only indices 0 and 1 could ever be explored).
        """
        if np.random.rand() <= self.epsilon:
            if self.action_size == 2:
                return np.random.choice([0, 1], p=[0.25, 0.75])
            return np.random.choice(self.action_size)
        predictor = self.cpu_model if self.train_on_TPU else self.model
        act_values = predictor.predict(state)
        return np.argmax(act_values[0])  # returns action

    def _decay_epsilon(self):
        """Apply one multiplicative decay step to epsilon, never going below epsilon_min."""
        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def _td_target(self, reward, next_state, done):
        """Return the one-step Q-learning target for a single transition."""
        if done:
            return reward
        return reward + self.gamma * np.amax(self.target_model.predict(next_state)[0])

    def replay(self):
        """Train on a random minibatch, one ``fit`` call per transition.

        Raises ``ValueError`` (from ``random.sample``) if the memory holds
        fewer than ``batch_size`` transitions.
        """
        minibatch = random.sample(self.memory, self.batch_size)
        for state, action, reward, next_state, done in minibatch:
            target_f = self.model.predict(state)
            # Only the taken action's Q-value is moved towards the target.
            target_f[0][action] = self._td_target(reward, next_state, done)
            self.model.fit(state, target_f, epochs=1, verbose=0)
        self._decay_epsilon()

    def replay_batch(self):
        """Train on a random minibatch with a single ``fit`` call.

        Targets are still computed one transition at a time. Returns the loss
        reported by Keras for the fit.
        """
        minibatch = random.sample(self.memory, self.batch_size)
        states, targets_f = [], []
        for state, action, reward, next_state, done in minibatch:
            target_f = self.model.predict(state)
            target_f[0][action] = self._td_target(reward, next_state, done)
            # Filtering out states and targets for training
            states.append(state[0])
            targets_f.append(target_f[0])

        history = self.model.fit(np.array(states), np.array(targets_f), batch_size=self.batch_size, epochs=1, verbose=0)
        # Keeping track of loss
        loss = history.history['loss'][0]
        self._decay_epsilon()
        return loss

    def replay_batch_gpu_optimized(self):
        """Vectorised equivalent of ``replay_batch``: two predicts and one fit.

        Returns the loss reported by Keras for the fit. Raises ``ValueError``
        (from ``random.sample``) if the memory holds fewer than
        ``batch_size`` transitions.
        """
        minibatch = random.sample(self.memory, self.batch_size)
        # Unzip the transitions column by column. Packing them into a single
        # np.array would create a ragged array, which recent NumPy rejects.
        states, actions, rewards, next_states, dones = zip(*minibatch)
        # vstack keeps a leading batch axis even for a batch of one, which
        # squeeze() would have collapsed.
        states = np.vstack(states)
        next_states = np.vstack(next_states)
        actions = np.asarray(actions, dtype=int)
        rewards = np.asarray(rewards, dtype=float)
        dones = np.asarray(dones, dtype=bool)

        q_next_max = np.amax(
            self.target_model.predict(next_states, batch_size=self.batch_size), axis=1)
        # Terminal transitions get no bootstrap term.
        targets = rewards + self.gamma * q_next_max * ~dones
        target_f = self.model.predict(states, batch_size=self.batch_size)
        target_f[np.arange(len(actions)), actions] = targets

        history = self.model.fit(states, target_f, batch_size=self.batch_size, epochs=1, verbose=0)
        # Keeping track of loss
        loss = history.history['loss'][0]
        self._decay_epsilon()
        return loss

    def target_train(self):
        """Soft-update the target network: ``target = tau*online + (1-tau)*target``."""
        blended = [
            online * self.tau + target * (1 - self.tau)
            for online, target in zip(self.model.get_weights(),
                                      self.target_model.get_weights())
        ]
        self.target_model.set_weights(blended)
        if self.train_on_TPU: # I hope this is not a costly operation
            self.cpu_model = self.target_model.sync_to_cpu()

    def load(self, name):
        """Load the weights stored at ``name`` into both the online and target networks."""
        self.model.load_weights(name)
        self.target_model.load_weights(name)
        if self.train_on_TPU:
            self.cpu_model.load_weights(name)

    def save(self, name):
        """Save the online network's weights to ``name``."""
        self.model.save_weights(name)

Step-5: Below is the PLE environment for the helicopter game, with my modifications ([Source](https://github.com/ntasfi/PyGame-Learning-Environment/blob/master/ple/games/pixelcopter.py))

In [5]:
#@title
img_dir = '/tmp/assets/'
if not os.path.exists(img_dir):
    os.makedirs(img_dir)
!wget -P /tmp/assets/ https://github.com/code-master5/SaveTheHeli/raw/master/heli2.png
  

import math
import sys, os

import pygame
from pygame.constants import K_w, K_s


class Block(pygame.sprite.Sprite):
    """Rectangular obstacle sprite that moves left by ``speed * dt`` per update.

    The block is 7% of the screen width by 10% of the screen height, filled
    with the colour (120, 240, 80) and initially centred on ``pos_init``.
    """

    def __init__(self, pos_init, speed, SCREEN_WIDTH, SCREEN_HEIGHT):
        super().__init__()

        self.SCREEN_WIDTH = SCREEN_WIDTH
        self.SCREEN_HEIGHT = SCREEN_HEIGHT
        self.speed = speed
        self.pos = vec2d(pos_init)

        # Block size is a fixed fraction of the screen size.
        self.width = int(SCREEN_WIDTH * 0.07)
        self.height = int(SCREEN_HEIGHT * 0.1)

        surface = pygame.Surface((self.width, self.height))
        surface.fill((0, 0, 0, 0))
        surface.set_colorkey((0, 0, 0))
        pygame.draw.rect(surface, (120, 240, 80), (0, 0, self.width, self.height), 0)

        self.image = surface
        self.rect = surface.get_rect()
        self.rect.center = pos_init

    def update(self, dt):
        """Move the block left by ``speed * dt`` and re-centre its rect."""
        self.pos.x -= self.speed * dt
        self.rect.center = (self.pos.x, self.pos.y)


class HelicopterPlayer(pygame.sprite.Sprite):
    """Player sprite: a helicopter image that climbs or falls with momentum.

    The sprite starts at 35% of the screen width, vertically centred, and is
    drawn from ``heli2.png`` in ``img_dir`` scaled to 10% x 5% of the screen.
    """

    def __init__(self, speed, SCREEN_WIDTH, SCREEN_HEIGHT):
        super().__init__()

        start_center = (int(SCREEN_WIDTH * 0.35), SCREEN_HEIGHT / 2)
        self.pos = vec2d(start_center)

        # Vertical accelerations are derived from the scroll speed; climbing
        # is negative because screen y grows downwards.
        self.speed = speed
        self.climb_speed = speed * -0.875
        self.fall_speed = speed * 0.09
        self.momentum = 0

        self.width = SCREEN_WIDTH * 0.1
        self.height = SCREEN_HEIGHT * 0.05

        sprite = pygame.image.load(img_dir + "heli2.png").convert_alpha()
        self.image = pygame.transform.scale(
            sprite, (int(self.width), int(self.height)))

        self.rect = self.image.get_rect()
        self.rect.center = start_center

    def update(self, is_climbing, dt):
        """Advance the vertical motion by one step of length ``dt``.

        Adds the climb or fall acceleration to the momentum, damps it by 1%,
        then moves the sprite vertically by the resulting momentum.
        """
        accel = self.climb_speed if is_climbing else self.fall_speed
        self.momentum = (self.momentum + accel * dt) * 0.99
        self.pos.y += self.momentum

        self.rect.center = (self.pos.x, self.pos.y)


class Terrain(pygame.sprite.Sprite):
    """One vertical slice of cave wall: a ceiling band and a floor band.

    The sprite's surface is 10% of the screen wide and 1.5 screen heights
    tall. The upper band covers the top half-screen-height of it and the
    lower band starts at 1.05 screen heights, leaving the space between them
    in the colour-key colour. The sprite moves left by ``speed * dt`` per
    update.
    """

    def __init__(self, pos_init, speed, SCREEN_WIDTH, SCREEN_HEIGHT):
        super().__init__()

        self.pos = vec2d(pos_init)
        self.speed = speed
        self.width = int(SCREEN_WIDTH * 0.1)

        surface = pygame.Surface((self.width, SCREEN_HEIGHT * 1.5))
        surface.fill((0, 0, 0, 0))
        surface.set_colorkey((0, 0, 0))

        wall_color = (120, 240, 80)
        band_height = SCREEN_HEIGHT * 0.5
        # First the top band, then the bottom band.
        for band_top in (0, SCREEN_HEIGHT * 1.05):
            pygame.draw.rect(
                surface, wall_color, (0, band_top, self.width, band_height), 0)

        self.image = surface
        self.rect = surface.get_rect()
        self.rect.center = pos_init

    def update(self, dt):
        """Move the slice left by ``speed * dt`` and re-centre its rect."""
        self.pos.x -= self.speed * dt
        self.rect.center = (self.pos.x, self.pos.y)


class Pixelcopter(PyGameWrapper):
    """
    Side-scrolling helicopter game: the player climbs or falls while terrain
    and blocks scroll towards it; touching either ends the game.

    Parameters
    ----------
    width : int
        Screen width.

    height : int
        Screen height, recommended to be same dimension as width.

    """

    def __init__(self, width=48, height=48):
        # The only key the game reacts to; every other key leaves the player falling.
        actions = {
            "up": K_w
        }

        PyGameWrapper.__init__(self, width, height, actions=actions)

        self.is_climbing = False
        # Scroll speed scales with the screen width.
        self.speed = 0.0004 * width

        #self._dir_ = os.path.dirname(os.path.abspath(__file__))
        #self._asset_dir = os.path.join(self._dir_, "assets/")

    def _handle_player_events(self):
        """Drain the pygame event queue and set ``is_climbing`` for this step.

        ``is_climbing`` is reset to False on every call and only becomes True
        if a KEYDOWN event for the "up" key is found. A QUIT event shuts
        pygame down and exits the process.
        """
        self.is_climbing = False

        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()
                sys.exit()

            if event.type == pygame.KEYDOWN:
                key = event.key
                if key == self.actions['up']:
                    self.is_climbing = True

    def getGameState(self):
        """
        Gets a non-visual state representation of the game.

        Returns
        -------

        dict
            Keys, in insertion order:

            * player_y: player y position.
            * player_vel: player momentum (vertical velocity).
            * player_dist_to_ceil: player distance to ceiling.
            * player_dist_to_floor: player distance to floor.
            * next_gate_dist_to_player: next block x distance to player.
            * next_gate_block_top: next block's y position.
            * next_gate_block_bottom: next block's y position plus its height.

            See code for structure.

        """

        # Find the nearest block that is still ahead of the player.
        min_dist = 999
        min_block = None
        for b in self.block_group:  # Groups do not return in order
            dist_to = b.pos.x - self.player.pos.x
            if dist_to > 0 and dist_to < min_dist:
                min_block = b
                min_dist = dist_to

        # NOTE(review): indexing [0] raises IndexError when the player overlaps
        # no terrain sprite -- confirm that cannot happen when this is called.
        current_terrain = pygame.sprite.spritecollide(
            self.player, self.terrain_group, False)[0]
        # NOTE(review): if no block is ahead of the player and closer than 999,
        # min_block is still None and the two "next_gate_block_*" lookups below
        # raise AttributeError -- confirm this case is unreachable.
        state = {
            "player_y": self.player.pos.y,
            "player_vel": self.player.momentum,
            "player_dist_to_ceil": self.player.pos.y - (current_terrain.pos.y - self.height * 0.25),
            "player_dist_to_floor": (current_terrain.pos.y + self.height * 0.25) - self.player.pos.y,
            "next_gate_dist_to_player": min_dist,
            "next_gate_block_top": min_block.pos.y,
            "next_gate_block_bottom": min_block.pos.y + min_block.height
        }

        return state

    def getScreenDims(self):
        """Return the screen dimensions tuple stored as ``screen_dim``."""
        return self.screen_dim

    def getActions(self):
        """Return the pygame keys of the game's actions (the values of ``actions``)."""
        return self.actions.values()

    def getScore(self):
        """Return the current score."""
        return self.score

    def game_over(self):
        """Return True once the player has no lives left."""
        return self.lives <= 0.0

    def init(self):
        """Reset score and lives and rebuild the player, block and terrain sprites."""
        self.score = 0.0
        self.lives = 1.0

        self.player = HelicopterPlayer(
            self.speed,
            self.width,
            self.height
        )

        self.player_group = pygame.sprite.Group()
        self.player_group.add(self.player)

        self.block_group = pygame.sprite.Group()
        self._add_blocks()

        # Pre-fill four screen widths of terrain.
        self.terrain_group = pygame.sprite.Group()
        self._add_terrain(0, self.width * 4)

    def _add_terrain(self, start, end):
        """Add terrain slices covering the x range from ``start`` to ``end``.

        Slices are placed every 10% of the screen width; each slice's vertical
        centre follows a sine wave around mid-screen with a randomised
        frequency and per-slice phase jitter drawn from ``self.rng``.
        """
        w = int(self.width * 0.1)
        # each terrain slice is w wide (10% of the screen width); steps holds
        # the x centre of each slice.
        steps = range(start + int(w / 2), end + int(w / 2), w)
        y_jitter = []

        freq = 4.5 / self.width + self.rng.uniform(-0.01, 0.01)
        for step in steps:
            jitter = (self.height * 0.125) * \
                math.sin(freq * step + self.rng.uniform(0.0, 0.5))
            y_jitter.append(jitter)

        y_pos = [int((self.height / 2.0) + y_jit) for y_jit in y_jitter]

        for i in range(0, len(steps)):
            self.terrain_group.add(Terrain(
                (steps[i], y_pos[i]),
                self.speed,
                self.width,
                self.height
            )
            )

    def _add_blocks(self):
        """Add one block at a random position to the right of the visible screen.

        x is drawn between 1.0 and 1.5 screen widths, y between 25% and 75%
        of the screen height, both from ``self.rng``.
        """
        x_pos = self.rng.randint(self.width, int(self.width * 1.5))
        y_pos = self.rng.randint(
            int(self.height * 0.25),
            int(self.height * 0.75)
        )
        self.block_group.add(
            Block(
                (x_pos, y_pos),
                self.speed,
                self.width,
                self.height
            )
        )

    def reset(self):
        """Restart the game by re-running ``init``."""
        self.init()

    def step(self, dt):
        """Advance the game by ``dt``: handle input, move sprites, score, and draw.

        Lives are decremented for every collision or out-of-bounds condition
        found in this step, so ``lives`` can drop below zero.
        """

        self.screen.fill((0, 0, 0))
        self._handle_player_events()

        self.score += self.rewards["tick"]

        self.player.update(self.is_climbing, dt)
        self.block_group.update(dt)
        self.terrain_group.update(dt)

        # Every block the player touches costs a life.
        hits = pygame.sprite.spritecollide(
            self.player, self.block_group, False)
        for creep in hits:
            self.lives -= 1

        # For each overlapping terrain slice, check whether the player has left
        # the corridor extending a quarter screen height either side of the
        # slice's centre.
        hits = pygame.sprite.spritecollide(
            self.player, self.terrain_group, False)
        for t in hits:
            if self.player.pos.y - self.player.height <= t.pos.y - self.height * 0.25:
                self.lives -= 1

            if self.player.pos.y >= t.pos.y + self.height * 0.25:
                self.lives -= 1

        for b in self.block_group:
            # The single live block has been passed: reward and queue the next one.
            if b.pos.x <= self.player.pos.x and len(self.block_group) == 1:
                self.score += self.rewards["positive"]
                self._add_blocks()

            # Remove blocks that have scrolled off the left edge.
            if b.pos.x <= -b.width:
                b.kill()

        for t in self.terrain_group:
            # Each terrain slice that scrolls off the left edge also scores.
            if t.pos.x <= -t.width:
                self.score += self.rewards["positive"]
                t.kill()

        if self.player.pos.y < self.height * 0.125:  # its above
            self.lives -= 1

        if self.player.pos.y > self.height * 0.875:  # its below the lowest possible block
            self.lives -= 1

        # Top up the terrain once only about one screen's worth of slices remains.
        if len(self.terrain_group) <= (
                10 + 3):  # 10% per terrain, offset of ~2 with 1 extra
            self._add_terrain(self.width, self.width * 5)

        if self.lives <= 0.0:
            self.score += self.rewards["loss"]

        self.player_group.draw(self.screen)
        self.block_group.draw(self.screen)
        self.terrain_group.draw(self.screen)

# if __name__ == "__main__":
#     import numpy as np

#     pygame.init()
#     game = Pixelcopter(width=256, height=256)
#     game.screen = pygame.display.set_mode(game.getScreenDims(), 0, 32)
#     game.clock = pygame.time.Clock()
#     game.rng = np.random.RandomState(24)
#     game.init()

#     while True:
#         if game.game_over():
#             game.reset()
#         dt = game.clock.tick_busy_loop(30)
#         game.step(dt)
#         pygame.display.update()

--2019-02-01 16:04:18--  https://github.com/code-master5/SaveTheHeli/raw/master/heli2.png
Resolving github.com (github.com)... 192.30.253.113, 192.30.253.112
Connecting to github.com (github.com)|192.30.253.113|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/code-master5/SaveTheHeli/master/heli2.png [following]
--2019-02-01 16:04:19--  https://raw.githubusercontent.com/code-master5/SaveTheHeli/master/heli2.png
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 7240 (7.1K) [image/png]
Saving to: ‘/tmp/assets/heli2.png’


2019-02-01 16:04:20 (116 MB/s) - ‘/tmp/assets/heli2.png’ saved [7240/7240]



Step-4: Environment Wrapper for all games ([Source](https://github.com/ntasfi/PyGame-Learning-Environment/blob/master/ple/games/base/pygamewrapper.py))

In [0]:
#@title
import pygame
import numpy as np
from pygame.constants import KEYDOWN, KEYUP, K_F15


class PyGameWrapper(object):
    """PyGameWrapper  class

    ple.games.base.PyGameWrapper(width, height, actions=None)

    This :class:`PyGameWrapper` class sets methods all games require. It should be subclassed when creating new games.

    Parameters
    ----------
    width: int
        The width of the game screen.

    height: int
        The height of the game screen.

    actions: dict, optional
        Contains possible actions that the game responds to. The dict keys are used by the game, while the values are `pygame.constants` referring to the keys. When omitted, each instance gets its own empty dict.

        Possible actions dict:

        >>> from pygame.constants import K_w, K_s
        >>> actions = {
        >>>     "up": K_w,
        >>>     "down": K_s
        >>> }
    """

    def __init__(self, width, height, actions=None):

        # Required fields
        # A fresh dict per instance: a shared `{}` default would leak actions
        # between games constructed without an explicit dict.
        self.actions = actions if actions is not None else {}  # holds actions

        self.score = 0.0  # required.
        self.lives = 0  # required. Can be 0 or -1 if not required.
        self.screen = None  # must be set to None
        self.clock = None  # must be set to None
        self.height = height
        self.width = width
        self.screen_dim = (width, height)  # width and height
        self.allowed_fps = None  # fps that the game is allowed to run at.
        self.NOOP = K_F15  # the noop key
        self.rng = None

        self.rewards = {
            "positive": 1.0,
            "negative": -1.0,
            "tick": 0,
            "loss": -5.0,
            "win": 5.0
        }

    def _setup(self):
        """
        Sets up the pygame env, the display and game clock.
        """
        pygame.init()
        self.screen = pygame.display.set_mode(self.getScreenDims(), 0, 32)
        self.clock = pygame.time.Clock()

    def _setAction(self, action, last_action):
        """
        Pushes the action to the pygame event queue.

        A KEYDOWN event is posted for ``action`` and a KEYUP event for
        ``last_action``; either one falls back to the NOOP key when None.
        """
        if action is None:
            action = self.NOOP

        if last_action is None:
            last_action = self.NOOP

        kd = pygame.event.Event(KEYDOWN, {"key": action})
        ku = pygame.event.Event(KEYUP, {"key": last_action})

        pygame.event.post(kd)
        pygame.event.post(ku)

    def _draw_frame(self, draw_screen):
        """
        Updates the display if ``draw_screen`` is true; does nothing otherwise.
        """

        if draw_screen:
            pygame.display.update()

    def getScreenRGB(self):
        """
        Returns the current game screen in RGB format.

        Returns
        --------
        numpy uint8 array
            Returns a numpy array with the shape (width, height, 3).

        """

        return pygame.surfarray.array3d(
            pygame.display.get_surface()).astype(np.uint8)

    def tick(self, fps):
        """
        This sleeps the game to ensure it runs at the desired fps.

        Returns whatever ``clock.tick_busy_loop(fps)`` returns.
        """
        return self.clock.tick_busy_loop(fps)

    def adjustRewards(self, rewards):
        """

        Adjusts the rewards the game gives the agent

        Parameters
        ----------
        rewards : dict
            A dictionary of reward events to float rewards. Only updates if key matches those specified in the init function; unknown keys are ignored.

        """
        for key, value in rewards.items():
            if key in self.rewards:
                self.rewards[key] = value

    def setRNG(self, rng):
        """
        Sets the rng for games.

        Only takes effect the first time: once an rng is set, later calls
        leave it unchanged.
        """

        if self.rng is None:
            self.rng = rng

    def getGameState(self):
        """
        Gets a non-visual state representation of the game.

        Returns
        -------
        dict or None
            dict if the game supports it and None otherwise.

        """
        return None

    def getScreenDims(self):
        """
        Gets the screen dimensions of the game in tuple form.

        Returns
        -------
        tuple of int
            Returns tuple as follows (width, height).

        """
        return self.screen_dim

    def getActions(self):
        """
        Gets the actions used within the game.

        Returns
        -------
        view of `pygame.constants`
            The values of the ``actions`` dict.

        """
        return self.actions.values()

    def init(self):
        """
        This is used to initialize the game, such as resetting the score, lives, and player position.

        This is game dependent.

        """
        raise NotImplementedError("Please override this method")

    def reset(self):
        """
        Wraps the init() function, can be set up to reset certain portions of the game only if needed.
        """
        self.init()

    def getScore(self):
        """
        Return the current score of the game.


        Returns
        -------
        int
            The current reward the agent has received since the last init() or reset() call.
        """
        raise NotImplementedError("Please override this method")

    def game_over(self):
        """
        Gets the status of the game, returns True if game has hit a terminal state. False otherwise.

        This is game dependent.

        Returns
        -------
        bool

        """
        raise NotImplementedError("Please override this method")

    def step(self, dt):
        """
        This method steps the game forward one step in time equal to the dt parameter. The game does not run unless this method is called.

        Parameters
        ----------
        dt : integer
            This is the amount of time elapsed since the last frame in milliseconds.

        """
        raise NotImplementedError("Please override this method")

Step-3: Environment Interface ([Source](https://github.com/ntasfi/PyGame-Learning-Environment/blob/master/ple/ple.py))

In [3]:
#@title
import numpy as np
from PIL import Image  # pillow
import sys
import pygame

class PLE(object):
    """
    ple.PLE(
        game, fps=30,
        frame_skip=1, num_steps=1,
        reward_values=None, force_fps=True,
        display_screen=False, add_noop_action=True,
        state_preprocessor=None, rng=24
    )

    Main wrapper that interacts with games.
    Provides a similar interface to Arcade Learning Environment.

    Parameters
    ----------
    game: Class from ple.games.base
        The game the PLE environment manipulates and maintains.

    fps: int (default: 30)
        The desired frames per second we want to run our game at.
            Typical settings are 30 and 60 fps.

    frame_skip: int (default: 1)
        How many times act() repeats the chosen action; the rewards of
        all repetitions are summed into the single value act() returns.

    num_steps: int (default: 1)
        How many times the game is stepped for every repetition of an
        action.

    reward_values: dict or None (default: None)
        The rewards we wish to give our agent for the different events in
        the game. When it is non-empty it is handed to
        game.adjustRewards(). The current defaults are as follows:

        .. code-block:: python

            rewards = {
                "positive": 1.0,
                "negative": -1.0,
                "tick": 0.0,
                "loss": -5.0,
                "win": 5.0
            }

        Tick is given to the agent at each game step. You can selectively
        adjust the rewards by passing a dictionary with the keys you want
        to change. E.g. if we want to adjust the negative reward and the
        tick reward we would pass in the following:

        .. code-block:: python

            rewards = {
                "negative": -2.0,
                "tick": -0.01
            }

        Keep in mind that the tick is applied at each frame. If the game is
        running at 60fps the agent will get a reward of 60*tick.

    force_fps: bool (default: True)
        If True, every game.step() call receives a fixed elapsed time of
        1000/fps milliseconds, so the game can run as fast as possible,
        which speeds up training. If False, the elapsed time is whatever
        game.tick(fps) returns.

    display_screen: bool (default: False)
        If we draw updates to the screen. Disabling this speeds up
        iteration speed. This can be toggled to True during testing phases
        so you can observe the agent's progress.

    add_noop_action: bool (default: True)
        This appends the NOOP action (``None``) to the set of valid moves
        the agent can make.

    state_preprocessor: python function (default: None)
        Python function which takes a dict representing game state and
        returns a numpy array.

    rng: numpy.random.RandomState, int, array_like or None. (default: 24)
        Number generator which is used by PLE and the games.

    """

    def __init__(self,
                 game, fps=30, frame_skip=1, num_steps=1,
                 reward_values=None, force_fps=True, display_screen=False,
                 add_noop_action=True, state_preprocessor=None, rng=24):

        self.game = game
        self.fps = fps
        self.frame_skip = frame_skip
        self.NOOP = None
        self.num_steps = num_steps
        self.force_fps = force_fps
        self.display_screen = display_screen
        self.add_noop_action = add_noop_action

        self.last_action = []
        self.action = []
        self.previous_score = 0
        self.frame_count = 0

        # Override the game's reward table only when the caller supplied
        # something; None and {} both mean "keep the game's defaults".
        if reward_values:
            self.game.adjustRewards(reward_values)

        if isinstance(self.game, PyGameWrapper):
            if isinstance(rng, np.random.RandomState):
                self.rng = rng
            else:
                self.rng = np.random.RandomState(rng)

            # some pygame games preload the images
            # to speed resetting and inits up.
            pygame.display.set_mode((1, 1), pygame.NOFRAME)
        else:
            # in order to use doom, install following https://github.com/openai/doom-py
            # NOTE(review): this relative import only resolves when the
            # class lives inside the ple package, and self.rng stays unset
            # for any game that is neither a PyGameWrapper nor a
            # DoomWrapper. Left unchanged; confirm whether this branch is
            # still needed.
            from .games.base.doomwrapper import DoomWrapper
            if isinstance(self.game, DoomWrapper):
                self.rng = rng

        self.game.setRNG(self.rng)
        self.init()

        self.state_preprocessor = state_preprocessor
        self.state_dim = None

        if self.state_preprocessor is not None:
            self.state_dim = self.game.getGameState()

            if self.state_dim is None:
                raise ValueError(
                    "Asked to return non-visual state on game that does not support it!")

            # Run the preprocessor once to learn the state vector's shape.
            self.state_dim = self.state_preprocessor(self.state_dim).shape

        if game.allowed_fps is not None and self.fps != game.allowed_fps:
            # Report the fps that was actually requested next to the
            # required one.
            raise ValueError("Game requires %dfps, was given %d." %
                             (game.allowed_fps, self.fps))

    def _tick(self):
        """
        Calculates the elapsed time between frames or ticks.

        Returns the fixed frame duration 1000/fps (milliseconds) when
        force_fps is set, otherwise whatever game.tick(fps) returns.
        """
        if self.force_fps:
            return 1000.0 / self.fps
        return self.game.tick(self.fps)

    def init(self):
        """
        Initializes the game. This depends on the game and could include
        doing things such as setting up the display, clock etc.

        The constructor already calls this once.
        """
        self.game._setup()
        self.game.init()  # this is the game's own setup/init

    def getActionSet(self):
        """
        Gets the actions the game supports. Optionally appends the NOOP
        action if PLE has add_noop_action set to True.

        Returns
        --------

        list of pygame.constants
            The agent can simply select the index of the action
            to perform.

        """
        actions = self.game.actions

        # A dict contributes its values; anything else (a dict view, list,
        # tuple, ...) is already an iterable of actions.
        if isinstance(actions, dict):
            actions = actions.values()

        # Always build a fresh list so appending NOOP never mutates the
        # game's own container.
        actions = list(actions)

        if self.add_noop_action:
            actions.append(self.NOOP)

        return actions

    def getFrameNumber(self):
        """
        Gets the current number of frames the agent has seen
        since PLE was initialized.

        Returns
        --------

        int

        """

        return self.frame_count

    def game_over(self):
        """
        Returns True if the game has reached a terminal state and
        False otherwise.

        This state is game dependent.

        Returns
        -------

        bool

        """

        return self.game.game_over()

    def score(self):
        """
        Gets the score the agent currently has in game.

        Returns
        -------

        int

        """

        return self.game.getScore()

    def lives(self):
        """
        Gets the number of lives the agent has left. Not all games have
        the concept of lives.

        Returns
        -------

        int

        """

        return self.game.lives

    def reset_game(self):
        """
        Performs a reset of the game to a clean initial state.

        Clears the remembered actions and the previous score, then calls
        game.reset(). The frame counter is not reset.
        """
        self.last_action = []
        self.action = []
        self.previous_score = 0.0
        self.game.reset()

    def getScreenRGB(self):
        """
        Gets the current game screen in RGB format.

        Returns
        --------
        numpy uint8 array
            Returns a numpy array with the shape (width, height, 3).


        """

        return self.game.getScreenRGB()

    def getScreenGrayscale(self):
        """
        Gets the current game screen in Grayscale format. Converts from RGB
        using relative luminance.

        Returns
        --------
        numpy uint8 array
                Returns a numpy array with the shape (width, height).


        """
        rgb = self.getScreenRGB()
        # Weighted sum of the R, G and B channels.
        gray = (0.21 * rgb[:, :, 0]
                + 0.72 * rgb[:, :, 1]
                + 0.07 * rgb[:, :, 2])

        return np.round(gray).astype(np.uint8)

    def saveScreen(self, filename):
        """
        Saves the current screen to png file.

        Parameters
        ----------

        filename : string
            The path with filename to where we want the image saved.

        """
        frame = Image.fromarray(self.getScreenRGB())
        frame.save(filename)

    def getScreenDims(self):
        """
        Gets the game's screen dimensions.

        Returns
        -------

        tuple of int
            Returns a tuple of the following format (screen_width, screen_height).
        """
        return self.game.getScreenDims()

    def getGameStateDims(self):
        """
        Gets the game's non-visual state dimensions.

        Returns
        -------

        tuple of int or None
            The shape of the preprocessed state vector, or None when no
            state_preprocessor was supplied.
        """
        return self.state_dim

    def getGameState(self):
        """
        Gets a non-visual state representation of the game.

        This can include items such as player position, velocity, ball location and velocity etc.

        Returns
        -------

        dict or numpy array
            The game's state information, passed through the
            state_preprocessor when one was supplied. Its contents depend
            on the game in question and must be referenced against each
            game.

        Raises
        ------

        ValueError
            If the game returns None, i.e. it does not support a
            non-visual state.

        """
        state = self.game.getGameState()
        if state is None:
            raise ValueError(
                "Was asked to return state vector for game that does not support it!")

        if self.state_preprocessor is not None:
            return self.state_preprocessor(state)
        return state

    def act(self, action):
        """
        Perform an action on the game. We lockstep frames with actions. If act is not called the game will not run.

        Parameters
        ----------

        action : pygame.constants value or None
            The action to perform. It should be one of the items returned
            by getActionSet(); anything else is treated as NOOP.

        Returns
        -------

        number
            The reward the agent has accumulated over the frame_skip
            repetitions of the action.

        """
        return sum(self._oneStepAct(action) for _ in range(self.frame_skip))

    def _draw_frame(self):
        """
        Asks the game to draw a frame, telling it whether the screen
        should be displayed.
        """

        self.game._draw_frame(self.display_screen)

    def _oneStepAct(self, action):
        """
        Performs an action on the game. Checks if the game is over or if the provided action is valid based on the allowed action set.

        Returns 0.0 without touching the game when it is already over;
        otherwise returns the reward gained while stepping the game
        num_steps times.
        """
        if self.game_over():
            return 0.0

        # Unknown actions silently fall back to doing nothing.
        if action not in self.getActionSet():
            action = self.NOOP

        self._setAction(action)
        for _ in range(self.num_steps):
            time_elapsed = self._tick()
            self.game.step(time_elapsed)
            self._draw_frame()

        self.frame_count += self.num_steps

        return self._getReward()

    def _setAction(self, action):
        """
            Instructs the game to perform an action if it is not a NOOP
            (None), and remembers it as the last action either way.
        """

        if action is not None:
            self.game._setAction(action, self.last_action)

        self.last_action = action

    def _getReward(self):
        """
        Returns the reward the agent has gained as the difference between
        the game's current score and the score seen at the previous call.
        """
        # Read the score once so the returned reward and the stored
        # previous score come from the same reading.
        current_score = self.game.getScore()
        reward = current_score - self.previous_score
        self.previous_score = current_score

        return reward

pygame 1.9.4
Hello from the pygame community. https://www.pygame.org/contribute.html


Step-2: Utils ([Source](https://github.com/ntasfi/PyGame-Learning-Environment/blob/master/ple/games/utils/vec2d.py))

In [0]:
#@title
import math


class vec2d():
    """Minimal mutable 2-D vector with ``x`` and ``y`` components."""

    # Instances are mutable and compare by value, so they must not be used
    # as dict keys or set members. Defining __eq__ already implies this on
    # Python 3; it is spelled out here to make the intent explicit.
    __hash__ = None

    def __init__(self, pos):
        """Build the vector from an indexable ``pos`` holding (x, y)."""
        self.x = pos[0]
        self.y = pos[1]

    def __repr__(self):
        return "vec2d(({!r}, {!r}))".format(self.x, self.y)

    def __add__(self, o):
        """Return a new vec2d that is the component-wise sum with ``o``."""
        if not isinstance(o, vec2d):
            # Let Python try the reflected operation / raise TypeError
            # instead of failing with an AttributeError on ``o.x``.
            return NotImplemented

        return vec2d((self.x + o.x, self.y + o.y))

    def __eq__(self, o):
        """Return True when ``o`` is a vec2d with equal components."""
        if not isinstance(o, vec2d):
            # Comparing against None, tuples, etc. yields False rather
            # than an AttributeError.
            return NotImplemented

        return self.x == o.x and self.y == o.y

    def normalize(self):
        """
        Scale the vector in place to unit length.

        Raises
        ------
        ZeroDivisionError
            If both components are zero, since such a vector has no
            direction to preserve.
        """
        norm = math.sqrt(self.x * self.x + self.y * self.y)
        self.x /= norm
        self.y /= norm

Step-1: Set up dependencies

In [1]:
!pip install pygame
import os

# Select SDL's "dummy" video driver so pygame can open its display without
# a real screen (presumably because the notebook runs headless -- confirm).
os.environ["SDL_VIDEODRIVER"] = "dummy"
import tensorflow as tf

Collecting pygame
[?25l  Downloading https://files.pythonhosted.org/packages/b3/5e/fb7c85304ad1fd52008fd25fce97a7f59e6147ae97378afc86cf0f5d9146/pygame-1.9.4-cp36-cp36m-manylinux1_x86_64.whl (12.1MB)
[K    100% |████████████████████████████████| 12.1MB 4.5MB/s 
[?25hInstalling collected packages: pygame
Successfully installed pygame-1.9.4
