In [1]:
import numpy as np


# heigth and width of grid
H, W = 5, 5

grid = [
    [1,1,1,1,1],
    [1,1,1,1,0],
    [1,1,0,1,1],
    [1,1,1,1,1],
    [1,1,1,1,1]
]

# rewards
R = [
    [0,0,-100,0,100],
    [0,0,0,0,0],
    [0,0,0,0,0],
    [0,0,0,-100,0],
    [0,0,0,0,0]
]

# actions (directions of walk)
A = [[1,0],[-1,0],[0,1],[0,-1]]
   
# transition function
def delta(s, a):
    i, j = s
    return (i + A[a][0], j + A[a][1])
    
# check wheter action a is allowed in the current state s
def allowed(a, s):
    x, y = delta(s, a)
    return x >= 0 and x < H and y >= 0 and y < W and grid[x][y] == 1

# discount-factor
gamma = 0.9

# random walk policy
def pi(a, s):
    return 1. / len(A)

# propability of transition to the new state s_new from state s with action a
def p(s_new, s, a):
    return int(s_new == delta(s, a)) and allowed(a, s)

# reward from transition to the new state s_new from state s with action a
def r(s_new, s, a):
    i, j = s_new
    return R[i][j]

In [2]:
import random


S = set([(i,j) for i in range(0, H) for j in range(0, W) if grid[i][j] == 1])
print(random.sample(S,1)[0])

(3, 1)


In [3]:
# state-value function
# iterative policy evalation: v - old state-value function, s - state
def v_pi(s, v):
    val = 0
    for a in range(len(A)):
        for s_new in S:
            val += pi(a, s) * p(s_new, s, a) * (r(s_new, s, a) + gamma * v[s_new])
    return val

In [4]:
def compute_state_value_function(iter_cnt):
    n = len(S)
    v_old = {}
    for s in S:
        i,j = s
        v_old[s] = R[i][j]
        
    v_new = {}
    
    for i in range(iter_cnt):
        for s in S:
            v_new[s] = v_pi(s, v_old)
        
        v_old = v_new
    
    return v_old

In [5]:
def print_svf(v):
    grid = []
    for i in range(H):
        grid.append(np.array([0] * W))
    grid = np.array(grid)
    
    for s in v:
        i,j = s
        val = v[s]
        grid[i][j] = val
    
    print(grid)    

In [42]:
for i in range(0, 20, 1):
    v = compute_state_value_function(i)
    print(i)
    print_svf(v)

0
[[   0    0 -100    0  100]
 [   0    0    0    0    0]
 [   0    0    0    0    0]
 [   0    0    0 -100    0]
 [   0    0    0    0    0]]
1
[[  0 -47   0   0   0]
 [  0   0 -47   0   0]
 [  0   0   0 -47   0]
 [  0   0 -47   0 -47]
 [  0   0   0 -47   0]]
2
[[-10 -36 -21  -9  -2]
 [ -6 -18 -34 -21   0]
 [ -1   0   0 -39 -17]
 [  0  -7 -34 -42 -39]
 [  0   0 -18 -43 -21]]
3
[[ -9 -35 -18  -8  -1]
 [ -6 -19 -37 -18   0]
 [ -3  -6   0 -41 -18]
 [ -2 -11 -38 -35 -41]
 [  0  -6 -19 -41 -18]]
4
[[ -9 -35 -18  -9  -2]
 [ -7 -19 -37 -19   0]
 [ -4  -7   0 -41 -18]
 [ -3 -13 -40 -36 -41]
 [ -2  -7 -20 -41 -18]]
5
[[ -9 -35 -18  -9  -2]
 [ -7 -20 -38 -19   0]
 [ -4  -8   0 -42 -18]
 [ -4 -13 -40 -37 -41]
 [ -2  -8 -20 -42 -18]]
6
[[ -9 -36 -18  -9  -2]
 [ -7 -20 -38 -20   0]
 [ -4  -8   0 -42 -18]
 [ -4 -14 -41 -37 -41]
 [ -2  -8 -20 -42 -18]]
7
[[ -9 -36 -18  -9  -2]
 [ -7 -20 -38 -20   0]
 [ -4  -8   0 -42 -18]
 [ -4 -14 -41 -37 -42]
 [ -3  -8 -20 -42 -18]]
8
[[ -9 -36 -18  -9  -2]
 [ -7 

In [28]:
v = compute_state_value_function(20)
print_svf(v)

[[ -9 -36 -18  -9  -2]
 [ -7 -20 -38 -20   0]
 [ -4  -8   0 -42 -18]
 [ -5 -14 -41 -37 -42]
 [ -3  -8 -20 -42 -19]]


In [6]:
from scipy.stats import bernoulli


def allowed_actions(s):
    return [a for a in range(len(A)) if allowed(a,s)]


def optimal_policy(s, Q):
    return optimal(Q, s)

def optimal_value_func(s, Q):
    return np.max([Q[s][a] for a in allowed_actions(s)])


def optimal(Q, s):
    As = allowed_actions(s)
    return As[np.argmax([Q[s][a] for a in As])]

def eps_greedy_step(Q, s, eps):
    choose_opt = (bernoulli.rvs(1 - eps) == 1)
    if choose_opt:
        return optimal(Q, s)
    else:
        As = allowed_actions(s)
        return As[np.random.random_integers(0, len(As) - 1)]

def init_Q():
    Q = {}
    for s in S:
        Q[s] = [0] * len(A)
    return Q
    
def Q_learning(gamma, alpha, eps, init_state, final_state, iter_cnt, show=True, step=10):
    Q = init_Q()
    for i in range(iter_cnt):
        s = init_state
        
        while True:
            a = eps_greedy_step(Q, s, eps)
            sn = delta(s, a)
            reward = r(sn, s, a)
            
            Q[s][a] += alpha * (reward + gamma * Q[sn][optimal(Q, sn)] - Q[s][a])
#             print(s, sn, A[a], reward, Q[s][a])
            
            s = sn
            if s == final_state:
                break
        
        if show and (i % step == 0 or i == iter_cnt - 1):
            print_svf({ s: optimal_value_func(s, Q) for s in S})
                
    return Q

In [29]:
Q = Q_learning(0.9, 0.5, 0.1, (4,0), (0,4), 20000, True, 5000)



[[ 0  0  0 50  0]
 [ 0  0  0  0  0]
 [ 0  0  0  0  0]
 [ 0  0  0  0  0]
 [ 0  0  0  0  0]]
[[ 15  49  90 100   0]
 [ 26  72  81  90   0]
 [ 31  34   0  81  72]
 [ 34  38  43  72  65]
 [ 38  43  47  53  59]]
[[ 15  64  90 100   0]
 [ 26  72  81  90   0]
 [ 31  34   0  81  72]
 [ 34  38  43  72  65]
 [ 38  43  47  53  59]]
[[ 41  65  90 100   0]
 [ 65  72  81  90   0]
 [ 59  65   0  81  72]
 [ 53  59  53  72  65]
 [ 47  53  47  53  59]]
[[ 59  65  90 100   0]
 [ 65  72  81  90   0]
 [ 59  65   0  81  72]
 [ 53  59  53  72  65]
 [ 47  53  47  53  59]]


In [10]:
# A = [[1,0],[-1,0],[0,1],[0,-1]]
arrows = ['↓', '↑', '→', '←',]

def print_opt_policy(Q):
    grid = []
    for i in range(H):
        grid.append(np.array(['X'] * W))
    grid = np.array(grid)
    
    for s in S:
        i,j = s
        val = arrows[optimal_policy(s, Q)]
        grid[i][j] = val
    
    print(grid)    

In [30]:
print_opt_policy(Q)

[['↓' '↓' '→' '→' '←']
 ['→' '→' '→' '↑' 'X']
 ['↑' '↑' 'X' '↑' '←']
 ['↑' '↑' '←' '↑' '↑']
 ['↑' '↑' '→' '→' '↑']]


In [47]:
def cooling(T0, k, al=0.8):
#     return max(1.0, T0 * (al ** k))
    return max(1.0, T0 - k)

def softmax(w, t=1.0):
    w = np.array(w)
    mx = np.max(w)
    e = np.exp((w - mx) / t)
    dist = e / np.sum(e)
    return dist

def Boltzman(Q, s, T0, k):
    As = allowed_actions(s)
    elements = np.array(As)
    probabilities = np.array(softmax([ Q[s][a] for a in As ], cooling(T0, k)))
    a = np.random.choice(elements, 1, list(probabilities))
#     print(s, As, [ Q[s][a] for a in As ], probabilities, a)
    return a

def Boltzman_eps_greedy_step(Q, s, eps, T0, k):
    choose_opt = (bernoulli.rvs(1 - eps) == 1)
    if choose_opt:
        return optimal(Q, s)
    else:
        return Boltzman(Q, s, T0, k)


def SARSA(gamma, alpha, T0, init_state, final_state, iter_cnt, show=True, step=10):
    Q = init_Q()
    
    # episodes
    for i in range(iter_cnt):
        s = init_state
        a = Boltzman(Q, s, T0, i)
#         a = Boltzman_eps_greedy_step(Q, s, 0.2, T0, i) 
#         a = eps_greedy_step(Q, s, 0.1)    
        # steps in episode
        while True:
            sn = delta(s, a)
            reward = r(sn, s, a)
            an = Boltzman(Q, sn, T0, i)
#             an = Boltzman_eps_greedy_step(Q, sn, 0.2, T0, i)
#             an = eps_greedy_step(Q, sn, 0.1)
    
            Q[s][a] += alpha * (reward + gamma * Q[sn][an] - Q[s][a])
            
#             print(s, a, sn, an, Q[s][a], Q[sn][an])
            
            s, a = sn, an
            if s == final_state:
                break
        
        if show and (i % step == 0 or i == iter_cnt - 1):
            print_svf({ s: optimal_value_func(s, Q) for s in S})
                
    return Q

In [48]:
Q = SARSA(0.4, 0.5, 1000.0, (4,0), (0,4), 15000, True, 5000)
print_opt_policy(Q)



[[ 0  0  0 50  0]
 [ 0  0  0  0  0]
 [ 0  0  0  0  0]
 [ 0  0  0  0  0]
 [ 0  0  0  0  0]]
[[  0  -1  -7 100   0]
 [  0   0   3  34   0]
 [  0   0   0  11 -10]
 [  0   0   0  -3  -4]
 [  0   0   0  -3 -11]]
[[  0   0  21 100   0]
 [  0   0   1 -14   0]
 [  0   0   0  -5 -19]
 [  0   0   0  -4  -7]
 [  0   0   0  -4 -10]]
[[  0  -1  35 100   0]
 [  0   0  -1  11   0]
 [ -1   0   0  -5 -13]
 [  0   0  -1  -2  -6]
 [  0   0   0  -1  -1]]
[['↓' '←' '→' '→' '←']
 ['↓' '↓' '←' '↑' 'X']
 ['↓' '←' 'X' '→' '←']
 ['↓' '←' '↓' '↓' '↓']
 ['↑' '←' '←' '←' '←']]
