Queue to hold corrupted/nonexistent XML files (Closes #89)
fredliporace committed Aug 30, 2022
1 parent 9d07694 commit ac3dfe5
Showing 5 changed files with 114 additions and 22 deletions.
71 changes: 49 additions & 22 deletions cbers2stac/process_new_scene_queue/code.py
@@ -6,6 +6,7 @@
 import os
 import re
 from typing import Any, Dict, Generator
+from xml.etree.ElementTree import ParseError
 
 from botocore.exceptions import ClientError
 
@@ -259,6 +260,7 @@ def process_trigger(
     sns_reconcile_target_arn: str,
     catalog_update_queue: str,
     catalog_update_table: str,
+    corrupted_xml_queue: str,
 ):
     """
     Read quicklook queue and create STAC items if necessary.
@@ -273,6 +275,8 @@
                             STAC items for updating the
                             catalog structure
     catalog_update_table: DynamoDB that hold the catalog update requests
+    corrupted_xml_queue: URL of queue that receives the keys associated with
+                         corrupted/nonexistent XML files.
     """
 
     buckets = {
@@ -285,19 +289,28 @@
                 eff_sns_target_arn = sns_reconcile_target_arn
             else:
                 eff_sns_target_arn = sns_target_arn
-            process_message(
-                {"key": rec["s3"]["object"]["key"]},
-                {
-                    **buckets,
-                    **{
-                        "cog": rec["s3"]["bucket"]["name"],
-                        "metadata": cog_pds_meta_pds[rec["s3"]["bucket"]["name"]],
-                    },
-                },
-                eff_sns_target_arn,
-                catalog_update_queue,
-                catalog_update_table,
-            )
+            try:
+                process_message(
+                    {"key": rec["s3"]["object"]["key"]},
+                    {
+                        **buckets,
+                        **{
+                            "cog": rec["s3"]["bucket"]["name"],
+                            "metadata": cog_pds_meta_pds[rec["s3"]["bucket"]["name"]],
+                        },
+                    },
+                    eff_sns_target_arn,
+                    catalog_update_queue,
+                    catalog_update_table,
+                )
+            except ParseError:
+                LOGGER.info(
+                    "Corrupted XML for %s quicklook.",
+                    rec["s3"]["object"]["key"].split("/")[-1],
+                )
+                get_client("sqs").send_message(
+                    QueueUrl=corrupted_xml_queue, MessageBody=rec["s3"]["object"]["key"]
+                )
 
 
 def process_queue(
@@ -309,6 +322,7 @@
     sns_reconcile_target_arn: str,
     catalog_update_queue: str,
     catalog_update_table: str,
+    corrupted_xml_queue: str,
     delete_processed_messages: bool = False,
 ):
     """
@@ -324,6 +338,8 @@
     catalog_update_queue: URL of queue that receives new STAC
                           items for updating the catalog structure
     catalog_update_table: DynamoDB that hold the catalog update requests
+    corrupted_xml_queue: URL of queue that receives the keys associated with
+                         corrupted/nonexistent XML files.
     delete_processed_messages: if True messages are deleted from queue
                                after processing
     """
@@ -334,16 +350,25 @@
     processed_messages = 0
     for msg in sqs_messages(queue):
 
-        process_message(
-            msg,
-            {
-                **buckets,
-                **{"cog": msg["bucket"], "metadata": cog_pds_meta_pds[msg["bucket"]]},
-            },
-            sns_reconcile_target_arn,
-            catalog_update_queue,
-            catalog_update_table,
-        )
+        try:
+            process_message(
+                msg,
+                {
+                    **buckets,
+                    **{
+                        "cog": msg["bucket"],
+                        "metadata": cog_pds_meta_pds[msg["bucket"]],
+                    },
+                },
+                sns_reconcile_target_arn,
+                catalog_update_queue,
+                catalog_update_table,
+            )
+        except ParseError:
+            LOGGER.info("Corrupted XML for %s quicklook.", msg["key"].split("/")[-1])
+            get_client("sqs").send_message(
+                QueueUrl=corrupted_xml_queue, MessageBody=msg["key"]
+            )
 
         # Remove message from queue
         if delete_processed_messages:
@@ -373,6 +398,7 @@ def handler(event, context):  # pylint: disable=unused-argument
         sns_reconcile_target_arn=os.environ["SNS_RECONCILE_TARGET_ARN"],
         catalog_update_queue=os.environ.get("CATALOG_UPDATE_QUEUE"),
         catalog_update_table=os.environ["CATALOG_UPDATE_TABLE"],
+        corrupted_xml_queue=os.environ["corrupted_xml_queue_url"],
         delete_processed_messages=int(os.environ["DELETE_MESSAGES"]) == 1,
     )
 else:
@@ -385,4 +411,5 @@ def handler(event, context):  # pylint: disable=unused-argument
         sns_reconcile_target_arn=os.environ["SNS_RECONCILE_TARGET_ARN"],
         catalog_update_queue=os.environ.get("CATALOG_UPDATE_QUEUE"),
         catalog_update_table=os.environ["CATALOG_UPDATE_TABLE"],
+        corrupted_xml_queue=os.environ["corrupted_xml_queue_url"],
     )
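
Taken in isolation, the change to code.py is a simple dead-letter pattern: STAC item creation proceeds as before, but an XML ParseError no longer fails the batch; the offending quicklook key is logged and parked on a dedicated SQS queue instead. A minimal standalone sketch of that pattern follows, using plain boto3 in place of the repo's get_client() helper; the queue URL and function name are hypothetical placeholders.

# A minimal sketch of the pattern, not the repo's code: parse the scene's XML
# and divert the key to a dedicated SQS queue when parsing fails. The queue
# URL and function name are hypothetical placeholders.
from xml.etree.ElementTree import ParseError, fromstring

import boto3

CORRUPTED_XML_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/111111111111/corrupted-xml"


def create_stac_item_or_divert(key: str, xml_body: bytes) -> None:
    """Build the STAC item; on corrupted XML, log and queue the key instead."""
    try:
        fromstring(xml_body)  # stands in for the full STAC item creation
        # ... convert the INPE metadata and publish the STAC item here ...
    except ParseError:
        print(f"Corrupted XML for {key.split('/')[-1]} quicklook.")
        boto3.client("sqs").send_message(
            QueueUrl=CORRUPTED_XML_QUEUE_URL, MessageBody=key
        )

Keys parked this way keep the 14-day retention configured in the stack change below, which leaves time to repair or re-publish the corrupted scenes.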
5 changes: 5 additions & 0 deletions stack/app.py
@@ -284,6 +284,11 @@ def create_all_queues(self) -> None:
             )
         )
 
+        # Queue for corrupted XML entries, see #89
+        self.create_queue(
+            id="corrupted_xml_queue", retention_period=core.Duration.days(14),
+        )
+
     def create_all_topics(self) -> None:
         """
         Create all stack topics
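
The create_queue() helper used above is defined elsewhere in stack/app.py and is not part of this diff. For orientation only, a bare CDK v1 declaration with the same id and 14-day retention might look like the following sketch; the stack class and attribute names here are assumptions.

# Hypothetical CDK v1 equivalent of the create_queue() call above; the helper's
# implementation is not shown in this diff, so class and attribute names are
# assumptions.
from aws_cdk import aws_sqs as sqs
from aws_cdk import core


class CorruptedXmlQueueStack(core.Stack):
    def __init__(self, scope: core.Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)
        # 14 days of retention leaves time to inspect and replay bad keys.
        self.corrupted_xml_queue = sqs.Queue(
            self,
            "corrupted_xml_queue",
            retention_period=core.Duration.days(14),
        )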
(The two remaining changed files, presumably the fixtures under test/fixtures/amazonia_pds_invalid_xml/, could not be displayed in the diff view.)
60 changes: 60 additions & 0 deletions test/process_new_scene_queue_test.py
@@ -2,18 +2,21 @@
 
 import json
 import pathlib
+from test.utils import check_queue_size
 
 import pytest
 
 from cbers2stac.consume_reconcile_queue.code import populate_queue_with_quicklooks
 from cbers2stac.layers.common.dbtable import DBTable
 from cbers2stac.layers.common.utils import get_client
 from cbers2stac.process_new_scene_queue.code import (  # process_queue
     build_sns_topic_msg_attributes,
     convert_inpe_to_stac,
     get_s3_keys,
     parse_quicklook_key,
     process_message,
+    process_queue,
+    process_trigger,
     sqs_messages,
 )
 
@@ -280,6 +283,7 @@ def test_process_queue(s3_buckets, sqs_queues, sns_topic, dynamodb_table):
         sns_reconcile_target_arn=topic["TopicArn"],
         catalog_update_queue=catup_queue.url,
         catalog_update_table=DBTable.schema()["TableName"],
+        corrupted_xml_queue=None,
     )
     assert len(db_table.scan()["Items"]) == 1
     assert (
@@ -299,3 +303,59 @@
         == "https://s3.amazonaws.com/metadata/CBERS4A/MUX/222/116/"
         "CBERS_4A_MUX_20220810_222_116_L4/CBERS_4A_MUX_20220810_222_116.png"
     )
+
+
+@pytest.mark.s3_buckets_args(["cog", "stac"])
+@pytest.mark.sqs_queues_args(["catup_queue", "corrupted_xml"])
+@pytest.mark.dynamodb_table_args({**(DBTable.schema())})
+def test_process_trigger(s3_buckets, sqs_queues, sns_topic, dynamodb_table):
+    """
+    test_process_trigger.
+    """
+
+    s3_client, _ = s3_buckets
+    catup_queue = sqs_queues[0]
+    corrupted_xml_queue = sqs_queues[1]
+    _, topic = sns_topic
+    db_table = dynamodb_table
+
+    fixture_prefix = "test/fixtures/amazonia_pds_invalid_xml/"
+    paths = pathlib.Path(fixture_prefix).rglob("*")
+    files = [
+        str(f.relative_to(fixture_prefix))
+        for f in paths
+        if any(ext in f.suffix for ext in ["png", "xml"])
+    ]
+    for upfile in files:
+        s3_client.upload_file(
+            Filename=fixture_prefix + "/" + upfile, Bucket="cog", Key=upfile
+        )
+
+    process_trigger(
+        stac_bucket="stac",
+        cog_pds_meta_pds={"cog": "metadata"},
+        event={
+            "Records": [
+                {
+                    "body": '{"Message": "{\\"Records\\": '
+                    '[{\\"s3\\": {\\"bucket\\": {\\"name\\": \\"cog\\"}, '
+                    '\\"object\\": {\\"key\\": \\"AMAZONIA1/'
+                    "WFI/035/017/AMAZONIA_1_WFI_20210317_035_017_L4/"
+                    'AMAZONIA_1_WFI_20210317_035_017.png\\", \\"reconcile\\": 1}}}]}"}'
+                }
+            ]
+        },
+        sns_target_arn=topic["TopicArn"],
+        sns_reconcile_target_arn=topic["TopicArn"],
+        catalog_update_queue=catup_queue.url,
+        catalog_update_table=DBTable.schema()["TableName"],
+        corrupted_xml_queue=corrupted_xml_queue.url,
+    )
+    assert len(db_table.scan()["Items"]) == 0
+    check_queue_size(corrupted_xml_queue, 1)
+    response = get_client("sqs").receive_message(QueueUrl=corrupted_xml_queue.url)
+    assert (
+        response["Messages"][0]["Body"]
+        == "AMAZONIA1/WFI/035/017/AMAZONIA_1_WFI_20210317_035_017_L4/"
+        "AMAZONIA_1_WFI_20210317_035_017.png"
+    )
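
The new test asserts that no STAC item is written and that exactly one message, the quicklook key, lands on the corrupted-XML queue. Downstream of this commit, a consumer of that queue could be as simple as the following hypothetical helper (not part of this change), which drains one batch of keys for inspection or re-ingestion.

# Hypothetical operator helper, not part of this commit: drain one batch of
# messages from the corrupted-XML queue and return the offending S3 keys.
from typing import List

import boto3


def list_corrupted_keys(queue_url: str) -> List[str]:
    """Receive up to ten messages and return the quicklook keys they carry."""
    sqs = boto3.client("sqs")
    keys: List[str] = []
    response = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10)
    for msg in response.get("Messages", []):
        keys.append(msg["Body"])
        # Delete so the same key is not reported twice.
        sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"])
    return keys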
