In [None]:
from pg import PG
from dqn import DQN
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt
import random
import math
from time import sleep
from IPython.display import clear_output

In [None]:
class Snake:
    """Minimal Snake game on a rectangular grid, usable as an RL environment.

    Each step the caller picks a relative action: 0 turns right, 1 turns left,
    any other value keeps the current heading (see `get_vector`).

    Observations are a numpy array of four floats: obstacle flags (1.0 / 0.0)
    for the cells to the right, left and in front of the head, followed by the
    angle between the heading and the direction to the food divided by pi.

    Attributes:
        size: grid extent along the first and second coordinate.
        gui: if True, every state change is printed via `visualize`.
        snake: list of (x, y) cells, head first.
        positions: list of every cell of the grid.
        direction: current heading as a unit vector.
        food: cell holding the food, or None once the snake covers every cell.
        points: number of food items eaten so far.
        game_over: True after a collision or once the grid is completely filled.
    """

    # Ordered so that index + 1 is the "right" turn and index - 1 the "left" turn
    _VECTORS = ((0, 1), (1, 0), (0, -1), (-1, 0))

    def __init__(self, size=(16, 16), gui=False):
        """Create a new game with a three-cell snake and one random food cell."""
        self.size = size
        self.gui = gui
        self.snake = [(size[0] // 2, 3), (size[0] // 2, 2), (size[0] // 2, 1)]
        self.positions = [(x, y) for x in range(size[0]) for y in range(size[1])]
        self.direction = (0, 1)
        self.food = self._place_food()
        self.points = 0
        self.game_over = False
        self.visualize()

    def _place_food(self):
        """Return a random cell not covered by the snake, or None if there is none."""
        occupied = set(self.snake)
        free = [p for p in self.positions if p not in occupied]
        return random.choice(free) if free else None

    def _is_blocked(self, pos):
        """Return True if `pos` is outside the grid or on the snake (tail cell excluded)."""
        inside = 0 <= pos[0] < self.size[0] and 0 <= pos[1] < self.size[1]
        # The tail is excluded because it moves away during the step
        return (not inside) or pos in self.snake[:-1]

    def visualize(self):
        """Print the current grid and pause briefly; does nothing unless `gui` is True."""
        if not self.gui:
            return
        field = np.zeros(self.size)
        # There is no food left to draw once the snake fills the whole grid
        if self.food is not None:
            field[self.food] = 3
        body = np.array(self.snake)
        field[body[:, 0], body[:, 1]] = 1
        field[self.snake[0]] = 2
        # Map the float cell codes printed by numpy to symbols and drop the brackets
        replacements = {'2.': '◈', '0.': '▢', '1.': '▩', '3.': '◉', '[': '', ']': ''}
        output = str(field)
        for key, val in replacements.items():
            output = output.replace(key, val)
        clear_output(wait=True)
        print('Points: {0}\n\n {1}'.format(self.points, output))
        # 0.2 s per frame, 1.1 s on the final frame so the end state stays visible
        sleep(1.1 if self.game_over else 0.2)

    def get_vector(self, direct):
        """Translate a relative action into an absolute movement vector.

        Args:
            direct: 0 for a right turn, 1 for a left turn, anything else for forward.

        Returns:
            The unit vector the snake would move along.
        """
        if direct in (0, 1):
            offset = 1 if direct == 0 else -1
            current = self._VECTORS.index(self.direction)
            return self._VECTORS[(current + offset) % 4]
        return self.direction

    def obs_and_invalid(self):
        """Build the observation and the list of actions that would end the game.

        Returns:
            A tuple `(obs, invalid)`. `obs` is a numpy array
            `[right_blocked, left_blocked, front_blocked, angle]`. `invalid`
            lists the blocked action indices, but only when one or two of the
            three actions are blocked; otherwise it is empty.
        """
        head = self.snake[0]
        # Angle between heading and food direction, scaled to [0, 1]
        if self.food is None:
            angle = 0.0
        else:
            fx, fy = self.food[0] - head[0], self.food[1] - head[1]
            norm = math.sqrt(fx * fx + fy * fy)
            cosine = (self.direction[0] * fx + self.direction[1] * fy) / norm if norm else 1.0
            # Clamp so rounding can never push the argument outside acos' domain
            angle = math.acos(max(-1.0, min(1.0, cosine))) / math.pi
        # Cells to the right, to the left and in front of the head
        obstacles = []
        for action in range(3):
            vec = self.get_vector(action)
            obstacles.append(float(self._is_blocked((head[0] + vec[0], head[1] + vec[1]))))
        blocked = [i for i, flag in enumerate(obstacles) if flag == 1.0]
        # If nothing or everything is blocked there is no useful restriction to report
        invalid = blocked if 0 < len(blocked) < 3 else []
        return np.array(obstacles + [angle]), invalid

    def step(self, action):
        """Advance the game by one move.

        Args:
            action: relative action as understood by `get_vector`.

        Returns:
            A tuple `(obs, reward, game_over, invalid)`. The reward is -2 for a
            collision, 1 if the head got closer to (or reached) the food, and
            -1 otherwise.
        """
        vec = self.get_vector(action)
        head = self.snake[0]
        # Plain int arithmetic keeps numpy scalar types out of the stored cells
        new_pos = (int(head[0] + vec[0]), int(head[1] + vec[1]))
        if self._is_blocked(new_pos):
            self.game_over = True
            reward = -2
        else:
            # Manhattan distance to the food before and after the move
            d_food_prev = abs(self.food[0] - head[0]) + abs(self.food[1] - head[1])
            d_food_new = abs(self.food[0] - new_pos[0]) + abs(self.food[1] - new_pos[1])
            # Move: new head in front, tail cell removed
            last = self.snake[-1]
            self.snake = [new_pos] + self.snake[:-1]
            self.direction = vec
            if new_pos == self.food:
                # Eating gives the tail cell back, growing the snake by one
                self.snake.append(last)
                self.points += 1
                self.food = self._place_food()
                if self.food is None:
                    # The snake covers the whole grid: nothing left to play for
                    self.game_over = True
            reward = 1 if d_food_new < d_food_prev else -1
        # Visualize environment (only if gui is enabled)
        self.visualize()
        obs, invalid = self.obs_and_invalid()
        return obs, reward, self.game_over, invalid

## Policy Gradients

In [None]:
# Network spec handed to PG: presumably input size 4 (the Snake observation has four
# entries), two hidden ELU layers of 32 units and a softmax over the three actions
layers = [4, (32, 'elu'), (32, 'elu'), (3, 'softmax')]
loss = keras.losses.categorical_crossentropy
# `learning_rate` is the supported keyword; the `lr` alias is deprecated and rejected by newer Keras releases
optimizer = keras.optimizers.Adam(learning_rate=0.01)
discount_factor = 0.95
# NOTE(review): machine-specific absolute path; it is also not handed to PG below
# (None is passed as the last argument instead) — confirm this is intended
file = 'C:/users/carlo/Documents/Models/snake_pg/snake_32_32.h5'

agent = PG(layers, loss, optimizer, discount_factor, None)

In [None]:
# Run settings: number of policy updates, episodes per update, step budget per
# score level, and whether to train (True) or only watch the agent play (False)
iterations = 500
episodes = 25
max_steps_per_score = 128
train = False

best = None
mean_rewards = []
for i in range(iterations):
    all_rewards, all_grads, all_points = [], [], []
    for e in range(episodes):
        current_rewards, current_grads = [], []
        # Fresh game; the GUI is only shown when not training
        env = Snake(gui=not train)
        obs, invalid = env.obs_and_invalid()
        # Number of steps taken at each score, used to cut off aimless wandering
        steps_per_score = {}
        done = False
        while not done and steps_per_score.get(env.points, 0) < max_steps_per_score:
            # Sample an action together with its gradient, then play it
            action, grads = agent.run_policy(obs, invalid)
            obs, reward, done, invalid = env.step(action)
            # Record what this step produced
            current_rewards.append(reward)
            current_grads.append(grads)
            # Count the step against the score held after the move
            steps_per_score[env.points] = steps_per_score.get(env.points, 0) + 1
        # Episode finished: keep its rewards, gradients and final score
        all_rewards.append(current_rewards)
        all_grads.append(current_grads)
        all_points.append(env.points)
    # Total reward collected in this iteration, averaged over the episodes
    mean_reward = np.sum([r for episode_rewards in all_rewards for r in episode_rewards]) / episodes
    mean_rewards.append(mean_reward)
    if not train:
        continue
    print('Iteration {0}/{1} - mean reward, score: {2}, {3}'.format(i + 1, iterations, mean_reward, np.mean(all_points)))
    # Checkpoint before updating, so the saved weights are the ones that earned this reward
    if best is None or mean_reward >= best:
        agent.save('snake_pg')
        best = mean_reward
        print('Model saved.')
    # Policy update from the collected rewards and gradients
    agent.apply_grads(all_rewards, all_grads)
# Learning curve: mean reward per iteration
plt.plot(range(iterations), mean_rewards)
plt.xlabel('Iteration')
plt.ylabel('Mean reward')

## Deep Q-Learning

In [None]:
# Network spec handed to DQN: presumably input size 4 (the Snake observation has four
# entries), two hidden ELU layers of 96 units and one linear output per action
layers = [4, (96, 'elu'), (96, 'elu'), (3, 'linear')]
loss = keras.losses.mean_squared_error
# `learning_rate` is the supported keyword; the `lr` alias is deprecated and rejected by newer Keras releases
optimizer = keras.optimizers.Adam(learning_rate=1e-3)
discount_factor = 0.95
buffer_size = 50000
# NOTE(review): machine-specific absolute path — consider making it configurable
file = 'C:/users/carlo/Documents/Models/snake_dqn/snake_dqn_96_96.h5'

agent = DQN(layers, loss, optimizer, discount_factor, buffer_size, file)

In [None]:
episodes = 25000
max_steps_per_score = 128
# Episodes played before the first training step
n_pretrain = 100
# The target model is refreshed every `update_target` episodes
update_target = 200
batch_size = 128
# (start value, lower bound, divisor): epsilon = max(start - episode / divisor, lower bound)
epsilon_decay = 1, 0.01, 15000
train = False

best = None
total_rewards = []
scores = []
for e in range(episodes):
    total_reward = 0
    # Exploration rate depends only on the episode index, so compute it once per
    # episode; it is 0 when not training
    epsilon = max(epsilon_decay[0] - e / epsilon_decay[2], epsilon_decay[1]) if train else 0
    # Initialize environment and get initial state
    env = Snake(gui=not train)
    state, invalid = env.obs_and_invalid()
    # Store steps per score in dictionary
    steps_per_score = {}
    while steps_per_score.get(env.points, 0) < max_steps_per_score:
        # Get agent's action
        action = agent.play_one_step(state, epsilon, invalid)
        # Let environment perform action and update current state
        next_state, reward, done, invalid = env.step(action)
        # `invalid` now holds the invalid actions reported for next_state
        agent.add_experience(state, action, reward, next_state, done, invalid)
        state = next_state
        total_reward += reward
        # Increase steps of current score by one
        steps_per_score[env.points] = steps_per_score.get(env.points, 0) + 1
        # Exit loop if game over
        if done:
            break
    # Save and print game data
    total_rewards.append(total_reward)
    scores.append(env.points)
    if train:
        print('Episode {0}/{1} - total reward, score: {2}, {3}'.format(e + 1, episodes, total_reward, env.points))
        # Save model if the highest reward has been collected
        if best is None or total_reward >= best:
            agent.save('snake_dqn')
            best = total_reward
            print('Model saved.')
        # Perform training step
        if e >= n_pretrain:
            agent.training_step(batch_size)
            if e % update_target == 0:
                agent.update_target_model()
# Plot total reward per episode (`scores` is collected above but not plotted)
plt.plot(range(episodes), total_rewards)
plt.xlabel('Episode')
plt.ylabel('Reward')