# Dynamic Programming for the Recycle Robot Environment

(Implementation of Example 3.3 in Sutton & Barto textbook, 2nd edition.)

<img src="./images/RecyclingRobot_mod.jpg" 
     align="center" 
     width="800" />

### Imports

In [1]:
from gymnasium import Env, spaces
from gymnasium.envs.toy_text.utils import categorical_sample
from typing import Optional
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import random
import time
random.seed(5)
%matplotlib inline

## Create MDP for the Recycling Robot

In [2]:
class RecycleEnv(Env):

    def __init__(self, alpha=0.5, beta=0.2, duration=20,r_search=4, r_wait=1):
        self.observation_space = spaces.Discrete(2)  # 0: low 1: high
        self.action_space = spaces.Discrete(3)  # 0: search 1: wait 2: recharge

        self.state = "high"
        self.sum = 0

        self.StateActionPairings = {"low": ["search", "wait", "recharge"],
                                    "high": ["search", "wait"]}

        # action: {state:[(nextstate, reward, probability, termination)]}
        self.TransitionStatesandProbs = {
            "wait": {
                "high": [("high", r_wait, 1, False)],
                "low": [("low", r_wait, 1, False)]
            },
            "recharge": {
                "low": [("high", 0, 1, False)]
            },
            "search": {
                "high": [("high", r_search, round(alpha, 1), False),
                         ("low", r_search, round(1-alpha, 1), False)],
                "low": [("low", r_search, round(beta, 1), False),
                        ("high", -3, round(1-beta, 1), False)]
            }}

    def reset(self, seed=None, options: Optional[dict] = None):
        self.state = "high"
        self.time = 0
        self.sum = 0
        print("Environment reset: Current state is High")
        return self.state, {}

    def step(self, a):
        transitions = self.TransitionStatesandProbs[a][self.state]
        i = categorical_sample([t[2] for t in transitions], self.np_random)
        s, r, p, t = transitions[i]
        self.SumReward(r)
        self.state = s
        return (s, r, t, False, {})

    def SumReward(self, reward=0):
        self.sum = self.sum + reward
        return self.sum

    def getPossibleActions(self, state):
        return self.StateActionPairings[state]

    def getTransitionStatesandProbs(self, state, action):
        return self.TransitionStatesandProbs[action][state]

## Agent using Value Iteration to find the optimal policy

In [11]:
class ValueIterationAgent():
    def __init__(self, env, discount=0.9, iterations=100, theta=0.01):
        self.name = "Value Iteration Agent"
        self.env = env
        self.discount = discount
        self.theta = theta
        self.V = {"low": 0, "high": 0}
        self.pi = {}

        # train
        self.train(iterations)

    def train(self, iterations):
        states = ["low", "high"]  # states are numbered
        start_time = time.time()
        for i in range(iterations):
            delta = 0

            for state in states:
                v = self.V[state]
                actions = self.env.getPossibleActions(state)
                action_value_list = []
                for action in actions:

                    ###################################################
                    # YOU WILL IMPLEMENT THIS FUNCTION IN TASK 1
                    q_val_for_given_action = self.calculate_q_value_estimate(state, action)
                    ###################################################

                    action_value_list.append(q_val_for_given_action)

                ###################################################
                # YOU WILL IMPLEMENT THIS FUNCTION IN TASK 2
                self.V[state] = self.update_state_value_function(action_value_list)  # max(value_list_for_actions)
                ###################################################

                ###################################################
                # YOU WILL IMPLEMENT THIS FUNCTION IN TASK 3
                delta = self.update_delta(delta, v, state)  # max(delta, abs(v - self.V[state]))
                ###################################################

                # explicit "policy improvement"
                self.pi[state] = actions[action_value_list.index(self.V[state])]

            # stopping criterion
            if delta < self.theta:
                print(f"Stopping criterion satisfied after {i} iterations.\n")
                print(f"Value function is: {self.V}\n")
                print(f"Optimal policy is: {self.pi}\n")
                elapsed_time = time.time() - start_time
                print(f"Execution time: {elapsed_time:.4f} seconds\n")  # Print execution time

                break

    def getValue(self, state):
        return self.V[state]

    def getPolicy(self, state):
        return self.pi[state]

    def getAction(self, state):
        return self.getPolicy(state)

---
## Task 1

Your task is to implement the method `calculate_q_value` that takes as input a state and an action (see `ValueIterationAgent`).
In this method, there is already a loop implemented over all possible next states and their reward.
In the loop, you therefore have access to the transition probability $ p(s', r \mid s, a) $, here `trans_prob`, the next state $s'$, here `next_state`, and the reward $r$, here `reward`.

Based on these variables and the attributes of `ValueIterationAgent`, calculate the action-value function estimate of the current iteration.

_Hint: Note that the current state-value function estimate is stored in `self.V`._

In [12]:
def calculate_q_value_estimate(self, state, action):

    q = 0
    for sum_element in self.env.getTransitionStatesandProbs(state, action):

        # Extract information
        next_state = sum_element[0]  # next state s'
        reward = sum_element[1]  # reward r
        trans_prob = sum_element[2]  # transition probability p(s', r | s, a)

        # YOUR CODE HERE
        q += trans_prob * (reward + self.discount * self.V[next_state])

    return q


# This adds the function as a method to the specified class.
setattr(ValueIterationAgent, "calculate_q_value_estimate", calculate_q_value_estimate)

---
## Task 2

Your task is to implement the method `update_state_value_function` that takes as input a list of action-value functions estimates $q(s,a)$ for all actions given the state $s$ (see `ValueIterationAgent`).
Based on this list, calculate the state-value function $v(s)$.

In [13]:
def update_state_value_function(self, list_of_q_values):
    
    # YOUR CODE HERE
    return max(list_of_q_values)


# This adds the function as a method to the specified class.
setattr(ValueIterationAgent, "update_state_value_function", update_state_value_function)

---

## Task 3

Your task is to implement the method `update_delta` that takes as input the current $\Delta$, here `delta`, the state-value function before the update, here `v`, and the state, here `state`.
Based on this list, calculate the updated $\Delta$.

In [14]:
def update_delta(self, delta, v, state):

    # YOUR CODE HERE
    return max(delta, abs(v - self.V[state]))


# This adds the function as a method to the specified class.
setattr(ValueIterationAgent, "update_delta", update_delta)

## Dummy Agent using the random policy

In [15]:
class DummyAgent():
    def __init__(self, env):
        self.name = "Dummy Agent"
        self.env = env

    def getValue(self, state):
        pass

    def getPolicy(self, state):
        actions = self.env.getPossibleActions(state)
        return random.choice(actions)

    def getAction(self, state):
        return self.getPolicy(state)

### Define play-function

In [16]:
def play(agent, env, rounds=100):
    print("#"*10, "\n")
    print(f"Starting play with agent: {agent.name}\n")

    for round_no in range(rounds):
        current_state = env.state
        player_move = agent.getAction(current_state)
        next_state, r, t, _, _ = env.step(player_move)

        print(f"Round {round_no} \nAction: {player_move}\tReward: {r}\tNext State: {next_state}")
        img = Image.open("./images/"+current_state+"_"+player_move+"_"+next_state+".jpg")
        plt.figure(figsize=(10, 7))
        plt.imshow(img)
        plt.axis('off')
        plt.show()

        input("Press Enter to continue!")

    print(f"{rounds} Rounds Ended:\n")
    print(f"The agent collected a total return of {env.sum}\n")
    env.reset()
    print("#"*10, "\n")

## Dummy Agent

Test the behavior of a recycling robot with random actions.

In [9]:
env = RecycleEnv()

In [None]:
agent = DummyAgent(env)
play(agent, env, rounds=10)

## Value Iteration Agent

Test the optimal behavior of the recycling robot given the specified parametrization.

In [None]:
agent = ValueIterationAgent(env=env)
play(agent, env, rounds=10)

## Questions

**Question 1.** Suppose the reward for waiting is higher than for searching. What do you expect will happen to the value functions of the two states? What would be the optimal policy?

YOUR ANSWER HERE

In [None]:
env = RecycleEnv( r_search=1, r_wait=4)
agent = ValueIterationAgent(env=env)

play(agent, env, rounds=10)

# Policy Iteration

In [17]:
class PolicyIterationAgent():
    def __init__(self, env, discount=0.9, iterations=100, theta=0.01):
        self.name = "Policy Iteration Agent"
        self.env = env
        self.discount = discount
        self.theta = theta
        self.V = {"low": 0, "high": 0}
        self.pi = {"low": self.env.getPossibleActions("low")[0], "high": self.env.getPossibleActions("high")[0]}

        # train
        self.train(iterations)

    def train(self, iterations):

        states = ["low", "high"]  # states are numbered
        start_time = time.time()
        for i in range(iterations):

            # Policy Evaluation
            while True:
                delta = 0
                for state in states:
                    v = self.V[state]
                    value = self.calculate_state_value(state)
                    self.V[state] = value
                    delta = max(delta, abs(v - self.V[state]))

                if delta < self.theta:
                    break
            
            # Policy Improvement
            policy_stable = True
            for state in states:
                old_action = self.pi[state]
                actions = self.env.getPossibleActions(state)
                action_value_list = []
                for action in actions:
                    action_value_list.append(self.calculate_q_value(state, action))

                self.pi[state] = actions[action_value_list.index(max(action_value_list))]

                if old_action != self.pi[state]:
                    policy_stable = False
            if policy_stable:
                print(f"Stopping criterion satisfied after {i} iterations.\n")
                print(f"Value function is: {self.V}\n")
                print(f"Optimal policy is: {self.pi}\n")
                elapsed_time = time.time() - start_time
                print(f"Execution time: {elapsed_time:.4f} seconds\n")  # Print execution time

                break
            



    def getValue(self, state):
        return self.V[state]

    def getPolicy(self, state):
        return self.pi[state]

    def getAction(self, state):
        return self.getPolicy(state)
    
    def calculate_state_value(self, state):
       action = self.pi[state]
       sum = self.calculate_q_value(state, action)
        
       return sum

    def calculate_q_value(self,state,action):
        sum = 0
        for transition in self.env.getTransitionStatesandProbs(state, action):
            next_state, reward, prob, _ = transition
            sum += prob * (reward + self.discount * self.V[next_state])
        return sum

In [18]:
env = RecycleEnv( r_search=4, r_wait=1)
agent = PolicyIterationAgent(env=env)


Stopping criterion satisfied after 1 iterations.

Value function is: {'low': 24.77519542578896, 'high': 27.536435654499513}

Optimal policy is: {'low': 'recharge', 'high': 'search'}

Execution time: 0.0005 seconds



In [19]:
env = RecycleEnv( r_search=4, r_wait=1)
agent = ValueIterationAgent(env=env)

Stopping criterion satisfied after 39 iterations.

Value function is: {'low': 24.773480007530324, 'high': 27.534806007153808}

Optimal policy is: {'low': 'recharge', 'high': 'search'}

Execution time: 0.0004 seconds

