Skip to content

Commit

Permalink
Limit rows on processor instead of destination.
Browse files Browse the repository at this point in the history
  • Loading branch information
jansivans committed May 25, 2019
1 parent 4aaa21d commit 9ea456f
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 58 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bellboy",
"version": "2.3.4",
"version": "2.3.5",
"description": "Highly performant JavaScript data stream ETL engine.",
"main": "lib/index",
"types": "lib/index",
Expand Down
5 changes: 1 addition & 4 deletions src/destinations/base/destination.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@ export abstract class Destination implements IDestination {
batchSize: number;
recordGenerator: ((row: any) => AsyncIterableIterator<{}>) | undefined;
batchTransformer: ((rows: any[]) => Promise<any[]>) | undefined;
rowLimit: number;
loadInPreviewMode: boolean;

constructor(config: IDestinationConfig | undefined) {
constructor(config?: IDestinationConfig) {
this.batchSize = 0;
this.rowLimit = 0;
this.loadInPreviewMode = false;
if (config) {
this.rowLimit = config.rowLimit || 0;
this.batchSize = config.batchSize || 0;
this.recordGenerator = config.recordGenerator;
this.batchTransformer = config.batchTransformer;
Expand Down
47 changes: 16 additions & 31 deletions src/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,53 +138,38 @@ export class Job implements IJob {
protected async processStream(readStream: ReadStream | Readable) {
this.closed = false;
const results: any[][] = [];
const loadedRowNumber: number[] = [];
const destinations = this.destinations;
for (let j = 0; j < destinations.length; j++) {
const destination = destinations[j];
destination.rowLimit = this.previewMode && !destination.rowLimit ? 10 : destination.rowLimit;
results[j] = [];
loadedRowNumber[j] = 0;
}
let processedRowCount = 0;
const rowLimit = this.previewMode && !this.processor.rowLimit ? 10 : this.processor.rowLimit;
let header;

while (!this.closed && (readStream.readable || (readStream as any).stream)) {
const result = await this.getNextRecord(readStream);
processedRowCount++;
if (result) {
if (result.header) {
header = result.header;
}
for (let j = 0; j < destinations.length; j++) {
const destination = destinations[j];
results[j].push(...result.data[j]);
if (!destination.batchSize || (!!destination.rowLimit && destination.rowLimit < destination.batchSize)) {
if (destination.rowLimit && results[j].length > destination.rowLimit) {
const toSend = results[j].splice(0, destination.rowLimit);
results[j] = toSend;
}
loadedRowNumber[j] = results[j].length;
} else {
while (results[j].length >= destination.batchSize && (!destination.rowLimit || loadedRowNumber[j] < destination.rowLimit)) {
let toSend = results[j].splice(0, destination.batchSize);
const futureLength = loadedRowNumber[j] + toSend.length;
if (destination.rowLimit && futureLength > destination.rowLimit) {
toSend = toSend.splice(0, futureLength - destination.rowLimit);
results[j] = [];
}
await this.loadBatch(destination, toSend);
loadedRowNumber[j] += toSend.length;
}
for (let i = 0; i < destinations.length; i++) {
results[i].push(...result.data[i]);
while (destinations[i].batchSize && results[i].length >= destinations[i].batchSize) {
const destination = destinations[i];
const toSend = results[i].splice(0, destination.batchSize);
await this.loadBatch(destination, toSend);
}
}
const shouldStop = destinations.every((x, j) => !!x.rowLimit && loadedRowNumber[j] === x.rowLimit);
if (shouldStop) {
readStream.destroy();
}
}
const shouldStop = rowLimit === processedRowCount;
if (shouldStop) {
readStream.destroy();
}
}
for (let j = 0; j < destinations.length; j++) {
if (results[j].length) {
await this.loadBatch(destinations[j], results[j]);
for (let i = 0; i < destinations.length; i++) {
if (results[i].length) {
await this.loadBatch(destinations[i], results[i]);
}
}
return header;
Expand Down
8 changes: 7 additions & 1 deletion src/processors/base/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ import { IProcessor, IProcessorConfig, processStream, emit } from '../../types';

export abstract class Processor implements IProcessor {

constructor(config: IProcessorConfig) {
rowLimit: number;

constructor(config?: IProcessorConfig) {
this.rowLimit = 0;
if (config) {
this.rowLimit = config.rowLimit || 0;
}
}

async process(processStream: processStream, emit: emit) {
Expand Down
4 changes: 2 additions & 2 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export interface IJob {
}

export interface IProcessorConfig {
rowLimit?: number;
}

export interface IDynamicProcessorConfig extends IProcessorConfig {
Expand All @@ -42,14 +43,14 @@ export interface IExcelProcessorConfig extends IDirectoryProcessorConfig {

export interface IProcessor {
process(processStream: processStream, emit: emit): Promise<void>;
rowLimit: number;
}

export interface IDestination {
loadBatch: (data: any[]) => Promise<void>;
batchSize: number;
recordGenerator: ((row: any) => AsyncIterableIterator<{}>) | undefined;
batchTransformer: ((rows: any[]) => Promise<any[]>) | undefined;
rowLimit: number;
loadInPreviewMode: boolean;
}

Expand All @@ -58,7 +59,6 @@ export interface IDestinationConfig {
batchSize?: number;
recordGenerator?: (row: any) => AsyncIterableIterator<{}>;
batchTransformer?: (rows: any[]) => Promise<any[]>;
rowLimit?: number;
}

export interface IDatabaseDestinationConfig extends IDestinationConfig {
Expand Down
11 changes: 5 additions & 6 deletions tests/general.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@ afterAll(async () => {
});

it('destination should stop loading when rowLimit is specified and reached', async () => {
const destination = new CustomDestination({
rowLimit: 3,
});
const destination = new CustomDestination();
const processor = new DynamicProcessor({
generator: async function* () {
for (let i = 0; i < 10; i++) {
yield `test${i}`;
}
}
},
rowLimit: 3,
});
const job = new Job(processor, [destination]);
await job.run();
Expand All @@ -37,14 +36,14 @@ it(`destination should respect row limit even if it is less than batchSize`, as
const data: any[] = [];
const destination = new CustomDestination({
batchSize: 10,
rowLimit: 3,
});
const processor = new DynamicProcessor({
generator: async function* () {
for (let i = 0; i < 100; i++) {
yield `test${i}`;
}
}
},
rowLimit: 3,
});
const job = new Job(processor, [destination], {
previewMode: true,
Expand Down
4 changes: 2 additions & 2 deletions tests/json-source.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ afterAll(async () => {
it('reads root data from JSON file', async () => {
const json = ['hello', 'world'];
fs.appendFileSync(filePath, JSON.stringify(json));
const destination = new CustomDestination({});
const destination = new CustomDestination();
const processor = new JsonProcessor({
path: './',
files: [filePath],
Expand All @@ -40,7 +40,7 @@ it('reads root data from JSON file', async () => {
it('reads nested data from JSON file', async () => {
const json = { fields: ['hello', 'world'] };
fs.appendFileSync(filePath, JSON.stringify(json));
const destination = new CustomDestination({});
const destination = new CustomDestination();
const processor = new JsonProcessor({
path: './',
files: [filePath],
Expand Down
5 changes: 2 additions & 3 deletions tests/mqtt-source.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,11 @@ afterAll(async () => {
})

it('gets messages from broker', async () => {
const destination = new CustomDestination({
rowLimit: 1,
});
const destination = new CustomDestination();
const processor = new MqttProcessor({
topics: ['presence'],
url: 'mqtt://localhost',
rowLimit: 1,
});
const job = new Job(processor, [destination]);
await job.run();
Expand Down
4 changes: 2 additions & 2 deletions tests/preview-mode.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,14 @@ it(`destination shouldn't process more than 10 rows if job is in preview mode an
it(`destination should respect previewMode's rowLimit if in preview mode`, async () => {
const destination = new CustomDestination({
loadInPreviewMode: true,
rowLimit: 33,
});
const processor = new DynamicProcessor({
generator: async function* () {
for (let i = 0; i < 100; i++) {
yield `test${i}`;
}
}
},
rowLimit: 33,
});
const job = new Job(processor, [destination], {
previewMode: true,
Expand Down
10 changes: 4 additions & 6 deletions tests/tail-source.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,24 @@ afterAll(async () => {
});

it('tails file', async () => {
const destination = new CustomDestination({
rowLimit: 1,
});
const destination = new CustomDestination();
const processor = new TailProcessor({
path: './',
files: [filePath],
rowLimit: 1,
});
const job = new Job(processor, [destination]);
await job.run();
expect(destination.getData()).toEqual([{ file: filePath, data: 'Hello, world!' }]);
});

it('tails file from beginning', async () => {
const destination = new CustomDestination({
rowLimit: 3,
});
const destination = new CustomDestination();
const processor = new TailProcessor({
path: './',
files: [filePath],
fromBeginning: true,
rowLimit: 3,
});
const job = new Job(processor, [destination]);
await job.run();
Expand Down

0 comments on commit 9ea456f

Please sign in to comment.