-
Notifications
You must be signed in to change notification settings - Fork 354
/
processor.ts
278 lines (248 loc) · 7.67 KB
/
processor.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
import crypto from "crypto";
import { Redis } from "ioredis";
import { Db } from "mongodb";
import getNow from "performance-now";
import { Config } from "coral-server/config";
import { CoralEventPayload } from "coral-server/events/event";
import logger from "coral-server/logger";
import {
filterActiveSecrets,
filterExpiredSecrets,
} from "coral-server/models/settings";
import {
deleteEndpointSecrets,
Endpoint,
getWebhookEndpoint,
} from "coral-server/models/tenant";
import { JobProcessor } from "coral-server/queue/Task";
import { createFetch, FetchOptions } from "coral-server/services/fetch";
import { disableWebhookEndpoint } from "coral-server/services/tenant";
import TenantCache from "coral-server/services/tenant/cache";
// The name identifying the webhook job. It is attached to every log entry
// written by the job processor as `jobName`.
export const JOB_NAME = "webhook";
// The count of failures on a webhook delivery before we disable the endpoint.
// The stored failure count is cleared whenever a delivery succeeds, so this is
// effectively a limit on failures since the last successful delivery.
const MAXIMUM_FAILURE_COUNT = 10;
// The number of webhook attempts that should be retained for debugging. Used as
// the bound when trimming the per-endpoint delivery list stored in Redis.
const MAXIMUM_EVENT_ATTEMPTS_LOG_SIZE = 50;
/**
* WebhookProcessorOptions are the dependencies handed to createJobProcessor.
*/
export interface WebhookProcessorOptions {
// NOTE(review): createJobProcessor in this file does not destructure or read
// config; it is only part of the options shape here.
config: Config;
// Passed to disableWebhookEndpoint and deleteEndpointSecrets.
mongo: Db;
// Holds the per-endpoint delivery log and failure counter.
redis: Redis;
// Used to look up the Tenant referenced by a job.
tenantCache: TenantCache;
}
/**
* WebhookData is the payload of a webhook job: which event should be sent to
* which endpoint of which tenant.
*/
export interface WebhookData {
// Only used by the processor as part of the log context.
contextID: string;
// The ID of the endpoint (on the Tenant) that the event is delivered to.
endpointID: string;
// The ID of the Tenant that owns the endpoint. It is also merged into the
// body that is sent to the endpoint.
tenantID: string;
// The event being delivered. Its fields are spread into the request body.
event: CoralEventPayload;
}
/**
* WebhookDelivery describes a single delivery attempt. The job processor
* serializes one of these as JSON into the endpoint's delivery list in Redis
* after every request it sends.
*/
export interface WebhookDelivery {
// The ID of the event that was sent (not a unique ID for this attempt).
id: string;
// The type of the event that was sent.
name: string;
// Whether the response was considered OK (`res.ok`).
success: boolean;
// The HTTP status code of the response.
status: number;
// The HTTP status text of the response.
statusText: string;
// The serialized JSON body that was sent in the request.
request: string;
// The response body, read as text.
response: string;
// When this delivery record was created, which is after the response body
// has been read.
createdAt: Date;
}
/**
 * generateSignature computes the HMAC-SHA256 of the body keyed with the given
 * secret, so that clients can validate that a request came from Coral.
 *
 * @param secret the secret used as the HMAC key
 * @param body the exact body text to sign
 * @returns the digest encoded as a lowercase hex string
 */
export function generateSignature(secret: string, body: string): string {
  const hmac = crypto.createHmac("sha256", secret);
  hmac.update(body);
  return hmac.digest("hex");
}
/**
 * generateSignatures signs the body once for every signing secret on the
 * endpoint that passes the `filterActiveSecrets(now)` predicate.
 *
 * @param endpoint the endpoint whose signing secrets should be used
 * @param body the exact body text to sign
 * @param now the date handed to `filterActiveSecrets`
 * @returns the signatures, each prefixed with `sha256=`, joined with commas;
 * an empty string when no secret passes the filter
 */
export function generateSignatures(
  endpoint: Pick<Endpoint, "signingSecrets">,
  body: string,
  now: Date
): string {
  // Only secrets that are still active are allowed to sign the body.
  const activeSecrets = endpoint.signingSecrets.filter(
    filterActiveSecrets(now)
  );

  const signatures = activeSecrets.map(
    ({ secret }) => `sha256=${generateSignature(secret, body)}`
  );

  return signatures.join(",");
}
/**
* CoralWebhookEventPayload is the shape of the data that generateFetchOptions
* serializes into the request body: the event payload itself, extended with
* details about the Tenant.
*/
type CoralWebhookEventPayload = CoralEventPayload & {
/**
* tenantID is the ID of the Tenant that this event originated at.
*/
readonly tenantID: string;
/**
* tenantDomain is the domain that is associated with this Tenant that this event originated at.
*/
readonly tenantDomain: string;
};
/**
 * generateFetchOptions builds the options for the POST request that delivers
 * an event to a webhook endpoint.
 *
 * The signature header is computed over the exact serialized string that is
 * sent as the body (pretty-printed with a two space indent).
 *
 * @param endpoint the endpoint whose signing secrets are used for signing
 * @param data the event payload to serialize as the request body
 * @param now the date used to select which signing secrets are active
 * @returns the method, headers and body for the request
 */
export function generateFetchOptions(
  endpoint: Pick<Endpoint, "signingSecrets">,
  data: CoralWebhookEventPayload,
  now: Date
): FetchOptions {
  // Serialize once so the signed text and the sent text are the same string.
  const payload = JSON.stringify(data, null, 2);

  return {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "X-Coral-Event": data.type,
      "X-Coral-Signature": generateSignatures(endpoint, payload, now),
    },
    body: payload,
  };
}
/**
 * createJobProcessor creates the processor that delivers a single webhook
 * event to a single endpoint.
 *
 * For each job the processor:
 *
 * 1. Resolves the tenant and endpoint, returning early (after logging) when
 *    either is missing or the endpoint is disabled.
 * 2. Sends the signed event to the endpoint URL as a POST request.
 * 3. Appends the attempt to the endpoint's delivery list in Redis, keeping
 *    only the most recent MAXIMUM_EVENT_ATTEMPTS_LOG_SIZE entries.
 * 4. Clears the endpoint's failure counter on success, or increments it on
 *    failure and disables the endpoint once MAXIMUM_FAILURE_COUNT is reached.
 * 5. Schedules removal of expired signing secrets from the endpoint.
 *
 * @param options the dependencies used by the processor
 * @returns the job processor to register for webhook jobs
 */
export function createJobProcessor({
  mongo,
  tenantCache,
  redis,
}: WebhookProcessorOptions): JobProcessor<WebhookData> {
  // Create the fetcher that will orchestrate sending the actual webhooks.
  const fetch = createFetch({ name: "Webhook" });

  return async job => {
    const { tenantID, endpointID, contextID, event } = job.data;

    const log = logger.child(
      {
        eventID: event.id,
        contextID,
        jobID: job.id,
        jobName: JOB_NAME,
        tenantID,
        endpointID,
      },
      true
    );

    // Get the referenced tenant so we can get the endpoint details.
    const tenant = await tenantCache.retrieveByID(tenantID);
    if (!tenant) {
      log.error("referenced tenant was not found");
      return;
    }

    // Get the referenced endpoint.
    const endpoint = getWebhookEndpoint(tenant, endpointID);
    if (!endpoint) {
      log.error("referenced endpoint was not found");
      return;
    }

    // If the endpoint is disabled, don't bother processing it.
    if (!endpoint.enabled) {
      log.warn("endpoint was disabled, skipping sending");
      return;
    }

    // Get the current date. It is used both to select the active signing
    // secrets and, later, to select the expired ones.
    const now = new Date();

    // Get the fetch options.
    const options = generateFetchOptions(
      endpoint,
      { ...event, tenantID, tenantDomain: tenant.domain },
      now
    );

    // Send the request.
    // NOTE(review): a rejected fetch propagates out of the job before any
    // delivery or failure is recorded — confirm this is what the queue's retry
    // handling expects.
    const startedSendingAt = getNow();
    const res = await fetch(endpoint.url, options);
    const took = getNow() - startedSendingAt;

    if (res.ok) {
      log.info(
        { took, responseStatus: res.status },
        "finished sending webhook"
      );
    } else {
      log.warn(
        { took, responseStatus: res.status },
        "failed to deliver webhook"
      );
    }

    // Grab the response from the webhook, we'll want to save this in the recent
    // attempts.
    const response = await res.text();

    // Collect the delivery information.
    const delivery: WebhookDelivery = {
      id: event.id,
      name: event.type,
      success: res.ok,
      status: res.status,
      statusText: res.statusText,
      // We only serialize the body as a string.
      request: options.body as string,
      response,
      createdAt: new Date(),
    };

    // Record the delivery.
    const endpointDeliveriesKey = `${tenantID}:endpointDeliveries:${endpointID}`;
    const endpointFailuresKey = `${tenantID}:endpointFailures:${endpointID}`;

    const [, , [, failuresString]] = await redis
      .multi()
      // Push the attempt onto the tail of the list.
      .rpush(endpointDeliveriesKey, JSON.stringify(delivery))
      // Trim the list to the most recent attempts. Because new attempts are
      // appended to the tail, the entries to keep are the last N, which is
      // expressed with negative (from-the-end) indexes. Trimming with
      // `0, N - 1` would instead keep the oldest N and drop every new attempt
      // once the list is full.
      .ltrim(endpointDeliveriesKey, -MAXIMUM_EVENT_ATTEMPTS_LOG_SIZE, -1)
      // Get the current failure count.
      .get(endpointFailuresKey)
      // Execute the queued operations.
      .exec();

    if (res.ok) {
      // The stored count is a string (or null when there is no record).
      const previousFailures = failuresString
        ? parseInt(failuresString, 10)
        : 0;
      if (previousFailures > 0) {
        // The webhook delivery was a success, and there were previous failures.
        // Remove the failures record.
        await redis.del(endpointFailuresKey);
      }
    } else {
      // Record the failed attempt. INCR replies with the new count as a
      // number, so no parsing is needed.
      const failures = await redis.incr(endpointFailuresKey);

      // If the failure count has reached the allowed maximum, disable the
      // endpoint.
      if (failures >= MAXIMUM_FAILURE_COUNT) {
        log.warn(
          { failures, maxFailures: MAXIMUM_FAILURE_COUNT },
          "maximum failures reached, disabling endpoint"
        );

        await disableWebhookEndpoint(
          mongo,
          redis,
          tenantCache,
          tenant,
          endpointID
        );
      }
      // TODO: (wyattjoh) maybe schedule a retry?
    }

    // Remove the expired secrets in the next tick so that it does not affect
    // the sending performance of this job, and errors do not impact the
    // sending.
    const expiredSigningSecrets = endpoint.signingSecrets.filter(
      filterExpiredSecrets(now)
    );
    if (expiredSigningSecrets.length > 0) {
      process.nextTick(() => {
        // Fire-and-forget: the job does not wait for this, and failures are
        // only logged.
        deleteEndpointSecrets(
          mongo,
          tenantID,
          endpoint.id,
          expiredSigningSecrets.map(s => s.kid)
        )
          .then(() => {
            log.info(
              { secrets: expiredSigningSecrets.length },
              "removed expired secrets from endpoint"
            );
          })
          .catch(err => {
            log.error(
              { err },
              "an error occurred when trying to remove expired secrets"
            );
          });
      });
    }
  };
}