Skip to content

Commit

Permalink
file-watchers: use singleton server
Browse files Browse the repository at this point in the history
Theia spawns as many nsfw servers as there are clients, maybe more.

This commit replaces this behaviour by centralizing everything into the
same service process in order to optimize resources (watch handles are a
limited resource on a given Linux host).

Some notes on the current design:

- Everytime someone requests to watch an `(uri, options)` couple we will
re-use the same watch handle (and actual nsfw instance). This allows for
resource allocation optimizations.
- Since a given service can only serve a single client, I added an event
dispatcher named `FileSystemWatcherServerDispatcher`. You must register
yourself with some unique id you will use to make requests.
- The new `.watchUriChanges` function now takes a `clientId` so that
when events are sent to the dispatcher it then gets routed back to your
own client.
- Added a `--nsfw-watcher-verbose` backend argument to enable verbose
logging for the watcher server.

Signed-off-by: Paul Maréchal <paul.marechal@ericsson.com>
  • Loading branch information
paul-marechal committed Oct 5, 2020
1 parent 4739e26 commit 7dcd8f4
Show file tree
Hide file tree
Showing 15 changed files with 777 additions and 278 deletions.
Expand Up @@ -19,10 +19,12 @@ import { bindDynamicLabelProvider } from './label/sample-dynamic-label-provider-
import { bindSampleUnclosableView } from './view/sample-unclosable-view-contribution';
import { bindSampleOutputChannelWithSeverity } from './output/sample-output-channel-with-severity';
import { bindSampleMenu } from './menu/sample-menu-contribution';
import { bindSampleFileWatching } from './file-watching/sample-file-watching-contribution';

export default new ContainerModule(bind => {
bindDynamicLabelProvider(bind);
bindSampleUnclosableView(bind);
bindSampleOutputChannelWithSeverity(bind);
bindSampleMenu(bind);
bindSampleFileWatching(bind);
});
@@ -0,0 +1,50 @@
/********************************************************************************
* Copyright (C) 2020 Ericsson and others.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the Eclipse
* Public License v. 2.0 are satisfied: GNU General Public License, version 2
* with the GNU Classpath Exception which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************/

import { injectable, inject, interfaces } from 'inversify';
import { FrontendApplicationContribution, LabelProvider } from '@theia/core/lib/browser';
import { FileService } from '@theia/filesystem/lib/browser/file-service';
import { WorkspaceService } from '@theia/workspace/lib/browser';

export function bindSampleFileWatching(bind: interfaces.Bind): void {
bind(FrontendApplicationContribution).to(SampleFileWatchingContribution).inSingletonScope();
}

@injectable()
export class SampleFileWatchingContribution implements FrontendApplicationContribution {

@inject(FileService)
protected readonly files: FileService;

@inject(LabelProvider)
protected readonly label: LabelProvider;

@inject(WorkspaceService)
protected readonly workspace: WorkspaceService;

onStart(): void {
this.files.onDidFilesChange(event => {
// Get the workspace roots for the current frontend:
const roots = this.workspace.tryGetRoots();
// Create some name to help find out which frontend logged the message:
const workspace = roots.length > 0
? roots.map(root => this.label.getName(root.resource)).join('+')
: '<no workspace>';
console.log(`Sample File Watching: ${event.changes.length} file(s) changed! ${workspace}`);
});
}

}
30 changes: 30 additions & 0 deletions packages/core/src/common/promise-util.ts
Expand Up @@ -59,3 +59,33 @@ export async function retry<T>(task: () => Promise<T>, delay: number, retries: n

throw lastError;
}

export class AsyncLock {

/**
* Reference to the last registered promise in the chain.
*
* We'll keep on adding with each call to `acquire` to queue other tasks.
*/
protected queue: Promise<void> = Promise.resolve();

/**
* This method will queue the execution of `callback` behind previous calls to `acquire`.
*
* This is useful to ensure that only one task handles a resource at a time.
*
* @param callback task to execute once previous ones finished.
* @param args to pass to your callback.
* @returns callback's result.
*/
async acquire<T>(callback: () => PromiseLike<T>): Promise<T>;
// eslint-disable-next-line space-before-function-paren, @typescript-eslint/no-explicit-any
async acquire<T, U extends any[]>(callback: (...args: U) => PromiseLike<T>, ...args: U): Promise<T> {
return new Promise<T>((resolve, reject) => {
this.queue = this.queue.then(
() => callback(...args).then(resolve, reject)
);
});
}

}
3 changes: 3 additions & 0 deletions packages/filesystem/compile.tsconfig.json
Expand Up @@ -9,6 +9,9 @@
"mv": [
"src/typings/mv"
],
"nsfw": [
"src/typings/nsfw"
],
"trash": [
"src/typings/trash"
]
Expand Down
12 changes: 2 additions & 10 deletions packages/filesystem/src/browser/file-service.ts
Expand Up @@ -65,7 +65,6 @@ import { UTF8, UTF8_with_bom } from '@theia/core/lib/common/encodings';
import { EncodingService, ResourceEncoding, DecodeStreamResult } from '@theia/core/lib/common/encoding-service';
import { Mutable } from '@theia/core/lib/common/types';
import { readFileIntoStream } from '../common/io';
import { FileSystemWatcherErrorHandler } from './filesystem-watcher-error-handler';

export interface FileOperationParticipant {

Expand Down Expand Up @@ -248,9 +247,6 @@ export class FileService {
@inject(ContributionProvider) @named(FileServiceContribution)
protected readonly contributions: ContributionProvider<FileServiceContribution>;

@inject(FileSystemWatcherErrorHandler)
protected readonly watcherErrorHandler: FileSystemWatcherErrorHandler;

@postConstruct()
protected init(): void {
for (const contribution of this.contributions.getContributions()) {
Expand Down Expand Up @@ -312,7 +308,6 @@ export class FileService {

const providerDisposables = new DisposableCollection();
providerDisposables.push(provider.onDidChangeFile(changes => this.onDidFilesChangeEmitter.fire(new FileChangesEvent(changes))));
providerDisposables.push(provider.onFileWatchError(() => this.handleFileWatchError()));
providerDisposables.push(provider.onDidChangeCapabilities(() => this.onDidChangeFileSystemProviderCapabilitiesEmitter.fire({ provider, scheme })));

return Disposable.create(() => {
Expand Down Expand Up @@ -767,7 +762,7 @@ export class FileService {
* that only the mtime is an indicator for a file that has changed on disk.
*
* Second, if the mtime has advanced, we compare the size of the file on disk with our previous
* one using the etag() function. Relying only on the mtime check has proven to produce false
* one using the etag() function. Relying only on the mtime check has prooven to produce false
* positives due to file system weirdness (especially around remote file systems). As such, the
* check for size is a weaker check because it can return a false negative if the file has changed
* but to the same length. This is a compromise we take to avoid having to produce checksums of
Expand Down Expand Up @@ -1024,7 +1019,7 @@ export class FileService {
// validation
const { exists, isSameResourceWithDifferentPathCase } = await this.doValidateMoveCopy(sourceProvider, source, targetProvider, target, mode, overwrite);

// delete as needed (unless target is same resource with different path case)
// delete as needed (unless target is same resurce with different path case)
if (exists && !isSameResourceWithDifferentPathCase && overwrite) {
await this.delete(target, { recursive: true });
}
Expand Down Expand Up @@ -1680,7 +1675,4 @@ export class FileService {

// #endregion

protected handleFileWatchError(): void {
this.watcherErrorHandler.handleError();
}
}
46 changes: 45 additions & 1 deletion packages/filesystem/src/common/filesystem-watcher-protocol.ts
Expand Up @@ -19,6 +19,47 @@ import { JsonRpcServer, JsonRpcProxy } from '@theia/core';
import { FileChangeType } from './files';
export { FileChangeType };

export const FileSystemWatcherServer2 = Symbol('FileSystemWatcherServer2');
/**
* Singleton implementation of the watch server.
*
* Since multiple clients all make requests to this service, we need to track those individually via a `clientId`.
*/
export interface FileSystemWatcherServer2 extends JsonRpcServer<FileSystemWatcherClient2> {
/**
* @param clientId arbitrary id used to identify a client.
* @param uri the path to watch.
* @param options optional parameters.
* @returns promise to a unique `number` handle for this request.
*/
watchFileChanges2(clientId: number, uri: string, options?: WatchOptions): Promise<number>;
/**
* @param watcherId handle mapping to a previous `watchFileChanges` request.
*/
unwatchFileChanges2(watcherId: number): Promise<void>;
}

export interface FileSystemWatcherClient2 {
/** Listen for change events emitted by the watcher. */
onDidFilesChanged2(event: DidFilesChangedParams2): void;
/** The watcher can crash in certain conditions. */
onError2(event: FileSystemWatcherErrorParams2): void;
}

export interface DidFilesChangedParams2 {
/** Clients to route the events to. */
clients: number[];
/** FileSystem changes that occured. */
changes: FileChange[];
}

export interface FileSystemWatcherErrorParams2 {
/** Clients to route the events to. */
clients: number[];
/** The uri that originated the error. */
uri: string;
}

export const FileSystemWatcherServer = Symbol('FileSystemWatcherServer');
export interface FileSystemWatcherServer extends JsonRpcServer<FileSystemWatcherClient> {
/**
Expand All @@ -32,7 +73,7 @@ export interface FileSystemWatcherServer extends JsonRpcServer<FileSystemWatcher
* Stop file watching for the given id.
* Resolve when watching is stopped.
*/
unwatchFileChanges(watcher: number): Promise<void>;
unwatchFileChanges(watcherId: number): Promise<void>;
}

export interface FileSystemWatcherClient {
Expand Down Expand Up @@ -63,6 +104,9 @@ export interface FileChange {
export const FileSystemWatcherServerProxy = Symbol('FileSystemWatcherServerProxy');
export type FileSystemWatcherServerProxy = JsonRpcProxy<FileSystemWatcherServer>;

/**
* @deprecated not used internally anymore.
*/
@injectable()
export class ReconnectingFileSystemWatcherServer implements FileSystemWatcherServer {

Expand Down
63 changes: 41 additions & 22 deletions packages/filesystem/src/common/remote-file-system-provider.ts
Expand Up @@ -27,7 +27,6 @@ import {
} from './files';
import { JsonRpcServer, JsonRpcProxy, JsonRpcProxyFactory } from '@theia/core/lib/common/messaging/proxy-factory';
import { ApplicationError } from '@theia/core/lib/common/application-error';
import { Deferred } from '@theia/core/lib/common/promise-util';
import type { TextDocumentContentChangeEvent } from 'vscode-languageserver-protocol';
import { newWriteableStream, ReadableStreamEvents } from '@theia/core/lib/common/stream';
import { CancellationToken, cancelled } from '@theia/core/lib/common/cancellation';
Expand Down Expand Up @@ -103,6 +102,11 @@ export class RemoteFileSystemProxyFactory<T extends object> extends JsonRpcProxy
}
}

/**
* Frontend component.
*
* Wraps the remote filesystem provider living on the backend.
*/
@injectable()
export class RemoteFileSystemProvider implements Required<FileSystemProvider>, Disposable {

Expand All @@ -129,6 +133,10 @@ export class RemoteFileSystemProvider implements Required<FileSystemProvider>, D
);

protected watcherSequence = 0;
/**
* We'll track the currently allocated watchers, in order to re-allocate them
* with the same options once we reconnect to the backend after a disconnection.
*/
protected readonly watchOptions = new Map<number, {
uri: string;
options: WatchOptions
Expand All @@ -137,20 +145,19 @@ export class RemoteFileSystemProvider implements Required<FileSystemProvider>, D
private _capabilities: FileSystemProviderCapabilities = 0;
get capabilities(): FileSystemProviderCapabilities { return this._capabilities; }

protected readonly deferredReady = new Deferred<void>();
get ready(): Promise<void> {
return this.deferredReady.promise;
}
readonly ready: Promise<void>;

/**
* Wrapped remote filesystem.
*/
@inject(RemoteFileSystemServer)
protected readonly server: JsonRpcProxy<RemoteFileSystemServer>;

@postConstruct()
protected init(): void {
this.server.getCapabilities().then(capabilities => {
(this.ready as Promise<void>) = this.server.getCapabilities().then(capabilities => {
this._capabilities = capabilities;
this.deferredReady.resolve(undefined);
}, this.deferredReady.reject);
});
this.server.setClient({
notifyDidChangeFile: ({ changes }) => {
this.onDidChangeFileEmitter.fire(changes.map(event => ({ resource: new URI(event.resource), type: event.type })));
Expand Down Expand Up @@ -286,19 +293,23 @@ export class RemoteFileSystemProvider implements Required<FileSystemProvider>, D
}

watch(resource: URI, options: WatchOptions): Disposable {
const watcher = this.watcherSequence++;
const watcherId = this.watcherSequence++;
const uri = resource.toString();
this.watchOptions.set(watcher, { uri, options });
this.server.watch(watcher, uri, options);

this.watchOptions.set(watcherId, { uri, options });
this.server.watch(watcherId, uri, options);
const toUnwatch = Disposable.create(() => {
this.watchOptions.delete(watcher);
this.server.unwatch(watcher);
this.watchOptions.delete(watcherId);
this.server.unwatch(watcherId);
});
this.toDispose.push(toUnwatch);
return toUnwatch;
}

/**
* When a frontend disconnects (e.g. bad connection) the backend resources will be cleared.
*
* This means that we need to re-allocate the watchers when a frontend reconnects.
*/
protected reconnect(): void {
for (const [watcher, { uri, options }] of this.watchOptions.entries()) {
this.server.watch(watcher, uri, options);
Expand All @@ -308,13 +319,17 @@ export class RemoteFileSystemProvider implements Required<FileSystemProvider>, D
}

/**
* Backend component.
*
* JSON-RPC server exposing a wrapped file system provider remotely.
*/
@injectable()
export class FileSystemProviderServer implements RemoteFileSystemServer {

private readonly BUFFER_SIZE = 64 * 1024;

protected watchers = new Map<number, Disposable>();

protected readonly toDispose = new DisposableCollection();
dispose(): void {
this.toDispose.dispose();
Expand All @@ -325,6 +340,9 @@ export class FileSystemProviderServer implements RemoteFileSystemServer {
this.client = client;
}

/**
* Wrapped file system provider.
*/
@inject(FileSystemProvider)
protected readonly provider: FileSystemProvider & Partial<Disposable>;

Expand Down Expand Up @@ -450,18 +468,19 @@ export class FileSystemProviderServer implements RemoteFileSystemServer {
throw new Error('not supported');
}

protected watchers = new Map<number, Disposable>();

async watch(req: number, resource: string, opts: WatchOptions): Promise<void> {
async watch(requestedWatcherId: number, resource: string, opts: WatchOptions): Promise<void> {
if (this.watchers.has(requestedWatcherId)) {
throw new Error('watcher id is already allocated!');
}
const watcher = this.provider.watch(new URI(resource), opts);
this.watchers.set(req, watcher);
this.toDispose.push(Disposable.create(() => this.unwatch(req)));
this.watchers.set(requestedWatcherId, watcher);
this.toDispose.push(Disposable.create(() => this.unwatch(requestedWatcherId)));
}

async unwatch(req: number): Promise<void> {
const watcher = this.watchers.get(req);
async unwatch(watcherId: number): Promise<void> {
const watcher = this.watchers.get(watcherId);
if (watcher) {
this.watchers.delete(req);
this.watchers.delete(watcherId);
watcher.dispose();
}
}
Expand Down

0 comments on commit 7dcd8f4

Please sign in to comment.