Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/reference/RESEARCH.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ Each row gets its own agent that researches independently.
| `task` | str | The agent task describing what to research |
| `session` | Session | Optional, auto-created if omitted |
| `input` | BaseModel \| DataFrame \| UUID | Optional input context |
| `effort_level` | EffortLevel | LOW, MEDIUM, or HIGH (default: LOW) |
| `effort_level` | EffortLevel | LOW, MEDIUM, or HIGH (default: MEDIUM) |
| `llm` | LLM | Optional agent LLM override |
| `response_model` | BaseModel | Optional schema for structured output |
| `return_table` | bool | (`single_agent` only) If True, returns a table instead of a scalar result |
Expand All @@ -78,8 +78,8 @@ Each row gets its own agent that researches independently.

The effort level lets you control how thorough the research is.

- `LOW`: Quick lookups, basic web searches, fast and cheap (default)
- `MEDIUM`: More thorough research, multiple sources consulted
- `LOW`: A single LLM call rather than a full agent; cheapest and fastest
- `MEDIUM`: More thorough research, multiple sources consulted (default)
- `HIGH`: Deep research, cross-referencing sources, higher accuracy

### Response model
Expand Down
89 changes: 64 additions & 25 deletions src/everyrow/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
InputData = UUID | list[dict[str, Any]] | dict[str, Any]


DEFAULT_EFFORT_LEVEL = EffortLevel.MEDIUM


class DefaultAgentResponse(BaseModel):
answer: str

Expand Down Expand Up @@ -112,7 +115,9 @@ async def create_scalar_artifact(input: BaseModel, session: Session) -> UUID:
data=CreateArtifactRequestDataType1.from_dict(input.model_dump()),
session_id=session.session_id,
)
response = await create_artifact_artifacts_post.asyncio(client=session.client, body=body)
response = await create_artifact_artifacts_post.asyncio(
client=session.client, body=body
)
response = handle_response(response)
return response.artifact_id

Expand All @@ -124,7 +129,9 @@ async def create_table_artifact(input: DataFrame, session: Session) -> UUID:
data=[CreateArtifactRequestDataType0Item.from_dict(r) for r in records],
session_id=session.session_id,
)
response = await create_artifact_artifacts_post.asyncio(client=session.client, body=body)
response = await create_artifact_artifacts_post.asyncio(
client=session.client, body=body
)
response = handle_response(response)
return response.artifact_id

Expand All @@ -137,7 +144,7 @@ async def single_agent[T: BaseModel](
task: str,
session: Session | None = None,
input: BaseModel | UUID | Result | None = None,
effort_level: EffortLevel | None = EffortLevel.LOW,
effort_level: EffortLevel | None = DEFAULT_EFFORT_LEVEL,
llm: LLM | None = None,
iteration_budget: int | None = None,
include_research: bool | None = None,
Comment on lines 144 to 150
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

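Bug: Changing the default effort_level to MEDIUM is a breaking change. Callers who pass custom parameters (llm, iteration_budget, include_research) without also passing effort_level=None will now send the MEDIUM preset alongside them, and the API rejects that combination with a 422 validation error.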
Severity: HIGH

Suggested Fix

To avoid breaking existing clients, either revert the default effort_level or, preferably, modify the logic to automatically set effort_level to None if any custom parameters (llm, iteration_budget, etc.) are provided. Additionally, ensure all documentation, including the agent_map docstring, is updated to reflect the correct default behavior.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: src/everyrow/ops.py#L144-L150

Potential issue: Changing the default `effort_level` to `EffortLevel.MEDIUM` introduces
a breaking change for users providing custom parameters like `llm` or
`iteration_budget`. The API requires that `effort_level` and custom parameters are
mutually exclusive. Users who previously called the function with custom parameters
without explicitly setting `effort_level` will now have the default
`effort_level=MEDIUM` sent along with their custom parameters. This will cause the API
to return a 422 validation error, breaking existing client implementations. The issue is
made worse by outdated docstrings that still list the old default value.

Expand All @@ -151,7 +158,7 @@ async def single_agent(
task: str,
session: Session | None = None,
input: BaseModel | UUID | Result | None = None,
effort_level: EffortLevel | None = EffortLevel.LOW,
effort_level: EffortLevel | None = DEFAULT_EFFORT_LEVEL,
llm: LLM | None = None,
iteration_budget: int | None = None,
include_research: bool | None = None,
Expand All @@ -164,7 +171,7 @@ async def single_agent[T: BaseModel](
task: str,
session: Session | None = None,
input: BaseModel | DataFrame | UUID | Result | None = None,
effort_level: EffortLevel | None = EffortLevel.LOW,
effort_level: EffortLevel | None = DEFAULT_EFFORT_LEVEL,
llm: LLM | None = None,
iteration_budget: int | None = None,
include_research: bool | None = None,
Comment on lines 171 to 177

This comment was marked as outdated.

Expand All @@ -178,7 +185,7 @@ async def single_agent[T: BaseModel](
session: Optional session. If not provided, one will be created automatically.
input: Input data (BaseModel, DataFrame, UUID, or Result).
effort_level: Effort level preset (low/medium/high). Mutually exclusive with
custom params (llm, iteration_budget, include_research). Default: low.
custom params (llm, iteration_budget, include_research). Default: medium.
llm: LLM to use. Required when effort_level is None.
iteration_budget: Number of agent iterations (0-20). Required when effort_level is None.
include_research: Include research notes. Required when effort_level is None.
Expand Down Expand Up @@ -220,33 +227,43 @@ async def single_agent_async[T: BaseModel](
task: str,
session: Session,
input: BaseModel | DataFrame | UUID | Result | None = None,
effort_level: EffortLevel | None = EffortLevel.LOW,
effort_level: EffortLevel | None = DEFAULT_EFFORT_LEVEL,
llm: LLM | None = None,
iteration_budget: int | None = None,
include_research: bool | None = None,
response_model: type[T] = DefaultAgentResponse,
return_table: bool = False,
) -> EveryrowTask[T]:
"""Submit a single_agent task asynchronously."""
input_data = _prepare_single_input(input, SingleAgentOperationInputType1Item, SingleAgentOperationInputType2)
input_data = _prepare_single_input(
input, SingleAgentOperationInputType1Item, SingleAgentOperationInputType2
)

# Build the operation body with either preset or custom params
body = SingleAgentOperation(
input_=input_data, # type: ignore
task=task,
session_id=session.session_id,
response_schema=SingleAgentOperationResponseSchemaType0.from_dict(response_model.model_json_schema()),
effort_level=PublicEffortLevel(effort_level.value) if effort_level is not None else UNSET,
response_schema=SingleAgentOperationResponseSchemaType0.from_dict(
response_model.model_json_schema()
),
effort_level=PublicEffortLevel(effort_level.value)
if effort_level is not None
else UNSET,
llm=LLMEnumPublic(llm.value) if llm is not None else UNSET,
iteration_budget=iteration_budget if iteration_budget is not None else UNSET,
include_research=include_research if include_research is not None else UNSET,
return_list=return_table,
)

response = await single_agent_operations_single_agent_post.asyncio(client=session.client, body=body)
response = await single_agent_operations_single_agent_post.asyncio(
client=session.client, body=body
)
response = handle_response(response)

cohort_task: EveryrowTask[T] = EveryrowTask(response_model=response_model, is_map=False, is_expand=return_table)
cohort_task: EveryrowTask[T] = EveryrowTask(
response_model=response_model, is_map=False, is_expand=return_table
)
cohort_task.set_submitted(response.task_id, response.session_id, session.client)
return cohort_task

Expand All @@ -258,7 +275,7 @@ async def agent_map(
task: str,
session: Session | None = None,
input: DataFrame | UUID | TableResult | None = None,
effort_level: EffortLevel | None = EffortLevel.LOW,
effort_level: EffortLevel | None = DEFAULT_EFFORT_LEVEL,
llm: LLM | None = None,
iteration_budget: int | None = None,
include_research: bool | None = None,
Expand Down Expand Up @@ -321,7 +338,7 @@ async def agent_map_async(
task: str,
session: Session,
input: DataFrame | UUID | TableResult,
effort_level: EffortLevel | None = EffortLevel.LOW,
effort_level: EffortLevel | None = DEFAULT_EFFORT_LEVEL,
llm: LLM | None = None,
iteration_budget: int | None = None,
include_research: bool | None = None,
Expand All @@ -336,19 +353,27 @@ async def agent_map_async(
input_=input_data, # type: ignore
task=task,
session_id=session.session_id,
response_schema=AgentMapOperationResponseSchemaType0.from_dict(response_model.model_json_schema()),
effort_level=PublicEffortLevel(effort_level.value) if effort_level is not None else UNSET,
response_schema=AgentMapOperationResponseSchemaType0.from_dict(
response_model.model_json_schema()
),
effort_level=PublicEffortLevel(effort_level.value)
if effort_level is not None
else UNSET,
llm=LLMEnumPublic(llm.value) if llm is not None else UNSET,
iteration_budget=iteration_budget if iteration_budget is not None else UNSET,
include_research=include_research if include_research is not None else UNSET,
join_with_input=True,
enforce_row_independence=enforce_row_independence,
)

response = await agent_map_operations_agent_map_post.asyncio(client=session.client, body=body)
response = await agent_map_operations_agent_map_post.asyncio(
client=session.client, body=body
)
response = handle_response(response)

cohort_task = EveryrowTask(response_model=response_model, is_map=True, is_expand=False)
cohort_task = EveryrowTask(
response_model=response_model, is_map=True, is_expand=False
)
cohort_task.set_submitted(response.task_id, response.session_id, session.client)
return cohort_task

Expand Down Expand Up @@ -387,7 +412,9 @@ async def screen[T: BaseModel](
if isinstance(result, TableResult):
return result
raise EveryrowError("Screen task did not return a table result")
cohort_task = await screen_async(task=task, session=session, input=input, response_model=response_model)
cohort_task = await screen_async(
task=task, session=session, input=input, response_model=response_model
)
result = await cohort_task.await_result()
if isinstance(result, TableResult):
return result
Expand All @@ -408,10 +435,14 @@ async def screen_async[T: BaseModel](
input_=input_data, # type: ignore
task=task,
session_id=session.session_id,
response_schema=ScreenOperationResponseSchemaType0.from_dict(actual_response_model.model_json_schema()),
response_schema=ScreenOperationResponseSchemaType0.from_dict(
actual_response_model.model_json_schema()
),
)

response = await screen_operations_screen_post.asyncio(client=session.client, body=body)
response = await screen_operations_screen_post.asyncio(
client=session.client, body=body
)
response = handle_response(response)

cohort_task: EveryrowTask[T] = EveryrowTask(
Expand Down Expand Up @@ -498,7 +529,9 @@ async def rank_async[T: BaseModel](
# Validate that field_name exists in the model
properties = response_schema.get("properties", {})
if field_name not in properties:
raise ValueError(f"Field {field_name} not in response model {response_model.__name__}")
raise ValueError(
f"Field {field_name} not in response model {response_model.__name__}"
)
else:
# Build a minimal JSON schema with just the sort field
json_type_map = {
Expand All @@ -509,7 +542,9 @@ async def rank_async[T: BaseModel](
}
response_schema = {
"type": "object",
"properties": {field_name: {"type": json_type_map.get(field_type, field_type)}},
"properties": {
field_name: {"type": json_type_map.get(field_type, field_type)}
},
"required": [field_name],
}

Expand Down Expand Up @@ -619,7 +654,9 @@ async def merge_async(
session_id=session.session_id,
)

response = await merge_operations_merge_post.asyncio(client=session.client, body=body)
response = await merge_operations_merge_post.asyncio(
client=session.client, body=body
)
response = handle_response(response)

merge_task = MergeTask()
Expand Down Expand Up @@ -714,7 +751,9 @@ async def dedupe_async(
strategy_prompt=strategy_prompt if strategy_prompt is not None else UNSET,
)

response = await dedupe_operations_dedupe_post.asyncio(client=session.client, body=body)
response = await dedupe_operations_dedupe_post.asyncio(
client=session.client, body=body
)
response = handle_response(response)

cohort_task = EveryrowTask(response_model=BaseModel, is_map=True, is_expand=False)
Expand Down