Skip to content

Commit

Permalink
watchers: use singleton server
Browse files Browse the repository at this point in the history
Theia spawns as many nsfw servers as there are clients, maybe more.

This commit replaces this behaviour by centralizing everything into the
same service process in order to optimize resources (watch handles are a
limited resource on a given Linux host).

Some notes on the current design:

- Everytime someone requests to watch an `(uri, options)` couple we will
re-use the same watch handle (and actual nsfw instance). This allows for
resource allocation optimizations.
- Since a given service can only serve a single client, I added an event
dispatcher named `FileSystemWatcherServiceDispatcher`. You must register
yourself with some unique id you will use to make requests.
- The new `.watchUriChanges` function now takes a `clientId` so that
when events are sent to the dispatcher it then gets routed back to your
own client.
- Added a `--nsfw-watcher-verbose` backend argument to enable verbose
logging for the watcher server.

Signed-off-by: Paul Maréchal <paul.marechal@ericsson.com>
  • Loading branch information
paul-marechal committed Oct 15, 2020
1 parent 8ff85ba commit 495f303
Show file tree
Hide file tree
Showing 14 changed files with 886 additions and 110 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Expand Up @@ -4,12 +4,16 @@

- [git] the changes in the commit details (opened from the history view) and in the diff view (opened with 'Compare With...' on a folder's context menu) are now switchable between 'list' and 'tree' modes [#8084](https://github.com/eclipse-theia/theia/pull/8084)
- [scm] show in the commit textbox the branch to which the commit will go [#6156](https://github.com/eclipse-theia/theia/pull/6156)
- [filesystem] file watchers refactoring:
- Added `FileSystemWatcherService` component that should be a singleton centralizing watch requests for all clients.
- Added `FileSystemWatcherServiceDispatcher` to register yourself and listen to file change events.

<a name="breaking_changes_1.7.0">[Breaking Changes:](#breaking_changes_1.7.0)</a>

- [plugin-metrics] renamed `AnalyticsFromRequests.succesfulResponses` to `AnalyticsFromRequests.successfulResponses` []()
- [core] change progress notification cancelable property default from true to false [#8479](https://github.com/eclipse-theia/theia/pull/8479)
- [messages] empty notifications and progress notifications will not be shown [#8479](https://github.com/eclipse-theia/theia/pull/8479)
- [filesystem] `NsfwFileSystemWatcherServer` is deprecated and no longer used.

## v1.6.0 - 24/09/2020

Expand Down
Expand Up @@ -19,6 +19,7 @@ import { bindDynamicLabelProvider } from './label/sample-dynamic-label-provider-
import { bindSampleUnclosableView } from './view/sample-unclosable-view-contribution';
import { bindSampleOutputChannelWithSeverity } from './output/sample-output-channel-with-severity';
import { bindSampleMenu } from './menu/sample-menu-contribution';
import { bindSampleFileWatching } from './file-watching/sample-file-watching-contribution';

import '../../src/browser/style/branding.css';

Expand All @@ -27,4 +28,5 @@ export default new ContainerModule(bind => {
bindSampleUnclosableView(bind);
bindSampleOutputChannelWithSeverity(bind);
bindSampleMenu(bind);
bindSampleFileWatching(bind);
});
@@ -0,0 +1,92 @@
/********************************************************************************
* Copyright (C) 2020 Ericsson and others.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the Eclipse
* Public License v. 2.0 are satisfied: GNU General Public License, version 2
* with the GNU Classpath Exception which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************/

import { postConstruct, injectable, inject, interfaces } from 'inversify';
import {
createPreferenceProxy, FrontendApplicationContribution, LabelProvider,
PreferenceContribution, PreferenceProxy, PreferenceSchema, PreferenceService
} from '@theia/core/lib/browser';
import { FileService } from '@theia/filesystem/lib/browser/file-service';
import { WorkspaceService } from '@theia/workspace/lib/browser';

export function bindSampleFileWatching(bind: interfaces.Bind): void {
bind(FrontendApplicationContribution).to(SampleFileWatchingContribution).inSingletonScope();
bind(PreferenceContribution).toConstantValue({ schema: FileWatchingPreferencesSchema });
bind(FileWatchingPreferences).toDynamicValue(
ctx => createPreferenceProxy(ctx.container.get(PreferenceService), FileWatchingPreferencesSchema)
);
}

const FileWatchingPreferences = Symbol('FileWatchingPreferences');
type FileWatchingPreferences = PreferenceProxy<FileWatchingPreferencesSchema>;

interface FileWatchingPreferencesSchema {
'sample.file-watching.verbose': boolean
}
const FileWatchingPreferencesSchema: PreferenceSchema = {
type: 'object',
properties: {
'sample.file-watching.verbose': {
type: 'boolean',
default: false,
description: 'Enable verbose file watching logs.'
}
}
};

@injectable()
class SampleFileWatchingContribution implements FrontendApplicationContribution {

protected verbose: boolean;

@inject(FileService)
protected readonly fileService: FileService;

@inject(LabelProvider)
protected readonly labelProvider: LabelProvider;

@inject(WorkspaceService)
protected readonly workspaceService: WorkspaceService;

@inject(FileWatchingPreferences)
protected readonly fileWatchingPreferences: FileWatchingPreferences;

@postConstruct()
protected postConstruct(): void {
this.verbose = this.fileWatchingPreferences['sample.file-watching.verbose'];
this.fileWatchingPreferences.onPreferenceChanged(e => {
if (e.preferenceName === 'sample.file-watching.verbose') {
this.verbose = e.newValue!;
}
});
}

onStart(): void {
this.fileService.onDidFilesChange(event => {
// Only log if the verbose preference is set.
if (this.verbose) {
// Get the workspace roots for the current frontend:
const roots = this.workspaceService.tryGetRoots();
// Create some name to help find out which frontend logged the message:
const workspace = roots.length > 0
? roots.map(root => this.labelProvider.getLongName(root.resource)).join('+')
: '<no workspace>';
console.log(`Sample File Watching: ${event.changes.length} file(s) changed! ${workspace}`);
}
});
}

}
3 changes: 3 additions & 0 deletions packages/filesystem/compile.tsconfig.json
Expand Up @@ -9,6 +9,9 @@
"mv": [
"src/typings/mv"
],
"nsfw": [
"src/typings/nsfw"
],
"trash": [
"src/typings/trash"
]
Expand Down
51 changes: 45 additions & 6 deletions packages/filesystem/src/common/filesystem-watcher-protocol.ts
Expand Up @@ -19,6 +19,47 @@ import { JsonRpcServer, JsonRpcProxy } from '@theia/core';
import { FileChangeType } from './files';
export { FileChangeType };

export const FileSystemWatcherService = Symbol('FileSystemWatcherServer2');
/**
* Singleton implementation of the watch server.
*
* Since multiple clients all make requests to this service, we need to track those individually via a `clientId`.
*/
export interface FileSystemWatcherService extends JsonRpcServer<FileSystemWatcherServiceClient> {
/**
* @param clientId arbitrary id used to identify a client.
* @param uri the path to watch.
* @param options optional parameters.
* @returns promise to a unique `number` handle for this request.
*/
watchFileChanges(clientId: number, uri: string, options?: WatchOptions): Promise<number>;
/**
* @param watcherId handle mapping to a previous `watchFileChanges` request.
*/
unwatchFileChanges(watcherId: number): Promise<void>;
}

export interface FileSystemWatcherServiceClient {
/** Listen for change events emitted by the watcher. */
onDidFilesChanged(event: DidFilesChangedParams): void;
/** The watcher can crash in certain conditions. */
onError(event: FileSystemWatcherErrorParams): void;
}

export interface DidFilesChangedParams {
/** Clients to route the events to. */
clients?: number[];
/** FileSystem changes that occured. */
changes: FileChange[];
}

export interface FileSystemWatcherErrorParams {
/** Clients to route the events to. */
clients: number[];
/** The uri that originated the error. */
uri: string;
}

export const FileSystemWatcherServer = Symbol('FileSystemWatcherServer');
export interface FileSystemWatcherServer extends JsonRpcServer<FileSystemWatcherClient> {
/**
Expand All @@ -32,7 +73,7 @@ export interface FileSystemWatcherServer extends JsonRpcServer<FileSystemWatcher
* Stop file watching for the given id.
* Resolve when watching is stopped.
*/
unwatchFileChanges(watcher: number): Promise<void>;
unwatchFileChanges(watcherId: number): Promise<void>;
}

export interface FileSystemWatcherClient {
Expand All @@ -50,11 +91,6 @@ export interface FileSystemWatcherClient {
export interface WatchOptions {
ignored: string[];
}

export interface DidFilesChangedParams {
changes: FileChange[];
}

export interface FileChange {
uri: string;
type: FileChangeType;
Expand All @@ -63,6 +99,9 @@ export interface FileChange {
export const FileSystemWatcherServerProxy = Symbol('FileSystemWatcherServerProxy');
export type FileSystemWatcherServerProxy = JsonRpcProxy<FileSystemWatcherServer>;

/**
* @deprecated not used internally anymore.
*/
@injectable()
export class ReconnectingFileSystemWatcherServer implements FileSystemWatcherServer {

Expand Down
65 changes: 45 additions & 20 deletions packages/filesystem/src/common/remote-file-system-provider.ts
Expand Up @@ -103,6 +103,11 @@ export class RemoteFileSystemProxyFactory<T extends object> extends JsonRpcProxy
}
}

/**
* Frontend component.
*
* Wraps the remote filesystem provider living on the backend.
*/
@injectable()
export class RemoteFileSystemProvider implements Required<FileSystemProvider>, Disposable {

Expand All @@ -129,6 +134,10 @@ export class RemoteFileSystemProvider implements Required<FileSystemProvider>, D
);

protected watcherSequence = 0;
/**
* We'll track the currently allocated watchers, in order to re-allocate them
* with the same options once we reconnect to the backend after a disconnection.
*/
protected readonly watchOptions = new Map<number, {
uri: string;
options: WatchOptions
Expand All @@ -137,20 +146,21 @@ export class RemoteFileSystemProvider implements Required<FileSystemProvider>, D
private _capabilities: FileSystemProviderCapabilities = 0;
get capabilities(): FileSystemProviderCapabilities { return this._capabilities; }

protected readonly deferredReady = new Deferred<void>();
get ready(): Promise<void> {
return this.deferredReady.promise;
}
protected readonly readyDeferred = new Deferred<void>();
readonly ready = this.readyDeferred.promise;

/**
* Wrapped remote filesystem.
*/
@inject(RemoteFileSystemServer)
protected readonly server: JsonRpcProxy<RemoteFileSystemServer>;

@postConstruct()
protected init(): void {
this.server.getCapabilities().then(capabilities => {
this._capabilities = capabilities;
this.deferredReady.resolve(undefined);
}, this.deferredReady.reject);
this.readyDeferred.resolve();
}, this.readyDeferred.reject);
this.server.setClient({
notifyDidChangeFile: ({ changes }) => {
this.onDidChangeFileEmitter.fire(changes.map(event => ({ resource: new URI(event.resource), type: event.type })));
Expand Down Expand Up @@ -286,19 +296,23 @@ export class RemoteFileSystemProvider implements Required<FileSystemProvider>, D
}

watch(resource: URI, options: WatchOptions): Disposable {
const watcher = this.watcherSequence++;
const watcherId = this.watcherSequence++;
const uri = resource.toString();
this.watchOptions.set(watcher, { uri, options });
this.server.watch(watcher, uri, options);

this.watchOptions.set(watcherId, { uri, options });
this.server.watch(watcherId, uri, options);
const toUnwatch = Disposable.create(() => {
this.watchOptions.delete(watcher);
this.server.unwatch(watcher);
this.watchOptions.delete(watcherId);
this.server.unwatch(watcherId);
});
this.toDispose.push(toUnwatch);
return toUnwatch;
}

/**
* When a frontend disconnects (e.g. bad connection) the backend resources will be cleared.
*
* This means that we need to re-allocate the watchers when a frontend reconnects.
*/
protected reconnect(): void {
for (const [watcher, { uri, options }] of this.watchOptions.entries()) {
this.server.watch(watcher, uri, options);
Expand All @@ -308,13 +322,20 @@ export class RemoteFileSystemProvider implements Required<FileSystemProvider>, D
}

/**
* Backend component.
*
* JSON-RPC server exposing a wrapped file system provider remotely.
*/
@injectable()
export class FileSystemProviderServer implements RemoteFileSystemServer {

private readonly BUFFER_SIZE = 64 * 1024;

/**
* Mapping of `watcherId` to a disposable watcher handle.
*/
protected watchers = new Map<number, Disposable>();

protected readonly toDispose = new DisposableCollection();
dispose(): void {
this.toDispose.dispose();
Expand All @@ -325,6 +346,9 @@ export class FileSystemProviderServer implements RemoteFileSystemServer {
this.client = client;
}

/**
* Wrapped file system provider.
*/
@inject(FileSystemProvider)
protected readonly provider: FileSystemProvider & Partial<Disposable>;

Expand Down Expand Up @@ -450,18 +474,19 @@ export class FileSystemProviderServer implements RemoteFileSystemServer {
throw new Error('not supported');
}

protected watchers = new Map<number, Disposable>();

async watch(req: number, resource: string, opts: WatchOptions): Promise<void> {
async watch(requestedWatcherId: number, resource: string, opts: WatchOptions): Promise<void> {
if (this.watchers.has(requestedWatcherId)) {
throw new Error('watcher id is already allocated!');
}
const watcher = this.provider.watch(new URI(resource), opts);
this.watchers.set(req, watcher);
this.toDispose.push(Disposable.create(() => this.unwatch(req)));
this.watchers.set(requestedWatcherId, watcher);
this.toDispose.push(Disposable.create(() => this.unwatch(requestedWatcherId)));
}

async unwatch(req: number): Promise<void> {
const watcher = this.watchers.get(req);
async unwatch(watcherId: number): Promise<void> {
const watcher = this.watchers.get(watcherId);
if (watcher) {
this.watchers.delete(req);
this.watchers.delete(watcherId);
watcher.dispose();
}
}
Expand Down
33 changes: 25 additions & 8 deletions packages/filesystem/src/node/disk-file-system-provider.ts
Expand Up @@ -804,18 +804,35 @@ export class DiskFileSystemProvider implements Disposable,
// #region File Watching

watch(resource: URI, opts: WatchOptions): Disposable {
const toUnwatch = new DisposableCollection(Disposable.create(() => { /* mark as not disposed */ }));
this.watcher.watchFileChanges(resource.toString(), {
const watcherService = this.watcher;
/**
* Disposable handle. Can be disposed early (before the watcher is allocated.)
*/
const handle = {
disposed: false,
watcherId: undefined as number | undefined,
dispose(): void {
if (this.disposed) {
return;
}
if (this.watcherId !== undefined) {
watcherService.unwatchFileChanges(this.watcherId);
}
this.disposed = true;
},
};
watcherService.watchFileChanges(resource.toString(), {
// Convert from `files.WatchOptions` to internal `watcher-protocol.WatchOptions`:
ignored: opts.excludes
}).then(watcher => {
if (toUnwatch.disposed) {
this.watcher.unwatchFileChanges(watcher);
}).then(watcherId => {
if (handle.disposed) {
watcherService.unwatchFileChanges(watcherId);
} else {
toUnwatch.push(Disposable.create(() => this.watcher.unwatchFileChanges(watcher)));
handle.watcherId = watcherId;
}
});
this.toDispose.push(toUnwatch);
return toUnwatch;
this.toDispose.push(handle);
return handle;
}

// #endregion
Expand Down

0 comments on commit 495f303

Please sign in to comment.