# CS 390-670 Project 4 Markov Decision Processes

# MDP 1: Weather Forecasting

In [1]:
# MDP 1: Weather Forecasting
# This MDP models a weather forecasting problem. The agent has to decide whether to issue a forecast of 
# "rain" or "no rain" based on the current state of the weather. The weather can either be "sunny" or "cloudy".
# The agent receives a reward of +10 for issuing an accurate forecast and a reward of -10 for issuing an inaccurate forecast.

# States: sunny, cloudy
# Actions: rain, no rain
# Rewards: +10 for accurate forecast, -10 for inaccurate forecast
# Transition probabilities:

# If the weather is sunny and the agent issues "rain", there is a 30% chance of rain and a 70% chance of no rain.
# If the weather is sunny and the agent issues "no rain", there is a 70% chance of no rain and a 30% chance of rain.
# If the weather is cloudy and the agent issues "rain", there is a 60% chance of rain and a 40% chance of no rain.
# If the weather is cloudy and the agent issues "no rain", there is a 40% chance of no rain and a 60% chance of rain.


#IMPLEMENTATION
import gym
import numpy as np

# Define the MDP
P = np.array([
    # action 0: sunny
    [[0.9, 0.1], [0.2, 0.8]],

    # action 1: rainy
    [[0.3, 0.7], [0.6, 0.4]]
])

R = np.array([
    # action 0: sunny
    [100, 0],

    # action 1: rainy
    [50, 50]
])

gamma = 0.9

# Define the functions for value iteration and policy iteration
def value_iteration(P, R, gamma, theta=1e-5):
    V = np.zeros(P.shape[0])
    while True:
        Q = np.sum(P * (R + gamma * V), axis=2)
        V_new = np.max(Q, axis=1)
        if np.max(np.abs(V - V_new)) < theta:
            break
        V = V_new
    return V

def policy_iteration(P, R, gamma, theta=1e-5):
    n_states, n_actions, _ = P.shape
    policy = np.zeros(n_states, dtype=int)
    while True:
        V = value_iteration(P, R, gamma, theta=theta)
        Q = np.sum(P * (R + gamma * V), axis=2)
        policy_new = np.argmax(Q, axis=1)
        if np.array_equal(policy, policy_new):
            break
        policy = policy_new
    return policy

# Solve the MDP using value iteration and policy iteration
print("Value iteration results:")
V = value_iteration(P, R, gamma)
print("State values:", V)
policy = policy_iteration(P, R, gamma)
print("Optimal policy:", policy)

print("Policy iteration results:")
policy = policy_iteration(P, R, gamma)
print("Optimal policy:", policy)
V = value_iteration(P, R, gamma)
print("State values:", V)


Value iteration results:
State values: [850.68483805 795.8903175 ]
Optimal policy: [0 1]
Policy iteration results:
Optimal policy: [0 1]
State values: [850.68483805 795.8903175 ]


In [2]:
import numpy as np
import mdptoolbox

# Define the MDP parameters
P = np.array([[[0.5, 0.5, 0.0], [0.0, 0.5, 0.5], [0.0, 0.0, 1.0]],
              [[0.5, 0.0, 0.5], [0.5, 0.0, 0.5], [0.0, 0.0, 1.0]],
              [[0.0, 0.5, 0.5], [0.5, 0.0, 0.5], [0.0, 0.0, 1.0]]])
R = np.array([[10, 0, 0], [0, 5, 0], [0, 0, 1]])
discount = 0.9

print(P)

[[[0.5 0.5 0. ]
  [0.  0.5 0.5]
  [0.  0.  1. ]]

 [[0.5 0.  0.5]
  [0.5 0.  0.5]
  [0.  0.  1. ]]

 [[0.  0.5 0.5]
  [0.5 0.  0.5]
  [0.  0.  1. ]]]


In [3]:
print(R)

[[10  0  0]
 [ 0  5  0]
 [ 0  0  1]]


In [4]:
# Create an MDP object
mdp = mdptoolbox.mdp.PolicyIteration(P, R, discount)

# Solve the MDP using policy iteration
pi_policy = mdp.policy
pi_V = mdp.V
pi_num_iter = mdp.iter

print("Policy Iteration Policy: ", pi_policy)
print("Policy Iteration Optimal Value Function: ", pi_V)
print("Policy Iteration Number of Iterations: ", pi_num_iter)

Policy Iteration Policy:  [0 1 2]
Policy Iteration Optimal Value Function:  [0. 0. 0.]
Policy Iteration Number of Iterations:  0


In [5]:

import pandas as pd
from sklearn.preprocessing import MinMaxScaler, KBinsDiscretizer
import matplotlib.pyplot as plt
import numpy as np
# Preprocess the data by normalizing and discretizing
data = pd.read_csv("C:\\Users\\n\\Downloads\\wines.csv")
scaler = MinMaxScaler()
data_norm = scaler.fit_transform(data.iloc[:,1:])
est = KBinsDiscretizer(n_bins=10, encode='ordinal', strategy='uniform')
data_disc = est.fit_transform(data_norm)
print(data_disc)

[[7. 2. 6. 6. 3. 7. 7. 1. 5. 1. 4. 9. 5.]
 [6. 2. 9. 9. 7. 8. 8. 6. 6. 1. 4. 7. 5.]
 [6. 1. 3. 3. 3. 6. 7. 1. 4. 1. 5. 5. 7.]
 [8. 2. 5. 4. 3. 7. 8. 0. 5. 1. 4. 9. 7.]
 [8. 2. 5. 4. 4. 6. 8. 4. 6. 1. 5. 5. 6.]
 [5. 1. 6. 5. 2. 7. 6. 0. 4. 1. 4. 8. 7.]
 [6. 1. 4. 4. 2. 6. 6. 0. 3. 1. 3. 5. 9.]
 [7. 2. 6. 5. 5. 6. 7. 1. 4. 2. 4. 5. 7.]
 [6. 1. 6. 5. 3. 7. 7. 1. 4. 2. 4. 4. 8.]
 [6. 2. 7. 4. 4. 9. 9. 1. 5. 3. 4. 5. 9.]
 [0. 1. 4. 7. 1. 7. 6. 7. 6. 0. 9. 5. 0.]
 [4. 0. 0. 0. 1. 4. 0. 1. 0. 0. 4. 2. 1.]
 [3. 0. 6. 9. 0. 4. 4. 4. 4. 0. 6. 5. 2.]
 [3. 0. 3. 2. 9. 4. 5. 2. 9. 1. 4. 3. 4.]
 [5. 5. 3. 3. 0. 2. 2. 5. 1. 1. 1. 3. 0.]
 [4. 0. 6. 5. 3. 6. 7. 3. 7. 2. 5. 3. 3.]
 [6. 0. 1. 3. 1. 4. 5. 0. 4. 2. 5. 4. 0.]
 [4. 0. 3. 6. 0. 5. 5. 0. 2. 2. 4. 8. 1.]
 [6. 6. 5. 8. 1. 2. 4. 9. 5. 2. 2. 2. 1.]
 [6. 0. 1. 3. 0. 9. 9. 0. 7. 3. 4. 6. 1.]
 [3. 8. 6. 7. 1. 1. 0. 7. 1. 1. 1. 0. 3.]
 [5. 3. 5. 9. 3. 0. 2. 0. 1. 3. 0. 0. 2.]
 [7. 6. 4. 6. 1. 2. 0. 9. 1. 3. 2. 2. 2.]
 [8. 5. 6. 6. 5. 3. 0. 7. 1. 3. 3.

# MDP 2: Robot Navigation

In [6]:

# Description:
# Imagine a robot that is navigating in a grid world. The robot can move up, down, left, or right, 
# and the objective is to reach the goal state while avoiding obstacles. The robot receives a reward of 
# +10 for reaching the goal state and a reward of -1 for hitting an obstacle. The robot receives a reward of -0.1 for
# each time step it spends in the grid world. The grid world has a finite size, and the robot cannot move outside of 
# the grid world.

# MDP Definition:
# States: The states of the MDP are represented by the robot's location on the grid world.
# Actions: The actions of the MDP are represented by the robot's movement in the grid world (up, down, left, right).
# Rewards: The rewards of the MDP are defined as follows:
# +10 for reaching the goal state.
# -1 for hitting an obstacle.
# -0.1 for each time step spent in the grid world.
# Transition Probabilities: The transition probabilities for each action depend on the robot's 
#     current location and the action taken. If the robot attempts to move into an obstacle or outside the grid world, 
#     it remains in its current location with probability 1.

#IMPLEMENTATION 

import numpy as np
class RobotNavigationMDP:
    def __init__(self, width, height, start_state, end_state, obstacle_states, step_cost=-1, end_reward=10):
        self.width = width
        self.height = height
        self.states = [(x, y) for x in range(width) for y in range(height)]
        self.start_state = start_state
        self.end_state = end_state
        self.obstacle_states = obstacle_states
        self.step_cost = step_cost
        self.end_reward = end_reward

    def actions(self, state):
        if state == self.end_state:
            return [None]
        if state in self.obstacle_states:
            return [None]
        actions = ["north", "south", "east", "west"]
        x, y = state
        if x == 0:
            actions.remove("west")
        elif x == self.width - 1:
            actions.remove("east")
        if y == 0:
            actions.remove("north")
        elif y == self.height - 1:
            actions.remove("south")
        return actions

    def reward(self, state, action, next_state):
        if next_state == self.end_state:
            return self.end_reward
        if next_state in self.obstacle_states:
            return 0
        return self.step_cost

    def transition(self, state, action):
        if action is None:
            return [(1.0, state)]
        next_states = {}
        for a in self.actions(state):
            if a == action:
                prob = 0.7
            else:
                prob = 0.1
            next_state = self._move(state, a)
            if next_state not in next_states:
                next_states[next_state] = 0
            next_states[next_state] += prob
        return [(prob, next_state) for next_state, prob in next_states.items()]

    def _move(self, state, action):
        x, y = state
        if action == "north":
            return (x, y - 1)
        elif action == "south":
            return (x, y + 1)
        elif action == "east":
            return (x + 1, y)
        elif action == "west":
            return (x - 1, y)
        else:
            return state

    def value_iteration(self, gamma, theta):
        V = {s: 0 for s in self.states}
        while True:
            delta = 0
            for s in self.states:
                v = V[s]
                V[s] = max(sum(p * (self.reward(s, a, next_s) + gamma * V[next_s]) for (p, next_s) in self.transition(s, a)) for a in self.actions(s))
                delta = max(delta, abs(v - V[s]))
            if delta < theta:
                break
        policy = {s: max(self.actions(s), key=lambda a: sum(p * (self.reward(s, a, next_s) + gamma * V[next_s]) for (p, next_s) in self.transition(s, a))) for s in self.states}
        return V, policy


In [8]:
print(policy)
print(V)

[0 1]
[850.68483805 795.8903175 ]


In [None]:
def policy_iteration(self, gamma, theta):
    V = {s: 0 for s in self.states}
    policy = {s: self.actions(s)[0] for s in self.states}
    while True:
        while True:
            delta = 0
            for s in self.states:
                v = V[s]
                a = policy[s]
                V[s] = sum([self.transition_probs[s][a][next_s] * (self.rewards[s][a][next_s] + gamma * V[next_s]) 
                            for next_s in self.states])
                delta = max(delta, abs(v - V[s]))
            if delta < theta:
                break
        policy_stable = True
        for s in self.states:
            old_action = policy[s]
            policy[s] = self.actions(s)[np.argmax([sum([self.transition_probs[s][a][next_s] * 
                                                      (self.rewards[s][a][next_s] + gamma * V[next_s]) 
                                                      for next_s in self.states]) 
                                                      for a in self.actions(s)])]
            if old_action != policy[s]:
                policy_stable = False
        if policy_stable:
            return V, policy


In [None]:
#Prints the Delta
print( delta )

In [None]:
print(V)

In [None]:
print(policy)

# Taxi MDP using the gym

In [None]:
import gym

env = gym.make('Taxi-v3')

# Set up MDP parameters
gamma = 0.9
num_states = env.observation_space.n
num_actions = env.action_space.n
theta = 0.0001

# Value iteration algorithm
V = [0] * num_states
delta = float('inf')
iter_count = 0
while delta > theta:
    delta = 0
    for s in range(num_states):
        v = V[s]
        max_q = float('-inf')
        for a in range(num_actions):
            q = 0
            for prob, next_state, reward, done in env.P[s][a]:
                q += prob * (reward + gamma * V[next_state])
            max_q = max(max_q, q)
        V[s] = max_q
        delta = max(delta, abs(v - V[s]))
    iter_count += 1

# Print results
print("Value iteration converged in", iter_count, "iterations")
print("V =", V)

# Policy iteration algorithm
V = [0] * num_states
pi = [0] * num_states
stable_policy = False
iter_count = 0
while not stable_policy:
    # Policy evaluation
    delta = float('inf')
    while delta > theta:
        delta = 0
        for s in range(num_states):
            v = V[s]
            a = pi[s]
            q = 0
            for prob, next_state, reward, done in env.P[s][a]:
                q += prob * (reward + gamma * V[next_state])
            V[s] = q
            delta = max(delta, abs(v - V[s]))

    # Policy improvement
    stable_policy = True
    for s in range(num_states):
        old_action = pi[s]
        max_q = float('-inf')
        for a in range(num_actions):
            q = 0
            for prob, next_state, reward, done in env.P[s][a]:
                q += prob * (reward + gamma * V[next_state])
            if q > max_q:
                max_q = q
                pi[s] = a
        if pi[s] != old_action:
            stable_policy = False

    iter_count += 1

# Print results
print("Policy iteration converged in", iter_count, "iterations")
print("V =", V)
print("pi =", pi)


In [None]:
def get_transition_prob(state, action, possible_state):
    """
    Get the probability of transitioning from state to possible_state given the action taken.
    Args:
        state (tuple): the current state
        action (int): the index of the action taken
        possible_state (tuple): the possible new state resulting from taking the action
    Returns:
        float: the probability of transitioning to possible_state given state and action
    """
    # implement the transition function
    # this could be a deterministic or stochastic function, depending on the MDP
    # for example, it could depend on the Taxi , and color
    # and involve a series of if-else statements or a probability distribution
    return 0.33

In [None]:
# define the states
import itertools
Taxi_levels = ["low", "medium", "high"]
color_levels = ["low", "medium", "high"]
proline_levels = ["low", "medium", "high"]
states = list(itertools.product(Taxi_levels, color_levels, proline_levels))

# define the state space
state_space = {}
for i, state in enumerate(states):
    state_space[state] = i

In [None]:
#Get new state function
def get_new_state(state, action):
    Taxi , color, proline = state
    if action == 0: # low action
        if Taxi  == "low":
            new_Taxi  = "low"
        else:
            new_alcohol = "low" if np.random.random() < 0.8 else "medium"
    elif action == 1: # medium action
        if Taxi  == "low":
            new_Taxi  = "medium"
        elif Taxi  == "medium":
            new_Taxi  = "medium" if np.random.random() < 0.5 else "high"
        else:
            new_Taxi  = "high" if np.random.random() < 0.8 else "medium"
    else: # high action
        if Taxi  == "high":
            new_Taxi  = "high"
        else:
            new_Taxi  = "high" if np.random.random() < 0.8 else "medium"
    return (new_Taxi , color, proline)


In [None]:
#Get wine ratings function
def get_Taxi_rating(state):
    """
    Given a state, returns a rating for the quality of the wine, which is determined by the alcohol content and
    the color intensity of the wine.
    """
    Taxi , color, proline = state
    if Taxi  == "low":
        if color == "low":
            return 1
        elif color == "medium":
            return 2
        else:
            return 3
    elif Taxi  == "medium":
        if color == "low":
            return 2
        elif color == "medium":
            return 4
        else:
            return 6
    else:
        if color == "low":
            return 3
        elif color == "medium":
            return 6
        else:
            return 9


In [None]:
# define the rewards
actions = ["low", "medium", "high"]
rewards = np.zeros((len(actions), len(states)))
for i, state in enumerate(states):
    for j, action in enumerate(actions):
        # get the new state after taking the action
        new_state = get_new_state(state, action)
        # calculate the reward based on the quality rating of the wine
        rating = get_Taxi_rating(new_state)
        reward = rating * 10
        rewards[j, i] = reward
print("Reward is"," " ,rewards[j, i] )

#    THE END