Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
Push events to queue
Browse files Browse the repository at this point in the history
  • Loading branch information
ethanmills committed Jan 27, 2023
1 parent fb40502 commit d3b41d7
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 9 deletions.
1 change: 1 addition & 0 deletions terraform/modules/consumer/get-event.tf
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ resource "aws_lambda_function" "get_event" {
client_secret = var.consumer_client_secret
lev_api_url = var.lev_api_url
cloudwatch_metric_namespace = "${var.environment}-example-consumer"
queue_name = module.consumer_queue.queue_name
}
}
tracing_config {
Expand Down
33 changes: 26 additions & 7 deletions terraform/modules/consumer/lambdas/get_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,24 @@
cloudwatch = boto3.client("cloudwatch")
cloudwatch_namespace = os.environ["cloudwatch_metric_namespace"]

sqs = boto3.client("sqs")
queue_name = os.environ["queue_name"]
queue_url = sqs.get_queue_url(QueueName=queue_name)["QueueUrl"]

def lambda_handler(event, _context):
logger.info(f"## EVENT: {event}")
logger.info(f"## TIME: {datetime.now()}")

auth_token = get_auth_token(auth_url, client_id, client_secret)
retrieved_event = get_event(auth_token, event["event_id"])
lev_record = get_lev_record(retrieved_event["sourceId"])
lev_record = get_lev_record(retrieved_event["data"]["attributes"]["sourceId"])

for datum in retrieved_event["eventData"].keys():
for datum in retrieved_event["data"]["attributes"]["eventData"].keys():
assert_matches_lev(retrieved_event, lev_record, datum)

return {"event_id": retrieved_event["eventId"]}
push_event_to_queue(retrieved_event)

return {"event_id": retrieved_event["data"]["id"]}


def get_event(auth_token: str, event_id: str):
Expand All @@ -51,6 +56,15 @@ def get_event(auth_token: str, event_id: str):
return json.loads(response.read())


def push_event_to_queue(event):
queue_event = event["data"]["attributes"]
queue_event["id"] = event["data"]["id"]
sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(queue_event)
)


def get_lev_record(record_id: str):
record_request = request.Request(
f"{lev_api_url}/v1/registration/death/{record_id}",
Expand All @@ -64,9 +78,12 @@ def assert_matches_lev(retrieved_event, lev_record, datum):
"""Fetch the matching record from the LEV API and assert matches.
This is not needed in a normal consumer. Only for validation."""
provided_data = retrieved_event["eventData"][datum]
lev_key = map_to_lev_key(datum)
lev_data = lev_record["deceased"][lev_key]
provided_data = retrieved_event["data"]["attributes"]["eventData"][datum]
if datum == "registrationDate":
lev_data = lev_record["date"]
else:
lev_key = map_to_lev_key(datum)
lev_data = lev_record["deceased"][lev_key]
if provided_data != lev_data:
logger.error(f"Data mismatch. Provided data: {provided_data}, lev key: {lev_key}, lev data: {lev_data}")
record_metric(cloudwatch, cloudwatch_namespace, "GET_EVENT.DataMatchFailure", 1)
Expand All @@ -75,7 +92,7 @@ def assert_matches_lev(retrieved_event, lev_record, datum):


def map_to_lev_key(key: str) -> str:
if key == "firstName":
if key == "firstNames":
return "forenames"
elif key == "lastName":
return "surname"
Expand All @@ -85,3 +102,5 @@ def map_to_lev_key(key: str) -> str:
return "dateOfDeath"
elif key == "address":
return "address"
else:
return key
4 changes: 2 additions & 2 deletions terraform/modules/consumer/sqs.tf
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module "consumer_queue" {
source = "../sqs"
source = "../sqs"
environment = var.environment
queue_name = "${var.environment}-example-consumer-queue"
queue_name = "${var.environment}-example-consumer-queue"
}

data "aws_iam_policy_document" "lambda_sqs_access" {
Expand Down

0 comments on commit d3b41d7

Please sign in to comment.