# CS3263 - Assignment 2
---
#### Sem 2, AY 2024/25

👋 Welcome to the Assignment 2 of course CS3263 - Foundations of Artificial Intelligence! 

# Guideline

- Please complete this Assignment in a **TWO-person team**.
- **ONE person from each team** will need to submit **ALL** the answers.
- Grading will be done team-wise. Please register your groups on Canvas.
- The deadline for the assignment is **`7 April 2025, 2359`**.

You are encouraged to discuss solution ideas. However, each team *must write up the solutions independently*. It is considered plagiarism if the solution write-up is highly similar to other teams' write-ups or other sources.


---


# Submission

- You can download the programming package from Canvas or from https://github.com/CS3263-AI-Sem2-2425/Assignment2.
- No late submissions are allowed.
- Complete the codes in `Assignment2.ipynb` and **copy** your solutions to `programming.py`.
- Please write the following on the top of your solution files:
    - The **name(s)** and **metric number(s)** of ALL team members as they appear on Canvas.
    - Collaborators (write *None* if no collaborators).
    - Sources, if any, or if you obtained the solutions through research, e.g., through the web.
- Your programming part should be zipped and named `netid1-netid2.zip`. **Only submit the zipped file to Canvas**.
- **IMPORTANT:** The programming part will be auto-graded.
    - Note that the `programming.py` file is designed for auto-grading, so you cannot run it normally from your end but don't worry.
    - DO NOT modify any code outside the blocks you're allowed to; DO NOT print any message in `programming.py`.

---


# Programming Assignment
In this programming assignment, we are going to solve the searching problems in a gird maze environment.

This notebook will walk you through how to use the environment, how to define your own agent, and how to solve the problems.

You are expected to add your codes in the blocks noted by:
```python
# ------- your code starts here ----- #

# Some code examples ...

# ------- your code ends here ------- #
```
Let's get started!

---

## Environment Setup
To set up the Python programming environment, three basic packages, i.e., `typer`, `typing`, and `numpy` are needed.

You can install them by the way you like.

- For example, install using `pip`:

    ```
    pip install typer numpy
    ```

- Or install using `conda`:

    ```
    conda install typer numpy
    ```

In [2]:
import numpy as np

from typing import Callable
from minihouse.robotminihousemodel import MiniHouseV1
from minihouse.minihousemodel import state_class, item_object

---

# Minihouse Environment
All environments are defined in the `minihouse` package.

Each of the **states** and **actions** are represented as an integer.

You can use the following functions to convert between the integer and the corresponding state description.

```python
goal_state = state_class(
    robot_agent=None,
    human_agent=None,
    object_list={"apple": item_object("apple", True, "table")},
    container_list=None,
    surface_list=None,
    room_list=None,
)

env = MiniHouseV1(
    instruction="move the apple to the table",
    goal_state=goal_state,
)
env.reset()
print(env.state)
print(env.state_to_index(env.state))
```

---


MiniHouseV1 Environment Instructions:
```python
env.nS # Number of states
env.nA # Number of states
env.state.room_dict env.state.container_dict env.state.surface_dict env.state.item_dict # get object info
env.state.container_dict['name'].position # get object position

# Problem 1: Policy Iteration & Value Iteration

In this problem, you are expected to implement the **policy iteration** and **value iteration** algorithms to solve the searching problems in the `MiniHouse` environment. 

Here, we provide a template for you to implement your own agents. 

In [4]:
class mdp_solver:
    
    def __init__(self):
        self.iteration = 0
        print("MDP initialized!")


    def get_action_value(
        self, s:int, a:int, V:np.ndarray, gamma:float, env_transition:Callable):
        r"""
        Code for getting action value. Compute the value of taking action a in state s
        I.e., compute Q(s, a) = \sum_{s'} p(s'| s, a) * [r + gamma * V(s')]
        args:
            s: state
            a: action
            V: value function
            gamma: discount factor
            env_transition: transition function
        returns:
            value: action value
        """
        value = 0

        for prob, next_state, reward, done in env_transition(s, a):

            # ------- your code starts here ----- #

            value += prob * (reward + gamma * V[next_state])

            # ------- your code ends here ------- #

        return value



    def get_max_action_value(
        self, s:int, env_nA:int, env_transition:Callable, V:np.ndarray, gamma:float):
        r"""
        Code for getting max action value. Takes in the current state and returns 
        the max action value and action that leads to it. I.e., compute
        a* = argmax_a \sum_{s'} p(s'| s, a) * [r + gamma * V(s')]
        args:
            s: state
            env_nA: number of actions
            env_transition: transition function
            V: value function
            gamma: discount factor
        returns:
            max_value: max action value
            max_action: action that leads to max action value
        """
        max_value = -np.inf
        max_action = -1

        for a in range(env_nA):

            # ------- your code starts here ----- #

            value = self.get_action_value(s, a, V, gamma, env_transition)
            if value > max_value:
                max_value = value
                max_action = a

            # ------- your code ends here ------- #

        return max_value, max_action



    def get_policy(
        self, env_nS:int, env_nA:int, env_transition:Callable, gamma:float, V:np.ndarray):
        """
        Code for getting policy. Takes in an Value function and returns the optimal policy
        args:
            env_nS: number of states
            env_nA: number of actions
            env_transition: transition function
            gamma: discount factor
            V: value function
        returns:
            policy: policy
        """
        policy = np.zeros(env_nS)

        for s in range(env_nS):
            max_value = -np.inf
            max_action = -1
            for a in range(env_nA):

                # ------- your code starts here ----- #
            
                value = self.get_action_value(s, a, V, gamma, env_transition)
                if value > max_value:
                    max_value = value
                    max_action = a

                # ------- your code ends here ------- #

            policy[s] = max_action

        return policy
        


    def policy_evaluation(
        self, env_nS:int, env_transition:Callable, V:np.ndarray, gamma:float, theta:float, policy:np.ndarray):
        """
        Code for policy evaluation. Takes in an MDP and returns the converged value function
        args:
            env_nS: number of states
            env_transition: transition function
            V: value function
            gamma: discount factor
            theta: convergence threshold
            policy: policy
        returns:
            V: value function
        """ 

        while True:
            delta = 0
            for s in range(env_nS):

                # ------- your code starts here ----- #

                v = 0
                for prob, next_state, reward, done in env_transition(s, policy[s]):
                    v += prob * (reward + gamma * V[next_state])
                delta = max(delta, abs(V[s] - v))
                V[s] = v

                # ------- your code ends here ------- #

            if delta < theta:
                break

        return V
        


    def policy_improvement(
        self, env_nS:int, env_nA:int, env_transition:Callable, policy:np.ndarray, V:np.ndarray, gamma:float):
        """
        Code for policy improvement. Takes in an MDP and returns the converged policy
        args:
            env_nS: number of states
            env_nA: number of actions
            env_transition: transition function
            policy: policy
            V: value function
            gamma: discount factor
        returns:
            policy_stable: whether policy is stable
            policy: policy
        """
        policy_stable = True

        for s in range(env_nS):

            # ------- your code starts here ----- #

            old_action = policy[s]
            max_value = -np.inf
            best_action = -1
            for a in range(env_nA):
                value = self.get_action_value(s, a, V, gamma, env_transition)
                if value > max_value:
                    max_value = value
                    best_action = a
            policy[s] = best_action
            if old_action != best_action:
                policy_stable = False

            # ------- your code ends here ------- #

        return policy_stable, policy



    def value_iteration(
        self, gamma:float, theta:float, env_nS:int, env_nA:int, env_transition:Callable):
        """
        The code for value iteration. Takes in an MDP and returns the optimal policy
        and value function.
        args:
            gamma: discount factor
            theta: convergence threshold
            env_nS: number of states
            env_nA: number of actions
            env_transition: transition function
        returns:
            policy: optimal policy
            V: optimal value function 
        """
        V = np.zeros(env_nS)
        converged = False

        while not converged:
            delta = 0

            # ------- your code starts here ----- #

            for s in range(env_nS):
                max_value = -np.inf
                for a in range(env_nA):
                    value = self.get_action_value(s, a, V, gamma, env_transition)
                    if value > max_value:
                        max_value = value
                delta = max(delta, abs(V[s] - max_value))
                V[s] = max_value
            if delta < theta:
                converged = True

            # ------- your code ends here ------- #

        policy = self.get_policy(env_nS, env_nA, env_transition, gamma, V)
        
        return policy, V
        


    def policy_iteration(
        self, gamma:float, theta:float, env_nS:int, env_nA:int, env_transition:Callable):
        """
        Code for policy iteration. Takes in an MDP and returns the optimal policy
        args:
            gamma: discount factor
            theta: convergence threshold
            env_nS: number of states
            env_nA: number of actions
            env_transition: transition function
        returns:
            policy: optimal policy
            V: optimal value function
        """
        V = np.zeros(env_nS)
        policy = np.zeros(env_nS)
        converged = False

        while not converged:

            # ------- your code starts here ----- #

            while not converged:
                V = self.policy_evaluation(env_nS, env_transition, V, gamma, theta, policy)
                policy_stable, policy = self.policy_improvement(env_nS, env_nA, env_transition, policy, V, gamma)
                if policy_stable:
                    converged = True

            # ------- your code ends here ------- #

        return policy, V


---

# 🔍 Guideline

Here, we introduce the role of each function in the above solution template. 


## Get Action Value

The function is as follows:

```python
    def get_action_value(
        self, s:int, a:int, V:np.ndarray, gamma:float, env_transition:Callable):
```
This function is used to get the action value. The input arguments are:
* `s`: state
* `a`: action
* `V`: value function, which is a list of float numbers, each float number represents the value for the corresponding state index.
* `gamma`: discount factor
* `env_transition`: transition function, which returns a list of tuples (probability, next_state_index, reward, done), where probability is a float number, next_state_index is an integer, reward is a float number, done is a boolean value indicating whether the task is finished.

The output arguments are:
* `value`: action value, which is a float number.

---


## Get Max Action Value

The function is as follows:

```python
    def get_max_action_value(
        self, s:int, env_nA:int, env_transition:Callable, V:np.ndarray, gamma:float):

```
This function is used to get the max action value. The input arguments are:
* `s`: state
* `env_nA`: number of actions
* `env_transition`: transition function, which returns a list of tuples (probability, next_state_index, reward, done), where probability is a float number, next_state_index is an integer, reward is a float number, done is a boolean value indicating whether the task is finished.
* `V`: value function, which is a list of float numbers, each float number represents the value for the corresponding state index.
* `gamma`: discount factor

The output arguments are:
* `max_value`: max action value, which is a float number.
* `max_action`: action that leads to max action value, which is an integer.

---


## Get Policy

The function is as follows:

```python
    def get_policy(
        self, env_nS:int, env_nA:int, env_transition:Callable, gamma:float, V:np.ndarray):
```
This function is used to extract a policy given a value function. The input arguments are:
* `env_nS`: number of states
* `env_nA`: number of actions
* `env_transition`: transition function, which returns a list of tuples (probability, next_state_index, reward, done), where probability is a float number, next_state_index is an integer, reward is a float number, done is a boolean value indicating whether the task is finished.
* `gamma`: discount factor
* `V`: value function, which is a list of float numbers, each float number represents the value for the corresponding state index.

The output arguments are:
* `policy`: optimal policy, which is a list of integers, each integer represents the action index for the corresponding state index.

---


## Policy Evaluation

The function is as follows:

```python
    def policy_evaluation(
        self, env_nS:int, env_transition:Callable, V:np.ndarray, gamma:float, theta:float, policy:np.ndarray):
```
This function is used to implement policy iteration. You are expected to implement the policy evaluation algorithm in this function. The input arguments are:
* `env_nS`: number of states
* `env_transition`: transition function, which returns a list of tuples (probability, next_state_index, reward, done), where probability is a float number, next_state_index is an integer, reward is a float number, done is a boolean value indicating whether the task is finished.
* `V`: value function, which is a list of float numbers, each float number represents the value for the corresponding state index.
* `gamma`: discount factor
* `theta`: convergence threshold
* `policy`: policy, which is a list of integers, each integer represents the action index for the corresponding state index.

The output arguments are:
* `V`: value function, which is a list of float numbers, each float number represents the value for the corresponding state index.

---


## Policy Improvement

The function is as follows:

```python
    def policy_improvement(
        self, env_nS:int, env_nA:int, env_transition:Callable, policy:np.ndarray, V:np.ndarray, gamma:float):
```
This function is used to implement policy iteration. You are expected to implement the policy improvement algorithm in this function. The input arguments are:
* `env_nS`: number of states
* `env_nA`: number of actions
* `env_transition`: transition function, which returns a list of tuples (probability, next_state_index, reward, done), where probability is a float number, next_state_index is an integer, reward is a float number, done is a boolean value indicating whether the task is finished.
* `policy`: policy, which is a list of integers, each integer represents the action index for the corresponding state index.
* `gamma`: discount factor
* `V`: value function, which is a list of float numbers, each float number represents the value for the corresponding state index.


The output arguments are:
* `olicy_stable`: whether policy is stable, which is a boolean value.
* `policy`: optimal policy, which is a list of integers, each integer represents the action index for the corresponding state index.

---


## Value Iteration

The function is as follows:

```python
    def value_iteration(
        self, gamma:float, theta:float, env_nS:int, env_nA:int, env_transition:Callable):
```
This function is used to implement value iteration. You are expected to implement the value iteration algorithm in this function. The input arguments are:
* `gamma`: discount factor
* `theta`: convergence threshold
* `env_nS`: number of states
* `env_nA`: number of actions
* `env_transition`: transition function, which returns a list of tuples (probability, next_state_index, reward, done), where probability is a float number, next_state_index is an integer, reward is a float number, done is a boolean value indicating whether the task is finished.

The output arguments are:
* `policy`: optimal policy, which is a list of integers, each integer represents the action index for the corresponding state index.
* `V`: optimal value function, which is a list of float numbers, each float number represents the value for the corresponding state index.

---


## Policy Iteration

The function is as follows:

```python
    def policy_iteration(
        self, gamma:float, theta:float, env_nS:int, env_nA:int, env_transition:Callable):
 ```
 
This function is utilized to conduct policy iteration. Through policy iteration, you iteratively improve the policy until it converges to the optimal policy. The process involves two main steps: policy evaluation, where you calculate the value function for a given policy, and policy improvement, where you generate a new policy based on the calculated value function.

The input arguments for this function are:

- `gamma`: The discount factor, a float that determines the importance of future rewards.
- `theta`: The convergence threshold, a small float value that determines when to stop iterating because the value function has sufficiently converged.
- `env_nS`: The number of states in the environment, an integer.
- `env_nA`: The number of actions available in the environment, an integer.
- `env_transition`: The transition function of the environment. It takes a state and an action as input and returns a list of tuples (probability, next_state_index, reward, done). Each tuple represents a possible outcome of taking the action in the given state, where:
    - `probability` is a float representing the likelihood of the outcome.
    - `next_state_index` is an integer index of the next state.
    - `reward` is a float representing the received reward.
    - `done` is a boolean indicating whether the episode has ended.

The output arguments are:

- `policy`: The optimal policy derived from policy iteration. It is a NumPy array where each element represents the best action (as an integer index) to take in the corresponding state.
- `V`: The optimal value function, a NumPy array of floats, where each element represents the value of being in the corresponding state under the optimal policy.


---


# Try On Your Own!

You can try the given test environment to test your implementations.

In [5]:
from minihouse.robotminihousemodel import MiniHouseV1
from minihouse.minihousev1 import test_cases

In [6]:
def test(VI_bool = True, index = 0, verbose=False):

    instruction, goal_state, initial_conditions = test_cases[index]

    env = MiniHouseV1(
        instruction=instruction,
        goal_state=goal_state,
        initial_conditions=initial_conditions,
        verbose=verbose,
    )
    env.reset()
    if verbose:
        print("state: ", env.state_to_index(env.state))
        print("num state: ", env.nS)
        print("num actions: ", env.nA)
        print()

    ms = mdp_solver()
    
    if VI_bool:
        policy, V = ms.value_iteration(0.9, 0.0001, env_nS=env.nS, env_nA=env.nA, env_transition=env.transition)
        if verbose:
            print("Value Iteration")
            print()
    else:
        policy, V = ms.policy_iteration(0.9, 0.0001, env_nS=env.nS, env_nA=env.nA, env_transition=env.transition)
        if verbose:
            print("Policy Iteration")
            print()
    
    if verbose:
        print("V: ", repr(V))
        print()
        print("policy: ", repr(policy))
        print()
    
    env.reset()
    
    for i in range(100):
        print()
        print(f"---------- Step: {i} ----------")
        
        action = int(policy[env.state_to_index(env.state)])
        obs, reward, done, _, _ = env.step(action)
        
        if verbose:
            print("obs: ", obs)
            print("reward: ", reward)
            print("done: ", done)
        
        if done:
            break

---

## Test: Value Iteration
Run the following code to test your implementation of value iteration. 

In [7]:
test(index=0, VI_bool=True, verbose=True)

state:  9
num state:  56
num actions:  13

MDP initialized!
Value Iteration

V:  array([499.99959766, 499.99963427, 499.99963427, 499.99966755,
       499.99962964, 499.99966334, 499.99966334, 499.99969397,
       306.62502756, 345.7146245 , 271.83106078, 240.86060754,
       345.7146245 , 389.63008967, 306.62509833, 271.83112512,
       438.96711366, 389.6300606 , 389.6300606 , 345.71466242,
       438.96714564, 389.63008967, 389.63008967, 345.71468884,
       345.71459253, 389.6300606 , 306.62506926, 271.8310987 ,
       345.7146245 , 389.63008967, 306.62509833, 271.83112512,
       345.71459253, 306.62506926, 389.6300606 , 345.71466242,
       345.7146245 , 306.62509833, 389.63008967, 345.71468884,
       271.83102881, 240.86057848, 306.62506926, 345.71466242,
       271.83106078, 240.86060754, 306.62509833, 345.71468884,
       494.39523867, 438.96718355, 438.96718355, 389.63012413,
       494.39526773, 438.96720997, 438.96720997, 389.63014815])

policy:  array([1., 4., 3., 0., 1.,

---

## Test: Policy Iteration

Run the following code to test your implementation of policy iteration. 

In [None]:
test(index=0, VI_bool=False, verbose=True)

---


## 🎉 Congratulations!

You have finished all the requirements for Problem 1.

---

# Problem 2: Q-Learning


In this problem, you are expected to implement the **Q-learning** algorithm to solve the searching problems in the `MiniHouse` environment. 

Similar to the previous problem, we provide a template for you to implement your own agents. 

In [8]:
class mdp_solver_q_learning:
    
    def __init__(self):
        print("MDP initialized for Q-learning!")


    def epsilon_greedy(self, Q, state, epsilon):
        if np.random.rand() < epsilon:
            
            # ------- your code starts here ----- #
            return np.random.choice(Q.shape[1])          
            # ------- your code ends here ------- #
            
        else:
            
            # ------- your code starts here ----- #
            return np.argmax(Q[state])
            # ------- your code ends here ------- #
   

    def Q_learning(
        self, alpha:float, gamma:float, theta:float, epsilon:float, env_nS:int, env_nA:int, env_transition, env, num_episodes=1000, initial_action = None, initial_reward = 10):
        """
        Q-learning algorithm.
        Args:
            gamma: discount factor
            theta: convergence threshold
            env_nS: number of states
            env_nA: number of actions
            env_transition: transition function
            num_episodes: number of episodes
        Returns:
            Q: learned Q-value function
            rewards: rewards obtained in each episode
        """
        Q = np.zeros((env_nS, env_nA))
        rewards = []
        
        if initial_action is not None:
            for i, action in enumerate(initial_action):
                Q[i][action] += initial_reward
        
        for episode in range(num_episodes):
            env.reset()
            state = env.state_to_index(env.state)
            done = False
            
            while not done:
                
                # ------- your code starts here ----- #
                if episode == 0:
                    Q_prev = Q.copy()
                if 'total_reward' not in locals():
                    total_reward = 0

                action_idx = self.epsilon_greedy(Q, state, epsilon)
                action = env.index_to_action(action_idx)
                _, reward, done, _, _ = env.step(action)
                next_state_idx = env.state_to_index(env.get_state)

                best_next_action = np.argmax(Q[next_state_idx])
                Q[state][action_idx] += alpha * (reward + gamma * Q[next_state_idx][best_next_action] - Q[state][action_idx])

                state = next_state_idx

                # Accumulate rewards in the current episode
                if len(rewards) <= episode:
                    rewards.append(reward)
                else:
                    rewards[episode] += reward

                # Check for convergence after the episode ends
                if done:
                    if episode > 0 and np.max(np.abs(Q - Q_prev)) < theta:
                        rewards = rewards[:episode+1]  # truncate to valid episodes
                        done = True
                        raise StopIteration  # force exit from outer loop
                    Q_prev = Q.copy()
                    total_reward = 0
                
                # ------- your code ends here ------- #
        
        return np.argmax(Q, axis=1), rewards


---

# 🔍 Guideline

Here, we introduce the role of each function in the above solution template. 


## Epsilon-Greedy Action Selection
The function is as follows:

```python
    def epsilon_greedy(self, Q, state, epsilon):
```

This function selects an action based on the epsilon-greedy strategy, which balances exploration and exploitation. With a probability of epsilon, it chooses a random action (exploration), and with a probability of 1 - epsilon, it chooses the action with the highest Q-value for the current state (exploitation).

The input arguments are:

- `Q`: The Q-value table, a 2D NumPy array where each element Q[state, action] represents the estimated value of taking an action in a state.
- `state`: The current state index, an integer representing the current situation of the environment.
- `epsilon`: The exploration rate, a float between 0 and 1 that determines the probability of choosing a random action.

The output is:
- An integer representing the index of the selected action.

---


## Q-Learning

The function is as follows:

```python
def Q_learning(
        self, alpha:float, gamma:float, theta:float, epsilon:float, env_nS:int, env_nA:int, env_transition, env, num_episodes=1000, initial_action = None, initial_reward = 10):
```

This function implements the Q-learning algorithm, a model-free reinforcement learning technique used to learn the quality of actions denoting how good it is to take an action from a particular state.

The input arguments are:
- `alpha`: The learning rate, a float that determines the extent to which the newly acquired information will override the old information.
- `gamma`: The discount factor, a float that balances the importance of immediate and future rewards.
- `theta`: The convergence threshold, not directly used in basic Q-learning but can be utilized for extensions or stopping criteria based on improvements.
- `epsilon`: Exploration rate in the epsilon-greedy strategy, determining the trade-off between exploration and exploitation.
- `env_nS`: The number of states in the environment, indicating how many distinct states the agent can be in.
- `env_nA`: The number of actions available in the environment, indicating the range of actions the agent can take.
- `env_transition`: The transition function of the environment that models the dynamics of the environment. It's used to simulate the next state and reward given a state-action pair.
- `env`: The environment object itself, providing access to essential methods like reset and state_to_index.
- `num_episodes`: The number of episodes to run the algorithm for, allowing the agent sufficient time to learn from its interactions with the environment.
- `initial_action`: Initial action for each state.
- `initial_reward`: Initial reward for given initial state.

The outputs are:
- `Q`: The learned Q-value function, represented as a 2D numpy array where each element [s, a] corresponds to the estimated value of taking action a in state s.
- `rewards`: A list of accumulated rewards, one for each episode, indicating the total reward the agent has obtained during that episode.


---

# Try On Your Own!

You can try the given test environment to test your implementations.

In [9]:
from minihouse.robotminihousemodel import MiniHouseV1
from minihouse.minihousev1 import test_cases

## Test: Q-Learning

Run the following code to test your implementation of the Q-learning algorithm. 

In [10]:
def test_q_learning(Q_bool=False, index=0, num_episodes=1000, verbose=False, generate_solution=False):

    instruction, goal_state, initial_conditions = test_cases[index]

    env = MiniHouseV1(
        instruction=instruction,
        goal_state=goal_state,
        initial_conditions=initial_conditions,
        verbose=verbose
    )
    env.reset()
    
    
    if verbose:
        print("state: ", env.state_to_index(env.state))
        print("num state: ", env.nS)
        print("num actions: ", env.nA)
        print()
        
    msq = mdp_solver_q_learning()


    policy, V = msq.Q_learning(
        alpha=0.1,
        gamma=0.9,
        theta=0.0001,
        epsilon=0.1,
        env_nS=env.nS,
        env_nA=env.nA,
        env_transition=env.transition,
        env=env,
        num_episodes=num_episodes,
    )

    print("Learned policy:")
    print(policy)

    
    if verbose:
        print("Policy Iteration")
        print()
        print("V: ", repr(V))
        print()
        print("policy: ", repr(policy))
        print()
        
    env.reset()
    
    for i in range(100):
        print()
        print(f"---------- Step: {i} ----------")
        action = int(policy[env.state_to_index(env.state)])
        # action = policy[env.state_to_index(env.state)].astype(int)
        obs, reward, done, _, _ = env.step(action)
        if verbose:
            print("obs: ", obs)
            print("reward: ", reward)
            print("done: ", done)
        if done:
            break

    if generate_solution:
        np.savetxt(f'data/V_{index}.py', V, delimiter=',')

        solution_values = np.loadtxt(f'data/V_{index}.py')

        assert len(V) == len(solution_values), \
            'Length of Values is incorrect'

        assert np.allclose(V, solution_values), \
            'Values incorrect'

    return None

In [11]:
test_q_learning(index=0, num_episodes=1000, verbose=True)

state:  9
num state:  56
num actions:  13

MDP initialized for Q-learning!
action:  move to the living room
action:  move to the living room
action:  move to the kitchen
action:  move to the kitchen
action:  place the apple at the fridge
action:  place the apple at the kitchen
action:  move to the bedroom
action:  move to the bathroom
action:  open the fridge
action:  move to the living room
action:  move to the living room
action:  move to the kitchen
action:  move to the kitchen
action:  move to the bedroom
action:  move to the bathroom
action:  open the fridge
action:  close the fridge
action:  close the fridge
action:  pick the apple
action:  place the apple at the table
action:  place the apple at the living room
action:  place the apple at the bedroom
action:  place the apple at the bathroom
action:  pick the apple
action:  move to the living room
action:  move to the bedroom
action:  move to the living room
action:  move to the bathroom
action:  open the fridge
action:  close th

---

## 🎉 Congratulations!

You have finished all the requirements for Problem 2.

---

# Problem 3: Integrating GPT-4 for Decision Support in MiniHouse


In this problem, you are expected to enhance the **Q-learning** algorithm implemented in problem 2 by integrating Generative Models for decision support in the MiniHouse environment

Tasks to Accomplish:
- **Prompt Design:** Crafting detailed and clear prompts that accurately describe the decision-making problem and obtaining useful suggestions from Generative Models.
- **API Key:** Ensure you have a valid API key from the respective provider of the Generative Model you're using.
- **Post-processing:** The action generated by GPT-4 might require post-processing or validation to ensure they fit within the constraints and capabilities of the MiniHouse simulation.
- **Integration:** This approach can serve as a high-level heuristic or guide for developing or refining decision-making strategies within your environment. Actual integration might involve translating natural language actions into environment-specific operations or commands.

In this assignment, we recommand using LLaMA as the primary Large Language Model due to its strong generation ability, contextual understanding, and easy to access(open-source). LLaMA's high-quality text generation makes it well-suited for assisting in reinforcement learning and search-based problems. However, students are encouraged to explore other LLMs such as GPT4, Claude, Gemini or Deepseek. 

When choosing your own LLM, you should consider factors like efficiency, API availability, fine-tuning capabilities, and prompt adherence to ensure the model aligns well with the task requirements. 

# 🔍 Guideline
The goal of this problem is to utilize generative model as a heuristics for Q-learning algorithm. Here, we introduce the role of each function in the above solution template. 

First to use the generative model API, you need to install OpenAI Python package by 

```python
pip install openai
pip install pytorch (please check the correct version through pytorch website)
pip install sentence-transformers
conda install -c conda-forge openai
conda install -c pytorch pytorch
conda install -c conda-forge sentence-transformers
```

You need to create a client for Generative model API. You can sign up on openai/nvidia/other website for API key generation.

In [21]:
from openai import OpenAI

# ------- your code starts here ----- #
client = OpenAI(
    base_url = "https://openrouter.ai/api/v1",
    api_key = "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"
)
# ------- your code ends here ------- #
    

# create a client to call the generative model


## describe_state

The function is as follows:

```python
def describe_state(state):
```


This function implements the natural language describtion generation of the current state.

The input arguments are:
- `state`: The current state of the Minihouse environment.

The outputs are:
- `describe`: Natural language discribtion of the current state. This description will be included in the prompt fed into the generative model.



---

In [20]:
# describe your current state
def describe_state(state):
    describe = ''

    # ------- your code starts here ----- #
    describe = f"The robot is currently in the {state.robot_agent.position}.\n"

    for obj_name, obj in state.object_dict.items():
        describe += f"The {obj_name} is located at {obj.position}.\n"

    for container_name, container in state.container_dict.items():
        describe += f"The {container_name} is {'open' if container.is_open else 'closed'}.\n"

    for surface_name, surface in state.surface_dict.items():
        describe += f"The {surface_name} is in the {surface.position}.\n"

    # ------- your code ends here ------- #
    
    
    print(describe)
    return describe

## plan

LLM_Model class:

```python
def plan(self, task, observe):
```


This function implements the next move planning task using LLM.

The input arguments are:
- `task`: The final goal of the planning task.
- `observe`: The natural language describtion of the current state.

The outputs are:
- `best_action`: The optimal action generatated from LLM.


---

Note: 
The ideal output should be like:

'open fridge', 'close fridge', 'pick apple', 'place apple fridge', 'place apple table', 'place apple bedroom', 'place apple bathroom', 'place apple living room', 'move living room', 'move kitchen', 'move bedroom', 'move bathroom', etc.

But since different models may interpret prompts differently and generate varying outputs, you should experiment with prompt engineering to achieve reliable and relevant results for the tasks.

## find_most_similar


```python
def find_most_similar(self, query_str, corpus_embedding):
```


This function aims to find the most similar valid action given the action generated from LLM.

The input arguments are:
- `query_str`: The action generated from LLM.
- `corpus_embedding`: The embedding of the valid action in action list.

The outputs are:
- `most_similar_idx`: The index of the most similar valid action.


    
---

## query_llm

```python
def query_llm(self, task, observe = None):
```

Query LLM to get the next move.

The input arguments are:
- `task`: The final goal of the planning task.
- `observe`: The natural language describtion of the current state.

The outputs are:
- `samples`: Samples generated from n times LLM query.


    
---

In [22]:
import torch
import openai
from sentence_transformers import SentenceTransformer
from sentence_transformers import util as st_utils

class LLM_Model:
    def __init__(self, device, instruction, goal_state, initial_conditions, model='gpt-3.5-turbo'):
        self.device = device
        self.model = model
        self.sampling_params = \
            {
                "max_tokens": 32,
                "temperature": 0.5,
                "top_p": 0.9,
                "n": 1,
                "presence_penalty": 0.5,
                "frequency_penalty": 0.3,
                "stop": ['\n']
            }
        
        self.prompt_begin = """Generate the most logical next move in the scene. 
You must strictly follow the format in the following examples and output exactly **one** action. 
The generated action must be strictly from **GROUNDED_ACTION_LIST**."""
        self.condition_list = ['ROOM_LIST', 'OBJECT_LIST', 'OBJECT_POSITION_LIST','CONTAINER_LIST', 'SURFACE_LIST', 'CONTAINER_POSITION_LIST', 'CONNECTED_ROOM','ACTION_DICT', 'GROUNDED_ACTION_LIST']
        self.initial_conditions = initial_conditions
        self.ROOM_LIST, self.OBJECT_LIST, self.OBJECT_POSITION_LIST, \
        self.CONTAINER_LIST, self.SURFACE_LIST, self.CONTAINER_POSITION_LIST, self.CONNECTED_ROOM, \
        self.ACTION_DICT, self.GROUNDED_ACTION_LIST = initial_conditions
        
        self.translation_lm = SentenceTransformer('all-MiniLM-L6-v2').to(self.device)
        self.container_list_embedding = self.translation_lm.encode(self.CONTAINER_LIST, batch_size=8, 
                convert_to_tensor=True, device=self.device)  # lower batch_size if limited by GPU memory
        self.object_list_embedding = self.translation_lm.encode(self.OBJECT_LIST, batch_size=8,
                convert_to_tensor=True, device=self.device)
        if self.SURFACE_LIST:
            self.position_list =  self.CONTAINER_LIST + self.SURFACE_LIST
            self.surface_list_embedding = self.translation_lm.encode(self.SURFACE_LIST, batch_size=8,
                convert_to_tensor=True, device=self.device)
            self.position_list_embedding = torch.concat((self.container_list_embedding, self.surface_list_embedding), dim=0)
        self.room_embedding = self.translation_lm.encode(self.ROOM_LIST, batch_size=8,
                convert_to_tensor=True, device=self.device)
        self.action_list_embedding = self.translation_lm.encode(self.GROUNDED_ACTION_LIST, batch_size=8, 
                convert_to_tensor=True, device=self.device)  # lower batch_size if limited by GPU memory
    
    def find_most_similar(self, query_str, corpus_embedding):
        # helper function for finding similar sentence in a corpus given a query
        query_embedding = self.translation_lm.encode(query_str, convert_to_tensor=True, device=self.device, show_progress_bar=False,)
        # calculate cosine similarity against each candidate sentence in the corpus
        cos_scores = st_utils.pytorch_cos_sim(query_embedding, corpus_embedding)[0].detach().cpu().numpy()
        # retrieve high-ranked index and similarity score
        most_similar_idx = np.argmax(cos_scores)
        return most_similar_idx
    
    def query_llm(self, task, observe = None):
        prompt_content = self.prompt_begin
        task = 'Scene: ' + "\n".join(f"{a}:{b}" for a, b in zip(self.condition_list, self.initial_conditions) if a != 'ACTION_DICT') + '\nCurrent State: ' + observe + 'Task: ' + task

        
        generated_samples = []
        for _ in range(self.sampling_params['n']): # call llm for n times and append each response to 'generated_samples'
            try: 
                
                # ------- your code starts here ----- #
                
                response = client.chat.completions.create(
                    model=self.model,
                    messages=[
                        {"role": "system", "content": self.prompt_begin},
                        {"role": "user", "content": task}
                    ],
                    **self.sampling_params
                )

                # Extract the message content
                generated_samples.append(response.choices[0].message.content.strip())
                
                # ------- your code ends here ------- #
                
            except Exception as e:
                print(f"Error: {e}")

        samples = generated_samples
        # print(samples)
        return samples
    
    def plan(self, task, observe):
        samples = self.query_llm(task, observe = observe)
        action_list, action_index = [], []
        for sample in samples:
            most_similar_idx = self.find_most_similar(sample, self.action_list_embedding)
            # find the most similar action in the action list
            translated_action = self.GROUNDED_ACTION_LIST[most_similar_idx]
            action_list.append(translated_action)
            action_index.append(most_similar_idx)
        best_action = [max(action_list, key=action_list.count), max(action_index, key=action_list.count)]
        # [action, index]
        return best_action


# Try On Your Own!

You can try the given test environment to test your implementations.

In [23]:

def test_q_learning_with_llm(Q_bool=False, index=0, num_episodes=1000, verbose=False, generate_solution=False):

    instruction, goal_state, initial_conditions = test_cases[index]
    
    # ------- your code starts here ----- #

    llm_model = LLM_Model(device = torch.device("cuda" if torch.cuda.is_available() else "cpu"),
                          instruction=instruction,
                          goal_state=goal_state,
                          initial_conditions=initial_conditions, 
                          model = "meta-llama/llama-2-13b-chat")

    # ------- your code ends here ------- #

    
    env = MiniHouseV1(
        instruction=instruction,
        goal_state=goal_state,
        initial_conditions=initial_conditions,
        verbose=verbose
    )
    obs, reward, done, history, valid_action = env.reset()

    best_action_llm = []
    print('Your task is:' + instruction)
    for state_index in range(env.nS):
        print("state: ", state_index)
        describe = describe_state(env.index_to_state(state_index))
        best_action = llm_model.plan(instruction, observe=describe)
        action_name = best_action[0]
        action_index = best_action[1]
        best_action_llm.append(action_index)
        print('The best action is ' + action_name + '\n')
    
    msq = mdp_solver_q_learning()
    policy, V = msq.Q_learning(
        alpha=0.1,
        gamma=0.9,
        theta=0.0001,
        epsilon=0.1,
        env_nS=env.nS,
        env_nA=env.nA,
        env_transition=env.transition,
        env=env,
        num_episodes=num_episodes,
        initial_action = best_action_llm,
    )
    
    if verbose:
        print("state: ", env.state_to_index(env.state))
        print("num state: ", env.nS)
        print("num actions: ", env.nA)
        print()

    msq = mdp_solver_q_learning()

    policy, V = msq.Q_learning(
        alpha=0.1,
        gamma=0.9,
        theta=0.0001,
        epsilon=0.1,
        env_nS=env.nS,
        env_nA=env.nA,
        env_transition=env.transition,
        env=env,
        num_episodes=num_episodes,
    )
    
    if verbose:
        print("Policy Iteration")
        print()
        print("V: ", repr(V))
        print()
        print("policy: ", repr(policy))
        print()
        
    env.reset()
    
    for i in range(100):
        print()
        print(f"---------- Step: {i} ----------")
        action = int(policy[env.state_to_index(env.state)])
        obs, reward, done, _, _ = env.step(action)
        

        if verbose:
            print("obs: ", obs)
            print("reward: ", reward)
            print("done: ", done)

        if done:
            break

    if generate_solution:
        np.savetxt(f'data/V_{index}.py', V, delimiter=',')

        solution_values = np.loadtxt(f'data/V_{index}.py')

        assert len(V) == len(solution_values), \
            'Length of Values is incorrect'

        assert np.allclose(V, solution_values), \
            'Values incorrect'

    return None

In [24]:
test_q_learning_with_llm(index=0, num_episodes=1000, verbose=True)

Your task is:move the apple to the table
state:  0
The robot is currently in the living room.
The apple is located at table.
The fridge is closed.
The table is in the living room.

The best action is pick apple

state:  1
The robot is currently in the kitchen.
The apple is located at table.
The fridge is closed.
The table is in the living room.

The best action is open fridge

state:  2
The robot is currently in the bedroom.
The apple is located at table.
The fridge is closed.
The table is in the living room.

The best action is pick apple

state:  3
The robot is currently in the bathroom.
The apple is located at table.
The fridge is closed.
The table is in the living room.

The best action is pick apple

state:  4
The robot is currently in the living room.
The apple is located at table.
The fridge is open.
The table is in the living room.

The best action is open fridge

state:  5
The robot is currently in the kitchen.
The apple is located at table.
The fridge is open.
The table is in

## 🎉 Congratulations!

Congratulations! You have successfully completed all the requirements for Problem 3.

We hope that through this assignment, you have gained hands-on experience in leveraging LLMs for decision support in search problems and developed the following skills:

1) Obtaining API keys and making calls to generative models.
2) Crafting effective prompts to accurately describe the state and task.
3) Interacting with the generative model and collecting meaningful responses.

Following this, you should now have a deeper understanding of how to design effective prompts, process and validate model-generated actions. 

---

## Answers

**a) Can value/policy iteration work with 10 movable items and more rooms? What else could you use?**
- Value / policy iteration might still work, but they can become slow and memory-intensive as the state space grows due to more items and rooms. Alternatively, apart from Q-learning, which might still be a good choice, maybe we can use Deep RL (e.g., DQN) to handle larger state spaces without needing a full model. What's more, maybe we can combine LLMs as heuristics or high-level policy guides (like you're already doing) to reduce search space.

**b) What about handling 100 movable objects?**
- With 100 movable objects, since out current method relys on storing every possible action in every situation, the number of combinations would become way too large and simply wouldn't work. It is infeasible to store or update the full Q-table. What's more, value and policy iteration will very likely fail as well, due to the crazy memory & time complexity.

**c) Can other model-free RL methods help in the above cases? Why?**
- Yes, other model-free methods can be very helpful. In this case, we prefer methods that do not need to know exactly how the environment works — they just learn from trying things out. This makes them a good fit for complex environments like a big house with many objects. They are also easier to scale up, especially if you use tools like neural networks or language models to help guide the learning process.

- For example, SARSA is a method that learns by updating its knowledge based on the actual actions taken, which can make learning more stable in some situations. Deep Q-Networks (DQN) go a step further by using neural networks to estimate the best actions, which helps a lot when there are too many situations to track individually. 