-
-
Notifications
You must be signed in to change notification settings - Fork 18
/
partner_web_client.ts
148 lines (123 loc) · 3.62 KB
/
partner_web_client.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import { AsyncQueue, deferred } from "../../deps.ts";
import {
websocketReadable,
websocketWritable,
} from "../streams/stream_utils.ts";
import { EarthstarError, NotSupportedError } from "../util/errors.ts";
import {
GetTransferOpts,
ISyncPartner,
SyncAppetite,
SyncerEvent,
} from "./syncer_types.ts";
/** Options accepted by the `PartnerWebClient` constructor. */
type SyncerDriverWebServerOpts = {
/** A websocket created from the initial sync request. */
socket: WebSocket;
/** The sync appetite; the partner stores it as its `syncAppetite` property. */
appetite: SyncAppetite;
};
/** A syncing partner created from an inbound HTTP connection (i.e. a web client).
 *
 * Works everywhere, but is really meant for servers running on Deno and Node.
 *
 * Sync events travel over the websocket passed to the constructor as JSON
 * text messages: outgoing events are serialised with `JSON.stringify`,
 * incoming messages are parsed with `JSON.parse` and queued for `getEvents`.
 */
export class PartnerWebClient<
  IncomingTransferSourceType extends WebSocket,
> implements ISyncPartner<IncomingTransferSourceType> {
  // Tuning values exposed on the partner. Presumably read by the syncer via
  // ISyncPartner — confirm against syncer_types.ts.
  concurrentTransfers = 16;
  payloadThreshold = 8;
  rangeDivision = 8;
  syncAppetite: SyncAppetite;

  private socket: WebSocket;
  /** Events parsed from incoming websocket messages, consumed via `getEvents`. */
  private incomingQueue = new AsyncQueue<SyncerEvent>();
  /**
   * Settles once the socket's fate is known: it opened, closed, or errored.
   * `sendEvent` waits on this and then checks `readyState` itself.
   */
  private socketIsReady = deferred();

  /**
   * Wires the given websocket up to this partner.
   *
   * @param opts.socket - The websocket to exchange sync events over.
   * @param opts.appetite - Stored as `syncAppetite`.
   */
  constructor({ socket, appetite }: SyncerDriverWebServerOpts) {
    this.syncAppetite = appetite;

    // The socket may already be open by the time we get it, in which case
    // `onopen` will never fire.
    if (socket.readyState === socket.OPEN) {
      this.socketIsReady.resolve();
    }

    socket.onopen = () => {
      this.socketIsReady.resolve();
    };

    this.socket = socket;
    this.socket.binaryType = "arraybuffer";

    this.socket.onmessage = (event) => {
      let parsed: SyncerEvent;

      try {
        // Casting as string for Node's incorrect WebSocket types.
        parsed = JSON.parse(event.data as string);
      } catch {
        // The message came from the remote peer and is not valid JSON (or is
        // a binary frame). Surface the failure to whoever consumes
        // `getEvents` instead of throwing from inside the event handler, and
        // close the socket so that nothing more is pushed to the closed queue.
        this.incomingQueue.close({
          withError: new EarthstarError(
            "Websocket received a message which could not be parsed as JSON.",
          ),
        });
        this.socket.close();
        return;
      }

      this.incomingQueue.push(parsed);
    };

    this.socket.onclose = () => {
      // Unblock any `sendEvent` still waiting for a socket which closed
      // before it ever opened; its readyState check will drop the event.
      this.socketIsReady.resolve();
      this.incomingQueue.close();
    };

    this.socket.onerror = (event) => {
      // As with `onclose`: never leave `sendEvent` callers waiting forever.
      this.socketIsReady.resolve();

      if ("error" in event) {
        this.incomingQueue.close({
          withError: event.error,
        });

        return;
      }

      this.incomingQueue.close({
        withError: new EarthstarError("Websocket error."),
      });
    };
  }

  /**
   * Sends a sync event to the client as a JSON text message.
   *
   * Waits until the socket has opened (or has closed / errored). If the
   * socket is not open at that point the event is silently dropped.
   */
  async sendEvent(event: SyncerEvent): Promise<void> {
    await this.socketIsReady;

    if (this.socket.readyState !== this.socket.OPEN) {
      return;
    }

    return this.socket.send(JSON.stringify(event));
  }

  /** Returns the queue of events parsed from incoming websocket messages. */
  getEvents(): AsyncIterable<SyncerEvent> {
    return this.incomingQueue;
  }

  /** Closes the underlying websocket. The returned promise resolves immediately. */
  closeConnection(): Promise<void> {
    this.socket.close();

    return Promise.resolve();
  }

  /** Always resolves to a `NotSupportedError`. */
  getDownload(
    _opts: GetTransferOpts,
  ): Promise<ReadableStream<Uint8Array> | NotSupportedError> {
    // Server can't initiate a request with a client.
    return Promise.resolve(
      new NotSupportedError(
        "SyncDriverWebServer does not support download requests.",
      ),
    );
  }

  /** Always resolves to a `NotSupportedError`. */
  handleUploadRequest(
    _opts: GetTransferOpts,
  ): Promise<WritableStream<Uint8Array> | NotSupportedError> {
    // Server won't get in-band BLOB_REQ messages
    return Promise.resolve(
      new NotSupportedError(
        "SyncDriverWebServer does not support upload requests.",
      ),
    );
  }

  /**
   * Wraps a dedicated transfer websocket in a stream.
   *
   * @param socket - The websocket the transfer will run over.
   * @param kind - `"download"` when the client wants data from us,
   * otherwise the client is uploading data to us.
   * @returns For a download, a writable stream built with `websocketWritable`
   * which passes each chunk through unchanged; otherwise a readable stream
   * built with `websocketReadable` which maps `ArrayBuffer` messages to
   * `Uint8Array`.
   */
  handleTransferRequest(
    socket: IncomingTransferSourceType,
    kind: "upload" | "download",
  ): Promise<
    | ReadableStream<Uint8Array>
    | WritableStream<Uint8Array>
    | undefined
  > {
    // Return a stream which writes to the socket. nice.

    // They want to download data from us
    if (kind === "download") {
      const writable = websocketWritable(
        socket,
        (outgoing: Uint8Array) => outgoing,
      );

      return Promise.resolve(writable);
    } else {
      // they want to upload data to us.
      const readable = websocketReadable(socket, (event) => {
        if (event.data instanceof ArrayBuffer) {
          const bytes = new Uint8Array(event.data);
          return bytes;
        }

        // Anything other than an ArrayBuffer yields null, as before.
        // NOTE(review): how websocketReadable treats a null result is not
        // visible here — confirm.
        return null as never;
      });

      return Promise.resolve(readable);
    }
  }
}