diff --git a/packages/grpc-native-core/src/client_interceptors.js b/packages/grpc-native-core/src/client_interceptors.js index cf7171fa4..ca9340dc1 100644 --- a/packages/grpc-native-core/src/client_interceptors.js +++ b/packages/grpc-native-core/src/client_interceptors.js @@ -707,59 +707,65 @@ function _getUnaryInterceptor(method_definition, channel, emitter, callback) { return function (options) { var call = common.getCall(channel, method_definition.path, options); var first_listener; - return new InterceptingCall(null, { - start: function (metadata, listener) { - var batch = { - [grpc.opType.SEND_INITIAL_METADATA]: - metadata._getCoreRepresentation(), - }; - first_listener = listener; - call.startBatch(batch, function () {}); - }, - sendMessage: function(message) { - var batch = { - [grpc.opType.SEND_MESSAGE]: serialize(message), - }; - call.startBatch(batch, function () {}); - }, - halfClose: function() { - var batch = { - [grpc.opType.SEND_CLOSE_FROM_CLIENT]: true, - [grpc.opType.RECV_INITIAL_METADATA]: true, - [grpc.opType.RECV_MESSAGE]: true, - [grpc.opType.RECV_STATUS_ON_CLIENT]: true - }; - call.startBatch(batch, function (err, response) { - response.status.metadata = Metadata._fromCoreRepresentation( - response.status.metadata); - var status = response.status; - var deserialized; - if (status.code === constants.status.OK) { - if (err) { - // Got a batch error, but OK status. Something went wrong - callback(err); - return; - } else { - try { - deserialized = deserialize(response.read); - } catch (e) { - /* Change status to indicate bad server response. This - * will result in passing an error to the callback */ - status = { - code: constants.status.INTERNAL, - details: 'Failed to parse server response' - }; - } + var final_requester = {}; + final_requester.start = function (metadata, listener) { + var batch = { + [grpc.opType.SEND_INITIAL_METADATA]: + metadata._getCoreRepresentation(), + }; + first_listener = listener; + call.startBatch(batch, function () { }); + }; + final_requester.sendMessage = function (message) { + var batch = { + [grpc.opType.SEND_MESSAGE]: serialize(message), + }; + call.startBatch(batch, function () { }); + }; + final_requester.halfClose = function () { + var batch = { + [grpc.opType.SEND_CLOSE_FROM_CLIENT]: true, + [grpc.opType.RECV_INITIAL_METADATA]: true, + [grpc.opType.RECV_MESSAGE]: true, + [grpc.opType.RECV_STATUS_ON_CLIENT]: true + }; + call.startBatch(batch, function (err, response) { + response.status.metadata = Metadata._fromCoreRepresentation( + response.status.metadata); + var status = response.status; + var deserialized; + if (status.code === constants.status.OK) { + if (err) { + // Got a batch error, but OK status. Something went wrong + callback(err); + return; + } else { + try { + deserialized = deserialize(response.read); + } catch (e) { + /* Change status to indicate bad server response. This + * will result in passing an error to the callback */ + status = { + code: constants.status.INTERNAL, + details: 'Failed to parse server response' + }; } } - response.metadata = - Metadata._fromCoreRepresentation(response.metadata); - first_listener.onReceiveMetadata(response.metadata); - first_listener.onReceiveMessage(deserialized); - first_listener.onReceiveStatus(status); - }); - } - }); + } + response.metadata = + Metadata._fromCoreRepresentation(response.metadata); + first_listener.onReceiveMetadata(response.metadata); + first_listener.onReceiveMessage(deserialized); + first_listener.onReceiveStatus(status); + }); + }; + final_requester.cancel = function () { + call.cancel(); + }; + final_requester.getPeer = function () { + return call.getPeer(); + }; + return new InterceptingCall(null, final_requester); }; } @@ -865,6 +871,12 @@ function _getClientStreamingInterceptor(method_definition, channel, emitter, }; call.startBatch(batch, function () { }); }; + final_requester.cancel = function () { + call.cancel(); + }; + final_requester.getPeer = function() { + return call.getPeer(); + }; return new InterceptingCall(null, final_requester); }; } @@ -944,6 +956,12 @@ function _getServerStreamingInterceptor(method_definition, channel, emitter) { call.startBatch(recv_batch, _getStreamReadCallback(emitter, call, first_listener, deserialize)); }; + final_requester.cancel = function() { + call.cancel(); + }; + final_requester.getPeer = function() { + return call.getPeer(); + }; return new InterceptingCall(null, final_requester); }; } @@ -1041,6 +1059,12 @@ function _getBidiStreamingInterceptor(method_definition, channel, emitter) { call.startBatch(recv_batch, _getStreamReadCallback(emitter, call, first_listener, deserialize)); }; + final_requester.cancel = function() { + call.cancel(); + }; + final_requester.getPeer = function() { + return call.getPeer(); + }; return new InterceptingCall(null, final_requester); }; } diff --git a/packages/grpc-native-core/src/common.js b/packages/grpc-native-core/src/common.js index b2a771f42..0001acbd6 100644 --- a/packages/grpc-native-core/src/common.js +++ b/packages/grpc-native-core/src/common.js @@ -289,6 +289,12 @@ exports.getCall = function(channel, path, options) { * @param {function} next Calls the next interceptor. */ +/** + * @function GetPeerRequester + * @param {function} next Calls the next interceptor. + * @return {string} + */ + /** * @typedef {object} grpc~Requester * @param {MetadataRequester=} start A function triggered when the call begins. @@ -298,6 +304,8 @@ exports.getCall = function(channel, path, options) { * closes the call. * @param {CancelRequester=} cancel A function triggered when the call is * cancelled. + * @param {GetPeerRequester=} getPeer A function triggered when the endpoint is + * requested. */ /**