Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ingestify/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .infra import retrieve_http
from .source_base import Source, DatasetResource
from .domain.models.resources.batch_loader import BatchLoader
from .exceptions import StopProcessing
from .main import debug_source

__version__ = "0.15.1"
13 changes: 11 additions & 2 deletions ingestify/domain/models/ingestion/ingestion_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from ingestify.domain.models.resources.batch_loader import BatchLoader
from ingestify.domain.models.dataset.dataset import DatasetLastModifiedAtMap
from ingestify.domain.models.task.task_summary import TaskSummary
from ingestify.exceptions import SaveError, IngestifyError
from ingestify.exceptions import SaveError, IngestifyError, StopProcessing
from ingestify.utils import TaskExecutor, chunker

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -529,7 +529,16 @@ def execute(
)
logger.info(f"Running {len(task_set)} tasks")

results = task_executor.run(run_task, task_set)
try:
results = task_executor.run(run_task, task_set)
except StopProcessing:
logger.info(
"StopProcessing raised — saving partial results "
"and stopping"
)
ingestion_job_summary.set_finished()
yield ingestion_job_summary
raise

# BatchTasks return a list of TaskSummary; flatten.
task_summaries = []
Expand Down
14 changes: 14 additions & 0 deletions ingestify/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,17 @@ class DuplicateFile(IngestifyError):

class SaveError(IngestifyError):
pass


class StopProcessing(IngestifyError):
    """Signal a graceful, early shutdown of an ingestion run.

    A source or loader raises this when continuing is pointless right now
    but a later retry will succeed — the canonical example is API quota
    exhaustion. Datasets that were already processed successfully are
    preserved; the current task and all remaining tasks are skipped.

    The process exits with code 2, distinguishing this outcome from
    0 (success) and 1 (unexpected error).
    """

    # Dedicated exit code so callers/CI can detect "stopped early; retry later".
    exit_code = 2
105 changes: 105 additions & 0 deletions ingestify/tests/test_stop_processing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""Tests for StopProcessing exception."""
from unittest.mock import patch

import pytest

from ingestify import Source, DatasetResource
from ingestify.domain import DataSpecVersionCollection, DraftFile, Selector
from ingestify.domain.models.dataset.collection_metadata import (
DatasetCollectionMetadata,
)
from ingestify.domain.models.fetch_policy import FetchPolicy
from ingestify.domain.models.ingestion.ingestion_plan import IngestionPlan
from ingestify.exceptions import StopProcessing
from ingestify.utils import utcnow


def good_loader(file_resource, current_file, **kwargs):
    """File loader that always succeeds, producing a tiny draft file."""
    draft = DraftFile.from_input("data", data_feed_key="f1")
    return draft


def stopping_loader(file_resource, current_file, **kwargs):
    """File loader that simulates quota exhaustion by raising StopProcessing."""
    message = "API quota exhausted"
    raise StopProcessing(message)


class SourceWithStopProcessing(Source):
    """Test source yielding 5 dataset resources; the 3rd (index 2) carries
    a file loader that raises StopProcessing when invoked."""

    provider = "test_provider"

    def find_datasets(
        self, dataset_type, data_spec_versions, dataset_collection_metadata, **kwargs
    ):
        for index in range(5):
            # Only item 2 gets the failing loader; all others load fine.
            if index == 2:
                file_loader = stopping_loader
            else:
                file_loader = good_loader

            resource = DatasetResource(
                dataset_resource_id={"item_id": index},
                provider=self.provider,
                dataset_type="test",
                name=f"item-{index}",
            )
            resource.add_file(
                last_modified=utcnow(),
                data_feed_key="f1",
                data_spec_version="v1",
                file_loader=file_loader,
            )
            yield resource


def _setup(engine, source):
    """Register a single ingestion plan for *source* on *engine*."""
    versions = DataSpecVersionCollection.from_dict({"default": {"v1"}})
    plan = IngestionPlan(
        source=source,
        fetch_policy=FetchPolicy(),
        dataset_type="test",
        selectors=[Selector.build({}, data_spec_versions=versions)],
        data_spec_versions=versions,
    )
    engine.add_ingestion_plan(plan)


def test_stop_processing_has_exit_code():
    """The exception advertises exit code 2 for graceful early stops."""
    exit_code = StopProcessing.exit_code
    assert exit_code == 2


def test_stop_processing_propagates(engine):
    """StopProcessing raised by a loader escapes engine.run() unchanged."""
    _setup(engine, SourceWithStopProcessing("s"))

    with pytest.raises(StopProcessing) as excinfo:
        engine.run()
    # Same check as match="quota exhausted", spelled out explicitly.
    assert "quota exhausted" in str(excinfo.value)


def test_stop_processing_preserves_completed_datasets(engine):
    """Datasets processed before StopProcessing are saved.

    The source yields items 0..4 and item 2's loader raises
    StopProcessing, so exactly the first two datasets must be persisted.
    """
    _setup(engine, SourceWithStopProcessing("s"))

    # pytest.raises (instead of try/except-pass) additionally asserts the
    # exception is actually raised — the old form silently passed even
    # when engine.run() completed without error.
    with pytest.raises(StopProcessing):
        engine.run()

    datasets = list(
        engine.store.get_dataset_collection(
            provider="test_provider",
            dataset_type="test",
        )
    )
    assert len(datasets) == 2


def test_stop_processing_saves_ingestion_job_summary(engine):
    """IngestionJobSummary is saved even when StopProcessing occurs."""
    _setup(engine, SourceWithStopProcessing("s"))

    with patch.object(engine.store, "save_ingestion_job_summary") as mock_save:
        # pytest.raises (instead of try/except-pass) also verifies the
        # exception really fires; otherwise a source that never raises
        # would make this test pass vacuously.
        with pytest.raises(StopProcessing):
            engine.run()

    assert (
        mock_save.call_count >= 1
    ), "save_ingestion_job_summary should be called even on StopProcessing"
Loading