Skip to content

Commit

Permalink
Refactor events functionality; add new events.
Browse files Browse the repository at this point in the history
  • Loading branch information
jansivans committed May 21, 2019
1 parent 3a196cb commit f14b904
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 61 deletions.
39 changes: 20 additions & 19 deletions README.md
@@ -1,8 +1,4 @@
[![Build Status](https://travis-ci.org/Claviz/bellboy.svg?branch=master)](https://travis-ci.org/Claviz/bellboy)
[![codecov](https://codecov.io/gh/Claviz/bellboy/branch/master/graph/badge.svg)](https://codecov.io/gh/Claviz/bellboy)
![npm](https://img.shields.io/npm/v/bellboy.svg)

# bellboy
# bellboy [![Build Status](https://travis-ci.org/Claviz/bellboy.svg?branch=master)](https://travis-ci.org/Claviz/bellboy) [![codecov](https://codecov.io/gh/Claviz/bellboy/branch/master/graph/badge.svg)](https://codecov.io/gh/Claviz/bellboy) ![npm](https://img.shields.io/npm/v/bellboy.svg)

Highly performant JavaScript data stream ETL engine.

Expand Down Expand Up @@ -103,20 +99,25 @@ job.on('processedFile', async (file) => {
```

#### Events
* **startProcessing**\
Emitted when processor starts working.
* **processedFile**\
Emitted when processor ends it's work.
* **transformingBatch**\
Emitted when batch is about to be transformed - right before calling `batchTransformer` function.
* **transformedBatch**\
Emitted when batch has been transformed - after calling `batchTransformer` function.
* **loadingBatch**\
Emitted when batch is about to be loaded in destination.
* **loadingBatchError** `(error)`\
Emitted when batch load has failed.
* **loadedBatch**\
Emitted when batch has been loaded.

The following table lists the job life-cycle events and parameters they emit.

| Event | Parameters | Description |
| ---------------------- | ----------------------- | ----------------------------------------------------------------------------------------------------- |
| startProcessing | | Job has started execution. |
| startProcessingRow | data | Received row is about to be processed. |
| rowAddedToBatch | destinationIndex, data | Received row has been added to the destination batch (wether as is or by `recordGenerator` function). |
| rowProcessingError | destinationIndex, error | Received row processing has been failed. |
| endProcessingRow | | Received row has been processed. |
| transformingBatch | destinationIndex, data | Batch is about to be transformed (before calling `batchTransformer` function). |
| transformingBatchError | destinationIndex, error | Batch transformation has been failed (`batchTransformer` function has thrown an error). |
| transformedBatch | destinationIndex, data | Batch has been transformed (after calling `batchTransformer` function). |
| loadingBatch | destinationIndex, data | Batch is about to be loaded in destination. |
| loadingBatchError | destinationIndex, error | Batch load has failed. |
| loadedBatch | destinationIndex | Batch load has been finished. |
| endProcessing | | Job has finished execution. |



## Processors <div id='processors'/>

Expand Down
28 changes: 15 additions & 13 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
@@ -1,6 +1,6 @@
{
"name": "bellboy",
"version": "2.1.0",
"version": "2.2.0",
"description": "Highly performant JavaScript data stream ETL engine.",
"main": "lib/index",
"types": "lib/index",
Expand Down Expand Up @@ -49,6 +49,6 @@
"request-promise": "^4.2.4",
"split2": "^3.1.1",
"tail": "^2.0.2",
"xlstream": "^1.0.9"
"xlstream": "^1.0.10"
}
}
61 changes: 35 additions & 26 deletions src/job.ts
@@ -1,16 +1,14 @@
import { ReadStream } from 'fs';
import { Readable } from 'stream';

import { emit, event, IJob, IDestination, IProcessor, IJobConfig } from './types';
import { emit, event, IDestination, IJob, IJobConfig, IProcessor } from './types';

export class Job implements IJob {

protected closed: boolean = false;
protected verbose: boolean;
protected events: { [fn: string]: emit[] } = {};

constructor(protected processor: IProcessor, protected destinations: IDestination[], options: IJobConfig = {}) {
this.verbose = !!options.verbose;
}

async run() {
Expand All @@ -33,24 +31,30 @@ export class Job implements IJob {

protected async loadBatch(destination: IDestination, data: any[]) {
if (data.length) {
const destinationIndex = this.destinations.indexOf(destination);
if (destination.batchTransformer) {
await this.emit('transformingBatch');
data = await destination.batchTransformer(data);
await this.emit('transformedBatch');
await this.emit('transformingBatch', destinationIndex, data);
try {
data = await destination.batchTransformer(data);
} catch (err) {
await this.emit('transformingBatchError', destinationIndex, err);
}
await this.emit('transformedBatch', destinationIndex, data);
}
await this.emit('loadingBatch');
await this.emit('loadingBatch', destinationIndex, data);
try {
await destination.loadBatch(data);
} catch (err) {
await this.emit('loadingBatchError', err);
await this.emit('loadingBatchError', destinationIndex, err);
}
await this.emit('loadedBatch');
await this.emit('loadedBatch', destinationIndex);
}
}

protected async getNextRecord(readStream: ReadStream | Readable) {
return new Promise<{ data: any[][]; header: any; }>((resolve, reject) => {
const destinations = this.destinations;
const emit = (eventName: string, ...args: any) => this.emit(eventName, ...args);
let data: any[][] = [];
if (destinations) {
for (let i = 0; i < destinations.length; i++) {
Expand All @@ -62,7 +66,7 @@ export class Job implements IJob {
const niceEnding = () => {
removeListeners();
this.closed = true;
resolve({ data, header });
resolve();
}

function errorEnding(error: any) {
Expand All @@ -73,26 +77,31 @@ export class Job implements IJob {
async function handleData(obj: any) {
readStream.pause();
removeListeners();
if (destinations) {
for (let i = 0; i < destinations.length; i++) {
const recordGeneratorFn = destinations[i].recordGenerator;
if (!recordGeneratorFn) {
data[i].push(obj);
} else {
const recordGenerator = recordGeneratorFn(obj);
await emit('startProcessingRow', obj);
for (let i = 0; i < destinations.length; i++) {
const recordGeneratorFn = destinations[i].recordGenerator;
if (!recordGeneratorFn) {
data[i].push(obj);
await emit('rowAddedToBatch', i, obj);
} else {
const wrapper = async function* (recordGenerator: AsyncIterableIterator<{}>) {
try {
for await (const record of recordGenerator) {
data[i].push(record);
yield record;
}
} catch (err) {
reject(err);
await emit('rowProcessingError', i, err);
}
}
let recordGenerator = wrapper(recordGeneratorFn(obj));
for await (const record of recordGenerator) {
data[i].push(record);
await emit('rowAddedToBatch', i, record);
}
}
resolve({ data, header });
} else {
resolve({ data: obj, header });
}
await emit('endProcessingRow');
resolve({ data, header });
}

function handleHeader(x: any) {
Expand Down Expand Up @@ -157,12 +166,12 @@ export class Job implements IJob {

protected async emit(eventName: string, ...args: any) {
const fn = this.events[eventName];
if (this.verbose) {
console.log({ eventName, args });
}
if (fn) {
for (let i = 0; i < fn.length; i++) {
const result = await fn[i].apply(this, args);
let result;
try {
result = await fn[i].apply(this, args);
} catch (err) { }
if (result) {
this.closed = true;
}
Expand Down
1 change: 0 additions & 1 deletion src/types.ts
Expand Up @@ -12,7 +12,6 @@ export interface IDbConnection {
}

export interface IJobConfig {
verbose?: boolean;
}

export interface IJob {
Expand Down

0 comments on commit f14b904

Please sign in to comment.