Permalink
Browse files

Proper end of stream signaling to rpcImpl, see #529

  • Loading branch information...
dcodeIO committed Dec 8, 2016
1 parent e4faf7f commit 9ea3766ff1b8fb7ccad028f44efe27d3b019eeb7

Some generated files are not rendered by default. Learn more.

Oops, something went wrong.

Large diffs are not rendered by default.

Oops, something went wrong.

Large diffs are not rendered by default.

Oops, something went wrong.
BIN +24 Bytes (100%) dist/protobuf.min.js.gz
Binary file not shown.

Large diffs are not rendered by default.

Oops, something went wrong.
@@ -35,7 +35,17 @@ var Greeter = root.lookup("Greeter"),
Hello = root.lookup("Hello"),
World = root.lookup("World");
var ended = false;
// Implement: Stream-aware RPC implementation
function rpcImpl(method, requestData, callback) {
if (ended)
return;
if (!requestData) {
console.log("rpc ended client-side.");
ended = true;
return;
}
setTimeout(function() {
try {
// <exemplary server side code>
@@ -63,12 +73,14 @@ greeter.on("error", function(err, method) {
console.log("error:", err);
});
greeter.sayHello({ name: 'node' });
greeter.sayHello({ name: 'protobuf' });
greeter.sayHello({ name: 'paralin' });
greeter.sayHello({ name: 'protocol' });
greeter.sayHello({ name: 'buffers' });
greeter.sayHello({ name: 'for' });
setTimeout(function() {
greeter.end();
greeter.sayHello({ name: 'dcode' }); // does nothing
// ^ Signals rpcImpl that the service has been ended client-side by calling it with a null buffer.
// Likewise, rpcImpl can end the stream by calling its callback with an explicit null buffer.
greeter.sayHello({ name: 'javascript' }); // does nothing
}, 1000);
// Likewise, the RPC impl can end the stream by calling its callback with an explicit null message
@@ -15,8 +15,8 @@ function Service(rpcImpl) {
EventEmitter.call(this);
/**
* RPC implementation.
* @type {RPCImpl}
* RPC implementation. Becomes `null` when the service is ended.
* @type {?RPCImpl}
*/
this.$rpc = rpcImpl;
}
@@ -27,8 +27,15 @@ ServicePrototype.constructor = Service;
/**
* Ends this service and emits the `end` event.
* @param {boolean} [endedByRPC=false] Whether the service has been ended by the RPC implementation.
* @returns {rpc.Service} `this`
*/
ServicePrototype.end = function end() {
return this.emit('end').off();
ServicePrototype.end = function end(endedByRPC) {
if (this.$rpc) {
if (!endedByRPC) // signal end to rpcImpl
this.$rpc(null, null, null);
this.$rpc = null;
this.emit('end').off();
}
return this;
};
@@ -162,6 +162,8 @@ ServicePrototype.create = function create(rpcImpl, requestDelimited, responseDel
this.getMethodsArray().forEach(function(method) {
var lcName = method.name.substring(0, 1).toLowerCase() + method.name.substring(1);
rpcService[lcName] = function(request, callback) {
if (!rpcService.$rpc) // already ended?
return;
method.resolve();
var requestData;
try {
@@ -178,7 +180,7 @@ ServicePrototype.create = function create(rpcImpl, requestDelimited, responseDel
return callback ? callback(err) : undefined;
}
if (responseData === null) {
rpcService.emit('end', method);
rpcService.end(/* endedByRPC */ true);
return undefined;
}
var response;
@@ -60,7 +60,7 @@ EventEmitterPrototype.off = function off(evt, fn) {
};
/**
* Emits an event.
* Emits an event by calling its listeners with the specified arguments.
* @param {string} evt Event name
* @param {...*} args Arguments
* @returns {util.EventEmitter} `this`

0 comments on commit 9ea3766

Please sign in to comment.