Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 69 additions & 24 deletions backend/app/adapters/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
import httpx

from backend.app.adapters.credentials import resolve_credentials
from backend.app.adapters.errors import ClusterUnreachableError, TargetNotFoundError
from backend.app.adapters.errors import (
ClusterUnreachableError,
TargetNotFoundError,
TargetsForbiddenError,
)
from backend.app.adapters.protocol import (
EngineType,
ExplainTree,
Expand Down Expand Up @@ -356,14 +360,41 @@ async def list_targets(self, *, request_id: str | None = None) -> list[TargetInf

System indices (those whose name starts with ``.``) are filtered out
so the operator sees only user-facing collections.

``translate_errors=False`` is used so the per-status mapping below can
distinguish ACL-restricted clusters (401/403 → ``TargetsForbiddenError``)
from unreachable clusters (5xx / connection failures →
``ClusterUnreachableError``). The frontend uses this distinction to
auto-engage manual-mode target entry on ACL restriction
(``feat_create_study_target_autocomplete`` FR-2 / FR-5).

Raises:
TargetsForbiddenError: when the cluster returns HTTP 401 or 403.
ClusterUnreachableError: connection failures (after the one
internal retry in ``_request``) or any non-2xx response other
than 401/403.
"""
resp = await self._request(
"GET",
"/_cat/indices",
params={"format": "json", "h": "index,docs.count"},
request_id=request_id,
)
resp.raise_for_status()
try:
resp = await self._request(
"GET",
"/_cat/indices",
params={"format": "json", "h": "index,docs.count"},
request_id=request_id,
translate_errors=False,
)
except httpx.HTTPError as exc:
# _request with translate_errors=False re-raises httpx
# connection-class exceptions (ConnectError, RemoteProtocolError,
# ConnectTimeout, ReadTimeout) AFTER its one internal retry. We
# translate here so the router emits 503 CLUSTER_UNREACHABLE
# instead of letting the raw exception surface as 500.
raise ClusterUnreachableError(str(exc)) from exc
if resp.status_code in (401, 403):
raise TargetsForbiddenError(
f"cluster denied listing call (HTTP {resp.status_code} from /_cat/indices)"
)
if resp.status_code >= 400:
raise ClusterUnreachableError(f"HTTP {resp.status_code} from /_cat/indices")
rows: list[dict[str, Any]] = resp.json()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The resp.json() call assumes the response is a list. If the API returns a dictionary or None, this could lead to runtime errors in the subsequent loop. Consider adding a check to ensure rows is a list.

out: list[TargetInfo] = []
for row in rows:
Expand Down Expand Up @@ -393,15 +424,23 @@ async def get_schema(self, target: str, *, request_id: str | None = None) -> Sch

Raises:
TargetNotFoundError: when the cluster returns 404 for ``target``.
ClusterUnreachableError: connection / auth / 5xx; raised inside
``_request`` because ``translate_errors=True`` (default).
ClusterUnreachableError: connection failures (after the one internal
retry in ``_request``) or any non-2xx response other than 404.
"""
mapping_resp = await self._request(
"GET",
f"/{target}/_mapping",
request_id=request_id,
translate_errors=False,
)
try:
mapping_resp = await self._request(
"GET",
f"/{target}/_mapping",
request_id=request_id,
translate_errors=False,
)
except httpx.HTTPError as exc:
# _request with translate_errors=False re-raises httpx
# connection-class exceptions AFTER its one internal retry.
# Translate to ClusterUnreachableError so the router emits 503
# CLUSTER_UNREACHABLE instead of 500 INTERNAL_ERROR. Mirrors the
# pattern adopted by list_targets in feat_create_study_target_autocomplete.
raise ClusterUnreachableError(str(exc)) from exc
if mapping_resp.status_code == 404:
raise TargetNotFoundError(target)
if mapping_resp.status_code in (401, 403) or mapping_resp.status_code >= 500:
Expand Down Expand Up @@ -620,15 +659,21 @@ async def explain(

Maps engine errors:
* 404 → ``TargetNotFoundError`` (target doesn't exist).
* Auth / 5xx → ``ClusterUnreachableError`` (raised by ``_request``).
* Auth / 5xx → ``ClusterUnreachableError``.
* Connection failures (after one internal retry in ``_request``) →
``ClusterUnreachableError`` via the defensive ``httpx.HTTPError``
catch (mirrors ``list_targets`` + ``get_schema``).
"""
resp = await self._request(
"POST",
f"/{target}/_explain/{doc_id}",
json=query.body,
request_id=request_id,
translate_errors=False,
)
try:
resp = await self._request(
"POST",
f"/{target}/_explain/{doc_id}",
json=query.body,
request_id=request_id,
translate_errors=False,
)
except httpx.HTTPError as exc:
raise ClusterUnreachableError(str(exc)) from exc
if resp.status_code == 404:
raise TargetNotFoundError(target)
if resp.status_code in (401, 403) or resp.status_code >= 500:
Expand Down
14 changes: 14 additions & 0 deletions backend/app/adapters/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
``CLUSTER_UNREACHABLE`` at the router (Story 2.1).
* ``TargetNotFoundError`` — index/collection 404 → 404 ``TARGET_NOT_FOUND``
at the router (Story 2.3 extension).
* ``TargetsForbiddenError`` — cluster denied the listing call (401/403 from
``_cat/indices``) → 403 ``TARGETS_FORBIDDEN`` at the router
(``feat_create_study_target_autocomplete`` Story B1). Distinct from
``ClusterUnreachableError`` so the frontend can route ACL-restricted
clusters to manual-mode input rather than retrying.
* ``InvalidQueryDSLError`` — per-query parse failure when the caller passes
``strict_errors=True`` (run_query API path) → 400 ``INVALID_QUERY_DSL``
at the router (Story 2.5 extension).
Expand Down Expand Up @@ -34,6 +39,15 @@ def __init__(self, target: str) -> None:
self.target = target


class TargetsForbiddenError(Exception):
    """Raised when the cluster refuses the target-listing call.

    Signals an HTTP 401 or 403 answer from ``_cat/indices``. The router
    turns it into 403 TARGETS_FORBIDDEN with ``retryable=false``: the
    refusal comes from the cluster's ACL, so repeating the call cannot
    succeed. The frontend reacts to this code by switching to manual-mode
    target entry.
    """


class InvalidQueryDSLError(Exception):
"""Engine rejected a query body as malformed (top-level 400 or per-query parse).

Expand Down
9 changes: 8 additions & 1 deletion backend/app/adapters/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,14 @@ async def health_check(self, *, request_id: str | None = None) -> HealthStatus:
...

async def list_targets(self, *, request_id: str | None = None) -> list[TargetInfo]:
    """List indices/collections on the cluster (excludes engine system indices).

    Concrete implementations raise ``TargetsForbiddenError`` when the engine
    denies the listing call due to ACL (401/403), and ``ClusterUnreachableError``
    for connection failures / 5xx. Mirrors ``get_schema``'s pattern of
    404 → ``TargetNotFoundError``: per-failure exception classes let the
    router translate to distinct ``error_code`` envelopes.

    Args:
        request_id: Optional identifier for the calling request
            (presumably used for log/trace correlation — confirm against
            the concrete adapters).

    Returns:
        One ``TargetInfo`` per user-facing index/collection.

    Raises:
        TargetsForbiddenError: the engine rejected the listing call (401/403).
        ClusterUnreachableError: connection failure or 5xx from the engine.
    """
    # Protocol stub: the single docstring above is the contract; adapters
    # supply the real implementation.
    ...

async def get_schema(self, target: str, *, request_id: str | None = None) -> Schema:
Expand Down
44 changes: 44 additions & 0 deletions backend/app/api/v1/clusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* ``GET /api/v1/clusters/{cluster_id}`` — detail
* ``DELETE /api/v1/clusters/{cluster_id}`` — soft-delete
* ``GET /api/v1/clusters/{cluster_id}/schema`` — schema introspection
* ``GET /api/v1/clusters/{cluster_id}/targets`` — list indices/collections
* ``POST /api/v1/clusters/{cluster_id}/run_query`` — ad-hoc DSL execution

Service exceptions are translated into the spec §7.5 error envelope here
Expand All @@ -31,6 +32,7 @@
InvalidQueryDSLError,
QueryTimeoutError,
TargetNotFoundError,
TargetsForbiddenError,
)
from backend.app.adapters.protocol import HealthStatus
from backend.app.adapters.protocol import Schema as AdapterSchema
Expand All @@ -47,6 +49,7 @@
RunQueryHit,
RunQueryRequest,
RunQueryResponse,
TargetListResponse,
)
from backend.app.db import repo
from backend.app.db.models import Cluster
Expand Down Expand Up @@ -315,6 +318,47 @@ async def get_cluster_schema(
raise _err(503, "CLUSTER_UNREACHABLE", str(exc), True) from exc


# ---------------------------------------------------------------------------
# feat_create_study_target_autocomplete Story B2 — Target list
# ---------------------------------------------------------------------------


@router.get(
    "/clusters/{cluster_id}/targets",
    response_model=TargetListResponse,
    tags=["clusters"],
)
async def list_cluster_targets(
    cluster_id: str,
    db: Annotated[AsyncSession, Depends(get_db)],
) -> TargetListResponse:
    """Return the targets (indices/collections) of one cluster (FR-1 / AC-1).

    The handler looks the cluster up via ``repo.get_cluster``, acquires an
    adapter through ``cluster_svc.acquire_adapter`` and forwards the result
    of ``adapter.list_targets()`` unchanged inside a ``TargetListResponse``.
    Failures are converted to the spec §7.5 envelope with ``_err()``, the
    same flow ``get_cluster_schema`` follows.

    Error mapping:
    * ``repo.get_cluster`` yields ``None`` (cluster missing or soft-deleted)
      → 404 ``CLUSTER_NOT_FOUND`` (retryable=false)
    * ``TargetsForbiddenError`` from the adapter (ACL 401/403) → 403
      ``TARGETS_FORBIDDEN`` (retryable=false); the frontend then switches to
      manual-mode target entry
    * ``ClusterUnreachable`` / ``ClusterUnreachableError`` (5xx or connection
      failure) → 503 ``CLUSTER_UNREACHABLE`` (retryable=true)
    """
    cluster_row = await repo.get_cluster(db, cluster_id)
    if cluster_row is None:
        not_found_msg = f"cluster {cluster_id} not found"
        raise _err(404, "CLUSTER_NOT_FOUND", not_found_msg, False)

    try:
        # The response is built while the adapter context is still open,
        # so adapter teardown errors pass through the handlers below too.
        async with cluster_svc.acquire_adapter(cluster_row) as engine:
            return TargetListResponse(data=await engine.list_targets())
    except TargetsForbiddenError as denied:
        raise _err(403, "TARGETS_FORBIDDEN", str(denied), False) from denied
    except (ClusterUnreachable, ClusterUnreachableError) as unreachable:
        raise _err(503, "CLUSTER_UNREACHABLE", str(unreachable), True) from unreachable


# ---------------------------------------------------------------------------
# Story 3.4 — Run-query
# ---------------------------------------------------------------------------
Expand Down
15 changes: 15 additions & 0 deletions backend/app/api/v1/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator

from backend.app.adapters.protocol import TargetInfo
from backend.app.core.settings import get_settings

EngineType = Literal["elasticsearch", "opensearch"]
Expand Down Expand Up @@ -127,6 +128,20 @@ class ClusterListResponse(BaseModel):
has_more: bool


class TargetListResponse(BaseModel):
    """Body returned by ``GET /api/v1/clusters/{cluster_id}/targets`` (FR-1).

    Deliberately unpaginated: ``data`` is the only field. The decision is
    recorded in feature_spec.md §7.1 "pagination shape rationale". The
    endpoint follows the single-resource lookup style of
    ``/clusters/{id}/schema`` rather than the queryable ``/clusters`` list.
    Because ``next_cursor`` and ``has_more`` are optional on
    ``EntitySelectListPage<T>``, the frontend can consume this bare shape
    without the endpoint pretending to be cursor-based.
    """

    # Every target reported by the adapter, in the order it returned them.
    data: list[TargetInfo]


class RunQueryRequest(BaseModel):
"""``POST /api/v1/clusters/{id}/run_query`` body."""

Expand Down
24 changes: 24 additions & 0 deletions backend/tests/contract/test_clusters_api_contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from __future__ import annotations

from backend.app.adapters.protocol import TargetInfo
from backend.app.api.v1.schemas import (
ClusterDetail,
ClusterListResponse,
Expand All @@ -21,6 +22,7 @@
HealthCheckResult,
RunQueryRequest,
RunQueryResponse,
TargetListResponse,
)


Expand All @@ -33,6 +35,28 @@ def test_response_schemas_importable() -> None:
assert HealthCheckResult is not None
assert RunQueryRequest is not None
assert RunQueryResponse is not None
assert TargetListResponse is not None


def test_target_list_response_is_bare_data_shape() -> None:
    """Lock ``TargetListResponse`` to the bare ``{ data: TargetInfo[] }`` shape.

    feat_create_study_target_autocomplete B2 / spec §7.1: the model must NOT
    declare ``next_cursor`` or ``has_more``. The rationale is in
    feature_spec.md §7.1 "pagination shape rationale": the cursor fields on
    EntitySelectListPage<T> are optional, so the frontend consumes the bare
    data shape without the endpoint pretending to be a cursor endpoint.
    """
    field_names = set(TargetListResponse.model_fields)
    assert field_names == {"data"}, f"unexpected extra fields: {field_names - {'data'}}"

    # Validate a happy-path wire payload and compare it with the typed rows.
    wire = {"data": [{"name": "products", "doc_count": 42}]}
    parsed = TargetListResponse.model_validate(wire)
    expected_rows = [TargetInfo(name="products", doc_count=42)]
    assert parsed.data == expected_rows

    # Dumping the model must reproduce the wire contract exactly.
    assert parsed.model_dump() == wire


def test_create_cluster_request_validates_scheme() -> None:
Expand Down
Loading
Loading