feat: remove auto_publish from migration (#7156)
nayib-jose-gloria committed Jun 14, 2024
1 parent 4295319 commit ed8cd5e
Showing 5 changed files with 91 additions and 186 deletions.
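In short: the schema migration no longer publishes collections itself. The auto-publish inputs are removed from the Step Functions state machine and its Batch jobs, and the publish step becomes a cleanup-only step. The snippet below is an editorial summary of the renames that recur through both diffs, not code from the repository:

```python
# Renames applied consistently across the Terraform and Python changes:
RENAMES = {
    "CollectionPublish": "CollectionCleanup",         # Step Functions state
    "Collection_publish": "Collection_cleanup",       # Batch JobName
    "collection_publish": "collection_cleanup",       # STEP_NAME value
    "publish_and_cleanup": "log_errors_and_cleanup",  # Python method and stored-response key
}

# State-machine inputs and environment variables removed outright:
REMOVED = ["auto_publish", "AUTO_PUBLISH", "can_publish", "CAN_PUBLISH"]
```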
23 changes: 7 additions & 16 deletions .happy/terraform/modules/schema_migration/main.tf
@@ -180,7 +180,6 @@ resource aws_sfn_state_machine sfn_schema_migration {
"Next": "ApplyDefaults",
"ResultPath": "$.inputDefaults",
"Parameters": {
"auto_publish": "False",
"limit_migration": "0"
}
},
@@ -221,10 +220,6 @@ resource aws_sfn_state_machine sfn_schema_migration {
"Name": "EXECUTION_ID",
"Value.$": "$$.Execution.Name"
},
- {
- "Name": "AUTO_PUBLISH",
- "Value.$": "$.auto_publish"
- },
{
"Name": "LIMIT_MIGRATION",
"Value.$": "$.limit_migration"
@@ -286,10 +281,6 @@ resource aws_sfn_state_machine sfn_schema_migration {
"Name": "COLLECTION_VERSION_ID",
"Value.$": "$.collection_version_id"
},
- {
- "Name": "CAN_PUBLISH",
- "Value.$": "$.can_publish"
- },
{
"Name": "TASK_TOKEN",
"Value.$": "$$.Task.Token"
@@ -314,7 +305,7 @@ resource aws_sfn_state_machine sfn_schema_migration {
"States.ALL"
],
"ResultPath": null,
"Next": "CollectionPublish"
"Next": "CollectionCleanup"
}
]
},
@@ -324,17 +315,17 @@ resource aws_sfn_state_machine sfn_schema_migration {
{
"Variable": "$.key_name",
"IsPresent": false,
"Next": "CollectionPublish"
"Next": "CollectionCleanup"
}
],
"Default": "SpanDatasets"
},
"CollectionPublish": {
"CollectionCleanup": {
"Type": "Task",
"Resource": "arn:aws:states:::batch:submitJob.sync",
"Parameters": {
"JobDefinition": "${resource.aws_batch_job_definition.schema_migrations.arn}",
"JobName": "Collection_publish",
"JobName": "Collection_cleanup",
"JobQueue": "${var.job_queue_arn}",
"Timeout": {
"AttemptDurationSeconds": 600
@@ -343,7 +334,7 @@ resource aws_sfn_state_machine sfn_schema_migration {
"Environment": [
{
"Name": "STEP_NAME",
"Value": "collection_publish"
"Value": "collection_cleanup"
},
{
"Name": "MIGRATE",
@@ -497,14 +488,14 @@ resource aws_sfn_state_machine sfn_schema_migration {
"Key.$": "$.key_name"
}
},
"Next": "CollectionPublish",
"Next": "CollectionCleanup",
"MaxConcurrency": 10,
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "CollectionPublish",
"Next": "CollectionCleanup",
"ResultPath": null
}
],
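For orientation, here is a minimal sketch, hypothetical and not part of this commit, of the environment the renamed CollectionCleanup state now passes to its Batch job. The run_step helper is a stand-in for the real dispatcher, SchemaMigrate.migrate, shown in the Python diff below; note that AUTO_PUBLISH and CAN_PUBLISH no longer appear:

```python
import os

def run_step(step_env: dict) -> None:
    # Stand-in dispatcher: the real worker reads STEP_NAME the same way in
    # SchemaMigrate.migrate and then runs the matching step.
    os.environ.update(step_env)
    print(f"would dispatch step: {os.environ['STEP_NAME']}")

run_step({
    "STEP_NAME": "collection_cleanup",                   # renamed from collection_publish
    "EXECUTION_ID": "local-test",                        # normally $$.Execution.Name
    "COLLECTION_VERSION_ID": "<collection_version_id>",  # placeholder
})
```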
85 changes: 31 additions & 54 deletions backend/layers/processing/schema_migration.py
@@ -43,53 +43,35 @@ def fetch_collections(self) -> Iterable[CollectionVersion]:
unpublished_collections = [*self.business_logic.get_collections(CollectionQueryFilter(is_published=False))]
return itertools.chain(unpublished_collections, published_collections)

- def gather_collections(self, auto_publish: bool) -> Tuple[Dict[str, str], Dict[str, str]]:
+ def gather_collections(self) -> Tuple[Dict[str, str], Dict[str, str]]:
"""
This function is used to gather all the collections and their datasets that will be migrated
A json file is created and uploaded to S3 with the list of collections and datasets that will be migrated. It
has the following structure:
[
{"can_publish": "true", "collection_id": "<collection_id>", "collection_version_id":
"<collection_version_id>"},
{"can_publish": "false", "collection_id": "<collection_id>", "collection_version_id":
"<collection_version_id>"}
{"collection_id": "<collection_id>", "collection_version_id": "<collection_version_id>"},
{"collection_id": "<collection_id>", "collection_version_id": "<collection_version_id>"}
...
]
- :param auto_publish: bool - if False, coerce can_publish to False for all collections. if True, determine
- can_publish on collection-by-collection basis based on business logic
- :return: the response retuned to the step function and the list of collections to be migrated
+ :return: the response returned to the step function and the list of collections to be migrated
"""
response_for_span_collections = []

has_migration_revision = set()
# iterates over unpublished collections first, so published versions are skipped if there is an active revision
for collection in self.fetch_collections():
- _resp = {}
if collection.is_published() and collection.collection_id.id in has_migration_revision:
continue

- if collection.is_published():
- # published collection without an active revision
- _resp["can_publish"] = str(True)
- elif collection.is_unpublished_version():
- # active revision of a published collection.
- _resp["can_publish"] = str(False)
- if collection.is_auto_version:
- has_migration_revision.add(collection.collection_id.id) # migration revision found, skip published
- _resp["can_publish"] = str(True)
- elif collection.is_initial_unpublished_version():
- # unpublished collection
- _resp["can_publish"] = str(False)

- if not auto_publish:
- # auto_publish is off for this migration, overwrite "can_publish" as false in all cases.
- _resp["can_publish"] = str(False)
- _resp.update(
- collection_id=collection.collection_id.id,
- collection_version_id=collection.version_id.id,
- execution_id=self.execution_id,
- )
+ if collection.is_auto_version:
+ has_migration_revision.add(collection.collection_id.id) # migration revision found, skip published

+ _resp = {
+ "collection_id": collection.collection_id.id,
+ "collection_version_id": collection.version_id.id,
+ "execution_id": self.execution_id,
+ }
response_for_span_collections.append(_resp)

# For testing purposes, only migrate a randomly sampled subset of the collections gathered
@@ -139,16 +121,17 @@ def dataset_migrate(
}

def collection_migrate(
- self, collection_id: str, collection_version_id: str, can_publish: bool
+ self,
+ collection_id: str,
+ collection_version_id: str,
) -> Tuple[Dict[str, str], Dict[str, str], List[Dict[str, str]]]:
"""
This function is used to migrate a collection and its datasets to the latest schema version.
:param collection_id: the canonical collection id
:param collection_version_id: the collection version to migrate
- :param can_publish: if True, the collection will be published after migration
- :return: the response retuned to the step function, the response for the publish_and_cleanup step function, and
- the list of datasets to be migrated
+ :return: the response returned to the step function, the response for the log_errors_and_cleanup step function,
+ and the list of datasets to be migrated
"""
# Get datasets from collection
version = self.business_logic.get_collection_version(CollectionVersionId(collection_version_id))
@@ -167,9 +150,7 @@ def collection_migrate(
"All datasets in the collection have been migrated", extra={"dataset_count": len(version.datasets)}
)
response_for_dataset_migrate = []
- response_for_publish_and_cleanup = {
- "can_publish": str(False), # skip publishing, because the collection is already published and no
- # revision is created, or the collection is private or a revision.
+ response_for_log_errors_and_cleanup = {
"collection_version_id": collection_version_id,
}
response_for_sfn = {"collection_version_id": collection_version_id}
@@ -195,30 +176,31 @@ def collection_migrate(
if dataset.status.processing_status == DatasetProcessingStatus.SUCCESS
# Filter out datasets that are not successfully processed
]
- response_for_publish_and_cleanup = {
- "can_publish": str(can_publish),
+ response_for_log_errors_and_cleanup = {
"collection_version_id": private_collection_version_id,
}
response_for_sfn = {"collection_version_id": private_collection_version_id}
- response_for_publish_and_cleanup["datasets"] = response_for_dataset_migrate
- response_for_publish_and_cleanup["collection_url"] = collection_url
+ response_for_log_errors_and_cleanup["datasets"] = response_for_dataset_migrate
+ response_for_log_errors_and_cleanup["collection_url"] = collection_url

response_for_sfn["execution_id"] = self.execution_id

- self._store_sfn_response("publish_and_cleanup", version.collection_id.id, response_for_publish_and_cleanup)
+ self._store_sfn_response(
+ "log_errors_and_cleanup", version.collection_id.id, response_for_log_errors_and_cleanup
+ )

if response_for_dataset_migrate:
key_name = self._store_sfn_response("span_datasets", version.collection_id.id, response_for_dataset_migrate)
response_for_sfn["key_name"] = key_name
- return (response_for_sfn, response_for_publish_and_cleanup, response_for_dataset_migrate)
+ return (response_for_sfn, response_for_log_errors_and_cleanup, response_for_dataset_migrate)

- def publish_and_cleanup(self, collection_version_id: str) -> list:
+ def log_errors_and_cleanup(self, collection_version_id: str) -> list:
errors = []
collection_version = self.business_logic.get_collection_version(CollectionVersionId(collection_version_id))
object_keys_to_delete = []

# Get the datasets that were processed
- extra_info = self._retrieve_sfn_response("publish_and_cleanup", collection_version.collection_id.id)
+ extra_info = self._retrieve_sfn_response("log_errors_and_cleanup", collection_version.collection_id.id)
processed_datasets = {d["dataset_id"]: d["dataset_version_id"] for d in extra_info["datasets"]}

# Process datasets errors
@@ -275,8 +257,6 @@ def publish_and_cleanup(self, collection_version_id: str) -> list:
self.s3_provider.delete_files(self.artifact_bucket, object_keys_to_delete)
if errors:
self._store_sfn_response("report/errors", collection_version_id, errors)
- elif extra_info["can_publish"].lower() == "true":
- self.business_logic.publish_collection_version(collection_version.version_id)
return errors

def _store_sfn_response(self, directory: str, file_name: str, response: Dict[str, str]):
@@ -379,17 +359,14 @@ def migrate(self, step_name) -> bool:
self.logger.info(f"Starting {step_name}", extra={"step": step_name})
if step_name == "gather_collections":
gather_collections = self.error_wrapper(self.gather_collections, "gather_collections")
- auto_publish = os.environ["AUTO_PUBLISH"].lower() == "true"
- response, _ = gather_collections(auto_publish)
+ response, _ = gather_collections()
elif step_name == "collection_migrate":
collection_id = os.environ["COLLECTION_ID"]
collection_version_id = os.environ["COLLECTION_VERSION_ID"]
- can_publish = os.environ["CAN_PUBLISH"].lower() == "true"
collection_migrate = self.error_wrapper(self.collection_migrate, collection_id)
response, _, _ = collection_migrate(
collection_id=collection_id,
collection_version_id=collection_version_id,
- can_publish=can_publish,
)
elif step_name == "dataset_migrate":
collection_version_id = os.environ["COLLECTION_VERSION_ID"]
Expand All @@ -403,10 +380,10 @@ def migrate(self, step_name) -> bool:
dataset_id=dataset_id,
dataset_version_id=dataset_version_id,
)
elif step_name == "collection_publish":
elif step_name == "collection_cleanup":
collection_version_id = os.environ["COLLECTION_VERSION_ID"]
- publish_and_cleanup = self.error_wrapper(self.publish_and_cleanup, collection_version_id)
- response = publish_and_cleanup(collection_version_id=collection_version_id)
+ log_errors_and_cleanup = self.error_wrapper(self.log_errors_and_cleanup, collection_version_id)
+ response = log_errors_and_cleanup(collection_version_id=collection_version_id)
elif step_name == "report":
response = self.report()
self.logger.info("output", extra={"response": response})
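To make the new gathering rule concrete, here is a self-contained sketch of the logic the updated gather_collections implements, using stand-in types rather than the real CollectionVersion API: each surviving version becomes a plain id/version/execution-id entry, and a published collection is skipped once an auto-migration revision of it has been seen:

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List

@dataclass
class FakeVersion:
    # Stand-in for CollectionVersion; only the fields the rule needs.
    collection_id: str
    version_id: str
    published: bool
    is_auto_version: bool = False

def gather(collections: Iterable[FakeVersion], execution_id: str) -> List[Dict[str, str]]:
    out: List[Dict[str, str]] = []
    has_migration_revision = set()
    # Unpublished versions must come first, as fetch_collections() guarantees.
    for c in collections:
        if c.published and c.collection_id in has_migration_revision:
            continue  # an auto-migration revision already covers this collection
        if c.is_auto_version:
            has_migration_revision.add(c.collection_id)
        out.append({
            "collection_id": c.collection_id,
            "collection_version_id": c.version_id,
            "execution_id": execution_id,
        })
    return out

# The auto-migration revision of "c1" suppresses its published version:
entries = gather(
    [FakeVersion("c1", "v2", published=False, is_auto_version=True),
     FakeVersion("c1", "v1", published=True)],
    "exec-1",
)
assert [e["collection_version_id"] for e in entries] == ["v2"]
```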