Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs-site/scripts/check-links.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
"https://pip.pypa.io/en/stable/",
"https://www.kaggle.com/code/rafaelpoyiadzi/active-learning-with-an-llm-oracle",
"https://www.kaggle.com/datasets/tunguz/pubmed-title-abstracts-2019-baseline",
'https://arxiv.org/abs/2506.21558'
}


Expand Down
3 changes: 3 additions & 0 deletions docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ async with create_session(name="Background Ranking") as session:

# Wait for result when ready
result = await task.await_result()

# Or cancel if no longer needed
await task.cancel()
```

**Print the task ID.** If your script crashes, recover the result later:
Expand Down
22 changes: 22 additions & 0 deletions docs/mcp-server.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ Run web research agents on each row.

Returns `task_id` and `session_url`. Call `everyrow_progress` to monitor.

### everyrow_single_agent

Run a single web research agent on a question, without a CSV. Use this when you want to research one thing — the agent can search the web, read pages, and return structured results.

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `task` | string | Yes | Natural language task for the agent to perform. |
| `input_data` | object | No | Optional context as key-value pairs (e.g. `{"company": "Acme", "url": "acme.com"}`). |
| `response_schema` | object | No | JSON schema for structured output. Default: `{"type": "object", "properties": {"answer": {"type": "string"}}}`. |

Returns `task_id` and `session_url`. Call `everyrow_progress` to monitor.

## Progress and Results Tools

### everyrow_progress
Expand All @@ -100,6 +112,16 @@ Retrieve results from a completed task and save to CSV.

Returns confirmation with row count and file path.

### everyrow_cancel

Cancel a running task. Use when the user wants to stop a task that is currently processing.

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `task_id` | string | Yes | The task ID to cancel. |

Returns a confirmation message. If the task has already finished, returns an error with its current state.

## Workflow

```
Expand Down
4 changes: 4 additions & 0 deletions everyrow-mcp/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@
{
"name": "everyrow_results",
"description": "Retrieve results from a completed everyrow task and save them to a CSV."
},
{
"name": "everyrow_cancel",
"description": "Cancel a running everyrow task. Use when the user wants to stop a task that is currently processing."
}
],
"user_config": {
Expand Down
8 changes: 8 additions & 0 deletions everyrow-mcp/src/everyrow_mcp/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,11 @@ class ResultsInput(BaseModel):
def validate_output(cls, v: str) -> str:
validate_csv_output_path(v)
return v


class CancelInput(BaseModel):
"""Input for cancelling a running task."""

model_config = ConfigDict(str_strip_whitespace=True, extra="forbid")

task_id: str = Field(..., description="The task ID to cancel.")
2 changes: 2 additions & 0 deletions everyrow-mcp/src/everyrow_mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
)
from everyrow_mcp.models import ( # noqa: F401
AgentInput,
CancelInput,
DedupeInput,
MergeInput,
ProgressInput,
Expand All @@ -27,6 +28,7 @@
)
from everyrow_mcp.tools import ( # noqa: F401
everyrow_agent,
everyrow_cancel,
everyrow_dedupe,
everyrow_merge,
everyrow_progress,
Expand Down
47 changes: 47 additions & 0 deletions everyrow-mcp/src/everyrow_mcp/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import pandas as pd
from everyrow.api_utils import handle_response
from everyrow.constants import EveryrowError
from everyrow.generated.api.tasks import (
get_task_result_tasks_task_id_result_get,
get_task_status_tasks_task_id_status_get,
Expand All @@ -27,6 +28,7 @@
single_agent_async,
)
from everyrow.session import create_session, get_session_url
from everyrow.task import cancel_task
from mcp.types import TextContent, ToolAnnotations
from pydantic import BaseModel, create_model

Expand All @@ -39,6 +41,7 @@
)
from everyrow_mcp.models import (
AgentInput,
CancelInput,
DedupeInput,
MergeInput,
ProgressInput,
Expand Down Expand Up @@ -749,3 +752,47 @@ async def everyrow_results(params: ResultsInput) -> list[TextContent]:

except Exception as e:
return [TextContent(type="text", text=f"Error retrieving results: {e!r}")]


@mcp.tool(
name="everyrow_cancel",
structured_output=False,
annotations=ToolAnnotations(
title="Cancel a Running Task",
readOnlyHint=False,
destructiveHint=True,
idempotentHint=False,
openWorldHint=False,
),
)
async def everyrow_cancel(params: CancelInput) -> list[TextContent]:
"""Cancel a running everyrow task. Use when the user wants to stop a task that is currently processing."""
if _app._client is None:
return [TextContent(type="text", text="Error: MCP server not initialized.")]
client = _app._client

task_id = params.task_id
try:
await cancel_task(task_id=UUID(task_id), client=client)
_clear_task_state()
return [
TextContent(
type="text",
text=f"Cancelled task {task_id}.",
)
]
except EveryrowError as e:
_clear_task_state()
return [
TextContent(
type="text",
text=f"Error cancelling task {task_id}: {e!r}",
)
]
except Exception as e:
return [
TextContent(
type="text",
text=f"Error cancelling task {task_id}: {e!r}",
)
]
Comment on lines +792 to +798
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The generic except Exception block in everyrow_cancel fails to call _clear_task_state(), leading to stale state on unexpected API errors.
Severity: MEDIUM

Suggested Fix

Modify the generic except Exception block in the everyrow_cancel function to also call _clear_task_state(). This will ensure that the task state is consistently cleared after any cancellation attempt, regardless of whether the API call succeeds, fails with a known error, or fails with an unexpected error.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: everyrow-mcp/src/everyrow_mcp/tools.py#L792-L798

Potential issue: When the `cancel_task` API call receives an unexpected HTTP status
(e.g., 500, 503), the underlying client raises an `errors.UnexpectedStatus` exception.
This exception is not an `EveryrowError` and is caught by the generic `except Exception`
block in the `everyrow_cancel` tool. However, this exception handler does not call
`_clear_task_state()`, which is inconsistent with the handling of `EveryrowError`. This
results in the task state file (`~/.everyrow/task.json`) not being cleared, leaving
stale data and causing potential confusion about the active task's status.

120 changes: 120 additions & 0 deletions everyrow-mcp/tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import pandas as pd
import pytest
from everyrow.constants import EveryrowError
from everyrow.generated.models.public_task_type import PublicTaskType
from everyrow.generated.models.task_progress_info import TaskProgressInfo
from everyrow.generated.models.task_result_response import TaskResultResponse
Expand All @@ -24,6 +25,7 @@

from everyrow_mcp.server import (
AgentInput,
CancelInput,
MergeInput,
ProgressInput,
RankInput,
Expand All @@ -32,6 +34,7 @@
SingleAgentInput,
_schema_to_model,
everyrow_agent,
everyrow_cancel,
everyrow_progress,
everyrow_results,
everyrow_single_agent,
Expand Down Expand Up @@ -616,3 +619,120 @@ async def test_results_saves_csv(self, tmp_path: Path):
output_df = pd.read_csv(output_file)
assert len(output_df) == 2
assert list(output_df.columns) == ["name", "answer"]


class TestCancel:
"""Tests for everyrow_cancel."""

@pytest.mark.asyncio
async def test_cancel_running_task(self):
"""Test cancelling a running task returns success message."""
mock_client = _make_mock_client()
task_id = str(uuid4())

with (
patch("everyrow_mcp.app._client", mock_client),
patch("everyrow_mcp.tools._clear_task_state") as mock_clear,
patch(
"everyrow_mcp.tools.cancel_task", new_callable=AsyncMock
) as mock_cancel,
):
mock_cancel.return_value = None

params = CancelInput(task_id=task_id)
result = await everyrow_cancel(params)

text = result[0].text
assert task_id in text
assert "cancelled" in text.lower()
mock_clear.assert_called_once()

@pytest.mark.asyncio
async def test_cancel_already_terminated_task(self):
"""Test cancelling an already terminated task clears state and returns an error message."""

mock_client = _make_mock_client()
task_id = str(uuid4())

with (
patch("everyrow_mcp.app._client", mock_client),
patch("everyrow_mcp.tools._clear_task_state") as mock_clear,
patch(
"everyrow_mcp.tools.cancel_task", new_callable=AsyncMock
) as mock_cancel,
):
mock_cancel.side_effect = EveryrowError(
f"Task {task_id} is already COMPLETED"
)

params = CancelInput(task_id=task_id)
result = await everyrow_cancel(params)

text = result[0].text
assert task_id in text
assert "Error" in text
mock_clear.assert_called_once()

@pytest.mark.asyncio
async def test_cancel_task_not_found(self):
"""Test cancelling a nonexistent task clears state and returns an error message."""

mock_client = _make_mock_client()
task_id = str(uuid4())

with (
patch("everyrow_mcp.app._client", mock_client),
patch("everyrow_mcp.tools._clear_task_state") as mock_clear,
patch(
"everyrow_mcp.tools.cancel_task", new_callable=AsyncMock
) as mock_cancel,
):
mock_cancel.side_effect = EveryrowError("Task not found")

params = CancelInput(task_id=task_id)
result = await everyrow_cancel(params)

text = result[0].text
assert "Error" in text
assert "not found" in text.lower()
mock_clear.assert_called_once()

@pytest.mark.asyncio
async def test_cancel_api_error(self):
"""Test cancel with unexpected error returns error message."""
mock_client = _make_mock_client()
task_id = str(uuid4())

with (
patch("everyrow_mcp.app._client", mock_client),
patch(
"everyrow_mcp.tools.cancel_task", new_callable=AsyncMock
) as mock_cancel,
):
mock_cancel.side_effect = RuntimeError("Network failure")

params = CancelInput(task_id=task_id)
result = await everyrow_cancel(params)

text = result[0].text
assert "Error" in text
assert "Network failure" in text

@pytest.mark.asyncio
async def test_cancel_without_client(self):
"""Test cancel when MCP server is not initialized."""
with patch("everyrow_mcp.app._client", None):
params = CancelInput(task_id=str(uuid4()))
result = await everyrow_cancel(params)

assert "not initialized" in result[0].text

def test_cancel_input_validation(self):
"""Test CancelInput strips whitespace and forbids extra fields."""
# Whitespace stripping
inp = CancelInput(task_id=" abc-123 ")
assert inp.task_id == "abc-123"

# Extra fields forbidden
with pytest.raises(ValidationError):
CancelInput(task_id="abc", extra_field="x") # type: ignore[call-arg]
Loading