diff --git a/docs/api.md b/docs/api.md index 712877d8..f777fa75 100644 --- a/docs/api.md +++ b/docs/api.md @@ -1,11 +1,11 @@ --- title: API Reference -description: Complete API reference for everyrow — screen, rank, dedupe, merge, and research operations powered by LLM web research agents. +description: Complete API reference for everyrow — screen, rank, dedupe, merge, forecast, and research operations powered by LLM web research agents. --- # API Reference -Five operations for processing data with LLM-powered web research agents. Each takes a DataFrame and a natural-language instruction. +Six operations for processing data with LLM-powered web research agents. Each takes a DataFrame and a natural-language instruction. ## screen @@ -55,6 +55,17 @@ result = await merge(task=..., left_table=df1, right_table=df2) Guides: [Fuzzy Join Without Matching Keys](/docs/fuzzy-join-without-keys) Case Studies: [LLM Merging at Scale](/docs/case-studies/llm-powered-merging-at-scale), [Match Software Vendors to Requirements](/docs/case-studies/match-software-vendors-to-requirements) +## forecast + +```python +result = await forecast(input=questions_df) +``` + +`forecast` takes a DataFrame of binary questions and produces a calibrated probability estimate (0–100) and rationale for each row. Each question is researched across six dimensions in parallel, then synthesized by an ensemble of forecasters. Validated against 1500 hard forecasting questions and 15M research documents. + +[Full reference →](/docs/reference/FORECAST) +Blog posts: [Automating Forecasting Questions](https://futuresearch.ai/automating-forecasting-questions/), [arXiv paper](https://arxiv.org/abs/2506.21558) + ## agent_map / single_agent ```python diff --git a/docs/reference/FORECAST.md b/docs/reference/FORECAST.md new file mode 100644 index 00000000..4d1e6aed --- /dev/null +++ b/docs/reference/FORECAST.md @@ -0,0 +1,86 @@ +--- +title: forecast +description: API reference for the EveryRow forecast tool, which produces calibrated probability estimates for binary questions using web research and an ensemble of forecasters. +--- + +# Forecast + +`forecast` takes a DataFrame of binary questions and produces a calibrated probability estimate (0–100) and rationale for each row. The approach is validated against FutureSearch's past-casting environment of 1500 hard forecasting questions and 15M research documents. See more at [Automating Forecasting Questions](https://futuresearch.ai/automating-forecasting-questions/) and [arXiv:2506.21558](https://arxiv.org/abs/2506.21558). + +## Examples + +```python +from pandas import DataFrame +from everyrow.ops import forecast + +questions = DataFrame([ + { + "question": "Will the US Federal Reserve cut rates by at least 25bp before July 1, 2027?", + "resolution_criteria": "Resolves YES if the Fed announces at least one rate cut of 25bp or more at any FOMC meeting between now and June 30, 2027.", + }, +]) + +result = await forecast(input=questions) +print(result.data[["question", "probability", "rationale"]]) +``` + +The output DataFrame contains the original columns plus `probability` (int, 0–100) and `rationale` (str). + +### Batch context + +When all rows share common framing, pass it via `context` instead of repeating it in every row: + +```python +result = await forecast( + input=geopolitics_questions, + context="Focus on EU regulatory and diplomatic sources. Assume all questions resolve by end of 2027.", +) +``` + +Leave `context` empty when rows are self-contained—a well-specified question with resolution criteria needs no additional instruction. + +## Input columns + +The input DataFrame should contain at minimum a `question` column. All columns are passed to the research agents and forecasters. + +| Column | Required | Purpose | +|--------|----------|---------| +| `question` | Yes | The binary question to forecast | +| `resolution_criteria` | Recommended | Exactly how YES/NO is determined—the "contract" | +| `resolution_date` | Optional | When the question closes | +| `background` | Optional | Additional context the forecasters should know | + +Column names are not enforced—research agents infer meaning from content. A column named `scenario` instead of `question` works fine. + +## Parameters + +| Name | Type | Description | +|------|------|-------------| +| `input` | DataFrame | Rows to forecast, one question per row | +| `context` | str \| None | Optional batch-level instructions that apply to every row | +| `session` | Session | Optional, auto-created if omitted | + +## Output + +Two columns are added to each input row: + +| Column | Type | Description | +|--------|------|-------------| +| `probability` | int | 0–100, calibrated probability of YES resolution | +| `rationale` | str | Detailed reasoning with citations from web research | + +Probabilities are clamped to [3, 97]—even near-certain outcomes retain residual uncertainty. + +## Performance + +| Rows | Time | Cost | +|------|------|------| +| 1 | ~5 min | ~$0.60 | +| 5 | ~6 min | ~$3 | +| 20 | ~10 min | ~$12 | + +## Related docs + +### Blog posts +- [Automating Forecasting Questions](https://futuresearch.ai/automating-forecasting-questions/) +- [arXiv paper: Automated Forecasting](https://arxiv.org/abs/2506.21558) diff --git a/everyrow-mcp/manifest.json b/everyrow-mcp/manifest.json index a5d0a6c5..4d9c125a 100644 --- a/everyrow-mcp/manifest.json +++ b/everyrow-mcp/manifest.json @@ -49,6 +49,10 @@ "name": "everyrow_agent", "description": "Run web research agents on each row of a CSV file." }, + { + "name": "everyrow_forecast", + "description": "Forecast the probability of binary questions from a CSV file." + }, { "name": "everyrow_single_agent", "description": "Run a single web research agent on a task, optionally with context data." diff --git a/everyrow-mcp/src/everyrow_mcp/models.py b/everyrow-mcp/src/everyrow_mcp/models.py index cd872a37..f8b08688 100644 --- a/everyrow-mcp/src/everyrow_mcp/models.py +++ b/everyrow-mcp/src/everyrow_mcp/models.py @@ -259,6 +259,30 @@ def validate_csv_paths(cls, v: str) -> str: return v +class ForecastInput(BaseModel): + """Input for the forecast operation.""" + + model_config = ConfigDict(str_strip_whitespace=True, extra="forbid") + + input_csv: str = Field( + ..., + description="Absolute path to the input CSV file containing a binary " + "question and optional resolution criteria on each row.", + ) + context: str | None = Field( + default=None, + description="Optional batch-level context or instructions that apply to every row " + "(e.g. 'Focus on EU regulatory sources' or 'Assume resolution by end of 2027'). " + "Leave empty when the rows are self-contained.", + ) + + @field_validator("input_csv") + @classmethod + def validate_input_csv(cls, v: str) -> str: + validate_csv_path(v) + return v + + class SingleAgentInput(BaseModel): """Input for a single agent operation (no CSV).""" diff --git a/everyrow-mcp/src/everyrow_mcp/tools.py b/everyrow-mcp/src/everyrow_mcp/tools.py index 475708d9..859860b5 100644 --- a/everyrow-mcp/src/everyrow_mcp/tools.py +++ b/everyrow-mcp/src/everyrow_mcp/tools.py @@ -21,6 +21,7 @@ from everyrow.ops import ( agent_map_async, dedupe_async, + forecast_async, merge_async, rank_async, screen_async, @@ -40,6 +41,7 @@ from everyrow_mcp.models import ( AgentInput, DedupeInput, + ForecastInput, MergeInput, ProgressInput, RankInput, @@ -533,6 +535,78 @@ async def everyrow_merge(params: MergeInput) -> list[TextContent]: ] +@mcp.tool( + name="everyrow_forecast", + structured_output=False, + annotations=ToolAnnotations( + title="Probability Forecast", + readOnlyHint=False, + destructiveHint=False, + idempotentHint=False, + openWorldHint=True, + ), +) +async def everyrow_forecast(params: ForecastInput) -> list[TextContent]: + """Forecast the probability of binary questions from a CSV file. + + Each row is forecast using an approach validated against FutureSearch's + past-casting environment of 1500 hard forecasting questions and 15M research + documents, see more at https://futuresearch.ai/automating-forecasting-questions/ + and https://arxiv.org/abs/2506.21558. + + The CSV should contain at minimum a ``question`` column. Recommended additional + columns: ``resolution_criteria``, ``resolution_date``, ``background``. All + columns are passed to the research agents and forecasters. + + The optional ``context`` parameter provides batch-level instructions that apply + to every row (e.g. "Focus on EU regulatory sources"). Leave it empty when the + rows are self-contained. + + Output columns added: ``rationale`` (str) and ``probability`` (int, 0-100). + + This function submits the task and returns immediately with a task_id and session_url. + After receiving a result from this tool, share the session_url with the user. + Then immediately call everyrow_progress(task_id) to monitor. + Once the task is completed, call everyrow_results to save the output. + """ + client = _get_client() + + _clear_task_state() + df = pd.read_csv(params.input_csv) + + async with create_session(client=client) as session: + session_url = session.get_url() + cohort_task = await forecast_async( + task=params.context or "", + session=session, + input=df, + ) + task_id = str(cohort_task.task_id) + _write_task_state( + task_id, + task_type=PublicTaskType.FORECAST, + session_url=session_url, + total=len(df), + completed=0, + failed=0, + running=0, + status=TaskStatus.RUNNING, + started_at=datetime.now(UTC), + ) + + return [ + TextContent( + type="text", + text=( + f"Submitted: {len(df)} rows for forecasting (6 research dimensions + dual forecaster per row).\n" + f"Session: {session_url}\n" + f"Task ID: {task_id}\n\n" + f"Share the session_url with the user, then immediately call everyrow_progress(task_id='{task_id}')." + ), + ) + ] + + @mcp.tool( name="everyrow_progress", structured_output=False, diff --git a/src/everyrow/generated/api/operations/forecast_operations_forecast_post.py b/src/everyrow/generated/api/operations/forecast_operations_forecast_post.py new file mode 100644 index 00000000..e68c8e4c --- /dev/null +++ b/src/everyrow/generated/api/operations/forecast_operations_forecast_post.py @@ -0,0 +1,220 @@ +from http import HTTPStatus +from typing import Any, cast +from urllib.parse import quote + +import httpx + +from ...client import AuthenticatedClient, Client +from ...types import Response, UNSET +from ... import errors + +from ...models.error_response import ErrorResponse +from ...models.forecast_operation import ForecastOperation +from ...models.insufficient_balance_error import InsufficientBalanceError +from ...models.operation_response import OperationResponse +from ...types import UNSET, Unset +from typing import cast + + + +def _get_kwargs( + *, + body: ForecastOperation, + x_cohort_source: None | str | Unset = UNSET, + +) -> dict[str, Any]: + headers: dict[str, Any] = {} + if not isinstance(x_cohort_source, Unset): + headers["X-Cohort-Source"] = x_cohort_source + + + + + + + + _kwargs: dict[str, Any] = { + "method": "post", + "url": "/operations/forecast", + } + + _kwargs["json"] = body.to_dict() + + + headers["Content-Type"] = "application/json" + + _kwargs["headers"] = headers + return _kwargs + + + +def _parse_response(*, client: AuthenticatedClient | Client, response: httpx.Response) -> ErrorResponse | InsufficientBalanceError | OperationResponse | None: + if response.status_code == 200: + response_200 = OperationResponse.from_dict(response.json()) + + + + return response_200 + + if response.status_code == 402: + response_402 = InsufficientBalanceError.from_dict(response.json()) + + + + return response_402 + + if response.status_code == 422: + response_422 = ErrorResponse.from_dict(response.json()) + + + + return response_422 + + if client.raise_on_unexpected_status: + raise errors.UnexpectedStatus(response.status_code, response.content) + else: + return None + + +def _build_response(*, client: AuthenticatedClient | Client, response: httpx.Response) -> Response[ErrorResponse | InsufficientBalanceError | OperationResponse]: + return Response( + status_code=HTTPStatus(response.status_code), + content=response.content, + headers=response.headers, + parsed=_parse_response(client=client, response=response), + ) + + +def sync_detailed( + *, + client: AuthenticatedClient, + body: ForecastOperation, + x_cohort_source: None | str | Unset = UNSET, + +) -> Response[ErrorResponse | InsufficientBalanceError | OperationResponse]: + """ AI-powered probability forecast + + Run 6 parallel research agents per row, then synthesize into a probability forecast with rationale. + + Args: + x_cohort_source (None | str | Unset): + body (ForecastOperation): + + Raises: + errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True. + httpx.TimeoutException: If the request takes longer than Client.timeout. + + Returns: + Response[ErrorResponse | InsufficientBalanceError | OperationResponse] + """ + + + kwargs = _get_kwargs( + body=body, +x_cohort_source=x_cohort_source, + + ) + + response = client.get_httpx_client().request( + **kwargs, + ) + + return _build_response(client=client, response=response) + +def sync( + *, + client: AuthenticatedClient, + body: ForecastOperation, + x_cohort_source: None | str | Unset = UNSET, + +) -> ErrorResponse | InsufficientBalanceError | OperationResponse | None: + """ AI-powered probability forecast + + Run 6 parallel research agents per row, then synthesize into a probability forecast with rationale. + + Args: + x_cohort_source (None | str | Unset): + body (ForecastOperation): + + Raises: + errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True. + httpx.TimeoutException: If the request takes longer than Client.timeout. + + Returns: + ErrorResponse | InsufficientBalanceError | OperationResponse + """ + + + return sync_detailed( + client=client, +body=body, +x_cohort_source=x_cohort_source, + + ).parsed + +async def asyncio_detailed( + *, + client: AuthenticatedClient, + body: ForecastOperation, + x_cohort_source: None | str | Unset = UNSET, + +) -> Response[ErrorResponse | InsufficientBalanceError | OperationResponse]: + """ AI-powered probability forecast + + Run 6 parallel research agents per row, then synthesize into a probability forecast with rationale. + + Args: + x_cohort_source (None | str | Unset): + body (ForecastOperation): + + Raises: + errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True. + httpx.TimeoutException: If the request takes longer than Client.timeout. + + Returns: + Response[ErrorResponse | InsufficientBalanceError | OperationResponse] + """ + + + kwargs = _get_kwargs( + body=body, +x_cohort_source=x_cohort_source, + + ) + + response = await client.get_async_httpx_client().request( + **kwargs + ) + + return _build_response(client=client, response=response) + +async def asyncio( + *, + client: AuthenticatedClient, + body: ForecastOperation, + x_cohort_source: None | str | Unset = UNSET, + +) -> ErrorResponse | InsufficientBalanceError | OperationResponse | None: + """ AI-powered probability forecast + + Run 6 parallel research agents per row, then synthesize into a probability forecast with rationale. + + Args: + x_cohort_source (None | str | Unset): + body (ForecastOperation): + + Raises: + errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True. + httpx.TimeoutException: If the request takes longer than Client.timeout. + + Returns: + ErrorResponse | InsufficientBalanceError | OperationResponse + """ + + + return (await asyncio_detailed( + client=client, +body=body, +x_cohort_source=x_cohort_source, + + )).parsed diff --git a/src/everyrow/generated/models/__init__.py b/src/everyrow/generated/models/__init__.py index 311717c4..70dbc540 100644 --- a/src/everyrow/generated/models/__init__.py +++ b/src/everyrow/generated/models/__init__.py @@ -16,6 +16,9 @@ from .dedupe_operation_strategy import DedupeOperationStrategy from .error_response import ErrorResponse from .error_response_details_type_0 import ErrorResponseDetailsType0 +from .forecast_operation import ForecastOperation +from .forecast_operation_input_type_1_item import ForecastOperationInputType1Item +from .forecast_operation_input_type_2 import ForecastOperationInputType2 from .health_response import HealthResponse from .http_validation_error import HTTPValidationError from .insufficient_balance_error import InsufficientBalanceError @@ -70,6 +73,9 @@ "DedupeOperationStrategy", "ErrorResponse", "ErrorResponseDetailsType0", + "ForecastOperation", + "ForecastOperationInputType1Item", + "ForecastOperationInputType2", "HealthResponse", "HTTPValidationError", "InsufficientBalanceError", diff --git a/src/everyrow/generated/models/forecast_operation.py b/src/everyrow/generated/models/forecast_operation.py new file mode 100644 index 00000000..e24eeafe --- /dev/null +++ b/src/everyrow/generated/models/forecast_operation.py @@ -0,0 +1,177 @@ +from __future__ import annotations + +from collections.abc import Mapping +from typing import Any, TypeVar, BinaryIO, TextIO, TYPE_CHECKING, Generator + +from attrs import define as _attrs_define +from attrs import field as _attrs_field + +from ..types import UNSET, Unset + +from ..types import UNSET, Unset +from typing import cast +from uuid import UUID + +if TYPE_CHECKING: + from ..models.forecast_operation_input_type_1_item import ForecastOperationInputType1Item + from ..models.forecast_operation_input_type_2 import ForecastOperationInputType2 + + + + + +T = TypeVar("T", bound="ForecastOperation") + + + +@_attrs_define +class ForecastOperation: + """ + Attributes: + input_ (ForecastOperationInputType2 | list[ForecastOperationInputType1Item] | UUID): The input data as a) the ID + of an existing artifact, b) a single record in the form of a JSON object, or c) a table of records in the form + of a list of JSON objects + task (str): Overall context or instructions for the forecast. Each row in the input should contain the + question/scenario to forecast. + session_id (None | Unset | UUID): Session ID. If not provided, a new session is auto-created for this task. + """ + + input_: ForecastOperationInputType2 | list[ForecastOperationInputType1Item] | UUID + task: str + session_id: None | Unset | UUID = UNSET + additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict) + + + + + + def to_dict(self) -> dict[str, Any]: + from ..models.forecast_operation_input_type_2 import ForecastOperationInputType2 + from ..models.forecast_operation_input_type_1_item import ForecastOperationInputType1Item + input_: dict[str, Any] | list[dict[str, Any]] | str + if isinstance(self.input_, UUID): + input_ = str(self.input_) + elif isinstance(self.input_, list): + input_ = [] + for input_type_1_item_data in self.input_: + input_type_1_item = input_type_1_item_data.to_dict() + input_.append(input_type_1_item) + + + else: + input_ = self.input_.to_dict() + + + task = self.task + + session_id: None | str | Unset + if isinstance(self.session_id, Unset): + session_id = UNSET + elif isinstance(self.session_id, UUID): + session_id = str(self.session_id) + else: + session_id = self.session_id + + + field_dict: dict[str, Any] = {} + field_dict.update(self.additional_properties) + field_dict.update({ + "input": input_, + "task": task, + }) + if session_id is not UNSET: + field_dict["session_id"] = session_id + + return field_dict + + + + @classmethod + def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T: + from ..models.forecast_operation_input_type_1_item import ForecastOperationInputType1Item + from ..models.forecast_operation_input_type_2 import ForecastOperationInputType2 + d = dict(src_dict) + def _parse_input_(data: object) -> ForecastOperationInputType2 | list[ForecastOperationInputType1Item] | UUID: + try: + if not isinstance(data, str): + raise TypeError() + input_type_0 = UUID(data) + + + + return input_type_0 + except (TypeError, ValueError, AttributeError, KeyError): + pass + try: + if not isinstance(data, list): + raise TypeError() + input_type_1 = [] + _input_type_1 = data + for input_type_1_item_data in (_input_type_1): + input_type_1_item = ForecastOperationInputType1Item.from_dict(input_type_1_item_data) + + + + input_type_1.append(input_type_1_item) + + return input_type_1 + except (TypeError, ValueError, AttributeError, KeyError): + pass + if not isinstance(data, dict): + raise TypeError() + input_type_2 = ForecastOperationInputType2.from_dict(data) + + + + return input_type_2 + + input_ = _parse_input_(d.pop("input")) + + + task = d.pop("task") + + def _parse_session_id(data: object) -> None | Unset | UUID: + if data is None: + return data + if isinstance(data, Unset): + return data + try: + if not isinstance(data, str): + raise TypeError() + session_id_type_0 = UUID(data) + + + + return session_id_type_0 + except (TypeError, ValueError, AttributeError, KeyError): + pass + return cast(None | Unset | UUID, data) + + session_id = _parse_session_id(d.pop("session_id", UNSET)) + + + forecast_operation = cls( + input_=input_, + task=task, + session_id=session_id, + ) + + + forecast_operation.additional_properties = d + return forecast_operation + + @property + def additional_keys(self) -> list[str]: + return list(self.additional_properties.keys()) + + def __getitem__(self, key: str) -> Any: + return self.additional_properties[key] + + def __setitem__(self, key: str, value: Any) -> None: + self.additional_properties[key] = value + + def __delitem__(self, key: str) -> None: + del self.additional_properties[key] + + def __contains__(self, key: str) -> bool: + return key in self.additional_properties diff --git a/src/everyrow/generated/models/forecast_operation_input_type_1_item.py b/src/everyrow/generated/models/forecast_operation_input_type_1_item.py new file mode 100644 index 00000000..44d2ea5d --- /dev/null +++ b/src/everyrow/generated/models/forecast_operation_input_type_1_item.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +from collections.abc import Mapping +from typing import Any, TypeVar, BinaryIO, TextIO, TYPE_CHECKING, Generator + +from attrs import define as _attrs_define +from attrs import field as _attrs_field + +from ..types import UNSET, Unset + + + + + + + +T = TypeVar("T", bound="ForecastOperationInputType1Item") + + + +@_attrs_define +class ForecastOperationInputType1Item: + """ + """ + + additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict) + + + + + + def to_dict(self) -> dict[str, Any]: + + field_dict: dict[str, Any] = {} + field_dict.update(self.additional_properties) + + return field_dict + + + + @classmethod + def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T: + d = dict(src_dict) + forecast_operation_input_type_1_item = cls( + ) + + + forecast_operation_input_type_1_item.additional_properties = d + return forecast_operation_input_type_1_item + + @property + def additional_keys(self) -> list[str]: + return list(self.additional_properties.keys()) + + def __getitem__(self, key: str) -> Any: + return self.additional_properties[key] + + def __setitem__(self, key: str, value: Any) -> None: + self.additional_properties[key] = value + + def __delitem__(self, key: str) -> None: + del self.additional_properties[key] + + def __contains__(self, key: str) -> bool: + return key in self.additional_properties diff --git a/src/everyrow/generated/models/forecast_operation_input_type_2.py b/src/everyrow/generated/models/forecast_operation_input_type_2.py new file mode 100644 index 00000000..798673b2 --- /dev/null +++ b/src/everyrow/generated/models/forecast_operation_input_type_2.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +from collections.abc import Mapping +from typing import Any, TypeVar, BinaryIO, TextIO, TYPE_CHECKING, Generator + +from attrs import define as _attrs_define +from attrs import field as _attrs_field + +from ..types import UNSET, Unset + + + + + + + +T = TypeVar("T", bound="ForecastOperationInputType2") + + + +@_attrs_define +class ForecastOperationInputType2: + """ + """ + + additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict) + + + + + + def to_dict(self) -> dict[str, Any]: + + field_dict: dict[str, Any] = {} + field_dict.update(self.additional_properties) + + return field_dict + + + + @classmethod + def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T: + d = dict(src_dict) + forecast_operation_input_type_2 = cls( + ) + + + forecast_operation_input_type_2.additional_properties = d + return forecast_operation_input_type_2 + + @property + def additional_keys(self) -> list[str]: + return list(self.additional_properties.keys()) + + def __getitem__(self, key: str) -> Any: + return self.additional_properties[key] + + def __setitem__(self, key: str, value: Any) -> None: + self.additional_properties[key] = value + + def __delitem__(self, key: str) -> None: + del self.additional_properties[key] + + def __contains__(self, key: str) -> bool: + return key in self.additional_properties diff --git a/src/everyrow/generated/models/public_task_type.py b/src/everyrow/generated/models/public_task_type.py index 35a86786..2246f06a 100644 --- a/src/everyrow/generated/models/public_task_type.py +++ b/src/everyrow/generated/models/public_task_type.py @@ -4,6 +4,7 @@ class PublicTaskType(str, Enum): AGENT = "agent" DEDUPE = "dedupe" + FORECAST = "forecast" MERGE = "merge" RANK = "rank" SCREEN = "screen" diff --git a/src/everyrow/ops.py b/src/everyrow/ops.py index f6e69de1..0f3ba2e0 100644 --- a/src/everyrow/ops.py +++ b/src/everyrow/ops.py @@ -11,6 +11,7 @@ from everyrow.generated.api.operations import ( agent_map_operations_agent_map_post, dedupe_operations_dedupe_post, + forecast_operations_forecast_post, merge_operations_merge_post, rank_operations_rank_post, screen_operations_screen_post, @@ -26,6 +27,8 @@ DedupeOperation, DedupeOperationInputType1Item, DedupeOperationStrategy, + ForecastOperation, + ForecastOperationInputType1Item, LLMEnumPublic, MergeOperation, MergeOperationLeftInputType1Item, @@ -45,7 +48,7 @@ from everyrow.generated.types import UNSET from everyrow.result import MergeResult, Result, ScalarResult, TableResult from everyrow.session import Session, create_session -from everyrow.task import LLM, EffortLevel, EveryrowTask, MergeTask +from everyrow.task import LLM, EffortLevel, EveryrowTask, MergeTask, print_progress T = TypeVar("T", bound=BaseModel) InputData = UUID | list[dict[str, Any]] | dict[str, Any] @@ -765,3 +768,88 @@ async def dedupe_async( cohort_task = EveryrowTask(response_model=BaseModel, is_map=True, is_expand=False) cohort_task.set_submitted(response.task_id, response.session_id, session.client) return cohort_task + + +# --- Forecast --- + + +async def forecast( + input: DataFrame | UUID | TableResult, + context: str | None = None, + session: Session | None = None, +) -> TableResult: + """Forecast the probability of binary questions resolving YES or NO. + + Each row is forecast using an approach validated against FutureSearch's + past-casting environment of 1500 hard forecasting questions and 15M research + documents, see more at https://futuresearch.ai/automating-forecasting-questions/ + and https://arxiv.org/abs/2506.21558. + + The input table should contain at minimum a ``question`` column with the binary + question to forecast. Recommended additional columns: ``resolution_criteria``, + ``resolution_date``, ``background``. All columns are passed to the research + agents and forecasters. + + Args: + input: The input table. Each row should contain the question/scenario to + forecast. + context: Optional batch-level context or instructions that apply to every + row (e.g. "Focus on EU regulatory sources" or "Assume resolution by + end of 2027"). Leave *None* when the rows are self-contained. + session: Optional session. If not provided, one will be created automatically. + + Returns: + TableResult with ``probability`` (int, 0-100) and ``rationale`` (str) columns + added to each input row. + """ + task = context or "" + if session is None: + async with create_session() as internal_session: + cohort_task = await forecast_async( + task=task, + session=internal_session, + input=input, + ) + result = await cohort_task.await_result(on_progress=print_progress) + if isinstance(result, TableResult): + return result + raise EveryrowError("Forecast task did not return a table result") + cohort_task = await forecast_async( + task=task, + session=session, + input=input, + ) + result = await cohort_task.await_result(on_progress=print_progress) + if isinstance(result, TableResult): + return result + raise EveryrowError("Forecast task did not return a table result") + + +async def forecast_async( + task: str, + session: Session, + input: DataFrame | UUID | TableResult, +) -> EveryrowTask[BaseModel]: + """Submit a forecast task asynchronously. + + Returns: + EveryrowTask that resolves to a TableResult with `probability` and `rationale` columns. + """ + input_data = _prepare_table_input(input, ForecastOperationInputType1Item) + + body = ForecastOperation( + input_=input_data, # type: ignore + task=task, + session_id=session.session_id, + ) + + response = await forecast_operations_forecast_post.asyncio( + client=session.client, body=body + ) + response = handle_response(response) + + cohort_task: EveryrowTask[BaseModel] = EveryrowTask( + response_model=BaseModel, is_map=True, is_expand=False + ) + cohort_task.set_submitted(response.task_id, response.session_id, session.client) + return cohort_task diff --git a/tests/integration/test_forecast.py b/tests/integration/test_forecast.py new file mode 100644 index 00000000..b5f4b06e --- /dev/null +++ b/tests/integration/test_forecast.py @@ -0,0 +1,38 @@ +"""Integration tests for forecast operation.""" + +import pandas as pd +import pytest + +from everyrow.ops import forecast +from everyrow.result import TableResult + +pytestmark = [pytest.mark.integration, pytest.mark.asyncio] + + +async def test_forecast_returns_probability_and_rationale(): + """Test that forecast returns a TableResult with probability and rationale.""" + input_df = pd.DataFrame( + [ + { + "question": "Will the US Federal Reserve cut rates by at least 25bp before July 1, 2027?", + "resolution_criteria": "Resolves YES if the Fed announces at least one rate cut of 25bp or more at any FOMC meeting between now and June 30, 2027.", + }, + ] + ) + + result = await forecast(input=input_df) + + assert isinstance(result, TableResult) + assert result.artifact_id is not None + assert "probability" in result.data.columns + assert "rationale" in result.data.columns + assert len(result.data) == 1 + + prob = result.data["probability"].iloc[0] + rationale = result.data["rationale"].iloc[0] + + assert int(prob) == prob, f"Probability should be integer, got {prob}" + assert 3 <= prob <= 97, f"Probability {prob}% outside reasonable range [3, 97]" + assert len(str(rationale)) > 200, ( + f"Rationale too short: {len(str(rationale))} chars" + )