In [470]:
import numpy as np

Each of our states is identified by a label. 

In [471]:
state_labels = [
    "home",
    "waiting_room",
    "train",
    "light_traffic",
    "medium_traffic",
    "heavy_traffic",
    "work",
]

But when we run value iteration (VI) we will not use the labels, we will use indexes in matrixes. Therefore we now map each label to an integer.

In [472]:
states = {a: i for i, a in enumerate(state_labels)}
states

{'home': 0,
 'waiting_room': 1,
 'train': 2,
 'light_traffic': 3,
 'medium_traffic': 4,
 'heavy_traffic': 5,
 'work': 6}

And we do the same for actions

In [473]:
action_names = [
    "railway",
    "go_home",
    "wait",
    "relax",
    "car",
    "drive",
    "bike",
]
actions = {a: i for i, a in enumerate(action_names)}
actions

{'railway': 0,
 'go_home': 1,
 'wait': 2,
 'relax': 3,
 'car': 4,
 'drive': 5,
 'bike': 6}

We set up our costs matrix to map from an action (axis 0) to the cost of executing it in a particular state (axis 1). We fill it with `inf` so that all actions have infinite cost. We then specify costs only for the states where the actions are enabled. 

In [474]:
costs = np.full((len(actions), len(states)), fill_value=np.inf, dtype=np.float32)

We set all actions in the goal state to cost zero. This prevents the value of the goal state increasing during VI.

In [475]:
goal = "work"
costs[:, states[goal]] = 0

The transition functions are set up to provide transitions for all states to all other states. This is done in a matrix where the rows correspond to the starting state ($s$) and the columsn to the resultant state ($s'$). The cell $s, s'$ contains the probability of ending up in $s'$ from $s$ given the action was taken.

Below we first do this for the "bike" action which transitions from home to work with probability 1.

In [476]:
# start with probability zero everywhere
t_bike = np.zeros((len(states), len(states)))
# set the probability of reach work to 1
t_bike[states["home"], states["work"]] = 1
# and set the cost of the action
costs[actions["bike"], states["home"]] = 45

print(t_bike)
print(costs[actions["bike"]])

[[0. 0. 0. 0. 0. 0. 1.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0.]]
[45. inf inf inf inf inf  0.]


Action to drive, once we've chosen car and experienced a traffic level. Transition probability is the same across different traffic levels, but the costs are different.

In [477]:
t_drive = np.zeros((len(states), len(states)))
t_drive[states["light_traffic"], states["work"]] = 1
t_drive[states["medium_traffic"], states["work"]] = 1
t_drive[states["heavy_traffic"], states["work"]] = 1

costs[actions["drive"], states["light_traffic"]] = 20
costs[actions["drive"], states["medium_traffic"]] = 30
costs[actions["drive"], states["heavy_traffic"]] = 70

Relax all the way to work once we're on a train.

In [478]:
t_relax = np.zeros((len(states), len(states)))
t_relax[states["train"], states["work"]] = 1
costs[actions["relax"], states["train"]] = 35

Stop waiting for the train and go home.

In [479]:
t_go_home = np.zeros((len(states), len(states)))
t_go_home[states["waiting_room"], states["home"]] = 1
costs[actions["go_home"], states["waiting_room"]] = 2

Choose the railway. We find a train with some probability. Otherwise we have to wait.

In [480]:
t_railway = np.zeros((len(states), len(states)))
t_railway[states["home"], states["train"]] = 0.9
t_railway[states["home"], states["waiting_room"]] = 0.1
costs[actions["railway"], states["home"]] = 2

Choose to wait for the train.

In [481]:
t_wait = np.zeros((len(states), len(states)))
t_wait[states["waiting_room"], states["train"]] = 0.9
t_wait[states["waiting_room"], states["waiting_room"]] = 0.1
costs[actions["wait"], states["waiting_room"]] = 3

Choose to get in to the car. This causes us to experience different traffic levels with different probabilities. 

In [482]:
t_car = np.zeros((len(states), len(states)))
t_car[states["home"], states["light_traffic"]] = 0.2
t_car[states["home"], states["medium_traffic"]] = 0.7
t_car[states["home"], states["heavy_traffic"]] = 0.1
costs[actions["car"], states["home"]] = 1

Next we store all the transition functions in a matrix so we can evaluate them all through one matrix operation. 

In [483]:

transitions = np.zeros((len(actions), len(states), len(states)))

transitions[actions["bike"]] = t_bike
transitions[actions["drive"]] = t_drive
transitions[actions["relax"]] = t_relax
transitions[actions["go_home"]] = t_go_home
transitions[actions["railway"]] = t_railway
transitions[actions["wait"]] = t_wait
transitions[actions["car"]] = t_car

In [484]:
costs

array([[ 2., inf, inf, inf, inf, inf,  0.],
       [inf,  2., inf, inf, inf, inf,  0.],
       [inf,  3., inf, inf, inf, inf,  0.],
       [inf, inf, 35., inf, inf, inf,  0.],
       [ 1., inf, inf, inf, inf, inf,  0.],
       [inf, inf, inf, 20., 30., 70.,  0.],
       [45., inf, inf, inf, inf, inf,  0.]], dtype=float32)

In [485]:
transitions

array([[[0. , 0.1, 0.9, 0. , 0. , 0. , 0. ],
        [0. , 0. , 0. , 0. , 0. , 0. , 0. ],
        [0. , 0. , 0. , 0. , 0. , 0. , 0. ],
        [0. , 0. , 0. , 0. , 0. , 0. , 0. ],
        [0. , 0. , 0. , 0. , 0. , 0. , 0. ],
        [0. , 0. , 0. , 0. , 0. , 0. , 0. ],
        [0. , 0. , 0. , 0. , 0. , 0. , 0. ]],

       [[0. , 0. , 0. , 0. , 0. , 0. , 0. ],
        [1. , 0. , 0. , 0. , 0. , 0. , 0. ],
        [0. , 0. , 0. , 0. , 0. , 0. , 0. ],
        [0. , 0. , 0. , 0. , 0. , 0. , 0. ],
        [0. , 0. , 0. , 0. , 0. , 0. , 0. ],
        [0. , 0. , 0. , 0. , 0. , 0. , 0. ],
        [0. , 0. , 0. , 0. , 0. , 0. , 0. ]],

       [[0. , 0. , 0. , 0. , 0. , 0. , 0. ],
        [0. , 0.1, 0.9, 0. , 0. , 0. , 0. ],
        [0. , 0. , 0. , 0. , 0. , 0. , 0. ],
        [0. , 0. , 0. , 0. , 0. , 0. , 0. ],
        [0. , 0. , 0. , 0. , 0. , 0. , 0. ],
        [0. , 0. , 0. , 0. , 0. , 0. , 0. ],
        [0. , 0. , 0. , 0. , 0. , 0. , 0. ]],

       [[0. , 0. , 0. , 0. , 0. , 0. , 0. ],
    

Next we create a vector of initial state values. These should be greater than zero, and can be arbitrarily large. 

In [486]:
v_init = np.full_like(state_labels, 100, dtype=float)

Since we know the goal state, we can set the value of that to zero. 

In [487]:
v_init[states[goal]] = 0
v_init

array([100., 100., 100., 100., 100., 100.,   0.])

The core of VI is the operation below. This muliplies a vector of state values ($v_{n-1}$) by the transition function, then adds the costs. This gives a matrix of Q values, where each row is contains a list of Q values per action, and each column is the Q value for that action reaching that state. 

In [488]:
q_matrix = np.dot(transitions, v_init) + costs

In [489]:
q_matrix

array([[102.,  inf,  inf,  inf,  inf,  inf,   0.],
       [ inf, 102.,  inf,  inf,  inf,  inf,   0.],
       [ inf, 103.,  inf,  inf,  inf,  inf,   0.],
       [ inf,  inf,  35.,  inf,  inf,  inf,   0.],
       [101.,  inf,  inf,  inf,  inf,  inf,   0.],
       [ inf,  inf,  inf,  20.,  30.,  70.,   0.],
       [ 45.,  inf,  inf,  inf,  inf,  inf,   0.]])

We calculate the next state value vector by taking a min over the actions in the state (i.e. a min over the columns).

In [490]:
np.min(q_matrix, axis=0)

array([ 45., 102.,  35.,  20.,  30.,  70.,   0.])

We can do the same to get the index of the action with the min Q value, i.e. the policy.

In [491]:
np.argmin(q_matrix, axis=0)

array([6, 1, 3, 5, 5, 5, 0])

Set the previous state value vector to our intial values. 

In [492]:
v_prev = v_init


Define `epsilon` which gives us our stopping condition.

In [493]:
epsilon = 1e-05

And finally, run value iteration and extract the policy. 

In [494]:
max_residual = 100
while max_residual > epsilon:
    q_matrix = np.dot(transitions, v_prev) + costs
    v_next = np.min(q_matrix, axis=0)
    max_residual = np.max(np.abs(v_next - v_prev))
    v_prev = v_next
    print(f"max. residual: {max_residual}")

policy = np.argmin(q_matrix, axis=0)

max. residual: 80.0
max. residual: 57.3
max. residual: 9.700000000000003
max. residual: 0.0


We can then inspect the policy

In [495]:
for state_label, s in states.items():
    if state_label != goal:
        print(f"in state {state_label} choose {action_names[policy[s]]}")

in state home choose car
in state waiting_room choose go_home
in state train choose relax
in state light_traffic choose drive
in state medium_traffic choose drive
in state heavy_traffic choose drive


And also check the value of the states

In [496]:
for state_label, s in states.items():
    print(f"the value of state {state_label} is {v_next[s]}")

the value of state home is 33.0
the value of state waiting_room is 35.0
the value of state train is 35.0
the value of state light_traffic is 20.0
the value of state medium_traffic is 30.0
the value of state heavy_traffic is 70.0
the value of state work is 0.0
