In [5]:
class MDP:
    """A small, hard-coded Markov decision process used as a worked example.

    Attributes are plain containers filled in by ``__init__``:

    * ``states``      -- list of state labels.
    * ``actions``     -- list of action labels.
    * ``transitions`` -- ``transitions[state][action]`` is a list of
      ``(next_state, probability)`` pairs.
    * ``rewards``     -- maps ``(state, action, next_state)`` to the reward
      received on that transition.
    """

    def __init__(self):
        self.states = ['s0', 's1', 's2']
        self.actions = ['a0', 'a1']

        # Outcome distribution for every (state, action) pair.
        self.transitions = {
            's0': {
                'a0': [('s0', 0.5), ('s1', 0.5)],
                'a1': [('s1', 1.0)],
            },
            's1': {
                'a0': [('s0', 0.7), ('s2', 0.3)],
                'a1': [('s2', 1.0)],
            },
            's2': {
                'a0': [('s1', 1.0)],
                'a1': [('s0', 1.0)],
            },
        }

        # Reward attached to each (state, action, next_state) transition.
        self.rewards = {
            ('s0', 'a0', 's0'): 1,
            ('s0', 'a0', 's1'): 2,
            ('s0', 'a1', 's1'): 5,
            ('s1', 'a0', 's0'): -1,
            ('s1', 'a0', 's2'): 3,
            ('s1', 'a1', 's2'): 4,
            ('s2', 'a0', 's1'): 2,
            ('s2', 'a1', 's0'): 0,
        }

    def transition_probability(self, current_state, action, next_state):
        """Return the probability of reaching ``next_state`` from ``current_state`` via ``action``.

        Returns 0 when the state, the action, or the outcome is not listed in
        ``self.transitions``. If an outcome were listed more than once, the
        probability of its first entry is returned.
        """
        outcomes = self.transitions.get(current_state, {}).get(action, ())
        return next((prob for target, prob in outcomes if target == next_state), 0)

    def reward(self, current_state, action, next_state):
        """Return the reward for the given transition, or 0 if none is recorded."""
        key = (current_state, action, next_state)
        return self.rewards.get(key, 0)


In [6]:
import numpy as np

class ValueIteration:
    """Value iteration solver for a finite MDP.

    The ``mdp`` object must provide:

    * ``states`` and ``actions`` -- iterables of labels,
    * ``transitions`` -- a dict where ``transitions[state][action]`` is a list
      of ``(next_state, probability)`` pairs,
    * ``reward(state, action, next_state)`` -- the reward for a transition.

    A state may define only some of ``mdp.actions``, or none at all (for
    example a terminal state). Only the actions actually defined for a state
    are evaluated; a state with no defined actions keeps the value 0 and gets
    ``None`` as its policy entry.

    Args:
        mdp: The MDP to solve.
        gamma: Discount factor applied to the value of the next state.
        theta: Convergence threshold; iteration stops once the largest
            per-state change in a sweep is strictly below it.
    """

    def __init__(self, mdp, gamma=0.9, theta=0.0001):
        self.mdp = mdp
        self.gamma = gamma
        self.theta = theta
        self.V = {state: 0 for state in self.mdp.states}  # Initialize V(s) = 0 for all states

    def _available_actions(self, state):
        """Return the actions of ``mdp.actions`` that have transitions defined in ``state``."""
        defined = self.mdp.transitions.get(state, {})
        return [action for action in self.mdp.actions if action in defined]

    def run(self):
        """Sweep over all states until the value function converges.

        Values are updated in place, so states later in a sweep already see
        the new values of states updated earlier in the same sweep.

        Returns:
            The dict ``self.V`` mapping each state to its converged value.
        """
        while True:
            delta = 0
            for state in self.mdp.states:
                previous = self.V[state]
                # Bellman update restricted to the actions this state defines.
                # ``default=0`` covers states with no actions (terminal states),
                # which would otherwise make ``max`` fail on an empty sequence.
                self.V[state] = max(
                    (self.calculate_action_value(state, action)
                     for action in self._available_actions(state)),
                    default=0,
                )
                delta = max(delta, abs(previous - self.V[state]))
            # Stop when the values converge
            if delta < self.theta:
                break
        return self.V

    def calculate_action_value(self, state, action):
        """Return the expected discounted return of taking ``action`` in ``state``.

        Computes ``sum(prob * (reward + gamma * V[next_state]))`` over the
        outcomes listed in ``mdp.transitions[state][action]``.

        Raises:
            KeyError: If ``mdp.transitions`` has no entry for the given
                state/action pair, or an outcome state is missing from ``V``.
        """
        action_value = 0
        for next_state, prob in self.mdp.transitions[state][action]:
            reward = self.mdp.reward(state, action, next_state)
            action_value += prob * (reward + self.gamma * self.V[next_state])
        return action_value

    def derive_policy(self):
        """Return the greedy policy with respect to the current ``self.V``.

        Returns:
            A dict mapping each state to the defined action with the highest
            action value (the earliest one in ``mdp.actions`` wins ties), or
            to ``None`` when the state defines no actions.
        """
        policy = {}
        for state in self.mdp.states:
            best_action = None
            best_value = float('-inf')
            for action in self._available_actions(state):
                action_value = self.calculate_action_value(state, action)
                if action_value > best_value:
                    best_value = action_value
                    best_action = action
            policy[state] = best_action
        return policy


In [7]:
# Build the example MDP and a value-iteration solver bound to it.
mdp = MDP()
value_iteration = ValueIteration(mdp)

# Solve for the state values and report each one to two decimals.
optimal_values = value_iteration.run()
print("Optimal Value Function:")
for state in optimal_values:
    value = optimal_values[state]
    print(f"V({state}) = {value:.2f}")

# Read off the greedy policy implied by the converged values.
optimal_policy = value_iteration.derive_policy()
print("\nOptimal Policy:")
for state in optimal_policy:
    action = optimal_policy[state]
    print(f"π({state}) = {action}")


Optimal Value Function:
V(s0) = 32.47
V(s1) = 30.53
V(s2) = 29.47

Optimal Policy:
π(s0) = a1
π(s1) = a1
π(s2) = a0


In [None]:
import matplotlib.pyplot as plt

class ValueIterationWithTracking(ValueIteration):
    """Value iteration that also records the value function after every sweep.

    ``history`` is a list with one dict per completed sweep, each mapping
    every state in ``mdp.states`` to its value at the end of that sweep.
    """

    def __init__(self, mdp, gamma=0.9, theta=0.0001):
        super().__init__(mdp, gamma, theta)
        self.history = []

    def run(self):
        """Run value iteration to convergence, appending a snapshot per sweep.

        Returns:
            The dict ``self.V`` holding the final state values.
        """
        converged = False
        while not converged:
            largest_change = 0
            for state in self.mdp.states:
                old_value = self.V[state]
                candidates = [
                    self.calculate_action_value(state, action)
                    for action in self.mdp.actions
                ]
                self.V[state] = max(candidates)
                largest_change = max(largest_change, abs(old_value - self.V[state]))
            # Copy the values so later sweeps do not alter this snapshot.
            self.history.append({state: self.V[state] for state in self.mdp.states})
            converged = largest_change < self.theta
        return self.V

    def plot_value_convergence(self):
        """Plot one curve per state showing its recorded value at each sweep.

        The x-axis is the index into ``history``, so 0 is the snapshot taken
        after the first sweep.
        """
        plt.figure()
        sweep_indices = range(len(self.history))
        for state in self.mdp.states:
            trajectory = [snapshot[state] for snapshot in self.history]
            plt.plot(sweep_indices, trajectory, label=f'V({state})')
        plt.xlabel('Iterations')
        plt.ylabel('Value')
        plt.title('Value Function Convergence')
        plt.legend()
        plt.show()

# Build a tracking solver for `mdp`, the MDP instance created in an earlier cell.
value_iteration_tracking = ValueIterationWithTracking(mdp)

# Run to convergence; this fills `value_iteration_tracking.history` with one
# snapshot per sweep. The returned value dict is not used here.
value_iteration_tracking.run()

# Draw one convergence curve per state from the recorded history.
value_iteration_tracking.plot_value_convergence()
