Skip to content

Commit

Permalink
Enforce order of operations for synchronous requests
Browse files Browse the repository at this point in the history
  • Loading branch information
drobertduke committed Feb 28, 2018
1 parent 6bfb5de commit 4c502ed
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 20 deletions.
132 changes: 112 additions & 20 deletions packages/grpc-native-core/src/client_interceptors.js
Original file line number Diff line number Diff line change
Expand Up @@ -652,16 +652,22 @@ EndListener.prototype.onReceiveMessage = function(){};
EndListener.prototype.onReceiveStatus = function(){};
EndListener.prototype.recvMessageWithContext = function(){};

var OP_DEPENDENCIES = {
[grpc.opType.SEND_MESSAGE]: [grpc.opType.SEND_INITIAL_METADATA],
[grpc.opType.SEND_CLOSE_FROM_CLIENT]: [grpc.opType.SEND_MESSAGE],
[grpc.opType.RECV_MESSAGE]: [grpc.opType.SEND_INITIAL_METADATA]
};

/**
* Produces a callback triggered by streaming response messages.
* @private
* @param {EventEmitter} emitter
* @param {grpc.internal~Call} call
* @param {grpc~Listener} listener
* @param {function} get_listener Returns a grpc~Listener.
* @param {grpc~deserialize} deserialize
* @return {Function}
*/
function _getStreamReadCallback(emitter, call, listener, deserialize) {
function _getStreamReadCallback(emitter, call, get_listener, deserialize) {
return function (err, response) {
if (err) {
// Something has gone wrong. Stop reading and wait for status
Expand All @@ -684,6 +690,7 @@ function _getStreamReadCallback(emitter, call, listener, deserialize) {
emitter._readsDone();
return;
}
var listener = get_listener();
var context = {
call: call,
listener: listener
Expand All @@ -692,6 +699,66 @@ function _getStreamReadCallback(emitter, call, listener, deserialize) {
};
}

/**
* Tests whether a batch can be started.
* @private
* @param {number[]} batch_ops The operations in the batch we are checking.
* @param {number[]} completed_ops Previously completed operations.
* @return {boolean}
*/
function _areBatchRequirementsMet(batch_ops, completed_ops) {
var dependencies = _.flatMap(batch_ops, function(op) {
return OP_DEPENDENCIES[op] || [];
});
var dependencies_met = _.intersection(dependencies,
batch_ops.concat(completed_ops));
return _.isEqual(dependencies_met.sort(), dependencies.sort());
}

/**
* Enforces the order of operations for synchronous requests. If a batch's
* operations cannot be started because required operations have not started
* yet, the batch is deferred until requirements are met.
* @private
* @param {grpc.Client~Call} call
* @param {object} batch
* @param {object} batch_state
* @param {number[]} [batch_state.completed_ops] The ops already sent.
* @param {object} [batch_state.deferred_batches] Batches to be sent after
* their dependencies are fulfilled.
* @param {function} callback
* @return {object}
*/
function _startBatchIfReady(call, batch, batch_state, callback) {
var completed_ops = batch_state.completed_ops;
var deferred_batches = batch_state.deferred_batches;
var batch_ops = _.map(_.keys(batch), Number);
if (_areBatchRequirementsMet(batch_ops, completed_ops)) {
// Dependencies are met, start the batch and any deferred batches whose
// dependencies are met as a result.
call.startBatch(batch, callback);
completed_ops = _.union(completed_ops, batch_ops);
deferred_batches = _.flatMap(deferred_batches, function(deferred_batch) {
var deferred_batch_ops = _.map(_.keys(deferred_batch), Number);
if (_areBatchRequirementsMet(deferred_batch_ops, completed_ops)) {
call.startBatch(deferred_batch.batch, deferred_batch.callback);
return [];
}
return [deferred_batch];
});
} else {
// Dependencies are not met, defer the batch
deferred_batches = deferred_batches.concat({
batch: batch,
callback: callback
});
}
return {
completed_ops: completed_ops,
deferred_batches: deferred_batches
};
}

/**
* Produces an interceptor which will start gRPC batches for unary calls.
* @private
Expand All @@ -708,19 +775,25 @@ function _getUnaryInterceptor(method_definition, channel, emitter, callback) {
var call = common.getCall(channel, method_definition.path, options);
var first_listener;
var final_requester = {};
var batch_state = {
completed_ops: [],
deferred_batches: []
};
final_requester.start = function (metadata, listener) {
var batch = {
[grpc.opType.SEND_INITIAL_METADATA]:
metadata._getCoreRepresentation(),
};
first_listener = listener;
call.startBatch(batch, function () { });
batch_state = _startBatchIfReady(call, batch, batch_state,
function() {});
};
final_requester.sendMessage = function (message) {
var batch = {
[grpc.opType.SEND_MESSAGE]: serialize(message),
};
call.startBatch(batch, function () { });
batch_state = _startBatchIfReady(call, batch, batch_state,
function() {});
};
final_requester.halfClose = function () {
var batch = {
Expand All @@ -729,7 +802,7 @@ function _getUnaryInterceptor(method_definition, channel, emitter, callback) {
[grpc.opType.RECV_MESSAGE]: true,
[grpc.opType.RECV_STATUS_ON_CLIENT]: true
};
call.startBatch(batch, function (err, response) {
var callback = function (err, response) {
response.status.metadata = Metadata._fromCoreRepresentation(
response.status.metadata);
var status = response.status;
Expand Down Expand Up @@ -757,7 +830,8 @@ function _getUnaryInterceptor(method_definition, channel, emitter, callback) {
first_listener.onReceiveMetadata(response.metadata);
first_listener.onReceiveMessage(deserialized);
first_listener.onReceiveStatus(status);
});
};
batch_state = _startBatchIfReady(call, batch, batch_state, callback);
};
final_requester.cancel = function () {
call.cancel();
Expand Down Expand Up @@ -895,25 +969,34 @@ function _getServerStreamingInterceptor(method_definition, channel, emitter) {
method_definition.responseDeserialize);
var serialize = method_definition.requestSerialize;
return function (options) {
var first_listener;
var batch_state = {
completed_ops: [],
deferred_batches: []
};
var call = common.getCall(channel, method_definition.path, options);
var final_requester = {};
var first_listener;
var get_listener = function() {
return first_listener;
};
final_requester.start = function(metadata, listener) {
first_listener = listener;
metadata = metadata.clone();
var metadata_batch = {
[grpc.opType.SEND_INITIAL_METADATA]: metadata._getCoreRepresentation(),
[grpc.opType.RECV_INITIAL_METADATA]: true,
[grpc.opType.RECV_INITIAL_METADATA]: true
};
call.startBatch(metadata_batch, function(err, response) {
var callback = function(err, response) {
if (err) {
// The call has stopped for some reason. A non-OK status will arrive
// in the other batch.
return;
}
first_listener.onReceiveMetadata(
Metadata._fromCoreRepresentation(response.metadata));
});
};
batch_state = _startBatchIfReady(call, metadata_batch, batch_state,
callback);
var status_batch = {
[grpc.opType.RECV_STATUS_ON_CLIENT]: true
};
Expand All @@ -935,26 +1018,28 @@ function _getServerStreamingInterceptor(method_definition, channel, emitter) {
var send_batch = {
[grpc.opType.SEND_MESSAGE]: message
};
call.startBatch(send_batch, function(err, response) {
var callback = function(err, response) {
if (err) {
// The call has stopped for some reason. A non-OK status will arrive
// in the other batch.
return;
}
});
};
batch_state = _startBatchIfReady(call, send_batch, batch_state, callback);
};
final_requester.halfClose = function() {
var batch = {
[grpc.opType.SEND_CLOSE_FROM_CLIENT]: true
};
call.startBatch(batch, function() {});
batch_state = _startBatchIfReady(call, batch, batch_state, function() {});
};
final_requester.recvMessageWithContext = function(context) {
var recv_batch = {
[grpc.opType.RECV_MESSAGE]: true
};
call.startBatch(recv_batch, _getStreamReadCallback(emitter, call,
first_listener, deserialize));
var callback = _getStreamReadCallback(emitter, call,
get_listener, deserialize);
batch_state = _startBatchIfReady(call, recv_batch, batch_state, callback);
};
final_requester.cancel = function() {
call.cancel();
Expand All @@ -981,6 +1066,9 @@ function _getBidiStreamingInterceptor(method_definition, channel, emitter) {
method_definition.responseDeserialize);
return function (options) {
var first_listener;
var get_listener = function() {
return first_listener;
};
var call = common.getCall(channel, method_definition.path, options);
var final_requester = {};
final_requester.start = function (metadata, listener) {
Expand Down Expand Up @@ -1057,7 +1145,7 @@ function _getBidiStreamingInterceptor(method_definition, channel, emitter) {
[grpc.opType.RECV_MESSAGE]: true
};
call.startBatch(recv_batch, _getStreamReadCallback(emitter, call,
first_listener, deserialize));
get_listener, deserialize));
};
final_requester.cancel = function() {
call.cancel();
Expand Down Expand Up @@ -1144,11 +1232,13 @@ function _getServerStreamingListener(method_definition, emitter) {
onReceiveMessage: function(message, next, context) {
if (emitter.push(message) && message !== null) {
var call = context.call;
var listener = context.listener;
var get_listener = function() {
return context.listener;
};
var read_batch = {};
read_batch[grpc.opType.RECV_MESSAGE] = true;
call.startBatch(read_batch, _getStreamReadCallback(emitter, call,
listener, deserialize));
get_listener, deserialize));
} else {
emitter.reading = false;
}
Expand Down Expand Up @@ -1176,11 +1266,13 @@ function _getBidiStreamingListener(method_definition, emitter) {
onReceiveMessage: function(message, next, context) {
if (emitter.push(message) && message !== null) {
var call = context.call;
var listener = context.listener;
var get_listener = function() {
return context.listener;
};
var read_batch = {};
read_batch[grpc.opType.RECV_MESSAGE] = true;
call.startBatch(read_batch, _getStreamReadCallback(emitter, call,
listener, deserialize));
get_listener, deserialize));
} else {
emitter.reading = false;
}
Expand Down
90 changes: 90 additions & 0 deletions packages/grpc-native-core/test/client_interceptors_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1702,4 +1702,94 @@ describe('Client interceptors', function() {
bidi_stream.end();
});
});

describe('order of operations enforced for async interceptors', function() {
it('with unary call', function(done) {
var expected_calls = [
'close_b',
'message_b',
'start_b',
'done'
];
var registry = new CallRegistry(done, expected_calls, true);
var message = {value: 'foo'};
var interceptor_a = function(options, nextCall) {
return new InterceptingCall(nextCall(options), {
start: function(metadata, listener, next) {
setTimeout(function() { next(metadata, listener); }, 50);
},
sendMessage: function(message, next) {
setTimeout(function () { next(message); }, 10);
}
});
};
var interceptor_b = function(options, nextCall) {
return new InterceptingCall(nextCall(options), {
start: function(metadata, listener, next) {
registry.addCall('start_b');
next(metadata, listener);
},
sendMessage: function(message, next) {
registry.addCall('message_b');
next(message);
},
halfClose: function(next) {
registry.addCall('close_b');
next();
}
});
};
var options = {
interceptors: [interceptor_a, interceptor_b]
};
client.echo(message, options, function(err, response) {
assert.strictEqual(err, null);
registry.addCall('done');
});
});
it('with serverStreaming call', function(done) {
var expected_calls = [
'close_b',
'message_b',
'start_b',
'done'
];
var registry = new CallRegistry(done, expected_calls, true);
var message = {value: 'foo'};
var interceptor_a = function(options, nextCall) {
return new InterceptingCall(nextCall(options), {
start: function(metadata, listener, next) {
setTimeout(function() { next(metadata, listener); }, 50);
},
sendMessage: function(message, next) {
setTimeout(function () { next(message); }, 10);
}
});
};
var interceptor_b = function(options, nextCall) {
return new InterceptingCall(nextCall(options), {
start: function(metadata, listener, next) {
registry.addCall('start_b');
next(metadata, listener);
},
sendMessage: function(message, next) {
registry.addCall('message_b');
next(message);
},
halfClose: function(next) {
registry.addCall('close_b');
next();
}
});
};
var options = {
interceptors: [interceptor_a, interceptor_b]
};
var stream = client.echoServerStream(message, options);
stream.on('data', function(response) {
assert.strictEqual(response.value, 'foo');
registry.addCall('done');
});
});
});
});

0 comments on commit 4c502ed

Please sign in to comment.