-
Notifications
You must be signed in to change notification settings - Fork 0
/
events.js
120 lines (106 loc) · 2.84 KB
/
events.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
const AWS = require('aws-sdk')
const R = require('ramda')
const { observable, action, runInAction, reaction } = require('mobx')
const { writeEvents } = require('../dynamo/events')
const eventsSQS = new AWS.SQS({
accessKeyId: process.env.AWS_STORIES_USER_ACCESS_KEY,
secretAccessKey: process.env.AWS_STORIES_USER_SECRET,
region: process.env.AWS_STORIES_REGION
})
const sqsMessageGroupID = 'events.in'
const sqsURL = process.env.AWS_SQS_EVENTS_URL
const state = observable({
active: false,
successCount: 0,
errorCount: 0
//errors: observable.shallowArray([])
})
const deleteReceivedMessages = data => (
eventsSQS.deleteMessageBatch({
Entries: data.Messages.map((message, index) => ({
Id: `${ index }`,
ReceiptHandle: message.ReceiptHandle
})),
QueueUrl: sqsURL
})
.promise()
)
const processReceivedMessages = data => {
console.log('processReceivedMessages', data)
if (data.Messages) {
const events = data.Messages.map((message, index) => {
const timestamp = parseFloat(message.Attributes.SentTimestamp)
const event = JSON.parse(message.Body)
const adjustedTimestamp = (timestamp * 100) + index
console.log('event received', event, timestamp, adjustedTimestamp)
return Object.assign({}, event, { timestamp: adjustedTimestamp })
})
const deleteEntries = data.Messages.map((message, index) => ({
Id: `${ index }`,
ReceiptHandle: message.ReceiptHandle
}))
return writeEvents(events)
.then(response => {
// FIXME: skip unprocessed events
return deleteReceivedMessages(data)
})
.then(action(() => {
state.successCount += 1
}))
}
else {
runInAction(() => {
state.successCount += 1
//enqueueEvents([{ section: 'test', type: 'test.blah' }])
})
}
}
const processReceivedError = action(error => {
console.error('processReceivedError', error)
state.errorCount += 1
})
reaction(
() => ({
active: state.active,
successCount: state.successCount,
errorCount: state.errorCount
}),
({ active }) => {
if (!active) {
return
}
console.log('start receiveMessage')
eventsSQS.receiveMessage({
QueueUrl: sqsURL,
MaxNumberOfMessages: 10,
WaitTimeSeconds: 1,
AttributeNames: ['SenderId', 'SentTimestamp']
})
.promise()
.then(processReceivedMessages)
.catch(processReceivedError)
},
{
name: 'events.sqs.messages.receiver',
fireImmediately: true,
delay: 250
}
)
function send10EventsToSQS(events) {
return eventsSQS.sendMessageBatch({
QueueUrl: sqsURL,
Entries: events.map((event, index) => ({
Id: `${index}`,
MessageBody: JSON.stringify(event),
MessageGroupId: sqsMessageGroupID
}))
}).promise()
}
function enqueueEvents(events, send10Events = send10EventsToSQS) {
return Promise.all(
R.splitEvery(10, events).map(send10Events)
)
}
module.exports = {
enqueueEvents
}