# Simple POMDP example with two states

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors

## Utility functions

In [None]:
def get_b_m(r):
    """Return (intercept, slope) of the line through (0, r[0]) and (1, r[1])."""
    intercept, value_at_one = r[0], r[1]
    return intercept, value_at_one - intercept
    
def compute_intersect(r1, r2):
    """Return the x-coordinate where the lines r1 and r2 cross.

    Each argument is a pair of values at x=0 and x=1. Returns None when the
    two slopes are (numerically) equal, i.e. the lines never cross.
    """
    intercept_1, slope_1 = get_b_m(r1)
    intercept_2, slope_2 = get_b_m(r2)
    slopes_match = np.isclose(slope_2 - slope_1, 0)
    if slopes_match:
        return None
    # b1 + m1*x == b2 + m2*x  =>  x = (b2 - b1) / (m1 - m2)
    return (intercept_2 - intercept_1) / (slope_1 - slope_2)
   
def compute_segment(r, rewards):
    """Return the interval (lb, ub) of [0, 1] on which r is not below any line.

    Args:
        r: pair of values of the line at x=0 and x=1.
        rewards: iterable of such pairs to compare against (may contain r).

    Returns:
        A tuple (lb, ub). The interval is empty when lb >= ub. A line that
        lies below some other line on the whole of [0, 1] yields (0, 0).
    """
    lb, ub = 0, 1
    _, m1 = get_b_m(r)
    for r_prime in rewards:
        # r is at least as high at both endpoints, so r_prime never beats it.
        if np.all(r >= r_prime):
            continue
        x = compute_intersect(r, r_prime)
        if x is None or not 0 <= x <= 1:
            # r is below r_prime at an endpoint and the lines do not cross
            # inside [0, 1], so r is below r_prime everywhere on [0, 1].
            return 0, 0
        _, m2 = get_b_m(r_prime)
        if m1 > m2:
            # r rises faster, so it is the higher line to the right of x.
            lb = max(lb, x)
        else:
            ub = min(ub, x)
    return lb, ub

def compute_parsimonious(rewards):
    """Return the lines in rewards whose compute_segment interval has positive width."""
    kept = []
    for candidate in rewards:
        lower, upper = compute_segment(candidate, rewards)
        is_degenerate = lower > upper or np.isclose(lower, upper)
        if not is_degenerate:
            kept.append(candidate)
    return kept

## Model parameters

### States

In [None]:
# The two states, encoded as integers.
s0 = 0
s1 = 1
states = [s0, s1]

### Observations

There are two possible observations, o0 and o1. Z specifies the probability of observing o0 and o1 in a particular state. Note that the probability does not depend on the action.

In [None]:
# The two possible observations.
o0 = "a"
o1 = "b"
obs = [o0, o1]

In [None]:
# Observation model: Z[(s, o)] is the probability of observing o in state s.
Z = {
    (s0, o0): 0.7,
    (s0, o1): 0.3,
    (s1, o0): 0.4,
    (s1, o1): 0.6,
}

### Actions

Two actions are available in each state. R[(s, a)] gives the reward for playing action a in state s.

In [None]:
# The two actions, identified by name.
a0 = "a0"
a1 = "a1"
actions = [a0, a1]

In [None]:
# Reward table keyed by (state, action).
R = {
    (s0, a0): 1,
    (s1, a0): 2,
    (s0, a1): 3,
    (s1, a1): 1,
}

### Transitions

In [None]:
# Transition table keyed by (state, action, next state).
P = {
    # Action a0
    (s0, a0, s0): 0.9,
    (s0, a0, s1): 0.1,
    (s1, a0, s0): 0.2,
    (s1, a0, s1): 0.8,
    # Action a1
    (s0, a1, s0): 0.3,
    (s0, a1, s1): 0.7,
    (s1, a1, s0): 0.6,
    (s1, a1, s1): 0.4,
}

## Belief updates

In [None]:
def obs_prob(s, a, o):
    """Return the probability of observing o after playing action a in state s."""
    total = 0
    # Marginalise over the state reached after the transition.
    for s_prime in states:
        total += P[(s, a, s_prime)] * Z[(s_prime, o)]
    return total

print(obs_prob(s0, a0, o0))
print(obs_prob(s1, a0, o0))

In [None]:
def belief_T(b, a, o):
    """Return the belief obtained from b after playing a and observing o.

    b is indexed by state; the result is a vector in the same order.
    """
    # Overall probability of observing o from belief b; used to normalise.
    norm = sum(b[s] * obs_prob(s, a, o) for s in states)
    # Entry [s_next][s_prev] is Z[(s_next, o)] * P[(s_prev, a, s_next)].
    update = np.array([
        [Z[(s_next, o)] * P[(s_prev, a, s_next)] for s_prev in (s0, s1)]
        for s_next in (s0, s1)
    ]) * (1/norm)
    return np.matmul(update, b)

print(belief_T(np.array([0.5, 0.5]), a0, o0))

## Reward

In [None]:
# One reward vector per action, ordered as [reward in s0, reward in s1].
rewards = [
    np.array([R[(s0, a)], R[(s1, a)]])
    for a in actions
]

def plot_reward(r, label=None, color=None):
    """Draw r as a straight line from x=0 to x=1 on the current axes.

    r holds the values at x=0 and x=1; label and color are forwarded to
    plt.plot. Also fixes the x-range to [0, 1] and sets the axis labels.
    """
    x_range = [0, 1]
    plt.plot(x_range, r, label=label, color=color)
    plt.xlim(x_range)
    plt.xlabel("P(s=s1)")
    plt.ylabel("Reward")
    
# Plot each action's reward line and mark, with a thick bar at y=0, the
# interval that compute_segment returns for that action.
for c, a, r in zip(mcolors.TABLEAU_COLORS, actions, rewards):
    plot_reward(r, label=a, color=c)
    lb, ub = compute_segment(r, rewards)
    # axhline's xmin/xmax are fractions of the axes width; plot_reward fixes
    # the x-range to [0, 1], so lb and ub can be passed through directly.
    plt.axhline(y=0, xmin=lb, xmax=ub, color=c, linewidth=6)
plt.legend()
plt.ylim(ymin=0)
plt.savefig("instant-reward.png", bbox_inches="tight")

In [None]:
def transform_reward_obs(r, a, o):
    """Transform r through one step of playing a and observing o.

    Entry s of the result is the sum over next states s_next of
    P[(s, a, s_next)] * Z[(s_next, o)] * r[s_next].
    """
    # Entry [s][s_next] is Z[(s_next, o)] * P[(s, a, s_next)].
    backup = np.array([
        [Z[(s_next, o)] * P[(s, a, s_next)] for s_next in (s0, s1)]
        for s in (s0, s1)
    ])
    return np.matmul(backup, r)

def transform_reward(r, a, cumulative=False):
    """Sum transform_reward_obs(r, a, o) over every observation o.

    With cumulative=True the reward vector of action a itself is added on top.
    """
    vectors = [transform_reward_obs(r, a, o) for o in obs]
    if cumulative:
        immediate = np.array([R[s0, a], R[s1, a]])
        vectors.append(immediate)
    return np.sum(vectors, axis=0)

def get_reward_val(r, b):
    """Return the dot product of the vector r with the belief b."""
    value = np.dot(r, b)
    return value

In [None]:
# Transform each action's reward vector through one step in which a0 is
# played and o0 is observed, then plot the resulting lines with their segments.
transformed_rewards = [transform_reward_obs(r, a0, o0) for r in rewards]
for c, a, r in zip(mcolors.TABLEAU_COLORS, actions, transformed_rewards):
    # Pass the colour explicitly so each line matches its bar below even when
    # the active colour cycle is not the default one.
    plot_reward(r, label=f"Action {a}", color=c)
    lb, ub = compute_segment(r, transformed_rewards)
    plt.axhline(y=0, xmin=lb, xmax=ub, color=c, linewidth=6)
plt.legend()
plt.ylim(ymin=0)
plt.title("Assuming we play a_0 and observe o_0 in the first step")
plt.savefig("transformed-value.png", bbox_inches='tight')

### 3. Fix action

In [None]:
# Transform each action's reward vector through a first step in which a0 is
# played, summed over both observations (cumulative is left at False).
transformed_rewards = [transform_reward(r, a0) for r in rewards]
for c, a, r in zip(mcolors.TABLEAU_COLORS, actions, transformed_rewards):
    plot_reward(r, label=f"Action {a}", color=c)
    lb, ub = compute_segment(r, transformed_rewards)
    # Thick bar at y=0 marks the interval compute_segment returns for this line.
    plt.axhline(y=0, xmin=lb, xmax=ub, color=c, linewidth=6)
plt.legend()
plt.ylim(ymin=0)
plt.title("Assuming we play a_0 first step")
plt.savefig("transformed-value-action.png", bbox_inches='tight')

### 4. Consider all actions!

In [None]:
import itertools

plt.figure(figsize=(10, 6))
# transformed_rewards[(a, init_a)] is the cumulative vector for playing init_a
# first and a second: the reward vector of init_a plus the transformed vector of a.
transformed_rewards = dict()
for a, r in zip(actions, rewards):
    for init_a in actions:
        transformed_rewards[(a, init_a)] = transform_reward(r, init_a, cumulative=True)

for c, (a, a_init) in zip(mcolors.TABLEAU_COLORS, itertools.product(actions, actions)):
    r = transformed_rewards[(a, a_init)]
    # Label with this iteration's first action (a_init), not the variable
    # left over from the loop above, which always holds the last action.
    plot_reward(r, color=c, label=f"Playing {a_init} then {a}")
    lb, ub = compute_segment(r, transformed_rewards.values())
    plt.axhline(y=0, xmin=lb, xmax=ub, color=c, linewidth=10)

plt.ylim(ymin=0)
plt.legend()
plt.title("Cumulative reward")
plt.savefig("nothing-fixed.png", bbox_inches="tight")