-
Notifications
You must be signed in to change notification settings - Fork 130
/
chat.mjs
565 lines (493 loc) · 25.1 KB
/
chat.mjs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
// This is the Edge Chat Demo Worker, built using Durable Objects!
// ===============================
// Introduction to Modules
// ===============================
//
// The first thing you might notice, if you are familiar with the Workers platform, is that this
// Worker is written differently from others you may have seen. It even has a different file
// extension. The `mjs` extension means this JavaScript is an ES Module, which, among other things,
// means it has imports and exports. Unlike other Workers, this code doesn't use
// `addEventListener("fetch", handler)` to register its main HTTP handler; instead, it _exports_
// a handler, as we'll see below.
//
// This is a new way of writing Workers that we expect to introduce more broadly in the future. We
// like this syntax because it is *composable*: You can take two workers written this way and
// merge them into one worker, by importing the two Workers' exported handlers yourself, and then
// exporting a new handler that calls into the other Workers as appropriate.
//
// This new syntax is required when using Durable Objects, because your Durable Objects are
// implemented by classes, and those classes need to be exported. The new syntax can be used for
// writing regular Workers (without Durable Objects) too, but for now, you must be in the Durable
// Objects beta to be able to use the new syntax, while we work out the quirks.
//
// To see an example configuration for uploading module-based Workers, check out the wrangler.toml
// file or one of our Durable Object templates for Wrangler:
// * https://github.com/cloudflare/durable-objects-template
// * https://github.com/cloudflare/durable-objects-rollup-esm
// * https://github.com/cloudflare/durable-objects-webpack-commonjs
// ===============================
// Required Environment
// ===============================
//
// This worker, when deployed, must be configured with two environment bindings:
// * rooms: A Durable Object namespace binding mapped to the ChatRoom class.
// * limiters: A Durable Object namespace binding mapped to the RateLimiter class.
//
// Incidentally, in pre-modules Workers syntax, "bindings" (like KV bindings, secrets, etc.)
// appeared in your script as global variables, but in the new modules syntax, this is no longer
// the case. Instead, bindings are now delivered in an "environment object" when an event handler
// (or Durable Object class constructor) is called. Look for the variable `env` below.
//
// We made this change, again, for composability: The global scope is global, but if you want to
// call into existing code that has different environment requirements, then you need to be able
// to pass the environment as a parameter instead.
//
// Once again, see the wrangler.toml file to understand how the environment is configured.
// =======================================================================================
// The regular Worker part...
//
// This section of the code implements a normal Worker that receives HTTP requests from external
// clients. This part is stateless.
// With the introduction of modules, we're experimenting with allowing text/data blobs to be
// uploaded and exposed as synthetic modules. In wrangler.toml we specify a rule that files ending
// in .html should be uploaded as "Data", equivalent to content-type `application/octet-stream`.
// So when we import it as `HTML` here, we get the HTML content as an `ArrayBuffer`. This lets us
// serve our app's static asset without relying on any separate storage. (However, the space
// available for assets served this way is very limited; larger sites should continue to use Workers
// KV to serve assets.)
import HTML from "./chat.html";
// `handleErrors()` wraps an HTTP request handler in a try/catch and reports any thrown
// error back to the client. Convenient while debugging and iterating; you probably
// wouldn't want to ship this behavior in production code.
async function handleErrors(request, func) {
  try {
    return await func();
  } catch (err) {
    if (request.headers.get("Upgrade") != "websocket") {
      // Plain HTTP request: report the error as a 500 with the stack in the body.
      return new Response(err.stack, {status: 500});
    }
    // Annoyingly, if we return an HTTP error in response to a WebSocket request, Chrome
    // devtools won't show us the response body! So... send a WebSocket response with an
    // error frame instead.
    const pair = new WebSocketPair();
    const server = pair[1];
    server.accept();
    server.send(JSON.stringify({error: err.stack}));
    server.close(1011, "Uncaught exception during session setup");
    return new Response(null, { status: 101, webSocket: pair[0] });
  }
}
// In modules-syntax workers, we use `export default` to export our script's main event
// handlers. Here, we export one handler, `fetch`, for receiving HTTP requests. In
// pre-modules workers, the fetch handler was registered using
// `addEventHandler("fetch", event => { ... })`; this is just new syntax for essentially
// the same thing.
//
// `fetch` isn't the only handler. If your worker runs on a Cron schedule, it will receive
// calls to a handler named `scheduled`, which should be exported here in a similar way. We
// will be adding other handlers for other types of events over time.
export default {
  async fetch(request, env) {
    return await handleErrors(request, async () => {
      // We have received an HTTP request! Parse the URL and route the request.
      const url = new URL(request.url);
      const [first, ...rest] = url.pathname.slice(1).split('/');

      if (!first) {
        // Serve our HTML at the root path.
        return new Response(HTML, {headers: {"Content-Type": "text/html;charset=UTF-8"}});
      }

      if (first === "api") {
        // This is a request for `/api/...`, call the API handler.
        return handleApiRequest(rest, request, env);
      }

      return new Response("Not found", {status: 404});
    });
  }
}
async function handleApiRequest(path, request, env) {
  // We've received an API request. Route it based on the path.
  if (path[0] !== "room") {
    return new Response("Not found", {status: 404});
  }

  // Request for `/api/room/...`.
  const name = path[1];
  if (!name) {
    // The request is for just "/api/room", with no ID.
    if (request.method != "POST") {
      // If we wanted to support returning a list of public rooms, this might be a place to do
      // it. The list of room names might be a good thing to store in KV, though a singleton
      // Durable Object is also a possibility as long as the Cache API is used to cache reads.
      // (A caching layer would be needed because a single Durable Object is single-threaded,
      // so the amount of traffic it can handle is limited. Also, caching would improve latency
      // for users who don't happen to be located close to the singleton.)
      //
      // For this demo, though, we're not implementing a public room list, mainly because
      // inevitably some trolls would probably register a bunch of offensive room names. Sigh.
      return new Response("Method not allowed", {status: 405});
    }

    // POST to /api/room creates a private room.
    //
    // Incidentally, this code doesn't actually store anything. It just generates a valid
    // unique ID for this namespace. Each durable object namespace has its own ID space, but
    // IDs from one namespace are not valid for any other.
    //
    // The IDs returned by `newUniqueId()` are unguessable, so are a valid way to implement
    // "anyone with the link can access" sharing. Additionally, IDs generated this way have
    // a performance benefit over IDs generated from names: When a unique ID is generated,
    // the system knows it is unique without having to communicate with the rest of the
    // world -- i.e., there is no way that someone in the UK and someone in New Zealand
    // could coincidentally create the same ID at the same time, because unique IDs are,
    // well, unique!
    const id = env.rooms.newUniqueId();
    return new Response(id.toString(), {headers: {"Access-Control-Allow-Origin": "*"}});
  }

  // OK, the request is for `/api/room/<name>/...`. It's time to route to the Durable Object
  // for the specific room.
  //
  // Each Durable Object has a 256-bit unique ID. IDs can be derived from string names, or
  // chosen randomly by the system.
  let id;
  if (/^[0-9a-f]{64}$/.test(name)) {
    // The name is 64 hex digits, so let's assume it actually just encodes an ID. We use this
    // for private rooms. `idFromString()` simply parses the text as a hex encoding of the raw
    // ID (and verifies that this is a valid ID for this namespace).
    id = env.rooms.idFromString(name);
  } else if (name.length <= 32) {
    // Treat as a string room name (limited to 32 characters). `idFromName()` consistently
    // derives an ID from a string.
    id = env.rooms.idFromName(name);
  } else {
    return new Response("Name too long", {status: 404});
  }

  // Get the Durable Object stub for this room! The stub is a client object that can be used
  // to send messages to the remote Durable Object instance. The stub is returned immediately;
  // there is no need to await it. This is important because you would not want to wait for
  // a network round trip before you could start sending requests. Since Durable Objects are
  // created on-demand when the ID is first used, there's nothing to wait for anyway; we know
  // an object will be available somewhere to receive our requests.
  const roomObject = env.rooms.get(id);

  // Compute a new URL with `/api/room/<name>` removed. We'll forward the rest of the path
  // to the Durable Object.
  const newUrl = new URL(request.url);
  newUrl.pathname = "/" + path.slice(2).join("/");

  // Send the request to the object. The `fetch()` method of a Durable Object stub has the
  // same signature as the global `fetch()` function, but the request is always sent to the
  // object, regardless of the request's URL.
  return roomObject.fetch(newUrl, request);
}
// =======================================================================================
// The ChatRoom Durable Object Class
//
// ChatRoom implements a Durable Object that coordinates an individual chat room. Participants
// connect to the room using WebSockets, and the room broadcasts messages from each participant
// to all others. It uses the WebSocket hibernation API (`acceptWebSocket()` /
// `webSocketMessage()` / `webSocketClose()` / `webSocketError()`), so the object may be
// evicted from memory between messages while its sockets stay open.
export class ChatRoom {
  constructor(state, env) {
    this.state = state

    // `state.storage` provides access to our durable storage. It provides a simple KV
    // get()/put() interface.
    this.storage = state.storage;

    // `env` is our environment bindings (discussed earlier).
    this.env = env;

    // We will track metadata for each client WebSocket object in `sessions`
    // (keyed by the WebSocket itself).
    this.sessions = new Map();
    this.state.getWebSockets().forEach((webSocket) => {
      // The constructor may have been called when waking up from hibernation,
      // so get previously serialized metadata for any existing WebSockets.
      let meta = webSocket.deserializeAttachment();

      // Set up our rate limiter client. The client itself can't have been in the attachment,
      // because structured clone doesn't work on functions. DO IDs aren't cloneable either,
      // so restore the ID from the hex string saved in handleSession().
      let limiterId = this.env.limiters.idFromString(meta.limiterId);
      let limiter = new RateLimiterClient(
          () => this.env.limiters.get(limiterId),
          err => webSocket.close(1011, err.stack));

      // We don't send any messages to the client until it has sent us the initial user info
      // message. Until then, we will queue messages in `session.blockedMessages`.
      // This could have been arbitrarily large, so we won't put it in the attachment.
      let blockedMessages = [];
      this.sessions.set(webSocket, { ...meta, limiter, blockedMessages });
    });

    // We keep track of the last-seen message's timestamp just so that we can assign monotonically
    // increasing timestamps even if multiple messages arrive simultaneously (see below). There's
    // no need to store this to disk since we assume if the object is destroyed and recreated, much
    // more than a millisecond will have gone by.
    this.lastTimestamp = 0;
  }

  // The system will call fetch() whenever an HTTP request is sent to this Object. Such requests
  // can only be sent from other Worker code, such as the code above; these requests don't come
  // directly from the internet. In the future, we will support other formats than HTTP for these
  // communications, but we started with HTTP for its familiarity.
  async fetch(request) {
    return await handleErrors(request, async () => {
      let url = new URL(request.url);

      switch (url.pathname) {
        case "/websocket": {
          // The request is to `/api/room/<name>/websocket`. A client is trying to establish a new
          // WebSocket session.
          if (request.headers.get("Upgrade") != "websocket") {
            return new Response("expected websocket", {status: 400});
          }

          // Get the client's IP address for use with the rate limiter.
          let ip = request.headers.get("CF-Connecting-IP");

          // To accept the WebSocket request, we create a WebSocketPair (which is like a socketpair,
          // i.e. two WebSockets that talk to each other), we return one end of the pair in the
          // response, and we operate on the other end. Note that this API is not part of the
          // Fetch API standard; unfortunately, the Fetch API / Service Workers specs do not define
          // any way to act as a WebSocket server today.
          let pair = new WebSocketPair();

          // We're going to take pair[1] as our end, and return pair[0] to the client.
          await this.handleSession(pair[1], ip);

          // Now we return the other end of the pair to the client.
          return new Response(null, { status: 101, webSocket: pair[0] });
        }

        default:
          return new Response("Not found", {status: 404});
      }
    });
  }

  // handleSession() implements our WebSocket-based chat protocol.
  async handleSession(webSocket, ip) {
    // Accept our end of the WebSocket. This tells the runtime that we'll be terminating the
    // WebSocket in JavaScript, not sending it elsewhere. Using `state.acceptWebSocket()`
    // (rather than `webSocket.accept()`) opts in to hibernation.
    this.state.acceptWebSocket(webSocket);

    // Set up our rate limiter client. Limits are keyed by client IP, so they apply across
    // all rooms this IP connects to.
    let limiterId = this.env.limiters.idFromName(ip);
    let limiter = new RateLimiterClient(
        () => this.env.limiters.get(limiterId),
        err => webSocket.close(1011, err.stack));

    // Create our session and add it to the sessions map. `session.name` stays unset until the
    // client sends its initial user-info message (see webSocketMessage()).
    let session = { limiterId, limiter, blockedMessages: [] };
    // Attach limiterId to the webSocket so it survives hibernation; the constructor restores
    // it via deserializeAttachment() on wake-up.
    webSocket.serializeAttachment({ ...webSocket.deserializeAttachment(), limiterId: limiterId.toString() });
    this.sessions.set(webSocket, session);

    // Queue "join" messages for all online users, to populate the client's roster.
    for (let otherSession of this.sessions.values()) {
      if (otherSession.name) {
        session.blockedMessages.push(JSON.stringify({joined: otherSession.name}));
      }
    }

    // Load the last 100 messages from the chat history stored on disk, and queue them for the
    // client (listed newest-first, then reversed back into chronological order).
    let storage = await this.storage.list({reverse: true, limit: 100});
    let backlog = [...storage.values()];
    backlog.reverse();
    backlog.forEach(value => {
      session.blockedMessages.push(value);
    });
  }

  // Called by the runtime (hibernation API) whenever a message arrives on an accepted WebSocket.
  async webSocketMessage(webSocket, msg) {
    try {
      let session = this.sessions.get(webSocket);
      if (session.quit) {
        // Whoops, when trying to send to this WebSocket in the past, it threw an exception and
        // we marked it broken. But somehow we got another message? I guess try sending a
        // close(), which might throw, in which case we'll try to send an error, which will also
        // throw, and whatever, at least we won't accept the message. (This probably can't
        // actually happen. This is defensive coding.)
        webSocket.close(1011, "WebSocket broken.");
        return;
      }

      // Check if the user is over their rate limit and reject the message if so.
      if (!session.limiter.checkLimit()) {
        webSocket.send(JSON.stringify({
          error: "Your IP is being rate-limited, please try again later."
        }));
        return;
      }

      // I guess we'll use JSON.
      let data = JSON.parse(msg);

      if (!session.name) {
        // The first message the client sends is the user info message with their name. Save it
        // into their session object.
        session.name = "" + (data.name || "anonymous");
        // Attach name to the webSocket so it survives hibernation.
        webSocket.serializeAttachment({ ...webSocket.deserializeAttachment(), name: session.name });

        // Don't let people use ridiculously long names. (This is also enforced on the client,
        // so if they get here they are not using the intended client.)
        if (session.name.length > 32) {
          webSocket.send(JSON.stringify({error: "Name too long."}));
          webSocket.close(1009, "Name too long.");
          return;
        }

        // Deliver all the messages we queued up since the user connected.
        session.blockedMessages.forEach(queued => {
          webSocket.send(queued);
        });
        delete session.blockedMessages;

        // Broadcast to all other connections that this user has joined.
        this.broadcast({joined: session.name});

        webSocket.send(JSON.stringify({ready: true}));
        return;
      }

      // Construct sanitized message for storage and broadcast: only the fields we expect,
      // with the message coerced to a string.
      data = { name: session.name, message: "" + data.message };

      // Block people from sending overly long messages. This is also enforced on the client,
      // so to trigger this the user must be bypassing the client code.
      if (data.message.length > 256) {
        webSocket.send(JSON.stringify({error: "Message too long."}));
        return;
      }

      // Add timestamp. Here's where this.lastTimestamp comes in -- if we receive a bunch of
      // messages at the same time (or if the clock somehow goes backwards????), we'll assign
      // them sequential timestamps, so at least the ordering is maintained.
      data.timestamp = Math.max(Date.now(), this.lastTimestamp + 1);
      this.lastTimestamp = data.timestamp;

      // Broadcast the message to all other WebSockets.
      let dataStr = JSON.stringify(data);
      this.broadcast(dataStr);

      // Save message, keyed by its ISO-8601 timestamp so storage.list() returns messages
      // in chronological order.
      let key = new Date(data.timestamp).toISOString();
      await this.storage.put(key, dataStr);
    } catch (err) {
      // Report any exceptions directly back to the client. As with our handleErrors() this
      // probably isn't what you'd want to do in production, but it's convenient when testing.
      webSocket.send(JSON.stringify({error: err.stack}));
    }
  }

  // On "close" and "error" events, remove the WebSocket from the sessions list and broadcast
  // a quit message.
  async closeOrErrorHandler(webSocket) {
    let session = this.sessions.get(webSocket) || {};
    session.quit = true;
    this.sessions.delete(webSocket);
    if (session.name) {
      this.broadcast({quit: session.name});
    }
  }

  // Hibernation API callback: the client closed the connection.
  async webSocketClose(webSocket, code, reason, wasClean) {
    this.closeOrErrorHandler(webSocket)
  }

  // Hibernation API callback: the connection errored out.
  async webSocketError(webSocket, error) {
    this.closeOrErrorHandler(webSocket)
  }

  // broadcast() broadcasts a message to all clients.
  broadcast(message) {
    // Apply JSON if we weren't given a string to start with.
    if (typeof message !== "string") {
      message = JSON.stringify(message);
    }

    // Iterate over all the sessions sending them messages.
    let quitters = [];
    this.sessions.forEach((session, webSocket) => {
      if (session.name) {
        try {
          webSocket.send(message);
        } catch (err) {
          // Whoops, this connection is dead. Remove it from the map and arrange to notify
          // everyone below.
          session.quit = true;
          quitters.push(session);
          this.sessions.delete(webSocket);
        }
      } else {
        // This session hasn't sent the initial user info message yet, so we're not sending them
        // messages yet (no secret lurking!). Queue the message to be sent later.
        session.blockedMessages.push(message);
      }
    });

    // Notify everyone else about any connections that died while we were broadcasting.
    quitters.forEach(quitter => {
      if (quitter.name) {
        this.broadcast({quit: quitter.name});
      }
    });
  }
}
// =======================================================================================
// The RateLimiter Durable Object class.
//
// RateLimiter implements a Durable Object that tracks the frequency of messages from a particular
// source and decides when messages should be dropped because the source is sending too many
// messages.
//
// We utilize this in ChatRoom, above, to apply a per-IP-address rate limit. These limits are
// global, i.e. they apply across all chat rooms, so if a user spams one chat room, they will find
// themselves rate limited in all other chat rooms simultaneously.
export class RateLimiter {
  constructor(state, env) {
    // Timestamp (in seconds) at which this IP will next be allowed to send a message. Start
    // in the distant past, i.e. the IP can send a message now. This is kept only in memory;
    // if the object is evicted, the limit simply resets.
    this.nextAllowedTime = 0;
  }

  // Our protocol is: POST when the IP performs an action, or GET to simply read the current limit.
  // Either way, the result is the number of seconds to wait before allowing the IP to perform its
  // next action.
  async fetch(request) {
    return await handleErrors(request, async () => {
      let now = Date.now() / 1000;

      this.nextAllowedTime = Math.max(now, this.nextAllowedTime);

      if (request.method == "POST") {
        // POST request means the user performed an action.
        // We allow one action per 5 seconds.
        this.nextAllowedTime += 5;
      }

      // Return the number of seconds that the client needs to wait.
      //
      // We provide a "grace" period of 20 seconds, meaning that the client can make 4-5 requests
      // in a quick burst before they start being limited.
      let cooldown = Math.max(0, this.nextAllowedTime - now - 20);
      // The Response body must be a BodyInit (string/buffer/stream) per the Fetch spec -- a raw
      // number is not valid -- so stringify it explicitly.
      return new Response(cooldown.toString());
    })
  }
}
// RateLimiterClient implements rate limiting logic on the caller's side.
class RateLimiterClient {
  // The constructor takes two callbacks:
  // * getLimiterStub() returns a fresh Durable Object stub for the RateLimiter object that
  //   manages the limit. It may be invoked again whenever the connection to the limiter is
  //   lost and needs to be re-established.
  // * reportError(err) is called when something goes wrong and the rate limiter is broken. It
  //   should probably disconnect the client, so that they can reconnect and start over.
  constructor(getLimiterStub, reportError) {
    this.getLimiterStub = getLimiterStub;
    this.reportError = reportError;

    // Grab an initial stub right away.
    this.limiter = getLimiterStub();

    // While `inCooldown` is true, the rate limit is currently in force and checkLimit()
    // will return false.
    this.inCooldown = false;
  }

  // checkLimit() decides whether an incoming message should be accepted (`true`) or dropped
  // due to the rate limit (`false`). Call it once per received message.
  checkLimit() {
    if (this.inCooldown) {
      return false;
    }
    this.inCooldown = true;
    // Deliberately not awaited: the cooldown is tracked in the background while the caller
    // proceeds. Failures are routed to reportError() inside callLimiter().
    this.callLimiter();
    return true;
  }

  // Internal: notify the rate limiter of the action and wait out the cooldown it returns.
  async callLimiter() {
    try {
      // Currently, fetch() needs a valid URL even though it's not actually going to the
      // internet. We may loosen this in the future to accept an arbitrary string. But for now,
      // we have to provide a dummy URL that will be ignored at the other end anyway.
      const post = () => this.limiter.fetch("https://dummy-url", {method: "POST"});

      let response;
      try {
        response = await post();
      } catch (err) {
        // `fetch()` threw an exception. This is probably because the limiter has been
        // disconnected. Stubs implement E-order semantics, meaning that calls to the same stub
        // are delivered to the remote object in order, until the stub becomes disconnected, after
        // which point all further calls fail. This guarantee makes a lot of complex interaction
        // patterns easier, but it means we must be prepared for the occasional disconnect, as
        // networks are inherently unreliable.
        //
        // Anyway, get a new limiter and try again. If it fails again, something else is probably
        // wrong.
        this.limiter = this.getLimiterStub();
        response = await post();
      }

      // The response body indicates how long to pause before accepting more requests.
      const cooldown = +(await response.text());
      await new Promise(resolve => setTimeout(resolve, cooldown * 1000));

      // Done waiting.
      this.inCooldown = false;
    } catch (err) {
      this.reportError(err);
    }
  }
}