In [1]:
from platform import python_version
print(python_version())

3.8.5


# Deep Reinforcement Learning
The model learns from experience instead of labels.

In [2]:
!pip install -q -U watermark

In [3]:
# Imports
import json 
import time
import numpy as np 
import matplotlib
import matplotlib.pyplot as plt 
from PIL import Image
from IPython import display 
import seaborn
import tensorflow
import keras
from keras.models import model_from_json
from keras.models import Sequential
from keras.layers.core import Dense
from keras.optimizers import SGD


%matplotlib inline 
seaborn.set()

In [4]:
# Versões dos pacotes usados neste jupyter notebook
%reload_ext watermark
%watermark -a "Data Science Academy" --iversions

Author: Data Science Academy

seaborn   : 0.11.1
json      : 2.0.9
keras     : 2.4.3
IPython   : 7.21.0
matplotlib: 3.3.4
tensorflow: 2.4.1
autopep8  : 1.5.6
PIL       : 8.1.2
numpy     : 1.19.5



## Preparando o game

No jogo, frutas, representadas por azulejos brancos, caem do topo. O objetivo é pegar os frutos com um basket (representado por azulejos brancos). Se você pegar uma fruta, você obtém um ponto (sua pontuação sobe por um), se você perder uma fruta, perdeu um (sua pontuação diminui).


In [5]:
class Catch(object):
    def __init__(self, grid_size = 10):
        self.grid_size = grid_size
        self.reset()
    
    def _update_state(self, action):
        state = self.state
        if action == 0: #left
            action = -1
        elif action == 1: #stay
            action = 0
        else:
            action == 1 #right
        
        f0, f1, basket = state[0]
        new_basket = min(max(1, basket + action), self.grid_size-1)
        f0 += 1
        out = np.asarray([f0, f1, new_basket])
        out = out[np.newaxis]
        
        assert len(out.shape) == 2
        self.state = out 
        
    def _draw_state(self):
        im_size = (self.grid_size,)*2
        state = self.state[0]
        canvas = np.zeros(im_size)
        canvas[state[0], state[1]] = 1 # desenha fruta
        #canvas[-1, state[2]-1:state[2] + 2] = 1  # desenha basket
        return canvas
    
    def _get_reward(self):
        fruit_row, fruit_col, basket = self.state[0]
        if fruit_row == self.grid_size-1:
            if abs(fruit_col - basket) <= 1:
                return 1
            else:
                return -1
        else:
            return 0
        
    def _is_over(self):
        if self.state[0,0] == self.grid_size - 1:
            return True
        else:
            return False
        
    def observe(self):
        canvas = self._draw_state()
        return canvas.reshape((1,-1))
    
    def act(self):
        self._update_state(action)
        reward = self._get_reward()
        game_over = self._is_over()
        return self.observe(), reward, game_over
    
    def reset(self):
        n = np.random.randint(0, self.grid_size-1, size=1)
        m = np.random.randint(1, self.grid_size-2, size=1)
        self.state = np.asarray([0,n,m])[np.newaxis]

In [6]:
# Setando variavies do ambiente

# o ultimo time frame faz o controle do quadro que estamos atualmente
last_frame_time = 0 

# Traduz as acoes para o vocabulario humano 
translate_action = ["Left", "Stay", "Right", "Create Ball", "End Test"]

# Tamanho do campo de jogo
grid_size = 10

In [7]:
# Definindo funcoes auxiliares para o ambiente
def display_screen(action, points, input_t):
    global last_frame_time
    print("Action %s, Points: %s" % (translate_action[action], points))
    
    # Somente ira mostrar a tela do jogo se nao for game over
    if("End" not in translate_action[action]):
        plt.imshow(input_t.reshape((grid_size,)*2), interpolation='none', cmap='gray')
        display.clear_output(wait=True)
        display.display(plt.gcf())
    last_frame_time = set_max_fps(last_frame_time)
    
def set_max_fps(last_frame_time, FPS=1):
    current_milli_time = lambda: int(round(time.time()*1000))
    sleep_time = 1./FPS - (current_milli_time() - last_frame_time)
    if sleep_time > 0:
        time.sleep(sleep_time)
    return current_milli_time()

## Treinando o agente

In [None]:
class ExperienceReplay(object):
    def __init__(self, max_memory=100, discount=0.9):
        self.max_memory = max_memory
        self.memory = list()
        self.discount = discount
        
    def remember(self, states, game_over):
        # Salvando o estado na memoria
        self.memory.append([states, game_over])
        
        # Para nao lotar a memoria
        if len(self.memory) > self.max_memory:
            del self.memory[0]
            
    def get_batch(self, model, batch_size=10):
        # Quantas experiencias temos
        len_memory = len(self.memory)
        
        # Calcule o numero de acoes que podem ser tomadas no jogo
        num_actions = model.output_shape[-1]
        
        # Dimensoes do campode jogo
        env_dim = self.memory[0][0][0].shape[1]
        
        # Queremos retornar um vetor de entrada e destino com entradas de um estado observado
        inputs = np.zeros((min(len_memory, batch_size), env_dim))
        
        # ... e target r + gamma * max Q(s', a')
        # Observe que nosso alvo é uma matriz, com possíveis campos não só para a ação realizada, mas também 
        # para as outras ações possíveis. As ações não tomam o mesmo valor que a previsão de não afetá-las.
        targets = np.zeros((inputs.shape[0], num_actions))
        
        # Nos desenhamos estados para aprender aleatoriamente
        for i, idx, in enumerate(np.random.randint(0, len_memory, size=inputs.shape[0])):
            
            state_t, action_t, reward_t, state_tp1 = self.memory[idx][0]
            
          # Também precisamos saber se o jogo terminou nesse estado
            game_over = self.memory[idx][1]
            
            # Adicione o estado s à entrada
            inputs[i:i+1] = state_t
            
            # Primeiro, preenchemos os valores-alvo com as previsões do modelo. 
            # Eles não serão afetados pelo treinamento (uma vez que a perda de treinamento para eles é 0)
            targets[i] = model.predict(state_t)[0]
            
            """
            Se o jogo acabou, a recompensa esperada Q (s, a) deve ser a recompensa final r.
            Ou então o target value é r + gamma * max Q(s’,a’)
            """
            
            Q_sa = np.max(model.predict(state-tp1)[0])
            
          # Se o jogo acabou, a recompensa é a recompensa final.
            if game_over:
                targets[i, action_t] = reward_t
            # r + gamma * max Q(s’,a’)
            else:
                targets[i, action_t] = reward_t + self.discount * Q_sa
        return inputs, targets