fix: reset buffered chunked value before retry #1397

Merged
merged 11 commits on Jun 25, 2021
92 changes: 72 additions & 20 deletions src/partial-result-stream.ts
@@ -26,6 +26,7 @@ import {grpc} from 'google-gax';

import {codec, JSONOptions, Json, Field, Value} from './codec';
import {google} from '../protos/protos';
import * as stream from 'stream';

export type ResumeToken = string | Uint8Array;

@@ -141,6 +142,7 @@ export class PartialResultStream extends Transform implements ResultEvents {
private _fields!: google.spanner.v1.StructType.Field[];
private _options: RowOptions;
private _pendingValue?: p.IValue;
private _pendingValueForResume?: p.IValue;
private _values: p.IValue[];
private _numPushFailed = 0;
constructor(options = {}) {
@@ -240,6 +242,15 @@ }
}
}

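/**
 * Resets the buffered values before a retry. Restores the partial (chunked)
 * value that was pending at the most recent resume token, if any, so the
 * retried stream can continue merging from that point.
 */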
_clearPendingValues() {
this._values = [];
if (this._pendingValueForResume) {
this._pendingValue = this._pendingValueForResume;
} else {
delete this._pendingValue;
}
}

/**
* Manages any chunked values.
*
@@ -268,6 +279,11 @@
// chunk to be processed.
if (chunk.chunkedValue) {
this._pendingValue = values.pop();
if (_hasResumeToken(chunk)) {
this._pendingValueForResume = this._pendingValue;
}
} else if (_hasResumeToken(chunk)) {
delete this._pendingValueForResume;
}

let res = true;
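To make the chunked-value bookkeeping above easier to follow, here is a rough, self-contained model of it. Everything in this sketch is invented for illustration: plain string concatenation stands in for the real codec merge, and the types are not the actual PartialResultSet proto or PartialResultStream class.

```ts
// Illustrative model only; names and shapes are invented for this sketch.
interface SketchChunk {
  values: string[];
  chunkedValue?: boolean; // the last value continues in the next chunk
  resumeToken?: string;   // point from which a retry may resume
}

class ChunkBuffer {
  private pending?: string;         // buffered partial value, if any
  private pendingAtResume?: string; // what `pending` was at the last token
  private rows: string[] = [];

  addChunk(chunk: SketchChunk): void {
    const values = chunk.values.slice();
    if (this.pending !== undefined && values.length > 0) {
      // Merge the buffered partial value with the first value of this chunk.
      values[0] = this.pending + values[0];
      this.pending = undefined;
    }
    if (chunk.chunkedValue) {
      this.pending = values.pop();
      if (chunk.resumeToken) {
        // A retry resumes *after* this chunk, so the retried stream must
        // start out with the same partial value buffered.
        this.pendingAtResume = this.pending;
      }
    } else if (chunk.resumeToken) {
      // The previously buffered value was completed before this checkpoint.
      this.pendingAtResume = undefined;
    }
    this.rows.push(...values);
  }

  // Rows that are complete and ready to hand to the caller.
  takeRows(): string[] {
    const ready = this.rows;
    this.rows = [];
    return ready;
  }

  // Before a retry: drop unacknowledged rows and restore the partial value
  // that was in flight at the last resume token (the essence of this fix).
  clearPendingValues(): void {
    this.rows = [];
    this.pending = this.pendingAtResume;
  }
}
```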
@@ -419,33 +435,51 @@ export function partialResultStream(
options?: RowOptions
): PartialResultStream {
const retryableCodes = [grpc.status.UNAVAILABLE];
const maxQueued = 10;
let lastResumeToken: ResumeToken;
let lastRetriedErr: grpc.ServiceError | undefined;
let lastRequestStream: Readable;

// mergeStream allows multiple streams to be connected into one. This is
// useful if we need to retry a request and pipe more data to the user's
// stream. We also add an additional stream that is used to flush any
// remaining items in the checkpoint stream that were received without a
// resume token.
const requestsStream = mergeStream();
const flushStream = new stream.PassThrough({objectMode: true});
requestsStream.add(flushStream);
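As an aside on the merge/flush pattern described in the comment above, a minimal sketch of the idea looks roughly like this; it only relies on calls already visible in this file (mergeStream(), add(), end()) plus Node's PassThrough, and the import form and helper names are assumptions of the sketch.

```ts
import {PassThrough, Readable} from 'stream';
import mergeStream = require('merge-stream');

// One merged output, fed by successive request attempts plus a dedicated
// flush stream that is only written to once everything is done.
const merged = mergeStream();
const flush = new PassThrough({objectMode: true});
merged.add(flush);

function addAttempt(attempt: Readable): void {
  merged.add(attempt); // a retry simply adds another source stream
}

function finish(): void {
  flush.push({resumeToken: '_'}); // synthetic checkpoint to drain buffers
  flush.push(null);               // end the flush stream
  merged.end();                   // end the merged stream itself
}
```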
const partialRSStream = new PartialResultStream(options);
const userStream = streamEvents(partialRSStream);
// We keep track of the number of PartialResultSets that did not include a
// resume token, as that indicates whether it is safe to retry the stream
// from partway through.
let withoutCheckpointCount = 0;
const batchAndSplitOnTokenStream = checkpointStream.obj({
maxQueued: 10,
isCheckpointFn: (row: google.spanner.v1.PartialResultSet): boolean => {
return is.defined(row.resumeToken);
maxQueued,
isCheckpointFn: (chunk: google.spanner.v1.PartialResultSet): boolean => {
const withCheckpoint = _hasResumeToken(chunk);
if (withCheckpoint) {
withoutCheckpointCount = 0;
} else {
withoutCheckpointCount++;
}
return withCheckpoint;
},
});

// This listener ensures that the last request that executed successfully
// after one or more retries will end the requestsStream.
const endListener = () => {
if (lastRetriedErr) {
setImmediate(() => requestsStream.end());
}
setImmediate(() => {
// Push a fake PartialResultSet without any values but with a resume token
// into the stream to ensure that the checkpoint stream is emptied, and
// then push `null` to end the stream.
flushStream.push({resumeToken: '_'});
flushStream.push(null);
requestsStream.end();
});
};
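The `flushStream.push({resumeToken: '_'})` trick above works because the checkpoint stream releases its queue whenever it sees a chunk with a resume token. A rough stand-in for that behaviour, not the actual checkpoint-stream package, could look like this:

```ts
import {Transform, TransformCallback} from 'stream';

// Rough stand-in for the checkpoint stream, for illustration only. Rows are
// held back until a chunk carrying a resume token arrives; a retry can then
// drop the held rows, and a synthetic {resumeToken: '_'} chunk at the very
// end releases anything still queued before the stream finishes.
class SketchCheckpointStream extends Transform {
  private queued: Array<{resumeToken?: string}> = [];

  constructor() {
    super({objectMode: true});
  }

  _transform(
    chunk: {resumeToken?: string},
    _encoding: BufferEncoding,
    callback: TransformCallback
  ): void {
    this.queued.push(chunk);
    if (chunk.resumeToken && chunk.resumeToken.length > 0) {
      // Checkpoint reached: everything queued so far is safe to emit.
      for (const queuedChunk of this.queued) {
        this.push(queuedChunk);
      }
      this.queued = [];
    }
    callback();
  }

  // Called before a retry: discard rows that were never covered by a token.
  reset(): void {
    this.queued = [];
  }
}
```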
const makeRequest = (): void => {
if (lastRequestStream) {
lastRequestStream.removeListener('end', endListener);
}
partialRSStream._clearPendingValues();
lastRequestStream = requestFn(lastResumeToken);
lastRequestStream.on('end', endListener);
requestsStream.add(lastRequestStream);
@@ -456,30 +490,44 @@
!(
err.code &&
(retryableCodes!.includes(err.code) || isRetryableInternalError(err))
)
) ||
// If we have received too many chunks without a resume token, it is not
// safe to retry.
withoutCheckpointCount > maxQueued
) {
// This is not a retryable error, so this will flush any rows the
// checkpoint stream has queued. After that, we will destroy the
// user's stream with the same error.
setImmediate(() => batchAndSplitOnTokenStream.destroy(err));
return;
}

// We're going to retry from where we left off.
// Keep track of the fact that we retried an error in order to end the
// merged result stream.
lastRetriedErr = err;
// Empty queued rows on the checkpoint stream (will not emit them to user).
batchAndSplitOnTokenStream.reset();
makeRequest();
if (lastRequestStream) {
lastRequestStream.removeListener('end', endListener);
lastRequestStream.destroy();
}
// Delay the retry until all the values that are already in the stream
// pipeline have been handled. This ensures that the checkpoint stream is
// reset to the correct point. Calling .reset() directly here could cause
// values that are currently in the pipeline, and that have not yet been
// handled, to be pushed into the stream twice.
setImmediate(() => {
// Empty queued rows on the checkpoint stream (will not emit them to user).
batchAndSplitOnTokenStream.reset();
makeRequest();
});
};
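Stated more directly (as an illustration, not code from this PR), the guard implemented by the retry handler above boils down to a check along these lines, reusing isRetryableInternalError from further down in this file; the function name and parameters are invented for the sketch:

```ts
import {grpc} from 'google-gax';

// Sketch of the retry guard above. `withoutCheckpointCount` and `maxQueued`
// mirror the variables in this diff; the helper is defined later in the file.
function canRetry(
  err: grpc.ServiceError,
  withoutCheckpointCount: number,
  maxQueued: number
): boolean {
  const retryableCode =
    err.code === grpc.status.UNAVAILABLE || isRetryableInternalError(err);
  // Once more than `maxQueued` chunks have arrived without a resume token,
  // the checkpoint stream may already have released rows downstream that a
  // retried request would produce again, so retrying is no longer safe.
  return retryableCode && withoutCheckpointCount <= maxQueued;
}
```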

userStream.once('reading', makeRequest);
eventsIntercept.patch(requestsStream);

// need types for events-intercept
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(requestsStream as any).intercept('error', retry);
(requestsStream as any).intercept('error', err =>
// Retry __after__ all pending data has been processed to ensure that the
// checkpoint stream is reset at the correct position.
setImmediate(() => retry(err))
);

return (
requestsStream
@@ -497,6 +545,10 @@
);
}

function _hasResumeToken(chunk: google.spanner.v1.PartialResultSet): boolean {
return is.defined(chunk.resumeToken) && chunk.resumeToken.length > 0;
}

function isRetryableInternalError(err: grpc.ServiceError): boolean {
return (
err.code === grpc.status.INTERNAL &&