diff --git a/src/util/mock-servers/mock-server.ts b/src/util/mock-servers/mock-server.ts index c4ebb9779..02ca9b7e4 100644 --- a/src/util/mock-servers/mock-server.ts +++ b/src/util/mock-servers/mock-server.ts @@ -37,7 +37,6 @@ export class MockServer { `localhost:${this.port}`, grpc.ServerCredentials.createInsecure(), () => { - server.start(); callback ? callback(portString) : undefined; } ); diff --git a/system-test/read-rows.ts b/system-test/read-rows.ts index 37beab74c..7deccc643 100644 --- a/system-test/read-rows.ts +++ b/system-test/read-rows.ts @@ -12,69 +12,109 @@ // See the License for the specific language governing permissions and // limitations under the License. -import {Bigtable} from '../src'; -import {Mutation} from '../src/mutation.js'; +import {Bigtable, Cluster, protos, Table} from '../src'; const {tests} = require('../../system-test/data/read-rows-retry-test.json') as { - tests: Test[]; + tests: ReadRowsTest[]; }; -import {google} from '../protos/protos'; import * as assert from 'assert'; -import {describe, it, afterEach, beforeEach} from 'mocha'; -import * as sinon from 'sinon'; -import {EventEmitter} from 'events'; -import {Test} from './testTypes'; -import {ServiceError, GrpcClient, GoogleError, CallOptions} from 'google-gax'; -import {PassThrough} from 'stream'; +import {describe, it, before} from 'mocha'; +import {CreateReadStreamRequest, ReadRowsTest} from './testTypes'; +import {ServiceError, GrpcClient, CallOptions, GoogleError} from 'google-gax'; +import {MockServer} from '../src/util/mock-servers/mock-server'; +import {MockService} from '../src/util/mock-servers/mock-service'; +import {BigtableClientMockService} from '../src/util/mock-servers/service-implementations/bigtable-client-mock-service'; +import {ServerWritableStream} from '@grpc/grpc-js'; const {grpc} = new GrpcClient(); -// eslint-disable-next-line @typescript-eslint/no-explicit-any -function dispatch(emitter: EventEmitter, response: any) { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const emits: any[] = [{name: 'request'}]; - if (response.row_keys) { - emits.push.apply(emits, [ - {name: 'response', arg: 200}, - { - name: 'data', - arg: {chunks: response.row_keys.map(rowResponse)}, - }, - ]); - } - if (response.end_with_error) { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const error: any = new Error(); - error.code = response.end_with_error; - emits.push({name: 'error', arg: error}); - } else { - emits.push({name: 'end'}); - } - let index = 0; - setImmediate(next); - - function next() { - if (index < emits.length) { - const emit = emits[index]; - index++; - emitter.emit(emit.name, emit.arg); - setImmediate(next); - } - } -} - -function rowResponse(rowKey: {}) { +function rowResponseFromServer(rowKey: string) { return { - rowKey: Mutation.convertToBytes(rowKey), + rowKey: Buffer.from(rowKey).toString('base64'), familyName: {value: 'family'}, - qualifier: {value: 'qualifier'}, - valueSize: 0, - timestampMicros: 0, - labels: [], + qualifier: {value: Buffer.from('qualifier').toString('base64')}, commitRow: true, - value: 'value', + value: Buffer.from(rowKey).toString('base64'), }; } +function isRowKeysWithFunction(array: unknown): array is RowKeysWithFunction { + return (array as RowKeysWithFunction).asciiSlice !== undefined; +} + +function isRowKeysWithFunctionArray( + array: unknown[] +): array is RowKeysWithFunction[] { + return array.every((element: unknown) => { + return isRowKeysWithFunction(element); + }); +} + +interface TestRowRange { + startKey?: 'startKeyClosed' | 'startKeyOpen'; + endKey?: 'endKeyOpen' | 'endKeyClosed'; + startKeyClosed?: Uint8Array | string | null; + startKeyOpen?: Uint8Array | string | null; + endKeyOpen?: Uint8Array | string | null; + endKeyClosed?: Uint8Array | string | null; +} +interface RowKeysWithFunction { + asciiSlice: () => string; +} +function getRequestOptions(request: { + rows?: { + rowRanges?: TestRowRange[] | null; + rowKeys?: Uint8Array[] | null; + } | null; + rowsLimit?: string | number | Long | null | undefined; +}): CreateReadStreamRequest { + const requestOptions = {} as CreateReadStreamRequest; + if (request.rows && request.rows.rowRanges) { + requestOptions.rowRanges = request.rows.rowRanges.map( + (range: TestRowRange) => { + const convertedRowRange = {} as {[index: string]: string}; + { + // startKey and endKey get filled in during the grpc request. + // They should be removed as the test data does not look + // for these properties in the request. + if (range.startKey) { + delete range.startKey; + } + if (range.endKey) { + delete range.endKey; + } + } + Object.entries(range).forEach( + ([key, value]) => (convertedRowRange[key] = value.asciiSlice()) + ); + return convertedRowRange; + } + ); + } + if ( + request.rows && + request.rows.rowKeys && + isRowKeysWithFunctionArray(request.rows.rowKeys) + ) { + requestOptions.rowKeys = request.rows.rowKeys.map( + (rowKeys: RowKeysWithFunction) => rowKeys.asciiSlice() + ); + } + // The grpc protocol sets rowsLimit to '0' if rowsLimit is not provided in the + // grpc request. + // + // Do not append rowsLimit to collection of request options if received grpc + // rows limit is '0' so that test data in read-rows-retry-test.json remains + // shorter. + if ( + request.rowsLimit && + request.rowsLimit !== '0' && + typeof request.rowsLimit === 'string' + ) { + requestOptions.rowsLimit = parseInt(request.rowsLimit); + } + return requestOptions; +} + describe('Bigtable/Table', () => { const bigtable = new Bigtable(); const INSTANCE_NAME = 'fake-instance2'; @@ -82,7 +122,6 @@ describe('Bigtable/Table', () => { (bigtable as any).grpcCredentials = grpc.credentials.createInsecure(); const INSTANCE = bigtable.instance('instance'); - const TABLE = INSTANCE.table('table'); describe('close', () => { it('should fail when invoking readRows with closed client', async () => { @@ -123,97 +162,92 @@ describe('Bigtable/Table', () => { }); }); - describe('createReadStream', () => { - let clock: sinon.SinonFakeTimers; - let endCalled: boolean; - let error: ServiceError | null; - let requestedOptions: Array<{}>; - let responses: Array<{}> | null; - let rowKeysRead: Array>; - let stub: sinon.SinonStub; - - beforeEach(() => { - clock = sinon.useFakeTimers({ - toFake: [ - 'setTimeout', - 'clearTimeout', - 'setImmediate', - 'clearImmediate', - 'setInterval', - 'clearInterval', - 'Date', - 'nextTick', - ], + describe('createReadStream using mock server', () => { + let server: MockServer; + let service: MockService; + let bigtable = new Bigtable(); + let table: Table; + before(async () => { + // make sure we have everything initialized before starting tests + const port = await new Promise(resolve => { + server = new MockServer(resolve); }); - endCalled = false; - error = null; - responses = null; - rowKeysRead = []; - requestedOptions = []; - stub = sinon.stub(bigtable, 'request').callsFake(cfg => { - const reqOpts = cfg.reqOpts; - const requestOptions = {} as google.bigtable.v2.IRowSet; - if (reqOpts.rows && reqOpts.rows.rowRanges) { - requestOptions.rowRanges = reqOpts.rows.rowRanges.map( - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (range: any) => { - const convertedRowRange = {} as {[index: string]: string}; - Object.keys(range).forEach( - key => (convertedRowRange[key] = range[key].asciiSlice()) - ); - return convertedRowRange; - } - ); - } - if (reqOpts.rows && reqOpts.rows.rowKeys) { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - requestOptions.rowKeys = reqOpts.rows.rowKeys.map((rowKeys: any) => - rowKeys.asciiSlice() - ); - } - if (reqOpts.rowsLimit) { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (requestOptions as any).rowsLimit = reqOpts.rowsLimit; - } - requestedOptions.push(requestOptions); - rowKeysRead.push([]); - const requestStream = new PassThrough({objectMode: true}); - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (requestStream as any).abort = () => {}; - dispatch(requestStream, responses!.shift()); - return requestStream; + bigtable = new Bigtable({ + apiEndpoint: `localhost:${port}`, }); + table = bigtable.instance('fake-instance').table('fake-table'); + service = new BigtableClientMockService(server); }); - afterEach(() => { - clock.restore(); - stub.restore(); + after(async () => { + server.shutdown(() => {}); }); tests.forEach(test => { - it(test.name, () => { - responses = test.responses; - TABLE.maxRetries = test.max_retries; - TABLE.createReadStream(test.createReadStream_options) - .on('data', row => rowKeysRead[rowKeysRead.length - 1].push(row.id)) - .on('end', () => (endCalled = true)) - .on('error', err => (error = err as ServiceError)); - clock.runAll(); - - if (test.error) { - assert(!endCalled, ".on('end') should not have been invoked"); - assert.strictEqual(error!.code, test.error); - } else { - assert(endCalled, ".on('end') shoud have been invoked"); - assert.ifError(error); + it(test.name, done => { + // These variables store request/response data capturing data sent + // and received when using readRows with retries. This data is evaluated + // in checkResults at the end of the test for correctness. + const requestedOptions: CreateReadStreamRequest[] = []; + const responses = test.responses; + const rowKeysRead: string[][] = []; + let endCalled = false; + let error: ServiceError | null = null; + function checkResults() { + if (test.error) { + assert(!endCalled, ".on('end') should not have been invoked"); + assert.strictEqual(error!.code, test.error); + } else { + assert(endCalled, ".on('end') should have been invoked"); + assert.ifError(error); + } + assert.deepStrictEqual(rowKeysRead, test.row_keys_read); + assert.strictEqual( + responses.length, + 0, + 'not all the responses were used' + ); + assert.deepStrictEqual(requestedOptions, test.request_options); + done(); } - assert.deepStrictEqual(rowKeysRead, test.row_keys_read); - assert.strictEqual( - responses.length, - 0, - 'not all the responses were used' - ); - assert.deepStrictEqual(requestedOptions, test.request_options); + + table.maxRetries = test.max_retries; + service.setService({ + ReadRows: ( + stream: ServerWritableStream< + protos.google.bigtable.v2.IReadRowsRequest, + protos.google.bigtable.v2.IReadRowsResponse + > + ) => { + const response = responses!.shift(); + assert(response); + rowKeysRead.push([]); + requestedOptions.push(getRequestOptions(stream.request)); + if (response.row_keys) { + stream.write({ + chunks: response.row_keys.map(rowResponseFromServer), + }); + } + if (response.end_with_error) { + const error: GoogleError = new GoogleError(); + error.code = response.end_with_error; + stream.emit('error', error); + } else { + stream.end(); + } + }, + }); + table + .createReadStream(test.createReadStream_options) + .on('data', row => rowKeysRead[rowKeysRead.length - 1].push(row.id)) + .on('end', () => { + endCalled = true; + checkResults(); + }) + .on('error', err => { + error = err as ServiceError; + checkResults(); + }); }); }); }); diff --git a/system-test/testTypes.ts b/system-test/testTypes.ts index 43613cc67..d6b83464a 100644 --- a/system-test/testTypes.ts +++ b/system-test/testTypes.ts @@ -14,6 +14,7 @@ import {ServiceError} from 'google-gax'; import {GetRowsOptions} from '../src/table'; +import {google} from '../protos/protos'; export interface Test { name: string; @@ -46,3 +47,23 @@ export interface Test { row_keys_read: {}; createReadStream_options: GetRowsOptions; } + +interface CreateReadStreamResponse { + row_keys?: string[]; + end_with_error?: 4; +} + +export interface CreateReadStreamRequest { + rowKeys: string[]; + rowRanges: google.bigtable.v2.IRowRange[]; + rowsLimit?: number; +} +export interface ReadRowsTest { + createReadStream_options?: GetRowsOptions; + max_retries: number; + responses: CreateReadStreamResponse[]; + request_options: CreateReadStreamRequest[]; + error: number; + row_keys_read: string[][]; + name: string; +}