In [2]:
import pygame  # to make the game
import random  # to randomise ball movement

from pygame.transform import scale

#Frame rate
FPS = 60

#Window size in pixels
WINDOW_WIDTH, WINDOW_HEIGHT = 400, 400

#Paddle size in pixels; PADDLE_BUFFER is the gap between a paddle and its screen edge
PADDLE_WIDTH, PADDLE_HEIGHT = 10, 60
PADDLE_BUFFER = 5

#Obstruction size in pixels
OBSTRUCT_WIDTH, OBSTRUCT_HEIGHT = 4, 30

#Ball size in pixels
BALL_WIDTH, BALL_HEIGHT = 10, 10

#Per-update movement of the paddles and of the ball along each axis
PADDLE_SPEED = 2
BALL_X_SPEED, BALL_Y_SPEED = 3, 2

#RGB colour codes
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)

#Display surface shared by every draw* helper below
screen = pygame.display.set_mode((WINDOW_WIDTH, WINDOW_HEIGHT))

def drawBall(ballxPos,ballyPos):
    """Draw the ball on ``screen`` as a white rectangle.

    ballxPos, ballyPos: coordinates of the ball's top-left corner.
    """
    ball_size = (BALL_WIDTH, BALL_HEIGHT)
    ball_rect = pygame.Rect(ballxPos, ballyPos, *ball_size)
    pygame.draw.rect(screen, WHITE, ball_rect)

def drawUserPaddle(paddle1yPos):
    """Draw the left-hand paddle (driven by the player or the learning agent).

    paddle1yPos: y coordinate of the paddle's top edge; the paddle sits
    PADDLE_BUFFER pixels in from the left edge of the window.
    """
    left_edge = PADDLE_BUFFER
    paddle_rect = pygame.Rect(left_edge, paddle1yPos, PADDLE_WIDTH, PADDLE_HEIGHT)
    pygame.draw.rect(screen, WHITE, paddle_rect)

def drawAIPaddle(paddle2yPos):
    """Draw the right-hand paddle (driven by the simple ball-following logic).

    paddle2yPos: y coordinate of the paddle's top edge; the paddle sits
    PADDLE_BUFFER pixels in from the right edge of the window.
    """
    left_edge = WINDOW_WIDTH - PADDLE_BUFFER - PADDLE_WIDTH
    paddle_rect = pygame.Rect(left_edge, paddle2yPos, PADDLE_WIDTH, PADDLE_HEIGHT)
    pygame.draw.rect(screen, WHITE, paddle_rect)

def drawObstruct():
    """Draw the fixed obstruction, centred in the window, as a white rectangle."""
    left_edge = WINDOW_WIDTH/2 - OBSTRUCT_WIDTH/2
    top_edge = WINDOW_HEIGHT/2 - OBSTRUCT_HEIGHT/2
    obstruct_rect = pygame.Rect(left_edge, top_edge, OBSTRUCT_WIDTH, OBSTRUCT_HEIGHT)
    pygame.draw.rect(screen, WHITE, obstruct_rect)

def updateBall(paddle1yPos,paddle2yPos,ballxPos,ballyPos,ballXDirection,ballYDirection): #Update ball movement
    """Advance the ball by one update and resolve collisions.

    Parameters:
        paddle1yPos, paddle2yPos: top edges of the user (left) and AI (right) paddles.
        ballxPos, ballyPos: top-left corner of the ball before the move.
        ballXDirection, ballYDirection: +1 or -1 along each axis.

    Returns:
        [score, paddle1yPos, paddle2yPos, ballxPos, ballyPos, ballXDirection, ballYDirection]
        where score is -1 when the ball passes the user paddle, +1 when it
        passes the AI paddle and 0 otherwise. The paddle positions are
        returned unchanged.
    """
    #Move along each axis: the direction (+1/-1) scaled by the per-update speed
    ballxPos = ballxPos + ballXDirection * BALL_X_SPEED
    ballyPos = ballyPos + ballYDirection * BALL_Y_SPEED

    score = 0

    #Checks if ball hit the user paddle
    if (ballxPos <= PADDLE_BUFFER + PADDLE_WIDTH and
        ballyPos + BALL_HEIGHT >= paddle1yPos and
        ballyPos - BALL_HEIGHT <= paddle1yPos + PADDLE_HEIGHT):
        ballXDirection = 1 #Direction change
    elif (ballxPos <= 0): #Went past the user paddle: point against the user
        ballXDirection = 1
        score = -1
        return [score,paddle1yPos,paddle2yPos,ballxPos,ballyPos,ballXDirection,ballYDirection]

    #Check if it hits AI paddle
    if (ballxPos >= WINDOW_WIDTH - PADDLE_BUFFER - PADDLE_WIDTH and
        ballyPos + BALL_HEIGHT >= paddle2yPos and
        ballyPos - BALL_HEIGHT <= paddle2yPos + PADDLE_HEIGHT):
        ballXDirection = -1 #Direction change
    elif (ballxPos >= WINDOW_WIDTH - BALL_WIDTH): #Reached the right wall: point for the user
        ballXDirection = -1
        score = 1
        return [score,paddle1yPos,paddle2yPos,ballxPos,ballyPos,ballXDirection,ballYDirection]

    #Check if it hits the obstruction (same rectangle that drawObstruct draws)
    obstructLeft = WINDOW_WIDTH/2 - OBSTRUCT_WIDTH/2
    obstructRight = WINDOW_WIDTH/2 + OBSTRUCT_WIDTH/2
    obstructTop = WINDOW_HEIGHT/2 - OBSTRUCT_HEIGHT/2
    obstructBottom = WINDOW_HEIGHT/2 + OBSTRUCT_HEIGHT/2
    if (ballxPos + BALL_WIDTH >= obstructLeft and
        ballxPos <= obstructRight and
        ballyPos + BALL_HEIGHT >= obstructTop and
        ballyPos <= obstructBottom):
        #Send the ball away from the side it is on; deciding by position rather than
        #flipping the direction keeps it stable while the rectangles still overlap
        if (ballxPos + BALL_WIDTH/2 < WINDOW_WIDTH/2):
            ballXDirection = -1
        else:
            ballXDirection = 1

    if (ballyPos <= 0): #Hits the top of the screen
        ballyPos = 0
        ballYDirection = 1
    elif (ballyPos >= WINDOW_HEIGHT - BALL_HEIGHT): #Hits the bottom of the screen
        ballyPos = WINDOW_HEIGHT - BALL_HEIGHT
        ballYDirection = -1

    return [score,paddle1yPos,paddle2yPos,ballxPos,ballyPos,ballXDirection,ballYDirection]

def UpdatePaddle1(action, paddle1yPos):
    """Return the user paddle's new top edge after applying ``action``.

    action: indexable where ``action[1] == 1`` requests a move up and
        ``action[2] == 1`` requests a move down (both may be set, in which
        case the two moves cancel out).
    paddle1yPos: current top edge of the paddle.

    The result is kept between 0 and WINDOW_HEIGHT - PADDLE_HEIGHT so the
    paddle stays on screen.
    """
    wantsUp = action[1] == 1
    wantsDown = action[2] == 1

    if wantsUp:
        paddle1yPos = paddle1yPos - PADDLE_SPEED
    if wantsDown:
        paddle1yPos = paddle1yPos + PADDLE_SPEED

    #Clamp to the top first, then to the bottom
    lowestAllowed = WINDOW_HEIGHT - PADDLE_HEIGHT
    return min(max(paddle1yPos, 0), lowestAllowed)

def UpdatePaddle2(paddle2yPos, ballyPos):
    """Return the AI paddle's new top edge after it tracks the ball.

    paddle2yPos: current top edge of the AI paddle.
    ballyPos: current top edge of the ball.

    The paddle moves PADDLE_SPEED towards the ball's vertical centre. The two
    comparisons run one after the other, so a move that overshoots the ball
    centre is undone by the second one and the paddle stays where it was.
    The result is kept between 0 and WINDOW_HEIGHT - PADDLE_HEIGHT.
    """
    halfPaddle = PADDLE_HEIGHT /2
    ballCentre = ballyPos + BALL_HEIGHT /2

    #Paddle centre above the ball centre: move down the screen
    if paddle2yPos + halfPaddle < ballCentre:
        paddle2yPos = paddle2yPos + PADDLE_SPEED
    #Paddle centre below the ball centre: move up the screen
    if paddle2yPos + halfPaddle > ballCentre:
        paddle2yPos = paddle2yPos - PADDLE_SPEED

    #Keep it on screen
    lowestAllowed = WINDOW_HEIGHT - PADDLE_HEIGHT
    return min(max(paddle2yPos, 0), lowestAllowed)

#The game itself
class Pong:
    """Pong game state plus per-frame rendering onto the module-level ``screen``.

    Attributes set by ``__init__``:
        tally: running sum of the scores returned by ``updateBall``.
        paddle1yPos, paddle2yPos: top edges of the user and AI paddles.
        ballxPos, ballyPos: top-left corner of the ball.
        ballxDirection, ballyDirection: +1 or -1 along each axis.
        scaled_surface: surface the display is downscaled into for frame capture.
    """

    #Size of the downscaled frame. scaled_surface is created with exactly this size
    #because pygame.transform.scale needs a destination surface matching the target size.
    FRAME_SIZE = (80, 80)

    def __init__(self):
        #Random number for initial direction
        num = random.randint(0,9)

        self.tally = 0 #Score

        #Initial paddle positions: both vertically centred
        self.paddle1yPos = WINDOW_HEIGHT/2 - PADDLE_HEIGHT/2
        self.paddle2yPos = WINDOW_HEIGHT/2 - PADDLE_HEIGHT/2

        #Starting point: horizontally centred
        self.ballxPos = WINDOW_WIDTH/2 - BALL_WIDTH/2

        #Randomly decide ball movement; every value 0..9 falls in exactly one branch
        if (num < 3):
            self.ballxDirection = 1
            self.ballyDirection = 1
        elif (num < 5):
            self.ballxDirection = -1
            self.ballyDirection = 1
        elif (num < 8):
            self.ballxDirection = 1
            self.ballyDirection = -1
        else:
            self.ballxDirection = -1
            self.ballyDirection = -1

        #Random starting height, one of ten evenly spaced positions
        num = random.randint(0,9)
        self.ballyPos = num * (WINDOW_HEIGHT - BALL_HEIGHT) / 9

        #Scaling
        self.scaled_surface = pygame.Surface(self.FRAME_SIZE, depth=32)

    @property
    def paddel2yPos(self):
        """Old misspelling of ``paddle2yPos``, kept so existing callers still work."""
        return self.paddle2yPos

    @paddel2yPos.setter
    def paddel2yPos(self, value):
        self.paddle2yPos = value

    def _grabFrame(self):
        #Downscale the display surface into scaled_surface and return it as a 2D pixel array
        pygame.transform.scale(pygame.display.get_surface(), self.FRAME_SIZE, self.scaled_surface)
        return pygame.surfarray.array2d(self.scaled_surface)

    def getPresentFrame(self):
        """Draw the current state without advancing it and return the downscaled frame."""
        #Calls the event queue for each frame
        pygame.event.pump()

        screen.fill(BLACK) #Black background

        #Draw paddles and obstruction
        drawAIPaddle(self.paddle2yPos)
        drawUserPaddle(self.paddle1yPos)
        drawObstruct()

        #Draw ball
        drawBall(self.ballxPos,self.ballyPos)

        #Update the window
        pygame.display.flip()

        #Copies pixels from the game to a 2D array, for use in ML
        return self._grabFrame()

    def getNextFrame(self,action):
        """Advance the game by one step and return ``[score, image_data]``.

        action: passed to ``UpdatePaddle1``, which reads ``action[1]`` (up)
            and ``action[2]`` (down).
        score: the value ``updateBall`` reports for this step; it is also
            added to ``self.tally``.
        image_data: downscaled frame captured after drawing.
        """
        pygame.event.pump()
        screen.fill(BLACK)

        #Update Paddles
        self.paddle1yPos = UpdatePaddle1(action, self.paddle1yPos)
        drawUserPaddle(self.paddle1yPos)

        self.paddle2yPos = UpdatePaddle2(self.paddle2yPos, self.ballyPos)
        drawAIPaddle(self.paddle2yPos)

        #Obstruction is drawn here too so both frame getters show the same scene
        drawObstruct()

        #Update variables by changing ball position
        [score,self.paddle1yPos,self.paddle2yPos,self.ballxPos,self.ballyPos,self.ballxDirection,self.ballyDirection] = updateBall(self.paddle1yPos,self.paddle2yPos,self.ballxPos,self.ballyPos,self.ballxDirection,self.ballyDirection)
        drawBall(self.ballxPos,self.ballyPos)

        #Get surface data
        image_data = self._grabFrame()

        pygame.display.flip()

        self.tally = self.tally + score

        return [score,image_data]

pygame 1.9.5
Hello from the pygame community. https://www.pygame.org/contribute.html


In [1]:
# import necessary modules from keras
from keras.layers import Dense
from keras.models import Sequential

# Policy network: a single hidden layer followed by a single sigmoid output.
# The hidden layer has 200 units and takes a flattened 80x80 pre-processed frame;
# the training loop treats the sigmoid output as the probability of choosing UP.
model = Sequential([
    Dense(units=200, input_dim=80*80, activation='relu', kernel_initializer='glorot_uniform'),
    Dense(units=1, activation='sigmoid', kernel_initializer='RandomNormal'),
])

# compile with a standard binary classification loss and optimizer
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# The network is fed the difference between the current and the previous
# pre-processed frame, so that things like the direction of the ball are visible.

# Action codes
UP_ACTION, DOWN_ACTION = 2, 3

# Hyperparameters
gamma = 0.99

# State accumulated by the main loop over one episode
x_train = []
y_train = []
rewards = []
reward_sum = 0
episode_nb = 0

Using TensorFlow backend.


In [2]:
import numpy as np

# preprocessing used by Karpathy (cf. https://gist.github.com/karpathy/a4166c7fe253700972fcbc77e4ea32c5)
def prepro(I):
  """ prepro 210x160x3 uint8 frame into 6400 (80x80) 1D float vector

  Rows 35..194 are kept, every second row and column of channel 0 is sampled,
  and each sampled pixel becomes 0.0 if its value is 0, 144 or 109 (the two
  background values) and 1.0 otherwise (paddles, ball).

  The input frame is not modified: the result is built from boolean masks
  instead of writing through views of ``I``.
  """
  frame = I[35:195:2, ::2, 0] # crop, then downsample by factor of 2 on channel 0
  is_background = (frame == 144) | (frame == 109) # background types 1 and 2
  is_foreground = (frame != 0) & ~is_background
  # builtin float instead of np.float, which newer NumPy releases no longer provide
  return is_foreground.astype(float).ravel()

# reward discount used by Karpathy (cf. https://gist.github.com/karpathy/a4166c7fe253700972fcbc77e4ea32c5)
def discount_rewards(r, gamma):
  """ take 1D sequence of rewards and compute the normalized discounted reward

  r: per-step rewards (ints or floats); a non-zero entry marks the end of a
     game, at which point the running sum restarts.
  gamma: discount factor applied per step.

  Returns a float array of the same length, shifted to zero mean and, when the
  values are not all equal, scaled to unit standard deviation. An empty input
  gives an empty array.
  """
  # force float so integer rewards are neither truncated nor rejected by the
  # in-place normalisation below
  r = np.asarray(r, dtype=float)
  discounted_r = np.zeros_like(r)
  if r.size == 0:
    return discounted_r
  running_add = 0.0
  # we go from last reward to first one so we don't have to do exponentiations
  for t in reversed(range(r.size)):
    if r[t] != 0:
      running_add = 0.0 # if the game ended (in Pong), reset the reward sum
    running_add = running_add * gamma + r[t] # Horner's method
    discounted_r[t] = running_add
  discounted_r -= discounted_r.mean() # normalizing the result
  spread = discounted_r.std()
  if spread > 0: # all-equal values would otherwise divide by zero
    discounted_r /= spread
  return discounted_r

In [3]:
from easy_tf_log import tflog
from datetime import datetime
from keras import callbacks
import os

# initialize variables
resume = True
running_reward = None
epochs_before_saving = 10
log_dir = './log' + datetime.now().strftime("%Y%m%d-%H%M%S") + "/"

# load pre-trained model if exist
# Any 'my_model_weights*.h5' file in the working directory counts, so both the
# plain 'my_model_weights.h5' and files with a timestamp between the prefix and
# the extension are found; the most recently modified one is loaded.
if resume:
    checkpoints = [name for name in os.listdir('.')
                   if name.startswith('my_model_weights')
                   and name.endswith('.h5')
                   and os.path.isfile(name)]
    if checkpoints:
        latest_checkpoint = max(checkpoints, key=os.path.getmtime)
        print("loading previous weights")
        model.load_weights(latest_checkpoint)

# add a callback tensorboard object to visualize learning
tbCallBack = callbacks.TensorBoard(log_dir=log_dir, histogram_freq=0,
          write_graph=True, write_images=True)

ModuleNotFoundError: No module named 'easy_tf_log'

In [None]:
# main loop
# Runs forever: plays one environment step per iteration, and at the end of
# every episode fits the model on the whole episode and resets the buffers.
# NOTE(review): `observation`, `prev_input` and `env` are read here but are not
# assigned in any cell visible above this loop - they have to exist before the
# first iteration (prev_input as None); confirm where they are created.
# NOTE(review): `tflog` comes from easy_tf_log, whose import is shown failing
# in the captured output of the previous cell.
while (True):

    # preprocess the observation, set input as difference between images
    # (an all-zero vector is used when there is no previous frame)
    cur_input = prepro(observation)
    x = cur_input - prev_input if prev_input is not None else np.zeros(80 * 80)
    prev_input = cur_input
    
    # forward the policy network and sample action according to the proba distribution
    # x is reshaped into a single-row batch; proba is used as the probability of UP
    proba = model.predict(np.expand_dims(x, axis=1).T)
    action = UP_ACTION if np.random.uniform() < proba else DOWN_ACTION
    y = 1 if action == 2 else 0 # label 1 = UP, 0 = DOWN; the literal 2 is the value of UP_ACTION

    # log the input and label to train later
    x_train.append(x)
    y_train.append(y)

    # do one step in our environment
    # NOTE(review): env.step is expected to return a 4-tuple
    # (observation, reward, done, info) - presumably a gym-style env; confirm.
    observation, reward, done, info = env.step(action)
    rewards.append(reward)
    reward_sum += reward
    
    # end of an episode
    if done:
        print('At the end of episode', episode_nb, 'the total reward was :', reward_sum)
        
        # increment episode number
        episode_nb += 1
        
        # training: every recorded step is one sample, labelled with the action
        # that was taken and weighted by its normalized discounted reward
        model.fit(x=np.vstack(x_train), y=np.vstack(y_train), verbose=1, callbacks=[tbCallBack], sample_weight=discount_rewards(rewards, gamma))
        
        # Saving the weights used by our model
        # (every epochs_before_saving episodes, to a file name that contains the current timestamp)
        if episode_nb % epochs_before_saving == 0:    
            model.save_weights('my_model_weights' + datetime.now().strftime("%Y%m%d-%H%M%S") + '.h5')
        
        # Log the reward as an exponential moving average (weight 0.01 on the latest episode)
        running_reward = reward_sum if running_reward is None else running_reward * 0.99 + reward_sum * 0.01
        tflog('running_reward', running_reward, custom_dir=log_dir)
        
        # Reinitialization: clear the episode buffers and restart the environment
        x_train, y_train, rewards = [],[],[]
        observation = env.reset()
        reward_sum = 0
        prev_input = None