    This version of the snake game allows you to implement and run your evolutionary algorithm

In [1]:
import random
import time
import turtle
import random
from datetime import datetime

from deap import base
from deap import creator
from deap import tools
from deap import algorithms

In [2]:
XSIZE = YSIZE = 16 # Number of grid cells in each direction (do not change this)

In [3]:
from enum import  Enum

class Directions(Enum):
    UP = 0
    UP_RIGHT = 1
    RIGHT = 2
    DOWN_RIGHT = 3
    DOWN = 4
    DOWN_LEFT = 6
    LEFT = 7
    UP_LEFT = 8

Change headless to True when evolving your solution. You can then pickle your final solution, and run it with this set to True to visualize the result, if you wish. Note that, when using Turtle,  to run it again properly you might need to go to restart your kernel after the visualization has finished.

If you are running on the cloud, or on Google Colab, you can chose to remove the turtle and display code altogether and only run headless. You could then pickle your best solution and use this code to see it play.

In [4]:
HEADLESS = False

In [5]:
class DisplayGame:
    def __init__(self, XSIZE, YSIZE):
        # SCREEN
        self.win = turtle.Screen()
        self.win.title("EVCO Snake game")
        self.win.bgcolor("grey")
        self.win.setup(width=(XSIZE*20)+40,height=(YSIZE*20)+40)
        #self.win.screensize((XSIZE*20)+20,(YSIZE*20)+20)
        self.win.tracer(0)

        #Snake Head
        self.head = turtle.Turtle()
        self.head.shape("square")
        self.head.color("black")

        # Snake food
        self.food = turtle.Turtle()
        self.food.shape("circle")
        self.food.color("yellow")
        self.food.penup()
        self.food.shapesize(0.55, 0.55)
        self.segments = []

    def reset(self, snake):
        self.segments = []
        self.head.penup()
        self.food.goto(-500, -500)
        self.head.goto(-500, -500)
        for i in range(len(snake)-1):
            self.add_snake_segment()
        self.update_segment_positions(snake)
       
    def update_food(self,new_food):
        self.food.goto(((new_food[1]-9)*20)+20, (((9-new_food[0])*20)-10)-20)

    def draw_stats(self,stats):
        turtle.write(stats)


    def update_segment_positions(self, snake):
        self.head.goto(((snake[0][1]-9)*20)+20, (((9-snake[0][0])*20)-10)-20)
        for i in range(len(self.segments)):
            self.segments[i].goto(((snake[i+1][1]-9)*20)+20, (((9-snake[i+1][0])*20)-10)-20)

    def add_snake_segment(self):
        self.new_segment = turtle.Turtle()
        self.new_segment.speed(0)
        self.new_segment.shape("square")
        self.new_segment.color(random.choice(["green",'black','red','blue']))
        self.new_segment.penup()
        self.segments.append(self.new_segment)

In [6]:
class snake:
    def __init__(self, _XSIZE, _YSIZE):
        self.XSIZE = _XSIZE
        self.YSIZE = _YSIZE
        self.reset()
        self.directions = ["up","up_right","right","down_right","down","down_left","left","up_left"]

    def reset(self):
        self.snake = [[8,10], [8,9], [8,8], [8,7], [8,6], [8,5], [8,4], [8,3], [8,2], [8,1],[8,0] ]# Initial snake co-ordinates [ypos,xpos]    
        self.food = self.place_food()
        self.ahead = []
        self.snake_direction = "right"

    def place_food(self):
        self.food = [random.randint(1, (YSIZE-2)), random.randint(1, (XSIZE-2))]
        while (self.food in self.snake):
            self.food = [random.randint(1, (YSIZE-2)), random.randint(1, (XSIZE-2))]
        return( self.food )
    
    def update_snake_position(self):
        self.snake.insert(0, [self.snake[0][0] + (self.snake_direction == "down" and 1) + (self.snake_direction == "up" and -1), self.snake[0][1] + (self.snake_direction == "left" and -1) + (self.snake_direction == "right" and 1)])

    def food_eaten(self):
        if self.snake[0] == self.food:                                            # When snake eats the food
            return True
        else:    
            last = self.snake.pop()  # [1] If it does not eat the food, it moves forward and so last tail item is removed
            return False
            
    def snake_turns_into_self(self):
        if self.snake[0] in self.snake[1:]:
            return True
        else:
            return False

    def snake_hit_wall(self):
        if self.snake[0][0] == 0 or self.snake[0][0] == (YSIZE-1) or self.snake[0][1] == 0 or self.snake[0][1] == (XSIZE-1):
            return True
        else:
            return False

    # Example sensing functions
    def look(self,direction):
        vision_dict = {}
        vision_location=self.snake[0]
        distance_counter = 1
        while vision_location[0] > 0 and vision_location[0] < (YSIZE-1) and vision_location[1] > 0 and vision_location[1] < (XSIZE-1):
            if direction == "up":
                coord = (self.snake[0][1],  self.snake[0][0]- distance_counter)
            if direction == "up_right":
                coord = (self.snake[0][1] +distance_counter , self.snake[0][0] -distance_counter)
            if direction == "right":
                coord = (self.snake[0][1]+ distance_counter , self.snake[0][0])
            if direction == "down_right":
                coord = (self.snake[0][1] + distance_counter, self.snake[0][0] +distance_counter)
            if direction == "down":
                coord = (self.snake[0][1], self.snake[0][1]+distance_counter)
            if direction == "down_left":
                coord = (self.snake[0][1] -distance_counter, self.snake[0][0] +distance_counter)
            if direction == "left":
                coord = (self.snake[0][1]-distance_counter, self.snake[0][0])
            if direction == "up_left":
                coord = (self.snake[0][1] -distance_counter, self.snake[0][0] -distance_counter)
            vision_location = coord
            vision_dict[coord] = distance_counter
            distance_counter+=1
        return vision_dict


    def sense_blocked_wall(self,direction):
        vision_dict = self.look(direction)
        #print(vision_dict)
        return max(vision_dict.values())

    def sense_blocked(self,direction):
        if direction == "up":
            coord = [self.snake[0][0],  self.snake[0][1]- 1]
        if direction == "up_right":
            coord = [self.snake[0][0] +1 , self.snake[0][1] -1]
        if direction == "right":
            coord = [self.snake[0][0]+ 1 , self.snake[0][1]]
        if direction == "down_right":
            coord = [self.snake[0][0] + 1, self.snake[0][1] +1]
        if direction == "down":
            coord = [self.snake[0][0], self.snake[0][1]+1]
        if direction == "down_left":
            coord = [self.snake[0][0] -1, self.snake[0][1] +1]
        if direction == "left":
            coord = [self.snake[0][0]-1, self.snake[0][1]]
        if direction == "up_left":
            coord = [self.snake[0][0] -1, self.snake[0][1] -1]

        if coord in self.snake:
            return True
        elif coord[0] > 0 and coord[0] < (YSIZE-1) and coord[1] > 0 and coord[1] < (XSIZE-1):
            return False
        else:
            return False

        
    def sense_self(self,direction):
        vision_dict = self.look(direction)

        for snake in self.snake:
            if tuple(snake) in vision_dict.keys():
                return vision_dict[tuple(snake)]
        return 0



    def sense_food(self,direction):
        vision_dict = self.look(direction)
        if vision_dict.get(tuple(self.food)):
            return vision_dict.get(tuple(self.food))
        else:
            return 0

    def angle_to_food(self):
        snake_head = tuple(self.snake[0])
        food = tuple(self.food)
        ang1 = np.arctan2(*snake_head[::-1])
        ang2 = np.arctan2(*food[::-1])
        return np.rad2deg((ang1 - ang2) % (2 * np.pi))


In [7]:
import numpy as np
import math

class MLP(object):
    def __init__(self, numInput, numHidden1, numHidden2, numOutput):
        self.fitness = 0
        self.numInput = numInput + 1 # Add bias node from input to hidden layer 1 only
        self.numHidden1 = numHidden1 # Feel free to adapt the code to add more biases if you wish
        self.numHidden2 = numHidden2
        self.numOutput = numOutput

        self.w_i_h1 = np.random.randn(self.numHidden1, self.numInput)
        self.w_h1_h2 = np.random.randn(self.numHidden2, self.numHidden1)
        self.w_h2_o = np.random.randn(self.numOutput, self.numHidden2)

        self.ReLU = lambda x : max(0,x)

    def softmax(self, x):
        e_x = np.exp(x - np.max(x))
        return e_x / e_x.sum()

    def feed_forward(self, inputs):
        inputsBias = inputs[:]
        inputsBias.insert(len(inputs),1)             # Add bias input

        h1 = np.dot(self.w_i_h1, inputsBias)         # feed input to hidden layer 1
        h1 = [self.ReLU(x) for x in h1]              # Activate hidden layer1

        h2 = np.dot(self.w_h1_h2, h1)                 # feed layer 1 to hidden layer 2
        h2 = [self.ReLU(x) for x in h2]              # Activate hidden layer 2

        output = np.dot(self.w_h2_o, h2)             # feed to output layer
        #output = [self.sigmoid(x) for x in output]   # Activate output layer
        return self.softmax(output)

    def getWeightsLinear(self):
        flat_w_i_h1 = list(self.w_i_h1.flatten())
        flat_w_h1_h2 = list(self.w_h1_h2.flatten())
        flat_w_h2_o = list(self.w_h2_o.flatten())
        return( flat_w_i_h1 + flat_w_h1_h2 + flat_w_h2_o )

    def setWeightsLinear(self, Wgenome):
        numWeights_I_H1 = self.numHidden1 * self.numInput
        numWeights_H1_H2 = self.numHidden2 * self.numHidden1
        numWeights_H2_O = self.numOutput * self.numHidden2

        self.w_i_h1 = np.array(Wgenome[:numWeights_I_H1])
        self.w_i_h1 = self.w_i_h1.reshape((self.numHidden1, self.numInput))

        self.w_h1_h2 = np.array(Wgenome[numWeights_I_H1:(numWeights_H1_H2+numWeights_I_H1)])
        self.w_h1_h2 = self.w_h1_h2.reshape((self.numHidden2, self.numHidden1))

        self.w_h2_o = np.array(Wgenome[(numWeights_H1_H2+numWeights_I_H1):])
        self.w_h2_o = self.w_h2_o.reshape((self.numOutput, self.numHidden2))

In [8]:
random.seed(datetime.now()) # Set a random seed from the clock

numInputNodes = 25
numHiddenNodes1 = 16
numHiddenNodes2 = 16
numOutputNodes = 4

IND_SIZE = ((numInputNodes+1) * numHiddenNodes1) + (numHiddenNodes1 * numHiddenNodes2) + (numHiddenNodes2 * numOutputNodes)

In [9]:
myNet = MLP(numInputNodes, numHiddenNodes1, numHiddenNodes2, numOutputNodes)

In [10]:
snake_game = snake(XSIZE,YSIZE)

In [11]:
creator.create("FitnessMax", base.Fitness, weights=(20,1))
creator.create("Individual", list, fitness=creator.FitnessMax)

toolbox = base.Toolbox()
toolbox.register("attr_float", random.uniform, -1, 1.0)
toolbox.register("individual", tools.initRepeat, creator.Individual,
                 toolbox.attr_float, n=IND_SIZE)

In [12]:
def evaluate(indiv, myNet):
    myNet.setWeightsLinear(indiv)
    max_timestep = 1000
    score,timestep,timestep_tracker,total_direction_changes,game_over = run_game(display,myNet,snake_game,max_timestep=max_timestep)
    if score == 0 and timestep_tracker >=max_timestep:
        return -2,0,0,0,0
    if score >=15:
        print(score,timestep_tracker,timestep)
    return score,timestep

In [13]:
toolbox.register("evaluate", evaluate)
toolbox.register("select",tools.selNSGA2)

toolbox.register("mutate", tools.mutGaussian, mu=0.0, sigma=0.5, indpb=0.5)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("mate", tools.cxUniform, indpb=0.35)

In [14]:
def pareto_eq(ind1, ind2):
    return np.allclose(ind1.fitness.values, ind2.fitness.values)

In [15]:
import wandb

NGEN = 300
popSize = MU = 1000
LAMBDA = 200
CXPB = 0.7
MUTPB = 0.3


wandb.config = {
    "NGEN": NGEN,
    "popSize": popSize,
    "LAMBDA": LAMBDA,
    "CXPB":CXPB,
    "MUTPB": MUTPB
}

In [16]:
pop = toolbox.population(n=popSize)
hof = tools.ParetoFront(similar=pareto_eq)

stats = tools.Statistics(lambda ind: ind.fitness.values)
stats.register("avg", np.mean, axis=0)
stats.register("std", np.std, axis=0)
stats.register("min", np.min, axis=0)
stats.register("max", np.max, axis=0)
logbook = tools.Logbook() #

In [17]:
def run_game(display,brain,snake_game, headless=True,max_timestep=100):

    score = 0
    timestep = 0
    timestep_tracker = 0
    total_direction_changes = 0
    snake_game.reset()
    if not headless:
        display.reset(snake_game.snake)
        display.win.update()
    snake_game.place_food()
    game_over = False
    flag = True
    while not game_over:

        # ****YOUR AI BELOW HERE******************

        # Here is a very silly random snake controller. It moves with a correlated random walk, and the only sensible decision it makes is not to turn directly back on itself (possible in this game)
        # *** Replace this with your evolved controller here to decide on the direction the snake should take*
        directions = ["up","up_right","right","down_right","down","down_left","left","up_left"]

        def move_ai(brain):
            wall_distance = [snake_game.sense_blocked_wall(direction) for direction in directions]
            blocked = [snake_game.sense_blocked(direction) for direction in directions]
            self_distance = [snake_game.sense_self(direction) for direction in directions]
            food_distance = [snake_game.sense_food(direction) for direction in directions]
            inputs = wall_distance+self_distance+food_distance+ [snake_game.angle_to_food()]
            #print(f"WD {tuple(zip(directions,wall_distance))} ")
            #print(f"SD {tuple(zip(directions,self_distance))}")
            #print(f"FD {tuple(zip(directions,food_distance))}")
            output = brain.feed_forward(inputs)
            #inputs = blocked_sensor + food_sensor+[snake_game.sense_food_distance()]
            #print(snake_game.sense_food_distance())
            decision = np.argmax(output,axis=0)
            print(wall_distance)

            if decision  == 0:
                new_snake_direction = "up"
            elif decision == 1:
                new_snake_direction = "right"
            elif decision == 2:
                new_snake_direction = "down"
            elif decision ==3:
                new_snake_direction = "left"
            return new_snake_direction

        next_direction = move_ai(brain)
        print(next_direction)

        if snake_game.snake_direction != next_direction:
            total_direction_changes +=1
        snake_game.snake_direction = next_direction
        timestep+=1
        timestep_tracker+=1
        if timestep_tracker >= max_timestep:
            game_over = True
        # Here is an example sensing function



        # ****YOUR AI ABOVE HERE******************

        snake_game.update_snake_position()

        # Check if food is eaten
        if snake_game.food_eaten():
            snake_game.place_food()
            print("Food Eaten!")
            score += 1
            timestep_tracker = 0

            if not headless: display.add_snake_segment()

        # Game over if the snake runs over itself
        if snake_game.snake_turns_into_self():
            game_over = True
            print(f"Timesteps: {timestep}, Score: {score} Snake hit self")

        # Game over if the snake goes through a wall
        if snake_game.snake_hit_wall():
            game_over = True
            print(f"Timesteps: {timestep} , Score: {score} Snake hit a wall!")

        if not headless:
            display.update_food(snake_game.food)
            display.update_segment_positions(snake_game.snake)
            #display.draw_stats(directions)
            display.win.update()
            time.sleep(0.5) # Change this to modify the speed the game runs at when displayed.

    #print("FINAL score - " + str(score))
    #print("\nFINAL timestep - " + str(timestep))

    return (score,timestep,timestep_tracker,total_direction_changes,game_over)


In [18]:
fitnesses = [toolbox.evaluate(ind,myNet) for ind in pop]
for ind, fit in zip(pop, fitnesses):
    ind.fitness.values = fit

[8, 5, 5, 5, 5, 7, 10, 8]
down
[9, 5, 5, 5, 5, 6, 10, 9]
down
[10, 5, 5, 5, 5, 5, 10, 10]
down
[11, 5, 5, 4, 5, 4, 10, 10]
down
[12, 5, 5, 3, 5, 3, 10, 10]
down
[13, 5, 5, 2, 5, 2, 10, 10]
down
[14, 5, 5, 1, 5, 1, 10, 10]
down
Timesteps: 7 , Score: 0 Snake hit a wall!
[8, 5, 5, 5, 5, 7, 10, 8]
right
[8, 4, 4, 4, 4, 7, 11, 8]
right
[8, 3, 3, 3, 3, 7, 12, 8]
right
[8, 2, 2, 2, 2, 7, 13, 8]
right
[8, 1, 1, 1, 1, 7, 14, 8]
right
Timesteps: 5 , Score: 0 Snake hit a wall!
[8, 5, 5, 5, 5, 7, 10, 8]
down
[9, 5, 5, 5, 5, 6, 10, 9]
down
[10, 5, 5, 5, 5, 5, 10, 10]
down
[11, 5, 5, 4, 5, 4, 10, 10]
left
[11, 6, 6, 4, 6, 4, 9, 9]
left
[11, 7, 7, 4, 7, 4, 8, 8]
left
[11, 8, 8, 4, 8, 4, 7, 7]
left
[11, 9, 9, 4, 9, 4, 6, 6]
left
[11, 10, 10, 4, 10, 4, 5, 5]
left
[11, 11, 11, 4, 11, 4, 4, 4]
left
[11, 11, 12, 4, 12, 3, 3, 3]
left
[11, 11, 13, 4, 13, 2, 2, 2]
left
[11, 11, 14, 4, 14, 1, 1, 1]
left
Timesteps: 13 , Score: 0 Snake hit a wall!
[8, 5, 5, 5, 5, 7, 10, 8]
down
[9, 5, 5, 5, 5, 6, 10, 9]
right
[

In [None]:
wandb.init(project="EVAC", entity="samk")
for g in range(NGEN):
    print("-- Generation %i --" % g)

    # Generate offspring using VarOr
    offspring = algorithms.varOr(pop, toolbox, LAMBDA, CXPB, MUTPB)
    # Note that the MUTPB is a bit different here. It is the proportion of individuals that will be mutated, rather than crossed over.
    # So with varOr MUTPB + CXPB <= 1.0

    # Evaluate offspring
    invalid_inds = [ind for ind in offspring if not ind.fitness.valid]
    fitnesses = [toolbox.evaluate(ind,myNet) for ind in invalid_inds]
    for ind, fit in zip(invalid_inds, fitnesses):
        ind.fitness.values = fit

    nextGenOffspring = toolbox.select(pop + offspring, MU)
    nextGenOffspring = list(map(toolbox.clone, nextGenOffspring))
    pop[:] = nextGenOffspring

    hof.update(pop) #This version keeps the HoF a constant size. See also hof.insert()

    record = stats.compile(pop)

    logbook.record(gen=g, **record)

    wandb.log({"average_score": logbook[-1]["avg"][0],
               "current_gen":g,
               "min_score": logbook[-1]["min"][0],
               "max_score": logbook[-1]["max"][0],
               "std_score": logbook[-1]["std"][0],
               "average_timestamp": logbook[-1]["avg"][1],
               "min_timestamp": logbook[-1]["min"][1],
               "max_timestamp": logbook[-1]["max"][1],
               "std_timestamp": logbook[-1]["std"][1]
               })





wandb: Currently logged in as: samk (use `wandb login --relogin` to force relogin)
wandb: wandb version 0.12.11 is available!  To upgrade, please run:
wandb:  $ pip install wandb --upgrade


-- Generation 0 --
[8, 5, 5, 5, 5, 7, 10, 8]
left
Timesteps: 1, Score: 0 Snake hit self
[8, 5, 5, 5, 5, 7, 10, 8]
up
[7, 5, 5, 5, 5, 8, 10, 7]
up
[6, 5, 5, 5, 5, 9, 10, 6]
up
[5, 5, 5, 5, 5, 10, 10, 5]
up
[4, 4, 5, 5, 5, 10, 10, 4]
up
[3, 3, 5, 5, 5, 10, 10, 3]
up
[2, 2, 5, 5, 5, 10, 10, 2]
up
[1, 1, 5, 5, 5, 10, 10, 1]
up
Timesteps: 8 , Score: 0 Snake hit a wall!
[8, 5, 5, 5, 5, 7, 10, 8]
up
[7, 5, 5, 5, 5, 8, 10, 7]
up
[6, 5, 5, 5, 5, 9, 10, 6]
up
[5, 5, 5, 5, 5, 10, 10, 5]
up
[4, 4, 5, 5, 5, 10, 10, 4]
up
[3, 3, 5, 5, 5, 10, 10, 3]
up
[2, 2, 5, 5, 5, 10, 10, 2]
up
[1, 1, 5, 5, 5, 10, 10, 1]
right
[1, 1, 4, 4, 4, 11, 11, 1]
right
[1, 1, 3, 3, 3, 12, 12, 1]
right
[1, 1, 2, 2, 2, 13, 13, 1]
right
[1, 1, 1, 1, 1, 14, 14, 1]
right
Timesteps: 12 , Score: 0 Snake hit a wall!
[8, 5, 5, 5, 5, 7, 10, 8]
left
Timesteps: 1, Score: 0 Snake hit self
[8, 5, 5, 5, 5, 7, 10, 8]
left
Timesteps: 1, Score: 0 Snake hit self
[8, 5, 5, 5, 5, 7, 10, 8]
up
[7, 5, 5, 5, 5, 8, 10, 7]
up
[6, 5, 5, 5, 5, 9, 10,

In [None]:
logbook[0]

In [None]:
record

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

gen = logbook.select("gen")
_min = logbook.select("min")
_max = logbook.select("max")
avgs = logbook.select("avg")
stds = logbook.select("std")

In [None]:
avgs_score = [item[0] for item in avgs]
avgs_timestep = [item[1] for item in avgs]

In [None]:
plt.rc('axes', labelsize=14)
plt.rc('xtick', labelsize=14)
plt.rc('ytick', labelsize=14)
plt.rc('legend', fontsize=14)

fig, ax1 = plt.subplots()
line1 = ax1.plot(gen, avgs_score)
ax1.set_xlabel("Generation")
ax1.set_ylabel("Fitness (score)")

In [None]:
fig2, ax2 = plt.subplots()
line2 = ax2.plot(gen, avgs_timestep)
ax2.set_xlabel("Generation")
ax2.set_ylabel("Fitness (timestamp)")

In [None]:
if not HEADLESS:
    display = DisplayGame(XSIZE,YSIZE)

Below is what you would use for your evaluation. You would also use it to display your final (pickled and loaded) solution, with headless = False.

If you have run without headless (i.e. with the display) you will need to reset your kernel to run again. This appears to be an IPython issue with Turtle, but if you have a way to deal with that, do please let me know. You will probable only want to run it one-at-a-time anyway.

In [None]:
bestInd = tools.selBest(pop, 1)[0]

In [None]:
myNet.setWeightsLinear(bestInd)

In [None]:
final_run = run_game(display,myNet,snake_game, headless=HEADLESS)

In [None]:
final_run

In [None]:
wandb.log({"final_score": final_run[0]})

In [None]:
if not HEADLESS:
    turtle.done()


In [None]:
if not HEADLESS:
    turtle.bye()
