-
Notifications
You must be signed in to change notification settings - Fork 4
/
via-websocket.ts
202 lines (184 loc) · 6.99 KB
/
via-websocket.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import { map } from "https://deno.land/x/stream_observables@v1.3/transforms/map.ts";
import { EOF, external } from "https://deno.land/x/stream_observables@v1.3/sources/external.ts";
import { KubernetesTunnel, RequestOptions } from "../lib/contract.ts";
import { KubeConfigRestClient } from "../transports/via-kubeconfig.ts";
/**
* Extension of KubeConfigRestClient, adding tunnel support via WebSocketStream.
* WebSockets have various limits within the Kubernetes and Deno ecosystem,
* but they work quite well in several situations and have good backpressure support.
*
* * For most clusters, you'll need to have Deno trust the cluster CA.
* Otherwise you'll get an `UnknownIssuer` error.
* In-cluster, you just need to pass `--cert /var/run/secrets/kubernetes.io/serviceaccount/ca.crt`
*
* * Restricted for out-of-cluster use due to lack of Client Certificates.
* https://github.com/denoland/deno/issues/11846
* Workaround of using `kubectl proxy --reject-paths=/^-$/`
*
* * Restricted for port-forwarding due to lack of dynamic multiplexing.
* Every new port connection requires a new WebSocket.
* (TODO: find or create Kubernetes ticket to track this)
*
* * stdin restricted for exec/attach due to lack of EOF signal.
* Upstream work: https://github.com/kubernetes/kubernetes/pull/119157
*/
export class WebsocketRestClient extends KubeConfigRestClient {
  /**
   * Performs a request, opening a WebSocket tunnel when one is asked for.
   *
   * Requests without `expectTunnel` are delegated unchanged to the parent class.
   * Otherwise a WebSocketStream is opened against the cluster server, offering
   * the requested subprotocols, and wrapped in a {@link WebsocketTunnel}.
   *
   * @param opts Request options; `expectTunnel` lists the acceptable subprotocols.
   * @returns The parent's response, or a tunnel once the WebSocket has connected.
   * @throws Error when the KubeConfig context has no server URL.
   */
  async performRequest<Tproto extends string>(opts: RequestOptions & {expectTunnel?: Tproto[]}) {
    const tunnelProtocols = opts.expectTunnel;
    if (!tunnelProtocols) return super.performRequest(opts);

    const serverUrl = this.ctx.cluster.server;
    if (!serverUrl) throw new Error(`No server URL found in KubeConfig`);

    // Only send an Authorization header when the context actually provides one.
    const authorization = await this.ctx.getAuthHeader();
    const headers: Record<string, string> = authorization
      ? { Authorization: authorization }
      : {};

    const basePath = opts.path || '/';
    const target = opts.querystring
      ? `${basePath}?${opts.querystring}`
      : basePath;

    // Same host and path as the REST API, but http(s) becomes ws(s).
    const url = new URL(target, serverUrl);
    url.protocol = url.protocol.replace('http', 'ws');

    const socket = new WebSocketStream(url.toString(), {
      headers,
      protocols: tunnelProtocols,
      signal: opts.abortSignal,
    });
    const connection = await socket.connection;
    return new WebsocketTunnel(socket, connection);
  }
}
/**
* Handles an individual WebSocket connection to the Kubernetes API.
* Kubernetes WebSockets support up to 255 indexed bytestreams.
* To send or receive data, you must first provide the channel's index,
* and then streams will be returned for that particular index.
*
* WebSocket channels do not support closing individual streams.
* You must disconnect the overall tunnel if you wish to end things.
*/
export class WebsocketTunnel implements KubernetesTunnel {
  readonly transportProtocol = "WebSocket";
  /** The subprotocol reported by the WebSocket connection. */
  readonly subProtocol: string;
  /** Settles when the tunnel's stream pipeline has finished or failed. */
  readonly done: Promise<void>;

  /** Client-to-server streams; each one registered here is merged into the socket. */
  private upstreamChannels = external<ReadableStream<Uint8Array>>();
  /** Server-to-client writers, keyed by the channel index that prefixes each frame. */
  private downstreamChannels = new Map<number, WritableStreamDefaultWriter<Uint8Array>>();

  /**
   * Wires the channel streams to an already-connected WebSocket.
   *
   * @param wss The WebSocketStream, retained so `stop()` can close it.
   * @param wssConn The established connection whose streams carry all channel traffic.
   */
  constructor(
    private readonly wss: WebSocketStream,
    wssConn: WebSocketConnection,
  ) {
    this.subProtocol = wssConn.protocol;

    this.done = this.upstreamChannels.observable
      // Combine the chunks of every registered upstream channel into one stream...
      .pipeThrough(mergeAll())
      // ...send them over the socket, and continue with whatever the server sends back.
      .pipeThrough(wssConn)
      .pipeThrough(map(async chunk => {
        if (typeof chunk === 'string') throw new Error(`Unexpected`);
        // The first byte of every frame is the channel index; the rest is payload.
        const channelNum = chunk.at(0);
        if (typeof channelNum !== 'number') throw new Error(`Unexpected`);
        const downstream = this.downstreamChannels.get(channelNum);
        if (!downstream) throw new Error(`Channel ${channelNum} not reading`);
        await downstream.write(chunk.slice(1));
      }))
      // Nothing is produced downstream of the demultiplexer; just drain it.
      .pipeTo(new WritableStream())
      .finally(() => {
        this.upstreamChannels.next(EOF);
        for (const downstream of this.downstreamChannels.values()) {
          // close() rejects when the channel is already errored (for example the
          // reader cancelled it, or the write that ended the tunnel failed).
          // The tunnel is going away regardless, so don't let that surface as
          // an unhandled rejection.
          downstream.close().catch(() => {});
        }
      });
  }

  /**
   * Provides the streams for one indexed channel of the tunnel.
   *
   * @param opts `streamIndex` selects the channel; `readable` / `writable`
   *   select which directions to set up.
   * @returns A promise of `{ writable, readable }`, where each side is `null`
   *   unless it was requested.
   * @throws Error when `streamIndex` is not a number.
   */
  getChannel<Treadable extends boolean, Twritable extends boolean>(opts: {
    spdyHeaders?: Record<string, string | number> | undefined;
    streamIndex?: number | undefined;
    readable: Treadable;
    writable: Twritable;
  }) {
    const { streamIndex } = opts;
    if (typeof streamIndex !== 'number') {
      throw new Error("Cannot get a WebSocket channel without a streamIndex.");
    }
    return Promise.resolve({
      writable: maybe(opts.writable, () => {
        // Every chunk the caller writes gets the channel index prepended.
        const clientToServer = new ChannelPrependTransform(streamIndex);
        this.upstreamChannels.next(clientToServer.readable);
        return clientToServer.writable;
      }),
      readable: maybe(opts.readable, () => {
        // Incoming frames for this index are written into this pass-through stream.
        const serverToClient = new TransformStream<Uint8Array, Uint8Array>();
        this.downstreamChannels.set(streamIndex, serverToClient.writable.getWriter());
        return serverToClient.readable;
      }),
    });
  }

  /** Currently a no-op that resolves immediately. */
  ready(): Promise<void> {
    // We don't let the user create more channels after they call Ready.
    // (So why did we overengineer the stream merger?)
    // this.upstreamChannels.next(EOF);
    return Promise.resolve();
  }

  /** Closes the underlying WebSocketStream. */
  stop() {
    this.wss.close();
    return Promise.resolve();
  }
}
/**
 * Runs `factory` only when `cond` is true.
 *
 * @param cond Whether the value is wanted.
 * @param factory Produces the value; not invoked when `cond` is false.
 * @returns The factory's result, or `null` when `cond` is false. The return
 *   type follows the literal type of `cond`.
 */
function maybe<Tcond extends boolean, Tres>(cond: Tcond, factory: () => Tres) {
  let outcome: Tres | null = null;
  if (cond) {
    outcome = factory();
  }
  return outcome as (Tcond extends true ? Tres : null);
}
/**
 * Transform stream that prefixes every chunk with a single channel-index byte.
 */
class ChannelPrependTransform extends TransformStream<Uint8Array, Uint8Array> {
  /**
   * @param channelOctet Channel index to prepend; must be an integer from 0 to 255.
   * @throws RangeError when `channelOctet` does not fit in one unsigned byte.
   */
  constructor(channelOctet: number) {
    // Storing an out-of-range value in a byte silently wraps it modulo 256,
    // which would address a different channel. Refuse it up front instead.
    if (!Number.isInteger(channelOctet) || channelOctet < 0 || channelOctet > 0xff) {
      throw new RangeError(`Channel index must be an integer between 0 and 255, got ${channelOctet}`);
    }
    super({
      transform(chunk, ctlr) {
        // One extra leading byte for the channel index, then the payload as-is.
        const framed = new Uint8Array(chunk.byteLength + 1);
        framed[0] = channelOctet;
        framed.set(chunk, 1);
        ctlr.enqueue(framed);
      },
      // to implement proposed v5 of the tunnel protocol:
      // flush(ctlr) {
      //   ctlr.enqueue(new Uint8Array([255, channelOctet]));
      // },
    });
  }
}
/**
 * Merges all observables received from a Higher Order Observable
 * by emitting all items from all the observables.
 * Items are emitted in the order they appear.
 *
 * Closing the writable side waits for every inner observable to finish
 * before the output is closed. If an inner observable fails, the output is
 * errored with that failure and closing the writable side rejects with it.
 *
 * @typeparam T Type of items emitted by the observables.
 * @returns Transformer to consume observables and emit their collective items.
 */
function mergeAll<T>(): TransformStream<ReadableStream<T>> {
  // Identity stream: every forwarder writes into it, the caller reads from it.
  const outChannel = new TransformStream<T, T>();
  const outWriter = outChannel.writable.getWriter();
  const forwarders: Promise<void>[] = [];

  return {
    readable: outChannel.readable,
    writable: new WritableStream<ReadableStream<T>>({
      write(observable) {
        // Start forwarding right away without awaiting, so that several
        // observables can be drained concurrently.
        const forwarder = observable
          .pipeTo(new WritableStream<T>({
            async write(chunk) {
              await outWriter.write(chunk);
            },
            async abort(reason) {
              await outWriter.abort(reason);
            },
          }));
        // A failure is reported through the output stream and again by
        // close(); mark the promise as handled so that it is not also raised
        // as an unhandled rejection in the meantime.
        forwarder.catch(() => {});
        forwarders.push(forwarder);
      },
      async abort(reason) {
        await outWriter.abort(reason);
      },
      async close() {
        try {
          await Promise.all(forwarders);
        } catch (err) {
          // The output should already be errored at this point, so closing it
          // would only produce a second rejection. Abort defensively (a no-op
          // on an errored stream) and report the original failure.
          await outWriter.abort(err).catch(() => {});
          throw err;
        }
        await outWriter.close();
      },
    }, { highWaterMark: 1 }),
  };
}