BigQuery datastore backend: real CRUD + Frictionless schemas + lifespan health#1

Open
sagargg wants to merge 22 commits into main from feat/bigquery-backend

Member

@sagargg sagargg commented May 20, 2026

Summary

Replaces the BigQuery engine placeholder with a working backend covering all six datastore_* actions, plus the surrounding wiring (Frictionless schemas as the canonical column shape, real /ready healthcheck, system columns).

Highlights

  • Engine: infrastructure/engines/bigquery/backend.py now implements datastore_create, upsert, delete, search, search_sql, info against real BigQuery. Split into client.py (auth/Client construction), lib.py (pure SQL helpers: DDL, MERGE, INSERT, UPDATE), metadata.py (_table_metadata row store for per-resource Frictionless schemas), types.py (Frictionless ↔ BigQuery type map + widening rules).
  • Frictionless schema as canonical shape: datastore_create accepts schema (Frictionless Table Schema); legacy fields + primary_key are auto-converted and emit DeprecationWarning. datastore_info / datastore_search / datastore_search_sql return both legacy fields and Frictionless schema so clients can migrate at their own pace.
  • DML, not streaming — INSERT / UPSERT / UPDATE all go through DML (INSERT INTO ... SELECT FROM UNNEST(JSON_QUERY_ARRAY(@rows)) / MERGE / UPDATE … FROM) to avoid BigQuery's 30–90 min streaming-buffer lockout that blocks follow-up MERGE/UPDATE on the same row.
  • System columns — every resource table gets _id (INT64, monotonically increasing via inlined (SELECT IFNULL(MAX(_id), 0) FROM tbl) + ROW_NUMBER()) and, behind Config.INCLUDE_UPDATED_AT (default on), _updated_at TIMESTAMP. MERGE only bumps _updated_at when a non-PK column actually differs (NULL-safe IS DISTINCT FROM, JSON canonicalised via TO_JSON_STRING).
  • Lifespan + /ready — engines are constructed once at startup and stashed on app.state; /ready calls engine.healthcheck() for both read and write engines and returns 503 when either fails.
  • Readable errors — BigQuery error messages are translated to CKAN-shaped ValidationError / ConflictError / ServerError (duplicate-PK on MERGE, bad-type casts, out-of-range, invalid date/timestamp, etc.).
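
As an illustration of the change-detection rule in the System columns bullet, here is a minimal sketch of how a NULL-safe "did any non-PK column differ" predicate could be rendered for a MERGE. The function name `change_predicate` and the `T` / `S` aliases are hypothetical and not taken from the PR's `lib.py`.

```python
from typing import Iterable, Sequence


def change_predicate(
    columns: Sequence[str],
    json_columns: Iterable[str] = (),
    target: str = "T",
    source: str = "S",
) -> str:
    """Render a NULL-safe 'any non-PK column differs' predicate (hypothetical helper)."""
    json_cols = set(json_columns)
    parts = []
    for col in columns:
        left, right = f"{target}.`{col}`", f"{source}.`{col}`"
        if col in json_cols:
            # JSON columns are compared by their canonical string form,
            # as the PR description states (TO_JSON_STRING).
            left, right = f"TO_JSON_STRING({left})", f"TO_JSON_STRING({right})"
        parts.append(f"{left} IS DISTINCT FROM {right}")
    # With no non-PK columns there is nothing that can change.
    return " OR ".join(parts) if parts else "FALSE"
```

A MERGE renderer could then use this string as the extra condition on its `WHEN MATCHED` branch so that `_updated_at` is only bumped on a real change.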

Layout (new files)

datastore/infrastructure/engines/bigquery/
  ├── backend.py        # DatastoreBackend impl
  ├── client.py         # bigquery.Client construction
  ├── lib.py            # pure SQL helpers (DDL, DML renderers, JSON extractors)
  ├── metadata.py       # MetadataStore Protocol + BigQuery impl
  ├── types.py          # Frictionless ↔ BigQuery type map + widening rules
  └── allowed_functions.txt

MetadataStore is a Protocol so future engines (DuckLake, Postgres) plug in without copying the backend.
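
A rough sketch of what such a Protocol might look like, with an in-memory stand-in to show that any class with the right methods satisfies it. The method names come from the commit message in this PR (`initialize` / `insert` / `update` / `get` / `delete`); the signatures and the `InMemoryMetadataStore` class are assumptions for illustration only.

```python
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class MetadataStore(Protocol):
    """Engine-agnostic store for per-resource schemas (signatures are assumed)."""

    def initialize(self) -> None: ...
    def insert(self, resource_id: str, schema: dict[str, Any]) -> None: ...
    def update(self, resource_id: str, schema: dict[str, Any]) -> None: ...
    def get(self, resource_id: str) -> Optional[dict[str, Any]]: ...
    def delete(self, resource_id: str) -> None: ...


class InMemoryMetadataStore:
    """Hypothetical implementation; it never subclasses the Protocol."""

    def __init__(self) -> None:
        self._rows: dict[str, dict[str, Any]] = {}

    def initialize(self) -> None:
        self._rows.clear()

    def insert(self, resource_id: str, schema: dict[str, Any]) -> None:
        if resource_id in self._rows:
            raise KeyError(f"metadata already exists for {resource_id}")
        self._rows[resource_id] = schema

    def update(self, resource_id: str, schema: dict[str, Any]) -> None:
        if resource_id not in self._rows:
            raise KeyError(f"no metadata for {resource_id}")
        self._rows[resource_id] = schema

    def get(self, resource_id: str) -> Optional[dict[str, Any]]:
        return self._rows.get(resource_id)

    def delete(self, resource_id: str) -> None:
        self._rows.pop(resource_id, None)
```

Structural typing is the point here: a backend can accept anything shaped like `MetadataStore` without importing a concrete engine class.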

Test plan

  • pytest — all 180 tests green (engine-level tables tests, metadata store tests, endpoint tests, write service units)
  • Layer arrow holds: rg "from (fastapi|starlette)" datastore/services datastore/infrastructure datastore/core returns nothing
  • Smoke test against a real BigQuery dataset:
    • datastore_create with Frictionless schema → table + _table_metadata row created
    • datastore_create with legacy fields + primary_key → deprecation warning, same result
    • datastore_upsert (method=upsert) — new rows inserted, matching rows updated only when non-PK columns differ, _updated_at advances only on real change
    • datastore_upsert (method=update) — NotFoundError when any PK is unmatched
    • datastore_search returns both fields and schema
    • datastore_search_sql blocks functions outside allowed_functions.txt
    • datastore_info round-trips the info data dictionary verbatim
    • Duplicate-PK insert surfaces as ValidationError, not a 500
    • Bad cast ("jk" into a number column) surfaces as ValidationError with the column name
    • /ready returns 503 when BigQuery credentials are bad
    • Config.INCLUDE_UPDATED_AT=false → table created without _updated_at; MERGE/UPDATE SQL omits the column

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Pluggable auth providers (ckan, jwt, anonymous) and BigQuery datastore backend
    • New endpoints: datastore_delete, datastore_info, and streaming datastore dump (csv/ndjson/parquet) with ETag caching
    • Frictionless Table Schema as canonical create/search shape; streaming/search now include canonical schema
  • Documentation

    • Updated README and docs for auth, BigQuery config, and API/usage changes
  • Tests

    • Expanded test suites covering endpoints, auth providers, BigQuery backend, pagination, and dump behavior

sagargg and others added 10 commits May 18, 2026 14:10
- New GET /api/3/action/datastore_info. Returns
  `result: { meta: dict, fields: [{...}] }` — `fields` is the column
  schema, `meta` is a free-form dict (engine populates whatever extras
  it has: row count, table size, etc.).
- Schema accepts either `resource_id` or `id`; model_validator requires
  at least one and normalises `id` → `resource_id` so downstream code
  reads a single field.
- New `InfoResult` dataclass on the engine ABC; `bigquery.info()` stub
  fixed (previously a no-op that returned None) — now returns a real
  InfoResult.
- Service `info_datastore` calls the read-only engine and reshapes
  into `DatastoreInfoResponse.Result`. Endpoint authorizes the caller
  (same gate as datastore_search) and uses the standard
  `_success_response` envelope — info responses are small, no streaming.
- Also: re-add `.gitignore` entry for the local-only test engine
  (the earlier follow-up commit was lost in a rebase / sync).
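
The `id` → `resource_id` normalisation described above can be sketched without any framework. The PR does this in a `model_validator`; the plain function below, including its name and error text, is an assumed stand-in that only shows the rule.

```python
from typing import Any


def normalise_resource_id(payload: dict[str, Any]) -> dict[str, Any]:
    """Require one of `resource_id` / `id` and expose only `resource_id`."""
    resource_id = payload.get("resource_id") or payload.get("id")
    if not resource_id:
        raise ValueError("one of 'resource_id' or 'id' is required")
    # Drop the alias so downstream code reads a single field.
    out = {key: value for key, value in payload.items() if key != "id"}
    out["resource_id"] = resource_id
    return out
```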
- New POST /api/3/action/datastore_delete. Body accepts `resource_id` /
  `id` (one required, normalised); optional `filters` (dict) — omit to
  drop the whole table; optional `force` for read-only resources.
- Response echoes back the resource + filters (CKAN convention), with
  `filters` excluded from the wire when omitted.
- Service `delete_datastore` calls the rw engine and reshapes into
  DatastoreDeleteResponse.Result. Engine placeholder fixed (was a
  no-op `{}` expression) — now returns a real WriteResult.
- Endpoint authorizes the caller with `permission="delete"` (same gate
  pattern as create / upsert).
…earch_sql

- CLAUDE.md + README.md: roadmap shows all six datastore_* actions now
  wired end-to-end; "Next" trimmed to BigQuery adapter + readiness +
  observability + DuckLake. §5.2 status table refreshed (no more 501
  stubs); §3 services tree lists read.py + streaming.py.
- example_payload/: new folders for datastore_delete (with_filters,
  whole_table, force_readonly), datastore_info (basic + id alias), and
  datastore_search_sql (basic, aggregate, with_cte). All use a UUID-shaped
  resource_id matching the real CKAN datastore table-naming convention.
- example_payload/README.md tree updated.
Engines are now built once during lifespan (`warmup_engines`) and probed
by `/ready` via `engine.healthcheck()` — 503 with a Service Unavailable
envelope if rw or ro fails. Registry imports a `Backend` alias from each
engine package so the concrete class name stays engine-private; adding
a backend remains "drop a folder". Renames BQ_* env vars to BIGQUERY_*
and splits credentials into rw / ro. Docs refreshed.
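
A framework-free sketch of the readiness decision, assuming each engine exposes a zero-argument health probe. The `readiness` function and the shape of its return value are hypothetical; the PR wires this into the `/ready` endpoint and a CKAN-style envelope.

```python
from typing import Callable, Mapping


def readiness(probes: Mapping[str, Callable[[], bool]]) -> tuple[int, list[str]]:
    """Return (HTTP status, failing modes) for a set of engine health probes."""
    failing = []
    for mode, probe in probes.items():
        try:
            ok = bool(probe())
        except Exception:
            # A probe that raises counts as unhealthy rather than crashing /ready.
            ok = False
        if not ok:
            failing.append(mode)
    return (503 if failing else 200), failing
```

Probing both `rw` and `ro` before answering means one bad credential is enough to take the instance out of rotation.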
… fields

Make Frictionless Table Schema the native column shape for
`datastore_create` while keeping the legacy CKAN `fields` + `primary_key`
input working end-to-end for back-compat.

Request accepts exactly one of `schema` or legacy `fields`; the validator
folds the legacy pair into a canonical Frictionless schema so the service
and engine only ever see one shape. Response surfaces both for callers on
either side of the migration — `schema` is canonical, `fields` /
`primary_key` are mirrors marked `deprecated` in OpenAPI / IDE tooltips.

When legacy input is used, the envelope grows a `warnings` array pulled
directly from `Field(deprecated=...)` on the model — single source of
truth for the warning text.

Engine `create()` ABC takes a single `schema: dict` (no more separate
`fields` / `unique_keys`); BigQuery + bigquery_test backends updated.

Postgres <-> Frictionless type maps added in `core/constants.py`;
conversion helpers in `schemas/validators.py`. README has a brief
"Column definitions" section explaining the migration goal, and
`example_payload/datastore_create/with_schema.json` provides a
ready-to-curl Frictionless payload.
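
To make the folding step concrete, here is a small sketch of converting the legacy pair into a Frictionless Table Schema while emitting a `DeprecationWarning`. The type map is an illustrative subset, the fallback to `string` is an assumption, and `fields_to_schema` is a hypothetical name; the PR's real maps and helpers live in `core/constants.py` and `schemas/validators.py`.

```python
import warnings
from typing import Any, Optional, Sequence, Union

# Illustrative subset only; not the PR's actual map.
POSTGRES_TO_FRICTIONLESS = {
    "text": "string",
    "int4": "integer",
    "int8": "integer",
    "float8": "number",
    "numeric": "number",
    "bool": "boolean",
    "date": "date",
    "timestamp": "datetime",
    "json": "object",
}


def fields_to_schema(
    fields: Sequence[dict[str, Any]],
    primary_key: Optional[Union[str, Sequence[str]]] = None,
) -> dict[str, Any]:
    """Fold legacy CKAN `fields` + `primary_key` into one Frictionless schema."""
    warnings.warn(
        "`fields` / `primary_key` are deprecated; send `schema` instead",
        DeprecationWarning,
        stacklevel=2,
    )
    schema: dict[str, Any] = {
        "fields": [
            {
                "name": field["id"],
                # Assumption: unknown legacy types fall back to "string".
                "type": POSTGRES_TO_FRICTIONLESS.get(field.get("type", "text"), "string"),
            }
            for field in fields
        ]
    }
    if primary_key:
        keys = [primary_key] if isinstance(primary_key, str) else list(primary_key)
        schema["primaryKey"] = keys
    return schema
```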

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…cy fields

Extend the same dual-shape pattern shipped for `datastore_create` to
`datastore_info`. The response now carries both `schema` (canonical
Frictionless Table Schema) and `fields` (legacy `{id, type, info}` list
marked `deprecated` in OpenAPI), so clients on either side of the
migration see what they expect.

Engine `InfoResult` grows a `schema: dict` field next to the existing
`fields` and `meta`. Both placeholder backends populate both shapes —
the local `bigquery_test` engine projects its `DEMO_SCHEMA` into the
Frictionless `{name, type}` form without disturbing `search`, which
still uses the legacy shape via the same `DEMO_SCHEMA` constant.

Service passes both through verbatim; tests pin the new envelope keys.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…gacy fields

Extend the dual-shape response pattern shipped for `datastore_create` /
`datastore_info` to the streaming endpoints `datastore_search` and
`datastore_search_sql`. The CKAN envelope now carries `schema` (canonical
Frictionless Table Schema, same shape `datastore_create` stores) ahead
of `fields` (legacy `{id, type}` list, marked `deprecated` in OpenAPI),
so clients on either side of the migration see what they expect.

Engine `SearchResult` swaps `fields` for `schema` — engines produce the
canonical shape, the service derives the legacy list via the existing
`frictionless_schema_to_fields` helper. Per the migration direction,
schema is what we'll persist; legacy fields is a derived view.

Streaming writers (objects / lists / csv / tsv) all thread the new
`schema` kwarg through `_stream_envelope` so it's emitted lazily into
the body before the `records` block — peak memory still ≈ 1 row.
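
The lazy-emission idea can be sketched with a generator that writes `schema` ahead of `records` and only ever holds one row. This is a simplified envelope (no `help`, `fields`, or pagination keys), and `stream_envelope` here is an illustrative function, not the PR's `_stream_envelope`.

```python
import json
from typing import Any, Iterable, Iterator


def stream_envelope(
    schema: dict[str, Any], records: Iterable[dict[str, Any]]
) -> Iterator[str]:
    """Yield a JSON envelope chunk by chunk, schema first, one row at a time."""
    yield '{"success": true, "result": {"schema": '
    yield json.dumps(schema)
    yield ', "records": ['
    first = True
    for row in records:
        # Only the current row is serialised, so peak memory stays near one row.
        yield ("" if first else ", ") + json.dumps(row)
        first = False
    yield "]}}"
```

Because the schema is known before the first row is fetched, it can go on the wire before the engine iterator is consumed.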

BigQuery placeholder + local `bigquery_test` backend updated. Test
mock-helper still accepts the legacy `{id, type}` shape for caller
convenience but converts to Frictionless before constructing
`SearchResult`. `test_datastore_search_sql` pins the new envelope key.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…e tables

First real BigQuery write path. `datastore_create` now persists the
declared Frictionless schema and creates / migrates the per-resource
data table.

Layout:
  - `metadata.py` — `BigQueryMetadataStore` implements the new
    engine-agnostic `MetadataStore` Protocol added to `engines/base.py`.
    Owns the `_table_metadata` table (resource_id, schema JSON,
    created_at, updated_at) with `initialize` / `insert` / `update` /
    `get` / `delete`. Future engines drop a sibling `metadata.py` to
    satisfy the same Protocol.
  - `types.py` — Frictionless → BigQuery type map + `can_widen` for
    `ALTER COLUMN SET DATA TYPE` rules (INT64 → NUMERIC/BIGNUMERIC/
    FLOAT64; DATE → DATETIME/TIMESTAMP; everything else rejected).
  - `lib.py` — pure helpers (schema diff, ALTER-clause rendering,
    JSON-column serialisation, error formatting).
  - `backend.py` — orchestration. `create()` runs DDL → records insert
    → metadata write in that order so any failure short-circuits before
    the metadata row is written; metadata never describes a state the
    table doesn't match. Two branch helpers (`_apply_new_resource` /
    `_apply_existing_resource`) make the per-path sequence explicit.
    `ALTER TABLE` packs added columns + supported type widenings into
    one atomic statement; unsupported transitions raise `ConflictError`
    with a recreate-the-resource hint up-front. Records are
    stream-inserted via `insert_rows_json`; `object` / `array` /
    `geojson` field values are JSON-encoded first since BigQuery's
    `JSON` type expects strings on the wire.
  - Every `client.query` and `client.insert_rows_json` call is funnelled
    through `_run_query` / `_run_insert_rows` so transport / SQL errors
    surface as `ServerError` with `op` + `resource_id` baked in — never
    as raw `google.api_core` exceptions. Metadata store does the same
    via its own `_run`.
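
The widening rules listed for `types.py` are small enough to sketch as a lookup. This mirrors only the transitions named in the commit message; the table name `WIDENINGS` is hypothetical, and the sketch assumes identical types are filtered out by the caller before `can_widen` is consulted.

```python
# Transitions named in the commit message; everything else is rejected.
WIDENINGS = {
    "INT64": frozenset({"NUMERIC", "BIGNUMERIC", "FLOAT64"}),
    "DATE": frozenset({"DATETIME", "TIMESTAMP"}),
}


def can_widen(old_type: str, new_type: str) -> bool:
    """True when old_type -> new_type is one of the listed widenings."""
    return new_type.upper() in WIDENINGS.get(old_type.upper(), frozenset())
```

A caller would raise `ConflictError` with the recreate-the-resource hint whenever this returns `False` for a changed column.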

Config: new `BIGQUERY_DATASET` env var (the dataset that holds
`_table_metadata` and all user data tables). Placeholder mode
(BIGQUERY_PROJECT or BIGQUERY_DATASET unset) keeps the unit suite
runnable without GCP creds — `create()` is a no-op echo there.

Tests: 46 new (`tests/test_bigquery_metadata.py` + `tests/test_bigquery_tables.py`)
covering SQL shape, parameter binding, the diff/widening rules, JSON
serialisation, atomicity (data-op failure must not write metadata), and
error wrapping at every layer.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…rror mapping

Wire the upsert action end-to-end with three methods on a single
backend method:

  - "upsert"  → MERGE keyed on schema.primaryKey (update existing,
                insert new).
  - "insert"  → DML INSERT INTO ... SELECT FROM UNNEST(@rows).
                Replaces the old streaming insert_rows_json — that API
                parks rows in BigQuery's streaming buffer for 30-90 min
                and silently blocks any follow-up MERGE/UPDATE on the
                same table. DML writes go straight to storage, so a
                datastore_create + immediate datastore_upsert now works.
  - "update"  → DML UPDATE keyed on primaryKey. Compares
                num_dml_affected_rows against input row count and
                raises NotFoundError if any row had no matching key
                (UPDATE silently no-ops on misses otherwise).
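
The "update" check amounts to comparing two integers after the DML job finishes. A minimal sketch, assuming the affected-row count has already been read from the job (it can be `None`); the `NotFoundError` class and `check_update_matched` helper below are stand-ins, not the PR's own definitions.

```python
from typing import Optional


class NotFoundError(Exception):
    """Stand-in for the project's NotFoundError."""


def check_update_matched(
    affected_rows: Optional[int], input_rows: int, resource_id: str
) -> None:
    """Raise when an UPDATE touched fewer rows than were submitted."""
    affected = affected_rows or 0
    if affected < input_rows:
        missing = input_rows - affected
        raise NotFoundError(
            f"{missing} of {input_rows} rows had no matching primary key "
            f"in resource {resource_id}"
        )
```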

Records ride as one JSON-array string parameter @rows for every path;
BigQuery unpacks via JSON_QUERY_ARRAY and type-aware extractors
(CAST(JSON_VALUE), PARSE_JSON(JSON_QUERY) for JSON columns). One
statement regardless of batch size.
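
A sketch of how the single-parameter INSERT could be rendered, with one extractor per column type. It assumes column names are simple identifiers that are safe inside a JSONPath, and it leaves out the `_id` / `_updated_at` system columns; `extractor` and `render_insert` are illustrative names rather than the PR's helpers.

```python
from typing import Sequence, Tuple


def extractor(name: str, bq_type: str) -> str:
    """Render the expression that pulls one column out of a JSON row string."""
    path = f"'$.{name}'"
    if bq_type == "JSON":
        # JSON columns keep their structure: take the sub-document and parse it.
        return f"PARSE_JSON(JSON_QUERY(r, {path}))"
    if bq_type == "STRING":
        return f"JSON_VALUE(r, {path})"
    # Scalars arrive as strings from JSON_VALUE and are cast to the column type.
    return f"CAST(JSON_VALUE(r, {path}) AS {bq_type})"


def render_insert(table_ref: str, columns: Sequence[Tuple[str, str]]) -> str:
    """Render one INSERT that unpacks every row from the @rows parameter."""
    names = ", ".join(f"`{name}`" for name, _ in columns)
    exprs = ", ".join(extractor(name, bq_type) for name, bq_type in columns)
    return (
        f"INSERT INTO `{table_ref}` ({names}) "
        f"SELECT {exprs} FROM UNNEST(JSON_QUERY_ARRAY(@rows)) AS r"
    )
```

The statement text depends only on the schema, so the same SQL serves a batch of one row or many.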

BigQuery raw errors are mapped to clear ValidationErrors:
  - "Scalar subquery produced more than one element" → "duplicated
    rows with the same primary key".
  - "Bad <type> value: <v>" / "Could not cast '<v>' to type <T>" /
    "Could not parse '<v>' as <T>" → "Value <v> is not a valid
    <integer|number|boolean|...>".
  - "Value out of range for <T>: <v>" → "out of range" message.
  - "Invalid <date|timestamp|datetime|time>: <v>" → typed message.
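
One way to picture this mapping is a short list of regex rules tried in order. The patterns below follow the message fragments quoted above, but the exact BigQuery wording, the type labels, and the `translate` helper are assumptions made for this sketch.

```python
import re
from typing import Optional

# Assumed labels for a few BigQuery type names.
TYPE_LABELS = {"INT64": "integer", "FLOAT64": "number", "BOOL": "boolean"}

RULES = [
    (
        re.compile(r"Scalar subquery produced more than one element"),
        lambda m: "duplicated rows with the same primary key",
    ),
    (
        re.compile(r"Could not cast '(?P<v>.*?)' to type (?P<t>\w+)"),
        lambda m: f"Value {m['v']} is not a valid "
        f"{TYPE_LABELS.get(m['t'], m['t'].lower())}",
    ),
    (
        re.compile(r"Value out of range for (?P<t>\w+): (?P<v>.+)"),
        lambda m: f"Value {m['v']} is out of range for {m['t']}",
    ),
    (
        re.compile(r"Invalid (?P<t>date|timestamp|datetime|time): (?P<v>.+)"),
        lambda m: f"Value {m['v']} is not a valid {m['t']}",
    ),
]


def translate(message: str) -> Optional[str]:
    """Return a readable message for a known BigQuery error, else None."""
    for pattern, render in RULES:
        match = pattern.search(message)
        if match:
            return render(match)
    return None
```

Returning `None` for unknown messages lets the caller fall back to a generic `ServerError` instead of guessing.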

Refactor: _run_query and _run_dml merged — _run_query now returns the
QueryJob so DML callers can grab num_dml_affected_rows without a
second helper. Module docstring + section markers updated. Per-row
JSON-column serialiser removed (handled inside SQL via PARSE_JSON).

Tests:
  - Test file restructured to one test per behaviour (~22 essentials,
    no duplicates).
  - New autouse conftest fixture clears BIGQUERY_PROJECT/DATASET/creds
    + resets engine cache so the suite stays hermetic regardless of a
    developer's .env. Restores the placeholder-echo branch for tests
    that predate the upsert wiring.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…E_UPDATED_AT

- Always inject `_id` INT64; assign via inlined `(SELECT MAX(_id) FROM tbl) + ROW_NUMBER()` so no extra round-trip.
- Add `_updated_at` TIMESTAMP behind `Config.INCLUDE_UPDATED_AT` (default on); MERGE bumps it only when a non-PK column actually differs (NULL-safe `IS DISTINCT FROM`, JSON canonicalised via `TO_JSON_STRING`).
- Trim verbose docstrings/comments in bigquery/lib.py; rename `user_*` locals to `data_*` / `insert_*`.
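
The inlined `_id` assignment can be sketched as a single SQL expression rendered from the table reference. The `IFNULL(..., 0)` wrapper follows the PR description; the `OVER ()` window and the `id_expression` helper name are assumptions of this sketch.

```python
def id_expression(table_ref: str) -> str:
    """Render an expression that continues _id from the current maximum."""
    # IFNULL covers the empty-table case, where MAX(_id) is NULL.
    return f"(SELECT IFNULL(MAX(_id), 0) FROM `{table_ref}`) + ROW_NUMBER() OVER ()"
```

Because the maximum is read inside the same statement that inserts, no separate lookup query is needed; two concurrent writers could still read the same maximum, which is a trade-off of this approach.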

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

coderabbitai Bot commented May 20, 2026

📝 Walkthrough

Canonical Frictionless Table Schema becomes the input/output contract; auth is pluggable (ckan, jwt, anonymous); a full BigQuery backend (client, metadata, SQL builders, DDL/DML, search/sql, dump) is implemented; endpoints, streaming, readiness, config/docs, registry wiring, Postman generation, and tests were added.

Changes

End-to-end datastore refactor and BigQuery backend

| Layer | File(s) | Summary |
|---|---|---|
| Canonical schema, validators and constants | `datastore/core/constants.py`, `datastore/schemas/validators.py`, `datastore/schemas/request.py`, `datastore/schemas/responses.py` | Introduces Frictionless schema canonical form, validators to convert legacy {fields, primary_key} ↔ frictionless schema, request validation, and response models exposing both canonical schema and deprecated legacy fields. |
| Auth provider model, registry, and providers | `datastore/auth/*`, `datastore/auth/registry.py`, `datastore/auth/base.py` | Adds AuthProvider protocol and Decision dataclass, provider registry, and three providers: CKAN (cached), JWT (local verification), and Anonymous. |
| API context, auth orchestration, and responses | `datastore/api/context.py`, `datastore/api/auth.py`, `datastore/api/responses.py`, `datastore/api/endpoints/*`, `datastore/api/routes.py` | Rewires request context to carry auth provider, centralizes provider-agnostic authorize boundary, updates endpoints (create/info/delete/search/sql/upsert/dump), and includes deprecation warnings in success envelopes. |
| Engine contracts and registry | `datastore/infrastructure/engines/base.py`, `datastore/infrastructure/engines/registry.py` | Tightens engine abstract API to use schema and typed records, adds InfoResult/MetadataStore, and caches/warmups engine instances for rw/ro modes. |
| BigQuery backend, client, SQL builders, metadata | `datastore/infrastructure/engines/bigquery/{client.py,types.py,lib.py,search.py,backend.py,metadata.py,__init__.py}` | Full BigQuery implementation: client construction, type mappings, SQL builders (search/count/insert/merge/update/delete/alter/drop/qualify), metadata store, error translation, search/sql/read/write/delete/info/dump, and Backend export alias. |
| Streaming and dump services | `datastore/services/streaming.py`, `datastore/services/dump.py`, `datastore/services/read.py`, `datastore/services/write.py` | Streaming writers include canonical schema in CKAN envelopes; dump service streams CSV/NDJSON/Parquet with coalescing and backpressure; read/write services derive legacy fields from schema, add info/delete actions, and pagination link logic. |
| Health, readiness, and main startup wiring | `datastore/api/endpoints/health.py`, `datastore/main.py` | Splits welcome vs probe routers; /ready probes rw and ro engine health and returns 503 CKAN-envelope when not ready; startup warmups engines and resets cache on shutdown. |
| Config, env, docs, Postman generator | `.env.example`, `datastore/core/config.py`, `README.md`, `CLAUDE.md`, `postman/*`, `pyproject.toml` | Replaces BQ env vars with BIGQUERY_*, replaces AUTH_ENABLED with AUTH_TYPE and JWT settings, updates docs and README, adds Postman generator, and adds pyjwt dependency. |
| Tests and fixtures | `tests/...` (many files), `tests/conftest.py` | Extensive test additions and updates: auth provider tests (anonymous/ckan/jwt), BigQuery engine unit tests, endpoint E2E tests for create/search/search_sql/info/delete/dump, and registry/health/dump/coalescing tests. |

Sequence Diagram(s)

sequenceDiagram
  rect rgba(220, 245, 255, 0.5)
  participant Client
  participant API
  end
  rect rgba(245, 235, 255, 0.5)
  participant AuthProvider
  participant CKAN as CKAN (optional)
  end
  rect rgba(235, 255, 235, 0.5)
  participant Services
  participant EngineRegistry
  participant BQ as BigQueryBackend
  end

  Client->>API: datastore_* request
  API->>AuthProvider: authorize(api_key, resource, permission)
  alt CKAN auth
    AuthProvider->>CKAN: datastore_authorize(...)
    CKAN-->>AuthProvider: decision(resource/package)
  end
  AuthProvider-->>API: Decision
  API->>Services: execute(action, data_dict, context)
  Services->>EngineRegistry: get_datastore_engine(ro/rw)
  EngineRegistry-->>Services: Backend (cached)
  Services->>BQ: create/search/upsert/info/delete/dump
  BQ-->>Services: Result (schema, records/meta)
  Services-->>API: Result + pagination/links
  API-->>Client: CKAN envelope (+warnings)

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Poem

A rabbit taps keys with a thump-thump cheer,
Frictionless fields now crystal-clear.
BigQuery burrows, swift and bright,
Auth swaps masks: ckan, jwt, or light.
Streams hum soft; readiness winks—hooray! 🐇✨

@sagargg sagargg changed the title from "Feat/bigquery backend" to "BigQuery datastore backend: real CRUD + Frictionless schemas + lifespan health" May 20, 2026
sagargg and others added 2 commits May 20, 2026 22:27
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…w_count

- Backend reads the stored Frictionless schema from `_table_metadata` so `primaryKey` and the per-field `info` data dictionary round-trip exactly as declared; raises NotFoundError when undeclared.
- Row count comes from BigQuery's per-dataset `__TABLES__.row_count` (metadata-only, no full-table scan) — `COUNT(*)` would have billed bytes per call.
- Drop `fields` from InfoResult; the service derives legacy fields via `frictionless_schema_to_fields`, mirroring how `search` already works (engine stays out of the schemas/ layer).
- Soft fallback: missing data table while metadata exists reports total=0 with a warning, instead of 500-ing the call.
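
The metadata-only row count and its soft fallback can be sketched as a query string plus a tiny resolver. `__TABLES__` and its `row_count` / `table_id` columns are BigQuery's per-dataset meta-table; the helper names, the `@table_id` parameter, and the warning text are assumptions.

```python
from typing import Any, Iterable, Mapping


def row_count_sql(project: str, dataset: str) -> str:
    """Render the metadata-only row-count lookup for one table."""
    return (
        f"SELECT row_count FROM `{project}.{dataset}.__TABLES__` "
        "WHERE table_id = @table_id"
    )


def resolve_total(
    rows: Iterable[Mapping[str, Any]], resource_id: str
) -> tuple[int, list[str]]:
    """Return (total, warnings); a missing data table reports 0 with a warning."""
    for row in rows:
        return int(row["row_count"]), []
    return 0, [f"data table for {resource_id} is missing; reporting total=0"]
```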

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 20

Note

Due to the large number of review comments, Critical and Major severity comments were prioritized as inline comments.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
datastore/infrastructure/engines/bigquery/backend.py (1)

539-585: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

search and search_sql are still hard-coded empty results.

Both methods ignore the actual table data and always return empty iterators, so every read path will behave as if the resource has zero rows. That is still a missing core backend implementation, not a stub that can ship behind the “real CRUD” PR scope.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/infrastructure/engines/bigquery/backend.py` around lines 539 - 585,
the search and search_sql methods currently return hard-coded empty SearchResult
objects; implement real BigQuery queries in both. In search, build a
parameterised SELECT from the resource_id/table name, filters, q, distinct,
fields, sort, limit and offset; execute it via the BigQuery client
(client.query(...)); use query_job.result() as a lazy iterator to populate
records; construct schema from the returned query schema or the provided fields;
run a COUNT(*) when include_total is true to set total; and set
records_truncated=True if the iterator reached the limit. In search_sql, execute
the raw SQL with client.query(sql, job_config=...), derive schema from
query_job.schema/result, yield rows lazily from query_job.result(), and set
records_truncated when the limit is hit. Ensure you reference and update the
SearchResult construction in both search and search_sql to use the real schema,
records iterator, total, and records_truncated values.
🟡 Minor comments (2)
README.md-56-63 (1)

56-63: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Refresh BigQuery backend status in the project tree.

backend.py is still described as placeholder here; that no longer matches the implemented backend and may confuse readers.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@README.md` around lines 56 - 63, README's project tree still calls
bigquery/backend.py a "DatastoreBackend subclass (placeholder)"; update the
description to reflect that the BigQuery backend is implemented (reference the
concrete class BigQueryBackend in bigquery/backend.py) and briefly list what the
file provides (implemented backend logic rather than a placeholder). Keep
surrounding entries (bigquery/client.py, lib.py, allowed_functions.txt)
unchanged and ensure the wording matches the actual implemented functionality.
CLAUDE.md-378-379 (1)

378-379: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Update status text that still says engine logic is placeholder.

These lines now read as outdated and can mislead contributors about what is already implemented in this PR.

Also applies to: 807-808

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@CLAUDE.md` around lines 378 - 379, Replace the outdated status sentence
"Engine business logic is still placeholder (returns empty results / echoes
inputs)" with an accurate summary of current state: note that call path,
validation, auth, streaming and per-engine datastore_search_sql allow-list are
implemented and only the BigQuery adapter remains (referenced in §7); update the
same phrasing wherever it repeats to avoid misleading contributors and ensure
the status reflects what is implemented versus what remains.
🧹 Nitpick comments (1)
tests/test_health.py (1)

39-47: ⚡ Quick win

Add a /docs smoke test in this health suite.

A tiny assertion for GET /docs would catch OpenAPI regressions early alongside liveness/readiness checks.

Based on learnings: Ensure OpenAPI documentation loads at /docs without errors.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/test_health.py` around lines 39 - 47, Add a small smoke test that
requests the OpenAPI UI so regressions are caught alongside liveness checks:
either extend test_health_returns_ok or add a new test (e.g., test_docs_served)
that calls client.get("/docs") and asserts a 200 response and basic content
(e.g., response.status_code == 200 and response.headers["content-type"] contains
"text/html" or response.text contains "Swagger" / "ReDoc") to ensure the docs
page loads without errors.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In @.env.example:
- Around line 10-18: The .env.example is missing entries that match the fields
declared in datastore/core/config.py; add entries for HTTP_MAX_CONNECTIONS,
HTTP_MAX_KEEPALIVE_CONNECTIONS, and INCLUDE_UPDATED_AT (with example values or
blanks) using the exact variable names used in Config so pydantic-settings
doesn't ignore them, and update the other relevant section mentioned in the
comment to include the same variables for consistency.

In `@datastore/api/endpoints/health.py`:
- Around line 47-61: The 503 branch in datastore/api/endpoints/health.py returns
an "error" key instead of the CKAN envelope "result" object; update the
JSONResponse content for the failing branch to use the CKAN envelope shape {
"help": str(request.url), "success": False, "result": { ... } } (move the
existing "__type" and "message" fields currently under "error" into "result"),
keep status_code=503 and the same message building that uses failing and
request.url so the /ready response conforms to the required contract.

In `@datastore/core/constants.py`:
- Around line 61-115: The POSTGRES_TO_FRICTIONLESS and FRICTIONLESS_TO_POSTGRES
maps currently introduce non-canonical Frictionless types (interval -> duration
and entries for duration, year, yearmonth); remove those non-canonical mappings
and normalize to the repo's canonical Frictionless subset. Concretely, change
POSTGRES_TO_FRICTIONLESS to map "interval" to a canonical type (e.g., "string"
or "any") instead of "duration", and remove the "duration", "year", and
"yearmonth" keys from FRICTIONLESS_TO_POSTGRES (do not add new non-canonical
keys); leave only mappings for the canonical types listed in the repo (integer,
number, string, boolean, date, datetime, time, object, array, geopoint, geojson,
any) so legacy fields round-trip into canonical schemas.

In `@datastore/infrastructure/engines/base.py`:
- Around line 17-18: Change all bare collection annotations to be parameterized
with typing.Any: import Any from typing and update schema: dict to schema:
dict[str, Any], records: Iterator[tuple] to records: Iterator[tuple[Any, ...]]
(or Iterator[tuple[int, ...]] if elements have known types), and similarly
replace any bare list/dict/tuple elsewhere (e.g., the other annotated blocks
referenced) with list[dict[str, Any]] / dict[str, Any] / tuple[Any, ...] or more
specific element types where known; ensure typing.Iterator/Sequence/Mapping are
used consistently and add the necessary typing imports.

In `@datastore/infrastructure/engines/bigquery/backend.py`:
- Around line 102-110: The readiness check currently returns healthy even when
BIGQUERY_DATASET is unset and metadata is disabled; modify BigQueryBackend so
initialize() records the disabled state (e.g., set self.metadata_available or
leave self.metadata as None) and update healthcheck() to return False when the
dataset/metadata layer is unavailable (check BIGQUERY_DATASET.strip() or the new
self.metadata_available flag), so /ready fails if create()/info() would be
disabled; apply the same change to the other identical dataset-check block later
in BigQueryBackend (the duplicate around the other initialization path).
- Around line 587-595: The delete() method in backend.py currently returns an
empty WriteResult and must be implemented to actually remove data: when filters
is None issue a DROP TABLE IF EXISTS <resource> via the BigQuery client (or use
client.delete_table on the fully-qualified table reference) and when filters is
provided build a parameterised DELETE FROM `<resource>` WHERE ... SQL (use
parameter binding to avoid injection) and execute it via client.query(), then
construct and return a WriteResult containing the row count/affected status;
update the delete() implementation to use the existing BigQuery client instance,
resource_id and filters parameters and populate WriteResult accordingly.
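
A sketch of the two delete paths the comment above asks for, rendering SQL plus a plain parameter dict. Values are bound as named parameters; column names cannot be bound, so they are validated against a conservative identifier pattern. `render_delete` and the `f0`, `f1` parameter names are hypothetical, and equality-only filters are an assumption.

```python
import re
from typing import Any, Mapping, Optional

_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def render_delete(
    table_ref: str, filters: Optional[Mapping[str, Any]]
) -> tuple[str, dict[str, Any]]:
    """Return (sql, params): DROP when filters is None, else a bound DELETE."""
    if filters is None:
        return f"DROP TABLE IF EXISTS `{table_ref}`", {}
    clauses, params = [], {}
    for index, (column, value) in enumerate(sorted(filters.items())):
        if not _IDENTIFIER.match(column):
            raise ValueError(f"invalid column name in filters: {column!r}")
        key = f"f{index}"
        clauses.append(f"`{column}` = @{key}")
        params[key] = value
    # BigQuery requires a WHERE clause on DELETE; empty filters match every row.
    where = " AND ".join(clauses) if clauses else "TRUE"
    return f"DELETE FROM `{table_ref}` WHERE {where}", params
```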

In `@datastore/infrastructure/engines/bigquery/client.py`:
- Around line 31-35: The RO credential path currently ignores
BIGQUERY_CREDENTIALS when BIGQUERY_CREDENTIALS_RO is blank; update the logic
that sets creds_raw (referencing config.BIGQUERY_CREDENTIALS_RO and
config.BIGQUERY_CREDENTIALS) so that when mode == "ro" and
BIGQUERY_CREDENTIALS_RO.strip() is empty it falls back to using
BIGQUERY_CREDENTIALS.strip() before falling back to ADC; ensure the resulting
creds_raw variable is the chosen string used downstream (same variable name) so
RO behaves like RW when only one service account is configured.
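
The requested fallback order (RO credentials, then RW credentials, then ADC) fits in a few lines. The function name and the use of `None` to mean "use Application Default Credentials" are assumptions of this sketch.

```python
from typing import Optional


def pick_credentials(
    mode: str, rw_raw: Optional[str], ro_raw: Optional[str]
) -> Optional[str]:
    """Choose the raw credentials string for a mode; None means fall back to ADC."""
    rw = (rw_raw or "").strip()
    ro = (ro_raw or "").strip()
    if mode == "ro" and ro:
        return ro
    # RO with a blank RO value reuses the RW credentials before giving up.
    return rw or None
```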

In `@datastore/infrastructure/engines/bigquery/lib.py`:
- Around line 15-17: The code currently silently drops user-declared reserved
columns (defined by SYSTEM_COLUMN_NAMES) when building schemas; instead,
validate incoming schema.fields against SYSTEM_COLUMN_NAMES and raise a clear,
fast-failing exception (e.g., ValueError or a domain-specific SchemaError) if
any field name intersects with SYSTEM_COLUMN_NAMES; update the validation logic
that iterates schema.fields (the same area referenced around lines 32-40) to
perform this check before any mutation so callers receive an explicit error
mentioning the offending field name(s) and that `_id`/`_updated_at` are
reserved.
- Around line 130-143: The current id allocation using id_expr (MAX(`_id`) +
ROW_NUMBER()) is unsafe for concurrent writers (see id_expr, table_ref,
sys_vals, include_updated_at, sys_cols, data_cols, data_extractors) and must be
replaced with a serialized or collision-free allocator; change the code to
either use a BigQuery SEQUENCE (NEXTVAL(sequence_name) per row) or switch to a
collision-free ID like GENERATE_UUID() for _id, and update both occurrences
(this block and the repeated logic around lines 199-210) so inserts select
NEXTVAL(...) or GENERATE_UUID() instead of MAX(_id)+ROW_NUMBER(). Ensure the
chosen approach preserves the sys_vals construction (include_updated_at branch)
and works for batch UNNEST(`@rows`) inserts.
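
For the reserved-column check in the first item, a sketch of the fail-fast validation. `SYSTEM_COLUMN_NAMES` is named in the review; its contents here are taken from the `_id`/`_updated_at` mention, and `reject_reserved_columns` plus the error wording are assumptions.

```python
from typing import Any, Iterable, Mapping

# Assumed contents, based on the reserved names mentioned in the review.
SYSTEM_COLUMN_NAMES = frozenset({"_id", "_updated_at"})


def reject_reserved_columns(fields: Iterable[Mapping[str, Any]]) -> None:
    """Raise before any mutation if a user field uses a reserved name."""
    offending = sorted(
        field["name"]
        for field in fields
        if field.get("name") in SYSTEM_COLUMN_NAMES
    )
    if offending:
        raise ValueError(
            "reserved column name(s): "
            + ", ".join(offending)
            + "; `_id` and `_updated_at` are managed by the datastore"
        )
```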

In `@datastore/infrastructure/engines/bigquery/metadata.py`:
- Around line 90-112: The insert() method can race and create multiple rows for
the same resource_id because it uses a plain INSERT; change it to an idempotent,
atomic operation (e.g., MERGE ... WHEN NOT MATCHED THEN INSERT or INSERT ...
SELECT WHERE NOT EXISTS) against self.table_ref so a second concurrent create
will be no-op rather than a duplicate; use the same parameters currently passed
into _run and _schema_params and keep the op string (e.g., "metadata INSERT")
while ensuring the query only inserts when resource_id is absent; also apply the
same MERGE/conditional-insert pattern to the analogous code referenced at the
151-172 range to preserve the one-row-per-resource invariant.
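
One possible shape for the conditional insert, written as a SQL builder. The column names `resource_id` and `schema_json` and the helper name are hypothetical, and whether a MERGE alone is enough under concurrent writers depends on BigQuery's DML concurrency behaviour, which should be verified separately.

```python
def build_metadata_insert(table_ref: str) -> str:
    """Build a MERGE that inserts a metadata row only if none exists.

    Hypothetical column names; parameters @resource_id and @schema_json
    are expected to be bound by the caller.
    """
    return (
        f"MERGE `{table_ref}` AS t\n"
        "USING (SELECT @resource_id AS resource_id,"
        " @schema_json AS schema_json) AS s\n"
        "ON t.resource_id = s.resource_id\n"
        "WHEN NOT MATCHED THEN\n"
        "  INSERT (resource_id, schema_json)"
        " VALUES (s.resource_id, s.schema_json)"
    )
```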

In `@datastore/infrastructure/engines/registry.py`:
- Line 67: The helper signatures need concrete types to avoid leaking Any:
annotate _build_engine(engine: str, mode: Mode, *, config: Mapping[str, Any],
context: Optional[Mapping[str, Any]] = None) (or replace Mapping[...] with your
project-specific Config/Context type if one exists) and similarly annotate
warmup_engines(config: Mapping[str, Any]) (and its return type, e.g., None or
Coroutine[...] if async). Import Optional and Mapping/Any from typing and use
the project Config type if present so strict mypy sees concrete parameter types
through the backend construction path.
- Around line 95-100: warmup_engines currently caches engines under
_INSTANCES[(engine, mode)] that may capture a per-request RequestContext and
cause auth/CKAN state leakage; change the registry so only context-free
instances are cached and any engine built with a RequestContext is not stored in
_INSTANCES. Specifically: ensure warmup_engines calls _build_engine with
context=None and stores only context-free engines; update get_datastore_engine
(and the similar block around the other occurrence) to detect when a
RequestContext is provided and avoid writing that instance into _INSTANCES
(instead return a transient engine or clone a context-free engine with the
request context applied), and rely on cached engines only when context is None.
Ensure references: warmup_engines, get_datastore_engine, _INSTANCES, and
_build_engine are updated accordingly.

In `@datastore/main.py`:
- Around line 44-47: The warmed read/write engines created by warmup_engines
aren't being attached to the FastAPI app state; modify the lifespan startup to
store concrete engine instances on app.state (e.g. app.state.rw_engine and
app.state.ro_engine) after calling warmup_engines and ensure the
reset_engine_cache callback clears those same app.state entries; update
warmup_engines (or its caller) to return or assign the actual engine objects so
the /ready endpoint (datastore/api/endpoints/health.py) can call
app.state.rw_engine.healthcheck() and app.state.ro_engine.healthcheck(),
returning 503 if either healthcheck fails.

In `@datastore/schemas/request.py`:
- Around line 280-286: The current model validator _require_resource_id_or_id
allows both resource_id and id to be set and silently prefers resource_id;
change it to enforce an exclusive-or: if both resource_id and id are provided
raise ValueError("only one of 'resource_id' or 'id' may be set"), if neither
provided raise the existing error, and if only id is provided normalize by
assigning self.resource_id = self.id before returning; apply the same XOR
enforcement and normalization change to the corresponding validator method in
DatastoreDeleteRequest so both request types reject ambiguous dual-field
requests and still normalize id -> resource_id when appropriate.
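
The exclusive-or rule can be sketched as a plain function, independent of Pydantic. `normalize_resource_id` is a hypothetical name and the "required" message stands in for the existing error text, which is not shown in the review.

```python
from typing import Optional


def normalize_resource_id(
    resource_id: Optional[str], id_: Optional[str]
) -> str:
    """Enforce exactly one of resource_id / id and return the value."""
    if resource_id is not None and id_ is not None:
        raise ValueError("only one of 'resource_id' or 'id' may be set")
    value = resource_id if resource_id is not None else id_
    if value is None:
        # Placeholder wording; the real validator keeps its existing error.
        raise ValueError("one of 'resource_id' or 'id' is required")
    return value
```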

In `@datastore/schemas/validators.py`:
- Around line 186-207: The validate_frictionless_schema function must reject
Frictionless field types outside our canonical vocabulary before calling
Schema.from_descriptor; update validate_frictionless_schema to, when value is a
dict containing a "fields" list, iterate each field and verify its "type" is one
of the canonical types (integer, number, string, boolean, date, datetime, time,
object, array, geopoint, geojson, any) and raise ValueError listing offending
types if any are found; keep the existing Schema.from_descriptor call for full
descriptor validation and do not perform alias normalization here (that belongs
in storage code).
- Around line 117-132: fields_to_frictionless_schema() currently flattens
non-title/description info keys onto the top-level field object (see variables
info, extra, fr), but frictionless_schema_to_fields() only reads back extras
from field["info"], losing flattened keys like unit or notes; update
frictionless_schema_to_fields() so when rebuilding legacy field dicts it
collects any top-level keys on the frictionless field object that are not the
canonical keys (e.g., name, type, title, description, info) and merges them into
the restored info/extras before appending to fr_fields (mirror the inverse of
the extra merging done in fields_to_frictionless_schema()); apply the same logic
where similar reconstruction occurs in the 140-173 range.
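
For the first item, the vocabulary check that runs before Schema.from_descriptor might look like this. The type list is copied from the comment above; `check_canonical_types`, the error wording, and the choice to skip fields without a "type" key are assumptions.

```python
from typing import Any

CANONICAL_TYPES = frozenset({
    "integer", "number", "string", "boolean", "date", "datetime",
    "time", "object", "array", "geopoint", "geojson", "any",
})


def check_canonical_types(value: Any) -> None:
    """Raise ValueError listing any field types outside the vocabulary."""
    if not isinstance(value, dict):
        return
    fields = value.get("fields")
    if not isinstance(fields, list):
        return
    offending: set[str] = set()
    for field in fields:
        # Fields without an explicit type are left to the full descriptor
        # validation that follows.
        if not isinstance(field, dict) or "type" not in field:
            continue
        field_type = field["type"]
        if not isinstance(field_type, str) or field_type not in CANONICAL_TYPES:
            offending.add(str(field_type))
    if offending:
        raise ValueError(
            "unsupported Frictionless field type(s): "
            + ", ".join(sorted(offending))
        )
```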

In `@datastore/services/write.py`:
- Around line 83-96: The service delete_datastore currently returns a Pydantic
response model DatastoreDeleteResponse.Result; change it to return a plain
Python data structure (e.g., a dict) with the same fields so the service layer
is not coupled to response schemas; locate delete_datastore (and its use of
get_datastore_engine and engine.delete), remove the Pydantic model from the
return and instead return {"resource_id": resource_id, "filters": filters} (or
an equivalent tuple/dataclass) and ensure any callers expect the plain dict
rather than DatastoreDeleteResponse.Result.
- Around line 88-91: The code currently does `filters = data_dict.get("filters")
or None`, which converts empty dicts to None and can change delete semantics;
change it to preserve explicit empty filters by assigning `filters =
data_dict.get("filters")` (or use `data_dict["filters"]` with a key-existence
check) and pass that through to
`get_datastore_engine(...).delete(resource_id=resource_id, filters=filters)` so
only a missing key yields None while `{}` remains `{}`.
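
The difference in the second item comes down to `{}` being falsy in Python. A tiny illustration, with `read_filters` as a hypothetical stand-in for the service's lookup:

```python
from typing import Any, Optional


def read_filters(data_dict: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Return filters as sent: a missing key gives None, {} stays {}."""
    return data_dict.get("filters")


# `or None` collapses an explicit empty dict into None, which is the
# coercion the comment asks to remove.
coerced = {"filters": {}}.get("filters") or None
preserved = read_filters({"filters": {}})
```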

In `@tests/test_health.py`:
- Around line 20-23: The function _clean_engine_cache is untyped and fails
strict mypy; add explicit typing by importing typing.Iterator and annotating the
generator as def _clean_engine_cache() -> Iterator[None]: (or Generator[None,
None, None] if you prefer), and annotate any other untyped generator/fixture
defs in the file (the other defs flagged in the review) similarly; ensure you
add the necessary import (from typing import Iterator or Generator) and any
missing parameter/return type annotations to satisfy strict = true.

---

Outside diff comments:
In `@datastore/infrastructure/engines/bigquery/backend.py`:
- Around line 539-585: The search and search_sql methods currently return
hard-coded empty SearchResult objects; implement real BigQuery queries in both.
In search(), build a parameterised SELECT from the resource_id/table name,
filters, q, distinct, fields, sort, limit and offset; execute it via the
BigQuery client (client.query(...)); use query_job.result() as a lazy iterator
to populate records; construct schema from the returned query schema or the
provided fields; run a COUNT(*) when include_total is true to set total; and set
records_truncated=True if the iterator reached the limit. In search_sql(),
execute the raw SQL with client.query(sql, job_config=...), derive schema from
query_job.schema/result, yield rows lazily from query_job.result(), and set
records_truncated when the limit is hit. Update the SearchResult construction in
both methods to use the real schema, records iterator, total, and
records_truncated values.

---

Minor comments:
In `@CLAUDE.md`:
- Around line 378-379: Replace the outdated status sentence "Engine business
logic is still placeholder (returns empty results / echoes inputs)" with an
accurate summary of current state: note that call path, validation, auth,
streaming and per-engine datastore_search_sql allow-list are implemented and
only the BigQuery adapter remains (referenced in §7); update the same phrasing
wherever it repeats to avoid misleading contributors and ensure the status
reflects what is implemented versus what remains.

In `@README.md`:
- Around line 56-63: README's project tree still calls bigquery/backend.py a
"DatastoreBackend subclass (placeholder)"; update the description to reflect
that the BigQuery backend is implemented (reference the concrete class
BigQueryBackend in bigquery/backend.py) and briefly list what the file provides
(implemented backend logic rather than a placeholder). Keep surrounding entries
(bigquery/client.py, lib.py, allowed_functions.txt) unchanged and ensure the
wording matches the actual implemented functionality.

---

Nitpick comments:
In `@tests/test_health.py`:
- Around line 39-47: Add a small smoke test that requests the OpenAPI UI so
regressions are caught alongside liveness checks: either extend
test_health_returns_ok or add a new test (e.g., test_docs_served) that calls
client.get("/docs") and asserts a 200 response and basic content (e.g.,
response.status_code == 200 and response.headers["content-type"] contains
"text/html" or response.text contains "Swagger" / "ReDoc") to ensure the docs
page loads without errors.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 40646cc2-c6d2-4774-90ba-512c1e517b04

📥 Commits

Reviewing files that changed from the base of the PR and between be3a91a and 2bf0577.

📒 Files selected for processing (44)
  • .env.example
  • .gitignore
  • CLAUDE.md
  • README.md
  • datastore/api/endpoints/datastore.py
  • datastore/api/endpoints/health.py
  • datastore/api/responses.py
  • datastore/core/config.py
  • datastore/core/constants.py
  • datastore/infrastructure/engines/base.py
  • datastore/infrastructure/engines/bigquery/__init__.py
  • datastore/infrastructure/engines/bigquery/backend.py
  • datastore/infrastructure/engines/bigquery/client.py
  • datastore/infrastructure/engines/bigquery/lib.py
  • datastore/infrastructure/engines/bigquery/metadata.py
  • datastore/infrastructure/engines/bigquery/types.py
  • datastore/infrastructure/engines/registry.py
  • datastore/main.py
  • datastore/schemas/request.py
  • datastore/schemas/responses.py
  • datastore/schemas/validators.py
  • datastore/services/read.py
  • datastore/services/streaming.py
  • datastore/services/write.py
  • example_payload/README.md
  • example_payload/datastore_create/with_schema.json
  • example_payload/datastore_delete/force_readonly.json
  • example_payload/datastore_delete/whole_table.json
  • example_payload/datastore_delete/with_filters.json
  • example_payload/datastore_info/basic.json
  • example_payload/datastore_info/with_id_alias.json
  • example_payload/datastore_search_sql/aggregate.json
  • example_payload/datastore_search_sql/basic.json
  • example_payload/datastore_search_sql/with_cte.json
  • tests/conftest.py
  • tests/test_bigquery_metadata.py
  • tests/test_bigquery_tables.py
  • tests/test_datastore_create.py
  • tests/test_datastore_delete.py
  • tests/test_datastore_info.py
  • tests/test_datastore_search.py
  • tests/test_datastore_search_sql.py
  • tests/test_health.py
  • tests/test_write_service.py

Comment thread .env.example
Comment on lines +10 to +18
 # Selects the storage backend (must match a folder under
 # `datastore/infrastructure/engines/`):
 # bigquery — real BigQuery adapter (placeholder while being built).
-# ducklake — Future planned
+# ducklake — Future planned.
 DATASTORE_ENGINE=bigquery
-BQ_PROJECT=
+BIGQUERY_PROJECT=
 BIGQUERY_DATASET=
 BIGQUERY_CREDENTIALS=
 BIGQUERY_CREDENTIALS_RO=

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Keep .env.example aligned with the actual settings model.

HTTP_MAX_CONNECTIONS and HTTP_MAX_KEEPALIVE_CONNECTIONS are documented here, but datastore/core/config.py does not declare either field, so pydantic-settings will silently ignore them. INCLUDE_UPDATED_AT was added to Config as well, but operators still don't get an example for that toggle here.

Also applies to: 26-27

🧰 Tools
🪛 dotenv-linter (4.0.0)

[warning] 15-15: [UnorderedKey] The BIGQUERY_PROJECT key should go before the DATASTORE_ENGINE key

(UnorderedKey)


[warning] 16-16: [UnorderedKey] The BIGQUERY_DATASET key should go before the BIGQUERY_PROJECT key

(UnorderedKey)


[warning] 17-17: [UnorderedKey] The BIGQUERY_CREDENTIALS key should go before the BIGQUERY_DATASET key

(UnorderedKey)


[warning] 18-18: [UnorderedKey] The BIGQUERY_CREDENTIALS_RO key should go before the BIGQUERY_DATASET key

(UnorderedKey)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In @.env.example around lines 10 - 18, The .env.example is out of sync with the
fields declared in datastore/core/config.py: HTTP_MAX_CONNECTIONS and
HTTP_MAX_KEEPALIVE_CONNECTIONS appear in the template but are not declared on
Config, so either declare them on Config or remove them from the template, and
add an entry for INCLUDE_UPDATED_AT (with an example value or blank) using the
exact variable name used in Config so pydantic-settings picks it up; apply the
same alignment to the other section mentioned in the comment (lines 26-27).

Comment on lines +47 to +61
if failing:
return JSONResponse(
status_code=503,
content={
"help": str(request.url),
"success": False,
"error": {
"__type": "Service Unavailable",
"message": (
f"engine healthcheck failed for mode(s): "
f"{', '.join(failing)}"
),
},
},
)

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Keep /ready failure responses in CKAN health envelope shape.

The 503 branch returns an error object instead of a result object, which breaks the required health endpoint response contract.

As per coding guidelines "datastore/api/endpoints/health.py: Health endpoints (/, /health, /ready) must return CKAN envelope shape: {help, success, result: {...}}".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/api/endpoints/health.py` around lines 47 - 61, The 503 branch in
datastore/api/endpoints/health.py returns an "error" key instead of the CKAN
envelope "result" object; update the JSONResponse content for the failing branch
to use the CKAN envelope shape { "help": str(request.url), "success": False,
"result": { ... } } (move the existing "__type" and "message" fields currently
under "error" into "result"), keep status_code=503 and the same message building
that uses failing and request.url so the /ready response conforms to the
required contract.
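
A sketch of the 503 body in the {help, success, result} shape, built as a plain dict so it can be passed to JSONResponse(status_code=503, content=...). `not_ready_envelope` is a hypothetical helper; the field values are the ones from the snippet under review.

```python
from typing import Any


def not_ready_envelope(help_url: str, failing: list[str]) -> dict[str, Any]:
    """Build the /ready failure body with `result` instead of `error`."""
    return {
        "help": help_url,
        "success": False,
        "result": {
            "__type": "Service Unavailable",
            "message": (
                "engine healthcheck failed for mode(s): "
                + ", ".join(failing)
            ),
        },
    }
```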

Comment thread datastore/core/constants.py
Comment thread datastore/infrastructure/engines/base.py Outdated
Comment thread datastore/infrastructure/engines/bigquery/backend.py
Comment thread datastore/schemas/validators.py
Comment on lines +150 to +168
async def info_datastore(
context: RequestContext, data_dict: dict[str, Any]
) -> DatastoreInfoResponse.Result:
"""Look up table metadata for a single `resource_id`.

Endpoint authorizes the caller first (same gate as `search`). This
service just asks the read-only engine for its `InfoResult` and
re-shapes it as the response's typed `Result`. No streaming —
`info` responses are small enough for the standard `_success_response`
path.
"""
engine = get_datastore_engine(context, mode="ro")
result = engine.info(resource_id=data_dict["resource_id"])
fields, _ = frictionless_schema_to_fields(result.schema)
return DatastoreInfoResponse.Result(
meta=result.meta,
schema=result.schema,
fields=fields,
)

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

info_datastore should return a plain dict/dataclass, not DatastoreInfoResponse.Result.

Line 152 and Line 164 couple the service layer to API response models. Keep response-model construction at the API boundary to preserve one-way layering.

As per coding guidelines: "{datastore/services,datastore/infrastructure/engines}/**/*.py: Use plain dicts, tuples, and dataclasses for data passed between services and engines—never Pydantic models."

Comment on lines +83 to +96
async def delete_datastore(
context: RequestContext, data_dict: dict[str, Any]
) -> DatastoreDeleteResponse.Result:
"""Delete rows matching `filters`, or drop the whole table."""
resource_id = data_dict["resource_id"]
filters = data_dict.get("filters") or None

engine = get_datastore_engine(context, mode="rw")
engine.delete(resource_id=resource_id, filters=filters)

return DatastoreDeleteResponse.Result(
resource_id=resource_id,
filters=filters,
)

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Service layer should return plain data structures, not Pydantic Result models.

delete_datastore returns DatastoreDeleteResponse.Result, which couples services to response-schema models. That breaks the layer contract and makes services API-shape aware.

As per coding guidelines: "{datastore/services,datastore/infrastructure/engines}/**/*.py: Use plain dicts, tuples, and dataclasses for data passed between services and engines—never Pydantic models."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/services/write.py` around lines 83 - 96, The service
delete_datastore currently returns a Pydantic response model
DatastoreDeleteResponse.Result; change it to return a plain Python data
structure (e.g., a dict) with the same fields so the service layer is not
coupled to response schemas; locate delete_datastore (and its use of
get_datastore_engine and engine.delete), remove the Pydantic model from the
return and instead return {"resource_id": resource_id, "filters": filters} (or
an equivalent tuple/dataclass) and ensure any callers expect the plain dict
rather than DatastoreDeleteResponse.Result.

Comment thread datastore/services/write.py Outdated
Comment thread tests/test_health.py Outdated
sagargg and others added 2 commits May 21, 2026 14:34
… richer _links

- New `bigquery/search.py` builders: parameterised SELECT (typed filter binds, IN UNNEST for list filters, native `SEARCH()` for full-text on whole row or per-column), validated sort/projection, and a matching COUNT subquery for filtered/distinct totals.
- Backend dispatches search + count jobs before awaiting either, so they run in parallel — wall time ≈ max(both). Unfiltered+non-distinct totals fall back to the cheap row-count helper.
- Schema-level limit cap dropped; new `Config.SEARCH_RESULT_ROWS_MAX` (default 32000) is the env-driven ceiling enforced at the service boundary with a "paginate with offset" hint.
- `_links` now carries conditional `prev` / `next` plus `page_size`, `page`, and `total_pages`. `page` / `total_pages` are suppressed when the current page has no rows (empty resource or past-end) so a UI never sees an incoherent "page 5 of 4".

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
… defaults

- Don't silently coerce `filters={}` to None on delete; an empty dict now reaches the engine as-is so the contract distinguishes "no-WHERE delete" from "drop table".
- Reject `_id` / `_updated_at` in user-provided field/schema names instead of silently dropping them — a hidden drop would leave the response advertising columns the engine refuses to populate.
- Reject non-canonical Frictionless types (`duration`, `year`, `yearmonth`, …) at the request boundary and trim them from the type maps; the datastore commits to a narrower vocabulary.
- 400 when `resource_id` and `id` arrive with different values on `datastore_info` / `datastore_delete` (legacy clients echoing the same value still work).
- `/ready` 503 stays in the StatusResponse envelope (`result.status: "not_ready"`) instead of an error envelope; mode names no longer leak into the response.
- Mount `/health` and `/ready` under both `/` and `/api/3/action/` so k8s probes and CKAN clients both reach them; welcome stays root-only.
- `Config` and `.env.example` realigned: drop the orphan `HTTP_MAX_*` from the template, document `INCLUDE_UPDATED_AT` + `SEARCH_RESULT_ROWS_MAX`.
- `healthcheck()` fails when the dataset / metadata store is unconfigured (was passing on `SELECT 1` even when writes would silently no-op).
- Preserve all `info` keys on the legacy ⇄ Frictionless roundtrip (extras now nest under `info` instead of being flattened and lost).
- Type annotations across base.py, registry.py helpers, and test_health.py to stop leaking `Any` under strict mypy.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
datastore/services/read.py (1)

104-105: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

datastore_search_sql ignores the configurable row cap.

search_sql_datastore still hardcodes 32000 via _SQL_DEFAULT_LIMIT (Line 104, Line 140, Line 149), so Config.SEARCH_RESULT_ROWS_MAX does not govern SQL search results.

Suggested fix
-_SQL_DEFAULT_LIMIT = 32000
-
-
 async def search_sql_datastore(
@@
-    result = engine.search_sql(
-        sql=data_dict["sql"], limit=_SQL_DEFAULT_LIMIT
-    )
+    max_limit = context.config.SEARCH_RESULT_ROWS_MAX
+    result = engine.search_sql(sql=data_dict["sql"], limit=max_limit)
@@
-        limit=_SQL_DEFAULT_LIMIT,
+        limit=max_limit,
@@
-        links=_build_pagination_links(
-            request_url, limit=_SQL_DEFAULT_LIMIT, offset=0, total=None,
-        ),
+        links=_build_pagination_links(
+            request_url, limit=max_limit, offset=0, total=None,
+        ),

Also applies to: 140-155

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/services/read.py` around lines 104 - 105, The SQL search path
currently uses the hardcoded _SQL_DEFAULT_LIMIT (32000) which bypasses the
runtime configuration; update search_sql_datastore to derive its limit from
Config.SEARCH_RESULT_ROWS_MAX (with a sensible fallback if unset) instead of
using _SQL_DEFAULT_LIMIT, and replace uses of _SQL_DEFAULT_LIMIT in the
query-building/limit-clamping logic so the effective LIMIT value is calculated
from Config.SEARCH_RESULT_ROWS_MAX (and still enforces any minimum/maximum
checks); ensure references in search_sql_datastore and any helper that currently
reference _SQL_DEFAULT_LIMIT are switched to the new config-driven value.
datastore/infrastructure/engines/bigquery/backend.py (2)

465-470: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

create() returns a dict instead of WriteResult.

The method signature declares -> WriteResult but returns a plain dict. This breaks the type contract and will fail strict type checking. The same issue exists in upsert() at lines 503-509 and 531-537.

Proposed fix for create()
-        return {
-            "schema": schema,
-            "records": records,
-            "include_total": include_total,
-            "total": len(records or []) if include_total else None,
-        }
+        return WriteResult(
+            rows_written=len(records or []),
+            total=len(records or []) if include_total else None,
+        )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/infrastructure/engines/bigquery/backend.py` around lines 465 - 470,
The create() and upsert() functions currently return a plain dict but declare ->
WriteResult; update both to return an actual WriteResult instance instead of a
dict: import or reference the WriteResult class/type and construct
WriteResult(schema=schema, records=records, include_total=include_total,
total=(len(records or []) if include_total else None)) (apply the same change in
both create() and the two upsert() return sites) so the return value matches the
declared type and passes static typing.

501-537: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

upsert() returns a dict instead of WriteResult.

Both placeholder-mode (lines 503-509) and normal (lines 531-537) return paths return plain dicts instead of WriteResult dataclass instances, breaking the declared return type.

Proposed fix
         if self.metadata is None:
             # Placeholder mode — echo (matches the create() pattern).
-            return {
-                "resource_id": resource_id,
-                "records": records,
-                "method": method,
-                "include_total": include_total,
-                "total": len(records or []),
-            }
+            return WriteResult(
+                rows_written=len(records or []),
+                total=len(records or []) if include_total else None,
+            )
         ...
-        return {
-            "resource_id": resource_id,
-            "records": records,
-            "method": method,
-            "include_total": include_total,
-            "total": len(rows) if include_total else None,
-        }
+        return WriteResult(
+            rows_written=len(rows),
+            total=len(rows) if include_total else None,
+        )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/infrastructure/engines/bigquery/backend.py` around lines 501 - 537,
The upsert() implementation currently returns plain dicts in both placeholder
mode and the normal path; change both return statements to construct and return
a WriteResult dataclass instance (instead of a dict) with the same fields
(resource_id, records, method, include_total, total). Locate the upsert method
and replace the first return block that handles placeholder mode and the final
return block after calling _insert_records/_merge_records/_update_records to
return WriteResult(...) with the appropriate field values (compute total as
len(rows) or len(records or []) as before). Ensure WriteResult is imported where
needed and that the object shape matches the declared return type.
♻️ Duplicate comments (2)
datastore/services/write.py (1)

83-103: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Return plain data from the service instead of DatastoreDeleteResponse.Result.

Line 85 and Line 100 keep this service coupled to response-schema models. Return a plain dict (or dataclass) from delete_datastore, and let the API layer shape it into the response model.

Suggested minimal change
-async def delete_datastore(
-    context: RequestContext, data_dict: dict[str, Any]
-) -> DatastoreDeleteResponse.Result:
+async def delete_datastore(
+    context: RequestContext, data_dict: dict[str, Any]
+) -> dict[str, Any]:
@@
-    return DatastoreDeleteResponse.Result(
-        resource_id=resource_id,
-        filters=filters,
-    )
+    return {
+        "resource_id": resource_id,
+        "filters": filters,
+    }

As per coding guidelines, "datastore/{services,infrastructure}/**/*.py: Use Pydantic v2 for request/response validation at boundaries only; never pass Pydantic models between services or return them from engines."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/services/write.py` around lines 83 - 103, The service function
delete_datastore currently returns a Pydantic response model
DatastoreDeleteResponse.Result, coupling the service layer to the API schema;
change it to return plain data (a dict or simple dataclass) containing the same
fields (resource_id and filters) instead. Locate delete_datastore, keep the call
to get_datastore_engine(...) and engine.delete(resource_id=..., filters=...)
unchanged, then construct and return a plain Python dict (or dataclass instance)
with keys resource_id and filters instead of DatastoreDeleteResponse.Result so
the API layer can perform Pydantic v2 shaping.
datastore/schemas/validators.py (1)

133-134: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Extra metadata still spread onto field top-level, contradicting docstring and breaking round-trip.

The docstring at lines 104-109 states extras "stays nested under a custom info key", but line 134 spreads extra onto the top-level field object (fr = {**fr, **extra}). This means frictionless_schema_to_fields() (which only reads from field["info"]) cannot recover these extras, losing keys like unit, notes, etc.

Proposed fix
         if extra:
-            fr = {**fr, **extra}
+            fr["info"] = extra
         fr_fields.append(fr)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/schemas/validators.py` around lines 133 - 134, The extras dict is
being spread onto the field top-level (fr = {**fr, **extra}) which contradicts
the docstring and prevents frictionless_schema_to_fields() from round-tripping
extras from field["info"]; instead merge extras into the field's "info" sub-dict
so existing info is preserved (e.g., set fr["info"] = {**fr.get("info", {}),
**extra}) and remove the top-level spread; update the block around the function
that constructs fr to nest extra under "info" rather than merging into fr
directly.
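
A round-trip sketch of the nesting described above: title/description move to the top level of the Frictionless field, everything else stays under "info", and the inverse reads it back. The legacy field shape ({"id", "type", "info"}) and both helper names are assumptions, and type mapping is left out.

```python
from typing import Any


def to_frictionless_field(legacy: dict[str, Any]) -> dict[str, Any]:
    """Convert one legacy field, keeping extras nested under `info`."""
    info = dict(legacy.get("info") or {})
    fr: dict[str, Any] = {"name": legacy["id"], "type": legacy["type"]}
    for key in ("title", "description"):
        if key in info:
            fr[key] = info.pop(key)
    if info:
        # Extras (unit, notes, ...) are not spread onto the top level.
        fr["info"] = info
    return fr


def to_legacy_field(fr: dict[str, Any]) -> dict[str, Any]:
    """Inverse of to_frictionless_field for the same assumed shape."""
    info = dict(fr.get("info") or {})
    for key in ("title", "description"):
        if key in fr:
            info[key] = fr[key]
    legacy: dict[str, Any] = {"id": fr["name"], "type": fr["type"]}
    if info:
        legacy["info"] = info
    return legacy
```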
🧹 Nitpick comments (1)
tests/test_bigquery_tables.py (1)

1060-1070: 💤 Low value

Test comment references __TABLES__ but implementation uses COUNT(*).

The comment at line 1062 mentions "__TABLES__/_count_rows path" but the actual _count_rows implementation now uses SELECT COUNT(*) AS n FROM <table>, not BigQuery's __TABLES__ metadata. Consider updating the comment for accuracy.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/test_bigquery_tables.py` around lines 1060 - 1070, The test docstring
mentions the `__TABLES__`/`_count_rows` path but the current `_count_rows`
implementation uses a `SELECT COUNT(*) AS n FROM <table>` query; update the
`test_needs_count_query_only_when_filtering_or_distinct` docstring (or inline
comment) to accurately describe that unfiltered + non-distinct searches use the
cheaper COUNT(*) path (or remove the `__TABLES__` reference) and keep references
to `needs_count_query` and `_count_rows` so reviewers can locate the related
logic.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@datastore/infrastructure/engines/bigquery/backend.py`:
- Around line 465-470: The create() and upsert() functions currently return a
plain dict but declare -> WriteResult; update both to return an actual
WriteResult instance instead of a dict: import or reference the WriteResult
class/type and construct WriteResult(schema=schema, records=records,
include_total=include_total, total=(len(records or []) if include_total else
None)) (apply the same change in both create() and the two upsert() return
sites) so the return value matches the declared type and passes static typing.
- Around line 501-537: The upsert() implementation currently returns plain dicts
in both placeholder mode and the normal path; change both return statements to
construct and return a WriteResult dataclass instance (instead of a dict) with
the same fields (resource_id, records, method, include_total, total). Locate the
upsert method and replace the first return block that handles placeholder mode
and the final return block after calling
_insert_records/_merge_records/_update_records to return WriteResult(...) with
the appropriate field values (compute total as len(rows) or len(records or [])
as before). Ensure WriteResult is imported where needed and that the object
shape matches the declared return type.

In `@datastore/services/read.py`:
- Around line 104-105: The SQL search path currently uses the hardcoded
_SQL_DEFAULT_LIMIT (32000) which bypasses the runtime configuration; update
datastore_search_sql to derive its limit from Config.SEARCH_RESULT_ROWS_MAX
(with a sensible fallback if unset) instead of using _SQL_DEFAULT_LIMIT, and
replace uses of _SQL_DEFAULT_LIMIT in the query-building/limit-clamping logic so
the effective LIMIT value is calculated from Config.SEARCH_RESULT_ROWS_MAX (and
still enforces any minimum/maximum checks); ensure references in
datastore_search_sql and any helper that currently reference _SQL_DEFAULT_LIMIT
are switched to the new config-driven value.

---

Duplicate comments:
In `@datastore/schemas/validators.py`:
- Around line 133-134: The extras dict is being spread onto the field top-level
(fr = {**fr, **extra}) which contradicts the docstring and prevents
frictionless_schema_to_fields() from round-tripping extras from field["info"];
instead merge extras into the field's "info" sub-dict so existing info is
preserved (e.g., set fr["info"] = {**fr.get("info", {}), **extra}) and remove
the top-level spread; update the block around the function that constructs fr to
nest extra under "info" rather than merging into fr directly.

In `@datastore/services/write.py`:
- Around line 83-103: The service function delete_datastore currently returns a
Pydantic response model DatastoreDeleteResponse.Result, coupling the service
layer to the API schema; change it to return plain data (a dict or simple
dataclass) containing the same fields (resource_id and filters) instead. Locate
delete_datastore, keep the call to get_datastore_engine(...) and
engine.delete(resource_id=..., filters=...) unchanged, then construct and return
a plain Python dict (or dataclass instance) with keys resource_id and filters
instead of DatastoreDeleteResponse.Result so the API layer can perform Pydantic
v2 shaping.

---

Nitpick comments:
In `@tests/test_bigquery_tables.py`:
- Around line 1060-1070: The test docstring mentions the
`__TABLES__`/`_count_rows` path but the current `_count_rows` implementation
uses a `SELECT COUNT(*) AS n FROM <table>` query; update the
`test_needs_count_query_only_when_filtering_or_distinct` docstring (or inline
comment) to accurately describe that unfiltered + non-distinct searches use the
cheaper COUNT(*) path (or remove the `__TABLES__` reference) and keep references
to `needs_count_query` and `_count_rows` so reviewers can locate the related
logic.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 8d9d8da2-9761-48ea-9fe6-1b5c8ffa3d4b

📥 Commits

Reviewing files that changed from the base of the PR and between 2bf0577 and f13589c.

📒 Files selected for processing (22)
  • .env.example
  • datastore/api/endpoints/health.py
  • datastore/api/routes.py
  • datastore/core/config.py
  • datastore/core/constants.py
  • datastore/infrastructure/engines/base.py
  • datastore/infrastructure/engines/bigquery/backend.py
  • datastore/infrastructure/engines/bigquery/client.py
  • datastore/infrastructure/engines/bigquery/search.py
  • datastore/infrastructure/engines/registry.py
  • datastore/schemas/request.py
  • datastore/schemas/responses.py
  • datastore/schemas/validators.py
  • datastore/services/read.py
  • datastore/services/write.py
  • tests/test_bigquery_tables.py
  • tests/test_datastore_delete.py
  • tests/test_datastore_info.py
  • tests/test_datastore_search.py
  • tests/test_datastore_search_sql.py
  • tests/test_health.py
  • tests/test_read_service.py

sagargg and others added 7 commits May 21, 2026 17:30
`datastore_delete` now supports three mutually-exclusive operations,
picked by which request fields are set:

- both `filters` and `fields` omitted → DROP TABLE + remove metadata
  row (resource disappears entirely)
- `filters` (incl. `{}` for "all rows") → parameterised `DELETE FROM
  ... WHERE ...` with typed param binds from the stored schema
- `fields` → `ALTER TABLE DROP COLUMN ...` + rewrite the stored schema

Guardrails: filters and fields together are 400; empty `fields` list
is 400; dropping system columns (`_id`, `_updated_at`) or primary-key
columns is rejected with a clear ValidationError instead of silently
breaking subsequent upserts. Unknown columns and JSON/array/geojson
filter columns also produce 400s before any DDL/DML runs.

Adds an example payload for the column-drop case and updates the
example index.
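
A rough sketch of how the three operations and their guardrails could be selected from the request fields. The function name, the returned labels, and the local `ValidationError` class are assumptions made for illustration; only the rules themselves come from the commit message above.

```python
from typing import Any, Optional

# System columns named in the commit message.
SYSTEM_COLUMNS = {"_id", "_updated_at"}


class ValidationError(ValueError):
    """Stand-in for the project's validation error (assumed to map to a 400)."""


def pick_delete_operation(
    filters: Optional[dict[str, Any]],
    fields: Optional[list[str]],
    primary_key: list[str],
) -> str:
    """Return which delete operation a request selects, or raise on bad input."""
    if filters is not None and fields is not None:
        raise ValidationError("filters and fields are mutually exclusive")
    if fields is not None:
        if not fields:
            raise ValidationError("fields must not be empty")
        protected = (SYSTEM_COLUMNS | set(primary_key)) & set(fields)
        if protected:
            raise ValidationError(f"cannot drop protected columns: {sorted(protected)}")
        return "drop_columns"
    if filters is not None:
        # An empty dict is a valid filter set and means "all rows".
        return "delete_rows"
    return "drop_table"
```

Checking `is not None` rather than truthiness is what lets `{}` select the row-delete path instead of falling through to `DROP TABLE`.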

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…onymous reads

- Parse LIMIT (required) and OFFSET (default 0) out of the user's SQL and surface them on the response; reject requests without LIMIT or above `SEARCH_RESULT_ROWS_MAX` at the schema boundary.
- Rewrite table refs to fully-qualified `project.dataset.table` BigQuery syntax before submission; emit BigQuery-dialect SQL with backticks regardless of input quoting style.
- Pick the cheapest viable path for `total`: plain `SELECT cols FROM table` reads `INFORMATION_SCHEMA.TABLE_STORAGE.total_rows` (free metadata, no bytes scanned); anything that filters / joins / aggregates falls through to `COUNT(*) FROM (<inner with LIMIT/OFFSET stripped>)`. COUNT runs in parallel with the data query.
- `_links` rewrites the SQL's OFFSET via sqlglot to produce real `next` / `prev` URLs the caller can follow without editing the SQL themselves; echo `page_size`, `page`, `total_pages` per the search-pagination contract.
- Echo the original SQL on the response (`result.sql`) so callers can see what actually ran for the current page.
- Anonymous reads: missing api_key on `read` permission no longer short-circuits — request is forwarded to CKAN's `datastore_authorize` which decides based on resource visibility. Writes still require an authenticated user.
- Tests updated to cover: LIMIT enforcement, OFFSET parsing, sql echo, cheap vs subquery COUNT, anonymous read flow, anonymous write rejection.
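
The pagination fields mentioned above could be derived from the parsed `LIMIT` / `OFFSET` roughly as below. The exact contract is not shown in this PR page, so the formulas and both function names are assumptions; the sketch also leaves out the sqlglot rewrite that produces the actual `next` / `prev` URLs.

```python
import math
from typing import Optional


def page_info(limit: int, offset: int, total: int) -> dict[str, int]:
    """Derive page_size / page / total_pages from LIMIT, OFFSET and total."""
    if limit <= 0:
        raise ValueError("limit must be positive")
    return {
        "page_size": limit,
        "page": offset // limit + 1,  # 1-based page number (assumed)
        "total_pages": math.ceil(total / limit),
    }


def neighbour_offsets(limit: int, offset: int, total: int) -> dict[str, Optional[int]]:
    """OFFSET values for the next and previous page, or None at either end."""
    next_offset = offset + limit if offset + limit < total else None
    prev_offset = max(offset - limit, 0) if offset > 0 else None
    return {"next": next_offset, "prev": prev_offset}
```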

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Drop example_payload/ in favor of a single self-contained postman/ folder
with README, collection.json, and generator script — one place to import
the collection and one place to regenerate it.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…ne>/

Move BigQuery-only unit tests into tests/engines/bigquery/ and stub
tests/engines/ducklake/ for the upcoming DuckLake backend. Engine-agnostic
endpoint/service tests stay at tests/ root. Also add a small assertion that
datastore_search_sql echoes the original SQL in the response.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Drop AUTH_ENABLED; AUTH_TYPE picks a provider under datastore/auth/.
CKAN auth keeps its TTL cache; jwt + anonymous don't need one.
Datastore runs standalone (no CKAN) under jwt / anonymous.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Funnel every read-path QueryJobConfig (datastore_search, datastore_search_sql,
datastore_info) through `_read_job_config`, which sets `use_query_cache=True`
by default. BigQuery's 24h cache makes identical SELECTs free + fast; DML
writes auto-invalidate so reads always reflect the latest state.
New `BIGQUERY_USE_QUERY_CACHE` env (default true) opts back out for
freshness-sensitive deployments or integration tests.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 9

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (5)
datastore/infrastructure/engines/bigquery/backend.py (2)

467-500: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Return type mismatch: create() returns dict but ABC declares WriteResult.

The method signature in DatastoreBackend ABC (lines 98-116 in base.py) declares -> WriteResult, but this implementation returns a plain dict. This will pass at runtime due to duck typing but violates the contract and will fail mypy strict mode.

🐛 Proposed fix
-        return {
-            "schema": schema,
-            "records": records,
-            "include_total": include_total,
-            "total": len(records or []) if include_total else None,
-        }
+        return WriteResult(
+            rows_written=len(records or []),
+            total=len(records or []) if include_total else None,
+        )

Based on learnings: "MyPy strict mode must be enabled in pyproject.toml; all modules must pass mypy --strict".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/infrastructure/engines/bigquery/backend.py` around lines 467 - 500,
The create() implementation in the BigQuery backend returns a plain dict but
must return the ABC's WriteResult type; update the return to construct and
return a WriteResult instance (import WriteResult from the module that defines
it) populated with schema, records, include_total, and total (compute total as
len(records or []) when include_total is True, else None), and ensure the
function signature/type hints remain -> WriteResult so mypy --strict accepts it;
also verify any fields/attribute names match the WriteResult dataclass/TypedDict
exactly (adjust keys to match WriteResult's attributes if different).

502-567: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Return type mismatch: upsert() returns dict but ABC declares WriteResult.

Both the placeholder mode (lines 533-539) and the real mode (lines 561-567) return plain dicts instead of WriteResult. This violates the ABC contract and will fail mypy strict mode.

🐛 Proposed fix for both branches
         if self.metadata is None:
             # Placeholder mode — echo (matches the create() pattern).
-            return {
-                "resource_id": resource_id,
-                "records": records,
-                "method": method,
-                "include_total": include_total,
-                "total": len(records or []),
-            }
+            return WriteResult(
+                rows_written=len(records or []),
+                total=len(records or []),
+            )
         ...
-        return {
-            "resource_id": resource_id,
-            "records": records,
-            "method": method,
-            "include_total": include_total,
-            "total": len(rows) if include_total else None,
-        }
+        return WriteResult(
+            rows_written=len(rows),
+            total=len(rows) if include_total else None,
+        )

Based on learnings: "MyPy strict mode must be enabled in pyproject.toml; all modules must pass mypy --strict".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/infrastructure/engines/bigquery/backend.py` around lines 502 - 567,
The upsert method returns plain dicts but must return a WriteResult per the ABC;
update the upsert implementation (function upsert) to construct and return an
actual WriteResult object (or the concrete dataclass/type used across the repo)
in both the placeholder branch (where self.metadata is None) and the
normal-return branch at the end, preserving the same fields (resource_id,
records, method, include_total, total) but typed as WriteResult; reference the
WriteResult type used by the datastore ABC and ensure any necessary import is
added and mypy-compatible types (e.g., Optional[int] for total when
include_total is False) are used.
datastore/schemas/validators.py (1)

121-135: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

info metadata is lost on legacy ⇄ Frictionless round-trip.

The docstring (lines 104-109) claims extras are nested under info, but lines 133-134 spread them onto the top-level field object: fr = {**fr, **extra}. The inverse function frictionless_schema_to_fields only looks for extras under field["info"] (lines 172-174), so keys like unit or notes are silently dropped.

🐛 Proposed fix
         if extra:
-            fr = {**fr, **extra}
+            fr["info"] = extra
         fr_fields.append(fr)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/schemas/validators.py` around lines 121 - 135, The code in the
info-merging loop inside validators.py (where fr is built and appended to
fr_fields) incorrectly spreads extra metadata onto the field top-level (fr =
{**fr, **extra}) causing legacy info keys to be lost on round-trip; change the
merge to place extras under fr["info"] (merge into existing fr.get("info", {})
while still skipping info["type"] and preserving title/description behavior) so
that frictionless_schema_to_fields can find extras via field["info"] without
overwriting existing info keys.
postman/README.md (1)

47-48: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Incorrect environment variable name.

The documentation references AUTH_ENABLED=false, but based on the auth provider registry, the correct setting for disabling auth in local dev would be AUTH_TYPE=anonymous.

📝 Suggested fix
-- Writes need `apiKey`, or set `AUTH_ENABLED=false` in `.env` for local dev.
+- Writes need `apiKey`, or set `AUTH_TYPE=anonymous` in `.env` for local dev.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@postman/README.md` around lines 47 - 48, Update the README note that
currently says to set AUTH_ENABLED=false; change it to instruct developers to
set AUTH_TYPE=anonymous to disable auth in local dev (replace the incorrect
environment variable AUTH_ENABLED with AUTH_TYPE=anonymous in the sentence that
references disabling auth).
tests/test_datastore_search_sql.py (1)

217-218: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Extra-param test can pass for the wrong reason (missing LIMIT).

Use a valid SQL shape so this test specifically verifies unknown-query-param rejection.

Suggested fix
-    response = client.get(SQL_URL, params={"sql": "SELECT 1", "limit": 10})
+    response = client.get(SQL_URL, params={"sql": "SELECT 1 LIMIT 1", "limit": 10})
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/test_datastore_search_sql.py` around lines 217 - 218, The test
currently sends params to SQL_URL with an SQL that lacks a LIMIT, so the
"extra-param" check can pass for the wrong reason; update the request so the SQL
string itself has a valid shape including a LIMIT clause (so the server would
accept the SQL by shape) while still sending the extra query param (the 'limit'
key in the params dict) to trigger the unknown-query-param rejection; change the
params in the client.get call that constructs the request (the params dict used
with SQL_URL in this test) to use an SQL containing a LIMIT and keep the extra
'limit' param unchanged so the test specifically verifies rejection of unknown
query parameters.
🧹 Nitpick comments (1)
postman/generate_postman.py (1)

301-302: ⚡ Quick win

Consider adding error handling for malformed JSON files.

If an example payload file contains invalid JSON, json.load() will raise a JSONDecodeError without indicating which file failed. Adding error handling would improve the developer experience when maintaining example files.

♻️ Proposed enhancement
-        with payload_file.open() as f:
-            body = json.load(f)
+        try:
+            with payload_file.open() as f:
+                body = json.load(f)
+        except json.JSONDecodeError as e:
+            print(f"⚠️  Skipping {payload_file.relative_to(REPO)}: {e}")
+            continue
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@postman/generate_postman.py` around lines 301 - 302, The code currently calls
json.load(...) when reading example payload files without catching
JSONDecodeError, so a malformed JSON will raise an exception without filename
context; wrap the json.load call in a try/except that catches
json.JSONDecodeError (import it from the json module), log or raise a new error
that includes the filename and the original exception message, and optionally
include the path variable used when opening files (e.g., the filename variable
in the loop that reads examples) so developers know which example broke; apply
this change where json.load is invoked (the json.load(...) call in
generate_postman.py) to provide a clear, contextual error message.
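
One way to get the filename into the error, as an alternative to the skip-and-continue variant proposed above. `load_payload` is a hypothetical helper, not a function in `generate_postman.py`; it re-raises so a broken example fails loudly instead of being skipped.

```python
import json
from pathlib import Path
from typing import Any


def load_payload(payload_file: Path) -> Any:
    """Load one example payload, naming the file if its JSON is malformed."""
    try:
        with payload_file.open(encoding="utf-8") as f:
            return json.load(f)
    except json.JSONDecodeError as exc:
        # Chain the original error so line/column details stay visible.
        raise ValueError(f"Invalid JSON in {payload_file}: {exc}") from exc
```

Whether to skip or fail is a design choice: skipping keeps the generator running, while failing prevents a silently incomplete collection.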
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@datastore/auth/base.py`:
- Around line 54-64: The code extracts jti from an unverified JWT payload (the
credential split -> parts[1] base64 decode + orjson.loads flow) which can be
spoofed; update this to NOT derive cache identity from the raw payload: either
verify the token signature using your project's JWT verification routine (use
the same verifier used elsewhere) and only read payload.get("jti") after
successful verification, or stop using jti from this path and fall back to a
safe key (e.g., user id from verified auth context or the full verified token
ID). In short, replace the unverified base64/orjson path with a call to the
proper token validation function and only use jti after that verification,
referencing the credential variable and jti extraction logic to locate the
change.

In `@datastore/auth/ckan/provider.py`:
- Around line 70-77: The Decision being cached currently includes raw
credentials in Decision.subject (constructed with credential) and is persisted
via _decision_to_bytes/cache_key; instead, change the Decision construction in
the block that creates decision = Decision(...) so that subject does NOT contain
the raw token/key — use a non-sensitive identifier (e.g., credential id,
username, or a one-way hash of the credential) or omit the subject entirely, and
ensure _decision_to_bytes and any consumers expect/handle the new non-sensitive
subject shape; update any tests or call sites that rely on Decision.subject
containing the secret accordingly and verify cached bytes no longer include the
raw credential.
- Around line 53-59: The cached decode can raise in
_decision_from_bytes(cached); instead of letting that exception block
authorization, wrap the call to _decision_from_bytes(cached) in a try/except,
catch decoding errors (Exception or a more specific decode error if available),
log the failure (include the error and context: scope, target, permission), and
treat it as a cache miss by not returning and allowing the function to fall
through to the CKAN lookup; optionally purge the bad cache entry (referencing
the same cache key) when an exception occurs.

In `@postman/generate_postman.py`:
- Around line 1-14: Update the module docstring at the top of
postman/generate_postman.py to show the correct invocation path: replace the
incorrect "python scripts/generate_postman.py" text with "python
postman/generate_postman.py" (or the preferred CLI invocation) so the usage
example in the docstring matches the file location; locate the top-level module
docstring in postman/generate_postman.py and edit that string accordingly.
- Around line 76-89: The query string builder is concatenating raw key=value
pairs (including JSON strings from _get_request) without URL-encoding values;
update the code that constructs the query string (the function that assembles
key/value pairs in generate_postman.py) to URL-encode parameter values using
urllib.parse.quote_plus (or quote) before concatenation, and ensure you import
urllib.parse; apply encoding to any value produced by _get_request as well so
special characters like &,=,?,[,] are escaped.
- Line 25: Fix the typo in the OUT_FILE constant and reconcile the docstring
path: change OUT_FILE (currently defined as REPO / "ollection.json") to the
intended filename and location (either REPO / "collection.json" if you want it
in the repo root or Path("postman") / "datastore-api.postman_collection.json" to
match the docstring). Update the module docstring if you choose the shorter
repo-root filename so it reflects the actual output path. Ensure any uses of
OUT_FILE in the script (e.g., open/save calls) remain correct after renaming.
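
A minimal sketch of the URL-encoding fix for the query-string builder, using only the standard library. `build_query_string` is a hypothetical name, and serialising non-string values with `json.dumps` is an assumption about how the generator represents them.

```python
import json
from urllib.parse import quote_plus, urlencode


def build_query_string(params: dict[str, object]) -> str:
    """Encode params so JSON values survive as single query-string values."""
    encoded = {
        key: value if isinstance(value, str) else json.dumps(value)
        for key, value in params.items()
    }
    # quote_plus escapes &, =, ?, [, ] and turns spaces into "+".
    return urlencode(encoded, quote_via=quote_plus)


qs = build_query_string({"filters": {"name": "a&b=c"}, "limit": 10})
```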

In `@README.md`:
- Around line 223-236: Add a table row documenting BIGQUERY_DATASET (the missing
env var present in .env.example) in the README env var table: include the
variable name BIGQUERY_DATASET, default/empty state, that it is the BigQuery
dataset ID used by the datastore and is required when DATASTORE_ENGINE=bigquery
(explain empty → /ready returns 503 or similar warning), and note it is used
alongside BIGQUERY_PROJECT and BIGQUERY_CREDENTIALS to avoid deployment
misconfiguration.

In `@tests/test_datastore_create.py`:
- Around line 225-228: The test currently sends an empty Authorization header
which tests an empty-token path instead of a missing-header path; update the
client.post call that posts to CREATE_URL with _valid_payload_with_resource_id()
so it does not include the headers parameter (i.e., remove
headers={"Authorization": ""}) so the request is sent with no Authorization
header at all, ensuring the test exercises the missing-header behavior.

In `@tests/test_datastore_search.py`:
- Around line 256-259: Test "No Authorization header" is incorrectly sending an
Authorization header with an empty string; update the client.get call that uses
SEARCH_URL and _params() so it does not pass the headers argument at all (remove
headers={"Authorization": ""}) to exercise the anonymous code path and ensure
the request is sent without any Authorization header.

---

Outside diff comments:
In `@datastore/infrastructure/engines/bigquery/backend.py`:
- Around line 467-500: The create() implementation in the BigQuery backend
returns a plain dict but must return the ABC's WriteResult type; update the
return to construct and return a WriteResult instance (import WriteResult from
the module that defines it) populated with schema, records, include_total, and
total (compute total as len(records or []) when include_total is True, else
None), and ensure the function signature/type hints remain -> WriteResult so
mypy --strict accepts it; also verify any fields/attribute names match the
WriteResult dataclass/TypedDict exactly (adjust keys to match WriteResult's
attributes if different).
- Around line 502-567: The upsert method returns plain dicts but must return a
WriteResult per the ABC; update the upsert implementation (function upsert) to
construct and return an actual WriteResult object (or the concrete
dataclass/type used across the repo) in both the placeholder branch (where
self.metadata is None) and the normal-return branch at the end, preserving the
same fields (resource_id, records, method, include_total, total) but typed as
WriteResult; reference the WriteResult type used by the datastore ABC and ensure
any necessary import is added and mypy-compatible types (e.g., Optional[int] for
total when include_total is False) are used.

In `@datastore/schemas/validators.py`:
- Around line 121-135: The code in the info-merging loop inside validators.py
(where fr is built and appended to fr_fields) incorrectly spreads extra metadata
onto the field top-level (fr = {**fr, **extra}) causing legacy info keys to be
lost on round-trip; change the merge to place extras under fr["info"] (merge
into existing fr.get("info", {}) while still skipping info["type"] and
preserving title/description behavior) so that frictionless_schema_to_fields can
find extras via field["info"] without overwriting existing info keys.

In `@postman/README.md`:
- Around line 47-48: Update the README note that currently says to set
AUTH_ENABLED=false; change it to instruct developers to set AUTH_TYPE=anonymous
to disable auth in local dev (replace the incorrect environment variable
AUTH_ENABLED with AUTH_TYPE=anonymous in the sentence that references disabling
auth).

In `@tests/test_datastore_search_sql.py`:
- Around line 217-218: The test currently sends params to SQL_URL with an SQL
that lacks a LIMIT, so the "extra-param" check can pass for the wrong reason;
update the request so the SQL string itself has a valid shape including a LIMIT
clause (so the server would accept the SQL by shape) while still sending the
extra query param (the 'limit' key in the params dict) to trigger the
unknown-query-param rejection; change the params in the client.get call that
constructs the request (the params dict used with SQL_URL in this test) to use
an SQL containing a LIMIT and keep the extra 'limit' param unchanged so the test
specifically verifies rejection of unknown query parameters.

---

Nitpick comments:
In `@postman/generate_postman.py`:
- Around line 301-302: The code currently calls json.load(...) when reading
example payload files without catching JSONDecodeError, so a malformed JSON will
raise an exception without filename context; wrap the json.load call in a
try/except that catches json.JSONDecodeError (import it from the json module),
log or raise a new error that includes the filename and the original exception
message, and optionally include the path variable used when opening files (e.g.,
the filename variable in the loop that reads examples) so developers know which
example broke; apply this change where json.load is invoked (the json.load(...)
call in generate_postman.py) to provide a clear, contextual error message.
ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 1ceda456-05d2-453c-8d14-3fdbf27707f8

📥 Commits

Reviewing files that changed from the base of the PR and between 2bf0577 and ebe2d57.

📒 Files selected for processing (71)
  • .env.example
  • CLAUDE.md
  • README.md
  • datastore/api/auth.py
  • datastore/api/context.py
  • datastore/api/endpoints/datastore.py
  • datastore/api/endpoints/health.py
  • datastore/api/routes.py
  • datastore/auth/__init__.py
  • datastore/auth/anonymous/__init__.py
  • datastore/auth/anonymous/provider.py
  • datastore/auth/base.py
  • datastore/auth/ckan/__init__.py
  • datastore/auth/ckan/provider.py
  • datastore/auth/jwt/__init__.py
  • datastore/auth/jwt/provider.py
  • datastore/auth/registry.py
  • datastore/core/config.py
  • datastore/core/constants.py
  • datastore/infrastructure/engines/base.py
  • datastore/infrastructure/engines/bigquery/backend.py
  • datastore/infrastructure/engines/bigquery/client.py
  • datastore/infrastructure/engines/bigquery/lib.py
  • datastore/infrastructure/engines/bigquery/search.py
  • datastore/infrastructure/engines/bigquery/types.py
  • datastore/infrastructure/engines/registry.py
  • datastore/main.py
  • datastore/schemas/request.py
  • datastore/schemas/responses.py
  • datastore/schemas/validators.py
  • datastore/services/read.py
  • datastore/services/streaming.py
  • datastore/services/write.py
  • example_payload/README.md
  • example_payload/datastore_create/with_resource.json
  • example_payload/datastore_create/with_resource_id.json
  • example_payload/datastore_search/basic.json
  • example_payload/datastore_search/paginated_sorted.json
  • example_payload/datastore_search/response.json
  • example_payload/datastore_search/with_filters.json
  • example_payload/datastore_search/with_full_text.json
  • example_payload/datastore_upsert/insert.json
  • example_payload/datastore_upsert/update.json
  • example_payload/datastore_upsert/upsert.json
  • postman/README.md
  • postman/collection.json
  • postman/generate_postman.py
  • pyproject.toml
  • tests/auth/__init__.py
  • tests/auth/anonymous/__init__.py
  • tests/auth/anonymous/test_provider.py
  • tests/auth/ckan/__init__.py
  • tests/auth/ckan/test_provider.py
  • tests/auth/jwt/__init__.py
  • tests/auth/jwt/test_provider.py
  • tests/auth/test_base.py
  • tests/auth/test_orchestration.py
  • tests/auth/test_registry.py
  • tests/conftest.py
  • tests/engines/__init__.py
  • tests/engines/bigquery/__init__.py
  • tests/engines/bigquery/test_metadata.py
  • tests/engines/bigquery/test_tables.py
  • tests/engines/ducklake/__init__.py
  • tests/test_datastore_create.py
  • tests/test_datastore_delete.py
  • tests/test_datastore_info.py
  • tests/test_datastore_search.py
  • tests/test_datastore_search_sql.py
  • tests/test_health.py
  • tests/test_read_service.py
💤 Files with no reviewable changes (12)
  • example_payload/datastore_search/response.json
  • example_payload/datastore_create/with_resource_id.json
  • example_payload/datastore_create/with_resource.json
  • example_payload/datastore_search/basic.json
  • example_payload/datastore_upsert/upsert.json
  • example_payload/datastore_search/paginated_sorted.json
  • example_payload/datastore_search/with_full_text.json
  • example_payload/datastore_upsert/update.json
  • example_payload/README.md
  • example_payload/datastore_search/with_filters.json
  • example_payload/datastore_upsert/insert.json
  • tests/engines/bigquery/test_metadata.py
✅ Files skipped from review due to trivial changes (4)
  • datastore/auth/__init__.py
  • datastore/auth/ckan/__init__.py
  • datastore/auth/anonymous/__init__.py
  • pyproject.toml

Comment thread datastore/auth/base.py Outdated
Comment thread datastore/auth/ckan/provider.py
Comment thread datastore/auth/ckan/provider.py
Comment thread postman/generate_postman.py
Comment thread postman/generate_postman.py Outdated
Comment thread postman/generate_postman.py
Comment thread README.md
Comment thread tests/test_datastore_create.py Outdated
Comment thread tests/test_datastore_search.py Outdated
- Drop JWT `jti` cache-key derivation in default_key_id — an unverified
  `jti` could collide a forged token with a real user's cached
  decision. Always sha256 the full credential.
- CKAN provider: treat a corrupt cache entry as a miss (fall back to
  CKAN) instead of failing the request.
- CKAN provider: store the hashed key in Decision.subject instead of
  the raw credential, so the cache never carries plaintext keys.
- Postman generator: fix OUT_FILE typo (`ollection.json` →
  `postman/collection.json`), URL-encode query-string values, and fix
  the docstring's run-from-root path.
- README: document `BIGQUERY_DATASET` in the env-var table.
- Tests: the "without API key" assertions actually drop the
  Authorization header now (the conftest fixture installs one by
  default).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@sagargg sagargg force-pushed the feat/bigquery-backend branch from 9e37cdc to cc4cf95 Compare May 23, 2026 14:51

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
datastore/auth/jwt/provider.py (1)

53-59: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Disable aud validation when JWT_AUDIENCE is unset.

JWTAuthProvider sets self._audience = config.JWT_AUDIENCE or None and passes audience=self._audience into jwt.decode(...). In PyJWT 2.8, audience=None does not ignore the token’s aud claim—it requires that the token contain no aud; tokens that include aud will be rejected with InvalidAudienceError, breaking deployments that intentionally leave JWT_AUDIENCE blank.

Update the jwt.decode(...) options to disable audience checking when _audience is None (e.g., options={"verify_aud": self._audience is not None}).
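
A minimal sketch of the suggested change, assuming the provider assembles keyword arguments before calling PyJWT's `jwt.decode`. The helper name `build_decode_kwargs` and its parameters are hypothetical and not taken from the repository; only the `options={"verify_aud": ...}` idea comes from the comment above.

```python
from typing import Any, Optional, Sequence


def build_decode_kwargs(
    algorithms: Sequence[str],
    issuer: Optional[str] = None,
    audience: Optional[str] = None,
) -> dict[str, Any]:
    """Assemble keyword arguments for jwt.decode(token, key, **kwargs).

    Hypothetical helper: audience verification is switched on only when an
    audience is configured, so tokens that carry an `aud` claim are not
    rejected when JWT_AUDIENCE is left blank.
    """
    kwargs: dict[str, Any] = {
        "algorithms": list(algorithms),
        "options": {"verify_aud": audience is not None},
    }
    if issuer:
        kwargs["issuer"] = issuer
    if audience is not None:
        # Only pass `audience` when there is something to compare against.
        kwargs["audience"] = audience
    return kwargs
```

With this shape the call site would read roughly `jwt.decode(token, key, **build_decode_kwargs(...))`, and the blank-audience deployment no longer depends on how PyJWT treats `audience=None`.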

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/auth/jwt/provider.py` around lines 53 - 59, JWTAuthProvider
currently passes self._audience (which may be None) into jwt.decode, causing
PyJWT to reject tokens with an aud claim; update the jwt.decode(...) call in the
code that builds claims to set options so audience verification is only enabled
when self._audience is not None (e.g., add options={"verify_aud": self._audience
is not None}) and ensure jwt.decode still receives algorithms, issuer, and
audience only when appropriate so that audience checking is disabled when
JWT_AUDIENCE is unset.
datastore/main.py (1)

69-88: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Defer config validation to startup instead of module import.

create_app() calls get_config(), and Line 88 immediately executes it via app = create_app(), so merely importing datastore.main can fail before tests or tooling get a chance to override env. That is the root cause of the current CI import error. Validation should happen in the lifespan/startup path, not at import time.

As per coding guidelines, validate environment configuration at startup via datastore.core.config.Config and reject startup if required fields are missing or invalid.
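
One way to picture the deferred validation, using only the standard library. This is a sketch under stated assumptions: `load_config`, `REQUIRED_KEYS`, and the `SimpleNamespace` app object are hypothetical stand-ins for `get_config()`, the real `Config` fields, and the ASGI application. Treating `BIGQUERY_PROJECT` as required is an assumption made for illustration.

```python
import asyncio
import os
from contextlib import asynccontextmanager
from types import SimpleNamespace
from typing import Mapping, Optional

REQUIRED_KEYS = ("BIGQUERY_PROJECT",)  # assumption: illustrative only


def load_config(env: Mapping[str, str]) -> SimpleNamespace:
    """Hypothetical stand-in for get_config(): fail loudly on missing keys."""
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise RuntimeError(f"missing required settings: {', '.join(missing)}")
    return SimpleNamespace(**{key: env[key] for key in REQUIRED_KEYS})


def create_app() -> SimpleNamespace:
    """Build the app object without reading the environment."""
    return SimpleNamespace(state=SimpleNamespace(config=None))


@asynccontextmanager
async def lifespan(app: SimpleNamespace, env: Optional[Mapping[str, str]] = None):
    """Validate configuration at startup, not at import time."""
    app.state.config = load_config(os.environ if env is None else env)
    try:
        yield app
    finally:
        app.state.config = None


async def start(env: Mapping[str, str]) -> str:
    app = create_app()  # safe even when the environment is incomplete
    async with lifespan(app, env):
        return app.state.config.BIGQUERY_PROJECT
```

Importing a module built this way never raises on a missing variable; the failure surfaces when the lifespan starts, which is where tests and tooling can still override the environment.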

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/main.py` around lines 69 - 88, Currently get_config() is called
during module import inside create_app() and app = create_app() runs at import
time; move config validation into the startup/lifespan path to avoid importing
failures. Change create_app() so it does not call get_config() at import —
either accept an optional Config parameter or defer calling get_config() inside
the provided lifespan/startup function (the existing lifespan) and perform
validation there using datastore.core.config.Config, logging and raising on
invalid/missing fields; remove the top-level app = create_app() invocation so
importing the module is safe and ensure your ASGI entrypoint constructs the app
via create_app() at runtime or relies on the factory to be called by the server.
♻️ Duplicate comments (1)
datastore/main.py (1)

54-57: 🛠️ Refactor suggestion | 🟠 Major | 🏗️ Heavy lift

Persist the warmed engines on app.state as part of lifespan setup.

warmup_engines(config) eagerly initializes the backends, but the concrete read/write engine instances are never attached to app.state. That leaves the lifespan wiring short of the documented contract for get_context() and /ready, and it is the same gap previously called out.

As per coding guidelines, datastore/main.py: auth provider and storage engines must be instantiated once in the lifespan and cached on app.state; request handlers retrieve them via get_context().
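
The wiring described above could look roughly like the following sketch. It assumes a variant of `warmup_engines` that returns the instances it creates; the return shape, the `reset_state` callback, and the `SimpleNamespace` app are hypothetical and stand in for the real engine objects, `reset_engine_cache`, and the ASGI application.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from types import SimpleNamespace


def warmup_engines(config: dict) -> dict:
    """Hypothetical variant that returns the engines it initializes."""
    return {"read": object(), "write": object()}


def reset_state(app: SimpleNamespace) -> None:
    """Drop every cached handle from app.state on shutdown."""
    vars(app.state).clear()


@asynccontextmanager
async def lifespan(app: SimpleNamespace, config: dict):
    async with AsyncExitStack() as stack:
        engines = warmup_engines(config)
        # Instantiate once, cache on app.state for the process lifetime.
        app.state.read_engine = engines["read"]
        app.state.write_engine = engines["write"]
        stack.callback(reset_state, app)
        yield


def get_context(app: SimpleNamespace) -> SimpleNamespace:
    """Request handlers read the cached instances instead of rebuilding them."""
    return SimpleNamespace(
        read_engine=app.state.read_engine,
        write_engine=app.state.write_engine,
    )


async def demo() -> tuple:
    app = SimpleNamespace(state=SimpleNamespace())
    async with lifespan(app, {}):
        same = get_context(app).read_engine is app.state.read_engine
    return same, hasattr(app.state, "read_engine")
```

A `/ready` check can then inspect the same `app.state` entries rather than reaching into a module-level cache.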

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/main.py` around lines 54 - 57, The warmed engine instances
returned/created by warmup_engines(config) must be persisted onto the ASGI app
state during lifespan setup so request handlers can retrieve them via
get_context(); update the lifespan code that currently calls
warmup_engines(config) and stack.callback(reset_engine_cache) to capture the
concrete engine/auth instances (or change warmup_engines to return them) and
assign them to app.state (e.g., app.state.read_engine, app.state.write_engine,
app.state.auth_provider) and keep the reset_engine_cache callback; ensure
get_context() and the /ready check read these app.state entries.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@datastore/auth/base.py`:
- Around line 49-56: The docstring incorrectly claims that using
hexdigest()[:16] provides SHA-256 collision resistance; update the code and/or
comment in datastore/auth/base.py where hexdigest()[:16] is used (search for the
slice pattern or the surrounding security note) so the guarantee matches
reality—either (A) extend the prefix to the full SHA-256 hex (use the complete
hexdigest()) or a much larger prefix (e.g., 64 hex chars) if you want practical
resistance, or (B) revise the comment to state it only provides ~64-bit
uniqueness (not full SHA-256 collision resistance) and explain that the prefix
reduces but does not eliminate collision risk; choose one of these fixes and
make the corresponding change to both the code that computes the cache key and
the explanatory comment.

In `@datastore/infrastructure/engines/base.py`:
- Around line 173-202: The new abstract methods head_revision and dump break
existing out-of-tree backends; make them non-abstract with backward-compatible
defaults on the base class (the class that currently declares `@abstractmethod`
head_revision and dump) so subclasses can still instantiate: implement
head_revision(self, resource_id: str) -> str | None to return None by default,
and implement dump(self, resource_id: str) -> SearchResult to raise a clear
NotImplementedError (or return an empty SearchResult if that fits callers)
instead of being abstract; keep the method signatures (head_revision, dump, and
the SearchResult return type) and ensure docstrings remain, so legacy plugins
continue to work while still signaling unsupported features.

In `@datastore/infrastructure/engines/bigquery/backend.py`:
- Around line 1056-1063: head_revision() and dump() need to early-return when
metadata is disabled (self.metadata is None) because initialize() can leave a
live client but no metadata; modify the beginning of both methods
(head_revision, dump) to check if self.metadata is None and then follow the same
placeholder behavior used by the other actions (e.g., return None or the
metadata-disabled placeholder response) instead of building/executing SQL,
ensuring resource_id handling remains unchanged; locate these checks near where
initialize() and self.client are referenced to mirror the guard used elsewhere
in this module.

In `@datastore/services/dump.py`:
- Around line 487-489: _parquet serialization fails because _arrow_type() maps
JSON-like logical types (object/array/any) to pa.string() but
_flush_parquet_batch() passes raw dict/list values into pa.table(), causing
Arrow conversion errors; fix by JSON-serializing those cells before building the
Arrow table. In _flush_parquet_batch(), detect columns where the original
logical type (from the types list) is one of ('object','array','any') or where
_arrow_type(column_type) == pa.string(), and transform the column values with
json.dumps (preserving None/nulls) so pa.table(...) receives strings; ensure
json is imported and the transformation mirrors the CSV path JSON serialization
behavior and then use arrow_schema and pa.table(columns=...) as before.

---

Outside diff comments:
In `@datastore/auth/jwt/provider.py`:
- Around line 53-59: JWTAuthProvider currently passes self._audience (which may
be None) into jwt.decode, causing PyJWT to reject tokens with an aud claim;
update the jwt.decode(...) call in the code that builds claims to set options so
audience verification is only enabled when self._audience is not None (e.g., add
options={"verify_aud": self._audience is not None}) and ensure jwt.decode still
receives algorithms, issuer, and audience only when appropriate so that audience
checking is disabled when JWT_AUDIENCE is unset.

In `@datastore/main.py`:
- Around line 69-88: Currently get_config() is called during module import
inside create_app() and app = create_app() runs at import time; move config
validation into the startup/lifespan path to avoid importing failures. Change
create_app() so it does not call get_config() at import — either accept an
optional Config parameter or defer calling get_config() inside the provided
lifespan/startup function (the existing lifespan) and perform validation there
using datastore.core.config.Config, logging and raising on invalid/missing
fields; remove the top-level app = create_app() invocation so importing the
module is safe and ensure your ASGI entrypoint constructs the app via
create_app() at runtime or relies on the factory to be called by the server.

---

Duplicate comments:
In `@datastore/main.py`:
- Around line 54-57: The warmed engine instances returned/created by
warmup_engines(config) must be persisted onto the ASGI app state during lifespan
setup so request handlers can retrieve them via get_context(); update the
lifespan code that currently calls warmup_engines(config) and
stack.callback(reset_engine_cache) to capture the concrete engine/auth instances
(or change warmup_engines to return them) and assign them to app.state (e.g.,
app.state.read_engine, app.state.write_engine, app.state.auth_provider) and keep
the reset_engine_cache callback; ensure get_context() and the /ready check read
these app.state entries.
ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 32aaf1cf-c5ee-487d-a17a-ee3499a3cf6d

📥 Commits

Reviewing files that changed from the base of the PR and between ebe2d57 and 9e37cdc.

📒 Files selected for processing (21)
  • README.md
  • datastore/api/endpoints/dump.py
  • datastore/api/routes.py
  • datastore/auth/base.py
  • datastore/auth/ckan/provider.py
  • datastore/auth/jwt/provider.py
  • datastore/infrastructure/engines/base.py
  • datastore/infrastructure/engines/bigquery/backend.py
  • datastore/main.py
  • datastore/schemas/request.py
  • datastore/services/dump.py
  • postman/generate_postman.py
  • pyproject.toml
  • tests/auth/ckan/test_provider.py
  • tests/auth/jwt/test_provider.py
  • tests/auth/test_base.py
  • tests/conftest.py
  • tests/engines/bigquery/test_tables.py
  • tests/test_datastore_create.py
  • tests/test_datastore_dump.py
  • tests/test_datastore_search.py

Comment thread datastore/api/endpoints/dump.py Outdated
Comment on lines +56 to +57
etag = _compute_etag(context, resource_id, fmt)
if etag is not None and if_none_match and _etag_match(if_none_match, etag):

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Move head_revision() out of the endpoint and off the event loop.

_compute_etag() does a direct engine lookup plus metadata read from the API layer, so this handler still performs backend I/O before the stream starts. That breaks the endpoint/service boundary and can stall unrelated requests if the engine is slow. Please move ETag resolution into datastore.services.dump (or a small service helper) and run it in the same worker-thread path as the dump itself.

As per coding guidelines, datastore/api/endpoints/**/*.py: "Endpoint handlers must never contain SQL, validation logic, or engine calls; delegate all business logic to datastore/services/ modules."

Also applies to: 79-92
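
A sketch of the requested split, assuming the blocking revision lookup can be wrapped in a service-level helper and dispatched with `asyncio.to_thread`. The names `compute_etag`, `resolve_etag`, and `FakeEngine`, and the way the ETag string is derived, are hypothetical; only `head_revision()` comes from the code under review.

```python
import asyncio
import hashlib
from typing import Optional


class FakeEngine:
    """Stand-in engine; the real head_revision() is a blocking metadata read."""

    def head_revision(self, resource_id: str) -> Optional[str]:
        return "rev-1" if resource_id == "known" else None


def compute_etag(engine, resource_id: str, fmt: str) -> Optional[str]:
    """Service-layer helper: derive a quoted ETag, or None when unavailable."""
    revision = engine.head_revision(resource_id)
    if revision is None:
        return None
    raw = f"{resource_id}:{fmt}:{revision}".encode("utf-8")
    return '"' + hashlib.sha256(raw).hexdigest()[:32] + '"'


async def resolve_etag(engine, resource_id: str, fmt: str) -> Optional[str]:
    """Run the blocking lookup in a worker thread so the event loop stays free."""
    return await asyncio.to_thread(compute_etag, engine, resource_id, fmt)
```

The endpoint would then only await `resolve_etag(...)` and compare the result with `If-None-Match`, leaving every engine call inside the service module.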

Comment thread datastore/auth/base.py
Comment on lines +49 to +56
Security note: deliberately ignores any embedded JWT `jti` claim. An
unverified `jti` from the token's payload can be forged to collide
with a cached authorization decision for a different (verified)
token — the cache lookup is keyed before signature verification, so
a forged `jti:<value>` lookup would return the cached decision for
the legitimate user with the same `jti`. Hashing the whole
credential keeps the cache identity tied to bytes-on-the-wire and
makes any collision strictly equivalent to a sha256 collision.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Don’t describe a 16-hex prefix as full SHA-256 collision resistance.

hexdigest()[:16] keeps only 64 bits, so collisions are not “strictly equivalent to a sha256 collision.” Either extend the prefix or tone this note down to match the actual guarantee.
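
To make the arithmetic concrete, here is a small sketch comparing the truncated and full keys. The function names are hypothetical, and the birthday figure is the usual rough estimate of about 2^(bits/2) distinct keys before a collision becomes likely.

```python
import hashlib


def cache_key(credential: str, hex_chars: int = 64) -> str:
    """Hash the whole credential; hex_chars=64 keeps the full SHA-256 digest."""
    return hashlib.sha256(credential.encode("utf-8")).hexdigest()[:hex_chars]


def key_bits(hex_chars: int) -> int:
    """Each hexadecimal character carries 4 bits."""
    return hex_chars * 4


def birthday_bound(hex_chars: int) -> int:
    """Rough count of distinct keys at which a collision becomes likely."""
    return 2 ** (key_bits(hex_chars) // 2)
```

For a 16-character prefix, `key_bits(16)` is 64 and `birthday_bound(16)` is 2**32, whereas the full digest gives 256 bits and a bound of 2**128.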

Suggested doc fix
-    makes any collision strictly equivalent to a sha256 collision.
+    reduces accidental collisions to those of a 64-bit truncated sha256
+    prefix while keeping the cache identity tied to bytes-on-the-wire.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/auth/base.py` around lines 49 - 56, The docstring incorrectly
claims that using hexdigest()[:16] provides SHA-256 collision resistance; update
the code and/or comment in datastore/auth/base.py where hexdigest()[:16] is used
(search for the slice pattern or the surrounding security note) so the guarantee
matches reality—either (A) extend the prefix to the full SHA-256 hex (use the
complete hexdigest()) or a much larger prefix (e.g., 64 hex chars) if you want
practical resistance, or (B) revise the comment to state it only provides
~64-bit uniqueness (not full SHA-256 collision resistance) and explain that the
prefix reduces but does not eliminate collision risk; choose one of these fixes
and make the corresponding change to both the code that computes the cache key
and the explanatory comment.

Comment on lines +173 to +202
    @abstractmethod
    def head_revision(self, resource_id: str) -> str | None:
        """Return an opaque token that changes whenever the table changes.

        Used by `GET /datastore/dump/<rid>` to compute HTTP `ETag` and
        decide whether to short-circuit a request with `304 Not Modified`.
        Must be cheap (no full-table scan) — typically a metadata read
        such as the last-modified timestamp of the underlying table.

        `None` when the engine has no source of truth available (e.g.
        placeholder mode with no client); callers then skip ETag.
        """

    @abstractmethod
    def dump(self, resource_id: str) -> SearchResult:
        """Stream the entire table — no pagination, no filtering.

        Returns a `SearchResult` whose `records` iterator yields every
        row in storage order. `schema["fields"]` is the column shape
        (in the order the row tuples carry). System columns that exist
        on the table (e.g. `_id`, `_updated_at` when
        `INCLUDE_UPDATED_AT=true`) ride along — the engine doesn't
        special-case them; callers can project columns at the response
        layer if needed.

        Cost note: engines should prefer the cheapest "read the whole
        table" path their backend offers (e.g. BigQuery's
        `client.list_rows` bills at storage-scan rates without slot
        usage), so a dump doesn't pay query-engine costs.
        """

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Avoid breaking existing engine plugins with new abstract methods.

Making head_revision() and dump() abstract means any out-of-tree backend that currently satisfies DatastoreBackend becomes uninstantiable as soon as it upgrades to this version. Please either provide backward-compatible defaults on the base class or gate dump/ETag support behind a narrower capability interface before merging.

Based on learnings, pluggable storage backends are plugins: drop a folder under datastore/infrastructure/engines/<name>/ with __init__.py exporting Backend; no registry.py or config.py edits are required.
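
The backward-compatible option can be sketched as follows. `SearchResult` and `DatastoreBackend` here are simplified stand-ins for the project's classes, and the single abstract `search` method is an assumption standing in for whatever contract existing plugins already satisfy.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Iterator, Optional


@dataclass
class SearchResult:
    """Simplified stand-in for the project's SearchResult."""

    schema: dict[str, Any]
    records: Iterator[Any] = field(default_factory=lambda: iter(()))
    records_truncated: bool = False


class DatastoreBackend(ABC):
    @abstractmethod
    def search(self, resource_id: str) -> SearchResult:
        """Existing contract that every backend already implements."""

    def head_revision(self, resource_id: str) -> Optional[str]:
        """Default: no revision token, so callers skip ETag handling."""
        return None

    def dump(self, resource_id: str) -> SearchResult:
        """Default: signal clearly that this backend cannot dump."""
        raise NotImplementedError(
            f"{type(self).__name__} does not implement dump()"
        )


class LegacyBackend(DatastoreBackend):
    """Out-of-tree plugin written before head_revision()/dump() existed."""

    def search(self, resource_id: str) -> SearchResult:
        return SearchResult(schema={"fields": []})
```

`LegacyBackend()` still instantiates under this design, and a dump request against it fails with an explicit `NotImplementedError` instead of breaking at import or construction time.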

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/infrastructure/engines/base.py` around lines 173 - 202, The new
abstract methods head_revision and dump break existing out-of-tree backends;
make them non-abstract with backward-compatible defaults on the base class (the
class that currently declares `@abstractmethod` head_revision and dump) so
subclasses can still instantiate: implement head_revision(self, resource_id:
str) -> str | None to return None by default, and implement dump(self,
resource_id: str) -> SearchResult to raise a clear NotImplementedError (or
return an empty SearchResult if that fits callers) instead of being abstract;
keep the method signatures (head_revision, dump, and the SearchResult return
type) and ensure docstrings remain, so legacy plugins continue to work while
still signaling unsupported features.

Comment on lines +1056 to +1063
        if self.client is None:
            return None
        from google.cloud import bigquery
        table_ref = bigquery.TableReference.from_string(
            f"{self.config.BIGQUERY_PROJECT}"
            f".{self.config.BIGQUERY_DATASET}.{resource_id}"
        )
        try:

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Guard head_revision() and dump() when metadata is disabled.

When BIGQUERY_PROJECT is set but dataset/metadata is unavailable, initialize() can leave self.metadata=None with a live client. At Line 1106, dump() still builds and executes SQL instead of following placeholder behavior used by the other actions.

💡 Proposed fix
 def head_revision(self, resource_id: str) -> str | None:
@@
-        if self.client is None:
+        if self.client is None or self.metadata is None:
             return None
@@
 def dump(self, resource_id: str) -> SearchResult:
@@
-        if self.client is None:
+        if self.client is None or self.metadata is None:
             return SearchResult(
                 schema={"fields": []},
                 records=iter([]),
                 records_truncated=False,
             )
 
-        if self.metadata is not None and self.metadata.get(resource_id) is None:
+        if self.metadata.get(resource_id) is None:
             raise NotFoundError(
                 f"resource {resource_id!r} is not declared; nothing to dump"
             )

Also applies to: 1094-1106

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/infrastructure/engines/bigquery/backend.py` around lines 1056 -
1063, head_revision() and dump() need to early-return when metadata is disabled
(self.metadata is None) because initialize() can leave a live client but no
metadata; modify the beginning of both methods (head_revision, dump) to check if
self.metadata is None and then follow the same placeholder behavior used by the
other actions (e.g., return None or the metadata-disabled placeholder response)
instead of building/executing SQL, ensuring resource_id handling remains
unchanged; locate these checks near where initialize() and self.client are
referenced to mirror the guard used elsewhere in this module.

Comment thread datastore/services/dump.py Outdated
Comment on lines +487 to +489
arrow_schema = pa.schema(
    [(name, _arrow_type(t)) for name, t in zip(columns, types)]
)

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

In PyArrow 14+, what happens when pa.table() is given Python dict or list values for a column declared as pa.string()? Does it coerce them, or raise an Arrow type error?

💡 Result:

In PyArrow 14+, when you call pa.table with a schema that declares a column as pa.string, Python dict/list values are not “nested into” the string type. If the value is not already a string/bytes-like scalar (e.g., you provide a dict or list as an element of that column), PyArrow raises an Arrow conversion/type error rather than coercing the dict/list to a string automatically.

Explanation (grounded in docs):

  • pa.table can infer schema from Python data, or you can pass an explicit schema; when you pass a schema, the resulting Table will have exactly that schema, i.e., it expects the column types to match and will fail otherwise [1][2].
  • PyArrow explicitly treats coercion as something that may happen depending on the conversion path, but the docs emphasize coercion for certain inputs/types rather than arbitrary Python objects; string type is a variable-length primitive and nested Python containers are “nested value types” that must be represented with Arrow nested types (list/map/struct) instead [3].

So, if your column’s Arrow type is pa.string, supplying dicts or lists as the column elements will not be coerced into a string column; it will fail with an Arrow type/conversion error. If you want nested containers, model them as pa.list_, pa.map_, or pa.struct (or use an actual Arrow array type for the nested data) rather than pa.string [3].

Citations:


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Locate the Parquet flush implementation and surrounding logic
rg -n "_flush_parquet_batch|parquet|_arrow_type|_parquet_cell" datastore/services/dump.py

# Show the relevant sections around the flagged lines
sed -n '430,580p' datastore/services/dump.py

Repository: datopian/datastore

Length of output: 5193


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Search for JSON/serialization helpers and usage in dump.py
rg -n "orjson\.dumps|_parquet_cell|_csv|_tsv|_write_csv|_write_tsv|ndjson" datastore/services/dump.py

# Inspect surrounding Parquet call site to see what `result.records` contains for parquet dumps
sed -n '110,170p' datastore/services/dump.py

# Inspect writer implementations for CSV/TSV to compare normalization behavior
sed -n '250,420p' datastore/services/dump.py

# Confirm whether any parquet-specific cell normalization exists elsewhere in the file
sed -n '420,570p' datastore/services/dump.py

Repository: datopian/datastore

Length of output: 14444


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Show top-of-file imports for dump.py to confirm orjson availability
sed -n '1,120p' datastore/services/dump.py

# Also confirm whether dump.py already has any parquet cell normalization helpers
rg -n "_parquet_cell|parquet_cell|orjson\.dumps\(" datastore/services/dump.py

Repository: datopian/datastore

Length of output: 4881


Fix Parquet serialization for JSON-like cells (object/array/any)

_arrow_type() declares those columns as pa.string(), but _flush_parquet_batch() still passes raw dict/list Python values into pa.table(...). With an explicit pa.string schema, PyArrow will not coerce nested containers to strings and will raise an Arrow conversion/type error instead of preserving JSON text (unlike the CSV path, which JSON-serializes dict/list).

Possible fix
-                _flush_parquet_batch(writer, arrow_schema, columns, batch)
+                _flush_parquet_batch(writer, arrow_schema, columns, types, batch)
                 batch.clear()
         if batch:
-            _flush_parquet_batch(writer, arrow_schema, columns, batch)
+            _flush_parquet_batch(writer, arrow_schema, columns, types, batch)
@@
 def _flush_parquet_batch(
     writer: Any,
     schema: Any,
     columns: list[str],
+    types: list[str],
     batch: list[tuple[Any, ...]],
 ) -> None:
     """Build a pyarrow Table from `batch` and write it as one row group."""
     import pyarrow as pa
 
     table = pa.table(
-        {col: [row[i] for row in batch] for i, col in enumerate(columns)},
+        {
+            col: [_parquet_cell(row[i], types[i]) for row in batch]
+            for i, col in enumerate(columns)
+        },
         schema=schema,
     )
     writer.write_table(table)
+
+
+def _parquet_cell(value: Any, field_type: str) -> Any:
+    if value is None:
+        return None
+    if field_type in {"object", "array", "any"}:
+        return orjson.dumps(value).decode("utf-8")
+    return value
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/services/dump.py` around lines 487 - 489, _parquet serialization
fails because _arrow_type() maps JSON-like logical types (object/array/any) to
pa.string() but _flush_parquet_batch() passes raw dict/list values into
pa.table(), causing Arrow conversion errors; fix by JSON-serializing those cells
before building the Arrow table. In _flush_parquet_batch(), detect columns where
the original logical type (from the types list) is one of
('object','array','any') or where _arrow_type(column_type) == pa.string(), and
transform the column values with json.dumps (preserving None/nulls) so
pa.table(...) receives strings; ensure json is imported and the transformation
mirrors the CSV path JSON serialization behavior and then use arrow_schema and
pa.table(columns=...) as before.
