In [None]:
!pip install --upgrade --force-reinstall keras-rl2 --user
!pip install networkx
!pip install numba

In [None]:
import numpy as np
from IPython.display import clear_output
from gym import Env, spaces
import networkx as nx

class Segment:
    """One cell of the snake: its board position and its last move direction."""

    def __init__(self, x_pos, y_pos, x_dir, y_dir):
        # Position and direction are plain mutable attributes; the environment
        # updates them in place on every move.
        self.x_pos, self.y_pos = x_pos, y_pos
        self.x_dir, self.y_dir = x_dir, y_dir

class Snake(Env):
    def __init__(self, board_size=(10, 10)):
        """Create the environment and start the first episode.

        Args:
            board_size: ``(rows, columns)`` of the playing field.
        """
        super().__init__()

        num_actions = 4
        self.action_space = spaces.Discrete(num_actions)

        # The sub-spaces are listed in the same order as the entries of the
        # state list returned by ``step``/``reset``.
        # NOTE: the distance entry is a float and the direction entries take
        # values in {-1, 0, 1}, so the three ``Discrete`` spaces below are only
        # approximate descriptions of those entries.
        self.observation_space = spaces.Tuple((
            spaces.MultiDiscrete(board_size),  # head position
            spaces.MultiDiscrete(board_size),  # food position
            spaces.Discrete(20),  # distance from head to food
            spaces.Discrete(num_actions),  # x direction of head
            spaces.Discrete(num_actions),  # y direction of head
            spaces.Box(low=0, high=1, shape=(1,), dtype=np.int32),  # body segment on the path between head and food
            spaces.Box(low=0, high=1, shape=(12,), dtype=np.int32),  # vision to wall (one flag per scan direction)
            spaces.Box(low=0, high=1.0, shape=(12,), dtype=np.float32),  # vision of snake's segments in each direction
            spaces.Box(low=0, high=max(board_size) - 1, shape=(12,), dtype=np.int32),  # available movement space in grid
            spaces.Box(low=0, high=13.0, shape=(12,),
                       dtype=np.float32)))  # distance to snake's segments in each direction

        self.food_number = [4]  # board value used to mark the food cell
        self.board_size = board_size
        self.score = 1  # longest snake seen so far (shown by render)
        self.BOLD = '\033[1m'
        self.END = '\033[0m'
        # Grid graph used to find a shortest path between head and food.
        self.G = nx.grid_2d_graph(board_size[0], board_size[1])

        # up, down, right, left -- expressed as (row delta, column delta)
        self.possible_actions = {0: (-1, 0), 1: (1, 0), 2: (0, 1), 3: (0, -1)}
        self.reset()
        self.iteration = 1  # number of the episode currently being played

    def step(self, action):
        """Advance the game by one move.

        Args:
            action: Key into ``self.possible_actions``
                (0 up, 1 down, 2 right, 3 left).

        Returns:
            ``(state, reward, done, info)``: the 12-entry observation list,
            the reward earned by this move, whether the episode has ended and
            an (always empty) info dict.
        """
        self.max_steps_per_episode -= 1
        x_dir, y_dir = self.possible_actions[action]
        rows, cols = self.board_size
        last_row, last_col = rows - 1, cols - 1

        # Move tail-to-head: each body segment adopts the direction its leader
        # used on the previous move (the leader has not been updated yet), then
        # the head takes the direction chosen by the agent.
        for idx, segment in enumerate(self.segments[:-1]):
            leader = self.segments[idx + 1]
            segment.x_dir, segment.y_dir = leader.x_dir, leader.y_dir
            segment.x_pos += segment.x_dir
            segment.y_pos += segment.y_dir
        head = self.segments[-1]
        head.x_dir, head.y_dir = x_dir, y_dir
        head.x_pos += x_dir
        head.y_pos += y_dir

        # Get new positions after movement
        self.positions = np.array([[s.x_pos, s.y_pos] for s in self.segments])
        head_position = self.positions[-1]

        # Two segments on the same cell means the snake ran into itself.
        hit_self = np.unique(self.positions, axis=0).shape[0] < self.positions.shape[0]
        # Rows are checked against the row count and columns against the
        # column count so non-square boards work too.
        hit_wall = not (0 <= head_position[0] < rows and 0 <= head_position[1] < cols)
        dead = hit_self or hit_wall
        over = hit_wall  # the head is off the board, so the board cannot be redrawn

        if dead:
            self._reward = -len(self.segments) - 100
        else:
            # Award for just staying alive
            self._reward = min(105 + len(self.segments), 2 * len(self.segments) + 7)

        # Distance to the food that was targeted during this move; measured
        # before the food is possibly re-placed below.
        distance_after_move = self.euclidean_distance(head_position, self.food[-1])

        # Check if snake eats food
        board_full = False
        if not dead and np.all(np.equal(head_position, self.food)):
            tail = self.segments[0]
            if abs(tail.x_dir) + abs(tail.y_dir) == 1:
                # Grow by one cell directly behind the tail.
                self.segments = [Segment(tail.x_pos - tail.x_dir, tail.y_pos - tail.y_dir,
                                         x_dir=tail.x_dir, y_dir=tail.y_dir)] + self.segments

            self._reward += 105 + len(self.segments)
            self.grid = np.ones(self.board_size, dtype=bool)
            self.positions = np.array([[s.x_pos, s.y_pos] for s in self.segments])
            self.grid[tuple(self.positions.T)] = False
            free_cells = np.argwhere(self.grid)
            if free_cells.shape[0] == 0:
                # The snake covers the whole board: nowhere to put new food,
                # so the episode ends instead of crashing in randint.
                board_full = True
            else:
                self.food = free_cells[np.random.randint(free_cells.shape[0], size=1), :]

        # Check if snake is getting close to food. Only short snakes are
        # shaped this way (to encourage finding food early on), and a death
        # penalty is never softened by the bonus.
        if not dead and len(self.segments) <= 5:
            if distance_after_move < self.distance_to_food:
                self._reward += 10
            else:
                self._reward -= 10

        # Baseline for the next move: distance to the current food.
        temp_distance_to_food = self.euclidean_distance(head_position, self.food[-1])
        self.distance_to_food = temp_distance_to_food

        # Sensor defaults, used as-is when the board cannot be redrawn.
        # 12.72 is roughly the diagonal of the default 10x10 board and stands
        # for "no segment seen in this direction".
        self.vision = np.array([0] * 12)
        self.available_movement_space = np.array([0] * 12)
        self.distances_to_segments = np.array([12.72] * 12)
        self.segment_between_food_and_head = 0
        self.vision_to_wall = np.array([0] * 12)

        if not over:  # Render new grid: 1 empty, 0 body, 3 head, 4 food
            self.board = np.ones(self.board_size, dtype=np.int32)
            self.board[tuple(np.concatenate([self.positions, self.food], axis=0).T)] = np.concatenate(
                [np.repeat([0], repeats=self.positions[:-1].shape[0]), [3], self.food_number], axis=0)

            source = self.positions[-1]
            rays = self._ray_table(source)

            # Reveal if snake sees itself in any of the directions
            self.vision = np.array([int(not np.all(cells)) for cells, *_ in rays])

            # A direction with no cells at all is blocked by the wall.
            self.vision_to_wall = np.array([int(len(cells) == 0) for cells, *_ in rays])

            # The available movement space is calculated based on the vision
            self.available_movement_space = np.array(
                [self.calculate_availabe_movement_space(cells, reverse=backwards)
                 for cells, backwards, _, _ in rays])

            # Measures how far the snake is from its segments in each direction.
            # The free-cell count equals the index of the first body cell along
            # the ray, and is smaller than the ray length only if one exists.
            for i, (cells, _, first_cell, stride) in enumerate(rays):
                free = self.available_movement_space[i]
                if free < len(cells):
                    des = (min(last_row, max(0, first_cell[0] + free * stride[0])),
                           min(last_col, max(0, first_cell[1] + free * stride[1])))
                    self.distances_to_segments[i] = self.euclidean_distance(source, des)

            # Is a body segment lying on a shortest grid path from head to food?
            segment_positions = np.argwhere(self.board == 0).tolist()
            path = nx.shortest_path(self.G, source=tuple(source),
                                    target=tuple(np.argwhere(self.board == 4)[0]))
            self.segment_between_food_and_head = int(
                any(list(node) in segment_positions for node in path[1:-1]))

        # Termination is tracked explicitly rather than inferred from the
        # reward value, which the shaping bonus could push above the threshold.
        done = bool(self.max_steps_per_episode <= 0 or dead or board_full)
        if done:
            self.iteration += 1

        self.state = [self.positions[-1][0], self.positions[-1][1], self.food[-1][0], self.food[-1][1],
                      temp_distance_to_food, self.segments[-1].x_dir, self.segments[-1].y_dir,
                      self.segment_between_food_and_head, self.vision_to_wall, self.vision,
                      self.available_movement_space, self.distances_to_segments]

        return self.state, self.reward(), done, {}

    def _ray_table(self, source):
        """Return the 12 scan rays around ``source`` (the head, inside the board).

        Each entry is ``(cells, backwards, first_cell, stride)``: the board
        values along the ray, whether the ray must be read back-to-front to go
        outwards from the head, the (unclamped) coordinate of its nearest cell
        and the coordinate step between consecutive cells. Rays that would
        leave the board are returned with an empty ``cells`` list. Order: up,
        up_left, up_right, up_left_up, up_right_up, down, down_left,
        down_right, down_left_down, down_right_down, left, right.
        """
        row, col = source
        last_row = self.board_size[0] - 1
        last_col = self.board_size[1] - 1
        above, below = max(0, row - 1), min(last_row, row + 1)
        left_of, right_of = max(0, col - 1), min(last_col, col + 1)
        board = self.board
        clip = self.greater_than_one

        rays = [
            (clip(board[0:row, col]), True, (row - 1, col), (-1, 0)),             # up
            (clip(board[above, 0:col]), True, (row - 1, col - 1), (0, -1)),       # up_left
            (clip(board[above, right_of:]), False, (row - 1, col + 1), (0, 1)),   # up_right
            (clip(board[0:row, left_of]), True, (row - 1, col - 1), (-1, 0)),     # up_left_up
            (clip(board[0:row, right_of]), True, (row - 1, col + 1), (-1, 0)),    # up_right_up
            (clip(board[below:, col]), False, (row + 1, col), (1, 0)),            # down
            (clip(board[below, :col]), True, (row + 1, col - 1), (0, -1)),        # down_left
            (clip(board[below, right_of:]), False, (row + 1, col + 1), (0, 1)),   # down_right
            (clip(board[below:, left_of]), False, (row + 1, col - 1), (1, 0)),    # down_left_down
            (clip(board[below:, right_of]), False, (row + 1, col + 1), (1, 0)),   # down_right_down
            (clip(board[row, 0:col]), True, (row, col - 1), (0, -1)),             # left
            (clip(board[row, right_of:]), False, (row, col + 1), (0, 1)),         # right
        ]

        # On an edge, the rays pointing off the board see nothing at all.
        walled = set()
        if row == 0:
            walled.update((0, 1, 2, 3, 4))
        if row == last_row:
            walled.update((5, 6, 7, 8, 9))
        if col == 0:
            walled.update((1, 3, 6, 8, 10))
        if col == last_col:
            walled.update((2, 4, 7, 9, 11))

        return [([] if i in walled else cells, backwards, first_cell, stride)
                for i, (cells, backwards, first_cell, stride) in enumerate(rays)]

    def render(self, mode="snake"):
        """Print the current score, the best score so far and the board.

        Rows containing the snake (0 body, 3 head) or the food (4) are printed
        cell by cell with those values in bold; all other rows are printed as
        plain arrays. ``mode`` is accepted but not used.
        """
        clear_output(wait=True)

        current_length = len(self.segments)
        self.score = max(self.score, current_length)

        print('Score: {}'.format(current_length))
        print('Top Score: {}'.format(self.score))
        print('\n\n')

        highlighted = (0, 3, 4)
        for row in self.board:
            if not any(marker in row for marker in highlighted):
                print(row)
                continue
            cells = [self.BOLD + '{}'.format(cell) + self.END if cell in highlighted else str(cell)
                     for cell in row]
            print('[' + ' '.join(cells) + ']')

        # Disabled file logging, kept as an inert string literal.
        """
        f = open('results-{}.txt'.format(round(self.iteration, -3)),'a')

        print('Score: {}'.format(temp_score), file=f)
        print('Top Score: {}'.format(self.score), file=f)
        print('Iteration: {}'.format(self.iteration), file=f)

        print('\n\n', file=f)

        for b in self.board:
            if 0 in b or 3 in b or 4 in b:
                print('[', end='', file=f)
                for i, n in enumerate(b):
                    ending = '' if i == len(b) - 1 else ' '
                    if n == 0 or n == 3 or n == 4:
                        print(self.BOLD + '{}'.format(n) + self.END, end=ending, file=f)
                    else:
                        print(n, end=ending, file=f)
                print(']', file=f)
            else:
                print(b, file=f)
                
        print('\n\n', file=f)
        print('#########', file=f)
        f.close()
        """
                
    def reward(self):
        """Return the reward of the latest move (0 if none has been recorded yet)."""
        try:
            return self._reward
        except AttributeError:
            # No move has set a reward so far: initialise it lazily.
            self._reward = 0
            return self._reward

    def greater_than_one(self, arr):
        """Return ``[]`` if ``arr`` is a single cell holding 3 (the head), else ``arr`` unchanged."""
        only_the_head = 3 in arr and arr.size == 1
        return [] if only_the_head else arr

    def calculate_availabe_movement_space(self, direction, reverse=False):
        """Count the cells before the first 0 (body segment) along ``direction``.

        Args:
            direction: Board values along one scan direction.
            reverse: Read ``direction`` back-to-front before counting.

        Returns:
            Index of the first 0, or the length of ``direction`` if it holds no 0.
        """
        cells = direction[::-1] if reverse else direction
        if 0 not in cells:
            return len(cells)
        return cells.tolist().index(0)
        
    def euclidean_distance(self, source, destination):
        """Return the straight-line distance between two 2-element coordinates."""
        (row_a, col_a), (row_b, col_b) = source, destination
        d_row = row_a - row_b
        d_col = col_a - col_b
        return np.sqrt(d_row ** 2 + d_col ** 2)

    def reset(self):
        """Start a new episode and return its initial observation.

        A one-segment snake is placed on a random cell (any cell of the board)
        facing one of the four move directions, food is placed on a random
        free cell, and the sensor readings are computed for that layout.

        Returns:
            The 12-entry state list (same layout as the one returned by ``step``).
        """
        self.max_steps_per_episode = 5000
        self._reward = 0  # do not carry the last reward of the previous episode over
        rows, cols = self.board_size
        last_row, last_col = rows - 1, cols - 1

        # randint's upper bound is exclusive, so ``rows``/``cols`` make every
        # cell reachable; the start direction is one of the real moves.
        x_dir, y_dir = self.possible_actions[np.random.randint(len(self.possible_actions))]
        self.segments = [Segment(np.random.randint(low=0, high=rows),
                                 np.random.randint(low=0, high=cols),
                                 x_dir=x_dir,
                                 y_dir=y_dir)]

        # Pick the food among the cells not occupied by the snake.
        self.grid = np.ones(self.board_size, dtype=bool)
        self.positions = np.array([[s.x_pos, s.y_pos] for s in self.segments])
        self.grid[tuple(self.positions.T)] = False
        self.food = np.argwhere(self.grid)
        self.available_grid_space = self.food.shape[0]
        self.food = self.food[np.random.randint(self.food.shape[0], size=1), :]

        # Board values: 1 empty, 3 head, 4 food.
        self.board = np.ones(self.board_size, dtype=np.int32)
        self.board[tuple(np.concatenate([self.positions, self.food], axis=0).T)] = [3, 4]

        self.distance_to_food = self.euclidean_distance(self.positions[-1], self.food[-1])

        source = self.positions[-1]
        row, col = source
        above, below = max(0, row - 1), min(last_row, row + 1)
        left_of, right_of = max(0, col - 1), min(last_col, col + 1)

        down = self.greater_than_one(self.board[below:, col])
        down_left = self.greater_than_one(self.board[below, :col])
        down_right = self.greater_than_one(self.board[below, right_of:])
        down_left_down = self.greater_than_one(self.board[below:, left_of])
        down_right_down = self.greater_than_one(self.board[below:, right_of])

        right = self.greater_than_one(self.board[row, right_of:])
        left = self.greater_than_one(self.board[row, 0:col])

        up = self.greater_than_one(self.board[0:row, col])
        up_left = self.greater_than_one(self.board[above, 0:col])
        up_right = self.greater_than_one(self.board[above, right_of:])
        up_left_up = self.greater_than_one(self.board[0:row, left_of])
        up_right_up = self.greater_than_one(self.board[0:row, right_of])

        # On an edge, the directions pointing off the board see nothing at all.
        if row == 0:
            up, up_left, up_right, up_left_up, up_right_up = ([] for _ in range(5))
        if row == last_row:
            down, down_left, down_right, down_left_down, down_right_down = ([] for _ in range(5))
        if col == 0:
            up_left, up_left_up, down_left, down_left_down, left = ([] for _ in range(5))
        if col == last_col:
            up_right, up_right_up, down_right, down_right_down, right = ([] for _ in range(5))

        # (cells, read back-to-front?) for each direction, in observation order.
        views = [(up, True), (up_left, True), (up_right, False),
                 (up_left_up, True), (up_right_up, True),
                 (down, False), (down_left, True), (down_right, False),
                 (down_left_down, False), (down_right_down, False),
                 (left, True), (right, False)]

        # A fresh snake has no body, so no direction can contain a segment;
        # 12.72 stands for "no segment seen in this direction".
        self.distances_to_segments = np.array([12.72] * 12)
        self.segment_between_food_and_head = 0

        # Reveal if snake sees itself in any of the directions
        self.vision = np.array([int(not np.all(cells)) for cells, _ in views])

        # A direction with no cells at all is blocked by the wall.
        self.vision_to_wall = np.array([int(len(cells) == 0) for cells, _ in views])

        # The available movement space is calculated based on the vision
        self.available_movement_space = np.array(
            [self.calculate_availabe_movement_space(cells, reverse=backwards)
             for cells, backwards in views])

        self.state = [self.positions[-1][0], self.positions[-1][1], self.food[-1][0], self.food[-1][1],
                      self.distance_to_food, self.segments[-1].x_dir, self.segments[-1].y_dir,
                      self.segment_between_food_and_head, self.vision_to_wall, self.vision,
                      self.available_movement_space, self.distances_to_segments]

        return self.state


In [None]:
# Smoke test: build the environment, take one random action and show the result.
env = Snake()

env.reset()
# Sample one of the four discrete actions uniformly at random.
action = env.action_space.sample()

state, reward, done, info = env.step(action)

# Print the board, then the raw observation list and the reward of that move.
env.render()
print(state)
print(reward)

In [None]:
from rl.memory import SequentialMemory
from rl.policy import BoltzmannGumbelQPolicy
from rl.agents.dqn import DQNAgent
from rl.core import Processor
import tensorflow as tf
import gym
from numba import cuda

# Select CUDA device 0, close it again, then clear the Keras session.
# NOTE(review): presumably this releases GPU memory/state left over from an
# earlier run of the notebook -- confirm; it requires a CUDA-capable GPU.
cuda.select_device(0)
cuda.close()
tf.keras.backend.clear_session()


# Fresh environment for training; ``actions`` is the number of discrete actions.
env = Snake()
actions = env.action_space.n

def build_model(actions):
    """Build the Q-network: a stack of ReLU dense layers with one linear output per action.

    The input has shape ``(1, 56)`` and is flattened before the dense stack.

    Args:
        actions: Number of output units.

    Returns:
        The (uncompiled) ``tf.keras.Model``.
    """
    inputs = tf.keras.layers.Input(shape=(1, 56))
    hidden = tf.keras.layers.Flatten()(inputs)
    # Funnel from 512 units down to 16, halving the width at every layer.
    for width in (512, 256, 128, 64, 32, 16):
        hidden = tf.keras.layers.Dense(width, activation='relu')(hidden)
    outputs = tf.keras.layers.Dense(actions, activation='linear')(hidden)
    return tf.keras.Model(inputs, outputs)


class CustomProcessor(Processor):
    """Flattens an observation list into a single 1-D array."""

    def process_observation(self, observation):
        """Concatenate the leading entries with the flattened last four entries.

        Args:
            observation: Sequence whose last four entries are NumPy arrays;
                everything before them is passed to ``np.concatenate`` as is.

        Returns:
            One 1-D array: the leading entries followed by the four arrays,
            each flattened, in their original order.
        """
        leading = observation[:-4]
        flattened = [block.reshape([-1]) for block in observation[-4:]]
        return np.concatenate([leading, *flattened])

def build_agent(model, actions):
    """Wrap ``model`` in a double, dueling DQN agent.

    Args:
        model: Q-network to train.
        actions: Number of discrete actions.

    Returns:
        The configured (uncompiled) ``DQNAgent``.
    """
    settings = {
        'processor': CustomProcessor(),  # flattens the environment's observation list
        'policy': BoltzmannGumbelQPolicy(),
        'memory': SequentialMemory(limit=1000000, window_length=1),
        'enable_double_dqn': True,
        'enable_dueling_network': True,
        'dueling_type': 'avg',
        'target_model_update': 10000,
        'batch_size': 64,
    }
    return DQNAgent(model=model, nb_actions=actions, **settings)

# Build the Q-network and the agent around it.
model = build_model(actions)
DQN = build_agent(model, actions)

# Train with Adam at a fixed learning rate for two million environment steps.
# ``visualize=True`` is passed to ``fit``; NOTE(review): presumably this calls
# ``env.render()`` on every step, which would slow training -- confirm.
DQN.compile(tf.keras.optimizers.Adam(learning_rate=0.0005)) # TODO: consider a decaying learning rate
DQN.fit(env, nb_steps=2000000, visualize=True,  action_repetition=1, verbose=0)

#### 