Skip to content

Commit

Permalink
feat(SQLite): Add support for SQLite
Browse files Browse the repository at this point in the history
  • Loading branch information
evansiroky committed Nov 21, 2016
1 parent 804a380 commit a733f0e
Show file tree
Hide file tree
Showing 9 changed files with 276 additions and 153 deletions.
21 changes: 6 additions & 15 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ cache:

node_js:
- '6'
- '5'
- '4'

env:
- COVERAGE=false
Expand All @@ -17,7 +15,6 @@ services:
- postgresql

before_install:
- npm i -g npm@^2.0.0
- npm prune

before_script:
Expand All @@ -30,20 +27,14 @@ before_script:
- psql -U postgres -c 'GRANT streamer_role TO streamer;'
- psql -U postgres -c 'GRANT ALL ON DATABASE streamer_test TO streamer_role;'

matrix:
fast_finish: true
include:
- node_js: "0.12"
env: COVERAGE=true
script: "npm run codeclimate"
allow_failures:
- node_js: "0.12"
env: COVERAGE=true
script: "npm run codeclimate"

install:
- npm i
script:
- npm run codeclimate

after_success:
- npm run semantic-release

branches:
except:
- /^v\d+\.\d+\.\d+$/
- /^v\d+\.\d+\.\d+$/
20 changes: 15 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[![npm version](https://badge.fury.io/js/db-streamer.svg)](http://badge.fury.io/js/db-streamer) [![Build Status](https://travis-ci.org/evansiroky/db-streamer.svg?branch=master)](https://travis-ci.org/evansiroky/db-streamer) [![Dependency Status](https://david-dm.org/evansiroky/db-streamer.svg)](https://david-dm.org/evansiroky/db-streamer) [![Test Coverage](https://codeclimate.com/github/evansiroky/db-streamer/badges/coverage.svg)](https://codeclimate.com/github/evansiroky/db-streamer/coverage)

A library to stream data into a SQL database. Currently supports streaming data into PostgreSQL or MySQL tables.
A library to stream data into a SQL database. Currently supports streaming data into PostgreSQL, MySQL or SQLite.

## Additional Dependencies

Expand All @@ -23,11 +23,20 @@ You must also install the package `promise-polyfill` and write additional code.
npm install mysql --save
npm install streamsql --save

### SQLite

npm install sqlite3 --save
npm install streamsql --save

#### Deferred inserting w/ SQLite

For now, deferred inserting with SQLite assumes that a unix shell is available to pipe commands to the sqlite3 binary tool.

## Usage

var dbStreamer = require('db-streamer'),
connString = 'postgres://streamer:streamer@localhost:5432/streamer-test';

// create inserter
var inserter = dbStreamer.getInserter({
dbConnString: connString,
Expand Down Expand Up @@ -66,21 +75,22 @@ You must also install the package `promise-polyfill` and write additional code.
inserter.end();

});

### Inserter Config

| Key | Description |
| --- | --- |
| dbConnString | A database connection string. |
| tableName | The tablename to insert into. |
| columns | Array of column names. |
| primaryKey | Required if using MySQL. String of the primary key (defaults to `id` if omitted). |
| primaryKey | Required if using MySQL or SQLite. String of the primary key (defaults to `id` if omitted). |
| deferUntilEnd | Boolean (default=false). Stream output to temporary file which is then streamed in all at once into table upon calling `end`. |
| sqliteStorage | Required if using SQLite. String of the filename to load data to. Unfortunately, will not work with `:memory:` (well it will, but all data will be lost after disconnecting, so it's kind of pointless). |

### Inserter Config (Sequelize Bulk Insert alternative)

| Key | Description |
| --- | --- |
| useSequelizeBulkInsert | Boolean. Perform the insert using a combination of [async.cargo](https://github.com/caolan/async#cargo) and [sequelize bulkInsert](http://docs.sequelizejs.com/en/latest/api/model/#bulkcreaterecords-options-promisearrayinstance). Must provide `sequelizeModel` parameter too. |
| sequelizeModel | The sequelize model to perform a bulk insert with. |
| deferUntilEnd | Boolean (default=false). Pause all cargo iterations until calling `end`. |
| deferUntilEnd | Boolean (default=false). Pause all cargo iterations until calling `end`. |
38 changes: 20 additions & 18 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,26 @@ streamer.getInserter = function(config) {
return require('./lib/inserters/sequelizeBulkInserter.js')(config);
} else if(config.dbConnString) {
parsed = parse(config.dbConnString);
}
switch(parsed.protocol) {
case 'postgres:':
return require('./lib/inserters/pgInserter.js')(config);
break;
case 'mysql:':
if(parsed) {
config.dbname = parsed.pathname.substr(1);
config.username = parsed.username;
config.password = parsed.password;
config.hostname = parsed.hostname;
config.port = parseInt(parsed.port, 10);
}
return require('./lib/inserters/mySqlInserter.js')(config);
default:
return require('./lib/inserters/sequelizeBulkInserter.js')(config);
break;
switch(parsed.protocol) {
case 'postgres:':
return require('./lib/inserters/pgInserter.js')(config);
break;
case 'mysql:':
if(parsed) {
config.dbname = parsed.pathname.substr(1);
config.username = parsed.username;
config.password = parsed.password;
config.hostname = parsed.hostname;
config.port = parseInt(parsed.port, 10);
}
return require('./lib/inserters/mySqlInserter.js')(config);
default:
return require('./lib/inserters/sequelizeBulkInserter.js')(config);
break;
}
} else if(config.sqliteStorage) {
return require('./lib/inserters/sqliteInserter.js')(config)
}
};

module.exports = streamer;
module.exports = streamer;
107 changes: 9 additions & 98 deletions lib/inserters/mySqlInserter.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ var fs = require('fs'),
path = require('path');

var async = require('async'),
streamsql = require('streamsql'),
mysql = require('mysql');

var RowInserter = require('./rowInserter.js'),
var StreamSqlInserter = require('./streamSqlInserter.js'),
util = require('../util.js');

var MySqlInserter = function(config) {

this.dbname = config.dbname;
this.driver = 'mysql';
this.username = config.username;
this.password = config.password;
this.tableName = config.tableName;
Expand All @@ -26,88 +26,18 @@ var MySqlInserter = function(config) {

};

MySqlInserter.prototype = new RowInserter();

MySqlInserter.prototype._connect = function(callback) {

var connectCfg = this.createConnectConfig();
connectCfg.driver = 'mysql';
this.db = streamsql.connect(connectCfg, callback);
};
MySqlInserter.prototype = new StreamSqlInserter();

MySqlInserter.prototype.createConnectConfig = function() {
return {
database: this.dbname,
host: this.hostname,
port: this.port,
user: this.username,
password: this.password,
database: this.dbname
port: this.port,
user: this.username
};
};

MySqlInserter.prototype.connect = function(callback) {

if(!this.defer) {
var self = this;
this._connect(function(err) {
if(!err) {
self.setModel();
}
callback(err);
});
} else {
callback();
}
};

MySqlInserter.prototype.getStreamSqlTableWriteStream = function() {
var table = this.db.table(this.tableName, {
fields: this.columns,
primaryKey: this.primaryKey
});
var ws = table.createWriteStream();
return table.createWriteStream();
};

MySqlInserter.prototype.setModel = function(newTable, newColumns) {

this.tableName = newTable ? newTable : this.tableName;
this.columns = newColumns ? newColumns : this.columns;

if(this.defer) {
// write to file instead
this.deferred = util.createDefered();
this.dataStream = this.deferred.dataStream;
} else {
this.dataStream = this.getStreamSqlTableWriteStream();
}

};

MySqlInserter.prototype.push = function(row) {

if(this.defer) {
this.dataStream.write(util.makeBufferText(row, this.columns, 'mysql'));
} else {
var filteredRow = {};
for (var i = 0; i < this.columns.length; i++) {
var k = this.columns[i];

switch(k) {
case 'createdAt':
case 'updatedAt':
filteredRow[k] = row[k] ? row[k] : new Date();
break;
default:
filteredRow[k] = row[k];
break;
}
}
this.dataStream.write(filteredRow);
}

};

MySqlInserter.prototype.end = function() {

this.dataStream.end();
Expand All @@ -130,7 +60,7 @@ MySqlInserter.prototype.end = function() {
conn.connect(cb);
},
load: ['connect', function(results, cb) {
conn.query(loadText,
conn.query(loadText,
[path.join(process.cwd(), self.deferred.tempDeferredFilename)],
cb);
}],
Expand All @@ -142,25 +72,6 @@ MySqlInserter.prototype.end = function() {

}

MySqlInserter.prototype.setEndHandler = function(fn) {
if(this.defer) {
this.endHandler = fn;
} else {
var self = this,
errors = [];

this.dataStream.on('error', function(err) {
errors.push(err);
});

this.dataStream.on('close', function(err) {
self.db.close(function(closeErr) {
fn(errors.length > 0 ? errors : closeErr);
});
});
}
};

module.exports = function(config) {
return new MySqlInserter(config);
}
}
64 changes: 64 additions & 0 deletions lib/inserters/sqliteInserter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
var exec = require('child_process').exec,
fs = require('fs'),
path = require('path');

var async = require('async'),
sqlite3 = require('sqlite3');

var StreamSqlInserter = require('./streamSqlInserter.js'),
util = require('../util.js');

var SqliteInserter = function(config) {
this.delimiter = '|';
this.driver = 'sqlite3';
this.tableName = config.tableName;
this.columns = config.columns;
this.defer = config.deferUntilEnd;
this.storage = config.sqliteStorage;

if(this.defer) {
this.setModel();
}

};

SqliteInserter.prototype = new StreamSqlInserter();

SqliteInserter.prototype.createConnectConfig = function() {
return {
filename: this.storage
};
};

SqliteInserter.prototype.end = function() {

this.dataStream.end();

if(this.defer) {
var self = this;

var runCommandInBinaryTool = function(cmd, cb) {
exec('echo "' + cmd + '" | sqlite3 ' + self.storage, cb)
}

async.auto({
setDelimiter: function(cb) {
runCommandInBinaryTool('.separator |', cb)
},
load: ['setDelimiter', function(results, cb) {
var cmd = '.import ' +
self.deferred.tempDeferredFilename +
' ' + self.tableName;
runCommandInBinaryTool(cmd, cb);
}],
deleteTempFile: ['load', function(results, cb) {
fs.unlink(self.deferred.tempDeferredFilename, cb);
}]
}, self.endHandler);
}

}

module.exports = function(config) {
return new SqliteInserter(config);
}
Loading

0 comments on commit a733f0e

Please sign in to comment.