Skip to content

Commit

Permalink
Pass cancel and getPeer to underlying call
Browse files Browse the repository at this point in the history
  • Loading branch information
drobertduke committed Feb 23, 2018
1 parent 2064175 commit f644c06
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 51 deletions.
126 changes: 75 additions & 51 deletions packages/grpc-native-core/src/client_interceptors.js
Original file line number Diff line number Diff line change
Expand Up @@ -707,59 +707,65 @@ function _getUnaryInterceptor(method_definition, channel, emitter, callback) {
return function (options) {
var call = common.getCall(channel, method_definition.path, options);
var first_listener;
return new InterceptingCall(null, {
start: function (metadata, listener) {
var batch = {
[grpc.opType.SEND_INITIAL_METADATA]:
metadata._getCoreRepresentation(),
};
first_listener = listener;
call.startBatch(batch, function () {});
},
sendMessage: function(message) {
var batch = {
[grpc.opType.SEND_MESSAGE]: serialize(message),
};
call.startBatch(batch, function () {});
},
halfClose: function() {
var batch = {
[grpc.opType.SEND_CLOSE_FROM_CLIENT]: true,
[grpc.opType.RECV_INITIAL_METADATA]: true,
[grpc.opType.RECV_MESSAGE]: true,
[grpc.opType.RECV_STATUS_ON_CLIENT]: true
};
call.startBatch(batch, function (err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
var status = response.status;
var deserialized;
if (status.code === constants.status.OK) {
if (err) {
// Got a batch error, but OK status. Something went wrong
callback(err);
return;
} else {
try {
deserialized = deserialize(response.read);
} catch (e) {
/* Change status to indicate bad server response. This
* will result in passing an error to the callback */
status = {
code: constants.status.INTERNAL,
details: 'Failed to parse server response'
};
}
var final_requester = {};
final_requester.start = function (metadata, listener) {
var batch = {
[grpc.opType.SEND_INITIAL_METADATA]:
metadata._getCoreRepresentation(),
};
first_listener = listener;
call.startBatch(batch, function () { });
};
final_requester.sendMessage = function (message) {
var batch = {
[grpc.opType.SEND_MESSAGE]: serialize(message),
};
call.startBatch(batch, function () { });
};
final_requester.halfClose = function () {
var batch = {
[grpc.opType.SEND_CLOSE_FROM_CLIENT]: true,
[grpc.opType.RECV_INITIAL_METADATA]: true,
[grpc.opType.RECV_MESSAGE]: true,
[grpc.opType.RECV_STATUS_ON_CLIENT]: true
};
call.startBatch(batch, function (err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
var status = response.status;
var deserialized;
if (status.code === constants.status.OK) {
if (err) {
// Got a batch error, but OK status. Something went wrong
callback(err);
return;
} else {
try {
deserialized = deserialize(response.read);
} catch (e) {
/* Change status to indicate bad server response. This
* will result in passing an error to the callback */
status = {
code: constants.status.INTERNAL,
details: 'Failed to parse server response'
};
}
}
response.metadata =
Metadata._fromCoreRepresentation(response.metadata);
first_listener.onReceiveMetadata(response.metadata);
first_listener.onReceiveMessage(deserialized);
first_listener.onReceiveStatus(status);
});
}
});
}
response.metadata =
Metadata._fromCoreRepresentation(response.metadata);
first_listener.onReceiveMetadata(response.metadata);
first_listener.onReceiveMessage(deserialized);
first_listener.onReceiveStatus(status);
});
};
final_requester.cancel = function () {
call.cancel();
};
final_requester.getPeer = function () {
return call.getPeer();
};
return new InterceptingCall(null, final_requester);
};
}

Expand Down Expand Up @@ -865,6 +871,12 @@ function _getClientStreamingInterceptor(method_definition, channel, emitter,
};
call.startBatch(batch, function () { });
};
final_requester.cancel = function () {
call.cancel();
};
final_requester.getPeer = function() {
return call.getPeer();
};
return new InterceptingCall(null, final_requester);
};
}
Expand Down Expand Up @@ -944,6 +956,12 @@ function _getServerStreamingInterceptor(method_definition, channel, emitter) {
call.startBatch(recv_batch, _getStreamReadCallback(emitter, call,
first_listener, deserialize));
};
final_requester.cancel = function() {
call.cancel();
};
final_requester.getPeer = function() {
return call.getPeer();
};
return new InterceptingCall(null, final_requester);
};
}
Expand Down Expand Up @@ -1041,6 +1059,12 @@ function _getBidiStreamingInterceptor(method_definition, channel, emitter) {
call.startBatch(recv_batch, _getStreamReadCallback(emitter, call,
first_listener, deserialize));
};
final_requester.cancel = function() {
call.cancel();
};
final_requester.getPeer = function() {
return call.getPeer();
};
return new InterceptingCall(null, final_requester);
};
}
Expand Down
8 changes: 8 additions & 0 deletions packages/grpc-native-core/src/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,12 @@ exports.getCall = function(channel, path, options) {
* @param {function} next Calls the next interceptor.
*/

/**
* @function GetPeerRequester
* @param {function} next Calls the next interceptor.
* @return {string}
*/

/**
* @typedef {object} grpc~Requester
* @param {MetadataRequester=} start A function triggered when the call begins.
Expand All @@ -298,6 +304,8 @@ exports.getCall = function(channel, path, options) {
* closes the call.
* @param {CancelRequester=} cancel A function triggered when the call is
* cancelled.
* @param {GetPeerRequester=} getPeer A function triggered when the endpoint is
* requested.
*/

/**
Expand Down

0 comments on commit f644c06

Please sign in to comment.