In [5]:
# Dynamic Programming: Policy Evaluation (Prediction), Policy Improvement, Policy Iteration, Value Iteration


In [7]:
import numpy as np

def policy_evaluation(policy, P, rewards, gamma=0.9, theta=1e-6):
    """Iteratively evaluate a fixed policy and return its state-value estimates.

    States are swept in index order and updated in place, so states later in
    a sweep already see the refreshed values of earlier ones. Sweeping stops
    once the largest absolute change in a full sweep is below ``theta``.
    There is no iteration cap.

    Args:
        policy: Array-like indexed as ``policy[s][a]``, the weight given to
            action ``a`` in state ``s``. Nested lists are accepted as well
            as NumPy arrays.
        P: Nested sequence where ``P[s][a][s_prime]`` is the probability of
            moving from ``s`` to ``s_prime`` under action ``a``.
        rewards: Nested sequence where ``rewards[s][a][s_prime]`` is the
            reward for that transition.
        gamma: Discount factor applied to the successor state's value.
        theta: Convergence threshold on the per-sweep maximum change.

    Returns:
        np.ndarray of length ``len(P)`` holding the estimated state values.
    """
    # `policy[s, a]` below needs an ndarray; coerce so list-of-lists policies
    # work just like the list-of-lists `P` and `rewards` do.
    policy = np.asarray(policy)
    n_states = len(P)
    V = np.zeros(n_states)  # Initialize state values to zero
    while True:
        delta = 0
        for s in range(n_states):
            v = V[s]
            # Expected one-step return under the policy, using current V.
            V[s] = sum(
                policy[s, a] * sum(
                    P[s][a][s_prime] * (rewards[s][a][s_prime] + gamma * V[s_prime])
                    for s_prime in range(n_states)
                )
                for a in range(len(P[s]))
            )
            delta = max(delta, abs(v - V[s]))
        if delta < theta:
            break
    return V

def policy_improvement(V, P, rewards, gamma=0.9):
    """Build the greedy one-hot policy with respect to the value estimates V.

    For every state the one-step lookahead value of each action is computed
    from ``P[s][a][s_prime]``, ``rewards[s][a][s_prime]`` and ``V``; the row
    for that state gets a 1 at the first maximizing action and 0 elsewhere.

    Returns:
        np.ndarray of shape ``(len(P), len(P[0]))``.
    """
    n_states = len(P)
    greedy = np.zeros((n_states, len(P[0])))
    for state in range(n_states):
        n_actions = len(P[state])
        action_values = np.zeros(n_actions)
        for action in range(n_actions):
            # Accumulate the expected return of (state, action) over successors.
            backup = 0
            for nxt in range(n_states):
                backup = backup + P[state][action][nxt] * (
                    rewards[state][action][nxt] + gamma * V[nxt]
                )
            action_values[action] = backup
        # np.argmax returns the first index on ties.
        one_hot = np.zeros(n_actions)
        one_hot[np.argmax(action_values)] = 1.0
        greedy[state] = one_hot
    return greedy

def policy_iteration(P, rewards, gamma=0.9, theta=1e-6):
    """Alternate policy evaluation and greedy improvement until the policy is stable.

    Starts from a uniform policy over ``len(P[0])`` actions, evaluates it,
    replaces it with the greedy policy for those values, and stops as soon
    as an improvement step returns exactly the policy it was given.

    Args:
        P: Nested sequence where ``P[s][a][s_prime]`` is a transition
            probability.
        rewards: Nested sequence where ``rewards[s][a][s_prime]`` is the
            transition reward.
        gamma: Discount factor forwarded to evaluation and improvement.
        theta: Convergence threshold forwarded to ``policy_evaluation``.
            The default matches that function's own default, so existing
            callers see no change.

    Returns:
        Tuple ``(policy, V)``: the final policy array and the state values
        from the last evaluation, which was run on that same policy.
    """
    n_actions = len(P[0])
    policy = np.ones((len(P), n_actions)) / n_actions  # Initialize uniform random policy
    while True:
        V = policy_evaluation(policy, P, rewards, gamma, theta)
        new_policy = policy_improvement(V, P, rewards, gamma)
        # Exact equality is safe once policies are one-hot; the initial
        # uniform policy only matches when there is a single action.
        if np.array_equal(new_policy, policy):
            break
        policy = new_policy
    return policy, V

def value_iteration(P, rewards, gamma=0.9, theta=1e-6):
    """Run in-place value iteration, then extract the greedy policy.

    Each sweep visits the states in index order and overwrites a state's
    value with the best one-step lookahead over its actions. Sweeping ends
    when the largest change seen in a sweep is below ``theta``.

    Returns:
        Tuple ``(policy, V)`` where ``policy`` comes from
        ``policy_improvement`` applied to the final values ``V``.
    """
    n_states = len(P)
    V = np.zeros(n_states)
    converged = False
    while not converged:
        delta = 0
        for state in range(n_states):
            n_actions = len(P[state])
            action_values = np.zeros(n_actions)
            for action in range(n_actions):
                backup = 0
                for nxt in range(n_states):
                    backup = backup + P[state][action][nxt] * (
                        rewards[state][action][nxt] + gamma * V[nxt]
                    )
                action_values[action] = backup
            best = np.max(action_values)
            # Track the largest update of this sweep before overwriting V.
            change = abs(V[state] - best)
            if change > delta:
                delta = change
            V[state] = best
        converged = delta < theta
    policy = policy_improvement(V, P, rewards, gamma)
    return policy, V

# Test Example
def test():
    """Run value iteration on a two-state, two-action example and print the result."""
    # Indexed as [state][action][next_state].
    transition_probs = [
        [[0.8, 0.2], [0.5, 0.5]],  # from state 0
        [[0.7, 0.3], [0.4, 0.6]],  # from state 1
    ]
    # Same indexing as the transition table.
    reward_table = [
        [[1, 0], [0, 1]],  # from state 0
        [[1, 0], [0, 1]],  # from state 1
    ]
    discount = 0.9
    result = value_iteration(transition_probs, reward_table, discount)
    optimal_policy, optimal_values = result
    print("Optimal Policy:", optimal_policy)
    print("Optimal Values:", optimal_values)

# Run the example; it prints the resulting policy and state values.
test()


Optimal Policy: [[1. 0.]
 [1. 0.]]
Optimal Values: [7.80219045 7.69230102]


In [None]:
# Asynchronous Dynamic Programming, Generalized Policy Iteration

In [9]:
import numpy as np
import random

def policy_evaluation(policy, P, rewards, gamma=0.9, theta=1e-6):
    """Iteratively evaluate a fixed policy and return its state-value estimates.

    States are swept in index order and updated in place, so states later in
    a sweep already see the refreshed values of earlier ones. Sweeping stops
    once the largest absolute change in a full sweep is below ``theta``.
    There is no iteration cap.

    Args:
        policy: Array-like indexed as ``policy[s][a]``, the weight given to
            action ``a`` in state ``s``. Nested lists are accepted as well
            as NumPy arrays.
        P: Nested sequence where ``P[s][a][s_prime]`` is the probability of
            moving from ``s`` to ``s_prime`` under action ``a``.
        rewards: Nested sequence where ``rewards[s][a][s_prime]`` is the
            reward for that transition.
        gamma: Discount factor applied to the successor state's value.
        theta: Convergence threshold on the per-sweep maximum change.

    Returns:
        np.ndarray of length ``len(P)`` holding the estimated state values.
    """
    # `policy[s, a]` below needs an ndarray; coerce so list-of-lists policies
    # work just like the list-of-lists `P` and `rewards` do.
    policy = np.asarray(policy)
    n_states = len(P)
    V = np.zeros(n_states)  # Initialize state values to zero
    while True:
        delta = 0
        for s in range(n_states):
            v = V[s]
            # Expected one-step return under the policy, using current V.
            V[s] = sum(
                policy[s, a] * sum(
                    P[s][a][s_prime] * (rewards[s][a][s_prime] + gamma * V[s_prime])
                    for s_prime in range(n_states)
                )
                for a in range(len(P[s]))
            )
            delta = max(delta, abs(v - V[s]))
        if delta < theta:
            break
    return V

def policy_improvement(V, P, rewards, gamma=0.9):
    """Build the greedy one-hot policy with respect to the value estimates V.

    For every state the one-step lookahead value of each action is computed
    from ``P[s][a][s_prime]``, ``rewards[s][a][s_prime]`` and ``V``; the row
    for that state gets a 1 at the first maximizing action and 0 elsewhere.

    Returns:
        np.ndarray of shape ``(len(P), len(P[0]))``.
    """
    n_states = len(P)
    greedy = np.zeros((n_states, len(P[0])))
    for state in range(n_states):
        n_actions = len(P[state])
        action_values = np.zeros(n_actions)
        for action in range(n_actions):
            # Accumulate the expected return of (state, action) over successors.
            backup = 0
            for nxt in range(n_states):
                backup = backup + P[state][action][nxt] * (
                    rewards[state][action][nxt] + gamma * V[nxt]
                )
            action_values[action] = backup
        # np.argmax returns the first index on ties.
        one_hot = np.zeros(n_actions)
        one_hot[np.argmax(action_values)] = 1.0
        greedy[state] = one_hot
    return greedy

def policy_iteration(P, rewards, gamma=0.9, theta=1e-6):
    """Alternate policy evaluation and greedy improvement until the policy is stable.

    Starts from a uniform policy over ``len(P[0])`` actions, evaluates it,
    replaces it with the greedy policy for those values, and stops as soon
    as an improvement step returns exactly the policy it was given.

    Args:
        P: Nested sequence where ``P[s][a][s_prime]`` is a transition
            probability.
        rewards: Nested sequence where ``rewards[s][a][s_prime]`` is the
            transition reward.
        gamma: Discount factor forwarded to evaluation and improvement.
        theta: Convergence threshold forwarded to ``policy_evaluation``.
            The default matches that function's own default, so existing
            callers see no change.

    Returns:
        Tuple ``(policy, V)``: the final policy array and the state values
        from the last evaluation, which was run on that same policy.
    """
    n_actions = len(P[0])
    policy = np.ones((len(P), n_actions)) / n_actions  # Initialize uniform random policy
    while True:
        V = policy_evaluation(policy, P, rewards, gamma, theta)
        new_policy = policy_improvement(V, P, rewards, gamma)
        # Exact equality is safe once policies are one-hot; the initial
        # uniform policy only matches when there is a single action.
        if np.array_equal(new_policy, policy):
            break
        policy = new_policy
    return policy, V

def value_iteration(P, rewards, gamma=0.9, theta=1e-6):
    """Run in-place value iteration, then extract the greedy policy.

    Each sweep visits the states in index order and overwrites a state's
    value with the best one-step lookahead over its actions. Sweeping ends
    when the largest change seen in a sweep is below ``theta``.

    Returns:
        Tuple ``(policy, V)`` where ``policy`` comes from
        ``policy_improvement`` applied to the final values ``V``.
    """
    n_states = len(P)
    V = np.zeros(n_states)
    converged = False
    while not converged:
        delta = 0
        for state in range(n_states):
            n_actions = len(P[state])
            action_values = np.zeros(n_actions)
            for action in range(n_actions):
                backup = 0
                for nxt in range(n_states):
                    backup = backup + P[state][action][nxt] * (
                        rewards[state][action][nxt] + gamma * V[nxt]
                    )
                action_values[action] = backup
            best = np.max(action_values)
            # Track the largest update of this sweep before overwriting V.
            change = abs(V[state] - best)
            if change > delta:
                delta = change
            V[state] = best
        converged = delta < theta
    policy = policy_improvement(V, P, rewards, gamma)
    return policy, V

def asynchronous_value_iteration(P, rewards, gamma=0.9, theta=1e-6, seed=None):
    """Value iteration that updates states in a freshly shuffled order each sweep.

    Every sweep still visits each state exactly once and updates its value
    in place with the best one-step lookahead; only the visiting order is
    randomized. Sweeping ends when the largest change seen in a sweep is
    below ``theta``.

    Args:
        P: Nested sequence where ``P[s][a][s_prime]`` is a transition
            probability.
        rewards: Nested sequence where ``rewards[s][a][s_prime]`` is the
            transition reward.
        gamma: Discount factor.
        theta: Convergence threshold on the per-sweep maximum change.
        seed: Optional seed for the sweep order. When ``None`` (default) the
            module-level ``random`` generator is used, exactly as before;
            otherwise a private ``random.Random(seed)`` makes the run
            reproducible without touching global random state.

    Returns:
        Tuple ``(policy, V)`` where ``policy`` comes from
        ``policy_improvement`` applied to the final values ``V``.
    """
    rng = random if seed is None else random.Random(seed)
    n_states = len(P)
    V = np.zeros(n_states)
    while True:
        delta = 0
        states = list(range(n_states))
        rng.shuffle(states)  # Randomize state update order
        for s in states:
            q_values = np.zeros(len(P[s]))
            for a in range(len(P[s])):
                q_values[a] = sum(
                    P[s][a][s_prime] * (rewards[s][a][s_prime] + gamma * V[s_prime])
                    for s_prime in range(n_states)
                )
            max_q = np.max(q_values)
            delta = max(delta, abs(V[s] - max_q))
            V[s] = max_q
        if delta < theta:
            break
    policy = policy_improvement(V, P, rewards, gamma)
    return policy, V

# Test Example
def test():
    """Run asynchronous value iteration on a two-state, two-action example and print the result."""
    # Indexed as [state][action][next_state].
    transition_probs = [
        [[0.8, 0.2], [0.5, 0.5]],  # from state 0
        [[0.7, 0.3], [0.4, 0.6]],  # from state 1
    ]
    # Same indexing as the transition table.
    reward_table = [
        [[1, 0], [0, 1]],  # from state 0
        [[1, 0], [0, 1]],  # from state 1
    ]
    discount = 0.9
    result = asynchronous_value_iteration(transition_probs, reward_table, discount)
    optimal_policy, optimal_values = result
    print("Optimal Policy (Asynchronous):", optimal_policy)
    print("Optimal Values (Asynchronous):", optimal_values)

# Run the example; it prints the resulting policy and state values.
test()


Optimal Policy (Asynchronous): [[1. 0.]
 [1. 0.]]
Optimal Values (Asynchronous): [7.80219123 7.69230099]
