This repository has been archived by the owner on Apr 13, 2023. It is now read-only.

Commit

feat: update lambda state machine to accommodate tenantId (#367)
Bingjiling committed Jun 30, 2021
1 parent 22711ae commit 9fedf56
Showing 2 changed files with 19 additions and 5 deletions.
18 changes: 16 additions & 2 deletions bulkExport/glueScripts/export-script.py
@@ -31,6 +31,9 @@
 groupId = None
 if ('--{}'.format('groupId') in sys.argv):
     groupId = getResolvedOptions(sys.argv, ['groupId'])['groupId']
+tenantId = None
+if ('--{}'.format('tenantId') in sys.argv):
+    tenantId = getResolvedOptions(sys.argv, ['tenantId'])['tenantId']
 
 job_id = args['jobId']
 export_type = args['exportType']
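
The new tenantId argument is optional: the script checks sys.argv before calling getResolvedOptions, so single-tenant deployments can keep starting the job without the flag. A minimal sketch of how a caller might start this Glue job with or without the flag (the job name and argument values below are placeholders, not taken from this commit):

```python
# Illustrative only: starting the export Glue job with an optional --tenantId.
import boto3

glue = boto3.client('glue')

arguments = {
    '--jobId': 'example-job-id',       # placeholder values
    '--exportType': 'system',
}
tenant_id = 'example-tenant'           # set to None for a single-tenant deployment
if tenant_id is not None:
    # Only passed when multi-tenancy is in use; the script above tolerates its absence.
    arguments['--tenantId'] = tenant_id

glue.start_job_run(JobName='example-export-glue-job', Arguments=arguments)
```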
@@ -60,12 +63,21 @@
     }
 )
 
+print('Start filtering by tenantId')
+# Filter by tenantId
+if (tenantId is None):
+    filtered_tenant_id_frame = original_data_source_dyn_frame
+else:
+    filtered_tenant_id_frame = Filter.apply(frame = original_data_source_dyn_frame,
+                                            f = lambda x:
+                                            x['_tenantId'] == tenantId)
+
 print('Start filtering by transactionTime and Since')
 # Filter by transactionTime and Since
 datetime_since = datetime.strptime(since, "%Y-%m-%dT%H:%M:%S.%fZ")
 datetime_transaction_time = datetime.strptime(transaction_time, "%Y-%m-%dT%H:%M:%S.%fZ")
 
-filtered_dates_dyn_frame = Filter.apply(frame = original_data_source_dyn_frame,
+filtered_dates_dyn_frame = Filter.apply(frame = filtered_tenant_id_frame,
                                         f = lambda x:
                                         datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") > datetime_since and
                                         datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") <= datetime_transaction_time
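
The tenant filter runs first, so the existing transactionTime/_since window now operates on filtered_tenant_id_frame rather than the raw frame; in multi-tenant mode only records whose _tenantId matches reach the date check. A plain-Python sketch of the combined predicate for reasoning about which records survive both stages (the helper name, records, and timestamps are illustrative only):

```python
from datetime import datetime

FHIR_TS = "%Y-%m-%dT%H:%M:%S.%fZ"

def keep_record(record, tenant_id, datetime_since, datetime_transaction_time):
    # Re-statement of the two Filter.apply stages above, outside of Glue.
    if tenant_id is not None and record.get('_tenantId') != tenant_id:
        return False
    last_updated = datetime.strptime(record['meta']['lastUpdated'], FHIR_TS)
    return datetime_since < last_updated <= datetime_transaction_time

# Example: only the matching tenant's in-window record survives.
since = datetime.strptime("2021-06-01T00:00:00.000Z", FHIR_TS)
tx_time = datetime.strptime("2021-06-30T00:00:00.000Z", FHIR_TS)
records = [
    {'_tenantId': 'tenantA', 'meta': {'lastUpdated': "2021-06-15T12:00:00.000Z"}},
    {'_tenantId': 'tenantB', 'meta': {'lastUpdated': "2021-06-15T12:00:00.000Z"}},
]
print([r['_tenantId'] for r in records if keep_record(r, 'tenantA', since, tx_time)])
# ['tenantA']
```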
@@ -81,6 +93,7 @@
                                                  else x["documentStatus"] in valid_document_state_to_be_read_from and x["resourceType"] in type_list
 )
 
+
 # Drop fields that are not needed
 print('Dropping fields that are not needed')
 data_source_cleaned_dyn_frame = DropFields.apply(frame = filtered_dates_resource_dyn_frame, paths = ['documentStatus', 'lockEndTs', 'vid', '_references'])
@@ -124,7 +137,8 @@ def add_dup_resource_type(record):
     source_s3_file_path = item['Key']
     match = re.search(regex_pattern, source_s3_file_path)
     new_s3_file_name = match.group(1) + "-" + match.group(2) + ".ndjson"
-    new_s3_file_path = job_id + '/' + new_s3_file_name
+    tenant_specific_path = '' if (tenantId is None) else tenantId + '/'
+    new_s3_file_path = tenant_specific_path + job_id + '/' + new_s3_file_name
 
     copy_source = {
         'Bucket': bucket_name,
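
With the tenant prefix, a multi-tenant export writes its ndjson output under <tenantId>/<jobId>/ while single-tenant exports keep the existing <jobId>/ layout. A small sketch of the key construction (the helper name, ids, and file name are made up for illustration):

```python
def build_export_key(tenant_id, job_id, file_name):
    # Mirrors the diff: prepend the tenant prefix only when a tenantId was supplied.
    tenant_specific_path = '' if tenant_id is None else tenant_id + '/'
    return tenant_specific_path + job_id + '/' + file_name

print(build_export_key(None, 'job-123', 'Patient-1.ndjson'))       # job-123/Patient-1.ndjson
print(build_export_key('tenantA', 'job-123', 'Patient-1.ndjson'))  # tenantA/job-123/Patient-1.ndjson
```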
6 changes: 3 additions & 3 deletions bulkExport/state-machine-definition.yaml
@@ -27,21 +27,21 @@ definition:
   updateStatusToFailed:
     Type: Task
     Resource: !GetAtt updateStatus.Arn
-    Parameters: {"jobId.$":"$.jobId", "status": "failed"}
+    Parameters: {"globalParams.$":"$", "status": "failed"}
     Retry:
       - ErrorEquals: [ "States.ALL" ]
     End: true
   updateStatusToCanceled:
     Type: Task
     Resource: !GetAtt updateStatus.Arn
-    Parameters: {"jobId.$":"$.jobId", "status": "canceled"}
+    Parameters: {"globalParams.$":"$", "status": "canceled"}
     Retry:
       - ErrorEquals: [ "States.ALL" ]
     End: true
   updateStatusToCompleted:
     Type: Task
     Resource: !GetAtt updateStatus.Arn
-    Parameters: {"jobId.$":"$.jobId", "status": "completed"}
+    Parameters: {"globalParams.$":"$", "status": "completed"}
     Retry:
       - ErrorEquals: [ "States.ALL" ]
     End: true
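
Forwarding the entire state-machine input as globalParams means the updateStatus Lambda now receives tenantId alongside jobId instead of only a plucked-out jobId field. A hedged sketch of how such a handler could read the forwarded input; the actual handler lives elsewhere in the repository and is not part of this commit, and the event shape below is inferred only from the Parameters mapping above:

```python
def handler(event, context):
    # Assumed event shape: {"globalParams": {"jobId": "...", "tenantId": "..."}, "status": "..."}
    global_params = event['globalParams']
    job_id = global_params['jobId']
    tenant_id = global_params.get('tenantId')  # absent in single-tenant deployments
    status = event['status']
    print(f"Updating export job {job_id} (tenant={tenant_id}) to status {status}")
    # ...persist the status change for the job record here...
```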
