-
Notifications
You must be signed in to change notification settings - Fork 324
/
PubSubPromise.ts
78 lines (70 loc) · 2.45 KB
/
PubSubPromise.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import ms from 'ms'
import GQLExecutorChannelId from '../../client/shared/gqlIds/GQLExecutorChannelId'
import RedisInstance from './RedisInstance'
import numToBase64 from './numToBase64'
import sendToSentry from './sendToSentry'
// How long `publish` waits for a response before rejecting the job with 'TIMEOUT'.
// Each value is passed to setTimeout as the delay.

// Default wait, used when the request sets neither `isAdHoc` nor `longRunning`.
const STANDARD_TIMEOUT = ms('10s')
// Wait used when the request sets `longRunning` (and not `isAdHoc`).
const LONG_TIMEOUT = ms('60s')
// Wait used when the request sets `isAdHoc`; checked before `longRunning`.
const ADHOC_TIMEOUT = ms('10m')
/** Bookkeeping for one published request that has not received its response yet. */
interface Job {
/** Resolves the promise returned by `publish`; called with the `response` field of the matching message. */
resolve: (payload: any) => void
/** Timer that rejects the job with 'TIMEOUT'; cleared when the response arrives first. */
timeoutId: NodeJS.Timeout
}
// Identifier of this server process, read from the environment.
// Used as the prefix of every job id and sent along as `socketServerId`.
// NOTE(review): undefined when the env var is unset, which yields job ids like "undefined:…" — confirm it is always set.
const {SERVER_ID} = process.env
/** Routing and timeout options that every request passed to `PubSubPromise.publish` may carry. */
interface BaseRequest {
/**
 * When set, the request is published directly to that executor's channel instead of being
 * added to the stream. It is stripped from the payload before sending.
 */
executorServerId?: string
/** Selects the 10 minute timeout; takes precedence over `longRunning`. */
isAdHoc?: boolean
/** Selects the 60 second timeout instead of the 10 second default. */
longRunning?: boolean
}
/**
 * Request/response messaging on top of Redis.
 *
 * `publish` sends a request tagged with a unique job id and returns a promise.
 * `onMessage` settles that promise when a message carrying the same job id arrives
 * on `subChannel`. A job that receives no response in time is rejected with 'TIMEOUT'.
 *
 * The constructor does not subscribe; call `subscribe` to start receiving responses.
 */
export default class PubSubPromise<Request extends BaseRequest, Response> {
  /** Jobs still waiting for a response, keyed by job id. */
  jobs = {} as {[jobId: string]: Job}
  /** Connection used to send requests. */
  publisher = new RedisInstance('pubsubPromise_pub')
  /** Connection used to receive responses. */
  subscriber = new RedisInstance('pubsubPromise_sub')
  /** Channel on which responses are received. */
  subChannel: string
  /** Stream that requests are added to when no executor is targeted. */
  stream: string
  /** Monotonic counter used to build job ids. */
  jobCounter = 0
  // Not read or written by this class. NOTE(review): presumably set by external code — confirm.
  carrier: any

  /**
   * @param stream Redis stream that untargeted requests are added to
   * @param subChannel Redis channel that responses arrive on
   */
  constructor(stream: string, subChannel: string) {
    this.stream = stream
    this.subChannel = subChannel
  }

  /**
   * Handles one message from the subscriber connection and resolves the matching job.
   * Messages for unknown job ids (e.g. already timed out) are ignored.
   *
   * @param _channel channel the message arrived on (unused)
   * @param message JSON string of the form `{jobId, response}`
   */
  onMessage = (_channel: string, message: string) => {
    let parsed: {jobId: string; response: Response} | null
    try {
      parsed = JSON.parse(message)
    } catch (e) {
      // A throw here would escape into the subscriber's event emitter, so report and drop instead
      const error = e instanceof Error ? e : new Error(String(e))
      sendToSentry(error, {tags: {subChannel: this.subChannel}})
      return
    }
    // Valid JSON such as `null` or a bare number carries no job id
    if (parsed === null || typeof parsed !== 'object') return
    const {jobId, response} = parsed
    const cachedJob = this.jobs[jobId]
    if (!cachedJob) return
    delete this.jobs[jobId]
    const {resolve, timeoutId} = cachedJob
    clearTimeout(timeoutId)
    resolve(response)
  }

  /** Starts listening for responses on `subChannel`. */
  subscribe = () => {
    this.subscriber.on('message', this.onMessage)
    this.subscriber.subscribe(this.subChannel)
  }

  /**
   * Sends a request and waits for its response.
   *
   * The request goes to a specific executor's channel when `executorServerId` is set,
   * otherwise it is appended to the stream.
   *
   * @param request payload to send; `executorServerId` is used for routing and not forwarded
   * @returns a promise resolved with the `response` field of the matching message
   * @throws (rejects) `Error('TIMEOUT')` when no response arrives in time, or the underlying
   *   error when the request could not be serialized or sent
   */
  publish = (request: Request) => {
    return new Promise<Response>((resolve, reject) => {
      const nextJob = numToBase64(this.jobCounter++)
      const jobId = `${SERVER_ID}:${nextJob}`
      const {isAdHoc, longRunning} = request
      const timeout = isAdHoc ? ADHOC_TIMEOUT : longRunning ? LONG_TIMEOUT : STANDARD_TIMEOUT

      if (this.jobs[jobId]) {
        sendToSentry(new Error('REDIS JOB ALREADY EXISTS'), {tags: {jobId}})
      }

      // Only remove the entry if it still belongs to this job: after an id collision the
      // slot holds a newer job, which must not be dropped by this job's cleanup.
      const forget = () => {
        if (this.jobs[jobId] === job) {
          delete this.jobs[jobId]
        }
      }
      const timeoutId = setTimeout(() => {
        forget()
        reject(new Error('TIMEOUT'))
      }, timeout)
      const job: Job = {resolve, timeoutId}
      this.jobs[jobId] = job

      // Fail fast instead of leaving the caller waiting for the timeout
      const fail = (e: unknown) => {
        clearTimeout(timeoutId)
        forget()
        reject(e instanceof Error ? e : new Error(String(e)))
      }

      try {
        const {executorServerId, ...rest} = request
        const message = JSON.stringify({jobId, socketServerId: SERVER_ID, request: rest})
        let sent: unknown
        if (executorServerId) {
          const executorChannel = GQLExecutorChannelId.join(executorServerId)
          sent = this.publisher.publish(executorChannel, message)
        } else {
          // cap the stream to slightly more than 1000 entries.
          sent = this.publisher.xadd(this.stream, 'MAXLEN', '~', 1000, '*', 'msg', message)
        }
        // The client calls are expected to return promises; Promise.resolve keeps this
        // safe if they do not. Without the catch a failed send is an unhandled rejection.
        Promise.resolve(sent).catch(fail)
      } catch (e) {
        // Synchronous failure (e.g. the request is not serializable)
        fail(e)
      }
    })
  }
}