From 7ecf502e6f05480f69d1aafd39a8eba935344c08 Mon Sep 17 00:00:00 2001 From: Jeremy Prevost Date: Thu, 2 Sep 2021 09:18:39 -0400 Subject: [PATCH] tmp --- submitter/cli.py | 9 +++++++- submitter/sample_data.py | 45 +++++++++--------------------------- submitter/sqs.py | 50 ++++++++++++++++++++++++++++++++++++++-- tests/test_sqs.py | 1 + 4 files changed, 68 insertions(+), 37 deletions(-) create mode 100644 tests/test_sqs.py diff --git a/submitter/cli.py b/submitter/cli.py index 0047ac2..cad5705 100644 --- a/submitter/cli.py +++ b/submitter/cli.py @@ -2,7 +2,7 @@ from submitter import INPUT_QUEUE, OUTPUT_QUEUE from submitter.sample_data import sample_data -from submitter.sqs import message_loop +from submitter.sqs import create, message_loop @click.group() @@ -30,3 +30,10 @@ def sample_data_loader(queue): click.echo("sample this!") sample_data(queue) click.echo("sample data (probably) loaded into input queue") + + +@main.command() +@click.option("--name", help="name of queue to create") +def create_queue(name): + queue = create(name) + click.echo(queue.url) diff --git a/submitter/sample_data.py b/submitter/sample_data.py index c4bced7..8cedd22 100644 --- a/submitter/sample_data.py +++ b/submitter/sample_data.py @@ -1,31 +1,8 @@ -import json - -import boto3 - - -def sample_data_loader(id, source, target, col_hdl, meta_loc, filename, fileloc, queue): - sqs = boto3.resource("sqs") - queue = sqs.get_queue_by_name(QueueName=queue) - body = { - "SubmissionSystem": target, - "CollectionHandle": col_hdl, - "MetadataLocation": meta_loc, - "Files": [ - {"BitstreamName": filename, "FileLocation": fileloc}, - ], - } - # Send message to SQS queue - queue.send_message( - MessageAttributes={ - "PackageID": {"DataType": "String", "StringValue": id}, - "PackageSource": {"DataType": "String", "StringValue": source}, - }, - MessageBody=(json.dumps(body)), - ) +from submitter.sqs import data_loader def sample_data(queue): - sample_data_loader( + data_loader( "123", "ETD", "DSpace@MIT", @@ -35,7 +12,7 @@ def sample_data(queue): "s3:/fakeloc2/f.json", queue, ) - sample_data_loader( + data_loader( "466", "ETD", "DSpace@MIT", @@ -45,7 +22,7 @@ def sample_data(queue): "s3:/fakeloc2/f.json", queue, ) - sample_data_loader( + data_loader( "789", "ETD", "DSpace@MIT", @@ -55,7 +32,7 @@ def sample_data(queue): "s3:/fakeloc2/f.json", queue, ) - sample_data_loader( + data_loader( "asd", "Wiley", "DSpace@MIT", @@ -65,7 +42,7 @@ def sample_data(queue): "s3:/fakeloc2/f.json", queue, ) - sample_data_loader( + data_loader( "feg", "Wiley", "DSpace@MIT", @@ -75,7 +52,7 @@ def sample_data(queue): "s3:/fakeloc2/f.json", queue, ) - sample_data_loader( + data_loader( "hij", "Wiley", "DSpace@MIT", @@ -85,7 +62,7 @@ def sample_data(queue): "s3:/fakeloc2/f.json", queue, ) - sample_data_loader( + data_loader( "etd_123", "ETD", "DSpace@MIT", @@ -95,7 +72,7 @@ def sample_data(queue): "s3:/fakeloc2/f.json", queue, ) - sample_data_loader( + data_loader( "wiley_456", "Wiley", "DSpace@MIT", @@ -105,7 +82,7 @@ def sample_data(queue): "s3:/fakeloc2/f.json", queue, ) - sample_data_loader( + data_loader( "orange", "ETD", "DSpace@MIT", @@ -115,7 +92,7 @@ def sample_data(queue): "s3:/fakeloc2/f.json", queue, ) - sample_data_loader( + data_loader( "cat", "popcorn", "devnull", diff --git a/submitter/sqs.py b/submitter/sqs.py index 7a10d41..d994224 100644 --- a/submitter/sqs.py +++ b/submitter/sqs.py @@ -1,7 +1,26 @@ +import json +import os + import boto3 import click +def setup(): + # todo: make this work in dev, test and prod + if os.environ.get("LOCAL_SQS"): + sqs = boto3.resource( + service_name="sqs", + region_name=os.environ.get("AWS_REGION_NAME", "us-east-1"), + endpoint_url=os.environ.get("SQS_ENDPOINT_URL", ""), + ) + else: + sqs = boto3.resource( + service_name="sqs", + ) + + return sqs + + def message_loop(input_queue, output_queue, wait): msgs = retrieve(input_queue, wait) @@ -51,7 +70,7 @@ def process(msgs, output_queue): def retrieve(input_queue, wait): - sqs = boto3.resource("sqs") + sqs = setup() queue = sqs.get_queue_by_name(QueueName=input_queue) click.echo("Polling for messages") @@ -66,7 +85,7 @@ def retrieve(input_queue, wait): def write(status, output_queue): - sqs = boto3.resource("sqs") + sqs = setup() queue = sqs.get_queue_by_name(QueueName=output_queue) # Send message to SQS result queue @@ -74,3 +93,30 @@ def write(status, output_queue): MessageAttributes=status, MessageBody=("testing"), ) + + +def create(name): + sqs = setup() + queue = sqs.create_queue(QueueName=name) + return queue + + +def data_loader(id, source, target, col_hdl, meta_loc, filename, fileloc, queue): + sqs = setup() + queue = sqs.get_queue_by_name(QueueName=queue) + body = { + "SubmissionSystem": target, + "CollectionHandle": col_hdl, + "MetadataLocation": meta_loc, + "Files": [ + {"BitstreamName": filename, "FileLocation": fileloc}, + ], + } + # Send message to SQS queue + queue.send_message( + MessageAttributes={ + "PackageID": {"DataType": "String", "StringValue": id}, + "PackageSource": {"DataType": "String", "StringValue": source}, + }, + MessageBody=(json.dumps(body)), + ) diff --git a/tests/test_sqs.py b/tests/test_sqs.py new file mode 100644 index 0000000..adc19fd --- /dev/null +++ b/tests/test_sqs.py @@ -0,0 +1 @@ +# todo: unit tests for sqs