Missing row test with the mock server
Add a test with the mock server to reproduce the problem with missing rows.
danieljbruce committed May 16, 2023
1 parent 7d6bd06 commit 7a9ab4b
Showing 1 changed file with 107 additions and 38 deletions.
145 changes: 107 additions & 38 deletions test/streams.ts
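
For orientation, the heart of the change is a mock-server-backed read: a ReadRows handler streams chunked responses and the client's createReadStream should deliver every one of them. Below is a minimal sketch of that wiring, reduced to a single hard-coded row; it reuses the mock-server helpers and the chunk shape that appear in the diff that follows, and assumes (as the test does) that the callback passed to MockServer runs once the server is listening.

import {Bigtable} from '../src';
import {MockServer} from '../src/util/mock-servers/mock-server';
import {BigtableClientMockService} from '../src/util/mock-servers/service-implementations/bigtable-client-mock-service';

let server: MockServer;
server = new MockServer(() => {
  // The callback fires once the mock server is listening.
  const bigtable = new Bigtable({apiEndpoint: `localhost:${server.port}`});
  const table = bigtable.instance('fake-instance').table('fake-table');

  // Register a ReadRows handler that writes one committed row and ends the call.
  const service = new BigtableClientMockService(server);
  service.setService({
    ReadRows: (call: any) => {
      call.write({
        chunks: [
          {
            labels: [],
            rowKey: Buffer.from('a0'),
            familyName: {value: 'cf1'},
            qualifier: {value: Buffer.from('a')},
            timestampMicros: '12',
            value: Buffer.from('a'),
            valueSize: 0,
            commitRow: true,
            rowStatus: 'commitRow',
          },
        ],
      });
      call.end();
    },
  });

  // Every row the handler writes should come back out of the read stream.
  let rows = 0;
  table
    .createReadStream({})
    .on('data', () => rows++)
    .on('end', () => console.log(`received ${rows} rows`));
});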
@@ -1,38 +1,59 @@
import {describe, it} from 'mocha';
import {before, describe, it} from 'mocha';
import {Bigtable, AbortableDuplex} from '../src';
import {PassThrough, pipeline, Transform, Writable} from 'stream';
const streamEvents = require('stream-events');
import * as assert from 'assert';
import {MockServer} from '../src/util/mock-servers/mock-server';
import {MockService} from '../src/util/mock-servers/mock-service';
import {BigtableClientMockService} from '../src/util/mock-servers/service-implementations/bigtable-client-mock-service';

describe('Bigtable/Streams', () => {
describe('createReadStream', () => {
let rowCount = 0;
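// Shared transformer that counts rows and passes each chunk along asynchronously.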
const transformer = new Transform({
objectMode: true,
transform(
chunk: any,
_encoding: any,
callback: (err: any, data: any) => void
) {
rowCount++;
// console.log(`row count: ${rowCount}`);
setTimeout(() => {
callback(null, chunk);
}, 0);
},
});
const output = new Writable({
objectMode: true,
write(_chunk, _encoding, callback) {
callback();
},
});
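// Helpers for generating row keys and single-cell ReadRows chunks.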
function ascii(index: number) {
return String.fromCharCode(index);
}
function getChunk(index: number) {
return {
labels: [],
rowKey: Buffer.from(`a${ascii(index)}`),
familyName: {value: 'cf1'},
qualifier: {value: Buffer.from('a')},
timestampMicros: '12',
value: Buffer.from('a'),
valueSize: 0,
commitRow: true,
rowStatus: 'commitRow',
};
}

it('should finish delivering a chunk every time a chunk is sent', async () => {
let rowCount = 0;
rowCount = 0;
const chunkSize = 209;
const bigtable = new Bigtable();
const instance = bigtable.instance('fake-instance');
const table = instance.table('fake-table');
const requestFn = table.bigtable.request;
const transformer = new Transform({
objectMode: true,
transform(
chunk: any,
_encoding: any,
callback: (err: any, data: any) => void
) {
rowCount++;
// console.log(`row count: ${rowCount}`);
setTimeout(() => {
callback(null, chunk);
}, 0);
},
});
const output = new Writable({
objectMode: true,
write(_chunk, _encoding, callback) {
callback();
},
});
const stream = streamEvents(
new PassThrough({
objectMode: true,
@@ -42,22 +63,6 @@ describe('Bigtable/Streams', () => {
return stream as AbortableDuplex;
};
const readStream = table.createReadStream({});
function ascii(index: number) {
return String.fromCharCode(index);
}
function getChunk(index: number) {
return {
labels: [],
rowKey: Buffer.from(`a${ascii(index)}`),
familyName: {value: 'cf1'},
qualifier: {value: Buffer.from('a')},
timestampMicros: '12',
value: Buffer.from('a'),
valueSize: 0,
commitRow: true,
rowStatus: 'commitRow',
};
}
function pushChunks(numChunks: number) {
const chunks = [];
for (let i = 0; i < numChunks; i++) {
@@ -94,5 +99,69 @@ describe('Bigtable/Streams', () => {
table.bigtable.request = requestFn;
assert.strictEqual(rowCount % chunkSize, 0);
});
describe('with the mock server', () => {
let server: MockServer;
let service: MockService;
let bigtable: Bigtable;
let table: any;

before(done => {
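// Start the mock Bigtable server, point a client at it, and register the ReadRows handler.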
server = new MockServer(() => {
bigtable = new Bigtable({
apiEndpoint: `localhost:${server.port}`,
});
table = bigtable.instance('fake-instance').table('fake-table');
service = new BigtableClientMockService(server);
service.setService({
ReadRows: pushDataToUser,
});
done();
});
});

function createResponse(i: number) {
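// Return a ReadRows response containing a single chunk for row index i.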
const response: any = {
chunks: [],
};
const chunk1 = getChunk(i);
response.chunks = [chunk1];
return response;
}
const pushDataToUser = (call: any) => {
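// Stream rows to the client in batches of 100, pausing 10 ms between batches,
// until 30000 rows have been written, then end the call.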
let rowId = 0;
function sendNextRow() {
for (let j = 0; j < 100; j++) {
call.write(createResponse(rowId++));
}

if (rowId < 30000) {
setTimeout(sendNextRow, 10);
} else {
call.end();
}
}
sendNextRow();
};

it('should finish delivering all the data to the user', async () => {
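// Read 81 rows through the shared transformer/output pipeline and check that they all arrive.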
rowCount = 0;
const instance = bigtable.instance('fake-instance');
const table = instance.table('fake-table');
const readStream = table.createReadStream({
decode: false,
limit: 81,
});
await new Promise((resolve, reject) => {
pipeline(readStream, transformer, output, (err: any) => {
if (err) {
reject(err);
} else {
resolve(null);
}
});
});
assert.strictEqual(rowCount, 81);
});
});
});
});
