In [None]:
import sys
import os
import json

sys.path.append("/Users/ludovicomitchener/Desktop/repos/data-analysis-crow/src")
import hashlib
import shutil
from pathlib import Path
import time
import logging


from ldp.agent import AgentConfig
from ldp.alg.rollout import RolloutManager
from ldp.data_structures import Trajectory, Transition

from fhda.data_analysis_env import DataAnalysisEnv
from fhda.notebook_env import NBEnvironment
from fhda.utils import NBLanguage
from fhda import prompts
import fhda.config as cfg

In [None]:
def setup_data_analysis_env(
    query: str,
    dataset: Path,
    language: NBLanguage = NBLanguage.PYTHON,
    results_root: "str | Path" = "tmp_results_dir",
):
    """Create a ``DataAnalysisEnv`` whose working directory holds a copy of ``dataset``.

    A fresh trajectory directory ``<results_root>/<sha256(query)>-<time.time()>`` is
    created, the dataset is copied into it, and the query is wrapped with the
    chain-of-thought and notebook guidelines from ``fhda.prompts``.

    Args:
        query: Natural-language analysis request from the user.
        dataset: A single file, or a directory whose files and sub-directories
            are copied (sub-directories recursively) into the trajectory directory.
            A ``str`` path is also accepted.
        language: Notebook kernel language. Its ``.name`` is substituted into the
            prompt templates, and ``NBLanguage.R`` additionally appends
            ``prompts.R_SPECIFIC_GUIDELINES`` to the task.
        results_root: Parent directory for trajectory directories, resolved
            against the current working directory. Defaults to ``tmp_results_dir``.

    Returns:
        The configured ``DataAnalysisEnv``; its notebook lives at
        ``<trajectory dir>/NBEnvironment.NOTEBOOK_NAME``.

    Raises:
        FileNotFoundError: If ``dataset`` does not exist. This is checked before
            any directory is created, so no empty trajectory folder is left behind.
    """
    dataset = Path(dataset)
    if not dataset.exists():
        raise FileNotFoundError(f"Dataset path does not exist: {dataset}")

    # Hash the task to get a stable identifier; the timestamp suffix keeps
    # repeated runs of the same query in separate directories.
    task_hash = hashlib.sha256(query.encode()).hexdigest()
    trajectory_path = Path(os.path.abspath(results_root)) / f"{task_hash}-{time.time()}"
    trajectory_path.mkdir(parents=True, exist_ok=True)
    nb_path = trajectory_path / NBEnvironment.NOTEBOOK_NAME

    # Copy task data into the trajectory directory, which is also the env's work_dir.
    if dataset.is_dir():
        for item in dataset.iterdir():
            if item.is_file():
                shutil.copy2(item, trajectory_path)
            elif item.is_dir():
                shutil.copytree(item, trajectory_path / item.name, dirs_exist_ok=True)
    else:
        shutil.copy2(dataset, trajectory_path)

    # Augment incoming task with CoT instructions
    augmented_task = f"""\
    Here is the user query to address:


    <query>
    {query}
    </query>

    {prompts.CHAIN_OF_THOUGHT_AGNOSTIC.format(language=language.name)}
    {prompts.GENERAL_NOTEBOOK_GUIDELINES.format(language=language.name)}"""

    if language == NBLanguage.R:
        augmented_task += f"\n{prompts.R_SPECIFIC_GUIDELINES}"

    dae = DataAnalysisEnv(
        problem_id=f"data-analysis-task-{task_hash}",
        problem=augmented_task,
        eval_mode=None,
        nb_path=nb_path,
        work_dir=trajectory_path,
        language=language,
        system_prompt=prompts.CAPSULE_SYSTEM_PROMPT_QUERY,
        use_tmp_work_dir=False,
        # Re-running the notebook on every edit is only enabled for Docker runs.
        run_notebook_on_edit=bool(cfg.USE_DOCKER),
    )
    return dae

In [None]:
# ENVIRONMENT CONFIGURATION

# Set your API keys: paste a key below, or leave it empty to keep the value
# already exported in your shell. An empty string does NOT overwrite an
# existing ANTHROPIC_API_KEY environment variable.
ANTHROPIC_API_KEY = ""
if ANTHROPIC_API_KEY:
    os.environ["ANTHROPIC_API_KEY"] = ANTHROPIC_API_KEY
# For OpenAI models, set OPENAI_API_KEY the same way.

# If using docker, be sure to pull the image from docker hub first
# docker pull futurehouse/bixbench:aviary-notebook-env
# This image includes many bioinformatics and data science packages.
# setup_data_analysis_env reads cfg.USE_DOCKER, so set it before calling it.
cfg.USE_DOCKER = False
# This can be R or PYTHON in Docker or with a local kernel if you have R installed
LANGUAGE = NBLanguage.PYTHON
# Step budget for the Aviary rollout (RolloutManager.sample_trajectories).
MAX_STEPS = 3
MODEL_NAME = "claude-3-7-sonnet-latest"

In [None]:
# AVIARY ROLLOUT
# This folder only contains a single csv file on animal brain size and body mass from here:
# https://animaltraits.org/
# However, it could contain many files including nested folders

logger = logging.getLogger(__name__)
logger.info("Setting up data analysis environment")

dataset = Path("datasets/brain_size_data.csv")
query = "Analyze the dataset and give me an in depth analysis using pretty plots. I am particularly interested in crows."
environment = setup_data_analysis_env(query, dataset, LANGUAGE)

agent = AgentConfig(
    agent_type="ReActAgent",
    agent_kwargs={
        "llm_model": {
            "parallel_tool_calls": False,
            "num_retries": 3,
            "temperature": 1.0,
            "name": MODEL_NAME,
        },
        "hide_old_env_states": True,
    },
)

agent = agent.construct_agent()
rollout = RolloutManager(agent=agent)

# You can see the notebook updating live in the tmp_results_dir folder
result = await rollout.sample_trajectories(
    environments=[environment], max_steps=MAX_STEPS
)

print("Trajectory completed! Final notebook available at: \n", environment.nb_path)
print(f"Final agent answer:\n{environment.state.answer}")

In [None]:
# INSPECT THE RESULT
# Walk each step of the first sampled trajectory and show the agent's messages,
# whether the episode was done, and the action (tool call) it took.
PREVIEW_CHARS = 200
trajectory = result[0]
for timestep, step in enumerate(trajectory.steps):
    print(f"Timestep {timestep}")
    print(f"Done: {step.done}")
    print("Agent Reasoning:")
    for message in step.agent_state.messages:
        if message.content:
            # Only flag truncation when the preview actually cut text off.
            suffix = " [Truncated]" if len(message.content) > PREVIEW_CHARS else ""
            print(f"Message: {message.content[:PREVIEW_CHARS]}{suffix}")
    print(f"Action: {step.action.value}")
    print("---")

In [None]:
# VANILLA ROLLOUT - this is a simple version of the what the rollout Manager does
dataset_folder = Path("dataset")
query = "Analyze the dataset and give me an in depth analysis using pretty plots. I am particularly interested in crows."
environment = setup_data_analysis_env(query, dataset_folder)

obs, tools = await environment.reset()
agent_state = await agent.init_state(tools)
trajectory = Trajectory()
max_steps = 10
for timestep in range(max_steps):
    action, next_agent_state, value = await agent.get_asv(agent_state, obs)
    next_obs, reward, done, trunc = await environment.step(action.value)
    # Create the transition object
    transition = Transition(
        timestep=timestep,
        agent_state=agent_state,
        next_agent_state=next_agent_state,
        observation=obs,
        next_observation=next_obs,
        action=action,
        reward=reward,
        done=done,
        truncated=trunc,
        value=value,
    )
    # Update steps by creating a new list with the additional transition
    trajectory.steps = [*trajectory.steps, transition]
    if done or trunc:
        break

    agent_state = next_agent_state
    obs = next_obs

In [None]:
# PLATFORM ROLLOUT

from futurehouse_client import FutureHouseClient
from futurehouse_client.models import Stage, TaskRequest, RuntimeConfig
from futurehouse_client.models.app import AuthType

# CONFIGURATION
# Platform settings use their own names so this cell does not overwrite the
# local MAX_STEPS / LANGUAGE from the config cell. LANGUAGE there is an
# NBLanguage, and rebinding it to a string would break
# setup_data_analysis_env(..., LANGUAGE) when earlier cells are re-run.
CROW_STAGE = Stage.PROD
# Read the key from the environment instead of hard-coding it in the notebook.
API_KEY = os.environ.get("FUTUREHOUSE_API_KEY", "")
JOB_NAME = "job-futurehouse-data-analysis-crow-high"
PLATFORM_MAX_STEPS = 25
PLATFORM_LANGUAGE = "R"
DATA_GCS_LOCATION = "bixbench_data/CapsuleFolder-1d54e4a7-8b0f-4224-bd31-efcfded0d46c"
POLL_INTERVAL_S = 15
MAX_WAIT_S = 60 * 60  # stop polling (raise TimeoutError) after one hour


client = FutureHouseClient(
    stage=CROW_STAGE,
    auth_type=AuthType.API_KEY,
    api_key=API_KEY,
)


task = f"""\
Here is the user query to address:


<query>
Make a discovery using this dataset.
</query>

{prompts.CHAIN_OF_THOUGHT_AGNOSTIC.format(language=PLATFORM_LANGUAGE)}
{prompts.GENERAL_NOTEBOOK_GUIDELINES.format(language=PLATFORM_LANGUAGE)}"""
# Mirror setup_data_analysis_env: R runs also get the R-specific guidelines.
if PLATFORM_LANGUAGE == "R":
    task += f"\n{prompts.R_SPECIFIC_GUIDELINES}"

job_data = TaskRequest(
    name=JOB_NAME,
    query=task,
    runtime_config=RuntimeConfig(
        max_steps=PLATFORM_MAX_STEPS,
        upload_id=DATA_GCS_LOCATION,  # This is just an example dataset
        environment_config={
            "run_notebook_on_edit": False,
            "eval": True,
            "language": PLATFORM_LANGUAGE,
        },
        # timeout=600,
    ),
)
job_id = client.create_task(job_data)

# Poll until the task leaves a pending state, with an upper bound on the wait.
# NOTE(review): "queued" is assumed to be the platform's pre-start status string;
# confirm against futurehouse_client's status values.
PENDING_STATUSES = ("queued", "in progress")
deadline = time.monotonic() + MAX_WAIT_S
status = client.get_task(job_id).status
while status in PENDING_STATUSES:
    if time.monotonic() > deadline:
        raise TimeoutError(f"Task {job_id} still '{status}' after {MAX_WAIT_S} seconds")
    print(f"Waiting for task to complete... checking again in {POLL_INTERVAL_S} seconds")
    time.sleep(POLL_INTERVAL_S)
    status = client.get_task(job_id).status

print(f"Task finished with status: {status}")
job_result = client.get_task(job_id, verbose=True)
answer = job_result.environment_frame["state"]["state"]["answer"]
print(
    f"Task completed, the full analysis is available at: https://platform.futurehouse.org/trajectories/{job_id}\nAgent answer: {answer}"
)

In [None]:
# You can also view the notebook locally by saving it to a directory of your choice.
# Define the path where you want to save the notebook; `job_result` comes from
# the PLATFORM ROLLOUT cell.
notebook_path = Path("output/analysis_notebook.ipynb")

# Path.parent is "." for a bare filename, so this also works where
# os.makedirs(os.path.dirname(...)) would fail on an empty string.
notebook_path.parent.mkdir(parents=True, exist_ok=True)
notebook_content = job_result.environment_frame["state"]["state"]["nb_state"]
with notebook_path.open("w", encoding="utf-8") as f:
    if isinstance(notebook_content, str):
        # Already-serialised notebook JSON: write as-is rather than json.dump-ing
        # it into a quoted string that Jupyter could not open.
        f.write(notebook_content)
    else:
        json.dump(notebook_content, f, indent=2)

print(f"Notebook saved to {os.path.abspath(notebook_path)}")