545 changes: 269 additions & 276 deletions Pipfile.lock

Large diffs are not rendered by default.

68 changes: 62 additions & 6 deletions dsc/cli.py
@@ -6,27 +6,83 @@
import click

from dsc.config import Config
from dsc.workflows.base import BaseWorkflow

logger = logging.getLogger(__name__)
CONFIG = Config()


@click.command()
@click.group()
@click.pass_context
@click.option(
"-w",
"--workflow-name",
help="The workflow to use for the batch of DSpace submissions",
required=True,
)
@click.option(
"-c",
"--collection-handle",
help="The handle of the DSpace collection to which the batch will be submitted",
required=True,
)
@click.option(
"-b",
"--batch-id",
help="The S3 prefix for the batch of DSpace submissions",
required=True,
)
@click.option(
"-v", "--verbose", is_flag=True, help="Pass to log at debug level instead of info"
)
def main(*, verbose: bool) -> None:
start_time = perf_counter()
def main(
ctx: click.Context,
workflow_name: str,
collection_handle: str,
batch_id: str,
verbose: bool, # noqa: FBT001
) -> None:
ctx.ensure_object(dict)
ctx.obj["start_time"] = perf_counter()

ctx.obj["workflow"] = BaseWorkflow.load(workflow_name, collection_handle, batch_id)

stream = StringIO()
root_logger = logging.getLogger()
logger.info(CONFIG.configure_logger(root_logger, stream, verbose=verbose))
logger.info(CONFIG.configure_sentry())
CONFIG.check_required_env_vars()
ctx.obj["stream"] = stream

logger.info("Running process")

# Do things here!

elapsed_time = perf_counter() - start_time
@main.result_callback()
@click.pass_context
def post_main_group_subcommand(
ctx: click.Context,
*_args: tuple,
**_kwargs: dict,
) -> None:
"""Callback for any work to perform after a main sub-command completes."""
logger.info(
"Total time to complete process: %s", str(timedelta(seconds=elapsed_time))
"Total time elapsed: %s",
str(
timedelta(seconds=perf_counter() - ctx.obj["start_time"]),
),
)


@main.command()
@click.pass_context
def reconcile(ctx: click.Context) -> None:
"""Reconcile bitstreams with item identifiers from the metadata."""
workflow = ctx.obj["workflow"]
no_bitstreams, no_item_identifiers = workflow.reconcile_bitstreams_and_metadata()

if no_bitstreams:
logger.error(f"No bitstreams found for these item identifiers: {no_bitstreams}")
if no_item_identifiers:
logger.error(
f"No item identifiers found for these bitstreams: {no_item_identifiers}"
)
4 changes: 4 additions & 0 deletions dsc/exceptions.py
@@ -6,5 +6,9 @@ class InvalidSQSMessageError(Exception):
pass


class InvalidWorkflowNameError(Exception):
pass


class ItemMetadatMissingRequiredFieldError(Exception):
pass
Empty file removed dsc/utilities/__init__.py
Empty file.
173 changes: 145 additions & 28 deletions dsc/workflows/base/__init__.py
@@ -1,16 +1,21 @@
from __future__ import annotations

import json
import logging
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import TYPE_CHECKING, Any, final

from dsc.exceptions import (
InvalidDSpaceMetadataError,
InvalidWorkflowNameError,
ItemMetadatMissingRequiredFieldError,
)
from dsc.item_submission import ItemSubmission
from dsc.utilities.aws.s3 import S3Client

if TYPE_CHECKING:
from _collections_abc import dict_keys
from collections.abc import Iterator

from mypy_boto3_sqs.type_defs import SendMessageResultTypeDef
@@ -21,51 +26,163 @@
class BaseWorkflow(ABC):
"""A base workflow class from which other workflow classes are derived."""

workflow_name: str = "base"
submission_system: str = "DSpace@MIT"
email_recipients: tuple[str] = ("None",)
metadata_mapping_path: str = ""
s3_bucket: str = ""
output_queue: str = ""
Comment on lines +29 to +34 (Contributor):
Our recent discussions around how the workflows will be initialized made me wonder how these values are set for workflow instances.

Recalling what was written in the DSC engineering plan:

The application will be deployed as an ECS task that is executed via CLI commands using the boto3 library.


Expanding the instructions in the Example Application Run, these are the steps I figured I'd need to do:

Prerequisite

  1. Open a terminal, navigate to my local clone of DSC, and activate my virtual environment.

Running reconcile command

  1. Set up a .env file at the root of the DSC app directory. Looking at the current definition of the reconcile command, it seems like it needs to know the following values for a given workflow: [s3_bucket, batch_path]. The .env file should look like:
    DSC_S3_BUCKET="dsc"
  2. Run reconcile command in terminal:
    dsc -w <workflow-name> -b <batch_id> -c <collection-handle> reconcile
    

Running deposit command

Option A: Running deposit via the DSC CLI

  1. Set up a .env file at the root of the DSC app directory.
    DSC_S3_BUCKET="dsc"
    DSC_SUBMISSION_SYSTEM="DSpace@MIT"
    DSC_OUTPUT_QUEUE="output-queue"
  2. Run deposit command in terminal
    dsc -w <workflow-name> -b <batch-id> -c <collection-handle> deposit 
    

Option B: Running deposit ECS task using AWS CLI

  1. Set up a JSON file with overrides for the ECS task (e.g., overrides.json)

    {
     "containerOverrides": [
         {
             "name": "dsc-ecs-<env>",
             "command": [
                 "-w",
                 "<workflow-name>",
                 "-b",
                 "<batch-id>",
                 "-c",
                 "<collection-handle>",
                 "deposit"
             ],
             "environment": [
                 {
                     "name": "DSC_S3_BUCKET",
                     "value": "dsc"
                 },
                 {
                     "name": "DSC_SUBMISSION_SYSTEM",
                     "value": "DSpace@MIT"
                 },
                 {
                     "name": "DSC_OUTPUT_QUEUE",
                     "value": "<output-queue-name>"
                 }
             ]
         }
     ]
    }
  2. Run aws ecs run-task CLI command in terminal (see Carbon Makefile for example).

    aws ecs run-task --cluster dsc-ecs-<env> --task-definition dsc-ecs-<env>-workflow-deposit --launch-type="FARGATE" --region us-east-1 --network-configuration '{"awsvpcConfiguration": {"subnets": [<subnets>], "securityGroups": [<security-groups>],"assignPublicIp": "DISABLED"}}' --overrides file://overrides.json
    

Option C: Running the deposit ECS task via a ??? DSC CLI command that uses the boto3 library.
Note: This is how I interpreted what is currently written in the Engineering Plan. 🤔 I believe the idea came from our recent work on the Alma SAP Invoices UI, where we defined an ECSClient to run ECS tasks.

This work needs to be scoped out some more in terms of passing...workflow parameter values, but it should support accessing workflow parameter values set in a .env via the Config.
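
A minimal sketch of how such an ECSClient could look, assuming boto3 and the Fargate networking values from Option B; the class and method names, the parameters, and the idea of passing the DSC CLI args as a container command override are illustrative assumptions, not part of this PR:

import boto3


class ECSClient:
    """Hypothetical wrapper for launching the DSC deposit ECS task via boto3."""

    def __init__(self, region: str = "us-east-1") -> None:
        self.client = boto3.client("ecs", region_name=region)

    def run_deposit_task(
        self,
        cluster: str,
        task_definition: str,
        container_name: str,
        command: list[str],
        subnets: list[str],
        security_groups: list[str],
    ) -> dict:
        """Run the deposit task, passing the DSC CLI args as a command override."""
        # command mirrors overrides.json above, e.g.
        # ["-w", "<workflow-name>", "-b", "<batch-id>", "-c", "<collection-handle>", "deposit"]
        return self.client.run_task(
            cluster=cluster,
            taskDefinition=task_definition,
            launchType="FARGATE",
            networkConfiguration={
                "awsvpcConfiguration": {
                    "subnets": subnets,
                    "securityGroups": security_groups,
                    "assignPublicIp": "DISABLED",
                }
            },
            overrides={
                "containerOverrides": [{"name": container_name, "command": command}]
            },
        )

Whether something like this gets exposed as a new dsc CLI command is exactly the scoping question captured in the takeaways below.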


Takeaways

  • Environment variable names subject to change (curious what folks would find helpful)🤓

  • Change request: Add variables related to the reconcile, deposit, and finalize DSC CLI commands to REQUIRED_ENV_VARS and OPTIONAL_ENV_VARS in dsc.Config (see the sketch after this list).

  • Request: Have a discussion as a team to scope out workflow implementation~

    • Create a ticket to scope out a CLI command that runs an ECS task for either DSC deposit CLI command.
    • Create a ticket for defining an ECSClient
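
As a rough illustration of that change request (REQUIRED_ENV_VARS, OPTIONAL_ENV_VARS, and check_required_env_vars are referenced from dsc.Config above; the specific variable names and groupings are only the ones floated in this comment and are subject to change):

import os


class Config:
    REQUIRED_ENV_VARS = (
        "DSC_S3_BUCKET",  # needed by reconcile and deposit
    )
    OPTIONAL_ENV_VARS = (
        "DSC_SUBMISSION_SYSTEM",  # needed by deposit
        "DSC_OUTPUT_QUEUE",  # needed by deposit and finalize
    )

    def check_required_env_vars(self) -> None:
        """Raise if any required environment variable is unset."""
        missing = [name for name in self.REQUIRED_ENV_VARS if not os.getenv(name)]
        if missing:
            raise OSError(
                f"Missing required environment variables: {', '.join(missing)}"
            )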

Also, I think this all started with some questions about how the workflow attributes retrieve their values.

  • I think it makes sense for workflow_name and metadata_mapping to be hardcoded in the class definition.
  • To support flexibility, I think submission_system, email_recipients, and s3_bucket should be accessed as environment variables via the Config. 🤔

Contributor:
As noted by @ehanson8 in Slack, the questions/comments above do not need to be addressed in this PR, but they have prompted a discussion about workflow implementation to be addressed in future PRs!

Contributor:
We synced up on env vars and arrived at the following decision:

  • Explicitly set class attributes in the workflow module:
class BaseWorkflow(ABC):
    """A base workflow class from which other workflow classes are derived."""

    workflow_name: str = "base"
    submission_system: str = "DSpace@MIT"
    email_recipients: tuple[str] = ("None",)
    metadata_mapping_path: str = ""
    s3_bucket: str = "dsc"
    output_queue: str = "dsc-unhandled"
  • Revisit use of env vars later!


def __init__(
self,
workflow_name: str,
submission_system: str,
email_recipients: list[str],
metadata_mapping: dict,
s3_bucket: str,
batch_id: str,
collection_handle: str,
output_queue: str,
batch_id: str,
) -> None:
"""Initialize base instance.

Args:
workflow_name: The name of the workflow.
submission_system: The system to which item submissions will be sent
(e.g. DSpace@MIT).
email_recipients: The email addresses to notify after runs of
the workflow.
metadata_mapping: A mapping file for generating DSpace metadata
from the workflow's source metadata.
s3_bucket: The S3 bucket containing bitstream and metadata files for
the workflow.
collection_handle: The handle of the DSpace collection to which
submissions will be uploaded.
batch_id: Unique identifier for a 'batch' deposit that corresponds
to the name of a subfolder in the workflow directory of the S3 bucket.
This subfolder is where the S3 client will search for bitstream
and metadata files.
collection_handle: The handle of the DSpace collection to which
submissions will be uploaded.
output_queue: The SQS output queue used for retrieving result messages
from the workflow's submissions.
"""
self.workflow_name: str = workflow_name
self.submission_system: str = submission_system
self.email_recipients: list[str] = email_recipients
self.metadata_mapping: dict = metadata_mapping
self.s3_bucket: str = s3_bucket
self.batch_id: str = batch_id
self.collection_handle: str = collection_handle
self.output_queue: str = output_queue
self.batch_id = batch_id
self.collection_handle = collection_handle

@property
def batch_path(self) -> str:
return f"{self.workflow_name}/{self.batch_id}"

@property
def metadata_mapping(self) -> dict:
with open(self.metadata_mapping_path) as mapping_file:
return json.load(mapping_file)

@final
@classmethod
def load(
cls, workflow_name: str, collection_handle: str, batch_id: str
) -> BaseWorkflow:
"""Return configured workflow class instance.

Args:
workflow_name: The label of the workflow. Must match a key from
config.WORKFLOWS.
collection_handle: The handle of the DSpace collection to which the batch will
be submitted.
batch_id: The S3 prefix for the batch of DSpace submissions.
"""
workflow_class = cls.get_workflow(workflow_name)
return workflow_class(
collection_handle=collection_handle,
batch_id=batch_id,
)

@final
@classmethod
def get_workflow(cls, workflow_name: str) -> type[BaseWorkflow]:
"""Return workflow class.

Args:
workflow_name: The label of the workflow. Must match a workflow_name attribute
from BaseWorkflow subclass.
"""
for workflow_class in BaseWorkflow.__subclasses__():
if workflow_name == workflow_class.workflow_name:
return workflow_class
raise InvalidWorkflowNameError(f"Invalid workflow name: {workflow_name} ")

def reconcile_bitstreams_and_metadata(self) -> tuple[set[str], set[str]]:
"""Reconcile bitstreams against metadata.

Generate a list of bitstreams without item identifiers and item identifiers
without bitstreams. Any discrepancies will be addressed by the engineer and
stakeholders as necessary.
"""
bitstream_dict = self._build_bitstream_dict()

# extract item identifiers from batch metadata
item_identifiers = [
self.get_item_identifier(item_metadata)
for item_metadata in self.item_metadata_iter()
]

# reconcile item identifiers against bitstreams
item_identifiers_with_bitstreams = self._match_item_identifiers_to_bitstreams(
bitstream_dict.keys(), item_identifiers
)

bitstreams_with_item_identifiers = self._match_bitstreams_to_item_identifiers(
bitstream_dict.keys(), item_identifiers
)

logger.info(
"Item identifiers from batch metadata with matching bitstreams: "
f"{item_identifiers_with_bitstreams}"
)

item_identifiers_without_bitstreams = set(item_identifiers) - set(
item_identifiers_with_bitstreams
)
bitstreams_without_item_identifiers = set(bitstream_dict.keys()) - set(
bitstreams_with_item_identifiers
)

return item_identifiers_without_bitstreams, bitstreams_without_item_identifiers

def _build_bitstream_dict(self) -> dict:
"""Build a dict of potential bitstreams with an item identifier for the key.

An underscore (if present) serves as the delimiter between the item identifier
and any additional suffixes in the case of multiple matching bitstreams.
"""
s3_client = S3Client()
bitstreams = list(
s3_client.files_iter(bucket=self.s3_bucket, prefix=self.batch_path)
)
bitstream_dict: dict[str, list[str]] = defaultdict(list)
for bitstream in bitstreams:
file_name = bitstream.split("/")[-1]
item_identifier = file_name.split("_")[0] if "_" in file_name else file_name
bitstream_dict[item_identifier].append(bitstream)
return bitstream_dict

def _match_bitstreams_to_item_identifiers(
self, bitstreams: dict_keys, item_identifiers: list[str]
) -> list[str]:
"""Create list of bitstreams matched to item identifiers.

Args:
bitstreams: A dict of S3 files with base file IDs and full URIs.
item_identifiers: A list of item identifiers retrieved from the batch
metadata.
"""
return [
file_id
for item_identifier in item_identifiers
for file_id in bitstreams
if file_id == item_identifier
]

def _match_item_identifiers_to_bitstreams(
self, bitstreams: dict_keys, item_identifiers: list[str]
) -> list[str]:
"""Create list of item identifers matched to bitstreams.

Args:
bitstreams: A dict of S3 files with base file IDs and full URIs.
item_identifiers: A list of item identifiers retrieved from the batch
metadata.
"""
return [
item_identifier
for file_id in bitstreams
for item_identifier in item_identifiers
if file_id == item_identifier
]

@final
def run(self) -> Iterator[SendMessageResultTypeDef]:
"""Run workflow to submit items to the DSpace Submission Service."""
2 changes: 2 additions & 0 deletions dsc/workflows/base/simple_csv.py
@@ -18,6 +18,8 @@ class SimpleCSV(BaseWorkflow):
deposit on S3.
"""

workflow_name: str = "simple_csv"

def item_metadata_iter(
self, metadata_file: str = "metadata.csv"
) -> Iterator[dict[str, Any]]: