In [1]:
# DQN agent code
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
from collections import deque
import numpy as np
import random
import os

class DQNAgent():
    """Deep Q-Network agent with a replay memory and a separate target network.

    Two identically shaped networks are kept: ``train_model`` is fitted on
    minibatches sampled from the replay memory, while ``target_model`` supplies
    the bootstrap values and is refreshed from ``train_model`` only every
    ``C`` episodes.
    """

    def __init__(self, env, alpha= 0.001, epsilon=1, gamma=0.99, epsilon_reduction=0.05):
        """Build the agent and its two networks.

        Args:
            env: environment object. The agent reads ``observation_space.shape``
                and ``action_space.n`` and calls ``action_space.sample()``,
                ``reset()``, ``step()``, ``render()`` and ``close()`` on it.
            alpha: learning rate handed to the Adam optimizer.
            epsilon: initial probability of taking a random action.
            gamma: discount factor applied to the bootstrapped future value.
            epsilon_reduction: amount subtracted from epsilon after each
                episode whose index is greater than 300.
        """
        # environment variables
        self.env = env
        self.state_size = self.env.observation_space.shape[0]
        self.action_size = self.env.action_space.n
        self.state_shape = self.env.observation_space.shape

        # replay memory and minibatch size
        self.memory = deque(maxlen=20000)
        self.mem_sample = 32

        # learning rate
        self.alpha = alpha

        # exploration
        self.epsilon = epsilon
        self.epsilon_min = 0.01
        self.epsilon_reduction = epsilon_reduction

        # discount factor for future rewards
        self.gamma = gamma

        # the two networks: the one being trained and the stable target one
        self.train_model = self.create_model()
        self.target_model = self.create_model()
        self.target_model.set_weights(self.train_model.get_weights())
        # number of episodes between two target-network refreshes
        self.C = 30

    def create_model(self):
        """Return a compiled dense network with one linear output per action."""
        model = Sequential()
        model.add(Dense(64, activation='relu', input_shape=self.state_shape))
        model.add(Dense(128, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))

        # NOTE(review): newer Keras releases spell this argument
        # `learning_rate`; `lr` is kept to match the Keras version this
        # notebook was written against.
        model.compile(loss='mse', optimizer=Adam(lr=self.alpha))

        return model

    def save_model(self, name="cervello_positronico"):
        """Save the training network under ``name``."""
        self.train_model.save(name)
        print("___Model Saved___")

    def load_model(self, name="cervello_positronico"):
        """Load the weights stored under ``name`` into both networks."""
        self.target_model.load_weights(name)
        self.train_model.load_weights(name)
        print("___Model Loaded___")

    def choose_action(self, state):
        """Pick an action epsilon-greedily.

        Epsilon is first clamped from below to ``epsilon_min`` (the decay in
        ``fit`` may push it lower). With probability epsilon a random action
        is sampled from the environment, otherwise the action with the
        highest value predicted by ``train_model`` is returned.
        """
        self.epsilon = max(self.epsilon_min, self.epsilon)
        if np.random.rand() < self.epsilon:
            return self.env.action_space.sample()
        return np.argmax(self.train_model.predict(state)[0])

    def train_from_memory(self):
        """Fit ``train_model`` on one random minibatch from the replay memory.

        Does nothing until the memory holds at least ``mem_sample``
        transitions. For every sampled transition only the entry of the
        taken action is overwritten in the predicted values: with the plain
        reward if the transition ended the episode, otherwise with the
        reward plus the discounted best value that ``target_model`` predicts
        for the next state.
        """
        if len(self.memory) < self.mem_sample:
            return

        minibatch = random.sample(self.memory, self.mem_sample)

        # Stored states have shape (1, state_size); stack them into a
        # (mem_sample, state_size) batch so each network is queried once.
        batch_shape = (self.mem_sample, self.state_size)
        states = np.array([transition[0] for transition in minibatch]).reshape(batch_shape)
        new_states = np.array([transition[3] for transition in minibatch]).reshape(batch_shape)

        targets = self.train_model.predict(states)
        new_state_targets = self.target_model.predict(new_states)

        for row, (_, action, reward, _, done) in enumerate(minibatch):
            if done:
                targets[row][action] = reward
            else:
                targets[row][action] = reward + self.gamma * np.max(new_state_targets[row])

        self.train_model.fit(states, targets, epochs=1, verbose=0)

    def memorize(self, state, action, reward, next_state, done):
        """Append one transition tuple to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def fit(self, episode, state, render):
        """Run one training episode.

        Args:
            episode: index of the episode; it gates rendering (every 50th
                episode), the target-network refresh (every ``C``-th episode)
                and the epsilon decay (only after episode 300).
            state: initial observation, already reshaped to (1, state_size).
            render: whether rendering is allowed at all.

        Returns:
            Tuple ``(total_reward, epochs)``: the summed reward of the episode
            and the number of environment steps taken.
        """
        total_reward = 0
        epochs = 0
        done = False
        while not done:
            if render and episode % 50 == 0:
                self.env.render()

            action = self.choose_action(state)
            new_state, reward, done, _ = self.env.step(action)
            new_state = new_state.reshape(1, self.state_size)

            self.memorize(state, action, reward, new_state, done)
            self.train_from_memory()

            total_reward += reward
            state = new_state
            epochs += 1

        # Success is judged on the episode's accumulated reward (as play()
        # does), not on the reward of the last step alone.
        if total_reward >= 20:
            print("Success in epsoide {}, used {} iterations!".format(episode,epochs))
            self.save_model()
        else:
            print("Failed in episode {}.".format(episode))

        # Refresh the target network only once every C episodes.
        if episode % self.C == 0:
            self.target_model.set_weights(self.train_model.get_weights())

        print("--now epsilon is {:.3f}, the reward is {}.".format(max(self.epsilon, self.epsilon_min),total_reward))
        if episode > 300:
            self.epsilon -= self.epsilon_reduction

        return total_reward, epochs

    def start_training(self, episodes=400, render=False):
        """Run ``episodes`` training episodes.

        The environment is closed and the model saved when the loop ends,
        including when it is interrupted by an exception.

        Returns:
            Tuple ``(total_rewards, total_epochs)`` with one entry per
            completed episode.
        """
        total_rewards = []
        total_epochs = []

        try:
            for episode in range(episodes):
                # Use the agent's own environment rather than a global one.
                state = self.env.reset().reshape(1, self.state_size)
                total_reward, epoch = self.fit(episode, state, render)

                # NOTE(review): `epoch` is already the step count returned by
                # fit(); the +1 is kept so the returned values are unchanged.
                total_epochs.append(epoch + 1)
                total_rewards.append(total_reward)
        finally:
            self.env.close()
            self.save_model()

        return total_rewards, total_epochs

    def play(self, filename, trials=400, render=True):
        """Evaluate the agent greedily using the weights stored in ``filename``.

        Args:
            filename: name passed to ``load_model``.
            trials: number of episodes to play.
            render: whether to render every step.

        Returns:
            Tuple ``(epochs, successes, total_rewards)`` with one entry per
            episode: the number of steps taken, 1 if the accumulated reward
            reached 19 at some point during the episode (else 0), and the
            final accumulated reward.
        """
        epochs = []
        successes = []
        total_rewards = []
        self.load_model(filename)
        try:
            for episode in range(trials):
                state = self.env.reset()
                state = np.reshape(state, (1, self.state_size))

                succ = 0
                steps = 0
                done = False
                total_reward = 0
                while not done:
                    if render:
                        self.env.render()
                    action = np.argmax(self.train_model.predict(state)[0])

                    next_state, reward, done, _ = self.env.step(action)
                    next_state = np.reshape(next_state, (1, self.state_size))
                    total_reward += reward
                    if total_reward >= 19:
                        succ = 1
                    state = next_state
                    steps += 1
                print("Completed/Episodes {}/{}, reward = {}".format(episode + 1, trials, total_reward))
                successes.append(succ)
                epochs.append(steps)
                total_rewards.append(total_reward)
        finally:
            self.env.close()

        return epochs, successes, total_rewards

Using TensorFlow backend.


In [2]:
# Create the environment.
# NOTE(review): 'Pong-ram-v0' is presumably the Atari Pong variant whose
# observation is the console RAM vector — confirm against the installed gym
# version. DQNAgent reads observation_space.shape and action_space.n from it.
# `env` is a notebook-global that the following cells pass to DQNAgent.
import gym
env = gym.make('Pong-ram-v0')

In [4]:
# Train the agent.
# A DQNAgent built with the defaults starts at epsilon=1, i.e. every action
# is sampled at random until epsilon is reduced.
pippo=DQNAgent(env)
# Load the weights stored under the default name "cervello_positronico" into
# both networks; presumably this fails if no checkpoint was saved before.
pippo.load_model()
episodes = 10000
# start_training closes the environment and saves the model in its `finally`
# clause, so interrupting the kernel still writes a checkpoint.
total_rewards, total_epochs = pippo.start_training(episodes,render=False)

___Model Loaded___
Failed in episode 0.
--now epsilon is 1.000, the reward is -21.0.
Failed in episode 1.
--now epsilon is 1.000, the reward is -21.0.
Failed in episode 2.
--now epsilon is 1.000, the reward is -21.0.
Failed in episode 3.
--now epsilon is 1.000, the reward is -20.0.
Failed in episode 4.
--now epsilon is 1.000, the reward is -21.0.
___Model Saved___


KeyboardInterrupt: 

In [7]:
# Resume training.
# epsilon=0.01 equals the agent's epsilon_min, so exploration stays at its
# minimum from the first episode on.
pippo=DQNAgent(env, epsilon=0.01)
# Continue from the weights saved under the default checkpoint name.
pippo.load_model()
episodes = 10000
# Overwrites the total_rewards / total_epochs produced by the previous
# training cell.
total_rewards, total_epochs = pippo.start_training(episodes,render=False)

___Model Loaded___
Failed in episode 0.
--now epsilon is 0.010, the reward is -21.0.
Failed in episode 1.
--now epsilon is 0.010, the reward is -21.0.
Failed in episode 2.
--now epsilon is 0.010, the reward is -21.0.
Failed in episode 3.
--now epsilon is 0.010, the reward is -21.0.
Failed in episode 4.
--now epsilon is 0.010, the reward is -21.0.
Failed in episode 5.
--now epsilon is 0.010, the reward is -21.0.
Failed in episode 6.
--now epsilon is 0.010, the reward is -21.0.
Failed in episode 7.
--now epsilon is 0.010, the reward is -21.0.
Failed in episode 8.
--now epsilon is 0.010, the reward is -21.0.
Failed in episode 9.
--now epsilon is 0.010, the reward is -21.0.
Failed in episode 10.
--now epsilon is 0.010, the reward is -21.0.
Failed in episode 11.
--now epsilon is 0.010, the reward is -21.0.
Failed in episode 12.
--now epsilon is 0.010, the reward is -21.0.
Failed in episode 13.
--now epsilon is 0.010, the reward is -20.0.
Failed in episode 14.
--now epsilon is 0.010, the rew

KeyboardInterrupt: 

In [4]:
# Test: play with the saved weights.
# play() loads the checkpoint itself and always picks the highest-valued
# action, so the epsilon the agent is constructed with is not used here.

test=DQNAgent(env)
trials = 1
# Returns one entry per trial in each of the three lists.
epochs, successes, total_rewards_test = test.play("cervello_positronico", trials, render=True)

___Model Loaded___
Completed/Episodes 1/1, reward = -21.0
