bellboy

Highly performant JavaScript data stream ETL engine.

How it works

Bellboy streams input data row by row. Each row, in turn, goes through a user-defined function where it can be transformed. When enough data has been collected in a batch, it is loaded into the destination.

Installation

Before installing, make sure you are using the latest version of Node.js.

npm install bellboy

Example

This example shows how bellboy can extract rows from an Excel file, modify them on the fly, load them into a Postgres database, move the processed file to another folder, and carry on with the remaining files.

All in five simple steps.

const bellboy = require('bellboy');
const fs = require('fs');
const path = require('path');

(async () => {
    const srcPath = `C:/source`;
    // 1. create a processor which will process 
    // Excel files one by one in the folder 
    const processor = new bellboy.ExcelProcessor({
        path: srcPath,
        hasHeader: true,
    });
    // 2. create a destination which will add a new 'status' 
    // field to each row and load processed data to Postgres database
    const destination = new bellboy.PostgresDestination({
        connection: {
            user: 'user',
            password: 'password',
            server: 'localhost',
            database: 'bellboy',
        },
        table: 'stats',
        recordGenerator: async function* (record) {
            yield {
                ...record.raw.obj,
                status: 'done',
            };
        }
    });
    // 3. create a job which will glue processor and destination together
    const job = new bellboy.Job(processor, [destination]);
    // 4. tell bellboy to move file away as soon as it was processed
    job.on('endProcessingStream', async (file) => {
        const filePath = path.join(srcPath, file);
        const newFilePath = path.join(`./destination`, file);
        await fs.promises.rename(filePath, newFilePath);
    });
    // 5. run your job
    await job.run();
})();

Jobs

A job in bellboy is the link between a processor and its destinations. When the job is run, the data processing and loading machinery is started.

Initialization

To initialize a Job instance, pass a processor and one or more destinations.

const job = new bellboy.Job(processor_instance, [destination_instance], job_options = {});

Options

Instance methods

  • run async function()
    Starts processing data.
  • on function(event, async function listener)
    Intercepts the specified event and pauses processing until the listener function has executed.
    If the listener returns a truthy value, processing will be stopped.
const { rename } = require('fs').promises;

// move file to the new location when endProcessingStream event is fired
job.on('endProcessingStream', async (file) => {
    const filePath = path.join(srcPath, file);
    const newFilePath = path.join(`./destination`, file);
    await rename(filePath, newFilePath);
});

Events

The following table lists the job life-cycle events and the parameters they emit.

| Event | Parameters | Description |
| --- | --- | --- |
| startProcessing | | Job has started execution. |
| endProcessing | | Job has finished execution. |
| startProcessingStream | ...args | Stream processing has started (before calling processStream inside the processor). Passed parameters may vary based on the specific processor. |
| endProcessingStream | ...args | Stream processing has finished (after calling processStream inside the processor). Passed parameters may vary based on the specific processor. |
| processingError | error | Job has failed. |
| startProcessingRow | data | Received row is about to be processed. |
| endProcessingRow | | Received row has been processed. |
| rowGenerated | destinationIndex, data | Row has been generated using the recordGenerator function. |
| rowGenerationError | destinationIndex, data, error | Record generation function recordGenerator has failed. |
| transformingBatch | destinationIndex, data | Batch is about to be transformed (before calling the batchTransformer function). |
| transformedBatch | destinationIndex, data | Batch has been successfully transformed (after calling the batchTransformer function). |
| transformingBatchError | destinationIndex, data, error | Batch transformation has failed (batchTransformer function has thrown an error). |
| loadingBatch | destinationIndex, data | Batch is about to be loaded into the destination. |
| loadedBatch | destinationIndex | Batch load has finished. |
| loadingBatchError | destinationIndex, data, error | Batch load has failed. |

Processors

Each processor in bellboy is a class with a single responsibility: processing data of a specific type.

Options

  • rowLimit number
    Number of records to be processed before stopping the processor. If not specified or 0 is passed, all records will be processed (see the sketch below).
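A minimal sketch limiting an Excel processor to the first 100 rows (the path is illustrative):

const processor = new bellboy.ExcelProcessor({
    path: './data',
    // stop after 100 records
    rowLimit: 100,
});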

MqttProcessor

Usage examples

Listens for MQTT messages and processes them one by one. It also handles backpressure by queuing messages, so all messages are eventually processed.
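A minimal sketch; the broker url and topic list are illustrative, and the exact option names (url, topics) are assumptions to be checked against the options below:

const processor = new bellboy.MqttProcessor({
    // assumed option names: broker url and topics to subscribe to
    url: 'mqtt://localhost',
    topics: ['some-topic'],
});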

Options

HttpProcessor

Usage examples

Processes data received from an HTTP call. Can process JSON as well as delimited data. Can handle pagination by using the nextRequest function.

Options

  • Processor options
  • connection object required
    Options from the request library.
  • dataFormat delimited | json required
  • delimiter string required for delimited
  • jsonPath string required for json
    Only values that match the provided JSONPath will be processed.
  • nextRequest async function(header)
    Function which must return the connection for the next request, or null if no further request is needed. If the data format is json, it receives a header parameter containing the data that precedes the first jsonPath match.
const processor = new bellboy.HttpProcessor({
    // gets next connection from the header until last page is reached
    nextRequest: async function (header) {
        if (header) {
            const pagination = header.pagination;
            if (pagination.total_pages > pagination.current_page) {
                return {
                    ...connection,
                    url: `${url}&current_page=${pagination.current_page + 1}`
                };
            }
        }
        return null;
    },
    // ...
});
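For a simple non-paginated case, a minimal sketch (the url and the jsonPath value are illustrative):

const processor = new bellboy.HttpProcessor({
    connection: {
        method: 'GET',
        url: 'http://localhost:3000/api/items',
    },
    dataFormat: 'json',
    // illustrative JSONPath; match it to your response shape
    jsonPath: 'items.*',
});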

Directory processors

Used for streaming text data from files in a directory. There are currently four types of directory processors: ExcelProcessor, JsonProcessor, DelimitedProcessor and TailProcessor. Such processors search for files in the source directory and process them one by one.

The file name (file) and full file path (filePath) parameters will be passed to the startProcessingStream event.

Options

  • Processor options
  • path string required
    Path to the directory where files are located.
  • filePattern string
    Regex pattern for the files to be processed (see the sketch after this list). If not specified, all files in the directory will be matched.
  • files string[]
    Array of file names. If not specified, all files in the directory will be matched against the filePattern regex and processed in alphabetical order.
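A minimal sketch, processing only .xlsx files (the path is illustrative):

const processor = new bellboy.ExcelProcessor({
    path: './data',
    // match only files ending in .xlsx
    filePattern: '\\.xlsx$',
});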

ExcelProcessor

Usage examples

Processes XLSX files in the directory.

Options

  • Directory processor options
  • hasHeader boolean
    Whether the worksheet has a header or not, false by default.
  • ignoreEmpty boolean
    Whether to ignore empty rows or not, true by default.
  • sheets (string | number)[] | async function(sheets)
    Array of sheet names and/or sheet indexes, or an async function which accepts an array of all sheet names and must return another array of sheets to be processed. If not specified, the first sheet will be processed.
const processor = new bellboy.ExcelProcessor({
    // process last sheet
    sheets: async (sheets) => {
        return [sheets[sheets.length - 1]];
    },
    // ...
});

Produced row

To see what a processed row looks like, refer to the documentation of the xlstream library, which is used for Excel processing.

JsonProcessor

Processes JSON files in the directory.

Options

DelimitedProcessor

Usage examples

Processes files with delimited data in the directory.

Options

TailProcessor

Usage examples

Watches for file changes and outputs the last part of the file as soon as new lines are added.

Options

  • Directory processor options
  • fromBeginning boolean
    In addition to emitting new lines, emits lines from the beginning of the file, false by default (see the sketch below).
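A minimal sketch, tailing all files in a folder and also reading their existing content (the path is illustrative):

const processor = new bellboy.TailProcessor({
    path: './logs',
    // also emit lines that were already in the files
    fromBeginning: true,
});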

Produced row

  • file string
    Name of the file the data came from.
  • data string

Database processors

Processes a SELECT query row by row. There are two database processors: PostgresProcessor (usage examples) and MssqlProcessor (usage examples). Both of them have the same options; a sketch follows the options below.

Options

  • Processor options
  • query string required
    Query to execute.
  • connection object required
    • user
    • password
    • server
    • host
    • database
    • schema
      Currently available only for PostgresProcessor.
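A minimal sketch of a PostgresProcessor (query, table and credentials are illustrative):

const processor = new bellboy.PostgresProcessor({
    query: 'SELECT * FROM stats',
    connection: {
        user: 'user',
        password: 'password',
        host: 'localhost',
        database: 'bellboy',
    },
});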

DynamicProcessor

Processor which generates records on the fly. Can be used to define custom data processors.

Options

// processor which generates 10 records dynamically
const processor = new bellboy.DynamicProcessor({
    generator: async function* () {
        for (let i = 0; i < 10; i++) {
            yield i;
        }
    },
});

Destinations

Every job can have as many destinations (outputs) as needed. For example, one job can load processed data into a database, log this data to stdout and post it over HTTP simultaneously.

Options

  • disableLoad boolean
    If true, no data will be loaded to the destination. In combination with reporters, this option can come in handy during testing.
  • batchSize number
    Number of records to be processed before loading them into the destination. If not specified or 0 is passed, all records will be processed.
  • recordGenerator async generator function(row)
    Function which receives each row produced by the processor and can apply transformations to it.
  • batchTransformer async function(rows)
    Function which receives the whole batch of rows. It is called when the row count reaches batchSize. Data is loaded into the destination immediately after this function has executed (a combined sketch follows this list).
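A minimal sketch combining these options with the StdoutDestination described below (the row fields are illustrative):

const destination = new bellboy.StdoutDestination({
    batchSize: 10,
    // add a processedAt field to every row
    recordGenerator: async function* (row) {
        yield {
            ...row,
            processedAt: new Date().toISOString(),
        };
    },
    // drop rows without an id before the batch is loaded
    batchTransformer: async (rows) => {
        return rows.filter((row) => row.id != null);
    },
});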

StdoutDestination

Logs out all data to stdout (console).

Options

HttpDestination

Usage examples

Puts each processed record in the request body and executes the specified HTTP request.

Options

PostgresDestination

Usage examples

Inserts data into PostgreSQL. A sketch follows the options below.

Options

  • General destination options
  • table string required
    Table name.
  • upsertConstraints string[]
    If specified, an UPSERT command will be executed based on the provided constraints (see the sketch after this list).
  • connection object required
    • user
    • password
    • server
    • host
    • database
    • schema
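A minimal sketch using upsertConstraints (the column name and credentials are illustrative):

const destination = new bellboy.PostgresDestination({
    table: 'stats',
    // update existing rows that conflict on the id column
    upsertConstraints: ['id'],
    connection: {
        user: 'user',
        password: 'password',
        host: 'localhost',
        database: 'bellboy',
    },
});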

MssqlDestination

Usage examples

Inserts data into MSSQL.

Options

  • General destination options
  • table string required
    Table name.
  • upsertConstraints string[]
    If specified, an UPSERT command will be executed based on the provided constraints.
  • connection object required
    • user
    • password
    • server
    • host
    • database

Extendability

New processors and destinations can be made by extending existing ones. Feel free to make a pull request if you create something interesting.

Creating a new processor

Processor class examples

To create a new processor, you must extend the Processor class and implement an async process function. This function accepts one parameter:

  • processStream async function(readStream, ...args) required
    Callback function which accepts a Readable stream. After calling this function, the job instance will handle the passed stream internally. The passed parameters (args) will be emitted with the startProcessingStream event during job execution.
class CustomProcessor extends bellboy.Processor {
    async process(processStream) {
        // await processStream(readStream, 'hello', 'world');
    }
}
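A fuller sketch of a working processor, assuming the job accepts any object-mode Readable stream (the ArrayProcessor name and its behavior are hypothetical):

const { Readable } = require('stream');

class ArrayProcessor extends bellboy.Processor {
    constructor(items, options) {
        // assumption: the base class accepts processor options such as rowLimit
        super(options);
        this.items = items;
    }

    async process(processStream) {
        // wrap the array in a Readable stream and hand it to the job;
        // 'array-source' is emitted with the startProcessingStream event
        await processStream(Readable.from(this.items), 'array-source');
    }
}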

Creating a new destination

Destination class examples

To create a new destination, you must extend the Destination class and implement an async loadBatch function. This function accepts one parameter:

  • data any[] required
    Array of some processed data that needs to be loaded.
class CustomDestination extends bellboy.Destination {
    async loadBatch(data) {
        console.log(data);
    }
}

Creating a new reporter

Official stdout reporter

A reporter is a job wrapper which can operate on a job instance (for example, listen to events using the job's on method). To create a new reporter, you must extend the Reporter class and implement a report function, which will be executed during job instance initialization. This function accepts one parameter:

  • job Job required
    Job instance
class CustomReporter extends bellboy.Reporter {
    report(job) {
        job.on('startProcessing', async () => {
            console.log('Job has been started.');
        });
    }
}
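A sketch of attaching the reporter to a job; note that passing reporters through the job options is an assumption here, since the job options table is not shown above:

// assumption: reporters are supplied via the job options
const job = new bellboy.Job(processor, [destination], {
    reporters: [new CustomReporter()],
});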

Testing

Tests can be run with the docker-compose up --abort-on-container-exit --exit-code-from test command.
