From de406c69fa3eba5345e874a146876f7f73204c46 Mon Sep 17 00:00:00 2001 From: Adam Obuchowicz Date: Fri, 19 Apr 2024 15:39:45 +0200 Subject: [PATCH] Automatic reconnect with Language Server. (#9691) Fixes #8520 If the websocket is closed not by us, we automatically try to reconnect with it, and initialize the protocol again. **Restoring state (execution contexts, attached visualizations) is not part of this PR**. It's a part of making IDE work after hibernation (or LS crash). # Important Notes It required somewhat heavy refactoring: 1. I decided to use an existing implementation of reconnecting websocket. Replaced (later discovered by me) our implementation. 2. The LanguageServer class now handles both reconnecting and re-initializing - that make usage of it simpler (no more `Promise` - each method will just wait for (re)connection and initialization. 3. The stuff in `net` src's module was partially moved to shared's counterpart (with tests). Merged `exponentialBackoff` implementations, which also brought me to 4. Rewriting LS client, so it returns Result instead of throwing, what is closer our desired state, and allows us using exponentialBackoff method without any wrappers. --- app/gui2/e2e/componentBrowser.spec.ts | 1 + app/gui2/e2e/graphNodeVisualization.spec.ts | 2 + app/gui2/e2e/locate.ts | 1 + app/gui2/mock/MockFSWrapper.vue | 2 +- .../dataServer/mock.ts => mock/dataServer.ts} | 6 +- app/gui2/mock/engine.ts | 92 ++-- app/gui2/package.json | 1 + app/gui2/shared/languageServer.ts | 232 ++++++--- app/gui2/shared/languageServer/files.ts | 14 +- .../shared/languageServerTypes/suggestions.ts | 2 +- app/gui2/shared/retry.ts | 118 ----- app/gui2/shared/util/__tests__/net.test.ts | 74 +++ app/gui2/shared/util/data/result.ts | 33 +- app/gui2/shared/util/net.ts | 128 +++++ .../util/net/ReconnectingWSTransport.ts | 85 +++ app/gui2/shared/websocket.ts | 192 ------- app/gui2/src/components/ComponentBrowser.vue | 1 + .../src/components/ComponentBrowser/ai.ts | 50 +- app/gui2/src/components/GraphEditor.vue | 50 +- .../GraphEditor/GraphVisualization.vue | 5 +- app/gui2/src/components/GraphEditor/toasts.ts | 11 +- app/gui2/src/components/GraphEditor/upload.ts | 99 ++-- .../GraphEditor/widgets/WidgetFunction.vue | 3 +- .../stores/project/computedValueRegistry.ts | 2 +- .../src/stores/project/executionContext.ts | 342 ++++++++++++ app/gui2/src/stores/project/index.ts | 490 +++--------------- .../project/visualizationDataRegistry.ts | 16 +- .../src/stores/suggestionDatabase/index.ts | 34 +- .../stores/visualization/compilerMessaging.ts | 2 +- app/gui2/src/stores/visualization/index.ts | 16 +- app/gui2/src/util/__tests__/net.test.ts | 67 +-- app/gui2/src/util/ast/match.ts | 5 + app/gui2/src/util/net.ts | 115 +--- .../{shared => src/util/net}/dataServer.ts | 96 ++-- app/gui2/ydoc-server/languageServerSession.ts | 399 +++++++------- package-lock.json | 20 + 36 files changed, 1436 insertions(+), 1370 deletions(-) rename app/gui2/{shared/dataServer/mock.ts => mock/dataServer.ts} (98%) delete mode 100644 app/gui2/shared/retry.ts create mode 100644 app/gui2/shared/util/__tests__/net.test.ts create mode 100644 app/gui2/shared/util/net/ReconnectingWSTransport.ts delete mode 100644 app/gui2/shared/websocket.ts create mode 100644 app/gui2/src/stores/project/executionContext.ts rename app/gui2/{shared => src/util/net}/dataServer.ts (76%) diff --git a/app/gui2/e2e/componentBrowser.spec.ts b/app/gui2/e2e/componentBrowser.spec.ts index dae3bd6feb10..2249d3a663ea 100644 --- a/app/gui2/e2e/componentBrowser.spec.ts +++ b/app/gui2/e2e/componentBrowser.spec.ts @@ -254,6 +254,7 @@ test('Visualization preview: user visualization selection', async ({ page }) => await input.fill('4') await expect(input).toHaveValue('4') await expect(locate.jsonVisualization(page)).toExist() + await expect(locate.jsonVisualization(page)).toContainText('"visualizedExpr": "4"') await locate.showVisualizationSelectorButton(page).click() await page.getByRole('button', { name: 'Table' }).click() // The table visualization is not currently working with `executeExpression` (#9194), but we can test that the JSON diff --git a/app/gui2/e2e/graphNodeVisualization.spec.ts b/app/gui2/e2e/graphNodeVisualization.spec.ts index a2d5ad993d34..9f298c9dda52 100644 --- a/app/gui2/e2e/graphNodeVisualization.spec.ts +++ b/app/gui2/e2e/graphNodeVisualization.spec.ts @@ -12,6 +12,7 @@ test('node can open and load visualization', async ({ page }) => { await expect(locate.circularMenu(page)).toExist() await locate.toggleVisualizationButton(page).click() await expect(locate.anyVisualization(page)).toExist() + await expect(locate.loadingVisualization(page)).toHaveCount(0) await locate.showVisualizationSelectorButton(page).click() await page.getByText('JSON').click() const vis = locate.jsonVisualization(page) @@ -36,6 +37,7 @@ test('Warnings visualization', async ({ page }) => { await expect(locate.circularMenu(page)).toExist() await locate.toggleVisualizationButton(page).click() await expect(locate.anyVisualization(page)).toExist() + await expect(locate.loadingVisualization(page)).toHaveCount(0) await locate.showVisualizationSelectorButton(page).click() await page.locator('.VisualizationSelector').getByRole('button', { name: 'Warnings' }).click() await expect(locate.warningsVisualization(page)).toExist() diff --git a/app/gui2/e2e/locate.ts b/app/gui2/e2e/locate.ts index df3d99262172..c51e78108be9 100644 --- a/app/gui2/e2e/locate.ts +++ b/app/gui2/e2e/locate.ts @@ -145,6 +145,7 @@ function componentLocator(className: SanitizeClassName) { export const graphEditor = componentLocator('GraphEditor') // @ts-expect-error export const anyVisualization = componentLocator('GraphVisualization > *') +export const loadingVisualization = componentLocator('LoadingVisualization') export const circularMenu = componentLocator('CircularMenu') export const addNewNodeButton = componentLocator('PlusButton') export const componentBrowser = componentLocator('ComponentBrowser') diff --git a/app/gui2/mock/MockFSWrapper.vue b/app/gui2/mock/MockFSWrapper.vue index 7fcf38473a00..7e8d58cec9e0 100644 --- a/app/gui2/mock/MockFSWrapper.vue +++ b/app/gui2/mock/MockFSWrapper.vue @@ -2,9 +2,9 @@ import { useProjectStore } from '@/stores/project' import { mockFsDirectoryHandle } from '@/util/convert/fsAccess' import { MockWebSocket, type WebSocketHandler } from '@/util/net' -import { mockDataWSHandler } from 'shared/dataServer/mock' import { type Path as LSPath } from 'shared/languageServerTypes' import { watchEffect } from 'vue' +import { mockDataWSHandler } from './dataServer' const projectStore = useProjectStore() diff --git a/app/gui2/shared/dataServer/mock.ts b/app/gui2/mock/dataServer.ts similarity index 98% rename from app/gui2/shared/dataServer/mock.ts rename to app/gui2/mock/dataServer.ts index b418834d8b78..822b3afdb9d5 100644 --- a/app/gui2/shared/dataServer/mock.ts +++ b/app/gui2/mock/dataServer.ts @@ -27,9 +27,9 @@ import { type AnyOutboundPayload, type Offset, type Table, -} from '../binaryProtocol' -import { LanguageServerErrorCode } from '../languageServerTypes' -import { uuidToBits } from '../uuid' +} from 'shared/binaryProtocol' +import { LanguageServerErrorCode } from 'shared/languageServerTypes' +import { uuidToBits } from 'shared/uuid' const sha3 = createSHA3(224) diff --git a/app/gui2/mock/engine.ts b/app/gui2/mock/engine.ts index 83cd86cd8fe7..e5245234644e 100644 --- a/app/gui2/mock/engine.ts +++ b/app/gui2/mock/engine.ts @@ -1,3 +1,4 @@ +import { Pattern } from '@/util/ast/match' import type { MockYdocProviderImpl } from '@/util/crdt' import * as random from 'lib0/random' import * as Ast from 'shared/ast' @@ -9,7 +10,7 @@ import { VisualizationContext, VisualizationUpdate, } from 'shared/binaryProtocol' -import { mockDataWSHandler as originalMockDataWSHandler } from 'shared/dataServer/mock' +import { ErrorCode } from 'shared/languageServer' import type { ContextId, ExpressionId, @@ -26,6 +27,7 @@ import type { QualifiedName } from 'src/util/qualifiedName' import * as Y from 'yjs' import { mockFsDirectoryHandle, type FileTree } from '../src/util/convert/fsAccess' import mockDb from '../stories/mockSuggestions.json' assert { type: 'json' } +import { mockDataWSHandler as originalMockDataWSHandler } from './dataServer' const mockProjectId = random.uuidv4() as Uuid const standardBase = 'Standard.Base' as QualifiedName @@ -93,19 +95,21 @@ const visualizationExprIds = new Map() const encoder = new TextEncoder() const encodeJSON = (data: unknown) => encoder.encode(JSON.stringify(data)) -const scatterplotJson = encodeJSON({ - axis: { - x: { label: 'x-axis label', scale: 'linear' }, - y: { label: 'y-axis label', scale: 'logarithmic' }, - }, - points: { labels: 'visible' }, - data: [ - { x: 0.1, y: 0.7, label: 'foo', color: '#FF0000', shape: 'circle', size: 0.2 }, - { x: 0.4, y: 0.2, label: 'baz', color: '#0000FF', shape: 'square', size: 0.3 }, - ], -}) +const scatterplotJson = (params: string[]) => + encodeJSON({ + visualizedExpr: params[0], + axis: { + x: { label: 'x-axis label', scale: 'linear' }, + y: { label: 'y-axis label', scale: 'logarithmic' }, + }, + points: { labels: 'visible' }, + data: [ + { x: 0.1, y: 0.7, label: 'foo', color: '#FF0000', shape: 'circle', size: 0.2 }, + { x: 0.4, y: 0.2, label: 'baz', color: '#0000FF', shape: 'square', size: 0.3 }, + ], + }) -const mockVizData: Record Uint8Array)> = { +const mockVizPreprocessors: Record Uint8Array)> = { // JSON 'Standard.Visualization.Preprocessor.default_preprocessor': scatterplotJson, 'Standard.Visualization.Scatter_Plot.process_to_json_text': scatterplotJson, @@ -320,7 +324,6 @@ const mockVizData: Record Uint8Array return encodeJSON([]) } }, - 'Standard.Visualization.AI.build_ai_prompt': () => encodeJSON('Could you __$$GOAL$$__, please?'), // The following visualizations do not have unique transformation methods, and as such are only kept // for posterity. @@ -353,9 +356,9 @@ function createId(id: Uuid) { return (builder: Builder) => EnsoUUID.createEnsoUUID(builder, low, high) } -function sendVizData(id: Uuid, config: VisualizationConfiguration) { +function sendVizData(id: Uuid, config: VisualizationConfiguration, expressionId?: Uuid) { const vizDataHandler = - mockVizData[ + mockVizPreprocessors[ typeof config.expression === 'string' ? `${config.visualizationModule}.${config.expression}` : `${config.expression.definedOnType}.${config.expression.name}` @@ -365,12 +368,22 @@ function sendVizData(id: Uuid, config: VisualizationConfiguration) { vizDataHandler instanceof Uint8Array ? vizDataHandler : ( vizDataHandler(config.positionalArgumentsExpressions ?? []) ) + const exprId = expressionId ?? visualizationExprIds.get(id) + sendVizUpdate(id, config.executionContextId, exprId, vizData) +} + +function sendVizUpdate( + id: Uuid, + executionCtxId: Uuid, + exprId: Uuid | undefined, + vizData: Uint8Array, +) { + if (!sendData) return const builder = new Builder() - const exprId = visualizationExprIds.get(id) const visualizationContextOffset = VisualizationContext.createVisualizationContext( builder, createId(id), - createId(config.executionContextId), + createId(executionCtxId), exprId ? createId(exprId) : null, ) const dataOffset = VisualizationUpdate.createDataVector(builder, vizData) @@ -446,16 +459,27 @@ export const mockLSHandler: MockTransportData = async (method, data, transport) expressionId: ExpressionId expression: string } - const { func, args } = Ast.analyzeAppLike(Ast.parse(data_.expression)) - if (!(func instanceof Ast.PropertyAccess && func.lhs)) return - const visualizationConfig: VisualizationConfiguration = { - executionContextId: data_.executionContextId, - visualizationModule: func.lhs.code(), - expression: func.rhs.code(), - positionalArgumentsExpressions: args.map((ast) => ast.code()), + const aiPromptPat = Pattern.parse('Standard.Visualization.AI.build_ai_prompt __ . to_json') + const exprAst = Ast.parse(data_.expression) + if (aiPromptPat.test(exprAst)) { + sendVizUpdate( + data_.visualizationId, + data_.executionContextId, + data_.expressionId, + encodeJSON('Could you __$$GOAL$$__, please?'), + ) + } else { + // Check if there's existing preprocessor mock which matches our expression + const { func, args } = Ast.analyzeAppLike(exprAst) + if (!(func instanceof Ast.PropertyAccess && func.lhs)) return + const visualizationConfig: VisualizationConfiguration = { + executionContextId: data_.executionContextId, + visualizationModule: func.lhs.code(), + expression: func.rhs.code(), + positionalArgumentsExpressions: args.map((ast) => ast.code()), + } + sendVizData(data_.visualizationId, visualizationConfig, data_.expressionId) } - visualizationExprIds.set(data_.visualizationId, data_.expressionId) - sendVizData(data_.visualizationId, visualizationConfig) return } case 'search/getSuggestionsDatabase': @@ -487,9 +511,16 @@ export const mockLSHandler: MockTransportData = async (method, data, transport) if (!child || typeof child === 'string' || child instanceof ArrayBuffer) break } } - if (!child) return Promise.reject(`Folder '/${data_.path.segments.join('/')}' not found.`) + if (!child) + return Promise.reject({ + code: ErrorCode.FILE_NOT_FOUND, + message: `Folder '/${data_.path.segments.join('/')}' not found.`, + }) if (typeof child === 'string' || child instanceof ArrayBuffer) - return Promise.reject(`File '/${data_.path.segments.join('/')}' is not a folder.`) + return Promise.reject({ + code: ErrorCode.NOT_DIRECTORY, + message: `File '/${data_.path.segments.join('/')}' is not a folder.`, + }) return { paths: Object.entries(child).map(([name, entry]) => ({ type: typeof entry === 'string' || entry instanceof ArrayBuffer ? 'File' : 'Directory', @@ -500,8 +531,7 @@ export const mockLSHandler: MockTransportData = async (method, data, transport) } case 'ai/completion': { const { prompt } = data - console.log(prompt) - const match = /^"Could you (.*), please\?"$/.exec(prompt) + const match = /^Could you (.*), please\?$/.exec(prompt) if (!match) { return { code: 'How rude!' } } else if (match[1] === 'convert to table') { diff --git a/app/gui2/package.json b/app/gui2/package.json index 8de3ff9d6f91..6e4c423022c4 100644 --- a/app/gui2/package.json +++ b/app/gui2/package.json @@ -63,6 +63,7 @@ "lib0": "^0.2.85", "magic-string": "^0.30.3", "murmurhash": "^2.0.1", + "partysocket": "^1.0.1", "pinia": "^2.1.7", "postcss-inline-svg": "^6.0.0", "postcss-nesting": "^12.0.1", diff --git a/app/gui2/shared/languageServer.ts b/app/gui2/shared/languageServer.ts index 75e577b0ff7f..f9a6fd4b9eb7 100644 --- a/app/gui2/shared/languageServer.ts +++ b/app/gui2/shared/languageServer.ts @@ -1,12 +1,13 @@ import { sha3_224 as SHA3 } from '@noble/hashes/sha3' import { bytesToHex } from '@noble/hashes/utils' -import { Client } from '@open-rpc/client-js' +import { Client, RequestManager } from '@open-rpc/client-js' import { ObservableV2 } from 'lib0/observable' import { uuidv4 } from 'lib0/random' import { z } from 'zod' import { walkFs } from './languageServer/files' import type { Checksum, + ContentRoot, ContextId, Event, ExecutionEnvironment, @@ -21,6 +22,12 @@ import type { VisualizationConfiguration, response, } from './languageServerTypes' +import { Err, Ok, type Result } from './util/data/result' +import { + AbortScope, + exponentialBackoff, + type ReconnectingTransportWithWebsocketEvents, +} from './util/net' import type { Uuid } from './yjsModel' const DEBUG_LOG_RPC = false @@ -91,41 +98,117 @@ export class RemoteRpcError { } } -export class LsRpcError extends Error { - cause: RemoteRpcError | Error +export class LsRpcError { + cause: RemoteRpcError | Error | string request: string params: object - constructor(cause: RemoteRpcError | Error, request: string, params: object) { - super(`Language server request '${request}' failed.`) + constructor(cause: RemoteRpcError | Error | string, request: string, params: object) { this.cause = cause this.request = request this.params = params } + + toString() { + return `Language server request '${this.request} failed: ${this.cause instanceof RemoteRpcError ? this.cause.message : this.cause}` + } } -/** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md) */ -export class LanguageServer extends ObservableV2 { - client: Client - handlers: Map void>> - retainCount = 1 +export type LsRpcResult = Result - constructor(client: Client) { - super() - this.client = client - this.handlers = new Map() +export type TransportEvents = { + 'transport/closed': () => void + 'transport/connected': () => void +} - client.onNotification((notification) => { +/** + * This client implements the [Language Server Protocol](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md) + * + * It also handles the initialization (and re-initialization on every reconnect); each method + * repressenting a remote call (except the `initProtocolConnection` obviously) waits for + * initialization before sending the request. + */ +export class LanguageServer extends ObservableV2 { + client: Client + /** + * This promise is resolved once the LS protocol is initialized. When connection is lost, this + * field becomes again an unresolved promise until reconnected and reinitialized. + */ + initialized: Promise> + private clientScope: AbortScope = new AbortScope() + private initializationScheduled = false + private retainCount = 1 + + constructor( + private clientID: Uuid, + private transport: ReconnectingTransportWithWebsocketEvents, + ) { + super() + this.initialized = this.scheduleInitializationAfterConnect() + const requestManager = new RequestManager([transport]) + this.client = new Client(requestManager) + this.client.onNotification((notification) => { this.emit(notification.method as keyof Notifications, [notification.params]) }) - client.onError((error) => { + this.client.onError((error) => { console.error(`Unexpected LS connection error:`, error) }) + transport.on('error', (error) => console.error('Language Server transport error:', error)) + const reinitializeCb = () => { + this.emit('transport/closed', []) + console.log('Language Server: websocket closed') + this.scheduleInitializationAfterConnect() + } + transport.on('close', reinitializeCb) + this.clientScope.onAbort(() => { + this.transport.off('close', reinitializeCb) + this.transport.close() + }) + } + + private scheduleInitializationAfterConnect() { + if (this.initializationScheduled) return this.initialized + this.initializationScheduled = true + this.initialized = new Promise((resolve) => { + const cb = () => { + this.transport.off('open', cb) + this.emit('transport/connected', []) + this.initializationScheduled = false + exponentialBackoff(() => this.initProtocolConnection(this.clientID), { + onBeforeRetry: (error, _, delay) => { + console.warn( + `Failed to initialize language server connection, retrying after ${delay}ms...\n`, + error, + ) + }, + }).then((result) => { + if (!result.ok) { + result.error.log('Error initializing Language Server RPC') + } + resolve(result) + }) + } + this.transport.on('open', cb) + }) + return this.initialized + } + + get contentRoots(): Promise { + return this.initialized.then((result) => (result.ok ? result.value.contentRoots : [])) + } + + reconnect() { + this.transport.reconnect() } // The "magic bag of holding" generic that is only present in the return type is UNSOUND. // However, it is SAFE, as the return type of the API is statically known. - private async request(method: string, params: object): Promise { - if (this.retainCount === 0) return Promise.reject(new Error('LanguageServer disposed')) + private async request( + method: string, + params: object, + waitForInit = true, + ): Promise> { + if (this.retainCount === 0) + return Err(new LsRpcError('LanguageServer disposed', method, params)) const uuid = uuidv4() const now = performance.now() try { @@ -133,15 +216,18 @@ export class LanguageServer extends ObservableV2 { console.log(`LS [${uuid}] ${method}:`) console.dir(params) } - return await this.client.request({ method, params }, RPC_TIMEOUT_MS) + if (waitForInit) { + const initResult = await this.initialized + if (!initResult.ok) return initResult + } + return Ok(await this.client.request({ method, params }, RPC_TIMEOUT_MS)) } catch (error) { const remoteError = RemoteRpcErrorSchema.safeParse(error) if (remoteError.success) { - throw new LsRpcError(new RemoteRpcError(remoteError.data), method, params) + return Err(new LsRpcError(new RemoteRpcError(remoteError.data), method, params)) } else if (error instanceof Error) { - throw new LsRpcError(error, method, params) - } - throw error + return Err(new LsRpcError(error, method, params)) + } else throw error } finally { if (DEBUG_LOG_RPC) { console.log(`LS [${uuid}] ${method} took ${performance.now() - now}ms`) @@ -150,146 +236,146 @@ export class LanguageServer extends ObservableV2 { } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#capabilityacquire) */ - acquireCapability(method: string, registerOptions: RegisterOptions): Promise { + acquireCapability(method: string, registerOptions: RegisterOptions): Promise> { return this.request('capability/acquire', { method, registerOptions }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#filereceivestreeupdates) */ - acquireReceivesTreeUpdates(path: Path): Promise { + acquireReceivesTreeUpdates(path: Path): Promise> { return this.acquireCapability('file/receivesTreeUpdates', { path }) } - acquireExecutionContextCanModify(contextId: ContextId): Promise { + acquireExecutionContextCanModify(contextId: ContextId): Promise> { return this.acquireCapability('executionContext/canModify', { contextId }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#sessioninitprotocolconnection) */ - initProtocolConnection(clientId: Uuid): Promise { - return this.request('session/initProtocolConnection', { clientId }) + initProtocolConnection(clientId: Uuid): Promise> { + return this.request('session/initProtocolConnection', { clientId }, false) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#textopenfile) */ - openTextFile(path: Path): Promise { + openTextFile(path: Path): Promise> { return this.request('text/openFile', { path }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#textclosefile) */ - closeTextFile(path: Path): Promise { + closeTextFile(path: Path): Promise> { return this.request('text/closeFile', { path }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#textsave) */ - saveTextFile(path: Path, currentVersion: Checksum): Promise { + saveTextFile(path: Path, currentVersion: Checksum): Promise> { return this.request('text/save', { path, currentVersion }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#textapplyedit) */ - applyEdit(edit: FileEdit, execute: boolean): Promise { + applyEdit(edit: FileEdit, execute: boolean): Promise> { return this.request('text/applyEdit', { edit, execute }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#filewrite) */ - writeFile(path: Path, contents: TextFileContents): Promise { + writeFile(path: Path, contents: TextFileContents): Promise> { return this.request('file/write', { path, contents }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#fileread) */ - readFile(path: Path): Promise { + readFile(path: Path): Promise> { return this.request('file/read', { path }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#filecreate) */ - createFile(object: FileSystemObject): Promise { + createFile(object: FileSystemObject): Promise> { return this.request('file/create', { object }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#filedelete) */ - deleteFile(path: Path): Promise { + deleteFile(path: Path): Promise> { return this.request('file/delete', { path }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#filecopy) */ - copyFile(from: Path, to: Path): Promise { + copyFile(from: Path, to: Path): Promise> { return this.request('file/copy', { from, to }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#filemove) */ - moveFile(from: Path, to: Path): Promise { + moveFile(from: Path, to: Path): Promise> { return this.request('file/move', { from, to }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#fileexists) */ - fileExists(path: Path): Promise { + fileExists(path: Path): Promise> { return this.request('file/exists', { path }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#filetree) */ - fileTree(path: Path, depth?: number): Promise { + fileTree(path: Path, depth?: number): Promise> { return this.request('file/tree', { path, depth }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#filelist) */ - listFiles(path: Path): Promise { + listFiles(path: Path): Promise> { return this.request('file/list', { path }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#fileinfo) */ - fileInfo(path: Path): Promise { + fileInfo(path: Path): Promise> { return this.request('file/info', { path }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#filechecksum) */ - fileChecksum(path: Path): Promise { + fileChecksum(path: Path): Promise> { return this.request('file/checksum', { path }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#vcsinit) */ - vcsInit(root: Path): Promise { + vcsInit(root: Path): Promise> { return this.request('vcs/init', { root }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#vcssave) */ - vcsSave(root: Path, name?: string): Promise { + vcsSave(root: Path, name?: string): Promise> { return this.request('vcs/save', { root, name }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#vcsstatus) */ - vcsStatus(root: Path): Promise { + vcsStatus(root: Path): Promise> { return this.request('vcs/status', { root }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#vcsrestore) */ - vcsRestore(root: Path, commitId?: string): Promise { + vcsRestore(root: Path, commitId?: string): Promise> { return this.request('vcs/restore', { root, commitId }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#vcslist) */ - vcsList(root: Path, limit?: number): Promise { + vcsList(root: Path, limit?: number): Promise> { return this.request('vcs/list', { root, limit }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#executioncontextcreate) */ - createExecutionContext(contextId?: ContextId): Promise { + createExecutionContext(contextId?: ContextId): Promise> { return this.request('executionContext/create', { contextId }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#executioncontextdestroy) */ - destroyExecutionContext(contextId: ContextId): Promise { + destroyExecutionContext(contextId: ContextId): Promise> { return this.request('executionContext/destroy', { contextId }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#executioncontextfork) */ - forkExecutionContext(contextId: ContextId): Promise { + forkExecutionContext(contextId: ContextId): Promise> { return this.request('executionContext/fork', { contextId }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#executioncontextpush) */ - pushExecutionContextItem(contextId: ContextId, stackItem: StackItem): Promise { + pushExecutionContextItem(contextId: ContextId, stackItem: StackItem): Promise> { return this.request('executionContext/push', { contextId, stackItem }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#executioncontextpop) */ - popExecutionContextItem(contextId: ContextId): Promise { + popExecutionContextItem(contextId: ContextId): Promise> { return this.request('executionContext/pop', { contextId }) } @@ -298,7 +384,7 @@ export class LanguageServer extends ObservableV2 { contextId: ContextId, invalidatedExpressions?: 'all' | string[], executionEnvironment?: ExecutionEnvironment, - ): Promise { + ): Promise> { return this.request('executionContext/recompute', { contextId, invalidatedExpressions, @@ -307,7 +393,7 @@ export class LanguageServer extends ObservableV2 { } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#executioncontextinterrupt) */ - interruptExecutionContext(contextId: ContextId): Promise { + interruptExecutionContext(contextId: ContextId): Promise> { return this.request('executionContext/interrupt', { contextId }) } @@ -315,7 +401,7 @@ export class LanguageServer extends ObservableV2 { setExecutionEnvironment( contextId: ContextId, executionEnvironment?: ExecutionEnvironment, - ): Promise { + ): Promise> { return this.request('executionContext/setExecutionEnvironment', { contextId, executionEnvironment, @@ -328,7 +414,7 @@ export class LanguageServer extends ObservableV2 { visualizationId: Uuid, expressionId: ExpressionId, expression: string, - ): Promise { + ): Promise> { return this.request('executionContext/executeExpression', { executionContextId, visualizationId, @@ -342,7 +428,7 @@ export class LanguageServer extends ObservableV2 { visualizationId: Uuid, expressionId: ExpressionId, visualizationConfig: VisualizationConfiguration, - ): Promise { + ): Promise> { return this.request('executionContext/attachVisualization', { visualizationId, expressionId, @@ -355,7 +441,7 @@ export class LanguageServer extends ObservableV2 { visualizationId: Uuid, expressionId: ExpressionId, contextId: ContextId, - ): Promise { + ): Promise> { return this.request('executionContext/detachVisualization', { visualizationId, expressionId, @@ -367,7 +453,7 @@ export class LanguageServer extends ObservableV2 { modifyVisualization( visualizationId: Uuid, visualizationConfig: VisualizationConfiguration, - ): Promise { + ): Promise> { return this.request('executionContext/modifyVisualization', { visualizationId, visualizationConfig, @@ -375,45 +461,43 @@ export class LanguageServer extends ObservableV2 { } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#searchgetsuggestionsdatabase) */ - getSuggestionsDatabase(): Promise { + getSuggestionsDatabase(): Promise> { return this.request('search/getSuggestionsDatabase', {}) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#runtimegetcomponentgroups) */ - getComponentGroups(): Promise { + getComponentGroups(): Promise> { return this.request('runtime/getComponentGroups', {}) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#profilingstart) */ - profilingStart(memorySnapshot?: boolean): Promise { + profilingStart(memorySnapshot?: boolean): Promise> { return this.request('profiling/start', { memorySnapshot }) } /** [Documentation](https://github.com/enso-org/enso/blob/develop/docs/language-server/protocol-language-server.md#profilingstop) */ - profilingStop(): Promise { + profilingStop(): Promise> { return this.request('profiling/stop', {}) } - aiCompletion(prompt: string, stopSequence: string): Promise { + aiCompletion(prompt: string, stopSequence: string): Promise> { return this.request('ai/completion', { prompt, stopSequence }) } /** A helper function to subscribe to file updates. * Please use `ls.on('file/event')` directly if the initial `'Added'` notifications are not * needed. */ - watchFiles( - rootId: Uuid, - segments: string[], - callback: (event: Event<'file/event'>) => void, - retry: (cb: () => Promise) => Promise = (f) => f(), - ) { + watchFiles(rootId: Uuid, segments: string[], callback: (event: Event<'file/event'>) => void) { let running = true const self = this return { promise: (async () => { self.on('file/event', callback) - await retry(async () => running && self.acquireReceivesTreeUpdates({ rootId, segments })) - await walkFs(self, { rootId, segments }, (type, path) => { + const updatesAcquired = await exponentialBackoff(async () => + running ? self.acquireReceivesTreeUpdates({ rootId, segments }) : Ok(), + ) + if (!updatesAcquired) return updatesAcquired + return await walkFs(self, { rootId, segments }, (type, path) => { if ( !running || type !== 'File' || @@ -445,7 +529,7 @@ export class LanguageServer extends ObservableV2 { if (this.retainCount > 0) { this.retainCount -= 1 if (this.retainCount === 0) { - this.client.close() + this.clientScope.dispose('Language server released') } } else { throw new Error('Released already disposed language server.') diff --git a/app/gui2/shared/languageServer/files.ts b/app/gui2/shared/languageServer/files.ts index 79454facc4da..567573570081 100644 --- a/app/gui2/shared/languageServer/files.ts +++ b/app/gui2/shared/languageServer/files.ts @@ -1,12 +1,15 @@ -import { type LanguageServer } from 'shared/languageServer' -import type { FileSystemObject, Path } from 'shared/languageServerTypes' +import { type LanguageServer } from '../languageServer' +import type { FileSystemObject, Path } from '../languageServerTypes' +import { Err, Ok, type Result } from '../util/data/result' export async function walkFs( ls: LanguageServer, path: Path, cb: (type: FileSystemObject['type'], path: Path) => void, -) { - for (const file of (await ls.listFiles(path)).paths) { +): Promise> { + const files = await ls.listFiles(path) + if (!files.ok) return files + for (const file of files.value.paths) { const filePath: Path = { rootId: file.path.rootId, segments: [...file.path.segments, file.name], @@ -26,8 +29,9 @@ export async function walkFs( } default: { const unexpected: never = file - throw new Error('Unexpected object: ' + JSON.stringify(unexpected)) + return Err('Unexpected object: ' + JSON.stringify(unexpected)) } } } + return Ok() } diff --git a/app/gui2/shared/languageServerTypes/suggestions.ts b/app/gui2/shared/languageServerTypes/suggestions.ts index a00f28a42dbc..8e3835a62ce7 100644 --- a/app/gui2/shared/languageServerTypes/suggestions.ts +++ b/app/gui2/shared/languageServerTypes/suggestions.ts @@ -1,4 +1,4 @@ -import type { Uuid } from 'shared/languageServerTypes' +import type { Uuid } from '../languageServerTypes' export type SuggestionId = number diff --git a/app/gui2/shared/retry.ts b/app/gui2/shared/retry.ts deleted file mode 100644 index 6e98d487d6f0..000000000000 --- a/app/gui2/shared/retry.ts +++ /dev/null @@ -1,118 +0,0 @@ -import { wait } from 'lib0/promise' - -export interface BackoffOptions { - maxRetries?: number - retryDelay?: number - retryDelayMultiplier?: number - retryDelayMax?: number - /** Called when the promise throws an error, and the next retry is about to be attempted. - * When this function returns `false`, the backoff is immediately aborted. When this function - * is not provided, the backoff will always continue until the maximum number of retries - * is reached. * */ - onBeforeRetry?: ( - error: E, - retryCount: number, - maxRetries: number, - delay: number, - ) => boolean | void - /** Called right before returning. */ - onSuccess?: (retryCount: number) => void - /** Called after the final retry, right before throwing an error. - * Note that `onBeforeRetry` is *not* called on the final retry, as there is nothing after the - * final retry. */ - onFailure?: (error: E, retryCount: number) => void -} - -const defaultBackoffOptions: Required> = { - maxRetries: 3, - retryDelay: 1000, - retryDelayMultiplier: 2, - retryDelayMax: 10000, - onBeforeRetry: () => {}, - onSuccess: () => {}, - onFailure: () => {}, -} - -/** Retry a failing promise function with exponential backoff. */ -export async function exponentialBackoff( - f: () => Promise, - backoffOptions?: BackoffOptions, -): Promise { - const { - maxRetries, - retryDelay, - retryDelayMultiplier, - retryDelayMax, - onBeforeRetry, - onSuccess, - onFailure, - } = { - ...defaultBackoffOptions, - ...backoffOptions, - } - for ( - let retries = 0, delay = retryDelay; - ; - retries += 1, delay = Math.min(retryDelayMax, delay * retryDelayMultiplier) - ) { - try { - const result = await f() - onSuccess(retries) - return result - } catch (error) { - if (retries >= maxRetries) { - onFailure(error as E, retries) - throw error - } - if (onBeforeRetry(error as E, retries, maxRetries, delay) === false) throw error - await wait(delay) - } - } -} - -export function defaultOnBeforeRetry( - description: string, -): NonNullable['onBeforeRetry']> { - return (error, retryCount, maxRetries, delay) => { - console.error( - 'Could not ' + - description + - ` (${retryCount}/${maxRetries} retries), retrying after ${delay}ms...`, - ) - console.error(error) - } -} - -export function defaultOnFailure( - description: string, -): NonNullable['onFailure']> { - return (error, retryCount) => { - console.error( - 'Could not ' + description + ` (${retryCount}/${retryCount} retries), throwing error.`, - ) - console.error(error) - } -} - -export function defaultOnSuccess( - description: string, -): NonNullable['onSuccess']> { - return (retryCount) => { - if (retryCount === 0) return - console.info( - 'Successfully ' + - description + - ` after ${retryCount} ${retryCount === 1 ? 'failure' : 'failures'}.`, - ) - } -} - -/** @param successDescription Should be in past tense, without an initial capital letter. - * @param errorDescription Should be in present tense, without an initial capital letter. */ -export function printingCallbacks(successDescription: string, errorDescription: string) { - return { - onBeforeRetry: defaultOnBeforeRetry(errorDescription), - onSuccess: defaultOnSuccess(successDescription), - onFailure: defaultOnFailure(errorDescription), - } satisfies BackoffOptions -} diff --git a/app/gui2/shared/util/__tests__/net.test.ts b/app/gui2/shared/util/__tests__/net.test.ts new file mode 100644 index 000000000000..fbbb813e8770 --- /dev/null +++ b/app/gui2/shared/util/__tests__/net.test.ts @@ -0,0 +1,74 @@ +import { Err, Ok, ResultError } from 'shared/util/data/result' +import { exponentialBackoff } from 'shared/util/net' +import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest' + +beforeEach(() => { + vi.useFakeTimers() +}) +afterEach(() => { + vi.useRealTimers() +}) + +describe('exponentialBackoff', () => { + test('runs successful task once', async () => { + const task = vi.fn(async () => Ok(1)) + const result = await exponentialBackoff(task) + expect(result).toEqual({ ok: true, value: 1 }) + expect(task).toHaveBeenCalledTimes(1) + }) + + test('retry failing task up to a limit', async () => { + const task = vi.fn(async () => Err(1)) + const promise = exponentialBackoff(task, { maxRetries: 4 }) + vi.runAllTimersAsync() + const result = await promise + expect(result).toEqual({ ok: false, error: new ResultError(1) }) + expect(task).toHaveBeenCalledTimes(5) + }) + + test('wait before retrying', async () => { + const task = vi.fn(async () => Err(null)) + exponentialBackoff(task, { + maxRetries: 10, + retryDelay: 100, + retryDelayMultiplier: 3, + retryDelayMax: 1000, + }) + expect(task).toHaveBeenCalledTimes(1) + await vi.advanceTimersByTimeAsync(100) + expect(task).toHaveBeenCalledTimes(2) + await vi.advanceTimersByTimeAsync(300) + expect(task).toHaveBeenCalledTimes(3) + await vi.advanceTimersByTimeAsync(900) + expect(task).toHaveBeenCalledTimes(4) + await vi.advanceTimersByTimeAsync(5000) + expect(task).toHaveBeenCalledTimes(9) + }) + + test('retry task until success', async () => { + const task = vi.fn() + task.mockReturnValueOnce(Promise.resolve(Err(3))) + task.mockReturnValueOnce(Promise.resolve(Err(2))) + task.mockReturnValueOnce(Promise.resolve(Ok(1))) + const promise = exponentialBackoff(task) + vi.runAllTimersAsync() + const result = await promise + expect(result).toEqual({ ok: true, value: 1 }) + expect(task).toHaveBeenCalledTimes(3) + }) + + test('call retry callback', async () => { + const task = vi.fn() + task.mockReturnValueOnce(Promise.resolve(Err(3))) + task.mockReturnValueOnce(Promise.resolve(Err(2))) + task.mockReturnValueOnce(Promise.resolve(Ok(1))) + const onBeforeRetry = vi.fn() + + const promise = exponentialBackoff(task, { onBeforeRetry }) + vi.runAllTimersAsync() + await promise + expect(onBeforeRetry).toHaveBeenCalledTimes(2) + expect(onBeforeRetry).toHaveBeenNthCalledWith(1, new ResultError(3), 0, 3, 1000) + expect(onBeforeRetry).toHaveBeenNthCalledWith(2, new ResultError(2), 1, 3, 2000) + }) +}) diff --git a/app/gui2/shared/util/data/result.ts b/app/gui2/shared/util/data/result.ts index 471ca1fed2ab..691027135ee7 100644 --- a/app/gui2/shared/util/data/result.ts +++ b/app/gui2/shared/util/data/result.ts @@ -3,11 +3,13 @@ import { isSome, type Opt } from './opt' -export type Result = +export type Result = | { ok: true; value: T } | { ok: false; error: ResultError } -export function Ok(data: T): Result { +export function Ok(): Result +export function Ok(data: T): Result +export function Ok(data?: T): Result { return { ok: true, value: data } } @@ -40,7 +42,7 @@ export function isResult(v: unknown): v is Result { ) } -export class ResultError { +export class ResultError { payload: E context: (() => string)[] @@ -60,13 +62,28 @@ export class ResultError { } } -export function withContext(context: () => string, f: () => Result): Result { +export function withContext(context: () => string, f: () => Result): Result +export function withContext( + context: () => string, + f: () => Promise>, +): Promise> +export function withContext( + context: () => string, + f: () => Promise> | Result, +) { const result = f() - if (result == null) { - throw new Error('withContext: f() returned null or undefined') + const handleResult = (result: Result) => { + if (result == null) { + throw new Error('withContext: f() returned null or undefined') + } + if (!result.ok) result.error.context.splice(0, 0, context) + return result + } + if (result instanceof Promise) { + return result.then(handleResult) + } else { + return handleResult(result) } - if (!result.ok) result.error.context.splice(0, 0, context) - return result } /** diff --git a/app/gui2/shared/util/net.ts b/app/gui2/shared/util/net.ts index f468933a3019..8f839aef30c2 100644 --- a/app/gui2/shared/util/net.ts +++ b/app/gui2/shared/util/net.ts @@ -1,4 +1,8 @@ +import { Transport } from '@open-rpc/client-js/build/transports/Transport' import type { ObservableV2 } from 'lib0/observable' +import { wait } from 'lib0/promise' +import { type WebSocketEventMap } from 'partysocket/ws' +import { type Result, type ResultError } from './data/result' interface Disposable { dispose(): void @@ -37,3 +41,127 @@ export class AbortScope { return f } } + +export interface BackoffOptions { + maxRetries?: number + retryDelay?: number + retryDelayMultiplier?: number + retryDelayMax?: number + /** Called when the promise throws an error, and the next retry is about to be attempted. + * When this function returns `false`, the backoff is immediately aborted. When this function + * is not provided, the backoff will always continue until the maximum number of retries + * is reached. * */ + onBeforeRetry?: ( + error: ResultError, + retryCount: number, + maxRetries: number, + delay: number, + ) => boolean | void + /** Called right before returning. */ + onSuccess?: (retryCount: number) => void + /** Called after the final retry, right before throwing an error. + * Note that `onBeforeRetry` is *not* called on the final retry, as there is nothing after the + * final retry. */ + onFailure?: (error: ResultError, retryCount: number) => void +} + +const defaultBackoffOptions: Required> = { + maxRetries: 3, + retryDelay: 1000, + retryDelayMultiplier: 2, + retryDelayMax: 10000, + onBeforeRetry: () => {}, + onSuccess: () => {}, + onFailure: () => {}, +} + +/** Retry a failing promise function with exponential backoff. */ +export async function exponentialBackoff( + f: () => Promise>, + backoffOptions?: BackoffOptions, +): Promise> { + const { + maxRetries, + retryDelay, + retryDelayMultiplier, + retryDelayMax, + onBeforeRetry, + onSuccess, + onFailure, + } = { + ...defaultBackoffOptions, + ...backoffOptions, + } + for ( + let retries = 0, delay = retryDelay; + ; + retries += 1, delay = Math.min(retryDelayMax, delay * retryDelayMultiplier) + ) { + const result = await f() + if (result.ok) { + onSuccess(retries) + return result + } + if (retries >= maxRetries) { + onFailure(result.error, retries) + return result + } + if (onBeforeRetry(result.error, retries, maxRetries, delay) === false) { + return result + } + await wait(delay) + } +} + +export function defaultOnBeforeRetry( + description: string, +): NonNullable['onBeforeRetry']> { + return (error, retryCount, maxRetries, delay) => { + console.error( + 'Could not ' + + description + + ` (${retryCount}/${maxRetries} retries), retrying after ${delay}ms...`, + ) + console.error(error) + } +} + +export function defaultOnFailure( + description: string, +): NonNullable['onFailure']> { + return (error, retryCount) => { + console.error( + 'Could not ' + description + ` (${retryCount}/${retryCount} retries), throwing error.`, + ) + console.error(error) + } +} + +export function defaultOnSuccess( + description: string, +): NonNullable['onSuccess']> { + return (retryCount) => { + if (retryCount === 0) return + console.info( + 'Successfully ' + + description + + ` after ${retryCount} ${retryCount === 1 ? 'failure' : 'failures'}.`, + ) + } +} + +/** @param successDescription Should be in past tense, without an initial capital letter. + * @param errorDescription Should be in present tense, without an initial capital letter. */ +export function printingCallbacks(successDescription: string, errorDescription: string) { + return { + onBeforeRetry: defaultOnBeforeRetry(errorDescription), + onSuccess: defaultOnSuccess(successDescription), + onFailure: defaultOnFailure(errorDescription), + } satisfies BackoffOptions +} + +export type ReconnectingTransportWithWebsocketEvents = Transport & { + on(type: K, cb: (event: WebSocketEventMap[K]) => void): void + off(type: K, cb: (event: WebSocketEventMap[K]) => void): void + reconnect(): void +} diff --git a/app/gui2/shared/util/net/ReconnectingWSTransport.ts b/app/gui2/shared/util/net/ReconnectingWSTransport.ts new file mode 100644 index 000000000000..636489e3e97d --- /dev/null +++ b/app/gui2/shared/util/net/ReconnectingWSTransport.ts @@ -0,0 +1,85 @@ +/** + * This file is modified version of open-rpc/client-js WebSocketTransport implementation + * (https://github.com/open-rpc/client-js/blob/master/src/transports/WebSocketTransport.ts) + * which uses the automatically reconnecting websocket. + */ + +import { ERR_UNKNOWN, JSONRPCError } from '@open-rpc/client-js/build/Error' +import { + getBatchRequests, + getNotifications, + type JSONRPCRequestData, +} from '@open-rpc/client-js/build/Request' +import { Transport } from '@open-rpc/client-js/build/transports/Transport' +import WS from 'isomorphic-ws' +import { WebSocket } from 'partysocket' +import { type WebSocketEventMap } from 'partysocket/ws' + +class ReconnectingWebSocketTransport extends Transport { + public connection: WebSocket + public uri: string + + constructor(uri: string) { + super() + this.uri = uri + this.connection = new WebSocket(uri, undefined, { WebSocket: WS }) + } + public connect(): Promise { + return new Promise((resolve) => { + const cb = () => { + this.connection.removeEventListener('open', cb) + resolve() + } + this.connection.addEventListener('open', cb) + this.connection.addEventListener('message', (message: { data: string }) => { + const { data } = message + this.transportRequestManager.resolveResponse(data) + }) + }) + } + + public reconnect() { + this.connection.reconnect() + } + + public async sendData(data: JSONRPCRequestData, timeout: number | null = 5000): Promise { + let prom = this.transportRequestManager.addRequest(data, timeout) + const notifications = getNotifications(data) + try { + this.connection.send(JSON.stringify(this.parseData(data))) + this.transportRequestManager.settlePendingRequest(notifications) + } catch (err) { + const jsonError = new JSONRPCError((err as any).message, ERR_UNKNOWN, err) + + this.transportRequestManager.settlePendingRequest(notifications, jsonError) + this.transportRequestManager.settlePendingRequest(getBatchRequests(data), jsonError) + + prom = Promise.reject(jsonError) + } + + return prom + } + + public close(): void { + this.connection.close() + } + + on( + type: K, + cb: ( + event: WebSocketEventMap[K] extends Event ? WebSocketEventMap[K] : never, + ) => WebSocketEventMap[K] extends Event ? void : never, + ): void { + this.connection.addEventListener(type, cb) + } + off( + type: K, + cb: ( + event: WebSocketEventMap[K] extends Event ? WebSocketEventMap[K] : never, + ) => WebSocketEventMap[K] extends Event ? void : never, + ): void { + this.connection.removeEventListener(type, cb) + } +} + +export default ReconnectingWebSocketTransport diff --git a/app/gui2/shared/websocket.ts b/app/gui2/shared/websocket.ts deleted file mode 100644 index 47a41e2bfa1e..000000000000 --- a/app/gui2/shared/websocket.ts +++ /dev/null @@ -1,192 +0,0 @@ -/// -/* eslint-env browser */ - -// The refernce to DOM types is requiredto use `WebSocket` as a type. -// This is preferable over using `any`, for additional type safety. - -/* The MIT License (MIT) - * - * Copyright (c) 2019 Kevin Jahns . - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -/** - * Tiny websocket connection handler. - * - * Implements exponential backoff reconnects, ping/pong, and a nice event system using [lib0/observable]. - * - * @module websocket - */ - -import * as math from 'lib0/math' -import { ObservableV2 } from 'lib0/observable' -import * as time from 'lib0/time' -import type { AbortScope } from './util/net' - -const reconnectTimeoutBase = 1200 -const maxReconnectTimeout = 2500 -// @todo - this should depend on awareness.outdatedTime -const messageReconnectTimeout = 30000 - -const setupWS = (wsclient: WebsocketClient, ws?: WebSocket | null | undefined) => { - if (wsclient.shouldConnect && (wsclient.ws === null || ws)) { - // deepcode ignore MissingClose: This is closed by `WebsocketClient` below. - const websocket = ws ?? new WebSocket(wsclient.url) - const binaryType = wsclient.binaryType - let pingTimeout: any = null - if (binaryType) { - websocket.binaryType = binaryType - } - wsclient.ws = websocket - wsclient.connecting = true - wsclient.connected = false - websocket.onmessage = (event: { data: string | ArrayBuffer | Blob }) => { - wsclient.lastMessageReceived = time.getUnixTime() - const data = event.data - const message = typeof data === 'string' ? JSON.parse(data) : data - if (wsclient.sendPings && message && message.type === 'pong') { - clearTimeout(pingTimeout) - pingTimeout = setTimeout(sendPing, messageReconnectTimeout / 2) - } - wsclient.emit('message', [message, wsclient]) - } - const onclose = (error: unknown) => { - if (wsclient.ws !== null) { - wsclient.ws = null - wsclient.connecting = false - if (wsclient.connected) { - wsclient.connected = false - wsclient.emit('disconnect', [{ type: 'disconnect', error }, wsclient]) - } else { - wsclient.unsuccessfulReconnects++ - } - // Start with no reconnect timeout and increase timeout by - // log10(wsUnsuccessfulReconnects). - // The idea is to increase reconnect timeout slowly and have no reconnect - // timeout at the beginning (log(1) = 0) - setTimeout( - setupWS, - math.min( - math.log10(wsclient.unsuccessfulReconnects + 1) * reconnectTimeoutBase, - maxReconnectTimeout, - ), - wsclient, - ) - } - clearTimeout(pingTimeout) - } - const sendPing = () => { - if (wsclient.sendPings && wsclient.ws === websocket) { - wsclient.send({ - type: 'ping', - }) - } - } - websocket.onclose = () => onclose(null) - websocket.onerror = (error: unknown) => onclose(error) - websocket.onopen = () => { - wsclient.lastMessageReceived = time.getUnixTime() - wsclient.connecting = false - wsclient.connected = true - wsclient.unsuccessfulReconnects = 0 - wsclient.emit('connect', [{ type: 'connect' }, wsclient]) - // set ping - pingTimeout = setTimeout(sendPing, messageReconnectTimeout / 2) - } - } -} - -type WebsocketEvents = { - connect: (payload: { type: 'connect' }, self: WebsocketClient) => void - disconnect: (payload: { type: 'disconnect'; error: unknown }, self: WebsocketClient) => void - message: (payload: {} | ArrayBuffer | Blob, self: WebsocketClient) => void -} - -export class WebsocketClient extends ObservableV2 { - ws: WebSocket | null - binaryType - sendPings - connected - connecting - unsuccessfulReconnects - lastMessageReceived - shouldConnect - protected _checkInterval - constructor( - public url: string, - abort: AbortScope, - { - binaryType, - sendPings, - }: { binaryType?: 'arraybuffer' | 'blob' | null; sendPings?: boolean } = {}, - ) { - super() - abort.handleDispose(this) - this.ws = null - this.binaryType = binaryType || null - this.sendPings = sendPings ?? true - this.connected = false - this.connecting = false - this.unsuccessfulReconnects = 0 - this.lastMessageReceived = 0 - /** Whether to connect to other peers or not */ - this.shouldConnect = false - this._checkInterval = - this.sendPings ? - setInterval(() => { - if ( - this.connected && - messageReconnectTimeout < time.getUnixTime() - this.lastMessageReceived - ) { - // no message received in a long time - not even your own awareness - // updates (which are updated every 15 seconds) - this.ws?.close() - } - }, messageReconnectTimeout / 2) - : 0 - setupWS(this) - } - - send(message: {} | ArrayBuffer | Blob) { - if (!this.ws) return - const encoded = - message instanceof ArrayBuffer || message instanceof Blob ? message : JSON.stringify(message) - this.ws.send(encoded) - } - - dispose() { - clearInterval(this._checkInterval) - this.disconnect() - super.destroy() - } - - disconnect() { - this.shouldConnect = false - this.ws?.close() - } - - connect(ws?: WebSocket | null | undefined) { - this.shouldConnect = true - if (ws) this.ws = ws - if ((!this.connected && !this.ws) || ws) { - setupWS(this, ws) - } - } -} diff --git a/app/gui2/src/components/ComponentBrowser.vue b/app/gui2/src/components/ComponentBrowser.vue index b0c96bb82f38..537837053414 100644 --- a/app/gui2/src/components/ComponentBrowser.vue +++ b/app/gui2/src/components/ComponentBrowser.vue @@ -345,6 +345,7 @@ const visualizationSelections = reactive(new Map { return visualizationSelections.get(previewed.value.suggestionId ?? null) }) + function setVisualization(visualization: VisualizationIdentifier) { visualizationSelections.set(previewed.value.suggestionId ?? null, visualization) } diff --git a/app/gui2/src/components/ComponentBrowser/ai.ts b/app/gui2/src/components/ComponentBrowser/ai.ts index 9b535a707c7e..838bb5f586df 100644 --- a/app/gui2/src/components/ComponentBrowser/ai.ts +++ b/app/gui2/src/components/ComponentBrowser/ai.ts @@ -11,34 +11,38 @@ const AI_STOP_SEQUENCE = '`' export function useAI( graphDb: GraphDb = useGraphStore().db, project: { - lsRpcConnection: Promise - executeExpression(expressionId: ExternalId, expression: string): Promise | null> + lsRpcConnection: LanguageServer + executeExpression(expressionId: ExternalId, expression: string): Promise | null> } = useProjectStore(), ) { async function query(query: string, sourceIdentifier: string): Promise> { - const lsRpc = await project.lsRpcConnection - const sourceNodeId = graphDb.getIdentDefiningNode(sourceIdentifier) - const contextId = sourceNodeId && graphDb.nodeIdToNode.get(sourceNodeId)?.outerExpr.externalId - if (!contextId) return Err(`Cannot find node with name ${sourceIdentifier}`) + return withContext( + () => 'When getting AI completion', + async () => { + const lsRpc = project.lsRpcConnection + const sourceNodeId = graphDb.getIdentDefiningNode(sourceIdentifier) + const contextId = + sourceNodeId && graphDb.nodeIdToNode.get(sourceNodeId)?.outerExpr.externalId + if (!contextId) return Err(`Cannot find node with name ${sourceIdentifier}`) - const prompt = await project.executeExpression( - contextId, - `Standard.Visualization.AI.build_ai_prompt ${sourceIdentifier}`, + const prompt = await withContext( + () => 'When building AI propt', + async () => { + const prompt = await project.executeExpression( + contextId, + `Standard.Visualization.AI.build_ai_prompt ${sourceIdentifier} . to_json`, + ) + if (!prompt) return Err('No data from AI visualization') + return prompt + }, + ) + if (!prompt.ok) return prompt + const promptWithGoal = prompt.value.replace(AI_GOAL_PLACEHOLDER, query) + const completion = await lsRpc.aiCompletion(promptWithGoal, AI_STOP_SEQUENCE) + if (!completion.ok) return completion + return Ok(completion.value.code) + }, ) - if (!prompt) return Err('No data from AI visualization') - if (!prompt.ok) - return withContext( - () => 'When building AI propt', - () => prompt, - ) - const promptWithGoal = prompt.value.replace(AI_GOAL_PLACEHOLDER, query) - if (!prompt.ok) return prompt - try { - const { code } = await lsRpc.aiCompletion(promptWithGoal, AI_STOP_SEQUENCE) - return Ok(code) - } catch (err) { - return Err(`Error when getting AI completion: ${err}`) - } } return { diff --git a/app/gui2/src/components/GraphEditor.vue b/app/gui2/src/components/GraphEditor.vue index 521793e13b5f..d0f2f8c8b8a2 100644 --- a/app/gui2/src/components/GraphEditor.vue +++ b/app/gui2/src/components/GraphEditor.vue @@ -147,10 +147,10 @@ const graphBindingsHandler = graphBindings.handler({ projectStore.module?.undoManager.redo() }, startProfiling() { - projectStore.lsRpcConnection.then((ls) => ls.profilingStart(true)) + projectStore.lsRpcConnection.profilingStart(true) }, stopProfiling() { - projectStore.lsRpcConnection.then((ls) => ls.profilingStop()) + projectStore.lsRpcConnection.profilingStop() }, openComponentBrowser() { if (keyboardBusy()) return false @@ -288,7 +288,7 @@ const codeEditorHandler = codeEditorBindings.handler({ /** Handle record-once button presses. */ function onRecordOnceButtonPress() { - projectStore.lsRpcConnection.then(async () => { + projectStore.lsRpcConnection.initialized.then(async () => { const modeValue = projectStore.executionMode if (modeValue == undefined) { return @@ -449,29 +449,29 @@ async function handleFileDrop(event: DragEvent) { if (!event.dataTransfer?.items) return ;[...event.dataTransfer.items].forEach(async (item, index) => { - try { - if (item.kind === 'file') { - const file = item.getAsFile() - if (!file) return - const clientPos = new Vec2(event.clientX, event.clientY) - const offset = new Vec2(0, index * -MULTIPLE_FILES_GAP) - const pos = graphNavigator.clientToScenePos(clientPos).add(offset) - const uploader = await Uploader.Create( - projectStore.lsRpcConnection, - projectStore.dataConnection, - projectStore.contentRoots, - projectStore.awareness, - file, - pos, - projectStore.isOnLocalBackend, - event.shiftKey, - projectStore.executionContext.getStackTop(), - ) - const uploadResult = await uploader.upload() - graphStore.createNode(pos, uploadedExpression(uploadResult)) + if (item.kind === 'file') { + const file = item.getAsFile() + if (!file) return + const clientPos = new Vec2(event.clientX, event.clientY) + const offset = new Vec2(0, index * -MULTIPLE_FILES_GAP) + const pos = graphNavigator.clientToScenePos(clientPos).add(offset) + const uploader = await Uploader.Create( + projectStore.lsRpcConnection, + projectStore.dataConnection, + projectStore.contentRoots, + projectStore.awareness, + file, + pos, + projectStore.isOnLocalBackend, + event.shiftKey, + projectStore.executionContext.getStackTop(), + ) + const uploadResult = await uploader.upload() + if (uploadResult.ok) { + graphStore.createNode(pos, uploadedExpression(uploadResult.value)) + } else { + uploadResult.error.log(`Uploading file failed`) } - } catch (err) { - console.error(`Uploading file failed. ${err}`) } }) } diff --git a/app/gui2/src/components/GraphEditor/GraphVisualization.vue b/app/gui2/src/components/GraphEditor/GraphVisualization.vue index 8bfb6bf3d791..551db0ca0aee 100644 --- a/app/gui2/src/components/GraphEditor/GraphVisualization.vue +++ b/app/gui2/src/components/GraphEditor/GraphVisualization.vue @@ -5,7 +5,8 @@ import LoadingErrorVisualization from '@/components/visualizations/LoadingErrorV import LoadingVisualization from '@/components/visualizations/LoadingVisualization.vue' import { focusIsIn, useEvent } from '@/composables/events' import { provideVisualizationConfig } from '@/providers/visualizationConfig' -import { useProjectStore, type NodeVisualizationConfiguration } from '@/stores/project' +import { useProjectStore } from '@/stores/project' +import { type NodeVisualizationConfiguration } from '@/stores/project/executionContext' import { DEFAULT_VISUALIZATION_CONFIGURATION, DEFAULT_VISUALIZATION_IDENTIFIER, @@ -157,7 +158,7 @@ const effectiveVisualizationData = computed(() => { const visualizationData = nodeVisualizationData.value ?? expressionVisualizationData.value if (!visualizationData) return if (visualizationData.ok) return visualizationData.value - else return { name, error: new Error(visualizationData.error.payload) } + else return { name, error: new Error(`${visualizationData.error.payload}`) } }) function updatePreprocessor( diff --git a/app/gui2/src/components/GraphEditor/toasts.ts b/app/gui2/src/components/GraphEditor/toasts.ts index 239ca655d7c0..c4435acd203a 100644 --- a/app/gui2/src/components/GraphEditor/toasts.ts +++ b/app/gui2/src/components/GraphEditor/toasts.ts @@ -8,7 +8,6 @@ export function useGraphEditorToasts() { const toastStartup = useToast.info({ autoClose: false }) const toastConnectionLost = useToast.error({ autoClose: false }) const toastLspError = useToast.error() - const toastConnectionError = useToast.error() const toastExecutionFailed = useToast.error() toastStartup.show('Initializing the project. This can take up to one minute.') @@ -18,12 +17,10 @@ export function useGraphEditorToasts() { toastConnectionLost.show('Lost connection to Language Server.'), ) - projectStore.lsRpcConnection.then( - (ls) => ls.client.onError((e) => toastLspError.show(`Language server error: ${e}`)), - (e) => toastConnectionError.show(`Connection to language server failed: ${JSON.stringify(e)}`), - ) - - projectStore.executionContext.on('executionComplete', () => toastExecutionFailed.dismiss()) + projectStore.lsRpcConnection.client.onError((e) => + toastLspError.show(`Language server error: ${e}`), + ), + projectStore.executionContext.on('executionComplete', () => toastExecutionFailed.dismiss()) projectStore.executionContext.on('executionFailed', (e) => toastExecutionFailed.show(`Execution Failed: ${JSON.stringify(e)}`), ) diff --git a/app/gui2/src/components/GraphEditor/upload.ts b/app/gui2/src/components/GraphEditor/upload.ts index 779d0f73b4f1..fce12ebb069a 100644 --- a/app/gui2/src/components/GraphEditor/upload.ts +++ b/app/gui2/src/components/GraphEditor/upload.ts @@ -1,13 +1,14 @@ import { Awareness } from '@/stores/awareness' import { Vec2 } from '@/util/data/vec2' +import type { DataServer } from '@/util/net/dataServer' import { Keccak, sha3_224 as SHA3 } from '@noble/hashes/sha3' import type { Hash } from '@noble/hashes/utils' import { bytesToHex } from '@noble/hashes/utils' import { escapeTextLiteral } from 'shared/ast' -import type { DataServer } from 'shared/dataServer' import type { LanguageServer } from 'shared/languageServer' import { ErrorCode, RemoteRpcError } from 'shared/languageServer' import type { ContentRoot, Path, StackItem, Uuid } from 'shared/languageServerTypes' +import { Err, Ok, withContext, type Result } from 'shared/util/data/result' import { markRaw, toRaw } from 'vue' // === Constants === @@ -54,8 +55,8 @@ export class Uploader { } static async Create( - rpc: Promise, - binary: Promise, + rpc: LanguageServer, + binary: DataServer, contentRoots: Promise, awareness: Awareness, file: File, @@ -68,8 +69,8 @@ export class Uploader { const projectRootId = roots.find((root) => root.type == 'Project') if (!projectRootId) throw new Error('Could not find project root, uploading not possible.') const instance = new Uploader( - await rpc, - await binary, + rpc, + binary, awareness, file, projectRootId.id, @@ -81,7 +82,7 @@ export class Uploader { return instance } - async upload(): Promise { + async upload(): Promise> { // This non-standard property is defined in Electron. if ( this.isOnLocalBackend && @@ -89,18 +90,20 @@ export class Uploader { 'path' in this.file && typeof this.file.path === 'string' ) { - return { source: 'FileSystemRoot', name: this.file.path } + return Ok({ source: 'FileSystemRoot', name: this.file.path }) } - await this.ensureDataDirExists() + const dataDirExists = await this.ensureDataDirExists() + if (!dataDirExists.ok) return dataDirExists const name = await this.pickUniqueName(this.file.name) - this.awareness.addOrUpdateUpload(name, { + if (!name.ok) return name + this.awareness.addOrUpdateUpload(name.value, { sizePercentage: 0, position: this.position, stackItem: this.stackItem, }) - const remotePath: Path = { rootId: this.projectRootId, segments: [DATA_DIR_NAME, name] } + const remotePath: Path = { rootId: this.projectRootId, segments: [DATA_DIR_NAME, name.value] } const uploader = this - const cleanup = this.cleanup.bind(this, name) + const cleanup = this.cleanup.bind(this, name.value) const writableStream = new WritableStream({ async write(chunk: Uint8Array) { await uploader.binary.writeBytes(remotePath, uploader.uploadedBytes, false, chunk) @@ -108,7 +111,7 @@ export class Uploader { uploader.uploadedBytes += BigInt(chunk.length) const bytes = Number(uploader.uploadedBytes) const sizePercentage = Math.round((bytes / uploader.file.size) * 100) - uploader.awareness.addOrUpdateUpload(name, { + uploader.awareness.addOrUpdateUpload(name.value, { sizePercentage, position: uploader.position, stackItem: uploader.stackItem, @@ -116,8 +119,6 @@ export class Uploader { }, async close() { cleanup() - // Disabled until https://github.com/enso-org/enso/issues/6691 is fixed. - // uploader.assertChecksum(remotePath) }, async abort(reason: string) { cleanup() @@ -125,21 +126,27 @@ export class Uploader { throw new Error(`Uploading process aborted. ${reason}`) }, }) + // Disabled until https://github.com/enso-org/enso/issues/6691 is fixed. + // Plus, handle the error here, as it should be displayed to the user. + // uploader.assertChecksum(remotePath) await this.file.stream().pipeTo(writableStream) - return { source: 'Project', name } + return Ok({ source: 'Project', name: name.value }) } private cleanup(name: string) { this.awareness.removeUpload(name) } - private async assertChecksum(path: Path) { + private async assertChecksum(path: Path): Promise> { const engineChecksum = await this.rpc.fileChecksum(path) + if (!engineChecksum.ok) return engineChecksum const hexChecksum = bytesToHex(this.checksum.digest()) - if (hexChecksum != engineChecksum.checksum) { - throw new Error( - `Uploading file failed, checksum does not match. ${hexChecksum} != ${engineChecksum.checksum}`, + if (hexChecksum != engineChecksum.value.checksum) { + return Err( + `Uploading file failed, checksum does not match. ${hexChecksum} != ${engineChecksum.value.checksum}`, ) + } else { + return Ok() } } @@ -149,38 +156,38 @@ export class Uploader { private async ensureDataDirExists() { const exists = await this.dataDirExists() - if (exists) return - await this.rpc.createFile({ - type: 'Directory', - name: DATA_DIR_NAME, - path: { rootId: this.projectRootId, segments: [] }, - }) + if (!exists.ok) return exists + if (exists.value) return Ok() + return await withContext( + () => 'When creating directory for uploaded file', + async () => { + return await this.rpc.createFile({ + type: 'Directory', + name: DATA_DIR_NAME, + path: { rootId: this.projectRootId, segments: [] }, + }) + }, + ) } - private async dataDirExists(): Promise { - try { - const info = await this.rpc.fileInfo(this.dataDirPath()) - return info.attributes.kind.type == 'Directory' - } catch (error) { - if ( - typeof error === 'object' && - error && - 'cause' in error && - error.cause instanceof RemoteRpcError - ) { - if ( - error.cause.code === ErrorCode.FILE_NOT_FOUND || - error.cause.code === ErrorCode.CONTENT_ROOT_NOT_FOUND - ) - return false - } - throw error + private async dataDirExists(): Promise> { + const info = await this.rpc.fileInfo(this.dataDirPath()) + if (info.ok) return Ok(info.value.attributes.kind.type == 'Directory') + else if ( + info.error.payload.cause instanceof RemoteRpcError && + (info.error.payload.cause.code === ErrorCode.FILE_NOT_FOUND || + info.error.payload.cause.code === ErrorCode.CONTENT_ROOT_NOT_FOUND) + ) { + return Ok(false) + } else { + return info } } - private async pickUniqueName(suggestedName: string): Promise { + private async pickUniqueName(suggestedName: string): Promise> { const files = await this.rpc.listFiles(this.dataDirPath()) - const existingNames = new Set(files.paths.map((path) => path.name)) + if (!files.ok) return files + const existingNames = new Set(files.value.paths.map((path) => path.name)) const { stem, extension = '' } = splitFilename(suggestedName) let candidate = suggestedName let num = 1 @@ -188,7 +195,7 @@ export class Uploader { candidate = `${stem}_${num}.${extension}` num += 1 } - return candidate + return Ok(candidate) } } diff --git a/app/gui2/src/components/GraphEditor/widgets/WidgetFunction.vue b/app/gui2/src/components/GraphEditor/widgets/WidgetFunction.vue index 754b86bf18d4..f8d621de3bfe 100644 --- a/app/gui2/src/components/GraphEditor/widgets/WidgetFunction.vue +++ b/app/gui2/src/components/GraphEditor/widgets/WidgetFunction.vue @@ -13,7 +13,8 @@ import { functionCallConfiguration, } from '@/providers/widgetRegistry/configuration' import { useGraphStore } from '@/stores/graph' -import { useProjectStore, type NodeVisualizationConfiguration } from '@/stores/project' +import { useProjectStore } from '@/stores/project' +import { type NodeVisualizationConfiguration } from '@/stores/project/executionContext' import { entryQn } from '@/stores/suggestionDatabase/entry' import { assert, assertUnreachable } from '@/util/assert' import { Ast } from '@/util/ast' diff --git a/app/gui2/src/stores/project/computedValueRegistry.ts b/app/gui2/src/stores/project/computedValueRegistry.ts index 5924a3e6319f..71bde46d607f 100644 --- a/app/gui2/src/stores/project/computedValueRegistry.ts +++ b/app/gui2/src/stores/project/computedValueRegistry.ts @@ -1,4 +1,4 @@ -import type { ExecutionContext } from '@/stores/project' +import type { ExecutionContext } from '@/stores/project/executionContext' import { ReactiveDb, ReactiveIndex } from '@/util/database/reactiveDb' import type { ExpressionId, diff --git a/app/gui2/src/stores/project/executionContext.ts b/app/gui2/src/stores/project/executionContext.ts new file mode 100644 index 000000000000..d1a154445d9f --- /dev/null +++ b/app/gui2/src/stores/project/executionContext.ts @@ -0,0 +1,342 @@ +import { isSome, type Opt } from '@/util/data/opt' +import { Err, Ok, type Result } from '@/util/data/result' +import { AsyncQueue, type AbortScope } from '@/util/net' +import * as array from 'lib0/array' +import * as object from 'lib0/object' +import { ObservableV2 } from 'lib0/observable' +import * as random from 'lib0/random' +import type { LanguageServer } from 'shared/languageServer' +import type { + ContextId, + Diagnostic, + ExecutionEnvironment, + ExplicitCall, + ExpressionId, + ExpressionUpdate, + StackItem, + Uuid, + VisualizationConfiguration, +} from 'shared/languageServerTypes' +import { exponentialBackoff } from 'shared/util/net' +import type { ExternalId } from 'shared/yjsModel' +import { reactive } from 'vue' + +export type NodeVisualizationConfiguration = Omit< + VisualizationConfiguration, + 'executionContextId' +> & { + expressionId: ExternalId +} + +function visualizationConfigEqual( + a: NodeVisualizationConfiguration, + b: NodeVisualizationConfiguration, +): boolean { + return ( + a === b || + (a.visualizationModule === b.visualizationModule && + (a.positionalArgumentsExpressions === b.positionalArgumentsExpressions || + (Array.isArray(a.positionalArgumentsExpressions) && + Array.isArray(b.positionalArgumentsExpressions) && + array.equalFlat(a.positionalArgumentsExpressions, b.positionalArgumentsExpressions))) && + (a.expression === b.expression || + (typeof a.expression === 'object' && + typeof b.expression === 'object' && + object.equalFlat(a.expression, b.expression)))) + ) +} + +interface ExecutionContextState { + lsRpc: LanguageServer + created: boolean + visualizations: Map + stack: StackItem[] +} + +type EntryPoint = Omit + +type ExecutionContextNotification = { + 'expressionUpdates'(updates: ExpressionUpdate[]): void + 'visualizationEvaluationFailed'( + visualizationId: Uuid, + expressionId: ExpressionId, + message: string, + diagnostic: Diagnostic | undefined, + ): void + 'executionFailed'(message: string): void + 'executionComplete'(): void + 'executionStatus'(diagnostics: Diagnostic[]): void + 'newVisualizationConfiguration'(configs: Set): void + 'visualizationsConfigured'(configs: Set): void +} + +/** + * Execution Context + * + * This class represent an execution context created in the Language Server. It creates + * it and pushes the initial frame upon construction. + * + * It hides the asynchronous nature of the language server. Each call is scheduled and + * run only when the previous call is done. + */ +export class ExecutionContext extends ObservableV2 { + id: ContextId = random.uuidv4() as ContextId + queue: AsyncQueue + taskRunning = false + visSyncScheduled = false + desiredStack: StackItem[] = reactive([]) + visualizationConfigs: Map = new Map() + + constructor( + lsRpc: LanguageServer, + entryPoint: EntryPoint, + private abort: AbortScope, + ) { + super() + this.abort.handleDispose(this) + + this.queue = new AsyncQueue( + Promise.resolve({ + lsRpc, + created: false, + visualizations: new Map(), + stack: [], + }), + ) + this.registerHandlers() + this.create() + this.pushItem({ type: 'ExplicitCall', ...entryPoint }) + this.recompute() + } + + private async withBackoff(f: () => Promise>, message: string): Promise { + const result = await exponentialBackoff(f, { + onBeforeRetry: (error, _, delay) => { + if (this.abort.signal.aborted) return false + console.warn(`${error.message(message)}. Retrying after ${delay}ms...\n`) + }, + }) + if (result.ok) return result.value + else throw result.error + } + + private syncVisualizations() { + if (this.visSyncScheduled || this.abort.signal.aborted) return + this.visSyncScheduled = true + this.queue.pushTask(async (state) => { + this.visSyncScheduled = false + if (!state.created || this.abort.signal.aborted) return state + this.emit('newVisualizationConfiguration', [new Set(this.visualizationConfigs.keys())]) + const promises: Promise[] = [] + + const attach = (id: Uuid, config: NodeVisualizationConfiguration) => { + return this.withBackoff( + () => + state.lsRpc.attachVisualization(id, config.expressionId, { + executionContextId: this.id, + expression: config.expression, + visualizationModule: config.visualizationModule, + ...(config.positionalArgumentsExpressions ? + { positionalArgumentsExpressions: config.positionalArgumentsExpressions } + : {}), + }), + 'Failed to attach visualization', + ).then(() => { + state.visualizations.set(id, config) + }) + } + + const modify = (id: Uuid, config: NodeVisualizationConfiguration) => { + return this.withBackoff( + () => + state.lsRpc.modifyVisualization(id, { + executionContextId: this.id, + expression: config.expression, + visualizationModule: config.visualizationModule, + ...(config.positionalArgumentsExpressions ? + { positionalArgumentsExpressions: config.positionalArgumentsExpressions } + : {}), + }), + 'Failed to modify visualization', + ).then(() => { + state.visualizations.set(id, config) + }) + } + + const detach = (id: Uuid, config: NodeVisualizationConfiguration) => { + return this.withBackoff( + () => state.lsRpc.detachVisualization(id, config.expressionId, this.id), + 'Failed to detach visualization', + ).then(() => { + state.visualizations.delete(id) + }) + } + + // Attach new and update existing visualizations. + for (const [id, config] of this.visualizationConfigs) { + const previousConfig = state.visualizations.get(id) + if (previousConfig == null) { + promises.push(attach(id, config)) + } else if (!visualizationConfigEqual(previousConfig, config)) { + if (previousConfig.expressionId === config.expressionId) { + promises.push(modify(id, config)) + } else { + promises.push(detach(id, previousConfig).then(() => attach(id, config))) + } + } + } + + // Detach removed visualizations. + for (const [id, config] of state.visualizations) { + if (!this.visualizationConfigs.get(id)) { + promises.push(detach(id, config)) + } + } + const settled = await Promise.allSettled(promises) + + // Emit errors for failed requests. + const errors = settled + .map((result) => (result.status === 'rejected' ? result.reason : null)) + .filter(isSome) + if (errors.length > 0) { + console.error('Failed to synchronize visualizations:', errors) + } + + this.emit('visualizationsConfigured', [new Set(this.visualizationConfigs.keys())]) + + // State object was updated in-place in each successful promise. + return state + }) + } + + private pushItem(item: StackItem) { + this.desiredStack.push(item) + this.queue.pushTask(async (state) => { + if (!state.created) return state + await this.withBackoff( + () => state.lsRpc.pushExecutionContextItem(this.id, item), + 'Failed to push item to execution context stack', + ) + state.stack.push(item) + return state + }) + } + + push(expressionId: ExpressionId) { + this.pushItem({ type: 'LocalCall', expressionId }) + } + + pop() { + if (this.desiredStack.length === 1) { + console.debug('Cannot pop last item from execution context stack') + return + } + this.desiredStack.pop() + this.queue.pushTask(async (state) => { + if (!state.created) return state + if (state.stack.length === 1) { + console.debug('Cannot pop last item from execution context stack') + return state + } + await this.withBackoff( + () => state.lsRpc.popExecutionContextItem(this.id), + 'Failed to pop item from execution context stack', + ) + state.stack.pop() + return state + }) + } + + async setVisualization(id: Uuid, configuration: Opt) { + if (configuration == null) { + this.visualizationConfigs.delete(id) + } else { + this.visualizationConfigs.set(id, configuration) + } + this.syncVisualizations() + } + + private create() { + this.queue.pushTask(async (state) => { + if (state.created) return state + return this.withBackoff(async () => { + const result = await state.lsRpc.createExecutionContext(this.id) + if (!result.ok) return result + if (result.value.contextId !== this.id) { + return Err('Unexpected Context ID returned by the language server.') + } + state.lsRpc.retain() + return Ok({ ...state, created: true }) + }, 'Failed to create execution context') + }) + } + + private registerHandlers() { + this.queue.pushTask(async (state) => { + this.abort.handleObserve(state.lsRpc, 'executionContext/expressionUpdates', (event) => { + if (event.contextId == this.id) this.emit('expressionUpdates', [event.updates]) + }) + this.abort.handleObserve(state.lsRpc, 'executionContext/executionFailed', (event) => { + if (event.contextId == this.id) this.emit('executionFailed', [event.message]) + }) + this.abort.handleObserve(state.lsRpc, 'executionContext/executionComplete', (event) => { + if (event.contextId == this.id) this.emit('executionComplete', []) + }) + this.abort.handleObserve(state.lsRpc, 'executionContext/executionStatus', (event) => { + if (event.contextId == this.id) this.emit('executionStatus', [event.diagnostics]) + }) + this.abort.handleObserve( + state.lsRpc, + 'executionContext/visualizationEvaluationFailed', + (event) => { + if (event.contextId == this.id) + this.emit('visualizationEvaluationFailed', [ + event.visualizationId, + event.expressionId, + event.message, + event.diagnostic, + ]) + }, + ) + return state + }) + } + + recompute( + expressionIds: 'all' | ExternalId[] = 'all', + executionEnvironment?: ExecutionEnvironment, + ) { + this.queue.pushTask(async (state) => { + if (!state.created) return state + await state.lsRpc.recomputeExecutionContext(this.id, expressionIds, executionEnvironment) + return state + }) + } + + getStackBottom(): StackItem { + return this.desiredStack[0]! + } + + getStackTop(): StackItem { + return this.desiredStack[this.desiredStack.length - 1]! + } + + setExecutionEnvironment(mode: ExecutionEnvironment) { + this.queue.pushTask(async (state) => { + await state.lsRpc.setExecutionEnvironment(this.id, mode) + return state + }) + } + + dispose() { + this.queue.pushTask(async (state) => { + if (!state.created) return state + const result = await state.lsRpc.destroyExecutionContext(this.id) + if (!result.ok) { + result.error.log('Failed to destroy execution context') + } + state.lsRpc.release() + return { ...state, created: false } + }) + } +} diff --git a/app/gui2/src/stores/project/index.ts b/app/gui2/src/stores/project/index.ts index 00affe48cce0..2adc8955f020 100644 --- a/app/gui2/src/stores/project/index.ts +++ b/app/gui2/src/stores/project/index.ts @@ -1,43 +1,26 @@ import { injectGuiConfig, type GuiConfig } from '@/providers/guiConfig' import { Awareness } from '@/stores/awareness' import { ComputedValueRegistry } from '@/stores/project/computedValueRegistry' +import { + ExecutionContext, + type NodeVisualizationConfiguration, +} from '@/stores/project/executionContext' import { VisualizationDataRegistry } from '@/stores/project/visualizationDataRegistry' import { attachProvider, useObserveYjs } from '@/util/crdt' import { nextEvent } from '@/util/data/observable' -import { isSome, type Opt } from '@/util/data/opt' +import { type Opt } from '@/util/data/opt' import { Err, Ok, type Result } from '@/util/data/result' import { ReactiveMapping } from '@/util/database/reactiveDb' -import { - AsyncQueue, - createRpcTransport, - createWebsocketClient, - rpcWithRetries as lsRpcWithRetries, - useAbortScope, -} from '@/util/net' +import { createDataWebsocket, createRpcTransport, useAbortScope } from '@/util/net' +import { DataServer } from '@/util/net/dataServer' import { tryQualifiedName } from '@/util/qualifiedName' -import { Client, RequestManager } from '@open-rpc/client-js' import { computedAsync } from '@vueuse/core' -import * as array from 'lib0/array' -import * as object from 'lib0/object' -import { ObservableV2 } from 'lib0/observable' import * as random from 'lib0/random' import { defineStore } from 'pinia' import { OutboundPayload, VisualizationUpdate } from 'shared/binaryProtocol' -import { DataServer } from 'shared/dataServer' import { LanguageServer } from 'shared/languageServer' -import type { - ContentRoot, - ContextId, - Diagnostic, - ExecutionEnvironment, - ExplicitCall, - ExpressionId, - ExpressionUpdate, - MethodPointer, - StackItem, - VisualizationConfiguration, -} from 'shared/languageServerTypes' -import type { AbortScope } from 'shared/util/net' +import type { Diagnostic, ExpressionId, MethodPointer } from 'shared/languageServerTypes' +import { type AbortScope } from 'shared/util/net' import { DistributedProject, localUserActionOrigins, @@ -48,7 +31,6 @@ import { computed, markRaw, onScopeDispose, - reactive, ref, shallowRef, watch, @@ -75,362 +57,21 @@ function resolveLsUrl(config: GuiConfig): LsUrls { throw new Error('Incomplete engine configuration') } -async function initializeLsRpcConnection( - clientId: Uuid, - url: string, - abort: AbortScope, -): Promise<{ - connection: LanguageServer - contentRoots: ContentRoot[] -}> { +function createLsRpcConnection(clientId: Uuid, url: string, abort: AbortScope): LanguageServer { const transport = createRpcTransport(url) - const requestManager = new RequestManager([transport]) - const client = new Client(requestManager) - const connection = new LanguageServer(client) + const connection = new LanguageServer(clientId, transport) abort.onAbort(() => connection.release()) - const initialization = await lsRpcWithRetries(() => connection.initProtocolConnection(clientId), { - onBeforeRetry: (error, _, delay) => { - console.warn( - `Failed to initialize language server connection, retrying after ${delay}ms...\n`, - error, - ) - }, - }).catch((error) => { - console.error('Error initializing Language Server RPC:', error) - throw error - }) - const contentRoots = initialization.contentRoots - return { connection, contentRoots } + return connection } -async function initializeDataConnection(clientId: Uuid, url: string, abort: AbortScope) { - const client = createWebsocketClient(url, abort, { binaryType: 'arraybuffer', sendPings: false }) - const connection = new DataServer(client, abort) +function initializeDataConnection(clientId: Uuid, url: string, abort: AbortScope) { + const client = createDataWebsocket(url, 'arraybuffer') + const connection = new DataServer(clientId, client, abort) + abort.handleDispose(connection) onScopeDispose(() => connection.dispose()) - await connection.initialize(clientId).catch((error) => { - console.error('Error initializing data connection:', error) - throw error - }) return connection } -export type NodeVisualizationConfiguration = Omit< - VisualizationConfiguration, - 'executionContextId' -> & { - expressionId: ExternalId -} - -interface ExecutionContextState { - lsRpc: LanguageServer - created: boolean - visualizations: Map - stack: StackItem[] -} - -function visualizationConfigEqual( - a: NodeVisualizationConfiguration, - b: NodeVisualizationConfiguration, -): boolean { - return ( - a === b || - (a.visualizationModule === b.visualizationModule && - (a.positionalArgumentsExpressions === b.positionalArgumentsExpressions || - (Array.isArray(a.positionalArgumentsExpressions) && - Array.isArray(b.positionalArgumentsExpressions) && - array.equalFlat(a.positionalArgumentsExpressions, b.positionalArgumentsExpressions))) && - (a.expression === b.expression || - (typeof a.expression === 'object' && - typeof b.expression === 'object' && - object.equalFlat(a.expression, b.expression)))) - ) -} - -type EntryPoint = Omit - -type ExecutionContextNotification = { - 'expressionUpdates'(updates: ExpressionUpdate[]): void - 'visualizationEvaluationFailed'( - visualizationId: Uuid, - expressionId: ExpressionId, - message: string, - diagnostic: Diagnostic | undefined, - ): void - 'executionFailed'(message: string): void - 'executionComplete'(): void - 'executionStatus'(diagnostics: Diagnostic[]): void - 'newVisualizationConfiguration'(configs: Set): void - 'visualizationsConfigured'(configs: Set): void -} - -/** - * Execution Context - * - * This class represent an execution context created in the Language Server. It creates - * it and pushes the initial frame upon construction. - * - * It hides the asynchronous nature of the language server. Each call is scheduled and - * run only when the previous call is done. - */ -export class ExecutionContext extends ObservableV2 { - id: ContextId = random.uuidv4() as ContextId - queue: AsyncQueue - taskRunning = false - visSyncScheduled = false - desiredStack: StackItem[] = reactive([]) - visualizationConfigs: Map = new Map() - - constructor( - lsRpc: Promise, - entryPoint: EntryPoint, - private abort: AbortScope, - ) { - super() - this.abort.handleDispose(this) - - this.queue = new AsyncQueue( - lsRpc.then((lsRpc) => ({ - lsRpc, - created: false, - visualizations: new Map(), - stack: [], - })), - ) - this.registerHandlers() - this.create() - this.pushItem({ type: 'ExplicitCall', ...entryPoint }) - this.recompute() - } - - private withBackoff(f: () => Promise, message: string): Promise { - return lsRpcWithRetries(f, { - onBeforeRetry: (error, _, delay) => { - if (this.abort.signal.aborted) return false - console.warn( - `${message}: ${error.payload.cause.message}. Retrying after ${delay}ms...\n`, - error, - ) - }, - }) - } - - private syncVisualizations() { - if (this.visSyncScheduled || this.abort.signal.aborted) return - this.visSyncScheduled = true - this.queue.pushTask(async (state) => { - this.visSyncScheduled = false - if (!state.created || this.abort.signal.aborted) return state - this.emit('newVisualizationConfiguration', [new Set(this.visualizationConfigs.keys())]) - const promises: Promise[] = [] - - const attach = (id: Uuid, config: NodeVisualizationConfiguration) => { - return this.withBackoff( - () => - state.lsRpc.attachVisualization(id, config.expressionId, { - executionContextId: this.id, - expression: config.expression, - visualizationModule: config.visualizationModule, - ...(config.positionalArgumentsExpressions ? - { positionalArgumentsExpressions: config.positionalArgumentsExpressions } - : {}), - }), - 'Failed to attach visualization', - ).then(() => { - state.visualizations.set(id, config) - }) - } - - const modify = (id: Uuid, config: NodeVisualizationConfiguration) => { - return this.withBackoff( - () => - state.lsRpc.modifyVisualization(id, { - executionContextId: this.id, - expression: config.expression, - visualizationModule: config.visualizationModule, - ...(config.positionalArgumentsExpressions ? - { positionalArgumentsExpressions: config.positionalArgumentsExpressions } - : {}), - }), - 'Failed to modify visualization', - ).then(() => { - state.visualizations.set(id, config) - }) - } - - const detach = (id: Uuid, config: NodeVisualizationConfiguration) => { - return this.withBackoff( - () => state.lsRpc.detachVisualization(id, config.expressionId, this.id), - 'Failed to detach visualization', - ).then(() => { - state.visualizations.delete(id) - }) - } - - // Attach new and update existing visualizations. - for (const [id, config] of this.visualizationConfigs) { - const previousConfig = state.visualizations.get(id) - if (previousConfig == null) { - promises.push(attach(id, config)) - } else if (!visualizationConfigEqual(previousConfig, config)) { - if (previousConfig.expressionId === config.expressionId) { - promises.push(modify(id, config)) - } else { - promises.push(detach(id, previousConfig).then(() => attach(id, config))) - } - } - } - - // Detach removed visualizations. - for (const [id, config] of state.visualizations) { - if (!this.visualizationConfigs.get(id)) { - promises.push(detach(id, config)) - } - } - const settled = await Promise.allSettled(promises) - - // Emit errors for failed requests. - const errors = settled - .map((result) => (result.status === 'rejected' ? result.reason : null)) - .filter(isSome) - if (errors.length > 0) { - console.error('Failed to synchronize visualizations:', errors) - } - - this.emit('visualizationsConfigured', [new Set(this.visualizationConfigs.keys())]) - - // State object was updated in-place in each successful promise. - return state - }) - } - - private pushItem(item: StackItem) { - this.desiredStack.push(item) - this.queue.pushTask(async (state) => { - if (!state.created) return state - await this.withBackoff( - () => state.lsRpc.pushExecutionContextItem(this.id, item), - 'Failed to push item to execution context stack', - ) - state.stack.push(item) - return state - }) - } - - push(expressionId: ExpressionId) { - this.pushItem({ type: 'LocalCall', expressionId }) - } - - pop() { - if (this.desiredStack.length === 1) { - console.debug('Cannot pop last item from execution context stack') - return - } - this.desiredStack.pop() - this.queue.pushTask(async (state) => { - if (!state.created) return state - if (state.stack.length === 1) { - console.debug('Cannot pop last item from execution context stack') - return state - } - await this.withBackoff( - () => state.lsRpc.popExecutionContextItem(this.id), - 'Failed to pop item from execution context stack', - ) - state.stack.pop() - return state - }) - } - - async setVisualization(id: Uuid, configuration: Opt) { - if (configuration == null) { - this.visualizationConfigs.delete(id) - } else { - this.visualizationConfigs.set(id, configuration) - } - this.syncVisualizations() - } - - private create() { - this.queue.pushTask(async (state) => { - if (state.created) return state - return this.withBackoff(async () => { - const result = await state.lsRpc.createExecutionContext(this.id) - if (result.contextId !== this.id) { - throw new Error('Unexpected Context ID returned by the language server.') - } - state.lsRpc.retain() - return { ...state, created: true } - }, 'Failed to create execution context') - }) - } - - private registerHandlers() { - this.queue.pushTask(async (state) => { - this.abort.handleObserve(state.lsRpc, 'executionContext/expressionUpdates', (event) => { - if (event.contextId == this.id) this.emit('expressionUpdates', [event.updates]) - }) - this.abort.handleObserve(state.lsRpc, 'executionContext/executionFailed', (event) => { - if (event.contextId == this.id) this.emit('executionFailed', [event.message]) - }) - this.abort.handleObserve(state.lsRpc, 'executionContext/executionComplete', (event) => { - if (event.contextId == this.id) this.emit('executionComplete', []) - }) - this.abort.handleObserve(state.lsRpc, 'executionContext/executionStatus', (event) => { - if (event.contextId == this.id) this.emit('executionStatus', [event.diagnostics]) - }) - this.abort.handleObserve( - state.lsRpc, - 'executionContext/visualizationEvaluationFailed', - (event) => { - if (event.contextId == this.id) - this.emit('visualizationEvaluationFailed', [ - event.visualizationId, - event.expressionId, - event.message, - event.diagnostic, - ]) - }, - ) - return state - }) - } - - recompute( - expressionIds: 'all' | ExternalId[] = 'all', - executionEnvironment?: ExecutionEnvironment, - ) { - this.queue.pushTask(async (state) => { - if (!state.created) return state - await state.lsRpc.recomputeExecutionContext(this.id, expressionIds, executionEnvironment) - return state - }) - } - - getStackBottom(): StackItem { - return this.desiredStack[0]! - } - - getStackTop(): StackItem { - return this.desiredStack[this.desiredStack.length - 1]! - } - - setExecutionEnvironment(mode: ExecutionEnvironment) { - this.queue.pushTask(async (state) => { - await state.lsRpc.setExecutionEnvironment(this.id, mode) - return state - }) - } - - dispose() { - this.queue.pushTask(async (state) => { - if (!state.created) return state - await state.lsRpc.destroyExecutionContext(this.id) - state.lsRpc.release() - return { ...state, created: false } - }) - } -} - /** * The project store synchronizes and holds the open project-related data. The synchronization is * performed using a CRDT data types from Yjs. Once the data is synchronized with a "LS bridge" @@ -451,21 +92,8 @@ export const useProjectStore = defineStore('project', () => { const clientId = random.uuidv4() as Uuid const lsUrls = resolveLsUrl(config.value) - const initializedConnection = initializeLsRpcConnection(clientId, lsUrls.rpcUrl, abort) - const lsRpcConnection = initializedConnection.then( - ({ connection }) => connection, - (error) => { - console.error('Error getting Language Server connection:', error) - throw error - }, - ) - const contentRoots = initializedConnection.then( - ({ contentRoots }) => contentRoots, - (error) => { - console.error('Error getting content roots:', error) - throw error - }, - ) + const lsRpcConnection = createLsRpcConnection(clientId, lsUrls.rpcUrl, abort) + const contentRoots = lsRpcConnection.contentRoots const dataConnection = initializeDataConnection(clientId, lsUrls.dataUrl, abort) const rpcUrl = new URL(lsUrls.rpcUrl) @@ -563,14 +191,9 @@ export const useProjectStore = defineStore('project', () => { ) } - const firstExecution = lsRpcConnection.then( - (lsRpc) => - nextEvent(lsRpc, 'executionContext/executionComplete').catch((error) => { - console.error('First execution failed:', error) - throw error - }), + const firstExecution = nextEvent(lsRpcConnection, 'executionContext/executionComplete').catch( (error) => { - console.error('Could not get Language Server for first execution:', error) + console.error('First execution failed:', error) throw error }, ) @@ -597,13 +220,7 @@ export const useProjectStore = defineStore('project', () => { { immediate: true, flush: 'post' }, ) - return computed(() => { - const json = visualizationDataRegistry.getRawData(id) - if (!json?.ok) return json ?? undefined - const parsed = Ok(JSON.parse(json.value)) - markRaw(parsed) - return parsed - }) + return computed(() => parseVisualizationData(visualizationDataRegistry.getRawData(id))) } const dataflowErrors = new ReactiveMapping(computedValueRegistry.db, (id, info) => { @@ -648,37 +265,52 @@ export const useProjectStore = defineStore('project', () => { function executeExpression( expressionId: ExternalId, expression: string, - ): Promise | null> { + ): Promise | null> { return new Promise((resolve) => { - Promise.all([lsRpcConnection, dataConnection]).then(([lsRpc, data]) => { - const visualizationId = random.uuidv4() as Uuid - const dataHandler = (visData: VisualizationUpdate, uuid: Uuid | null) => { - if (uuid === visualizationId) { - const dataStr = visData.dataString() - resolve(dataStr != null ? Ok(dataStr) : null) - data.off(`${OutboundPayload.VISUALIZATION_UPDATE}`, dataHandler) - executionContext.off('visualizationEvaluationFailed', errorHandler) - } + const visualizationId = random.uuidv4() as Uuid + const dataHandler = (visData: VisualizationUpdate, uuid: Uuid | null) => { + if (uuid === visualizationId) { + dataConnection.off(`${OutboundPayload.VISUALIZATION_UPDATE}`, dataHandler) + executionContext.off('visualizationEvaluationFailed', errorHandler) + const dataStr = Ok(visData.dataString()) + resolve(parseVisualizationData(dataStr)) } - const errorHandler = ( - uuid: Uuid, - _expressionId: ExpressionId, - message: string, - _diagnostic: Diagnostic | undefined, - ) => { - if (uuid == visualizationId) { - resolve(Err(message)) - data.off(`${OutboundPayload.VISUALIZATION_UPDATE}`, dataHandler) - executionContext.off('visualizationEvaluationFailed', errorHandler) - } + } + const errorHandler = ( + uuid: Uuid, + _expressionId: ExpressionId, + message: string, + _diagnostic: Diagnostic | undefined, + ) => { + if (uuid == visualizationId) { + resolve(Err(message)) + dataConnection.off(`${OutboundPayload.VISUALIZATION_UPDATE}`, dataHandler) + executionContext.off('visualizationEvaluationFailed', errorHandler) } - data.on(`${OutboundPayload.VISUALIZATION_UPDATE}`, dataHandler) - executionContext.on('visualizationEvaluationFailed', errorHandler) - lsRpc.executeExpression(executionContext.id, visualizationId, expressionId, expression) - }) + } + dataConnection.on(`${OutboundPayload.VISUALIZATION_UPDATE}`, dataHandler) + executionContext.on('visualizationEvaluationFailed', errorHandler) + return lsRpcConnection.executeExpression( + executionContext.id, + visualizationId, + expressionId, + expression, + ) }) } + function parseVisualizationData(data: Result | null): Result | null { + if (!data?.ok) return data + if (data.value == null) return null + try { + return Ok(markRaw(JSON.parse(data.value))) + } catch (error) { + if (error instanceof SyntaxError) + return Err(`Parsing visualization result failed: ${error.message}`) + else throw error + } + } + const { executionMode } = setupSettings(projectModel) function disposeYDocsProvider() { diff --git a/app/gui2/src/stores/project/visualizationDataRegistry.ts b/app/gui2/src/stores/project/visualizationDataRegistry.ts index cba55a9c09a6..7a10c5ddb202 100644 --- a/app/gui2/src/stores/project/visualizationDataRegistry.ts +++ b/app/gui2/src/stores/project/visualizationDataRegistry.ts @@ -1,7 +1,7 @@ -import type { ExecutionContext } from '@/stores/project' +import type { ExecutionContext } from '@/stores/project/executionContext' import { Err, Ok, type Result } from '@/util/data/result' +import type { DataServer } from '@/util/net/dataServer' import { OutboundPayload, VisualizationUpdate } from 'shared/binaryProtocol' -import type { DataServer } from 'shared/dataServer' import type { Diagnostic, ExpressionId, @@ -24,21 +24,19 @@ export class VisualizationDataRegistry { /** This map stores only keys representing attached visualization. The responses for * executeExpression are handled by project store's `executeExpression` method. */ private visualizationValues: Map | null> - private dataServer: Promise + private dataServer: DataServer private executionContext: ExecutionContext private reconfiguredHandler = this.visualizationsConfigured.bind(this) private dataHandler = this.visualizationUpdate.bind(this) private errorHandler = this.visualizationError.bind(this) - constructor(executionContext: ExecutionContext, dataServer: Promise) { + constructor(executionContext: ExecutionContext, dataServer: DataServer) { this.executionContext = executionContext this.dataServer = dataServer this.visualizationValues = reactive(new Map()) this.executionContext.on('newVisualizationConfiguration', this.reconfiguredHandler) - this.dataServer.then((data) => { - data.on(`${OutboundPayload.VISUALIZATION_UPDATE}`, this.dataHandler) - }) + dataServer.on(`${OutboundPayload.VISUALIZATION_UPDATE}`, this.dataHandler) this.executionContext.on('visualizationEvaluationFailed', this.errorHandler) } @@ -88,9 +86,7 @@ export class VisualizationDataRegistry { dispose() { this.executionContext.off('visualizationsConfigured', this.reconfiguredHandler) - this.dataServer.then((data) => { - data.off(`${OutboundPayload.VISUALIZATION_UPDATE}`, this.dataHandler) - }) + this.dataServer.off(`${OutboundPayload.VISUALIZATION_UPDATE}`, this.dataHandler) this.executionContext.off('visualizationEvaluationFailed', this.errorHandler) } } diff --git a/app/gui2/src/stores/suggestionDatabase/index.ts b/app/gui2/src/stores/suggestionDatabase/index.ts index ddadf55fe517..2167683ea601 100644 --- a/app/gui2/src/stores/suggestionDatabase/index.ts +++ b/app/gui2/src/stores/suggestionDatabase/index.ts @@ -2,7 +2,7 @@ import { useProjectStore } from '@/stores/project' import { entryQn, type SuggestionEntry, type SuggestionId } from '@/stores/suggestionDatabase/entry' import { applyUpdates, entryFromLs } from '@/stores/suggestionDatabase/lsUpdate' import { ReactiveDb, ReactiveIndex } from '@/util/database/reactiveDb' -import { AsyncQueue, rpcWithRetries } from '@/util/net' +import { AsyncQueue } from '@/util/net' import { normalizeQualifiedName, qnJoin, @@ -13,6 +13,7 @@ import { import { defineStore } from 'pinia' import { LanguageServer } from 'shared/languageServer' import type { MethodPointer } from 'shared/languageServerTypes' +import { exponentialBackoff } from 'shared/util/net' import { markRaw, ref, type Ref } from 'vue' export class SuggestionDb extends ReactiveDb { @@ -72,10 +73,13 @@ class Synchronizer { public groups: Ref, ) { const projectStore = useProjectStore() - const initState = projectStore.lsRpcConnection.then(async (lsRpc) => { - await rpcWithRetries(() => - lsRpc.acquireCapability('search/receivesSuggestionsDatabaseUpdates', {}), - ) + const lsRpc = projectStore.lsRpcConnection + const initState = exponentialBackoff(() => + lsRpc.acquireCapability('search/receivesSuggestionsDatabaseUpdates', {}), + ).then((capability) => { + if (!capability.ok) { + capability.error.log('Will not receive database updates') + } this.setupUpdateHandler(lsRpc) this.loadGroups(lsRpc, projectStore.firstExecution) return Synchronizer.loadDatabase(entries, lsRpc, groups.value) @@ -89,8 +93,14 @@ class Synchronizer { lsRpc: LanguageServer, groups: Group[], ): Promise<{ currentVersion: number }> { - const initialDb = await lsRpc.getSuggestionsDatabase() - for (const lsEntry of initialDb.entries) { + const initialDb = await exponentialBackoff(() => lsRpc.getSuggestionsDatabase()) + if (!initialDb.ok) { + initialDb.error.log( + 'Cannot load initial suggestion database. Continuing with empty suggestion database', + ) + return { currentVersion: 0 } + } + for (const lsEntry of initialDb.value.entries) { const entry = entryFromLs(lsEntry.suggestion, groups) if (!entry.ok) { entry.error.log() @@ -99,7 +109,7 @@ class Synchronizer { entries.set(lsEntry.id, entry.value) } } - return { currentVersion: initialDb.currentVersion } + return { currentVersion: initialDb.value.currentVersion } } private setupUpdateHandler(lsRpc: LanguageServer) { @@ -132,8 +142,12 @@ class Synchronizer { private async loadGroups(lsRpc: LanguageServer, firstExecution: Promise) { this.queue.pushTask(async ({ currentVersion }) => { await firstExecution - const groups = await lsRpc.getComponentGroups() - this.groups.value = groups.componentGroups.map( + const groups = await exponentialBackoff(() => lsRpc.getComponentGroups()) + if (!groups.ok) { + groups.error.log('Cannot read component groups. Continuing without gruops.') + return { currentVersion } + } + this.groups.value = groups.value.componentGroups.map( (group): Group => ({ name: group.name, ...(group.color ? { color: group.color } : {}), diff --git a/app/gui2/src/stores/visualization/compilerMessaging.ts b/app/gui2/src/stores/visualization/compilerMessaging.ts index f3739ea06abe..e47b0703d0e8 100644 --- a/app/gui2/src/stores/visualization/compilerMessaging.ts +++ b/app/gui2/src/stores/visualization/compilerMessaging.ts @@ -19,8 +19,8 @@ import { VisualizationModule } from '@/stores/visualization/runtimeTypes' import { assertNever } from '@/util/assert' import { toError } from '@/util/data/error' import type { Opt } from '@/util/data/opt' +import type { DataServer } from '@/util/net/dataServer' import { Error as DataError } from 'shared/binaryProtocol' -import type { DataServer } from 'shared/dataServer' import type { Uuid } from 'shared/languageServerTypes' import * as vue from 'vue' diff --git a/app/gui2/src/stores/visualization/index.ts b/app/gui2/src/stores/visualization/index.ts index b25d2ff53ad3..7395f79af9b9 100644 --- a/app/gui2/src/stores/visualization/index.ts +++ b/app/gui2/src/stores/visualization/index.ts @@ -24,7 +24,6 @@ import type { VisualizationModule } from '@/stores/visualization/runtimeTypes' import type { Opt } from '@/util/data/opt' import { isUrlString } from '@/util/data/urlString' import { isIconName } from '@/util/iconName' -import { rpcWithRetries } from '@/util/net' import { defineStore } from 'pinia' import { ErrorCode, LsRpcError, RemoteRpcError } from 'shared/languageServer' import type { Event as LSEvent, VisualizationConfiguration } from 'shared/languageServerTypes' @@ -214,21 +213,20 @@ export const useVisualizationStore = defineStore('visualization', () => { console.error('Could not load custom visualizations: Project directory not found.') return } - try { - await ls.watchFiles(projectRoot, [customVisualizationsDirectory], onFileEvent, rpcWithRetries) - .promise - } catch (error) { + const watching = await ls.watchFiles(projectRoot, [customVisualizationsDirectory], onFileEvent) + .promise + if (!watching.ok) { if ( - error instanceof LsRpcError && - error.cause instanceof RemoteRpcError && - error.cause.code === ErrorCode.FILE_NOT_FOUND + watching.error.payload instanceof LsRpcError && + watching.error.payload.cause instanceof RemoteRpcError && + watching.error.payload.cause.code === ErrorCode.FILE_NOT_FOUND ) { console.info( "'visualizations/' folder not found in project directory. " + "If you have custom visualizations, please put them under 'visualizations/'.", ) } else { - throw error + watching.error.log('Could not load custom visualizations') } } }) diff --git a/app/gui2/src/util/__tests__/net.test.ts b/app/gui2/src/util/__tests__/net.test.ts index f46d7fa728f4..672881e37a31 100644 --- a/app/gui2/src/util/__tests__/net.test.ts +++ b/app/gui2/src/util/__tests__/net.test.ts @@ -1,5 +1,4 @@ -import { Err, Ok, ResultError } from '@/util/data/result' -import { AsyncQueue, exponentialBackoff } from '@/util/net' +import { AsyncQueue } from '@/util/net' import { wait } from 'lib0/promise' import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest' @@ -51,67 +50,3 @@ describe('AsyncQueue', () => { expect(await queue.waitForCompletion()).toBe(5) }) }) - -describe('exponentialBackoff', () => { - test('runs successful task once', async () => { - const task = vi.fn(async () => Ok(1)) - const result = await exponentialBackoff(task) - expect(result).toEqual({ ok: true, value: 1 }) - expect(task).toHaveBeenCalledTimes(1) - }) - - test('retry failing task up to a limit', async () => { - const task = vi.fn(async () => Err(1)) - const promise = exponentialBackoff(task, { maxRetries: 4 }) - vi.runAllTimersAsync() - const result = await promise - expect(result).toEqual({ ok: false, error: new ResultError(1) }) - expect(task).toHaveBeenCalledTimes(5) - }) - - test('wait before retrying', async () => { - const task = vi.fn(async () => Err(null)) - exponentialBackoff(task, { - maxRetries: 10, - retryDelay: 100, - retryDelayMultiplier: 3, - retryDelayMax: 1000, - }) - expect(task).toHaveBeenCalledTimes(1) - await vi.advanceTimersByTimeAsync(100) - expect(task).toHaveBeenCalledTimes(2) - await vi.advanceTimersByTimeAsync(300) - expect(task).toHaveBeenCalledTimes(3) - await vi.advanceTimersByTimeAsync(900) - expect(task).toHaveBeenCalledTimes(4) - await vi.advanceTimersByTimeAsync(5000) - expect(task).toHaveBeenCalledTimes(9) - }) - - test('retry task until success', async () => { - const task = vi.fn() - task.mockReturnValueOnce(Promise.resolve(Err(3))) - task.mockReturnValueOnce(Promise.resolve(Err(2))) - task.mockReturnValueOnce(Promise.resolve(Ok(1))) - const promise = exponentialBackoff(task) - vi.runAllTimersAsync() - const result = await promise - expect(result).toEqual({ ok: true, value: 1 }) - expect(task).toHaveBeenCalledTimes(3) - }) - - test('call retry callback', async () => { - const task = vi.fn() - task.mockReturnValueOnce(Promise.resolve(Err(3))) - task.mockReturnValueOnce(Promise.resolve(Err(2))) - task.mockReturnValueOnce(Promise.resolve(Ok(1))) - const onBeforeRetry = vi.fn() - - const promise = exponentialBackoff(task, { onBeforeRetry }) - vi.runAllTimersAsync() - await promise - expect(onBeforeRetry).toHaveBeenCalledTimes(2) - expect(onBeforeRetry).toHaveBeenNthCalledWith(1, new ResultError(3), 0, 1000) - expect(onBeforeRetry).toHaveBeenNthCalledWith(2, new ResultError(2), 1, 2000) - }) -}) diff --git a/app/gui2/src/util/ast/match.ts b/app/gui2/src/util/ast/match.ts index 4f5cf4d5a58a..a9ff19965a98 100644 --- a/app/gui2/src/util/ast/match.ts +++ b/app/gui2/src/util/ast/match.ts @@ -39,6 +39,11 @@ export class Pattern { return matches } + /** Check if the given expression matches the pattern */ + test(target: Ast.Ast): boolean { + return this.match(target) != null + } + /** Create a new concrete example of the pattern, with the placeholders replaced with the given subtrees. */ instantiate(edit: MutableModule, subtrees: Ast.Owned[]): Ast.Owned { const template = edit.copy(this.template) diff --git a/app/gui2/src/util/net.ts b/app/gui2/src/util/net.ts index bc0fb67b15a4..2f7f32173f43 100644 --- a/app/gui2/src/util/net.ts +++ b/app/gui2/src/util/net.ts @@ -1,107 +1,37 @@ -import { ResultError, rejectionToResult, type Result } from '@/util/data/result' -import { WebSocketTransport } from '@open-rpc/client-js' import type { IJSONRPCNotificationResponse, JSONRPCRequestData, } from '@open-rpc/client-js/build/Request' import { Transport } from '@open-rpc/client-js/build/transports/Transport' import { type ArgumentsType } from '@vueuse/core' -import { wait } from 'lib0/promise' -import { LsRpcError } from 'shared/languageServer' +import { WebSocket as ReconnectingWebSocket } from 'partysocket' +import { type WebSocketEventMap } from 'partysocket/ws' import type { Notifications } from 'shared/languageServerTypes' -import { AbortScope } from 'shared/util/net' -import { WebsocketClient } from 'shared/websocket' +import { AbortScope, type ReconnectingTransportWithWebsocketEvents } from 'shared/util/net' +import ReconnectingWebSocketTransport from 'shared/util/net/ReconnectingWSTransport' import { onScopeDispose } from 'vue' -export interface BackoffOptions { - maxRetries?: number - retryDelay?: number - retryDelayMultiplier?: number - retryDelayMax?: number - /** - * Called when the promise return an error result, and the next retry is about to be attempted. - * When this function returns `false`, the backoff is immediately aborted. When this function is - * not provided, the backoff will always continue until the maximum number of retries is reached. - */ - onBeforeRetry?: (error: ResultError, retryCount: number, delay: number) => boolean | void -} - -const defaultBackoffOptions: Required> = { - maxRetries: 3, - retryDelay: 1000, - retryDelayMultiplier: 2, - retryDelayMax: 10000, - onBeforeRetry: () => {}, -} - -/** - * Retry a failing promise function with exponential backoff. - */ -export async function exponentialBackoff( - f: () => Promise>, - backoffOptions?: BackoffOptions, -): Promise> { - const options = { ...defaultBackoffOptions, ...backoffOptions } - for ( - let retries = 0, delay = options.retryDelay; - ; - retries += 1, delay = Math.min(options.retryDelayMax, delay * options.retryDelayMultiplier) - ) { - const result = await f() - if ( - result.ok || - retries >= options.maxRetries || - options.onBeforeRetry(result.error, retries, delay) === false - ) { - return result - } - await wait(delay) - } -} +export { AbortScope } from 'shared/util/net' -export const lsRequestResult = rejectionToResult(LsRpcError) - -/** - * Retry a failing Language Server RPC call with exponential backoff. The provided async function is - * called on each retry. - */ -export async function rpcWithRetries( - f: () => Promise, - backoffOptions?: BackoffOptions, -): Promise { - const result = await exponentialBackoff(() => lsRequestResult(f()), backoffOptions) - if (result.ok) return result.value - else { - console.error('Too many failed retries.') - throw result.error - } -} - -type QueueTask = (state: State) => Promise - -export function createRpcTransport(url: string): Transport { +export function createRpcTransport(url: string): ReconnectingTransportWithWebsocketEvents { if (url.startsWith('mock://')) { const mockName = url.slice('mock://'.length) return new MockTransport(mockName) } else { - const transport = new WebSocketTransport(url) + const transport = new ReconnectingWebSocketTransport(url) return transport } } -export function createWebsocketClient( - url: string, - abort: AbortScope, - options?: { binaryType?: 'arraybuffer' | 'blob' | null; sendPings?: boolean }, -): WebsocketClient { +export function createDataWebsocket(url: string, binaryType: 'arraybuffer' | 'blob'): WebSocket { if (url.startsWith('mock://')) { - const mockWs = new MockWebSocketClient(url, abort) - if (options?.binaryType) mockWs.binaryType = options.binaryType + const mockWs = new MockWebSocket(url, url.slice('mock://'.length)) + mockWs.binaryType = binaryType return mockWs } else { - const client = new WebsocketClient(url, abort, options) - client.connect() - return client + const websocket = new ReconnectingWebSocket(url) + websocket.binaryType = binaryType + return websocket as WebSocket } } @@ -111,6 +41,7 @@ export interface MockTransportData { export class MockTransport extends Transport { static mocks: Map = new Map() + private openEventListeners = new Set<(event: WebSocketEventMap['open']) => void>() constructor(public name: string) { super() } @@ -119,8 +50,10 @@ export class MockTransport extends Transport { MockTransport.mocks.set(name, data as any) } connect(): Promise { + for (const listener of this.openEventListeners) listener(new Event('open')) return Promise.resolve() } + reconnect() {} close(): void {} sendData(data: JSONRPCRequestData, timeout?: number | null): Promise { if (Array.isArray(data)) return Promise.all(data.map((d) => this.sendData(d.request, timeout))) @@ -136,6 +69,15 @@ export class MockTransport extends Transport { params, } as IJSONRPCNotificationResponse) } + + on(type: K, cb: (event: WebSocketEventMap[K]) => void): void { + if (type === 'open') + this.openEventListeners.add(cb as (event: WebSocketEventMap['open']) => void) + } + off(type: K, cb: (event: WebSocketEventMap[K]) => void): void { + if (type === 'open') + this.openEventListeners.delete(cb as (event: WebSocketEventMap['open']) => void) + } } export interface WebSocketHandler { @@ -188,12 +130,7 @@ export class MockWebSocket extends EventTarget implements WebSocket { } } -export class MockWebSocketClient extends WebsocketClient { - constructor(url: string, abort: AbortScope) { - super(url, abort) - super.connect(new MockWebSocket(url, url.slice('mock://'.length))) - } -} +type QueueTask = (state: State) => Promise /** * A serializing queue of asynchronous tasks transforming a state. Each task is a function that diff --git a/app/gui2/shared/dataServer.ts b/app/gui2/src/util/net/dataServer.ts similarity index 76% rename from app/gui2/shared/dataServer.ts rename to app/gui2/src/util/net/dataServer.ts index c6ca7d729652..f410c749f469 100644 --- a/app/gui2/shared/dataServer.ts +++ b/app/gui2/src/util/net/dataServer.ts @@ -1,6 +1,5 @@ import { ObservableV2 } from 'lib0/observable' import * as random from 'lib0/random' -import type { Path as LSPath } from 'shared/languageServerTypes' import { Builder, ByteBuffer, @@ -28,11 +27,12 @@ import { type AnyInboundPayload, type Offset, type Table, -} from './binaryProtocol' -import type { AbortScope } from './util/net' -import { uuidFromBits, uuidToBits } from './uuid' -import type { WebsocketClient } from './websocket' -import type { Uuid } from './yjsModel' +} from 'shared/binaryProtocol' +import type { Path as LSPath } from 'shared/languageServerTypes' +import { Err, Ok, type Result } from 'shared/util/data/result' +import { exponentialBackoff, type AbortScope } from 'shared/util/net' +import { uuidFromBits, uuidToBits } from 'shared/uuid' +import type { Uuid } from 'shared/yjsModel' const PAYLOAD_CONSTRUCTOR = { [OutboundPayload.NONE]: None, @@ -53,27 +53,20 @@ export type DataServerEvents = { } export class DataServer extends ObservableV2 { - initialized = false - ready: Promise - clientId!: string + initialized: Promise> + private initializationScheduled = false resolveCallbacks = new Map void>() /** `websocket.binaryType` should be `ArrayBuffer`. */ constructor( - public websocket: WebsocketClient, + public clientId: string, + public websocket: WebSocket, abort: AbortScope, ) { super() abort.handleDispose(this) - if (websocket.connected) { - this.ready = Promise.resolve() - } else { - this.ready = new Promise((resolve, reject) => { - websocket.on('connect', () => resolve()) - websocket.on('disconnect', reject) - }) - } - websocket.on('message', (rawPayload) => { + + websocket.addEventListener('message', ({ data: rawPayload }) => { if (!(rawPayload instanceof ArrayBuffer)) { console.warn('Data Server: Data type was invalid:', rawPayload) // Ignore all non-binary messages. If the messages are `Blob`s instead, this is a @@ -98,25 +91,57 @@ export class DataServer extends ObservableV2 { this.emit(`${payloadType}`, [payload, uuid]) } }) + websocket.addEventListener('error', (error) => + console.error('Language Server Binary socket error:', error), + ) + websocket.addEventListener('close', () => { + this.scheduleInitializationAfterConnect() + }) + + this.initialized = this.initialize() } dispose() { + this.websocket.close() this.resolveCallbacks.clear() } - async initialize(clientId: Uuid) { - if (!this.initialized) { - this.clientId = clientId - await this.ready - await this.initSession() - } + private scheduleInitializationAfterConnect() { + if (this.initializationScheduled) return this.initialized + this.initializationScheduled = true + this.initialized = new Promise((resolve) => { + const cb = () => { + this.websocket.removeEventListener('open', cb) + this.initializationScheduled = false + resolve(this.initialize()) + } + this.websocket.addEventListener('open', cb) + }) + return this.initialized } - protected send( + private initialize() { + return exponentialBackoff(() => this.initSession().then(responseAsResult), { + onBeforeRetry: (error, _, delay) => { + console.warn( + `Failed to initialize language server binary connection, retrying after ${delay}ms...\n`, + error, + ) + }, + }).then((result) => { + if (!result.ok) { + result.error.log('Error initializing Language Server Binary Protocol') + return result + } else return Ok() + }) + } + + protected async send( builder: Builder, payloadType: InboundPayload, payloadOffset: Offset, - ): Promise { + waitForInit: boolean = true, + ): Promise { const messageUuid = random.uuidv4() const rootTable = InboundMessage.createInboundMessage( builder, @@ -125,10 +150,16 @@ export class DataServer extends ObservableV2 { payloadType, payloadOffset, ) - const promise = new Promise((resolve) => { + if (waitForInit) { + const initResult = await this.initialized + if (!initResult.ok) { + return initResult.error.payload + } + } + this.websocket.send(builder.finish(rootTable).toArrayBuffer()) + const promise = new Promise((resolve) => { this.resolveCallbacks.set(messageUuid, resolve) }) - this.websocket.send(builder.finish(rootTable).toArrayBuffer()) return promise } @@ -143,7 +174,7 @@ export class DataServer extends ObservableV2 { builder, this.createUUID(this.clientId), ) - return this.send(builder, InboundPayload.INIT_SESSION_CMD, commandOffset) + return this.send(builder, InboundPayload.INIT_SESSION_CMD, commandOffset, false) } async writeFile( @@ -213,3 +244,8 @@ export class DataServer extends ObservableV2 { return await this.send(builder, InboundPayload.WRITE_BYTES_CMD, command) } } + +function responseAsResult(resp: T | Error): Result { + if (resp instanceof Error) return Err(resp) + else return Ok(resp) +} diff --git a/app/gui2/ydoc-server/languageServerSession.ts b/app/gui2/ydoc-server/languageServerSession.ts index a523d2462706..4353907c7ef2 100644 --- a/app/gui2/ydoc-server/languageServerSession.ts +++ b/app/gui2/ydoc-server/languageServerSession.ts @@ -1,4 +1,3 @@ -import { Client, RequestManager, WebSocketTransport } from '@open-rpc/client-js' import * as json from 'lib0/json' import * as map from 'lib0/map' import { ObservableV2 } from 'lib0/observable' @@ -8,9 +7,18 @@ import * as Ast from '../shared/ast' import { astCount } from '../shared/ast' import { EnsoFileParts, combineFileParts, splitFileContents } from '../shared/ensoFile' import { LanguageServer, computeTextChecksum } from '../shared/languageServer' -import { Checksum, FileEdit, Path, TextEdit, response } from '../shared/languageServerTypes' -import { exponentialBackoff, printingCallbacks } from '../shared/retry' -import { AbortScope } from '../shared/util/net' +import { + Checksum, + FileEdit, + FileEventKind, + Path, + TextEdit, + response, +} from '../shared/languageServerTypes' +import { assertNever } from '../shared/util/assert' +import { Err, Ok, Result, withContext } from '../shared/util/data/result' +import { AbortScope, exponentialBackoff, printingCallbacks } from '../shared/util/net' +import ReconnectingWebSocketTransport from '../shared/util/net/ReconnectingWSTransport' import { DistributedProject, ExternalId, @@ -34,22 +42,12 @@ const EXTENSION = '.enso' const DEBUG_LOG_SYNC = false -function createOpenRPCClient(url: string) { - const transport = new WebSocketTransport(url) - const requestManager = new RequestManager([transport]) - transport.connection.on('error', (error) => - console.error('Language Server transport error:', error), - ) - return new Client(requestManager) -} - export class LanguageServerSession { clientId: Uuid indexDoc: WSSharedDoc docs: Map retainCount: number url: string - client: Client ls: LanguageServer connection: response.InitProtocolConnection | undefined model: DistributedProject @@ -78,9 +76,9 @@ export class LanguageServerSession { if (!persistence) continue } }) - const { client, ls } = this.setupClient() - this.client = client - this.ls = ls + this.ls = new LanguageServer(this.clientId, new ReconnectingWebSocketTransport(this.url)) + this.clientScope.onAbort(() => this.ls.release()) + this.setupClient() } static sessions = new Map() @@ -95,28 +93,52 @@ export class LanguageServerSession { } private restartClient() { - this.clientScope.dispose('Client restarted.') - this.clientScope = new AbortScope() - this.connection = undefined - this.setupClient() + this.ls.reconnect() + return exponentialBackoff(() => this.readInitialState()) } private setupClient() { - this.client = createOpenRPCClient(this.url) - this.ls = new LanguageServer(this.client) - this.clientScope.onAbort(() => this.ls.release()) this.ls.on('file/event', async (event) => { if (DEBUG_LOG_SYNC) { console.log('file/event', event) } + const result = await this.handleFileEvent(event) + if (!result.ok) this.restartClient() + }) + this.ls.on('text/fileModifiedOnDisk', async (event) => { const path = event.path.segments.join('/') - try { + const result = await exponentialBackoff( + async () => this.tryGetExistingModuleModel(event.path)?.reload() ?? Ok(), + printingCallbacks(`reloaded file '${path}'`, `reload file '${path}'`), + ) + if (!result.ok) this.restartClient() + }) + exponentialBackoff( + () => this.readInitialState(), + printingCallbacks('read initial state', 'read initial state'), + ).then((result) => { + if (!result.ok) { + result.error.log('Could not read initial state') + exponentialBackoff( + async () => this.restartClient(), + printingCallbacks('restarted RPC client', 'restart RPC client'), + ) + } + }) + } + + private handleFileEvent(event: { path: Path; kind: FileEventKind }): Promise> { + return withContext( + () => 'Handling file/event', + async () => { + const path = event.path.segments.join('/') switch (event.kind) { case 'Added': { if (isSourceFile(event.path)) { const fileInfo = await this.ls.fileInfo(event.path) - if (fileInfo.attributes.kind.type == 'File') { - await exponentialBackoff( + if (!fileInfo.ok) return fileInfo + if (fileInfo.value.attributes.kind.type == 'File') { + return await exponentialBackoff( () => this.getModuleModel(event.path).open(), printingCallbacks(`opened new file '${path}'`, `open new file '${path}'`), ) @@ -125,74 +147,59 @@ export class LanguageServerSession { break } case 'Modified': { - await exponentialBackoff( - async () => this.tryGetExistingModuleModel(event.path)?.reload(), + return await exponentialBackoff( + () => this.tryGetExistingModuleModel(event.path)?.reload() ?? Promise.resolve(Ok()), printingCallbacks(`reloaded file '${path}'`, `reload file '${path}'`), ) - break } } - } catch { - this.restartClient() - } - }) - this.ls.on('text/fileModifiedOnDisk', async (event) => { - const path = event.path.segments.join('/') - try { - await exponentialBackoff( - async () => this.tryGetExistingModuleModel(event.path)?.reload(), - printingCallbacks(`reloaded file '${path}'`, `reload file '${path}'`), - ) - } catch { - this.restartClient() - } - }) - exponentialBackoff( - () => this.readInitialState(), - printingCallbacks('read initial state', 'read initial state'), - ).catch((error) => { - console.error('Could not read initial state.') - console.error(error) - exponentialBackoff( - async () => this.restartClient(), - printingCallbacks('restarted RPC client', 'restart RPC client'), - ) - }) - return { client: this.client, ls: this.ls } + return Ok() + }, + ) } private assertProjectRoot(): asserts this is { projectRootId: Uuid } { if (this.projectRootId == null) throw new Error('Missing project root') } - private async readInitialState() { - let moduleOpenPromises: Promise[] = [] - try { - const connection = this.connection ?? (await this.ls.initProtocolConnection(this.clientId)) - this.connection = connection - const projectRoot = connection.contentRoots.find((root) => root.type === 'Project') - if (!projectRoot) throw new Error('Missing project root') - this.projectRootId = projectRoot.id - await this.ls.acquireReceivesTreeUpdates({ rootId: this.projectRootId, segments: [] }) - const files = await this.scanSourceFiles() - moduleOpenPromises = this.indexDoc.doc.transact( - () => - files.map((file) => this.getModuleModel(pushPathSegment(file.path, file.name)).open()), - this, - ) - await Promise.all(moduleOpenPromises) - } catch (error) { - console.error('LS initialization failed.') - throw error - } - console.log('LS connection initialized.') + private async readInitialState(): Promise> { + return await withContext( + () => 'When reading initial state', + async () => { + let moduleOpenPromises: Promise>[] = [] + const projectRoot = (await this.ls.contentRoots).find((root) => root.type === 'Project') + if (!projectRoot) return Err('Missing project root') + this.projectRootId = projectRoot.id + const aquireResult = await this.ls.acquireReceivesTreeUpdates({ + rootId: this.projectRootId, + segments: [], + }) + if (!aquireResult.ok) return aquireResult + const files = await this.scanSourceFiles() + if (!files.ok) return files + moduleOpenPromises = this.indexDoc.doc.transact( + () => + files.value.map((file) => + this.getModuleModel(pushPathSegment(file.path, file.name)).open(), + ), + this, + ) + const results = await Promise.all(moduleOpenPromises) + return results.find((res) => !res.ok) ?? Ok() + }, + ) } async scanSourceFiles() { this.assertProjectRoot() const sourceDir: Path = { rootId: this.projectRootId, segments: [SOURCE_DIR] } const srcModules = await this.ls.listFiles(sourceDir) - return srcModules.paths.filter((file) => file.type === 'File' && file.name.endsWith(EXTENSION)) + if (!srcModules.ok) return srcModules + return Ok( + srcModules.value.paths.filter( + (file) => file.type === 'File' && file.name.endsWith(EXTENSION), + ), + ) } tryGetExistingModuleModel(path: Path): ModulePersistence | undefined { @@ -341,51 +348,68 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> { /** Set the current state to the given state while the callback is running. * Set the current state back to {@link LsSyncState.Synchronized} when the callback finishes. */ - private async withState(state: LsSyncState, callback: () => void | Promise) { + private async withState(state: LsSyncState, callback: () => void | Promise): Promise + private async withState( + state: LsSyncState, + callback: () => Result | Promise>, + ): Promise> + private async withState( + state: LsSyncState, + callback: () => void | Promise | Result | Promise>, + ): Promise | void> { this.setState(state) - await callback() + const result = await callback() + if (result && !result.ok) return result this.setState(LsSyncState.Synchronized) + if (result) return result } - async open() { - this.queuedAction = LsAction.Open - switch (this.state) { - case LsSyncState.Disposed: - case LsSyncState.WritingFile: - case LsSyncState.Synchronized: - case LsSyncState.WriteError: - case LsSyncState.Reloading: { - return - } - case LsSyncState.Closing: { - await this.lastAction - if (this.queuedAction === LsAction.Open) await this.open() - return - } - case LsSyncState.Opening: { - await this.lastAction - return - } - case LsSyncState.Closed: { - await this.withState(LsSyncState.Opening, async () => { - const promise = this.ls.openTextFile(this.path) - this.setLastAction(promise.catch(() => this.setState(LsSyncState.Closed))) - const result = await promise - if (!result.writeCapability) { - console.error('Could not acquire write capability for module:', this.path) - throw new Error( - `Could not acquire write capability for module '${this.path.segments.join('/')}'`, - ) + async open(): Promise> { + return await withContext( + () => `When opening module ${this.path}`, + async () => { + this.queuedAction = LsAction.Open + switch (this.state) { + case LsSyncState.Disposed: + case LsSyncState.WritingFile: + case LsSyncState.Synchronized: + case LsSyncState.WriteError: + case LsSyncState.Reloading: { + return Ok() } - this.syncFileContents(result.content, result.currentVersion) - }) - return - } - default: { - this.state satisfies never - return - } - } + case LsSyncState.Closing: { + await this.lastAction + if (this.queuedAction === LsAction.Open) return await this.open() + return Ok() + } + case LsSyncState.Opening: { + await this.lastAction + return Ok() + } + case LsSyncState.Closed: { + await this.withState(LsSyncState.Opening, async () => { + const promise = this.ls.openTextFile(this.path) + this.setLastAction( + promise.then((res) => !res.ok && this.setState(LsSyncState.Closed)), + ) + const result = await promise + if (!result.ok) return result + if (!result.value.writeCapability) { + return Err( + `Could not acquire write capability for module '${this.path.segments.join('/')}'`, + ) + } + this.syncFileContents(result.value.content, result.value.currentVersion) + return Ok() + }) + return Ok() + } + default: { + assertNever(this.state) + } + } + }, + ) } handleFileRemoved() { @@ -614,62 +638,61 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> { return } default: { - this.state satisfies never - return + assertNever(this.state) } } } - async reload() { - this.queuedAction = LsAction.Reload - switch (this.state) { - case LsSyncState.Opening: - case LsSyncState.Disposed: - case LsSyncState.Closed: - case LsSyncState.Closing: { - return - } - case LsSyncState.Reloading: { - await this.lastAction - return - } - case LsSyncState.WritingFile: { - await this.lastAction - if (this.queuedAction === LsAction.Reload) await this.reload() - return - } - case LsSyncState.Synchronized: { - this.withState(LsSyncState.Reloading, async () => { - const promise = Promise.all([ - this.ls.readFile(this.path), - this.ls.fileChecksum(this.path), - ]) - this.setLastAction(promise) - const [contents, checksum] = await promise - this.syncFileContents(contents.contents, checksum.checksum) - }) - return - } - case LsSyncState.WriteError: { - this.withState(LsSyncState.Reloading, async () => { - const path = this.path.segments.join('/') - const reloading = this.ls - .closeTextFile(this.path) - .catch((error) => { - console.error('Could not close file after write error:') - console.error(error) + async reload(): Promise> { + return await withContext( + () => `When reloading module ${this.path}`, + async () => { + this.queuedAction = LsAction.Reload + switch (this.state) { + case LsSyncState.Opening: + case LsSyncState.Disposed: + case LsSyncState.Closed: + case LsSyncState.Closing: { + return Ok() + } + case LsSyncState.Reloading: { + await this.lastAction + return Ok() + } + case LsSyncState.WritingFile: { + await this.lastAction + if (this.queuedAction === LsAction.Reload) return await this.reload() + return Ok() + } + case LsSyncState.Synchronized: { + return this.withState(LsSyncState.Reloading, async () => { + const promise = Promise.all([ + this.ls.readFile(this.path), + this.ls.fileChecksum(this.path), + ]) + this.setLastAction(promise) + const [contents, checksum] = await promise + if (!contents.ok) return contents + if (!checksum.ok) return checksum + this.syncFileContents(contents.value.contents, checksum.value.checksum) + return Ok() }) - .then( - () => - exponentialBackoff( + } + case LsSyncState.WriteError: { + return this.withState(LsSyncState.Reloading, async () => { + const path = this.path.segments.join('/') + const reloading = this.ls.closeTextFile(this.path).then(async (closing) => { + if (!closing.ok) closing.error.log('Could not close file after write error:') + return exponentialBackoff( async () => { const result = await this.ls.openTextFile(this.path) - if (!result.writeCapability) { - const message = `Could not acquire write capability for module '${this.path.segments.join( - '/', - )}'` - console.error(message) - throw new Error(message) + if (!result.ok) return result + if (!result.value.writeCapability) { + return Err( + `Could not acquire write capability for module '${this.path.segments.join( + '/', + )}'`, + ) } return result }, @@ -677,25 +700,22 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> { `opened file '${path}' for writing`, `open file '${path}' for writing`, ), - ), - (error) => { - console.error('Could not reopen file after write error:') - console.error(error) - // This error is unrecoverable. - throw error - }, - ) - this.setLastAction(reloading) - const result = await reloading - this.syncFileContents(result.content, result.currentVersion) - }) - return - } - default: { - this.state satisfies never - return - } - } + ) + }) + + this.setLastAction(reloading) + const result = await reloading + if (!result.ok) return result + this.syncFileContents(result.value.content, result.value.currentVersion) + return Ok() + }) + } + default: { + assertNever(this.state) + } + } + }, + ) } async dispose(): Promise { @@ -703,6 +723,9 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> { const alreadyClosed = this.inState(LsSyncState.Closing, LsSyncState.Closed) this.setState(LsSyncState.Disposed) if (alreadyClosed) return Promise.resolve() - return this.ls.closeTextFile(this.path) + const closing = await this.ls.closeTextFile(this.path) + if (!closing.ok) { + closing.error.log(`Closing text file ${this.path}`) + } } } diff --git a/package-lock.json b/package-lock.json index 1e5662d0527b..009de39d89b4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -52,6 +52,7 @@ "lib0": "^0.2.85", "magic-string": "^0.30.3", "murmurhash": "^2.0.1", + "partysocket": "^1.0.1", "pinia": "^2.1.7", "postcss-inline-svg": "^6.0.0", "postcss-nesting": "^12.0.1", @@ -11193,6 +11194,17 @@ "node": ">=4.0.0" } }, + "node_modules/event-target-shim": { + "version": "6.0.2", + "resolved": "https://registry.npmjs.org/event-target-shim/-/event-target-shim-6.0.2.tgz", + "integrity": "sha512-8q3LsZjRezbFZ2PN+uP+Q7pnHUMmAOziU2vA2OwoFaKIXxlxl38IylhSSgUorWu/rf4er67w0ikBqjBFk/pomA==", + "engines": { + "node": ">=10.13.0" + }, + "funding": { + "url": "https://github.com/sponsors/mysticatea" + } + }, "node_modules/events": { "version": "3.3.0", "resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz", @@ -15957,6 +15969,14 @@ "node": ">= 0.8" } }, + "node_modules/partysocket": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/partysocket/-/partysocket-1.0.1.tgz", + "integrity": "sha512-sSnLf9X0Oaxw0wXp0liKho0QQqStDJB5I4ViaqmtI4nHm6cpb2kUealErPrcQpYUF6zgTHzLQhIO++2tcJc59A==", + "dependencies": { + "event-target-shim": "^6.0.2" + } + }, "node_modules/pascal-case": { "version": "3.1.2", "resolved": "https://registry.npmjs.org/pascal-case/-/pascal-case-3.1.2.tgz",