Utility function providing a "handleline" callback which is called for every record in a message stream.

gulpetl/gulp-etl-handlelines

gulp-etl-handlelines

Utility function providing a "handleline" callback which is called for every message in a gulp-etl Message Stream. It can be used for filtering, transformations, counters, etc., and is a lightweight way to add functionality without building a full module. It also powers a number of our other modules, greatly simplifying their development by handling the "boilerplate" code a module needs. Works in both buffer and streaming mode.

This is a gulp-etl plugin, and as such it is a gulp plugin. gulp-etl plugins work with ndjson data streams/files which we call Message Streams and which are compliant with the Singer specification. Message Streams look like this:

{"type": "SCHEMA", "stream": "users", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}
{"type": "RECORD", "stream": "users", "record": {"id": 1, "name": "Chris"}}
{"type": "RECORD", "stream": "users", "record": {"id": 2, "name": "Mike"}}
{"type": "SCHEMA", "stream": "locations", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}
{"type": "RECORD", "stream": "locations", "record": {"id": 1, "name": "Philadelphia"}}
{"type": "STATE", "value": {"users": 2, "locations": 1}}
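Because a Message Stream is ndjson, every line is a complete JSON object that can be parsed on its own. As a minimal sketch (the helper countRecordsByStream is hypothetical and not part of gulp-etl), the following reads the sample stream above and tallies RECORD messages per stream:

```javascript
// Hypothetical helper (not part of gulp-etl): count RECORD messages per stream
// in an ndjson Message Stream held in a string.
function countRecordsByStream(ndjson) {
  const counts = {};
  for (const line of ndjson.split('\n')) {
    if (line.trim() === '') continue; // skip blank lines, e.g. a trailing newline
    const msg = JSON.parse(line);     // each line is one Singer message
    if (msg.type === 'RECORD') {
      counts[msg.stream] = (counts[msg.stream] || 0) + 1;
    }
  }
  return counts;
}

// The sample Message Stream from above, one message per line.
const sample = [
  '{"type": "SCHEMA", "stream": "users", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}',
  '{"type": "RECORD", "stream": "users", "record": {"id": 1, "name": "Chris"}}',
  '{"type": "RECORD", "stream": "users", "record": {"id": 2, "name": "Mike"}}',
  '{"type": "SCHEMA", "stream": "locations", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}',
  '{"type": "RECORD", "stream": "locations", "record": {"id": 1, "name": "Philadelphia"}}',
  '{"type": "STATE", "value": {"users": 2, "locations": 1}}',
].join('\n');

console.log(countRecordsByStream(sample)); // { users: 2, locations: 1 }
```

SCHEMA and STATE lines are parsed too but are simply skipped by this counter.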

Usage

gulp-etl plugins accept a configObj as their first parameter. The configObj contains any information the plugin needs.

In addition, this plugin accepts a TransformCallback function. That function receives a Singer message object (a RECORD, SCHEMA or STATE) and is expected to return one of the following: the message object (transformed or unchanged) to be passed downstream, an array of Singer messages, or null to remove the message from the stream.
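To make the return-value contract concrete, here is a sketch of how a caller could interpret what a TransformCallback returns. The applyTransform helper is hypothetical and is not the plugin's actual implementation; it only assumes the three documented cases (a message object, an array of messages, or null).

```javascript
// Hypothetical sketch: one way to interpret a TransformCallback's return value.
// This is not the gulp-etl-handlelines source.
function applyTransform(messages, transformCallback, context) {
  const output = [];
  for (const msg of messages) {
    const result = transformCallback(msg, context);
    if (result === null) continue;                      // null: remove the message
    if (Array.isArray(result)) output.push(...result);  // array: emit each message
    else output.push(result);                           // object: pass downstream
  }
  return output;
}

// Example callback: drop incoming STATE messages, follow every RECORD with a
// STATE bookmark holding its id, and pass SCHEMA messages through unchanged.
const transform = (msg) => {
  if (msg.type === 'STATE') return null;
  if (msg.type === 'RECORD') {
    return [msg, { type: 'STATE', value: { lastId: msg.record.id } }];
  }
  return msg;
};

const input = [
  { type: 'SCHEMA', stream: 'users', key_properties: ['id'], schema: { type: 'object' } },
  { type: 'RECORD', stream: 'users', record: { id: 1, name: 'Chris' } },
  { type: 'RECORD', stream: 'users', record: { id: 2, name: 'Mike' } },
  { type: 'STATE', value: { users: 2 } },
];

const output = applyTransform(input, transform, {});
console.log(output.map((m) => m.type).join(', '));
// SCHEMA, RECORD, STATE, RECORD, STATE
```

Returning an array is how one input line can fan out into several output lines; returning null is how a filter is written.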

This plugin also accepts a StartCallback and a FinishCallback, which are executed before and after the TransformCallback, respectively. The FinishCallback can be used to process data collected from the stream.

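The callbacks are passed the following objects (the TransformCallback receives the Singer message object first, followed by the context object):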

  • a context object, which is created per-file and allows the callbacks to persist their data across calls. It follows the API suggested by gulp-data and can be set or retrieved by other plugins as file.data.config
  • the gulp file object itself
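The per-file context is what lets the three callbacks cooperate, for example to build a counter. The sketch below uses a hypothetical runFile driver that imitates the documented calling pattern (a fresh context per file, StartCallback first, FinishCallback last); plain objects with a basename property stand in for real gulp file objects.

```javascript
// Hypothetical driver (not the plugin's source) imitating the documented pattern:
// one fresh context object per file, shared by all three callbacks for that file.
function runFile(file, messages, { startCallback, transformCallback, finishCallback }) {
  const context = {};                                 // created per file
  if (startCallback) startCallback(context, file);
  const output = [];
  for (const msg of messages) {
    const result = transformCallback(msg, context);
    if (result === null) continue;                    // null removes the message
    output.push(...(Array.isArray(result) ? result : [result]));
  }
  if (finishCallback) finishCallback(context, file);
  return output;
}

const summaries = [];
const callbacks = {
  // initialize per-file state
  startCallback: (context) => { context.recordCount = 0; },
  // update state on every message; pass the message through unchanged
  transformCallback: (msg, context) => {
    if (msg.type === 'RECORD') context.recordCount++;
    return msg;
  },
  // read the accumulated state once the file is done
  finishCallback: (context, file) => {
    summaries.push(`${file.basename} records=${context.recordCount}`);
  },
};

runFile({ basename: 'users.ndjson' }, [
  { type: 'RECORD', stream: 'users', record: { id: 1 } },
  { type: 'RECORD', stream: 'users', record: { id: 2 } },
], callbacks);
runFile({ basename: 'locations.ndjson' }, [
  { type: 'RECORD', stream: 'locations', record: { id: 1 } },
], callbacks);

console.log(summaries.join('; ')); // users.ndjson records=2; locations.ndjson records=1
```

Because each file gets its own context, the second file's count starts from zero rather than continuing from the first.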

Pass the callbacks as the second parameter, in the form:

{
    transformCallback: transformLineHandler,
    finishCallback: defaultFinishHandler,
    startCallback: defaultStartHandler
}

Sample gulpfile.js

const { src, dest } = require('gulp')
const handlelines = require('gulp-etl-handlelines').handlelines
// for TypeScript use these lines instead:
// import { src, dest } from 'gulp'
// import { handlelines } from 'gulp-etl-handlelines'

// runs after the last line of a file has been handled
const defaultFinishHandler = (context, file) => {
    console.log("The handler for " + file.basename + " has officially ended!");
}
// runs before the first line of a file is handled
const defaultStartHandler = (context, file) => {
    console.log("The handler for " + file.basename + " has officially started!");
}

const linehandler = (lineObj, context) => {
    // add a lineNum property to each line to demonstrate how the context object tracks state per file
    if (!context.lineNum) context.lineNum = 1
    else context.lineNum++
    lineObj.lineNum = context.lineNum;

    // return null to remove this line; here that drops every message without a
    // record (SCHEMA and STATE lines) as well as records whose TestValue is 'illegalValue'
    if (!lineObj.record || lineObj.record["TestValue"] == 'illegalValue') {return null}

    // optionally make changes to lineObj
    lineObj.record["NewProperty"] = "asdf"

    // return the changed lineObj
    return lineObj
}

exports.default = function() {
    return src('data/*.ndjson')
    // pipe the files through our handlelines plugin
    .pipe(handlelines({}, { transformCallback: linehandler,
        finishCallback: defaultFinishHandler,
        startCallback: defaultStartHandler }
    ))
    .pipe(dest('output/'));
}

Model Plugin

This plugin is intended to be a model gulp-etl plugin, usable as a template to be forked to create new plugins for other uses. It is compliant with best practices for gulp plugins, and it properly handles both buffers and streams.
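Handling streams properly mostly means coping with chunk boundaries: a chunk of a streamed file can end in the middle of an ndjson line. As an illustration only (createLineHandlerStream is hypothetical and not taken from this repo's source), a Node.js Transform stream can hold back the trailing partial line until the next chunk arrives:

```javascript
const { Transform, Readable } = require('stream');
const { StringDecoder } = require('string_decoder');

// Hypothetical sketch, not this plugin's source: re-split incoming text chunks
// into whole ndjson lines, run each parsed message through a TransformCallback,
// and write the surviving messages back out as ndjson.
function createLineHandlerStream(transformCallback, context = {}) {
  const decoder = new StringDecoder('utf8'); // keeps multi-byte characters intact across chunks
  let pending = '';                          // text after the last newline seen so far

  // Parse one complete line and push whatever the callback returns.
  const handleLine = (stream, line) => {
    if (line.trim() === '') return;          // ignore blank lines
    const result = transformCallback(JSON.parse(line), context);
    if (result === null) return;             // null removes the message
    for (const msg of Array.isArray(result) ? result : [result]) {
      stream.push(JSON.stringify(msg) + '\n');
    }
  };

  return new Transform({
    transform(chunk, encoding, callback) {
      pending += decoder.write(chunk);
      const lines = pending.split('\n');
      pending = lines.pop();                 // last piece may be an incomplete line
      try {
        for (const line of lines) handleLine(this, line);
        callback();
      } catch (err) {
        callback(err);                       // e.g. invalid JSON on a line
      }
    },
    flush(callback) {
      pending += decoder.end();
      try {
        handleLine(this, pending);           // final line may lack a trailing newline
        callback();
      } catch (err) {
        callback(err);
      }
    },
  });
}

// Usage: the second record is split across two chunks and is still parsed
// correctly; the callback then drops it by returning null, so only the id 1
// record is printed.
const collected = [];
Readable.from([
  '{"type":"RECORD","stream":"users","record":{"id":1}}\n{"type":"REC',
  'ORD","stream":"users","record":{"id":2}}\n',
])
  .pipe(createLineHandlerStream((msg) => (msg.record.id === 2 ? null : msg)))
  .on('data', (chunk) => collected.push(chunk.toString()))
  .on('end', () => console.log(collected.join('')));
```

Buffer mode is the simpler case: the whole file is available at once, so it can be split on newlines directly.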

Quick Start

  • Dependencies:
    • git
    • nodejs - At least v6.3 (6.9 for Windows) required for TypeScript debugging
    • npm (installs with Node)
    • typescript - installed as a development dependency
  • Clone this repo and run npm install to install npm packages
  • Debug: with VS Code, use Open Folder to open the project folder, then hit F5 to debug. This uses ts-node to run the TypeScript directly, without compiling to JavaScript
  • Test: npm test or npm t
  • Compile to JavaScript: npm run build

Testing

We use Jest for testing. All of our tests are in the test folder.

  • Run npm test to run the test suites. Note: the tests are currently broken.

Note: This document is written in Markdown. We like to use Typora and Markdown Preview Plus for our Markdown work.
