Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion everyrow-mcp/src/everyrow_mcp/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ async def everyrow_browse_lists(
Call with no parameters to see all available lists, or use search/category
to narrow results.
"""
logger.info(
"everyrow_browse_lists: search=%s category=%s",
params.search,
params.category,
)
client = _get_client(ctx)

try:
Expand All @@ -156,6 +161,7 @@ async def everyrow_browse_lists(
)
]

logger.info("everyrow_browse_lists: found %d list(s)", len(results))
lines = [f"Found {len(results)} built-in list(s):\n"]
for i, item in enumerate(results, 1):
fields_str = ", ".join(item.fields) if item.fields else "(no fields listed)"
Expand Down Expand Up @@ -193,6 +199,7 @@ async def everyrow_use_list(

The copy is a fast database operation (<1s) — no polling needed.
"""
logger.info("everyrow_use_list: artifact_id=%s", params.artifact_id)
client = _get_client(ctx)

try:
Expand All @@ -204,13 +211,18 @@ async def everyrow_use_list(
)

# Fetch the copied data and save as CSV
df, _ = await _fetch_task_result(client, str(result.task_id))
df, _, _ = await _fetch_task_result(client, str(result.task_id))

csv_path = Path.cwd() / f"built-in-list-{result.artifact_id}.csv"
df.to_csv(csv_path, index=False)
except Exception as e:
return [TextContent(type="text", text=f"Error importing built-in list: {e!r}")]

logger.info(
"everyrow_use_list: imported artifact_id=%s rows=%d",
result.artifact_id,
len(df),
)
return [
TextContent(
type="text",
Expand Down Expand Up @@ -993,6 +1005,7 @@ async def everyrow_progress(
unless the task is completed or failed. The tool handles pacing internally.
Do not add commentary between progress calls, just call again immediately.
"""
logger.debug("everyrow_progress: task_id=%s", params.task_id)
client = _get_client(ctx)
task_id = params.task_id

Expand Down Expand Up @@ -1033,6 +1046,9 @@ async def everyrow_progress(
ts = TaskState(status_response)
ts.write_file(task_id)

if ts.is_terminal:
logger.info("everyrow_progress: task_id=%s status=%s", task_id, ts.status.value)

return [TextContent(type="text", text=ts.progress_message(task_id))]


Expand All @@ -1044,6 +1060,7 @@ async def everyrow_results_stdio(
Only call this after everyrow_progress reports status 'completed'.
Pass output_path (ending in .csv) to save results as a local CSV file.
"""
logger.info("everyrow_results (stdio): task_id=%s", params.task_id)
client = _get_client(ctx)
task_id = params.task_id

Expand Down Expand Up @@ -1092,6 +1109,12 @@ async def everyrow_results_http(
controls how many rows _you_ can read.
After results load, tell the user how many rows you can see vs the total.
"""
logger.info(
"everyrow_results (http): task_id=%s offset=%s page_size=%s",
params.task_id,
params.offset,
params.page_size,
)
client = _get_client(ctx)
task_id = params.task_id
mcp_server_url = ctx.request_context.lifespan_context.mcp_server_url
Expand Down Expand Up @@ -1186,6 +1209,11 @@ async def everyrow_list_sessions(
Use this to find past sessions or check what's been run.
Results are paginated — 25 sessions per page by default.
"""
logger.info(
"everyrow_list_sessions: offset=%s limit=%s",
params.offset,
params.limit,
)
log_client_info(ctx, "everyrow_list_sessions")
client = _get_client(ctx)

Expand Down Expand Up @@ -1251,6 +1279,7 @@ async def everyrow_balance(ctx: EveryRowContext) -> list[TextContent]:
Returns the account balance in dollars. Use this to verify available
credits before submitting tasks.
"""
logger.info("everyrow_balance: called")
client = _get_client(ctx)

try:
Expand All @@ -1266,6 +1295,7 @@ async def everyrow_balance(ctx: EveryRowContext) -> list[TextContent]:
)
]

logger.info("everyrow_balance: $%.2f", response.current_balance_dollars)
return [
TextContent(
type="text",
Expand Down Expand Up @@ -1293,6 +1323,7 @@ async def everyrow_list_session_tasks(
Use this to find task IDs for a session so you can display previous results
with mcp__display__show_task(task_id, label).
"""
logger.info("everyrow_list_session_tasks: session_id=%s", params.session_id)
client = _get_client(ctx)

try:
Expand Down Expand Up @@ -1341,6 +1372,7 @@ async def everyrow_cancel(
params: CancelInput, ctx: EveryRowContext
) -> list[TextContent]:
"""Cancel a running everyrow task. Use when the user wants to stop a task that is currently processing."""
logger.info("everyrow_cancel: task_id=%s", params.task_id)
log_client_info(ctx, "everyrow_cancel")
client = _get_client(ctx)
task_id = params.task_id
Expand Down
41 changes: 39 additions & 2 deletions everyrow-mcp/src/everyrow_mcp/uploads.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,13 @@ async def request_upload_url(

try:
engine_upload_url = data["upload_url"]
upload_id = data["upload_id"]
# Rewrite the URL to point at the MCP server instead of the Engine.
# The Claude.ai sandbox can reach the MCP server but not api.everyrow.ai.
upload_url = _rewrite_upload_url(engine_upload_url, mcp_server_url)
result = {
"upload_url": upload_url,
"upload_id": data["upload_id"],
"upload_id": upload_id,
"expires_in": data["expires_in"],
"max_size_bytes": data["max_size_bytes"],
"curl_command": f'curl -X PUT -H "Content-Type: text/csv" -T {shlex.quote(params.filename)} {shlex.quote(upload_url)}',
Expand All @@ -138,6 +139,12 @@ async def request_upload_url(
)
]

logger.info(
"Upload URL requested: upload_id=%s filename=%s expires_in=%s",
upload_id,
params.filename,
data.get("expires_in"),
)
return [TextContent(type="text", text=json.dumps(result))]


Expand Down Expand Up @@ -205,19 +212,49 @@ async def proxy_upload(request: Request) -> Response:
engine_url = f"{engine_url}?{request.url.query}"

body = await request.body()
size_bytes = len(body)
headers = {
k: v
for k, v in request.headers.items()
if k.lower() in ("content-type", "content-length")
}

logger.info(
"Upload proxy started: upload_id=%s size_bytes=%d",
upload_id,
size_bytes,
)

try:
async with httpx.AsyncClient(timeout=_PROXY_TIMEOUT) as http:
resp = await http.put(engine_url, content=body, headers=headers)
except httpx.HTTPError as exc:
logger.error("Upload proxy failed: %s", exc)
logger.error("Upload proxy failed: upload_id=%s error=%s", upload_id, exc)
return JSONResponse({"detail": "Upload proxy error"}, status_code=502)

if resp.status_code >= 400:
logger.warning(
"Upload proxy error response: upload_id=%s status=%d body=%s",
upload_id,
resp.status_code,
resp.text[:200],
)
else:
# Parse artifact_id from Engine response for traceability
artifact_id = None
try:
resp_data = resp.json()
artifact_id = resp_data.get("artifact_id")
except Exception:
pass
logger.info(
"Upload proxy completed: upload_id=%s status=%d artifact_id=%s size_bytes=%d",
upload_id,
resp.status_code,
artifact_id,
size_bytes,
)

return Response(
content=resp.content,
status_code=resp.status_code,
Expand Down