Skip to content

Commit

Permalink
Distributed tracing support in gRPC (#436)
Browse files Browse the repository at this point in the history
PR-URL: #436
  • Loading branch information
kjin committed Mar 10, 2017
1 parent 1ab25b2 commit 71cd5c3
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 86 deletions.
117 changes: 81 additions & 36 deletions src/plugins/plugin-grpc.js
Expand Up @@ -21,6 +21,17 @@ var findIndex = require('lodash.findindex');

var SKIP_FRAMES = 3;

// Required for adding distributed tracing metadata to outgoing gRPC requests.
// This value is assigned in patchMetadata, and used in patchClient.
// patchMetadata is guaranteed to be called before patchClient because Client
// depends on Metadata.
var Metadata;

function patchMetadata(metadata, api) {
// metadata is the value of module.exports of src/node/src/metadata.js
Metadata = metadata;
}

function patchClient(client, api) {
/**
* Wraps a callback so that the current span for this trace is also ended when
Expand Down Expand Up @@ -59,37 +70,57 @@ function patchClient(client, api) {
// doesn't exist.
return method.apply(this, arguments);
}
var args = Array.prototype.slice.call(arguments);
// Check if the response is through a stream or a callback.
if (!method.responseStream) {
// We need to wrap the callback with the context, to propagate it.
// The callback is always required. It should be the only function in
// the arguments, since we cannot send a function as an argument through
// gRPC.
var cbIndex = findIndex(arguments, function(arg) {
var cbIndex = findIndex(args, function(arg) {
return typeof arg === 'function';
});
if (cbIndex !== -1) {
arguments[cbIndex] = wrapCallback(span, arguments[cbIndex]);
args[cbIndex] = wrapCallback(span, args[cbIndex]);
}
}
var call = method.apply(this, arguments);
// This finds an instance of Metadata among the arguments.
// A possible issue that could occur is if the 'options' parameter from
// the user contains an '_internal_repr' as well as a 'getMap' function,
// but this is an extremely rare case.
var metaIndex = findIndex(args, function(arg) {
return arg && typeof arg === 'object' && arg._internal_repr &&
typeof arg.getMap === 'function';
});
if (metaIndex === -1) {
var metadata = new Metadata();
if (!method.requestStream) {
// unary or server stream
if (args.length === 0) {
// No argument (for the gRPC call) was provided, so we will have to
// provide one, since metadata cannot be the first argument.
// The internal representation of argument defaults to undefined
// in its non-presence.
// Note that we can't pass null instead of undefined because the
// serializer within gRPC doesn't accept it.
args.push(undefined);
}
metaIndex = 1;
} else {
// client stream or bidi
metaIndex = 0;
}
args.splice(metaIndex, 0, metadata);
}
args[metaIndex].set(api.constants.TRACE_CONTEXT_HEADER_NAME,
span.getTraceContext());
var call = method.apply(this, args);
// Add extra data only when call successfully goes through. At this point
// we know that the arguments are correct.
if (api.enhancedDatabaseReportingEnabled()) {
// This finds an instance of Metadata among the arguments.
// A possible issue that could occur is if the 'options' parameter from
// the user contains an '_internal_repr' as well as a 'getMap' function,
// but this is an extremely rare case.
var metaIndex = findIndex(arguments, function(arg) {
return arg && typeof arg === 'object' && arg._internal_repr &&
typeof arg.getMap === 'function';
});
if (metaIndex !== -1) {
var metadata = arguments[metaIndex];
span.addLabel('metadata', JSON.stringify(metadata.getMap()));
}
span.addLabel('metadata', JSON.stringify(args[metaIndex].getMap()));
if (!method.requestStream) {
span.addLabel('argument', JSON.stringify(arguments[0]));
span.addLabel('argument', JSON.stringify(args[0]));
}
}
// The user might need the current context in listeners to this stream.
Expand Down Expand Up @@ -142,6 +173,8 @@ function unpatchClient(client) {
}

function patchServer(server, api) {
var traceContextHeaderName = api.constants.TRACE_CONTEXT_HEADER_NAME;

/**
* A helper function to record metadata in a trace span. The return value of
* this function can be used as the 'wrapper' argument to wrap sendMetadata.
Expand Down Expand Up @@ -171,13 +204,14 @@ function patchServer(server, api) {
// We wrap it so that a span is started immediately beforehand, and ended
// when the callback provided to it as an argument is invoked.
shimmer.wrap(handlerSet, 'func', function (serverMethod) {
var rootSpanOptions = {
name: requestName,
url: requestName,
skipFrames: SKIP_FRAMES
};
return function serverMethodTrace(call, callback) {
var that = this;
var rootSpanOptions = {
name: requestName,
url: requestName,
traceContext: call.metadata.getMap()[traceContextHeaderName],
skipFrames: SKIP_FRAMES
};
return api.runInRootSpan(rootSpanOptions, function(rootSpan) {
if (!rootSpan) {
return serverMethod.call(that, call, callback);
Expand Down Expand Up @@ -221,13 +255,14 @@ function patchServer(server, api) {
// We wrap it so that a span is started immediately beforehand, and ended
// when there is no data to be sent from the server.
shimmer.wrap(handlerSet, 'func', function (serverMethod) {
var rootSpanOptions = {
name: requestName,
url: requestName,
skipFrames: SKIP_FRAMES
};
return function serverMethodTrace(stream) {
var that = this;
var rootSpanOptions = {
name: requestName,
url: requestName,
traceContext: stream.metadata.getMap()[traceContextHeaderName],
skipFrames: SKIP_FRAMES
};
return api.runInRootSpan(rootSpanOptions, function(rootSpan) {
if (!rootSpan) {
return serverMethod.call(that, stream);
Expand Down Expand Up @@ -280,13 +315,14 @@ function patchServer(server, api) {
// We wrap it so that a span is started immediately beforehand, and ended
// when the callback provided to it as an argument is invoked.
shimmer.wrap(handlerSet, 'func', function (serverMethod) {
var rootSpanOptions = {
name: requestName,
url: requestName,
skipFrames: SKIP_FRAMES
};
return function serverMethodTrace(stream, callback) {
var that = this;
var rootSpanOptions = {
name: requestName,
url: requestName,
traceContext: stream.metadata.getMap()[traceContextHeaderName],
skipFrames: SKIP_FRAMES
};
return api.runInRootSpan(rootSpanOptions, function(rootSpan) {
if (!rootSpan) {
return serverMethod.call(that, stream, callback);
Expand Down Expand Up @@ -336,13 +372,14 @@ function patchServer(server, api) {
// We wrap it so that a span is started immediately beforehand, and ended
// when there is no data to be sent from the server.
shimmer.wrap(handlerSet, 'func', function (serverMethod) {
var rootSpanOptions = {
name: requestName,
url: requestName,
skipFrames: SKIP_FRAMES
};
return function serverMethodTrace(stream) {
var that = this;
var rootSpanOptions = {
name: requestName,
url: requestName,
traceContext: stream.metadata.getMap()[traceContextHeaderName],
skipFrames: SKIP_FRAMES
};
return api.runInRootSpan(rootSpanOptions, function(rootSpan) {
if (!rootSpan) {
return serverMethod.call(that, stream);
Expand Down Expand Up @@ -441,6 +478,14 @@ module.exports = [
patch: patchClient,
unpatch: unpatchClient
},
{
file: 'src/node/src/metadata.js',
versions: SUPPORTED_VERSIONS,
patch: patchMetadata,
// patchMetadata doesn't modify the module exports of metadata.js.
// So it's safe to have provide a no-op unpatch function.
unpatch: function unpatchMetadata() {}
},
{
file: 'src/node/src/server.js',
versions: SUPPORTED_VERSIONS,
Expand Down
23 changes: 5 additions & 18 deletions src/trace-agent.js
Expand Up @@ -24,6 +24,7 @@ var uuid = require('uuid');
var constants = require('./constants.js');
var tracingPolicy = require('./tracing-policy.js');
var isEqual = require('lodash.isequal');
var util = require('./util.js');

/** @type {TraceAgent} */
var traceAgent;
Expand Down Expand Up @@ -164,11 +165,9 @@ TraceAgent.prototype.isTraceAgentRequest = function(options) {
};

/**
* Parse a cookie-style header string to extract traceId, spandId and options
* ex: '123456/667;o=3'
* -> {traceId: '123456', spanId: '667', options: '3'}
* note that we ignore trailing garbage if there is more than one '='
* Returns null if traceId or spanId could not be found.
* Parse a cookie-style header string to extract traceId, spandId and options,
* or returns null if the agent has been configured to ignore it.
* @see util.parseContextFromHeader
*
* @param {string} str string representation of the trace headers
* @return {?{traceId: string, spanId: string, options: number}}
Expand All @@ -178,19 +177,7 @@ TraceAgent.prototype.parseContextFromHeader = function(str) {
if (this.config_.ignoreContextHeader) {
return null;
}
if (!str) {
return null;
}
var matches = str.match(/^([0-9a-fA-F]+)(?:\/([0-9a-fA-F]+))?(?:;o=(.*))?/);
if (!matches || matches.length !== 4 || matches[0] !== str ||
(matches[2] && isNaN(matches[2]))) {
return null;
}
return {
traceId: matches[1],
spanId: matches[2],
options: Number(matches[3])
};
return util.parseContextFromHeader(str);
};

/**
Expand Down
28 changes: 28 additions & 0 deletions src/util.js
Expand Up @@ -50,6 +50,33 @@ var moduleRegex = new RegExp(
']*).*'
);

/**
* Parse a cookie-style header string to extract traceId, spandId and options
* ex: '123456/667;o=3'
* -> {traceId: '123456', spanId: '667', options: '3'}
* note that we ignore trailing garbage if there is more than one '='
* Returns null if traceId or spanId could not be found.
*
* @param {string} str string representation of the trace headers
* @return {?{traceId: string, spanId: string, options: number}}
* object with keys. null if there is a problem.
*/
function parseContextFromHeader(str) {
if (!str) {
return null;
}
var matches = str.match(/^([0-9a-fA-F]+)(?:\/([0-9a-fA-F]+))?(?:;o=(.*))?/);
if (!matches || matches.length !== 4 || matches[0] !== str ||
(matches[2] && isNaN(matches[2]))) {
return null;
}
return {
traceId: matches[1],
spanId: matches[2],
options: Number(matches[3])
};
}

/**
* Retrieves a package name from the full import path.
* For example:
Expand Down Expand Up @@ -100,6 +127,7 @@ function findModuleVersion(modulePath, load) {

module.exports = {
truncate: truncate,
parseContextFromHeader: parseContextFromHeader,
packageNameFromPath: packageNameFromPath,
findModulePath: findModulePath,
findModuleVersion: findModuleVersion
Expand Down

0 comments on commit 71cd5c3

Please sign in to comment.