1 change: 1 addition & 0 deletions Pipfile
@@ -10,6 +10,7 @@ click = "*"
jinja2 = "*"
jsonschema = "*"
pandas = "*"
pynamodb = "*"
sentry-sdk = "*"
smart_open = {extras = ["s3"], version = "*"}

14 changes: 13 additions & 1 deletion Pipfile.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions README.md
@@ -51,6 +51,7 @@ Several DSC workflows involve reading metadata CSV files and searching for bitstreams
SENTRY_DSN=### If set to a valid Sentry DSN, enables Sentry exception monitoring. This is not needed for local development.
WORKSPACE=### Set to `dev` for local development, this will be set to `stage` and `prod` in those environments by Terraform.
AWS_REGION_NAME=### Default AWS region.
ITEM_TABLE_NAME=### The name of the table in DynamoDB used for tracking the state of an 'item' across DSC workflow executions.
S3_BUCKET_SUBMISSION_ASSETS=### The name of the S3 bucket holding submission assets for DSC workflows. The bucket name will typically be formatted as "dsc-<workspace>-<aws-account-id>". The bucket will contain separate prefixes (folders) for different DSC workflows. Within each workflow prefix are "subfolders" representing a batch, which contains the files and metadata uploaded by users and the DSpace metadata JSON files generated by DSC.
SOURCE_EMAIL=### The email address from which reports are sent.
SQS_QUEUE_DSS_INPUT=### The name of the SQS queue to which submission messages are sent. This must be a valid input queue for DSS.
8 changes: 8 additions & 0 deletions dsc/config.py
@@ -10,6 +10,7 @@ class Config:
"WORKSPACE",
"SENTRY_DSN",
"AWS_REGION_NAME",
"ITEM_TABLE_NAME",
"S3_BUCKET_SUBMISSION_ASSETS",
"SOURCE_EMAIL",
"SQS_QUEUE_DSS_INPUT",
@@ -29,6 +30,13 @@ def sentry_dsn(self) -> str:
def aws_region_name(self) -> str:
return os.getenv("AWS_REGION_NAME", "us-east-1")

@property
def item_table_name(self) -> str:
value = os.getenv("ITEM_TABLE_NAME")
if not value:
raise OSError("Env var 'ITEM_TABLE_NAME' must be defined")
return value

@property
def s3_bucket_submission_assets(self) -> str:
value = os.getenv("S3_BUCKET_SUBMISSION_ASSETS")
Empty file added dsc/db/__init__.py
2 changes: 2 additions & 0 deletions dsc/db/exceptions.py
@@ -0,0 +1,2 @@
class ItemSubmissionExistsError(Exception):
pass
145 changes: 145 additions & 0 deletions dsc/db/models.py
@@ -0,0 +1,145 @@
from enum import StrEnum
from typing import TypedDict, Unpack

from pynamodb.attributes import (
JSONAttribute,
NumberAttribute,
UnicodeAttribute,
UTCDateTimeAttribute,
)
from pynamodb.exceptions import PutError
from pynamodb.models import Model

from dsc.db.exceptions import ItemSubmissionExistsError


class ItemSubmissionStatus(StrEnum):
RECONCILE_SUCCESS = "reconcile_success"
RECONCILE_FAILED = "reconcile_failed"
SUBMIT_SUCCESS = "submit_success"
SUBMIT_FAILED = "submit_failed"
SUBMIT_MAX_RETRIES_REACHED = "submit_max_retries_reached"
INGEST_SUCCESS = "ingest_success"
INGEST_FAILED = "ingest_failed"
INGEST_UNKNOWN = "ingest_unknown"
INGEST_MAX_RETRIES_REACHED = "ingest_max_retries_reached"
Comment on lines +16 to +25: +1 to defining the allowed statuses here. Do you have plans to use this elsewhere in code? Perhaps in the business logic when a particular status will get set?

Even if this commit / PR doesn't do that, I think this enum sets us up nicely for that.

class OptionalItemAttributes(TypedDict, total=False):
dspace_handle: str
status: str
status_details: str
ingest_date: str
last_submission_message: str
last_result_message: str
last_run_date: str
submit_attempts: int
ingest_attempts: int


class ItemSubmissionDB(Model):
"""A DynamoDB model representing an item submission.

This model stores information about the state of an item submission
as it progresses through the DSC workflow. The table uses a
composite primary key consisting of 'batch_id' (partition key)
and 'item_identifier' (sort key).

Attributes:
batch_id [partition key]: A unique identifier for the workflow run,
also used as an S3 prefix for workflow run files.
item_identifier [sort key]: A unique identifier for an item submission
in a batch.
workflow_name: The name of the DSC workflow.
dspace_handle: A persistent, globally unique identifier for a digital object
in DSpace. The handle is provided in the DSS result message when
an item is successfully ingested into DSpace.
NOTE: If the item is sent to a DSpace submission queue, the handle is
NOT provided.
status: The current state of an item submission in the DSC workflow.
See dsc.db.models.ItemSubmissionStatus for accepted values.
status_details: Additional details regarding the status of an item
(e.g., error messages).
ingest_date: A date representing when an item was successfully ingested
into DSpace. In DynamoDB, the date is stored as a string
(in ISO 8601 format).
last_submission_message: A serialized JSON string of the latest (most recent)
submission message composed and sent to the input SQS queue via
the submit command.
last_result_message: A serialized JSON string of the latest (most recent)
result message composed and sent to the output SQS queue for DSC via DSS.
last_run_date: A date representing the last time a DSC CLI command was executed
on the item. In DynamoDB, the date is stored as a string (in ISO 8601 format).
submit_attempts: The number of attempts to send a submission message to the
input SQS queue for DSC. This value is only incremented when the DSC
submit command is run for an item.
ingest_attempts: The number of attempts to ingest an item into DSpace (run DSS).
This value is only incremented when the DSC finalize command is run for
an item.
"""

class Meta: # noqa: D106
table_name = "dsc"

batch_id = UnicodeAttribute(hash_key=True)
item_identifier = UnicodeAttribute(range_key=True)
workflow_name = UnicodeAttribute()
dspace_handle = UnicodeAttribute(null=True)
status = UnicodeAttribute(null=True)
Comment (Contributor): Noting for posterity what you shared @jonavellecuerdo, that pynamodb does not support an enum or list of accepted values. I think that's okay.

status_details = UnicodeAttribute(null=True)
ingest_date = UTCDateTimeAttribute(null=True)
last_submission_message = JSONAttribute(null=True)
last_result_message = JSONAttribute(null=True)
last_run_date = UTCDateTimeAttribute(null=True)
submit_attempts = NumberAttribute(default_for_new=0)
ingest_attempts = NumberAttribute(default_for_new=0)

@classmethod
def set_table_name(cls, table_name: str) -> None:
"""Set Meta.table_name attribute.

The table name must be set dynamically rather than from an env variable
due to the current configuration process.

Args:
table_name: The name of the DynamoDB table.
"""
cls.Meta.table_name = table_name

@classmethod
def create(
cls,
item_identifier: str,
batch_id: str,
workflow_name: str,
**attributes: Unpack[OptionalItemAttributes],
) -> None:
"""Create a new item (row) in the 'dsc-item-submissions' table.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"item" as in "DynamoDB item"


This method also calls self.save() to write the item to DynamoDB.
A condition is included in the 'save' call to prevent overwriting
entries in the table that have the same primary keys.

If the call to the save method fails due to the set condition, a
db.exceptions.X is raised; otherwise, it re-raises
the pynamodb.exceptions.PutError.
"""
item = cls(
item_identifier=item_identifier,
batch_id=batch_id,
workflow_name=workflow_name,
**attributes,
)

try:
item.save(
condition=cls.item_identifier.does_not_exist()
& cls.batch_id.does_not_exist()
)
except PutError as exception:
if exception.cause_response_code == "ConditionalCheckFailedException":
raise ItemSubmissionExistsError(
f"Item with item_identifier={item_identifier} (hash key) and "
f"batch_id={batch_id} (range_key) already exists"
) from exception
raise
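
A minimal usage sketch of the new model (not part of this PR; it assumes a `Config` instance exposing the `item_table_name` property added in `dsc/config.py`, and that the DynamoDB table already exists):

```python
from dsc.config import Config
from dsc.db.exceptions import ItemSubmissionExistsError
from dsc.db.models import ItemSubmissionDB, ItemSubmissionStatus

# point the model at the configured table before any reads/writes;
# Meta.table_name otherwise defaults to "dsc"
ItemSubmissionDB.set_table_name(Config().item_table_name)

try:
    # write a new DynamoDB item keyed on (batch_id, item_identifier);
    # ItemSubmissionStatus members are str subclasses, so they serialize
    # cleanly into the UnicodeAttribute 'status' field
    ItemSubmissionDB.create(
        batch_id="batch-aaa",
        item_identifier="123",
        workflow_name="workflow",
        status=ItemSubmissionStatus.RECONCILE_SUCCESS,
    )
except ItemSubmissionExistsError:
    # the conditional save refused to overwrite an existing item with
    # the same composite primary key
    pass

# read it back: batch_id is the hash key, item_identifier the range key
item = ItemSubmissionDB.get(hash_key="batch-aaa", range_key="123")
```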
1 change: 0 additions & 1 deletion pyproject.toml
@@ -46,7 +46,6 @@ ignore = [
"PLR0913",
"PLR0915",
"PTH",
"S320",
Comment (Contributor Author): Ruff removed this rule.

Comment (Contributor): Yes, they removed a bunch so I had to remove several #noqas in s3-bagit-validator, something for all of us to keep an eye out for as we update dependencies in various repos!

"S321",
"TRY003",
]
11 changes: 11 additions & 0 deletions tests/conftest.py
@@ -9,6 +9,7 @@
from moto import mock_aws

from dsc.config import Config
from dsc.db.models import ItemSubmissionDB
from dsc.item_submission import ItemSubmission
from dsc.utilities.aws.s3 import S3Client
from dsc.utilities.aws.ses import SESClient
@@ -89,8 +90,10 @@ def _test_env(monkeypatch):
monkeypatch.setenv("SENTRY_DSN", "None")
monkeypatch.setenv("WORKSPACE", "test")
monkeypatch.setenv("AWS_REGION_NAME", "us-east-1")
monkeypatch.setenv("AWS_DEFAULT_REGION", "us-east-1")
Comment (Contributor Author): I think some AWS services use AWS_REGION_NAME and others use AWS_DEFAULT_REGION to set default region values. When running DSC in AWS, it will use the appropriate env var, which is why I didn't feel it was necessary to add AWS_DEFAULT_REGION to dsc.config.Config. 🤔

monkeypatch.setenv("AWS_ACCESS_KEY_ID", "testing")
monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "testing")
monkeypatch.setenv("ITEM_TABLE_NAME", "dsc-test")
monkeypatch.setenv("S3_BUCKET_SUBMISSION_ASSETS", "dsc")
monkeypatch.setenv("SOURCE_EMAIL", "noreply@example.com")
monkeypatch.setenv("SQS_QUEUE_DSS_INPUT", "mock-input-queue")
@@ -185,6 +188,14 @@ def item_submission_instance(dspace_metadata):
)


@pytest.fixture
def mocked_item_db():
with mock_aws():
if not ItemSubmissionDB.exists():
ItemSubmissionDB.create_table()
yield


@pytest.fixture
def metadata_mapping():
with open("tests/fixtures/test_metadata_mapping.json") as mapping_file:
28 changes: 28 additions & 0 deletions tests/test_db.py
@@ -0,0 +1,28 @@
import pytest

from dsc.db.exceptions import ItemSubmissionExistsError
from dsc.db.models import ItemSubmissionDB


def test_db_itemsubmission_create_success(mocked_item_db):
ItemSubmissionDB.create(
batch_id="batch-aaa", item_identifier="123", workflow_name="workflow"
)

# retrieve the created 'item' from ItemSubmissionDB
fetched_item = ItemSubmissionDB.get(hash_key="batch-aaa", range_key="123")

assert fetched_item.item_identifier == "123"
assert fetched_item.batch_id == "batch-aaa"
assert fetched_item.workflow_name == "workflow"


def test_db_itemsubmission_create_if_exists_raise_error(mocked_item_db):
ItemSubmissionDB.create(
batch_id="batch-aaa", item_identifier="123", workflow_name="workflow"
)

with pytest.raises(ItemSubmissionExistsError):
ItemSubmissionDB.create(
batch_id="batch-aaa", item_identifier="123", workflow_name="workflow"
)
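
A possible follow-on test (a sketch only, not in this PR) showing what the composite key buys: pynamodb's `Model.query` on the partition key returns every item submission recorded for a batch.

```python
def test_db_itemsubmission_query_by_batch(mocked_item_db):
    ItemSubmissionDB.create(
        batch_id="batch-aaa", item_identifier="123", workflow_name="workflow"
    )
    ItemSubmissionDB.create(
        batch_id="batch-aaa", item_identifier="456", workflow_name="workflow"
    )

    # a single query on the hash key (batch_id) fetches the whole batch
    items = list(ItemSubmissionDB.query("batch-aaa"))
    assert {item.item_identifier for item in items} == {"123", "456"}
```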