In [1]:
import torch
import random
import numpy as np
from collections import deque
from game import SnakeGameAI, Direction, Point
from model import Linear_QNet, QTrainer
from helper import plot

# Hyper-parameters consumed by the Agent class below.
MAX_MEMORY = 100_000  # maxlen of the Agent's replay deque; oldest transitions are dropped beyond this
BATCH_SIZE = 1000  # maximum number of transitions sampled by Agent.train_long_memory
LR = 0.001  # learning rate handed to QTrainer

class Agent:
    """Epsilon-greedy Q-learning agent for the snake game.

    The agent owns a bounded replay memory, a ``Linear_QNet`` model and the
    ``QTrainer`` that optimises it.  ``n_games`` is expected to be incremented
    by the training loop; it drives the exploration schedule in
    :meth:`get_action`.
    """

    def __init__(self):
        self.n_games = 0
        self.epsilon = 0 # randomness
        self.gamma = 0.9 # discount rate
        self.memory = deque(maxlen=MAX_MEMORY) # oldest entries are evicted automatically
        self.model = Linear_QNet(11, 256, 3)
        self.trainer = QTrainer(self.model, lr=LR, gamma=self.gamma)

    def get_state(self, game):
        """Encode the current game situation as an 11-element 0/1 vector.

        Layout: [danger straight, danger right, danger left,
                 moving left, moving right, moving up, moving down,
                 food left, food right, food up, food down].

        Args:
            game: object exposing ``snake``, ``head``, ``food``, ``direction``
                and ``is_collision(point)``.

        Returns:
            ``numpy.ndarray`` of dtype int with 11 entries.
        """
        head = game.snake[0]
        # The four cells adjacent to the head (the grid step is 20 units).
        point_l = Point(head.x - 20, head.y)
        point_r = Point(head.x + 20, head.y)
        point_u = Point(head.x, head.y - 20)
        point_d = Point(head.x, head.y + 20)

        dir_l = game.direction == Direction.LEFT
        dir_r = game.direction == Direction.RIGHT
        dir_u = game.direction == Direction.UP
        dir_d = game.direction == Direction.DOWN

        state = [
            # Danger straight
            (dir_r and game.is_collision(point_r)) or
            (dir_l and game.is_collision(point_l)) or
            (dir_u and game.is_collision(point_u)) or
            (dir_d and game.is_collision(point_d)),

            # Danger right (relative to the current heading)
            (dir_u and game.is_collision(point_r)) or
            (dir_d and game.is_collision(point_l)) or
            (dir_l and game.is_collision(point_u)) or
            (dir_r and game.is_collision(point_d)),

            # Danger left (relative to the current heading)
            (dir_d and game.is_collision(point_r)) or
            (dir_u and game.is_collision(point_l)) or
            (dir_r and game.is_collision(point_u)) or
            (dir_l and game.is_collision(point_d)),

            # Move direction
            dir_l,
            dir_r,
            dir_u,
            dir_d,

            # Food location
            game.food.x < game.head.x,  # food left
            game.food.x > game.head.x,  # food right
            game.food.y < game.head.y,  # food up
            game.food.y > game.head.y  # food down
            ]

        return np.array(state, dtype=int)

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def train_long_memory(self):
        """Train on up to ``BATCH_SIZE`` remembered transitions.

        Does nothing when the memory is empty (unpacking ``zip(*[])`` into
        five names would otherwise raise ``ValueError``).
        """
        if not self.memory:
            return

        if len(self.memory) > BATCH_SIZE:
            mini_sample = random.sample(self.memory, BATCH_SIZE) # list of tuples
        else:
            mini_sample = self.memory

        states, actions, rewards, next_states, dones = zip(*mini_sample)
        self.trainer.train_step(states, actions, rewards, next_states, dones)

    def train_short_memory(self, state, action, reward, next_state, done):
        """Train on the single transition that was just observed."""
        self.trainer.train_step(state, action, reward, next_state, done)

    def get_action(self, state):
        """Choose a one-hot move of length 3 with an epsilon-greedy policy.

        ``epsilon`` is ``80 - n_games``; a random move is taken when a draw
        from ``randint(0, 200)`` falls below it, so exploration stops once
        80 games have been played.
        """
        # random moves: tradeoff exploration / exploitation
        self.epsilon = 80 - self.n_games
        final_move = [0, 0, 0]
        if random.randint(0, 200) < self.epsilon:
            move = random.randint(0, 2)
        else:
            state0 = torch.tensor(state, dtype=torch.float)
            # Inference only: no autograd graph is needed to pick the arg-max.
            with torch.no_grad():
                prediction = self.model(state0)
            move = torch.argmax(prediction).item()
        final_move[move] = 1

        return final_move


def train():
    """Play snake forever with a fresh ``Agent``, learning after every step.

    Each step is trained on immediately and stored in the agent's memory.
    When a game ends the game is reset, a replay batch is trained on, the
    model is saved if the score is a new record, and the score curves are
    re-plotted.  The loop never terminates on its own.
    """
    scores, running_means = [], []
    score_sum = 0
    best = 0
    agent = Agent()
    game = SnakeGameAI()

    while True:
        # Observe, act, observe again.
        prev_state = agent.get_state(game)
        chosen_move = agent.get_action(prev_state)
        reward, done, score = game.play_step(chosen_move)
        next_state = agent.get_state(game)

        # Learn from this single transition, then store it for replay.
        agent.train_short_memory(prev_state, chosen_move, reward, next_state, done)
        agent.remember(prev_state, chosen_move, reward, next_state, done)

        if not done:
            continue

        # Episode finished: reset, replay from memory, report.
        game.reset()
        agent.n_games += 1
        agent.train_long_memory()

        if score > best:
            best = score
            agent.model.save()

        print('Game', agent.n_games, 'Score', score, 'Record:', best)

        scores.append(score)
        score_sum += score
        running_means.append(score_sum / agent.n_games)
        plot(scores, running_means)


pygame 2.0.1 (SDL 2.0.14, Python 3.8.5)
Hello from the pygame community. https://www.pygame.org/contribute.html


# DQN

In [22]:
import torch
import random
import numpy as np
from collections import deque
from game import SnakeGameAI, Direction, Point
from helper import plot
import pygame
import os

import math, random
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import time

import numpy as np
import matplotlib.pyplot as plt
from IPython.display import clear_output



Point(x=300.0, y=120.0) Point(x=340.0, y=120.0) Point(x=320.0, y=100.0) Point(x=320.0, y=140.0)
False False True False
False False False False False True False False True False True


In [None]:
# Run on GPU number GPU_indx when CUDA is available, otherwise on the CPU.
GPU_indx = 0
if torch.cuda.is_available():
    device = torch.device(GPU_indx)
else:
    device = torch.device('cpu')

In [None]:
def state(game):
    """Build the 11-entry integer feature vector describing ``game``.

    Entries, in order: danger straight / right / left (relative to the
    current heading), heading is left / right / up / down, food is
    left / right / above / below the head.
    """
    head = game.snake[0]
    step = 20
    west, east = Point(head.x - step, head.y), Point(head.x + step, head.y)
    north, south = Point(head.x, head.y - step), Point(head.x, head.y + step)

    heading = game.direction
    left = heading == Direction.LEFT
    right = heading == Direction.RIGHT
    up = heading == Direction.UP
    down = heading == Direction.DOWN

    def blocked(*candidates):
        # Same value as `(f1 and hit(p1)) or (f2 and hit(p2)) or ...`:
        # the first truthy term, else the last term; is_collision is only
        # consulted for a term whose heading flag is truthy.
        outcome = False
        for flag, cell in candidates:
            outcome = flag and game.is_collision(cell)
            if outcome:
                break
        return outcome

    danger_ahead = blocked((right, east), (left, west), (up, north), (down, south))
    danger_right = blocked((up, east), (down, west), (left, north), (right, south))
    danger_left = blocked((down, east), (up, west), (right, north), (left, south))

    food, snake_head = game.food, game.head
    features = [
        danger_ahead,
        danger_right,
        danger_left,
        # Move direction
        left,
        right,
        up,
        down,
        # Food location relative to the head
        food.x < snake_head.x,
        food.x > snake_head.x,
        food.y < snake_head.y,
        food.y > snake_head.y,
    ]
    return np.array(features, dtype=int)

In [None]:
class ReplayBuffer:
    """Bounded FIFO store of ``(state, action, reward, next_state, done)`` tuples."""

    def __init__(self, capacity):
        # A deque with maxlen drops its oldest entry once `capacity` is reached.
        self.memory = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Store one transition."""
        transition = (state, action, reward, next_state, done)
        self.memory.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions and return them column-wise.

        Returns five tuples: states, actions, rewards, next_states, dones.
        """
        batch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.memory)
    

In [None]:
def action_array(action, num_actions):
    """Convert an action index into a one-hot move list.

    Args:
        action: index of the chosen action.
        num_actions: length of the resulting list.

    Returns:
        A list of ``num_actions`` Python ints with a single 1 at position
        ``action``, or ``None`` (after printing a message) when ``action``
        is outside ``[0, num_actions)``.
    """
    # Negative indices are rejected too: they would silently wrap around
    # and select an action counted from the end.
    if 0 <= action < num_actions:
        # `np.int` was removed in NumPy 1.24; the builtin `int` is the
        # documented replacement.
        move = np.zeros(num_actions, dtype=int)
        move[action] = 1
        return move.tolist()

    print("exceed number of actions\n")
    return None

In [None]:
def Loss(state, action, reward, next_state, done):
    """Perform one Q-learning optimisation step and return the loss.

    Uses the module-level ``model``, ``optimizer``, ``criterion``, ``gamma``
    (a plain number) and ``device``.

    Args:
        state, next_state: one feature vector, or a sequence of them.
        action: one-hot move (or a sequence of one-hot moves); the position
            of the 1 selects which Q-value is updated.
        reward: scalar reward, or a sequence of rewards.
        done: bool, or a sequence of bools; terminal transitions use the
            reward alone as their target.

    Returns:
        The scalar loss tensor produced by ``criterion``.
    """
    state = torch.tensor(np.asarray(state), dtype=torch.float, device=device)
    next_state = torch.tensor(np.asarray(next_state), dtype=torch.float, device=device)
    action = torch.tensor(np.asarray(action), dtype=torch.long, device=device)
    reward = torch.tensor(np.asarray(reward), dtype=torch.float, device=device)

    if state.dim() == 1:
        # Single transition: add the batch dimension -> (1, x)
        state = state.unsqueeze(0)
        next_state = next_state.unsqueeze(0)
        action = action.unsqueeze(0)
        reward = reward.unsqueeze(0)
        done = (done,)
    done_mask = torch.tensor(np.asarray(done, dtype=bool), device=device)

    # 1: predicted Q values for the current states
    Q_i = model(state)

    # 2: target = r                          if the transition is terminal
    #             r + gamma * max_a Q(s', a) otherwise
    # The target is a constant for the optimiser, so it is built without
    # gradient tracking.
    with torch.no_grad():
        next_best = model(next_state).max(dim=1).values
        Q_new = torch.where(done_mask, reward, reward + gamma * next_best)
        target_Q = Q_i.detach().clone()
        rows = torch.arange(target_Q.size(0), device=device)
        # Only the Q-value of the action actually taken gets a new target.
        target_Q[rows, action.argmax(dim=1)] = Q_new

    optimizer.zero_grad()
    loss = criterion(Q_i, target_Q)
    loss.backward()
    optimizer.step()

    return loss

In [None]:
class DQN(nn.Module):
    """Two-layer fully connected Q-network.

    Args:
        input_size: number of features in a state vector.
        output_size: number of actions (one Q-value per action).
        hidden_size: width of the hidden layer.  Previously this name was
            read without being a parameter, which raised ``NameError``
            unless a global of that name happened to exist.
    """

    def __init__(self, input_size, output_size, hidden_size=256):
        super().__init__()
        self.linear1 = nn.Linear(input_size, hidden_size)
        self.linear2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return the Q-values for ``x`` (ReLU hidden layer, linear output)."""
        x = F.relu(self.linear1(x))
        x = self.linear2(x)
        return x

In [None]:
# Exploration schedule: epsilon decays exponentially from epsilon_start
# towards epsilon_final with time constant epsilon_decay (in frames).
epsilon_start = 1
epsilon_final = 0.0001
epsilon_decay = 10000


def epsilon_by_frame(frame_idx):
    """Return the exploration rate to use at frame ``frame_idx``."""
    span = epsilon_start - epsilon_final
    return epsilon_final + span * math.exp(-1. * frame_idx / epsilon_decay)


In [None]:
# --- Q-network -------------------------------------------------------------
channels_input = 3  # NOTE(review): appears to be a leftover from an image-based variant; not used below
num_actions = 3     # length of the one-hot move vector built by action_array()
state_size = 11     # length of the feature vector returned by state(game)
hidden_size = 256   # hidden-layer width; must exist at module level if DQN.__init__ reads it as a global

# The notebook defines `DQN` (a fully connected net); `CnnDQN` does not exist
# here, and the network input is the 11-feature state, not a 3-channel image.
model = DQN(state_size, num_actions).to(device)

lr = 5e-5

optimizer = optim.Adam(model.parameters(), lr=lr)
criterion = nn.MSELoss()

# --- Replay memory ---------------------------------------------------------
replay_initial = 500
replay_buffer = ReplayBuffer(6000)

# --- Training schedule -----------------------------------------------------
start = 1
total_frames = 50000
batch_size = 32
gamma = 0.99  # discount factor (plain float)


# --- Environment -----------------------------------------------------------
game = SnakeGameAI()

In [None]:
def train(model_path="dqn_snake.pth"):
    """Train the module-level ``model`` on the module-level ``game``.

    Runs for frames ``start`` .. ``total_frames`` (inclusive).  Every frame:
    pick an epsilon-greedy action (epsilon from ``epsilon_by_frame``), step
    the game, take one optimisation step on that transition and push it to
    ``replay_buffer``.  When a game ends: reset the game, take one step on a
    replay batch of up to ``batch_size`` transitions, save the weights if
    the score is a new record, and update the score plot.

    Args:
        model_path: file that receives ``model.state_dict()`` whenever a new
            record score is reached.
    """
    plot_scores = []
    plot_mean_scores = []
    total_score = 0
    record = 0
    episode = 0  # number of finished games

    # Start from a fresh game so repeated calls do not continue a stale one.
    game.reset()
    state_i = state(game)
    model.train()

    for frame_idx in range(start, total_frames + 1):
        # Epsilon-greedy action selection.
        epsilon = epsilon_by_frame(frame_idx)
        if random.random() > epsilon:
            x = torch.tensor(state_i, dtype=torch.float, device=device)
            with torch.no_grad():
                # state_i is a 1-D vector, so the output is 1-D as well.
                action = int(model(x).argmax().item())
        else:
            action = random.randrange(num_actions)

        move = action_array(action, num_actions)

        # perform move and get new state
        reward, done, score = game.play_step(move)
        state_i_new = state(game)

        # train on the transition just observed, then store it for replay
        Loss(state_i, move, reward, state_i_new, done)
        replay_buffer.push(state_i, move, reward, state_i_new, done)
        state_i = state_i_new

        if not done:
            continue

        # Episode finished: reset and re-read the state, otherwise the next
        # action would be chosen from the terminal state of the old game.
        game.reset()
        state_i = state(game)
        episode += 1

        # Replay step; the buffer holds at least the transition pushed above.
        sample_size = min(len(replay_buffer), batch_size)
        states, actions, rewards, next_states, dones = replay_buffer.sample(sample_size)
        Loss(states, actions, rewards, next_states, dones)

        if score > record:
            record = score
            torch.save(model.state_dict(), model_path)

        print('Game', episode, 'Score', score, 'Record:', record)

        plot_scores.append(score)
        total_score += score
        mean_score = total_score / episode
        plot_mean_scores.append(mean_score)
        plot(plot_scores, plot_mean_scores)
