Skip to content

Commit

Permalink
file-watchers: use singleton server
Browse files Browse the repository at this point in the history
Theia spawns as many nsfw servers as there are clients, maybe more.

This commit replaces this behaviour by centralizing everything into the
same service process in order to optimize resources (watch handles are a
limited resource on a given Linux host).

Some notes on the current design:

- Everytime someone requests to watch an `(uri, options)` couple we will
re-use the same watch handle (and actual nsfw instance). This allows for
resource allocation optimizations.
- Since a given service can only serve a single client, I added an event
dispatcher named `FileSystemWatcherServerDispatcher`. You must register
yourself with some unique id you will use to make requests.
- The new `.watchUriChanges` function now takes a `clientId` so that
when events are sent to the dispatcher it then gets routed back to your
own client.
- Added a `--nsfw-watcher-verbose` backend argument to enable verbose
logging for the watcher server.

Signed-off-by: Paul Maréchal <paul.marechal@ericsson.com>
  • Loading branch information
paul-marechal committed Sep 29, 2020
1 parent 4739e26 commit adda895
Show file tree
Hide file tree
Showing 15 changed files with 816 additions and 319 deletions.
Expand Up @@ -19,10 +19,12 @@ import { bindDynamicLabelProvider } from './label/sample-dynamic-label-provider-
import { bindSampleUnclosableView } from './view/sample-unclosable-view-contribution';
import { bindSampleOutputChannelWithSeverity } from './output/sample-output-channel-with-severity';
import { bindSampleMenu } from './menu/sample-menu-contribution';
import { bindSampleFileWatching } from './file-watching/sample-file-watching-contribution';

export default new ContainerModule(bind => {
bindDynamicLabelProvider(bind);
bindSampleUnclosableView(bind);
bindSampleOutputChannelWithSeverity(bind);
bindSampleMenu(bind);
bindSampleFileWatching(bind);
});
@@ -0,0 +1,50 @@
/********************************************************************************
* Copyright (C) 2020 Ericsson and others.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the Eclipse
* Public License v. 2.0 are satisfied: GNU General Public License, version 2
* with the GNU Classpath Exception which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************/

import { injectable, inject, interfaces } from 'inversify';
import { FrontendApplicationContribution, LabelProvider } from '@theia/core/lib/browser';
import { FileService } from '@theia/filesystem/lib/browser/file-service';
import { WorkspaceService } from '@theia/workspace/lib/browser';

export function bindSampleFileWatching(bind: interfaces.Bind): void {
bind(FrontendApplicationContribution).to(SampleFileWatchingContribution).inSingletonScope();
}

@injectable()
export class SampleFileWatchingContribution implements FrontendApplicationContribution {

@inject(FileService)
protected readonly files: FileService;

@inject(LabelProvider)
protected readonly label: LabelProvider;

@inject(WorkspaceService)
protected readonly workspace: WorkspaceService;

onStart(): void {
this.files.onDidFilesChange(event => {
// Get the workspace roots for the current frontend:
const roots = this.workspace.tryGetRoots();
// Create some name to help find out which frontend logged the message:
const workspace = roots.length > 0
? roots.map(root => this.label.getName(root.resource)).join('+')
: '<no workspace>';
console.log(`Sample File Watching: ${event.changes.length} files changed! ${workspace}`);
});
}

}
33 changes: 33 additions & 0 deletions packages/core/src/common/promise-util.ts
Expand Up @@ -59,3 +59,36 @@ export async function retry<T>(task: () => Promise<T>, delay: number, retries: n

throw lastError;
}

export class AsyncLock {

/**
* Reference to the last registered promise in the chain.
*
* We'll keep on adding with each call to `acquire` to queue other tasks.
*/
protected queue: Promise<void> = Promise.resolve();

/**
* This method will queue the execution of `callback` behind previous calls to `acquire`.
*
* This is useful to ensure that only one task handles a resource at a time.
*
* @param callback task to execute once previous ones finished.
* @param args to pass to your callback.
* @returns callback's result.
*/
async acquire<T>(callback: () => PromiseLike<T>): Promise<T>;
// eslint-disable-next-line space-before-function-paren, @typescript-eslint/no-explicit-any
async acquire<T, U extends any[]>(callback: (...args: U) => PromiseLike<T>, ...args: U): Promise<T> {
return new Promise<T>((resolve, reject) => {
this.queue = this.queue.then(
// Execute the callbacks once previous tasks resolved.
// Resolve the outer promise to extract the callback's return value.
// Put these operations in the queue slot for later calls to be placed behind.
() => callback(...args).then<void, void>(resolve, reject)
);
});
}

}
3 changes: 3 additions & 0 deletions packages/filesystem/compile.tsconfig.json
Expand Up @@ -9,6 +9,9 @@
"mv": [
"src/typings/mv"
],
"nsfw": [
"src/typings/nsfw"
],
"trash": [
"src/typings/trash"
]
Expand Down
81 changes: 30 additions & 51 deletions packages/filesystem/src/browser/file-service.ts
Expand Up @@ -1278,63 +1278,42 @@ export class FileService {
*/
readonly onDidFilesChange = this.onDidFilesChangeEmitter.event;

private activeWatchers = new Map<string, { disposable: Disposable, count: number }>();

watch(resource: URI, options: WatchOptions = { recursive: false, excludes: [] }): Disposable {
const resolvedOptions: WatchOptions = {
...options,
// always ignore temporary upload files
excludes: options.excludes.concat('**/theia_upload_*')
};

let watchDisposed = false;
let watchDisposable = Disposable.create(() => watchDisposed = true);

// Watch and wire in disposable which is async but
// check if we got disposed meanwhile and forward
this.doWatch(resource, resolvedOptions).then(disposable => {
if (watchDisposed) {
disposable.dispose();
} else {
watchDisposable = disposable;
}
}, error => console.error(error));

return Disposable.create(() => watchDisposable.dispose());
}

async doWatch(resource: URI, options: WatchOptions): Promise<Disposable> {
const provider = await this.withProvider(resource);
const key = this.toWatchKey(provider, resource, options);

// Only start watching if we are the first for the given key
const watcher = this.activeWatchers.get(key) || { count: 0, disposable: provider.watch(resource, options) };
if (!this.activeWatchers.has(key)) {
this.activeWatchers.set(key, watcher);
}

// Increment usage counter
watcher.count += 1;

return Disposable.create(() => {

// Unref
watcher.count--;

// Dispose only when last user is reached
if (watcher.count === 0) {
watcher.disposable.dispose();
this.activeWatchers.delete(key);
}
});
}

private toWatchKey(provider: FileSystemProvider, resource: URI, options: WatchOptions): string {
return [
this.toMapKey(provider, resource), // lowercase path if the provider is case insensitive
String(options.recursive), // use recursive: true | false as part of the key
options.excludes.join() // use excludes as part of the key
].join();
/**
* Disposable handle. Can be disposed early (before the watcher is allocated.)
*/
const handle = {
disposed: false,
watcher: undefined as Disposable | undefined,
dispose(): void {
if (this.disposed) {
return;
}
if (this.watcher) {
this.watcher.dispose();
this.watcher = undefined;
}
this.disposed = true;
},
};
this.withProvider(resource)
.then(provider => {
const watcher = provider.watch(resource, resolvedOptions);
// If the handle got disposed before allocation, stop here.
if (handle.disposed) {
watcher.dispose();
} else {
handle.watcher = watcher;
}
}, error => {
console.error(error);
});
return handle;
}

// #endregion
Expand Down
46 changes: 45 additions & 1 deletion packages/filesystem/src/common/filesystem-watcher-protocol.ts
Expand Up @@ -19,6 +19,47 @@ import { JsonRpcServer, JsonRpcProxy } from '@theia/core';
import { FileChangeType } from './files';
export { FileChangeType };

export const FileSystemWatcherServer2 = Symbol('FileSystemWatcherServer2');
/**
* Singleton implementation of the watch server.
*
* Since multiple clients all make requests to this service, we need to track those individually via a `clientId`.
*/
export interface FileSystemWatcherServer2 extends JsonRpcServer<FileSystemWatcherClient2> {
/**
* @param clientId arbitrary id used to identify a client.
* @param uri the path to watch.
* @param options optional parameters.
* @returns promise to a unique `number` handle for this request.
*/
watchFileChanges2(clientId: number, uri: string, options?: WatchOptions): Promise<number>;
/**
* @param watcherId handle mapping to a previous `watchFileChanges` request.
*/
unwatchFileChanges2(watcherId: number): Promise<void>;
}

export interface FileSystemWatcherClient2 {
/** Listen for change events emitted by the watcher. */
onDidFilesChanged2(event: DidFilesChangedParams2): void;
/** The watcher can crash in certain conditions. */
onError2(event: FileSystemWatcherErrorParams2): void;
}

export interface DidFilesChangedParams2 {
/** Clients to route the events to. */
clients: number[];
/** FileSystem changes that occured. */
changes: FileChange[];
}

export interface FileSystemWatcherErrorParams2 {
/** Clients to route the events to. */
clients: number[];
/** The uri that originated the error. */
uri: string;
}

export const FileSystemWatcherServer = Symbol('FileSystemWatcherServer');
export interface FileSystemWatcherServer extends JsonRpcServer<FileSystemWatcherClient> {
/**
Expand All @@ -32,7 +73,7 @@ export interface FileSystemWatcherServer extends JsonRpcServer<FileSystemWatcher
* Stop file watching for the given id.
* Resolve when watching is stopped.
*/
unwatchFileChanges(watcher: number): Promise<void>;
unwatchFileChanges(watcherId: number): Promise<void>;
}

export interface FileSystemWatcherClient {
Expand Down Expand Up @@ -63,6 +104,9 @@ export interface FileChange {
export const FileSystemWatcherServerProxy = Symbol('FileSystemWatcherServerProxy');
export type FileSystemWatcherServerProxy = JsonRpcProxy<FileSystemWatcherServer>;

/**
* @deprecated not used internally anymore.
*/
@injectable()
export class ReconnectingFileSystemWatcherServer implements FileSystemWatcherServer {

Expand Down

0 comments on commit adda895

Please sign in to comment.