-
Notifications
You must be signed in to change notification settings - Fork 1
/
consumer.py
45 lines (38 loc) · 1.26 KB
/
consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import time
from datetime import datetime
from uuid import uuid4
from pynamodb.exceptions import PutError
from model import MeasuredDuration
def handle(event, context):
delays = []
for record in event['Records']:
print(record)
payload = record['Sns']['Message']
execution_time = datetime.fromisoformat(payload)
delta = int((datetime.utcnow() - execution_time).total_seconds() * 1000)
print(f'delay: {delta}')
delays.append(delta)
with MeasuredDuration.batch_write() as batch:
retries = []
for delay in delays:
item = MeasuredDuration()
item.id = str(uuid4())
item.delay = delay
try:
batch.save(item)
except PutError as e:
print(e)
time.sleep(.200)
retries.append(delay)
while len(retries) > 0:
delay = retries.pop(0)
item = MeasuredDuration()
item.id = str(uuid4())
item.delay = delay
try:
batch.save(item)
except PutError as e:
print(e)
time.sleep(.200)
retries.append(delay)
print('Processed %d records' % len(event['Records']))