In [1]:
import cv2
import numpy as np
import random
import time

In [2]:
x, y, vel = 15, 15, (15, 0)
screenx, screeny = 600, 600

In [3]:
def create_image():
    
    global img, food
    
    img = np.zeros((screenx, screeny, 3), np.uint8)

    for index, part in enumerate(snake.body_parts):

        ### Move body to the next frame
        part.pos1[0] += part.vel[0]
        part.pos1[1] += part.vel[1]
        part.pos2[0] += part.vel[0]
        part.pos2[1] += part.vel[1]

        pos1, pos2 = part.pos1, part.pos2
        cv2.rectangle(img, (pos1[0], pos1[1]), (pos2[0], pos2[1]), (0, 255, 0), -1)

    cv2.rectangle(img, (0, 0), (screenx, 30), (255, 255, 255), -1) ### Blank bar at the top
    cv2.rectangle(img, (food[0], food[1]), (food[0] + 12, food[1] + 12), (0, 255, 255), -1) ### Yellow food
    cv2.putText(img=img, text=f"Score: {score}", fontFace=cv2.FONT_HERSHEY_SIMPLEX, org=(25, 25), fontScale=1, color=(0, 0, 0), lineType=1, thickness=2) ### Score
    cv2.imshow('Snake', img) ### Shows the image

In [4]:
def create_food():
    x1, y1 = random.randint(0, screenx - x), random.randint(35, screeny - y)
    return x1, y1

In [5]:
def check_game():
    for index, part in enumerate(snake.body_parts):
    ### Player outside window
        if (part.pos1[0] > screenx) or (part.pos2[0] > screenx) or (part.pos1[0] < 0) or (part.pos2[0] < 0) or \
           (part.pos1[1] > screeny) or (part.pos2[1] > screeny) or (part.pos1[1] < 30) or (part.pos2[1] < 30):
            return 1

        ### Player in itself
        if index > 2:
            if ((snake.body_parts[0].pos1[0] == part.pos1[0] and part.pos2[0] == snake.body_parts[0].pos2[0])):
                if ((snake.body_parts[0].pos1[1] == part.pos1[1] and part.pos2[1] ==  snake.body_parts[0].pos2[1])):
                    return 1

    return False

In [6]:
def step(action):
    global food, score, snake, img
    eat = False
    if action == 0: ### W
        snake.go_up()

    if action == 1:  ### A
        snake.go_left()

    if action == 2: ### S
        snake.go_down()

    if action == 3: ### D
        snake.go_right()

    create_image()
    new_state = img
    done = check_game()

    ### Player eats the food
    if ((snake.body_parts[0].pos1[0] <= food[0] <= snake.body_parts[0].pos2[0]) or (snake.body_parts[0].pos1[0] <= food[0] + x <= snake.body_parts[0].pos2[0])) and \
        ((snake.body_parts[0].pos1[1] <= food[1] <= snake.body_parts[0].pos2[1]) or (snake.body_parts[0].pos1[1] <= food[1] + y <= snake.body_parts[0].pos2[1])):
        food = create_food()
        snake.eat_food()
        score += 1
        eat = True

    if done:
        reward = -1
    else:
        if eat:
            reward = 1
        else:
            reward = 0
        
    ### Updates body position
    if len(snake.body_parts) > 1:
        for index in range(len(snake.body_parts) - 1, 0, -1):
            snake.body_parts[index].vel = snake.body_parts[index - 1].vel

    return new_state, reward, done

In [7]:
def reset():
    global snake, img, food, score
    snake = Snake()
    food = create_food()
    score = 0
    img = np.zeros((screenx, screeny, 3), np.uint8)
    create_image()
    return img

In [8]:
class Snake(object):
    def __init__(self):
        self.startx, self.starty = random.randint(100, 500), random.randint(100, 500)
        head = Body_Part(name='head', pos1=[self.startx, self.starty], pos2=[self.startx + x, self.starty + y])
        self.body_parts = [head]

    def eat_food(self):
        ### Going up
        if self.body_parts[-1].vel[1] < 0:
            self.body_parts.append(Body_Part(pos1=[self.body_parts[-1].pos1[0], self.body_parts[-1].pos2[1]], 
                                             pos2=[self.body_parts[-1].pos1[0] + x, self.body_parts[-1].pos2[1] + y], 
                                             vel=self.body_parts[-1].vel))
        ### Going down
        elif self.body_parts[-1].vel[1] > 0:
            self.body_parts.append(Body_Part(pos1=[self.body_parts[-1].pos1[0], self.body_parts[-1].pos1[1] - y], 
                                             pos2=[self.body_parts[-1].pos2[0], self.body_parts[-1].pos1[1]], 
                                             vel=self.body_parts[-1].vel))
        ### Going right
        elif self.body_parts[-1].vel[0] > 0:
            self.body_parts.append(Body_Part(pos1=[self.body_parts[-1].pos1[0] - x, self.body_parts[-1].pos1[1]], 
                                             pos2=[self.body_parts[-1].pos1[0], self.body_parts[-1].pos2[1]], 
                                             vel=self.body_parts[-1].vel))
        ### Going left
        elif self.body_parts[-1].vel[0] < 0:
            self.body_parts.append(Body_Part(pos1=[self.body_parts[-1].pos2[0], self.body_parts[-1].pos1[1]], 
                                             pos2=[self.body_parts[-1].pos2[0] + x, self.body_parts[-1].pos2[1]], 
                                             vel=self.body_parts[-1].vel))

    ### Update direction if the head is not going the opposite direction
    def go_right(self):
        if self.body_parts[0].vel != (-15, 0):
            self.body_parts[0].vel = (15, 0)
    def go_left(self):
        if self.body_parts[0].vel != (15, 0):
            self.body_parts[0].vel = (-15, 0)
    def go_up(self):
        if self.body_parts[0].vel != (0, 15):
            self.body_parts[0].vel = (0, -15)
    def go_down(self):
        if self.body_parts[0].vel != (0, -15):
            self.body_parts[0].vel = (0, 15)

In [9]:
class Body_Part(object):
    def __init__(self, pos1, pos2, vel=vel, name='part'):
        self.name = name
        self.pos1 = pos1
        self.pos2 = pos2
        self.vel = vel

In [10]:
import warnings
warnings.simplefilter('ignore')

import tensorflow as tf
config = tf.compat.v1.ConfigProto()
config.gpu_options.allow_growth = True
sess = tf.compat.v1.Session(config=config)
tf.compat.v1.keras.backend.set_session(sess)

print("Is GPU available?", tf.test.is_gpu_available())
print("TF version:", tf.__version__)
print("Keras version:", tf.keras.__version__)

Instructions for updating:
Use `tf.config.list_physical_devices('GPU')` instead.
Is GPU available? True
TF version: 2.3.1
Keras version: 2.4.0


In [11]:
import matplotlib.pyplot as plt
from collections import deque
from tensorflow.keras.models import Sequential, load_model, Model
from tensorflow.keras.layers import Dense, Activation, Conv2D, Flatten, Concatenate
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import backend as k

k.set_image_data_format('channels_first')

In [12]:
class NN(Model):
    def __init__(self, n_actions):
        super(NN, self).__init__()
        
        self.vel = Dense(16, input_shape=(2,), activation='relu')
        
        self.conv1 = Conv2D(filters=8, kernel_size=(4, 4), strides=(1, 1), padding='same', activation='relu', input_shape=(3, 128, 128))
        self.conv2 = Conv2D(filters=16, kernel_size=(3, 3), strides=(1, 1), padding='same', activation='relu')
        self.conv3 = Conv2D(filters=16, kernel_size=(3, 3), strides=(1, 1), padding='same', activation='relu')
        self.flatten = Flatten()
        self.dense1 = Dense(128, activation='relu')
        self.dense2 = Dense(128, activation='relu')
        self.V = Dense(1, activation=None)
        self.A = Dense(n_actions, activation=None)
    
    def call(self, data):
        state, vel = data
        
        vel = np.array([vel[0] / 15, vel[1] / 15])
        y = self.vel(vel)
        
        x = self.conv1(state)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.flatten(x)
        x = Concatenate()[x, y]
        x = self.dense1(x)
        x = self.dense2(x)
        V = self.V(x)
        A = self.A(x)
        
        Q = (V + (A - tf.math.reduce_mean(A, axis=1, keepdims=True)))
        
        return Q
    
    def advantage(self, state):
        x = self.conv1(state)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.flatten(x)
        x = self.dense1(x)
        x = self.dense2(x)
        A = self.A(x)
        
        return A

In [13]:
class Agent():
    def __init__(self, input_shape, n_actions, mem_size, eps, eps_min, eps_dec, gamma, q_eval_name, q_next_name, 
                 replace_freq, lr=0.0005):
        self.Q_eval = NN(n_actions)
        self.Q_next = NN(n_actions)
        self.Q_eval.compile(optimizer=Adam(lr=lr), loss='mse')
        self.Q_next.compile(optimizer=Adam(lr=lr), loss='mse')
        #self.Q_eval.summary()
        self.memory = deque(maxlen=mem_size)
        self.eps = eps
        self.eps_min = eps_min
        self.eps_dec = eps_dec
        self.gamma = gamma
        self.replace = replace_freq
        self.action_space = [i for i in range(n_actions)]
        self.steps = 0
        self.input_shape = input_shape
        self.q_eval_name = q_eval_name
        self.q_next_name = q_next_name
    
    def store(self, state, action, reward, n_state, done, vels):
        pack = [np.expand_dims(state, axis=0), action, reward, np.expand_dims(n_state, axis=0), done, vels]
        self.memory.append(pack)
    
    def take_data(self, batch_size):
        pack = random.sample(self.memory, batch_size)
        states = []
        actions = []
        rewards = []
        n_states = []
        dones = []
        vels = []
        for i in range(batch_size):
            states.append(pack[i][0])
            actions.append(pack[i][1])
            rewards.append(pack[i][2])
            n_states.append(pack[i][3])
            dones.append(pack[i][4])
            vels.append(pack[i][5])
        return states, actions, rewards, n_states, dones, vels
    
    def choose_action(self, state):
        if np.random.random() > self.eps:
            return np.argmax(self.Q_eval.advantage(np.expand_dims(state, axis=0)))
        return np.random.choice(self.action_space)
    
    def decay_eps(self): 
        self.eps = self.eps - self.eps_dec if self.eps > self.eps_min else self.eps_min
    
    def replace_weights(self):
        if not (self.steps % self.replace):
            self.Q_next.set_weights(self.Q_eval.get_weights())
    
    def upgrade(self, batch_size=64):
        if len(self.memory) >= 4*batch_size:
            states, actions, rewards, n_states, dones, vels = self.take_data(batch_size)
            
            self.replace_weights()
            
            act = [np.argmax(self.Q_eval([np.array(n_states[i]), vels[i]])) for i in range(batch_size)]
            q_next = [self.Q_next([np.array(n_states[i]), vels[i]]).numpy()[0][act[i]] for i in range(batch_size)]
            q_target = [self.Q_eval([states[i], vels[i]]).numpy()[0] for i in range(batch_size)]
            
            for i in range(batch_size):
                q_target[i][actions[i]] = rewards[i] + self.gamma*q_next[i]*(1 - dones[i])
            
            states = np.reshape(states, (batch_size, *self.input_shape))
            
            self.Q_eval.train_on_batch([np.array(states), vels], np.array(q_target))
            
            self.decay_eps()
            self.steps += 1

    def save(self):
        self.Q_eval.save_weights(self.q_eval_name)

In [14]:
agent = Agent(input_shape=(3, 128, 128), n_actions=4, mem_size=10000, eps=1.0, 
              eps_min=0.001, eps_dec=0.001, gamma=0.99, q_eval_name='Q_eval.h5', q_next_name='Q_next.h5', 
              replace_freq=800)

In [15]:
def transform(state):
    global p
    state = cv2.resize(state, (128, 128))
    #state = cv2.cvtColor(state, cv2.COLOR_RGB2GRAY)
    state = np.reshape(state, (3, 128, 128))
    state = np.array(state, np.float32)
    state /= 255.0
    return state

In [16]:
n_episodes = 1000
totals = []
means = []

for i in range(n_episodes):
    state = transform(reset())
    done = False
    total = 0
    while not done:
        action = agent.choose_action(state)
        n_state, reward, done = step(action)
        agent.store(state, action, reward, transform(n_state), int(done), list(snake.body_parts[0].vel))
        agent.upgrade()
        state = transform(n_state)
        total += reward
        key = cv2.waitKey(1) & 0xFF
    totals.append(total)
    means.append(np.mean(totals))
    print('episode', i, 'score', total, 'eps', agent.eps)

episode 0 score -1 eps 1.0
episode 1 score -1 eps 1.0
episode 2 score -1 eps 1.0
episode 3 score -1 eps 1.0


To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.



ValueError: Input 0 of layer dense is incompatible with the layer: : expected min_ndim=2, found ndim=1. Full shape received: [2]