From f6c5d04383fc305059e69bd4d655f837bbc5a144 Mon Sep 17 00:00:00 2001 From: Kelvin Jin Date: Fri, 2 Feb 2018 14:26:53 -0800 Subject: [PATCH 1/2] Add async-hooks integration --- .../src/async-hooks-integration.js | 67 +++++++++++++++++++ packages/grpc-native-core/src/server.js | 65 +++++++++++++----- 2 files changed, 114 insertions(+), 18 deletions(-) create mode 100644 packages/grpc-native-core/src/async-hooks-integration.js diff --git a/packages/grpc-native-core/src/async-hooks-integration.js b/packages/grpc-native-core/src/async-hooks-integration.js new file mode 100644 index 000000000..0cecdab07 --- /dev/null +++ b/packages/grpc-native-core/src/async-hooks-integration.js @@ -0,0 +1,67 @@ +/** + * @license + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +var semver = require('semver'); +var useAsyncHooks = semver.satisfies(process.version, '>=8'); + +/** + * Assuming that async_hooks is available, this file exposes a function + * createAsyncResourceWrapper which creates an async resource, returning a + * proxy object with the functions necessary to mark the entry/exit of a + * continuation associated with that resource, as well as to destroy the + * resource. + * + * In the absence of async_hooks, a no-op implementation is returned that + * should have minimal performance implications. + */ + +if (useAsyncHooks) { + var asyncHooks = require('async_hooks'); + module.exports = function createAsyncResourceWrapper(name) { + var resource = new asyncHooks.AsyncResource(name); + return { + wrap: function(fn) { + return function() { + if (resource) { + resource.emitBefore(); + try { + var result = fn.apply(this, arguments); + } finally { + resource.emitAfter(); + } + return result; + } else { + return fn.apply(this, arguments); + } + } + }, + destroy: function() { + if (resource) { + resource.emitDestroy(); + resource = null; + } + } + }; + } +} else { + var noImpl = { + wrap: function(fn) { return fn; }, + destroy: function() {} + }; + module.exports = function createAsyncResourceWrapper() { return noImpl; } +} diff --git a/packages/grpc-native-core/src/server.js b/packages/grpc-native-core/src/server.js index cee852e9a..b95785799 100644 --- a/packages/grpc-native-core/src/server.js +++ b/packages/grpc-native-core/src/server.js @@ -37,6 +37,15 @@ var util = require('util'); var EventEmitter = require('events').EventEmitter; +/** + * Provides an API for integrating the async_hooks module with gRPC. + * Akin to Node HTTP servers, each incoming request is assigned a unique + * AsyncResource object with which continuation-local storage can be associated. + * All of these request-scoped AsyncResource objects share a common trigger: + * a long-lived AsyncResource assigned to the server. + */ +var createAsyncResourceWrapper = require('./async-hooks-integration'); + /** * Handle an error on a call by sending it as a status * @private @@ -567,6 +576,7 @@ ServerDuplexStream.prototype.waitForCancel = waitForCancel; * @param {grpc.Metadata} metadata Metadata from the client */ function handleUnary(call, handler, metadata) { + var asyncResource = createAsyncResourceWrapper('GrpcServerRequest'); var emitter = new ServerUnaryCall(call, metadata); emitter.on('error', function(error) { handleError(call, error); @@ -574,7 +584,7 @@ function handleUnary(call, handler, metadata) { emitter.waitForCancel(); var batch = {}; batch[grpc.opType.RECV_MESSAGE] = true; - call.startBatch(batch, function(err, result) { + call.startBatch(batch, asyncResource.wrap(function(err, result) { if (err) { handleError(call, err); return; @@ -598,8 +608,9 @@ function handleUnary(call, handler, metadata) { } else { sendUnaryResponse(call, value, handler.serialize, trailer, flags); } + asyncResource.destroy(); }); - }); + })); } /** @@ -622,14 +633,16 @@ function handleUnary(call, handler, metadata) { * @param {grpc.Metadata} metadata Metadata from the client */ function handleServerStreaming(call, handler, metadata) { + var asyncResource = createAsyncResourceWrapper('GrpcServerRequest'); var stream = new ServerWritableStream(call, metadata, handler.serialize); + stream.on('error', asyncResource.destroy); + stream.on('finish', asyncResource.destroy); stream.waitForCancel(); var batch = {}; batch[grpc.opType.RECV_MESSAGE] = true; - call.startBatch(batch, function(err, result) { + call.startBatch(batch, asyncResource.wrap(function(err, result) { if (err) { stream.emit('error', err); - return; } try { stream.request = handler.deserialize(result.read); @@ -639,7 +652,7 @@ function handleServerStreaming(call, handler, metadata) { return; } handler.func(stream); - }); + })); } /** @@ -664,22 +677,26 @@ function handleServerStreaming(call, handler, metadata) { * @param {grpc.Metadata} metadata Metadata from the client */ function handleClientStreaming(call, handler, metadata) { + var asyncResource = createAsyncResourceWrapper('GrpcServerRequest'); var stream = new ServerReadableStream(call, metadata, handler.deserialize); stream.on('error', function(error) { handleError(call, error); }); stream.waitForCancel(); - handler.func(stream, function(err, value, trailer, flags) { - stream.terminate(); - if (err) { - if (trailer) { - err.metadata = trailer; + asyncResource.wrap(function() { + handler.func(stream, function(err, value, trailer, flags) { + stream.terminate(); + if (err) { + if (trailer) { + err.metadata = trailer; + } + handleError(call, err); + } else { + sendUnaryResponse(call, value, handler.serialize, trailer, flags); } - handleError(call, err); - } else { - sendUnaryResponse(call, value, handler.serialize, trailer, flags); - } - }); + asyncResource.destroy(); + }); + })(); } /** @@ -702,10 +719,15 @@ function handleClientStreaming(call, handler, metadata) { * @param {Metadata} metadata Metadata from the client */ function handleBidiStreaming(call, handler, metadata) { + var asyncResource = createAsyncResourceWrapper('GrpcServerRequest'); var stream = new ServerDuplexStream(call, metadata, handler.serialize, handler.deserialize); + stream.on('error', asyncResource.destroy); + stream.on('finish', asyncResource.destroy); stream.waitForCancel(); - handler.func(stream); + asyncResource.wrap(function() { + handler.func(stream); + })(); } var streamHandlers = { @@ -744,6 +766,7 @@ Server.prototype.start = function() { } var self = this; this.started = true; + this.asyncResourceWrap = createAsyncResourceWrapper('GrpcServer'); this._server.start(); /** * Handles the SERVER_RPC_NEW event. If there is a handler associated with @@ -752,7 +775,7 @@ Server.prototype.start = function() { * @param {grpc.internal~Event} event The event to handle with tag * SERVER_RPC_NEW */ - function handleNewCall(err, event) { + var handleNewCall = function (err, event) { if (err) { return; } @@ -782,6 +805,7 @@ Server.prototype.start = function() { } streamHandlers[handler.type](call, handler, metadata); } + handleNewCall = this.asyncResourceWrap.wrap(handleNewCall); this._server.requestCall(handleNewCall); }; @@ -828,7 +852,11 @@ Server.prototype.register = function(name, handler, serialize, deserialize, * @param {function()} callback The shutdown complete callback */ Server.prototype.tryShutdown = function(callback) { - this._server.tryShutdown(callback); + var self = this; + this._server.tryShutdown(function() { + self.asyncResourceWrap.destroy(); + callback(); + }); }; /** @@ -839,6 +867,7 @@ Server.prototype.tryShutdown = function(callback) { */ Server.prototype.forceShutdown = function() { this._server.forceShutdown(); + this.asyncResourceWrap.destroy(); }; var unimplementedStatusResponse = { From 373de3e6e0b25ffa1b5ce4abbc5ff702ec1297e2 Mon Sep 17 00:00:00 2001 From: Kelvin Jin Date: Mon, 5 Feb 2018 15:31:18 -0500 Subject: [PATCH 2/2] async_hooks integration -- address comments --- .../src/async-hooks-integration.js | 35 ++++++++++++------- packages/grpc-native-core/src/server.js | 15 +++++--- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/packages/grpc-native-core/src/async-hooks-integration.js b/packages/grpc-native-core/src/async-hooks-integration.js index 0cecdab07..851b88b2f 100644 --- a/packages/grpc-native-core/src/async-hooks-integration.js +++ b/packages/grpc-native-core/src/async-hooks-integration.js @@ -31,16 +31,23 @@ var useAsyncHooks = semver.satisfies(process.version, '>=8'); */ if (useAsyncHooks) { - var asyncHooks = require('async_hooks'); - module.exports = function createAsyncResourceWrapper(name) { - var resource = new asyncHooks.AsyncResource(name); + const asyncHooks = require('async_hooks'); + class GrpcAsyncResource extends asyncHooks.AsyncResource { + constructor(name, handle) { + super(name); + this.handle = handle; + } + } + module.exports = function createAsyncResourceWrapper(name, handle) { + let resource = new GrpcAsyncResource(name, handle); return { - wrap: function(fn) { + wrap: (fn) => { return function() { if (resource) { resource.emitBefore(); + let result; try { - var result = fn.apply(this, arguments); + result = fn.apply(this, arguments); } finally { resource.emitAfter(); } @@ -50,18 +57,20 @@ if (useAsyncHooks) { } } }, - destroy: function() { - if (resource) { - resource.emitDestroy(); - resource = null; - } + destroy: () => { + setImmediate(() => { + if (resource) { + resource.emitDestroy(); + resource = null; + } + }); } }; } } else { - var noImpl = { - wrap: function(fn) { return fn; }, - destroy: function() {} + const noImpl = { + wrap: fn => fn, + destroy: () => {} }; module.exports = function createAsyncResourceWrapper() { return noImpl; } } diff --git a/packages/grpc-native-core/src/server.js b/packages/grpc-native-core/src/server.js index b95785799..ff7d03a52 100644 --- a/packages/grpc-native-core/src/server.js +++ b/packages/grpc-native-core/src/server.js @@ -576,10 +576,11 @@ ServerDuplexStream.prototype.waitForCancel = waitForCancel; * @param {grpc.Metadata} metadata Metadata from the client */ function handleUnary(call, handler, metadata) { - var asyncResource = createAsyncResourceWrapper('GrpcServerRequest'); var emitter = new ServerUnaryCall(call, metadata); + var asyncResource = createAsyncResourceWrapper('grpc.ServerRequest', emitter); emitter.on('error', function(error) { handleError(call, error); + asyncResource.destroy(); }); emitter.waitForCancel(); var batch = {}; @@ -587,6 +588,7 @@ function handleUnary(call, handler, metadata) { call.startBatch(batch, asyncResource.wrap(function(err, result) { if (err) { handleError(call, err); + asyncResource.destroy(); return; } try { @@ -594,9 +596,11 @@ function handleUnary(call, handler, metadata) { } catch (e) { e.code = constants.status.INTERNAL; handleError(call, e); + asyncResource.destroy(); return; } if (emitter.cancelled) { + asyncResource.destroy(); return; } handler.func(emitter, function sendUnaryData(err, value, trailer, flags) { @@ -633,8 +637,8 @@ function handleUnary(call, handler, metadata) { * @param {grpc.Metadata} metadata Metadata from the client */ function handleServerStreaming(call, handler, metadata) { - var asyncResource = createAsyncResourceWrapper('GrpcServerRequest'); var stream = new ServerWritableStream(call, metadata, handler.serialize); + var asyncResource = createAsyncResourceWrapper('grpc.ServerRequest', stream); stream.on('error', asyncResource.destroy); stream.on('finish', asyncResource.destroy); stream.waitForCancel(); @@ -677,10 +681,11 @@ function handleServerStreaming(call, handler, metadata) { * @param {grpc.Metadata} metadata Metadata from the client */ function handleClientStreaming(call, handler, metadata) { - var asyncResource = createAsyncResourceWrapper('GrpcServerRequest'); var stream = new ServerReadableStream(call, metadata, handler.deserialize); + var asyncResource = createAsyncResourceWrapper('grpc.ServerRequest', stream); stream.on('error', function(error) { handleError(call, error); + asyncResource.destroy(); }); stream.waitForCancel(); asyncResource.wrap(function() { @@ -719,9 +724,9 @@ function handleClientStreaming(call, handler, metadata) { * @param {Metadata} metadata Metadata from the client */ function handleBidiStreaming(call, handler, metadata) { - var asyncResource = createAsyncResourceWrapper('GrpcServerRequest'); var stream = new ServerDuplexStream(call, metadata, handler.serialize, handler.deserialize); + var asyncResource = createAsyncResourceWrapper('grpc.ServerRequest', stream); stream.on('error', asyncResource.destroy); stream.on('finish', asyncResource.destroy); stream.waitForCancel(); @@ -766,7 +771,7 @@ Server.prototype.start = function() { } var self = this; this.started = true; - this.asyncResourceWrap = createAsyncResourceWrapper('GrpcServer'); + this.asyncResourceWrap = createAsyncResourceWrapper('grpc.Server', this); this._server.start(); /** * Handles the SERVER_RPC_NEW event. If there is a handler associated with