*Breakout Game that uses Deep learning with Reinforcement Learning*


Done By Aditya Narayanasetti

In [4]:
import pygame
import math
import random

# Constant for determining the screen size.
SIZE_OF_THE_SCREEN = 424, 430
# Dimensions of the brikcs
HEIGHT_OF_BRICK  = 13
WIDTH_OF_BRICK   = 32
# Dimensions of the paddle
HEIGH_OF_PADDLE = 8
PADDLE_WIDTH  = 50
# Y coordinate for Paddle
PADDLE_Y = SIZE_OF_THE_SCREEN[1] - HEIGH_OF_PADDLE - 10
# Dimensions of the ball
BALL_DIAMETER = 12
BALL_RADIUS   = BALL_DIAMETER // 2
# X coordinate for Paddles
MAX_PADDLE_X = SIZE_OF_THE_SCREEN[0] - PADDLE_WIDTH
MAX_BALL_X   = SIZE_OF_THE_SCREEN[0] - BALL_DIAMETER
MAX_BALL_Y   = SIZE_OF_THE_SCREEN[1] - BALL_DIAMETER

# Color constants
BLACK = (0, 0, 0)
WHITE = (255, 255, 255)
BLUE  = (0, 0, 255)
COLOR_OF_BRICK = (153, 76, 0)
PADDLE_COLOR = (204,0,0)

FPS = 60
FPSCLOCK = pygame.time.Clock()

pygame.init() # Calling pygame module
screen = pygame.display.set_mode(SIZE_OF_THE_SCREEN)
pygame.display.set_caption(" BREAKOUT")
clock = pygame.time.Clock()

class Breakout:

    def __init__(self):
        self.capture = 0
        # Used 12-velocity to train
        self.ball_vel = [12,-12]
        self.paddle   = pygame.Rect(215, PADDLE_Y,PADDLE_WIDTH, HEIGH_OF_PADDLE)
        self.ball     = pygame.Rect(225,PADDLE_Y - BALL_DIAMETER,BALL_DIAMETER,BALL_DIAMETER)
        self.create_bricks()

    def create_bricks(self):
        y_ofs = 20
        self.bricks = []
        for i in range(11):
            x_ofs = 15
            for j in range(12):
                self.bricks.append(pygame.Rect(x_ofs,y_ofs,WIDTH_OF_BRICK,HEIGHT_OF_BRICK))
                x_ofs += WIDTH_OF_BRICK + 1
            y_ofs += HEIGHT_OF_BRICK + 1
    def draw_bricks(self):
        for brick in self.bricks:
            pygame.draw.rect(screen, COLOR_OF_BRICK, brick)
    def draw_paddle(self):
        pygame.draw.rect(screen, PADDLE_COLOR, self.paddle)
    def draw_ball(self):
        pygame.draw.circle(screen, WHITE, (self.ball.left + BALL_RADIUS, self.ball.top + BALL_RADIUS), BALL_RADIUS)

    def check_input(self,input_action):
        # 0-LEFT, 1-Right
        if input_action[0] == 1:
        	# Used 12-velocity to train --> self.paddle.left -= 12
            self.paddle.left -= 12
            if self.paddle.left < 0:
                self.paddle.left = 0
        if input_action[1] == 1:
        	# Likewise 12
            self.paddle.left += 12
            if self.paddle.left > MAX_PADDLE_X:
                self.paddle.left = MAX_PADDLE_X

    def move_ball(self):
        self.ball.left += self.ball_vel[0]
        self.ball.top  += self.ball_vel[1]
        if self.ball.left <= 0:
            self.ball.left = 0
            self.ball_vel[0] = -self.ball_vel[0]
        elif self.ball.left >= MAX_BALL_X:
            self.ball.left = MAX_BALL_X
            self.ball_vel[0] = -self.ball_vel[0]
        if self.ball.top < 0:
            self.ball.top = 0
            self.ball_vel[1] = -self.ball_vel[1]
        elif self.ball.top >= MAX_BALL_Y:
            self.ball.top = MAX_BALL_Y
            self.ball_vel[1] = -self.ball_vel[1]

    def take_action(self,input_action):

        pygame.event.pump()

        reward = 0.1
        terminal = False
        randNum = random.randint(0,1)

        # Get every event
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()
                quit()

        screen.fill(BLACK)
        self.check_input(input_action)
        self.move_ball()

        # Handle Collisions
        for brick in self.bricks:
            if self.ball.colliderect(brick):
                reward = 2
                self.ball_vel[1] = -self.ball_vel[1]
                self.bricks.remove(brick)
                break
        if len(self.bricks) == 0:
            self.terminal = True
            self.__init__()
        if self.ball.colliderect(self.paddle):
            self.ball.top = PADDLE_Y - BALL_DIAMETER
            self.ball_vel[1] = -self.ball_vel[1]
        elif self.ball.top > self.paddle.top:
            terminal = True
            self.__init__()
            reward = -2

        self.draw_bricks()
        self.draw_ball()
        self.draw_paddle()
        # Capture screen and assign to image_data
        image_data = pygame.surfarray.array3d(pygame.display.get_surface())
        # Update the screen
        pygame.display.update()
        FPSCLOCK.tick(FPS)
        return image_data, reward, terminal

In [5]:

import cv2
import numpy as np
from collections import deque
import random
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as T
import torch.nn.functional as F

import os
import sys
import time


class NeuralNetwork(nn.Module):

    def __init__(self):
        super(NeuralNetwork, self).__init__()

        self.number_of_actions = 2
        self.gamma = 0.99
        self.final_epsilon = 0.05
        self.initial_epsilon = 0.1
        self.number_of_iterations = 2000
        self.replay_memory_size = 750000
        self.minibatch_size = 32
        self.explore = 3000000 # Timesteps to go from INITIAL_EPSILON to FINAL_EPSILON

        self.conv1 = nn.Conv2d(4, 32, kernel_size = 8, stride = 4)
        self.conv2 = nn.Conv2d(32, 64, 4, 2)
        self.conv3 = nn.Conv2d(64, 64, 3, 1)
        self.fc4 = nn.Linear(7 * 7 * 64, 512)
        self.fc5 = nn.Linear(512, self.number_of_actions)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        #make sure input tensor is flattened
        x = F.relu(self.fc4(x.view(x.size(0), -1)))
        return self.fc5(x)


def preprocessing(image):
	image_data = cv2.cvtColor(cv2.resize(image, (84, 84)), cv2.COLOR_BGR2GRAY)
	image_data[image_data > 0] = 255
	image_data = np.reshape(image_data,(84, 84, 1))
	image_tensor = image_data.transpose(2, 0, 1)
	image_tensor = image_tensor.astype(np.float32)
	image_tensor = torch.from_numpy(image_tensor)
	return image_tensor

def init_weights(m):
    if type(m) == nn.Conv2d or type(m) == nn.Linear:
        torch.nn.init.uniform(m.weight, -0.01, 0.01)
        m.bias.data.fill_(0.01)

def train(model, start):
    # define Adam optimizer
    optimizer = optim.Adam(model.parameters(), lr=0.0002)

    # initialize mean squared error loss
    criterion = nn.MSELoss() # crossentropy

    # instantiate game
    game_state = Breakout()

    # initialize replay memory
    D = deque()
    #replay = []

    # initial action is do nothing
    action = torch.zeros([model.number_of_actions], dtype=torch.float32)
    action[0] = 0
    image_data, reward, terminal = game_state.take_action(action)
    image_data = preprocessing(image_data)
    state = torch.cat((image_data, image_data, image_data, image_data)).unsqueeze(0) # 1-4-84-84

    # initialize epsilon value
    epsilon = model.initial_epsilon
    iteration = 0

    #epsilon = 0.0927
    #iteration = 420000
    # main infinite loop
    while iteration < model.number_of_iterations:
        # get output from the neural network
        output = model(state)[0] # Output size = torch.Size([2]) tensor([-0.0278,  1.7244]
        #output = model(state)

        # initialize action
        action = torch.zeros([model.number_of_actions], dtype=torch.float32)

        # epsilon greedy exploration
        random_action = random.random() <= epsilon
        if random_action:
            print("Random action!")

        # Pick action --> random or index of maximum q value
        action_index = [torch.randint(model.number_of_actions, torch.Size([]), dtype=torch.int)
                        if random_action
                        else torch.argmax(output)][0]

        #print("Action index shape: ", action_index.shape) # torch.Size([])

        action[action_index] = 1

        if epsilon > model.final_epsilon:
            epsilon -= (model.initial_epsilon - model.final_epsilon) / model.explore

        # get next state and reward
        image_data_1, reward, terminal = game_state.take_action(action)
        image_data_1 = preprocessing(image_data_1)

        #print("İmage data_1 shape: ", image_data_1.shape)  # 1-84-84

        state_1 = torch.cat((state.squeeze(0)[1:, :, :], image_data_1)).unsqueeze(0)   # squeeze(0).shape = 4-84-84
        #print("State_1 Shape: ", state_1.shape) # State_1 Shape = ([1, 4, 84, 84])     # squeeze(0)[1:,:,:].shape = 3-84-84
        action = action.unsqueeze(0)
        #print("Action size: ", action.shape) # 1-2
        reward = torch.from_numpy(np.array([reward], dtype=np.float32)).unsqueeze(0)
        #print("Reward size: ", reward.shape)
        # save transition to replay memory
        D.append((state, action, reward, state_1, terminal))

        # if replay memory is full, remove the oldest transition
        if len(D) > model.replay_memory_size:
            D.popleft()

        # sample random minibatch
        # it picks k unique random elements, a sample, from a sequence: random.sample(population, k)
        minibatch = random.sample(D, min(len(D), model.minibatch_size))
        # unpack minibatch

        state_batch   = torch.cat(tuple(d[0] for d in minibatch))
        #print("state_batch size: ", state_batch.shape)
        action_batch  = torch.cat(tuple(d[1] for d in minibatch))
        #print("action_batch size: ", action_batch.shape)
        reward_batch  = torch.cat(tuple(d[2] for d in minibatch))
        #print("reward_batch size: ", reward_batch.shape)
        state_1_batch = torch.cat(tuple(d[3] for d in minibatch))
        #print("state_1_batch size: ", state_1_batch.shape)

        # get output for the next state
        output_1_batch = model(state_1_batch)
        #print("output_1_batch: " , output_1_batch.shape)

        # set y_j to r_j for terminal state, otherwise to r_j + gamma*max(Q) Target Q value Bellman equation.
        y_batch = torch.cat(tuple(reward_batch[i] if minibatch[i][4]
                                  else reward_batch[i] + model.gamma * torch.max(output_1_batch[i])
                                  for i in range(len(minibatch))))


        # extract Q-value -----> column1 * column1 + column2 * column2
        # The main idea behind Q-learning is that if we had a function Q∗ :State × Action → ℝ
        #that could tell us what our return would be, if we were to take an action in a given state,
        #then we could easily construct a policy that maximizes our rewards
        q_value = torch.sum(model(state_batch) * action_batch, dim=1)
        #print("q_value: ", q_value.shape)

        # PyTorch accumulates gradients by default, so they need to be reset in each pass
        optimizer.zero_grad()

        # returns a new Tensor, detached from the current graph, the result will never require gradient
        y_batch = y_batch.detach()

        # calculate loss
        loss = criterion(q_value, y_batch)

        # do backward pass
        loss.backward()
        optimizer.step()

        # set state to be state_1
        state = state_1
        iteration += 1

        if iteration % 10000 == 0:
            torch.save(model, "trained_model/current_model_" + str(iteration) + ".pth")

        print("total iteration: {} Elapsed time: {:.2f} epsilon: {:.5f}"
               " action: {} Reward: {:.1f}".format(iteration,((time.time() - start)/60),epsilon,action_index.cpu().detach().numpy(),reward.numpy()[0][0]))

def test(model):
    game_state = Breakout()

    # initial action is do nothing
    action = torch.zeros([model.number_of_actions], dtype=torch.float32)
    action[0] = 1
    image_data, reward, terminal = game_state.take_action(action)
    image_data = preprocessing(image_data)
    state = torch.cat((image_data, image_data, image_data, image_data)).unsqueeze(0)

    while True:
        # get output from the neural network
        output = model(state)[0]

        action = torch.zeros([model.number_of_actions], dtype=torch.float32)

        # get action
        action_index = torch.argmax(output)
        action[action_index] = 1

        # get next state
        image_data_1, reward, terminal = game_state.take_action(action)
        image_data_1 = preprocessing(image_data_1)
        state_1 = torch.cat((state.squeeze(0)[1:, :, :], image_data_1)).unsqueeze(0)

        # set state to be state_1
        state = state_1

def main(mode):
    if mode == 'test':
        model = torch.load('trained_model/current_model_420000.pth', map_location='cpu').eval()
        test(model)
    elif mode == 'train':
        if not os.path.exists('trained_model/'):
            os.mkdir('trained_model/')
        model = NeuralNetwork()
        model.apply(init_weights)
        start = time.time()
        train(model, start)
    elif mode == 'continue': # You can change trained model id and keep training.
        model = torch.load('trained_model/current_model_420000.pth', map_location='cpu').eval()
        start = time.time()
        train(model, start)

if __name__ == "__main__":
    main('train')

  torch.nn.init.uniform(m.weight, -0.01, 0.01)


total iteration: 1 Elapsed time: 0.00 epsilon: 0.10000 action: 0 Reward: 0.1
total iteration: 2 Elapsed time: 0.00 epsilon: 0.10000 action: 0 Reward: 0.1
total iteration: 3 Elapsed time: 0.00 epsilon: 0.10000 action: 0 Reward: 0.1
total iteration: 4 Elapsed time: 0.00 epsilon: 0.10000 action: 0 Reward: 0.1
total iteration: 5 Elapsed time: 0.00 epsilon: 0.10000 action: 0 Reward: 0.1
total iteration: 6 Elapsed time: 0.00 epsilon: 0.10000 action: 0 Reward: 0.1
total iteration: 7 Elapsed time: 0.00 epsilon: 0.10000 action: 0 Reward: 0.1
total iteration: 8 Elapsed time: 0.01 epsilon: 0.10000 action: 0 Reward: 0.1
total iteration: 9 Elapsed time: 0.01 epsilon: 0.10000 action: 0 Reward: 0.1
total iteration: 10 Elapsed time: 0.01 epsilon: 0.10000 action: 0 Reward: 0.1
total iteration: 11 Elapsed time: 0.01 epsilon: 0.10000 action: 0 Reward: 0.1
total iteration: 12 Elapsed time: 0.01 epsilon: 0.10000 action: 0 Reward: 0.1
total iteration: 13 Elapsed time: 0.01 epsilon: 0.10000 action: 0 Reward:


1. **Imported Libraries**: The code imports several libraries including `pygame`, `math`, `random`, `cv2`, `numpy`, `torch`, and related modules for neural network training.

2. **Constants and Parameters**: Constants such as screen size, dimensions of bricks, paddle, and ball, colors, and game parameters like FPS are defined at the beginning of the script. This makes it easy to adjust these values if needed.

3. **Breakout Class**: This class defines the Breakout game environment and its functionalities. It includes methods for initializing the game, creating bricks, drawing elements (bricks, paddle, ball), handling user input, moving the ball, and taking actions in the game.

4. **Neural Network Class**: This class defines the neural network architecture for the Q-learning algorithm. It includes convolutional layers (`Conv2d`), fully connected layers (`Linear`), and the forward method for processing input images.

5. **Preprocessing Function**: This function preprocesses images by resizing them to 84x84 pixels, converting to grayscale, and reshaping to the required format for input to the neural network.

6. **Training Function**: The `train` function implements the Q-learning algorithm. It initializes the game environment, neural network, optimizer, and replay memory. It iterates through episodes, selecting actions based on an epsilon-greedy policy, updating the Q-values, and sampling minibatches from the replay memory for training.

7. **Main Function**: The `main` function determines the mode of operation (`train`, `test`, or `continue`) and executes the corresponding logic. In training mode, it initializes a neural network, starts training, and saves the trained model. In test mode, it loads a pre-trained model and evaluates its performance in the game environment. In continue mode, it resumes training from a pre-trained model.

8. **Overall Structure**: The code follows a structured approach with clear separation of concerns. The game logic is encapsulated within the `Breakout` class, while the neural network training logic is encapsulated within the `NeuralNetwork` class and related functions. The main function orchestrates the training or testing process based on the chosen mode.

9. **Comments and Documentation**: The code includes comments explaining the purpose of each function, method, and significant code block. This helps in understanding the code and its functionalities.

10. **Potential Improvements**:
    - The code could benefit from additional documentation, especially for complex functions and methods.
    - Error handling mechanisms could be added to handle exceptions and edge cases gracefully.
    - Performance optimizations such as parallelization could be explored to improve training efficiency.
    - Hyperparameters tuning and experimentation with different network architectures could be conducted to enhance learning performance.
    - Visualization tools could be incorporated to monitor training progress and analyze agent behavior.
    - Unit tests could be implemented to ensure the correctness of individual components.