Skip to content

Commit

Permalink
Fix issues with callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
stanley-cheung committed Jun 10, 2020
1 parent aadbdb7 commit 6900db2
Show file tree
Hide file tree
Showing 6 changed files with 570 additions and 94 deletions.
15 changes: 13 additions & 2 deletions javascript/net/grpc/web/clientreadablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,28 @@ const ClientReadableStream = function() {};


/**
* Register a callback to handle I/O events.
* Register a callback to handle different stream events.
*
* @param {string} eventType The event type
* @param {function(?)} callback The call back to handle the event with
* @param {function(?)} callback The callback to handle the event with
* an optional input object
* @return {!ClientReadableStream} this object
*/
ClientReadableStream.prototype.on = goog.abstractMethod;



/**
* Remove a particular callback.
*
* @param {string} eventType The event type
* @param {function(?)} callback The callback to remove
* @return {!ClientReadableStream} this object
*/
ClientReadableStream.prototype.removeListener = goog.abstractMethod;



/**
* Close the stream.
*/
Expand Down
24 changes: 17 additions & 7 deletions javascript/net/grpc/web/grpcwebclientbase.js
Original file line number Diff line number Diff line change
Expand Up @@ -213,18 +213,23 @@ GrpcWebClientBase.prototype.startStream_ = function(request, hostname) {
* @param {boolean} useUnaryResponse
*/
GrpcWebClientBase.setCallback_ = function(stream, callback, useUnaryResponse) {
var responseReceived = null;
var errorEmitted = false;

stream.on('data', function(response) {
callback(null, response);
responseReceived = response;
});

stream.on('error', function(error) {
if (error.code != StatusCode.OK) {
if (error.code != StatusCode.OK && !errorEmitted) {
errorEmitted = true;
callback(error, null);
}
});

stream.on('status', function(status) {
if (status.code != StatusCode.OK) {
if (status.code != StatusCode.OK && !errorEmitted) {
errorEmitted = true;
callback(
{
code: status.code,
Expand All @@ -241,11 +246,16 @@ GrpcWebClientBase.setCallback_ = function(stream, callback, useUnaryResponse) {
stream.on('metadata', function(metadata) {
callback(null, null, null, metadata);
});

stream.on('end', function() {
callback(null, null);
});
}

stream.on('end', function() {
if (!errorEmitted) {
callback(null, responseReceived);
}
if (useUnaryResponse) {
callback(null, null); // trigger unaryResponse
}
});
};

/**
Expand Down
174 changes: 130 additions & 44 deletions javascript/net/grpc/web/grpcwebclientreadablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,34 +81,39 @@ const GrpcWebClientReadableStream = function(genericTransportInterface) {
this.responseDeserializeFn_ = null;

/**
* @const
* @private
* @type {function(!RESPONSE)|null} The data callback
* @type {!Array<function(!RESPONSE)>} The list of data callbacks
*/
this.onDataCallback_ = null;
this.onDataCallbacks_ = [];

/**
* @const
* @private
* @type {function(!Status)|null} The status callback
* @type {!Array<function(!Status)>} The list of status callbacks
*/
this.onStatusCallback_ = null;
this.onStatusCallbacks_ = [];

/**
* @const
* @private
* @type {function(!Metadata)|null} The metadata callback
* @type {!Array<function(!Metadata)>} The list of metadata callbacks
*/
this.onMetadataCallback_ = null;
this.onMetadataCallbacks_ = [];

/**
* @const
* @private
* @type {function(...):?|null} The error callback
* @type {!Array<function(...):?>} The list of error callbacks
*/
this.onErrorCallback_ = null;
this.onErrorCallbacks_ = [];

/**
* @const
* @private
* @type {function(...):?|null} The stream end callback
* @type {!Array<function(...):?>} The list of stream end callbacks
*/
this.onEndCallback_ = null;
this.onEndCallbacks_ = [];

/**
* @private
Expand Down Expand Up @@ -158,7 +163,7 @@ const GrpcWebClientReadableStream = function(genericTransportInterface) {
if (data) {
var response = self.responseDeserializeFn_(data);
if (response) {
self.onDataCallback_(response);
self.sendDataCallbacks_(response);
}
}
}
Expand All @@ -181,13 +186,11 @@ const GrpcWebClientReadableStream = function(genericTransportInterface) {
grpcStatusMessage = trailers[GRPC_STATUS_MESSAGE];
delete trailers[GRPC_STATUS_MESSAGE];
}
if (self.onStatusCallback_) {
self.onStatusCallback_(/** @type {!Status} */({
code: Number(grpcStatusCode),
details: grpcStatusMessage,
metadata: trailers,
}));
}
self.sendStatusCallbacks_(/** @type {!Status} */({
code: Number(grpcStatusCode),
details: grpcStatusMessage,
metadata: trailers,
}));
}
}
}
Expand All @@ -201,14 +204,12 @@ const GrpcWebClientReadableStream = function(genericTransportInterface) {
var initialMetadata = /** @type {!Metadata} */ ({});

var responseHeaders = self.xhr_.getResponseHeaders();
if (self.onMetadataCallback_) {
Object.keys(responseHeaders).forEach((header_) => {
if (!(EXCLUDED_RESPONSE_HEADERS.includes(header_))) {
initialMetadata[header_] = responseHeaders[header_];
}
});
self.onMetadataCallback_(initialMetadata);
}
Object.keys(responseHeaders).forEach((header_) => {
if (!(EXCLUDED_RESPONSE_HEADERS.includes(header_))) {
initialMetadata[header_] = responseHeaders[header_];
}
});
self.sendMetadataCallbacks_(initialMetadata);

// There's an XHR level error
if (lastErrorCode != ErrorCode.NO_ERROR) {
Expand All @@ -228,12 +229,10 @@ const GrpcWebClientReadableStream = function(genericTransportInterface) {
if (grpcStatusCode == StatusCode.ABORTED && self.aborted_) {
return;
}
if (self.onErrorCallback_) {
self.onErrorCallback_({
code: grpcStatusCode,
message: ErrorCode.getDebugMessage(lastErrorCode)
});
}
self.sendErrorCallbacks_({
code: grpcStatusCode,
message: ErrorCode.getDebugMessage(lastErrorCode)
});
return;
}

Expand All @@ -245,16 +244,16 @@ const GrpcWebClientReadableStream = function(genericTransportInterface) {
if (GRPC_STATUS_MESSAGE in responseHeaders) {
grpcStatusMessage = self.xhr_.getResponseHeader(GRPC_STATUS_MESSAGE);
}
if (Number(grpcStatusCode) != StatusCode.OK && self.onErrorCallback_) {
self.onErrorCallback_({
if (Number(grpcStatusCode) != StatusCode.OK) {
self.sendErrorCallbacks_({
code: Number(grpcStatusCode),
message: grpcStatusMessage,
metadata: responseHeaders
});
errorEmitted = true;
}
if (!errorEmitted && self.onStatusCallback_) {
self.onStatusCallback_(/** @type {!Status} */ ({
if (!errorEmitted) {
self.sendStatusCallbacks_(/** @type {!Status} */ ({
code: Number(grpcStatusCode),
details: grpcStatusMessage,
metadata: responseHeaders
Expand All @@ -263,9 +262,7 @@ const GrpcWebClientReadableStream = function(genericTransportInterface) {
}

if (!errorEmitted) {
if (self.onEndCallback_) {
self.onEndCallback_();
}
self.sendEndCallbacks_();
}
});
};
Expand All @@ -279,15 +276,50 @@ GrpcWebClientReadableStream.prototype.on = function(
eventType, callback) {
// TODO(stanleycheung): change eventType to @enum type
if (eventType == 'data') {
this.onDataCallback_ = callback;
this.onDataCallbacks_.push(callback);
} else if (eventType == 'status') {
this.onStatusCallbacks_.push(callback);
} else if (eventType == 'metadata') {
this.onMetadataCallbacks_.push(callback);
} else if (eventType == 'end') {
this.onEndCallbacks_.push(callback);
} else if (eventType == 'error') {
this.onErrorCallbacks_.push(callback);
}
return this;
};


/**
* @private
* @param {!Array<function(?)>} callbacks the internal list of callbacks
* @param {function(?)} callback the callback to remove
*/
GrpcWebClientReadableStream.prototype.removeListenerFromCallbacks_ = function(
callbacks, callback) {
const index = callbacks.indexOf(callback);
if (index > -1) {
callbacks.splice(index, 1);
}
};


/**
* @export
* @override
*/
GrpcWebClientReadableStream.prototype.removeListener = function(
eventType, callback) {
if (eventType == 'data') {
this.removeListenerFromCallbacks_(this.onDataCallbacks_, callback);
} else if (eventType == 'status') {
this.onStatusCallback_ = callback;
this.removeListenerFromCallbacks_(this.onStatusCallbacks_, callback);
} else if (eventType == 'metadata') {
this.onMetadataCallback_ = callback;
this.removeListenerFromCallbacks_(this.onMetadataCallbacks_, callback);
} else if (eventType == 'end') {
this.onEndCallback_ = callback;
this.removeListenerFromCallbacks_(this.onEndCallbacks_, callback);
} else if (eventType == 'error') {
this.onErrorCallback_ = callback;
this.removeListenerFromCallbacks_(this.onErrorCallbacks_, callback);
}
return this;
};
Expand Down Expand Up @@ -335,5 +367,59 @@ GrpcWebClientReadableStream.prototype.parseHttp1Headers_ =
};


/**
* @private
* @param {!RESPONSE} data The data to send back
*/
GrpcWebClientReadableStream.prototype.sendDataCallbacks_ = function(data) {
for (var i = 0; i < this.onDataCallbacks_.length; i++) {
this.onDataCallbacks_[i](data);
}
};


/**
* @private
* @param {!Status} status The status to send back
*/
GrpcWebClientReadableStream.prototype.sendStatusCallbacks_ = function(status) {
for (var i = 0; i < this.onStatusCallbacks_.length; i++) {
this.onStatusCallbacks_[i](status);
}
};


/**
* @private
* @param {!Metadata} metadata The metadata to send back
*/
GrpcWebClientReadableStream.prototype.sendMetadataCallbacks_ =
function(metadata) {
for (var i = 0; i < this.onMetadataCallbacks_.length; i++) {
this.onMetadataCallbacks_[i](metadata);
}
};


/**
* @private
* @param {?} error The error to send back
*/
GrpcWebClientReadableStream.prototype.sendErrorCallbacks_ = function(error) {
for (var i = 0; i < this.onErrorCallbacks_.length; i++) {
this.onErrorCallbacks_[i](error);
}
};


/**
* @private
*/
GrpcWebClientReadableStream.prototype.sendEndCallbacks_ = function() {
for (var i = 0; i < this.onEndCallbacks_.length; i++) {
this.onEndCallbacks_[i]();
}
};


exports = GrpcWebClientReadableStream;
Loading

0 comments on commit 6900db2

Please sign in to comment.