From 48597231518fef4a60fe28c5ea1d21fecfb74295 Mon Sep 17 00:00:00 2001 From: evansiroky Date: Wed, 8 Jun 2016 10:19:05 -0700 Subject: [PATCH] fix(mysql): Remove commit statement in mysql inserter It was actually a bug with the node-mysql package which was fixed. See https://github.com/felixge/node-mysql/pull/1438 --- README.md | 168 ++++++++--------- lib/inserters/mySqlInserter.js | 333 ++++++++++++++++----------------- package.json | 132 ++++++------- 3 files changed, 313 insertions(+), 320 deletions(-) diff --git a/README.md b/README.md index dd9c449..d55e7b6 100644 --- a/README.md +++ b/README.md @@ -1,86 +1,82 @@ -# db-streamer - -[![npm version](https://badge.fury.io/js/db-streamer.svg)](http://badge.fury.io/js/db-streamer) [![Build Status](https://travis-ci.org/evansiroky/db-streamer.svg?branch=master)](https://travis-ci.org/evansiroky/db-streamer) [![Dependency Status](https://david-dm.org/evansiroky/db-streamer.svg)](https://david-dm.org/evansiroky/db-streamer) [![Test Coverage](https://codeclimate.com/github/evansiroky/db-streamer/badges/coverage.svg)](https://codeclimate.com/github/evansiroky/db-streamer/coverage) - -A library to stream data into a SQL database. Currently supports streaming data into PostgreSQL or MySQL tables. - -## Additional Dependencies - -In order to use this library, you must also install the additional libraries in your project depending on the database that you use. - -### PostgreSQL - - npm install pg --save - npm install pg-copy-streams --save - npm install pg-hstore --save - -### MySQL - - npm install mysql --save - npm install streamsql --save - -## Usage - - var dbStreamer = require('db-streamer'), - connString = 'postgres://streamer:streamer@localhost:5432/streamer-test'; - - // create inserter - var inserter = dbStreamer.getInserter({ - dbConnString: connString, - tableName: 'test_table', - columns: ['a', 'b', 'c'] - }); - - // establish connection - inserter.connect(function(err, client) { - - // push some rows - inserter.push({a: 1, b: 'one', c: new Date() }); - inserter.push({a: 2, b: 'two', c: new Date() }); - inserter.push({a: 3, b: 'three', c: new Date() }); - - // create child table inserter using deferring strategy - // this is useful to avoid missing foreign key conflicts as a result of race conditions - var childInserter = dbStreamer.getInserter({ - dbConnString: connString, - tableName: 'child_table', - columns: ['a', 'd', 'e'], - deferUntilEnd: true - }); - - childInserter.push({a: 2, d: 'asdf', e: new Date() }); - childInserter.push({a: 3, d: 'ghjk', e: new Date() }); - - childInserter.setEndHandler(callback); - - // set end callback - inserter.setEndHandler(function() { - childInserter.end(); - }); - - // announce end - inserter.end(); - - }); - -### Inserter Config - -| Key | Description | -| --- | --- | -| dbConnString | A database connection string. | -| tableName | The tablename to insert into. | -| columns | Array of column names. | -| primaryKey | Required if using MySQL. String of the primary key (defaults to `id` if omitted). | -| deferUntilEnd | Boolean (default=false). Stream output to temporary file which is then streamed in all at once into table upon calling `end`. | - -### Inserter Config (Sequelize Bulk Insert alternative) - -| Key | Description | -| --- | --- | -| useSequelizeBulkInsert | Boolean. Perform the insert using a combination of [async.cargo](https://github.com/caolan/async#cargo) and [sequelize bulkInsert](http://docs.sequelizejs.com/en/latest/api/model/#bulkcreaterecords-options-promisearrayinstance). Must provide `sequelizeModel` parameter too. | -| sequelizeModel | The sequelize model to perform a bulk insert with. | -| deferUntilEnd | Boolean (default=false). Pause all cargo iterations until calling `end`. | - -### A note on MySQL - -If using the deferred inserter with MySQL, the `commit` statement is called after the data is loaded into the database. +# db-streamer + +[![npm version](https://badge.fury.io/js/db-streamer.svg)](http://badge.fury.io/js/db-streamer) [![Build Status](https://travis-ci.org/evansiroky/db-streamer.svg?branch=master)](https://travis-ci.org/evansiroky/db-streamer) [![Dependency Status](https://david-dm.org/evansiroky/db-streamer.svg)](https://david-dm.org/evansiroky/db-streamer) [![Test Coverage](https://codeclimate.com/github/evansiroky/db-streamer/badges/coverage.svg)](https://codeclimate.com/github/evansiroky/db-streamer/coverage) + +A library to stream data into a SQL database. Currently supports streaming data into PostgreSQL or MySQL tables. + +## Additional Dependencies + +In order to use this library, you must also install the additional libraries in your project depending on the database that you use. + +### PostgreSQL + + npm install pg --save + npm install pg-copy-streams --save + npm install pg-hstore --save + +### MySQL + + npm install mysql --save + npm install streamsql --save + +## Usage + + var dbStreamer = require('db-streamer'), + connString = 'postgres://streamer:streamer@localhost:5432/streamer-test'; + + // create inserter + var inserter = dbStreamer.getInserter({ + dbConnString: connString, + tableName: 'test_table', + columns: ['a', 'b', 'c'] + }); + + // establish connection + inserter.connect(function(err, client) { + + // push some rows + inserter.push({a: 1, b: 'one', c: new Date() }); + inserter.push({a: 2, b: 'two', c: new Date() }); + inserter.push({a: 3, b: 'three', c: new Date() }); + + // create child table inserter using deferring strategy + // this is useful to avoid missing foreign key conflicts as a result of race conditions + var childInserter = dbStreamer.getInserter({ + dbConnString: connString, + tableName: 'child_table', + columns: ['a', 'd', 'e'], + deferUntilEnd: true + }); + + childInserter.push({a: 2, d: 'asdf', e: new Date() }); + childInserter.push({a: 3, d: 'ghjk', e: new Date() }); + + childInserter.setEndHandler(callback); + + // set end callback + inserter.setEndHandler(function() { + childInserter.end(); + }); + + // announce end + inserter.end(); + + }); + +### Inserter Config + +| Key | Description | +| --- | --- | +| dbConnString | A database connection string. | +| tableName | The tablename to insert into. | +| columns | Array of column names. | +| primaryKey | Required if using MySQL. String of the primary key (defaults to `id` if omitted). | +| deferUntilEnd | Boolean (default=false). Stream output to temporary file which is then streamed in all at once into table upon calling `end`. | + +### Inserter Config (Sequelize Bulk Insert alternative) + +| Key | Description | +| --- | --- | +| useSequelizeBulkInsert | Boolean. Perform the insert using a combination of [async.cargo](https://github.com/caolan/async#cargo) and [sequelize bulkInsert](http://docs.sequelizejs.com/en/latest/api/model/#bulkcreaterecords-options-promisearrayinstance). Must provide `sequelizeModel` parameter too. | +| sequelizeModel | The sequelize model to perform a bulk insert with. | +| deferUntilEnd | Boolean (default=false). Pause all cargo iterations until calling `end`. | \ No newline at end of file diff --git a/lib/inserters/mySqlInserter.js b/lib/inserters/mySqlInserter.js index eb7b77b..b51a563 100644 --- a/lib/inserters/mySqlInserter.js +++ b/lib/inserters/mySqlInserter.js @@ -1,169 +1,166 @@ -var fs = require('fs'), - path = require('path'); - -var async = require('async'), - streamsql = require('streamsql'), - mysql = require('mysql'); - -var RowInserter = require('./rowInserter.js'), - util = require('../util.js'); - -var MySqlInserter = function(config) { - - this.dbname = config.dbname; - this.username = config.username; - this.password = config.password; - this.tableName = config.tableName; - this.columns = config.columns; - this.hostname = config.hostname; - this.port = config.port; - this.defer = config.deferUntilEnd; - this.primaryKey = config.primaryKey; - - if(this.defer) { - this.setModel(); - } - -}; - -MySqlInserter.prototype = new RowInserter(); - -MySqlInserter.prototype._connect = function(callback) { - - var connectCfg = this.createConnectConfig(); - connectCfg.driver = 'mysql'; - this.db = streamsql.connect(connectCfg, callback); -}; - -MySqlInserter.prototype.createConnectConfig = function() { - return { - host: this.hostname, - port: this.port, - user: this.username, - password: this.password, - database: this.dbname - }; -}; - -MySqlInserter.prototype.connect = function(callback) { - - if(!this.defer) { - var self = this; - this._connect(function(err) { - if(!err) { - self.setModel(); - } - callback(err); - }); - } else { - callback(); - } -}; - -MySqlInserter.prototype.getStreamSqlTableWriteStream = function() { - var table = this.db.table(this.tableName, { - fields: this.columns, - primaryKey: this.primaryKey - }); - var ws = table.createWriteStream(); - return table.createWriteStream(); -}; - -MySqlInserter.prototype.setModel = function(newTable, newColumns) { - - this.tableName = newTable ? newTable : this.tableName; - this.columns = newColumns ? newColumns : this.columns; - - if(this.defer) { - // write to file instead - this.deferred = util.createDefered(); - this.dataStream = this.deferred.dataStream; - } else { - this.dataStream = this.getStreamSqlTableWriteStream(); - } - -}; - -MySqlInserter.prototype.push = function(row) { - - if(this.defer) { - this.dataStream.write(util.makeBufferText(row, this.columns, 'mysql')); - } else { - var filteredRow = {}; - for (var i = 0; i < this.columns.length; i++) { - var k = this.columns[i]; - - switch(k) { - case 'createdAt': - case 'updatedAt': - filteredRow[k] = row[k] ? row[k] : new Date(); - break; - default: - filteredRow[k] = row[k]; - break; - } - } - this.dataStream.write(filteredRow); - } - -}; - -MySqlInserter.prototype.end = function() { - - this.dataStream.end(); - - if(this.defer) { - var self = this, - conn = mysql.createConnection(this.createConnectConfig()), - loadText = 'LOAD DATA LOCAL INFILE ? INTO TABLE ' + this.tableName + ' ('; - - for (var i = 0; i < self.columns.length; i++) { - if(i > 0) { - loadText += ','; - } - loadText += self.columns[i]; - }; - loadText += ')'; - - async.auto({ - connect: function(cb) { - conn.connect(cb); - }, - load: ['connect', function(results, cb) { - conn.query(loadText, - [path.join(process.cwd(), self.deferred.tempDeferredFilename)], - cb); - }], - commit: ['load', function(results, cb) { - conn.commit(cb) - }], - deleteTempFile: ['load', function(results, cb) { - fs.unlink(self.deferred.tempDeferredFilename, cb); - }] - }, self.endHandler); - } - -} - -MySqlInserter.prototype.setEndHandler = function(fn) { - if(this.defer) { - this.endHandler = fn; - } else { - var self = this, - errors = []; - - this.dataStream.on('error', function(err) { - errors.push(err); - }); - - this.dataStream.on('close', function(err) { - self.db.close(function(closeErr) { - fn(errors.length > 0 ? errors : closeErr); - }); - }); - } -}; - -module.exports = function(config) { - return new MySqlInserter(config); +var fs = require('fs'), + path = require('path'); + +var async = require('async'), + streamsql = require('streamsql'), + mysql = require('mysql'); + +var RowInserter = require('./rowInserter.js'), + util = require('../util.js'); + +var MySqlInserter = function(config) { + + this.dbname = config.dbname; + this.username = config.username; + this.password = config.password; + this.tableName = config.tableName; + this.columns = config.columns; + this.hostname = config.hostname; + this.port = config.port; + this.defer = config.deferUntilEnd; + this.primaryKey = config.primaryKey; + + if(this.defer) { + this.setModel(); + } + +}; + +MySqlInserter.prototype = new RowInserter(); + +MySqlInserter.prototype._connect = function(callback) { + + var connectCfg = this.createConnectConfig(); + connectCfg.driver = 'mysql'; + this.db = streamsql.connect(connectCfg, callback); +}; + +MySqlInserter.prototype.createConnectConfig = function() { + return { + host: this.hostname, + port: this.port, + user: this.username, + password: this.password, + database: this.dbname + }; +}; + +MySqlInserter.prototype.connect = function(callback) { + + if(!this.defer) { + var self = this; + this._connect(function(err) { + if(!err) { + self.setModel(); + } + callback(err); + }); + } else { + callback(); + } +}; + +MySqlInserter.prototype.getStreamSqlTableWriteStream = function() { + var table = this.db.table(this.tableName, { + fields: this.columns, + primaryKey: this.primaryKey + }); + var ws = table.createWriteStream(); + return table.createWriteStream(); +}; + +MySqlInserter.prototype.setModel = function(newTable, newColumns) { + + this.tableName = newTable ? newTable : this.tableName; + this.columns = newColumns ? newColumns : this.columns; + + if(this.defer) { + // write to file instead + this.deferred = util.createDefered(); + this.dataStream = this.deferred.dataStream; + } else { + this.dataStream = this.getStreamSqlTableWriteStream(); + } + +}; + +MySqlInserter.prototype.push = function(row) { + + if(this.defer) { + this.dataStream.write(util.makeBufferText(row, this.columns, 'mysql')); + } else { + var filteredRow = {}; + for (var i = 0; i < this.columns.length; i++) { + var k = this.columns[i]; + + switch(k) { + case 'createdAt': + case 'updatedAt': + filteredRow[k] = row[k] ? row[k] : new Date(); + break; + default: + filteredRow[k] = row[k]; + break; + } + } + this.dataStream.write(filteredRow); + } + +}; + +MySqlInserter.prototype.end = function() { + + this.dataStream.end(); + + if(this.defer) { + var self = this, + conn = mysql.createConnection(this.createConnectConfig()), + loadText = 'LOAD DATA LOCAL INFILE ? INTO TABLE ' + this.tableName + ' ('; + + for (var i = 0; i < self.columns.length; i++) { + if(i > 0) { + loadText += ','; + } + loadText += self.columns[i]; + }; + loadText += ')'; + + async.auto({ + connect: function(cb) { + conn.connect(cb); + }, + load: ['connect', function(results, cb) { + conn.query(loadText, + [path.join(process.cwd(), self.deferred.tempDeferredFilename)], + cb); + }], + deleteTempFile: ['load', function(results, cb) { + fs.unlink(self.deferred.tempDeferredFilename, cb); + }] + }, self.endHandler); + } + +} + +MySqlInserter.prototype.setEndHandler = function(fn) { + if(this.defer) { + this.endHandler = fn; + } else { + var self = this, + errors = []; + + this.dataStream.on('error', function(err) { + errors.push(err); + }); + + this.dataStream.on('close', function(err) { + self.db.close(function(closeErr) { + fn(errors.length > 0 ? errors : closeErr); + }); + }); + } +}; + +module.exports = function(config) { + return new MySqlInserter(config); } \ No newline at end of file diff --git a/package.json b/package.json index a88149a..f7e53f0 100644 --- a/package.json +++ b/package.json @@ -1,66 +1,66 @@ -{ - "name": "db-streamer", - "version": "0.0.0-semantically-released", - "description": "A library to stream data into a SQL database.", - "main": "index.js", - "scripts": { - "test": "npm run test-mysql && npm run test-postgres", - "test-mysql": "cross-env DIALECT=mysql mocha tests/integration/load.test.js", - "test-postgres": "cross-env DIALECT=postgres mocha tests/integration/load.test.js", - "cover-all": "npm run cover-mysql && npm run cover-postgres && npm run merge-coverage", - "precover-mysql": "rimraf coverage && rimraf coverage-mysql", - "cover-mysql": "cross-env DIALECT=mysql ./node_modules/.bin/istanbul cover ./node_modules/mocha/bin/_mocha -- tests/integration/load.test.js", - "postcover-mysql": "cross-env DIALECT=mysql node scripts/renameCoverageFolder.js", - "precover-postgres": "rimraf coverage && rimraf coverage-postgres", - "cover-postgres": "cross-env DIALECT=postgres ./node_modules/.bin/istanbul cover ./node_modules/mocha/bin/_mocha -- tests/integration/load.test.js", - "postcover-postgres": "cross-env DIALECT=postgres node scripts/renameCoverageFolder.js", - "premerge-coverage": "node scripts/preMerge.js", - "merge-coverage": "./node_modules/.bin/lcov-result-merger \"coverage-*/lcov.info\" \"coverage/lcov.info\"", - "codeclimate-send": "cross-env CODECLIMATE_REPO_TOKEN=b02fd8c72b40135e6a9173ef6cb43672b1274a14375e7c4a83837bc13c4c2f43 ./node_modules/.bin/codeclimate-test-reporter < coverage/lcov.info", - "codeclimate": "npm run cover-all && npm run codeclimate-send", - "semantic-release": "semantic-release pre && npm publish && semantic-release post" - }, - "repository": { - "type": "git", - "url": "https://github.com/evansiroky/db-streamer.git" - }, - "keywords": [ - "database", - "db", - "stream", - "postgresql" - ], - "author": "Evan Siroky", - "license": "MIT", - "bugs": { - "url": "https://github.com/evansiroky/db-streamer/issues" - }, - "homepage": "https://github.com/evansiroky/db-streamer#readme", - "dependencies": { - "async": "^2.0.0-rc.5", - "moment": "^2.11.2", - "url-parse": "^1.0.2" - }, - "devDependencies": { - "chai": "^3.5.0", - "codeclimate-test-reporter": "^0.3.1", - "cross-env": "^1.0.7", - "cz-conventional-changelog": "^1.1.6", - "istanbul": "^0.4.2", - "lcov-result-merger": "^1.0.2", - "mocha": "^2.5.1", - "mysql": "^2.9.0", - "pg": "^5.0.0", - "pg-copy-streams": "^1.0.0", - "pg-hstore": "^2.3.2", - "rimraf": "^2.5.2", - "semantic-release": "^4.3.5", - "sequelize": "^3.9.0", - "streamsql": "^0.8.5" - }, - "config": { - "commitizen": { - "path": "./node_modules/cz-conventional-changelog" - } - } -} +{ + "name": "db-streamer", + "version": "0.0.0-semantically-released", + "description": "A library to stream data into a SQL database.", + "main": "index.js", + "scripts": { + "test": "npm run test-mysql && npm run test-postgres", + "test-mysql": "cross-env DIALECT=mysql mocha tests/integration/load.test.js", + "test-postgres": "cross-env DIALECT=postgres mocha tests/integration/load.test.js", + "cover-all": "npm run cover-mysql && npm run cover-postgres && npm run merge-coverage", + "precover-mysql": "rimraf coverage && rimraf coverage-mysql", + "cover-mysql": "cross-env DIALECT=mysql ./node_modules/.bin/istanbul cover ./node_modules/mocha/bin/_mocha -- tests/integration/load.test.js", + "postcover-mysql": "cross-env DIALECT=mysql node scripts/renameCoverageFolder.js", + "precover-postgres": "rimraf coverage && rimraf coverage-postgres", + "cover-postgres": "cross-env DIALECT=postgres ./node_modules/.bin/istanbul cover ./node_modules/mocha/bin/_mocha -- tests/integration/load.test.js", + "postcover-postgres": "cross-env DIALECT=postgres node scripts/renameCoverageFolder.js", + "premerge-coverage": "node scripts/preMerge.js", + "merge-coverage": "./node_modules/.bin/lcov-result-merger \"coverage-*/lcov.info\" \"coverage/lcov.info\"", + "codeclimate-send": "cross-env CODECLIMATE_REPO_TOKEN=b02fd8c72b40135e6a9173ef6cb43672b1274a14375e7c4a83837bc13c4c2f43 ./node_modules/.bin/codeclimate-test-reporter < coverage/lcov.info", + "codeclimate": "npm run cover-all && npm run codeclimate-send", + "semantic-release": "semantic-release pre && npm publish && semantic-release post" + }, + "repository": { + "type": "git", + "url": "https://github.com/evansiroky/db-streamer.git" + }, + "keywords": [ + "database", + "db", + "stream", + "postgresql" + ], + "author": "Evan Siroky", + "license": "MIT", + "bugs": { + "url": "https://github.com/evansiroky/db-streamer/issues" + }, + "homepage": "https://github.com/evansiroky/db-streamer#readme", + "dependencies": { + "async": "^2.0.0-rc.5", + "moment": "^2.11.2", + "url-parse": "^1.0.2" + }, + "devDependencies": { + "chai": "^3.5.0", + "codeclimate-test-reporter": "^0.3.1", + "cross-env": "^1.0.7", + "cz-conventional-changelog": "^1.1.6", + "istanbul": "^0.4.2", + "lcov-result-merger": "^1.0.2", + "mocha": "^2.5.1", + "mysql": "^2.11.1", + "pg": "^5.0.0", + "pg-copy-streams": "^1.0.0", + "pg-hstore": "^2.3.2", + "rimraf": "^2.5.2", + "semantic-release": "^4.3.5", + "sequelize": "^3.9.0", + "streamsql": "^0.8.5" + }, + "config": { + "commitizen": { + "path": "./node_modules/cz-conventional-changelog" + } + } +}