Skip to content

Commit

Permalink
file-watchers: use singleton server
Browse files Browse the repository at this point in the history
Theia spawns as many nsfw servers as there are clients, maybe more.

This commit replaces this behaviour by centralizing everything into the
same service process in order to optimize resources (watch handles are a
limited resource on a given Linux host).

Some notes on the current design:

- Everytime someone requests to watch an `(uri, options)` couple we will
re-use the same watch handle (and actual nsfw instance). This allows for
resource allocation optimizations.
- Since a given service can only serve a single client, I added an event
dispatcher named `FileSystemWatcherServerDispatcher`. You must register
yourself with some unique id you will use to make requests.
- The new `.watchUriChanges` function now takes a `clientId` so that
when events are sent to the dispatcher it then gets routed back to your
own client.
- Added a `--nsfw-watcher-verbose` backend argument to enable verbose
logging for the watcher server.

Signed-off-by: Paul Maréchal <paul.marechal@ericsson.com>
  • Loading branch information
paul-marechal committed Sep 25, 2020
1 parent 4739e26 commit 31077c4
Show file tree
Hide file tree
Showing 13 changed files with 742 additions and 318 deletions.
30 changes: 30 additions & 0 deletions packages/core/src/common/promise-util.ts
Expand Up @@ -59,3 +59,33 @@ export async function retry<T>(task: () => Promise<T>, delay: number, retries: n

throw lastError;
}

export class AsyncLock {

/** How many task are currently queued. */
size = 0;

protected queue: Promise<void> = Promise.resolve();

/**
* This method will queue the execution of `callback` behind previous calls to `acquire`.
*
* This is useful to ensure that only one task handles a resource at a time.
*
* @param callback task to execute once previous ones finished.
* @param args to pass to your callback.
*/
async acquire<T>(callback: () => PromiseLike<T>): Promise<T>;
// eslint-disable-next-line space-before-function-paren, @typescript-eslint/no-explicit-any
async acquire<T, U extends any[]>(callback: (...args: U) => PromiseLike<T>, ...args: U): Promise<T> {
return new Promise<T>((resolve, reject) => {
const next: () => PromiseLike<void> =
() => callback(...args)
.then(resolve, reject)
.then(() => { this.size--; });
this.queue = this.queue.then(next, next);
this.size++;
});
}

}
3 changes: 3 additions & 0 deletions packages/filesystem/compile.tsconfig.json
Expand Up @@ -9,6 +9,9 @@
"mv": [
"src/typings/mv"
],
"nsfw": [
"src/typings/nsfw"
],
"trash": [
"src/typings/trash"
]
Expand Down
81 changes: 30 additions & 51 deletions packages/filesystem/src/browser/file-service.ts
Expand Up @@ -1278,63 +1278,42 @@ export class FileService {
*/
readonly onDidFilesChange = this.onDidFilesChangeEmitter.event;

private activeWatchers = new Map<string, { disposable: Disposable, count: number }>();

watch(resource: URI, options: WatchOptions = { recursive: false, excludes: [] }): Disposable {
const resolvedOptions: WatchOptions = {
...options,
// always ignore temporary upload files
excludes: options.excludes.concat('**/theia_upload_*')
};

let watchDisposed = false;
let watchDisposable = Disposable.create(() => watchDisposed = true);

// Watch and wire in disposable which is async but
// check if we got disposed meanwhile and forward
this.doWatch(resource, resolvedOptions).then(disposable => {
if (watchDisposed) {
disposable.dispose();
} else {
watchDisposable = disposable;
}
}, error => console.error(error));

return Disposable.create(() => watchDisposable.dispose());
}

async doWatch(resource: URI, options: WatchOptions): Promise<Disposable> {
const provider = await this.withProvider(resource);
const key = this.toWatchKey(provider, resource, options);

// Only start watching if we are the first for the given key
const watcher = this.activeWatchers.get(key) || { count: 0, disposable: provider.watch(resource, options) };
if (!this.activeWatchers.has(key)) {
this.activeWatchers.set(key, watcher);
}

// Increment usage counter
watcher.count += 1;

return Disposable.create(() => {

// Unref
watcher.count--;

// Dispose only when last user is reached
if (watcher.count === 0) {
watcher.disposable.dispose();
this.activeWatchers.delete(key);
}
});
}

private toWatchKey(provider: FileSystemProvider, resource: URI, options: WatchOptions): string {
return [
this.toMapKey(provider, resource), // lowercase path if the provider is case insensitive
String(options.recursive), // use recursive: true | false as part of the key
options.excludes.join() // use excludes as part of the key
].join();
/**
* Disposable handle. Can be disposed early (before the watcher is allocated.)
*/
const handle = {
disposed: false,
watcher: undefined as Disposable | undefined,
dispose(): void {
if (this.disposed) {
return;
}
if (this.watcher) {
this.watcher.dispose();
this.watcher = undefined;
}
this.disposed = true;
},
};
this.withProvider(resource)
.then(provider => {
const watcher = provider.watch(resource, resolvedOptions);
// If the handle got disposed before allocation, stop here.
if (handle.disposed) {
watcher.dispose();
} else {
handle.watcher = watcher;
}
}, error => {
console.error(error);
});
return handle;
}

// #endregion
Expand Down
46 changes: 45 additions & 1 deletion packages/filesystem/src/common/filesystem-watcher-protocol.ts
Expand Up @@ -19,6 +19,47 @@ import { JsonRpcServer, JsonRpcProxy } from '@theia/core';
import { FileChangeType } from './files';
export { FileChangeType };

export const FileSystemWatcherServer2 = Symbol('FileSystemWatcherServer2');
/**
* Singleton implementation of the watch server.
*
* Since multiple clients all make requests to this service, we need to track those individually via a `clientId`.
*/
export interface FileSystemWatcherServer2 extends JsonRpcServer<FileSystemWatcherClient2> {
/**
* @param clientId arbitrary id used to identify a client.
* @param uri the path to watch.
* @param options optional parameters.
* @returns promise to a unique `number` handle for this request.
*/
watchFileChanges2(clientId: number, uri: string, options?: WatchOptions): Promise<number>;
/**
* @param watcherId handle mapping to a previous `watchFileChanges` request.
*/
unwatchFileChanges2(watcherId: number): Promise<void>;
}

export interface FileSystemWatcherClient2 {
/** Listen for change events emitted by the watcher. */
onDidFilesChanged2(event: DidFilesChangedParams2): void;
/** The watcher can crash in certain conditions. */
onError2(event: FileSystemWatcherErrorParams2): void;
}

export interface DidFilesChangedParams2 {
/** Clients to route the events to. */
clients: number[];
/** FileSystem changes that occured. */
changes: FileChange[];
}

export interface FileSystemWatcherErrorParams2 {
/** Clients to route the events to. */
clients: number[];
/** The uri that originated the error. */
uri: string;
}

export const FileSystemWatcherServer = Symbol('FileSystemWatcherServer');
export interface FileSystemWatcherServer extends JsonRpcServer<FileSystemWatcherClient> {
/**
Expand All @@ -32,7 +73,7 @@ export interface FileSystemWatcherServer extends JsonRpcServer<FileSystemWatcher
* Stop file watching for the given id.
* Resolve when watching is stopped.
*/
unwatchFileChanges(watcher: number): Promise<void>;
unwatchFileChanges(watcherId: number): Promise<void>;
}

export interface FileSystemWatcherClient {
Expand Down Expand Up @@ -63,6 +104,9 @@ export interface FileChange {
export const FileSystemWatcherServerProxy = Symbol('FileSystemWatcherServerProxy');
export type FileSystemWatcherServerProxy = JsonRpcProxy<FileSystemWatcherServer>;

/**
* @deprecated not used internally anymore.
*/
@injectable()
export class ReconnectingFileSystemWatcherServer implements FileSystemWatcherServer {

Expand Down
63 changes: 41 additions & 22 deletions packages/filesystem/src/common/remote-file-system-provider.ts
Expand Up @@ -27,7 +27,6 @@ import {
} from './files';
import { JsonRpcServer, JsonRpcProxy, JsonRpcProxyFactory } from '@theia/core/lib/common/messaging/proxy-factory';
import { ApplicationError } from '@theia/core/lib/common/application-error';
import { Deferred } from '@theia/core/lib/common/promise-util';
import type { TextDocumentContentChangeEvent } from 'vscode-languageserver-protocol';
import { newWriteableStream, ReadableStreamEvents } from '@theia/core/lib/common/stream';
import { CancellationToken, cancelled } from '@theia/core/lib/common/cancellation';
Expand Down Expand Up @@ -103,6 +102,11 @@ export class RemoteFileSystemProxyFactory<T extends object> extends JsonRpcProxy
}
}

/**
* Frontend component.
*
* Wraps the remote filesystem provider living on the backend side.
*/
@injectable()
export class RemoteFileSystemProvider implements Required<FileSystemProvider>, Disposable {

Expand All @@ -129,6 +133,10 @@ export class RemoteFileSystemProvider implements Required<FileSystemProvider>, D
);

protected watcherSequence = 0;
/**
* We'll track the currently allocated watchers, in order to re-allocate them
* with the same options once we reconnect to the backend after a disconnection.
*/
protected readonly watchOptions = new Map<number, {
uri: string;
options: WatchOptions
Expand All @@ -137,20 +145,19 @@ export class RemoteFileSystemProvider implements Required<FileSystemProvider>, D
private _capabilities: FileSystemProviderCapabilities = 0;
get capabilities(): FileSystemProviderCapabilities { return this._capabilities; }

protected readonly deferredReady = new Deferred<void>();
get ready(): Promise<void> {
return this.deferredReady.promise;
}
readonly ready: Promise<void>;

/**
* Wrapped remote filesystem.
*/
@inject(RemoteFileSystemServer)
protected readonly server: JsonRpcProxy<RemoteFileSystemServer>;

@postConstruct()
protected init(): void {
this.server.getCapabilities().then(capabilities => {
(this.ready as Promise<void>) = this.server.getCapabilities().then(capabilities => {
this._capabilities = capabilities;
this.deferredReady.resolve(undefined);
}, this.deferredReady.reject);
});
this.server.setClient({
notifyDidChangeFile: ({ changes }) => {
this.onDidChangeFileEmitter.fire(changes.map(event => ({ resource: new URI(event.resource), type: event.type })));
Expand Down Expand Up @@ -286,19 +293,23 @@ export class RemoteFileSystemProvider implements Required<FileSystemProvider>, D
}

watch(resource: URI, options: WatchOptions): Disposable {
const watcher = this.watcherSequence++;
const watcherId = this.watcherSequence++;
const uri = resource.toString();
this.watchOptions.set(watcher, { uri, options });
this.server.watch(watcher, uri, options);

this.watchOptions.set(watcherId, { uri, options });
this.server.watch(watcherId, uri, options);
const toUnwatch = Disposable.create(() => {
this.watchOptions.delete(watcher);
this.server.unwatch(watcher);
this.watchOptions.delete(watcherId);
this.server.unwatch(watcherId);
});
this.toDispose.push(toUnwatch);
return toUnwatch;
}

/**
* When a frontend disconnects (e.g. bad connection) the backend resources will be cleared.
*
* This means that we need to re-allocate the watchers when a frontend reconnects.
*/
protected reconnect(): void {
for (const [watcher, { uri, options }] of this.watchOptions.entries()) {
this.server.watch(watcher, uri, options);
Expand All @@ -308,13 +319,17 @@ export class RemoteFileSystemProvider implements Required<FileSystemProvider>, D
}

/**
* Backend component.
*
* JSON-RPC server exposing a wrapped file system provider remotely.
*/
@injectable()
export class FileSystemProviderServer implements RemoteFileSystemServer {

private readonly BUFFER_SIZE = 64 * 1024;

protected watchers = new Map<number, Disposable>();

protected readonly toDispose = new DisposableCollection();
dispose(): void {
this.toDispose.dispose();
Expand All @@ -325,6 +340,9 @@ export class FileSystemProviderServer implements RemoteFileSystemServer {
this.client = client;
}

/**
* Wrapped file system provider.
*/
@inject(FileSystemProvider)
protected readonly provider: FileSystemProvider & Partial<Disposable>;

Expand Down Expand Up @@ -450,18 +468,19 @@ export class FileSystemProviderServer implements RemoteFileSystemServer {
throw new Error('not supported');
}

protected watchers = new Map<number, Disposable>();

async watch(req: number, resource: string, opts: WatchOptions): Promise<void> {
async watch(requestedWatcherId: number, resource: string, opts: WatchOptions): Promise<void> {
if (this.watchers.has(requestedWatcherId)) {
throw new Error('watcher id is already allocated!');
}
const watcher = this.provider.watch(new URI(resource), opts);
this.watchers.set(req, watcher);
this.toDispose.push(Disposable.create(() => this.unwatch(req)));
this.watchers.set(requestedWatcherId, watcher);
this.toDispose.push(Disposable.create(() => this.unwatch(requestedWatcherId)));
}

async unwatch(req: number): Promise<void> {
const watcher = this.watchers.get(req);
async unwatch(watcherId: number): Promise<void> {
const watcher = this.watchers.get(watcherId);
if (watcher) {
this.watchers.delete(req);
this.watchers.delete(watcherId);
watcher.dispose();
}
}
Expand Down
33 changes: 25 additions & 8 deletions packages/filesystem/src/node/disk-file-system-provider.ts
Expand Up @@ -804,18 +804,35 @@ export class DiskFileSystemProvider implements Disposable,
// #region File Watching

watch(resource: URI, opts: WatchOptions): Disposable {
const toUnwatch = new DisposableCollection(Disposable.create(() => { /* mark as not disposed */ }));
this.watcher.watchFileChanges(resource.toString(), {
const watcherService = this.watcher;
/**
* Disposable handle. Can be disposed early (before the watcher is allocated.)
*/
const handle = {
disposed: false,
watcherId: undefined as number | undefined,
dispose(): void {
if (this.disposed) {
return;
}
if (typeof this.watcherId !== 'undefined') {
watcherService.unwatchFileChanges(this.watcherId);
}
this.disposed = true;
},
};
watcherService.watchFileChanges(resource.toString(), {
// Convert from `files.WatchOptions` to internal `watcher-protocol.WatchOptions`:
ignored: opts.excludes
}).then(watcher => {
if (toUnwatch.disposed) {
this.watcher.unwatchFileChanges(watcher);
}).then(watcherId => {
if (handle.disposed) {
watcherService.unwatchFileChanges(watcherId);
} else {
toUnwatch.push(Disposable.create(() => this.watcher.unwatchFileChanges(watcher)));
handle.watcherId = watcherId;
}
});
this.toDispose.push(toUnwatch);
return toUnwatch;
this.toDispose.push(handle);
return handle;
}

// #endregion
Expand Down

0 comments on commit 31077c4

Please sign in to comment.