diff --git a/examples/api-samples/src/browser/api-samples-frontend-module.ts b/examples/api-samples/src/browser/api-samples-frontend-module.ts index 1a397ab0a4852..fc95418cdf58e 100644 --- a/examples/api-samples/src/browser/api-samples-frontend-module.ts +++ b/examples/api-samples/src/browser/api-samples-frontend-module.ts @@ -19,10 +19,12 @@ import { bindDynamicLabelProvider } from './label/sample-dynamic-label-provider- import { bindSampleUnclosableView } from './view/sample-unclosable-view-contribution'; import { bindSampleOutputChannelWithSeverity } from './output/sample-output-channel-with-severity'; import { bindSampleMenu } from './menu/sample-menu-contribution'; +import { bindSampleFileWatching } from './file-watching/sample-file-watching-contribution'; export default new ContainerModule(bind => { bindDynamicLabelProvider(bind); bindSampleUnclosableView(bind); bindSampleOutputChannelWithSeverity(bind); bindSampleMenu(bind); + bindSampleFileWatching(bind); }); diff --git a/examples/api-samples/src/browser/file-watching/sample-file-watching-contribution.ts b/examples/api-samples/src/browser/file-watching/sample-file-watching-contribution.ts new file mode 100644 index 0000000000000..cac0300650e5f --- /dev/null +++ b/examples/api-samples/src/browser/file-watching/sample-file-watching-contribution.ts @@ -0,0 +1,50 @@ +/******************************************************************************** + * Copyright (C) 2020 Ericsson and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ + +import { injectable, inject, interfaces } from 'inversify'; +import { FrontendApplicationContribution, LabelProvider } from '@theia/core/lib/browser'; +import { FileService } from '@theia/filesystem/lib/browser/file-service'; +import { WorkspaceService } from '@theia/workspace/lib/browser'; + +export function bindSampleFileWatching(bind: interfaces.Bind): void { + bind(FrontendApplicationContribution).to(SampleFileWatchingContribution).inSingletonScope(); +} + +@injectable() +export class SampleFileWatchingContribution implements FrontendApplicationContribution { + + @inject(FileService) + protected readonly files: FileService; + + @inject(LabelProvider) + protected readonly label: LabelProvider; + + @inject(WorkspaceService) + protected readonly workspace: WorkspaceService; + + onStart(): void { + this.files.onDidFilesChange(event => { + // Get the workspace roots for the current frontend: + const roots = this.workspace.tryGetRoots(); + // Create some name to help find out which frontend logged the message: + const workspace = roots.length > 0 + ? roots.map(root => this.label.getName(root.resource)).join('+') + : ''; + console.log(`Sample File Watching: ${event.changes.length} file(s) changed! ${workspace}`); + }); + } + +} diff --git a/packages/core/src/common/promise-util.ts b/packages/core/src/common/promise-util.ts index fc67564242844..25c3f2e35dcda 100644 --- a/packages/core/src/common/promise-util.ts +++ b/packages/core/src/common/promise-util.ts @@ -59,3 +59,33 @@ export async function retry(task: () => Promise, delay: number, retries: n throw lastError; } + +export class AsyncLock { + + /** + * Reference to the last registered promise in the chain. + * + * We'll keep on adding with each call to `acquire` to queue other tasks. + */ + protected queue: Promise = Promise.resolve(); + + /** + * This method will queue the execution of `callback` behind previous calls to `acquire`. + * + * This is useful to ensure that only one task handles a resource at a time. + * + * @param callback task to execute once previous ones finished. + * @param args to pass to your callback. + * @returns callback's result. + */ + async acquire(callback: () => PromiseLike): Promise; + // eslint-disable-next-line space-before-function-paren, @typescript-eslint/no-explicit-any + async acquire(callback: (...args: U) => PromiseLike, ...args: U): Promise { + return new Promise((resolve, reject) => { + this.queue = this.queue.then( + () => callback(...args).then(resolve, reject) + ); + }); + } + +} diff --git a/packages/filesystem/compile.tsconfig.json b/packages/filesystem/compile.tsconfig.json index 324e2d66da5ab..6bf6e997229f6 100644 --- a/packages/filesystem/compile.tsconfig.json +++ b/packages/filesystem/compile.tsconfig.json @@ -9,6 +9,9 @@ "mv": [ "src/typings/mv" ], + "nsfw": [ + "src/typings/nsfw" + ], "trash": [ "src/typings/trash" ] diff --git a/packages/filesystem/src/browser/file-service.ts b/packages/filesystem/src/browser/file-service.ts index de52a82846939..eec770c498182 100644 --- a/packages/filesystem/src/browser/file-service.ts +++ b/packages/filesystem/src/browser/file-service.ts @@ -65,7 +65,6 @@ import { UTF8, UTF8_with_bom } from '@theia/core/lib/common/encodings'; import { EncodingService, ResourceEncoding, DecodeStreamResult } from '@theia/core/lib/common/encoding-service'; import { Mutable } from '@theia/core/lib/common/types'; import { readFileIntoStream } from '../common/io'; -import { FileSystemWatcherErrorHandler } from './filesystem-watcher-error-handler'; export interface FileOperationParticipant { @@ -248,9 +247,6 @@ export class FileService { @inject(ContributionProvider) @named(FileServiceContribution) protected readonly contributions: ContributionProvider; - @inject(FileSystemWatcherErrorHandler) - protected readonly watcherErrorHandler: FileSystemWatcherErrorHandler; - @postConstruct() protected init(): void { for (const contribution of this.contributions.getContributions()) { @@ -312,7 +308,6 @@ export class FileService { const providerDisposables = new DisposableCollection(); providerDisposables.push(provider.onDidChangeFile(changes => this.onDidFilesChangeEmitter.fire(new FileChangesEvent(changes)))); - providerDisposables.push(provider.onFileWatchError(() => this.handleFileWatchError())); providerDisposables.push(provider.onDidChangeCapabilities(() => this.onDidChangeFileSystemProviderCapabilitiesEmitter.fire({ provider, scheme }))); return Disposable.create(() => { @@ -767,7 +762,7 @@ export class FileService { * that only the mtime is an indicator for a file that has changed on disk. * * Second, if the mtime has advanced, we compare the size of the file on disk with our previous - * one using the etag() function. Relying only on the mtime check has proven to produce false + * one using the etag() function. Relying only on the mtime check has prooven to produce false * positives due to file system weirdness (especially around remote file systems). As such, the * check for size is a weaker check because it can return a false negative if the file has changed * but to the same length. This is a compromise we take to avoid having to produce checksums of @@ -1024,7 +1019,7 @@ export class FileService { // validation const { exists, isSameResourceWithDifferentPathCase } = await this.doValidateMoveCopy(sourceProvider, source, targetProvider, target, mode, overwrite); - // delete as needed (unless target is same resource with different path case) + // delete as needed (unless target is same resurce with different path case) if (exists && !isSameResourceWithDifferentPathCase && overwrite) { await this.delete(target, { recursive: true }); } @@ -1680,7 +1675,4 @@ export class FileService { // #endregion - protected handleFileWatchError(): void { - this.watcherErrorHandler.handleError(); - } } diff --git a/packages/filesystem/src/common/filesystem-watcher-protocol.ts b/packages/filesystem/src/common/filesystem-watcher-protocol.ts index c2c3440afff55..29aa20eb31914 100644 --- a/packages/filesystem/src/common/filesystem-watcher-protocol.ts +++ b/packages/filesystem/src/common/filesystem-watcher-protocol.ts @@ -19,6 +19,47 @@ import { JsonRpcServer, JsonRpcProxy } from '@theia/core'; import { FileChangeType } from './files'; export { FileChangeType }; +export const FileSystemWatcherServer2 = Symbol('FileSystemWatcherServer2'); +/** + * Singleton implementation of the watch server. + * + * Since multiple clients all make requests to this service, we need to track those individually via a `clientId`. + */ +export interface FileSystemWatcherServer2 extends JsonRpcServer { + /** + * @param clientId arbitrary id used to identify a client. + * @param uri the path to watch. + * @param options optional parameters. + * @returns promise to a unique `number` handle for this request. + */ + watchFileChanges2(clientId: number, uri: string, options?: WatchOptions): Promise; + /** + * @param watcherId handle mapping to a previous `watchFileChanges` request. + */ + unwatchFileChanges2(watcherId: number): Promise; +} + +export interface FileSystemWatcherClient2 { + /** Listen for change events emitted by the watcher. */ + onDidFilesChanged2(event: DidFilesChangedParams2): void; + /** The watcher can crash in certain conditions. */ + onError2(event: FileSystemWatcherErrorParams2): void; +} + +export interface DidFilesChangedParams2 { + /** Clients to route the events to. */ + clients: number[]; + /** FileSystem changes that occured. */ + changes: FileChange[]; +} + +export interface FileSystemWatcherErrorParams2 { + /** Clients to route the events to. */ + clients: number[]; + /** The uri that originated the error. */ + uri: string; +} + export const FileSystemWatcherServer = Symbol('FileSystemWatcherServer'); export interface FileSystemWatcherServer extends JsonRpcServer { /** @@ -32,7 +73,7 @@ export interface FileSystemWatcherServer extends JsonRpcServer; + unwatchFileChanges(watcherId: number): Promise; } export interface FileSystemWatcherClient { @@ -63,6 +104,9 @@ export interface FileChange { export const FileSystemWatcherServerProxy = Symbol('FileSystemWatcherServerProxy'); export type FileSystemWatcherServerProxy = JsonRpcProxy; +/** + * @deprecated not used internally anymore. + */ @injectable() export class ReconnectingFileSystemWatcherServer implements FileSystemWatcherServer { diff --git a/packages/filesystem/src/common/remote-file-system-provider.ts b/packages/filesystem/src/common/remote-file-system-provider.ts index b823ec6b40e07..d9da728a49f55 100644 --- a/packages/filesystem/src/common/remote-file-system-provider.ts +++ b/packages/filesystem/src/common/remote-file-system-provider.ts @@ -27,7 +27,6 @@ import { } from './files'; import { JsonRpcServer, JsonRpcProxy, JsonRpcProxyFactory } from '@theia/core/lib/common/messaging/proxy-factory'; import { ApplicationError } from '@theia/core/lib/common/application-error'; -import { Deferred } from '@theia/core/lib/common/promise-util'; import type { TextDocumentContentChangeEvent } from 'vscode-languageserver-protocol'; import { newWriteableStream, ReadableStreamEvents } from '@theia/core/lib/common/stream'; import { CancellationToken, cancelled } from '@theia/core/lib/common/cancellation'; @@ -103,6 +102,11 @@ export class RemoteFileSystemProxyFactory extends JsonRpcProxy } } +/** + * Frontend component. + * + * Wraps the remote filesystem provider living on the backend. + */ @injectable() export class RemoteFileSystemProvider implements Required, Disposable { @@ -129,6 +133,10 @@ export class RemoteFileSystemProvider implements Required, D ); protected watcherSequence = 0; + /** + * We'll track the currently allocated watchers, in order to re-allocate them + * with the same options once we reconnect to the backend after a disconnection. + */ protected readonly watchOptions = new Map, D private _capabilities: FileSystemProviderCapabilities = 0; get capabilities(): FileSystemProviderCapabilities { return this._capabilities; } - protected readonly deferredReady = new Deferred(); - get ready(): Promise { - return this.deferredReady.promise; - } + readonly ready: Promise; + /** + * Wrapped remote filesystem. + */ @inject(RemoteFileSystemServer) protected readonly server: JsonRpcProxy; @postConstruct() protected init(): void { - this.server.getCapabilities().then(capabilities => { + (this.ready as Promise) = this.server.getCapabilities().then(capabilities => { this._capabilities = capabilities; - this.deferredReady.resolve(undefined); - }, this.deferredReady.reject); + }); this.server.setClient({ notifyDidChangeFile: ({ changes }) => { this.onDidChangeFileEmitter.fire(changes.map(event => ({ resource: new URI(event.resource), type: event.type }))); @@ -286,19 +293,23 @@ export class RemoteFileSystemProvider implements Required, D } watch(resource: URI, options: WatchOptions): Disposable { - const watcher = this.watcherSequence++; + const watcherId = this.watcherSequence++; const uri = resource.toString(); - this.watchOptions.set(watcher, { uri, options }); - this.server.watch(watcher, uri, options); - + this.watchOptions.set(watcherId, { uri, options }); + this.server.watch(watcherId, uri, options); const toUnwatch = Disposable.create(() => { - this.watchOptions.delete(watcher); - this.server.unwatch(watcher); + this.watchOptions.delete(watcherId); + this.server.unwatch(watcherId); }); this.toDispose.push(toUnwatch); return toUnwatch; } + /** + * When a frontend disconnects (e.g. bad connection) the backend resources will be cleared. + * + * This means that we need to re-allocate the watchers when a frontend reconnects. + */ protected reconnect(): void { for (const [watcher, { uri, options }] of this.watchOptions.entries()) { this.server.watch(watcher, uri, options); @@ -308,6 +319,8 @@ export class RemoteFileSystemProvider implements Required, D } /** + * Backend component. + * * JSON-RPC server exposing a wrapped file system provider remotely. */ @injectable() @@ -315,6 +328,8 @@ export class FileSystemProviderServer implements RemoteFileSystemServer { private readonly BUFFER_SIZE = 64 * 1024; + protected watchers = new Map(); + protected readonly toDispose = new DisposableCollection(); dispose(): void { this.toDispose.dispose(); @@ -325,6 +340,9 @@ export class FileSystemProviderServer implements RemoteFileSystemServer { this.client = client; } + /** + * Wrapped file system provider. + */ @inject(FileSystemProvider) protected readonly provider: FileSystemProvider & Partial; @@ -450,18 +468,19 @@ export class FileSystemProviderServer implements RemoteFileSystemServer { throw new Error('not supported'); } - protected watchers = new Map(); - - async watch(req: number, resource: string, opts: WatchOptions): Promise { + async watch(requestedWatcherId: number, resource: string, opts: WatchOptions): Promise { + if (this.watchers.has(requestedWatcherId)) { + throw new Error('watcher id is already allocated!'); + } const watcher = this.provider.watch(new URI(resource), opts); - this.watchers.set(req, watcher); - this.toDispose.push(Disposable.create(() => this.unwatch(req))); + this.watchers.set(requestedWatcherId, watcher); + this.toDispose.push(Disposable.create(() => this.unwatch(requestedWatcherId))); } - async unwatch(req: number): Promise { - const watcher = this.watchers.get(req); + async unwatch(watcherId: number): Promise { + const watcher = this.watchers.get(watcherId); if (watcher) { - this.watchers.delete(req); + this.watchers.delete(watcherId); watcher.dispose(); } } diff --git a/packages/filesystem/src/node/disk-file-system-provider.ts b/packages/filesystem/src/node/disk-file-system-provider.ts index 341c6c6c60636..d1a955047f430 100644 --- a/packages/filesystem/src/node/disk-file-system-provider.ts +++ b/packages/filesystem/src/node/disk-file-system-provider.ts @@ -804,18 +804,35 @@ export class DiskFileSystemProvider implements Disposable, // #region File Watching watch(resource: URI, opts: WatchOptions): Disposable { - const toUnwatch = new DisposableCollection(Disposable.create(() => { /* mark as not disposed */ })); - this.watcher.watchFileChanges(resource.toString(), { + const watcherService = this.watcher; + /** + * Disposable handle. Can be disposed early (before the watcher is allocated.) + */ + const handle = { + disposed: false, + watcherId: undefined as number | undefined, + dispose(): void { + if (this.disposed) { + return; + } + if (this.watcherId !== undefined) { + watcherService.unwatchFileChanges(this.watcherId); + } + this.disposed = true; + }, + }; + watcherService.watchFileChanges(resource.toString(), { + // Convert from `files.WatchOptions` to internal `watcher-protocol.WatchOptions`: ignored: opts.excludes - }).then(watcher => { - if (toUnwatch.disposed) { - this.watcher.unwatchFileChanges(watcher); + }).then(watcherId => { + if (handle.disposed) { + watcherService.unwatchFileChanges(watcherId); } else { - toUnwatch.push(Disposable.create(() => this.watcher.unwatchFileChanges(watcher))); + handle.watcherId = watcherId; } }); - this.toDispose.push(toUnwatch); - return toUnwatch; + this.toDispose.push(handle); + return handle; } // #endregion diff --git a/packages/filesystem/src/node/filesystem-backend-module.ts b/packages/filesystem/src/node/filesystem-backend-module.ts index 574ed49a513b2..6914967a2da4e 100644 --- a/packages/filesystem/src/node/filesystem-backend-module.ts +++ b/packages/filesystem/src/node/filesystem-backend-module.ts @@ -14,9 +14,10 @@ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 ********************************************************************************/ +import * as path from 'path'; import { ContainerModule, interfaces } from 'inversify'; import { ConnectionHandler, JsonRpcConnectionHandler, ILogger } from '@theia/core/lib/common'; -import { FileSystemWatcherServer } from '../common/filesystem-watcher-protocol'; +import { FileSystemWatcherServer, FileSystemWatcherServer2 } from '../common/filesystem-watcher-protocol'; import { FileSystemWatcherServerClient } from './filesystem-watcher-client'; import { NsfwFileSystemWatcherServer } from './nsfw-watcher/nsfw-filesystem-watcher'; import { MessagingService } from '@theia/core/lib/node/messaging/messaging-service'; @@ -28,25 +29,67 @@ import { } from '../common/remote-file-system-provider'; import { FileSystemProvider } from '../common/files'; import { EncodingService } from '@theia/core/lib/common/encoding-service'; +import { IPCConnectionProvider } from '@theia/core/lib/node'; +import { JsonRpcProxyFactory, ConnectionErrorHandler } from '@theia/core'; +import { FileSystemWatcherServer2Dispatcher } from './filesystem-watcher-dispatcher'; const SINGLE_THREADED = process.argv.indexOf('--no-cluster') !== -1; +const NSFW_WATCHER_VERBOSE = process.argv.indexOf('--nsfw-watcher-verbose') !== -1; export function bindFileSystemWatcherServer(bind: interfaces.Bind, { singleThreaded }: { singleThreaded: boolean } = { singleThreaded: SINGLE_THREADED }): void { - bind(NsfwOptions).toConstantValue({}); + bind(NsfwOptions).toConstantValue({ debounceMS: 100 }); + + bind(FileSystemWatcherServer2Dispatcher).toSelf().inSingletonScope(); + + bind(FileSystemWatcherServerClient).toSelf(); + bind(FileSystemWatcherServer).toService(FileSystemWatcherServerClient); if (singleThreaded) { - bind(FileSystemWatcherServer).toDynamicValue(ctx => { + // Bind and run the watch server in the current process: + bind(FileSystemWatcherServer2).toDynamicValue(ctx => { const logger = ctx.container.get(ILogger); const nsfwOptions = ctx.container.get(NsfwOptions); - return new NsfwFileSystemWatcherServer({ + const dispatcher = ctx.container.get(FileSystemWatcherServer2Dispatcher); + const server = new NsfwFileSystemWatcherServer({ nsfwOptions, + verbose: NSFW_WATCHER_VERBOSE, info: (message, ...args) => logger.info(message, ...args), error: (message, ...args) => logger.error(message, ...args) }); - }); + server.setClient(dispatcher); + return server; + }).inSingletonScope(); } else { - bind(FileSystemWatcherServerClient).toSelf(); - bind(FileSystemWatcherServer).toService(FileSystemWatcherServerClient); + // Run the watch server in a child process. + // Bind to a proxy forwarding calls to the child process. + bind(FileSystemWatcherServer2).toDynamicValue(ctx => { + const serverName = 'nsfw-watcher'; + const logger = ctx.container.get(ILogger); + const nsfwOptions = ctx.container.get(NsfwOptions); + const ipcConnectionProvider = ctx.container.get(IPCConnectionProvider); + const dispatcher = ctx.container.get(FileSystemWatcherServer2Dispatcher); + const proxyFactory = new JsonRpcProxyFactory(); + const serverProxy = proxyFactory.createProxy(); + // We need to call `.setClient` before listening, else the JSON-RPC calls won't go through. + serverProxy.setClient(dispatcher); + const args: string[] = [ + `--nsfwOptions=${JSON.stringify(nsfwOptions)}` + ]; + if (NSFW_WATCHER_VERBOSE) { + args.push('--verbose'); + } + ipcConnectionProvider.listen({ + serverName, + entryPoint: path.resolve(__dirname, serverName), + errorHandler: new ConnectionErrorHandler({ + serverName, + logger, + }), + env: process.env, + args, + }, connection => proxyFactory.listen(connection)); + return serverProxy; + }).inSingletonScope(); } } @@ -65,7 +108,6 @@ export default new ContainerModule(bind => { return server; }, RemoteFileSystemProxyFactory) ).inSingletonScope(); - bind(NodeFileUploadService).toSelf().inSingletonScope(); bind(MessagingService.Contribution).toService(NodeFileUploadService); }); diff --git a/packages/filesystem/src/node/filesystem-watcher-client.ts b/packages/filesystem/src/node/filesystem-watcher-client.ts index 8e5fa512facdf..088449111880e 100644 --- a/packages/filesystem/src/node/filesystem-watcher-client.ts +++ b/packages/filesystem/src/node/filesystem-watcher-client.ts @@ -14,74 +14,59 @@ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 ********************************************************************************/ -import * as path from 'path'; import { injectable, inject } from 'inversify'; -import { JsonRpcProxyFactory, ILogger, ConnectionErrorHandler, DisposableCollection, Disposable } from '@theia/core'; -import { IPCConnectionProvider } from '@theia/core/lib/node/messaging'; -import { FileSystemWatcherServer, WatchOptions, FileSystemWatcherClient, ReconnectingFileSystemWatcherServer } from '../common/filesystem-watcher-protocol'; -import { NsfwOptions } from './nsfw-watcher/nsfw-options'; +import { FileSystemWatcherServer, WatchOptions, FileSystemWatcherClient, FileSystemWatcherServer2 } from '../common/filesystem-watcher-protocol'; +import { FileSystemWatcherServer2Dispatcher } from './filesystem-watcher-dispatcher'; export const NSFW_WATCHER = 'nsfw-watcher'; +/** + * Wraps the NSFW singleton server for each frontend. + */ @injectable() export class FileSystemWatcherServerClient implements FileSystemWatcherServer { - protected readonly proxyFactory = new JsonRpcProxyFactory(); - protected readonly remote = new ReconnectingFileSystemWatcherServer(this.proxyFactory.createProxy()); + protected static clientIdSequence = 0; - protected readonly toDispose = new DisposableCollection(); + /** + * Track allocated watcherIds to de-allocate on disposal. + */ + protected watcherIds = new Set(); - constructor( - @inject(ILogger) protected readonly logger: ILogger, - @inject(IPCConnectionProvider) protected readonly ipcConnectionProvider: IPCConnectionProvider, - @inject(NsfwOptions) protected readonly nsfwOptions: NsfwOptions - ) { - this.remote.setClient({ - onDidFilesChanged: e => { - if (this.client) { - this.client.onDidFilesChanged(e); - } - }, - onError: () => { - if (this.client) { - this.client.onError(); - } - } - }); - this.toDispose.push(this.remote); - this.toDispose.push(this.listen()); - } + /** + * @todo make this number precisely map to one specific frontend. + */ + protected readonly clientId = FileSystemWatcherServerClient.clientIdSequence++; - dispose(): void { - this.toDispose.dispose(); - } + @inject(FileSystemWatcherServer2Dispatcher) + protected readonly watcherDispatcher: FileSystemWatcherServer2Dispatcher; - watchFileChanges(uri: string, options?: WatchOptions): Promise { - return this.remote.watchFileChanges(uri, options); + @inject(FileSystemWatcherServer2) + protected readonly watcherServer: FileSystemWatcherServer2; + + async watchFileChanges(uri: string, options?: WatchOptions): Promise { + const watcherId = await this.watcherServer.watchFileChanges2(this.clientId, uri, options); + this.watcherIds.add(watcherId); + return watcherId; } - unwatchFileChanges(watcher: number): Promise { - return this.remote.unwatchFileChanges(watcher); + async unwatchFileChanges(watcherId: number): Promise { + this.watcherIds.delete(watcherId); + return this.watcherServer.unwatchFileChanges2(watcherId); } - protected client: FileSystemWatcherClient | undefined; setClient(client: FileSystemWatcherClient | undefined): void { - this.client = client; + if (client !== undefined) { + this.watcherDispatcher.registerClient(this.clientId, client); + } else { + this.watcherDispatcher.unregisterClient(this.clientId); + } } - protected listen(): Disposable { - return this.ipcConnectionProvider.listen({ - serverName: NSFW_WATCHER, - entryPoint: path.resolve(__dirname, NSFW_WATCHER), - errorHandler: new ConnectionErrorHandler({ - serverName: NSFW_WATCHER, - logger: this.logger - }), - args: [ - `--nsfwOptions=${JSON.stringify(this.nsfwOptions)}` - ], - env: process.env - }, connection => this.proxyFactory.listen(connection)); + dispose(): void { + this.setClient(undefined); + for (const watcherId of this.watcherIds) { + this.unwatchFileChanges(watcherId); + } } - } diff --git a/packages/filesystem/src/node/filesystem-watcher-dispatcher.ts b/packages/filesystem/src/node/filesystem-watcher-dispatcher.ts new file mode 100644 index 0000000000000..e4a0ed517b4a9 --- /dev/null +++ b/packages/filesystem/src/node/filesystem-watcher-dispatcher.ts @@ -0,0 +1,82 @@ +/******************************************************************************** + * Copyright (C) 2020 Ericsson and others. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the Eclipse + * Public License v. 2.0 are satisfied: GNU General Public License, version 2 + * with the GNU Classpath Exception which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + ********************************************************************************/ + +import { injectable } from 'inversify'; +import { + FileSystemWatcherClient, FileSystemWatcherClient2, + DidFilesChangedParams2, FileSystemWatcherErrorParams2 +} from '../common/filesystem-watcher-protocol'; + +/** + * This component routes watch events to the right clients. + */ +@injectable() +export class FileSystemWatcherServer2Dispatcher implements FileSystemWatcherClient2 { + + /** + * Mapping of `clientId` to actual clients. + */ + protected readonly clients = new Map(); + + onDidFilesChanged2(event: DidFilesChangedParams2): void { + for (const client of this.iterRegisteredClients(event.clients)) { + client.onDidFilesChanged(event); + } + } + + onError2(event: FileSystemWatcherErrorParams2): void { + for (const client of this.iterRegisteredClients(event.clients)) { + client.onError(); + } + } + + /** + * Listen for events targeted at `clientId`. + */ + registerClient(clientId: number, client: FileSystemWatcherClient): void { + if (this.clients.has(clientId)) { + console.warn(`FileSystemWatcherServer2Dispatcher: a client was already registered! clientId=${clientId}`); + } + this.clients.set(clientId, client); + } + + unregisterClient(clientId: number): void { + if (!this.clients.has(clientId)) { + console.warn(`FileSystemWatcherServer2Dispatcher: tried to remove unknown client! clientId=${clientId}`); + } + this.clients.delete(clientId); + } + + /** + * Only yield registered clients for the given `clientIds`. + * + * If clientIds is empty, will return all clients. + */ + protected *iterRegisteredClients(clientIds?: number[]): Iterable { + if (!Array.isArray(clientIds) || clientIds.length === 0) { + // If we receive an event targeted to "no client", + // interpret that as notifying all clients: + yield* this.clients.values(); + } else { + for (const clientId of clientIds) { + const client = this.clients.get(clientId); + if (client !== undefined) { + yield client; + } + } + } + } +} diff --git a/packages/filesystem/src/node/nsfw-watcher/index.ts b/packages/filesystem/src/node/nsfw-watcher/index.ts index fcf9701e686cf..66f6591e634e8 100644 --- a/packages/filesystem/src/node/nsfw-watcher/index.ts +++ b/packages/filesystem/src/node/nsfw-watcher/index.ts @@ -16,7 +16,7 @@ import * as yargs from 'yargs'; import { JsonRpcProxyFactory } from '@theia/core'; -import { FileSystemWatcherClient } from '../../common/filesystem-watcher-protocol'; +import { FileSystemWatcherClient2 } from '../../common/filesystem-watcher-protocol'; import { NsfwFileSystemWatcherServer } from './nsfw-filesystem-watcher'; import { IPCEntryPoint } from '@theia/core/lib/node/messaging/ipc-protocol'; @@ -39,7 +39,7 @@ const options: { export default (connection => { const server = new NsfwFileSystemWatcherServer(options); - const factory = new JsonRpcProxyFactory(server); + const factory = new JsonRpcProxyFactory(server); server.setClient(factory.createProxy()); factory.listen(connection); }); diff --git a/packages/filesystem/src/node/nsfw-watcher/nsfw-filesystem-watcher.spec.ts b/packages/filesystem/src/node/nsfw-watcher/nsfw-filesystem-watcher.spec.ts index 22f7afad1856e..ceed218010f14 100644 --- a/packages/filesystem/src/node/nsfw-watcher/nsfw-filesystem-watcher.spec.ts +++ b/packages/filesystem/src/node/nsfw-watcher/nsfw-filesystem-watcher.spec.ts @@ -38,7 +38,7 @@ describe('nsfw-filesystem-watcher', function (): void { beforeEach(async () => { root = FileUri.create(fs.realpathSync(temp.mkdirSync('node-fs-root'))); watcherServer = createNsfwFileSystemWatcherServer(); - watcherId = await watcherServer.watchFileChanges(root.toString()); + watcherId = await watcherServer.watchFileChanges2(0, root.toString()); await sleep(2000); }); @@ -50,15 +50,14 @@ describe('nsfw-filesystem-watcher', function (): void { it('Should receive file changes events from in the workspace by default.', async function (): Promise { if (process.platform === 'win32') { this.skip(); - return; } const actualUris = new Set(); const watcherClient = { - onDidFilesChanged(event: DidFilesChangedParams): void { + onDidFilesChanged2(event: DidFilesChangedParams): void { event.changes.forEach(c => actualUris.add(c.uri.toString())); }, - onError(): void { + onError2(): void { } }; watcherServer.setClient(watcherClient); @@ -87,21 +86,20 @@ describe('nsfw-filesystem-watcher', function (): void { it('Should not receive file changes events from in the workspace by default if unwatched', async function (): Promise { if (process.platform === 'win32') { this.skip(); - return; } const actualUris = new Set(); const watcherClient = { - onDidFilesChanged(event: DidFilesChangedParams): void { + onDidFilesChanged2(event: DidFilesChangedParams): void { event.changes.forEach(c => actualUris.add(c.uri.toString())); }, - onError(): void { + onError2(): void { } }; watcherServer.setClient(watcherClient); /* Unwatch root */ - watcherServer.unwatchFileChanges(watcherId); + watcherServer.unwatchFileChanges2(watcherId); fs.mkdirSync(FileUri.fsPath(root.resolve('foo'))); expect(fs.statSync(FileUri.fsPath(root.resolve('foo'))).isDirectory()).to.be.true; diff --git a/packages/filesystem/src/node/nsfw-watcher/nsfw-filesystem-watcher.ts b/packages/filesystem/src/node/nsfw-watcher/nsfw-filesystem-watcher.ts index d35299aeded78..55d3e7d9ff0d4 100644 --- a/packages/filesystem/src/node/nsfw-watcher/nsfw-filesystem-watcher.ts +++ b/packages/filesystem/src/node/nsfw-watcher/nsfw-filesystem-watcher.ts @@ -14,207 +14,281 @@ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 ********************************************************************************/ -import * as fs from 'fs'; import * as nsfw from 'nsfw'; -import * as paths from 'path'; +import { join } from 'path'; +import { promises as fsp } from 'fs'; import { IMinimatch, Minimatch } from 'minimatch'; -import { Disposable, DisposableCollection } from '@theia/core/lib/common/disposable'; import { FileUri } from '@theia/core/lib/node/file-uri'; import { - FileChangeType, - FileSystemWatcherClient, - FileSystemWatcherServer, - WatchOptions + FileChangeType, FileSystemWatcherServer2, FileSystemWatcherClient2, WatchOptions } from '../../common/filesystem-watcher-protocol'; import { FileChangeCollection } from '../file-change-collection'; -import { setInterval, clearInterval } from 'timers'; - -const debounce = require('lodash.debounce'); - -/* eslint-disable @typescript-eslint/no-explicit-any */ +import { Deferred, AsyncLock } from '@theia/core/lib/common/promise-util'; +/** NsfwWatcherOptions */ export interface WatcherOptions { ignored: IMinimatch[] } +type NsfwWatcherOptions = WatcherOptions; -export class NsfwFileSystemWatcherServer implements FileSystemWatcherServer { +export interface NsfwFileSystemWatcherServerOptions { + verbose: boolean; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + info: (message: string, ...args: any[]) => void; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + error: (message: string, ...args: any[]) => void; + nsfwOptions: nsfw.Options; +} - protected client: FileSystemWatcherClient | undefined; +/** + * This is a flag value passed around upon disposal. + */ +export const WatcherDisposal = Symbol('WatcherDisposal'); - protected watcherSequence = 1; - protected readonly watchers = new Map(); - protected readonly watcherOptions = new Map(); +/** + * Because URIs can be watched by different clients, we'll track + * how many are listening for a given URI. + * + * This component wraps the whole start/stop process given some + * reference count. + * + * Once there are no more references the handle + * will wait for some time before destroying its resources. + */ +export class NsfwWatcher { - protected readonly toDispose = new DisposableCollection( - Disposable.create(() => this.setClient(undefined)) - ); + protected static debugIdSequence = 0; - protected changes = new FileChangeCollection(); + protected disposed = false; - protected readonly options: { - verbose: boolean - info: (message: string, ...args: any[]) => void - error: (message: string, ...args: any[]) => void, - nsfwOptions: nsfw.Options - }; + /** + * Used for debugging to keep track of the watchers. + */ + protected debugId = NsfwWatcher.debugIdSequence++; - constructor(options?: { - verbose?: boolean, - nsfwOptions?: nsfw.Options, - info?: (message: string, ...args: any[]) => void - error?: (message: string, ...args: any[]) => void - }) { - this.options = { - nsfwOptions: {}, - verbose: false, - info: (message, ...args) => console.info(message, ...args), - error: (message, ...args) => console.error(message, ...args), - ...options - }; - } + /** + * When this field is set, it means the nsfw instance was successfully started. + */ + protected nsfw: nsfw.NSFW | undefined; - dispose(): void { - this.toDispose.dispose(); - } - - async watchFileChanges(uri: string, options?: WatchOptions): Promise { - const watcherId = this.watcherSequence++; - const basePath = FileUri.fsPath(uri); - this.debug('Starting watching:', basePath); - const toDisposeWatcher = new DisposableCollection(); - this.watchers.set(watcherId, toDisposeWatcher); - toDisposeWatcher.push(Disposable.create(() => this.watchers.delete(watcherId))); - if (fs.existsSync(basePath)) { - this.start(watcherId, basePath, options, toDisposeWatcher); - } else { - const toClearTimer = new DisposableCollection(); - const timer = setInterval(() => { - if (fs.existsSync(basePath)) { - toClearTimer.dispose(); - this.pushAdded(watcherId, basePath); - this.start(watcherId, basePath, options, toDisposeWatcher); - } - }, 500); - toClearTimer.push(Disposable.create(() => clearInterval(timer))); - toDisposeWatcher.push(toClearTimer); - } - this.toDispose.push(toDisposeWatcher); - return watcherId; - } + /** + * When the ref count hits zero, we schedule this watch handle to be disposed. + */ + protected deferredDisposalTimer: NodeJS.Timer | undefined; - protected async start(watcherId: number, basePath: string, rawOptions: WatchOptions | undefined, toDisposeWatcher: DisposableCollection): Promise { - const options: WatchOptions = { - ignored: [], - ...rawOptions - }; - if (options.ignored.length > 0) { - this.debug('Files ignored for watching', options.ignored); - } + /** + * This deferred only rejects with `WatcherDisposal` and never resolves. + */ + protected readonly deferredDisposalDeferred = new Deferred(); - let watcher: nsfw.NSFW | undefined = await nsfw(fs.realpathSync(basePath), (events: nsfw.ChangeEvent[]) => { - for (const event of events) { - if (event.action === nsfw.actions.CREATED) { - this.pushAdded(watcherId, this.resolvePath(event.directory, event.file!)); - } - if (event.action === nsfw.actions.DELETED) { - this.pushDeleted(watcherId, this.resolvePath(event.directory, event.file!)); - } - if (event.action === nsfw.actions.MODIFIED) { - this.pushUpdated(watcherId, this.resolvePath(event.directory, event.file!)); - } - if (event.action === nsfw.actions.RENAMED) { - this.pushDeleted(watcherId, this.resolvePath(event.directory, event.oldFile!)); - this.pushAdded(watcherId, this.resolvePath(event.newDirectory || event.directory, event.newFile!)); - } - } - }, { - errorCallback: error => { - // see https://github.com/atom/github/issues/342 - console.warn(`Failed to watch "${basePath}":`, error); - if (error === 'Inotify limit reached') { - if (this.client) { - this.client.onError(); - } - } - this.unwatchFileChanges(watcherId); - }, - ...this.options.nsfwOptions - }); - await watcher.start(); - this.options.info('Started watching:', basePath); - if (toDisposeWatcher.disposed) { - this.debug('Stopping watching:', basePath); - await watcher.stop(); - // remove a reference to nsfw otherwise GC cannot collect it - watcher = undefined; - this.options.info('Stopped watching:', basePath); - return; - } - toDisposeWatcher.push(Disposable.create(async () => { - this.watcherOptions.delete(watcherId); - if (watcher) { - this.debug('Stopping watching:', basePath); - await watcher.stop(); - // remove a reference to nsfw otherwise GC cannot collect it - watcher = undefined; - this.options.info('Stopped watching:', basePath); + /** + * We count each reference made to this watcher, per client. + * + * We do this to know where to send events via the network. + * + * An entry should be removed when its value hits zero. + */ + protected readonly refsPerClient = new Map(); + + /** + * Ensures that events are processed in the order they are emitted, + * despite being processed async. + */ + protected readonly nsfwEventProcessingLock = new AsyncLock(); + + /** + * Resolves once this handle disposed itself and its resources. Never throws. + */ + readonly whenDisposed: Promise = this.deferredDisposalDeferred.promise.catch(() => undefined); + + /** + * Promise that resolves when the watcher is fully started, or got disposed. + * + * Will reject if an error occured while starting. + * + * @returns `true` if successfully started, `false` if disposed early. + */ + readonly whenStarted: Promise; + + constructor( + /** Initial reference to this handle. */ + initialClientId: number, + /** Filesystem path to be watched. */ + readonly fsPath: string, + /** Watcher-specific options */ + readonly watcherOptions: NsfwWatcherOptions, + /** Logging and Nsfw options */ + protected readonly nsfwFileSystemWatchServerOptions: NsfwFileSystemWatcherServerOptions, + /** The client to forward events to. */ + protected readonly fileSystemWatcherClient: FileSystemWatcherClient2, + /** Amount of time in ms to wait once this handle ins't referenced anymore. */ + protected readonly deferredDisposalTimeout = 10_000, + ) { + this.refsPerClient.set(initialClientId, { value: 1 }); + this.whenStarted = this.start().then(() => true, error => { + if (error === WatcherDisposal) { + return false; } - })); - this.watcherOptions.set(watcherId, { - ignored: options.ignored.map(pattern => new Minimatch(pattern, { dot: true })) + this._dispose(); + this.fireError(); + throw error; }); + this.debug('NEW', `initialClientId=${initialClientId}, fsPath=${fsPath}`); } - unwatchFileChanges(watcherId: number): Promise { - const disposable = this.watchers.get(watcherId); - if (disposable) { - this.watchers.delete(watcherId); - disposable.dispose(); + addRef(clientId: number): void { + let refs = this.refsPerClient.get(clientId); + if (typeof refs === 'undefined') { + this.refsPerClient.set(clientId, refs = { value: 1 }); + } else { + refs.value += 1; + } + const totalRefs = this.getTotalReferences(); + // If it was zero before, 1 means we were revived: + const revived = totalRefs === 1; + if (revived) { + this.onRefsRevive(); } - return Promise.resolve(); + this.debug('REF++', `clientId=${clientId}, clientRefs=${refs.value}, totalRefs=${totalRefs}. revived=${revived}`); } - setClient(client: FileSystemWatcherClient | undefined): void { - if (client && this.toDispose.disposed) { + removeRef(clientId: number): void { + const refs = this.refsPerClient.get(clientId); + if (typeof refs === 'undefined') { + this.info('WARN REF--', `removed one too many reference: clientId=${clientId}`); return; } - this.client = client; + refs.value -= 1; + // We must remove the key from `this.clientReferences` because + // we list active clients by reading the keys of this map. + if (refs.value === 0) { + this.refsPerClient.delete(clientId); + } + const totalRefs = this.getTotalReferences(); + const dead = totalRefs === 0; + if (dead) { + this.onRefsReachZero(); + } + this.debug('REF--', `clientId=${clientId}, clientRefs=${refs.value}, totalRefs=${totalRefs}, dead=${dead}`); } - protected pushAdded(watcherId: number, path: string): void { - this.debug('Added:', path); - this.pushFileChange(watcherId, path, FileChangeType.ADDED); + /** + * All clients with at least one active reference. + */ + getClientIds(): number[] { + return Array.from(this.refsPerClient.keys()); } - protected pushUpdated(watcherId: number, path: string): void { - this.debug('Updated:', path); - this.pushFileChange(watcherId, path, FileChangeType.UPDATED); + /** + * Add the references for each client together. + */ + getTotalReferences(): number { + let total = 0; + for (const refs of this.refsPerClient.values()) { + total += refs.value; + } + return total; } - protected pushDeleted(watcherId: number, path: string): void { - this.debug('Deleted:', path); - this.pushFileChange(watcherId, path, FileChangeType.DELETED); + /** + * Returns true if at least one client listens to this handle. + */ + isInUse(): boolean { + return this.refsPerClient.size > 0; } - protected pushFileChange(watcherId: number, path: string, type: FileChangeType): void { - if (this.isIgnored(watcherId, path)) { - return; + /** + * When starting a watcher, we'll first check and wait for the path to exists + * before running an NSFW watcher. + */ + protected async start(): Promise { + while (await this.orCancel(fsp.stat(this.fsPath).then(() => false, () => true))) { + await this.orCancel(new Promise(resolve => setTimeout(resolve, 500))); } + const watcher = await this.orCancel(this.createNsfw()); + await this.orCancel(watcher.start().then(() => { + this.debug('STARTED', `fsPath=${this.fsPath}, diposed=${this.disposed}`); + // The watcher could be disposed while it was starting, make sure to check for this: + if (this.disposed) { + this.stopNsfw(watcher); + } + })); + this.nsfw = watcher; + } - const uri = FileUri.create(path).toString(); - this.changes.push({ uri, type }); + /** + * Given a started nsfw instance, gracefully shut it down. + */ + protected async stopNsfw(watcher: nsfw.NSFW): Promise { + await watcher.stop() + .then(() => 'success=true', error => error) + .then(status => this.debug('STOPPED', `fsPath=${this.fsPath}`, status)); + } - this.fireDidFilesChanged(); + protected async createNsfw(): Promise { + const fsPath = await fsp.realpath(this.fsPath); + return nsfw(fsPath, events => this.handleNsfwEvents(events), { + ...this.nsfwFileSystemWatchServerOptions.nsfwOptions, + // The errorCallback is called whenever NSFW crashes *while* watching. + // See https://github.com/atom/github/issues/342 + errorCallback: error => { + console.error(`NSFW service error on "${fsPath}":`, error); + this._dispose(); + this.fireError(); + // Make sure to call user's error handling code: + if (this.nsfwFileSystemWatchServerOptions.nsfwOptions.errorCallback) { + this.nsfwFileSystemWatchServerOptions.nsfwOptions.errorCallback(error); + } + }, + }); + } + + protected handleNsfwEvents(events: nsfw.ChangeEvent[]): void { + // Only process events if someone is listening. + if (this.isInUse()) { + // This callback is async, but nsfw won't wait for it to finish before firing the next one. + // We will use a lock/queue to make sure everything is processed in the order it arrives. + this.nsfwEventProcessingLock.acquire(async () => { + const fileChangeCollection = new FileChangeCollection(); + await Promise.all(events.map(async event => { + if (event.action === nsfw.actions.RENAMED) { + const [oldPath, newPath] = await Promise.all([ + this.resolveEventPath(event.directory, event.oldFile!), + this.resolveEventPath(event.newDirectory || event.directory, event.newFile!), + ]); + this.pushFileChange(fileChangeCollection, FileChangeType.DELETED, oldPath); + this.pushFileChange(fileChangeCollection, FileChangeType.ADDED, newPath); + } else { + const path = await this.resolveEventPath(event.directory, event.file!); + if (event.action === nsfw.actions.CREATED) { + this.pushFileChange(fileChangeCollection, FileChangeType.ADDED, path); + } else if (event.action === nsfw.actions.DELETED) { + this.pushFileChange(fileChangeCollection, FileChangeType.DELETED, path); + } else if (event.action === nsfw.actions.MODIFIED) { + this.pushFileChange(fileChangeCollection, FileChangeType.UPDATED, path); + } + } + })); + const changes = fileChangeCollection.values(); + // If all changes are part of the ignored files, the collection will be empty. + if (changes.length > 0) { + this.fileSystemWatcherClient.onDidFilesChanged2({ + clients: this.getClientIds(), + changes, + }); + } + }); + } } - protected resolvePath(directory: string, file: string): string { - const path = paths.join(directory, file); + protected async resolveEventPath(directory: string, file: string): Promise { + const path = join(directory, file); try { - return fs.realpathSync(path); + return await fsp.realpath(path); } catch { try { // file does not exist try to resolve directory - return paths.join(fs.realpathSync(directory), file); + return join(await fsp.realpath(directory), file); } catch { // directory does not exist fall back to symlink return path; @@ -222,28 +296,188 @@ export class NsfwFileSystemWatcherServer implements FileSystemWatcherServer { } } + protected pushFileChange(changes: FileChangeCollection, type: FileChangeType, path: string): void { + if (!this.isIgnored(path)) { + const uri = FileUri.create(path).toString(); + changes.push({ type, uri }); + } + } + + protected fireError(): void { + this.fileSystemWatcherClient.onError2({ + clients: this.getClientIds(), + uri: this.fsPath, + }); + } + + /** + * Wrap a promise to reject as soon as this handle gets disposed. + */ + protected async orCancel(promise: Promise): Promise { + return Promise.race([this.deferredDisposalDeferred.promise, promise]); + } + + /** + * When references hit zero, we'll schedule disposal for a bit later. + * + * This allows new references to reuse this watcher instead of creating a new one. + * + * e.g. A frontend disconnects for a few milliseconds before reconnecting again. + */ + protected onRefsReachZero(): void { + this.deferredDisposalTimer = setTimeout(() => this._dispose(), this.deferredDisposalTimeout); + } + + /** + * If we get new references after hitting zero, let's unschedule our disposal and keep watching. + */ + protected onRefsRevive(): void { + if (this.deferredDisposalTimer) { + clearTimeout(this.deferredDisposalTimer); + this.deferredDisposalTimer = undefined; + } + } + + protected isIgnored(path: string): boolean { + return this.watcherOptions.ignored.length > 0 + && this.watcherOptions.ignored.some(m => m.match(path)); + } + /** - * Fires file changes to clients. - * It is debounced in the case if the filesystem is spamming to avoid overwhelming clients with events. + * Internal disposal mechanism. */ - protected readonly fireDidFilesChanged: () => void = debounce(() => this.doFireDidFilesChanged(), 50); - protected doFireDidFilesChanged(): void { - const changes = this.changes.values(); - this.changes = new FileChangeCollection(); - const event = { changes }; - if (this.client) { - this.client.onDidFilesChanged(event); + protected async _dispose(): Promise { + if (!this.disposed) { + this.disposed = true; + this.deferredDisposalDeferred.reject(WatcherDisposal); + if (this.nsfw) { + this.stopNsfw(this.nsfw); + this.nsfw = undefined; + } + this.debug('DISPOSED', `fsPath=${this.fsPath}`); } } - protected isIgnored(watcherId: number, path: string): boolean { - const options = this.watcherOptions.get(watcherId); - return !!options && options.ignored.length > 0 && options.ignored.some(m => m.match(path)); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + protected info(prefix: string, message: string, ...params: any[]): void { + this.nsfwFileSystemWatchServerOptions.info(`${prefix} NsfwWatcher(${this.debugId}): ${message}`, ...params); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + protected debug(prefix: string, message: string, ...params: any[]): void { + if (this.nsfwFileSystemWatchServerOptions.verbose) { + this.info(prefix, message, ...params); + } } +} + +/** + * Each time a client makes a watchRequest, we generate a unique watcherId for it. + * + * This watcherId will map to this handle type which keeps track of the clientId that made the request. + */ +export interface NsfwWatcherHandle { + clientId: number; + watcher: NsfwWatcher; +} +export class NsfwFileSystemWatcherServer implements FileSystemWatcherServer2 { + + protected client: FileSystemWatcherClient2 | undefined; + + protected watcherId = 0; + protected readonly watchers = new Map(); + protected readonly watcherHandles = new Map(); + + protected readonly options: NsfwFileSystemWatcherServerOptions; + + /** + * `this.client` is undefined until someone sets it. + */ + protected readonly maybeClient: FileSystemWatcherClient2 = { + onDidFilesChanged2: event => this.client?.onDidFilesChanged2(event), + onError2: event => this.client?.onError2(event), + }; + + constructor(options?: Partial) { + this.options = { + nsfwOptions: {}, + verbose: false, + info: (message, ...args) => console.info(message, ...args), + error: (message, ...args) => console.error(message, ...args), + ...options + }; + } + + setClient(client: FileSystemWatcherClient2 | undefined): void { + this.client = client; + } + + /** + * A specific client requests us to watch a given `uri` according to some `options`. + * + * We internally re-use all the same `(uri, options)` pairs. + */ + async watchFileChanges2(clientId: number, uri: string, options?: WatchOptions): Promise { + const resolvedOptions = this.resolveWatchOptions(options); + const watcherKey = this.getWatcherKey(uri, resolvedOptions); + let watcher = this.watchers.get(watcherKey); + if (watcher === undefined) { + const fsPath = FileUri.fsPath(uri); + const watcherOptions: NsfwWatcherOptions = { + ignored: resolvedOptions.ignored + .map(pattern => new Minimatch(pattern, { dot: true })), + }; + watcher = new NsfwWatcher(clientId, fsPath, watcherOptions, this.options, this.maybeClient); + watcher.whenDisposed.then(() => this.watchers.delete(watcherKey)); + this.watchers.set(watcherKey, watcher); + } else { + watcher.addRef(clientId); + } + const watcherId = this.watcherId++; + this.watcherHandles.set(watcherId, { clientId, watcher }); + watcher.whenDisposed.then(() => this.watcherHandles.delete(watcherId)); + return watcherId; + } + + async unwatchFileChanges2(watcherId: number): Promise { + const handle = this.watcherHandles.get(watcherId); + if (handle === undefined) { + console.warn(`tried to de-allocate a disposed watcher: watcherId=${watcherId}`); + } else { + this.watcherHandles.delete(watcherId); + handle.watcher.removeRef(handle.clientId); + } + } + + /** + * Given some `URI` and some `WatchOptions`, generate a unique key. + */ + protected getWatcherKey(uri: string, options: WatchOptions): string { + return [ + uri, + options.ignored.slice(0).sort().join() // use a **sorted copy** of `ignored` as part of the key + ].join(); + } + + /** + * Return fully qualified options. + */ + protected resolveWatchOptions(options?: WatchOptions): WatchOptions { + return { + ignored: [], + ...options, + }; + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any protected debug(message: string, ...params: any[]): void { if (this.options.verbose) { this.options.info(message, ...params); } } + + dispose(): void { + // Singletons shouldn't be disposed... + } } diff --git a/packages/filesystem/src/typings/nsfw/index.d.ts b/packages/filesystem/src/typings/nsfw/index.d.ts index 141cf175e7e56..b95b35ca80cca 100644 --- a/packages/filesystem/src/typings/nsfw/index.d.ts +++ b/packages/filesystem/src/typings/nsfw/index.d.ts @@ -13,4 +13,5 @@ * * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 ********************************************************************************/ -/// + +export * from '@theia/core/src/typings/nsfw';