Skip to content

Commit

Permalink
Merge pull request #8 from maxnachlinger/deadlockRetry
Browse files Browse the repository at this point in the history
Added deadlock retry, bumped version
  • Loading branch information
maxnachlinger committed Dec 26, 2014
2 parents 3a98045 + bbb437c commit 0b49a2f
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 47 deletions.
29 changes: 29 additions & 0 deletions lib/deadlockRetry.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
var util = require('util');

module.exports = function (params, cb) {
var fn = params.fn;
var args = params.args;
var retryAmount = params.retryAmount || 2;

if (!fn || !cb)
return cb(new Error('Expected params not passed, param fn and a callback are required.'));

if (!args || !util.isArray(args))
args = [];

var fx = function () {
args.push(function retryCb(err, res) {
if (!err) return cb(err, res);

if (err.code && err.code === 'ER_LOCK_DEADLOCK' && --retryAmount > 0) {
console.log(err.code + ' returned, retrying, (' + retryAmount + ') tries remaining');
return setTimeout(fx, 10);
}

return cb(err, res);
});
fn.apply(fn, args)
};

fx();
};
80 changes: 34 additions & 46 deletions lib/session.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
"use strict";
var _ = require('lodash');
var async = require('async');
var util = require('util');
var mysql = require('mysql');

var insertModes = require('./insertModes.js');
var updateBuilder = require('./updateBuilder.js');
var insertBuilder = require('./insertBuilder.js');
var deadlockRetry = require('./deadlockRetry.js');

var concurrencyLimit = 2;
var bulkInsertBatchSize = 1000;
Expand Down Expand Up @@ -115,6 +115,14 @@ module.exports = function (settings) {
});
}

function queryWithDeadlockRetry(sql, queryParams, retryAmount, queryCb) {
deadlockRetry({
fn: query,
args: [sql, queryParams],
retryAmount: retryAmount
}, queryCb);
}

function insert(tableName, items, insertCb, options) {
items = _.isArray(items) ? items : [items];
options = _.defaults(options || {}, {
Expand Down Expand Up @@ -146,8 +154,6 @@ module.exports = function (settings) {
}

function _insert(tableName, items, insertCb, options) {
var deadlockRetriesRemaining = 2;

async.each(items,
function (item, eachCb) {
if (options.insertMode !== insertModes.hilo)
Expand All @@ -173,35 +179,25 @@ module.exports = function (settings) {
}, function (err, insertInfo) {
if (err) return insertCb(err);

var fx = function() {
query(insertInfo.sql, insertInfo.values, function queryCb(err, result) {
if (err) {
if (err.code === 'ER_LOCK_DEADLOCK' && --deadlockRetriesRemaining > 0) {
console.log('insertItems - ER_LOCK_DEADLOCK returned, retrying, (' + deadlockRetriesRemaining + ') tries remaining');
return setTimeout(fx, 10);
}
queryWithDeadlockRetry(insertInfo.sql, insertInfo.values, 5, function (err, result) {
if (err) return insertCb(err);

return insertCb(err);
}
// InnoDB guarantees sequential numbers for AUTO INCREMENT when doing bulk inserts
var startingInsertId = result.insertId - 1;

// InnoDB guarantees sequential numbers for AUTO INCREMENT when doing bulk inserts
var startingInsertId = result.insertId - 1;

// $insertId -> insertId
items = _.map(items, function (item) {
if (item.$insertId) {
item.insertId = item.$insertId;
delete item.$insertId;
}
else if (options.insertMode === insertModes.identity) {
item.insertId = ++startingInsertId;
}
return item;
});
insertCb(err, items);
// $insertId -> insertId
items = _.map(items, function (item) {
if (item.$insertId) {
item.insertId = item.$insertId;
delete item.$insertId;
}
else if (options.insertMode === insertModes.identity) {
item.insertId = ++startingInsertId;
}
return item;
});
};
fx();
insertCb(err, items);
});
});
}
}
Expand All @@ -224,23 +220,12 @@ module.exports = function (settings) {
}, function (err, updateInfo) {
if (err) return updateCb(err);

var deadlockRetriesRemaining = 2;

var fx = function() {
query(updateInfo.sql, updateInfo.values, function queryCb(err, result) {
if (err) {
if (err.code === 'ER_LOCK_DEADLOCK' && --deadlockRetriesRemaining > 0) {
console.log('update - ER_LOCK_DEADLOCK returned, retrying, (' + deadlockRetriesRemaining + ') tries remaining');
return setTimeout(fx, 10);
}
return updateCb(err);
}

stack.push(result);
eachCb();
})
};
fx();
queryWithDeadlockRetry(updateInfo.sql, updateInfo.values, 5, function (err, result) {
if (err) return updateCb(err);

stack.push(result);
eachCb();
});
});
},
function finalEachCb(err) {
Expand Down Expand Up @@ -371,6 +356,9 @@ module.exports = function (settings) {
query: function (sql, queryParams, cb) {
query(sql, queryParams, cb);
},
queryWithDeadlockRetry: function (sql, queryParams, retryAmount, cb) {
queryWithDeadlockRetry(sql, queryParams, retryAmount, cb);
},
getConnection: getConnection,
queryOne: function (sql, queryParams, cb) {
if (_.isFunction(queryParams) && !cb) {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"keywords": [
"mysql"
],
"version": "0.1.23",
"version": "0.1.24",
"bugs": {
"url": "https://github.com/Mindflash/mysqlutil/issues"
},
Expand Down
53 changes: 53 additions & 0 deletions tests/deadlockRetryTests.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"use strict";
var _ = require('lodash');
var test = require('tape');
var deadlockRetry = require('../lib/deadlockRetry.js');

test("Retries the requested amount of time on deadlock errors", function (t) {
var amtCalls = 0;
var amtCallsExpected = 3;

function f(p, cb) {
amtCalls++;

var err = new Error();
err.code = 'ER_LOCK_DEADLOCK';

return cb(err);
}

deadlockRetry({fn: f, args: [1], retryAmount: amtCallsExpected}, function (err, res) {
t.equal(amtCalls, amtCallsExpected, amtCallsExpected + ' calls should have been made, received: ' + amtCalls);
t.end();
});
});

test("Doesn't retry for non-errors", function (t) {
var amtCalls = 0;
var amtCallsExpected = 1;

function f(p, cb) {
amtCalls++;
return cb(new Error());
}

deadlockRetry({fn: f, args: [1], retryAmount: amtCallsExpected}, function (err, res) {
t.equal(amtCalls, amtCallsExpected, amtCallsExpected + ' calls should have been made, received: ' + amtCalls);
t.end();
});
});

test("Handles missing args", function (t) {
var amtCalls = 0;
var amtCallsExpected = 1;

function f(cb) {
amtCalls++;
return cb(new Error());
}

deadlockRetry({fn: f}, function (err, res) {
t.equal(amtCalls, amtCallsExpected, amtCallsExpected + ' calls should have been made, received: ' + amtCalls);
t.end();
});
});

0 comments on commit 0b49a2f

Please sign in to comment.