Method 1

In [1]:
import numpy as np

class MDP():
    """Five-state, two-action MDP with value- and policy-iteration solvers.

    Actions: 0 = no investment, 1 = investment (reward -0.1 per step in
    states 0-2). States 0-2 are decision states. The solvers pin
    ``V[3] = 10`` and ``V[4] = 0`` and never update those two entries;
    their policy entries are reported as -1.
    """

    # States whose value and action the solvers actually compute.
    _DECISION_STATES = (0, 1, 2)
    # Policy marker stored for states where no decision is made.
    _NO_ACTION = -1

    def __init__(self):
        self.A = [0, 1]
        self.S = [0, 1, 2, 3, 4]
        self.gamma = 0.996

        # Row s of each matrix is the next-state distribution from state s.
        # Transition probabilities for no investment (action 0)
        P0 = np.array([[0.5, .15, .15, 0, .20],
                      [0, .5, .0, .25, .25],
                      [0, 0, .15, .05, .8],
                      [0, 0, 0, 0, 1],
                      [0, 0, 0, 0, 1]])

        R0 = np.array([0, 0, 0, 10, 0])

        # Transition probabilities for investment (action 1)
        P1 = np.array([[0.5, .25, .15, 0, .10],
                      [0, .5, .0, .35, .15],
                      [0, 0, .20, .05, .75],
                      [0, 0, 0, 0, 1],
                      [0, 0, 0, 0, 1]])

        R1 = np.array([-0.1, -0.1, -0.1, 10, 0])

        # Indexed as self.P[action][state] and self.R[action][state].
        self.P = [P0, P1]
        self.R = [R0, R1]

    def step(self, s, a):
        """Sample one transition from state ``s`` under action ``a``.

        Returns:
            ``(s_prime, R, done)`` where ``R`` is the reward attached to the
            current state/action and ``done`` is True once state 4 is entered.
        """
        s_prime = np.random.choice(len(self.S), p=self.P[a][s])
        R = self.R[a][s]
        done = bool(s_prime == 4)
        return s_prime, R, done

    def simulate(self, s, a, π):
        """Roll out one episode starting in state ``s``.

        The first action is ``a``; every later action is read from ``π``.
        ``π`` must hold integer entries because they index Python lists.
        A -1 entry (what the solvers store for states 3 and 4) selects the
        last action's tables through negative indexing; rows 3 and 4 of both
        transition matrices and reward vectors are identical, so the outcome
        does not depend on which table is used there.

        Returns:
            List of ``(state, action, reward)`` tuples, one per step taken.
            The state reached by the final step is not appended.
        """
        done = False
        t = 0
        history = []
        while not done:
            if t > 0:
                a = π[s]
            s_prime, R, done = self.step(s, a)
            history.append((s, a, R))
            s = s_prime
            t += 1
        return history

    def _action_values(self, s, V):
        """Return the one-step lookahead value of each action in state ``s``."""
        return [self.R[a][s] + self.gamma * np.dot(self.P[a][s], V)
                for a in self.A]

    def _greedy_policy(self, V):
        """Return the integer policy that is greedy with respect to ``V``."""
        # Integer dtype so entries can index self.P / self.R directly.
        policy = np.full(len(self.S), self._NO_ACTION, dtype=int)
        for s in self._DECISION_STATES:
            policy[s] = np.argmax(self._action_values(s, V))
        return policy

    def _initial_values(self):
        """Return a fresh value vector with the pinned entry V[3] = 10."""
        V = np.zeros(len(self.S))
        V[3] = 10  # Success state value
        return V

    def value_iteration(self, theta=0.0001, max_iterations=1000):
        """Solve the MDP by synchronous value iteration.

        Args:
            theta: stop once the largest per-state change in a sweep is
                below this threshold.
            max_iterations: upper bound on the number of sweeps.

        Returns:
            ``(V, policy)``: the value estimate and the integer greedy
            policy, with -1 stored for states 3 and 4.
        """
        V = self._initial_values()

        for iteration in range(1, max_iterations + 1):
            # Every backup in a sweep reads the previous sweep's values.
            V_old = V.copy()
            for s in self._DECISION_STATES:
                V[s] = max(self._action_values(s, V_old))

            delta = max(abs(V[s] - V_old[s]) for s in self._DECISION_STATES)
            if delta < theta:
                print(f"Value iteration converged after {iteration} iterations")
                break
        else:
            print(f"Value iteration did not converge within {max_iterations} iterations")

        return V, self._greedy_policy(V)

    def _evaluate_policy(self, policy, V, theta):
        """Update ``V`` in place until it is consistent with ``policy``."""
        while True:
            delta = 0
            for s in self._DECISION_STATES:
                v = V[s]
                a = policy[s]
                # In-place update: later states in the sweep see this value.
                V[s] = self.R[a][s] + self.gamma * np.dot(self.P[a][s], V)
                delta = max(delta, abs(v - V[s]))
            if delta < theta:
                return

    def policy_iteration(self, theta=0.0001, max_iterations=1000):
        """Solve the MDP by policy iteration, starting from action 0 everywhere.

        Args:
            theta: convergence threshold for each policy-evaluation phase.
            max_iterations: upper bound on evaluate/improve rounds.

        Returns:
            ``(V, policy)``: the value of the final policy and the integer
            policy itself, with -1 stored for states 3 and 4.
        """
        V = self._initial_values()
        policy = np.zeros(len(self.S), dtype=int)

        for iteration in range(1, max_iterations + 1):
            # Policy Evaluation
            self._evaluate_policy(policy, V, theta)

            # Policy Improvement
            policy_stable = True
            for s in self._DECISION_STATES:
                best_action = np.argmax(self._action_values(s, V))
                if best_action != policy[s]:
                    policy_stable = False
                policy[s] = best_action

            if policy_stable:
                print(f"Policy iteration converged after {iteration} iterations")
                break
        else:
            print(f"Policy iteration did not converge within {max_iterations} iterations")

        policy[3] = self._NO_ACTION  # Terminal states
        policy[4] = self._NO_ACTION

        return V, policy

# Build the model once; both solvers below run against the same instance.
mdp = MDP()

# --- Value iteration ---
print("\n=== Value Iteration Results ===")
V_value, policy_value = mdp.value_iteration()

# --- Policy iteration ---
print("\n=== Policy Iteration Results ===")
V_policy, policy_policy = mdp.policy_iteration()

# Display labels, indexed by state id.
state_names = [
    "Phase 0",
    "Phase 1 (Promising)",
    "Phase 1 (Disappointing)",
    "Success",
    "Failure",
]

# Side-by-side table of the state values found by each solver.
print("\nValue Function Comparison:")
print("State               Value Iteration    Policy Iteration")
print("-" * 55)
s = 0
while s < len(mdp.S):
    print(f"{state_names[s]:<20} ${V_value[s]:>8.3f}M        ${V_policy[s]:>8.3f}M")
    s += 1

# Side-by-side table of the action chosen in each state.
print("\nOptimal Policy Comparison:")
print("State               Value Iteration    Policy Iteration")
print("-" * 55)
for s in range(len(mdp.S)):
    if policy_value[s] != -1:
        # Anything other than action 1 is reported as "No investment".
        if policy_value[s] == 1:
            action_value = "Invest $100k"
        else:
            action_value = "No investment"
        if policy_policy[s] == 1:
            action_policy = "Invest $100k"
        else:
            action_policy = "No investment"
    else:
        # The -1 marker from the value-iteration policy decides both columns.
        action_value = "Terminal state"
        action_policy = "Terminal state"
    print(f"{state_names[s]:<20} {action_value:<16} {action_policy}")


=== Value Iteration Results ===
Value iteration converged after 20 iterations

=== Policy Iteration Results ===
Policy iteration converged after 2 iterations

Value Function Comparison:
State               Value Iteration    Policy Iteration
-------------------------------------------------------
Phase 0              $   3.321M        $   3.321M
Phase 1 (Promising)  $   6.745M        $   6.745M
Phase 1 (Disappointing) $   0.585M        $   0.585M
Success              $  10.000M        $  10.000M
Failure              $   0.000M        $   0.000M

Optimal Policy Comparison:
State               Value Iteration    Policy Iteration
-------------------------------------------------------
Phase 0              Invest $100k     Invest $100k
Phase 1 (Promising)  Invest $100k     Invest $100k
Phase 1 (Disappointing) No investment    No investment
Success              Terminal state   Terminal state
Failure              Terminal state   Terminal state


Method 2

In [2]:
import numpy as np

class VaccineMDP():
    """Five-state, two-action investment MDP with dynamic-programming solvers.

    Actions: 0 = no investment, 1 = invest $100k (reward -0.1, in $M).
    States 0-2 are decision states. The solvers pin ``V[3] = 10`` and
    ``V[4] = 0`` and never back those states up, so rows 3 and 4 of the
    transition matrices are not read by any solver in this class.
    """

    # States whose value and action the solvers actually compute.
    _DECISION_STATES = (0, 1, 2)
    # Policy marker stored for the two terminal states.
    _NO_ACTION = -1

    def __init__(self):
        # Initialize MDP parameters
        self.A = [0, 1]  # Actions: 0 = no investment, 1 = invest $100k
        self.S = [0, 1, 2, 3, 4]  # States
        self.gamma = 0.996  # Discount factor

        # Transition probabilities for no investment (action 0)
        self.P0 = np.array([
            [0.5, 0.15, 0.15, 0, 0.20],
            [0, 0.5, 0.0, 0.25, 0.25],
            [0, 0, 0.15, 0.05, 0.8],
            [0, 0, 0, 1, 0],  # Success state stays in success
            [0, 0, 0, 0, 1]   # Failure state stays in failure
        ])

        # Transition probabilities for investment (action 1)
        self.P1 = np.array([
            [0.5, 0.25, 0.15, 0, 0.10],
            [0, 0.5, 0.0, 0.35, 0.15],
            [0, 0, 0.20, 0.05, 0.75],
            [0, 0, 0, 1, 0],  # Success state stays in success
            [0, 0, 0, 0, 1]   # Failure state stays in failure
        ])

        # Rewards for each action
        self.R0 = np.array([0, 0, 0, 10, 0])  # No investment
        self.R1 = np.array([-0.1, -0.1, -0.1, 10, 0])  # Investment ($100k = 0.1M)

        # Indexed as self.P[action][state] and self.R[action][state].
        self.P = [self.P0, self.P1]
        self.R = [self.R0, self.R1]

    def _action_values(self, s, V):
        """Return the one-step lookahead value of each action in state ``s``."""
        return [self.R[a][s] + self.gamma * np.dot(self.P[a][s], V)
                for a in self.A]

    def _initial_values(self):
        """Return a fresh value vector holding the pinned terminal values."""
        V = np.zeros(len(self.S))
        V[3] = 10  # Success state value = $10M
        V[4] = 0   # Failure state value = $0M
        return V

    def value_iteration(self, theta=0.0001, max_iterations=1000):
        """Estimate the optimal state values by synchronous value iteration.

        Args:
            theta: stop once the largest per-state change in a sweep is
                below this threshold.
            max_iterations: upper bound on the number of sweeps.

        Returns:
            The value vector, one entry per state. If the sweep budget runs
            out first, a notice is printed and the latest estimate returned.
        """
        V = self._initial_values()

        for i in range(max_iterations):
            # Every backup in a sweep reads the previous sweep's values.
            V_old = V.copy()
            for s in self._DECISION_STATES:
                V[s] = max(self._action_values(s, V_old))

            delta = max(abs(V[s] - V_old[s]) for s in self._DECISION_STATES)
            if delta < theta:
                print(f"Value iteration converged after {i+1} iterations")
                break
        else:
            print(f"Value iteration did not converge within {max_iterations} iterations")

        return V

    def policy_iteration(self, theta=0.0001, max_iterations=1000):
        """Solve the MDP by policy iteration.

        The starting policy is action 0 (no investment) in every state.

        Args:
            theta: convergence threshold for each policy-evaluation phase.
            max_iterations: upper bound on evaluate/improve rounds.

        Returns:
            ``(V, policy)``: the value of the final policy and the integer
            policy itself, with -1 stored for states 3 and 4.
        """
        policy = np.zeros(len(self.S), dtype=int)
        V = self._initial_values()

        for i in range(max_iterations):
            # Policy Evaluation (in place: later states see updated values)
            while True:
                delta = 0
                for s in self._DECISION_STATES:
                    v = V[s]
                    a = policy[s]
                    V[s] = self.R[a][s] + self.gamma * np.dot(self.P[a][s], V)
                    delta = max(delta, abs(v - V[s]))

                if delta < theta:
                    break

            # Policy Improvement
            policy_stable = True
            for s in self._DECISION_STATES:
                best_action = np.argmax(self._action_values(s, V))
                if best_action != policy[s]:
                    policy_stable = False
                policy[s] = best_action

            if policy_stable:
                print(f"Policy iteration converged after {i+1} iterations")
                break
        else:
            print(f"Policy iteration did not converge within {max_iterations} iterations")

        # Set terminal state policies
        policy[3] = self._NO_ACTION  # Success state
        policy[4] = self._NO_ACTION  # Failure state

        return V, policy

    def get_optimal_policy(self, V):
        """Return the policy that is greedy with respect to ``V``.

        The result has integer dtype, matching ``policy_iteration``, so its
        entries can be used directly as indices into ``self.P`` / ``self.R``.
        States 3 and 4 are marked with -1.
        """
        policy = np.full(len(self.S), self._NO_ACTION, dtype=int)

        # Only compute policy for non-terminal states
        for s in self._DECISION_STATES:
            policy[s] = np.argmax(self._action_values(s, V))

        return policy

# Build the model once; both solvers below run against the same instance.
mdp = VaccineMDP()

# --- Value iteration: values first, then the greedy policy derived from them ---
print("\n=== Value Iteration Results ===")
V_value = mdp.value_iteration()
policy_value = mdp.get_optimal_policy(V_value)

# --- Policy iteration: returns values and policy together ---
print("\n=== Policy Iteration Results ===")
V_policy, policy_policy = mdp.policy_iteration()

# Display labels, indexed by state id.
state_names = [
    "Phase 0",
    "Phase 1 (Promising)",
    "Phase 1 (Disappointing)",
    "Success",
    "Failure",
]

# Side-by-side table of the state values found by each solver.
print("\nValue Function Comparison:")
print("State               Value Iteration    Policy Iteration")
print("-" * 55)
s = 0
while s < len(mdp.S):
    print(f"{state_names[s]:<20} ${V_value[s]:>8.3f}M        ${V_policy[s]:>8.3f}M")
    s += 1

# Side-by-side table of the action chosen in each state.
print("\nOptimal Policy Comparison:")
print("State               Value Iteration    Policy Iteration")
print("-" * 55)
for s in range(len(mdp.S)):
    if policy_value[s] != -1:
        # Anything other than action 1 is reported as "No investment".
        if policy_value[s] == 1:
            action_value = "Invest $100k"
        else:
            action_value = "No investment"
        if policy_policy[s] == 1:
            action_policy = "Invest $100k"
        else:
            action_policy = "No investment"
    else:
        # The -1 marker from the value-iteration policy decides both columns.
        action_value = "Terminal state"
        action_policy = "Terminal state"
    print(f"{state_names[s]:<20} {action_value:<16} {action_policy}")


=== Value Iteration Results ===
Value iteration converged after 20 iterations

=== Policy Iteration Results ===
Policy iteration converged after 2 iterations

Value Function Comparison:
State               Value Iteration    Policy Iteration
-------------------------------------------------------
Phase 0              $   3.321M        $   3.321M
Phase 1 (Promising)  $   6.745M        $   6.745M
Phase 1 (Disappointing) $   0.585M        $   0.585M
Success              $  10.000M        $  10.000M
Failure              $   0.000M        $   0.000M

Optimal Policy Comparison:
State               Value Iteration    Policy Iteration
-------------------------------------------------------
Phase 0              Invest $100k     Invest $100k
Phase 1 (Promising)  Invest $100k     Invest $100k
Phase 1 (Disappointing) No investment    No investment
Success              Terminal state   Terminal state
Failure              Terminal state   Terminal state
