Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions ckanext/datapusher_plus/config_declaration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,19 @@ groups:
description: |
Download proxy

- key: ckanext.datapusher_plus.dictionary_stash_dir
editable: true
default: ""
description: |
Directory where the per-resource Data Dictionary stash files
live. The analysis stage writes a small JSON snapshot here
before deleting the existing datastore resource, and the
database rollback hook reads it back to restore the
operator's annotations if any later stage fails (issue #265).
Empty (default) → ``<tempdir>/dpp_dict_stash``. For deployments
where ``/tmp`` is wiped on container restart, set this to a
persistent path.

- key: ckanext.datapusher_plus.types
description: |
Types of files to be processed
Expand Down
180 changes: 180 additions & 0 deletions ckanext/datapusher_plus/dictionary_stash.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
# flake8: noqa: E501
"""Persist a resource's Data Dictionary across a DP+ job so it can be
restored on failure.

Why this exists
---------------
The analysis stage captures a resource's existing per-field ``info``
dicts (label / description / type_override — the "Data Dictionary") into
``ProcessingContext.existing_info``, then deletes the existing datastore
resource so the database stage can re-create + COPY into it. If any
later stage fails, the in-memory snapshot dies with the worker and the
operator's annotations are gone forever — see the docstring on
``_rollback_database`` in ``jobs/prefect_flow.py``, which explicitly
calls out this hole:

(The earlier ``existing_info`` branch claimed to "preserve" the
original, but the delete had already destroyed it.)

This module closes it. Just before the analysis stage deletes the
existing datastore resource, the dictionary is written to a small JSON
file keyed by ``resource_id``. On a flow rollback,
``_rollback_database`` looks for that file and restores the dictionary
by re-creating the datastore resource with the original ``info`` dicts
(and zero rows — the data is gone, but the *annotations* the operator
carefully built up are preserved). On flow success the file is deleted.

Why a file (not the DP+ DB)
---------------------------
The ``Metadata`` model would also work, but adds a transaction inside
the Prefect task body for a write that is only ever read by the rollback
on the *same* worker. A file in a configurable directory keeps the path
simple, supports inspection by ops, and survives the worker-crash case
(/tmp is per-host, persists across container restarts unless the host
itself reboots — the failure mode #265 targets). For deployments where
/tmp is wiped on restart, set ``ckanext.datapusher_plus.dictionary_stash_dir``
to a persistent location.

Public API: ``save``, ``load``, ``clear``, ``stash_path``.
"""
from __future__ import annotations

import json
import logging
import os
import tempfile
from typing import Any, Dict, Optional

import ckan.plugins.toolkit as tk

_LOG = logging.getLogger(__name__)


def _stash_dir(create: bool = False) -> str:
    """Return the directory that holds the stash files.

    The directory is looked up on every call (not cached at import time)
    so a test can change ``ckan.plugins.toolkit.config`` and have the new
    value take effect without re-importing this module.

    When ``ckanext.datapusher_plus.dictionary_stash_dir`` is unset or
    empty, the directory is ``dpp_dict_stash`` under the system temp
    directory.

    ``create`` controls whether the directory is created when missing.
    Only ``save()`` asks for that; the read and delete paths leave the
    filesystem untouched, so an unusable stash directory cannot break a
    job that never stashed anything.
    """
    directory = tk.config.get("ckanext.datapusher_plus.dictionary_stash_dir")
    if not directory:
        # No explicit setting: fall back to a fixed sub-directory of the
        # platform temp directory.
        directory = os.path.join(tempfile.gettempdir(), "dpp_dict_stash")
    if not create:
        return directory
    os.makedirs(directory, exist_ok=True)
    return directory


def stash_path(resource_id: str) -> str:
    """Build the path of the stash file belonging to ``resource_id``.

    This is pure path computation: it neither checks that the file
    exists nor creates the stash directory. Use ``load`` / ``save`` /
    ``clear`` for the actual I/O.

    Raises:
        ValueError: if ``resource_id`` is empty, or if it contains a
            path separator for the current platform.
    """
    if not resource_id:
        raise ValueError("resource_id is required")
    # CKAN resource ids are UUIDs in practice, but nothing here enforces
    # that, so a crafted id must not be able to steer the file outside
    # the stash directory. ``os.altsep`` is ``None`` on platforms with a
    # single separator, hence the conditional append.
    separators = [os.sep]
    if os.altsep:
        separators.append(os.altsep)
    if any(sep in resource_id for sep in separators):
        raise ValueError(f"resource_id contains a path separator: {resource_id!r}")
    directory = _stash_dir(create=False)
    return os.path.join(directory, f"{resource_id}.json")


def save(resource_id: str, info_by_field: Dict[str, Dict[str, Any]]) -> None:
    """Write the per-field ``info`` dicts for ``resource_id`` to the stash.

    ``info_by_field`` is the shape produced by the analysis stage:
    ``{field_id: {"label": ..., "type_override": ..., ...}}``. Writing
    an empty dict is allowed (explicit "no dictionary to stash") and
    produces a file containing ``{}``.

    Overwrites any pre-existing stash for the same ``resource_id`` —
    a previous failed run's stash is superseded by the latest attempt's
    snapshot.

    Raises:
        ValueError: if ``resource_id`` is empty or contains a path
            separator (raised by ``stash_path``).
        Exception: whatever the write or rename raised (e.g. ``OSError``
            for disk full / permission denied, ``TypeError`` for a value
            that is not JSON-serialisable). The original error is always
            the one propagated, even if temp-file cleanup also fails.
    """
    # ``save`` is the only operation that needs the directory to exist;
    # ``stash_path`` deliberately does not bootstrap it.
    _stash_dir(create=True)
    path = stash_path(resource_id)
    # Write to a temp file in the same directory, then rename over the
    # final name. ``os.replace`` is atomic on POSIX, so a reader never
    # observes a half-written stash at ``path``; should partial JSON
    # ever end up on disk anyway, ``load()`` treats it as absent.
    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(info_by_field, f)
        os.replace(tmp_path, path)
    except Exception:
        # On any failure (disk full / permission / cross-device rename),
        # remove the half-written tempfile so the stash directory does
        # not accumulate ``.tmp`` leaks. The original ``path`` is left
        # untouched — the failed write did not reach the atomic rename.
        try:
            os.unlink(tmp_path)
        except FileNotFoundError:
            pass
        except OSError as cleanup_err:
            # A cleanup failure must not replace the root-cause error
            # the caller is about to see; record it and carry on to the
            # bare ``raise`` below.
            _LOG.warning(
                "dictionary_stash.save: could not remove temp file %s: %s",
                tmp_path,
                cleanup_err,
            )
        raise
    _LOG.debug(
        "dictionary_stash.save: wrote %d field(s) for resource %s to %s",
        len(info_by_field),
        resource_id,
        path,
    )


def load(resource_id: str) -> Optional[Dict[str, Dict[str, Any]]]:
    """Read the stash for ``resource_id``; return ``None`` if absent.

    An unusable stash is treated as absent and logged at warning level —
    restoring nothing is strictly better than crashing the rollback hook
    on a bad file we wrote ourselves. "Unusable" covers:

    * malformed JSON (``json.JSONDecodeError``),
    * bytes that are not valid UTF-8 (``UnicodeDecodeError``),
    * any ``OSError`` while opening or reading the file,
    * well-formed JSON whose top-level value is not an object, since
      callers iterate the result as a ``{field_id: info}`` mapping.

    Raises:
        ValueError: if ``resource_id`` is empty or contains a path
            separator (raised by ``stash_path``).
    """
    # Resolved outside the ``try`` so an invalid ``resource_id`` still
    # surfaces as ``ValueError`` instead of being mistaken for a corrupt
    # stash.
    path = stash_path(resource_id)
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # Plain "no stash" case. Handling it here (rather than with a
        # prior ``os.path.exists`` check) also covers a file removed
        # between the check and the open, without a misleading warning.
        return None
    except (ValueError, OSError) as e:
        # ``ValueError`` is the common base of ``json.JSONDecodeError``
        # and ``UnicodeDecodeError``.
        _LOG.warning(
            "dictionary_stash.load: corrupt stash for resource %s at %s (%s); "
            "treating as absent",
            resource_id,
            path,
            e,
        )
        return None
    if not isinstance(data, dict):
        _LOG.warning(
            "dictionary_stash.load: stash for resource %s at %s is not a "
            "JSON object (got %s); treating as absent",
            resource_id,
            path,
            type(data).__name__,
        )
        return None
    return data


def clear(resource_id: str) -> None:
    """Remove the stash file for ``resource_id``, if there is one.

    Safe to call repeatedly and safe to call when ``save`` never ran for
    this resource: a missing file is silently ignored. Any other
    ``OSError`` from the removal is logged as a warning and swallowed.

    Raises:
        ValueError: if ``resource_id`` is empty or contains a path
            separator (raised by ``stash_path``).
    """
    target = stash_path(resource_id)
    try:
        os.remove(target)
    except FileNotFoundError:
        # Nothing was stashed (or it is already gone) — nothing to do.
        pass
    except OSError as err:
        # Report but do not raise: a leftover stash file is harmless
        # compared with failing the caller's cleanup path.
        _LOG.warning(
            "dictionary_stash.clear: could not remove %s: %s",
            target,
            err,
        )
    else:
        _LOG.debug(
            "dictionary_stash.clear: removed stash for resource %s at %s",
            resource_id,
            target,
        )
66 changes: 66 additions & 0 deletions ckanext/datapusher_plus/jobs/prefect_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ def _bootstrap_ckan_app_context() -> None:

import ckanext.datapusher_plus.config as conf
import ckanext.datapusher_plus.datastore_utils as dsu
import ckanext.datapusher_plus.dictionary_stash as dict_stash
import ckanext.datapusher_plus.helpers as dph
import ckanext.datapusher_plus.job_exceptions as job_exceptions
import ckanext.datapusher_plus.prefect_client as prefect_client
Expand Down Expand Up @@ -822,6 +823,56 @@ def _rollback_database(txn) -> None:
f"Rollback: could not drop datastore {resource_id}: {e}"
)

# Issue #265: if the analysis stage stashed a Data Dictionary
# before the original delete, restore it now by re-creating the
# datastore resource with the stashed per-field ``info`` dicts and
# zero rows. The *data* is unrecoverable (it never landed), but the
# operator's annotations (labels, descriptions, type_overrides)
# are preserved across the failed run. The stash file is left in
# place for inspection if restore itself fails — a future
# successful run will overwrite it.
stashed = dict_stash.load(resource_id)
if not stashed:
return
try:
# Derive each field's Postgres ``type`` from the stashed
# ``info["type_override"]`` (mapped through ``conf.TYPE_MAPPING``
# values, e.g. ``numeric`` / ``timestamp`` / ``text``). This
# mirrors the analysis stage's ``_build_headers_dicts`` merge:
# otherwise CKAN's ``datastore_create`` falls back to ``text``
# for every column, and a column the operator originally
# annotated as numeric or timestamp would be restored as text —
# silently inconsistent with the stashed dictionary's intent.
valid_types = set(conf.TYPE_MAPPING.values())
fields = []
for fid, info in stashed.items():
field: Dict[str, Any] = {"id": fid, "info": info}
type_override = (info or {}).get("type_override")
if type_override in valid_types:
field["type"] = type_override
else:
field["type"] = "text"
fields.append(field)
dsu.send_resource_to_datastore(
resource=None,
resource_id=resource_id,
headers=fields,
records=[],
aliases=[],
calculate_record_count=False,
)
runtime.logger.info(
f"Rollback: restored Data Dictionary for {resource_id} "
f"({len(fields)} field(s)) from stash"
)
dict_stash.clear(resource_id)
except Exception as e:
runtime.logger.warning(
f"Rollback: could not restore Data Dictionary for "
f"{resource_id}: {e}. Stash file retained at "
f"{dict_stash.stash_path(resource_id)} for inspection."
)


@indexing_task.on_rollback
def _rollback_indexing(txn) -> None:
Expand Down Expand Up @@ -1343,6 +1394,21 @@ def _check_deadline():
finally:
if token is not None:
reset_runtime_context(token)
# Issue #265: on any successful exit (including
# ``_StageAbort`` complete-with-skip), drop the Data
# Dictionary stash — the run reached its natural end and
# the dictionary either rode through to the new datastore
# resource (via the analysis stage's existing_info merge)
# or was never needed. On failure, leave the stash for the
# rollback hook (and a possible subsequent retry).
if not errored:
try:
dict_stash.clear(job_input.resource_id)
except Exception as e: # noqa: BLE001 — never block teardown on stash cleanup
prefect_logger.warning(
f"Could not clear dictionary stash for "
f"{job_input.resource_id}: {e}"
)
if result_url:
status = "error" if errored else "complete"
saved_ok = callback_datapusher_hook(
Expand Down
60 changes: 60 additions & 0 deletions ckanext/datapusher_plus/jobs/stages/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import ckanext.datapusher_plus.utils as utils
import ckanext.datapusher_plus.config as conf
import ckanext.datapusher_plus.datastore_utils as dsu
import ckanext.datapusher_plus.dictionary_stash as dict_stash
from ckanext.datapusher_plus.pii_screening import screen_for_pii
from ckanext.datapusher_plus.jobs.stages.base import BaseStage
from ckanext.datapusher_plus.jobs.context import ProcessingContext
Expand Down Expand Up @@ -344,6 +345,50 @@ def _parse_stats(
context.existing_info = dict(
(f["id"], f["info"]) for f in existing.get("fields", []) if "info" in f
)
else:
# Issue #265 — retry-after-failure path. The previous attempt
# may have already deleted the datastore resource and stashed
# the dictionary to disk before crashing. In that case the
# database transaction's ``_rollback_database`` hook never
# got a chance to fire (the delete happens *here*, outside
# the transaction block). Without this branch, ``existing``
# is falsy → ``existing_info`` stays ``None`` → the new
# datastore resource gets built with no dictionary → and
# the flow's success ``finally`` clears the orphaned stash,
# losing the operator's annotations forever.
#
# If a stash exists for this resource_id with no live
# datastore resource, treat it as if the dictionary had
# been freshly captured: load it into ``existing_info`` so
# the merge logic below applies it onto the new headers.
# The on-disk stash is left as-is — its content is already
# the correct snapshot (the original dictionary, captured
# before the original delete). If this retry also fails,
# the same stash is reused on the next attempt.
stashed = dict_stash.load(context.resource_id)
if stashed:
# Surface the stash mtime so operators can distinguish a
# genuine retry-after-failure (mtime within the last few
# minutes / hours) from a stale-restore caused by an
# ancient orphaned stash being applied to an unrelated
# upload (roborev #2223 LOW finding). A future change
# could enforce a TTL; for now, visibility is enough.
stash_age_s: float = 0.0
try:
stash_age_s = time.time() - os.path.getmtime(
dict_stash.stash_path(context.resource_id)
)
except OSError:
pass
context.logger.info(
f"Found stashed Data Dictionary for "
f"{context.resource_id} ({len(stashed)} field(s), "
f"stash age {stash_age_s:.0f}s). No live datastore "
"resource present — treating as retry-after-failure "
"and restoring. (If this age looks stale, the stash "
"may be orphaned from an unrelated prior upload.)"
)
context.existing_info = stashed

# Override with types from Data Dictionary
if context.existing_info:
Expand All @@ -358,6 +403,21 @@ def _parse_stats(

# Delete existing datastore resource
if existing:
# Issue #265: stash the data dictionary to disk BEFORE the
# delete, so the rollback hook in ``prefect_flow`` can
# restore the operator's annotations if a later stage fails.
# The existing in-memory ``context.existing_info`` survives
# only inside this stage's worker — the on-disk stash
# survives across the whole flow.
if context.existing_info:
try:
dict_stash.save(context.resource_id, context.existing_info)
except Exception as e: # noqa: BLE001 — never block ingestion on stash failure
context.logger.warning(
f"Could not stash data dictionary for "
f"{context.resource_id}: {e}. Proceeding without "
"stash; a mid-flow failure may lose the dictionary."
)
context.logger.info(
f'Deleting existing resource "{context.resource_id}" from datastore.'
)
Expand Down
Loading
Loading