Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
367de4c
Feat (api): wire datastore_info end-to-end + accept resource_id/id alias
sagargg May 18, 2026
5435e0b
Feat (api): wire datastore_delete end-to-end
sagargg May 18, 2026
efc9214
Docs: refresh milestones + add example payloads for delete / info / s…
sagargg May 18, 2026
5eb5016
Feat (engines): wire lifespan init + real /ready healthcheck
sagargg May 19, 2026
7a0b7e5
Feat (datastore_create): accept Frictionless schema; deprecate legacy…
sagargg May 19, 2026
432de8a
Feat (datastore_info): also return Frictionless schema alongside lega…
sagargg May 19, 2026
a2a8e0d
Feat (datastore_search): also return Frictionless schema alongside le…
sagargg May 19, 2026
6497955
Feat (bigquery): wire datastore_create — metadata store + per-resourc…
sagargg May 19, 2026
0a1769b
Feat (bigquery): datastore_upsert + insert/update via DML, friendly e…
sagargg May 20, 2026
6d871e3
Feat (bigquery): system columns _id + optional _updated_at via INCLUD…
sagargg May 20, 2026
700e322
Style: ruff import-order + line-length fixes in BigQuery tests
sagargg May 20, 2026
2bf0577
Feat (bigquery): wire datastore_info — Frictionless schema + cheap ro…
sagargg May 21, 2026
0876789
Feat (bigquery): wire datastore_search with full CKAN param support +…
sagargg May 21, 2026
f13589c
Fix: address PR review comments — tighter boundary validation + safer…
sagargg May 21, 2026
ea82531
Feat (datastore_delete): wire DROP / row-delete / column-drop branches
sagargg May 21, 2026
3ecd46b
Feat (search_sql): SQL-driven pagination, cheap unfiltered totals, an…
sagargg May 23, 2026
7ccd660
Chore (postman): consolidate example payloads into postman/ folder
sagargg May 23, 2026
8d5079c
Test (engines): split engine-specific tests under tests/engines/<engi…
sagargg May 23, 2026
e482376
Feat (auth): pluggable auth providers (ckan / jwt / anonymous)
sagargg May 23, 2026
ebe2d57
Feat (bigquery): opt-in query-results cache on read endpoints
sagargg May 23, 2026
1c199b1
Style: ruff import-order fixes after auth refactor
sagargg May 23, 2026
cc4cf95
Fix: address CodeRabbit review (PR #1, 2026-05-23)
sagargg May 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 34 additions & 7 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,46 @@ MAX_REQUEST_BODY_MB=50
LOG_LEVEL=INFO

# --- Datastore engine ---
# Selects the storage backend:
# Selects the storage backend (must match a folder under
# `datastore/infrastructure/engines/`):
# bigquery — real BigQuery adapter (placeholder while being built).
# ducklake — Future planned
# ducklake — Future planned.
DATASTORE_ENGINE=bigquery
BQ_PROJECT=
BIGQUERY_PROJECT=
BIGQUERY_DATASET=
BIGQUERY_CREDENTIALS=
BIGQUERY_CREDENTIALS_RO=
Comment on lines +10 to +18
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Keep .env.example aligned with the actual settings model.

HTTP_MAX_CONNECTIONS and HTTP_MAX_KEEPALIVE_CONNECTIONS are documented here, but datastore/core/config.py does not declare either field, so pydantic-settings will silently ignore them. INCLUDE_UPDATED_AT was added to Config as well, but operators still don't get an example for that toggle here.

Also applies to: 26-27

🧰 Tools
🪛 dotenv-linter (4.0.0)

[warning] 15-15: [UnorderedKey] The BIGQUERY_PROJECT key should go before the DATASTORE_ENGINE key

(UnorderedKey)


[warning] 16-16: [UnorderedKey] The BIGQUERY_DATASET key should go before the BIGQUERY_PROJECT key

(UnorderedKey)


[warning] 17-17: [UnorderedKey] The BIGQUERY_CREDENTIALS key should go before the BIGQUERY_DATASET key

(UnorderedKey)


[warning] 18-18: [UnorderedKey] The BIGQUERY_CREDENTIALS_RO key should go before the BIGQUERY_DATASET key

(UnorderedKey)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In @.env.example around lines 10 - 18, The .env.example is missing entries that
match the fields declared in datastore/core/config.py; add entries for
HTTP_MAX_CONNECTIONS, HTTP_MAX_KEEPALIVE_CONNECTIONS, and INCLUDE_UPDATED_AT
(with example values or blanks) using the exact variable names used in Config so
pydantic-settings doesn't ignore them, and update the other relevant section
mentioned in the comment to include the same variables for consistency.

# Use BigQuery's built-in 24h query-results cache on read paths
# (datastore_search / datastore_search_sql / datastore_info). Identical
# SELECTs return free + fast on cache hits. False = force fresh scan.
BIGQUERY_USE_QUERY_CACHE=true
SQL_FUNCTIONS_ALLOW_FILE=

# --- CKAN auth gate ---
# Set AUTH_ENABLED=false for local dev / CI without a CKAN instance.
AUTH_ENABLED=true
# --- Auth ---
# AUTH_TYPE selects the provider (folder under `datastore/auth/`):
# ckan — CKAN /api/3/action/datastore_authorize (uses CKAN_URL below).
# jwt — verify JWT against JWT_SECRET or JWT_PUBLIC_KEY (see below).
# anonymous — always allow, no identity. Use for local dev / CI.
AUTH_TYPE=ckan
AUTH_CACHE_TTL=10

# CKAN — required when AUTH_TYPE=ckan.
CKAN_URL=
HTTP_TIMEOUT_SECONDS=10
AUTH_CACHE_TTL=10

# JWT — required when AUTH_TYPE=jwt. HS* uses JWT_SECRET; RS*/ES* uses JWT_PUBLIC_KEY.
JWT_ALGORITHM=HS256
JWT_SECRET=
JWT_PUBLIC_KEY=
JWT_AUDIENCE=
JWT_ISSUER=

# --- System columns + search ---
# Toggle the per-row `_updated_at` TIMESTAMP system column. False = `_id` only.
INCLUDE_UPDATED_AT=true
# Hard cap on `datastore_search` / `datastore_search_sql` `limit`. Requests
# above this return 400. Raise this only when downstream clients can stream.
SEARCH_RESULT_ROWS_MAX=32000

# --- Cache ---
# Empty REDIS_URL keeps the in-process InMemoryCache (single-pod only).
Expand Down
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,8 @@ Thumbs.db
# Environment files
.env
.env.local

# Local-only test engine — synthetic data for local dev; never push.
# Pair with DATASTORE_ENGINE=bigquery_test (which is NOT in the committed
# Config Literal, so flip to a non-Literal locally if you need to run it).
datastore/infrastructure/engines/bigquery_test/
261 changes: 160 additions & 101 deletions CLAUDE.md

Large diffs are not rendered by default.

266 changes: 192 additions & 74 deletions README.md

Large diffs are not rendered by default.

151 changes: 28 additions & 123 deletions datastore/api/auth.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,45 @@
"""CKAN authorizationpure async functions. No state, no FastAPI.
"""Auth orchestrationboundary validation + anonymous-read policy.

`AuthContext` (in `app/api/context.py`) wraps these into a per-request
object: it holds the state (api_key, cache, ttl, enabled) and exposes
methods that delegate here.
Provider-agnostic. Owns only the pieces that apply to every provider:
- the anonymous-read policy (some permissions skip the credential check),
- validation of `permission` and the `resource_id` XOR `package_id` rule.

Caching is a provider concern (network-bound providers cache; local ones
don't). Today only the CKAN provider caches — see `auth/ckan/provider.py`.

`RequestContext.authorize(...)` (in `api/context.py`) is the public seam
endpoints use; it delegates here.
"""

from __future__ import annotations

import base64
import hashlib
import logging
from typing import Any, Literal, get_args

import orjson

from datastore.auth.base import AuthProvider
from datastore.core.exceptions import AuthorizationError, ValidationError
from datastore.infrastructure.cache import CachePort
from datastore.infrastructure.ckan_client import CKANClient

log = logging.getLogger(__name__)

Permission = Literal["read", "create", "update", "delete", "patch"]
ALLOWED_PERMISSIONS: frozenset[str] = frozenset(get_args(Permission))

# Permissions an unauthenticated caller is allowed to attempt. For these
# we forward to the provider with `credential=None`; the provider decides
# (e.g. CKAN checks resource visibility). Anything outside this set
# hard-fails on missing credentials before the provider is called.
ANONYMOUS_PERMISSIONS: frozenset[str] = frozenset({"read"})


# --- public ------------------------------------------------------------------
async def authorize(
*,
api_key: str | None,
cache: CachePort,
cache_ttl: int,
enabled: bool,
ckan: CKANClient,
provider: AuthProvider,
resource_id: str | None,
package_id: str | None,
permission: Permission | None = None,
) -> dict[str, Any]:
"""CKAN `datastore_authorize` with TTL cache.
"""Run policy checks, delegate to the provider, return endpoint data_dict.

Endpoints merge the returned dict into their `data_dict`:
`{"resource": <dict or {}>, "package": <dict or {}>}`
"""
if bool(resource_id) == bool(package_id):
raise ValidationError("exactly one of resource_id or package_id required")
Expand All @@ -47,112 +49,15 @@ async def authorize(
f"permission must be one of {sorted(ALLOWED_PERMISSIONS)}"
)

if not enabled:
log.debug("auth disabled; returning stub for resource_id=%s package_id=%s",
resource_id, package_id)
return _disabled_stub(resource_id, package_id)

if not api_key:
raise AuthorizationError("Access denied: Action requires an authenticated user")

# Adapter enforces TTL: `cache.set(..., ttl=cache_ttl)` writes an entry
# that expires `cache_ttl` seconds after this write.
scope, target = ("res", resource_id) if resource_id else ("pkg", package_id)
assert target is not None # narrowed by the validation above
cache_key = _cache_key(api_key, scope, target, permission)

cached = await _safe_get(cache, cache_key)
if cached is not None:
log.debug("auth cache HIT scope=%s target=%s perm=%s", scope, target, permission)
return _decode(cached)
if not api_key and permission not in ANONYMOUS_PERMISSIONS:
raise AuthorizationError(
"Access denied: Action requires an authenticated user"
)

log.debug("auth cache MISS scope=%s target=%s perm=%s -> CKAN", scope, target, permission)
result = await ckan.datastore_authorize(
decision = await provider.authorize(
credential=api_key,
resource_id=resource_id,
package_id=package_id,
permission=permission,
)
await _safe_set(cache, cache_key, orjson.dumps(result), cache_ttl)
log.debug("auth cache STORE scope=%s target=%s perm=%s ttl=%ds",
scope, target, permission, cache_ttl)
return result


# --- cache helpers -----------------------------------------------------------

def _cache_key(
api_key: str,
scope: str,
identifier: str,
permission: str | None,
) -> str:
return f"auth:{_key_id(api_key)}:{scope}:{identifier}:{permission}"


async def _safe_get(cache: CachePort, key: str) -> bytes | None:
try:
return await cache.get(key)
except Exception: # noqa: BLE001 — cache failure must not block requests
log.warning("auth cache GET failed; falling back to CKAN", exc_info=True)
return None


async def _safe_set(cache: CachePort, key: str, value: bytes, ttl: int) -> None:
try:
await cache.set(key, value, ttl)
except Exception: # noqa: BLE001 — same fail-open policy on writes
log.warning("auth cache SET failed; skipping cache", exc_info=True)


# --- pure helpers ------------------------------------------------------------


def _key_id(api_key: str) -> str:
"""Stable, non-reversible id for the api_key.

JWT tokens use their `jti` claim; opaque tokens use a sha256 prefix.
The raw key never reaches the cache.
"""
jti = _jwt_jti(api_key)
if jti:
return f"jti:{jti}"
return "h:" + hashlib.sha256(api_key.encode()).hexdigest()[:16]


def _jwt_jti(token: str) -> str | None:
"""Extract the `jti` claim from an unverified JWT, or None if not a JWT."""
parts = token.split(".")
if len(parts) != 3:
return None
try:
segment = parts[1]
padded = segment + "=" * (-len(segment) % 4)
payload = orjson.loads(base64.urlsafe_b64decode(padded))
except (ValueError, TypeError, orjson.JSONDecodeError):
return None
if not isinstance(payload, dict):
return None
jti = payload.get("jti")
return jti if isinstance(jti, str) and jti else None


def _disabled_stub(
resource_id: str | None, package_id: str | None
) -> dict[str, Any]:
"""Decision returned when `AUTH_ENABLED=false` (local dev / CI without CKAN)."""
if resource_id is not None:
return {
"package": {"id": None, "_auth_disabled": True},
"resource": {"id": resource_id, "_auth_disabled": True},
}
return {
"package": {"id": package_id, "_auth_disabled": True},
"resource": {"package_id": package_id, "_auth_disabled": True},
}


def _decode(value: bytes) -> dict[str, Any]:
parsed = orjson.loads(value)
if not isinstance(parsed, dict):
raise AuthorizationError("cached auth entry is malformed")
return parsed
return {"resource": decision.resource or {}, "package": decision.package or {}}
91 changes: 39 additions & 52 deletions datastore/api/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,52 @@

from datastore.api import auth as auth_fns
from datastore.api.auth import Permission
from datastore.auth.base import AuthProvider
from datastore.core.config import Config, get_config
from datastore.core.helper import parse_authorization_header
from datastore.infrastructure.cache import CachePort
from datastore.infrastructure.ckan_client import CKANClient

# --- FastAPI dependency seams ------------------------------------------------
ConfigDep = Annotated[Config, Depends(get_config)]


def get_cache(request: Request) -> CachePort:
"""Cache adapter installed by the app lifespan in `request.app.state.cache`."""
cache = getattr(request.app.state, "cache", None)
if cache is None:
raise RuntimeError("cache is not initialised; check the lifespan wiring")
return cache # type: ignore[no-any-return]
def get_ckan_client(request: Request) -> CKANClient | None:
"""CKAN client installed by the app lifespan in `request.app.state.ckan`.

`None` under non-CKAN auth (the lifespan skips construction when
`AUTH_TYPE != "ckan"` — the datastore runs standalone).
"""
return getattr(request.app.state, "ckan", None)


def get_ckan_client(request: Request) -> CKANClient:
"""CKAN client installed by the app lifespan in `request.app.state.ckan`."""
ckan = getattr(request.app.state, "ckan", None)
if ckan is None:
raise RuntimeError("ckan client is not initialised; check the lifespan wiring")
return ckan # type: ignore[no-any-return]
def get_auth_provider(request: Request) -> AuthProvider:
"""Auth provider installed by the app lifespan."""
provider = getattr(request.app.state, "auth_provider", None)
if provider is None:
raise RuntimeError(
"auth provider is not initialised; check the lifespan wiring"
)
return provider # type: ignore[no-any-return]


# --- AuthContext -------------------------------------------------------------
@dataclass(slots=True)
class AuthContext:
"""Per-request auth state. Delegates the real work to `app.api.auth`."""
class RequestContext:
"""Per-request facade — the one dep an endpoint takes.

`ckan` is None under non-CKAN auth (the datastore runs standalone).
Code paths that need CKAN — today only `datastore_create`'s `resource`
dict branch — must guard for that.

Usage:
async def handler(payload: ..., context: Context):
data_dict = await ctx.authorize(resource_id=..., permission=...)
if ctx.ckan is not None:
created = await ctx.ckan.resource_create(resource=...)
"""

config: Config
api_key: str | None = field(repr=False)
cache: CachePort
cache_ttl: int
enabled: bool
ckan: CKANClient
auth_provider: AuthProvider
ckan: CKANClient | None

async def authorize(
self,
Expand All @@ -52,50 +63,26 @@ async def authorize(
) -> dict[str, Any]:
return await auth_fns.authorize(
api_key=self.api_key,
cache=self.cache,
cache_ttl=self.cache_ttl,
enabled=self.enabled,
ckan=self.ckan,
provider=self.auth_provider,
resource_id=resource_id,
package_id=package_id,
permission=permission,
)


# --- RequestContext ----------------------------------------------------------
@dataclass(slots=True)
class RequestContext:
"""Per-request facade — the one dep an endpoint takes.

Usage:
async def handler(payload: ..., context: Context):
decision = await ctx.auth.authorize(resource_id=..., permission=...)
created = await ctx.ckan.resource_create(resource=...)

Add new sub-contexts here as the app grows (e.g. `engine`, `events`).
"""

config: Config
auth: AuthContext
ckan: CKANClient


def get_context(
config: ConfigDep,
cache: Annotated[CachePort, Depends(get_cache)],
ckan: Annotated[CKANClient, Depends(get_ckan_client)],
ckan: Annotated[CKANClient | None, Depends(get_ckan_client)],
provider: Annotated[AuthProvider, Depends(get_auth_provider)],
authorization: Annotated[str | None, Header(alias="Authorization")] = None,
) -> RequestContext:
api_key = parse_authorization_header(authorization)
bound_ckan = ckan.bind(api_key)
auth = AuthContext(
return RequestContext(
config=config,
api_key=api_key,
cache=cache,
cache_ttl=config.AUTH_CACHE_TTL,
enabled=config.AUTH_ENABLED,
ckan=bound_ckan,
auth_provider=provider,
ckan=ckan.bind(api_key) if ckan is not None else None,
)
return RequestContext(config=config, auth=auth, ckan=bound_ckan)


Context = Annotated[RequestContext, Depends(get_context)]
Loading
Loading