Merge pull request #48 from ZJONSSON/postgres
Postgres
ZJONSSON committed Jan 7, 2017
2 parents 43ea14d + f8d6535 commit e59107d
Showing 9 changed files with 371 additions and 5 deletions.
50 changes: 48 additions & 2 deletions README.md
@@ -29,8 +29,9 @@ fs.createReadStream('scores.csv')
* [Parsers](#parsers)
* [Transforms](#transforms)
* [Database upload](#databases)
* [Mongodb](#mongo)
* [Mysql](#mysql)
* [Postgres](#postgres)
* [Elasticsearch](#elasticsearch)
* [Utilities](#utilities)
@@ -259,10 +260,17 @@ etl.file('test.csv')
Pipeline that scripts incoming packets into bulk sql commands (`etl.mysql.script`) and executes them (`etl.mysql.execute`) using the supplied mysql pool. When the size of each SQL command reaches `maxBuffer` (1mb by default), the command is sent to the server. Concurrency is managed automatically by the mysql poolSize.
Example:
```js
etl.file('test.csv')
.pipe(etl.csv())
.pipe(etl.mysql.upsert(pool,'testschema','testtable',{concurrency: 4}))
```
<a name="mysqlscript" href="#mysqlscript">#</a> etl.mysql.<b>script</b>(<i>pool</i>, <i>schema</i>, <i>table</i> [,<i>options</i>])
Collects data and builds up a mysql statement to insert/update data until the buffer exceeds `maxBuffer` (customizable in options). When the maxBuffer is reached, a full sql statement is pushed downstream. When the input stream has ended, any remaining sql statement buffer is flushed as well.
The script stream first establishes the column names of the table being updated, and as data comes in, it uses only the properties that match column names in the table.
@@ -282,6 +290,44 @@ etl.file('test.csv')
.pipe(etl.mysql.execute(pool,4))
```
#### Postgres
<a name="postgresupsert" href="#postgresupsert">#</a> etl.postgres.<b>upsert</b>(<i>pool</i>, <i>schema</i>, <i>table</i> [,<i>options</i>])
Pipeline that scripts incoming packets into bulk sql commands (`etl.postgres.script`) and executes them (`etl.postgres.execute`) using the supplied postgres pool. When the size of each SQL command reaches `maxBuffer` (1mb by default), the command is sent to the server. Concurrency is managed automatically by the postgres poolSize. If a primary key is defined and an incoming data packet contains a primary key that already exists in the table, the record is updated; otherwise the packet is inserted.
Example:
```js
etl.file('test.csv')
.pipe(etl.csv())
.pipe(etl.postgres.upsert(pool,'testschema','testtable',{concurrency: 4}))
```
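
The examples assume an existing postgres `pool`. A minimal sketch of constructing one with the `pg` ^6 driver added in this commit (connection settings are illustrative):

```js
var pg = require('pg');

// Illustrative connection settings; substitute your own. Note that the script
// stage matches the schema argument against TABLE_CATALOG, i.e. the database name.
var pool = new pg.Pool({
  host: 'localhost',
  database: 'testschema',
  user: 'postgres',
  password: 'secret',
  max: 4   // pool size, which caps the effective concurrency
});
```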
<a name="postgresscript" href="#postgresscript">#</a> etl.postgres.<b>script</b>(<i>pool</i>, <i>schema</i>, <i>table</i> [,<i>options</i>])
Collects data and builds up a postgres statement to insert/update data until the buffer exceeds `maxBuffer` (customizable in options). When the maxBuffer is reached, a full sql statement is pushed downstream. When the input stream has ended, any remaining sql statement buffer is flushed as well.
The script stream first establishes the column names of the table being updated, and as data comes in, it uses only the properties that match column names in the table.
<a name="postgresexecute" href="#postgresexecute">#</a> etl.postgres.<b>execute</b>(<i>pool</i> [,<i>options</i>])
This component executes any incoming packets as sql statements using connections from the connection pool. The maximum concurrency is determined automatically by the postgres poolSize, using a combination of callbacks and Promises.
Example:
```js
// The following batches csv data into bulk sql statements and executes them
// with a maximum of 4 concurrent connections

etl.file('test.csv')
.pipe(etl.csv())
.pipe(etl.postgres.script(pool,'testschema','testtable'))
.pipe(etl.postgres.execute(pool,4))
```
#### Elasticsearch
<a name="elasticbulk" href="#elasticbulk">#</a> etl.elastic.<b>bulk</b>(<i>action</i>, <i>client</i>, [,<i>index</i>] [,<i>type</i>] [,<i>options</i>])
3 changes: 2 additions & 1 deletion lib/index.js
@@ -14,6 +14,7 @@ module.exports = {
inspect : require('./inspect'),
mongo : require('./mongo'),
mysql : require('./mysql'),
postgres : require('./postgres'),
elastic : require('./elasticsearch'),
cluster : require('./cluster'),
chain : require('./chain'),
@@ -22,4 +23,4 @@
map : streamz,
keepOpen: require('./keepOpen'),
streamz : streamz
};
25 changes: 25 additions & 0 deletions lib/postgres/execute.js
@@ -0,0 +1,25 @@
var Postgres = require('./postgres'),
util = require('util');

function Execute(pool,options) {
if (!(this instanceof Execute))
return new Execute(pool,options);

options = options || {};
Postgres.call(this,pool,options);

}

util.inherits(Execute,Postgres);

Execute.prototype._fn = function(d,cb) {
var self = this;
// TODO make transaction or use {maxBuffer:1} in options
return this.query(d,cb)
.then(function(d) {
return self.options.pushResult && d;
});
};

module.exports = Execute;
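
Downstream data is only produced when `options.pushResult` is set; otherwise results are swallowed. A sketch (not part of the commit) of capturing query results:

```js
// Hypothetical usage: push each statement's result downstream for inspection.
etl.file('test.csv')
  .pipe(etl.csv())
  .pipe(etl.postgres.script(pool, 'testschema', 'testtable'))
  .pipe(etl.postgres.execute(pool, { pushResult: true }))
  .pipe(etl.inspect());
```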
6 changes: 6 additions & 0 deletions lib/postgres/index.js
@@ -0,0 +1,6 @@
module.exports = {
postgres : require('./postgres'),
script : require('./script'),
execute : require('./execute'),
upsert : require('./upsert')
};
65 changes: 65 additions & 0 deletions lib/postgres/postgres.js
@@ -0,0 +1,65 @@
var Streamz = require('streamz'),
util = require('util'),
Promise = require('bluebird');

function Postgres(pool,options) {
if (!(this instanceof Postgres))
return new Postgres(pool,options);

if (!pool)
throw new Error('POOL_MISSING');

Streamz.call(this,options);
this.pool = pool;
this.options = options || {};
}

util.inherits(Postgres,Streamz);

Postgres.prototype.getConnection = function() {
// Needs a refactor
return Promise.fromNode(this.pool.connect.bind(this.pool))
.disposer(function(connection) {
connection.release();
});
};

Postgres.prototype.query = function(query,cb) {
return Promise.using(this.getConnection(),function(connection) {
// Trigger callback when we get a connection, not when we (later) get results
// allowing overall concurrency to be controlled by the Postgres pool
if (typeof cb === 'function') cb();
return Promise.fromNode(function(callback) {
connection.query(query,callback);
});
});
};

Postgres.prototype.stream = function(query,cb) {
var passThrough = Streamz();

if (!query || query.state !== 'initialized') {
passThrough.emit('error',new Error('Query should be QueryStream'));
return passThrough;
}

Promise.using(this.getConnection(),function(connection) {
// Trigger callback when we get a connection, not when we (later) get results
// allowing overall concurrency to be controlled by the Postgres pool
if (typeof cb === 'function') cb();
return new Promise(function(resolve,reject) {
connection.query(query)
.on('end',resolve)
.on('error',reject)
.pipe(passThrough);
});
})
.catch(function(e) {
passThrough.emit('error',e);
});

return passThrough;

};

module.exports = Postgres;
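
`stream` expects a `QueryStream` from the `pg-query-stream` package listed in this commit's devDependencies. A sketch, assuming a `testtable` to read from:

```js
var QueryStream = require('pg-query-stream');

// Sketch: read rows back out through the pool. A fresh QueryStream starts in
// the 'initialized' state that stream() checks for above.
etl.postgres.postgres(pool)
  .stream(new QueryStream('SELECT * FROM testtable'))
  .pipe(etl.inspect());
```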
144 changes: 144 additions & 0 deletions lib/postgres/script.js
@@ -0,0 +1,144 @@
var Postgres = require('./postgres'),
util = require('util'),
Promise = require('bluebird');

function Script(pool, schema, table, options) {
if (!(this instanceof Script))
return new Script(pool, schema, table, options);

Postgres.call(this, pool, options);

this.schema = schema;
this.table = table;
this.columns = this.getColumns();
this.pkeys = this.getPrimaryKeys();
this.prefix = this.options.prefix || 'INSERT INTO';
}

util.inherits(Script, Postgres);

Script.prototype.getColumns = function () {
var sql = "SELECT column_name FROM INFORMATION_SCHEMA.COLUMNS WHERE " +
"TABLE_CATALOG='" + this.schema + "' AND TABLE_NAME='" + this.table + "';";
return this.query(sql)
.then(function (d) {
d = d.rows;
if (!d.length)
throw new Error('TABLE_NOT_FOUND');
return d.map(function (d) {
return d.column_name;
});
});
};

Script.prototype.getPrimaryKeys = function () {
var sql = "SELECT a.attname"
+ " FROM pg_index i"
+ " JOIN pg_attribute a ON a.attrelid = i.indrelid"
+ " AND a.attnum = ANY(i.indkey)"
+ " WHERE i.indrelid = '" + this.table + "'::regclass"
+ " AND i.indisprimary;";
return this.query(sql)
.then(function (d) {
d = d.rows;
return d.map(function (d) {
return d.attname;
});
});
};

Script.prototype.buffer = undefined;

Script.prototype._push = function () {
if (this.buffer) this.push(this.buffer);
this.buffer = undefined;
};

Script.prototype._fn = function (record) {
var self = this;
return Promise.all([this.columns, this.pkeys]).then(function (data) {
var columns = data[0];
var pkeys = data[1];
var d = (Array.isArray(record)) ? record[0] : record;
if (typeof d === "undefined") { return; }

if (!self.buffer)
self.buffer = self.prefix + " " + self.table +
" (" + columns.join(',') + ") VALUES ";

self.buffer += '(' + columns.map(function (key) {
var value = d[key];
if (typeof value === "undefined")
return "DEFAULT";
else if (value === null)
return "null";
else if (typeof value === "object")
return escapeLiteral(JSON.stringify(value));
else
return escapeLiteral(value);
})
.join(',') + ')';


var tmp_arr = [];
for (var i = 0, l = columns.length; i < l; i++) {
var value = d[columns[i]];
if (typeof value === "undefined") {
continue;
}
var sql = columns[i] + " = ";
if (value === null)
sql += "null";
else if (typeof value === "object")
sql += escapeLiteral(JSON.stringify(value));
else
sql += escapeLiteral(value);

tmp_arr.push(sql);
}
if (tmp_arr.length && pkeys.length) {
self.buffer += " ON CONFLICT (" + pkeys.join(', ') + ") DO UPDATE SET "+tmp_arr.join(', ');
}
self.buffer += ";";

self._push();

});
};

Script.prototype._flush = function (cb) {
var self = this;
return Postgres.prototype._flush.call(this, function () {
self._push();
setImmediate(cb);
});
};

// https://github.com/brianc/node-postgres/blob/83a946f61cb9e74c7f499e44d03a268c399bd623/lib/client.js
function escapeLiteral(str) {
var hasBackslash = false;
var escaped = '\'';

if (typeof str !== "string") { return str; }
for (var i = 0; i < str.length; i++) {
var c = str[i];
if (c === '\'') {
escaped += c + c;
} else if (c === '\\') {
escaped += c + c;
hasBackslash = true;
} else {
escaped += c;
}
}

escaped += '\'';

if (hasBackslash === true) {
escaped = ' E' + escaped;
}

return escaped;
}

module.exports = Script;
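
For illustration (not part of the commit): given a table with columns `id,name` and primary key `id`, a record such as `{id: 1, name: "O'Brien"}` would be scripted roughly as:

```js
// Numbers pass through escapeLiteral untouched; strings get quote-doubling.
var sql = "INSERT INTO testtable (id,name) VALUES (1,'O''Brien') " +
  "ON CONFLICT (id) DO UPDATE SET id = 1, name = 'O''Brien';";
```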
11 changes: 11 additions & 0 deletions lib/postgres/upsert.js
@@ -0,0 +1,11 @@
var chain = require('../chain'),
script = require('./script'),
execute = require('./execute');

module.exports = function upsert(pool,schema,table,options) {
return chain(function(incoming) {
return incoming
.pipe(script(pool,schema,table,options))
.pipe(execute(pool,options));
});
};
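
`chain` wraps the two-step pipeline into a single duplex stream, so the README's upsert example is equivalent to wiring the stages manually (a sketch):

```js
// Equivalent manual wiring of etl.postgres.upsert(pool,'testschema','testtable'):
etl.file('test.csv')
  .pipe(etl.csv())
  .pipe(etl.postgres.script(pool, 'testschema', 'testtable'))
  .pipe(etl.postgres.execute(pool));
```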
6 changes: 4 additions & 2 deletions package.json
@@ -1,6 +1,6 @@
{
"name": "etl",
"version": "0.3.32",
"version": "0.3.33",
"description": "Collection of stream-based components that form an ETL pipeline",
"main": "index.js",
"author": "Ziggy Jonsson (http://github.com/zjonsson/)",
@@ -22,6 +22,8 @@
"devDependencies": {
"mongodb": "~2.1.6",
"elasticsearch": "~8.2.0",
"mysql": "~2.9.0"
"mysql": "~2.9.0",
"pg": "^6.1.2",
"pg-query-stream": "^1.0.0"
}
}