In [23]:
import numpy as np
import os
import enviroment_no_visual as enviroment_no_visual
import enviroment as enviroment_visual
import tensorflow as tf
import keras

Implementazione di un buffer circolare che permetta inserimento/cancellazione degli elementi e accesso random veloce. 

In [24]:
class ReplayBuffer:
    """Fixed-capacity circular buffer with uniform random sampling.

    Items are written into a pre-allocated object array. Once ``max_size``
    items have been appended, each further ``append`` overwrites the oldest
    stored item.
    """

    def __init__(self, max_size):
        """Allocate storage for ``max_size`` items and reset the cursors."""
        self.buffer = np.empty(max_size, dtype=object)
        self.max_size = max_size
        self.index = 0  # slot the next append will write to
        self.size = 0   # number of slots filled so far (capped at max_size)

    def append(self, obj):
        """Store ``obj`` in the current slot, then advance the write cursor."""
        slot = self.index
        self.buffer[slot] = obj
        if self.size < self.max_size:
            self.size += 1
        # Wrap around so the next write lands on the oldest entry.
        self.index = (slot + 1) % self.max_size

    def sample(self, batch_size):
        """Return ``batch_size`` stored items drawn uniformly, with replacement."""
        picks = np.random.randint(0, self.size, size=batch_size)
        return self.buffer[picks]

    def sample_experiences(self, batch_size):
        """Sample a batch and split it into five per-field arrays.

        Each stored item is indexed at positions 0..4; position ``k`` of every
        sampled item is stacked into the ``k``-th returned array.

        Returns:
            Tuple ``(states, actions, rewards, next_states, game_over)``.
        """
        batch = self.sample(batch_size)

        def column(position):
            # Gather one field across the whole batch.
            return np.array([experience[position] for experience in batch])

        return column(0), column(1), column(2), column(3), column(4)

In [25]:
def Linear_QNet(input_shape, hidden_size, output_size):
    """Build a fully connected Q-network with a single hidden layer.

    Args:
        input_shape: Shape passed to the Keras input layer.
        hidden_size: Number of units in the ReLU hidden layer.
        output_size: Number of units in the final (linear) layer.

    Returns:
        An uncompiled ``keras.models.Sequential`` model.
    """
    layer_stack = [
        keras.layers.Input(shape=input_shape),
        keras.layers.Dense(hidden_size, activation='relu'),
        keras.layers.Dense(output_size),  # no activation: raw Q-values
    ]
    return keras.models.Sequential(layer_stack)

class QTrainer:
    """Performs single gradient steps of Q-learning on a Keras model.

    Args:
        model: Callable Keras model mapping a batch of states to Q-values.
        lr: Learning rate handed to the Adam optimizer.
        gamma: Discount factor applied to the bootstrapped future value.
    """

    def __init__(self, model, lr=1e-4, gamma=0.9):
        self.model = model
        self.gamma = gamma
        self.optimizer = keras.optimizers.Adam(learning_rate=lr)
        self.loss_object = keras.losses.MeanSquaredError()

    def train_step(self, states, actions, rewards, next_states, dones):
        """Run one optimisation step on a batch of transitions.

        Args:
            states: Batch of states fed to the model.
            actions: Per-sample mask multiplied with the Q-values and summed
                along axis 1 (callers in this file pass one-hot moves).
            rewards: Per-sample immediate reward.
            next_states: Batch of successor states.
            dones: Per-sample terminal flag (1 where the episode ended).
        """
        # Computed outside the tape so no gradient flows through the target.
        future_rewards = tf.reduce_max(self.model(next_states), axis=1)
        # Bellman target: reward + gamma * max_a' Q(s', a').
        # Only the bootstrapped term is masked on terminal transitions; the
        # immediate reward must survive, otherwise the terminal reward (e.g.
        # the penalty for losing) would be replaced by a target of 0.
        bootstrap = tf.math.multiply(self.gamma, future_rewards)
        updated_q_values = rewards + tf.math.multiply(bootstrap, (1 - dones))

        masks = actions
        with tf.GradientTape() as tape:
            # Q-values predicted for every action in each state.
            q_values = self.model(states)
            # Keep only the Q-value of the action that was actually taken.
            q_action = tf.reduce_sum(tf.multiply(q_values, masks), axis=1)
            # Loss between the Bellman target and the current estimate.
            loss = self.loss_object(updated_q_values, q_action)
        # Backpropagation
        grads = tape.gradient(loss, self.model.trainable_variables)
        self.optimizer.apply_gradients(
            zip(grads, self.model.trainable_variables))

In [26]:
def as_tensors(states, actions, rewards, next_states, dones):
    """Convert each of the five inputs to a ``tf.float32`` constant tensor.

    Returns:
        Tuple of tensors in the same order as the arguments.
    """
    def to_float32(values):
        return tf.constant(values, dtype=tf.float32)

    return (
        to_float32(states),
        to_float32(actions),
        to_float32(rewards),
        to_float32(next_states),
        to_float32(dones),
    )

def add_dimension(states, actions, rewards, next_states, dones):
    """Insert a new leading axis (axis 0) into each of the five inputs.

    Used to turn a single transition into a batch of one.

    Returns:
        Tuple of expanded tensors in the same order as the arguments.
    """
    return tuple(
        tf.expand_dims(component, axis=0)
        for component in (states, actions, rewards, next_states, dones)
    )

In [27]:
class Agent:
    """DQN agent bundling replay memory, Q-network, trainer and policy.

    Args:
        max_memory: Capacity of the replay buffer.
        batch_size: Number of transitions sampled by ``train_long_memory``.
        lr: Learning rate forwarded to the trainer's Adam optimizer.
        gamma: Discount factor forwarded to the trainer.
    """

    def __init__(self, max_memory, batch_size, lr=1e-3, gamma=0.9):
        self.n_games = 0
        self.epsilon = 1
        # The learning rate was previously hard-coded to 0.9, which is far
        # outside the usable range for Adam; it is now a parameter with a
        # conventional default.
        self.lr = lr
        self.gamma = gamma
        self.memory = ReplayBuffer(max_memory)
        self.batch_size = batch_size
        # 11 input features, 256 hidden units, 3 outputs (one per move).
        self.model = Linear_QNet([11], 256, 3)
        self.trainer = QTrainer(self.model, lr=self.lr, gamma=self.gamma)

    def remember(self, state, action, reward, next_state, done):
        """Store one transition tuple in the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def train_long_memory(self):
        """Train on a batch of ``batch_size`` transitions sampled from memory."""
        states, actions, rewards, next_states, dones = self.memory.sample_experiences(self.batch_size)
        states, actions, rewards, next_states, dones = as_tensors(states, actions, rewards, next_states, dones)
        self.trainer.train_step(states, actions, rewards, next_states, dones)

    def train_short_memory(self, state, action, reward, next_state, done):
        """Train on a single transition, reshaped into a batch of one."""
        state, action, reward, next_state, done = as_tensors(state, action, reward, next_state, done)
        state, action, reward, next_state, done = add_dimension(state, action, reward, next_state, done)
        self.trainer.train_step(state, action, reward, next_state, done)

    def epsilon_greedy_policy(self, state):
        """Return a move index in {0, 1, 2}: random with probability epsilon.

        Epsilon decays linearly from 1 with the number of games played and is
        floored at 0.05; it is recomputed (and stored) on every call.
        """
        self.epsilon = max((100 - self.n_games) / 100, 0.05)
        if np.random.rand() < self.epsilon:
            return np.random.randint(3)
        state0 = tf.constant([state], dtype=tf.float32)  # batch of one
        prediction = self.model.predict(state0, verbose=0)
        move = tf.argmax(prediction[0]).numpy()
        return move

    def get_action(self, state):
        """Return the chosen move as a one-hot list of length 3."""
        final_move = [0, 0, 0]
        move = self.epsilon_greedy_policy(state)
        final_move[move] = 1
        return final_move

    def save_model(self, model_dir_path="./DQNmodel", file_name='model.keras'):
        """Save the model to ``model_dir_path/file_name``, creating the folder.

        Missing parent directories are created as well, and an already
        existing directory is not an error.
        """
        if not os.path.exists(model_dir_path):
            print(f"La cartella non esiste. Sarà creata con nome: {model_dir_path}")
        # makedirs handles nested paths and avoids the check-then-create race.
        os.makedirs(model_dir_path, exist_ok=True)
        file_name = os.path.join(model_dir_path, file_name)
        self.model.save(file_name)

In [28]:
def train(N_GAME):
    """Train a fresh agent on the visual Snake environment.

    Every step trains on the latest transition and stores it in replay
    memory; at the end of each game the environment is reset and a batch is
    replayed from memory. The model is saved whenever a game beats the best
    score seen so far.

    Args:
        N_GAME: Number of games to play before stopping.

    Returns:
        The trained ``Agent``, so callers can reuse ``agent.model`` without
        reloading it from disk. (Previously nothing was returned and the
        agent was lost when the function exited.)
    """
    agent = Agent(max_memory=100_000, batch_size=1_000)
    env = enviroment_visual.SnakeGameAI()
    record = 0

    while agent.n_games < N_GAME:
        state_old = env.get_state()
        final_move = agent.get_action(state_old)

        # Perform the move and observe the outcome.
        state_new, reward, done, score = env.play_step(final_move)

        agent.train_short_memory(state_old, final_move, reward, state_new, done)
        agent.remember(state_old, final_move, reward, state_new, done)

        if done:
            env.reset()
            agent.n_games += 1
            agent.train_long_memory()

            if score > record:
                record = score
                agent.save_model()

            # "\r" + end="" rewrites the same output line after each game.
            print(f"\rGame: {agent.n_games} Score: {score} Record: {record}", end="")

    return agent

In [None]:
# Run the training loop for 100 games.
train(100)

Testa l'Agente seguendo la policy ottimale su 100 partite e mostra le performance

In [None]:
# Greedy evaluation on the headless environment.
# `model` was not defined anywhere at notebook top level, so this cell failed
# on a fresh kernel. Load the checkpoint written by Agent.save_model at its
# default location.
# NOTE(review): that file only exists if some training game beat the initial
# record of 0 - confirm a checkpoint is present before running.
model = keras.models.load_model(os.path.join("./DQNmodel", "model.keras"))

env = enviroment_no_visual.SnakeGameAI()
MAX_N_GAMES = 100
cumulative_score = 0
max_score = 0

for n_game in range(MAX_N_GAMES):
    env.reset()
    game_over = False
    state = env.get_state()
    while not game_over:
        # Always take the action with the highest predicted Q-value.
        action = np.argmax(model.predict(state[np.newaxis], verbose=0)[0])
        final_move = [0, 0, 0]
        final_move[action] = 1
        state, reward, game_over, score = env.play_step(final_move)
    if score > max_score:
        max_score = score
    cumulative_score += score

print(f"Mean score: {cumulative_score/ MAX_N_GAMES}\nMax score: {max_score}", end="")

Mostra visualmente una partita dell'Agente

In [20]:
# Play one game with rendering, always taking the greedy action.
# `model` was not defined anywhere at notebook top level; load the checkpoint
# written by Agent.save_model at its default location so this cell runs on a
# fresh kernel.
# NOTE(review): requires that training saved ./DQNmodel/model.keras - confirm.
model = keras.models.load_model(os.path.join("./DQNmodel", "model.keras"))

env_visual = enviroment_visual.SnakeGameAI()
game_over = False
state = env_visual.get_state()
while not game_over:
    # Pick the action with the highest predicted Q-value.
    action = np.argmax(model.predict(state[np.newaxis], verbose=0)[0])

    final_move = [0, 0, 0]
    final_move[action] = 1

    state, reward, game_over, score = env_visual.play_step(final_move)