Skip to content

Commit

Permalink
fix: only reset pending value with resume token (#2000)
Browse files Browse the repository at this point in the history
Fixes #1959
  • Loading branch information
olavloite committed Feb 23, 2024
1 parent 937a7a1 commit f337089
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 10 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -12,3 +12,4 @@ system-test/*key.json
.DS_Store
package-lock.json
__pycache__
.idea
7 changes: 4 additions & 3 deletions src/partial-result-stream.ts
Expand Up @@ -244,8 +244,7 @@ export class PartialResultStream extends Transform implements ResultEvents {
}
}

_clearPendingValues() {
this._values = [];
_resetPendingValues() {
if (this._pendingValueForResume) {
this._pendingValue = this._pendingValueForResume;
} else {
Expand Down Expand Up @@ -484,7 +483,9 @@ export function partialResultStream(
});
};
const makeRequest = (): void => {
partialRSStream._clearPendingValues();
if (is.defined(lastResumeToken) && lastResumeToken.length > 0) {
partialRSStream._resetPendingValues();
}
lastRequestStream = requestFn(lastResumeToken);
lastRequestStream.on('end', endListener);
requestsStream.add(lastRequestStream);
Expand Down
207 changes: 200 additions & 7 deletions test/spanner.ts
Expand Up @@ -120,7 +120,6 @@ describe('Spanner with mock server', () => {
}
);
});
server.start();
spannerMock.putStatementResult(
selectSql,
mock.StatementResult.resultSet(mock.createSimpleResultSet())
Expand Down Expand Up @@ -3695,10 +3694,10 @@ describe('Spanner with mock server', () => {
});

it('should return all values from PartialResultSet with chunked string value', async () => {
for (const includeResumeToken in [true, false]) {
for (const includeResumeToken of [true, false]) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let errorOnIndexes: any;
for (errorOnIndexes in [[], [0], [1], [0, 1]]) {
for (errorOnIndexes of [[], [0], [1], [0, 1]]) {
const sql = 'SELECT * FROM TestTable';
const prs1 = PartialResultSet.create({
resumeToken: includeResumeToken
Expand Down Expand Up @@ -3747,10 +3746,10 @@ describe('Spanner with mock server', () => {
});

it('should return all values from PartialResultSet with chunked string value in an array', async () => {
for (const includeResumeToken in [true, false]) {
for (const includeResumeToken of [true, false]) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let errorOnIndexes: any;
for (errorOnIndexes in [[], [0], [1], [0, 1]]) {
for (errorOnIndexes of [[], [0], [1], [0, 1]]) {
const sql = 'SELECT * FROM TestTable';
const prs1 = PartialResultSet.create({
resumeToken: includeResumeToken
Expand Down Expand Up @@ -3800,10 +3799,10 @@ describe('Spanner with mock server', () => {
});

it('should return all values from PartialResultSet with chunked list value', async () => {
for (const includeResumeToken in [true, false]) {
for (const includeResumeToken of [true, false]) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
let errorOnIndexes: any;
for (errorOnIndexes in [[], [0], [1], [0, 1]]) {
for (errorOnIndexes of [[], [0], [1], [0, 1]]) {
const sql = 'SELECT * FROM TestTable';
const prs1 = PartialResultSet.create({
resumeToken: includeResumeToken
Expand Down Expand Up @@ -4047,6 +4046,200 @@ describe('Spanner with mock server', () => {
}
});

it('should clear pending values if the last partial result did not have a resume token and was not a complete row', async () => {
const sql = 'SELECT * FROM TestTable';
const prs1 = PartialResultSet.create({
resumeToken: undefined,
metadata: createMultiColumnMetadata(),
values: [
{stringValue: 'id1.1'},
{stringValue: 'id1.2'},
{stringValue: '100'},
],
chunkedValue: false,
});
const prs2 = PartialResultSet.create({
resumeToken: undefined,
values: [
{boolValue: true},
{boolValue: true},
{numberValue: 0.5},
{stringValue: 'id2.1'},
{stringValue: 'id2.2'},
],
chunkedValue: false,
});
const prs3 = PartialResultSet.create({
resumeToken: undefined,
values: [
{stringValue: '200'},
{boolValue: true},
{boolValue: true},
{numberValue: 0.5},
],
});
// Let the stream return UNAVAILABLE on index 1 (so the second PartialResultSet).
setupResultsAndErrors(sql, [prs1, prs2, prs3], [1]);
const database = newTestDatabase();
try {
const [rows] = (await database.run({
sql,
json: true,
})) as Json[][];
verifyQueryResult(rows);
} finally {
await database.close();
}
});

it('should not clear pending values if the last partial result had a resume token and was not a complete row', async () => {
for (const errorIndexes of [[1], [2]]) {
const sql = 'SELECT * FROM TestTable';
const prs1 = PartialResultSet.create({
resumeToken: Buffer.from('00000000'),
metadata: createMultiColumnMetadata(),
values: [
{stringValue: 'id1.1'},
{stringValue: 'id1.2'},
{stringValue: '100'},
],
chunkedValue: false,
});
const prs2 = PartialResultSet.create({
resumeToken: undefined,
values: [
{boolValue: true},
{boolValue: true},
{numberValue: 0.5},
{stringValue: 'id2.1'},
{stringValue: 'id2.2'},
],
chunkedValue: false,
});
const prs3 = PartialResultSet.create({
resumeToken: undefined,
values: [
{stringValue: '200'},
{boolValue: true},
{boolValue: true},
{numberValue: 0.5},
],
});
setupResultsAndErrors(sql, [prs1, prs2, prs3], errorIndexes);
const database = newTestDatabase();
try {
const [rows] = (await database.run({
sql,
json: true,
})) as Json[][];
verifyQueryResult(rows);
} finally {
await database.close();
}
}
});

it('should not clear pending values if the last partial result was chunked and had a resume token', async () => {
for (const errorIndexes of [[2]]) {
const sql = 'SELECT * FROM TestTable';
const prs1 = PartialResultSet.create({
resumeToken: Buffer.from('00000000'),
metadata: createMultiColumnMetadata(),
values: [
{stringValue: 'id1.1'},
{stringValue: 'id1.2'},
{stringValue: '100'},
],
chunkedValue: true,
});
const prs2 = PartialResultSet.create({
resumeToken: undefined,
values: [
// The previous value was chunked, but it is still perfectly possible that it actually contained
// the entire value. So in this case the actual value was '100'.
{stringValue: ''},
{boolValue: true},
{boolValue: true},
{numberValue: 0.5},
{stringValue: 'id2.1'},
{stringValue: 'id2.2'},
],
chunkedValue: false,
});
const prs3 = PartialResultSet.create({
resumeToken: undefined,
values: [
{stringValue: '200'},
{boolValue: true},
{boolValue: true},
{numberValue: 0.5},
],
});
setupResultsAndErrors(sql, [prs1, prs2, prs3], errorIndexes);
const database = newTestDatabase();
try {
const [rows] = (await database.run({
sql,
json: true,
})) as Json[][];
verifyQueryResult(rows);
} finally {
await database.close();
}
}
});

function verifyQueryResult(rows: Json[]) {
assert.strictEqual(rows.length, 2);
assert.strictEqual(rows[0].col1, 'id1.1');
assert.strictEqual(rows[0].col2, 'id1.2');
assert.strictEqual(rows[0].col3, 100);
assert.strictEqual(rows[0].col4, true);
assert.strictEqual(rows[0].col5, true);
assert.strictEqual(rows[0].col6, 0.5);

assert.strictEqual(rows[1].col1, 'id2.1');
assert.strictEqual(rows[1].col2, 'id2.2');
assert.strictEqual(rows[1].col3, 200);
assert.strictEqual(rows[1].col4, true);
assert.strictEqual(rows[1].col5, true);
assert.strictEqual(rows[1].col6, 0.5);
}

function createMultiColumnMetadata() {
const fields = [
protobuf.StructType.Field.create({
name: 'col1',
type: protobuf.Type.create({code: protobuf.TypeCode.STRING}),
}),
protobuf.StructType.Field.create({
name: 'col2',
type: protobuf.Type.create({code: protobuf.TypeCode.STRING}),
}),
protobuf.StructType.Field.create({
name: 'col3',
type: protobuf.Type.create({code: protobuf.TypeCode.INT64}),
}),
protobuf.StructType.Field.create({
name: 'col4',
type: protobuf.Type.create({code: protobuf.TypeCode.BOOL}),
}),
protobuf.StructType.Field.create({
name: 'col5',
type: protobuf.Type.create({code: protobuf.TypeCode.BOOL}),
}),
protobuf.StructType.Field.create({
name: 'col6',
type: protobuf.Type.create({code: protobuf.TypeCode.FLOAT64}),
}),
];
return new protobuf.ResultSetMetadata({
rowType: new protobuf.StructType({
fields,
}),
});
}

function createMetadata() {
const fields = [
protobuf.StructType.Field.create({
Expand Down

0 comments on commit f337089

Please sign in to comment.