-
Notifications
You must be signed in to change notification settings - Fork 599
/
Copy pathmemory-runner.ts
72 lines (62 loc) · 2 KB
/
memory-runner.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import PQueue from 'p-queue'
import { ConsecutiveList } from './consecutive-list'
import { EventRunner } from './types'
export { ConsecutiveList }
/** Configuration accepted by `MemoryRunner`; every field is optional. */
export type MemoryRunnerOptions = {
  /** Initial cursor value, reported until a tracked event advances it. */
  startCursor?: number
  /** Concurrency limit for the runner's main queue; unbounded when omitted. */
  concurrency?: number
  /** Awaited with the new cursor value each time the cursor advances. */
  setCursor?: (cursor: number) => Promise<void>
}
/**
 * An in-memory event runner: a queue with arbitrarily many partitions, each
 * processing its work sequentially.
 *
 * Partitions are created lazily and taken out of memory when they go idle.
 * Every task additionally passes through `mainQueue`, whose concurrency is
 * taken from `opts.concurrency` (unbounded by default).
 */
export class MemoryRunner implements EventRunner {
  consecutive = new ConsecutiveList<number>()
  mainQueue: PQueue
  partitions = new Map<string, PQueue>()
  cursor: number | undefined
  // Set by destroy(); tasks still waiting in a partition become no-ops once
  // this is true.
  private destroyed = false

  /**
   * @param opts Runner options; `startCursor` seeds the cursor and
   *   `concurrency` bounds the main queue.
   */
  constructor(public opts: MemoryRunnerOptions = {}) {
    this.mainQueue = new PQueue({ concurrency: opts.concurrency ?? Infinity })
    this.cursor = opts.startCursor
  }

  /** Returns the current cursor, or `undefined` if none has been set yet. */
  getCursor(): number | undefined {
    return this.cursor
  }

  /**
   * Schedules `task` on the partition identified by `partitionId`. Tasks that
   * share a partition run one at a time.
   *
   * Does nothing when the main queue is paused (which `destroy()` does).
   * Otherwise the returned promise settles when the task has finished, or
   * has been skipped because the runner was destroyed in the meantime.
   */
  async addTask(partitionId: string, task: () => Promise<void>) {
    if (this.mainQueue.isPaused) return
    return this.mainQueue.add(() => {
      return this.getPartition(partitionId).add(async () => {
        // Skip (rather than discard) work queued before destroy(): the promise
        // still settles, so the main queue can drain and destroy() can return.
        if (this.destroyed) return
        await task()
      })
    })
  }

  /** Returns the queue for `partitionId`, creating it on first use. */
  private getPartition(partitionId: string): PQueue {
    let partition = this.partitions.get(partitionId)
    if (!partition) {
      partition = new PQueue({ concurrency: 1 })
      // Drop the partition from the map as soon as it has no work left.
      partition.once('idle', () => this.partitions.delete(partitionId))
      this.partitions.set(partitionId, partition)
    }
    return partition
  }

  /**
   * Registers `seq` in the consecutive list and runs `handler` on the
   * partition for `did`. After the handler succeeds the item is marked
   * complete; if that yields any entries, the last one becomes the new cursor
   * and is passed to `opts.setCursor` when provided.
   *
   * Does nothing when the main queue is paused. If `handler` throws, the item
   * is not marked complete and the cursor is left unchanged.
   */
  async trackEvent(
    did: string,
    seq: number,
    handler: () => Promise<void>,
  ): Promise<void> {
    if (this.mainQueue.isPaused) return
    const item = this.consecutive.push(seq)
    await this.addTask(did, async () => {
      await handler()
      // NOTE(review): complete() presumably returns the sequence numbers that
      // are now contiguously finished — confirm against ConsecutiveList.
      const latest = item.complete().at(-1)
      if (latest !== undefined) {
        this.cursor = latest
        if (this.opts.setCursor) {
          await this.opts.setCursor(this.cursor)
        }
      }
    })
  }

  /** Resolves once the main queue is idle. */
  async processAll(): Promise<void> {
    await this.mainQueue.onIdle()
  }

  /**
   * Stops the runner: no new work is accepted, work not yet started is
   * dropped, and the returned promise resolves once in-flight tasks finish.
   *
   * Partition queues are deliberately NOT cleared. Clearing them would leave
   * the promises of their queued tasks unsettled, which keeps the matching
   * main-queue tasks pending and makes `onIdle()` wait forever. Instead the
   * `destroyed` flag turns the queued tasks into no-ops that settle at once.
   */
  async destroy(): Promise<void> {
    this.destroyed = true
    this.mainQueue.pause()
    this.mainQueue.clear()
    await this.mainQueue.onIdle()
  }
}