#### A toy example: Modelling whether or not to go to bed early as an SDP

In [51]:
import numpy as np
import copy

In [52]:
# Four states: E = "Energized", T = "Tired", C = "Calm", S = "Stressed"
states = ["EC", "ES", "TC", "TS"]

# Two possible actions, to go to bed early or late. If late, maybe you have time to study more?
def actions(x: str) -> list[str]:
    if x in states:
        return ["Early", "Late"]
    else:
        raise ValueError(f"Invalid state '{x}'.")

In [53]:
# Defining probabilities depending on action and state.

pE_Early_C = 0.9
pE_Early_S = 0.7

pT_Early_C = 1 - pE_Early_C
pT_Early_S = 1 - pE_Early_S

# --- --- --- #

pC_Early_C = 0.9
pC_Early_S = 0.4

pS_Early_C = 1 - pC_Early_C
pS_Early_S = 1 - pC_Early_S

# --- --- --- #

pE_Late_C = 0.3
pE_Late_S = 0.2

pT_Late_C = 1 - pE_Late_C
pT_Late_S = 1 - pE_Late_S

# --- --- --- #

pC_Late_C = 0.8
pC_Late_S = 0.6

pS_Late_C = 1- pC_Late_C
pS_Late_S = 1- pC_Late_S


In [54]:
# Function that checks that no probabilities are negative and returns "Monads".
def mkSimpleProb(pairs: list[tuple[str, float]]) -> dict[str, float]:
    dist: dict[str, float] = {}
    for (st, pr) in pairs:
        if pr >= 0:
            dist[st] = pr
    return dist

In [55]:
# Defining transition function
def nextFunc(t: int, x: str, y: str) -> dict[str, float]:
    if x == "EC":
        if y == "Early":
            return mkSimpleProb([
                ("EC", pE_Early_C * pC_Early_C),
                ("ES", pE_Early_C * pS_Early_C),
                ("TC", pT_Early_C * pC_Early_C),
                ("TS", pT_Early_C * pS_Early_C)
            ])

        elif y == "Late":
                        return mkSimpleProb([
                ("EC", pE_Late_C * pC_Late_C),
                ("ES", pE_Late_C * pS_Late_C),
                ("TC", pT_Late_C * pC_Late_C),
                ("TS", pT_Late_C * pS_Late_C)
            ])

        else:
            raise ValueError(f'Invalid control"{y}" at t={t}')
    elif x == "ES":
        if y == "Early":
             return mkSimpleProb([
                ("EC", pE_Early_S * pC_Early_S),
                ("ES", pE_Early_S * pS_Early_S),
                ("TC", pT_Early_S * pC_Early_S),
                ("TS", pT_Early_S * pS_Early_S)
            ])

        elif y == "Late":
             return mkSimpleProb([
                ("EC", pE_Late_S * pC_Late_S),
                ("ES", pE_Late_S * pS_Late_S),
                ("TC", pT_Late_S * pC_Late_S),
                ("TS", pT_Late_S * pS_Late_S)
            ])

        else:
            raise ValueError(f'Invalid control"{y}" at t={t}')
    elif x == "TC":
        if y == "Early":
             return mkSimpleProb([
                ("EC", pE_Early_C * pC_Early_C),
                ("ES", pE_Early_C * pS_Early_C),
                ("TC", pT_Early_C * pC_Early_C),
                ("TS", pT_Early_C * pS_Early_C)
            ])

        elif y == "Late":
             return mkSimpleProb([
                ("EC", pE_Late_C * pC_Late_C),
                ("ES", pE_Late_C * pS_Late_C),
                ("TC", pT_Late_C * pC_Late_C),
                ("TS", pT_Late_C * pS_Late_C)
            ])

        else:
            raise ValueError(f'Invalid control"{y}" at t={t}')
    elif x == "TS":
        if y == "Early":
             return mkSimpleProb([
                ("EC", pE_Early_S * pC_Early_S),
                ("ES", pE_Early_S * pS_Early_S),
                ("TC", pT_Early_S * pC_Early_S),
                ("TS", pT_Early_S * pS_Early_S)
            ])

        elif y == "Late":
             return mkSimpleProb([
                ("EC", pE_Late_S * pC_Late_S),
                ("ES", pE_Late_S * pS_Late_S),
                ("TC", pT_Late_S * pC_Late_S),
                ("TS", pT_Late_S * pS_Late_S)
            ])

        else:
            raise ValueError(f'Invalid control"{y}" at t={t}')
    else:
        raise ValueError(f'Invalid state "{x}" at t = {t}')  


In [56]:
# Testing the next-function.

test_next = nextFunc(0, "ES", "Early")

for state, pr in test_next.items():
    print(state, np.round(pr, 6))

print("\nSum of probabilities:", sum(test_next.values()))

EC 0.28
ES 0.42
TC 0.12
TS 0.18

Sum of probabilities: 1.0


In [57]:
def reward(t: int, x: str, y:str, next_x: str) -> float:
    if t < 0 or type(t) != int:
        raise ValueError(f"Invalid time step: '{t}' (must be positive integer).")
    if x not in states:
        raise ValueError(f"Invalid state: '{x}'")
    if y not in actions(x):
        raise ValueError(f"Invalid action: '{y}'")
    if next_x not in states:
        raise ValueError(f"Invalid next state: '{next_x}'")
    if next_x == "EC": rew = 1.0
    elif next_x == "TC": rew = 0.5
    elif next_x == "ES": rew = 0.25
    else: rew = 0.0
    return rew

In [58]:
def add(a: float, b: float) -> float:
    if type(a) != float or type(b) != float:
        raise TypeError(f"Inputs must be of type 'float', not '{type(a).__name__}' and '{type(b).__name__}'.")
    return a + b # In default implementation, returns regular floating point addition.

In [59]:
# Function for measuring a certain value.
def meas(val: float, pr: float) -> float:
    if type(val) != float or type(pr) != float:
        raise TypeError(f"Inputs must be of type 'float', not '{type(val).__name__}' and '{type(pr).__name__}'.")
    return val * pr # In default implementation, returns the expected value.

In [60]:
t = 0
x = "EC"
y = "Early"
x_next = "TC"

test_reward = reward(t, x, y, x_next)
print(f'If, at t = {t} and in state "{x}", you take action "{y}" and end up in state {x_next}, you get the reward \033[1m{test_reward}.')

If, at t = 0 and in state "EC", you take action "Early" and end up in state TC, you get the reward [1m0.5.


In [None]:
# Default value of zero-length policy sequences.
zero = 0.0

# Computing the total expected value from a policy sequence when starting at time t in state x.
def val(t: int, ps: list[dict[str, str]], x: str) -> float:
    if t < 0 or type(t) != int:
        raise ValueError(f"Invalid time step: '{t}' (must be positive integer).")
    if type(ps) != list:
        raise TypeError(f"Invalid policy list, must be list of dictionaries (or empty list).")
    if x not in states:
        raise ValueError(f"Invalid state: '{x}'")
    value = zero
    if len(ps) == 0:
        return value
    y = ps[0][x]
    m_next = nextFunc(t, x, y)
    for x_prim, pr in m_next.items():
        value += meas(add(reward(t, x, y, x_prim), val(t+1, ps[1:], x_prim)), pr)
    return value

In [None]:
# Computes the best single policy to add to an existing policy sequence.
def bestExt(t: int, ps_tail: list[dict[str, str]]) -> dict[str, str]:
    policy = dict()

    for state in states:
        best_value = -np.inf
        best_action = None
        
        # For each available action in the current state
        for action in actions(state):
            # Calculate value of taking action in state
            p = {state: action}
            value = val(t, [p] + ps_tail, state)
            # Choose the action with the highest expected value
            if value >= best_value:
                best_value = value
                best_action = action
        
        policy[state] = best_action

    return policy

In [None]:
def worstExt(t: int, ps_tail: list[dict[str, str]] | list[None]) -> dict[str, str]:
    if t < 0 or type(t) != int:
        raise ValueError(f"Invalid time step: '{t}' (must be positive integer).")
    if type(ps_tail) != list:
        raise TypeError(f"Invalid ps_tail, must be list of dictionaries (or empty list).")
    
    policy = dict()

    for state in states:
        worst_value = np.inf
        worst_action = None

        # For each available action in the current state
        for action in actions(state):
            # Calculate value of taking action in state
            p = {state: action}
            value = val(t, [p] + ps_tail, state)
            # Choose the action with the highest expected value
            if value <= worst_value:
                worst_value = value
                worst_action = action

        policy[state] = worst_action

    return policy

In [63]:
# Builds an optimal policy sequence by recursively adding the best extension (starting from the end).
def bi(t: int, n: int) -> list[dict[str, str]]:
    if n == 0:
        return []
    else:
        ps_tail = bi(t + 1, n - 1)
        p = bestExt(t, ps_tail)
        return [p] + ps_tail

In [70]:
# Testing an optimal policy
bi_test = bi(0, 7)
bi_test

[{'EC': 'Early', 'ES': 'Late', 'TC': 'Early', 'TS': 'Late'},
 {'EC': 'Early', 'ES': 'Late', 'TC': 'Early', 'TS': 'Late'},
 {'EC': 'Early', 'ES': 'Late', 'TC': 'Early', 'TS': 'Late'},
 {'EC': 'Early', 'ES': 'Late', 'TC': 'Early', 'TS': 'Late'},
 {'EC': 'Early', 'ES': 'Late', 'TC': 'Early', 'TS': 'Late'},
 {'EC': 'Early', 'ES': 'Late', 'TC': 'Early', 'TS': 'Late'},
 {'EC': 'Early', 'ES': 'Early', 'TC': 'Early', 'TS': 'Early'}]

In [64]:
# For a given time step, state and decision horizon, returns the optimal action and the 
# expected value of the sequence it starts (assuming the rest of the sequence is optimal).
def best(t: int, n: int, x: str) -> str:
    if n == 0:
        raise ValueError("The horizon must be greater than zero!")
    ps = bi(t+1, n-1)
    p = bestExt(t, ps)
    b = p[x]
    vb = val(t, [p] + ps, x)
    return f"Horizon, best, value: {n}, {b}, {vb}"

In [65]:
# Computing the best decision for different decision horizons.
bests = []
for i in range(1,8):
    bests.append(best(0, i, "ES"))

for best in bests:
    print(best)

Horizon, best, value: 1, Early, 0.44499999999999995
Horizon, best, value: 2, Late, 1.0845
Horizon, best, value: 3, Late, 1.8408499999999999
Horizon, best, value: 4, Late, 2.632255
Horizon, best, value: 5, Late, 3.4341765000000004
Horizon, best, value: 6, Late, 4.239252950000001
Horizon, best, value: 7, Late, 5.0452758850000015


In [66]:
# Returns a value between 0 and 1, where 0 means "does not matter at all"
# and 1 means "matters maximally" to achieving the defined goal of the SDP.
def mMeas(t: int, n: int, x: str) -> float:
    ps = bi(t, n)
    ps_prim = copy.deepcopy(ps)
    if ps[0][x] == "Early":
        ps_prim[0][x] = "Late"
    else:
        ps_prim[0][x] = "Early"

    best_action_val = val(t, ps, x)
    worst_action_val = val(t, ps_prim, x)

    return (best_action_val - worst_action_val) / best_action_val

In [67]:
# Comparing importance of a few choises.
print(mMeas(0, 1, "EC"))
print(mMeas(0, 2, "EC"))
print(mMeas(0, 4, "EC"))
print(mMeas(0, 6, "EC"))
print(mMeas(0, 8, "EC"))

0.39031339031339046
0.2253541697093618
0.12324171421761498
0.08351621684493062
0.06301985082274782


In [68]:
# Comparing importance of a few choises.
print(mMeas(0, 4, "EC"))
print(mMeas(0, 4, "TC"))
print(mMeas(0, 4, "ES"))
print(mMeas(0, 4, "TS"))

0.12324171421761498
0.12324171421761498
0.027404259845645595
0.027404259845645595
