# vacuum

> Support code for defining the discrete states and discrete actions of a vacuum cleaning robot.

In [None]:
#| default_exp vacuum

In [None]:
#| include: false
from fastcore.all import *

In [None]:
#| export
import numpy as np
import gtsam
from gtbook.discrete import Variables
from gtbook.display import pretty

## State

In [None]:
#| export
# Names of the rooms that make up the robot's discrete state space.
# The position of a room in this list is its integer state index.
rooms = ["Living Room", "Kitchen", "Office", "Hallway", "Dining Room"]

In [None]:
# Regression check: the room names and their order (i.e. the state indices) must not change.
test_eq(rooms, ['Living Room', 'Kitchen', 'Office', 'Hallway', 'Dining Room'])

## Actions

In [None]:
#| export
# Discrete actions available to the robot; the position in the list is the action index.
# NOTE(review): the letters presumably stand for Left/Right/Up/Down - confirm.
action_space = ["L", "R", "U", "D"]
# Motion model specification: one row per current room (in `rooms` order) and,
# within a row, one group per action (in `action_space` order). Each group
# "w0/w1/w2/w3/w4" lists relative weights over the next room; they are
# normalised into probabilities when turned into a DiscreteConditional
# (e.g. "2/8/0/0/0" becomes 0.2, 0.8, 0, 0, 0).
action_spec = """
    1/0/0/0/0 2/8/0/0/0 1/0/0/0/0 2/0/0/8/0
    8/2/0/0/0 0/1/0/0/0 0/1/0/0/0 0/2/0/0/8
    0/0/1/0/0 0/0/2/8/0 0/0/1/0/0 0/0/1/0/0
    0/0/8/2/0 0/0/0/2/8 8/0/0/2/0 0/0/0/1/0
    0/0/0/8/2 0/0/0/0/1 0/8/0/0/2 0/0/0/0/1
    """

In [None]:
# Regression check: the action labels and their order (i.e. the action indices) must not change.
test_eq(action_space, ['L', 'R', 'U', 'D'])

In [None]:
# Container from gtbook.discrete used to declare the discrete variables below.
VARIABLES = Variables()

X = VARIABLES.discrete_series("X", [1, 2, 3], rooms) # states for times 1, 2 and 3, with domain `rooms`
A = VARIABLES.discrete_series("A", [1, 2], action_space) # actions for times 1 and 2, with domain `action_space`
# Motion model P(X2 | X1, A1) built from the weights in `action_spec`.
motion_model = gtsam.DiscreteConditional(X[2], [X[1], A[1]], action_spec)
# Display the conditional as a table, one row per (X1, A1) combination.
pretty(motion_model)


X1,A1,0,1,2,3,4
0,0,1.0,0.0,0.0,0.0,0.0
0,1,0.2,0.8,0.0,0.0,0.0
0,2,1.0,0.0,0.0,0.0,0.0
0,3,0.2,0.0,0.0,0.8,0.0
1,0,0.8,0.2,0.0,0.0,0.0
1,1,0.0,1.0,0.0,0.0,0.0
1,2,0.0,1.0,0.0,0.0,0.0
1,3,0.0,0.2,0.0,0.0,0.8
2,0,0.0,0.0,1.0,0.0,0.0
2,1,0.0,0.0,0.2,0.8,0.0


## Sensing

In [None]:
#| export
# Discrete readings of the light sensor; the position in the list is the measurement index.
light_levels = ["dark", "medium", "light"]
# Sensor model specification: five groups of three weights.
# NOTE(review): presumably one group per room (in `rooms` order), each giving
# relative weights over `light_levels` - confirm against the code that consumes it.
sensor_spec = "1/1/8 1/1/8 2/7/1 8/1/1 1/8/1"

In [None]:
# Regression check: the sensor specification string must not change.
test_eq(sensor_spec, '1/1/8 1/1/8 2/7/1 8/1/1 1/8/1')

## RL

In [None]:
# | export
def calculate_value_system(
    R: np.array,  # reward function as a tensor, indexed as R[x, a, y]
    T: np.array,  # transition probabilities as a tensor, indexed as T[x, a, y]
    pi: np.array,  # policy, as a vector: pi[x] is the action index taken in state x
    gamma=0.9,  # discount factor
):
    """Calculate the matrix AA and vector b of the linear system AA @ V = b
    whose solution V is the value function of policy `pi`.

    For every state x, with a = pi[x]:
        b[x]     = sum_y T[x, a, y] * R[x, a, y]   (expected immediate reward)
        AA[x, :] = I[x, :] - gamma * T[x, a, :]

    The number of states is taken from the first axis of `T`, so the function
    works for any problem size rather than only for the five-room example.
    Returns the tuple `(AA, b)` as float arrays of shape (n, n) and (n,).
    """
    T = np.asarray(T, dtype=float)
    R = np.asarray(R, dtype=float)
    num_states = T.shape[0]
    states = np.arange(num_states)
    # Only the first `num_states` policy entries are used, one per state.
    actions = np.asarray(pi)[:num_states]
    # Select, for each state, the slice belonging to the action the policy picks.
    T_pi = T[states, actions]  # T_pi[x, y] = P(y | x, pi[x])
    R_pi = R[states, actions]  # R_pi[x, y] = reward for x -> y under pi[x]
    b = np.sum(T_pi * R_pi, axis=1)  # expected reward under policy pi
    AA = np.eye(num_states) - gamma * T_pi
    return AA, b


def calculate_value_function(
    R: np.array,  # reward function as a tensor
    T: np.array,  # transition probabilities as a tensor
    pi: np.array,  # policy, as a vector
    gamma=0.9,  # discount factor
):
    """Evaluate policy `pi`: return the vector of state values.

    Builds the linear system with `calculate_value_system` and solves it
    with `np.linalg.solve`.
    """
    system_matrix, expected_reward = calculate_value_system(R, T, pi, gamma)
    values = np.linalg.solve(system_matrix, expected_reward)
    return values

In [None]:
# From section 3.5:
# From section 3.5:
# Build the transition tensor T[x, a, y] = P(y | x, a) and the reward tensor
# R[x, a, y], where x is the current room, a the action and y the next room.
num_rooms, num_actions = len(rooms), len(action_space)
# Keys 0, 1 and 2 stand for the current room x, the action a and the next room y.
conditional = gtsam.DiscreteConditional(
    (2, num_rooms), [(0, num_rooms), (1, num_actions)], action_spec
)
# The reward depends only on the room the robot ends up in: 10 for the
# living room and 0 elsewhere, so it can be filled in without the conditional.
R = np.zeros((num_rooms, num_actions, num_rooms), float)
R[:, :, rooms.index("Living Room")] = 10.0
# Zero-initialised (rather than np.empty) so that no entry holds uninitialised
# memory should enumerate() not report some assignment.
T = np.zeros((num_rooms, num_actions, num_rooms), float)
for assignment, value in conditional.enumerate():
    x, a, y = assignment[0], assignment[1], assignment[2]
    T[x, a, y] = value

In [None]:
# From room 2 with action 1, only a transition into room 0 (the living room) earns the reward of 10.
test_eq(R[2, 1], [10,  0,  0,  0,  0])

Calculating the value function of a given policy `pi`:

In [None]:
# One action index (into `action_space`) per room, in `rooms` order.
reasonable_policy = [2, 1, 0, 2, 1]
AA, b = calculate_value_system(R, T, reasonable_policy)
# Under this policy every room except room 3 transitions to itself with
# probability 1, giving a diagonal entry of 1 - 0.9 = 0.1. Room 3 moves to
# room 0 with probability 0.8 and stays put with probability 0.2, giving
# -0.9 * 0.8 = -0.72 and 1 - 0.9 * 0.2 = 0.82.
test_close(
    AA,
    np.array(
        [
            [0.1, 0, 0, 0, 0],
            [0, 0.1, 0, 0, 0],
            [0, 0, 0.1, 0, 0],
            [-0.72, 0, 0, 0.82, 0],
            [0, 0, 0, 0, 0.1],
        ]
    ),
)
# Expected immediate reward: 10 when staying in room 0, 0.8 * 10 = 8 from room 3.
test_close(b, np.array([10, 0, 0, 8, 0]))

In [None]:
# Solving the system above: V[0] = 10 / 0.1 = 100 and V[3] = (8 + 0.72 * 100) / 0.82.
value_for_pi = calculate_value_function(R, T, reasonable_policy)
test_close(value_for_pi, np.array([100, 0, 0, 97.56097561, 0]))

In [None]:
# A second policy (one action index per room), under which every room has a non-zero value.
optimal_policy = [0, 0, 1, 2, 2]
value_for_pi = calculate_value_function(R, T, optimal_policy)
test_close(
    value_for_pi,
    np.array([100, 97.56097561, 85.66329566, 97.56097561, 85.66329566]),
)