diff --git a/README.md b/README.md index 5fc9dc3..79fc325 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,4 @@ -[![Build Status](https://travis-ci.org/Claviz/bellboy.svg?branch=master)](https://travis-ci.org/Claviz/bellboy) -[![codecov](https://codecov.io/gh/Claviz/bellboy/branch/master/graph/badge.svg)](https://codecov.io/gh/Claviz/bellboy) -![npm](https://img.shields.io/npm/v/bellboy.svg) - -# bellboy +# bellboy [![Build Status](https://travis-ci.org/Claviz/bellboy.svg?branch=master)](https://travis-ci.org/Claviz/bellboy) [![codecov](https://codecov.io/gh/Claviz/bellboy/branch/master/graph/badge.svg)](https://codecov.io/gh/Claviz/bellboy) ![npm](https://img.shields.io/npm/v/bellboy.svg) Highly performant JavaScript data stream ETL engine. @@ -103,20 +99,25 @@ job.on('processedFile', async (file) => { ``` #### Events -* **startProcessing**\ -Emitted when processor starts working. -* **processedFile**\ -Emitted when processor ends it's work. -* **transformingBatch**\ -Emitted when batch is about to be transformed - right before calling `batchTransformer` function. -* **transformedBatch**\ -Emitted when batch has been transformed - after calling `batchTransformer` function. -* **loadingBatch**\ -Emitted when batch is about to be loaded in destination. -* **loadingBatchError** `(error)`\ -Emitted when batch load has failed. -* **loadedBatch**\ -Emitted when batch has been loaded. + +The following table lists the job life-cycle events and parameters they emit. + +| Event | Parameters | Description | +| ---------------------- | ----------------------- | ----------------------------------------------------------------------------------------------------- | +| startProcessing | | Job has started execution. | +| startProcessingRow | data | Received row is about to be processed. | +| rowAddedToBatch | destinationIndex, data | Received row has been added to the destination batch (wether as is or by `recordGenerator` function). | +| rowProcessingError | destinationIndex, error | Received row processing has been failed. | +| endProcessingRow | | Received row has been processed. | +| transformingBatch | destinationIndex, data | Batch is about to be transformed (before calling `batchTransformer` function). | +| transformingBatchError | destinationIndex, error | Batch transformation has been failed (`batchTransformer` function has thrown an error). | +| transformedBatch | destinationIndex, data | Batch has been transformed (after calling `batchTransformer` function). | +| loadingBatch | destinationIndex, data | Batch is about to be loaded in destination. | +| loadingBatchError | destinationIndex, error | Batch load has failed. | +| loadedBatch | destinationIndex | Batch load has been finished. | +| endProcessing | | Job has finished execution. | + + ## Processors
diff --git a/package-lock.json b/package-lock.json index fbcabaf..52d3eed 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9062,6 +9062,18 @@ "safe-buffer": "^5.1.2", "ws": "^3.2.0", "xtend": "^4.0.0" + }, + "dependencies": { + "ws": { + "version": "3.3.3", + "resolved": "https://registry.npmjs.org/ws/-/ws-3.3.3.tgz", + "integrity": "sha512-nnWLa/NwZSt4KQJu51MYlCcSQ5g7INpOrOMt4XV8j4dqTXdmlUmSHQ8/oLC069ckre0fRsgfvsKwbTdtKLCDkA==", + "requires": { + "async-limiter": "~1.0.0", + "safe-buffer": "~5.1.0", + "ultron": "~1.1.0" + } + } } }, "whatwg-encoding": { @@ -9191,20 +9203,10 @@ "signal-exit": "^3.0.2" } }, - "ws": { - "version": "3.3.3", - "resolved": "https://registry.npmjs.org/ws/-/ws-3.3.3.tgz", - "integrity": "sha512-nnWLa/NwZSt4KQJu51MYlCcSQ5g7INpOrOMt4XV8j4dqTXdmlUmSHQ8/oLC069ckre0fRsgfvsKwbTdtKLCDkA==", - "requires": { - "async-limiter": "~1.0.0", - "safe-buffer": "~5.1.0", - "ultron": "~1.1.0" - } - }, "xlstream": { - "version": "1.0.9", - "resolved": "https://registry.npmjs.org/xlstream/-/xlstream-1.0.9.tgz", - "integrity": "sha512-tQ5KZpzkz1L4ktCh/O45Jcmc2dKSQmIZujVYyrTmYfZR6XQEfKlx4Nwqdotf7jLjoDLYvexeg9WhnDjnOm2+mg==", + "version": "1.0.10", + "resolved": "https://registry.npmjs.org/xlstream/-/xlstream-1.0.10.tgz", + "integrity": "sha512-MqbvpfD0d/TDtuiE0gRReqljhWy3eVxoUITkksCx9K3anRZOAcl4TPUNkzuaPR9DII8sxMEbsALvi1aZz+9KYg==", "requires": { "node-stream-zip": "^1.8.0", "sax-stream": "^1.3.0", diff --git a/package.json b/package.json index 7570991..39487ce 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "bellboy", - "version": "2.1.0", + "version": "2.2.0", "description": "Highly performant JavaScript data stream ETL engine.", "main": "lib/index", "types": "lib/index", @@ -49,6 +49,6 @@ "request-promise": "^4.2.4", "split2": "^3.1.1", "tail": "^2.0.2", - "xlstream": "^1.0.9" + "xlstream": "^1.0.10" } } \ No newline at end of file diff --git a/src/job.ts b/src/job.ts index 746bfb7..4ed6551 100644 --- a/src/job.ts +++ b/src/job.ts @@ -1,16 +1,14 @@ import { ReadStream } from 'fs'; import { Readable } from 'stream'; -import { emit, event, IJob, IDestination, IProcessor, IJobConfig } from './types'; +import { emit, event, IDestination, IJob, IJobConfig, IProcessor } from './types'; export class Job implements IJob { protected closed: boolean = false; - protected verbose: boolean; protected events: { [fn: string]: emit[] } = {}; constructor(protected processor: IProcessor, protected destinations: IDestination[], options: IJobConfig = {}) { - this.verbose = !!options.verbose; } async run() { @@ -33,24 +31,30 @@ export class Job implements IJob { protected async loadBatch(destination: IDestination, data: any[]) { if (data.length) { + const destinationIndex = this.destinations.indexOf(destination); if (destination.batchTransformer) { - await this.emit('transformingBatch'); - data = await destination.batchTransformer(data); - await this.emit('transformedBatch'); + await this.emit('transformingBatch', destinationIndex, data); + try { + data = await destination.batchTransformer(data); + } catch (err) { + await this.emit('transformingBatchError', destinationIndex, err); + } + await this.emit('transformedBatch', destinationIndex, data); } - await this.emit('loadingBatch'); + await this.emit('loadingBatch', destinationIndex, data); try { await destination.loadBatch(data); } catch (err) { - await this.emit('loadingBatchError', err); + await this.emit('loadingBatchError', destinationIndex, err); } - await this.emit('loadedBatch'); + await this.emit('loadedBatch', destinationIndex); } } protected async getNextRecord(readStream: ReadStream | Readable) { return new Promise<{ data: any[][]; header: any; }>((resolve, reject) => { const destinations = this.destinations; + const emit = (eventName: string, ...args: any) => this.emit(eventName, ...args); let data: any[][] = []; if (destinations) { for (let i = 0; i < destinations.length; i++) { @@ -62,7 +66,7 @@ export class Job implements IJob { const niceEnding = () => { removeListeners(); this.closed = true; - resolve({ data, header }); + resolve(); } function errorEnding(error: any) { @@ -73,26 +77,31 @@ export class Job implements IJob { async function handleData(obj: any) { readStream.pause(); removeListeners(); - if (destinations) { - for (let i = 0; i < destinations.length; i++) { - const recordGeneratorFn = destinations[i].recordGenerator; - if (!recordGeneratorFn) { - data[i].push(obj); - } else { - const recordGenerator = recordGeneratorFn(obj); + await emit('startProcessingRow', obj); + for (let i = 0; i < destinations.length; i++) { + const recordGeneratorFn = destinations[i].recordGenerator; + if (!recordGeneratorFn) { + data[i].push(obj); + await emit('rowAddedToBatch', i, obj); + } else { + const wrapper = async function* (recordGenerator: AsyncIterableIterator<{}>) { try { for await (const record of recordGenerator) { - data[i].push(record); + yield record; } } catch (err) { - reject(err); + await emit('rowProcessingError', i, err); } } + let recordGenerator = wrapper(recordGeneratorFn(obj)); + for await (const record of recordGenerator) { + data[i].push(record); + await emit('rowAddedToBatch', i, record); + } } - resolve({ data, header }); - } else { - resolve({ data: obj, header }); } + await emit('endProcessingRow'); + resolve({ data, header }); } function handleHeader(x: any) { @@ -157,12 +166,12 @@ export class Job implements IJob { protected async emit(eventName: string, ...args: any) { const fn = this.events[eventName]; - if (this.verbose) { - console.log({ eventName, args }); - } if (fn) { for (let i = 0; i < fn.length; i++) { - const result = await fn[i].apply(this, args); + let result; + try { + result = await fn[i].apply(this, args); + } catch (err) { } if (result) { this.closed = true; } diff --git a/src/types.ts b/src/types.ts index a9ce327..0c71d39 100644 --- a/src/types.ts +++ b/src/types.ts @@ -12,7 +12,6 @@ export interface IDbConnection { } export interface IJobConfig { - verbose?: boolean; } export interface IJob {