In [16]:
import numpy as np
import copy

In [2]:
k = 4   # number of actions
Actions = {0 : ('L', (0, -1)),
     1 : ('R', (0, 1)), 
     2 : ('U', (-1, 0)), 
     3 : ('D', (1, 0))}

def state2idx(s, n):
    return divmod(s, n)

def idx2state(idx, n):
    return idx[0]*n + idx[1]

def nextState(s, a, n):
    '''
        Args:
            s : current state
            a : action taken
        
        Return:
            s_next : next state
    '''
    s_next = -1

    s_idx = state2idx(s, n)
    delta = Actions[a][1]
    s_next_idx = (s_idx[0] + delta[0], s_idx[1] + delta[1])

    if 0 <= s_next_idx[0] < n and 0 <= s_next_idx[1] < n:
        s_next = idx2state(s_next_idx, n)
    
    return s_next

def createMDP(n, psucc, rlo=-3, rhi=3):
    '''
        Args:
            n : grid size
            psucc : probability of success of an action

        Return:
            P : probability transition matrix for each action
            R : reward mapping for each state
    '''

    P = np.zeros((k, n*n, n*n), dtype=np.float)

    # for each action
    for a in range(k):
        # for each state
        for s in range(n*n):
            s_next = nextState(s, a, n)
            if s_next != -1:
                P[a, s, s_next] = psucc
                P[a, s, s] = 1 - psucc
            else:
                P[a, s, s] = 1

    R = np.random.randint(rlo, rhi, size=(n*n))

    return (P, R)

In [3]:
(P, R) = createMDP(4 , 0.75)
print(P, R)

[[[1.   0.   0.   ... 0.   0.   0.  ]
  [0.75 0.25 0.   ... 0.   0.   0.  ]
  [0.   0.75 0.25 ... 0.   0.   0.  ]
  ...
  [0.   0.   0.   ... 0.25 0.   0.  ]
  [0.   0.   0.   ... 0.75 0.25 0.  ]
  [0.   0.   0.   ... 0.   0.75 0.25]]

 [[0.25 0.75 0.   ... 0.   0.   0.  ]
  [0.   0.25 0.75 ... 0.   0.   0.  ]
  [0.   0.   0.25 ... 0.   0.   0.  ]
  ...
  [0.   0.   0.   ... 0.25 0.75 0.  ]
  [0.   0.   0.   ... 0.   0.25 0.75]
  [0.   0.   0.   ... 0.   0.   1.  ]]

 [[1.   0.   0.   ... 0.   0.   0.  ]
  [0.   1.   0.   ... 0.   0.   0.  ]
  [0.   0.   1.   ... 0.   0.   0.  ]
  ...
  [0.   0.   0.   ... 0.25 0.   0.  ]
  [0.   0.   0.   ... 0.   0.25 0.  ]
  [0.   0.   0.   ... 0.   0.   0.25]]

 [[0.25 0.   0.   ... 0.   0.   0.  ]
  [0.   0.25 0.   ... 0.   0.   0.  ]
  [0.   0.   0.25 ... 0.   0.   0.  ]
  ...
  [0.   0.   0.   ... 1.   0.   0.  ]
  [0.   0.   0.   ... 0.   1.   0.  ]
  [0.   0.   0.   ... 0.   0.   1.  ]]] [-2 -2  0 -1  2  0 -2 -1  1  0 -3  2 -3  2  1  2]


In [4]:
def genRandSDP(num_states, num_actions):
    '''
        Generates a stationary deterministic policy
    '''
    return np.random.randint(num_actions, size=num_states)

In [5]:
print(genRandSDP(4*4, 4))

[1 1 2 3 3 1 0 2 3 1 0 2 3 3 1 2]


In [6]:
class Error(Exception):
    """Base class for exceptions in this module."""
    pass

def gaussElimination(A, b):
    '''
        A : numpy array of shape : (n * n)
        b : numpy array of shape : (n,)
        Solve equation Ax = b where A is matrix of size n * n and b is a vector of size n
        Return: a vector x which is the solution of this equation
    '''
    if A.ndim != 2 or A.shape[0] != A.shape[1]:
        raise Error('A must be a square matrix')
    
    elif b.ndim != 1 or A.shape[1] != b.shape[0]:
        raise Error('b must be a vector with appropriate size')

    # applying forward elimination
    A = copy.deepcopy(A).astype(np.float)
    b = copy.deepcopy(b).astype(np.float)

    n = b.shape[0]
    x = np.zeros(n)

    #### forward elimination ####

    # loop over all rows in A, except last
    for k in range(n-1):
        # loop over all rows below the diagonal position (k, k)
        for i in range(k+1, n):
            # compute multiplier for row i and column k
            m_ik = A[i, k] / A[k, k] 
            # update b[i]
            b[i] -= m_ik * b[k]
            # loop over all the columns to the right of the diagonal position (k, k)
            for j in range(k+1, n):
                # update the value
                A[i, j] -= m_ik * A[k, j]

    #### back substitution ####

    x[n-1] = b[n-1] / A[n-1, n-1]

    for i in range(n-2, -1, -1):
        s = 0
        for j in range(i+1, n):
            s += A[i, j] * x[j]

        x[i] = (b[i] - s) / A[i,i]

    return x

In [7]:
A = np.array([[3, 2, -1], [2, -2, 4] , [-1, 0.5, -1]])
b = np.array([1, -2, 0])
# Ax = b ==> x = [1, -2, -2]

print(gaussElimination(A, b))

[ 1. -2. -2.]


In [8]:
def valueViaInv(P, gamma, R):
    '''
        Args:
            P : probability transition matrix
            gamma : discount faactor
            R : reward map
        Return:
            Solution of equation => (I - gamma * P)V = R
            where V is the value function
    '''
    n = R.shape[0]
    I = np.eye(n)

    A = (I - gamma * P)
    b = R

    return gaussElimination(A, b)

In [9]:
def getProbTranMat(Pk, u):
    '''
        Args:
            Pk : Probability transition matrix for each action
            u : deterministic policy
        Return:
            P : Porbability transition matrix for the given policy
    '''
    num_states = u.shape[0]
    P = np.zeros((num_states, num_states))
    for s in range(num_states):
        a = u[s]
        P[s, :] = Pk[a, s, :]
    return P

In [10]:
def oneStep(V, R, P, gamma):
    '''
        Args:
            V : current value function
            R : reward mapping
            P : probability transition matrix
            gamma : dicount factor
        Return: Calculates the next update for the value function
    '''
    return (R + gamma * np.dot(P, V))

In [11]:
def valueEval(R, P, gamma, eps=1e-6):
    '''
        Args:
            R : reward mapping
            P : probability transition matrix
            gamma : discount factor
            eps : precision
        Return:
            V : performs the policy evaluation to compute the 
                value function for the given MDP
    '''

    V = np.zeros(R.shape[0], dtype=np.float)
    V_next = oneStep(V, R, P, gamma)

    while np.linalg.norm(V_next - V, np.inf) > eps:
        V = V_next
        V_next = oneStep(V, R, P, gamma)

    return V

In [12]:
n = 3
psucc = np.round(np.random.uniform(),2)
# generating a random policy
u = genRandSDP(n*n, k)
# generating an MDP
(Pk, R) = createMDP(n , psucc)
P = getProbTranMat(Pk, u)

print("Probability Transition Matrix:\n", P)
print("Value function by inverse:\n", np.round(valueViaInv(P, 0.4, R), 4))
print("Value function by policy evaluation:\n", np.round(valueEval(R, P, 0.4), 4))

Probability Transition Matrix:
 [[1.   0.   0.   0.   0.   0.   0.   0.   0.  ]
 [0.55 0.45 0.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.   1.   0.   0.   0.   0.   0.   0.  ]
 [0.   0.   0.   0.45 0.   0.   0.55 0.   0.  ]
 [0.   0.55 0.   0.   0.45 0.   0.   0.   0.  ]
 [0.   0.   0.   0.   0.   0.45 0.   0.   0.55]
 [0.   0.   0.   0.   0.   0.   1.   0.   0.  ]
 [0.   0.   0.   0.   0.   0.   0.55 0.45 0.  ]
 [0.   0.   0.   0.   0.   0.   0.   0.55 0.45]]
Value function by inverse:
 [-1.6667  0.7724  3.3333 -2.561  -3.4513  0.5564 -5.     -0.122  -2.4717]
Value function by policy evaluation:
 [-1.6667  0.7724  3.3333 -2.561  -3.4513  0.5564 -5.     -0.122  -2.4717]


In [13]:
def randomWalk(L, P, R):
    '''
        Args:
            L : length of the trajectory
            P : transition probability matrix
            R : reward mapping
        Return:
            A list containing tuples (s, r, s_next)
            where s is the initial state, and
            s_next is the state after transition
            and r is the reward that we recived.
    '''
    rw = []
    num_states = R.shape[0]
    s = np.random.randint(n)

    for i in range(L):
        s_next = np.random.choice(num_states, p=P[s, :])
        r = R[s_next]
        rw.append((s, r, s_next))
        s = s_next
    
    return rw

In [14]:
n = 3
psucc = np.round(np.random.uniform(),2)
# generating a random policy
u = genRandSDP(n*n, k)
# generating an MDP
(Pk, R) = createMDP(n , psucc)
P = getProbTranMat(Pk, u)

print("Probability Transition Matrix:\n", P)
print("Reward Mapping:\n", R)
print("Random Walk:\n", randomWalk(10, P, R))


Probability Transition Matrix:
 [[1. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 1. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 1. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 1. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 1. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 1.]]
Reward Mapping:
 [ 2  0 -3 -3 -2  0 -2 -1 -3]
Random Walk:
 [(1, 0, 1), (1, 0, 1), (1, 0, 1), (1, 0, 1), (1, 0, 1), (1, 0, 1), (1, 0, 1), (1, 0, 1), (1, 0, 1), (1, 0, 1)]


In [28]:
def stationary_distribution(P, R, eps=1e-6, iter=10e5):
    '''
        Args:
            P : transition probability matrix - n*n
            R : reward mapping
        Return:
            d : vector representing the stationary 
                distribution
    '''
    L = P.shape[0]*3
    err = np.inf

    d = np.zeros(P.shape[0], dtype=np.float)
    s = np.zeros(P.shape[0], dtype=np.int)
    n = 0

    while err > eps and iter >= 0:
        iter -=1
        rw = randomWalk(L, P, R)
        d_new = copy.copy(d)
        for (s, _ , s_next) in rw:
            n += 1
            one_hot = np.zeros(P.shape[0], dtype=np.int)
            one_hot[s] = 1
            d_new = ((n-1)*d + one_hot)/(n)
        
        err = np.linalg.norm(d_new - d, np.inf)
        d = d_new

    return d

In [29]:
n = 3
psucc = np.round(np.random.uniform(),2)
# generating a random policy
u = genRandSDP(n*n, k)
# generating an MDP
(Pk, R) = createMDP(n , psucc)
P = getProbTranMat(Pk, u)

d = stationary_distribution(P, R)

print("Stationary Distribution: ", d)
print("d.P : ", np.dot(d,P))

Stationary Distribution:  [0.         0.         0.21334216 0.11851953 0.         0.
 0.         0.         0.        ]
d.P :  [0.         0.         0.21334216 0.11851953 0.         0.
 0.         0.         0.        ]
