From adda89528688ea55e33339787b2e950086ebfcc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Mar=C3=A9chal?= Date: Fri, 18 Sep 2020 13:07:02 -0400 Subject: [PATCH] file-watchers: use singleton server MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Theia spawns as many nsfw servers as there are clients, maybe more. This commit replaces this behaviour by centralizing everything into the same service process in order to optimize resources (watch handles are a limited resource on a given Linux host). Some notes on the current design: - Everytime someone requests to watch an `(uri, options)` couple we will re-use the same watch handle (and actual nsfw instance). This allows for resource allocation optimizations. - Since a given service can only serve a single client, I added an event dispatcher named `FileSystemWatcherServerDispatcher`. You must register yourself with some unique id you will use to make requests. - The new `.watchUriChanges` function now takes a `clientId` so that when events are sent to the dispatcher it then gets routed back to your own client. - Added a `--nsfw-watcher-verbose` backend argument to enable verbose logging for the watcher server. Signed-off-by: Paul Maréchal --- .../browser/api-samples-frontend-module.ts | 2 + .../sample-file-watching-contribution.ts | 50 ++ packages/core/src/common/promise-util.ts | 33 + packages/filesystem/compile.tsconfig.json | 3 + .../filesystem/src/browser/file-service.ts | 81 +-- .../src/common/filesystem-watcher-protocol.ts | 46 +- .../src/common/remote-file-system-provider.ts | 63 +- .../src/node/disk-file-system-provider.ts | 33 +- .../src/node/filesystem-backend-module.ts | 58 +- .../src/node/filesystem-watcher-client.ts | 87 ++- .../src/node/filesystem-watcher-dispatcher.ts | 82 +++ .../filesystem/src/node/nsfw-watcher/index.ts | 4 +- .../nsfw-filesystem-watcher.spec.ts | 14 +- .../nsfw-watcher/nsfw-filesystem-watcher.ts | 576 +++++++++++++----- .../filesystem/src/typings/nsfw/index.d.ts | 3 +- 15 files changed, 816 insertions(+), 319 deletions(-) create mode 100644 examples/api-samples/src/browser/file-watching/sample-file-watching-contribution.ts create mode 100644 packages/filesystem/src/node/filesystem-watcher-dispatcher.ts diff --git a/examples/api-samples/src/browser/api-samples-frontend-module.ts b/examples/api-samples/src/browser/api-samples-frontend-module.ts index 1a397ab0a4852..fc95418cdf58e 100644 --- a/examples/api-samples/src/browser/api-samples-frontend-module.ts +++ b/examples/api-samples/src/browser/api-samples-frontend-module.ts @@ -19,10 +19,12 @@ import { bindDynamicLabelProvider } from './label/sample-dynamic-label-provider- import { bindSampleUnclosableView } from './view/sample-unclosable-view-contribution'; import { bindSampleOutputChannelWithSeverity } from './output/sample-output-channel-with-severity'; import { bindSampleMenu } from './menu/sample-menu-contribution'; +import { bindSampleFileWatching } from './file-watching/sample-file-watching-contribution'; export default new ContainerModule(bind => { bindDynamicLabelProvider(bind); bindSampleUnclosableView(bind); bindSampleOutputChannelWithSeverity(bind); bindSampleMenu(bind); + bindSampleFileWatching(bind); }); diff --git a/examples/api-samples/src/browser/file-watching/sample-file-watching-contribution.ts b/examples/api-samples/src/browser/file-watching/sample-file-watching-contribution.ts new file mode 100644 index 0000000000000..62f04430da541 --- /dev/null +++ b/examples/api-samples/src/browser/file-watching/sample-file-watching-contribution.ts @@ -0,0 +1,50 @@ +/******************************************************************************** + * Copyright (C) 2020 Ericsson and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ + +import { injectable, inject, interfaces } from 'inversify'; +import { FrontendApplicationContribution, LabelProvider } from '@theia/core/lib/browser'; +import { FileService } from '@theia/filesystem/lib/browser/file-service'; +import { WorkspaceService } from '@theia/workspace/lib/browser'; + +export function bindSampleFileWatching(bind: interfaces.Bind): void { + bind(FrontendApplicationContribution).to(SampleFileWatchingContribution).inSingletonScope(); +} + +@injectable() +export class SampleFileWatchingContribution implements FrontendApplicationContribution { + + @inject(FileService) + protected readonly files: FileService; + + @inject(LabelProvider) + protected readonly label: LabelProvider; + + @inject(WorkspaceService) + protected readonly workspace: WorkspaceService; + + onStart(): void { + this.files.onDidFilesChange(event => { + // Get the workspace roots for the current frontend: + const roots = this.workspace.tryGetRoots(); + // Create some name to help find out which frontend logged the message: + const workspace = roots.length > 0 + ? roots.map(root => this.label.getName(root.resource)).join('+') + : ''; + console.log(`Sample File Watching: ${event.changes.length} files changed! ${workspace}`); + }); + } + +} diff --git a/packages/core/src/common/promise-util.ts b/packages/core/src/common/promise-util.ts index fc67564242844..0a317318edb52 100644 --- a/packages/core/src/common/promise-util.ts +++ b/packages/core/src/common/promise-util.ts @@ -59,3 +59,36 @@ export async function retry(task: () => Promise, delay: number, retries: n throw lastError; } + +export class AsyncLock { + + /** + * Reference to the last registered promise in the chain. + * + * We'll keep on adding with each call to `acquire` to queue other tasks. + */ + protected queue: Promise = Promise.resolve(); + + /** + * This method will queue the execution of `callback` behind previous calls to `acquire`. + * + * This is useful to ensure that only one task handles a resource at a time. + * + * @param callback task to execute once previous ones finished. + * @param args to pass to your callback. + * @returns callback's result. + */ + async acquire(callback: () => PromiseLike): Promise; + // eslint-disable-next-line space-before-function-paren, @typescript-eslint/no-explicit-any + async acquire(callback: (...args: U) => PromiseLike, ...args: U): Promise { + return new Promise((resolve, reject) => { + this.queue = this.queue.then( + // Execute the callbacks once previous tasks resolved. + // Resolve the outer promise to extract the callback's return value. + // Put these operations in the queue slot for later calls to be placed behind. + () => callback(...args).then(resolve, reject) + ); + }); + } + +} diff --git a/packages/filesystem/compile.tsconfig.json b/packages/filesystem/compile.tsconfig.json index 324e2d66da5ab..6bf6e997229f6 100644 --- a/packages/filesystem/compile.tsconfig.json +++ b/packages/filesystem/compile.tsconfig.json @@ -9,6 +9,9 @@ "mv": [ "src/typings/mv" ], + "nsfw": [ + "src/typings/nsfw" + ], "trash": [ "src/typings/trash" ] diff --git a/packages/filesystem/src/browser/file-service.ts b/packages/filesystem/src/browser/file-service.ts index de52a82846939..3164e16d4b881 100644 --- a/packages/filesystem/src/browser/file-service.ts +++ b/packages/filesystem/src/browser/file-service.ts @@ -1278,63 +1278,42 @@ export class FileService { */ readonly onDidFilesChange = this.onDidFilesChangeEmitter.event; - private activeWatchers = new Map(); - watch(resource: URI, options: WatchOptions = { recursive: false, excludes: [] }): Disposable { const resolvedOptions: WatchOptions = { ...options, // always ignore temporary upload files excludes: options.excludes.concat('**/theia_upload_*') }; - - let watchDisposed = false; - let watchDisposable = Disposable.create(() => watchDisposed = true); - - // Watch and wire in disposable which is async but - // check if we got disposed meanwhile and forward - this.doWatch(resource, resolvedOptions).then(disposable => { - if (watchDisposed) { - disposable.dispose(); - } else { - watchDisposable = disposable; - } - }, error => console.error(error)); - - return Disposable.create(() => watchDisposable.dispose()); - } - - async doWatch(resource: URI, options: WatchOptions): Promise { - const provider = await this.withProvider(resource); - const key = this.toWatchKey(provider, resource, options); - - // Only start watching if we are the first for the given key - const watcher = this.activeWatchers.get(key) || { count: 0, disposable: provider.watch(resource, options) }; - if (!this.activeWatchers.has(key)) { - this.activeWatchers.set(key, watcher); - } - - // Increment usage counter - watcher.count += 1; - - return Disposable.create(() => { - - // Unref - watcher.count--; - - // Dispose only when last user is reached - if (watcher.count === 0) { - watcher.disposable.dispose(); - this.activeWatchers.delete(key); - } - }); - } - - private toWatchKey(provider: FileSystemProvider, resource: URI, options: WatchOptions): string { - return [ - this.toMapKey(provider, resource), // lowercase path if the provider is case insensitive - String(options.recursive), // use recursive: true | false as part of the key - options.excludes.join() // use excludes as part of the key - ].join(); + /** + * Disposable handle. Can be disposed early (before the watcher is allocated.) + */ + const handle = { + disposed: false, + watcher: undefined as Disposable | undefined, + dispose(): void { + if (this.disposed) { + return; + } + if (this.watcher) { + this.watcher.dispose(); + this.watcher = undefined; + } + this.disposed = true; + }, + }; + this.withProvider(resource) + .then(provider => { + const watcher = provider.watch(resource, resolvedOptions); + // If the handle got disposed before allocation, stop here. + if (handle.disposed) { + watcher.dispose(); + } else { + handle.watcher = watcher; + } + }, error => { + console.error(error); + }); + return handle; } // #endregion diff --git a/packages/filesystem/src/common/filesystem-watcher-protocol.ts b/packages/filesystem/src/common/filesystem-watcher-protocol.ts index c2c3440afff55..29aa20eb31914 100644 --- a/packages/filesystem/src/common/filesystem-watcher-protocol.ts +++ b/packages/filesystem/src/common/filesystem-watcher-protocol.ts @@ -19,6 +19,47 @@ import { JsonRpcServer, JsonRpcProxy } from '@theia/core'; import { FileChangeType } from './files'; export { FileChangeType }; +export const FileSystemWatcherServer2 = Symbol('FileSystemWatcherServer2'); +/** + * Singleton implementation of the watch server. + * + * Since multiple clients all make requests to this service, we need to track those individually via a `clientId`. + */ +export interface FileSystemWatcherServer2 extends JsonRpcServer { + /** + * @param clientId arbitrary id used to identify a client. + * @param uri the path to watch. + * @param options optional parameters. + * @returns promise to a unique `number` handle for this request. + */ + watchFileChanges2(clientId: number, uri: string, options?: WatchOptions): Promise; + /** + * @param watcherId handle mapping to a previous `watchFileChanges` request. + */ + unwatchFileChanges2(watcherId: number): Promise; +} + +export interface FileSystemWatcherClient2 { + /** Listen for change events emitted by the watcher. */ + onDidFilesChanged2(event: DidFilesChangedParams2): void; + /** The watcher can crash in certain conditions. */ + onError2(event: FileSystemWatcherErrorParams2): void; +} + +export interface DidFilesChangedParams2 { + /** Clients to route the events to. */ + clients: number[]; + /** FileSystem changes that occured. */ + changes: FileChange[]; +} + +export interface FileSystemWatcherErrorParams2 { + /** Clients to route the events to. */ + clients: number[]; + /** The uri that originated the error. */ + uri: string; +} + export const FileSystemWatcherServer = Symbol('FileSystemWatcherServer'); export interface FileSystemWatcherServer extends JsonRpcServer { /** @@ -32,7 +73,7 @@ export interface FileSystemWatcherServer extends JsonRpcServer; + unwatchFileChanges(watcherId: number): Promise; } export interface FileSystemWatcherClient { @@ -63,6 +104,9 @@ export interface FileChange { export const FileSystemWatcherServerProxy = Symbol('FileSystemWatcherServerProxy'); export type FileSystemWatcherServerProxy = JsonRpcProxy; +/** + * @deprecated not used internally anymore. + */ @injectable() export class ReconnectingFileSystemWatcherServer implements FileSystemWatcherServer { diff --git a/packages/filesystem/src/common/remote-file-system-provider.ts b/packages/filesystem/src/common/remote-file-system-provider.ts index b823ec6b40e07..a9f121efd859d 100644 --- a/packages/filesystem/src/common/remote-file-system-provider.ts +++ b/packages/filesystem/src/common/remote-file-system-provider.ts @@ -27,7 +27,6 @@ import { } from './files'; import { JsonRpcServer, JsonRpcProxy, JsonRpcProxyFactory } from '@theia/core/lib/common/messaging/proxy-factory'; import { ApplicationError } from '@theia/core/lib/common/application-error'; -import { Deferred } from '@theia/core/lib/common/promise-util'; import type { TextDocumentContentChangeEvent } from 'vscode-languageserver-protocol'; import { newWriteableStream, ReadableStreamEvents } from '@theia/core/lib/common/stream'; import { CancellationToken, cancelled } from '@theia/core/lib/common/cancellation'; @@ -103,6 +102,11 @@ export class RemoteFileSystemProxyFactory extends JsonRpcProxy } } +/** + * Frontend component. + * + * Wraps the remote filesystem provider living on the backend side. + */ @injectable() export class RemoteFileSystemProvider implements Required, Disposable { @@ -129,6 +133,10 @@ export class RemoteFileSystemProvider implements Required, D ); protected watcherSequence = 0; + /** + * We'll track the currently allocated watchers, in order to re-allocate them + * with the same options once we reconnect to the backend after a disconnection. + */ protected readonly watchOptions = new Map, D private _capabilities: FileSystemProviderCapabilities = 0; get capabilities(): FileSystemProviderCapabilities { return this._capabilities; } - protected readonly deferredReady = new Deferred(); - get ready(): Promise { - return this.deferredReady.promise; - } + readonly ready: Promise; + /** + * Wrapped remote filesystem. + */ @inject(RemoteFileSystemServer) protected readonly server: JsonRpcProxy; @postConstruct() protected init(): void { - this.server.getCapabilities().then(capabilities => { + (this.ready as Promise) = this.server.getCapabilities().then(capabilities => { this._capabilities = capabilities; - this.deferredReady.resolve(undefined); - }, this.deferredReady.reject); + }); this.server.setClient({ notifyDidChangeFile: ({ changes }) => { this.onDidChangeFileEmitter.fire(changes.map(event => ({ resource: new URI(event.resource), type: event.type }))); @@ -286,19 +293,23 @@ export class RemoteFileSystemProvider implements Required, D } watch(resource: URI, options: WatchOptions): Disposable { - const watcher = this.watcherSequence++; + const watcherId = this.watcherSequence++; const uri = resource.toString(); - this.watchOptions.set(watcher, { uri, options }); - this.server.watch(watcher, uri, options); - + this.watchOptions.set(watcherId, { uri, options }); + this.server.watch(watcherId, uri, options); const toUnwatch = Disposable.create(() => { - this.watchOptions.delete(watcher); - this.server.unwatch(watcher); + this.watchOptions.delete(watcherId); + this.server.unwatch(watcherId); }); this.toDispose.push(toUnwatch); return toUnwatch; } + /** + * When a frontend disconnects (e.g. bad connection) the backend resources will be cleared. + * + * This means that we need to re-allocate the watchers when a frontend reconnects. + */ protected reconnect(): void { for (const [watcher, { uri, options }] of this.watchOptions.entries()) { this.server.watch(watcher, uri, options); @@ -308,6 +319,8 @@ export class RemoteFileSystemProvider implements Required, D } /** + * Backend component. + * * JSON-RPC server exposing a wrapped file system provider remotely. */ @injectable() @@ -315,6 +328,8 @@ export class FileSystemProviderServer implements RemoteFileSystemServer { private readonly BUFFER_SIZE = 64 * 1024; + protected watchers = new Map(); + protected readonly toDispose = new DisposableCollection(); dispose(): void { this.toDispose.dispose(); @@ -325,6 +340,9 @@ export class FileSystemProviderServer implements RemoteFileSystemServer { this.client = client; } + /** + * Wrapped file system provider. + */ @inject(FileSystemProvider) protected readonly provider: FileSystemProvider & Partial; @@ -450,18 +468,19 @@ export class FileSystemProviderServer implements RemoteFileSystemServer { throw new Error('not supported'); } - protected watchers = new Map(); - - async watch(req: number, resource: string, opts: WatchOptions): Promise { + async watch(requestedWatcherId: number, resource: string, opts: WatchOptions): Promise { + if (this.watchers.has(requestedWatcherId)) { + throw new Error('watcher id is already allocated!'); + } const watcher = this.provider.watch(new URI(resource), opts); - this.watchers.set(req, watcher); - this.toDispose.push(Disposable.create(() => this.unwatch(req))); + this.watchers.set(requestedWatcherId, watcher); + this.toDispose.push(Disposable.create(() => this.unwatch(requestedWatcherId))); } - async unwatch(req: number): Promise { - const watcher = this.watchers.get(req); + async unwatch(watcherId: number): Promise { + const watcher = this.watchers.get(watcherId); if (watcher) { - this.watchers.delete(req); + this.watchers.delete(watcherId); watcher.dispose(); } } diff --git a/packages/filesystem/src/node/disk-file-system-provider.ts b/packages/filesystem/src/node/disk-file-system-provider.ts index 341c6c6c60636..a73c11d6558c1 100644 --- a/packages/filesystem/src/node/disk-file-system-provider.ts +++ b/packages/filesystem/src/node/disk-file-system-provider.ts @@ -804,18 +804,35 @@ export class DiskFileSystemProvider implements Disposable, // #region File Watching watch(resource: URI, opts: WatchOptions): Disposable { - const toUnwatch = new DisposableCollection(Disposable.create(() => { /* mark as not disposed */ })); - this.watcher.watchFileChanges(resource.toString(), { + const watcherService = this.watcher; + /** + * Disposable handle. Can be disposed early (before the watcher is allocated.) + */ + const handle = { + disposed: false, + watcherId: undefined as number | undefined, + dispose(): void { + if (this.disposed) { + return; + } + if (typeof this.watcherId !== 'undefined') { + watcherService.unwatchFileChanges(this.watcherId); + } + this.disposed = true; + }, + }; + watcherService.watchFileChanges(resource.toString(), { + // Convert from `files.WatchOptions` to internal `watcher-protocol.WatchOptions`: ignored: opts.excludes - }).then(watcher => { - if (toUnwatch.disposed) { - this.watcher.unwatchFileChanges(watcher); + }).then(watcherId => { + if (handle.disposed) { + watcherService.unwatchFileChanges(watcherId); } else { - toUnwatch.push(Disposable.create(() => this.watcher.unwatchFileChanges(watcher))); + handle.watcherId = watcherId; } }); - this.toDispose.push(toUnwatch); - return toUnwatch; + this.toDispose.push(handle); + return handle; } // #endregion diff --git a/packages/filesystem/src/node/filesystem-backend-module.ts b/packages/filesystem/src/node/filesystem-backend-module.ts index 574ed49a513b2..163771ffe8db4 100644 --- a/packages/filesystem/src/node/filesystem-backend-module.ts +++ b/packages/filesystem/src/node/filesystem-backend-module.ts @@ -14,9 +14,10 @@ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 ********************************************************************************/ +import * as path from 'path'; import { ContainerModule, interfaces } from 'inversify'; import { ConnectionHandler, JsonRpcConnectionHandler, ILogger } from '@theia/core/lib/common'; -import { FileSystemWatcherServer } from '../common/filesystem-watcher-protocol'; +import { FileSystemWatcherServer, FileSystemWatcherServer2 } from '../common/filesystem-watcher-protocol'; import { FileSystemWatcherServerClient } from './filesystem-watcher-client'; import { NsfwFileSystemWatcherServer } from './nsfw-watcher/nsfw-filesystem-watcher'; import { MessagingService } from '@theia/core/lib/node/messaging/messaging-service'; @@ -28,25 +29,67 @@ import { } from '../common/remote-file-system-provider'; import { FileSystemProvider } from '../common/files'; import { EncodingService } from '@theia/core/lib/common/encoding-service'; +import { IPCConnectionProvider } from '@theia/core/lib/node'; +import { JsonRpcProxyFactory, ConnectionErrorHandler } from '@theia/core'; +import { FileSystemWatcherServer2Dispatcher } from './filesystem-watcher-dispatcher'; const SINGLE_THREADED = process.argv.indexOf('--no-cluster') !== -1; +const NSFW_WATCHER_VERBOSE = process.argv.indexOf('--nsfw-watcher-verbose') !== -1; export function bindFileSystemWatcherServer(bind: interfaces.Bind, { singleThreaded }: { singleThreaded: boolean } = { singleThreaded: SINGLE_THREADED }): void { - bind(NsfwOptions).toConstantValue({}); + bind(NsfwOptions).toConstantValue({ debounceMS: 100 }); + + bind(FileSystemWatcherServer2Dispatcher).toSelf().inSingletonScope(); + + bind(FileSystemWatcherServerClient).toSelf(); + bind(FileSystemWatcherServer).toService(FileSystemWatcherServerClient); if (singleThreaded) { - bind(FileSystemWatcherServer).toDynamicValue(ctx => { + // Bind and run the watch server in the current process: + bind(FileSystemWatcherServer2).toDynamicValue(ctx => { const logger = ctx.container.get(ILogger); const nsfwOptions = ctx.container.get(NsfwOptions); - return new NsfwFileSystemWatcherServer({ + const dispatcher = ctx.container.get(FileSystemWatcherServer2Dispatcher); + const server = new NsfwFileSystemWatcherServer({ nsfwOptions, + verbose: NSFW_WATCHER_VERBOSE, info: (message, ...args) => logger.info(message, ...args), error: (message, ...args) => logger.error(message, ...args) }); - }); + server.setClient(dispatcher); + return server; + }).inSingletonScope(); } else { - bind(FileSystemWatcherServerClient).toSelf(); - bind(FileSystemWatcherServer).toService(FileSystemWatcherServerClient); + // Run the watcher server in a child process. + // Bind to a proxy forwarding calls to the child process. + bind(FileSystemWatcherServer2).toDynamicValue(ctx => { + const serverName = 'nsfw-watcher'; + const logger = ctx.container.get(ILogger); + const nsfwOptions = ctx.container.get(NsfwOptions); + const ipcConnectionProvider = ctx.container.get(IPCConnectionProvider); + const dispatcher = ctx.container.get(FileSystemWatcherServer2Dispatcher); + const proxyFactory = new JsonRpcProxyFactory(); + const serverProxy = proxyFactory.createProxy(); + // We need to call `.setClient` before listening, else the JSON-RPC calls won't go through. + serverProxy.setClient(dispatcher); + const args: string[] = [ + `--nsfwOptions=${JSON.stringify(nsfwOptions)}` + ]; + if (NSFW_WATCHER_VERBOSE) { + args.push('--verbose'); + } + ipcConnectionProvider.listen({ + serverName, + entryPoint: path.resolve(__dirname, serverName), + errorHandler: new ConnectionErrorHandler({ + serverName, + logger, + }), + env: process.env, + args, + }, connection => proxyFactory.listen(connection)); + return serverProxy; + }).inSingletonScope(); } } @@ -65,7 +108,6 @@ export default new ContainerModule(bind => { return server; }, RemoteFileSystemProxyFactory) ).inSingletonScope(); - bind(NodeFileUploadService).toSelf().inSingletonScope(); bind(MessagingService.Contribution).toService(NodeFileUploadService); }); diff --git a/packages/filesystem/src/node/filesystem-watcher-client.ts b/packages/filesystem/src/node/filesystem-watcher-client.ts index 8e5fa512facdf..01289abfc24f6 100644 --- a/packages/filesystem/src/node/filesystem-watcher-client.ts +++ b/packages/filesystem/src/node/filesystem-watcher-client.ts @@ -14,74 +14,59 @@ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 ********************************************************************************/ -import * as path from 'path'; import { injectable, inject } from 'inversify'; -import { JsonRpcProxyFactory, ILogger, ConnectionErrorHandler, DisposableCollection, Disposable } from '@theia/core'; -import { IPCConnectionProvider } from '@theia/core/lib/node/messaging'; -import { FileSystemWatcherServer, WatchOptions, FileSystemWatcherClient, ReconnectingFileSystemWatcherServer } from '../common/filesystem-watcher-protocol'; -import { NsfwOptions } from './nsfw-watcher/nsfw-options'; +import { FileSystemWatcherServer, WatchOptions, FileSystemWatcherClient, FileSystemWatcherServer2 } from '../common/filesystem-watcher-protocol'; +import { FileSystemWatcherServer2Dispatcher } from './filesystem-watcher-dispatcher'; export const NSFW_WATCHER = 'nsfw-watcher'; +/** + * Wraps the NSFW singleton server for each frontend. + */ @injectable() export class FileSystemWatcherServerClient implements FileSystemWatcherServer { - protected readonly proxyFactory = new JsonRpcProxyFactory(); - protected readonly remote = new ReconnectingFileSystemWatcherServer(this.proxyFactory.createProxy()); + protected static clientIdSequence = 0; - protected readonly toDispose = new DisposableCollection(); + /** + * Track allocated watcherIds to de-allocate on disposal. + */ + protected watcherIds = new Set(); - constructor( - @inject(ILogger) protected readonly logger: ILogger, - @inject(IPCConnectionProvider) protected readonly ipcConnectionProvider: IPCConnectionProvider, - @inject(NsfwOptions) protected readonly nsfwOptions: NsfwOptions - ) { - this.remote.setClient({ - onDidFilesChanged: e => { - if (this.client) { - this.client.onDidFilesChanged(e); - } - }, - onError: () => { - if (this.client) { - this.client.onError(); - } - } - }); - this.toDispose.push(this.remote); - this.toDispose.push(this.listen()); - } + /** + * @todo make this number precisely map to one specific frontend. + */ + protected readonly clientId = FileSystemWatcherServerClient.clientIdSequence++; - dispose(): void { - this.toDispose.dispose(); - } + @inject(FileSystemWatcherServer2Dispatcher) + protected readonly watcherDispatcher: FileSystemWatcherServer2Dispatcher; - watchFileChanges(uri: string, options?: WatchOptions): Promise { - return this.remote.watchFileChanges(uri, options); + @inject(FileSystemWatcherServer2) + protected readonly watcherServer: FileSystemWatcherServer2; + + async watchFileChanges(uri: string, options?: WatchOptions): Promise { + const watcherId = await this.watcherServer.watchFileChanges2(this.clientId, uri, options); + this.watcherIds.add(watcherId); + return watcherId; } - unwatchFileChanges(watcher: number): Promise { - return this.remote.unwatchFileChanges(watcher); + async unwatchFileChanges(watcherId: number): Promise { + this.watcherIds.delete(watcherId); + return this.watcherServer.unwatchFileChanges2(watcherId); } - protected client: FileSystemWatcherClient | undefined; setClient(client: FileSystemWatcherClient | undefined): void { - this.client = client; + if (typeof client !== 'undefined') { + this.watcherDispatcher.registerClient(this.clientId, client); + } else { + this.watcherDispatcher.unregisterClient(this.clientId); + } } - protected listen(): Disposable { - return this.ipcConnectionProvider.listen({ - serverName: NSFW_WATCHER, - entryPoint: path.resolve(__dirname, NSFW_WATCHER), - errorHandler: new ConnectionErrorHandler({ - serverName: NSFW_WATCHER, - logger: this.logger - }), - args: [ - `--nsfwOptions=${JSON.stringify(this.nsfwOptions)}` - ], - env: process.env - }, connection => this.proxyFactory.listen(connection)); + dispose(): void { + this.setClient(undefined); + for (const watcherId of this.watcherIds) { + this.unwatchFileChanges(watcherId); + } } - } diff --git a/packages/filesystem/src/node/filesystem-watcher-dispatcher.ts b/packages/filesystem/src/node/filesystem-watcher-dispatcher.ts new file mode 100644 index 0000000000000..47f3d21243484 --- /dev/null +++ b/packages/filesystem/src/node/filesystem-watcher-dispatcher.ts @@ -0,0 +1,82 @@ +/******************************************************************************** + * Copyright (C) 2020 Ericsson and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ + +import { injectable } from 'inversify'; +import { + FileSystemWatcherClient, FileSystemWatcherClient2, + DidFilesChangedParams2, FileSystemWatcherErrorParams2 +} from '../common/filesystem-watcher-protocol'; + +/** + * This component routes watch events to the right clients. + */ +@injectable() +export class FileSystemWatcherServer2Dispatcher implements FileSystemWatcherClient2 { + + /** + * Mapping of `clientId` to actual clients. + */ + protected readonly clients = new Map(); + + onDidFilesChanged2(event: DidFilesChangedParams2): void { + for (const client of this.iterRegisteredClients(event.clients)) { + client.onDidFilesChanged(event); + } + } + + onError2(event: FileSystemWatcherErrorParams2): void { + for (const client of this.iterRegisteredClients(event.clients)) { + client.onError(); + } + } + + /** + * Listen for events targeted at `clientId`. + */ + registerClient(clientId: number, client: FileSystemWatcherClient): void { + if (this.clients.has(clientId)) { + console.warn(`FileSystemWatcherServer2Dispatcher: a client was already registered! clientId=${clientId}`); + } + this.clients.set(clientId, client); + } + + unregisterClient(clientId: number): void { + if (!this.clients.has(clientId)) { + console.warn(`FileSystemWatcherServer2Dispatcher: tried to remove unknown client! clientId=${clientId}`); + } + this.clients.delete(clientId); + } + + /** + * Only yield registered clients for the given `clientIds`. + * + * If clientIds is empty, will return all clients. + */ + protected *iterRegisteredClients(clientIds?: number[]): Iterable { + if (!Array.isArray(clientIds) || clientIds.length === 0) { + // If we receive an event targeted to "no client", + // interpret that as notifying all clients: + yield* this.clients.values(); + } else { + for (const clientId of clientIds) { + const client = this.clients.get(clientId); + if (typeof client !== 'undefined') { + yield client; + } + } + } + } +} diff --git a/packages/filesystem/src/node/nsfw-watcher/index.ts b/packages/filesystem/src/node/nsfw-watcher/index.ts index fcf9701e686cf..66f6591e634e8 100644 --- a/packages/filesystem/src/node/nsfw-watcher/index.ts +++ b/packages/filesystem/src/node/nsfw-watcher/index.ts @@ -16,7 +16,7 @@ import * as yargs from 'yargs'; import { JsonRpcProxyFactory } from '@theia/core'; -import { FileSystemWatcherClient } from '../../common/filesystem-watcher-protocol'; +import { FileSystemWatcherClient2 } from '../../common/filesystem-watcher-protocol'; import { NsfwFileSystemWatcherServer } from './nsfw-filesystem-watcher'; import { IPCEntryPoint } from '@theia/core/lib/node/messaging/ipc-protocol'; @@ -39,7 +39,7 @@ const options: { export default (connection => { const server = new NsfwFileSystemWatcherServer(options); - const factory = new JsonRpcProxyFactory(server); + const factory = new JsonRpcProxyFactory(server); server.setClient(factory.createProxy()); factory.listen(connection); }); diff --git a/packages/filesystem/src/node/nsfw-watcher/nsfw-filesystem-watcher.spec.ts b/packages/filesystem/src/node/nsfw-watcher/nsfw-filesystem-watcher.spec.ts index 22f7afad1856e..ceed218010f14 100644 --- a/packages/filesystem/src/node/nsfw-watcher/nsfw-filesystem-watcher.spec.ts +++ b/packages/filesystem/src/node/nsfw-watcher/nsfw-filesystem-watcher.spec.ts @@ -38,7 +38,7 @@ describe('nsfw-filesystem-watcher', function (): void { beforeEach(async () => { root = FileUri.create(fs.realpathSync(temp.mkdirSync('node-fs-root'))); watcherServer = createNsfwFileSystemWatcherServer(); - watcherId = await watcherServer.watchFileChanges(root.toString()); + watcherId = await watcherServer.watchFileChanges2(0, root.toString()); await sleep(2000); }); @@ -50,15 +50,14 @@ describe('nsfw-filesystem-watcher', function (): void { it('Should receive file changes events from in the workspace by default.', async function (): Promise { if (process.platform === 'win32') { this.skip(); - return; } const actualUris = new Set(); const watcherClient = { - onDidFilesChanged(event: DidFilesChangedParams): void { + onDidFilesChanged2(event: DidFilesChangedParams): void { event.changes.forEach(c => actualUris.add(c.uri.toString())); }, - onError(): void { + onError2(): void { } }; watcherServer.setClient(watcherClient); @@ -87,21 +86,20 @@ describe('nsfw-filesystem-watcher', function (): void { it('Should not receive file changes events from in the workspace by default if unwatched', async function (): Promise { if (process.platform === 'win32') { this.skip(); - return; } const actualUris = new Set(); const watcherClient = { - onDidFilesChanged(event: DidFilesChangedParams): void { + onDidFilesChanged2(event: DidFilesChangedParams): void { event.changes.forEach(c => actualUris.add(c.uri.toString())); }, - onError(): void { + onError2(): void { } }; watcherServer.setClient(watcherClient); /* Unwatch root */ - watcherServer.unwatchFileChanges(watcherId); + watcherServer.unwatchFileChanges2(watcherId); fs.mkdirSync(FileUri.fsPath(root.resolve('foo'))); expect(fs.statSync(FileUri.fsPath(root.resolve('foo'))).isDirectory()).to.be.true; diff --git a/packages/filesystem/src/node/nsfw-watcher/nsfw-filesystem-watcher.ts b/packages/filesystem/src/node/nsfw-watcher/nsfw-filesystem-watcher.ts index d35299aeded78..fe668bfe54cde 100644 --- a/packages/filesystem/src/node/nsfw-watcher/nsfw-filesystem-watcher.ts +++ b/packages/filesystem/src/node/nsfw-watcher/nsfw-filesystem-watcher.ts @@ -14,207 +14,281 @@ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 ********************************************************************************/ -import * as fs from 'fs'; import * as nsfw from 'nsfw'; -import * as paths from 'path'; +import { join } from 'path'; +import { promises as fsp } from 'fs'; import { IMinimatch, Minimatch } from 'minimatch'; -import { Disposable, DisposableCollection } from '@theia/core/lib/common/disposable'; import { FileUri } from '@theia/core/lib/node/file-uri'; import { - FileChangeType, - FileSystemWatcherClient, - FileSystemWatcherServer, - WatchOptions + FileChangeType, FileSystemWatcherServer2, FileSystemWatcherClient2, WatchOptions } from '../../common/filesystem-watcher-protocol'; import { FileChangeCollection } from '../file-change-collection'; -import { setInterval, clearInterval } from 'timers'; - -const debounce = require('lodash.debounce'); - -/* eslint-disable @typescript-eslint/no-explicit-any */ +import { Deferred, AsyncLock } from '@theia/core/lib/common/promise-util'; +/** NsfwWatcherOptions */ export interface WatcherOptions { ignored: IMinimatch[] } +type NsfwWatcherOptions = WatcherOptions; -export class NsfwFileSystemWatcherServer implements FileSystemWatcherServer { +export interface NsfwFileSystemWatcherServerOptions { + verbose: boolean; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + info: (message: string, ...args: any[]) => void; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + error: (message: string, ...args: any[]) => void; + nsfwOptions: nsfw.Options; +} - protected client: FileSystemWatcherClient | undefined; +/** + * This is a flag value passed around upon disposal. + */ +export const WatcherDisposal = Symbol('WatcherDisposal'); - protected watcherSequence = 1; - protected readonly watchers = new Map(); - protected readonly watcherOptions = new Map(); +/** + * Because URIs can be watched by different clients, we'll track + * how many are listening for a given URI. + * + * This component wraps the whole start/stop process given some + * reference count. + * + * Once there are no more references the handle + * will wait for some time before destroying its resources. + */ +export class NsfwWatcher { - protected readonly toDispose = new DisposableCollection( - Disposable.create(() => this.setClient(undefined)) - ); + protected static debugIdSequence = 0; - protected changes = new FileChangeCollection(); + protected disposed = false; - protected readonly options: { - verbose: boolean - info: (message: string, ...args: any[]) => void - error: (message: string, ...args: any[]) => void, - nsfwOptions: nsfw.Options - }; + /** + * Used for debugging to keep track of the watchers. + */ + protected debugId = NsfwWatcher.debugIdSequence++; - constructor(options?: { - verbose?: boolean, - nsfwOptions?: nsfw.Options, - info?: (message: string, ...args: any[]) => void - error?: (message: string, ...args: any[]) => void - }) { - this.options = { - nsfwOptions: {}, - verbose: false, - info: (message, ...args) => console.info(message, ...args), - error: (message, ...args) => console.error(message, ...args), - ...options - }; - } + /** + * When this field is set, it means the nsfw instance was successfully started. + */ + protected nsfw: nsfw.NSFW | undefined; - dispose(): void { - this.toDispose.dispose(); - } - - async watchFileChanges(uri: string, options?: WatchOptions): Promise { - const watcherId = this.watcherSequence++; - const basePath = FileUri.fsPath(uri); - this.debug('Starting watching:', basePath); - const toDisposeWatcher = new DisposableCollection(); - this.watchers.set(watcherId, toDisposeWatcher); - toDisposeWatcher.push(Disposable.create(() => this.watchers.delete(watcherId))); - if (fs.existsSync(basePath)) { - this.start(watcherId, basePath, options, toDisposeWatcher); - } else { - const toClearTimer = new DisposableCollection(); - const timer = setInterval(() => { - if (fs.existsSync(basePath)) { - toClearTimer.dispose(); - this.pushAdded(watcherId, basePath); - this.start(watcherId, basePath, options, toDisposeWatcher); - } - }, 500); - toClearTimer.push(Disposable.create(() => clearInterval(timer))); - toDisposeWatcher.push(toClearTimer); - } - this.toDispose.push(toDisposeWatcher); - return watcherId; - } + /** + * When the ref count hits zero, we schedule this watch handle to be disposed. + */ + protected deferredDisposalTimer: NodeJS.Timer | undefined; - protected async start(watcherId: number, basePath: string, rawOptions: WatchOptions | undefined, toDisposeWatcher: DisposableCollection): Promise { - const options: WatchOptions = { - ignored: [], - ...rawOptions - }; - if (options.ignored.length > 0) { - this.debug('Files ignored for watching', options.ignored); - } + /** + * This deferred only rejects with `WatcherDisposal` and never resolves. + */ + protected readonly deferredDisposalDeferred = new Deferred(); - let watcher: nsfw.NSFW | undefined = await nsfw(fs.realpathSync(basePath), (events: nsfw.ChangeEvent[]) => { - for (const event of events) { - if (event.action === nsfw.actions.CREATED) { - this.pushAdded(watcherId, this.resolvePath(event.directory, event.file!)); - } - if (event.action === nsfw.actions.DELETED) { - this.pushDeleted(watcherId, this.resolvePath(event.directory, event.file!)); - } - if (event.action === nsfw.actions.MODIFIED) { - this.pushUpdated(watcherId, this.resolvePath(event.directory, event.file!)); - } - if (event.action === nsfw.actions.RENAMED) { - this.pushDeleted(watcherId, this.resolvePath(event.directory, event.oldFile!)); - this.pushAdded(watcherId, this.resolvePath(event.newDirectory || event.directory, event.newFile!)); - } - } - }, { - errorCallback: error => { - // see https://github.com/atom/github/issues/342 - console.warn(`Failed to watch "${basePath}":`, error); - if (error === 'Inotify limit reached') { - if (this.client) { - this.client.onError(); - } - } - this.unwatchFileChanges(watcherId); - }, - ...this.options.nsfwOptions - }); - await watcher.start(); - this.options.info('Started watching:', basePath); - if (toDisposeWatcher.disposed) { - this.debug('Stopping watching:', basePath); - await watcher.stop(); - // remove a reference to nsfw otherwise GC cannot collect it - watcher = undefined; - this.options.info('Stopped watching:', basePath); - return; - } - toDisposeWatcher.push(Disposable.create(async () => { - this.watcherOptions.delete(watcherId); - if (watcher) { - this.debug('Stopping watching:', basePath); - await watcher.stop(); - // remove a reference to nsfw otherwise GC cannot collect it - watcher = undefined; - this.options.info('Stopped watching:', basePath); + /** + * We count each reference made to this watcher, per client. + * + * We do this to know where to send events via the network. + * + * An entry should be removed when its value hits zero. + */ + protected readonly refsPerClient = new Map(); + + /** + * Ensures that events are processed in the order they are emitted, + * despite being processed async. + */ + protected readonly nsfwEventProcessingLock = new AsyncLock(); + + /** + * Resolves once this handle disposed itself and its resources. Never throws. + */ + readonly whenDisposed: Promise = this.deferredDisposalDeferred.promise.catch(() => undefined); + + /** + * Promise that resolves when the watcher is fully started, or got disposed. + * + * Will reject if an error occured while starting. + * + * @returns `true` if successfully started, `false` if disposed early. + */ + readonly whenStarted: Promise; + + constructor( + /** Initial reference to this handle. */ + initialClientId: number, + /** Filesystem path to be watched. */ + readonly fsPath: string, + /** Watcher-specific options */ + readonly watcherOptions: NsfwWatcherOptions, + /** Logging and Nsfw options */ + protected readonly nsfwFileSystemWatchServerOptions: NsfwFileSystemWatcherServerOptions, + /** The client to forward events to. */ + protected readonly fileSystemWatcherClient: FileSystemWatcherClient2, + /** Amount of time in ms to wait once this handle ins't referenced anymore. */ + protected readonly deferredDisposalTimeout = 10_000, + ) { + this.refsPerClient.set(initialClientId, { value: 1 }); + this.whenStarted = this.start().then(() => true, error => { + if (error === WatcherDisposal) { + return false; } - })); - this.watcherOptions.set(watcherId, { - ignored: options.ignored.map(pattern => new Minimatch(pattern, { dot: true })) + this._dispose(); + this.fireError(); + throw error; }); + this.debug('NEW', `initialClientId=${initialClientId}, fsPath=${fsPath}`); } - unwatchFileChanges(watcherId: number): Promise { - const disposable = this.watchers.get(watcherId); - if (disposable) { - this.watchers.delete(watcherId); - disposable.dispose(); + addRef(clientId: number): void { + let refs = this.refsPerClient.get(clientId); + if (typeof refs === 'undefined') { + this.refsPerClient.set(clientId, refs = { value: 1 }); + } else { + refs.value += 1; + } + const totalRefs = this.getTotalReferences(); + // If it was zero before, 1 means we were revived: + const revived = totalRefs === 1; + if (revived) { + this.onRefsRevive(); } - return Promise.resolve(); + this.debug('REF++', `clientId=${clientId}, clientRefs=${refs.value}, totalRefs=${totalRefs}. revived=${revived}`); } - setClient(client: FileSystemWatcherClient | undefined): void { - if (client && this.toDispose.disposed) { + removeRef(clientId: number): void { + const refs = this.refsPerClient.get(clientId); + if (typeof refs === 'undefined') { + this.info('WARN REF--', `removed one too many reference: clientId=${clientId}`); return; } - this.client = client; + refs.value -= 1; + // We must remove the key from `this.clientReferences` because + // we list active clients by reading the keys of this map. + if (refs.value === 0) { + this.refsPerClient.delete(clientId); + } + const totalRefs = this.getTotalReferences(); + const dead = totalRefs === 0; + if (dead) { + this.onRefsReachZero(); + } + this.debug('REF--', `clientId=${clientId}, clientRefs=${refs.value}, totalRefs=${totalRefs}, dead=${dead}`); } - protected pushAdded(watcherId: number, path: string): void { - this.debug('Added:', path); - this.pushFileChange(watcherId, path, FileChangeType.ADDED); + /** + * All clients with at least one active reference. + */ + getClientIds(): number[] { + return Array.from(this.refsPerClient.keys()); } - protected pushUpdated(watcherId: number, path: string): void { - this.debug('Updated:', path); - this.pushFileChange(watcherId, path, FileChangeType.UPDATED); + /** + * Add the references for each client together. + */ + getTotalReferences(): number { + let total = 0; + for (const refs of this.refsPerClient.values()) { + total += refs.value; + } + return total; } - protected pushDeleted(watcherId: number, path: string): void { - this.debug('Deleted:', path); - this.pushFileChange(watcherId, path, FileChangeType.DELETED); + /** + * Returns true if at least one client listens to this handle. + */ + isInUse(): boolean { + return this.refsPerClient.size > 0; } - protected pushFileChange(watcherId: number, path: string, type: FileChangeType): void { - if (this.isIgnored(watcherId, path)) { - return; + /** + * When starting a watcher, we'll first check and wait for the path to exists + * before running an NSFW watcher. + */ + protected async start(): Promise { + while (await this.orCancel(fsp.stat(this.fsPath).then(() => false, () => true))) { + await this.orCancel(new Promise(resolve => setTimeout(resolve, 500))); } + const watcher = await this.orCancel(this.createNsfw()); + await this.orCancel(watcher.start().then(() => { + this.debug('STARTED', `fsPath=${this.fsPath}, diposed=${this.disposed}`); + // The watcher could be disposed while it was starting, make sure to check for this: + if (this.disposed) { + this.stopNsfw(watcher); + } + })); + this.nsfw = watcher; + } - const uri = FileUri.create(path).toString(); - this.changes.push({ uri, type }); + /** + * Given a started nsfw instance, gracefully shut it down. + */ + protected async stopNsfw(watcher: nsfw.NSFW): Promise { + await watcher.stop() + .then(() => 'success=true', error => error) + .then(status => this.debug('STOPPED', `fsPath=${this.fsPath}`, status)); + } - this.fireDidFilesChanged(); + protected async createNsfw(): Promise { + const fsPath = await fsp.realpath(this.fsPath); + return nsfw(fsPath, events => this.handleNsfwEvents(events), { + ...this.nsfwFileSystemWatchServerOptions.nsfwOptions, + // The errorCallback is called whenever NSFW crashes *while* watching. + // See https://github.com/atom/github/issues/342 + errorCallback: error => { + console.error(`NSFW service error on "${fsPath}":`, error); + this._dispose(); + this.fireError(); + // Make sure to call user's error handling code: + if (this.nsfwFileSystemWatchServerOptions.nsfwOptions.errorCallback) { + this.nsfwFileSystemWatchServerOptions.nsfwOptions.errorCallback(error); + } + }, + }); } - protected resolvePath(directory: string, file: string): string { - const path = paths.join(directory, file); + protected handleNsfwEvents(events: nsfw.ChangeEvent[]): void { + // Only process events if someone is listening. + if (this.isInUse()) { + // This callback is async, but nsfw won't wait for it to finish before firing the next one. + // We will use a lock/queue to make sure everything is processed in the order it arrives. + this.nsfwEventProcessingLock.acquire(async () => { + const fileChangeCollection = new FileChangeCollection(); + await Promise.all(events.map(async event => { + if (event.action === nsfw.actions.RENAMED) { + const [oldPath, newPath] = await Promise.all([ + this.resolveEventPath(event.directory, event.oldFile!), + this.resolveEventPath(event.newDirectory || event.directory, event.newFile!), + ]); + this.pushFileChange(fileChangeCollection, FileChangeType.DELETED, oldPath); + this.pushFileChange(fileChangeCollection, FileChangeType.ADDED, newPath); + } else { + const path = await this.resolveEventPath(event.directory, event.file!); + if (event.action === nsfw.actions.CREATED) { + this.pushFileChange(fileChangeCollection, FileChangeType.ADDED, path); + } else if (event.action === nsfw.actions.DELETED) { + this.pushFileChange(fileChangeCollection, FileChangeType.DELETED, path); + } else if (event.action === nsfw.actions.MODIFIED) { + this.pushFileChange(fileChangeCollection, FileChangeType.UPDATED, path); + } + } + })); + const changes = fileChangeCollection.values(); + // If all changes are part of the ignored files, the collection will be empty. + if (changes.length > 0) { + this.fileSystemWatcherClient.onDidFilesChanged2({ + clients: this.getClientIds(), + changes, + }); + } + }); + } + } + + protected async resolveEventPath(directory: string, file: string): Promise { + const path = join(directory, file); try { - return fs.realpathSync(path); + return await fsp.realpath(path); } catch { try { // file does not exist try to resolve directory - return paths.join(fs.realpathSync(directory), file); + return join(await fsp.realpath(directory), file); } catch { // directory does not exist fall back to symlink return path; @@ -222,28 +296,196 @@ export class NsfwFileSystemWatcherServer implements FileSystemWatcherServer { } } + protected pushFileChange(changes: FileChangeCollection, type: FileChangeType, path: string): void { + if (!this.isIgnored(path)) { + const uri = FileUri.create(path).toString(); + changes.push({ type, uri }); + } + } + + protected fireError(): void { + this.fileSystemWatcherClient.onError2({ + clients: this.getClientIds(), + uri: this.fsPath, + }); + } + + /** + * Wrap a promise to reject as soon as this handle gets disposed. + */ + protected async orCancel(promise: Promise): Promise { + return Promise.race([this.deferredDisposalDeferred.promise, promise]); + } + + /** + * When references hit zero, we'll schedule disposal for a bit later. + * + * This allows new references to reuse this watcher instead of creating a new one. + * + * e.g. A frontend disconnects for a few milliseconds before reconnecting again. + */ + protected onRefsReachZero(): void { + this.deferredDisposalTimer = setTimeout(() => this._dispose(), this.deferredDisposalTimeout); + } + + /** + * If we get new references after hitting zero, let's unschedule our disposal and keep watching. + */ + protected onRefsRevive(): void { + if (this.deferredDisposalTimer) { + clearTimeout(this.deferredDisposalTimer); + this.deferredDisposalTimer = undefined; + } + } + + protected isIgnored(path: string): boolean { + return this.watcherOptions.ignored.length > 0 + && this.watcherOptions.ignored.some(m => m.match(path)); + } + + /** + * Internal disposal mechanism. + */ + protected async _dispose(): Promise { + if (!this.disposed) { + this.disposed = true; + this.deferredDisposalDeferred.reject(WatcherDisposal); + if (this.nsfw) { + this.stopNsfw(this.nsfw); + this.nsfw = undefined; + } + this.debug('DISPOSED', `fsPath=${this.fsPath}`); + } + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + protected info(prefix: string, message: string, ...params: any[]): void { + this.nsfwFileSystemWatchServerOptions.info(`${prefix} NsfwWatcher(${this.debugId}): ${message}`, ...params); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + protected debug(prefix: string, message: string, ...params: any[]): void { + if (this.nsfwFileSystemWatchServerOptions.verbose) { + this.info(prefix, message, ...params); + } + } +} + +/** + * Each time a client makes a watchRequest, we generate a unique watcherId for it. + * + * This watcherId will map to this handle type which keeps track of the clientId that made the request. + */ +export interface NsfwWatcherHandle { + clientId: number; + watcher: NsfwWatcher; +} + +export class NsfwFileSystemWatcherServer implements FileSystemWatcherServer2 { + + protected client: FileSystemWatcherClient2 | undefined; + + protected watcherId = 0; + protected readonly watchers = new Map(); + protected readonly watcherHandles = new Map(); + + protected readonly options: NsfwFileSystemWatcherServerOptions; + + /** + * `this.client` is undefined until someone sets it. + */ + protected readonly maybeClient: FileSystemWatcherClient2 = { + onDidFilesChanged2: event => { + if (this.client) { + this.client.onDidFilesChanged2(event); + } + }, + onError2: event => { + if (this.client) { + this.client.onError2(event); + } + }, + }; + + constructor(options?: Partial) { + this.options = { + nsfwOptions: {}, + verbose: false, + info: (message, ...args) => console.info(message, ...args), + error: (message, ...args) => console.error(message, ...args), + ...options + }; + } + + setClient(client: FileSystemWatcherClient2 | undefined): void { + this.client = client; + } + /** - * Fires file changes to clients. - * It is debounced in the case if the filesystem is spamming to avoid overwhelming clients with events. + * A specific client requests us to watch a given `uri` according to some `options`. + * + * We internally re-use all the same `(uri, options)` pairs. */ - protected readonly fireDidFilesChanged: () => void = debounce(() => this.doFireDidFilesChanged(), 50); - protected doFireDidFilesChanged(): void { - const changes = this.changes.values(); - this.changes = new FileChangeCollection(); - const event = { changes }; - if (this.client) { - this.client.onDidFilesChanged(event); + async watchFileChanges2(clientId: number, uri: string, options?: WatchOptions): Promise { + const resolvedOptions = this.resolveWatchOptions(options); + const watcherKey = this.getWatcherKey(uri, resolvedOptions); + let watcher = this.watchers.get(watcherKey); + if (typeof watcher === 'undefined') { + const fsPath = FileUri.fsPath(uri); + const watcherOptions: NsfwWatcherOptions = { + ignored: resolvedOptions.ignored + .map(pattern => new Minimatch(pattern, { dot: true })), + }; + watcher = new NsfwWatcher(clientId, fsPath, watcherOptions, this.options, this.maybeClient); + watcher.whenDisposed.then(() => this.watchers.delete(watcherKey)); + this.watchers.set(watcherKey, watcher); + } else { + watcher.addRef(clientId); + } + const watcherId = this.watcherId++; + this.watcherHandles.set(watcherId, { clientId, watcher }); + watcher.whenDisposed.then(() => this.watcherHandles.delete(watcherId)); + return watcherId; + } + + async unwatchFileChanges2(watcherId: number): Promise { + const handle = this.watcherHandles.get(watcherId); + if (typeof handle === 'undefined') { + console.warn(`tried to de-allocate a disposed watcher: watcherId=${watcherId}`); + } else { + this.watcherHandles.delete(watcherId); + handle.watcher.removeRef(handle.clientId); } } - protected isIgnored(watcherId: number, path: string): boolean { - const options = this.watcherOptions.get(watcherId); - return !!options && options.ignored.length > 0 && options.ignored.some(m => m.match(path)); + /** + * Given some `URI` and some `WatchOptions`, generate a unique key. + */ + protected getWatcherKey(uri: string, options: WatchOptions): string { + return [ + uri, + options.ignored.slice(0).sort().join() // use a **sorted copy** of `ignored` as part of the key + ].join(); + } + + /** + * Return fully qualified options. + */ + protected resolveWatchOptions(options?: WatchOptions): WatchOptions { + return { + ignored: [], + ...options, + }; } + // eslint-disable-next-line @typescript-eslint/no-explicit-any protected debug(message: string, ...params: any[]): void { if (this.options.verbose) { this.options.info(message, ...params); } } + + dispose(): void { + // Singletons shouldn't be disposed... + } } diff --git a/packages/filesystem/src/typings/nsfw/index.d.ts b/packages/filesystem/src/typings/nsfw/index.d.ts index 141cf175e7e56..b95b35ca80cca 100644 --- a/packages/filesystem/src/typings/nsfw/index.d.ts +++ b/packages/filesystem/src/typings/nsfw/index.d.ts @@ -13,4 +13,5 @@ * * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 ********************************************************************************/ -/// + +export * from '@theia/core/src/typings/nsfw';