
feat: update lambda state machine to accommodate tenantId #367

Merged 3 commits on Jun 30, 2021
18 changes: 16 additions & 2 deletions bulkExport/glueScripts/export-script.py
@@ -31,6 +31,9 @@
groupId = None
if ('--{}'.format('groupId') in sys.argv):
groupId = getResolvedOptions(sys.argv, ['groupId'])['groupId']
tenantId = None
if ('--{}'.format('tenantId') in sys.argv):
tenantId = getResolvedOptions(sys.argv, ['tenantId'])['tenantId']

job_id = args['jobId']
export_type = args['exportType']
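The optional-argument pattern above probes `sys.argv` before calling `getResolvedOptions`, because `getResolvedOptions` raises when a requested option is missing from the job arguments. A minimal standalone sketch of the same pattern (the helper name is ours, not part of the script):

```python
import sys

from awsglue.utils import getResolvedOptions


def get_optional_arg(argv, name):
    # getResolvedOptions raises if `name` is not present in argv,
    # so probe for the flag first, as the export script does above.
    if '--{}'.format(name) in argv:
        return getResolvedOptions(argv, [name])[name]
    return None


# tenantId is only passed on multi-tenant deployments; None otherwise.
tenant_id = get_optional_arg(sys.argv, 'tenantId')
```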
@@ -60,12 +63,21 @@
}
)

print('Start filtering by tenantId')
# Filter by tenantId
if (tenantId is None):
filtered_tenant_id_frame = original_data_source_dyn_frame
else:
filtered_tenant_id_frame = Filter.apply(frame = original_data_source_dyn_frame,
f = lambda x:
x['_tenantId'] == tenantId)
Comment on lines +71 to +73

Contributor:

Instead of doing Glue-side filtering, would it be better to have a secondary index on the tenantId? This will become an expensive operation if we have to scan across all tenants.

Contributor Author:

Hmm, that's a great question. The design doc specifies that the filtering is to be done as part of the Glue job, and a secondary index was not introduced for any of the tables. @carvantes Any thoughts?

Contributor:

The Glue job always scans the entire DDB table no matter what; there's no way to use a query. This is a limitation of the current AWS Glue + DDB integration.

There are existing scenarios where this is far from ideal, e.g. exporting a single FHIR resource type or exporting the resources modified in the last hour will both scan the entire table.

There is room for improvement in the bulk export solution, but we are not changing the fundamentals here.
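For reference, the alternative raised in this thread would look roughly like the following. This is a hypothetical sketch only: the resource table has no such GSI (per the discussion above), and the table and index names here are invented for illustration.

```python
import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('resource-db')  # assumed table name


def query_tenant_resources(tenant_id):
    # With a GSI keyed on _tenantId, a Query touches only one tenant's
    # items instead of scanning the whole table. Glue's DDB connector
    # cannot use an index today, which is the limitation described above.
    items = []
    kwargs = {
        'IndexName': 'tenantId-index',  # hypothetical GSI
        'KeyConditionExpression': Key('_tenantId').eq(tenant_id),
    }
    while True:
        response = table.query(**kwargs)
        items.extend(response['Items'])
        if 'LastEvaluatedKey' not in response:
            return items
        kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']
```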


print('Start filtering by transactionTime and Since')
# Filter by transactionTime and Since
datetime_since = datetime.strptime(since, "%Y-%m-%dT%H:%M:%S.%fZ")
datetime_transaction_time = datetime.strptime(transaction_time, "%Y-%m-%dT%H:%M:%S.%fZ")

filtered_dates_dyn_frame = Filter.apply(frame = original_data_source_dyn_frame,
filtered_dates_dyn_frame = Filter.apply(frame = filtered_tenant_id_frame,
f = lambda x:
datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") > datetime_since and
datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") <= datetime_transaction_time
@@ -81,6 +93,7 @@
else x["documentStatus"] in valid_document_state_to_be_read_from and x["resourceType"] in type_list
)


# Drop fields that are not needed
print('Dropping fields that are not needed')
data_source_cleaned_dyn_frame = DropFields.apply(frame = filtered_dates_resource_dyn_frame, paths = ['documentStatus', 'lockEndTs', 'vid', '_references'])
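The date filter above keeps a half-open window: strictly newer than `since`, no newer than `transactionTime`. A small sketch of that predicate in isolation:

```python
from datetime import datetime

FMT = "%Y-%m-%dT%H:%M:%S.%fZ"


def in_export_window(last_updated, since, transaction_time):
    # Matches the Filter.apply predicate above: (since, transactionTime]
    return since < last_updated <= transaction_time


since = datetime.strptime("2021-06-01T00:00:00.000Z", FMT)
tx_time = datetime.strptime("2021-06-30T00:00:00.000Z", FMT)
assert in_export_window(datetime.strptime("2021-06-15T09:30:00.000Z", FMT), since, tx_time)
assert not in_export_window(since, since, tx_time)  # boundary: `since` itself is excluded
```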
@@ -124,7 +137,8 @@ def add_dup_resource_type(record):
source_s3_file_path = item['Key']
match = re.search(regex_pattern, source_s3_file_path)
new_s3_file_name = match.group(1) + "-" + match.group(2) + ".ndjson"
new_s3_file_path = job_id + '/' + new_s3_file_name
tenant_specific_path = '' if (tenantId is None) else tenantId + '/'
new_s3_file_path = tenant_specific_path + job_id + '/' + new_s3_file_name

copy_source = {
'Bucket': bucket_name,
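The renaming step above prefixes the tenant when one is present, so multi-tenant exports land under `<tenantId>/<jobId>/…` while single-tenant exports keep the old `<jobId>/…` layout. A minimal sketch of the resulting key logic (file and tenant names are illustrative):

```python
def build_export_key(job_id, file_name, tenant_id=None):
    # Mirrors the path logic above: tenant prefix only when tenantId is set.
    tenant_specific_path = '' if tenant_id is None else tenant_id + '/'
    return tenant_specific_path + job_id + '/' + file_name


assert build_export_key('job-1', 'Patient-1.ndjson') == 'job-1/Patient-1.ndjson'
assert build_export_key('job-1', 'Patient-1.ndjson', 'tenantA') == 'tenantA/job-1/Patient-1.ndjson'
```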
6 changes: 3 additions & 3 deletions bulkExport/state-machine-definition.yaml
@@ -27,21 +27,21 @@ definition:
updateStatusToFailed:
Type: Task
Resource: !GetAtt updateStatus.Arn
Parameters: {"jobId.$":"$.jobId", "status": "failed"}
Parameters: {"globalParams.$":"$", "status": "failed"}
Retry:
- ErrorEquals: [ "States.ALL" ]
End: true
updateStatusToCanceled:
Type: Task
Resource: !GetAtt updateStatus.Arn
Parameters: {"jobId.$":"$.jobId", "status": "canceled"}
Parameters: {"globalParams.$":"$", "status": "canceled"}
Retry:
- ErrorEquals: [ "States.ALL" ]
End: true
updateStatusToCompleted:
Type: Task
Resource: !GetAtt updateStatus.Arn
Parameters: {"jobId.$":"$.jobId", "status": "completed"}
Parameters: {"globalParams.$":"$", "status": "completed"}
Retry:
- ErrorEquals: [ "States.ALL" ]
End: true
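The state-machine change replaces the single `jobId` parameter with `"globalParams.$": "$"`, forwarding the whole execution state so the `updateStatus` task can also see `tenantId`. A hypothetical sketch of the handler side of that contract (the actual Lambda in this repo may differ; `update_job_status` is an assumed helper):

```python
def handler(event, context):
    # The Step Functions task now passes the entire state as globalParams,
    # instead of just jobId, so tenantId travels along when present.
    global_params = event['globalParams']
    job_id = global_params['jobId']
    tenant_id = global_params.get('tenantId')  # absent on single-tenant stacks
    update_job_status(job_id, event['status'], tenant_id)  # assumed helper
```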