In [1]:

import pygame 
import random
from copy import deepcopy
import numpy as np 
import os 
import sys 
from copy import deepcopy
import matplotlib.pyplot as plt
from collections import deque
import tensorflow as tf
import torch
import torch.nn as nn
# Select the compute device: the GPU when CUDA is usable, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

pygame 2.2.0 (SDL 2.0.22, Python 3.8.16)
Hello from the pygame community. https://www.pygame.org/contribute.html
cuda


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
class DQL:
    """Deep Q-learning agent: epsilon-greedy action selection plus a replay buffer.

    The agent wraps a network mapping a batch of states to one Q-value per
    action, and trains it on transitions sampled from ``self.memory``.
    """

    def __init__(self, model, criterion, optimizer, actions, discount_factor=0.6, exploration_rate=0.9, memory_size=100000, batch_size=500,base_decay_rate = 0.995, decay_rate=0.98565207, base_exploration_rate = 0.1,validation_batch_size = 100 ,epochs = 1,device = 'cpu'):
        """Store the network, its training objects and the hyper-parameters.

        Args:
            model: network returning a ``(batch, len(actions))`` tensor of Q-values.
            criterion: loss callable ``criterion(prediction, target)``; when None,
                mean squared error is used.
            optimizer: optimizer already bound to ``model``'s parameters.
            actions: sized iterable of ``(dy, dx)`` moves; positions in it are
                the action indices.
            discount_factor: gamma applied to the bootstrapped next-state value.
            exploration_rate: decaying part of the random-move probability.
            memory_size: maximum length of each replay buffer.
            batch_size: stored on the instance; ``train`` takes its own batch size.
            base_decay_rate: per-``train`` multiplier of ``base_exploration_rate``.
            decay_rate: per-``train`` multiplier of ``exploration_rate``.
            base_exploration_rate: second, slower-decaying exploration term.
            validation_batch_size: stored on the instance; not used by this class.
            epochs: gradient steps taken on each sampled batch.
            device: torch device the tensors are created on.
        """
        # Network and training objects
        self.model = model
        self.actions = actions
        self.epochs = epochs
        self.device = device
        self.criterion = criterion
        self.optimizer = optimizer
        # gamma
        self.discount_factor = discount_factor
        # epsilon-greedy
        self.exploration_rate = exploration_rate
        # Replay buffers: add_memory sends ~90% of transitions to `memory`
        # and the rest to `evalmemory`.
        self.memory = deque(maxlen=memory_size)
        self.evalmemory  = deque(maxlen = memory_size)
        self.batch_size = batch_size
        # Exploration decay
        self.base_decay_rate = base_decay_rate
        self.decay_rate = decay_rate
        self.validation_batch_size = validation_batch_size
        self.base_exploration_rate = base_exploration_rate

    def get_action(self, state, direction, snake_list, block_size, width, height):
        """Choose the next move with an epsilon-greedy policy over the safe moves.

        Moves whose target cell is off the board or on the snake's body are
        discarded first. With probability
        ``base_exploration_rate + exploration_rate`` a random safe move is
        taken; otherwise the safe move with the highest predicted Q-value.

        Args:
            state: network input for the current position (array-like).
            direction: action index returned unchanged when no move is safe.
            snake_list: list of ``[x, y]`` segments, head last.
            block_size: side length of one cell in pixels.
            width: board width in pixels.
            height: board height in pixels.

        Returns:
            The chosen action index as a plain ``int`` (or ``direction`` itself
            when every move is blocked).
        """
        moves = list(self.actions)
        head_x, head_y = snake_list[-1][0], snake_list[-1][1]
        # Index 0 (the oldest segment) is deliberately not treated as an
        # obstacle: it is normally vacated by the move.
        body = snake_list[1:]

        possible_moves = []
        for index, move in enumerate(moves):
            u = head_x + move[1] * block_size
            v = head_y + move[0] * block_size
            if 0 <= u < width and 0 <= v < height and [u, v] not in body:
                possible_moves.append(index)

        if not possible_moves:
            return direction

        if np.random.rand() < self.base_exploration_rate + self.exploration_rate:
            # Explore: uniformly random safe move.
            return possible_moves[np.random.randint(len(possible_moves))]

        # Exploit: best safe move according to the network.
        with torch.no_grad():
            state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(self.device)
            q_values = self.model(state_tensor).squeeze(0)

        # .tolist() yields Python ints, so no tensor (possibly living on the
        # GPU) is returned to the caller or stored in the replay buffer.
        for candidate in q_values.argsort(descending=True).tolist():
            if candidate in possible_moves:
                return candidate
        return direction


    def add_memory(self, state, action, reward, next_state, done,episode_reward):
        """Store one transition and return the updated running episode reward.

        The transition goes to ``self.memory`` with probability 0.9 and to
        ``self.evalmemory`` otherwise.

        Returns:
            ``episode_reward * discount_factor + reward``.
        """
        transition = (state, action, reward, next_state, done)
        if np.random.rand() < 0.9:
            self.memory.append(transition)
        else:
            self.evalmemory.append(transition)
        return episode_reward * self.discount_factor + reward

    def train(self, batch_size):
        """Run ``self.epochs`` gradient steps on a random batch from the replay buffer.

        Does nothing when ``batch_size`` is not positive or the buffer holds
        fewer than ``batch_size`` transitions. Otherwise both exploration
        rates are decayed once and the network is fitted to the one-step
        targets ``r`` (terminal) or ``r + gamma * max_a' Q(s', a')``.

        The loss is computed only on the Q-value of the action actually taken
        in each transition. Comparing the full Q-vector against a target that
        is zero for every other action would drag the Q-values of untaken
        actions toward 0.

        Args:
            batch_size: number of transitions to sample.
        """
        if batch_size <= 0 or len(self.memory) < batch_size:
            # Not enough memories to train the model
            return

        batch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        # Stack into one ndarray first: building a tensor from a list of
        # arrays element by element is much slower.
        states = torch.tensor(np.array(states), dtype=torch.float32).to(self.device)
        next_states = torch.tensor(np.array(next_states), dtype=torch.float32).to(self.device)
        actions = torch.tensor([int(a) for a in actions], dtype=torch.long).to(self.device)
        rewards = torch.tensor([float(r) for r in rewards], dtype=torch.float32).to(self.device)
        not_done = torch.tensor([0.0 if d else 1.0 for d in dones], dtype=torch.float32).to(self.device)

        # Decrease the exploration rate
        self.exploration_rate *= self.decay_rate
        self.base_exploration_rate *= self.base_decay_rate

        # Targets are computed once, without gradients, from the current network.
        with torch.no_grad():
            best_next_q = self.model(next_states).max(dim=1)[0]
            targets = rewards + self.discount_factor * best_next_q * not_done

        loss_fn = self.criterion if self.criterion is not None else nn.functional.mse_loss
        for _ in range(self.epochs):
            # Q-value predicted for the action taken in each transition.
            q_taken = self.model(states).gather(1, actions.unsqueeze(1)).squeeze(1)
            loss = loss_fn(q_taken, targets)
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()


In [3]:

# Board geometry in pixels; the playing grid has
# (width // block_size) x (height // block_size) cells.
block_size = 25  # side length of one snake / food cell
width = 500      # display width
height = 500     # display height

# Names of the reward terms, in the same order as the weights in `c`.
penalty_names = [
    'accessible_points_proportion',
    'penalty_distance',
    'penalty_touch_self',
    'penalty_distance*gass_reward',
    'reward_eat',
    'penalty_wall',
    'penalty_danger',
    'compacity',
    'episode_len_penalty',
]
# Default weight of each reward term (one entry per name above).
c = np.array([0.4, 0.5, 0.3, 0.1, 0.6, 0.3, 0.2, 0.1, 0.4])

In [4]:
def generate_food(snake_list):
    """Pick a food position uniformly at random among the cells the snake does not occupy.

    Every free grid cell is equally likely. Rounding a random pixel
    coordinate to the nearest cell instead would give the first and last
    row/column only half the probability of the others.

    Args:
        snake_list: list of ``[x, y]`` snake segments in pixels (may be empty).

    Returns:
        ``(food_x, food_y)`` in pixels, aligned to the ``block_size`` grid.

    Raises:
        ValueError: if the snake covers every cell, so no food can be placed
            (retrying in a loop would never terminate in that case).
    """
    occupied = {(segment[0], segment[1]) for segment in snake_list}
    free_cells = [
        (x, y)
        for x in range(0, width, block_size)
        for y in range(0, height, block_size)
        if (x, y) not in occupied
    ]
    if not free_cells:
        raise ValueError("no free cell left to place food")
    return random.choice(free_cells)


In [5]:
# Moves available to the agent, each stored as (dy, dx) in grid cells.
# list(actions.values()) gives the action indices: 0=up, 1=down, 2=left, 3=right.
actions = {
    "up": (-1, 0),
    "down": (1, 0),
    "left": (0, -1),
    "right": (0, 1),
}

# Two numbers (x, y) for the apple plus two per board cell, since the snake
# body has at most one part per cell.
input_size = 2 * width * height // block_size ** 2 + 2





In [6]:

def initNNmodel():
    """Build the Q-network together with its loss and optimizer.

    The network takes a 3-channel grid of
    (height // block_size) x (width // block_size) cells and outputs one value
    per entry of ``actions``. It is moved to the module-level ``device``.

    Returns:
        Tuple ``(model, criterion, optimizer)``: the ``nn.Sequential`` network,
        an ``nn.MSELoss`` instance and an Adam optimizer (default settings)
        over the network's parameters.
    """
    grid_rows = height // block_size
    grid_cols = width // block_size

    layers = [
        # 3 input channels -> 80 feature maps; padding keeps the grid size.
        nn.Conv2d(3, 80, kernel_size=3, padding=1),
        nn.ReLU(),
        # Flatten the feature maps into one vector per sample.
        nn.Flatten(),
        # Three fully connected hidden layers with ReLU activations.
        nn.Linear(80 * grid_rows * grid_cols, 2048),
        nn.ReLU(),
        nn.Linear(2048, 1024),
        nn.ReLU(),
        nn.Linear(1024, 512),
        nn.ReLU(),
        # Output layer: one unit per action.
        nn.Linear(512, len(actions)),
    ]
    model = nn.Sequential(*layers)
    model.to(device)

    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters())
    return model, criterion, optimizer

In [7]:
def state(snake_list,apple):
    """Encode the board as a 3-channel occupancy grid for the network.

    Channel 0 marks the head, channel 1 the rest of the body and channel 2
    the apple. Grids are indexed ``[row, col] = [y // block_size,
    x // block_size]``, matching the ``(height, width)`` array shape, with
    cell (0, 0) at index (0, 0). Points outside the board (for example the
    head right after it ran into a wall) are simply not drawn.

    Args:
        snake_list: list of ``[x, y]`` segments in pixels, head last.
        apple: ``[x, y]`` position of the food in pixels.

    Returns:
        ``numpy`` array of shape
        ``(3, height // block_size, width // block_size)`` holding 0s and 1s.
    """
    rows, cols = height // block_size, width // block_size
    grid = np.zeros((3, rows, cols))

    def mark(channel, point):
        # Explicit bounds check: negative indices would otherwise silently
        # wrap around to the opposite edge of the board.
        row, col = point[1] // block_size, point[0] // block_size
        if 0 <= row < rows and 0 <= col < cols:
            grid[channel, row, col] = 1

    mark(0, snake_list[-1])
    for segment in snake_list[:-1]:
        mark(1, segment)
    mark(2, apple)
    return grid

def normalized_distance(u,v,food_x,food_y):
    """Distance from (u, v) to the food, with each axis scaled by the board size.

    The x offset is divided by ``width`` and the y offset by ``height``; the
    result is the root of the mean of the two squared scaled offsets.
    """
    scaled_dx = (u - food_x) / width
    scaled_dy = (v - food_y) / height
    return np.sqrt((scaled_dx ** 2 + scaled_dy ** 2) / 2)

def inBounds(u,v):
    """Return True when pixel position (u, v) lies on the board, False otherwise."""
    if not (u >= 0 and v >= 0):
        return False
    if not (u < width and v < height):
        return False
    return True

def gaussian_aroundone(x,alpha):
    """Gaussian bump centred at 1: equals 1 at ``x == 1`` and decays at rate ``alpha``."""
    offset = x - 1
    return np.exp(-alpha * offset ** 2)
def danger_distance(direction, snake_list):
    """Penalty for snake body lying straight ahead of the head in ``direction``.

    Walks cell by cell from the head in the given direction until it leaves
    the board. If a body segment (any segment except index 0) is met after
    ``k`` free cells, returns ``-1 + k * block_size / max(width, height)``,
    i.e. -1 when the body is adjacent. Returns 0 when no segment is met.

    Args:
        direction: index into ``list(actions.values())``.
        snake_list: list of ``[x, y]`` segments in pixels, head last.
    """
    moves = list(actions.values())
    u, v = snake_list[-1][0], snake_list[-1][1]
    free_cells = 0
    while inBounds(u, v):
        d_row, d_col = moves[direction][0], moves[direction][1]
        u += block_size * d_col
        v += block_size * d_row
        if [u, v] in snake_list[1:]:
            return -1 + 1.0 * free_cells * block_size / max(width, height)
        free_cells += 1
    return 0

def reward(action, snake_list,episode_length,c):
    """Weighted reward for taking ``action`` from the current snake position.

    Nine terms are computed and combined as a weighted average with the
    weights ``c`` (same order as ``penalty_names``): accessible-area
    proportion, moving toward/away from the food, self collision, the
    distance term scaled by a Gaussian of the closeness to the food, eating,
    hitting a wall, body straight ahead, compactness, and steps since the
    last food. The food position is read from the module-level ``food_x``
    and ``food_y``.

    The body-shape terms (compactness, accessible area, danger) are evaluated
    on ``snake_list`` as passed in, i.e. before the move.

    Args:
        action: index into ``list(actions.values())``.
        snake_list: list of ``[x, y]`` segments in pixels, head last; not modified.
        episode_length: value used for the length penalty (the caller passes
            the number of steps since the last food).
        c: array of 9 weights.

    Returns:
        ``penalties @ c / c.sum()`` as a float.
    """
    head_x, head_y = snake_list[-1][0], snake_list[-1][1]
    move = list(actions.values())[action]
    # Cell the head would move into.
    (u, v) = (move[1] * block_size + head_x, move[0] * block_size + head_y)

    # -1 when the target cell is currently part of the snake.
    penalty_touch_self = -1 if [u, v] in snake_list else 0

    new_distance = normalized_distance(u, v, food_x, food_y)
    old_distance = normalized_distance(head_x, head_y, food_x, food_y)
    # Close to 1 near the food, close to 0 when far away.
    gass_reward = gaussian_aroundone(1 - new_distance, 20)
    reward_eat = 1 if u == food_x and v == food_y else 0
    # -1 for moving away from the food, +0.5 otherwise.
    penalty_distance = -1 if new_distance > old_distance else 0.5
    penalty_wall = -1 if not inBounds(u, v) else 0
    penalty_danger = danger_distance(action, snake_list)
    compacity_value = 1 / compacity(snake_list)
    accessible_points_proportion = find_accessible_points(snake_list) - 1
    episode_length_penalty = -episode_length / (width * height // block_size ** 2)

    penalties = np.array([
        accessible_points_proportion,
        penalty_distance,
        penalty_touch_self,
        penalty_distance * gass_reward,
        reward_eat,
        penalty_wall,
        penalty_danger,
        compacity_value,
        episode_length_penalty,
    ])
    return penalties @ c / c.sum()

def compacity(snake_list):
    """Ratio of the snake's bounding-box area to the area its segments cover.

    The bounding box spans all segments (each one ``block_size`` wide); the
    covered area is ``len(snake_list) * block_size ** 2``.
    """
    cells = np.array(snake_list)
    xs, ys = cells[:, 0], cells[:, 1]
    box_height = ys.max() - ys.min() + block_size
    box_width = xs.max() - xs.min() + block_size
    return box_height * box_width / (len(cells) * block_size ** 2)


def find_accessible_points(snake_list):
    """Fraction of the board made up of cells reachable from the head plus the body.

    Flood-fills from the head over in-bounds cells that are not part of the
    snake, moving along the four ``actions``. The count of reached cells
    (which includes the head's own cell) plus the remaining
    ``len(snake_list) - 1`` segments is divided by the number of board cells.
    """
    reached = np.zeros((height // block_size, width // block_size))
    pending = [snake_list[-1]]

    while pending:
        cell = pending.pop()
        reached[cell[1] // block_size, cell[0] // block_size] = 1
        for move in actions.values():
            u = move[1] * block_size + cell[0]
            v = move[0] * block_size + cell[1]
            if not inBounds(u, v):
                continue
            if reached[v // block_size, u // block_size] != 0:
                continue
            if [u, v] in snake_list:
                continue
            pending.append((u, v))

    return (np.sum(reached) + len(snake_list) - 1) * block_size ** 2 / (height * width)


In [8]:
# Network, loss and optimizer for the interactive agent.
model, criterion, optimizer = initNNmodel()

# Exploration schedule: decay_rate is chosen so that multiplying
# max_exploration_rate by it max_exploration_episodes times gives
# min_exploration_rate (DQL.train applies it once per call).
max_exploration_episodes = 150
max_exploration_rate = 0.9
min_exploration_rate = 0.05
decay_rate = (min_exploration_rate / max_exploration_rate) ** (1.0 / max_exploration_episodes)

# Deep Q-learning agent
dql = DQL(
    model,
    criterion,
    optimizer,
    actions.values(),
    decay_rate=decay_rate,
    exploration_rate=max_exploration_rate,
    device=device,
    epochs=10,
)

# Initial food position on an empty board (module-level, shared by main()).
food_x, food_y = generate_food([])

In [9]:
# Number of steps taken between consecutive foods, appended by main().
lens = []

def main(length,c,dql):
    """Play one episode of Snake with the agent, filling its replay buffer.

    The snake starts as a single cell in the middle of the board and grows up
    to ``length`` cells as it moves. The episode ends when the head leaves
    the board, hits the body, or when the number of steps since the last
    food reaches a multiple of the number of board cells. The food position
    is the module-level ``food_x``/``food_y``, replaced whenever it is eaten.

    Args:
        length: initial target length of the snake.
        c: reward weights passed through to ``reward``.
        dql: agent providing ``get_action`` and ``add_memory``.

    Returns:
        ``[snake_length, episode_length, score, episode_reward]``.
    """
    global food_x,food_y

    moves = list(actions.values())
    cells_on_board = width*height//block_size**2

    # Start in the middle of the board.
    snake_x = (width//block_size)//2 * block_size
    snake_y = (height//block_size)//2 * block_size
    snake_list = [[snake_x, snake_y]]
    snake_length = length

    direction = 0        # index into `moves`: up, down, left, right
    score = 0            # number of foods collected
    episode_reward = 0
    episode_length = 0
    last_meal_step = 0   # value of episode_length when food was last eaten

    while True:
        state_before = state(snake_list, [food_x, food_y])
        direction = dql.get_action(state_before, direction, snake_list, block_size, width, height)
        step_reward = reward(direction, snake_list, episode_length - last_meal_step, c)

        # Advance the head and drop the oldest segment once the snake is too long.
        snake_x += moves[direction][1] * block_size
        snake_y += moves[direction][0] * block_size
        snake_list.append([snake_x, snake_y])
        if len(snake_list) > snake_length:
            del snake_list[0]

        left_board = snake_x >= width or snake_x < 0 or snake_y >= height or snake_y < 0
        hit_body = any(block[0] == snake_x and block[1] == snake_y for block in snake_list[:-1])
        done = left_board or hit_body

        state_after = state(snake_list, [food_x, food_y])
        # Store the transition in the replay buffer.
        episode_reward = dql.add_memory(state_before, direction, step_reward, state_after, done, episode_reward)

        # Food eaten: record the steps it took, place new food, grow.
        if snake_x == food_x and snake_y == food_y:
            lens.append(episode_length - last_meal_step)
            food_x, food_y = generate_food(snake_list)
            snake_length += 1
            score += 1
            last_meal_step = episode_length
        episode_length += 1

        # Give up when as many steps as there are board cells passed without food.
        if (episode_length - last_meal_step) % cells_on_board == 0:
            done = True
        if done:
            return [snake_length, episode_length, score, episode_reward]
        


In [10]:
# Resume from the last recorded result, if any. Each non-blank line of
# tuning_c_avg.txt has the form "<max score> <average score> <c1,c2,...>".
max_m = 0
max_avg =0
if(os.path.exists('tuning_c_avg.txt')):
    with open('tuning_c_avg.txt', 'r') as f:
        # Ignore blank lines so an empty file or a trailing newline does not
        # break the parse.
        recorded_lines = [line for line in f if line.strip()]
    if recorded_lines:
        last_line = recorded_lines[-1]
        m,max_avg_str,cs = last_line.split()
        c=  np.array( [float(num) for num in cs.split(",")])
        max_m = float(m)
        max_avg = float(max_avg_str)
# Centre of the coefficient search: the loaded weights, or the default `c`.
best_c = deepcopy(c)
print(best_c)
print(max_avg)



[0.38697159 0.52548699 0.22038228 0.40087986 0.33995491 0.29913072
 0.58751657 0.34641235 0.48963719]
2.44


In [11]:
import threading

#number of episodes
num_episodes =100
#maximum score reached
m =0
#initial max length for the snake at birth
max_length = 1
#maximum allowed length for a snake
max_max_length = width*height//block_size**2
exploration_rate = 0.9
min_exploration_rate = 0.05
# Multiplying exploration_rate by decay_rate num_episodes times gives min_exploration_rate.
decay_rate = (min_exploration_rate/exploration_rate)**(1.0/num_episodes)
print(decay_rate)

# Serialises the "is this the best result so far?" check-and-record step,
# because run_new_model is executed by several threads at once.
_best_result_lock = threading.Lock()

def run_new_model(c,num_episodes,thread):
    """Train a fresh agent with reward weights ``c`` and record it if it is the best so far.

    Plays ``num_episodes`` episodes, training after each one on a batch whose
    size is the length of that episode. If the best score ``m`` beats
    ``max_m`` (ties broken by the average score), the weights become the new
    module-level ``best_c``, a line ``"<m> <avg> <c1,c2,...>"`` is appended
    to ``tuning_c_avg.txt`` and ``max_m``/``max_avg`` are updated.

    Args:
        c: array of reward weights passed to ``main``.
        num_episodes: number of episodes to play.
        thread: identifier printed in front of each progress line.
    """
    # best_c and food_x/food_y must be declared global: as plain assignments
    # they would only create locals, so the search centre would never move
    # and the food reset below would have no effect.
    global max_length, max_m, max_avg, best_c, food_x, food_y
    m=0
    avg = 0

    model, criterion, optimizer = initNNmodel()
    dql = DQL(model, criterion, optimizer,actions.values(),decay_rate = decay_rate,device = device)
    try:
        for i in range(num_episodes):
            print(thread," ",i,m,avg, max_m)
            #play one episode and see the outcome
            a= main(np.random.randint(1,max_length+1),c,dql)
            #update maximum score and the running average score
            m = max(a[2],m)
            avg = (1.0*i*avg+a[2])/(i+1)
            #generate a new food position every 2 episodes (on even i)
            # NOTE: food_x/food_y are shared by every worker thread.
            if(i%2 ==0):
                food_x, food_y = generate_food([])

            dql.train(a[1])
    finally:
        # Release this worker's network and optimizer state even when training
        # fails (e.g. CUDA out of memory), so the GPU memory can be reused.
        del model, criterion, optimizer, dql
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    with _best_result_lock:
        if(m>max_m or (m==max_m and avg>max_avg)):
            best_c = deepcopy(c)
            with open("tuning_c_avg.txt", "a") as f:
                f.write("\n")  # add a newline character before writing the list
                f.write(str(m)+" ")
                f.write(str(avg)+ " ")
                f.write(",".join([str(k) for k in best_c]))
            max_m = m
            max_avg = avg
    


0.971509999298448


In [12]:
# Release the GPU memory that PyTorch's caching allocator holds but no longer
# uses, before the multi-threaded search below creates several models at once.
torch.cuda.empty_cache()

In [13]:
# Random search over the reward weights: each round trains `number_of_threads`
# agents in parallel, each with weights drawn around `best_c`, while the
# sampling radius `borne` shrinks from round to round.
# NOTE(review): the recorded output of this cell shows "CUDA out of memory"
# errors in worker threads with 6 concurrent models on an 8 GiB GPU — consider
# lowering number_of_threads.
number_of_threads = 6

from threading import Thread 
run = 0 
# Sampling radius; it is recomputed at the top of every round, so this initial
# value only serves to enter the loop.
borne = 0.5

while(borne>0.05):
    # Radius decays exponentially with the round number.
    borne = np.exp(-run/6)
    print(borne)
    threads = []
    for i in range(number_of_threads):
        # Draw every weight uniformly from [x - borne, x + borne] clipped to
        # [0, 1], then train an agent with these weights in its own thread.
        c= np.array([np.random.uniform(np.clip(x-borne,0,1),np.clip(x+borne,0,1)) for x in best_c])
        t = Thread(target=run_new_model, args=(c, num_episodes,i))
        t.start()
        
        # Add the thread to the list
        threads.append(t)
    # Wait for all the threads to finish
    for t in threads:
        t.join()
    run+=1

1.0
50   0 0 0 8.0
   0 0 0 8.0
4   0 0 0 8.0
2   0 0 0 8.0
1   0 0 0 8.0
3   0 0 0 8.0
1   1 0 0.0 8.0
3   1 0 0.0 8.0
0   1 0 0.0 8.0
2   1 0 0.0 8.0
5   1 0 0.0 8.0
4   1 0 0.0 8.0
0   2 0 0.0 8.0
3   2 0 0.0 8.0
5   2 0 0.0 8.0
1   2 0 0.0 8.0
4   2 1 0.5 8.0
2   2 1 0.5 8.0
1   3 0 0.0 8.0
5   3 0 0.0 8.0
3   3 1 0.3333333333333333 8.0
0   3 2 0.6666666666666666 8.0
4   3 1 0.3333333333333333 8.0
2   3 1 0.3333333333333333 8.0
1   4 0 0.0 8.0
5   4 0 0.0 8.0
0   4 2 0.5 8.0
3   4 1 0.25 8.0
4   4 1 0.25 8.0
2   4 1 0.25 8.0


Exception in thread Thread-8:
Traceback (most recent call last):
  File "c:\Users\Alhus\anaconda3\envs\pytorch\lib\threading.py", line 932, in _bootstrap_inner
    self.run()
  File "c:\Users\Alhus\anaconda3\envs\pytorch\lib\threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\Alhus\AppData\Local\Temp\ipykernel_6408\3210863791.py", line 31, in run_new_model
  File "C:\Users\Alhus\AppData\Local\Temp\ipykernel_6408\2193369100.py", line 109, in train
  File "c:\Users\Alhus\anaconda3\envs\pytorch\lib\site-packages\torch\tensor.py", line 221, in backward
    torch.autograd.backward(self, gradient, retain_graph, create_graph)
  File "c:\Users\Alhus\anaconda3\envs\pytorch\lib\site-packages\torch\autograd\__init__.py", line 130, in backward
    Variable._execution_engine.run_backward(
RuntimeError: CUDA out of memory. Tried to allocate 250.00 MiB (GPU 0; 8.00 GiB total capacity; 6.86 GiB already allocated; 0 bytes free; 7.13 GiB reserved in total by Py

0   5 2 0.4 8.0
3   5 1 0.2 8.0
1   5 1 0.2 8.0
2   5 1 0.2 8.0
4   5 2 0.6 8.0
0   6 2 0.3333333333333333 8.0
3   6 1 0.3333333333333333 8.0
2   6 1 0.16666666666666666 8.0
4   6 2 0.5 8.0
1   6 1 0.3333333333333333 8.0
3   7 1 0.2857142857142857 8.0
0   7 2 0.42857142857142855 8.0
4   7 2 0.42857142857142855 8.0
2   7 1 0.14285714285714285 8.0
1   7 1 0.2857142857142857 8.0
3   8 2 0.5 8.0
4   8 2 0.375 8.0
2   8 1 0.125 8.0
0   8 2 0.5 8.0
1   8 1 0.25 8.0
3   9 2 0.4444444444444444 8.0
4   9 2 0.3333333333333333 8.0
2   9 1 0.1111111111111111 8.0
0   9 2 0.4444444444444444 8.0
1   9 1 0.2222222222222222 8.0
3   10 2 0.4 8.0
3   11 2 0.5454545454545454 8.0
4   10 2 0.3 8.0
2   10 1 0.1 8.0
0   10 2 0.4 8.0
1   10 1 0.2 8.0
4   11 2 0.2727272727272727 8.0
3   12 2 0.5 8.0
2   11 1 0.09090909090909091 8.0
0   11 2 0.45454545454545453 8.0
1   11 1 0.18181818181818182 8.0
3   13 2 0.46153846153846156 8.0
4   12 2 0.3333333333333333 8.0
1   12 1 0.16666666666666666 8.0
0   12 2 0.5 8.0
2

Exception in thread Thread-9:
Traceback (most recent call last):
  File "c:\Users\Alhus\anaconda3\envs\pytorch\lib\threading.py", line 932, in _bootstrap_inner
    self.run()
  File "c:\Users\Alhus\anaconda3\envs\pytorch\lib\threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\Alhus\AppData\Local\Temp\ipykernel_6408\3210863791.py", line 31, in run_new_model
  File "C:\Users\Alhus\AppData\Local\Temp\ipykernel_6408\2193369100.py", line 110, in train
  File "c:\Users\Alhus\anaconda3\envs\pytorch\lib\site-packages\torch\autograd\grad_mode.py", line 26, in decorate_context
    return func(*args, **kwargs)
  File "c:\Users\Alhus\anaconda3\envs\pytorch\lib\site-packages\torch\optim\adam.py", line 108, in step
    F.adam(params_with_grad,
  File "c:\Users\Alhus\anaconda3\envs\pytorch\lib\site-packages\torch\optim\functional.py", line 94, in adam
    denom = (exp_avg_sq.sqrt() / math.sqrt(bias_correction2)).add_(eps)
RuntimeError: CUDA out of memory. Tr

3   14 2 0.42857142857142855 8.0
4   15 2 0.4666666666666667 8.0
1   14 2 0.42857142857142855 8.0
2   15 1 0.2 8.0
5   16 2 0.25 8.0
3   15 2 0.4 8.0
4   16 2 0.4375 8.0
1   15 2 0.4 8.0
2   16 1 0.1875 8.0
5   17 2 0.23529411764705882 8.0
3   16 2 0.375 8.0
1   16 2 0.375 8.0
4   17 2 0.4117647058823529 8.0
2   17 1 0.17647058823529413 8.0
5   18 2 0.2222222222222222 8.0
3   17 2 0.35294117647058826 8.0
4   18 2 0.3888888888888889 8.0
1   17 2 0.35294117647058826 8.0
2   18 1 0.16666666666666666 8.0
5   19 2 0.21052631578947367 8.0
3   18 2 0.3333333333333333 8.0
1   18 2 0.3333333333333333 8.0
4   19 2 0.42105263157894735 8.0
2   19 1 0.15789473684210525 8.0
5   20 2 0.2 8.0
3   19 2 0.3157894736842105 8.0
1   19 2 0.3157894736842105 8.0
4   20 2 0.4 8.0
2   20 1 0.15 8.0
5   21 2 0.19047619047619047 8.0
3   20 2 0.35 8.0
1   20 2 0.3 8.0
4   21 2 0.38095238095238093 8.0
2   21 1 0.14285714285714285 8.0
5   22 2 0.18181818181818182 8.0
3   21 2 0.3333333333333333 8.0
1   21 2 0.28571

In [None]:
# WARNING: running this cell issues the Windows command `shutdown /s /t 0`,
# which shuts the machine down. Presumably meant to power off after an
# unattended training run — do not execute it as part of "Run All" unless
# that is intended.
import os

os.system('shutdown /s /t 0')
