# Frozen Lake + ToT-DFS

## Utils

### Imports

In [100]:
import gymnasium as gym
from environments.frozen_lake.common.environment import FrozenLakeEnvWrapper
from environments.frozen_lake.common.evaluate_output_parser import FrozenMapEvaluateOutputParser
from langchain_openai import ChatOpenAI
from gymnasium.envs.toy_text.frozen_lake import generate_random_map
import os
from environments.frozen_lake.tot_dfs.evaluate_prompts import evaluate_prompt
from environments.frozen_lake.tot_dfs.generate_prompts import generate_openai_tools_prompt
from planning_library.strategies import TreeOfThoughtsDFSStrategy
from operator import itemgetter
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.runnables import RunnableLambda


%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


### Logging

In [101]:
# Tracing-related environment variables (LangChain reads these from the process environment).
os.environ.update(
    {
        "LANGCHAIN_TRACING_V2": "true",
        "LANGCHAIN_PROJECT": "ToT + Frozen Lake test",
        "LANGCHAIN_ENDPOINT": "https://api.smith.langchain.com",
    }
)

## Environment

We use a custom wrapper on top of the Frozen Lake environment from Gymnasium. It changes the following:
* it accepts an `AgentAction` from LangChain and internally calls the original environment with the corresponding action;
* it transforms the observations returned by the original environment: instead of a single number, an observation is now the current position (x, y) on the board;
* it supports performing a given sequence of actions when resetting the environment.

In [102]:
# A single seed for both the random map and the environment reset: without seeding
# `generate_random_map`, the board layout (and every prompt rendered from it below)
# would differ on each Restart & Run All.
env_seed = 123

env = gym.make(
    "FrozenLake-v1",
    desc=generate_random_map(size=4, seed=env_seed),
    is_slippery=True,
    render_mode="human",
)
# Wrap the raw environment so that it can be driven with LangChain agent actions.
env = FrozenLakeEnvWrapper(env)
env.reset(seed=env_seed)

(0, {'prob': 1})

## Strategy

### Hyperparameters

In [117]:
# --- Tree-of-Thoughts search settings ---
# how many thoughts are generated at each iteration
max_num_thoughts = 3
# upper bound on the total number of iterations
max_num_steps = 20
# evaluation cut-off: only thoughts with value > value_threshold will be explored
value_threshold = 0.1

# --- LLM settings ---
model_name, temperature = "gpt-3.5-turbo", 0.8

### Evaluator

#### Prompt

In [118]:
# Inspect which variables the evaluation prompt expects to be filled in.
evaluate_prompt.input_variables

['inputs', 'next_thought', 'trajectory']

In [119]:
# Render the board as text: one line per grid row (the cells are bytes, hence the decode).
board_rows = ["".join(cell.decode() for cell in row) for row in env.get_wrapper_attr("desc")]

# Preview of the evaluation prompt for an empty trajectory and a sample suggestion.
evaluate_prompt_preview = evaluate_prompt.format(
    inputs="\n".join(board_rows),
    trajectory=[],
    next_thought="move right",
)
print(evaluate_prompt_preview)

System: You are a helpful assistant that judges whether suggestions for individual steps in Frozen Lake game seem plausible.
Human: 
In Frozen Lake game, you move on a 2D grid. The end goal is to cross this grid from start 
to finish without falling into any holes.

You start at location (0,0) (upper left corner) of the frozen lake grid world, and the finish is located 
at far extent of the world, i.e., (n - 1, n - 1) for n x n grid (lower right corner). 

The first coordinate is X axis.
When you move left, you decrease your first coordinate by 1 (can't be bigger than n - 1).
When you move right, you increase your first coordinate by 1 (can't be lower than 0).

The second coordinate is Y axis. 
When you move down, you increase your second coordinate by 1 (can't be bigger than n - 1).
When you move right, you decrease your second coordinate by 1 (can't be lower than 0).

You will be given a map, optionally, a sequence of intermediate steps and a suggestion for the next step. Your goal i

#### Formatting utils

In [120]:
from langchain.agents.format_scratchpad.openai_tools import format_to_openai_tool_messages
from typing import Union, List


def format_next_thought(next_thought: Union[List[AgentAction], AgentAction, AgentFinish]) -> str:
    """Render a proposed thought as plain text.

    Args:
        next_thought: a single tool call (`AgentAction`), a final answer (`AgentFinish`),
            or a list of such items.

    Returns:
        A textual description of the thought; the items of a list are rendered
        one by one and joined with "; ".
    """
    if isinstance(next_thought, AgentAction):
        return f"Call {next_thought.tool} with input {next_thought.tool_input}"
    if isinstance(next_thought, AgentFinish):
        return f"Finish with the following comment: {next_thought.log}"
    # Anything else is treated as a sequence of thoughts and rendered recursively.
    rendered_items = map(format_next_thought, next_thought)
    return "; ".join(rendered_items)

#### Putting it all together

In [121]:
# Stage 1: map the evaluator payload onto the three variables of `evaluate_prompt`.
evaluator_prompt_inputs = {
    # "inputs" is looked up twice, i.e. the payload is expected to hold a mapping with its
    # own "inputs" key under "inputs" (presumably the strategy's input dict; confirm).
    "inputs": itemgetter("inputs") | RunnableLambda(itemgetter("inputs")),
    "trajectory": itemgetter("trajectory") | RunnableLambda(format_to_openai_tool_messages),
    "next_thought": itemgetter("next_thought") | RunnableLambda(format_next_thought),
}

# Stage 2: the chat model that judges the suggestion.
evaluator_llm = ChatOpenAI(model=model_name, temperature=temperature)

# Full pipeline: payload -> prompt -> model -> parsed evaluation.
evaluator_runnable = (
    evaluator_prompt_inputs | evaluate_prompt | evaluator_llm | FrozenMapEvaluateOutputParser()
).with_config(run_name="Evaluate Thought")

### Thought Generator

#### Prompt

In [122]:
# Inspect which variables the thought-generation prompt expects to be filled in.
generate_openai_tools_prompt.input_variables

['agent_scratchpad', 'inputs', 'previous_thoughts']

In [123]:
# Render the board as text: one line per grid row (the cells are bytes, hence the decode).
board_rows = ["".join(cell.decode() for cell in row) for row in env.get_wrapper_attr("desc")]

# Preview of the generation prompt with no scratchpad and no previous thoughts.
generate_prompt_preview = generate_openai_tools_prompt.format(
    inputs="\n".join(board_rows),
    agent_scratchpad=[],
    previous_thoughts=[],
)
print(generate_prompt_preview)

System: You are an intelligent agent playing a Frozen Lake game.
Human: 
In Frozen Lake game, you move on a 2D grid. Your goal is to cross this grid from start 
to finish without falling into any holes.

You start at location (0,0) (upper left corner) of the frozen lake grid world, and the finish is located 
at far extent of the world, i.e., (n - 1, n - 1) for n x n grid (lower right corner).

The first coordinate is X axis.
When you move left, you decrease your first coordinate by 1 (can't be bigger than n - 1).
When you move right, you increase your first coordinate by 1 (can't be lower than 0).

The second coordinate is Y axis. 
When you move down, you increase your second coordinate by 1 (can't be bigger than n - 1).
When you move right, you decrease your second coordinate by 1 (can't be lower than 0).

The map is an n x n grid where different types of cells are denoted by different letters:
* S - start cell
* G - goal cell
* F - frozen cell
* H - hole cell

Example for 2 x 2 case:

#### Tools

In [124]:
# Collect every tool exposed by the environment wrapper, in the mapping's own order.
tools = list(env.name_to_tool_map.values())
tools

[MoveTool(env=<gymnasium.envs.toy_text.frozen_lake.FrozenLakeEnv object at 0x2f960de90>)]

#### Formatting utils

In [125]:
from langchain_core.utils.function_calling import convert_to_openai_tool
from langchain_core.messages import BaseMessage, AIMessage

from langchain.agents.format_scratchpad.openai_tools import format_to_openai_tool_messages
from langchain.agents.output_parsers.openai_tools import OpenAIToolsAgentOutputParser, OpenAIToolAgentAction


def format_previous_thoughts(
    previous_thoughts: List[Union[List[AgentAction], AgentFinish, AgentAction]]
) -> List[BaseMessage]:
    """Flatten previous thoughts into a list of AI messages.

    Args:
        previous_thoughts: thoughts to convert; each item is a single action, a finish,
            or a (possibly nested) list of those.

    Returns:
        One `AIMessage` per non-list thought, in the original order. OpenAI tool actions
        are described by tool name and input; every other thought contributes its `log`.
    """

    def _as_messages(thought) -> List[BaseMessage]:
        # Tool calls get a short description instead of their raw log.
        if isinstance(thought, OpenAIToolAgentAction):
            return [AIMessage(content=f"Calling {thought.tool} with {thought.tool_input}")]
        # Nested lists are flattened recursively.
        if isinstance(thought, list):
            return format_previous_thoughts(thought)
        return [AIMessage(content=thought.log)]

    return [message for thought in previous_thoughts for message in _as_messages(thought)]

#### Putting it all together

In [126]:
# Copied from langchain.agents.create_openai_tools_agent to allow for custom formatting of `previous_thoughts` in inputs.

# Chat model with the OpenAI-tool schemas of `tools` bound to every call.
llm = ChatOpenAI(model=model_name, temperature=temperature).bind(tools=[convert_to_openai_tool(tool) for tool in tools])

# Thought generator: payload -> prompt variables -> prompt -> model -> parsed agent output.
agent = (
    {
        # passed through unchanged
        "inputs": itemgetter("inputs"),
        # `intermediate_steps` converted to OpenAI tool messages
        "agent_scratchpad": lambda x: format_to_openai_tool_messages(x["intermediate_steps"]),
        # `previous_thoughts` converted to AI messages by `format_previous_thoughts`
        "previous_thoughts": lambda x: format_previous_thoughts(x["previous_thoughts"]),
    }
    | generate_openai_tools_prompt
    | llm
    | OpenAIToolsAgentOutputParser()
).with_config(run_name="Generate Thought")

### Defining strategy

In [127]:
from planning_library.action_executors import GymnasiumActionExecutor


# Action executor built on the wrapped environment; it is given the same seed (123)
# that is passed to the `env.reset` calls in this notebook.
action_executor = GymnasiumActionExecutor(env, seed=123)

In [128]:
# Assemble the ToT-DFS strategy from the pieces defined above: the thought generator (`agent`),
# the thought evaluator (`evaluator_runnable`), the environment-backed action executor,
# and the search hyperparameters.
strategy_executor = TreeOfThoughtsDFSStrategy.create(
    agent=agent,
    tools=tools,
    action_executor=action_executor,
    evaluator_runnable=evaluator_runnable,
    value_threshold=value_threshold,
    max_num_thoughts=max_num_thoughts,
    # `max_num_steps` is the notebook's name for the strategy's iteration limit
    max_iterations=max_num_steps,
)

In [129]:
# Start from a freshly reset environment before running the search.
env.reset(seed=123)

# The strategy receives the board as text: one line per grid row (cells are bytes, hence the decode).
board_text = "\n".join("".join(cell.decode() for cell in row) for row in env.get_wrapper_attr("desc"))
strategy_executor.invoke({"inputs": board_text})



[1m> Entering new TreeOfThoughtsDFSStrategy chain...[0m

[1m> Finished chain.[0m


{'inputs': 'SFFF\nFFFH\nFFFF\nFFFG', 'outputs': []}