#Game


In [None]:
import numpy as np
from numpy.random import randint


class SnakeGame:
    """Core logic of a grid-based snake game with a step/reset interface.

    All positions are stored as ``(row, col)`` tuples. ``self.snake`` is a
    list whose first element is the head; ``self.apples`` is a list of apple
    positions. Directions are encoded as 0=up, 1=right, 2=down, 3=left.
    """

    # (row delta, col delta) produced by one move in each absolute direction.
    _DIRECTION_DELTAS = {0: (-1, 0), 1: (0, 1), 2: (1, 0), 3: (0, -1)}

    def __init__(
        self, width, height, food_amount=1, border=0, grass_growth=0, max_grass=0
    ):
        """Create the board and start a fresh game.

        Args:
            width: number of board columns.
            height: number of board rows.
            food_amount: number of apples kept on the board.
            border: thickness of the frame (filled with 0.5) that
                ``board_state`` adds around the rendered board.
            grass_growth: amount added to every grass cell after each
                non-fatal step.
            max_grass: upper bound (and initial value) of every grass cell.

        Note:
            ``create_snake`` draws the start position from
            ``randint(5, size - 5)``, so ``width`` and ``height`` must both
            be at least 11 or numpy raises ``ValueError``.
        """
        self.width = width
        self.height = height
        self.board = np.zeros((height, width, 3), dtype=np.float32)
        self.food_amount = food_amount
        self.border = border
        self.grass_growth = grass_growth
        self.grass = np.zeros((height, width)) + max_grass
        self.max_grass = max_grass
        self.reset()

    def create_apples(self):
        """Top up ``self.apples`` to ``food_amount`` using free cells only.

        A cell is free when it holds neither a snake segment nor another
        apple. Every board cell (including the last row and column) is a
        candidate. If the board runs out of free cells the method stops
        early instead of looping forever.
        """
        occupied = set(self.snake) | set(self.apples)
        free_cells = [
            (row, col)
            for row in range(self.height)
            for col in range(self.width)
            if (row, col) not in occupied
        ]
        while len(self.apples) < self.food_amount and free_cells:
            # numpy's randint excludes the upper bound, so this index is valid.
            index = randint(0, len(free_cells))
            self.apples.append(free_cells.pop(index))

    def create_snake(self):
        """Create a 5-segment snake at a random position and orientation.

        The head is ``self.snake[0]``; the remaining segments trail behind
        it, i.e. opposite to ``self.direction``.
        """
        x = randint(5, self.width - 5)  # not too close to the border
        y = randint(5, self.height - 5)
        self.direction = randint(0, 4)
        d_row, d_col = self._DIRECTION_DELTAS[self.direction]
        self.snake = []
        for _ in range(5):
            # Walk backwards from the heading so the body trails the head.
            y = y - d_row
            x = x - d_col
            self.snake.append((y, x))

    def grow_snake(self, d):
        """Insert a new head one cell ahead in direction ``d``.

        ``d`` is 0=up, 1=right, 2=down; any other value is treated as left.
        """
        d_row, d_col = self._DIRECTION_DELTAS.get(d, (0, -1))
        row, col = self.snake[0]
        self.snake.insert(0, (row + d_row, col + d_col))

    def check_collisions(self):
        """Set ``self.done`` when the head left the board or hit the body."""
        # Only the head can cause a collision.
        row, col = self.snake[0]
        inside = 0 <= row < self.height and 0 <= col < self.width
        if not inside or (row, col) in self.snake[1:]:
            self.done = True

    def step(self, action):
        """Advance the game by one move.

        Args:
            action: -1 (turn left), 0 (continue) or 1 (turn right), relative
                to the current heading.

        Returns:
            ``(board_state, reward, done, info)`` where ``reward`` is -1 on
            a fatal move, otherwise 1 for an eaten apple (else 0) plus the
            grass value of the cell the head entered, and ``info`` is
            ``{"score": self.score}``.
        """
        turn = int(action)
        assert -1 <= turn <= 1
        self.direction = (self.direction + turn) % 4
        self.grow_snake(self.direction)  # two steps: grow + remove last
        head = self.snake[0]
        if head in self.apples:
            # Eating keeps the extra segment, so the snake gets longer.
            self.apples.remove(head)
            reward = 1
            self.create_apples()
        else:
            self.snake.pop()
            self.check_collisions()
            reward = -1 if self.done else 0
        if reward >= 0:
            # The head grazes the cell it entered; grass regrows everywhere.
            row, col = head
            reward += self.grass[row, col]
            self.grass[row, col] = 0
            self.score += reward
            self.grass += self.grass_growth
            self.grass[self.grass > self.max_grass] = self.max_grass

        return self.board_state(), reward, self.done, {"score": self.score}

    def get_state(self):
        """Return ``(score, apples, head, tail, direction)``.

        ``apples`` is the live ``self.apples`` list, not a copy.
        """
        head = self.snake[0]
        tail = self.snake[1:]
        return self.score, self.apples, head, tail, self.direction

    def print_state(self):
        """Print the board: ``.`` empty, ``A`` apple, ``X`` snake segment.

        One text line is printed per board row. Cells outside the board
        (e.g. the head after hitting a wall) are skipped.
        """
        grid = [["."] * self.width for _ in range(self.height)]
        # Snake is drawn last so it wins over an apple on the same cell.
        for marker, cells in (("A", self.apples), ("X", self.snake)):
            for row, col in cells:
                if 0 <= row < self.height and 0 <= col < self.width:
                    grid[row][col] = marker
        for cells in grid:
            print("".join(cells))

    def test_step(self, direction):
        """Debug helper: perform one step and print the resulting board."""
        self.step(direction)
        self.print_state()
        if self.done:
            print("Game over! Score=", self.score)

    def reset(self):
        """Start a new game and return ``(board_state, 0, done, info)``."""
        self.score = 0
        self.done = False
        self.create_snake()
        self.apples = []
        self.create_apples()
        self.grass[:, :] = self.max_grass

        return self.board_state(), 0, self.done, {"score": self.score}

    def board_state(self, mode="human", close=False):
        """Render the game as a float32 image and return a fresh array.

        Channel layout: grass is written to channel 1 scaled to at most 0.3,
        the head sets all three channels to 1 (only while the game is not
        over), body segments set channel 0 and apples set channel 1. When
        ``border`` is non-zero the image is framed with ``border`` cells of
        value 0.5 on every side.

        ``mode`` and ``close`` are accepted but not used.

        The returned array never aliases the internal buffer, so callers may
        store successive states (e.g. in a replay memory) safely.
        """
        self.board[:, :, :] = 0
        if self.max_grass > 0:
            self.board[:, :, 1] = self.grass / self.max_grass * 0.3
        if not self.done:
            row, col = self.snake[0]
            self.board[row, col, :] = 1
        for row, col in self.snake[1:]:
            self.board[row, col, 0] = 1
        for row, col in self.apples:
            self.board[row, col, 1] = 1
        if self.border == 0:
            # Copy: self.board is overwritten in place on the next render.
            return self.board.copy()
        h, w, _ = self.board.shape
        board = np.full(
            (h + self.border * 2, w + self.border * 2, 3), 0.5, np.float32
        )
        board[self.border : -self.border, self.border : -self.border] = self.board
        return board


# just run this if this file is the main
# if __name__ == '__main__':
# game = SnakeGame(20,20)
# game.print_state()

# Heuristic approach using Manhattan distance

In [None]:
from math import fabs
from copy import deepcopy

class HeuristicAgent:
    """Greedy snake player that steers toward the nearest apple.

    Every candidate action is tried on a deep copy of the environment and
    scored; the lowest-scoring action is then played on the real environment.
    """

    def __init__(self, env: SnakeGame):
        self.possible_actions = [-1, 0, 1]
        self.env = env

    def generate_examples(self, n, force_trunc=False):
        """Play full games until at least ``n`` transitions are collected.

        Returns the list of transitions, cut to exactly ``n`` entries when
        ``force_trunc`` is true.
        """
        collected = []
        while len(collected) < n:
            collected.extend(self._play_game())
            print(f"There are {len(collected)} examples")
        if force_trunc:
            return collected[:n]
        return collected

    def _play_game(self):
        """Play one game and return its transitions.

        Each transition is ``(state, action, reward, next_state, done)``.
        """
        history = []
        current_board, _, finished, _ = self.env.reset()
        step_count = 0
        reward_sum = 0
        while not finished:
            *outcome, chosen = self.pick_best_action(*self.env.get_state())
            next_board, gained, finished, _ = outcome
            history.append((current_board, chosen, gained, next_board, finished))
            current_board = next_board
            step_count += 1
            reward_sum += gained
            if step_count % 100 == 0:
                print(f"Total reward = {reward_sum} after {step_count} steps")
        return history

    def pick_best_action(self, score, apples, head, tail, direction):
        """Choose and play the action with the lowest heuristic cost.

        Cost per action: 0 when the simulated step returns a reward of
        exactly 1, a large penalty when it ends the game, otherwise the
        Manhattan distance from the new head to the apple that was closest
        before moving. Ties go to the earliest action in
        ``possible_actions``.

        Returns the real environment's ``step`` result with the chosen
        action appended.
        """
        target = min(apples, key=lambda candidate: manh_dist(candidate, head))
        death_cost = max(self.env.width, self.env.height) ** 2

        def cost_of(move):
            # Simulate on a throw-away copy so the real game is untouched.
            trial = deepcopy(self.env)
            _, gained, finished, _ = trial.step(move)
            if gained == 1:
                return 0
            if finished:
                return death_cost
            return manh_dist(trial.get_state()[2], target)

        costs = [cost_of(move) for move in self.possible_actions]
        best = min(range(len(costs)), key=costs.__getitem__)
        chosen = self.possible_actions[best]
        assert -1 <= chosen <= 1
        return (*self.env.step(chosen), chosen)


def manh_dist(p1: tuple, p2: tuple):
    """Return the Manhattan (L1) distance between two 2-D points as a float."""
    (a1, b1), (a2, b2) = p1, p2
    first_axis = fabs(a1 - a2)
    second_axis = fabs(b1 - b2)
    return first_axis + second_axis

# Model

In [None]:
import tensorflow as tf
from collections import deque
from typing import Iterable, List, Dict, Any
import numpy as np
import random
import matplotlib.pyplot as plt
from copy import deepcopy

# Globally force tf.function-decorated code to run eagerly.
# NOTE(review): this looks like a debugging aid and presumably slows
# training; confirm it is still required.
tf.config.run_functions_eagerly(True)

class DqnAgent:
    """Deep Q-learning agent for ``SnakeGame`` with a target network.

    Transitions ``(state, action, reward, next_state, done)`` are kept in a
    bounded replay memory; the online model is fitted on random mini-batches
    and its weights are periodically copied into the target model.
    """

    def __init__(
        self,
        env: "SnakeGame",
        possible_actions: Iterable[int] = (-1, 0, 1),
        replay_memory_size: int = 2**16,
    ):
        """Build the online/target networks for ``env``.

        Args:
            env: game instance; its ``board_state()`` shape defines the
                network input shape.
            possible_actions: actions the agent may take; one Q-value output
                is created per action. The iterable is copied.
            replay_memory_size: maximum number of stored transitions.
        """
        self.env = env
        # Copy so the agent never shares (or mutates) the caller's list.
        self.possible_actions = list(possible_actions)
        self.state_shape = self.env.board_state().shape
        self.model = self._create_model()
        self.target_model = self._create_model()
        self.target_model.set_weights(self.model.get_weights())
        self.replay_memory = deque(maxlen=replay_memory_size)
        self.rewards: List[float] = list()
        self.steps_per_episode: List[int] = list()
        self.data_to_log: List[Dict[str, Any]] = list()

    def _create_model(self):
        """Return an uncompiled CNN mapping a board image to Q-values.

        The input shape is taken from ``self.state_shape`` and the output
        layer has one linear unit per entry of ``self.possible_actions``.
        """
        he_normal = tf.keras.initializers.HeNormal
        layers = tf.keras.layers
        model = tf.keras.Sequential()
        model.add(
            layers.Conv2D(
                filters=32,
                kernel_size=(3, 3),
                activation="relu",
                padding="same",
                kernel_initializer=he_normal(),
                input_shape=tuple(self.state_shape),
            )
        )
        model.add(layers.BatchNormalization(synchronized=True))
        model.add(
            layers.Conv2D(
                filters=64,
                kernel_size=(3, 3),
                activation="relu",
                padding="same",
                kernel_initializer=he_normal(),
            )
        )
        model.add(layers.MaxPooling2D(pool_size=(2, 2), padding="same"))
        model.add(layers.BatchNormalization(synchronized=True))

        model.add(layers.Flatten())
        model.add(
            layers.Dense(64, activation="relu", kernel_initializer=he_normal())
        )
        model.add(layers.BatchNormalization(synchronized=True))
        model.add(
            layers.Dense(32, activation="relu", kernel_initializer=he_normal())
        )
        model.add(layers.Dropout(.2))
        model.add(layers.BatchNormalization(synchronized=True))
        model.add(
            layers.Dense(
                len(self.possible_actions),
                activation="linear",
                kernel_initializer=he_normal(),
            )
        )
        return model

    def _train(self, done: bool, discount_factor: float, batch_size: int, epochs: int):
        """Fit the online model on one random mini-batch of transitions.

        Does nothing while the replay memory holds fewer than ``batch_size``
        transitions (``random.sample`` would raise otherwise). ``done`` is
        accepted for interface compatibility and is not used.
        """
        if len(self.replay_memory) < batch_size:
            return
        mini_batch = random.sample(self.replay_memory, batch_size)

        current_states = np.array([t[0] for t in mini_batch])
        actions = np.array([self._action_index(t[1]) for t in mini_batch])
        rewards = np.array([t[2] for t in mini_batch])
        future_states = np.array([t[3] for t in mini_batch])
        # 0 for terminal transitions so no future value is bootstrapped.
        not_dones = np.array([int(not t[4]) for t in mini_batch])

        current_qs = self.model.predict(current_states)
        future_qs = self.target_model.predict(future_states)
        targets = rewards + discount_factor * np.max(future_qs, axis=1) * not_dones

        # Only the Q-value of the action actually taken gets a new target.
        current_qs[np.arange(len(current_qs)), actions] = targets

        self.model.fit(
            current_states, current_qs,
            batch_size=64, shuffle=True, epochs=epochs
        )

    def train(
        self,
        episodes: int,
        *,
        min_epsilon: float,
        max_epsilon: float,
        decay: float,
        learning_rate: float,
        discount_factor: float,
        epochs: int = 1000,
        initial_examples: Iterable = (),
        batch_size: int = 512,
        min_steps_to_update_target_model: int = 50,
    ):
        """Run epsilon-greedy training for ``episodes`` games.

        Args:
            episodes: number of games to play.
            min_epsilon, max_epsilon, decay: exploration schedule; epsilon
                starts at ``max_epsilon`` and is updated after each episode
                by ``_generate_epsilon``.
            learning_rate: Adam learning rate.
            discount_factor: weight of the bootstrapped future Q-value.
            epochs: epochs passed to ``fit`` on every training call.
            initial_examples: transitions used to pre-fill the replay memory.
            batch_size: number of transitions sampled per training call.
            min_steps_to_update_target_model: minimum number of environment
                steps between two target-network synchronisations (checked
                at the end of each episode).
        """
        self._compile_model(learning_rate)
        self.replay_memory.extend(initial_examples)
        step_counter = 0
        steps_since_target_update = 0
        epsilon = max_epsilon
        for episode in range(1, episodes + 1):
            total_reward, steps_per_episode = 0, 0
            state, _, done, _ = self.env.reset()
            while not done:
                step_counter += 1
                steps_since_target_update += 1
                steps_per_episode += 1
                action = self._epsilon_greedy_action(epsilon, state)
                next_state, reward, done, _ = self.env.step(action)
                self.replay_memory.append((state, action, reward, next_state, done))
                if step_counter % 4 == 0 or done:
                    self._train(done, discount_factor, batch_size, epochs)
                state = next_state
                total_reward += reward
                if done:
                    self._print_episode_info(episode, total_reward, steps_per_episode)
                    self._update_reward_history(steps_per_episode, total_reward)
                    steps_since_target_update = self._update_target_model(
                        steps_since_target_update, min_steps_to_update_target_model
                    )
            epsilon = self._generate_epsilon(epsilon, min_epsilon, max_epsilon, decay)

    def _compile_model(self, learning_rate):
        """Compile the online model with MSE loss and the Adam optimizer."""
        self.model.compile(loss="mse", optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate), run_eagerly=False)

    def _train_model(self, done, discount_factor, batch_size, epochs):
        """Thin wrapper around ``_train`` kept for backward compatibility."""
        self._train(done, discount_factor, batch_size, epochs=epochs)

    def _print_episode_info(self, episode, total_reward, steps_per_episode):
        """Print a one-line summary of a finished episode."""
        print(f"------ Episode {episode} reward = {total_reward} steps = {steps_per_episode} -------")

    def _update_reward_history(self, step_counter, total_reward):
        """Append an episode's total reward and its step count to the logs.

        ``step_counter`` is stored in ``self.steps_per_episode`` as given.
        """
        self.rewards.append(total_reward)
        self.steps_per_episode.append(step_counter)

    def _update_target_model(self, step_counter, min_steps_to_update_target_model):
        """Sync the target network once enough steps have accumulated.

        Returns the counter value the caller should continue with: 0 right
        after a synchronisation, otherwise ``step_counter`` unchanged.
        """
        if step_counter >= min_steps_to_update_target_model:
            self.target_model.set_weights(self.model.get_weights())
            return 0
        return step_counter

    def _generate_epsilon(self, previous_epsilon, min_epsilon, max_epsilon, decay):
        """Return the next epsilon of an exponential decay schedule.

        The distance to ``min_epsilon`` shrinks by ``exp(-decay)`` per call,
        so starting from ``max_epsilon`` the k-th value equals
        ``min_epsilon + (max_epsilon - min_epsilon) * exp(-decay * k)``.
        The result is clamped to ``[min_epsilon, max_epsilon]``.
        """
        decayed = min_epsilon + (previous_epsilon - min_epsilon) * np.exp(-decay)
        return float(np.clip(decayed, min_epsilon, max_epsilon))

    def play_game(self):
        """Play one greedy game, printing every board; return total reward."""
        state, _, done, _ = self.env.reset()
        self.env.print_state()
        total_reward = 0
        while not done:
            state_reshaped = state.reshape((1, *state.shape))
            predicted_q_values = self.model.predict(state_reshaped).flatten()
            action = self._choose_action(predicted_q_values)
            state, reward, done, _ = self.env.step(action)
            total_reward += reward
            self.env.print_state()
        return total_reward

    def _epsilon_greedy_action(self, epsilon, state) -> int:
        """Pick a random action with probability ``epsilon``, else greedy."""
        if np.random.rand() <= epsilon:
            return np.random.choice(self.possible_actions)
        # Add a leading batch axis of size 1 for the model.
        state_reshaped = state.reshape((1, *state.shape))
        predicted_q_values = self.model.predict(state_reshaped).flatten()
        return self._choose_action(predicted_q_values)

    def _choose_action(self, q_values: np.ndarray):
        """Return the action whose Q-value is largest."""
        return self.possible_actions[np.argmax(q_values)]

    def _action_index(self, action: int) -> int:
        """Return the position of ``action`` in ``self.possible_actions``.

        Raises:
            ValueError: if ``action`` is not a known action.
        """
        return self.possible_actions.index(action)

    def plot_rewards(self):
        """Plot the total reward obtained in each recorded episode."""
        episodes = range(1, len(self.rewards) + 1)
        plt.plot(episodes, self.rewards)
        plt.xlabel("Episode")
        plt.ylabel("Total Reward")
        plt.title("Total Reward per Episode")
        plt.show()

In [None]:
#agent.env.board_state().shape

(24, 24, 3)

# Creating game and training


In [None]:
# 14x14 playing field framed by a 5-cell border, with one apple and
# slowly regrowing grass.
game = SnakeGame(
    width=14,
    height=14,
    food_amount=1,
    border=5,
    grass_growth=0.001,
    max_grass=0.05,
)
possible_actions = [-1, 0, 1]
state_shape = game.board_state().shape

In [None]:
# Let the heuristic agent play whole games until at least 10**5 transitions
# have been collected; they are used below to pre-fill the replay memory.
huristic = HeuristicAgent(game)
initial_examples = huristic.generate_examples(10**5)

Total reward = 14.618000000000025 after 100 steps
Total reward = 28.412000000000088 after 200 steps
Total reward = 40.05799999999995 after 300 steps
Total reward = 51.031999999999684 after 400 steps
There are 490 examples
Total reward = 13.54400000000001 after 100 steps
Total reward = 27.08200000000004 after 200 steps
Total reward = 39.58199999999988 after 300 steps
Total reward = 50.40399999999967 after 400 steps
There are 934 examples
Total reward = 15.634000000000027 after 100 steps
There are 1114 examples
Total reward = 16.690000000000015 after 100 steps
Total reward = 27.138000000000062 after 200 steps
There are 1361 examples
Total reward = 14.381999999999996 after 100 steps
Total reward = 29.11200000000005 after 200 steps
Total reward = 41.731999999999886 after 300 steps
Total reward = 51.49999999999962 after 400 steps
There are 1772 examples
Total reward = 15.500000000000032 after 100 steps
Total reward = 31.128000000000085 after 200 steps
There are 1973 examples
Total reward = 

In [None]:
# Build the DQN agent on the same game instance and train it for 10
# episodes, seeding its replay memory with the heuristic transitions.
agent = DqnAgent(game)
agent.train(
    10,
    min_epsilon=0.1,
    max_epsilon=1,
    decay=0.5,  # TODO: try linear decay
    learning_rate=0.001,
    epochs=200,
    min_steps_to_update_target_model=20,
    discount_factor=0.6,
    batch_size=512,
    initial_examples=initial_examples,
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10


KeyboardInterrupt: ignored

In [None]:
# Plot the total reward recorded for each training episode.
agent.plot_rewards()

In [None]:
# Watch the trained agent play one greedy game. DqnAgent exposes this as
# `play_game`; `_play_game` exists only on HeuristicAgent.
agent.play_game()