Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move retries createreadstream basic use of api #1415

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
a9b8d1b
getRanges
danieljbruce Jun 30, 2022
0f1b9eb
Slight refactor of createReadStream
danieljbruce Jun 30, 2022
c75b4ed
Add header to table utils
danieljbruce Jun 30, 2022
fcbd461
Refactor range and keys getting
danieljbruce Jul 4, 2022
c2f4dfe
Pull request opts into a separate function
danieljbruce Jul 4, 2022
4817863
Revert "Pull request opts into a separate function"
danieljbruce Jul 4, 2022
810db82
logical separation of ranges and keys
danieljbruce Jul 4, 2022
648cc92
Merge branch 'actually-refactor-createreadstream-2' of https://github…
danieljbruce Jul 4, 2022
b25b2c2
Revert "Revert "Pull request opts into a separate function""
danieljbruce Jul 4, 2022
1c66b3b
Merge branch 'main' of https://github.com/googleapis/nodejs-bigtable …
danieljbruce Apr 19, 2024
6324ad9
Add the less than or equal to fn to utils
danieljbruce Apr 19, 2024
8f51ddf
remove server start
danieljbruce Apr 19, 2024
5edaf82
Add splice ranges back to diagnose problem
danieljbruce Apr 19, 2024
575ae2b
Revert "Add splice ranges back to diagnose problem"
danieljbruce Apr 22, 2024
0f80c35
get all tests passing
danieljbruce Apr 22, 2024
b46f6b1
Use tableUtils lessthanorequalto
danieljbruce Apr 22, 2024
577ba7a
refactor: Move retries finish making createreadstream smaller
danieljbruce Apr 22, 2024
03fb022
More specific type for options
danieljbruce Apr 22, 2024
752bf0b
Import type and replace with any
danieljbruce Apr 22, 2024
4989fa4
Turn on gax streaming retries
danieljbruce Apr 22, 2024
921005a
Just a basic use of the API
danieljbruce Apr 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ export class Bigtable {
{},
baseOptions,
{
gaxServerStreamingRetries: true,
servicePath: customEndpointBaseUrl || defaultBaseUrl,
'grpc.callInvocationTransformer': grpcGcp.gcpCallInvocationTransformer,
'grpc.channelFactoryOverride': grpcGcp.gcpChannelFactoryOverride,
Expand Down
129 changes: 55 additions & 74 deletions src/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import {promisifyAll} from '@google-cloud/promisify';
import arrify = require('arrify');
import {ServiceError} from 'google-gax';
import {GoogleError, ServiceError} from 'google-gax';
import {BackoffSettings} from 'google-gax/build/src/gax';
import {PassThrough, Transform} from 'stream';

Expand Down Expand Up @@ -43,12 +43,13 @@
import {google} from '../protos/protos';
import {Duplex} from 'stream';
import {TableUtils} from './utils/table';
import * as protos from '../protos/protos';

// See protos/google/rpc/code.proto
// (4=DEADLINE_EXCEEDED, 8=RESOURCE_EXHAUSTED, 10=ABORTED, 14=UNAVAILABLE)
const RETRYABLE_STATUS_CODES = new Set([4, 8, 10, 14]);
// (1=CANCELLED)
const IGNORED_STATUS_CODES = new Set([1]);

Check warning on line 52 in src/table.ts

View workflow job for this annotation

GitHub Actions / lint

'IGNORED_STATUS_CODES' is assigned a value but never used

const DEFAULT_BACKOFF_SETTINGS: BackoffSettings = {
initialRetryDelayMillis: 10,
Expand Down Expand Up @@ -719,11 +720,10 @@
* region_tag:bigtable_api_table_readstream
*/
createReadStream(opts?: GetRowsOptions) {
const options = opts || {};
const options: GetRowsOptions = opts || {};
const maxRetries = is.number(this.maxRetries) ? this.maxRetries! : 10;
let activeRequestStream: AbortableDuplex | null;
let rowKeys: string[];
let filter: {} | null;
const rowsLimit = options.limit || 0;
const hasLimit = rowsLimit !== 0;
let rowsRead = 0;
Expand All @@ -741,10 +741,6 @@
ranges.push({});
}

if (options.filter) {
filter = Filter.parse(options.filter);
}

let chunkTransformer: ChunkTransformer;
let rowStream: Duplex;

Expand Down Expand Up @@ -800,60 +796,26 @@
// eslint-disable-next-line @typescript-eslint/no-explicit-any
chunkTransformer = new ChunkTransformer({decode: options.decode} as any);

const reqOpts = {
tableName: this.name,
appProfileId: this.bigtable.appProfileId,
} as google.bigtable.v2.IReadRowsRequest;

const shouldRetryFn = function checkRetry(error: GoogleError) {
numConsecutiveErrors++;
numRequestsMade++;
return (
numConsecutiveErrors <= maxRetries &&
error.code &&
(RETRYABLE_STATUS_CODES.has(error.code) || isRstStreamError(error))
);
};
const retryOpts = {
currentRetryAttempt: 0, // was numConsecutiveErrors
// Handling retries in this client. Specify the retry options to
// make sure nothing is retried in retry-request.
noResponseRetries: 0,
shouldRetryFn: (_: any) => {
return false;
},
shouldRetryFn,
};

if (lastRowKey) {
// Readjust and/or remove ranges based on previous valid row reads.
// Iterate backward since items may need to be removed.
for (let index = ranges.length - 1; index >= 0; index--) {
const range = ranges[index];
const startValue = is.object(range.start)
? (range.start as BoundData).value
: range.start;
const endValue = is.object(range.end)
? (range.end as BoundData).value
: range.end;
const startKeyIsRead =
!startValue ||
TableUtils.lessThanOrEqualTo(
startValue as string,
lastRowKey as string
);
const endKeyIsNotRead =
!endValue ||
(endValue as Buffer).length === 0 ||
TableUtils.lessThan(lastRowKey as string, endValue as string);
if (startKeyIsRead) {
if (endKeyIsNotRead) {
// EndKey is not read, reset the range to start from lastRowKey open
range.start = {
value: lastRowKey,
inclusive: false,
};
} else {
// EndKey is read, remove this range
ranges.splice(index, 1);
}
}
}

// Remove rowKeys already read.
rowKeys = rowKeys.filter(rowKey =>
TableUtils.greaterThan(rowKey, lastRowKey as string)
);
TableUtils.spliceRanges(ranges, lastRowKey);
rowKeys = TableUtils.getRowKeys(rowKeys, lastRowKey);

// If there was a row limit in the original request and
// we've already read all the rows, end the stream and
Expand All @@ -870,25 +832,8 @@
}
}

// Create the new reqOpts
reqOpts.rows = {};

// TODO: preprocess all the keys and ranges to Bytes
reqOpts.rows.rowKeys = rowKeys.map(
Mutation.convertToBytes
) as {} as Uint8Array[];

reqOpts.rows.rowRanges = ranges.map(range =>
Filter.createRange(
range.start as BoundData,
range.end as BoundData,
'Key'
)
);

if (filter) {
reqOpts.filter = filter;
}
const reqOpts: protos.google.bigtable.v2.IReadRowsRequest =
this.#readRowsReqOpts(ranges, rowKeys, options);

if (hasLimit) {
reqOpts.rowsLimit = rowsLimit - rowsRead;
Expand Down Expand Up @@ -929,7 +874,7 @@
rowStream = pumpify.obj([requestStream, chunkTransformer, toRowStream]);

// Retry on "received rst stream" errors
const isRstStreamError = (error: ServiceError): boolean => {
const isRstStreamError = (error: GoogleError): boolean => {
if (error.code === 13 && error.message) {
const error_message = (error.message || '').toLowerCase();
return (
Expand All @@ -942,6 +887,7 @@
};

rowStream
/*
.on('error', (error: ServiceError) => {
rowStreamUnpipe(rowStream, userStream);
activeRequestStream = null;
Expand Down Expand Up @@ -969,7 +915,8 @@
userStream.emit('error', error);
}
})
*/
.on('data', _ => {

Check warning on line 919 in src/table.ts

View workflow job for this annotation

GitHub Actions / lint

'_' is defined but never used
// Reset error count after a successful read so the backoff
// time won't keep increasing when as stream had multiple errors
numConsecutiveErrors = 0;
Expand Down Expand Up @@ -1592,7 +1539,7 @@
// Handling retries in this client. Specify the retry options to
// make sure nothing is retried in retry-request.
noResponseRetries: 0,
shouldRetryFn: (_: any) => {

Check warning on line 1542 in src/table.ts

View workflow job for this annotation

GitHub Actions / lint

'_' is defined but never used

Check warning on line 1542 in src/table.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type
return false;
},
};
Expand Down Expand Up @@ -1640,6 +1587,40 @@
makeNextBatchRequest();
}

#readRowsReqOpts(
ranges: PrefixRange[],
rowKeys: string[],
options: GetRowsOptions
) {
const reqOpts = {
tableName: this.name,
appProfileId: this.bigtable.appProfileId,
} as google.bigtable.v2.IReadRowsRequest;

// Create the new reqOpts
reqOpts.rows = {};

// TODO: preprocess all the keys and ranges to Bytes
reqOpts.rows.rowKeys = rowKeys.map(
Mutation.convertToBytes
) as {} as Uint8Array[];

reqOpts.rows.rowRanges = ranges.map(range =>
Filter.createRange(
range.start as BoundData,
range.end as BoundData,
'Key'
)
);

const filter = options.filter;
if (filter) {
reqOpts.filter = Filter.parse(filter);
}

return reqOpts;
}

/**
* Get a reference to a table row.
*
Expand Down Expand Up @@ -2049,7 +2030,7 @@
* that a callback is omitted.
*/
promisifyAll(Table, {
exclude: ['family', 'row'],
exclude: ['family', 'row', '#readRowsReqOpts'],
});

function getNextDelay(numConsecutiveErrors: number, config: BackoffSettings) {
Expand Down
1 change: 0 additions & 1 deletion src/util/mock-servers/mock-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ export class MockServer {
`localhost:${this.port}`,
grpc.ServerCredentials.createInsecure(),
() => {
server.start();
callback ? callback(portString) : undefined;
}
);
Expand Down
46 changes: 46 additions & 0 deletions src/utils/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import {GetRowsOptions, PrefixRange} from '../table';
import {Mutation} from '../mutation';
import * as is from 'is';
import {BoundData} from '../filter';

export class TableUtils {
static getRanges(options: GetRowsOptions) {
Expand Down Expand Up @@ -63,9 +65,53 @@ export class TableUtils {
return this.lessThan(rhs, lhs);
}

static spliceRanges(
ranges: PrefixRange[],
lastRowKey: string | number | true | Uint8Array
): void {
// Readjust and/or remove ranges based on previous valid row reads.
// Iterate backward since items may need to be removed.
for (let index = ranges.length - 1; index >= 0; index--) {
const range = ranges[index];
const startValue = is.object(range.start)
? (range.start as BoundData).value
: range.start;
const endValue = is.object(range.end)
? (range.end as BoundData).value
: range.end;
const startKeyIsRead =
!startValue ||
this.lessThanOrEqualTo(startValue as string, lastRowKey as string);
const endKeyIsNotRead =
!endValue ||
(endValue as Buffer).length === 0 ||
this.lessThan(lastRowKey as string, endValue as string);
if (startKeyIsRead) {
if (endKeyIsNotRead) {
// EndKey is not read, reset the range to start from lastRowKey open
range.start = {
value: lastRowKey,
inclusive: false,
};
} else {
// EndKey is read, remove this range
ranges.splice(index, 1);
}
}
}
}
static lessThanOrEqualTo(lhs: string, rhs: string) {
return !this.greaterThan(lhs, rhs);
}
static getRowKeys(
rowKeys: string[],
lastRowKey: string | number | true | Uint8Array
) {
// Remove rowKeys already read.
return rowKeys.filter(rowKey =>
this.greaterThan(rowKey, lastRowKey as string)
);
}

static createPrefixRange(start: string): PrefixRange {
const prefix = start.replace(new RegExp('[\xff]+$'), '');
Expand Down
2 changes: 1 addition & 1 deletion src/v2/bigtable_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ export class BigtableClient {
readRows: new this._gaxModule.StreamDescriptor(
this._gaxModule.StreamType.SERVER_STREAMING,
!!opts.fallback,
/* gaxStreamingRetries: */ false
/* gaxStreamingRetries: */ true
),
sampleRowKeys: new this._gaxModule.StreamDescriptor(
this._gaxModule.StreamType.SERVER_STREAMING,
Expand Down
6 changes: 5 additions & 1 deletion test/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ const fakePromisify = Object.assign({}, promisify, {
return;
}
promisified = true;
assert.deepStrictEqual(options.exclude, ['family', 'row']);
assert.deepStrictEqual(options.exclude, [
'family',
'row',
'#readRowsReqOpts',
]);
},
});

Expand Down
Loading