In [2]:
import time
import json
import logging

import boto3
from boto3.dynamodb.conditions import Key, Attr

### 목표

    * users table을 읽어서, 카테고리 정보와 브랜드 정보를 SQS에 담아주는 lambda를 설계

In [None]:
# Logging 설정
logger = logging.getLogger(__name__) # "__name__"를 하면, root log는 제외시키고, 이 모듈 내에서 발생한 로그만 포함시킴. 없으면 requests 모듈에서 생긴 로그, boto3에서 생긴 로그 등이 포함되어 버림 그래서 지저분해짐
logger.setLevel(logging.INFO)
formatter = logging.Formatter('{"method" : "SQS_CategoryBrand", "time" : "%(asctime)s", "level" : "%(levelname)s", "message" : "%(message)s"}')
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
logger.addHandler(ch)

#### SQS 관련된 Parameter

| 항목  | 의미  |
|----|----|
| DefaultVisibleTimeout | SQS에서 메시지는 특정 Component에 전달된 뒤, 자동으로 삭제 되지 않는데, 그 때문에 중복된 메시지를 전달받을 수 있음. 그래서 한번 전달된 메시지는 visible timeout에 설정된 일정시간 동안은 다시 전달되지 않도록 하고 있음. 이러한 메시지들의 상태를 **inflight**라고 표현함.|
| MessageRetentionPeriod | 메시지 생명주기 / 1분부터 최대 14일까지 지정할 수 있음 |
| Maximum Message Size | 메시지 최대 크기 최대 256Kbytes까지 가능 |
| Delivery Delay | 새로운 메시지가 전달되는 초기 지연 시간. 0초~900초(15분)까지 설정 가능. |
| Receive Message Wait Time | Long Polling을 활성화 함, 0~20초까지 가능 |

In [56]:
QueueName = "categorybrands"

sqs = boto3.resource('sqs')
try:
    queue = sqs.get_queue_by_name(QueueName=QueueName)
except:
    queue = sqs.create_queue(QueueName=QueueName,Attributes=
                            {
                                "VisibilityTimeout":"300",
                            })

In [54]:
# Table 내 모든 정보를 가져옴
dynamodb = boto3.resource("dynamodb", region_name="ap-northeast-2")
table = dynamodb.Table("users")

response = table.scan()
items = response['Items']

sqs_message_list = []
message = {}
for item in items:
    item['brands'] = json.loads(item['brands']) # json decoding    
    
    message['category_id'] = str(item['category_id']) # Decimal decoding
    message['category_title'] = item['category_title']
    for brand in item['brands']:
        message['brand_id'] = brand[0]
        message['brand_title'] = brand[1]
        sqs_message_list.append(message.copy())
        
print("sqs message 갯수 : ",len(sqs_message_list))

sqs message 갯수 :  735


In [58]:
# categorybrands에 보내는 메시지 형식
categorybrands_message = lambda msgid, item : {
    "Id" : msgid,
    "MessageBody": item
}

In [57]:
# nvmid queue에 Bulk Insert하기 
Entries = []
start = time.time()
for message in sqs_message_list:
    msgid = message['brand_id'] + message['category_id']
    item = json.dumps(message)
    Entries.append(categorybrands_message(msgid,item))
    if len(Entries) == 10:
        queue.send_messages(Entries=Entries)
        Entries=[]

if len(Entries) > 0:
    queue.send_messages(Entries=Entries)
    Entries=[]
end = time.time()

In [68]:
messages = queue.receive_messages(AttributeNames=["All"],
                                MessageAttributeNames=['Page'],
                                MaxNumberOfMessages=1,
                                WaitTimeSeconds=3)

msg = messages[0]
data = json.loads(msg.body)
msg.delete()

{'ResponseMetadata': {'HTTPHeaders': {'connection': 'keep-alive',
   'content-length': '215',
   'content-type': 'text/xml',
   'date': 'Thu, 08 Mar 2018 08:46:37 GMT',
   'server': 'Server',
   'x-amzn-requestid': '414268a9-bc50-5390-976e-3bb346fd19f1'},
  'HTTPStatusCode': 200,
  'RequestId': '414268a9-bc50-5390-976e-3bb346fd19f1',
  'RetryAttempts': 0}}