Skip to content

Commit 418379e

Browse files
committed
Add reconnecting websocket class and use it in CoderApi
1 parent 118d50a commit 418379e

File tree

8 files changed

+811
-52
lines changed

8 files changed

+811
-52
lines changed

src/api/agentMetadataHelper.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,11 @@ export async function createAgentMetadataWatcher(
5353
event.parsedMessage.data,
5454
);
5555

56-
// Overwrite metadata if it changed.
56+
if (watcher.error !== undefined) {
57+
watcher.error = undefined;
58+
onChange.fire(null);
59+
}
60+
5761
if (JSON.stringify(watcher.metadata) !== JSON.stringify(metadata)) {
5862
watcher.metadata = metadata;
5963
onChange.fire(null);

src/api/coderApi.ts

Lines changed: 98 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ import {
3636
OneWayWebSocket,
3737
type OneWayWebSocketInit,
3838
} from "../websocket/oneWayWebSocket";
39+
import {
40+
ReconnectingWebSocket,
41+
type SocketFactory,
42+
} from "../websocket/reconnectingWebSocket";
3943
import { SseConnection } from "../websocket/sseConnection";
4044

4145
import { createHttpAgent } from "./utils";
@@ -47,6 +51,10 @@ const coderSessionTokenHeader = "Coder-Session-Token";
4751
* and WebSocket methods for real-time functionality.
4852
*/
4953
export class CoderApi extends Api {
54+
private readonly reconnectingSockets = new Set<
55+
ReconnectingWebSocket<unknown>
56+
>();
57+
5058
private constructor(private readonly output: Logger) {
5159
super();
5260
}
@@ -70,6 +78,30 @@ export class CoderApi extends Api {
7078
return client;
7179
}
7280

81+
setSessionToken = (token: string): void => {
82+
const currentToken =
83+
this.getAxiosInstance().defaults.headers.common[coderSessionTokenHeader];
84+
this.getAxiosInstance().defaults.headers.common[coderSessionTokenHeader] =
85+
token;
86+
87+
if (currentToken !== token) {
88+
for (const socket of this.reconnectingSockets) {
89+
socket.reconnect();
90+
}
91+
}
92+
};
93+
94+
setHost = (host: string | undefined): void => {
95+
const currentHost = this.getAxiosInstance().defaults.baseURL;
96+
this.getAxiosInstance().defaults.baseURL = host;
97+
98+
if (currentHost !== host) {
99+
for (const socket of this.reconnectingSockets) {
100+
socket.reconnect();
101+
}
102+
}
103+
};
104+
73105
watchInboxNotifications = async (
74106
watchTemplates: string[],
75107
watchTargets: string[],
@@ -83,6 +115,7 @@ export class CoderApi extends Api {
83115
targets: watchTargets.join(","),
84116
},
85117
options,
118+
enableRetry: true,
86119
});
87120
};
88121

@@ -91,6 +124,7 @@ export class CoderApi extends Api {
91124
apiRoute: `/api/v2/workspaces/${workspace.id}/watch-ws`,
92125
fallbackApiRoute: `/api/v2/workspaces/${workspace.id}/watch`,
93126
options,
127+
enableRetry: true,
94128
});
95129
};
96130

@@ -102,6 +136,7 @@ export class CoderApi extends Api {
102136
apiRoute: `/api/v2/workspaceagents/${agentId}/watch-metadata-ws`,
103137
fallbackApiRoute: `/api/v2/workspaceagents/${agentId}/watch-metadata`,
104138
options,
139+
enableRetry: true,
105140
});
106141
};
107142

@@ -148,53 +183,73 @@ export class CoderApi extends Api {
148183
}
149184

150185
private async createWebSocket<TData = unknown>(
151-
configs: Omit<OneWayWebSocketInit, "location">,
152-
) {
153-
const baseUrlRaw = this.getAxiosInstance().defaults.baseURL;
154-
if (!baseUrlRaw) {
155-
throw new Error("No base URL set on REST client");
156-
}
186+
configs: Omit<OneWayWebSocketInit, "location"> & { enableRetry?: boolean },
187+
): Promise<UnidirectionalStream<TData>> {
188+
const { enableRetry, ...socketConfigs } = configs;
189+
190+
const socketFactory: SocketFactory<TData> = async () => {
191+
const baseUrlRaw = this.getAxiosInstance().defaults.baseURL;
192+
if (!baseUrlRaw) {
193+
throw new Error("No base URL set on REST client");
194+
}
195+
196+
const baseUrl = new URL(baseUrlRaw);
197+
const token = this.getAxiosInstance().defaults.headers.common[
198+
coderSessionTokenHeader
199+
] as string | undefined;
200+
201+
const headersFromCommand = await getHeaders(
202+
baseUrlRaw,
203+
getHeaderCommand(vscode.workspace.getConfiguration()),
204+
this.output,
205+
);
157206

158-
const baseUrl = new URL(baseUrlRaw);
159-
const token = this.getAxiosInstance().defaults.headers.common[
160-
coderSessionTokenHeader
161-
] as string | undefined;
207+
const httpAgent = await createHttpAgent(
208+
vscode.workspace.getConfiguration(),
209+
);
162210

163-
const headersFromCommand = await getHeaders(
164-
baseUrlRaw,
165-
getHeaderCommand(vscode.workspace.getConfiguration()),
166-
this.output,
167-
);
211+
/**
212+
* Similar to the REST client, we want to prioritize headers in this order (highest to lowest):
213+
* 1. Headers from the header command
214+
* 2. Any headers passed directly to this function
215+
* 3. Coder session token from the Api client (if set)
216+
*/
217+
const headers = {
218+
...(token ? { [coderSessionTokenHeader]: token } : {}),
219+
...configs.options?.headers,
220+
...headersFromCommand,
221+
};
168222

169-
const httpAgent = await createHttpAgent(
170-
vscode.workspace.getConfiguration(),
171-
);
223+
const webSocket = new OneWayWebSocket<TData>({
224+
location: baseUrl,
225+
...socketConfigs,
226+
options: {
227+
...configs.options,
228+
agent: httpAgent,
229+
followRedirects: true,
230+
headers,
231+
},
232+
});
172233

173-
/**
174-
* Similar to the REST client, we want to prioritize headers in this order (highest to lowest):
175-
* 1. Headers from the header command
176-
* 2. Any headers passed directly to this function
177-
* 3. Coder session token from the Api client (if set)
178-
*/
179-
const headers = {
180-
...(token ? { [coderSessionTokenHeader]: token } : {}),
181-
...configs.options?.headers,
182-
...headersFromCommand,
234+
this.attachStreamLogger(webSocket);
235+
return webSocket;
183236
};
184237

185-
const webSocket = new OneWayWebSocket<TData>({
186-
location: baseUrl,
187-
...configs,
188-
options: {
189-
...configs.options,
190-
agent: httpAgent,
191-
followRedirects: true,
192-
headers,
193-
},
194-
});
238+
if (enableRetry) {
239+
const reconnectingSocket = await ReconnectingWebSocket.create<TData>(
240+
socketFactory,
241+
this.output,
242+
configs.apiRoute,
243+
);
244+
245+
this.reconnectingSockets.add(
246+
reconnectingSocket as ReconnectingWebSocket<unknown>,
247+
);
195248

196-
this.attachStreamLogger(webSocket);
197-
return webSocket;
249+
return reconnectingSocket;
250+
} else {
251+
return socketFactory();
252+
}
198253
}
199254

200255
private attachStreamLogger<TData>(
@@ -230,13 +285,15 @@ export class CoderApi extends Api {
230285
fallbackApiRoute: string;
231286
searchParams?: Record<string, string> | URLSearchParams;
232287
options?: ClientOptions;
288+
enableRetry?: boolean;
233289
}): Promise<UnidirectionalStream<TData>> {
234-
let webSocket: OneWayWebSocket<TData>;
290+
let webSocket: UnidirectionalStream<TData>;
235291
try {
236292
webSocket = await this.createWebSocket<TData>({
237293
apiRoute: configs.apiRoute,
238294
searchParams: configs.searchParams,
239295
options: configs.options,
296+
enableRetry: configs.enableRetry,
240297
});
241298
} catch {
242299
// Failed to create WebSocket, use SSE fallback

src/api/workspace.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import * as vscode from "vscode";
1111
import { type FeatureSet } from "../featureSet";
1212
import { getGlobalFlags } from "../globalFlags";
1313
import { escapeCommandArg } from "../util";
14-
import { type OneWayWebSocket } from "../websocket/oneWayWebSocket";
14+
import { type UnidirectionalStream } from "../websocket/eventStreamConnection";
1515

1616
import { errToStr, createWorkspaceIdentifier } from "./api-helper";
1717
import { type CoderApi } from "./coderApi";
@@ -93,7 +93,7 @@ export async function streamBuildLogs(
9393
client: CoderApi,
9494
writeEmitter: vscode.EventEmitter<string>,
9595
workspace: Workspace,
96-
): Promise<OneWayWebSocket<ProvisionerJobLog>> {
96+
): Promise<UnidirectionalStream<ProvisionerJobLog>> {
9797
const socket = await client.watchBuildLogsByBuildId(
9898
workspace.latest_build.id,
9999
[],
@@ -131,7 +131,7 @@ export async function streamAgentLogs(
131131
client: CoderApi,
132132
writeEmitter: vscode.EventEmitter<string>,
133133
agent: WorkspaceAgent,
134-
): Promise<OneWayWebSocket<WorkspaceAgentLog[]>> {
134+
): Promise<UnidirectionalStream<WorkspaceAgentLog[]>> {
135135
const socket = await client.watchWorkspaceAgentLogs(agent.id, []);
136136

137137
socket.addEventListener("message", (data) => {

src/inbox.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import type {
77

88
import type { CoderApi } from "./api/coderApi";
99
import type { Logger } from "./logging/logger";
10-
import type { OneWayWebSocket } from "./websocket/oneWayWebSocket";
10+
import type { UnidirectionalStream } from "./websocket/eventStreamConnection";
1111

1212
// These are the template IDs of our notifications.
1313
// Maybe in the future we should avoid hardcoding
@@ -16,7 +16,9 @@ const TEMPLATE_WORKSPACE_OUT_OF_MEMORY = "a9d027b4-ac49-4fb1-9f6d-45af15f64e7a";
1616
const TEMPLATE_WORKSPACE_OUT_OF_DISK = "f047f6a3-5713-40f7-85aa-0394cce9fa3a";
1717

1818
export class Inbox implements vscode.Disposable {
19-
private socket: OneWayWebSocket<GetInboxNotificationResponse> | undefined;
19+
private socket:
20+
| UnidirectionalStream<GetInboxNotificationResponse>
21+
| undefined;
2022
private disposed = false;
2123

2224
private constructor(private readonly logger: Logger) {}

src/remote/workspaceStateMachine.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import type { CoderApi } from "../api/coderApi";
2121
import type { PathResolver } from "../core/pathResolver";
2222
import type { FeatureSet } from "../featureSet";
2323
import type { Logger } from "../logging/logger";
24-
import type { OneWayWebSocket } from "../websocket/oneWayWebSocket";
24+
import type { UnidirectionalStream } from "../websocket/eventStreamConnection";
2525

2626
/**
2727
* Manages workspace and agent state transitions until ready for SSH connection.
@@ -32,9 +32,10 @@ export class WorkspaceStateMachine implements vscode.Disposable {
3232

3333
private agent: { id: string; name: string } | undefined;
3434

35-
private buildLogSocket: OneWayWebSocket<ProvisionerJobLog> | null = null;
35+
private buildLogSocket: UnidirectionalStream<ProvisionerJobLog> | null = null;
3636

37-
private agentLogSocket: OneWayWebSocket<WorkspaceAgentLog[]> | null = null;
37+
private agentLogSocket: UnidirectionalStream<WorkspaceAgentLog[]> | null =
38+
null;
3839

3940
constructor(
4041
private readonly parts: AuthorityParts,

0 commit comments

Comments
 (0)