## generic value iteration algo 

In [37]:
import numpy as np 
from typing import Callable

## SPECIALISED FOR EX 9.27

In [3]:
def Value_iteration(S:list[tuple], # states
                    A:tuple,       # actions
                    P:list,        # probabilities
                    R:list,         # rewards
                    cutoff:int = 1000, 
                    discount_rate:float=0.8):
    # we assign V0[s] arbitrarily to start with
    V0 = np.zeros(2) #np.random.rand(len(S))  # Ensures values are float
    V_k = V0.astype(float)  
    k = 0
    pi = [0,0]
    while k < cutoff:
        V_k_minus_1 = V_k
        for index, state in enumerate(S):
            possible_values = [R[index][a] + discount_rate * (P[index][a]*V_k_minus_1[0] + (1-P[index][a])*V_k_minus_1[1]) for a in range(len(A))]
            V_k[index] = max(possible_values).astype(float)
        k += 1

    for index, state in enumerate(S):
        possible_values =  [R[index][a] + discount_rate * (P[index][a]*V_k_minus_1[0] + (1-P[index][a])*V_k_minus_1[1]) for a in range(len(A))]
        pi[index] =  possible_values.index(max(possible_values))

    return V_k, pi


###############################################
############# FUNCTION CALL ###################
###############################################

States = ('healthy', 'sick')
Actions = ('Relax', 'Party')
# we will be taking 0 to be healthy and 1 to be sick in terms of state
# and we will be using 0 for relax, and 1 to party 

Probabilities = [[0.95, # if healthy and relax
                 0.7],  # if healthy and party
                [ 0.5,  # if sick and relax
                 0.1]]  # if sick and party

Rewards = [[7, # if healthy and relax
            10],  # if healthy and party
            [0,  # if sick and relax
            2]]  # if sick and party

values, policies = Value_iteration(States, Actions, Probabilities, Rewards)

print('We determine the optimal policy to be:')
for index, value in enumerate(policies):
    print(f'if {States[index]}, then {Actions[value]}')

We determine the optimal policy to be:
if healthy, then Party
if sick, then Relax


## SPECIALISED FOR EX 9.28

In [26]:
def bellman_equation(actions, state, probabilities, V, rewards, discount_factor):
    values = []
    i, j = state  # coords
    corners = [(0, 0), (0, len(V) - 1), (len(V) - 1, 0), (len(V) - 1, len(V) - 1)]
    
    for intended_action in actions:
        # this inner for loop should be maximising over all intended actions to tell you which is the best
        summation_term = 0
        
        for possible_action in actions:
            probability_of_occurence = probabilities[0] if possible_action == intended_action else probabilities[1]
            
            # check where we are, and if this is within the grid
            new_i, new_j = i + possible_action[0], j + possible_action[1]
            
            if 0 <= new_i < len(V) and 0 <= new_j < len(V[0]):
                reward = rewards[i, j]  # Immediate reward 
                
                if reward != 0:  # if you are on a reward cell/ move into one, teleport to random corner
                    future_value = sum(V[corner] for corner in corners) / 4  
                else:
                    future_value = V[new_i, new_j]  # non special transition
            else:
                # stay in place and apply wall penalty if you hit wall 
                # print('this is triggered')
                reward = -1
                future_value = V[i, j]
            summation_term += probability_of_occurence * (reward + discount_factor * future_value)
        
        values.append(summation_term)  
    
    return max(values) # only return max value, not all of them 


In [66]:
def Value_iteration(S:np.ndarray, # states
                    A:tuple,       # actions
                    P:list,        # probabilities
                    R:list,         # rewards
                    cutoff:int = 1000, 
                    discount_rate:float=0.8):
    
    # we assign V0[s] arbitrarily to start with
    V0 = np.zeros(S.shape) 
    V_k = V0.astype(float)  
    k = 0
    pi = np.zeros(S.shape) 
    while k < cutoff:
        V_k_minus_1 = V_k.copy()
        
        for (i, j), value in np.ndenumerate(S):
            V_k[i][j] = bellman_equation(A, (i,j), P, V_k_minus_1, R, discount_rate)
        k += 1

    return np.round(V_k, decimals=1)

In [72]:
states = np.zeros((10, 10))

actions = [(1, 0),    # right
           (-1, 0),   # left
           (0, 1),    # up
           (0, -1)]   # down

probabilities = [0.7, 0.1]

rewards = np.zeros((10,10))
rewards[8][7] = 10
rewards[7][2] = 3
rewards[3][4] = -5
rewards[3][7] = -10

output = Value_iteration(S = states, 
                A = actions, 
                P = probabilities, 
                R = rewards, 
                cutoff=1000,
                discount_rate=0.9)

In [73]:
print(output.T)

[[ 0.9  1.4  1.7  2.   2.5  2.9  3.5  4.   3.6  3. ]
 [ 1.3  1.7  2.1  2.5  3.   3.6  4.2  4.9  4.3  3.7]
 [ 1.5  1.9  2.3  2.8  3.4  4.1  5.   6.   5.1  4.5]
 [ 1.4  1.8  2.1  2.4  3.4  4.1  4.8  5.6  6.   5.4]
 [ 1.4  1.8  1.8 -2.   3.5  4.6  5.5  6.4  7.2  6.5]
 [ 1.7  2.1  2.6  3.2  4.5  5.5  6.5  7.6  8.7  7.7]
 [ 1.6  2.1  2.5  3.1  5.1  6.4  7.6  9.  10.6  9.1]
 [ 1.5  1.8  1.4 -7.   4.7  7.   8.7 10.6 13.  10.7]
 [ 1.7  2.1  2.6  3.2  5.1  6.4  7.6  9.  10.6  9.1]
 [ 1.7  2.3  2.9  3.6  4.5  5.4  6.5  7.6  8.8  7.7]]


In [36]:
V0 = np.zeros(states.shape) 
V = V0.astype(float)  
discount_factor = 0.9
values = []
i, j = (8,7)

corners = [(0, 0), (0, len(V) - 1), (len(V) - 1, 0), (len(V) - 1, len(V) - 1)]

for intended_action in actions:
    # this inner for loop should be maximising over all intended actions to tell you which is the best
    summation_term = 0
    
    for possible_action in actions:
        probability_of_occurence = probabilities[0] if possible_action == intended_action else probabilities[1]
        
        # check where we are, and if this is within the grid
        new_i, new_j = i + possible_action[0], j + possible_action[1]
        
        if 0 <= new_i < len(V) and 0 <= new_j < len(V[0]):
            reward = rewards[i, j]  # Immediate reward 
            
            if reward != 0:  # if you are on a reward cell/ move into one, teleport to random corner
                future_value = sum(V[corner] for corner in corners) / 4  
            else:
                future_value = V[new_i, new_j]  # non special transition
        else:
            # stay in place and apply wall penalty if you hit wall 
            # print('this is triggered')
            reward = -1
            future_value = V[i, j]
        summation_term += probability_of_occurence * (reward + discount_factor * future_value)
    
    values.append(summation_term)  
    

In [1]:
len_x, len_y = 10, 10 

states = [(i, j) for i in range(len_x) for j in range(len_y)]
probabilities = {}

corners = [(0, 0), (0, len_y - 1), (len_x - 1, 0), (len_x - 1, len_y - 1)]
reward_states = [(8,7), (7,2), (3,4), (3,7)]
reward_values = [10, 3, -5, -10]

actions = [(1, 0),    # right
           (-1, 0),   # left
           (0, 1),    # up
           (0, -1)]   # down

In [None]:
def populate_probabilities_usual(state, actions):
    sub_dirs = {}
    for index, intended_move in enumerate(actions):
        sub_dirs[index] = {}
        for actual_move in actions:
            value = 0.7 if intended_move == actual_move else 0.1
            sub_dirs[index][(state[0] + actual_move[0], state[1] + actual_move[1])] = value
    return sub_dirs

def populate_probabilities_rewards(state, actions):
    sub_dirs = {}
    for index, _ in enumerate(actions):
        sub_dirs[index] = {}
        for corner in corners:
            sub_dirs[index][(corner)] = 0.25
    return sub_dirs

def populate_probabilities_edge(state, actions):
    sub_dirs = {}

    for index, intended_move in enumerate(actions):
        sub_dirs[index] = {}

        for actual_move in actions:
            new_row = state[0] + actual_move[0]
            new_col = state[1] + actual_move[1]

            # Check if out of bounds
            if (new_row < 0 or new_row >= len_y or
                new_col < 0 or new_col >= len_x):
                # Remain in the same cell 
                new_state = state
            else:
                new_state = (new_row, new_col)

            value = 0.7 if intended_move == actual_move else 0.1
            sub_dirs[index][new_state] = value

    return sub_dirs

In [3]:
for state in states:
    i, j = state
    if state in reward_states and reward_values[reward_states.index(state)]:
        probabilities[state] = populate_probabilities_rewards(state, actions)
    elif 0 < i < len_x-1 and 0 < j < len_y-1:
        # we are in standard operating conditions, make usual sub directories
        probabilities[state] = populate_probabilities_usual(state, actions)
    else: # we must be at an edge
        probabilities[state] = populate_probabilities_edge(state, actions)

In [4]:
probabilities[(9,7)]

{0: {(9, 7): 0.7, (8, 7): 0.1, (9, 8): 0.1, (9, 6): 0.1},
 1: {(9, 7): 0.1, (8, 7): 0.7, (9, 8): 0.1, (9, 6): 0.1},
 2: {(9, 7): 0.1, (8, 7): 0.1, (9, 8): 0.7, (9, 6): 0.1},
 3: {(9, 7): 0.1, (8, 7): 0.1, (9, 8): 0.1, (9, 6): 0.7}}

In [20]:
# next, we have to populate the rewards array. this will be another dict except only 2 layered
def populate_reward_cells(actions, reward_states, reward_values):
    rewards = {}
    temp_dict = {}
    for index, state in enumerate(reward_states):
        for action in range(len(actions)):
            temp_dict[action] = reward_values[index]
        rewards[state] = temp_dict.copy()
    return rewards

def populate_penalising_edges(rewards):
    ''' 
    this has one hell of an edge case, where you could be in the cell which is on the perimeter

    this is the sort of thing we will be handling later, and for now we will assume you cannot 
    have a reward cell on the perimeter 
    '''
    for s in states:
        if s not in rewards:
            rewards[s] = {}

        for action_index, (steps_right, steps_up) in enumerate(actions):
            new_row = s[0] + steps_right
            new_col = s[1] + steps_up

            # Check if this move goes out of bounds
            if (new_row < 0 or new_row >= len_y or
                new_col < 0 or new_col >= len_x):
                rewards[s][action_index] = -1
            else:
                 if action_index not in rewards[s]:
                    rewards[s][action_index] = 0

    return rewards

In [21]:
rewards = populate_reward_cells(actions, reward_states, reward_values)
rewards = populate_penalising_edges(rewards)

In [22]:
rewards[(8,7)]

{0: 10, 1: 10, 2: 10, 3: 10}

In [48]:
from MDP import GenericMDP
mdp_solver = GenericMDP(states, actions, probabilities, rewards, 0.9, 20000)

In [49]:
dictionary = mdp_solver._value_iteration()
array = np.zeros((len_x, len_y))
for item in dictionary:
    array[item[0], item[1]] = dictionary[item]

array.T

array([[ 1.37700006,  1.75631032,  2.06727055,  2.41229572,  2.80728923,
         3.25946591,  3.77589807,  4.29849387,  3.91585833,  3.35391213],
       [ 1.6288284 ,  1.91035485,  2.26002193,  2.67269485,  3.1562562 ,
         3.71824628,  4.37436645,  5.11011325,  4.56303896,  4.2851314 ],
       [ 1.71978096,  2.02087277,  2.42486539,  2.93049844,  3.56180527,
         4.2775603 ,  5.14283059,  6.22044812,  5.34133284,  5.05864124],
       [ 1.61395853,  1.84994841,  2.14856498,  2.53089662,  3.54593743,
         4.24858208,  4.94749848,  5.70788112,  6.18332563,  5.93171706],
       [ 1.6494942 ,  1.88019617,  1.88185409, -1.77955188,  3.56652832,
         4.75048039,  5.62562489,  6.58137558,  7.3889553 ,  6.96204048],
       [ 1.88343777,  2.21568169,  2.66575438,  3.27308018,  4.60834409,
         5.58863084,  6.60394126,  7.77199058,  8.91039452,  8.1533275 ],
       [ 1.88438252,  2.19477089,  2.61039606,  3.22294645,  5.24379883,
         6.50948493,  7.75893531,  9.17996279

{(0, 0): np.float64(0.0),
 (0, 1): np.float64(0.0),
 (0, 2): np.float64(0.0),
 (0, 3): np.float64(0.0),
 (0, 4): np.float64(0.0),
 (0, 5): np.float64(0.0),
 (0, 6): np.float64(0.0),
 (0, 7): np.float64(0.0),
 (0, 8): np.float64(0.0),
 (0, 9): np.float64(0.0),
 (1, 0): np.float64(0.0),
 (1, 1): np.float64(0.0),
 (1, 2): np.float64(0.0),
 (1, 3): np.float64(0.0),
 (1, 4): np.float64(0.0),
 (1, 5): np.float64(0.0),
 (1, 6): np.float64(0.0),
 (1, 7): np.float64(0.0),
 (1, 8): np.float64(0.0),
 (1, 9): np.float64(0.0),
 (2, 0): np.float64(0.0),
 (2, 1): np.float64(0.0),
 (2, 2): np.float64(0.0),
 (2, 3): np.float64(0.0),
 (2, 4): np.float64(0.0),
 (2, 5): np.float64(0.0),
 (2, 6): np.float64(0.0),
 (2, 7): np.float64(0.0),
 (2, 8): np.float64(0.0),
 (2, 9): np.float64(0.0),
 (3, 0): np.float64(0.0),
 (3, 1): np.float64(0.0),
 (3, 2): np.float64(0.0),
 (3, 3): np.float64(0.0),
 (3, 4): np.float64(-5.0),
 (3, 5): np.float64(0.0),
 (3, 6): np.float64(0.0),
 (3, 7): np.float64(-10.0),
 (3, 8): 