Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions everyrow-mcp/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@
{
"name": "everyrow_balance",
"description": "Check the current billing balance for the authenticated user."
},
{
"name": "everyrow_browse_lists",
"description": "Browse available reference lists of well-known entities."
},
{
"name": "everyrow_use_list",
"description": "Import a reference list into your session and save it as a CSV file."
}
],
"user_config": {
Expand Down
26 changes: 26 additions & 0 deletions everyrow-mcp/src/everyrow_mcp/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,32 @@ def _validate_task_id(v: str) -> str:
return v


class BrowseListsInput(BaseModel):
    """Arguments for the ``everyrow_browse_lists`` tool.

    Both filters are optional and default to ``None``. Unknown keys are
    rejected (``extra="forbid"``) and string values have surrounding
    whitespace stripped before validation.
    """

    model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)

    # Optional free-text filter on list names.
    search: str | None = Field(
        None,
        description="Search term to match against list names (case-insensitive).",
    )
    # Optional category filter.
    category: str | None = Field(
        None,
        description="Filter by category (e.g. 'Finance', 'Geography').",
    )


class UseListInput(BaseModel):
    """Arguments for the ``everyrow_use_list`` tool.

    Unknown keys are rejected (``extra="forbid"``) and string values have
    surrounding whitespace stripped before validation.
    """

    model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)

    # Required (no default). Kept as a plain string at this layer; the tool
    # converts it to a UUID when it is used.
    artifact_id: str = Field(
        description="artifact_id from everyrow_browse_lists results.",
    )


class ProgressInput(BaseModel):
"""Input for checking task progress."""

Expand Down
121 changes: 121 additions & 0 deletions everyrow-mcp/src/everyrow_mcp/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import pandas as pd
from everyrow.api_utils import handle_response
from everyrow.built_in_lists import list_built_in_datasets, use_built_in_list
from everyrow.constants import EveryrowError
from everyrow.generated.api.billing import get_billing_balance_billing_get
from everyrow.generated.api.tasks import get_task_status_tasks_task_id_status_get
Expand All @@ -35,6 +36,7 @@
from everyrow_mcp.config import settings
from everyrow_mcp.models import (
AgentInput,
BrowseListsInput,
CancelInput,
DedupeInput,
ForecastInput,
Expand All @@ -47,6 +49,7 @@
SingleAgentInput,
StdioResultsInput,
UploadDataInput,
UseListInput,
_schema_to_model,
)
from everyrow_mcp.result_store import (
Expand Down Expand Up @@ -102,6 +105,124 @@ async def _check_task_ownership(task_id: str) -> list[TextContent] | None:
return None


@mcp.tool(
    name="everyrow_browse_lists",
    structured_output=False,
    annotations=ToolAnnotations(
        title="Browse Reference Lists",
        readOnlyHint=True,
        destructiveHint=False,
        idempotentHint=True,
        openWorldHint=False,
    ),
)
async def everyrow_browse_lists(
    params: BrowseListsInput, ctx: EveryRowContext
) -> list[TextContent]:
    """Browse available reference lists of well-known entities.

    Includes company lists (S&P 500, FTSE 100, Russell 3000, sector breakdowns
    like Global Banks or Semiconductor companies), geographic lists (all countries,
    EU members, US states, major cities), people (billionaires, heads of state,
    AI leaders), institutions (top universities, regulators), and infrastructure
    (airports, ports, power stations).

    Use this when the user's analysis involves a well-known group that we might
    already have a list for. Returns names, fields, and artifact_ids to pass to
    everyrow_use_list.

    Call with no parameters to see all available lists, or use search/category
    to narrow results.
    """
    # NOTE(review): the docstring above is left verbatim because it is
    # presumably surfaced to MCP clients as the tool description — confirm
    # before rewording it.

    def _reply(message: str) -> list[TextContent]:
        # Every outcome of this tool is a single text content item.
        return [TextContent(type="text", text=message)]

    def _render(position: int, entry) -> str:
        # One numbered, multi-line entry per list; a falsy ``fields`` value
        # is shown as a placeholder instead of an empty string.
        if entry.fields:
            field_names = ", ".join(entry.fields)
        else:
            field_names = "(no fields listed)"
        return (
            f"{position}. {entry.name} [{entry.category}]\n"
            f" Fields: {field_names}\n"
            f" artifact_id: {entry.artifact_id}\n"
        )

    api_client = _get_client(ctx)

    # Any lookup failure is reported back as text rather than raised.
    try:
        found = await list_built_in_datasets(
            api_client, search=params.search, category=params.category
        )
    except Exception as exc:
        return _reply(f"Error browsing built-in lists: {exc!r}")

    if not found:
        # Echo only the filters that were actually supplied.
        search_part = f" matching '{params.search}'" if params.search else ""
        category_part = f" in category '{params.category}'" if params.category else ""
        return _reply(f"No built-in lists found{search_part}{category_part}.")

    sections = [
        f"Found {len(found)} built-in list(s):\n",
        *(_render(position, entry) for position, entry in enumerate(found, 1)),
        "To use one of these lists, call everyrow_use_list with the artifact_id.",
    ]
    return _reply("\n".join(sections))


@mcp.tool(
    name="everyrow_use_list",
    structured_output=False,
    annotations=ToolAnnotations(
        title="Import Reference List",
        readOnlyHint=False,
        destructiveHint=False,
        idempotentHint=False,
        openWorldHint=False,
    ),
)
async def everyrow_use_list(
    params: UseListInput, ctx: EveryRowContext
) -> list[TextContent]:
    """Import a reference list into your session and save it as a CSV file.

    This copies the dataset into a new session, fetches the data, and saves
    it as a CSV file ready to pass to other everyrow utilities for analysis
    or research.

    The copy is a fast database operation (<1s) — no polling needed.
    """
    # NOTE(review): the docstring above is left verbatim because it is
    # presumably surfaced to MCP clients as the tool description — confirm
    # before rewording it.
    client = _get_client(ctx)

    # Any failure below (bad id, API error, fetch error, file write error) is
    # reported back to the caller as text rather than raised.
    try:
        # Parse the id before opening a session so that a malformed value
        # fails without first creating a session that would never be used.
        source_artifact_id = UUID(params.artifact_id)

        async with create_session(client=client) as session:
            session_url = session.get_url()
            result = await use_built_in_list(
                artifact_id=source_artifact_id,
                session=session,
            )

            # Fetch the copied data and save as CSV
            df, _ = await _fetch_task_result(client, str(result.task_id))

            # The file is named after the artifact_id returned by the copy
            # and written to the current working directory.
            csv_path = Path.cwd() / f"built-in-list-{result.artifact_id}.csv"
            df.to_csv(csv_path, index=False)
    except Exception as e:
        return [TextContent(type="text", text=f"Error importing built-in list: {e!r}")]

    # Column labels are not guaranteed to be strings; convert them so that
    # building the summary cannot raise after the import already succeeded.
    column_names = ", ".join(map(str, df.columns))

    return [
        TextContent(
            type="text",
            text=(
                "Imported built-in list into your session.\n\n"
                f"CSV saved to: {csv_path}\n"
                f"Rows: {len(df)}\n"
                f"Columns: {column_names}\n"
                f"Session: {session_url}\n\n"
                f"Pass {csv_path} as input_csv to other everyrow utilities for analysis or research."
            ),
        )
    ]


@mcp.tool(
name="everyrow_agent",
structured_output=False,
Expand Down
2 changes: 2 additions & 0 deletions everyrow-mcp/tests/test_mcp_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ async def test_list_tools(self, _http_state):
[
"everyrow_agent",
"everyrow_balance",
"everyrow_browse_lists",
"everyrow_cancel",
"everyrow_dedupe",
"everyrow_forecast",
Expand All @@ -185,6 +186,7 @@ async def test_list_tools(self, _http_state):
"everyrow_screen",
"everyrow_single_agent",
"everyrow_upload_data",
"everyrow_use_list",
]
)
assert tool_names == expected
Expand Down
104 changes: 104 additions & 0 deletions src/everyrow/built_in_lists.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
"""Built-in lists: browse and import pre-built datasets."""

from dataclasses import dataclass
from uuid import UUID

from everyrow.constants import EveryrowError
from everyrow.generated.client import AuthenticatedClient
from everyrow.session import Session


@dataclass
class BuiltInListItem:
    """One entry in the catalogue of built-in datasets.

    Attributes:
        name: Name of the list.
        artifact_id: Identifier to pass when importing the list.
        category: Category label the list is filed under.
        fields: Names of the fields the list provides.
    """

    name: str
    artifact_id: UUID
    category: str
    fields: list[str]


@dataclass
class UseBuiltInListResult:
    """Identifiers returned after a built-in list is copied into a session.

    Attributes:
        artifact_id: Identifier of the new artifact created by the copy.
        session_id: Identifier of the session the list was copied into.
        task_id: Identifier of the task associated with the copy.
    """

    artifact_id: UUID
    session_id: UUID
    task_id: UUID


async def list_built_in_datasets(
    client: AuthenticatedClient,
    search: str | None = None,
    category: str | None = None,
) -> list[BuiltInListItem]:
    """Retrieve the catalogue of built-in datasets from the API.

    Args:
        client: Authenticated API client used to issue the request.
        search: Optional search term to match against list names
            (case-insensitive).
        category: Optional category filter.

    Returns:
        One ``BuiltInListItem`` per entry under the response's ``"lists"``
        key; an empty list when that key is absent.

    Raises:
        EveryrowError: If the API answers with any status other than 200.
    """
    # Only truthy filters are sent: both None and "" are left out of the
    # query string.
    query: dict[str, str] = {
        key: value
        for key, value in (("search", search), ("category", category))
        if value
    }

    http = client.get_async_httpx_client()
    resp = await http.request(method="GET", url="/built-in-lists", params=query)
    if resp.status_code != 200:
        raise EveryrowError(f"Failed to list built-in datasets: {resp.text}")

    payload = resp.json()
    raw_entries = payload.get("lists", [])
    return [
        BuiltInListItem(
            name=raw["name"],
            # The API supplies the id as a string; convert it to a UUID.
            artifact_id=UUID(raw["artifact_id"]),
            category=raw["category"],
            fields=raw["fields"],
        )
        for raw in raw_entries
    ]


async def use_built_in_list(
    artifact_id: UUID,
    session: Session,
    session_id: UUID | None = None,
) -> UseBuiltInListResult:
    """Copy a built-in list into a session so it can be used in operations.

    Args:
        artifact_id: The artifact_id from browse results.
        session: Session object; supplies the HTTP client and the default
            session id.
        session_id: Optional override for the target session. When not
            given, ``session.session_id`` is used.

    Returns:
        UseBuiltInListResult carrying the artifact_id, session_id and
        task_id parsed from the API response.

    Raises:
        EveryrowError: If the API answers with any status other than 200.
    """
    # Both ids are serialised as strings in the JSON request body.
    payload = {
        "artifact_id": str(artifact_id),
        "session_id": str(session_id or session.session_id),
    }

    http = session.client.get_async_httpx_client()
    resp = await http.request(method="POST", url="/built-in-lists/use", json=payload)
    if resp.status_code != 200:
        raise EveryrowError(f"Failed to use built-in list: {resp.text}")

    data = resp.json()
    # Parse the three identifiers in the same order the result declares them.
    new_artifact_id, new_session_id, new_task_id = (
        UUID(data[key]) for key in ("artifact_id", "session_id", "task_id")
    )
    return UseBuiltInListResult(
        artifact_id=new_artifact_id,
        session_id=new_session_id,
        task_id=new_task_id,
    )