# Production and Maintenance Planning (POMDP)

In [3]:
import numpy as np

# Grid Dimensions
rows, cols = 3, 4  # Dimensions of the grid (3 rows, 4 columns)
num_states = rows * cols  # Total number of states

# Actions and Observations
actions = ['continue to manufacture', 'shut down and do maintenance']
observations = ['o1', 'o2', 'o3']

# Transition Probabilities
transition_probabilities = {
    0: {0: [(0, 0.5), (1, 0.4), (2, 0.1)], 1: [(0, 1.0)]},
    1: {0: [(1, 0.5), (2, 0.4), (3, 0.1)], 1: [(0, 0.9), (1, 0.1)]},
    2: {0: [(2, 0.5), (3, 0.5)], 1: [(0, 0.6), (1, 0.3), (2, 0.1)]},
    3: {0: [(3, 1.0)], 1: [(0, 0.3), (1, 0.3), (2, 0.3), (3, 0.1)]},
    4: {0: [(1, 0.25), (2, 0.25), (3, 0.25), (4, 0.25)], 1: [(4, 1.0)]},
    5: {0: [(2, 0.6), (3, 0.2), (5, 0.2)], 1: [(4, 0.9), (5, 0.1)]},
    6: {0: [(3, 0.6), (6, 0.2), (7, 0.2)], 1: [(4, 0.6), (5, 0.3), (6, 0.1)]},
    7: {0: [(3, 0.3), (7, 0.7)], 1: [(4, 0.3), (5, 0.3), (6, 0.3), (7, 0.1)]},
    8: {0: [(3, 0.25), (5, 0.25), (6, 0.25), (8, 0.25)], 1: [(8, 1.0)]},
    9: {0: [(3, 0.25), (6, 0.25), (7, 0.25), (9, 0.25)], 1: [(8, 0.9), (9, 0.1)]},
    10: {0: [(7, 0.4), (10, 0.3), (11, 0.3)], 1: [(8, 0.6), (9, 0.3), (10, 0.1)]},
    11: {0: [(11, 1.0)], 1: [(8, 0.3), (9, 0.3), (10, 0.3), (11, 0.1)]}
}

# Rewards: R(s, a, s') = reward for transitioning from s -> s' with action a
reward_action = {
    (0, 0, 0): -40, (0, 0, 1): -45, (0, 0, 2): -50, 
    (0, 1, 0): -40,
    (1, 0, 1): -40, (1, 0, 2): -45, (1, 0, 3): -50, 
    (1, 1, 0): -70, (1, 1, 1): -40,
    (2, 0, 2): -40, (2, 0, 3): -45, 
    (2, 1, 0): -70, (2, 1, 1): -70, (2, 1, 2): -40,
    (3, 0, 3): -40, 
    (3, 1, 0): -90, (3, 1, 1): -70, (3, 1, 2): -70, (3, 1, 3): -40,
    (4, 0, 1): -80, (4, 0, 2): -85, (4, 0, 3): -90, (4, 0, 4): -90,
    (4, 1, 4): -80,
    (5, 0, 2): -80, (5, 0, 3): -85, (5, 0, 5): -90,
    (5, 1, 4): -110, (5, 1, 5): -80,
    (6, 0, 3): -90, (6, 0, 6): -90, (6, 0, 7): -110,
    (6, 1, 4): -110, (6, 1, 5): -110, (6, 1, 6): -80,
    (7, 0, 3): -90, (7, 0, 7): -90,
    (7, 1, 4): -130, (7, 1, 5): -110, (7, 1, 6): -110, (7, 1, 7): -80,
    (8, 0, 3): -110, (8, 0, 5): -80, (8, 0, 6): -110, (8, 0, 8): -110,
    (8, 1, 8): -110,
    (9, 0, 3): -110, (9, 0, 6): -80, (9, 0, 7): -110, (9, 0, 9): -110,
    (9, 1, 8): -140, (9, 1, 9): -110,
    (10, 0, 7): -110, (10, 0, 10): -110, (10, 0, 11): -140,
    (10, 1, 8): -140, (10, 1, 9): -140, (10, 1, 10): -110,
    (11, 0, 11): -110,
    (11, 1, 8): -160, (11, 1, 9): -140, (11, 1, 10): -140, (11, 1, 11): -110
}


# QMDP

In [8]:
import numpy as np

# Grid dimensions
rows, cols = 3, 4  # 3 rows and 4 columns
num_states = rows * cols  # Total number of states
num_actions = len(actions)  # Number of actions

# Discount factor
gamma = 0.9

# Initial belief (Uniform distribution)
initial_belief = np.ones(num_states) / num_states  # Uniform belief

# Initialize alpha vectors (one for each action)
alpha_vectors = np.zeros((num_actions, num_states))

# Function to update alpha vectors
def update_alpha(alpha_vectors, transition_probabilities, reward_action, gamma):
    new_alpha_vectors = np.zeros_like(alpha_vectors)
    for a in range(num_actions):  # Iterate over actions
        for s in range(num_states):  # Iterate over states
            # Immediate reward for (s, a)
            immediate_reward = sum(
                prob * reward_action.get((s, a, s_prime), 0)
                for s_prime, prob in transition_probabilities.get(s, {}).get(a, [])
            )
            # Future reward based on max of previous alpha vectors
            future_reward = 0
            for s_prime, prob in transition_probabilities.get(s, {}).get(a, []):
                max_alpha = max(alpha_vectors[:, s_prime])
                future_reward += prob * max_alpha
            
            # Update alpha value for action a at state s
            new_alpha_vectors[a, s] = immediate_reward + gamma * future_reward
    return new_alpha_vectors

# Compute expected value of an action given the belief
def compute_action_value(alpha_vectors, belief):
    action_values = []
    for a in range(num_actions):
        value = sum(belief[s] * alpha_vectors[a, s] for s in range(num_states))
        action_values.append(value)
    return action_values

# Number of iterations
num_iterations = 1000

# Print the initial belief
print("Initial Belief for All States:")
for state in range(num_states):
    print(f"State {state}: {initial_belief[state]:.4f}")

# Iteratively update alpha vectors
for _ in range(num_iterations):
    alpha_vectors = update_alpha(alpha_vectors, transition_probabilities, reward_action, gamma)

# Print the final alpha vectors
print("\nFinal Alpha Vectors (One Per Action):")
for a in range(num_actions):
    print(f"Action '{actions[a]}': {alpha_vectors[a]}")

# Compute the best action based on the initial belief
action_values = compute_action_value(alpha_vectors, initial_belief)
best_action = np.argmax(action_values)

# Print the results
print("\nOptimal Action Based on Initial Belief:")
print(f"Best action: {actions[best_action]}")
print("Action values:", action_values)


Initial Belief for All States:
State 0: 0.0833
State 1: 0.0833
State 2: 0.0833
State 3: 0.0833
State 4: 0.0833
State 5: 0.0833
State 6: 0.0833
State 7: 0.0833
State 8: 0.0833
State 9: 0.0833
State 10: 0.0833
State 11: 0.0833

Final Alpha Vectors (One Per Action):
Action 'continue to manufacture': [-406.44380165 -408.42975207 -404.54545455 -400.         -463.44441482
 -455.43237251 -495.51746869 -535.13513514 -524.46930873 -547.60882046
 -658.51749869 -704.40419528]
Action 'shut down and do maintenance': [-400.         -427.75867769 -429.68512397 -436.50330579 -497.09997334
 -523.37888953 -524.82329676 -543.04861129 -582.02237786 -611.10493392
 -624.25033873 -660.44910587]

Optimal Action Based on Initial Belief:
Best action: continue to manufacture
Action values: [-500.32901854876224, -521.6770528942701]


# Point-based value iteration

In [25]:
import numpy as np
import matplotlib.pyplot as plt

# Grid Dimensions and Actions
rows, cols = 3, 4
num_states = rows * cols
actions = ['continue to manufacture', 'shut down and do maintenance']
observations = ['o1', 'o2', 'o3']
num_actions = len(actions)
num_observations = len(observations)

# Discount Factor
gamma = 0.9

# Transition Probabilities
transition_probabilities = {
    0: {0: [(0, 0.5), (1, 0.4), (2, 0.1)], 1: [(0, 1.0)]},
    1: {0: [(1, 0.5), (2, 0.4), (3, 0.1)], 1: [(0, 0.9), (1, 0.1)]},
    2: {0: [(2, 0.5), (3, 0.5)], 1: [(0, 0.6), (1, 0.3), (2, 0.1)]},
    3: {0: [(3, 1.0)], 1: [(0, 0.3), (1, 0.3), (2, 0.3), (3, 0.1)]},
    4: {0: [(1, 0.25), (2, 0.25), (3, 0.25), (4, 0.25)], 1: [(4, 1.0)]},
    5: {0: [(2, 0.6), (3, 0.2), (5, 0.2)], 1: [(4, 0.9), (5, 0.1)]},
    6: {0: [(3, 0.6), (6, 0.2), (7, 0.2)], 1: [(4, 0.6), (5, 0.3), (6, 0.1)]},
    7: {0: [(3, 0.3), (7, 0.7)], 1: [(4, 0.3), (5, 0.3), (6, 0.3), (7, 0.1)]},
    8: {0: [(3, 0.25), (5, 0.25), (6, 0.25), (8, 0.25)], 1: [(8, 1.0)]},
    9: {0: [(3, 0.25), (6, 0.25), (7, 0.25), (9, 0.25)], 1: [(8, 0.9), (9, 0.1)]},
    10: {0: [(7, 0.4), (10, 0.3), (11, 0.3)], 1: [(8, 0.6), (9, 0.3), (10, 0.1)]},
    11: {0: [(11, 1.0)], 1: [(8, 0.3), (9, 0.3), (10, 0.3), (11, 0.1)]}
}

# Rewards: R(s, a, s') = reward for transitioning from s -> s' with action a
reward_action = {
    (0, 0, 0): -40, (0, 0, 1): -45, (0, 0, 2): -50, 
    (0, 1, 0): -40,
    (1, 0, 1): -40, (1, 0, 2): -45, (1, 0, 3): -50, 
    (1, 1, 0): -70, (1, 1, 1): -40,
    (2, 0, 2): -40, (2, 0, 3): -45, 
    (2, 1, 0): -70, (2, 1, 1): -70, (2, 1, 2): -40,
    (3, 0, 3): -40, 
    (3, 1, 0): -90, (3, 1, 1): -70, (3, 1, 2): -70, (3, 1, 3): -40,
    (4, 0, 1): -80, (4, 0, 2): -85, (4, 0, 3): -90, (4, 0, 4): -90,
    (4, 1, 4): -80,
    (5, 0, 2): -80, (5, 0, 3): -85, (5, 0, 5): -90,
    (5, 1, 4): -110, (5, 1, 5): -80,
    (6, 0, 3): -90, (6, 0, 6): -90, (6, 0, 7): -110,
    (6, 1, 4): -110, (6, 1, 5): -110, (6, 1, 6): -80,
    (7, 0, 3): -90, (7, 0, 7): -90,
    (7, 1, 4): -130, (7, 1, 5): -110, (7, 1, 6): -110, (7, 1, 7): -80,
    (8, 0, 3): -110, (8, 0, 5): -80, (8, 0, 6): -110, (8, 0, 8): -110,
    (8, 1, 8): -110,
    (9, 0, 3): -110, (9, 0, 6): -80, (9, 0, 7): -110, (9, 0, 9): -110,
    (9, 1, 8): -140, (9, 1, 9): -110,
    (10, 0, 7): -110, (10, 0, 10): -110, (10, 0, 11): -140,
    (10, 1, 8): -140, (10, 1, 9): -140, (10, 1, 10): -110,
    (11, 0, 11): -110,
    (11, 1, 8): -160, (11, 1, 9): -140, (11, 1, 10): -140, (11, 1, 11): -110
}

observation_probabilities = {
    0: {'o1': 0.9, 'o2': 0.1, 'o3': 0.0},
    1: {'o1': 0.8, 'o2': 0.2, 'o3': 0.0},
    2: {'o1': 0.7, 'o2': 0.2, 'o3': 0.1},
    3: {'o1': 0.1, 'o2': 0.5, 'o3': 0.4},
    4: {'o1': 0.9, 'o2': 0.1, 'o3': 0.0},
    5: {'o1': 0.8, 'o2': 0.2, 'o3': 0.0},
    6: {'o1': 0.7, 'o2': 0.2, 'o3': 0.1},
    7: {'o1': 0.1, 'o2': 0.5, 'o3': 0.4},
    8: {'o1': 0.9, 'o2': 0.1, 'o3': 0.0},
    9: {'o1': 0.8, 'o2': 0.2, 'o3': 0.0},
    10: {'o1': 0.7, 'o2': 0.2, 'o3': 0.1},
    11: {'o1': 0.1, 'o2': 0.5, 'o3': 0.4}
}

# Generate Random Beliefs
m = 10  # Number of belief points
# Number of iterations
num_iterations = 10
beliefs = np.random.dirichlet(np.ones(num_states), size=m)

# Initialize Alpha Vectors
alpha_vectors = [np.zeros(num_states) for _ in range(m)]

def update_belief(b, action, observation):
    """Update belief b' given current belief b, action, and observation."""
    new_belief = np.zeros(num_states)
    
    for s_prime in range(num_states):
        prob_sum = 0
        for s in range(num_states):
            transitions = transition_probabilities.get(s, {}).get(action, [])
            prob_sum += sum(p for sp, p in transitions if sp == s_prime) * b[s]
        new_belief[s_prime] = prob_sum * observation_probabilities[s_prime].get(observation, 0)
    
    new_belief /= np.sum(new_belief)  # Normalize new belief
    return new_belief

def update_alpha_vector(b, action):
    """Compute a new alpha vector for a given action."""
    new_alpha = np.zeros(num_states)
    
    for s in range(num_states):
        # Calculate immediate reward R(s, a, s')
        immediate_reward = sum(
            prob * reward_action.get((s, action, s_prime), 0)
            for s_prime, prob in transition_probabilities.get(s, {}).get(action, [])
        )
        
        # Calculate future reward: sum over states and observations
        future_reward = 0
        for s_prime, prob in transition_probabilities.get(s, {}).get(action, []):
            for o in observations:
                obs_prob = observation_probabilities[s_prime].get(o, 0)
                b_prime = update_belief(b, action, o)
                future_reward += prob * obs_prob * max(
                    np.dot(alpha_vectors[i], b_prime) for i in range(len(alpha_vectors))
                )

        # Update alpha vector with discount factor
        new_alpha[s] = immediate_reward + gamma * future_reward
    
    return new_alpha

### Backup Function to Find Best Action
def backup(belief):
    """Backup step to find the best alpha vector and action for a given belief."""
    best_value = -np.inf
    best_alpha = None
    best_action = None

    for a in range(num_actions):
        alpha = update_alpha_vector(belief, a)
        value = np.dot(alpha, belief)  # V(b) = α_a^T * b

        if value > best_value:
            best_value = value
            best_alpha = alpha
            best_action = a

    return best_alpha, best_action

# Plot Alpha Vectors Evolution
def plot_alpha_vectors(alpha_vectors, iteration):
    plt.figure(figsize=(12, 6))
    for i, alpha in enumerate(alpha_vectors):
        plt.plot(range(len(alpha)), alpha, label=f"Alpha Vector {i}")

    plt.title(f"Alpha Vectors after Iteration {iteration}")
    plt.xlabel("State")
    plt.ylabel("Alpha Vector Value")
    plt.legend()
    plt.grid(True)
    plt.show()
    
for iteration in range(num_iterations):
    new_alpha_vectors = []
    
    # Iterate over belief points
    for belief in beliefs:
        best_alpha, _ = backup(belief)
        new_alpha_vectors.append(best_alpha)

    alpha_vectors = new_alpha_vectors  # Update new alpha vectors list
    # plot_alpha_vectors(alpha_vectors, iteration + 1)  # Plot the evolution of alpha vectors

# Output Optimal Action with Final Alpha Vectors
print("\nOptimal Action Based on Each Belief:")
for i, belief in enumerate(beliefs):
    print(f"\nBelief {i}: {belief}")

    action_values = []
    alpha_vectors_action = []

    for a in range(num_actions):
        alpha = update_alpha_vector(belief, a)
        alpha_vectors_action.append(alpha)
        value = np.dot(alpha, belief)  # V(b) = α_a^T * b
        action_values.append(value)

        print(f"  Alpha vector for action '{actions[a]}': {alpha}")
        print(f"  Value for action '{actions[a]}': {value}")

    best_action = np.argmax(action_values)
    print(f"  -> Best action: '{actions[best_action]}' with value {action_values[best_action]}")


Optimal Action Based on Each Belief:

Belief 0: [0.06113196 0.20967194 0.05029107 0.10603428 0.01430722 0.15209329
 0.21423819 0.04601897 0.01569096 0.11352841 0.01172777 0.00526594]
  Alpha vector for action 'continue to manufacture': [-364.306557   -363.73715319 -362.09964719 -358.45026496 -406.72490212
 -403.37647662 -412.91001785 -408.45026496 -422.97490212 -422.2086473
 -438.1398943  -428.45026496]
  Value for action 'continue to manufacture': -391.1463125460281
  Alpha vector for action 'shut down and do maintenance': [-374.73095728 -401.71995504 -401.61204228 -407.0965926  -414.73095728
 -441.71995504 -441.61204228 -447.0965926  -444.73095728 -471.71995504
 -471.61204228 -477.0965926 ]
  Value for action 'shut down and do maintenance': -427.3777028316442
  -> Best action: 'continue to manufacture' with value -391.1463125460281

Belief 1: [0.00532849 0.25262461 0.06374673 0.15351748 0.00841892 0.09439538
 0.16411752 0.11799836 0.00263182 0.09657412 0.02560797 0.0150386 ]
  Alpha

In [26]:
# Apply one-step lookahead using the provided belief states, transition, and reward model

def compute_one_step_lookahead(belief, action):
    """Compute the one-step lookahead value for a given belief and action."""
    # Compute immediate reward R(b, a) = ∑_s b(s) ∑_s' P(s'|s, a) R(s, a, s')
    immediate_reward = 0
    for s in range(num_states):
        belief_s = belief[s]
        reward_sum = sum(
            prob * reward_action.get((s, action, s_prime), 0)
            for s_prime, prob in transition_probabilities.get(s, {}).get(action, [])
        )
        immediate_reward += belief_s * reward_sum

    # Compute future reward ∑_o P(o|b,a) * U(Update(b, a, o))
    future_reward = 0
    for o in observations:
        # P(o|b,a) = ∑_s' P(o|s', a) * ∑_s P(s'|s, a) * b(s)
        prob_observation = 0
        for s_prime in range(num_states):
            obs_prob = observation_probabilities[s_prime].get(o, 0)
            transition_prob_sum = sum(
                prob * belief[s]
                for s in range(num_states)
                for sp, prob in transition_probabilities.get(s, {}).get(action, [])
                if sp == s_prime
            )
            prob_observation += obs_prob * transition_prob_sum

        # UΓ(Update(b, a, o)) = max(α_i^T * Update(b, a, o))
        updated_belief = update_belief(belief, action, o)
        max_future_value = max(np.dot(alpha_vector, updated_belief) for alpha_vector in alpha_vectors)
        future_reward += prob_observation * max_future_value

    # Return the total value for action a
    total_value = immediate_reward + gamma * future_reward
    return total_value


# Evaluate the one-step lookahead for the first belief state in the set for all actions
belief_state_index = 0
belief = beliefs[belief_state_index]
lookahead_values = [compute_one_step_lookahead(belief, a) for a in range(num_actions)]

# Determine the best action based on the maximum value
best_action_index = np.argmax(lookahead_values)
best_action_name = actions[best_action_index]

belief, lookahead_values, best_action_index, best_action_name


(array([0.06113196, 0.20967194, 0.05029107, 0.10603428, 0.01430722,
        0.15209329, 0.21423819, 0.04601897, 0.01569096, 0.11352841,
        0.01172777, 0.00526594]),
 [-391.1463125460281, -427.37770283164417],
 0,
 'continue to manufacture')