diff --git a/mdp4e.py b/mdp4e.py
new file mode 100644
index 000000000..b9597f3cd
--- /dev/null
+++ b/mdp4e.py
@@ -0,0 +1,552 @@
+"""Markov Decision Processes (Chapter 16)
+
+First we define an MDP, and the special case of a GridMDP, in which
+states are laid out in a 2-dimensional grid. We also represent a policy
+as a dictionary of {state: action} pairs, and a utility function as a
+dictionary of {state: number} pairs. We then define the value_iteration
+and policy_iteration algorithms."""
+
+from utils4e import argmax, vector_add, orientations, turn_right, turn_left
+from planning import *
+import random
+import numpy as np
+from collections import defaultdict
+
+
+# _____________________________________________________________
+# 16.1 Sequential Decision Problems
+
+
+class MDP:
+    """A Markov Decision Process, defined by an initial state, transition model,
+    and reward function. We also keep track of a gamma value, for use by
+    algorithms. The transition model is represented somewhat differently from
+    the text. Instead of P(s' | s, a) being a probability number for each
+    state/action/state triple, we instead have T(s, a) return a
+    list of (p, s') pairs. We also keep track of the possible states,
+    terminal states, and actions for each state. [page 646]"""
+
+    def __init__(self, init, actlist, terminals, transitions=None, reward=None, states=None, gamma=0.9):
+        if not (0 < gamma <= 1):
+            raise ValueError("An MDP must have 0 < gamma <= 1")
+
+        # collect states from transitions table if not passed.
+        self.states = states or self.get_states_from_transitions(transitions)
+
+        self.init = init
+
+        # actlist is either a list (all states share the same actions)
+        # or a dict of {state: actions} (different actions for each state)
+        self.actlist = actlist
+
+        self.terminals = terminals
+        self.transitions = transitions or {}
+        if not self.transitions:
+            print("Warning: Transition table is empty.")
+
+        self.gamma = gamma
+
+        self.reward = reward or {s: 0 for s in self.states}
+
+        # self.check_consistency()
+
+    def R(self, state):
+        """Return a numeric reward for this state."""
+
+        return self.reward[state]
+
+    def T(self, state, action):
+        """Transition model. From a state and an action, return a list
+        of (probability, result-state) pairs."""
+
+        if not self.transitions:
+            raise ValueError("Transition model is missing")
+        else:
+            return self.transitions[state][action]
+
+    def actions(self, state):
+        """Return a list of actions that can be performed in this state. By default, a
+        fixed list of actions, except for terminal states. Override this
+        method if you need to specialize by state."""
+
+        if state in self.terminals:
+            return [None]
+        else:
+            return self.actlist
+
+    def get_states_from_transitions(self, transitions):
+        if isinstance(transitions, dict):
+            s1 = set(transitions.keys())
+            s2 = set(tr[1] for actions in transitions.values()
+                     for effects in actions.values()
+                     for tr in effects)
+            return s1.union(s2)
+        else:
+            print('Could not retrieve states from transitions')
+            return None
+
+    def check_consistency(self):
+
+        # check that all states in transitions are valid
+        assert set(self.states) == self.get_states_from_transitions(self.transitions)
+
+        # check that init is a valid state
+        assert self.init in self.states
+
+        # check reward for each state
+        assert set(self.reward.keys()) == set(self.states)
+
+        # check that all terminals are valid states
+        assert all(t in self.states for t in self.terminals)
+
+        # check that probability distributions for all actions sum to 1
+        for s1, actions in self.transitions.items():
+            for a in actions.keys():
+                s = sum(o[0] for o in actions[a])
+                assert abs(s - 1) < 0.001
+
+
+class MDP2(MDP):
+    """
+    Inherits from MDP. Handles terminal states, and transitions to and from
+    them, better than the base class does.
+    """
+
+    def __init__(self, init, actlist, terminals, transitions, reward=None, gamma=0.9):
+        MDP.__init__(self, init, actlist, terminals, transitions, reward, gamma=gamma)
+
+    def T(self, state, action):
+        if action is None:
+            # The zero probability makes a terminal state contribute no
+            # expected future utility, so its utility stays fixed at R(state).
+            return [(0.0, state)]
+        else:
+            return self.transitions[state][action]
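+
+
+# A minimal sketch of the transition-model convention used above. The state
+# names 's0'/'s1' and the action 'go' are illustrative only, not part of the
+# module:
+#
+#     t = {'s0': {'go': [(0.9, 's1'), (0.1, 's0')]},
+#          's1': {'go': [(1.0, 's1')]}}
+#     m = MDP(init='s0', actlist=['go'], terminals=['s1'], transitions=t, gamma=0.9)
+#     assert m.T('s0', 'go') == [(0.9, 's1'), (0.1, 's0')]
+#     assert m.states == {'s0', 's1'}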
+
+
+class GridMDP(MDP):
+    """A two-dimensional grid MDP, as in [Figure 16.1]. All you have to do is
+    specify the grid as a list of lists of rewards; use None for an obstacle
+    (unreachable state). Also, you should specify the terminal states.
+    An action is an (x, y) unit vector; e.g. (1, 0) means move east."""
+
+    def __init__(self, grid, terminals, init=(0, 0), gamma=.9):
+        grid.reverse()  # because we want row 0 on bottom, not on top (note: this reverses the caller's list in place)
+        reward = {}
+        states = set()
+        self.rows = len(grid)
+        self.cols = len(grid[0])
+        self.grid = grid
+        for x in range(self.cols):
+            for y in range(self.rows):
+                # a None entry marks an obstacle; a 0 reward is still a valid state
+                if grid[y][x] is not None:
+                    states.add((x, y))
+                    reward[(x, y)] = grid[y][x]
+        self.states = states
+        actlist = orientations
+        transitions = {}
+        for s in states:
+            transitions[s] = {}
+            for a in actlist:
+                transitions[s][a] = self.calculate_T(s, a)
+        MDP.__init__(self, init, actlist=actlist,
+                     terminals=terminals, transitions=transitions,
+                     reward=reward, states=states, gamma=gamma)
+
+    def calculate_T(self, state, action):
+        # The agent moves in the intended direction with probability 0.8,
+        # and slips to either perpendicular direction with probability 0.1 each.
+        if action:
+            return [(0.8, self.go(state, action)),
+                    (0.1, self.go(state, turn_right(action))),
+                    (0.1, self.go(state, turn_left(action)))]
+        else:
+            return [(0.0, state)]
+
+    def T(self, state, action):
+        return self.transitions[state][action] if action else [(0.0, state)]
+
+    def go(self, state, direction):
+        """Return the state that results from going in this direction."""
+
+        state1 = tuple(vector_add(state, direction))
+        return state1 if state1 in self.states else state
+
+    def to_grid(self, mapping):
+        """Convert a mapping from (x, y) to v into a [[..., v, ...]] grid."""
+
+        return list(reversed([[mapping.get((x, y), None)
+                               for x in range(self.cols)]
+                              for y in range(self.rows)]))
+
+    def to_arrows(self, policy):
+        chars = {(1, 0): '>', (0, 1): '^', (-1, 0): '<', (0, -1): 'v', None: '.'}
+        return self.to_grid({s: chars[a] for (s, a) in policy.items()})
+
+
+# ______________________________________________________________________________
+
+
+""" [Figure 16.1]
+A 4x3 grid environment that presents the agent with a sequential decision problem.
+"""
+
+sequential_decision_environment = GridMDP([[-0.04, -0.04, -0.04, +1],
+                                           [-0.04, None, -0.04, -1],
+                                           [-0.04, -0.04, -0.04, -0.04]],
+                                          terminals=[(3, 2), (3, 1)])
+
+
+# ______________________________________________________________________________
+# 16.1.3 The Bellman equation for utilities
+
+
+def q_value(mdp, s, a, U):
+    """Return the Q-value of taking action a in state s under utility table U:
+    the expected current reward plus the discounted utility of the successor."""
+    if not a:
+        return mdp.R(s)
+    res = 0
+    for p, s_prime in mdp.T(s, a):
+        res += p * (mdp.R(s) + mdp.gamma * U[s_prime])
+    return res
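+
+
+# A quick sanity sketch of q_value (illustrative, not part of the original
+# module): with an all-zero utility table the discounted future term vanishes,
+# so the Q-value of any state/action pair reduces to the current reward R(s):
+#
+#     U0 = {s: 0 for s in sequential_decision_environment.states}
+#     q = q_value(sequential_decision_environment, (0, 0), (1, 0), U0)
+#     assert abs(q - sequential_decision_environment.R((0, 0))) < 1e-12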
+
+
+# TODO: DDN in figure 16.4 and 16.5
+
+# ______________________________________________________________________________
+# 16.2 Algorithms for MDPs
+# 16.2.1 Value Iteration
+
+
+def value_iteration(mdp, epsilon=0.001):
+    """Solving an MDP by value iteration. [Figure 16.6]"""
+
+    U1 = {s: 0 for s in mdp.states}
+    R, T, gamma = mdp.R, mdp.T, mdp.gamma
+    while True:
+        U = U1.copy()
+        delta = 0
+        for s in mdp.states:
+            # U1[s] = R(s) + gamma * max(sum(p*U[s1] for (p, s1) in T(s, a))
+            #                            for a in mdp.actions(s))
+            U1[s] = max(q_value(mdp, s, a, U) for a in mdp.actions(s))
+            delta = max(delta, abs(U1[s] - U[s]))
+        # terminate once the largest update is small enough to guarantee
+        # that the utilities are within epsilon of their true values
+        if delta <= epsilon * (1 - gamma) / gamma:
+            return U
+
+
+# ______________________________________________________________________________
+# 16.2.2 Policy Iteration
+
+
+def best_policy(mdp, U):
+    """Given an MDP and a utility function U, determine the best policy,
+    as a mapping from state to action."""
+
+    pi = {}
+    for s in mdp.states:
+        pi[s] = argmax(mdp.actions(s), key=lambda a: q_value(mdp, s, a, U))
+    return pi
+
+
+def expected_utility(a, s, U, mdp):
+    """The expected utility of doing a in state s, according to the MDP and U."""
+
+    return sum(p * U[s1] for (p, s1) in mdp.T(s, a))
+
+
+def policy_iteration(mdp):
+    """Solve an MDP by policy iteration [Figure 17.7]"""
+
+    U = {s: 0 for s in mdp.states}
+    pi = {s: random.choice(mdp.actions(s)) for s in mdp.states}
+    while True:
+        U = policy_evaluation(pi, U, mdp)
+        unchanged = True
+        for s in mdp.states:
+            a_star = argmax(mdp.actions(s), key=lambda a: q_value(mdp, s, a, U))
+            # a = argmax(mdp.actions(s), key=lambda a: expected_utility(a, s, U, mdp))
+            if q_value(mdp, s, a_star, U) > q_value(mdp, s, pi[s], U):
+                pi[s] = a_star
+                unchanged = False
+        if unchanged:
+            return pi
+
+
+def policy_evaluation(pi, U, mdp, k=20):
+    """Return an updated utility mapping U from each state in the MDP to its
+    utility, using an approximation (modified policy iteration)."""
+
+    R, T, gamma = mdp.R, mdp.T, mdp.gamma
+    for _ in range(k):
+        for s in mdp.states:
+            U[s] = R(s) + gamma * sum(p * U[s1] for (p, s1) in T(s, pi[s]))
+    return U
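+
+
+# The two solvers above should agree on the 4x3 environment. A hedged sketch
+# (variable names are illustrative; policy_iteration starts from a random
+# policy, so ties between equally good actions could in principle differ):
+#
+#     U = value_iteration(sequential_decision_environment, .01)
+#     pi_vi = best_policy(sequential_decision_environment, U)
+#     pi_pi = policy_iteration(sequential_decision_environment)
+#     assert pi_vi == pi_pi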
+
+
+# ___________________________________________________________________
+# 16.4 Partially Observed MDPs
+
+
+class POMDP(MDP):
+    """A Partially Observable Markov Decision Process, defined by
+    a transition model P(s'|s,a), actions A(s), a reward function R(s),
+    and a sensor model P(e|s). We also keep track of a gamma value,
+    for use by algorithms. The transition and the sensor models
+    are defined as matrices. We also keep track of the possible states
+    and actions for each state. [page 659]"""

+    def __init__(self, actions, transitions=None, evidences=None, rewards=None, states=None, gamma=0.95):
+        """Initialize variables of the POMDP."""
+
+        if not (0 < gamma <= 1):
+            raise ValueError('A POMDP must have 0 < gamma <= 1')
+
+        self.states = states
+        self.actions = actions
+
+        # transition model cannot be undefined
+        self.t_prob = transitions or {}
+        if not self.t_prob:
+            print('Warning: Transition model is undefined')
+
+        # sensor model cannot be undefined
+        self.e_prob = evidences or {}
+        if not self.e_prob:
+            print('Warning: Sensor model is undefined')
+
+        self.gamma = gamma
+        self.rewards = rewards
+
+    def remove_dominated_plans(self, input_values):
+        """
+        Remove dominated plans.
+        This method finds all the lines contributing to the
+        upper surface and removes those that don't.
+        """
+
+        values = [val for action in input_values for val in input_values[action]]
+        values.sort(key=lambda x: x[0], reverse=True)
+
+        best = [values[0]]
+        y1_max = max(val[1] for val in values)
+        tgt = values[0]
+        prev_b = 0
+        prev_ix = 0
+        while tgt[1] != y1_max:
+            min_b = 1
+            min_ix = 0
+            for i in range(prev_ix + 1, len(values)):
+                if values[i][0] - tgt[0] + tgt[1] - values[i][1] != 0:
+                    trans_b = (values[i][0] - tgt[0]) / (values[i][0] - tgt[0] + tgt[1] - values[i][1])
+                    if 0 <= trans_b <= 1 and prev_b < trans_b < min_b:
+                        min_b = trans_b
+                        min_ix = i
+            prev_b = min_b
+            prev_ix = min_ix
+            tgt = values[min_ix]
+            best.append(tgt)
+
+        return self.generate_mapping(best, input_values)
+
+    def remove_dominated_plans_fast(self, input_values):
+        """
+        Remove dominated plans using approximations.
+        Samples the upper boundary at 100 evenly spaced belief points and
+        keeps the plans that attain the maximum at some sample point.
+        """
+
+        values = [val for action in input_values for val in input_values[action]]
+        values.sort(key=lambda x: x[0], reverse=True)
+
+        best = []
+        sr = 100
+        for i in range(sr + 1):
+            x = i / float(sr)
+            maximum = (values[0][1] - values[0][0]) * x + values[0][0]
+            tgt = values[0]
+            for value in values:
+                val = (value[1] - value[0]) * x + value[0]
+                if val > maximum:
+                    maximum = val
+                    tgt = value
+
+            if all(any(tgt != v) for v in best):
+                best.append(np.array(tgt))
+
+        return self.generate_mapping(best, input_values)
+
+    def generate_mapping(self, best, input_values):
+        """Generate mappings after removing dominated plans"""
+
+        mapping = defaultdict(list)
+        for value in best:
+            for action in input_values:
+                if any(all(value == v) for v in input_values[action]):
+                    mapping[action].append(value)
+
+        return mapping
+
+    def max_difference(self, U1, U2):
+        """Find the difference between two utility mappings, used as the
+        convergence test in pomdp_value_iteration. (Despite the name, this
+        compares summed values for the last key iterated rather than taking
+        a true maximum over all keys.)"""
+
+        for k, v in U1.items():
+            sum1 = 0
+            for element in U1[k]:
+                sum1 += sum(element)
+            sum2 = 0
+            for element in U2[k]:
+                sum2 += sum(element)
+        return abs(sum1 - sum2)
+
+
+class Matrix:
+    """Matrix operations class"""
+
+    @staticmethod
+    def add(A, B):
+        """Add two matrices A and B"""
+
+        res = []
+        for i in range(len(A)):
+            row = []
+            for j in range(len(A[0])):
+                row.append(A[i][j] + B[i][j])
+            res.append(row)
+        return res
+
+    @staticmethod
+    def scalar_multiply(a, B):
+        """Multiply matrix B by scalar a, in place."""
+
+        for i in range(len(B)):
+            for j in range(len(B[0])):
+                B[i][j] = a * B[i][j]
+        return B
+
+    @staticmethod
+    def multiply(A, B):
+        """Element-wise product of B with the transpose of A."""
+
+        matrix = []
+        for i in range(len(B)):
+            row = []
+            for j in range(len(B[0])):
+                row.append(B[i][j] * A[j][i])
+            matrix.append(row)
+
+        return matrix
+
+    @staticmethod
+    def matmul(A, B):
+        """Matrix product of A and B."""
+
+        return [[sum(ele_a * ele_b for ele_a, ele_b in zip(row_a, col_b)) for col_b in list(zip(*B))] for row_a in A]
+
+    @staticmethod
+    def transpose(A):
+        """Transpose a matrix"""
+
+        return [list(i) for i in zip(*A)]
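+
+
+# A small sanity sketch for the Matrix helpers (illustrative only):
+#
+#     assert Matrix.matmul([[1, 2]], [[3], [4]]) == [[11]]
+#     assert Matrix.transpose([[1, 2], [3, 4]]) == [[1, 3], [2, 4]]
+#     assert Matrix.add([[1]], [[2]]) == [[3]]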
+
+
+def pomdp_value_iteration(pomdp, epsilon=0.1):
+    """Solving a POMDP by value iteration."""
+
+    U = {'': [[0] * len(pomdp.states)]}
+    count = 0
+    while True:
+        count += 1
+        prev_U = U
+        values = [val for action in U for val in U[action]]
+        value_matxs = []
+        for i in values:
+            for j in values:
+                value_matxs.append([i, j])
+
+        U1 = defaultdict(list)
+        for action in pomdp.actions:
+            for u in value_matxs:
+                # actions are numeric strings ('0', '1', ...) used to index the
+                # model matrices; the [[1], [1]] column of ones assumes two states
+                u1 = Matrix.matmul(Matrix.matmul(pomdp.t_prob[int(action)],
+                                                 Matrix.multiply(pomdp.e_prob[int(action)], Matrix.transpose(u))),
+                                   [[1], [1]])
+                u1 = Matrix.add(Matrix.scalar_multiply(pomdp.gamma, Matrix.transpose(u1)), [pomdp.rewards[int(action)]])
+                U1[action].append(u1[0])
+
+        U = pomdp.remove_dominated_plans_fast(U1)
+        # replace with U = pomdp.remove_dominated_plans(U1) for accurate calculations
+
+        if count > 10:
+            if pomdp.max_difference(U, prev_U) < epsilon * (1 - pomdp.gamma) / pomdp.gamma:
+                return U
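+
+
+# A hedged usage sketch for pomdp_value_iteration, mirroring the two-state
+# model exercised in tests/test_mdp4e.py (the matrices below are illustrative):
+#
+#     t_prob = [[[0.65, 0.35], [0.65, 0.35]],
+#               [[0.65, 0.35], [0.65, 0.35]],
+#               [[1.0, 0.0], [0.0, 1.0]]]
+#     e_prob = [[[0.5, 0.5], [0.5, 0.5]],
+#               [[0.5, 0.5], [0.5, 0.5]],
+#               [[0.8, 0.2], [0.3, 0.7]]]
+#     rewards = [[5, -10], [-20, 5], [-1, -1]]
+#     pomdp = POMDP(('0', '1', '2'), t_prob, e_prob, rewards, ('0', '1'), gamma=0.95)
+#     utility = pomdp_value_iteration(pomdp, epsilon=5)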
+
+
+__doc__ += """
+>>> pi = best_policy(sequential_decision_environment, value_iteration(sequential_decision_environment, .01))
+
+>>> sequential_decision_environment.to_arrows(pi)
+[['>', '>', '>', '.'], ['^', None, '^', '.'], ['^', '>', '^', '<']]
+
+>>> from utils import print_table
+
+>>> print_table(sequential_decision_environment.to_arrows(pi))
+>   >      >   .
+^   None   ^   .
+^   >      ^   <
+
+>>> print_table(sequential_decision_environment.to_arrows(policy_iteration(sequential_decision_environment)))
+>   >      >   .
+^   None   ^   .
+^   >      ^   <
+"""  # noqa
+
+"""
+An example transition model in the dict-of-dicts format accepted by MDP
+(also exercised in tests/test_mdp4e.py):
+
+s = {'a': {'plan1': [(0.2, 'a'), (0.3, 'b'), (0.3, 'c'), (0.2, 'd')],
+           'plan2': [(0.4, 'a'), (0.15, 'b'), (0.45, 'c')],
+           'plan3': [(0.2, 'a'), (0.5, 'b'), (0.3, 'c')],
+           },
+     'b': {'plan1': [(0.2, 'a'), (0.6, 'b'), (0.2, 'c'), (0.1, 'd')],
+           'plan2': [(0.6, 'a'), (0.2, 'b'), (0.1, 'c'), (0.1, 'd')],
+           'plan3': [(0.3, 'a'), (0.3, 'b'), (0.4, 'c')],
+           },
+     'c': {'plan1': [(0.3, 'a'), (0.5, 'b'), (0.1, 'c'), (0.1, 'd')],
+           'plan2': [(0.5, 'a'), (0.3, 'b'), (0.1, 'c'), (0.1, 'd')],
+           'plan3': [(0.1, 'a'), (0.3, 'b'), (0.1, 'c'), (0.5, 'd')],
+           },
+     }
+"""
+
+
+# __________________________________________________________________________
+# Chapter 17 Multiagent Planning
+
+
+def double_tennis_problem():
+    """
+    [Figure 17.1] DOUBLE-TENNIS-PROBLEM
+    A multiagent planning problem involving two partner tennis players
+    trying to return an approaching ball and reposition around the court.
+
+    Example:
+    >>> from planning import *
+    >>> dtp = double_tennis_problem()
+    >>> goal_test(dtp.goals, dtp.init)
+    False
+    >>> dtp.act(expr('Go(A, RightBaseLine, LeftBaseLine)'))
+    >>> dtp.act(expr('Hit(A, Ball, RightBaseLine)'))
+    >>> goal_test(dtp.goals, dtp.init)
+    False
+    >>> dtp.act(expr('Go(A, LeftNet, RightBaseLine)'))
+    >>> goal_test(dtp.goals, dtp.init)
+    True
+    """
+
+    return PlanningProblem(
+        init='At(A, LeftBaseLine) & At(B, RightNet) & Approaching(Ball, RightBaseLine) & Partner(A, B) & Partner(B, A)',
+        goals='Returned(Ball) & At(a, LeftNet) & At(a, RightNet)',
+        actions=[Action('Hit(actor, Ball, loc)',
+                        precond='Approaching(Ball, loc) & At(actor, loc)',
+                        effect='Returned(Ball)'),
+                 Action('Go(actor, to, loc)',
+                        precond='At(actor, loc)',
+                        effect='At(actor, to) & ~At(actor, loc)')])
diff --git a/tests/test_mdp4e.py b/tests/test_mdp4e.py
new file mode 100644
index 000000000..1e91bc34b
--- /dev/null
+++ b/tests/test_mdp4e.py
@@ -0,0 +1,166 @@
+from mdp4e import *
+
+sequential_decision_environment_1 = GridMDP([[-0.1, -0.1, -0.1, +1],
+                                             [-0.1, None, -0.1, -1],
+                                             [-0.1, -0.1, -0.1, -0.1]],
+                                            terminals=[(3, 2), (3, 1)])
+
+sequential_decision_environment_2 = GridMDP([[-2, -2, -2, +1],
+                                             [-2, None, -2, -1],
+                                             [-2, -2, -2, -2]],
+                                            terminals=[(3, 2), (3, 1)])
+
+sequential_decision_environment_3 = GridMDP([[-1.0, -0.1, -0.1, -0.1, -0.1, 0.5],
+                                             [-0.1, None, None, -0.5, -0.1, -0.1],
+                                             [-0.1, None, 1.0, 3.0, None, -0.1],
+                                             [-0.1, -0.1, -0.1, None, None, -0.1],
+                                             [0.5, -0.1, -0.1, -0.1, -0.1, -1.0]],
+                                            terminals=[(2, 2), (3, 2), (0, 4), (5, 0)])
+
+
+def test_value_iteration():
+    ref1 = {
+        (3, 2): 1.0, (3, 1): -1.0,
+        (3, 0): 0.12958868267972745, (0, 1): 0.39810203830605462,
+        (0, 2): 0.50928545646220924, (1, 0): 0.25348746162470537,
+        (0, 0): 0.29543540628363629, (1, 2): 0.64958064617168676,
+        (2, 0): 0.34461306281476806, (2, 1): 0.48643676237737926,
+        (2, 2): 0.79536093684710951}
+    assert abs(sum(value_iteration(sequential_decision_environment, .01).values()) - sum(ref1.values())) < 0.0001
+
+    ref2 = {
+        (3, 2): 1.0, (3, 1): -1.0,
+        (3, 0): -0.0897388258468311, (0, 1): 0.146419707398967840,
+        (0, 2): 0.30596200514385086, (1, 0): 0.010092796415625799,
+        (0, 0): 0.00633408092008296, (1, 2): 0.507390193380827400,
+        (2, 0): 0.15072242145212010, (2, 1): 0.358309043654212570,
+        (2, 2): 0.71675493618997840}
+    assert abs(sum(value_iteration(sequential_decision_environment_1, .01).values()) - sum(ref2.values())) < 0.0001
+
+    ref3 = {
+        (3, 2): 1.0, (3, 1): -1.0,
+        (3, 0): -3.5141584808407855, (0, 1): -7.8000009574737180,
+        (0, 2): -6.1064293596058830, (1, 0): -7.1012549580376760,
+        (0, 0): -8.5872244532783200, (1, 2): -3.9653547121245810,
+        (2, 0): -5.3099468802901630, (2, 1): -3.3543366255753995,
+        (2, 2): -1.7383376462930498}
+    assert abs(sum(value_iteration(sequential_decision_environment_2, .01).values()) - sum(ref3.values())) < 0.0001
+
+    ref4 = {
+        (0, 0): 4.350592130345558, (0, 1): 3.640700980321895, (0, 2): 3.0734806370346943, (0, 3): 2.5754335063434937, (0, 4): -1.0,
+        (1, 0): 3.640700980321895, (1, 1): 3.129579352304856, (1, 4): 2.0787517066719916,
+        (2, 0): 3.0259220379893352, (2, 1): 2.5926103577982897, (2, 2): 1.0, (2, 4): 2.507774181360808,
+        (3, 0): 2.5336747364500076, (3, 2): 3.0, (3, 3): 2.292172805400873, (3, 4): 2.996383110867515,
+        (4, 0): 2.1014575936349886, (4, 3): 3.1297590518608907, (4, 4): 3.6408806798779287,
+        (5, 0): -1.0, (5, 1): 2.5756132058995282, (5, 2): 3.0736603365907276, (5, 3): 3.6408806798779287, (5, 4): 4.350771829901593}
+    assert abs(sum(value_iteration(sequential_decision_environment_3, .01).values()) - sum(ref4.values())) < 0.001
+
+
+def test_policy_iteration():
+    assert policy_iteration(sequential_decision_environment) == {
+        (0, 0): (0, 1), (0, 1): (0, 1), (0, 2): (1, 0),
+        (1, 0): (1, 0), (1, 2): (1, 0), (2, 0): (0, 1),
+        (2, 1): (0, 1), (2, 2): (1, 0), (3, 0): (-1, 0),
+        (3, 1): None, (3, 2): None}
+
+    assert policy_iteration(sequential_decision_environment_1) == {
+        (0, 0): (0, 1), (0, 1): (0, 1), (0, 2): (1, 0),
+        (1, 0): (1, 0), (1, 2): (1, 0), (2, 0): (0, 1),
+        (2, 1): (0, 1), (2, 2): (1, 0), (3, 0): (-1, 0),
+        (3, 1): None, (3, 2): None}
+
+    assert policy_iteration(sequential_decision_environment_2) == {
+        (0, 0): (1, 0), (0, 1): (0, 1), (0, 2): (1, 0),
+        (1, 0): (1, 0), (1, 2): (1, 0), (2, 0): (1, 0),
+        (2, 1): (1, 0), (2, 2): (1, 0), (3, 0): (0, 1),
+        (3, 1): None, (3, 2): None}
+
+
+def test_best_policy():
+    pi = best_policy(sequential_decision_environment,
+                     value_iteration(sequential_decision_environment, .01))
+    assert sequential_decision_environment.to_arrows(pi) == [['>', '>', '>', '.'],
+                                                             ['^', None, '^', '.'],
+                                                             ['^', '>', '^', '<']]
+
+    pi_1 = best_policy(sequential_decision_environment_1,
+                       value_iteration(sequential_decision_environment_1, .01))
+    assert sequential_decision_environment_1.to_arrows(pi_1) == [['>', '>', '>', '.'],
+                                                                 ['^', None, '^', '.'],
+                                                                 ['^', '>', '^', '<']]
+
+    pi_2 = best_policy(sequential_decision_environment_2,
+                       value_iteration(sequential_decision_environment_2, .01))
+    assert sequential_decision_environment_2.to_arrows(pi_2) == [['>', '>', '>', '.'],
+                                                                 ['^', None, '>', '.'],
+                                                                 ['>', '>', '>', '^']]
+
+    pi_3 = best_policy(sequential_decision_environment_3,
+                       value_iteration(sequential_decision_environment_3, .01))
+    assert sequential_decision_environment_3.to_arrows(pi_3) == [['.', '>', '>', '>', '>', '>'],
+                                                                 ['v', None, None, '>', '>', '^'],
+                                                                 ['v', None, '.', '.', None, '^'],
+                                                                 ['v', '<', 'v', None, None, '^'],
+                                                                 ['<', '<', '<', '<', '<', '.']]
+
+
+def test_transition_model():
+    transition_model = {'a': {'plan1': [(0.2, 'a'), (0.3, 'b'), (0.3, 'c'), (0.2, 'd')],
+                              'plan2': [(0.4, 'a'), (0.15, 'b'), (0.45, 'c')],
+                              'plan3': [(0.2, 'a'), (0.5, 'b'), (0.3, 'c')],
+                              },
+                        'b': {'plan1': [(0.2, 'a'), (0.6, 'b'), (0.2, 'c'), (0.1, 'd')],
+                              'plan2': [(0.6, 'a'), (0.2, 'b'), (0.1, 'c'), (0.1, 'd')],
+                              'plan3': [(0.3, 'a'), (0.3, 'b'), (0.4, 'c')],
+                              },
+                        'c': {'plan1': [(0.3, 'a'), (0.5, 'b'), (0.1, 'c'), (0.1, 'd')],
+                              'plan2': [(0.5, 'a'), (0.3, 'b'), (0.1, 'c'), (0.1, 'd')],
+                              'plan3': [(0.1, 'a'), (0.3, 'b'), (0.1, 'c'), (0.5, 'd')],
+                              },
+                        }
+
+    mdp = MDP(init="a", actlist={"plan1", "plan2", "plan3"}, terminals={"d"},
+              states={"a", "b", "c", "d"}, transitions=transition_model)
+
+    assert mdp.T("a", "plan3") == [(0.2, 'a'), (0.5, 'b'), (0.3, 'c')]
+    assert mdp.T("b", "plan2") == [(0.6, 'a'), (0.2, 'b'), (0.1, 'c'), (0.1, 'd')]
+    assert mdp.T("c", "plan1") == [(0.3, 'a'), (0.5, 'b'), (0.1, 'c'), (0.1, 'd')]
+
+
+def test_pomdp_value_iteration():
+    t_prob = [[[0.65, 0.35], [0.65, 0.35]], [[0.65, 0.35], [0.65, 0.35]], [[1.0, 0.0], [0.0, 1.0]]]
+    e_prob = [[[0.5, 0.5], [0.5, 0.5]], [[0.5, 0.5], [0.5, 0.5]], [[0.8, 0.2], [0.3, 0.7]]]
+    rewards = [[5, -10], [-20, 5], [-1, -1]]
+
+    gamma = 0.95
+    actions = ('0', '1', '2')
+    states = ('0', '1')
+
+    pomdp = POMDP(actions, t_prob, e_prob, rewards, states, gamma)
+    utility = pomdp_value_iteration(pomdp, epsilon=5)
+
+    # sum_ ends up holding the total for the last action in the mapping
+    for _, v in utility.items():
+        sum_ = 0
+        for element in v:
+            sum_ += sum(element)
+
+    assert -9.76 < sum_ < -9.70 or 246.5 < sum_ < 248.5 or 0 < sum_ < 1
+
+
+def test_pomdp_value_iteration2():
+    t_prob = [[[0.5, 0.5], [0.5, 0.5]], [[0.5, 0.5], [0.5, 0.5]], [[1.0, 0.0], [0.0, 1.0]]]
+    e_prob = [[[0.5, 0.5], [0.5, 0.5]], [[0.5, 0.5], [0.5, 0.5]], [[0.85, 0.15], [0.15, 0.85]]]
+    rewards = [[-100, 10], [10, -100], [-1, -1]]
+
+    gamma = 0.95
+    actions = ('0', '1', '2')
+    states = ('0', '1')
+
+    pomdp = POMDP(actions, t_prob, e_prob, rewards, states, gamma)
+    utility = pomdp_value_iteration(pomdp, epsilon=100)
+
+    # sum_ ends up holding the total for the last action in the mapping
+    for _, v in utility.items():
+        sum_ = 0
+        for element in v:
+            sum_ += sum(element)
+
+    assert -77.31 < sum_ < -77.25 or 799 < sum_ < 800
diff --git a/utils4e.py b/utils4e.py
index dd90e49ca..ec29ba226 100644
--- a/utils4e.py
+++ b/utils4e.py
@@ -415,12 +415,12 @@ def conv1D(X, K):
     """1D convolution. X: input vector; K: kernel vector"""
     return np.convolve(X, K, mode='same')
 
-
 def GaussianKernel(size=3):
     mean = (size-1)/2
     stdev = 0.1
     return [gaussian(mean, stdev, x) for x in range(size)]
 
+
 def gaussian_kernel_1d(size=3, sigma=0.5):
     mean = (size-1)/2
     return [gaussian(mean, sigma, x) for x in range(size)]
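+
+# A hedged sanity sketch for gaussian_kernel_1d (illustrative, not part of
+# the original change): with mean = (size-1)/2 the kernel is symmetric
+# about its midpoint.
+#
+#     k = gaussian_kernel_1d(3, sigma=0.5)
+#     assert abs(k[0] - k[2]) < 1e-12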