Merged
14 changes: 14 additions & 0 deletions dsc/exceptions.py
@@ -22,6 +22,20 @@ class SQSMessageSendError(Exception):
pass


class ReconcileFailedError(Exception):
pass


class ReconcileFailedMissingMetadataError(ReconcileFailedError):
def __init__(self) -> None:
super().__init__("Reconcile failed due to missing metadata")


class ReconcileFailedMissingBitstreamsError(ReconcileFailedError):
def __init__(self) -> None:
super().__init__("Reconcile failed due to missing bitstreams")


class ReconcileFoundBitstreamsWithoutMetadataWarning(Warning):
def __init__(self, bitstreams_without_metadata: list[str]):
self.bitstreams_without_metadata = bitstreams_without_metadata
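
A minimal sketch of how these exceptions are intended to be used, assuming the dsc package is importable; check_item below is a made-up stand-in for a workflow's reconcile check:

from dsc.exceptions import (
    ReconcileFailedError,
    ReconcileFailedMissingBitstreamsError,
)


def check_item(bitstream_uris: list[str]) -> None:
    # stand-in for a workflow-specific check that raises on failure
    if not bitstream_uris:
        raise ReconcileFailedMissingBitstreamsError


try:
    check_item([])
except ReconcileFailedError as exception:
    # the shared base class catches any reconcile failure;
    # str(exception) carries the specific reason
    print(exception)  # Reconcile failed due to missing bitstreams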
14 changes: 14 additions & 0 deletions dsc/item_submission.py
@@ -60,10 +60,24 @@ class ItemSubmission:
status_details: str | None = None

# processing attributes
source_metadata: dict[str, Any] | None = None
dspace_metadata: dict[str, Any] | None = None
bitstream_s3_uris: list[str] | None = None
metadata_s3_uri: str = ""

@classmethod
def get_or_create(
cls, batch_id: str, item_identifier: str, workflow_name: str
) -> ItemSubmission:
"""Get or create an ItemSubmission.

The method hydrates the ItemSubmission with data from DynamoDB if it
finds a corresponding record.
"""
return cls.get(batch_id, item_identifier) or cls.create(
batch_id, item_identifier, workflow_name
)

@classmethod
def get(
cls, batch_id: str | None, item_identifier: str | None
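
A hedged usage sketch of the new get_or_create classmethod; the batch and item identifiers are invented, and this assumes the DynamoDB table backing ItemSubmissionDB is available:

from dsc.item_submission import ItemSubmission

# returns a hydrated ItemSubmission if a DynamoDB record already exists for
# (batch_id, item_identifier); otherwise creates one bound to the given workflow
item_submission = ItemSubmission.get_or_create(
    batch_id="batch-2024-01",
    item_identifier="item-001",
    workflow_name="simple_csv",
)
print(item_submission.status)  # None for newly created records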
175 changes: 126 additions & 49 deletions dsc/workflows/base/__init__.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import itertools
import json
import logging
from abc import ABC, abstractmethod
@@ -12,7 +13,14 @@

from dsc.config import Config
from dsc.db.models import ItemSubmissionDB, ItemSubmissionStatus
from dsc.exceptions import InvalidSQSMessageError, InvalidWorkflowNameError
from dsc.exceptions import (
InvalidSQSMessageError,
InvalidWorkflowNameError,
ReconcileFailedError,
ReconcileFailedMissingBitstreamsError,
ReconcileFoundBitstreamsWithoutMetadataWarning,
ReconcileFoundMetadataWithoutBitstreamsWarning,
)
from dsc.item_submission import ItemSubmission
from dsc.utilities.aws import SESClient, SQSClient
from dsc.utilities.validate.schemas import RESULT_MESSAGE_ATTRIBUTES, RESULT_MESSAGE_BODY
@@ -213,79 +221,148 @@ def item_metadata_iter(self) -> Iterator[dict[str, Any]]:
def reconcile_items(self) -> bool:
"""Reconcile item submissions for a batch.

This method will first reconcile all bitstreams and metadata.
It will then iterate through the item metadata, creating instances
of the ItemSubmission class. For each instance, data will be loaded
in from the corresponding record in DynamoDB; if the record is not yet
recorded in DynamoDB, it will first create and save the record to the table.

Depending on the "current status" of the record in DynamoDB
(note that status=None for created records), the method will conditionally
update the records in the table with the status of the reconcile.

NOTE: This may not be the full set of item submissions in a batch
as there may be bitstreams (intended for item submissions)
for which an item identifier cannot be retrieved.
This method loops through the item metadata, creating an
instance of ItemSubmission with data loaded from a corresponding
record in DynamoDB (if it exists). For each ItemSubmission,
the method calls a workflow-specific reconcile method to
determine whether it includes the required submission assets (bitstreams
and metadata), recording the results along the way.

After going through each ItemSubmission, the method provides an overall
summary of the results and returns a boolean indicating the status
for the batch:
- If any item submissions failed reconcile, returns False
- If all item submissions were reconciled, returns True

NOTE: This method is likely where a record for each item submission is
first inserted into DynamoDB. If a record is already present,
its status will be updated.
"""
reconciled = self.reconcile_bitstreams_and_metadata()
reconciled_items = {} # key=item_identifier, value=list of bitstream URIs
bitstreams_without_metadata = [] # list of bitstream URIs
metadata_without_bitstreams = [] # list of item identifiers

# iterate over the results of the reconcile
# for all item submissions from the metadata
# loop through each item metadata
for item_metadata in self.item_metadata_iter():
# create or get existing ItemSubmission
item_submission = ItemSubmission.get(
batch_id=self.batch_id, item_identifier=item_metadata["item_identifier"]
item_submission = ItemSubmission.get_or_create(
batch_id=self.batch_id,
item_identifier=item_metadata["item_identifier"],
workflow_name=self.workflow_name,
)
if not item_submission:
item_submission = ItemSubmission.create(
batch_id=self.batch_id,
item_identifier=item_metadata["item_identifier"],
workflow_name=self.workflow_name,

# attach source metadata
item_submission.source_metadata = item_metadata

# get reconcile status and status details
status_details = None
try:
self.reconcile_item(item_submission)
except ReconcileFailedError as exception:
reconcile_status = ItemSubmissionStatus.RECONCILE_FAILED
status_details = str(exception)
Comment on lines +258 to +262
This is feeling pretty good. And FWIW, this did not seem obvious at the onset of this work; like an iterative finding.

Contributor

Agreed!


if isinstance(exception, ReconcileFailedMissingBitstreamsError):
metadata_without_bitstreams.append(item_submission.item_identifier)
Comment on lines +264 to +265

Nice application here: this feels like when controlled exceptions (though would have worked with enums) start to pay dividends.

else:
reconcile_status = ItemSubmissionStatus.RECONCILE_SUCCESS
reconciled_items[item_submission.item_identifier] = (
self.get_item_bitstream_uris(item_submission.item_identifier)
)

if item_submission.status not in [
# update the table if not yet reconciled
if item_submission.status in [
None,
ItemSubmissionStatus.RECONCILE_FAILED,
]:
continue
item_submission.last_run_date = self.run_date
item_submission.status = reconcile_status
item_submission.status_details = status_details
item_submission.upsert_db()

# update reconciliation status
item_submission.last_run_date = self.run_date
if item_submission.item_identifier in self.workflow_events.reconciled_items:
item_submission.status = ItemSubmissionStatus.RECONCILE_SUCCESS
else:
item_submission.status = ItemSubmissionStatus.RECONCILE_FAILED
logger.debug(
"Updated status for the item submission(item_identifier="
f"{item_submission.item_identifier}): {item_submission.status}"
)

logger.debug(
"Updating status for the item submission(item_identifier="
f"{item_submission.item_identifier}): {item_submission.status}"
# check for unmatched bitstreams
matched_bitstream_uris = reconciled_items.values()
bitstreams_without_metadata.extend(
list(
set(self.batch_bitstream_uris)
- set(itertools.chain(*matched_bitstream_uris))
)
)

# save status update
item_submission.upsert_db()
# attach results to workflow events
self._report_reconcile_workflow_events(
reconciled_items, bitstreams_without_metadata, metadata_without_bitstreams
)

return reconciled
# log results
reconcile_summary = {
"reconciled": len(reconciled_items),
"bitstreams_without_metadata": len(bitstreams_without_metadata),
"metadata_without_bitstreams": len(metadata_without_bitstreams),
}
logger.info(
f"Ran reconcile for batch '{self.batch_id}': {json.dumps(reconcile_summary)}"
)
if any((bitstreams_without_metadata, metadata_without_bitstreams)):
logger.warning("Failed to reconcile bitstreams and metadata")

if bitstreams_without_metadata:
logger.warning(
ReconcileFoundBitstreamsWithoutMetadataWarning(
bitstreams_without_metadata
)
)

if metadata_without_bitstreams:
logger.warning(
ReconcileFoundMetadataWithoutBitstreamsWarning(
metadata_without_bitstreams
)
)
return False

logger.info(
"Successfully reconciled bitstreams and metadata for all "
f"{len(reconciled_items)} item(s)"
)
return True
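
To make the bookkeeping above easier to follow, here is a self-contained toy rerun of the matching math with invented URIs and identifiers (no S3 or DynamoDB involved); the real method builds reconciled_items inside the loop shown in this diff:

import itertools

batch_bitstream_uris = [
    "s3://dsc/batch-01/item-001.pdf",
    "s3://dsc/batch-01/item-002.pdf",
    "s3://dsc/batch-01/orphan.pdf",
]
reconciled_items = {
    "item-001": ["s3://dsc/batch-01/item-001.pdf"],
    "item-002": ["s3://dsc/batch-01/item-002.pdf"],
}
metadata_without_bitstreams = ["item-003"]  # metadata rows with no matching files

# any batch bitstream URI not matched to an item is missing metadata
matched_bitstream_uris = reconciled_items.values()
bitstreams_without_metadata = list(
    set(batch_bitstream_uris) - set(itertools.chain(*matched_bitstream_uris))
)

print(bitstreams_without_metadata)  # ['s3://dsc/batch-01/orphan.pdf']
# both "without" lists are non-empty here, so reconcile_items would return False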

@abstractmethod
def reconcile_bitstreams_and_metadata(self) -> bool:
"""Reconcile bitstreams against metadata.
def reconcile_item(self, item_submission: ItemSubmission) -> bool:
"""Reconcile bitstreams and metadata for an item.

Items in DSpace represent a "work" and combine metadata and files,
known as "bitstreams". For any given workflow, this method ensures
the existence of both bitstreams and metadata for each item in the
batch, verifying that all provided bitstreams can be linked to a
metadata record and vice versa.

While this method is not needed for every workflow,
it MUST be overridden by all workflow subclasses.
If the workflow does not require this method, the override must
raise the following exception:
If an item fails reconcile, this method should raise a
dsc.exceptions.ReconcileFailed*Error; otherwise, it should return True.
"""

TypeError(
f"Method '{self.reconcile_bitstreams_and_metadata.__name__}' "
f"not used by workflow '{self.__class__.__name__}'."
)
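
For a workflow whose metadata lives somewhere other than the batch folder, an override might look like the sketch below; lookup_source_record is hypothetical, and the other abstract methods of Workflow are omitted for brevity:

from dsc.exceptions import ReconcileFailedMissingMetadataError
from dsc.item_submission import ItemSubmission
from dsc.workflows.base import Workflow


class ExampleWorkflow(Workflow):
    """Illustrative subclass only; not part of this PR."""

    def reconcile_item(self, item_submission: ItemSubmission) -> bool:
        # lookup_source_record is a hypothetical workflow-specific helper
        if not self.lookup_source_record(item_submission.item_identifier):
            raise ReconcileFailedMissingMetadataError
        return True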
def _report_reconcile_workflow_events(
self,
reconciled_items: dict,
bitstreams_without_metadata: list[str],
metadata_without_bitstreams: list[str],
) -> None:
"""Attach reconcile results to WorkflowEvents for reporting.

TODO: This method is a temporary workaround until reporting modules are updated
so that they no longer rely on WorkflowEvents.
"""
self.workflow_events.reconciled_items = reconciled_items
self.workflow_events.reconcile_errors["bitstreams_without_metadata"] = (
bitstreams_without_metadata
)
self.workflow_events.reconcile_errors["metadata_without_bitstreams"] = (
metadata_without_bitstreams
)

@final
def submit_items(self, collection_handle: str) -> list:
113 changes: 11 additions & 102 deletions dsc/workflows/base/simple_csv.py
@@ -1,17 +1,12 @@
import itertools
import json
import logging
from collections import defaultdict
from collections.abc import Iterator
from typing import Any

import pandas as pd
import smart_open

from dsc.exceptions import (
ReconcileFoundBitstreamsWithoutMetadataWarning,
ReconcileFoundMetadataWithoutBitstreamsWarning,
)
from dsc.exceptions import ReconcileFailedMissingBitstreamsError
from dsc.item_submission import ItemSubmission
from dsc.utilities.aws import S3Client
from dsc.workflows.base import Workflow

@@ -40,103 +35,17 @@ def get_batch_bitstream_uris(self) -> list[str]:
)
)

def reconcile_bitstreams_and_metadata(
self, metadata_file: str = "metadata.csv"
) -> bool:
"""Reconcile item metadata from metadata CSV file with bitstreams.
def reconcile_item(self, item_submission: ItemSubmission) -> bool:
"""Check whether ItemSubmission is associated with any bitstreams.

For SimpleCSV workflows, bitstreams (files) and a metadata CSV file
are uploaded to a designated batch folder on S3. The reconcile method
ensures that every bitstream on S3 has metadata--a row in the metadata
CSV file--associated with it and vice versa.
This method will match bitstreams to an item submission by filtering the
list of URIs to those that include the item identifier as recorded
in the metadata CSV file. If it finds any matches, the item
submission is reconciled.
"""
logger.info(f"Reconciling bitstreams and metadata for batch '{self.batch_id}'")
reconciled: bool = False
reconcile_summary = {
"reconciled": 0,
"bitstreams_without_metadata": 0,
"metadata_without_bitstreams": 0,
}

# get metadata
metadata_item_identifiers = self._get_item_identifiers_from_metadata(
metadata_file
)

reconciled_items = self._match_metadata_to_bitstreams(
metadata_item_identifiers, self.batch_bitstream_uris
)
self.workflow_events.reconciled_items = reconciled_items

bitstreams_without_metadata = list(
set(self.batch_bitstream_uris)
- set(itertools.chain(*reconciled_items.values()))
)
metadata_without_bitstreams = list(
metadata_item_identifiers - set(reconciled_items.keys())
)
reconcile_summary.update(
{
"reconciled": len(reconciled_items),
"bitstreams_without_metadata": len(bitstreams_without_metadata),
"metadata_without_bitstreams": len(metadata_without_bitstreams),
}
)
logger.info(f"Reconcile results: {json.dumps(reconcile_summary)}")

if any((bitstreams_without_metadata, metadata_without_bitstreams)):
logger.warning("Failed to reconcile bitstreams and metadata")

if bitstreams_without_metadata:
logger.warning(
ReconcileFoundBitstreamsWithoutMetadataWarning(
bitstreams_without_metadata
)
)
self.workflow_events.reconcile_errors["bitstreams_without_metadata"] = (
bitstreams_without_metadata
)

if metadata_without_bitstreams:
logger.warning(
ReconcileFoundMetadataWithoutBitstreamsWarning(
metadata_without_bitstreams
)
)
self.workflow_events.reconcile_errors["metadata_without_bitstreams"] = (
metadata_without_bitstreams
)
else:
reconciled = True
logger.info(
"Successfully reconciled bitstreams and metadata for all "
f"{len(reconciled_items)} item(s)"
)

return reconciled

def _match_metadata_to_bitstreams(
self, item_identifiers: set[str], bitstream_filenames: list[str]
) -> dict:
metadata_with_bitstreams = defaultdict(list)
for item_identifier in item_identifiers:
for bitstream_filename in bitstream_filenames:
if item_identifier in bitstream_filename:
metadata_with_bitstreams[item_identifier].append(bitstream_filename)
return metadata_with_bitstreams

def _get_item_identifiers_from_metadata(
self, metadata_file: str = "metadata.csv"
) -> set[str]:
"""Get set of item identifiers from metadata file."""
item_identifiers = set()
item_identifiers.update(
[
item_metadata["item_identifier"]
for item_metadata in self.item_metadata_iter(metadata_file)
]
)
return item_identifiers
if not self.get_item_bitstream_uris(item_submission.item_identifier):
raise ReconcileFailedMissingBitstreamsError
return True
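
A toy stand-in for the identifier-in-URI matching this relies on (filenames are invented; in the workflow the lookup happens via get_item_bitstream_uris against the batch's S3 listing):

batch_bitstream_uris = [
    "s3://dsc/batch-01/thesis-123.pdf",
    "s3://dsc/batch-01/thesis-123_supplement.zip",
    "s3://dsc/batch-01/thesis-456.pdf",
]


def match_bitstreams(item_identifier: str, uris: list[str]) -> list[str]:
    # keep the URIs whose path contains the item identifier
    return [uri for uri in uris if item_identifier in uri]


print(match_bitstreams("thesis-123", batch_bitstream_uris))
# ['s3://dsc/batch-01/thesis-123.pdf', 's3://dsc/batch-01/thesis-123_supplement.zip']
# an empty result is what triggers ReconcileFailedMissingBitstreamsError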

def item_metadata_iter(
self, metadata_file: str = "metadata.csv"