In [7]:
import sys
sys.path.append("/content/drive/My Drive/snakeAI")

import tensorflow as tf
import tensorflow.keras as k

from Snake.environment import SnakeMaze

from Snake.utils import resize_image, save_images, save_video, save_graph, save_eval, generate_animation, euclidean_distance
from Snake.variables import Cell, Status

import matplotlib.pyplot as plt

import numpy as np
import os

from models.replay_memory import ReplayMemory, Experience
from time import time
from datetime import datetime
from Snake.agent import Snake


In [8]:
def get_observation(env, handle=0, local_knowledge=5):
  """Build the flat feature vector describing what one snake can see.

  Layout of the returned 1-D array:
    * for each of 8 ray directions, the distance to the first occurrence of
      every cell type in `Cell.CELL_DICT` except `Cell.EMPTY_CELL`
      (-1 when that type is not met before leaving the grid);
    * the current length of the snake's body;
    * a flattened `local_knowledge` x `local_knowledge` patch of the
      zero-padded grid (see the NOTE next to the slicing below).

  Args:
    env: environment exposing `snakes` (indexable by `handle`) and a 2-D
      `matrix` of cell-type values.
    handle: index of the snake to observe.
    local_knowledge: side length of the local patch.

  Returns:
    np.ndarray of features; an all-zero vector of length
    `33 + local_knowledge**2` when the snake is dead.
  """
  snake = env.snakes[handle]

  if snake.status == Status.DEAD:
    # 33 = 8 rays * 4 tracked cell types + 1 length feature
    # (assumes Cell.CELL_DICT holds 5 distinct cell types -- TODO confirm).
    return np.zeros(33 + local_knowledge**2)

  head_row, head_col = snake.body[0]

  # Axis 0 is indexed by the first head coordinate and axis 1 by the second,
  # so each index is checked against the size of its *own* axis. (The previous
  # code compared them against the opposite axis, which broke non-square grids.)
  n_rows, n_cols = env.matrix.shape

  ray_offsets = [(0, 1), (-1, 0), (1, 0), (0, -1), (-1, 1), (1, -1), (1, 1), (-1, -1)]
  features = []

  for d_row, d_col in ray_offsets:
    # Distance at which each cell type is first seen along this ray.
    first_seen = {cell_type: -1 for cell_type in Cell.CELL_DICT.values()}
    row, col = head_row + d_row, head_col + d_col

    distance = 1
    while 0 <= row < n_rows and 0 <= col < n_cols:
      cell_type = env.matrix[row, col]
      if first_seen[cell_type] == -1:
        first_seen[cell_type] = distance
      distance += 1
      row += d_row
      col += d_col

    # Empty cells carry no information for the agent.
    del first_seen[Cell.EMPTY_CELL]
    features.extend(first_seen.values())

  features.append(len(snake.body))

  # NOTE(review): the grid is padded by `local_knowledge` on every side, so
  # this slice covers original rows/cols [head - local_knowledge, head), i.e.
  # the patch up-left of the head that does not contain the head itself. A
  # head-centred window would pad by `local_knowledge // 2`. Left unchanged
  # because previously trained weights depend on this layout -- confirm intent.
  padded = np.pad(env.matrix, local_knowledge)
  local_patch = padded[
      head_row: head_row + local_knowledge,
      head_col: head_col + local_knowledge,
  ]
  features.extend(local_patch.flatten())

  return np.array(features)

def reward(snake: Snake, env: SnakeMaze, direction):
    """Score the snake's situation after a step.

    A dead snake yields -500. Otherwise the reward is -2 per step counted in
    `snake.steps_without_food`, plus a bonus of 350 when that counter equals 1.
    `env` and `direction` are accepted but not used.
    """
    if snake.status == Status.DEAD:
        return -500

    hunger_penalty = snake.steps_without_food * (-2)
    food_bonus = 350 if snake.steps_without_food == 1 else 0
    return hunger_penalty + food_bonus

@tf.function
def loss(p, t):
    """Return the sum of squared differences between targets `t` and predictions `p`."""
    residual = t - p
    squared_error = tf.square(residual)
    return tf.reduce_sum(squared_error)


@tf.function
def train_step(states, targets, model, optimizer):
    """Run one gradient update of `model` towards `targets`.

    Args:
        states: batch of observations fed to the model.
        targets: batch of target Q-values with the same shape as the model output.
        model: Keras model to optimise.
        optimizer: Keras optimizer applying the gradients.

    Returns:
        The scalar loss that was minimised (data term plus regularisation).
    """
    with tf.GradientTape() as tape:
        q = model(states, training=True)
        _loss = loss(q, targets)
        # Layer regularizers (kernel_regularizer=...) are only collected in
        # `model.losses`; a custom training loop has to add them explicitly,
        # otherwise they have no effect on the gradients.
        if model.losses:
            _loss += tf.add_n(model.losses)
    grad = tape.gradient(_loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grad, model.trainable_variables))
    return _loss

In [9]:
# Image-related constants; only IMAGE_SIZE is consumed by `config` below.
NUM_NOT_TRAINABLE_BLOCKS = 4
IMAGE_SIZE = (64, 64)
IMAGE_SHAPE = IMAGE_SIZE + (3,)

# Step size handed to the optimizer and echoed in the run description.
learning_rate = 1e-5

# Everything a run can be tuned with; keys that are missing fall back to the
# defaults applied where the config is unpacked.
config = dict(
  # Episode / evaluation schedule
  max_steps_per_episode=500,
  num_rolling_avg_sample=10,
  evaluate_each=100,
  save_models=True,
  save_graphs=True,
  # Q-learning and exploration
  gamma=.9,
  epsilon=1,
  epsilon_decay=0.00005,
  # Environment
  boundaries=True,
  maze_width=10,
  image_size=IMAGE_SIZE,
  batch_size=128,
  # Checkpoint to resume from
  path_to_weights=os.path.join('drive', 'My Drive', 'snakeAI', 'trainings', 'NonConvolutional', '28Jun2020__201526222058', 'model'),
  # Free-text description stored with the run
  comment=f"""
  Architecture: Non-convolutional (continuing from 28Jun2020__201526222058)
  Optimizer: Adam ( with amsgrad )
  Hyperparameters:
    lr = {learning_rate}
  Reward:
    if snake.status == Status.DEAD:
        return -500
    else:
        r = 0
        r += snake.steps_without_food * (-2)
        r += 350 if snake.steps_without_food == 1 else 0
        return r
  """,
)

In [30]:
# Scratch check: reshape(-1, 3) folds a flat 15-element array into five rows of three.
np.array([1, 2, 3] * 5).reshape(-1, 3)

array([[1, 2, 3],
       [1, 2, 3],
       [1, 2, 3],
       [1, 2, 3],
       [1, 2, 3]])

In [10]:
# Fully connected Q-network: 58 inputs (33 + 5**2, the length of the vector
# get_observation returns for local_knowledge=5) -> 4 outputs, one per direction.
# NOTE(review): the output layer uses softmax, so predictions are confined to
# [0, 1] while the regression targets are built from rewards such as -500 / 350.
# Left as is to stay compatible with the checkpoint being resumed -- confirm intent.
model = k.Sequential(name='NonSequential')
model.add(k.layers.Input(58, name='InputLayer'))
model.add(k.layers.Dense(40, activation='relu', kernel_regularizer=k.regularizers.l1_l2(), name='FirstDense'))
model.add(k.layers.Dropout(.5, name='FirstDropout'))
model.add(k.layers.Dense(20, activation='relu', kernel_regularizer=k.regularizers.l1_l2(), name='SecondDense'))
model.add(k.layers.Dropout(.5, name='SecondDropout'))
model.add(k.layers.Dense(4, activation='softmax', kernel_regularizer=k.regularizers.l1_l2(), name='ThirdDense'))

optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate, amsgrad=True)

# Every run writes into its own timestamped directory.
run_stamp = datetime.now().strftime('%d%h%Y__%H%M%S%f')
config['training_dir'] = os.path.join('drive', 'My Drive', 'snakeAI', 'trainings', "NonConvolutional", run_stamp)

# Unpack the run configuration, falling back to defaults for missing keys.

# Logging / artefacts
verbose = config.get('verbose', True)
save_videos_ = config.get('save_videos', False)
save_images_ = config.get('save_images', False)
evaluate_each = config.get('evaluate_each', 50)
num_rolling_avg_sample = config.get('num_rolling_avg_sample', 50)

# Episode schedule (num_episodes=None means "train until interrupted")
max_steps_per_episode = config.get('max_steps_per_episode', 200)
num_episodes = config.get('num_episodes', None)

# Q-learning and exploration
gamma = config.get('gamma', .8)
epsilon = config.get('epsilon', 1.)
epsilon_decay = config.get('epsilon_decay', 5e-4)
min_epsilon = config.get('min_epsilon', 1e-2)
memory_size = config.get('memory_size', 10000)
batch_size = config.get('batch_size', 64)

# Environment (the maze is square unless a height is given)
boundaries = config.get('boundaries', True)
maze_width = config.get('maze_width', 10)
maze_height = config.get('maze_height', maze_width)
max_snakes = config.get('max_snakes', 1)

# Paths and rendering
path_to_weights = config.get('path_to_weights', None)
image_size = config.get('image_size', (112, 112))
training_dir = config.get('training_dir', './')

# Run description: the user comment followed by the effective hyperparameters.
comment = config.get('comment', '')
comment += f"""

    _________________________________________________________
    ---------------------------------------------------------

    max_steps_per_episode  --> {max_steps_per_episode}
    num_episodes           --> {num_episodes}
    gamma                  --> {gamma}
    epsilon                --> {epsilon}
    epsilon_decay          --> {epsilon_decay}
    min_epsilon            --> {min_epsilon}
    memory_size            --> {memory_size}
    boundaries             --> {boundaries}
    maze_width             --> {maze_width}
    maze_height            --> {maze_height}
    max_snakes             --> {max_snakes}
    path_to_weights        --> {path_to_weights}
    _________________________________________________________
    ---------------------------------------------------------
    
    """

# Store the run description and the model summary alongside the run's outputs.
os.makedirs(training_dir, exist_ok=True)
comment_path = os.path.join(training_dir, "info.txt")
with open(comment_path, "w") as info_file:
    info_file.write(comment)
    # Keras emits the summary line by line; terminate each line ourselves.
    model.summary(print_fn=lambda summary_line: info_file.write(summary_line + '\n'))

# Also show the summary in the notebook output.
model.summary()

# Resume from a previous checkpoint when one is configured.
if path_to_weights is not None:
    model.load_weights(path_to_weights)

# ---------------------------------------------------------------------------
# Training loop: epsilon-greedy Q-learning with an optional replay memory.
# ---------------------------------------------------------------------------
episode_number = 0

# One point per recorded episode; both series are handed to save_eval.
rolling_avg_reward = np.array([])
rolling_avg_epsilon = np.array([])

# Total reward of the most recent `num_rolling_avg_sample` episodes.
# (Previously one entry was appended per *step* but only one removed per
# episode, so the "rolling" average was an ever-growing cumulative mean.)
reward_window = np.array([])

# Replay is only enabled when the buffer can hold more than one batch.
memory = ReplayMemory(capacity=memory_size) if (memory_size > 50 and memory_size > batch_size) else None

while num_episodes is None or episode_number < num_episodes:
    if verbose:
        print("____________________________________________________________________________________________")
    # Evaluate periodically and on the very last episode of a bounded run.
    is_eval_episode = bool(
        (episode_number % evaluate_each == 0)
        or (num_episodes and episode_number == num_episodes - 1)
    )

    episode_images = []
    is_random_action = []

    env = SnakeMaze(
        width=maze_width,
        height=(maze_height if maze_height is not None else maze_width),
        max_num_agents=max_snakes,
        with_boundaries=boundaries)
    env.reset()

    episode_reward = 0

    for _ in range(max_steps_per_episode):

        if env.num_active_agents == 0:
            break

        if is_eval_episode and (save_images_ or save_videos_):
            episode_images.append(resize_image(env.snake_matrices[0], image_size))

        # Epsilon-greedy action selection; the network is only queried when
        # the greedy branch is actually taken.
        state = get_observation(env)
        if np.random.rand() < epsilon:
            direction = np.random.randint(4)
        else:
            direction = np.argmax(model(state.reshape(1, -1)))
        env.step({0: direction})
        current_reward = reward(env.snakes[0], env, direction)
        state2 = get_observation(env)

        episode_reward += current_reward

        if memory is not None:
            memory.push(Experience(state, direction, state2, current_reward))

            if np.random.rand() > memory.space():
                continue
            batches_info = memory.pop()
        else:
            batches_info = [Experience(state, direction, state2, current_reward)]

        batches_info = list(batches_info)
        if not batches_info:
            continue

        # Each experience unpacks as (state, action, next_state, reward).
        batch_states, batch_actions, batch_next_states, batch_rewards = zip(*batches_info)
        batch_states = np.asarray(batch_states, dtype=np.float32)
        batch_next_states = np.asarray(batch_next_states, dtype=np.float32)
        batch_actions = np.asarray(batch_actions, dtype=np.int64)
        batch_rewards = np.asarray(batch_rewards, dtype=np.float32)

        # Targets start from the network's own prediction for the *current*
        # state, so actions that were not taken contribute no error; only the
        # taken action is moved towards r + gamma * max_a' Q(s', a').
        # (The previous code started from Q(s') and stacked the targets as
        # (batch, 1, 4), which broadcast against the (batch, 4) predictions
        # into a (batch, batch, 4) loss.)
        # NOTE(review): dead snakes yield an all-zero next state and are still
        # bootstrapped from; Experience carries no terminal flag to mask them.
        q_targets = np.array(model(batch_states, training=False), dtype=np.float32)
        next_q = np.array(model(batch_next_states, training=False), dtype=np.float32)
        q_targets[np.arange(len(batch_actions)), batch_actions] = (
            batch_rewards + gamma * next_q.max(axis=1)
        )

        train_step(batch_states, q_targets, model, optimizer)

    if verbose:
        print(f"Episode {episode_number + 1} Done!")
        print(f"Episode reward: {episode_reward}")
        print(f"Epsilon: {epsilon}")
        if memory is not None:
            print(f"Replay Memory size: {len(memory.experiences)}")

    # Keep only the last `num_rolling_avg_sample` episode rewards and record
    # their mean once the window is full.
    reward_window = np.append(reward_window, episode_reward)[-num_rolling_avg_sample:]
    if len(reward_window) >= num_rolling_avg_sample:
        rolling_avg_reward = np.append(rolling_avg_reward, np.mean(reward_window))
        rolling_avg_epsilon = np.append(rolling_avg_epsilon, epsilon)

    # Linear exploration decay, applied once per episode.
    epsilon = max(epsilon - epsilon_decay, min_epsilon)

    if verbose:
        print()

    if is_eval_episode:
        save_eval(model, episode_number, episode_images, rolling_avg_reward, rolling_avg_epsilon, **config)

    episode_number += 1

Output hidden; open in https://colab.research.google.com to view.