-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
50 lines (36 loc) · 1.41 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
 * Runs `callable` once for every item pulled from `cursor`, keeping at most
 * `options.concurrency` invocations in flight at the same time.
 *
 * Items are fetched and started through `startCallableInSlot`; as soon as one
 * invocation finishes, its slot is reused for the next item until the cursor
 * reports that it has no more items.
 *
 * If `callable` or the cursor rejects, the returned promise rejects with that
 * error. Invocations that are already in flight are not cancelled.
 *
 * @param {{next: function(): Promise<*>}} cursor - Source of items.
 * @param {function(*): (Promise<*>|*)} callable - Invoked with each item.
 * @param {Object} options
 * @param {number} options.concurrency - Maximum number of simultaneous
 *   invocations. Must be greater than 0.
 * @param {function(number): void} [options.onProgress] - Called with the
 *   running count of completed items each time an invocation finishes.
 * @returns {Promise<{countProcessed: number}>} Number of completed items.
 * @throws {RangeError} If `options.concurrency` is not greater than 0.
 */
async function processCursorConcurrently(cursor, callable, options = {}) {
  const {concurrency, onProgress} = options

  // With no slots nothing would ever be started, and the loop below would
  // await Promise.race([]), which never settles. Fail loudly instead.
  if (!(concurrency > 0)) {
    throw new RangeError(
      `options.concurrency must be a number greater than 0, got ${String(concurrency)}`
    )
  }

  const promiseSlots = []
  let cursorHasMore = true
  let countProcessed = 0

  // Initial fill: start up to `concurrency` invocations, or fewer if the
  // cursor runs dry first.
  for (let slotIndex = 0; slotIndex < concurrency && cursorHasMore; ++slotIndex) {
    cursorHasMore = await startCallableInSlot(promiseSlots, slotIndex, cursor, callable)
  }

  // Slots that have finished are set to null. They must be filtered out before
  // racing, because Promise.race would resolve immediately with a null entry.
  let inFlight = promiseSlots.filter(Boolean)
  while (inFlight.length > 0) {
    // Each slot promise resolves to its own index, which tells us which slot
    // is free again.
    const resolvedSlotIndex = await Promise.race(inFlight)
    promiseSlots[resolvedSlotIndex] = null
    countProcessed += 1
    if (onProgress) {
      onProgress(countProcessed)
    }
    if (cursorHasMore) {
      cursorHasMore = await startCallableInSlot(promiseSlots, resolvedSlotIndex, cursor, callable)
    }
    inFlight = promiseSlots.filter(Boolean)
  }

  return {countProcessed}
}
/**
 * Pulls the next item from `cursor` and, if there is one, starts `callable`
 * on it, storing the in-flight promise in `promiseSlots[slotIndex]`.
 *
 * The stored promise resolves to `slotIndex` once `callable` has finished, so
 * a caller racing several slots can tell which one completed. It rejects if
 * `callable` throws or rejects.
 *
 * @param {Array<Promise<number>|null>} promiseSlots - Slot array to write into.
 * @param {number} slotIndex - Index of the slot to occupy.
 * @param {{next: function(): Promise<*>}} cursor - Source of items.
 * @param {function(*): (Promise<*>|*)} callable - Invoked with the item.
 * @returns {Promise<boolean>} `true` if an item was fetched and started,
 *   `false` if the cursor yielded `null`/`undefined` (treated as exhausted),
 *   in which case the slot is left untouched.
 */
async function startCallableInSlot(promiseSlots, slotIndex, cursor, callable) {
  const item = await cursor.next()
  // Only null/undefined mean "no more items". Falsy values such as 0, '' or
  // false are legitimate items and must not end processing early.
  if (item == null) {
    return false
  }
  // Deliberately not awaited: the invocation keeps running in the background
  // and the slot holds its promise.
  promiseSlots[slotIndex] = (async () => {
    await callable(item)
    return slotIndex
  })()
  return true
}
// CommonJS export: the module's value is the processCursorConcurrently function itself.
module.exports = processCursorConcurrently