<a href="https://colab.research.google.com/github/Arivalo/QL_Snake/blob/master/DQN_complete.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
class Snake:
    """Snake living on a square grid of ``board_size`` x ``board_size`` cells.

    Directions are encoded as 0-up, 1-right, 2-down, 3-left, where "up"
    decreases ``y`` and "right" increases ``x``.  ``body`` is a list of
    ``(x, y)`` tuples; ``body[0]`` is the head and the remaining entries
    trail behind it.
    """

    def __init__(self, board_size):
        self.x = 8
        self.y = 7
        self.size = 4
        self.dir = 1  # directions are 0-up, 1-right, 2-down, 3-left
        self.ate = False
        self.crashed = False
        self.board_size = board_size
        self.color = (255, 255, 255)
        # The snake starts heading right, so lay the body out in a straight
        # line to the left of the head (adjacent cells, head first).
        self.body = [(self.x - i, self.y) for i in range(self.size)]

    def move(self):
        """Advance the head one cell along ``self.dir`` and drag the body.

        Leaving the board wraps the head to the opposite edge and sets
        ``crashed``; callers decide whether that counts as a loss.  If
        ``ate`` is set the snake grows by one segment and the flag is
        cleared.
        """
        if self.dir % 2 == 0:
            # vertical: 2 is down (y grows), 0 is up
            if self.dir > 0:
                self.y += 1
            else:
                self.y -= 1
        else:
            # horizontal: 3 is left (x shrinks), 1 is right
            if self.dir > 1:
                self.x -= 1
            else:
                self.x += 1

        # Wrap around the board edges, but remember that an edge was crossed.
        if self.x >= self.board_size:
            self.x = 0
            self.crashed = True
        elif self.x < 0:
            self.x = self.board_size - 1
            self.crashed = True
        if self.y >= self.board_size:
            self.y = 0
            self.crashed = True
        elif self.y < 0:
            self.y = self.board_size - 1
            self.crashed = True

        # New head goes in front; every old segment shifts back by one.
        # When growing, the old tail is kept, otherwise it is dropped.
        self.body.insert(0, (self.x, self.y))
        if self.ate:
            self.ate = False
        else:
            self.body.pop()

    def __sub__(self, other):
        """Return ``(dx, dy)`` from ``other`` to this snake's head."""
        return self.x - other.x, self.y - other.y

    def __eq__(self, other):
        """Two objects are equal when they occupy the same grid cell."""
        return self.x == other.x and self.y == other.y

    def change_direction(self, new_dir):
        """Set the heading to ``new_dir`` unless it is a 180-degree reversal."""
        if abs(self.dir - new_dir) != 2:
            self.dir = new_dir

    def act(self, new_dir):
        """Apply an action: optionally turn to ``new_dir``, then move one cell."""
        if new_dir != self.dir:
            self.change_direction(new_dir)
        self.move()

In [0]:
import numpy as np


class Treat:
    """Piece of food sitting on a random cell of a square board."""

    def __init__(self, board_size):
        self.board_size = board_size
        self.color = (250, 150, 50)
        # Initial placement is just a regular random relocation.
        self.change_pos()

    def change_pos(self):
        """Move the treat to a new uniformly random cell (x is drawn first)."""
        limit = self.board_size
        self.x, self.y = np.random.randint(0, limit), np.random.randint(0, limit)

    def __sub__(self, other):
        """Return ``(dx, dy)`` pointing from this treat to ``other``."""
        dx = other.x - self.x
        dy = other.y - self.y
        return dx, dy

    def __eq__(self, other):
        """Equal when both objects share the same x and y coordinates."""
        return self.x == other.x and self.y == other.y

In [0]:
from PIL import Image
import numpy as np
import cv2

class SnakeEnvObject:
    """Snake environment exposing ``reset`` / ``step`` / ``render``.

    Observations are either an RGB image of the board (``RETURN_IMAGES``)
    or a small tuple of hand-crafted features.  ``step`` returns
    ``(observation, reward, done)``.
    """

    SW = 600
    SIZE = 15
    WALLS = True
    LOSE_PENALTY = 601
    EAT_REWARD = 60
    MOVE_PENALTY = 3
    ACTION_SPACE_SIZE = 4
    OBSERVATION_SPACE_VALUES = (SIZE, SIZE, 3)
    FPS = 30
    SCALING = SW // SIZE
    RETURN_IMAGES = True # change depending on whether you use neural network or simple QL

    def reset(self):
        """Start a new episode and return its first observation."""
        self.player = Snake(self.SIZE)
        self.food = Treat(self.SIZE)
        self._move_food_off_snake()

        self.episode_step = 0
        return self.get_observation()

    def _food_on_snake(self):
        """Return True when the food occupies a cell listed in the snake's body."""
        food_cell = (self.food.x, self.food.y)
        return any(part == food_cell for part in self.player.body)

    def _move_food_off_snake(self):
        """Re-roll the food position until it no longer overlaps the snake."""
        while self._food_on_snake():
            self.food.change_pos()

    def get_observation(self):
        """Build the observation for the current state.

        Returns a ``(SIZE, SIZE, 3)`` uint8 array when ``RETURN_IMAGES`` is
        set, otherwise the tuple ``((dx, dy), left_b, right_b, up_b, down_b,
        direction)`` where ``(dx, dy)`` is the snake head minus the food
        position and each ``*_b`` flag is 1 if that neighbouring cell is
        blocked.
        """
        if self.RETURN_IMAGES:
            return np.array(self.get_image())

        head_x, head_y = self.player.x, self.player.y
        # Neighbouring cells, wrapped around the board edges.
        left_cell = ((head_x - 1) % self.SIZE, head_y)
        right_cell = ((head_x + 1) % self.SIZE, head_y)
        up_cell = (head_x, (head_y - 1) % self.SIZE)
        down_cell = (head_x, (head_y + 1) % self.SIZE)

        left_b = right_b = up_b = down_b = 0
        for part in self.player.body:
            if part == left_cell:
                left_b = 1
            if part == right_cell:
                right_b = 1
            if part == up_cell:
                up_b = 1
            if part == down_cell:
                down_b = 1

        if self.WALLS:
            # With walls on, the board edge itself blocks the move.
            if head_x == 0:
                left_b = 1
            if head_x == self.SIZE - 1:
                right_b = 1
            if head_y == 0:
                up_b = 1
            if head_y == self.SIZE - 1:
                down_b = 1

        # observation contains: tuple of relative coordinates of snake to food, walls or body parts
        # in all directions and current snake direction
        return ((self.player - self.food), left_b, right_b, up_b, down_b, self.player.dir)

    def step(self, action):
        """Apply ``action`` and return ``(observation, reward, done)``.

        The observation is built after all state changes of this step
        (including relocating eaten food), so it matches the state the
        next action will be taken in.
        """
        self.episode_step += 1
        self.player.act(action)

        # if walls are turned on
        if self.WALLS and self.player.crashed:
            reward = -self.LOSE_PENALTY
            done = True
        # if snake is at the same grid as food
        elif self.player == self.food:
            self.player.ate = True
            self.food.change_pos()
            self._move_food_off_snake()
            reward = self.EAT_REWARD
            done = False
        # otherwise its either normal move or snake hit itself
        else:
            head_cell = (self.player.x, self.player.y)
            head_segment = self.player.body[0]
            # Identity (not equality) is used to skip the head entry itself,
            # so any other segment on the head's cell counts as a collision.
            hit_self = any(
                part == head_cell and part is not head_segment
                for part in self.player.body
            )
            if hit_self:
                reward = -self.LOSE_PENALTY
                done = True
            else:
                reward = -self.MOVE_PENALTY
                done = False

        return self.get_observation(), reward, done

    def render(self):
        """Show the current board in an OpenCV window, upscaled to 300x300."""
        img = self.get_image()
        img = img.resize((300, 300))
        cv2.imshow("image", np.array(img))
        cv2.waitKey(30)

    def get_image(self):
        """Return the board as a ``SIZE`` x ``SIZE`` RGB PIL image.

        Cells are indexed as ``env[x][y]``.  Drawing order matters: body
        first, then the head colour, then the food on top.
        """
        env = np.zeros((self.SIZE, self.SIZE, 3), dtype=np.uint8)
        for part in self.player.body:
            env[part[0]][part[1]] = self.player.color
        env[self.player.x][self.player.y] = (100, 200, 200)
        env[self.food.x][self.food.y] = self.food.color
        img = Image.fromarray(env, 'RGB')
        return img

In [0]:
from keras.models import Sequential
from keras.layers import Dense, Dropout, Conv2D, MaxPooling2D, Activation, Flatten
from keras.callbacks import TensorBoard
from keras.optimizers import Adam
from collections import deque
from tqdm import tqdm
import numpy as np
import tensorflow as tf
import time
import os
import random

# ---- run identification ----
MODEL_NAME = "256x2"

# ---- replay memory / network update schedule ----
MIN_REPLAY_MEMORY_SIZE = 1_000   # transitions required before training starts
REPLAY_MEMORY_SIZE = 50_000      # maximum transitions kept in the replay buffer
MINIBATCH_SIZE = 64              # transitions sampled per training call
DISCOUNT = 0.99                  # discount applied to the best future Q value
UPDATE_TARGET_EVERY = 5          # terminal training calls between target syncs
MIN_REWARD = -200                # models are saved only when min reward reaches this

EPISODES = 20_000

# ---- exploration ----
MAX_STEPS = 200                  # episode is cut off after this many steps
epsilon = 1                      # starting exploration rate, decayed per episode
EPSILON_DECAY = 0.99975
MIN_EPSILON = 0.001

# ---- statistics / preview ----
AGGREGATE_STATS_EVERY = 50
SHOW_PREVIEW = False

# ---- one-time initialisation ----
env = SnakeEnvObject()
ep_rewards = [-200]

# Seed every random source used here so runs are repeatable.
random.seed(1)
np.random.seed(1)
tf.set_random_seed(1)

# Make sure the model output directory exists.
os.makedirs('DQN_SNAKE_models', exist_ok=True)


class DQNAgent:
    """Deep Q-learning agent with an online network and a target network.

    The online ``model`` is fitted on every training call; ``target_model``
    supplies the future Q values and is synchronised with the online
    weights once more than ``UPDATE_TARGET_EVERY`` terminal training calls
    have accumulated.
    """

    def __init__(self):
        self.model = self.create_model()
        self.target_model = self.create_model()
        # Start the target network as an exact copy of the online network.
        self.target_model.set_weights(self.model.get_weights())

        self.replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)
        self.tensorboard = ModifiedTensorBoard(log_dir=f"logs/{MODEL_NAME}-{time.time()}")
        self.target_update_counter = 0

    def create_model(self):
        """Build and compile the convolutional Q-network.

        Input shape is ``env.OBSERVATION_SPACE_VALUES``; the output layer
        has one linear unit per action.
        """
        model = Sequential()
        model.add(Conv2D(256, (3, 3), input_shape=env.OBSERVATION_SPACE_VALUES))
        model.add(Activation("relu"))
        model.add(MaxPooling2D(2,2))
        model.add(Dropout(0.2))

        model.add(Conv2D(256, (3, 3)))
        model.add(Activation("relu"))
        model.add(MaxPooling2D(2,2))
        model.add(Dropout(0.2))

        model.add(Flatten())
        model.add(Dense(256))
        model.add(Dense(env.ACTION_SPACE_SIZE, activation="linear"))
        model.compile(loss="mse", optimizer=Adam(lr=0.001), metrics=['accuracy'])

        return model

    def update_replay_memory(self, transition):
        """Append a ``(state, action, reward, new_state, done)`` transition."""
        self.replay_memory.append(transition)

    def get_qs(self, state):
        """Return the online network's Q values for a single ``state``.

        The state is given a leading batch axis and divided by 255 before
        prediction.
        """
        return self.model.predict(np.array(state).reshape(-1, *state.shape)/255)[0]

    def train(self, terminal_state, step):
        """Fit the online network on one minibatch sampled from replay memory.

        Does nothing until the replay memory holds at least
        ``MIN_REPLAY_MEMORY_SIZE`` transitions.  ``terminal_state`` controls
        TensorBoard logging and the target-update counter; ``step`` is
        accepted for the caller's convenience and not used.
        """
        if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:
            return

        minibatch = random.sample(self.replay_memory, MINIBATCH_SIZE)

        # Q values of the sampled states from the online network.
        current_states = np.array([transition[0] for transition in minibatch])/255
        current_qs_list = self.model.predict(current_states)

        # Q values of the follow-up states from the target network.
        new_current_states = np.array([transition[3] for transition in minibatch])/255
        future_qs_list = self.target_model.predict(new_current_states)

        X = []
        y = []

        for index, (current_state, action, reward, _new_state, done) in enumerate(minibatch):
            # Terminal transitions have no future value to bootstrap from.
            if not done:
                max_future_q = np.max(future_qs_list[index])
                new_q = reward + DISCOUNT * max_future_q
            else:
                new_q = reward

            # Only the Q value of the action actually taken is replaced.
            current_qs = current_qs_list[index]
            current_qs[action] = new_q

            X.append(current_state)
            y.append(current_qs)

        # X holds the raw states, so they are scaled here exactly once.
        self.model.fit(np.array(X)/255, np.array(y), batch_size=MINIBATCH_SIZE, verbose=0, shuffle=False,
                       callbacks=[self.tensorboard] if terminal_state else None)

        if terminal_state:
            self.target_update_counter += 1

        if self.target_update_counter > UPDATE_TARGET_EVERY:
            self.target_model.set_weights(self.model.get_weights())
            self.target_update_counter = 0


# MODIFIED TENSORBOARD CLASS TO GET ONLY ONE LOG; CREDITS TO PYTHONPROGRAMMING.NET
class ModifiedTensorBoard(TensorBoard):
    """TensorBoard callback that writes every ``.fit()`` call to one log.

    ``step`` is set by the caller and used as the step index for all
    values written through ``update_stats``.
    """

    # Overriding init to set initial step and writer (we want one log file for all .fit() calls)
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.step = 1
        self.writer = tf.summary.FileWriter(self.log_dir)

    # Overriding this method to stop creating default log writer
    def set_model(self, model):
        pass

    # Overridden, saves logs with our step number
    # (otherwise every .fit() will start writing from 0th step)
    def on_epoch_end(self, epoch, logs=None):
        # logs defaults to None; unpacking None would raise a TypeError.
        self.update_stats(**(logs or {}))

    # Overridden
    # We train for one batch only, no need to save anything at epoch end
    def on_batch_end(self, batch, logs=None):
        pass

    # Overridden, so won't close writer
    def on_train_end(self, _):
        pass

    # Custom method for saving own metrics:
    # writes the given keyword stats at the current step
    def update_stats(self, **stats):
        self._write_logs(stats, self.step)


agent = DQNAgent()

# Main training loop: one iteration per episode.
for episode in tqdm(range(1, EPISODES+1), ascii=True, unit="episode"):
    # All TensorBoard values written during this episode share its number.
    agent.tensorboard.step = episode

    episode_reward = 0
    step = 1
    current_state = env.reset()

    done = False

    while not done:
        # Epsilon-greedy: exploit the network, otherwise pick a random action.
        if np.random.random() > epsilon:
            action = np.argmax(agent.get_qs(current_state))
        else:
            action = np.random.randint(0, env.ACTION_SPACE_SIZE)

        new_state, reward, done = env.step(action)

        episode_reward += reward

        if SHOW_PREVIEW and not episode % AGGREGATE_STATS_EVERY:
            env.render()

        agent.update_replay_memory((current_state, action, reward, new_state, done))
        agent.train(done, step)

        current_state = new_state
        step += 1

        # Cut the episode off once it exceeds the step limit.
        if env.episode_step > MAX_STEPS:
            done = True

    ep_rewards.append(episode_reward)

    if not episode % AGGREGATE_STATS_EVERY or episode == 1:
        recent_rewards = ep_rewards[-AGGREGATE_STATS_EVERY:]
        average_reward = sum(recent_rewards) / len(recent_rewards)
        min_reward = min(recent_rewards)
        max_reward = max(recent_rewards)
        agent.tensorboard.update_stats(reward_avg=average_reward, reward_min=min_reward, reward_max=max_reward,
                                       epsilon=epsilon)

        # Save into the directory this script creates at start-up
        # ('DQN_SNAKE_models'); a 'models' directory is never created.
        if min_reward >= MIN_REWARD:
            agent.model.save(f'DQN_SNAKE_models/{MODEL_NAME}_{max_reward:_>7.2f}')

    # Decay exploration once per episode, never dropping below MIN_EPSILON.
    if epsilon > MIN_EPSILON:
        epsilon *= EPSILON_DECAY
        epsilon = max(MIN_EPSILON, epsilon)

W0814 15:32:50.183879 140379236976512 deprecation_wrapper.py:119] From /usr/local/lib/python3.6/dist-packages/keras/backend/tensorflow_backend.py:74: The name tf.get_default_graph is deprecated. Please use tf.compat.v1.get_default_graph instead.

W0814 15:32:50.186017 140379236976512 deprecation_wrapper.py:119] From /usr/local/lib/python3.6/dist-packages/keras/backend/tensorflow_backend.py:517: The name tf.placeholder is deprecated. Please use tf.compat.v1.placeholder instead.

W0814 15:32:50.196222 140379236976512 deprecation_wrapper.py:119] From /usr/local/lib/python3.6/dist-packages/keras/backend/tensorflow_backend.py:4138: The name tf.random_uniform is deprecated. Please use tf.random.uniform instead.

W0814 15:32:50.225148 140379236976512 deprecation_wrapper.py:119] From /usr/local/lib/python3.6/dist-packages/keras/backend/tensorflow_backend.py:3976: The name tf.nn.max_pool is deprecated. Please use tf.nn.max_pool2d instead.

W0814 15:32:50.227939 140379236976512 deprecation_wrapp