In [1237]:
import numpy as np
import scipy as sp
from scipy.special import softmax

In [1238]:
pi = np.array([1,1,0])
P = np.zeros((3,2,3))
R = np.zeros((3,2))


a_0 = 0
P[:,a_0,:] = np.array([[0.50, 0.25, 0.25],
                   [1.00, 0.00, 0.00],
                   [0.50, 0.50, 0.00]])
R[:,a_0] = np.array([0.00, 1.00, 5.00])


a_1 = 1
P[:,a_1,:] = np.array([[0.00, 0.50, 0.50],
                   [0.90, 0.10, 0.00],
                   [0.00, 0.25, 0.75]])
R[:,a_1] = np.array([3.00, 6.00, 9.00])


In [1239]:
class MDP():
    def __init__(self, P, R, gamma = 0.5, eps = 0.01):
        self.A = np.unique(pi)
        self.P = P
        self._R = R
        
        self.N = P.shape[0]
        self.num_a = A.shape[0]
        
        self.V_0 = np.zeros_like(self.N)
        self.gamma = gamma
        
        self.eps = eps
        
        
    def R(self, s, a = None):
        """ R(s,a) - Reward function
        
        Arguments
        ---------
        s : int
        a : int
        
        Returns
        -------
        E_R_s : np.array[]
        E_R_s_a : np.array[]
        """
        # E[R|s] = sum(a): p(a|s) E[R|s,a]
        if a == None:
            E_R_s = (self._R * self.pi[:,np.newaxis]).sum(1)[s]
            return E_R_s
        
        # E[R|s,a]
        else:
            E_R_s_a = self._R[s,a]
            return E_R_s_a
        
        
    def V_next(self, V):
        """ V(s) - Value-State, k-step
        
        Arguments
        ---------
        V : np.array[N]
        
        Returns
        -------
        V : np.array[N]
        """
        # V_k+1(s_i) = R(s,pi(s_i)) + sum(j): P(s_j | s_i, pi(s_i)) * V_k(s_i),  for i = 1,...,N
        V_next = np.stack([self.R(s, self.pi[s]) \
                         + self.gamma * (self.P[s, self.pi[s],:] * V).sum()
                           for s in range(self.N)])
        return V_next
    
    
    def Q_next(self, V, a):
        """ Q(s,a) - Action-State-Value, 1-step
        
        Arguments
        ---------
        a : int
        
        Returns
        -------
        Q_next : np.array[N]
        """
        Q_next = np.stack([self.R(s, a) \
                         + self.gamma * (self.P[s,a,:] * V).sum()
                           for s in range(self.N)])
        return Q_next

    
    
    def V_opt_next(self, V):
        """ V*(s) - Bellman optimization equation, k-step
        
        Arguments
        ---------
        V : np.array[N]
        
        Returns
        -------
        V_opt_next : np.array[N]
        pi_opt_next : np.array[N]
        """
        Q_next_a = np.stack([self.Q_next(V, a) for a in self.A],axis = 0)
        V_opt_next = Q_next_a.max(axis = 0)
        a_opt_next = Q_next_a.argmax(axis = 0)
        pi_opt_next = self.A[a_opt_next]
        return V_opt_next, pi_opt_next
    
    
    
    def BV(self):
        """ BV(s) - Bellmann backup operator
        
        Arguments
        ---------
        -
        
        Return
        ------
        BV : np.array[N]
        a_max: np.array[N]
        """
        Q_a = np.stack([self.Q(a) for a in self.A], axis = 0)
        BV = Q_a.max(axis = 0)
        a_max = Q_a.argmax(axis = 0)
        return BV, a_max
        
    
    def V(self):
        """ V(s) - State-Value
        Arguments
        ---------
        -
        
        Returns
        -------
        V : np.array[N]
        """
        k = 0
        while True:
            if k == 0:
                V = self.V_0
            if k >= 1:
                V_prev = V
                V = self.V_next(V)
                if np.linalg.norm(V - V_prev) < self.eps:
                    break
            k+=1
        return V        
        
        
    def Q(self, a):
        """ Q(s,a) - Action-State-Value
        
        Arguments
        ---------
        a : int
        
        Returns
        -------
        Q : np.array[N]
        """
        Q = np.stack([self.R(s,a) \
                    + self.gamma * (self.P[s,a,:] * self.V()).sum() 
                      for s in range(self.N)])
        return Q
    
    
    def policy_iteration(self):
        """Determine optimal policy with policy iteration
        
        Arguments
        ---------
        -
        Returns
        -------
        -
        """
        self.pi = np.random.randint(0, self.num_a, size = (self.N,))
        while True:
            pi_prev = self.pi
            Q_max, a_max = self.BV()
            self.pi = self.A[a_max]
            if np.array_equal(pi_prev, self.pi):
                break
    
    
    def value_iteration(self):
        """Determine optimal policy with value_iteration
         
        Arguments
        ---------
        -
        Returns
        -------
        -
        """
        k = 0
        while True:
            if k == 0:
                V = self.V_0
            if k >= 1:
                V_prev = V
                V, self.pi = self.V_opt_next(V)
                
                if np.linalg.norm(V - V_prev) < self.eps:
                        break
            k+=1
            
            
    def sample_sequence(self, T):
        """ Sample sequence of given length and return state, action, reward
        
        Arguments
        ---------
        T : int
        
        Returns
        -------
        sequence : np.array[T]
        actions : np.array[T-1]
        rewards : np.array[T-1]
        """
        states = list()
        actions = list()
        rewards = list()
        
        for t in range(T):
            if t == 0:
                s = np.random.randint(0, self.N)
            if t >= 1:
                a = self.pi[s]
                s = np.random.choice(self.N, p = self.P[s,a])
                r = self.R(s,a)
                
                actions.append(a)
                rewards.append(r)
            states.append(s)
            
        return np.stack(states), np.stack(actions), np.stack(rewards)