In [1]:
import boto3
import botocore


# Every ARN and log group name built later in this notebook hard-codes
# us-east-1, so pin both clients to that region instead of relying on whatever
# default region the kernel's AWS configuration happens to provide.
sns = boto3.client('sns', region_name='us-east-1')
sqs = boto3.client('sqs', region_name='us-east-1')

In [2]:
import os
# AWS account number used to build the ARNs in the cells below. Read from the
# ACCOUNT_NO environment variable; a missing variable raises KeyError here.
account_no = os.environ['ACCOUNT_NO']

### Set Up Delivery Failure Notification

In [3]:
# ARN of the IAM role named SNSFailureFeedback in this account; it is attached
# to the topic as its SQS failure-feedback role in a later cell.
failureNotificationRoleArn = f'arn:aws:iam::{account_no}:role/SNSFailureFeedback'

In [4]:
# Serverless deployment stage name; edit this to target a different stage.
stage = 'dev'
# ARN of the stage-specific t1 topic in us-east-1.
topic_arn = 'arn:aws:sns:us-east-1:{}:t1-{}'.format(account_no, stage)

In [5]:
# Enable delivery-failure logging on the topic by attaching the failure
# feedback role. Per the original note, this could not be done through
# CloudFormation at the time of writing.
failure_feedback_attribute = {
    'TopicArn': topic_arn,
    'AttributeName': 'SQSFailureFeedbackRoleArn',
    'AttributeValue': failureNotificationRoleArn,
}
tar = sns.set_topic_attributes(**failure_feedback_attribute)
print(tar)

{'ResponseMetadata': {'RequestId': '72af0c96-e2a9-524c-8c55-55d296ed352f', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '72af0c96-e2a9-524c-8c55-55d296ed352f', 'content-type': 'text/xml', 'content-length': '215', 'date': 'Wed, 16 Jun 2021 21:49:56 GMT'}, 'RetryAttempts': 0}}


### Set Up a Doomed SQS Subscription

The queue created below is given an access policy that does not let SNS deliver to it. This forces delivery failures so we can detect and process them.

In [6]:
# Create the stage-specific sample queue, then keep its URL (from the
# response) and its ARN (built by hand) for the subscription and policy cells.
queue_name = 'sampleQueue-' + stage
cq = sqs.create_queue(QueueName=queue_name)
print(cq)
queue_url = cq['QueueUrl']
queue_arn = 'arn:aws:sqs:us-east-1:{}:{}'.format(account_no, queue_name)

{'QueueUrl': 'https://queue.amazonaws.com/427848627088/sampleQueue-dev', 'ResponseMetadata': {'RequestId': 'c449b37b-d8d2-568d-8200-ea3fac9b85b8', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'c449b37b-d8d2-568d-8200-ea3fac9b85b8', 'date': 'Wed, 16 Jun 2021 21:50:01 GMT', 'content-type': 'text/xml', 'content-length': '327'}, 'RetryAttempts': 0}}


In [7]:
# Subscribe the sample queue to the topic. Reuse topic_arn rather than
# rebuilding the ARN inline, so the subscription always targets the same topic
# whose failure-feedback attribute was configured above.
sns.subscribe(
    TopicArn=topic_arn,
    Protocol='sqs',
    Endpoint=queue_arn
)

{'SubscriptionArn': 'arn:aws:sns:us-east-1:427848627088:t1-dev:51ae3c4f-ecc0-4d78-adf8-ac6a2f17ced1',
 'ResponseMetadata': {'RequestId': 'ba7f0f59-7ecb-5bb4-ba32-561d0a874817',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'ba7f0f59-7ecb-5bb4-ba32-561d0a874817',
   'content-type': 'text/xml',
   'content-length': '356',
   'date': 'Wed, 16 Jun 2021 21:50:04 GMT'},
  'RetryAttempts': 0}}

In [8]:
def allow_sns_to_write_to_sqs(topicarn, queuearn):
    """Render an SQS access-policy document as a JSON string.

    The policy has a single Allow statement for ``SQS:SendMessage`` on
    ``queuearn`` from any AWS principal, conditioned on ``aws:SourceArn``
    being ``ArnLike`` the value ``topicarn`` followed by the literal suffix
    ``noworky``.

    NOTE(review): because of the ``noworky`` suffix the condition pattern is
    not the ARN passed in. This looks deliberate (the notebook wants a
    subscription whose deliveries fail) -- confirm before reusing this helper
    for a queue that is supposed to receive messages.

    Args:
        topicarn: Source ARN (or ARN pattern) inserted into the condition.
        queuearn: ARN of the queue, inserted as the statement's Resource.

    Returns:
        The policy document as a string.
    """
    policy_lines = [
        '{',
        '  "Version":"2012-10-17",',
        '  "Statement":[',
        '    {',
        '      "Sid":"MyPolicy",',
        '      "Effect":"Allow",',
        '      "Principal" : {"AWS" : "*"},',
        '      "Action":"SQS:SendMessage",',
        '      "Resource": "{}",'.format(queuearn),
        '      "Condition":{',
        '        "ArnLike":{',
        '          "aws:SourceArn": "{}noworky"'.format(topicarn),
        '        }',
        '      }',
        '    }',
        '  ]',
        '}',
    ]
    return '\n'.join(policy_lines)

In [9]:
# Build the queue policy using a wildcard pattern covering every SNS topic in
# this account (us-east-1) as the source ARN, then attach it to the queue.
policy_json = allow_sns_to_write_to_sqs(
    'arn:aws:sns:us-east-1:{}:*'.format(account_no),
    queue_arn,
)

response = sqs.set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={'Policy': policy_json},
)
print(response)

{'ResponseMetadata': {'RequestId': '4bc79cce-9ee3-5baa-b101-c58aeab1389f', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '4bc79cce-9ee3-5baa-b101-c58aeab1389f', 'date': 'Wed, 16 Jun 2021 21:50:10 GMT', 'content-type': 'text/xml', 'content-length': '225'}, 'RetryAttempts': 0}}


### Delivery Log Queries

In [10]:
# CloudWatch Logs client for the Logs Insights queries below. Pinned to
# us-east-1 because the SNS failure log group name hard-codes that region.
client = boto3.client('logs', region_name='us-east-1')

In [16]:
import time

# Query window: the last hour, expressed in epoch seconds.
nowish = int(time.time())
hour_ago = nowish - (60*60)

# Build the SNS failure log group name from account_no and stage instead of a
# pasted-in account id, so the query follows the same account as the rest of
# the notebook.
q = client.start_query(
    logGroupName='sns/us-east-1/' + account_no + '/t1-' + stage + '/Failure',
    startTime=hour_ago,
    endTime=nowish,
    queryString='fields @timestamp, @message | sort @timestamp desc | limit 20',
    limit=123
)

print(q)

{'queryId': '87320d8c-20e2-4036-a8c8-a7ce962d1c7a', 'ResponseMetadata': {'RequestId': '94568e95-1e92-4a9b-b6a9-d18484861872', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '94568e95-1e92-4a9b-b6a9-d18484861872', 'content-type': 'application/x-amz-json-1.1', 'content-length': '50', 'date': 'Wed, 16 Jun 2021 21:53:38 GMT'}, 'RetryAttempts': 0}}


In [17]:
import time

# Logs Insights queries run asynchronously: fetching once right after
# start_query can return a query that is still scheduled or running, with no
# results yet. Poll (bounded to ~30 seconds) until a terminal status is
# reported, then print the final response.
for _ in range(30):
    r = client.get_query_results(queryId=q['queryId'])
    if r['status'] not in ('Scheduled', 'Running'):
        break
    time.sleep(1)
print(r)

{'results': [[{'field': '@timestamp', 'value': '2021-06-16 21:52:53.052'}, {'field': '@message', 'value': '{"notification":{"messageMD5Sum":"d67ee3f245f9ed1e1c7825752cc52d39","messageId":"e58e478a-c640-5d2c-b0ed-1eceb2af453e","topicArn":"arn:aws:sns:us-east-1:427848627088:t1-dev","timestamp":"2021-06-16 21:52:47.071"},"delivery":{"deliveryId":"3397ae8c-4cea-5bae-ba03-f7c838050593","destination":"arn:aws:sqs:us-east-1:427848627088:sampleQueue-dev","providerResponse":"{\\"ErrorCode\\":\\"AccessDenied\\",\\"ErrorMessage\\":\\"Access to the resource https://sqs.us-east-1.amazonaws.com/427848627088/sampleQueue-dev is denied.\\",\\"sqsRequestId\\":\\"Unrecoverable\\"}","dwellTimeMs":37,"attempts":1,"statusCode":403},"status":"FAILURE"}'}, {'field': '@ptr', 'value': 'CnMKOgo2NDI3ODQ4NjI3MDg4OnNucy91cy1lYXN0LTEvNDI3ODQ4NjI3MDg4L3QxLWRldi9GYWlsdXJlEAYSNRoYAgXqfi4OAAAAB21kqaAABgynJIAAAADiIAEovKbAtqEvMLymwLahLzgBQIsFSOARUJgKEAAYAQ=='}]], 'statistics': {'recordsMatched': 1.0, 'recordsScanned': 1.0

### Publish Log Queries

In [None]:
# Everything in the log
import time

# Query window: the last hour, expressed in epoch seconds.
nowish = int(time.time())
hour_ago = nowish - (60*60)

# Derive the Lambda log group from `stage` rather than hard-coding 'dev', so
# changing the stage in the configuration cell also retargets this query.
q = client.start_query(
    logGroupName='/aws/lambda/wrap-and-pub-' + stage + '-wrapPub',
    startTime=hour_ago,
    endTime=nowish,
    queryString='fields @timestamp, @message | sort @timestamp desc | limit 20',
    limit=123
)

print(q)

In [None]:
# Just the correlations
import time

nowish = int(time.time())
# NOTE: despite its name, hour_ago holds a 15-minute window in this cell. The
# name is kept because the other query cells assign the same variable.
hour_ago = nowish - (15*60)

# Derive the Lambda log group from `stage` rather than hard-coding 'dev', so
# changing the stage in the configuration cell also retargets this query.
q = client.start_query(
    logGroupName='/aws/lambda/wrap-and-pub-' + stage + '-wrapPub',
    startTime=hour_ago,
    endTime=nowish,
    queryString='filter @message like /PublishContext/ | fields @timestamp, @message | sort @timestamp desc | limit 20',
    limit=123
)

print(q)

In [None]:
import time

# Logs Insights queries run asynchronously: fetching once right after
# start_query can return a query that is still scheduled or running, with no
# results yet. Poll (bounded to ~30 seconds) until a terminal status is
# reported, then print the final response.
for _ in range(30):
    r = client.get_query_results(queryId=q['queryId'])
    if r['status'] not in ('Scheduled', 'Running'):
        break
    time.sleep(1)
print(r)

### Clean Up

In [None]:
# Delete the sample queue. The queue was created as 'sampleQueue-' + stage, so
# the URL must carry the same stage suffix; without it the call targets a
# queue that never existed and the handler below reports success while the
# real queue is left behind.
try:
    sqs.delete_queue(
        QueueUrl='https://queue.amazonaws.com/' + account_no + '/sampleQueue-' + stage
    )
    print('queue deleted')
except botocore.exceptions.ClientError as error:
    # A queue that is already gone counts as cleaned up; any other client
    # error is re-raised with its original traceback.
    if error.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue':
        print('queue deleted')
    else:
        raise