# Project: Deep Reinforcement Learning - Value and Policy Iterations
**Author:** Sayed Pedram Haeri Boroujeni  
**Position:** PhD Student, Clemson University  
**Affiliation:** Department of Computer Science  
**Email:** shaerib@g.clemson.edu  
**Date Created:** June 10, 2025  
**Last Updated:** June 15, 2025 

##### 1. Importing Required Libraries

In [2]:
import torch
import gym
import time
import numpy as np
import pandas as pd

##### 2. Checking GPU Availability

In [None]:
# Check if CUDA is available
cuda_available = torch.cuda.is_available()
print(f"CUDA Available: {cuda_available}")

if cuda_available:
    print(f"CUDA Version: {torch.version.cuda}")
    print(f"Number of GPUs: {torch.cuda.device_count()}")
    print(f"Device Name: {torch.cuda.get_device_name()}")
else:
    print("CUDA is not available on this system.")

##### 3. Defining FrozenLake Environment

In [3]:
# If map_name is specified (e.g., "4x4" or "8x8"), the environment uses a predefined map of that size.
env = gym.make("FrozenLake-v1", map_name="4x4", render_mode="human")
print("Observation space:", env.observation_space)
print("Action space:", env.action_space)

Observation space: Discrete(16)
Action space: Discrete(4)


##### 4. Inspecting Terminal and Goal States

In [4]:
nS = env.observation_space.n   # number of states
nA = env.action_space.n        # number of actions
print("Number of States:", nS)
print("Number of Actions:", nA)

desc = env.unwrapped.desc.astype("U")
print("\nFrozenLake Grid (S=start, F=frozen, H=hole, G=goal):")
for row in desc:
    print(" ".join(row))

Number of States: 16
Number of Actions: 4

FrozenLake Grid (S=start, F=frozen, H=hole, G=goal):
S F F F
F H F H
F F F H
H F F G


##### 5. Generating a Random Policy

In [5]:
random_policy = np.ones((nS, nA)) / nA
print("Random Policy (probabilities for each action in each state):")
for i in range(nS):
    print(f" State {i:2d}: {random_policy[i]}")

Random Policy (probabilities for each action in each state):
 State  0: [0.25 0.25 0.25 0.25]
 State  1: [0.25 0.25 0.25 0.25]
 State  2: [0.25 0.25 0.25 0.25]
 State  3: [0.25 0.25 0.25 0.25]
 State  4: [0.25 0.25 0.25 0.25]
 State  5: [0.25 0.25 0.25 0.25]
 State  6: [0.25 0.25 0.25 0.25]
 State  7: [0.25 0.25 0.25 0.25]
 State  8: [0.25 0.25 0.25 0.25]
 State  9: [0.25 0.25 0.25 0.25]
 State 10: [0.25 0.25 0.25 0.25]
 State 11: [0.25 0.25 0.25 0.25]
 State 12: [0.25 0.25 0.25 0.25]
 State 13: [0.25 0.25 0.25 0.25]
 State 14: [0.25 0.25 0.25 0.25]
 State 15: [0.25 0.25 0.25 0.25]


##### 6. Defining Policy Eavluation Function

In [6]:
def policy_evaluation(env, policy, gamma=0.99, theta=1e-10):
    """
        Evaluates a given stochastic policy for a stichastic environment.
    Args:
        env: The OpenAI Gym environment (must have .P, .nS, .nA).
        policy: A 2D NumPy array of shape (nS, nA), where each row is a probability distribution over actions.
        gamma: Discount factor for future rewards.
        theta: Convergence threshold — stops iteration when value change is below this amount.
    Returns:
        V: Value function for each state
        num_iterations: Number of iterations taken to converge
    """
    prev_V = np.zeros(nS)        # Initialize previous value function to zeros   
    eval_iters = 0
    
    while True:
        eval_iters +=1
        delta = 0           # Change in value function                
        V = np.zeros(nS)    # Initialize current value function to zeros
        
        for state in range(nS):
            for action in range(nA):
                for prob, next_state, reward, done in env.P[state][action]:
                    V[state] += policy[state][action] * prob * (reward + gamma * prev_V[next_state])   
                    
        delta = np.max(np.abs(V - prev_V))

        if delta < theta:
            break
        prev_V = V

    return V, eval_iters 

In [7]:
V, eval_iters= policy_evaluation(env, random_policy)
print(f"Policy Evaluation converged after {eval_iters} iterations.")
print("V Values is:")

# Automatically compute grid size (assumes square layout)
grid_size = int(np.sqrt(len(V)))
V_grid = V.reshape((grid_size, grid_size))
for row in V_grid:
    print("  ".join(f"{val:.4f}" for val in row))

Policy Evaluation converged after 93 iterations.
V Values is:
0.0124  0.0104  0.0193  0.0095
0.0148  0.0000  0.0389  0.0000
0.0326  0.0843  0.1378  0.0000
0.0000  0.1703  0.4336  0.0000


##### 7. Defining Policy Improvement Function

In [8]:
def policy_improvement(env, V, gamma=0.99):
    """
        Performs a greedy policy improvement step based on the given value function V.
    Args:
        env: The OpenAI Gym or Gymnasium environment (must have .P)
        V: A 1D NumPy array of shape (nS,) representing the state value function
        gamma: Discount factor
    Returns:
        policy: A 2D NumPy array of shape (nS, nA) representing the improved policy (one-hot greedy)
    """
    policy = np.zeros((nS, nA))
    
    for state in range(nS):
        q_values = np.zeros(nA)
        for action in range(nA):
            for prob, next_state, reward, done in env.P[state][action]:
                q_values[action] += prob * (reward + gamma * V[next_state])
                
        best_action_index = np.argmax(q_values)
        policy[state][best_action_index] = 1.0  # Greedy one-hot policy

    return policy

In [9]:
improved_policy = policy_improvement(env, V)
improved_policy

array([[1., 0., 0., 0.],
       [0., 0., 0., 1.],
       [1., 0., 0., 0.],
       [0., 0., 0., 1.],
       [1., 0., 0., 0.],
       [1., 0., 0., 0.],
       [1., 0., 0., 0.],
       [1., 0., 0., 0.],
       [0., 0., 0., 1.],
       [0., 1., 0., 0.],
       [1., 0., 0., 0.],
       [1., 0., 0., 0.],
       [1., 0., 0., 0.],
       [0., 0., 1., 0.],
       [0., 1., 0., 0.],
       [1., 0., 0., 0.]])

##### 8. Defining Policy Iteration Function

In [10]:
def policy_iteration(env, gamma=0.99, theta=1e-10):
    """
        Performs policy iteration for a given environment.
    Returns:
        final_policy: optimal policy (nS x nA)
        final_value: optimal value function (nS,)
        iterations: number of improvement iterations performed
    """
    policy = random_policy # Start with uniform random policy
    policy_iters = 0

    while True:
        V, eval_iters = policy_evaluation(env, policy)
        new_policy = policy_improvement(env, V)
        policy_iters += 1
        
        # Check if policy is stable
        if np.array_equal(new_policy, policy):
            break
        
        policy = new_policy
    
    return policy, V, policy_iters, eval_iters

In [11]:
pi_optimal_policy, optimal_value, policy_iters, eval_iters = policy_iteration(env)
print(f"Policy Iteration converged in {policy_iters} iterations using {eval_iters} total policy evaluation updates.")
print("Optimal Policy using policy iteration method is: \n", pi_optimal_policy)

Policy Iteration converged in 3 iterations using 574 total policy evaluation updates.
Optimal Policy using policy iteration method is: 
 [[1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]]


##### 9. Defining Value Iteration Function

In [12]:
def value_iteration(env, gamma=0.99, theta=1e-10):
    """
        Performs value iteration to compute the optimal policy and value function.
    Args:
        env: OpenAI Gym environment (must have .P, .nS, .nA)
        gamma: Discount factor
        theta: Convergence threshold
    Returns:
        V: Optimal value function
        policy: Optimal policy (one-hot, shape nS x nA)
        value_iters: Number of iterations until convergence
    """
    policy = np.zeros((nS, nA))
    prev_V = np.zeros(nS)
    value_iters = 0

    while True:
        value_iters += 1
        q_values = np.zeros((nS, nA))

        for state in range(nS):
            for action in range(nA):
                for prob, next_state, reward, done in env.P[state][action]:
                    q_values[state][action] += prob * (reward + gamma * prev_V[next_state])
            
        V = np.max(q_values, axis=1)
        delta = np.max(np.abs(prev_V - V))
        prev_V = V
        
        if delta < theta:
            break        

    for state in range(nS):
        best_action = np.argmax(q_values[state])
        policy[state][best_action] = 1.0  # Greedy one-hot policy

    return policy, value_iters

In [13]:
vi_optimal_policy, value_iters = value_iteration(env)
print(f"Value Iteration converged in {value_iters} iterations.")
print("Optimal Policy using value iteration method is: \n", pi_optimal_policy)

Value Iteration converged in 571 iterations.
Optimal Policy using value iteration method is: 
 [[1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]]


##### 10. Comparing Policy Iteration and Value Iteration

In [14]:
start = time.time()
pi_optimal_policy, optimal_value, policy_iters, eval_iters = policy_iteration(env)
end = time.time()
print(f"Policy Iteration took {end-start:.2f} Seconds.")
print(f"Policy Iteration converged in {policy_iters} iterations.")

Policy Iteration took 0.26 Seconds.
Policy Iteration converged in 3 iterations.


In [15]:
start = time.time()
vi_optimal_policy, value_iters = value_iteration(env)
end = time.time()
print(f"Value Iteration took {end-start:.2f} Seconds.")
print(f"Value Iteration converged in {value_iters} iterations.")

Value Iteration took 0.11 Seconds.
Value Iteration converged in 571 iterations.


##### 11. Extra-information - Showing in PyGame

In [None]:
import gymnasium as gym
import numpy as np
import time
np.bool8 = np.bool_
# Simulate one episode using the value-based greedy policy
def run_episode(env, V, gamma=0.99, delay=0.5):
    state, _ = env.reset()
    done = False
    total_reward = 0

    while not done:
        time.sleep(delay)
        env.render()

        # Greedy action based on estimated values
        best_action = None
        best_value = -float("inf")
        for action in range(env.action_space.n):
            value = sum(
                prob * (reward + gamma * V[next_state])
                for prob, next_state, reward, done in env.P[state][action]
            )
            if value > best_value:
                best_value = value
                best_action = action

        state, reward, done, _, _ = env.step(best_action)
        total_reward += reward

    env.render()
    print(f"Episode finished. Total reward: {total_reward}")

In [None]:
run_episode(env, V)
env.close()