Skip to content
This repository has been archived by the owner on Jul 28, 2023. It is now read-only.

Commit

Permalink
fix(backend): use internal pptr channel as service - fe channel (#96)
Browse files Browse the repository at this point in the history
- added throttler for sending notification from service,
- used raw Runtime.evaluate and Runtime.addBinding instead of exposeFunction,

Solution on top of raw protocol is faster then current exposeFunction
and at the same time we need to send all messages from service to frontend
using the same channel to simplify throtling logic.
  • Loading branch information
alexkozy committed Aug 3, 2018
1 parent 60041d5 commit 1cbace7
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 42 deletions.
107 changes: 83 additions & 24 deletions front_end/ndb/NdbMain.js
Expand Up @@ -262,12 +262,13 @@ Ndb.ServiceManager = class {
return service;
}

notify(serviceId, notification) {
const service = this._runningServices.get(serviceId);
if (service) {
if (notification.method === 'disposed')
notify(notifications) {
for (const {serviceId, callId, payload} of notifications) {
const service = this._runningServices.get(serviceId);
if (service)
service._notify(callId, payload);
if (!callId && payload.method === 'disposed')
this._runningServices.delete(serviceId);
service.dispatchEventToListeners(Ndb.Service.Events.Notification, notification);
}
}
};
Expand All @@ -277,11 +278,33 @@ Ndb.Service = class extends Common.Object {
constructor(serviceId) {
super();
this._serviceId = serviceId;
}

async call(method, options) {
const {result, error} = await callNdbService(this._serviceId, method, options);
return error || !result ? {error} : result;
this._lastCallId = 0;
this._callbacks = new Map();
}

call(method, options) {
const callId = ++this._lastCallId;
const promise = new Promise(resolve => this._callbacks.set(callId, resolve));
callNdbService(JSON.stringify({
serviceId: this._serviceId,
callId,
method,
options
}));
return promise;
}

_notify(callId, payload) {
if (callId) {
const callback = this._callbacks.get(callId);
this._callbacks.delete(callId);
if (callback) {
const {result, error} = payload || {};
callback(error || !result ? {error} : result);
}
} else {
this.dispatchEventToListeners(Ndb.Service.Events.Notification, payload);
}
}
};

Expand Down Expand Up @@ -338,20 +361,14 @@ Ndb.NodeProcessManager = class extends Common.Object {
}

async _onProcessAdded(payload) {
let targetInfo;
try {
([targetInfo] = await (await fetch(payload.targetListUrl)).json());
} catch (e) {
return;
}
const pid = payload.id;
const processInfo = new Ndb.ProcessInfo(payload);
this._processes.set(pid, processInfo);

const parentTarget = payload.ppid ? this._targetManager.targetById(payload.ppid) : null;
const target = this._targetManager.createTarget(
pid, processInfo.userFriendlyName(), SDK.Target.Capability.JS,
this._createConnection.bind(this, pid, targetInfo.webSocketDebuggerUrl),
this._createConnection.bind(this, pid),
parentTarget, true);
if (this._shouldPauseAtStart(payload.argv)) {
target.runtimeAgent().invoke_evaluate({
Expand All @@ -362,17 +379,17 @@ Ndb.NodeProcessManager = class extends Common.Object {
return target.runtimeAgent().runIfWaitingForDebugger();
}

_createConnection(id, webSocketDebuggerUrl, params) {
const connection = new SDK.WebSocketConnection(webSocketDebuggerUrl,
_ => {
this._connections.delete(id);
this._processes.delete(id);
},
params);
_createConnection(id, params) {
const connection = new Ndb.Connection(id, this._nddService, this._onWebSocketDisconnected.bind(this, id), params);
this._connections.set(id, connection);
return connection;
}

_onWebSocketDisconnected(id) {
this._connections.delete(id);
this._processes.delete(id);
}

_shouldPauseAtStart(argv) {
if (!Common.moduleSetting('pauseAtStart').get())
return false;
Expand Down Expand Up @@ -436,6 +453,48 @@ Ndb.NodeProcessManager = class extends Common.Object {
}
};

/**
* @implements {Protocol.InspectorBackend.Connection}
*/
Ndb.Connection = class {
constructor(pid, nddService, onWebSocketDisconnect, params) {
this._pid = pid;
this._nddService = nddService;
this._onDisconnect = params.onDisconnect;
this._onMessage = params.onMessage;
this._onWebSocketDisconnect = onWebSocketDisconnect;
this._nddService.addEventListener(Ndb.Service.Events.Notification, this._onServiceNotification.bind(this));
}

_onServiceNotification({data: {name, params}}) {
if (name === 'message' && params.id === this._pid)
this._onMessage.call(null, params.message);
if (name === 'disconnected' && params.id === this._pid) {
this._onWebSocketDisconnect.call(null);
this._onDisconnect.call(null, 'websocket closed');
}
}

/**
* @param {string} domain
* @param {!Protocol.InspectorBackend.Connection.MessageObject} messageObject
*/
sendMessage(domain, messageObject) {
return this._nddService.call('send', {
id: this._pid,
message: JSON.stringify(messageObject)
});
}

/**
* @return {!Promise}
*/
disconnect() {
return this._nddService.call('disconnect', {id: this._pid})
.then(_ => this._onDisconnect.call(null, 'force disconnect'));
}
};

Ndb.ProcessInfo = class {
constructor(payload) {
this._argv = payload.argv;
Expand Down
16 changes: 13 additions & 3 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Expand Up @@ -31,6 +31,7 @@
"puppeteer": "1.6.1",
"rimraf": "^2.6.2",
"update-notifier": "^2.5.0",
"ws": "^6.0.0",
"xterm": "^3.5.1"
},
"optionalDependencies": {
Expand Down
52 changes: 50 additions & 2 deletions services/ndd_service.js
Expand Up @@ -7,9 +7,11 @@
const {spawn} = require('child_process');
const chokidar = require('chokidar');
const fs = require('fs');
const http = require('http');
const os = require('os');
const path = require('path');
const util = require('util');
const WebSocket = require('ws');

const {ServiceBase} = require('./service_base.js');

Expand All @@ -25,6 +27,7 @@ class NddService extends ServiceBase {
this._nddStore = '';
this._nddStoreWatcher = null;
this._running = new Set();
this._sockets = new Map();
}

async init() {
Expand All @@ -46,11 +49,56 @@ class NddService extends ServiceBase {
this._running.add(id);
try {
const info = JSON.parse(await fsReadFile(path.join(this._nddStore, id), 'utf8'));
this._notify('added', {...info, id});
const {webSocketDebuggerUrl} = (await this._fetch(info.targetListUrl))[0];
const ws = new WebSocket(webSocketDebuggerUrl);
ws.on('error', _ => 0);
ws.once('open', _ => {
this._sockets.set(id, ws);
this._notify('added', {...info, id});
});
ws.on('message', message => this._notify('message', {message, id}));
ws.once('close', _ => {
this._sockets.delete(id);
this._notify('disconnected', {id});
});
} catch (e) {
}
}

async disconnect({id}) {
const ws = this._sockets.get(id);
if (ws)
ws.close();
}

async send({id, message}) {
const ws = this._sockets.get(id);
if (ws)
return new Promise(resolve => ws.send(message, resolve));
}

async _fetch(url) {
return new Promise(resolve => {
http.get(url, res => {
if (res.statusCode !== 200) {
res.resume();
resolve(null);
} else {
res.setEncoding('utf8');
let buffer = '';
res.on('data', data => buffer += data);
res.on('end', _ => {
try {
resolve(JSON.parse(buffer));
} catch (e) {
resolve(null);
}
});
}
}).on('error', _ => resolve(null));
});
}

async dispose() {
try {
for (const id of this._running)
Expand Down Expand Up @@ -103,7 +151,7 @@ class NddService extends ServiceBase {
if (!this._running.has(id))
return;
process.kill(id, 'SIGKILL');
fs.unlink(path.join(this._nddStore, id), err => 0);
fs.unlink(path.join(this._nddStore, id), _ => 0);
}
}

Expand Down
47 changes: 34 additions & 13 deletions services/services.js
Expand Up @@ -18,19 +18,39 @@ class Services {
}

static async create(frontend) {
const services = new Services((serviceId, message) => {
frontend.safeEvaluate(function(serviceId, message, isDebugFrontend) {
callFrontend(_ => Ndb.serviceManager.notify(serviceId, message));
}, serviceId, message, !!process.env.NDB_DEBUG_FRONTEND);
const cdpSession = await frontend.target().createCDPSession();
cdpSession.send('Runtime.enable', {});
let buffer = [];
let timer = 0;
const services = new Services((serviceId, callId, payload) => {
if (!timer) {
timer = setTimeout(_ => {
timer = 0;
const expression = `Ndb.serviceManager.notify(${JSON.stringify(buffer)})`;
buffer = [];
cdpSession.send('Runtime.evaluate', {
expression: process.env.NDB_DEBUG_FRONTEND ? `callFrontend(_ => ${expression}),null` : `${expression},null`
});
}, 50);
}
buffer.push({serviceId, callId, payload});
});
await Promise.all([
frontend.exposeFunction('createNdbService', services.createNdbService.bind(services)),
frontend.exposeFunction('callNdbService', services.callNdbService.bind(services))
cdpSession.send('Runtime.addBinding', {name: 'callNdbService'})
]);
cdpSession.on('Runtime.bindingCalled', event => services._onBindingCalled(event));
frontend.on('close', services.dispose.bind(services));
return services;
}

_onBindingCalled(event) {
if (event.name !== 'callNdbService')
return;
const {serviceId, callId, method, options} = JSON.parse(event.payload);
this.callNdbService(serviceId, callId, method, options);
}

createNdbService(name, serviceDir) {
const serviceName = path.join(serviceDir, `${name}.js`);
if (!fs.existsSync(serviceName))
Expand All @@ -47,20 +67,21 @@ class Services {
})).then(_ => ({serviceId}));
}

callNdbService(serviceId, method, options) {
callNdbService(serviceId, callId, method, options) {
const {service, callbacks, disposeCallbacks} = this._serviceIdToService.get(serviceId) || {};
if (!service)
return {error: `Service with id=${serviceId} not found`};
if (!service) {
this._notify(serviceId, callId, {error: `Service with id=${serviceId} not found`});
return;
}
const callbackId = ++this._lastCallbackId;
const promise = new Promise(resolve => callbacks.set(callbackId, resolve));
callbacks.set(callbackId, this._notify.bind(this, serviceId, callId));
if (method === 'dispose')
disposeCallbacks.add(callbackId);
service.send({
method,
options,
callbackId
});
return promise;
}

dispose() {
Expand All @@ -75,7 +96,7 @@ class Services {
_onServiceMessage(serviceId, {message, callbackId}) {
const {service, callbacks, readyCallback} = this._serviceIdToService.get(serviceId) || {};
if (!service) {
this._notify(serviceId, {error: `Service with id=${serviceId} not found`});
this._notify(serviceId, undefined, {error: `Service with id=${serviceId} not found`});
return;
}
if (callbackId) {
Expand All @@ -86,7 +107,7 @@ class Services {
if (message.name === 'ready')
readyCallback();
else
this._notify(serviceId, message);
this._notify(serviceId, undefined, message);
}
}

Expand All @@ -101,7 +122,7 @@ class Services {
else
callback({error: `Service with id=${serviceId} was disposed`});
}
this._notify(serviceId, { name: 'disposed' });
this._notify(serviceId, undefined, { name: 'disposed' });
}
}

Expand Down

0 comments on commit 1cbace7

Please sign in to comment.