# Project Name: Markov Decision Process (MDP) by Artificial Intelligence
# Contributor: Rajeev singh sisodiya

Project details:

Markov Decision Process (MDP) is a mathematical framework used to model decision-making in situations where an agent interacts with an environment over a series of discrete time steps. MDPs are a fundamental concept in reinforcement learning and provide a formal structure for studying and solving problems related to decision making under uncertainty.

In an MDP, the agent selects an action in each state, and the environment responds by moving to a new state and returning a reward that depends on that action. The key components of an MDP are states, actions, transition probabilities, rewards, and policies. MDPs rely on the Markov property: the next state and reward depend only on the current state and action, not on the earlier history, which simplifies modeling.

Value functions, such as the state-value and action-value functions, are fundamental to MDPs. These functions estimate the expected cumulative future rewards and guide the agent in making decisions. The Bellman equations capture the recursive relationships between values of states or state-action pairs, facilitating the computation of optimal policies.

The pursuit of an optimal policy, one that maximizes cumulative rewards, is a central goal in MDPs. Algorithms like Q-learning and policy iteration are employed to find these optimal policies efficiently. MDPs have widespread applications in AI, ranging from robotics and autonomous systems to game playing and decision support systems. They provide a robust framework for modeling complex decision-making scenarios and have proven instrumental in advancing AI capabilities in dynamic and uncertain environments.







# Typical Reinforcement Learning Cycle:

Agent: The learner or decision-maker that interacts with the environment.
Environment: The external system with which the agent interacts.

State (S): A representation of the current situation of the environment. The agent perceives the current state of the environment (e.g., game board configuration, robot sensor readings).

Action (A): The set of possible moves or decisions that the agent can make. Based on the state and its policy, the agent chooses an action to execute (e.g., move a piece, turn a knob).

Reward (R): A numerical value indicating the immediate benefit or cost associated with an action in a given state.  The environment gives the agent a reward (positive or negative) based on the chosen action and resulting state change.

Policy (π): A strategy or mapping from states to actions that the agent follows. The agent learns from the experience and updates its policy to improve future actions in similar situations.

In [1]:
import random

# Environment class (replace with your specific environment implementation)
class Environment:
    def __init__(self):
        # Initialize environment state, actions, rewards, etc.
        pass

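    def reset(self):
        # Reset the environment and return the initial state (rl_loop below calls this)
        pass

    def get_state(self):
        # Return the current state of the environment
        pass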

    def take_action(self, action):
        # Perform the action and return the next state, reward, and done flag
        pass

# Agent class
class Agent:
    def __init__(self, environment):
        self.environment = environment
        # Initialize policy (e.g., a Q-table or function approximator)
        pass

    def act(self, state):
        # Choose an action based on the current policy and state
        pass

    def update_policy(self, state, action, reward, next_state, done):
        # Update the policy based on the experience
        pass

# Reinforcement Learning loop
def rl_loop(agent, environment, num_episodes):
    for episode in range(num_episodes):
        state = environment.reset()  # Reset environment to initial state
        done = False

        while not done:
            action = agent.act(state)  # Choose action
            next_state, reward, done = environment.take_action(action)  # Take action
            agent.update_policy(state, action, reward, next_state, done)  # Update policy
            state = next_state  # Transition to the next state


The typical reinforcement learning cycle involves an agent interacting with an environment, making decisions, and learning from the feedback received in the form of rewards.

In [3]:
import random

class Environment:
    def __init__(self):
        self.state = 0

    def reset(self):
        self.state = 0

    def step(self, action):
        # Simulate the environment dynamics
        if action == "left":
            self.state = max(0, self.state - 1)
        elif action == "right":
            self.state = min(4, self.state + 1)

        # Provide a reward based on the new state
        if self.state == 4:
            reward = 1
            done = True
        else:
            reward = 0
            done = False

        return self.state, reward, done

class Agent:
    def __init__(self):
        self.q_values = {"left": 0, "right": 0}
        self.epsilon = 0.1  # Exploration rate: probability of choosing a random action

    def select_action(self):
        # Epsilon-greedy strategy for action selection
        if random.uniform(0, 1) < self.epsilon:
            return random.choice(["left", "right"])
        else:
            return max(self.q_values, key=self.q_values.get)

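    def update_q_values(self, action, reward):
        # Exponentially weighted running average of the immediate reward per action (step size 0.1).
        # Note: the values are not indexed by state and nothing is bootstrapped from the
        # next state, so this is a bandit-style update rather than full Q-learning.
        self.q_values[action] += 0.1 * (reward - self.q_values[action])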

# Main Reinforcement Learning Cycle
def main():
    env = Environment()
    agent = Agent()

    total_episodes = 10

    for episode in range(total_episodes):
        env.reset()
        total_reward = 0

        while True:
            action = agent.select_action()
            state, reward, done = env.step(action)

            total_reward += reward

            # Update Q-values based on the observed reward
            agent.update_q_values(action, reward)

            if done:
                print(f"Episode {episode + 1} - Total Reward: {total_reward}")
                break

if __name__ == "__main__":
    main()


Episode 1 - Total Reward: 1
Episode 2 - Total Reward: 1
Episode 3 - Total Reward: 1
Episode 4 - Total Reward: 1
Episode 5 - Total Reward: 1
Episode 6 - Total Reward: 1
Episode 7 - Total Reward: 1
Episode 8 - Total Reward: 1
Episode 9 - Total Reward: 1
Episode 10 - Total Reward: 1
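
The agent in the cell above keeps a single value per action and ignores the state, so it is closer to a bandit learner than to Q-learning proper. As a minimal sketch of what a state-dependent tabular Q-learning update could look like on the same five-state corridor, the block below keeps one value per (state, action) pair and bootstraps from the best value of the next state. The function name `train_corridor_q_learning`, the hyperparameters (`alpha=0.1`, `gamma=0.9`, `epsilon=0.1`, 500 episodes), and the random tie-breaking are illustrative assumptions, not part of the original notebook.

```python
import random

def train_corridor_q_learning(episodes=500, alpha=0.1, gamma=0.9, epsilon=0.1, seed=0):
    """Tabular Q-learning on a 5-state corridor (states 0..4, goal at state 4)."""
    rng = random.Random(seed)
    actions = ["left", "right"]
    q = {s: {a: 0.0 for a in actions} for s in range(5)}

    for _ in range(episodes):
        state, done = 0, False
        while not done:
            # Epsilon-greedy selection; ties between equal values are broken at random
            if rng.random() < epsilon:
                action = rng.choice(actions)
            else:
                best = max(q[state].values())
                action = rng.choice([a for a in actions if q[state][a] == best])

            # Same dynamics and reward as the Environment class in the cell above
            next_state = max(0, state - 1) if action == "left" else min(4, state + 1)
            reward, done = (1, True) if next_state == 4 else (0, False)

            # Q-learning target: reward plus discounted best value of the next state
            target = reward if done else reward + gamma * max(q[next_state].values())
            q[state][action] += alpha * (target - q[state][action])
            state = next_state
    return q

q_table = train_corridor_q_learning()
greedy_policy = {s: max(q_table[s], key=q_table[s].get) for s in range(4)}
print(greedy_policy)
```

Because the values are indexed by state, the learned greedy policy can differ from state to state, which the single `q_values` dictionary above cannot express.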


# Why do we need MDP? How does it differ from Bandit Problems?

MDPs extend bandit problems to sequential decision-making. In a bandit problem there is effectively a single state: each action yields an immediate reward and does not change the situation the agent faces next. In an MDP, an action also moves the agent to a new state, so decisions have long-term consequences and must be evaluated over a sequence of steps.

To illustrate why we need Markov Decision Processes (MDP) and how they differ from Bandit Problems, let's create a simple example in Python. We'll implement a Multi-Armed Bandit problem and then extend it to a Markov Decision Process scenario.

In [4]:
import numpy as np

class Bandit:
    def __init__(self, num_arms):
        self.num_arms = num_arms
        self.true_means = np.random.normal(0, 1, num_arms)
        self.action_values = np.zeros(num_arms)
        self.action_counts = np.zeros(num_arms)

    def pull_arm(self, arm):
        reward = np.random.normal(self.true_means[arm], 1)
        self.action_counts[arm] += 1
        self.action_values[arm] += (reward - self.action_values[arm]) / self.action_counts[arm]
        return reward

class MDPEnvironment:
    def __init__(self, num_states, num_actions):
        self.num_states = num_states
        self.num_actions = num_actions
        self.transition_probabilities = np.random.rand(num_states, num_actions, num_states)
        self.transition_probabilities /= np.sum(self.transition_probabilities, axis=2, keepdims=True)
        self.rewards = np.random.normal(0, 1, (num_states, num_actions))

    def transition(self, state, action):
        next_state = np.random.choice(self.num_states, p=self.transition_probabilities[state, action])
        reward = self.rewards[state, action]
        return next_state, reward

class Agent:
    def __init__(self, num_arms_or_actions):
        self.num_arms_or_actions = num_arms_or_actions
        self.action_values = np.zeros(num_arms_or_actions)
        self.action_counts = np.zeros(num_arms_or_actions)

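    def select_action(self):
        # Purely greedy: there is no exploration, so an action that is never tried
        # keeps its initial estimate of 0
        return np.argmax(self.action_values)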

    def update_action_value(self, action, reward):
        self.action_counts[action] += 1
        self.action_values[action] += (reward - self.action_values[action]) / self.action_counts[action]

# Multi-Armed Bandit
def bandit_example():
    num_arms = 5
    bandit = Bandit(num_arms)
    agent = Agent(num_arms)
    total_pulls = 1000

    for _ in range(total_pulls):
        action = agent.select_action()
        reward = bandit.pull_arm(action)
        agent.update_action_value(action, reward)

    print("Multi-Armed Bandit Example:")
    print(f"True Means: {bandit.true_means}")
    print(f"Estimated Means: {agent.action_values}\n")

# Markov Decision Process
def mdp_example():
    num_states = 5
    num_actions = 2
    environment = MDPEnvironment(num_states, num_actions)
    agent = Agent(num_actions)
    total_steps = 1000

    state = np.random.randint(num_states)  # Initial state

    for _ in range(total_steps):
        action = agent.select_action()  # Note: this simple agent ignores the current state
        next_state, reward = environment.transition(state, action)
        agent.update_action_value(action, reward)
        state = next_state

    print("Markov Decision Process Example:")
    print(f"Transition Probabilities:\n{environment.transition_probabilities}")
    print(f"Estimated Action Values: {agent.action_values}")

if __name__ == "__main__":
    bandit_example()
    mdp_example()


Multi-Armed Bandit Example:
True Means: [ 1.1059011  -0.13737328  0.92130453  0.13559085 -0.38338092]
Estimated Means: [1.08380994 0.         0.         0.         0.        ]

Markov Decision Process Example:
Transition Probabilities:
[[[0.47880719 0.18778438 0.08312988 0.01692055 0.233358  ]
  [0.29546345 0.22640316 0.10014194 0.23714636 0.1408451 ]]

 [[0.06522042 0.1487977  0.33506289 0.17528427 0.27563471]
  [0.13487743 0.24304678 0.15147628 0.3111573  0.15944222]]

 [[0.13502925 0.29414492 0.23533696 0.11349399 0.22199488]
  [0.0720785  0.22149613 0.04410567 0.28770461 0.37461509]]

 [[0.20867114 0.06058335 0.14213177 0.34124099 0.24737276]
  [0.0617198  0.3369266  0.06315125 0.18024636 0.35795599]]

 [[0.1337484  0.42162935 0.06636137 0.2621761  0.11608477]
  [0.30722961 0.30480013 0.18037476 0.17892341 0.02867209]]]
Estimated Action Values: [-0.01017791  0.8029741 ]
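
In the bandit output above, only the first arm has a non-zero estimate: the agent always picks the arm with the highest current estimate, so it never tries the others. A minimal sketch of one common remedy, epsilon-greedy exploration, is shown below using only the standard library. The arm means, `epsilon=0.1`, the number of pulls, and the function name `run_epsilon_greedy_bandit` are illustrative assumptions and are not taken from the run above.

```python
import random

def run_epsilon_greedy_bandit(true_means, total_pulls=5000, epsilon=0.1, seed=0):
    """Estimate arm means with epsilon-greedy selection and incremental averaging."""
    rng = random.Random(seed)
    num_arms = len(true_means)
    estimates = [0.0] * num_arms
    counts = [0] * num_arms

    for _ in range(total_pulls):
        if rng.random() < epsilon:
            arm = rng.randrange(num_arms)  # explore: pick a uniformly random arm
        else:
            arm = max(range(num_arms), key=lambda a: estimates[a])  # exploit

        reward = rng.gauss(true_means[arm], 1.0)  # noisy reward around the true mean
        counts[arm] += 1
        estimates[arm] += (reward - estimates[arm]) / counts[arm]

    return estimates, counts

true_means = [1.0, -0.2, 0.9, 0.1, -0.4]  # illustrative values
estimates, counts = run_epsilon_greedy_bandit(true_means)
print("Pull counts:", counts)
print("Estimates:", [round(e, 2) for e in estimates])
```

With a small exploration rate every arm is sampled occasionally, so all estimates move toward their true means while most pulls still go to the arm that currently looks best.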


# Markov Property:

The Markov property states that the future state of the system is independent of the past given the present state. In the context of MDPs, it implies that the current state contains all the information needed to make decisions about the future.
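
Stated as a formula (a sketch in standard notation; the symbols $S_t$ and $A_t$ for the state and action at step $t$ are assumed here and do not appear elsewhere in the notebook), the Markov property says that conditioning on the full history adds nothing beyond the current state and action:

$$
P(S_{t+1} = s' \mid S_t, A_t) = P(S_{t+1} = s' \mid S_0, A_0, S_1, A_1, \dots, S_t, A_t)
$$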

Below is a simple Python example to illustrate the Markov property in the context of a simple state transition process:

In [6]:
class Environment:
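    def __init__(self, initial_state=0):
        # The current state is all the dynamics below depend on
        # (initial_state=0 is an illustrative default)
        self.current_state = initial_state

    def get_state(self):
        # Return the current state of the environment
        return self.current_state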

    def take_action(self, action):
        # Determine next state based only on current state and action
        next_state = self._transition_function(self.current_state, action)

        # Determine reward based only on current state, action, and next state
        reward = self._reward_function(self.current_state, action, next_state)

        # Update current state
        self.current_state = next_state

        return next_state, reward, self._is_terminal_state(next_state)  # Done flag based on terminal state

    # ... (implementations for _transition_function, _reward_function, _is_terminal_state)


In [5]:
import numpy as np

class MarkovChain:
    def __init__(self, transition_matrix):
        self.transition_matrix = transition_matrix
        self.num_states = len(transition_matrix)

    def transition(self, current_state):
        # Simulate a state transition based on the Markov property
        next_state = np.random.choice(self.num_states, p=self.transition_matrix[current_state])
        return next_state

def main():
    # Define a transition matrix for a simple Markov Chain
    # Each row represents the transition probabilities from the current state to all other states
    transition_matrix = np.array([
        [0.7, 0.3],
        [0.2, 0.8]
    ])

    markov_chain = MarkovChain(transition_matrix)

    # Simulate state transitions while maintaining the Markov property
    current_state = np.random.choice(markov_chain.num_states)  # Initial state

    num_steps = 10
    print("State transitions:")
    for step in range(num_steps):
        print(f"Step: {step}, Current State: {current_state}")
        current_state = markov_chain.transition(current_state)

if __name__ == "__main__":
    main()


State transitions:
Step: 0, Current State: 0
Step: 1, Current State: 0
Step: 2, Current State: 0
Step: 3, Current State: 0
Step: 4, Current State: 0
Step: 5, Current State: 0
Step: 6, Current State: 0
Step: 7, Current State: 0
Step: 8, Current State: 1
Step: 9, Current State: 1
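
One consequence of the Markov property is that the probability distribution over states at the next step depends only on the distribution at the current step and the transition matrix. As a small sketch (plain Python lists instead of NumPy; the helper name `step_distribution` is an assumption), the block below pushes a starting distribution through the same two-state matrix used above and shows it settling to a fixed distribution.

```python
def step_distribution(distribution, transition_matrix):
    """One step of the chain: new[j] = sum_i distribution[i] * P[i][j]."""
    num_states = len(transition_matrix)
    return [
        sum(distribution[i] * transition_matrix[i][j] for i in range(num_states))
        for j in range(num_states)
    ]

transition_matrix = [[0.7, 0.3], [0.2, 0.8]]  # same matrix as in the example above
distribution = [1.0, 0.0]  # start in state 0 with certainty

for _ in range(50):
    distribution = step_distribution(distribution, transition_matrix)

print([round(p, 4) for p in distribution])  # [0.4, 0.6]
```

The limit can be checked by hand: with probabilities (0.4, 0.6), one more step gives 0.4 * 0.7 + 0.6 * 0.2 = 0.4 for state 0 and 0.4 * 0.3 + 0.6 * 0.8 = 0.6 for state 1, so the distribution no longer changes.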


# Episodic and Continuous Tasks:

Episodic tasks: Have a well-defined starting and ending point (e.g., games with rounds).
Continuous tasks (often called continuing tasks in the RL literature): Have no natural termination point (e.g., a robot navigating in an environment).

To illustrate the concepts of episodic and continuous tasks in the context of reinforcement learning, let's create a simple Python example. We'll define two environments, one representing an episodic task and the other a continuous task.

# Episodic Tasks:

In [9]:
def rl_loop_episodic(agent, environment, num_episodes):
    for episode in range(num_episodes):
        state = environment.reset()  # Reset to initial state at the start of each episode
        done = False
        episode_reward = 0  # Track total reward per episode

        while not done:
            action = agent.act(state)
            next_state, reward, done = environment.take_action(action)
            agent.update_policy(state, action, reward, next_state, done)
            state = next_state
            episode_reward += reward

        # Print or log episode reward statistics
        print(f"Episode {episode + 1}: Total Reward = {episode_reward}")


# Continuous Tasks:

In [8]:
def rl_loop_continuous(agent, environment, num_steps):
    state = environment.reset()  # Initial state
    done = False
    total_reward = 0  # Track total reward across steps

    for step in range(num_steps):
        action = agent.act(state)
        next_state, reward, done = environment.take_action(action)
        agent.update_policy(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward

        if done:  # Restart if the task ends prematurely
            state = environment.reset()
            done = False

    # Print or log total reward statistics
    print(f"Total Reward over {num_steps} steps: {total_reward}")


In the code example below, EpisodicTask represents an episodic task with a fixed number of steps per episode (5), and ContinuousTask represents a continuous task that never signals termination. Both tasks have a state space of 10 states, and the agent takes a random action (0 or 1) at each step.

The run_task function simulates the agent interacting with a task for at most the specified number of steps, stopping early if the task reports that the episode is done, and returns the total reward obtained during the interaction.

The main function demonstrates running both episodic and continuous tasks and printing the total rewards obtained.

In [7]:
import numpy as np

class EpisodicTask:
    def __init__(self, num_states):
        self.num_states = num_states
        self.current_state = 0
        self.episode_length = 5
        self.current_step = 0

    def reset(self):
        self.current_state = 0
        self.current_step = 0

    def step(self, action):
        # In an episodic task, the episode terminates after a fixed number of steps
        self.current_step += 1
        done = self.current_step >= self.episode_length

        # Transition to the next state first (capped at the last state)
        self.current_state = min(self.num_states - 1, self.current_state + action)

        # Reward is 1 if the action leads to the last state, 0 otherwise.
        # With 10 states and 5 steps per episode the last state is out of reach,
        # so the total reward of an episode is 0.
        reward = 1 if self.current_state == self.num_states - 1 else 0

        return self.current_state, reward, done

class ContinuousTask:
    def __init__(self, num_states):
        self.num_states = num_states
        self.current_state = 0

    def reset(self):
        self.current_state = 0

    def step(self, action):
        # In a continuous task, the episode does not have a fixed length
        self.current_state = min(self.num_states - 1, max(0, self.current_state + action))

        # Reward is 1 if the action leads to the last state, 0 otherwise
        reward = 1 if self.current_state == self.num_states - 1 else 0

        # For continuous tasks, we don't have a concept of "done"
        done = False

        return self.current_state, reward, done

def run_task(task, total_steps):
    total_reward = 0
    task.reset()

    for _ in range(total_steps):
        action = np.random.randint(2)  # Random action (0 or 1)
        state, reward, done = task.step(action)
        total_reward += reward

        if done:
            break

    return total_reward

def main():
    num_states = 10
    total_steps = 100

    # Run an episodic task
    episodic_task = EpisodicTask(num_states)
    episodic_reward = run_task(episodic_task, total_steps)
    print(f"Episodic Task Reward: {episodic_reward}")

    # Run a continuous task
    continuous_task = ContinuousTask(num_states)
    continuous_reward = run_task(continuous_task, total_steps)
    print(f"Continuous Task Reward: {continuous_reward}")

if __name__ == "__main__":
    main()


Episodic Task Reward: 0
Continuous Task Reward: 85


# Returns using Discount Factor in Continuous Task:

The return is the cumulative discounted sum of rewards over time: G = r_0 + γ r_1 + γ² r_2 + ... The discount factor (γ) controls how much future rewards count relative to immediate ones and typically lies between 0 and 1. With γ < 1 and bounded rewards, the sum stays finite even in a continuous task that never terminates. Here's a simple Python example to illustrate this concept:

In [1]:
import numpy as np

class ContinuousTask:
    def __init__(self, num_states, discount_factor=0.9):
        self.num_states = num_states
        self.current_state = 0
        self.discount_factor = discount_factor

    def reset(self):
        self.current_state = 0

    def step(self, action):
        # In a continuous task, the episode does not have a fixed length
        self.current_state = min(self.num_states - 1, max(0, self.current_state + action))

        # Reward is 1 if the action leads to the last state, 0 otherwise
        reward = 1 if self.current_state == self.num_states - 1 else 0

        return self.current_state, reward

def calculate_discounted_return(rewards, discount_factor):
    discounted_return = 0
    discount = 1.0

    for reward in rewards:
        discounted_return += discount * reward
        discount *= discount_factor

    return discounted_return

def run_continuous_task(task, total_steps):
    rewards = []
    task.reset()

    for _ in range(total_steps):
        action = np.random.randint(2)  # Random action (0 or 1)
        state, reward = task.step(action)
        rewards.append(reward)

    return rewards

def main():
    num_states = 10
    total_steps = 5

    discount_factor = 0.9
    continuous_task = ContinuousTask(num_states, discount_factor)

    rewards = run_continuous_task(continuous_task, total_steps)

    print("Rewards:", rewards)
    discounted_return = calculate_discounted_return(rewards, discount_factor)
    print(f"Discounted Return: {discounted_return:.2f}")

if __name__ == "__main__":
    main()


Rewards: [0, 0, 0, 0, 0]
Discounted Return: 0.00
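
In the run above, five steps are too few to reach the last of the 10 states, so every reward is 0 and the discounted return is 0 as well. To make the effect of discounting visible, the sketch below computes the return from every time step of a hand-written reward sequence, using the recursion G_t = r_t + γ G_{t+1} evaluated from the end backwards. The function name `returns_per_step` and the reward sequences are illustrative assumptions.

```python
def returns_per_step(rewards, discount_factor):
    """Return [G_0, G_1, ...] where G_t = rewards[t] + discount_factor * G_{t+1}."""
    returns = [0.0] * len(rewards)
    running = 0.0
    # Walk backwards so each return reuses the one that follows it
    for t in reversed(range(len(rewards))):
        running = rewards[t] + discount_factor * running
        returns[t] = running
    return returns

# A single reward of 1 at the last step is worth less the earlier we look
example_returns = returns_per_step([0, 0, 1], 0.9)
print(example_returns)

# A constant reward of 1 per step approaches 1 / (1 - 0.9) = 10
long_run_return = returns_per_step([1] * 1000, 0.9)[0]
print(round(long_run_return, 6))
```

The first entry of `returns_per_step(rewards, γ)` equals what `calculate_discounted_return(rewards, γ)` computes above; the backward pass simply yields the return for every later starting point as well.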


# Policies:

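A policy (π) is a mapping from states to actions, indicating what action to take in each state. A deterministic policy returns a single action for each state, a = π(s), while a stochastic policy assigns a probability π(a|s) to each action in each state.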

Policy Flexibility: RL algorithms can learn both deterministic and stochastic policies, depending on the problem and the desired exploration-exploitation balance.

Policy Updates: Reinforcement Learning algorithms update policies based on experience to improve the agent's decision-making over time.

Action Probability Distribution: Stochastic policies offer the potential to explore different actions and potentially discover better strategies, even if they initially appear less promising.

Deterministic Policies:

Policy Representation: A dictionary maps states to specific actions, indicating the action to take deterministically in each state.

Action Selection: The act_deterministic function simply looks up the state in the policy dictionary and returns the corresponding action.

In [2]:
# Deterministic policy example using a dictionary
deterministic_policy = {
    "state1": "action1",
    "state2": "action2",
    # ... more state-action mappings
}

def act_deterministic(state, policy):
    return policy[state]  # Directly retrieve the action for a given state


Stochastic Policies:

Policy Representation: A dictionary maps states to dictionaries of action probabilities, representing the probability of taking each action in a given state.
Action Selection: The act_stochastic function uses random.choices to randomly select an action based on the probabilities defined in the policy for the current state.

In [3]:
# Stochastic policy example using a dictionary of action probabilities
import random

stochastic_policy = {
    "state1": {"action1": 0.8, "action2": 0.2},
    "state2": {"action1": 0.3, "action2": 0.7},
    # ... more state-action probability mappings
}

def act_stochastic(state, policy):
    action_probabilities = policy[state]
    # Sample one action, weighted by the probabilities defined for this state
    actions = list(action_probabilities.keys())
    weights = list(action_probabilities.values())
    return random.choices(actions, weights=weights)[0]


Below is an example in Python illustrating deterministic and stochastic policies in the context of a simple environment with discrete states and actions:

In [4]:
import numpy as np

class SimpleEnvironment:
    def __init__(self, num_states, num_actions):
        self.num_states = num_states
        self.num_actions = num_actions

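    def step(self, state, action):
        # Simple dynamics: the reward is random noise that does not depend on the action,
        # and the state advances cyclically, so both policies earn the same reward in expectation
        reward = np.random.normal(0, 1)
        next_state = (state + 1) % self.num_states
        return next_state, reward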

class DeterministicPolicy:
    def __init__(self, num_states, num_actions):
        self.num_states = num_states
        self.num_actions = num_actions
        self.policy = np.zeros(num_states, dtype=int)

    def set_policy(self, state, action):
        self.policy[state] = action

    def get_action(self, state):
        return self.policy[state]

class StochasticPolicy:
    def __init__(self, num_states, num_actions):
        self.num_states = num_states
        self.num_actions = num_actions
        self.policy_matrix = np.random.rand(num_states, num_actions)
        self.policy_matrix /= np.sum(self.policy_matrix, axis=1, keepdims=True)

    def get_action(self, state):
        return np.random.choice(self.num_actions, p=self.policy_matrix[state])

def run_episode(environment, policy, num_steps):
    total_reward = 0
    state = 0

    for _ in range(num_steps):
        action = policy.get_action(state)
        next_state, reward = environment.step(state, action)
        total_reward += reward
        state = next_state

    return total_reward

def main():
    num_states = 5
    num_actions = 3
    num_steps = 10

    environment = SimpleEnvironment(num_states, num_actions)

    # Deterministic Policy
    deterministic_policy = DeterministicPolicy(num_states, num_actions)
    for state in range(num_states):
        action = np.random.randint(num_actions)
        deterministic_policy.set_policy(state, action)

    total_reward_det = run_episode(environment, deterministic_policy, num_steps)
    print(f"Deterministic Policy - Total Reward: {total_reward_det:.2f}")

    # Stochastic Policy
    stochastic_policy = StochasticPolicy(num_states, num_actions)

    total_reward_stoch = run_episode(environment, stochastic_policy, num_steps)
    print(f"Stochastic Policy - Total Reward: {total_reward_stoch:.2f}")

if __name__ == "__main__":
    main()


Deterministic Policy - Total Reward: -0.18
Stochastic Policy - Total Reward: 0.47
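
A common way to obtain a stochastic policy in practice is to derive it from action-value estimates: take the best-looking action most of the time and spread a small probability over all actions for exploration. The sketch below builds such an epsilon-greedy distribution for a single state in the same dictionary format as `stochastic_policy` above. The helper names `epsilon_greedy_probabilities` and `sample_action`, the value estimates, and `epsilon=0.1` are illustrative assumptions.

```python
import random

def epsilon_greedy_probabilities(action_values, epsilon):
    """Turn the action-value estimates of one state into action probabilities."""
    actions = list(action_values)
    greedy_action = max(actions, key=action_values.get)
    # Every action gets an equal share of epsilon ...
    probabilities = {a: epsilon / len(actions) for a in actions}
    # ... and the greedy action receives the remaining probability mass
    probabilities[greedy_action] += 1.0 - epsilon
    return probabilities

def sample_action(probabilities, rng=random):
    """Draw one action according to the given probabilities."""
    actions = list(probabilities)
    return rng.choices(actions, weights=[probabilities[a] for a in actions])[0]

probs = epsilon_greedy_probabilities({"action1": 0.3, "action2": 1.0}, epsilon=0.1)
print(probs)
print(sample_action(probs))
```

Setting `epsilon=0` recovers a deterministic greedy policy, so the same representation covers both kinds of policy discussed above.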


# Value Functions:

Value functions estimate the expected cumulative future rewards of being in a certain state or taking a certain action.

Value Function Types:

State-value function (V(s)): Estimates the value of being in a state.

Action-value function (Q(s, a)): Estimates the value of taking a specific action in a state.

Iterative Updates: RL algorithms iteratively update these value functions based on experience, gradually improving their accuracy.

Temporal Difference Learning: Many algorithms use techniques like TD(0), SARSA, or Q-learning to update value functions based on the difference between the current value estimate and a bootstrapped target (the observed reward plus the discounted value estimate of the next state).

Function Approximation: For large or continuous state/action spaces, value functions can be approximated using function approximators like neural networks or linear regression.

Let's create a simple example in Python to illustrate value functions in the context of a Markov Decision Process (MDP):

In [8]:
import numpy as np

class SimpleMDP:
    def __init__(self, num_states, num_actions):
        self.num_states = num_states
        self.num_actions = num_actions
        self.transition_matrix = np.random.rand(num_states, num_actions, num_states)
        self.transition_matrix /= np.sum(self.transition_matrix, axis=2, keepdims=True)
        self.rewards = np.random.normal(0, 1, (num_states, num_actions))

class ValueFunctions:
    def __init__(self, num_states, num_actions):
        self.num_states = num_states
        self.num_actions = num_actions
        self.state_values = np.zeros(num_states)
        self.action_values = np.zeros((num_states, num_actions))

    def update_state_value(self, state, value):
        self.state_values[state] = value

    def update_action_value(self, state, action, value):
        self.action_values[state, action] = value

def value_iteration(mdp, value_functions, discount_factor, num_iterations=100):
    for _ in range(num_iterations):
        # Update state values
        for state in range(mdp.num_states):
            max_action_value = max(value_functions.action_values[state, :])
            value_functions.update_state_value(state, max_action_value)

        # Update action values
        for state in range(mdp.num_states):
            for action in range(mdp.num_actions):
                next_state_values = np.sum(
                    mdp.transition_matrix[state, action, :] * value_functions.state_values * discount_factor
                )
                value_functions.update_action_value(state, action, mdp.rewards[state, action] + next_state_values)

def main():
    num_states = 4
    num_actions = 2
    discount_factor = 0.9

    mdp = SimpleMDP(num_states, num_actions)
    value_functions = ValueFunctions(num_states, num_actions)

    value_iteration(mdp, value_functions, discount_factor)

    print("State Values:")
    print(value_functions.state_values)

    print("\nAction Values:")
    print(value_functions.action_values)

if __name__ == "__main__":
    main()


State Values:
[0.9548376  0.07993539 2.08145235 1.61207434]

Action Values:
[[ 0.95484135  0.03261043]
 [ 0.07993914 -0.18624919]
 [-1.29354857  2.0814561 ]
 [ 1.61207808  0.45627135]]


# State-value Function (V(s)):

V(s) is the expected return when starting from state s and following a specific policy π thereafter: V^π(s) = E_π[G | S_0 = s], where G is the discounted return.

In [5]:
# State-value function example using a dictionary
state_values = {
    "state1": 0.5,
    "state2": 1.2,
    # ... more state-value pairs
}

def estimate_state_value(state, state_values):
    return state_values[state]  # Retrieve the estimated value for a given state
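
The dictionary above only stores values; it does not say where they come from. As a minimal sketch of how such estimates could be learned from experience, the block below applies TD(0) policy evaluation to a five-state corridor in which the fixed policy always moves right and reaching state 4 yields a reward of 1. The corridor, the function name `td0_evaluate`, and the settings `alpha=0.1`, `gamma=0.9`, and 500 episodes are illustrative assumptions.

```python
def td0_evaluate(num_episodes=500, alpha=0.1, gamma=0.9):
    """Estimate V(s) for the 'always move right' policy on states 0..4."""
    values = [0.0] * 5  # state 4 is terminal, so its value stays 0

    for _ in range(num_episodes):
        state = 0
        while state != 4:
            next_state = state + 1  # the fixed policy always moves right
            reward = 1 if next_state == 4 else 0

            # TD(0): move V(s) toward the bootstrapped target r + gamma * V(s')
            target = reward + gamma * values[next_state]
            values[state] += alpha * (target - values[state])
            state = next_state

    return values

estimated_values = td0_evaluate()
print([round(v, 3) for v in estimated_values])
```

Since this policy and environment are deterministic, the true values are known in closed form, V(s) = 0.9^(3 - s) for s = 0..3, which gives a direct check on the estimates.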


# Action-value Function (Q(s, a)):

Q(s, a) is the expected return when starting from state s, taking action a, and following a specific policy π thereafter: Q^π(s, a) = E_π[G | S_0 = s, A_0 = a].

In [6]:
# Action-value function example using a nested dictionary
action_values = {
    "state1": {"action1": 0.3, "action2": 1.0},
    "state2": {"action1": 1.5, "action2": 0.8},
    # ... more state-action value pairs
}

def estimate_action_value(state, action, action_values):
    return action_values[state][action]  # Retrieve the estimated value for a given state-action pair
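
The two value functions are linked: under a policy π, the value of a state is the policy-weighted average of its action values, V(s) = Σ_a π(a|s) Q(s, a), and picking the highest-valued action in each state gives a greedy policy. The sketch below illustrates both using the same illustrative numbers as the `action_values` and `stochastic_policy` dictionaries above; the helper names `state_value_from_q` and `greedy_policy_from_q` are assumptions.

```python
action_values = {
    "state1": {"action1": 0.3, "action2": 1.0},
    "state2": {"action1": 1.5, "action2": 0.8},
}
stochastic_policy = {
    "state1": {"action1": 0.8, "action2": 0.2},
    "state2": {"action1": 0.3, "action2": 0.7},
}

def state_value_from_q(state, policy, action_values):
    """V(s) = sum over actions of pi(a|s) * Q(s, a)."""
    return sum(prob * action_values[state][action]
               for action, prob in policy[state].items())

def greedy_policy_from_q(action_values):
    """Pick the highest-valued action in every state."""
    return {state: max(q, key=q.get) for state, q in action_values.items()}

v_state1 = state_value_from_q("state1", stochastic_policy, action_values)
v_state2 = state_value_from_q("state2", stochastic_policy, action_values)
greedy = greedy_policy_from_q(action_values)

print(round(v_state1, 2), round(v_state2, 2))  # 0.44 1.01
print(greedy)  # {'state1': 'action2', 'state2': 'action1'}
```

For "state1" the weighted average is 0.8 * 0.3 + 0.2 * 1.0 = 0.44, which is lower than the best action value of 1.0 because the policy mostly chooses the weaker action.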


# Bellman Equation:

The Bellman equation describes the relationship between the value of a state (or state-action pair) and the values of its successor states (or state-action pairs).

Recursive Relationship: The Bellman Equation establishes a recursive relationship between the value of a state or state-action pair and the values of its possible successor states or state-action pairs.

Iterative Updates: RL algorithms use this relationship to iteratively update value functions, gradually converging towards optimal values.

Dynamic Programming and Temporal Difference Learning: The Bellman Equation forms the basis for both dynamic programming methods (e.g., Value Iteration, Policy Iteration) and temporal difference learning algorithms (e.g., Q-learning, SARSA).
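
Written out in standard notation (a sketch; the symbols $P(s' \mid s, a)$ for the transition probability and $R(s, a, s')$ for the reward are assumed here), the Bellman expectation equations for a fixed policy $\pi$ are:

$$
V^{\pi}(s) = \sum_{a} \pi(a \mid s) \sum_{s'} P(s' \mid s, a) \left[ R(s, a, s') + \gamma V^{\pi}(s') \right]
$$

$$
Q^{\pi}(s, a) = \sum_{s'} P(s' \mid s, a) \left[ R(s, a, s') + \gamma \sum_{a'} \pi(a' \mid s') Q^{\pi}(s', a') \right]
$$

Replacing the average over the policy's actions with a maximum gives the Bellman optimality equations, which are the ones used by Value Iteration and Q-learning:

$$
V^{*}(s) = \max_{a} \sum_{s'} P(s' \mid s, a) \left[ R(s, a, s') + \gamma V^{*}(s') \right]
$$

$$
Q^{*}(s, a) = \sum_{s'} P(s' \mid s, a) \left[ R(s, a, s') + \gamma \max_{a'} Q^{*}(s', a') \right]
$$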

# State-Value Bellman Equation (V(s))

In [9]:
def bellman_equation_v(state, state_values, environment, gamma):
    # Calculate expected value of successor states
    expected_value = 0
    for next_state, prob in environment.get_possible_next_states(state).items():
        reward = environment.get_reward(state, next_state)
        expected_value += prob * (reward + gamma * state_values[next_state])

    return expected_value  # V(s) for a fixed policy; the policy's action choice is assumed to be folded into the transition probabilities


# Action-Value Bellman Equation (Q(s, a))



In [10]:
def bellman_equation_q(state, action, action_values, environment, gamma):
    # Calculate expected value of taking action in state
    expected_value = 0
    for next_state, prob in environment.get_possible_next_states(state, action).items():
        reward = environment.get_reward(state, action, next_state)
        max_next_q = max(action_values[next_state].values())  # Find max Q-value over possible next actions
        expected_value += prob * (reward + gamma * max_next_q)

    return expected_value  # Q*(s, a) according to the Bellman optimality equation (note the max over next actions)
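
The function above relies on an `environment` object that is not defined in this notebook. As a minimal sketch of how the backup could be exercised, the block below defines a hypothetical two-state environment exposing `get_possible_next_states` and `get_reward` with the signatures used above, and applies the same backup repeatedly until the values settle. The class `TinyEnvironment`, its states, actions, probabilities, rewards, and `gamma = 0.5` are all illustrative assumptions.

```python
class TinyEnvironment:
    """Hypothetical two-state MDP exposing the interface assumed above."""

    def __init__(self):
        # (state, action) -> {next_state: probability}
        self.transitions = {
            ("A", "stay"): {"A": 1.0},
            ("A", "go"): {"A": 0.5, "B": 0.5},
            ("B", "stay"): {"B": 1.0},
            ("B", "go"): {"A": 1.0},
        }
        # (state, action, next_state) -> reward; missing entries mean 0
        self.rewards = {("A", "go", "B"): 1.0, ("B", "stay", "B"): 2.0}

    def get_possible_next_states(self, state, action):
        return self.transitions[(state, action)]

    def get_reward(self, state, action, next_state):
        return self.rewards.get((state, action, next_state), 0.0)


def bellman_backup_q(state, action, action_values, environment, gamma):
    # Same computation as bellman_equation_q above
    expected_value = 0.0
    for next_state, prob in environment.get_possible_next_states(state, action).items():
        reward = environment.get_reward(state, action, next_state)
        max_next_q = max(action_values[next_state].values())
        expected_value += prob * (reward + gamma * max_next_q)
    return expected_value


env = TinyEnvironment()
gamma = 0.5
q = {s: {"stay": 0.0, "go": 0.0} for s in ("A", "B")}

for _ in range(100):  # repeated sweeps converge because gamma < 1
    for s in q:
        for a in q[s]:
            q[s][a] = bellman_backup_q(s, a, q, env, gamma)

print({s: {a: round(v, 4) for a, v in q[s].items()} for s in q})
```

For these numbers the fixed point can be checked by hand: staying in B forever is worth 2 / (1 - 0.5) = 4, and Q(A, go) = 0.5 * (1 + 0.5 * 4) + 0.5 * (0 + 0.5 * 2) = 2, so the values settle at Q(A, stay) = 1, Q(A, go) = 2, Q(B, stay) = 4, and Q(B, go) = 1.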


Let's create a Python example to illustrate the Bellman equation in the context of a Markov Decision Process (MDP):

In [11]:
import numpy as np

class SimpleMDP:
    def __init__(self, num_states, num_actions):
        self.num_states = num_states
        self.num_actions = num_actions
        self.transition_matrix = np.random.rand(num_states, num_actions, num_states)
        self.transition_matrix /= np.sum(self.transition_matrix, axis=2, keepdims=True)
        self.rewards = np.random.normal(0, 1, (num_states, num_actions))

class BellmanEquationSolver:
    def __init__(self, mdp, discount_factor):
        self.mdp = mdp
        self.discount_factor = discount_factor
        self.state_values = np.zeros(mdp.num_states)
        self.action_values = np.zeros((mdp.num_states, mdp.num_actions))

    def solve_bellman_equation(self, num_iterations=100):
        for _ in range(num_iterations):
            # Update state values using the Bellman equation
            for state in range(self.mdp.num_states):
                max_action_value = max(self.action_values[state, :])
                self.state_values[state] = max_action_value

            # Update action values using the Bellman equation
            for state in range(self.mdp.num_states):
                for action in range(self.mdp.num_actions):
                    next_state_values = np.sum(
                        self.mdp.transition_matrix[state, action, :] * self.state_values * self.discount_factor
                    )
                    self.action_values[state, action] = self.mdp.rewards[state, action] + next_state_values

def main():
    num_states = 4
    num_actions = 2
    discount_factor = 0.9

    mdp = SimpleMDP(num_states, num_actions)
    bellman_solver = BellmanEquationSolver(mdp, discount_factor)

    bellman_solver.solve_bellman_equation()

    print("State Values:")
    print(bellman_solver.state_values)

    print("\nAction Values:")
    print(bellman_solver.action_values)

if __name__ == "__main__":
    main()


State Values:
[13.6435754  13.61821393 11.64222591 14.75321899]

Action Values:
[[11.50522915 13.64361567]
 [13.61825421 13.18706319]
 [11.64226619 10.96563128]
 [12.2103402  14.75325926]]


# Optimal Policy:

The optimal policy is the policy that maximizes the expected return from every state.

Value Function-Based Policy: Optimal policies are often derived from learned value functions, as the optimal action in a state typically corresponds to the action with the highest value.

Dynamic Programming or Temporal Difference Learning: Algorithms like Value Iteration, Policy Iteration, Q-learning, and SARSA can be used to learn optimal value functions and policies iteratively.

Exploration-Exploitation Trade-off: While learning, the agent typically follows a stochastic policy that balances exploration (trying new actions) with exploitation (choosing actions known to be good), which is crucial for effective learning in uncertain environments.
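
One common way to realize this trade-off is epsilon-greedy action selection, sketched below. This is an illustration rather than something the text prescribes: the function name `epsilon_greedy_action`, the example states and actions, and the value epsilon = 0.1 are assumptions, and `action_values` is taken to be a dict of dicts mapping state to {action: Q-value}.

```python
import random


def epsilon_greedy_action(state, action_values, epsilon, rng=random):
    """Pick a random action with probability epsilon, else the greedy one."""
    actions = list(action_values[state])
    if rng.random() < epsilon:
        return rng.choice(actions)  # Explore: uniform over all actions
    return max(actions, key=action_values[state].get)  # Exploit: highest Q-value


# Hypothetical Q-values for a single state
example_q = {"s0": {"left": 0.2, "right": 1.5}}

rng = random.Random(0)  # Seeded generator so repeated runs behave the same
counts = {"left": 0, "right": 0}
for _ in range(1000):
    counts[epsilon_greedy_action("s0", example_q, epsilon=0.1, rng=rng)] += 1

print(counts)  # "right" should dominate, with occasional exploratory "left"
```

Setting epsilon to 0 recovers the purely greedy (deterministic) policy, and larger values spend more steps exploring.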

Deterministic Optimal Policy:



In [12]:
# Optimal policy represented as a dictionary mapping states to actions
optimal_policy = {}

# Function to find the action that maximizes the expected value in a state
def find_optimal_action(state, action_values):
    return max(action_values[state], key=action_values[state].get)  # Find the action with the highest Q-value

# Derive the optimal policy based on the learned action-value function
for state in action_values:
    optimal_policy[state] = find_optimal_action(state, action_values)


Stochastic Optimal Policy:

In [13]:
import math

# Stochastic policy represented as a dictionary mapping states to action probabilities
optimal_policy = {}

# Function to assign probabilities to actions based on their Q-values.
# A softmax is used instead of dividing each Q-value by the sum of Q-values,
# because that ratio is not a valid probability when Q-values are negative or sum to zero.
def assign_action_probabilities(state, action_values, temperature=1.0):
    q_values = action_values[state]
    max_q = max(q_values.values())  # Subtract the max for numerical stability
    weights = {a: math.exp((q - max_q) / temperature) for a, q in q_values.items()}
    total = sum(weights.values())
    return {a: w / total for a, w in weights.items()}

# Derive the stochastic policy; lower temperatures bring it closer to the greedy policy
for state in action_values:
    optimal_policy[state] = assign_action_probabilities(state, action_values)


Let's create a Python example to find the optimal policy in the context of a Markov Decision Process (MDP). We'll use the policy iteration algorithm to iteratively improve the policy until it converges to the optimal policy:

In [14]:
import numpy as np

class SimpleMDP:
    def __init__(self, num_states, num_actions):
        self.num_states = num_states
        self.num_actions = num_actions
        self.transition_matrix = np.random.rand(num_states, num_actions, num_states)
        self.transition_matrix /= np.sum(self.transition_matrix, axis=2, keepdims=True)
        self.rewards = np.random.normal(0, 1, (num_states, num_actions))

class OptimalPolicySolver:
    def __init__(self, mdp, discount_factor):
        self.mdp = mdp
        self.discount_factor = discount_factor
        self.policy = np.random.randint(0, mdp.num_actions, mdp.num_states)

    def evaluate_policy(self):
        state_values = np.zeros(self.mdp.num_states)
        tol = 1e-6  # Convergence tolerance

        while True:
            prev_state_values = state_values.copy()

            # Evaluate state values using the current policy
            for state in range(self.mdp.num_states):
                action = self.policy[state]
                next_state_values = np.sum(
                    self.mdp.transition_matrix[state, action, :] * state_values * self.discount_factor
                )
                state_values[state] = self.mdp.rewards[state, action] + next_state_values

            # Check for convergence
            if np.max(np.abs(state_values - prev_state_values)) < tol:
                break

        return state_values

    def improve_policy(self, state_values):
        # Improve policy using greedy strategy
        for state in range(self.mdp.num_states):
            action_values = np.zeros(self.mdp.num_actions)
            for action in range(self.mdp.num_actions):
                next_state_values = np.sum(
                    self.mdp.transition_matrix[state, action, :] * state_values * self.discount_factor
                )
                action_values[action] = self.mdp.rewards[state, action] + next_state_values

            # Update policy to the action that maximizes expected return
            self.policy[state] = np.argmax(action_values)

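    def solve_optimal_policy(self, max_iterations=100):
        for _ in range(max_iterations):
            previous_policy = self.policy.copy()
            state_values = self.evaluate_policy()
            self.improve_policy(state_values)

            # Policy iteration has converged once the greedy policy stops changing
            if np.array_equal(previous_policy, self.policy):
                break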

def main():
    num_states = 4
    num_actions = 2
    discount_factor = 0.9

    mdp = SimpleMDP(num_states, num_actions)
    optimal_policy_solver = OptimalPolicySolver(mdp, discount_factor)

    optimal_policy_solver.solve_optimal_policy()

    print("Optimal Policy:")
    print(optimal_policy_solver.policy)

if __name__ == "__main__":
    main()


Optimal Policy:
[0 0 0 1]
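
The `evaluate_policy` loop above approximates the value of the current policy by repeated sweeps. For a small MDP the same values can also be obtained in one step, because for a fixed policy the Bellman equation is linear: V = r_π + γ P_π V, so (I - γ P_π) V = r_π. The sketch below assumes the same array layout as `SimpleMDP` (transitions of shape [states, actions, states], rewards of shape [states, actions]); the function name `evaluate_policy_exactly` and the example policy are hypothetical.

```python
import numpy as np


def evaluate_policy_exactly(transition_matrix, rewards, policy, gamma):
    """Solve (I - gamma * P_pi) V = r_pi for a deterministic policy."""
    num_states = rewards.shape[0]
    states = np.arange(num_states)
    p_pi = transition_matrix[states, policy, :]  # Row s is P(. | s, policy[s])
    r_pi = rewards[states, policy]               # Entry s is R(s, policy[s])
    return np.linalg.solve(np.eye(num_states) - gamma * p_pi, r_pi)


# Random MDP built the same way as SimpleMDP, with a seeded generator
rng = np.random.default_rng(0)
transitions = rng.random((4, 2, 4))
transitions /= transitions.sum(axis=2, keepdims=True)
reward_table = rng.normal(0, 1, (4, 2))

example_policy = np.array([0, 1, 0, 1])
values = evaluate_policy_exactly(transitions, reward_table, example_policy, gamma=0.9)
print(values)
```

The direct solve costs on the order of the cube of the number of states, so the iterative sweep remains the practical choice for large state spaces.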


# Bellman Optimality Equation:

The Bellman Optimality Equation expresses the relationship between the optimal value of a state (or state-action pair) and the values of its successor states (or state-action pairs) under the optimal policy. The following Python example illustrates how to solve for the optimal values using the Bellman Optimality Equation in the context of a Markov Decision Process (MDP):

In [15]:
import numpy as np

class SimpleMDP:
    def __init__(self, num_states, num_actions):
        self.num_states = num_states
        self.num_actions = num_actions
        self.transition_matrix = np.random.rand(num_states, num_actions, num_states)
        self.transition_matrix /= np.sum(self.transition_matrix, axis=2, keepdims=True)
        self.rewards = np.random.normal(0, 1, (num_states, num_actions))

class BellmanOptimalitySolver:
    def __init__(self, mdp, discount_factor):
        self.mdp = mdp
        self.discount_factor = discount_factor
        self.optimal_state_values = np.zeros(mdp.num_states)
        self.optimal_action_values = np.zeros((mdp.num_states, mdp.num_actions))

    def solve_bellman_optimality_equation(self, num_iterations=100):
        for _ in range(num_iterations):
            # Update state values using the Bellman Optimality Equation
            for state in range(self.mdp.num_states):
                max_action_value = max(self.optimal_action_values[state, :])
                self.optimal_state_values[state] = max_action_value

            # Update action values using the Bellman Optimality Equation
            for state in range(self.mdp.num_states):
                for action in range(self.mdp.num_actions):
                    next_state_values = np.sum(
                        self.mdp.transition_matrix[state, action, :] * self.optimal_state_values * self.discount_factor
                    )
                    self.optimal_action_values[state, action] = self.mdp.rewards[state, action] + next_state_values

    def get_optimal_policy(self):
        # Derive the optimal policy from the computed optimal action values
        optimal_policy = np.argmax(self.optimal_action_values, axis=1)
        return optimal_policy

def main():
    num_states = 4
    num_actions = 2
    discount_factor = 0.9

    mdp = SimpleMDP(num_states, num_actions)
    bellman_optimality_solver = BellmanOptimalitySolver(mdp, discount_factor)

    bellman_optimality_solver.solve_bellman_optimality_equation()

    print("Optimal State Values:")
    print(bellman_optimality_solver.optimal_state_values)

    print("\nOptimal Action Values:")
    print(bellman_optimality_solver.optimal_action_values)

    optimal_policy = bellman_optimality_solver.get_optimal_policy()
    print("\nOptimal Policy:")
    print(optimal_policy)

if __name__ == "__main__":
    main()


Optimal State Values:
[4.96736232 5.55588727 6.36279299 4.36417633]

Optimal Action Values:
[[4.96737815 3.88710943]
 [4.85222433 5.5559031 ]
 [6.36280881 4.9245516 ]
 [4.36419215 4.07047531]]

Optimal Policy:
[0 1 0 0]
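
The equations that the solver above iterates toward can be written out as follows. The symbols are an assumption, since the text does not fix a notation: P(s'|s,a) is the transition probability, R(s,a,s') the reward, and γ the discount factor.

```latex
\begin{align}
V^{*}(s)   &= \max_{a} \sum_{s'} P(s' \mid s, a)\,\bigl[ R(s, a, s') + \gamma\, V^{*}(s') \bigr] \\
Q^{*}(s,a) &= \sum_{s'} P(s' \mid s, a)\,\bigl[ R(s, a, s') + \gamma \max_{a'} Q^{*}(s', a') \bigr] \\
V^{*}(s)   &= \max_{a} Q^{*}(s, a)
\end{align}
```

In the solver, rewards depend only on the state and action, so R(s,a,s') reduces to `rewards[state, action]` and can be moved outside the sum over next states.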


State-Value Bellman Optimality Equation (V*(s))



In [16]:
def bellman_optimality_equation_v(state, action_values, state_values, environment, gamma):
    # Find the greedy action in the state, assuming action_values approximates the optimal Q-function
    optimal_action = find_optimal_action(state, action_values)

    # Calculate the expected value of the optimal action
    expected_value = 0.0
    for next_state, prob in environment.get_possible_next_states(state, optimal_action).items():
        reward = environment.get_reward(state, optimal_action, next_state)
        expected_value += prob * (reward + gamma * state_values[next_state])  # state_values approximates V*

    return expected_value  # This represents V*(s) according to the Bellman Optimality Equation


Action-Value Bellman Optimality Equation (Q*(s, a))

In [17]:
def bellman_optimality_equation_q(state, action, action_values, environment, gamma):
    # Calculate the expected value of taking the action in the state and acting optimally afterwards
    expected_value = 0.0
    for next_state, prob in environment.get_possible_next_states(state, action).items():
        reward = environment.get_reward(state, action, next_state)
        max_next_q = max(action_values[next_state].values())  # Best Q-value available in the next state
        expected_value += prob * (reward + gamma * max_next_q)

    return expected_value  # This represents Q*(s, a) according to the Bellman Optimality Equation


Recursive Relationship: The Bellman Optimality Equation establishes a recursive relationship between the optimal value of a state or state-action pair and the optimal values of its possible successor states or state-action pairs.

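Focus on Optimal Policy: Unlike the Bellman Equation for a fixed policy, which averages over the actions that policy would take, the Bellman Optimality Equation takes the maximum over actions, so it evaluates expected values under the optimal policy.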

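Foundation for Algorithms: Value Iteration applies the Bellman Optimality Equation directly as an update rule, and Policy Iteration reaches the same solution by alternating policy evaluation with greedy policy improvement. Temporal difference learning algorithms like Q-learning converge to its solution indirectly, from sampled transitions rather than from a known model.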
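
To show how a temporal difference method approaches the same solution from samples, here is a minimal tabular Q-learning sketch. It assumes the array layout of `SimpleMDP` (transitions of shape [states, actions, states], rewards of shape [states, actions]); the function name `q_learning`, the step size alpha, the exploration rate epsilon and the number of steps are illustrative choices, not values taken from the text.

```python
import numpy as np


def q_learning(transition_matrix, rewards, gamma=0.9, alpha=0.1,
               epsilon=0.1, num_steps=20000, seed=0):
    """Tabular Q-learning on one long trajectory through a known random MDP."""
    rng = np.random.default_rng(seed)
    num_states, num_actions = rewards.shape
    q = np.zeros((num_states, num_actions))
    state = rng.integers(num_states)

    for _ in range(num_steps):
        # Epsilon-greedy behaviour policy
        if rng.random() < epsilon:
            action = rng.integers(num_actions)
        else:
            action = int(np.argmax(q[state]))

        # Sample the next state; the agent only sees this sample, not the matrix
        next_state = rng.choice(num_states, p=transition_matrix[state, action])
        reward = rewards[state, action]

        # Move Q(s, a) toward the sampled Bellman optimality target
        td_target = reward + gamma * np.max(q[next_state])
        q[state, action] += alpha * (td_target - q[state, action])
        state = next_state

    return q


# Random MDP built the same way as SimpleMDP, with a seeded generator
mdp_rng = np.random.default_rng(1)
transitions = mdp_rng.random((4, 2, 4))
transitions /= transitions.sum(axis=2, keepdims=True)
reward_table = mdp_rng.normal(0, 1, (4, 2))

learned_q = q_learning(transitions, reward_table)
print(learned_q)
print(np.argmax(learned_q, axis=1))  # Greedy policy from the learned Q-values
```

With a constant step size the estimates keep fluctuating around the optimal values rather than settling exactly, which is why practical implementations often decay alpha over time.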