-
Notifications
You must be signed in to change notification settings - Fork 0
/
retry.ts
182 lines (167 loc) · 5.19 KB
/
retry.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import { Message, Middleware, Subscriber } from '../types'
import { DelayArgs, withDelay } from './delay'
/** Header that carries the message's original topic while it travels through the retry queues */
const TOPIC_HEADER = 'x-retry-topic'
/** Header that carries the message's original date (ISO 8601 string) while it is being retried */
const DATE_HEADER = 'x-retry-date'
/** Header that carries the number of retries scheduled so far for the message, as a decimal string */
export const RETRIES_HEADER = 'x-retry-retries'
/**
 * Options accepted by retry(). Every field is optional; the defaults noted
 * below are the ones retry() applies when a field is omitted.
 */
interface RetryArgs {
/** How long to wait before the first retry, in milliseconds. Defaults to 1000. */
initialDelay?: number
/**
 * Factor the delay grows by on each failed attempt: retry number n is
 * delayed by initialDelay * increaseFactor^(n - 1). Defaults to 10.
 */
increaseFactor?: number
/**
 * How much to vary the delay by for each message. Passed to withDelay()
 * unchanged (presumably a fraction of the delay; confirm against
 * withDelay()). Defaults to 0.1.
 */
variation?: number
/**
 * The maximum number of retries for a message, not counting the initial
 * attempt. Once a message has been retried this many times, the handler's
 * error is rethrown instead of scheduling another retry. Defaults to 5.
 */
maxRetries?: number
/**
 * A function to determine whether a message should be retried. It receives
 * the error thrown by the handler and the message that failed. If it returns
 * false, no retry is scheduled and the error is rethrown. Defaults to a
 * function that always returns true.
 */
canRetry?: (error: unknown, message: Message) => boolean
}
/**
 * Creates a middleware that retries messages whose handler throws.
 *
 * For every subscriber, one retry queue is registered per retry attempt
 * (`~retry.<queueName>.1` to `~retry.<queueName>.<maxRetries>`), each wrapped
 * with `withDelay()` using a delay of
 * `initialDelay * increaseFactor^(retryNum - 1)`. When the subscriber's
 * handler throws, the message is published to the queue of its next attempt.
 * That queue's handler then re-injects the message into the failing
 * subscriber only, through the subscriber's private `~retry.<queueName>.0`
 * topic.
 *
 * @param args - Retry settings; every field is optional.
 * @returns A middleware that adds retry handling to every subscriber.
 */
export const retry = ({
  initialDelay = 1000,
  increaseFactor = 10,
  variation = 0.1,
  maxRetries = 5,
  canRetry = () => true
}: RetryArgs = {}): Middleware => {
  return ({ addSubscribeMiddleware, publish }) => {
    addSubscribeMiddleware(async ({ subscriber, next }) => {
      await createRetryQueues(subscriber, next)
      await next({
        ...subscriber,
        topics: [
          ...subscriber.topics,
          // This topic is added so that we can re-inject messages we want
          // to retry into just this queue. We can't use the subscriber's
          // existing topics because there may be other subscribers listening
          // to them
          getRetryTopic(subscriber, 0)
        ],
        async handle(message, args) {
          // Since retried messages arrive on the private retry topic, we
          // have to restore the message's original topic before sending it
          // to the subscriber
          if (message.headers[TOPIC_HEADER] != null) {
            message.topic = message.headers[TOPIC_HEADER]
          }
          try {
            await subscriber.handle(message, args)
          } catch (e) {
            if (canRetry(e, message) && getNumRetries(message) < maxRetries) {
              await retryMessage(subscriber, message)
            } else {
              // Not retryable, or out of retries: surface the original error
              throw e
            }
          }
        }
      })
    })

    /**
     * Creates the queues for the messages that need to be retried, one per
     * retry attempt from 1 to maxRetries
     */
    async function createRetryQueues(
      subscriber: Subscriber,
      next: (subscriber: Subscriber) => Promise<unknown>
    ) {
      for (let retryNum = 1; retryNum <= maxRetries; retryNum++) {
        await createRetryQueue(subscriber, retryNum, next)
      }
    }

    /**
     * Creates an individual retry queue by registering a delayed retry
     * subscriber for the given attempt number
     */
    async function createRetryQueue(
      subscriber: Subscriber,
      retryNum: number,
      next: (subscriber: Subscriber) => Promise<unknown>
    ) {
      await next(
        withDelay(
          getRetrySubscriber(subscriber, retryNum),
          getDelayArgs(retryNum)
        )
      )
    }

    /**
     * Queues the message in the appropriate retry queue
     */
    async function retryMessage(subscriber: Subscriber, message: Message) {
      const nextRetry = getNumRetries(message) + 1
      // We need to override the topic to our retry queue, but we store
      // it in the header so we can restore it before passing it to the
      // original handler.
      // We need to override the date so that withDelay() delays from now,
      // not from when the message was first published.
      await publish({
        ...message,
        headers: {
          ...message.headers,
          [TOPIC_HEADER]: getTopic(message),
          [RETRIES_HEADER]: String(nextRetry),
          [DATE_HEADER]: message.properties.date.toISOString()
        },
        topic: getRetryTopic(subscriber, nextRetry),
        properties: {
          ...message.properties,
          date: new Date()
        }
      })
    }

    /**
     * Returns a subscriber for retrying messages. It listens on the retry
     * topic for the given attempt number and hands each message back to the
     * original subscriber.
     */
    function getRetrySubscriber(
      subscriber: Subscriber,
      retryNum: number
    ): Subscriber {
      const name = getRetryTopic(subscriber, retryNum)
      return {
        queueName: name,
        topics: [name],
        handle(message) {
          return publish({
            ...message,
            // Re-inject through the subscriber's private retry topic rather
            // than the original topic, so that other subscribers listening
            // to the original topic do not receive the message again. The
            // original topic stays in the TOPIC_HEADER header and is
            // restored by the wrapped handler.
            topic: getRetryTopic(subscriber, 0),
            properties: {
              ...message.properties,
              // Restore the message's original date
              date: new Date(message.headers[DATE_HEADER])
            }
          })
        },
        options: {
          concurrency: 1,
          preserveRejectedMessages: false
        }
      }
    }

    /**
     * Returns the arguments to give to withDelay(). The delay grows
     * geometrically: initialDelay for the first retry, multiplied by
     * increaseFactor for each later one.
     */
    function getDelayArgs(retryNum: number): DelayArgs {
      return {
        delay: initialDelay * Math.pow(increaseFactor, retryNum - 1),
        variation
      }
    }
  }
}
/**
 * Resolves the topic the message was originally published on: the value
 * saved in the retry topic header when present, otherwise the message's
 * current topic
 */
function getTopic(message: Message): string {
  const savedTopic = message.headers[TOPIC_HEADER]
  if (savedTopic != null) {
    return savedTopic
  }
  return message.topic
}
/**
 * Builds the name of the retry topic for a subscriber and attempt number,
 * in the form `~retry.<queueName>.<retryNum>`
 */
function getRetryTopic(subscriber: Subscriber, retryNum: number) {
  const segments = ['~retry', String(subscriber.queueName), String(retryNum)]
  return segments.join('.')
}
/**
 * Reads how many times this message has already been retried from its
 * retries header; a message without the header counts as zero retries
 */
function getNumRetries(message: Message): number {
  const recorded = message.headers[RETRIES_HEADER]
  if (recorded == null) {
    return 0
  }
  return +recorded
}