# TD Learning: Sarsa e Q-learning

#### Prof. Armando Alves Neto - Introdução ao Aprendizado por Reforço - PPGEE/UFMG

<img src="cave.png" width="400">

In [1]:
import numpy as np
import gym
from functools import partial
import class_maze as cm
import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = (10,5)
from IPython.display import clear_output
import seaborn as sns
sns.set()

Cria uma classe (`RunningAverage`) apenas para calcular a média móvel do sinal de reforço.

In [2]:
##########################################
class RunningAverage(object):
    """Moving average over the most recent ``N`` pushed values.

    Values are kept in ``self.vals`` (oldest first); once ``N`` values are
    stored, every new push discards the oldest one.
    """

    def __init__(self, N):
        """Create an empty window.

        Args:
            N: maximum number of values kept in the window; must be >= 1.

        Raises:
            ValueError: if ``N`` is smaller than 1 (a zero-sized window would
                make ``push`` pop from an empty list, and a negative size
                would silently disable the window bound).
        """
        if N < 1:
            raise ValueError("N must be a positive integer, got %r" % (N,))
        self.N = N
        self.vals = []          # stored values, oldest first
        self.num_filled = 0     # number of values currently stored (<= N)

    def push(self, val):
        """Append ``val``, evicting the oldest value when the window is full."""
        if self.num_filled == self.N:
            self.vals.pop(0)
        else:
            self.num_filled += 1
        self.vals.append(val)

    def get(self):
        """Return the mean of the stored values as a float.

        Returns 0.0 when nothing has been pushed yet, instead of dividing
        by zero.
        """
        if self.num_filled == 0:
            return 0.0
        return float(sum(self.vals)) / self.num_filled

In [3]:
class TDlearning(object):
    """Tabular TD-learning agent: environment, hyper-parameters and Q-table.

    This cell holds construction and persistence of the action-value table
    ``Q(s, a)``; action selection and the learning loop are attached to the
    class in later cells.
    """

    def __init__(self, parameters):
        """Build the maze environment and initialise the Q-table.

        Args:
            parameters: dict read with the keys ``'method'``, ``'xlim'``,
                ``'ylim'``, ``'resolution'``, ``'maze'``, ``'gamma'``,
                ``'eps'``, ``'alpha'``, ``'q-file'`` and ``'load_Q'``.
                The whole dict is also kept in ``self.parameters``.
        """
        self.parameters = parameters

        # learning method (stored here; interpreted by the learning loop)
        self.method = parameters['method']

        # number of episodes run so far
        self.episode = 0

        # create the environment
        self.env = cm.Maze(xlim=parameters['xlim'], ylim=parameters['ylim'], res=parameters['resolution'], image=parameters['maze'])

        # sizes of the state and action spaces
        self.num_states = np.prod(np.array(self.env.num_states))
        self.num_actions = self.env.action_space.n

        # learning parameters
        self.gamma = parameters['gamma']
        self.eps = parameters['eps']
        self.alpha = parameters['alpha']

        # file used to persist the Q-table
        self.logfile = parameters['q-file']

        # reset the policy (zeros, or restored from disk)
        self.reset_policy()

    ##########################################
    # resets the action-value function
    def reset_policy(self):
        """Zero the Q-table and, if ``parameters['load_Q']`` is set, restore it.

        Loading is best-effort: a missing file silently keeps the zeroed
        table; an unreadable file or a table whose shape does not match
        ``(num_states, num_actions)`` keeps the zeroed table and emits a
        warning. ``self.Q`` and ``self.episode`` are only overwritten when
        the whole file was read successfully.
        """
        import warnings
        import zipfile

        # Q(s,a)
        self.Q = np.zeros((self.num_states, self.num_actions))

        if not self.parameters['load_Q']:
            return

        try:
            with open(self.logfile, 'rb') as f:
                data = np.load(f)
                # materialise both entries while the file is still open
                Q = np.array(data['Q'])
                # stored as a 0-d array; convert back to a plain int
                episode = int(data['episodes'])
        except FileNotFoundError:
            # no previous run to resume from
            return
        except (OSError, KeyError, IndexError, ValueError, EOFError,
                zipfile.BadZipFile) as err:
            warnings.warn("could not load Q-table from %r: %s" % (self.logfile, err))
            return

        if Q.shape != self.Q.shape:
            warnings.warn("ignoring Q-table in %r: shape %r does not match %r"
                          % (self.logfile, Q.shape, self.Q.shape))
            return

        self.Q = Q
        self.episode = episode

    ##########################################
    # returns the current policy
    def curr_policy(self, copy=False):
        """Return the epsilon-greedy policy bound to the Q-table.

        Args:
            copy: when True the policy is bound to a snapshot of ``self.Q``;
                otherwise it is bound to ``self.Q`` itself, so later in-place
                updates of the table are seen by the policy.

        Returns:
            A callable taking a state and returning an action.
        """
        table = np.copy(self.Q) if copy else self.Q
        return partial(self.TabularEpsilonGreedyPolicy, table)

    ########################################
    # saves the table Q(s,a)
    def save(self):
        """Write ``self.Q`` and ``self.episode`` to ``self.logfile`` (npz format)."""
        with open(self.logfile, 'wb') as f:
            np.savez(f, Q=self.Q, episodes=self.episode)

    ##########################################
    def __del__(self):
        """Close the environment, if construction got far enough to create it."""
        env = getattr(self, 'env', None)
        if env is not None:
            env.close()

Probabilidade de escolha de uma ação $a$ baseada na política $\varepsilon$-soft:
$$
\pi(a|S_t) \gets 
                        \begin{cases}
                            1 - \varepsilon + \varepsilon/|\mathcal{A}|,  & \text{se}~ a = A^*,\\
                            \varepsilon/|\mathcal{A}|, & \text{caso contrário.}
                        \end{cases}
$$

In [4]:
class TDlearning(TDlearning):
    ##########################################
    # action selection (epsilon-soft)
    def TabularEpsilonGreedyPolicy(self, Q, state):
        """Sample an action for ``state`` from the epsilon-soft policy over ``Q``.

        The greedy action (first maximiser of ``Q[state, :]``) gets probability
        ``1 - eps + eps/|A|``; every other action gets ``eps/|A|``.

        Args:
            Q: action-value table indexed as ``Q[state, action]``.
            state: row index into ``Q``.

        Returns:
            The index of the sampled action.
        """
        # current greedy action
        greedy = Q[state, :].argmax()

        # total number of actions
        n_actions = Q.shape[1]

        # start with the exploration mass everywhere, then raise the greedy entry
        probs = np.full(n_actions, self.eps/n_actions)
        probs[greedy] = 1.0 - self.eps + self.eps/n_actions

        return np.random.choice(n_actions, p=probs)

In [5]:
class TDlearning(TDlearning):
    ##########################################
    # simulates one episode to the end following the current policy
    def rollout(self, max_iter=500, render=False):
        """Run one episode, updating ``self.Q`` in place after every step.

        With ``self.method == 'Q-learning'`` the update bootstraps from the
        maximum action value of the next state. With ``'Sarsa'`` it bootstraps
        from the action sampled for the next state, and that same action is
        the one executed on the following step. Any other method value runs
        the episode without updating the table.

        Args:
            max_iter: maximum number of environment steps.
            render: when True, redraw the environment after every step.

        Returns:
            List with the reward received at each step.
        """
        # start the environment
        S = self.env.reset()

        # list of rewards
        rewards = []

        # action already chosen for the next step (Sarsa only)
        next_action = None

        for _ in range(max_iter):
            # \pi(s): Sarsa must execute the action it bootstrapped from;
            # otherwise sample a fresh one
            A = self.policy(S) if next_action is None else next_action
            next_action = None

            # one interaction step with the environment
            [Sl, R, done, info] = self.env.step(A)

            # update
            if self.method == 'Q-learning':
                td_target = R + self.gamma*self.Q[Sl, :].max()
                self.Q[S, A] = self.Q[S, A] + self.alpha*(td_target - self.Q[S, A])
            elif self.method == 'Sarsa':
                # choose A' now and keep it for the next iteration
                next_action = self.policy(Sl)
                td_target = R + self.gamma*self.Q[Sl, next_action]
                self.Q[S, A] = self.Q[S, A] + self.alpha*(td_target - self.Q[S, A])

            # store the reward
            rewards.append(R)

            # render the environment
            if render:
                plt.figure(100, figsize=(7,7))
                clear_output(wait=True)
                plt.clf()
                self.env.render(self.Q)

            # reached a terminal state?
            if done: break

            # next state
            S = Sl

        return rewards

    ##########################################
    def runEpisode(self):
        """Run one learning episode and return the sum of its rewards.

        Increments ``self.episode``, binds ``self.policy`` to the live
        Q-table, renders every 20th episode, and saves the table afterwards
        when ``parameters['save_Q']`` is set.
        """
        # new episode
        self.episode += 1

        # current policy (on-policy; bound to the live Q-table)
        self.policy = self.curr_policy()

        # generate an episode following the current policy
        rewards = self.rollout(render=(self.episode%20 == 0))

        if self.parameters['save_Q']:
            self.save()

        return np.sum(np.array(rewards))

Código principal:

In [6]:
##########################################
# main
##########################################
if __name__ == '__main__':

    plt.ion()

    # moving average of the per-episode return
    avg_calc = RunningAverage(500)

    # parameters
    parameters = {
                'xlim'      : np.array([0.0, 10.0]),
                'ylim'      : np.array([0.0, 10.0]),
                'resolution': 0.4,
                'episodes'  : 5000,
                'gamma'     : 0.99,
                'eps'       : 1.0e-1,
                'alpha'     : 0.9,
                'save_Q'    : True,
                'load_Q'    : True,
                'q-file'    : 'q-table.npy',
                'maze'      : 'cave.png',
                'method'    : 'Sarsa' # 'Sarsa' or 'Q-learning'
            }

    # TD-learning agent (algorithm selected by parameters['method'])
    mc = TDlearning(parameters)

    rewards = []

    try:
        # mc.episode counts episodes already run (possibly restored from disk),
        # so a strict comparison stops after exactly 'episodes' episodes
        while mc.episode < parameters['episodes']:
            # run one episode
            total_reward = mc.runEpisode()

            # average reward
            avg_calc.push(total_reward)
            rewards.append(avg_calc.get())

            plt.figure(1)
            clear_output(wait=True)
            plt.clf()
            plt.plot(rewards)
            plt.title('Reforço por episódios')
            plt.xlabel('Episódios')
            plt.ylabel('Reforço')

            plt.show()
            plt.pause(.1)
    finally:
        # leave interactive mode even when training is interrupted
        plt.ioff()

KeyboardInterrupt: 