diff --git a/docs-site/scripts/check-links.py b/docs-site/scripts/check-links.py index 6a34da08..781c7a06 100644 --- a/docs-site/scripts/check-links.py +++ b/docs-site/scripts/check-links.py @@ -70,6 +70,7 @@ "https://pip.pypa.io/en/stable/", "https://www.kaggle.com/code/rafaelpoyiadzi/active-learning-with-an-llm-oracle", "https://www.kaggle.com/datasets/tunguz/pubmed-title-abstracts-2019-baseline", + 'https://arxiv.org/abs/2506.21558' } diff --git a/docs/getting-started.md b/docs/getting-started.md index 96bb6f70..bf2309a7 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -113,6 +113,9 @@ async with create_session(name="Background Ranking") as session: # Wait for result when ready result = await task.await_result() + + # Or cancel if no longer needed + await task.cancel() ``` **Print the task ID.** If your script crashes, recover the result later: diff --git a/docs/mcp-server.md b/docs/mcp-server.md index 4e1a3319..c5c5b3bc 100644 --- a/docs/mcp-server.md +++ b/docs/mcp-server.md @@ -77,6 +77,18 @@ Run web research agents on each row. Returns `task_id` and `session_url`. Call `everyrow_progress` to monitor. +### everyrow_single_agent + +Run a single web research agent on a question, without a CSV. Use this when you want to research one thing — the agent can search the web, read pages, and return structured results. + +| Parameter | Type | Required | Description | +|-----------|------|----------|-------------| +| `task` | string | Yes | Natural language task for the agent to perform. | +| `input_data` | object | No | Optional context as key-value pairs (e.g. `{"company": "Acme", "url": "acme.com"}`). | +| `response_schema` | object | No | JSON schema for structured output. Default: `{"type": "object", "properties": {"answer": {"type": "string"}}}`. | + +Returns `task_id` and `session_url`. Call `everyrow_progress` to monitor. 
+ ## Progress and Results Tools ### everyrow_progress @@ -100,6 +112,16 @@ Retrieve results from a completed task and save to CSV. Returns confirmation with row count and file path. +### everyrow_cancel + +Cancel a running task. Use when the user wants to stop a task that is currently processing. + +| Parameter | Type | Required | Description | +|-----------|------|----------|-------------| +| `task_id` | string | Yes | The task ID to cancel. | + +Returns a confirmation message. If the task has already finished, returns an error with its current state. + ## Workflow ``` diff --git a/everyrow-mcp/manifest.json b/everyrow-mcp/manifest.json index a5d0a6c5..e187a396 100644 --- a/everyrow-mcp/manifest.json +++ b/everyrow-mcp/manifest.json @@ -60,6 +60,10 @@ { "name": "everyrow_results", "description": "Retrieve results from a completed everyrow task and save them to a CSV." + }, + { + "name": "everyrow_cancel", + "description": "Cancel a running everyrow task. Use when the user wants to stop a task that is currently processing." 
class CancelInput(BaseModel):
    """Input for cancelling a running task."""

    # Surrounding whitespace on string fields is stripped, and unknown fields
    # are rejected rather than silently ignored.
    model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)

    # Required: no default is supplied. Kept as a plain string here; the
    # conversion to a UUID happens in the tool that consumes this model.
    task_id: str = Field(description="The task ID to cancel.")
@mcp.tool(
    name="everyrow_cancel",
    structured_output=False,
    annotations=ToolAnnotations(
        title="Cancel a Running Task",
        readOnlyHint=False,
        destructiveHint=True,
        idempotentHint=False,
        openWorldHint=False,
    ),
)
async def everyrow_cancel(params: CancelInput) -> list[TextContent]:
    """Cancel a running everyrow task. Use when the user wants to stop a task that is currently processing."""
    # Guard clause: without an initialized API client there is nothing to call.
    api_client = _app._client
    if api_client is None:
        return [TextContent(type="text", text="Error: MCP server not initialized.")]

    task_id = params.task_id
    try:
        # UUID() raises ValueError for a malformed ID; that is reported through
        # the generic error path below, like any other unexpected failure.
        await cancel_task(task_id=UUID(task_id), client=api_client)
        _clear_task_state()
        reply = f"Cancelled task {task_id}."
    except Exception as exc:
        # An EveryrowError is an answer from the API about this task, so the
        # locally tracked task state is cleared. Any other exception leaves
        # the tracked state untouched.
        if isinstance(exc, EveryrowError):
            _clear_task_state()
        reply = f"Error cancelling task {task_id}: {exc!r}"

    return [TextContent(type="text", text=reply)]
class TestCancel:
    """Behavioural tests for the ``everyrow_cancel`` MCP tool."""

    @pytest.mark.asyncio
    async def test_cancel_running_task(self):
        """A successful cancel names the task and clears the tracked state."""
        tid = str(uuid4())
        fake_client = _make_mock_client()

        with (
            patch("everyrow_mcp.app._client", fake_client),
            patch("everyrow_mcp.tools._clear_task_state") as clear_state,
            patch(
                "everyrow_mcp.tools.cancel_task", new_callable=AsyncMock
            ) as cancel_mock,
        ):
            cancel_mock.return_value = None

            response = await everyrow_cancel(CancelInput(task_id=tid))

            message = response[0].text
            assert tid in message
            assert "cancelled" in message.lower()
            clear_state.assert_called_once()

    @pytest.mark.asyncio
    async def test_cancel_already_terminated_task(self):
        """An EveryrowError for a finished task still clears state and reports an error."""
        tid = str(uuid4())
        fake_client = _make_mock_client()

        with (
            patch("everyrow_mcp.app._client", fake_client),
            patch("everyrow_mcp.tools._clear_task_state") as clear_state,
            patch(
                "everyrow_mcp.tools.cancel_task", new_callable=AsyncMock
            ) as cancel_mock,
        ):
            cancel_mock.side_effect = EveryrowError(
                f"Task {tid} is already COMPLETED"
            )

            response = await everyrow_cancel(CancelInput(task_id=tid))

            message = response[0].text
            assert tid in message
            assert "Error" in message
            clear_state.assert_called_once()

    @pytest.mark.asyncio
    async def test_cancel_task_not_found(self):
        """An EveryrowError for an unknown task clears state and reports an error."""
        tid = str(uuid4())
        fake_client = _make_mock_client()

        with (
            patch("everyrow_mcp.app._client", fake_client),
            patch("everyrow_mcp.tools._clear_task_state") as clear_state,
            patch(
                "everyrow_mcp.tools.cancel_task", new_callable=AsyncMock
            ) as cancel_mock,
        ):
            cancel_mock.side_effect = EveryrowError("Task not found")

            response = await everyrow_cancel(CancelInput(task_id=tid))

            message = response[0].text
            assert "Error" in message
            assert "not found" in message.lower()
            clear_state.assert_called_once()

    @pytest.mark.asyncio
    async def test_cancel_api_error(self):
        """A non-Everyrow exception is surfaced in the returned error text."""
        tid = str(uuid4())
        fake_client = _make_mock_client()

        with (
            patch("everyrow_mcp.app._client", fake_client),
            patch(
                "everyrow_mcp.tools.cancel_task", new_callable=AsyncMock
            ) as cancel_mock,
        ):
            cancel_mock.side_effect = RuntimeError("Network failure")

            response = await everyrow_cancel(CancelInput(task_id=tid))

            message = response[0].text
            assert "Error" in message
            assert "Network failure" in message

    @pytest.mark.asyncio
    async def test_cancel_without_client(self):
        """With no client configured the tool reports that the server is not initialized."""
        with patch("everyrow_mcp.app._client", None):
            response = await everyrow_cancel(CancelInput(task_id=str(uuid4())))

            assert "not initialized" in response[0].text

    def test_cancel_input_validation(self):
        """CancelInput trims surrounding whitespace and rejects unknown fields."""
        # Whitespace stripping
        model = CancelInput(task_id=" abc-123 ")
        assert model.task_id == "abc-123"

        # Extra fields forbidden
        with pytest.raises(ValidationError):
            CancelInput(task_id="abc", extra_field="x")  # type: ignore[call-arg]
def _get_kwargs(
    task_id: UUID,
) -> dict[str, Any]:
    """Build the request arguments for ``POST /tasks/{task_id}/cancel``."""
    # Percent-encode every reserved character (safe="") before the ID is
    # placed in the URL path.
    encoded_id = quote(str(task_id), safe="")
    return {
        "method": "post",
        "url": f"/tasks/{encoded_id}/cancel",
    }


def _parse_response(
    *, client: AuthenticatedClient | Client, response: httpx.Response
) -> ErrorResponse | HTTPValidationError | None:
    """Map the HTTP status of a cancel response onto a parsed model.

    Returns ``None`` for 200, an ``ErrorResponse`` for 404 and 409, and an
    ``HTTPValidationError`` for 422. Any other status raises
    ``errors.UnexpectedStatus`` when ``client.raise_on_unexpected_status`` is
    set, and otherwise yields ``None``.
    """
    status = response.status_code

    if status == 200:
        # Cancel response is a minimal payload — success is signalled by the 200 status alone.
        return None

    if status in (404, 409):
        return ErrorResponse.from_dict(response.json())

    if status == 422:
        return HTTPValidationError.from_dict(response.json())

    if client.raise_on_unexpected_status:
        raise errors.UnexpectedStatus(status, response.content)
    return None


def _build_response(
    *, client: AuthenticatedClient | Client, response: httpx.Response
) -> Response[ErrorResponse | HTTPValidationError | None]:
    """Wrap a raw httpx response, together with its parsed body, in ``Response``."""
    # The status conversion runs before parsing, so a status code that
    # HTTPStatus rejects fails here before _parse_response is reached.
    status = HTTPStatus(response.status_code)
    body = response.content
    headers = response.headers
    parsed = _parse_response(client=client, response=response)
    return Response(
        status_code=status,
        content=body,
        headers=headers,
        parsed=parsed,
    )


def sync_detailed(
    task_id: UUID,
    *,
    client: AuthenticatedClient,
) -> Response[ErrorResponse | HTTPValidationError | None]:
    """Cancel a running task

    Cancel a task by its ID. Returns HTTP 200 on success, 404 if not found,
    or 409 if the task is already in a terminal state.

    Args:
        task_id (UUID):

    Raises:
        errors.UnexpectedStatus: If the server returns an undocumented status code and Client.raise_on_unexpected_status is True.
        httpx.TimeoutException: If the request takes longer than Client.timeout.

    Returns:
        Response[ErrorResponse | HTTPValidationError | None]
    """
    request_kwargs = _get_kwargs(task_id=task_id)
    http_client = client.get_httpx_client()
    raw_response = http_client.request(**request_kwargs)
    return _build_response(client=client, response=raw_response)
async def cancel_task(task_id: UUID, client: AuthenticatedClient) -> None:
    """Cancel a running task by its ID.

    Sends the cancel request and treats HTTP 200 as the only success signal.
    Every other outcome is reported as an ``EveryrowError``.

    Args:
        task_id: The UUID of the task to cancel.
        client: An authenticated client.

    Raises:
        EveryrowError: If the task is not found, already in a terminal state, or another error occurs.
    """
    response = await cancel_task_tasks_task_id_cancel_post.asyncio_detailed(
        task_id=task_id, client=client
    )
    if response.status_code == 200:
        return

    # Documented failures (404, 409, 422) carry a parsed error model.
    # NOTE(review): handle_response is presumed to raise EveryrowError for
    # those models — confirm against everyrow.api_utils.
    handle_response(response.parsed)

    # Reaching this line means handle_response returned without raising, e.g.
    # for an undocumented status whose parsed body is None. Falling off the end
    # here would report a failed cancellation as a success, so fail explicitly.
    raise EveryrowError(
        f"Failed to cancel task {task_id}: "
        f"unexpected HTTP status {int(response.status_code)}"
    )