Skip to content

Commit eafb01e

Browse files
Add batch creation methods to the Workflow abstract base class
Why these changes are being introduced: * Support workflows that require programmatic batch creation. It was decided that the batch creation step would assume the responsibilities that were previously assigned to the reconcile step, which is now up for deprecation. How this addresses that need: * Define shared private method for writing records to DynamoDB * Add abstract method for custom batch preparation processes * Define SimpleCSV.prepare_batch * Add unit tests for batch creation methods * Remove record creation from Workflow.reconcile_items Side effects of this change: * Sets the stage for deprecation of 'reconcile_items' Relevant ticket(s): * https://mitlibraries.atlassian.net/browse/IN-1455
1 parent b29c50c commit eafb01e

File tree

9 files changed

+262
-33
lines changed

9 files changed

+262
-33
lines changed

dsc/db/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222

2323
class ItemSubmissionStatus(StrEnum):
24+
BATCH_CREATED = "batch_created"
2425
RECONCILE_SUCCESS = "reconcile_success"
2526
RECONCILE_FAILED = "reconcile_failed"
2627
SUBMIT_SUCCESS = "submit_success"

dsc/exceptions.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,18 @@ def __str__(self) -> str:
6969
if len(self.metadata_without_bitstreams) > 20: # noqa: PLR2004
7070
message = f"{message} (showing first 20 metadata item identifiers)"
7171
return f"{message}: {self.metadata_without_bitstreams}"
72+
73+
74+
# Exceptions for 'create-batch' step
75+
class BatchCreationFailedError(Exception):
    """Signal that the 'create-batch' step could not create a batch."""
77+
78+
79+
class ItemMetadataNotFoundError(Exception):
    """Signal that an item submission has no metadata.

    The exception takes no arguments; its message is fixed.
    """

    def __init__(self) -> None:
        message = "No metadata found for the item submission"
        super().__init__(message)
82+
83+
84+
class ItemBitstreamsNotFoundError(Exception):
    """Signal that an item submission has no bitstreams.

    The exception takes no arguments; its message is fixed.
    """

    def __init__(self) -> None:
        message = "No bitstreams found for the item submission"
        super().__init__(message)

dsc/workflows/base/__init__.py

Lines changed: 66 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from dsc.config import Config
1515
from dsc.db.models import ItemSubmissionDB, ItemSubmissionStatus
1616
from dsc.exceptions import (
17+
BatchCreationFailedError,
1718
InvalidSQSMessageError,
1819
InvalidWorkflowNameError,
1920
ReconcileFailedError,
@@ -221,6 +222,60 @@ def item_metadata_iter(self) -> Iterator[dict[str, Any]]:
221222
MUST be overridden by workflow subclasses.
222223
"""
223224

225+
@final
def create_batch(self) -> None:
    """Create a batch of item submissions for processing.

    A "batch" refers to a collection of item submissions that are grouped
    together for coordinated processing, storage, and workflow execution.
    Each batch typically consists of multiple items, each with its own
    metadata and associated files, organized under a unique batch identifier.

    This method prepares the necessary assets in S3 (programmatically as
    needed) via prepare_batch and then records each item in the batch to
    DynamoDB via _create_batch_in_db.

    Raises:
        BatchCreationFailedError: If prepare_batch reports one or more
            errors. The exception message includes the reported errors so
            the failing items can be identified. Nothing is written to
            DynamoDB in that case.
    """
    item_submissions, errors = self.prepare_batch()
    if errors:
        # Surface the collected errors; raising the bare exception class
        # would discard the per-item details gathered by prepare_batch.
        message = f"Batch creation failed for {len(errors)} item(s): {errors}"
        raise BatchCreationFailedError(message)
    self._create_batch_in_db(item_submissions)
241+
242+
@abstractmethod
def prepare_batch(self) -> tuple[list, ...]:
    """Prepare batch submission assets in S3.

    Implementations perform whatever steps are required to prepare a
    batch of item submissions in S3. At minimum, those steps must include
    the following checks for each item submission:

    - Is there metadata for the item submission?
      If not, raise dsc.exceptions.ItemMetadataNotFoundError.
    - Are there any bitstreams for the item submission?
      If not, raise dsc.exceptions.ItemBitstreamsNotFoundError.

    NOTE(review): SimpleCSV.prepare_batch records the exception's message
    in the returned errors list rather than raising -- confirm which of
    the two behaviors is the intended contract.

    MUST be overridden by workflow subclasses.

    Returns:
        A tuple of two lists:
        - item submissions (init params), each represented as a dict;
        - errors, each represented as a tuple containing the item
          identifier and the error message.
    """
263+
264+
@final
def _create_batch_in_db(self, item_submissions: list[dict]) -> None:
    """Write records for a batch of item submissions to DynamoDB.

    Args:
        item_submissions: Item submissions (init params) represented as a
            list of dicts. Each dict is unpacked as keyword arguments to
            ItemSubmission.create.

    For every entry, an ItemSubmission is created, stamped with this
    workflow's run date and the BATCH_CREATED status, and then saved.
    """
    for init_params in item_submissions:
        record = ItemSubmission.create(**init_params)
        record.last_run_date = self.run_date
        record.status = ItemSubmissionStatus.BATCH_CREATED
        record.save()
278+
224279
@final
225280
def reconcile_items(self) -> bool:
226281
"""Reconcile item submissions for a batch.
@@ -241,20 +296,24 @@ def reconcile_items(self) -> bool:
241296
NOTE: This method is likely the first time a record will be inserted
242297
into DynamoDB for each item submission. If already present,
243298
its status will be updated.
299+
300+
TODO: Reconcile methods will be deprecated after end-to-end testing.
244301
"""
245302
reconciled_items = {} # key=item_identifier, value=list of bitstream URIs
246303
bitstreams_without_metadata = [] # list of bitstream URIs
247304
metadata_without_bitstreams = [] # list of item identifiers
248305

249306
# loop through each item metadata
250307
for item_metadata in self.item_metadata_iter():
251-
item_submission = ItemSubmission.get_or_create(
308+
item_submission = ItemSubmission.get(
252309
batch_id=self.batch_id,
253310
item_identifier=item_metadata["item_identifier"],
254-
workflow_name=self.workflow_name,
255-
source_system_identifier=item_metadata.get("source_system_identifier"),
256311
)
257312

313+
# if no corresponding record in DynamoDB, skip
314+
if not item_submission:
315+
continue
316+
258317
# attach source metadata
259318
item_submission.source_metadata = item_metadata
260319

@@ -336,8 +395,7 @@ def reconcile_items(self) -> bool:
336395
)
337396
return True
338397

339-
@abstractmethod
340-
def reconcile_item(self, item_submission: ItemSubmission) -> bool:
398+
def reconcile_item(self, _item_submission: ItemSubmission) -> bool:
341399
"""Reconcile bitstreams and metadata for an item.
342400
343401
Items in DSpace represent a "work" and combine metadata and files,
@@ -348,7 +406,10 @@ def reconcile_item(self, item_submission: ItemSubmission) -> bool:
348406
349407
If an item fails reconcile, this method should raise
350408
dsc.exceptions.ReconcileFailed*Error. Otherwise, return True.
409+
410+
TODO: Reconcile methods will be deprecated after end-to-end testing.
351411
"""
412+
return False
352413

353414
def _report_reconcile_workflow_events(
354415
self,

dsc/workflows/base/simple_csv.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
import pandas as pd
66
import smart_open
77

8-
from dsc.exceptions import ReconcileFailedMissingBitstreamsError
8+
from dsc.exceptions import (
9+
ItemBitstreamsNotFoundError,
10+
ReconcileFailedMissingBitstreamsError,
11+
)
912
from dsc.item_submission import ItemSubmission
1013
from dsc.utilities.aws import S3Client
1114
from dsc.workflows.base import Workflow
@@ -74,7 +77,6 @@ def item_metadata_iter(
7477
logger.warning(
7578
f"Renaming multiple columns as 'item_identifier': {col_names}"
7679
)
77-
7880
metadata_df = metadata_df.rename(
7981
columns={
8082
col: "item_identifier"
@@ -92,3 +94,31 @@ def item_metadata_iter(
9294

9395
for _, row in metadata_df.iterrows():
9496
yield row.to_dict()
97+
98+
def prepare_batch(self) -> tuple[list, ...]:
    """Collect init params for every item that has at least one bitstream.

    Iterates over the rows yielded by item_metadata_iter and looks up the
    bitstream URIs for each row's "item_identifier".

    Returns:
        A tuple of two lists:
        - init params (batch_id, item_identifier, workflow_name) for each
          item with bitstreams;
        - (item_identifier, error message) tuples for each item without
          bitstreams, where the message is that of
          ItemBitstreamsNotFoundError.
    """
    prepared = []
    failures = []

    for row in self.item_metadata_iter():
        identifier = row["item_identifier"]
        bitstream_uris = self.get_item_bitstream_uris(item_identifier=identifier)

        if bitstream_uris:
            # item has associated bitstreams: keep its init params
            prepared.append(
                {
                    "batch_id": self.batch_id,
                    "item_identifier": identifier,
                    "workflow_name": self.workflow_name,
                }
            )
        else:
            # no bitstreams: report the item instead of including it
            failures.append((identifier, str(ItemBitstreamsNotFoundError())))

    return prepared, failures

tests/conftest.py

Lines changed: 63 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from dsc.workflows.base.simple_csv import SimpleCSV
2525

2626

27+
# Test Workflow classes ######################
2728
class TestWorkflow(Workflow):
2829

2930
workflow_name: str = "test"
@@ -61,6 +62,47 @@ def item_metadata_iter(self):
6162
},
6263
]
6364

65+
def prepare_batch(self):
    """Return two canned item submissions for "batch-aaa" and no errors."""
    item_submissions = [
        {
            "batch_id": "batch-aaa",
            "item_identifier": identifier,
            "workflow_name": "test",
        }
        for identifier in ("123", "789")
    ]
    return item_submissions, []
81+
82+
83+
class TestOpenCourseWare(OpenCourseWare):
    """OpenCourseWare subclass for tests with a fixed queue and canned batch."""

    @property
    def output_queue(self) -> str:
        """Return the name of the mock output queue."""
        return "mock-output-queue"

    def prepare_batch(self):
        """Return two canned item submissions for "batch-aaa" and no errors."""
        item_submissions = [
            {
                "batch_id": "batch-aaa",
                "item_identifier": identifier,
                "workflow_name": "opencourseware",
            }
            for identifier in ("123", "124")
        ]
        return item_submissions, []
105+
64106

65107
class TestSimpleCSV(SimpleCSV):
66108

@@ -80,13 +122,30 @@ def output_queue(self) -> str:
80122
return "mock-output-queue"
81123

82124

83-
class TestOpenCourseWare(OpenCourseWare):
125+
# Test Workflow instances ####################
126+
@pytest.fixture
@freeze_time("2025-01-01 09:00:00")
def base_workflow_instance(item_metadata, metadata_mapping, mocked_s3):
    """Provide a TestWorkflow for batch "batch-aaa".

    The workflow is instantiated while time is frozen at 2025-01-01 09:00:00.
    The requested fixtures are not used in the body; presumably they are
    needed for their setup side effects -- confirm before removing any.
    """
    return TestWorkflow(batch_id="batch-aaa")
84130

85-
@property
86-
def output_queue(self) -> str:
87-
return "mock-output-queue"
131+
132+
@pytest.fixture
def simple_csv_workflow_instance(metadata_mapping):
    """Provide a TestSimpleCSV workflow for batch "batch-aaa".

    The metadata_mapping fixture is requested but not used in the body;
    presumably it is needed for its setup side effects -- confirm.
    """
    return TestSimpleCSV(batch_id="batch-aaa")
135+
136+
137+
@pytest.fixture
@freeze_time("2025-01-01 09:00:00")
def archivesspace_workflow_instance(tmp_path):
    """Provide an ArchivesSpace workflow for batch "batch-aaa".

    The workflow is instantiated while time is frozen at 2025-01-01 09:00:00.
    The tmp_path fixture is requested but not used in the body.
    """
    return ArchivesSpace(batch_id="batch-aaa")
141+
142+
143+
@pytest.fixture
def opencourseware_workflow_instance():
    """Provide a TestOpenCourseWare workflow for batch "batch-aaa"."""
    return TestOpenCourseWare(batch_id="batch-aaa")
88146

89147

148+
# Test fixtures ##############################
90149
@pytest.fixture(autouse=True)
91150
def _test_env(monkeypatch):
92151
monkeypatch.setenv("SENTRY_DSN", "None")
@@ -113,28 +172,6 @@ def moto_server():
113172
time.sleep(0.5)
114173

115174

116-
@pytest.fixture
117-
@freeze_time("2025-01-01 09:00:00")
118-
def base_workflow_instance(item_metadata, metadata_mapping, mocked_s3):
119-
return TestWorkflow(batch_id="batch-aaa")
120-
121-
122-
@pytest.fixture
123-
def simple_csv_workflow_instance(metadata_mapping):
124-
return TestSimpleCSV(batch_id="batch-aaa")
125-
126-
127-
@pytest.fixture
128-
@freeze_time("2025-01-01 09:00:00")
129-
def archivesspace_workflow_instance(tmp_path):
130-
return ArchivesSpace(batch_id="batch-aaa")
131-
132-
133-
@pytest.fixture
134-
def opencourseware_workflow_instance():
135-
return TestOpenCourseWare(batch_id="batch-aaa")
136-
137-
138175
@pytest.fixture
139176
def config_instance():
140177
return Config()

tests/test_cli.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ def test_reconcile_success(
2222
s3_client.put_file(
2323
file_content="", bucket="dsc", key="simple-csv/batch-aaa/123_002.jpg"
2424
)
25+
instance = ItemSubmissionDB(
26+
batch_id="batch-aaa", item_identifier="123", workflow_name="workflow"
27+
)
28+
instance.create()
29+
2530
result = runner.invoke(
2631
main,
2732
[

tests/test_workflow_base.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import json
2+
from datetime import UTC, datetime
23
from unittest.mock import patch
34

45
import pytest
56
from botocore.exceptions import ClientError
7+
from freezegun import freeze_time
68

79
from dsc.db.models import ItemSubmissionDB, ItemSubmissionStatus
810
from dsc.exceptions import (
@@ -38,6 +40,18 @@ def test_base_workflow_get_workflow_invalid_workflow_name_raises_error(
3840
base_workflow_instance.get_workflow("tast")
3941

4042

43+
@freeze_time("2025-01-01 09:00:00")
def test_base_workflow_create_batch_in_db_success(
    base_workflow_instance, mocked_item_submission_db
):
    """Records written by _create_batch_in_db carry run date and status."""
    item_submissions, _ = base_workflow_instance.prepare_batch()
    base_workflow_instance._create_batch_in_db(item_submissions)  # noqa: SLF001

    # read back the record for the first prepared item
    record = ItemSubmissionDB.get(hash_key="batch-aaa", range_key="123")
    expected_run_date = datetime(2025, 1, 1, 9, 0, tzinfo=UTC)

    assert record.last_run_date == expected_run_date
    assert record.status == ItemSubmissionStatus.BATCH_CREATED
53+
54+
4155
def test_base_workflow_submit_items_success(
4256
caplog,
4357
base_workflow_instance,

tests/test_workflow_opencourseware.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,12 @@ def test_workflow_ocw_reconcile_items_success(
120120
bucket="dsc",
121121
key="opencourseware/batch-aaa/123.zip",
122122
)
123+
ItemSubmissionDB(
124+
item_identifier="123",
125+
batch_id="batch-aaa",
126+
workflow_name="opencourseware",
127+
).create()
128+
123129
mock_opencourseware_read_metadata_from_zip_file.return_value = (
124130
opencourseware_source_metadata
125131
)
@@ -157,6 +163,17 @@ def test_workflow_ocw_reconcile_items_if_not_reconciled_success(
157163
bucket="dsc",
158164
key="opencourseware/batch-aaa/124.zip",
159165
)
166+
ItemSubmissionDB(
167+
item_identifier="123",
168+
batch_id="batch-aaa",
169+
workflow_name="opencourseware",
170+
).create()
171+
ItemSubmissionDB(
172+
item_identifier="124",
173+
batch_id="batch-aaa",
174+
workflow_name="opencourseware",
175+
).create()
176+
160177
mock_opencourseware_read_metadata_from_zip_file.side_effect = [
161178
opencourseware_source_metadata,
162179
FileNotFoundError,

0 commit comments

Comments
 (0)