7 changes: 4 additions & 3 deletions README.md
@@ -1,9 +1,10 @@
# dspace-submission-composer
An application for creating messages for the [DSpace Submission Service application](https://github.com/MITLibraries/dspace-submission-service).
DSpace Submission Composer (DSC) is a Python CLI application that prepares items for ingest into DSpace.

# Application Description
DSC is a component of the DSpace Submission Orchestrator (DSO), a collection of microservices that form a data pipeline for ingesting items into DSpace repositories. The application's name highlights a key step of the DSC workflow in which it "composes" and sends a message to an SQS queue. These messages follow the specification set by the [DSpace Submission Service (DSS)](https://github.com/MITLibraries/dspace-submission-service), another component of DSO. Together, DSC and DSS follow a message-driven architecture, communicating over message queues in SQS.

Description of the app
See additional documentation in the following:
* [Understanding and Running the DSC Workflow](docs/how_to_run.md)

## Development

192 changes: 192 additions & 0 deletions docs/how_to_run.md
@@ -0,0 +1,192 @@
# Understanding and Running DSC

This documentation describes the DSC application and how to run it.

**DISCLAIMER**: While the CLI application is runnable on its own, the DSO Step Function offers a simplified user interface for running the full ETL pipeline. For more details on the DSO Step Function and how to use it, see https://mitlibraries.atlassian.net/wiki/spaces/IN/pages/4690542593/DSpace+Submission+Orchestrator+DSO.
Comment (Contributor Author): Will be focusing on documenting the step function in the DSO Confluence page next!


Running DSC consists of the following key steps:

1. Create a batch
2. Queue a batch for ingest
Comment (Contributor): Optional: I know the Step Function doc uses the word "queue" for the submit step as well but you might consider whether to just use "submit" to avoid potential confusion.

Comment (Contributor Author): Hmm, might leave as-is for now. I like that it alludes to the SQS queues. 🤔

3. _Items are ingested into DSpace by DSS*_
4. Analyze ingest results

***Important:** DSC is not responsible for ingesting items into DSpace nor does it execute this process. This task is handled by [DSS](https://github.com/MITLibraries/dspace-submission-service), which is invoked via the [DSO Step Function](https://github.com/MITLibraries/dspace-submission-service).

### Create a batch
DSC processes deposits in "batches": a batch is a collection of item submissions grouped by a unique identifier. Generally, assets for batches are provided in one of two ways:

1. Requestors upload raw metadata and bitstreams (see [How To: Batch deposits with DSO for Content Owners](https://mitlibraries.atlassian.net/wiki/spaces/INF/pages/4411326470/How-To+Batch+deposits+with+DSpace+Submission+Orchestrator+DSO+for+Content+Owners))
2. DSC workflows retrieve raw metadata and/or bitstreams programmatically (via API requests)

Once all assets are stored in a "folder" in S3, DSC verifies that metadata and bitstreams have been provided for each item submission. It is only after _all_ item submissions have been verified that DSC will establish the batch by recording each item in DynamoDB.
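
A minimal sketch of this kind of verification with boto3 is shown below; the bucket name, prefix layout, and helper logic are illustrative assumptions rather than DSC's actual implementation.

```python
# Illustrative only: DSC's real S3 layout and verification logic may differ.
import boto3

s3 = boto3.client("s3")


def list_batch_keys(bucket: str, batch_id: str) -> list[str]:
    """List every object key stored under the batch folder (S3 prefix)."""
    paginator = s3.get_paginator("list_objects_v2")
    keys: list[str] = []
    for page in paginator.paginate(Bucket=bucket, Prefix=f"{batch_id}/"):
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
    return keys


def verify_item_assets(keys: list[str], item_identifier: str) -> list[str]:
    """Return error messages if metadata or bitstreams are missing for one item."""
    errors = []
    if not any(item_identifier in key and key.endswith(".json") for key in keys):
        errors.append(f"{item_identifier}: no metadata found")
    if not any(item_identifier in key and not key.endswith(".json") for key in keys):
        errors.append(f"{item_identifier}: no bitstreams found")
    return errors
```
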

At the end of this step:
* If all item submission assets are complete:
- A batch folder with complete item submission assets exists in the DSO S3 bucket
- Each item submission in the batch is recorded in DynamoDB (with `status="batch_created"`)
- **[OPTIONAL]** An email is sent reporting the number of created item submissions. The email includes a CSV file with the batch records from DynamoDB.
* If any item submission assets were invalid (missing metadata and/or bitstreams):
- A batch folder with incomplete item submission assets exists in the DSO S3 bucket
- **[OPTIONAL]** An email is sent reporting that zero item submissions were created. The email
includes a CSV file describing the failing item submissions with the corresponding error message.
Comment on lines +16 to +32: :chef's kiss: Looks great! I feel like it communicates quite a bit of complexity, pretty succinctly.


**Data syncing**

✨ If the batch folder was already created elsewhere (e.g., in an S3 bucket in a different deployment environment), DSC can sync the data and avoid repeating data retrieval steps.

### Queue a batch for ingest
DSC retrieves the batch records from DynamoDB, and for each item submission, it performs the following steps:
* Determine whether the item submission should be sent to the DSS input queue
* Map/transform the source metadata to follow the Dublin Core schema
* Create and upload a metadata JSON file in the batch folder (under `dspace_metadata/`)
* Send a message to the DSS input queue

Note: The message is structured in accordance with the [Submission Message Specification](https://github.com/MITLibraries/dspace-submission-service/blob/main/docs/specifications/submission-message-specification.md).
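
For context, composing and sending one of these messages boils down to a boto3 SQS call along the lines of the sketch below; the queue name, field names, and URIs are placeholders and do not reproduce the actual Submission Message Specification.

```python
# Illustrative only: field names and queue name are placeholders, not the DSS spec.
import json

import boto3

sqs = boto3.resource("sqs")
queue = sqs.get_queue_by_name(QueueName="dsc-input-queue")  # hypothetical queue name

submission_message = {
    "item_identifier": "item-0001",  # hypothetical item identifier
    "metadata_location": "s3://bucket/batch-2025-01/dspace_metadata/item-0001.json",
    "bitstream_locations": ["s3://bucket/batch-2025-01/item-0001.pdf"],
    "collection_handle": "1721.1/12345",  # hypothetical collection handle
}

queue.send_message(MessageBody=json.dumps(submission_message))
```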

At the end of this step:
* Batch records in DynamoDB are updated (see the sketch after this list). Updates are made to the following fields:
- `status`: Indicates submit status
- `status_details`: Set to error messages (if message failed to send)
- `last_run_date`: Set to current run date
- `submit_attempts`: Incremented by 1
* **[OPTIONAL]** An email is sent reporting the counts for each submission status. The email includes a CSV file with the batch records from DynamoDB, reflecting the latest information.
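
As a rough illustration, the per-item DynamoDB update could look something like the boto3 call below; the table name, key schema, and status value are assumptions, while the updated fields come from the list above.

```python
# Illustrative only: table name, key schema, and status value are assumptions.
from datetime import datetime, timezone

import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("dsc-batch-items")  # hypothetical table name

table.update_item(
    Key={"batch_id": "batch-2025-01", "item_identifier": "item-0001"},  # hypothetical keys
    UpdateExpression=(
        "SET #status = :status, last_run_date = :run_date ADD submit_attempts :one"
    ),
    ExpressionAttributeNames={"#status": "status"},  # 'status' is a DynamoDB reserved word
    ExpressionAttributeValues={
        ":status": "submit_success",  # hypothetical status value
        ":run_date": datetime.now(tz=timezone.utc).isoformat(),
        ":one": 1,
    },
)
```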

### Items are ingested into DSpace by DSS
📌 **Reminder:** DSS is not executed by DSC and requires separate invocation.

DSS consumes the submission messages from the input queue in SQS. DSS uses a client to interact with DSpace. For each item submission, DSS reads the metadata JSON file and bitstreams from S3, using the information provided in the message, and creates an item with bitstreams in DSpace.

At the end of this step:
* Result messages are written to the output queue for DSC (`dss-output-dsc`).

Note: The message is structured in accordance with the [Result Message Specification](https://github.com/MITLibraries/dspace-submission-service/blob/main/docs/specifications/result-message-specification.md).

### Analyze ingest results
DSC consumes result messages from its output queue, parsing each message to determine whether the associated item was ingested into DSpace. It then loops through the batch records from DynamoDB, updating those that have a corresponding result message. Additional steps may be performed for each item submission on behalf of the requestor (e.g., custom reports).

At the end of this step:
* Batch records in DynamoDB are updated (see the sketch after this list). Updates are made to the following fields:
- `dspace_handle`: Set to generated DSpace handle (if item was ingested)
- `status`: Indicates ingest status
- `status_details`: Set to error messages (if item failed ingest)
- `last_result_message`: Set to result message string
- `last_run_date`: Set to current run date
- `ingest_attempts`: Incremented by 1
* Result messages are deleted from the queue
- If any errors occur while processing a result message, the message remains in the queue.
* An email is sent reporting the counts for each ingest status. The email includes a CSV file with the batch records from DynamoDB, reflecting the latest information.
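
A minimal sketch of that loop with boto3 appears below; the table name, key schema, and result-message fields are assumptions (the queue name `dss-output-dsc` comes from the step above), and error handling is reduced to leaving the message on the queue.

```python
# Illustrative only: table name, key schema, and result-message fields are assumptions.
import json

import boto3

sqs = boto3.resource("sqs")
dynamodb = boto3.resource("dynamodb")
queue = sqs.get_queue_by_name(QueueName="dss-output-dsc")
table = dynamodb.Table("dsc-batch-items")  # hypothetical table name

for message in queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=5):
    result = json.loads(message.body)
    try:
        table.update_item(
            Key={
                "batch_id": "batch-2025-01",  # hypothetical key
                "item_identifier": result["item_identifier"],  # illustrative field name
            },
            UpdateExpression=(
                "SET #status = :status, last_result_message = :raw ADD ingest_attempts :one"
            ),
            ExpressionAttributeNames={"#status": "status"},
            ExpressionAttributeValues={
                ":status": "ingest_success",  # hypothetical status value
                ":raw": message.body,
                ":one": 1,
            },
        )
    except Exception:  # noqa: BLE001
        continue  # leave the message on the queue so it can be reprocessed
    message.delete()  # delete only after the record update succeeded
```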

## The DSC CLI

### `pipenv run dsc`

```text
Usage: -c [OPTIONS] COMMAND [ARGS]...

DSC CLI.

Options:
-w, --workflow-name TEXT The workflow to use for the batch of DSpace
submissions [required]
-b, --batch-id TEXT A unique identifier for the workflow run, also
used as an S3 prefix for workflow run files
[required]
-v, --verbose Pass to log at debug level instead of info
--help Show this message and exit.

Commands:
create Create a batch of item submissions.
finalize Process the result messages from the DSS output queue...
Comment (Contributor): Excess periods here?

Comment (Contributor Author): Ahh, this was from text-wrapping -- i.e., shows up automatically when Click prints the help docs to console~

reconcile Reconcile bitstreams with item identifiers from the metadata.
Comment (Contributor): Should this be removed or were we planning to do that later?

Comment (Contributor Author): Good catch! When we remove the reconcile code from DSC, we can update then. :)

submit Send a batch of item submissions to the DSpace Submission...
sync Sync data between two directories using the aws s3 sync...
```
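
For orientation, a full run of the workflow might look like the following; the workflow name, batch ID, collection handle, and email address are placeholder values.

```text
pipenv run dsc -w my-workflow -b batch-2025-01 create -e curator@example.edu
pipenv run dsc -w my-workflow -b batch-2025-01 submit -c 1721.1/12345 -e curator@example.edu
pipenv run dsc -w my-workflow -b batch-2025-01 finalize -e curator@example.edu
```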

### `pipenv run dsc -w <workflow-name> -b <batch-id> create`
Comment (Contributor Author, @jonavellecuerdo, Oct 20, 2025): You will notice that when using DSC, we will need to provide the `--workflow-name` / `-w` and `--batch-id` / `-b` CLI options (even if we just want to run `<command-name> help`). This is because we currently define the options on the main command group and set them as `required=True`.

We could either:

A. Decorate each command with the -w and -b CLI options and remove setting in main command group.
B. Create a shared_cli_options decorator that contains all CLI options used by the CLI commands (i.e., see example in CDPS-CURT); a rough sketch of this option appears below.
C. Do nothing

@ghukill @ehanson8 Let me know what you think! This would, of course, be outside the scope of this PR.

Comment: I'm easy. Whenever we get to that point, whichever approach is feeling right works for me. I feel like every time I really dig into click, I learn a new way to do something.

Comment (Contributor): Agreed, just create a ticket with that comment and whoever picks it up can make the call.
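
A rough sketch of option B, assuming the shared options move off the main command group; the option help text and command body here are illustrative, not DSC's current code.

```python
# Illustrative sketch of a shared Click options decorator (option B); not DSC's current code.
from collections.abc import Callable

import click


def shared_cli_options(func: Callable) -> Callable:
    """Apply the CLI options shared by every DSC command."""
    func = click.option(
        "-b", "--batch-id", required=True, help="A unique identifier for the workflow run."
    )(func)
    func = click.option(
        "-w", "--workflow-name", required=True, help="The workflow to use for the batch."
    )(func)
    return func


@click.group()
def main() -> None:
    """DSC CLI."""


@main.command()
@shared_cli_options
def create(workflow_name: str, batch_id: str) -> None:
    """Create a batch of item submissions."""
    click.echo(f"Creating batch {batch_id} with workflow {workflow_name}")
```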


```text
Usage: -c create [OPTIONS]

Create a batch of item submissions.

Options:
--sync-data / --no-sync-data Invoke 'sync' CLI command.
--sync-dry-run Display the operations that would be performed
using the sync command without actually
running them
-s, --sync-source TEXT Source directory formatted as a local
filesystem path or an S3 URI in
s3://bucket/prefix form
-d, --sync-destination TEXT Destination directory formatted as a local
filesystem path or an S3 URI in
s3://bucket/prefix form
-e, --email-recipients TEXT The recipients of the batch creation results
email as a comma-delimited string
--help Show this message and exit.
```

**Important:** If the boolean flag `--sync-data` is set, the `sync` CLI command is invoked, which executes a basic [`aws s3 sync`](https://docs.aws.amazon.com/cli/latest/reference/s3/sync.html) command.
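
Under the hood this amounts to an AWS CLI invocation roughly like the one below; the bucket names and batch prefix are placeholders, and the exact `--exclude` pattern is whatever the `sync` command configures.

```text
aws s3 sync s3://source-bucket/batch-2025-01/ s3://destination-bucket/batch-2025-01/ \
    --delete --exclude "dspace_metadata/*"
```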

### `pipenv run dsc -w <workflow-name> -b <batch-id> submit`

```text
Usage: -c submit [OPTIONS]

Send a batch of item submissions to DSS.

Options:
-c, --collection-handle TEXT The handle of the DSpace collection to which
the batch will be submitted [required]
-e, --email-recipients TEXT The recipients of the submission results email
as a comma-delimited string
--help Show this message and exit.
```

### `pipenv run dsc -w <workflow-name> -b <batch-id> finalize`

```text
Usage: -c finalize [OPTIONS]

Process the result messages from the DSC output queue.

Options:
-e, --email-recipients TEXT The recipients of the submission results email
as a comma-delimited string [required]
--help Show this message and exit.
```

### `pipenv run dsc -w <workflow-name> -b <batch-id> sync`

```text
Usage: -c sync [OPTIONS]

Sync data between two directories using the aws s3 sync command.

If 'source' and 'destination' are not provided, the method will derive
values based on the required '--batch-id / -b' and 'workflow-name / -w'
options and S3 bucket env vars:

* source: batch path in S3_BUCKET_SYNC_SOURCE

* destination: batch path in S3_BUCKET_SUBMISSION_ASSETS

This command accepts both local file system paths and S3 URIs in
s3://bucket/prefix form. It synchronizes the contents of the source
directory to the destination directory, and is configured to delete files in
the destination that are not present in the source and to exclude files in the
dspace_metadata/ directory.

Although the aws s3 sync command recursively copies files, it ignores empty
directories from the sync.

Options:
-s, --source TEXT Source directory formatted as a local filesystem
path or an S3 URI in s3://bucket/prefix form
-d, --destination TEXT Destination directory formatted as a local
filesystem path or an S3 URI in s3://bucket/prefix
form
--dry-run Display the operations that would be performed using
the sync command without actually running them
--help Show this message and exit.
```
16 changes: 10 additions & 6 deletions dsc/cli.py
@@ -41,6 +41,7 @@ def main(
batch_id: str,
verbose: bool, # noqa: FBT001
) -> None:
"""DSC CLI."""
ctx.ensure_object(dict)
ctx.obj["start_time"] = perf_counter()
workflow_class = Workflow.get_workflow(workflow_name)
@@ -90,7 +91,9 @@ def reconcile(ctx: click.Context) -> None:

@main.command()
@click.pass_context
@click.option("--sync-data/--no-sync-data", default=False)
@click.option(
"--sync-data/--no-sync-data", default=False, help=("Invoke 'sync' CLI command")
)
@click.option(
"--sync-dry-run",
is_flag=True,
@@ -200,14 +203,15 @@ def sync(
If 'source' and 'destination' are not provided, the method will derive values
based on the required '--batch-id / -b' and 'workflow-name / -w' options and
S3 bucket env vars:

* source: batch path in S3_BUCKET_SYNC_SOURCE

* destination: batch path in S3_BUCKET_SUBMISSION_ASSETS

This command accepts both local file system paths and S3 URIs in
s3://bucket/prefix form. It synchronizes the contents of the source directory
to the destination directory, and is configured to:
* --delete: delete files in the destination that are not present in the source
* --exclude metadata/*: exclude files in the dspace_metadata/ directory
to the destination directory, and is configured to delete files in the destination
that are not present in the source and to exclude files in the dspace_metadata/ directory.

Although the aws s3 sync command recursively copies files, it ignores
empty directories from the sync.
@@ -288,7 +292,7 @@ def submit(
collection_handle: str,
email_recipients: str | None = None,
) -> None:
"""Send a batch of item submissions to the DSpace Submission Service (DSS)."""
"""Send a batch of item submissions to DSS."""
workflow = ctx.obj["workflow"]
workflow.submit_items(collection_handle)

@@ -308,7 +312,7 @@ def submit(
required=True,
)
def finalize(ctx: click.Context, email_recipients: str) -> None:
"""Process the result messages from the DSS output queue according the workflow."""
"""Process the result messages from the DSC output queue."""
workflow = ctx.obj["workflow"]
workflow.finalize_items()
workflow.send_report(