Stream closed before all messages are received #2375
I added some logging in setupReadable:
this.stream.on('data', async (data) => {
trace('ULRICH: Data received');
const messages = decoder.write(data);
pendingMessageProcessing = true;
trace('ULRICH: Pause ');
this.stream.pause();
for (const message of messages) {
trace('ULRICH: Process message ');
if (this.maxReceiveMessageSize !== -1 &&
message.length > this.maxReceiveMessageSize) {
this.sendError({
code: constants_1.Status.RESOURCE_EXHAUSTED,
details: `Received message larger than max (${message.length} vs. ${this.maxReceiveMessageSize})`,
});
return;
}
this.emit('receiveMessage');
const compressed = message.readUInt8(0) === 1;
const compressedMessageEncoding = compressed ? encoding : 'identity';
const decompressedMessage = await this.getDecompressedMessage(message, compressedMessageEncoding);
// Encountered an error with decompression; it'll already have been propagated back
// Just return early
if (!decompressedMessage)
return;
this.pushOrBufferMessage(readable, decompressedMessage);
}
pendingMessageProcessing = false;
trace('ULRICH: Resume ');
this.stream.resume();
maybePushEnd();
});
this.stream.once('end', () => {
trace('ULRICH: End of stream called');
readsDone = true;
maybePushEnd();
});
The result is:
Pause is called, but the data handler is still called ...
I made a correction locally on my computer. For everything to work correctly, no promise should be left un-awaited:
setupReadable(readable, encoding) {
const decoder = new stream_decoder_1.StreamDecoder();
let readsDone = false;
let pendingMessageProcessing = false;
let pushedEnd = false;
const maybePushEnd = () => {
if (!pushedEnd && readsDone && !pendingMessageProcessing) {
pushedEnd = true;
this.pushOrBufferMessage(readable, null);
}
};
this.stream.on('data', async (data) => {
trace('ULRICH: Data received');
const messages = decoder.write(data);
pendingMessageProcessing = true;
trace('ULRICH: Pause ');
this.stream.pause();
for (const message of messages) {
trace('ULRICH: Process message ');
if (this.maxReceiveMessageSize !== -1 &&
message.length > this.maxReceiveMessageSize) {
this.sendError({
code: constants_1.Status.RESOURCE_EXHAUSTED,
details: `Received message larger than max (${message.length} vs. ${this.maxReceiveMessageSize})`,
});
return;
}
trace('ULRICH: BeforeEmit ');
this.emit('receiveMessage');
trace('ULRICH: AfterEmit ');
const compressed = message.readUInt8(0) === 1;
const compressedMessageEncoding = compressed ? encoding : 'identity';
trace('ULRICH: BeforeDecompress ');
const decompressedMessage = await this.getDecompressedMessage(message, compressedMessageEncoding);
trace('ULRICH: AfterDecompress ');
// Encountered an error with decompression; it'll already have been propagated back
// Just return early
if (!decompressedMessage)
return;
trace('ULRICH: PushOrBufferMessage ');
await this.pushOrBufferMessage(readable, decompressedMessage);
}
pendingMessageProcessing = false;
trace('ULRICH: Resume ');
this.stream.resume();
maybePushEnd();
});
this.stream.once('end', () => {
trace('ULRICH: End of stream called');
readsDone = true;
maybePushEnd();
});
this.stream.on('pipe', () => {
trace('ULRICH: Pipe called');
})
}
consumeUnpushedMessages(readable) {
this.canPush = true;
while (this.messagesToPush.length > 0) {
const nextMessage = this.messagesToPush.shift();
const canPush = readable.push(nextMessage);
if (nextMessage === null || canPush === false) {
this.canPush = false;
break;
}
}
return this.canPush;
}
async pushOrBufferMessage(readable, messageBytes) {
if (this.isPushPending) {
this.bufferedMessages.push(messageBytes);
}
else {
await this.pushMessage(readable, messageBytes);
}
}
async pushMessage(readable, messageBytes) {
if (messageBytes === null) {
trace('Received end of stream');
if (this.canPush) {
readable.push(null);
}
else {
this.messagesToPush.push(null);
}
return;
}
trace('Received message of length ' + messageBytes.length);
this.isPushPending = true;
try {
const deserialized = await this.deserializeMessage(messageBytes);
if (this.canPush) {
if (!readable.push(deserialized)) {
this.canPush = false;
this.stream.pause();
}
}
else {
this.messagesToPush.push(deserialized);
}
}
catch (error) {
// Ignore any remaining messages when errors occur.
this.bufferedMessages.length = 0;
let code = (0, error_1.getErrorCode)(error);
if (code === null || code < constants_1.Status.OK || code > constants_1.Status.UNAUTHENTICATED) {
code = constants_1.Status.INTERNAL;
}
readable.emit('error', {
details: (0, error_1.getErrorMessage)(error),
code: code
});
}
this.isPushPending = false;
if (this.bufferedMessages.length > 0) {
await this.pushMessage(readable, this.bufferedMessages.shift());
}
}
I will propose a PR.
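To illustrate why the missing await matters, here is a minimal sketch in plain JavaScript, not the grpc-js code itself. The names deserialize, pushMessage and processBatch are simplified, hypothetical stand-ins for the methods above, and null plays the role of the end-of-stream marker. When the push is not awaited, the end marker is recorded before the messages:

```javascript
// Hypothetical stand-in for deserializeMessage: resolves on a later microtask.
async function deserialize(bytes) {
  await Promise.resolve();
  return bytes.toString('utf8');
}

// Simplified stand-in for pushMessage: the push happens only after deserialization.
async function pushMessage(out, bytes) {
  out.push(await deserialize(bytes));
}

// Processes one batch of messages, then marks the end of the stream with null.
async function processBatch(messages, awaitPush) {
  const out = [];
  for (const bytes of messages) {
    const pending = pushMessage(out, bytes);
    if (awaitPush) {
      await pending; // the proposed correction: wait for the push to complete
    }
  }
  out.push(null); // end-of-stream marker, comparable to readable.push(null)
  // Let any un-awaited work finish before reporting the final order.
  await new Promise((resolve) => setImmediate(resolve));
  return out;
}

const batch = [Buffer.from('a'), Buffer.from('b')];
processBatch(batch, false).then((order) => console.log(order)); // [ null, 'a', 'b' ]
processBatch(batch, true).then((order) => console.log(order)); // [ 'a', 'b', null ]
```

With awaitPush set to false, the loop finishes and the end marker is recorded while both pushes are still waiting on deserialization, which resembles the lost-message symptom described in this issue.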
Problem description
The client sends multiple messages through a stream, then closes the stream and waits for a response. The server receives all the elements, but the end of the stream can be received before all the messages, and the last messages are then ignored.
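As a rough illustration of this kind of reordering, the following sketch uses only Node's built-in stream module, not grpc-js. The helper names slowWork and recordEvents are hypothetical, and the 10 ms delay is an arbitrary stand-in for per-message work such as decompression. Because the 'data' handler is async, the stream does not wait for it, so 'end' is recorded before any message has been processed:

```javascript
const { Readable } = require('node:stream');

// Hypothetical stand-in for slow per-message work (decompression, deserialization).
const slowWork = () => new Promise((resolve) => setTimeout(resolve, 10));

// Records the order in which messages are processed and the stream ends.
function recordEvents(chunks) {
  return new Promise((resolve) => {
    const events = [];
    const source = Readable.from(chunks);
    source.on('data', async (chunk) => {
      // The stream does not wait for this promise before emitting more events.
      await slowWork();
      events.push(`message ${chunk}`);
    });
    source.once('end', () => {
      events.push('end');
      // Give the pending handlers time to finish before reporting.
      setTimeout(() => resolve(events), 100);
    });
  });
}

recordEvents(['a', 'b', 'c']).then((events) => console.log(events));
```

In this sketch 'end' appears first in the recorded list, ahead of every processed message. A consumer that replies as soon as it sees 'end' would therefore respond before the messages are handled.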
Reproduction steps
I have a client that writes to the stream:
The server uses requestStream: ServerReadableStream<RefreshCacheRequest, RefreshCacheReply>:
Some events are missing on the read side. If I add a 5 s setTimeout around writer.end(), I get all the messages.
In the log I have:
The last two messages are lost. The request method sends the response because the stream has ended.
If I read with a setTimeout, all messages are processed and the events are in the correct order:
Log:
Environment
Additional context