Importiamo tutte le librerie che ci interessano

In [None]:
import gym
import random
import numpy as np
from keras.layers import Input, Conv2D, Dense, MaxPooling2D , Flatten
from keras.models import Model, Sequential, load_model
from collections import deque
from keras.optimizers import Adam
import keras
import matplotlib.pyplot as plt
from random import sample 
import chart_studio.plotly as py
from plotly.graph_objs import *

Creo l'ambiente SpaceInvaders-v0

In [None]:
# Instantiate the gym environment registered under the id 'SpaceInvaders-v0'.
# This module-level `env` is the one passed to every function call below.
env = gym.make('SpaceInvaders-v0')

Creiamo la nostra rete neurale

In [None]:
class DQN:
    """Holds a small convolutional Keras network used to score actions.

    Args:
        input_shape: Shape handed to the first ``Conv2D`` layer.
        output_shape: Number of units of the final ``Dense`` layer.
        discount: Stored as ``self.discount``.
        update_target_every: Stored as ``self.update_target_every``.
        memory_size: ``maxlen`` of the deque kept in ``self.memory``.

    Note:
        ``discount``, ``update_target_every``, ``memory`` and
        ``target_counter`` are only stored; no method of this class reads
        them, and only ``policy_net`` is ever built.
    """

    def __init__(self, input_shape, output_shape, discount=0.99, update_target_every=10, memory_size=2000):
        # Shapes must be set before create_model() runs, since it reads them.
        self.input_shape = input_shape
        self.output_shape = output_shape
        self.discount = discount
        self.update_target_every = update_target_every
        self.policy_net = self.create_model()
        self.memory = deque(maxlen=memory_size)
        self.target_counter = 0

    def create_model(self):
        """Build and compile the network: two conv layers, then two dense layers.

        Returns:
            A compiled ``Sequential`` model (MSE loss, Adam optimizer).
        """
        # Settings shared by both convolutional layers.
        conv_common = dict(padding="valid", activation="relu", use_bias=True)
        layers = [
            Conv2D(input_shape=self.input_shape, filters=16, kernel_size=(8, 8),
                   strides=(4, 4), **conv_common),
            Conv2D(filters=16, kernel_size=(4, 4), strides=(2, 2), **conv_common),
            Flatten(),
            Dense(512, activation="relu"),
            # No activation on the output layer: one raw value per output unit.
            Dense(self.output_shape),
        ]
        network = Sequential(layers)
        optimizer = Adam(lr=0.001, beta_1=0.9, beta_2=0.999, amsgrad=False)
        network.compile(loss="mse", optimizer=optimizer, metrics=["accuracy"])
        return network

Definiamo le funzioni per preprocessare la nostra immagine: la sottocampioniamo, la portiamo in scala di grigi e la ritagliamo (crop)

In [None]:
def to_greyscale(img):
    """Collapse axis 2 of ``img`` to its mean and cast the result to uint8.

    The cast truncates the fractional part of each mean.
    """
    channel_mean = np.mean(img, axis=2)
    return channel_mean.astype(np.uint8)

In [None]:
def downsample(img):
    """Keep every second element along the first two axes of ``img``."""
    every_other = slice(None, None, 2)
    return img[every_other, every_other]

In [None]:
def crop(img):
    """Keep at most the first 100 rows of ``img``; the second axis is untouched."""
    kept_rows = slice(None, 100)
    all_columns = slice(None)
    return img[kept_rows, all_columns]

In [None]:
def preprocess(img):
    """Run the full frame pipeline: downsample, greyscale, crop, divide by 255.

    Returns:
        The cropped greyscale frame as floats (uint8 values divided by 255).
    """
    half_resolution = downsample(img)
    grey = to_greyscale(half_resolution)
    return crop(grey) / 255

In [None]:
# Sanity check of the preprocessing pipeline: take the observation returned by
# reset(), display the preprocessed frame and print its shape.
img = env.reset()
plt.imshow(preprocess(img))
print(preprocess(img).shape)

In [None]:
# Same frame without the crop and the /255 scaling, for visual comparison.
plt.imshow(to_greyscale(downsample(img)))

In [None]:
class Experience_Replay:
    """Replay buffer plus the bookkeeping lists filled during training.

    Attributes:
        iteration: Counter initialised to 0 (incremented by callers).
        memory: List of stored transitions.
        maxsize: Capacity hint read by callers; not enforced by this class.
        experience_gain: List initialised empty (appended to by callers).
        rmse: List initialised empty (appended to by callers).
    """

    def __init__(self, memory=None, maxsize=10000):
        """Create a buffer.

        Args:
            memory: Existing list of transitions to adopt (kept by reference),
                or ``None`` to start with a new empty list.
            maxsize: Stored as ``self.maxsize``.
        """
        self.iteration = 0
        # A literal ``[]`` default would be one list shared by every instance
        # created without an explicit ``memory``; build a new one per instance.
        self.memory = [] if memory is None else memory
        self.maxsize = maxsize
        self.experience_gain = []
        self.rmse = []

    def getSample(self, size_sample=32):
        """Draw ``size_sample`` transitions uniformly at random, with replacement.

        Args:
            size_sample: Number of transitions to draw.

        Returns:
            An object-dtype array of shape ``(size_sample, n_fields)`` where
            ``n_fields`` is the length of the first stored transition. Each
            row unpacks into the fields of one transition.

        Raises:
            ValueError: If the memory is empty.
        """
        stored = len(self.memory)
        if stored == 0:
            raise ValueError("cannot sample from an empty replay memory")
        # Sample from this instance's memory, not from a global variable.
        picks = np.random.randint(stored, size=size_sample)
        n_fields = len(self.memory[0])
        # Transitions mix arrays and scalars; fill an object array cell by
        # cell so NumPy never tries to broadcast them into a ragged array.
        batch = np.empty((len(picks), n_fields), dtype=object)
        for row, pick in enumerate(picks):
            for col, field in enumerate(self.memory[pick]):
                batch[row, col] = field
        return batch

In [None]:
def iteration(env, model, experience):
    """Collect ``experience.maxsize`` transitions with an epsilon-greedy policy.

    The exploration rate is ``0.995 ** experience.iteration``. With that
    probability a random action is sampled from ``env.action_space``;
    otherwise the action with the highest ``model.policy_net`` output is taken.

    Side effects:
        * ``experience.memory`` is replaced by a new list, so transitions from
          earlier calls are discarded.
        * ``experience.iteration`` is incremented by one.
        * The summed reward of this call is appended to
          ``experience.experience_gain``. The sum may span several episodes,
          because the environment is reset whenever an episode ends.

    Args:
        env: Environment whose ``reset()`` returns an observation and whose
            ``step()`` returns ``(observation, reward, done, info)``.
        model: Object exposing a Keras model as ``policy_net``.
        experience: Replay container (``iteration``, ``maxsize``, ``memory``,
            ``experience_gain`` attributes).
    """
    def observe(frame):
        # Reshape to the 4-D input the network expects (it is built with
        # input_shape (100, 80, 1)).
        return preprocess(frame).reshape((1, 100, 80, 1))

    epsilon = 0.995 ** experience.iteration
    # Start from the observation returned by reset() instead of spending an
    # extra env.step(0) just to obtain a frame.
    processed_state = observe(env.reset())
    env.render()
    experience.memory = []
    exp_gain = 0
    for _ in range(experience.maxsize):
        if random.random() < epsilon:
            ac = env.action_space.sample()
        else:
            ac = np.argmax(model.policy_net.predict(processed_state))
        next_state, reward, done, info = env.step(ac)
        exp_gain += reward
        next_processed_state = observe(next_state)
        experience.memory.append((processed_state, ac, next_processed_state, reward, done))
        if done:
            # Continue from the new episode's first frame. Carrying the
            # terminal frame forward would pair the dead episode's last frame
            # with an action taken in the new one.
            processed_state = observe(env.reset())
        else:
            processed_state = next_processed_state
        env.render()
    experience.iteration += 1
    experience.experience_gain.append(exp_gain)

In [None]:
def preprocess_experience_replay(experience_replay):
    """Merge pairs of consecutive transitions, taking one pair every 4 entries.

    For each start index 0, 4, 8, ... that still has a successor, the entry
    at that index and the one right after it are fused into a 5-item list:
    element-wise maximum of the two states, the second entry's action,
    element-wise maximum of the two next-states, the sum of both rewards and
    the second entry's done flag. Entries at offsets 2 and 3 of every group
    of four are not used.

    Args:
        experience_replay: Sequence of transitions indexed as
            (state, action, next_state, reward, done).

    Returns:
        A new list of merged transitions (empty for fewer than 2 entries).
    """
    merged = []
    for start in range(0, len(experience_replay) - 1, 4):
        first = experience_replay[start]
        second = experience_replay[start + 1]
        merged.append([
            np.maximum(first[0], second[0]),
            second[1],
            np.maximum(first[2], second[2]),
            first[3] + second[3],
            second[4],
        ])
    return merged

In [None]:
def train(experience, model, gamma=0.99, train_iteration=20):
    """Fit ``model.policy_net`` on transitions sampled from ``experience``.

    ``experience.memory`` is first replaced by its merged form (see
    ``preprocess_experience_replay``), so it is shorter after this call.
    Then ``train_iteration`` batches are drawn with ``experience.getSample()``
    and the network is fitted on one transition at a time. After every fit
    the root-mean-square error between the new prediction and the target is
    appended to ``experience.rmse``.

    Args:
        experience: Replay container providing ``memory``, ``getSample()``
            and ``rmse``.
        model: Object exposing a compiled Keras model as ``policy_net``.
        gamma: Discount applied to the best next-state value.
        train_iteration: Number of batches to draw.
    """
    experience.memory = preprocess_experience_replay(experience.memory)
    for _ in range(train_iteration):
        batch = experience.getSample()
        for state, action, next_state, reward, done in batch:
            # Start from the current prediction so only the taken action's
            # value contributes to the loss.
            target = model.policy_net.predict(state)[0]
            if done:
                # Terminal transition: nothing to bootstrap from, so the
                # target is the observed reward itself (not a constant 0).
                target[action] = reward
            else:
                target[action] = reward + gamma * np.max(model.policy_net.predict(next_state)[0])
            # -1 keeps this independent of the number of actions.
            target = target.reshape(1, -1)
            model.policy_net.fit(state, target, verbose=0)
            prediction = model.policy_net.predict(state)
            experience.rmse.append(np.sqrt(np.mean((prediction - target) ** 2)))

In [None]:
# Replay container collecting 3000 environment steps per iteration() call, and
# a network taking (100, 80, 1) inputs and producing 6 outputs.
experience = Experience_Replay(maxsize=3000)
dqn = DQN((100,80,1),6)

In [None]:
# Print the layer-by-layer description of the network.
dqn.policy_net.summary()

In [None]:
# Main loop: 20 rounds of "collect transitions, then train on them".
for i in range(20):
    iteration(env,dqn,experience)
    train(experience , dqn)
    # NOTE(review): the environment is closed after every round and used again
    # by the next iteration() call — confirm this is intended.
    env.close()
    # "batch" is len(experience.memory) after train() replaced the memory with
    # its merged form, not the number of steps collected.
    print(f"iterazione: {experience.iteration}\t gain:{experience.experience_gain[-1]}\t esplorazione: {(0.995)**(experience.iteration)}\t batch :{len(experience.memory)}")

In [None]:
def play_network(env, model):
    """Play one episode greedily with ``model.policy_net``, rendering each frame.

    At every step the action with the highest network output for the current
    frame is taken. The run stops when the episode ends or after 10000 steps.

    Args:
        env: Environment whose ``reset()`` returns an observation and whose
            ``step()`` returns ``(observation, reward, done, info)``.
        model: Object exposing a Keras model as ``policy_net``.
    """
    def observe(frame):
        # Reshape to the 4-D input the network expects (it is built with
        # input_shape (100, 80, 1)).
        return preprocess(frame).reshape((1, 100, 80, 1))

    processed_state = observe(env.reset())
    env.render()
    for _ in range(10000):
        ac = np.argmax(model.policy_net.predict(processed_state))
        next_state, reward, done, info = env.step(ac)
        if done:
            break
        # Feed the new frame to the network on the next step; without this
        # the agent would keep choosing its action from the very first frame.
        processed_state = observe(next_state)
        env.render()

In [None]:
# Watch the trained network play.
play_network(env , dqn)

In [None]:
# Close the environment once playback is finished.
env.close()

In [None]:
# Save the network to an HDF5 file in the current working directory.
dqn.policy_net.save("modello_prova.h5")

In [None]:
def custom_scatterplot(title, y, x_title, y_title, x_upperBound, y_upperBound):
    '''
        Show a scatter plot with:
            x-axis: sequential integers 0..len(y)-1
            y-axis: the values of y

        Args:
            title (str): plot's title
            y (iterable): one value per point
            x_title (str): x-axis's title
            y_title (str): y-axis's title
            x_upperBound (int): x-axis's upper bound
            y_upperBound (int): y-axis's upper bound

        Chart Studio sign-in is optional: it happens only when both the
        PLOTLY_USERNAME and PLOTLY_API_KEY environment variables are set.
        Rendering with fig.show() does not need it; uploading with py.plot
        does.

        Code to install modules required:
            pip install plotly
            pip install "ipywidgets>=7.2"
            pip install chart-studio

        Code to import modules required:
            import chart_studio.plotly as py
            from plotly.graph_objs import *
    '''
    import os

    # Credentials come from the environment, never from source code.
    # NOTE: the API key that used to be hard-coded here must be treated as
    # leaked and should be revoked in the Chart Studio account settings.
    username = os.environ.get("PLOTLY_USERNAME")
    api_key = os.environ.get("PLOTLY_API_KEY")
    if username and api_key:
        py.sign_in(username, api_key)

    y_value = list(y)
    x_value = list(range(len(y_value)))

    trace1 = {
        "uid": "5eacaf",
        "name": "RMSE (Root Mean Square Error)",
        "type": "scatter",
        "x": x_value,
        "y": y_value,
    }

    layout = {
        "title": title,
        "width": 1050,
        "xaxis": {
            "type": "linear",
            "range": [0, x_upperBound],
            "title": x_title,
            "autorange": False,
        },
        "yaxis": {
            "type": "linear",
            "range": [0, y_upperBound],
            "title": y_title,
            "autorange": False,
        },
        "height": 793,
        "autosize": True,
        "annotations": [
            {
                # Placed at two thirds of the x range, on the top edge.
                "x": x_upperBound - x_upperBound/3,
                "y": y_upperBound,
                "font": {"size": 16},
                "text": "α (Learning rate) = 0.001, epsilon: 0.995**(#episodes)",
                "showarrow": False,
            }
        ],
    }
    # A plain list of traces replaces the deprecated graph_objs.Data wrapper.
    fig = Figure(data=[trace1], layout=layout)
    # To open the chart in the browser instead (sharing, dashboards, ...):
    # plot_url = py.plot(fig)
    fig.show()

In [None]:
# Plot the training error. experience.rmse receives one value per fitted
# sample inside train(), so the x axis counts fit calls rather than episodes.
# max() raises ValueError if no training has run yet (empty list).
x_upperBound = len(experience.rmse)
y_upperBound = max(experience.rmse)

custom_scatterplot("RMSE vs #ITERATIONS", experience.rmse, '# episodes', 'Root mean square error', x_upperBound, y_upperBound)

In [None]:
# Plot the exploration schedule 0.995**i over the first 500 iterations.
epsilon = []
nrEpisodes = 500
for i in range(nrEpisodes):
    epsilon.append(0.995**i)
# custom_scatterplot requires both axis bounds: x spans the number of
# episodes, y spans [0, 1] because epsilon starts at 1 and only decays.
custom_scatterplot('ϵ decay', epsilon, '# episodes', 'epsilon', nrEpisodes, 1)

In [None]:
# Plot the summed reward collected by each iteration() call.
# The bare expression below has no effect when it is not the last line of a cell.
experience.experience_gain
x_upperBound = len(experience.experience_gain)
y_upperBound = max(experience.experience_gain)

custom_scatterplot('Return vs Experience', experience.experience_gain, 'n-experience', 'Return',  x_upperBound, y_upperBound)