Skip to content

Commit

Permalink
Retrying on deadlock, bumped version.
Browse files Browse the repository at this point in the history
  • Loading branch information
maxnachlinger committed Oct 16, 2014
1 parent c70a819 commit 9e557de
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 56 deletions.
2 changes: 1 addition & 1 deletion lib/insertBuilder.js
Expand Up @@ -24,7 +24,7 @@ module.exports = function (params, cb) {

sql.push(' INTO ', params.tableName, ' ');

// use the 1st item to get our fields, dollar-sign prefixed $fields are ignroed
// use the 1st item to get our fields, dollar-sign prefixed $fields are ignored
var fields = _.filter(_.keys(params.items[0]), function (key) {
return key.charAt(0) !== '$';
});
Expand Down
121 changes: 67 additions & 54 deletions lib/session.js
Expand Up @@ -29,38 +29,38 @@ module.exports = function (settings) {
}

function addPingToConnection(conn) {
conn.pingIntervalId = setInterval(function() {
if(settings.debugging.ping)
conn.pingIntervalId = setInterval(function () {
if (settings.debugging.ping)
console.log('conn', conn.id, 'ping');
conn.ping.apply(conn);
}, settings.connectionPingIntervalSeconds * 1000);

conn.on('error', function (err) {
clearInterval(conn.pingIntervalId);
if(settings.debugging.connectionError)
if (settings.debugging.connectionError)
console.error('conn', conn.id, err);
throw err;
});

conn.on('end', function (err) {
if(settings.debugging.connectionEnd)
console.log('conn', conn.id, 'end, err',err);
if (settings.debugging.connectionEnd)
console.log('conn', conn.id, 'end, err', err);
clearInterval(conn.pingIntervalId);
});
}

function getConnection(cb) {
if(settings.debugging.poolPerf)
if (settings.debugging.poolPerf)
console.time('Get connection from pool');

pool.getConnection(function (err, conn) {
if(settings.debugging.poolPerf)
if (settings.debugging.poolPerf)
console.timeEnd('Get connection from pool');

if (err) return cb(err);
if(!conn) return cb(new Error('Could not get a connection from the pool'));
if (!conn) return cb(new Error('Could not get a connection from the pool'));

if(!conn.id)
if (!conn.id)
conn.id = ++id;
if (!conn.pingIntervalId)
addPingToConnection(conn);
Expand All @@ -74,22 +74,23 @@ module.exports = function (settings) {
queryCb = queryParams;
queryParams = null;
}
queryCb = queryCb || function () {};
queryCb = queryCb || function () {
};

getConnection(function (err, conn) {
if(err) return queryCb(err);
if (err) return queryCb(err);

var queryStart;
if(settings.debugging.queryPerf)
if (settings.debugging.queryPerf)
queryStart = Date.now();

conn.query(sql, queryParams, function (err, result) {
if(settings.debugging.queryPerf) {
if (settings.debugging.queryPerf) {
var queryEnd = (Date.now() - queryStart) / 1000;
if(queryEnd > settings.debugging.queryPerfSlowQueryThresholdSec)
console.log('Slow query ('+queryEnd+' s), conn.id', conn.id, 'Query:',sql);
if (queryEnd > settings.debugging.queryPerfSlowQueryThresholdSec)
console.log('Slow query (' + queryEnd + ' s), conn.id', conn.id, 'Query:', sql);
else
console.log('Query ('+queryEnd+' s), conn.id', conn.id);
console.log('Query (' + queryEnd + ' s), conn.id', conn.id);
}

conn.release();
Expand All @@ -106,8 +107,8 @@ module.exports = function (settings) {
result = [];
}

if(settings.debugging.queryResult)
console.log('query:',sql,'params:',queryParams,'err:', err, 'result:', result);
if (settings.debugging.queryResult)
console.log('query:', sql, 'params:', queryParams, 'err:', err, 'result:', result);

queryCb(err, result);
});
Expand Down Expand Up @@ -145,6 +146,8 @@ module.exports = function (settings) {
}

function _insert(tableName, items, insertCb, options) {
var deadlockRetriesRemaining = 2;

async.each(items,
function (item, eachCb) {
if (options.insertMode !== insertModes.hilo)
Expand All @@ -155,39 +158,49 @@ module.exports = function (settings) {
eachCb();
});
},
function insertItems(err) {
insertItems
);

function insertItems(err) {
if (err) return insertCb(err);
insertBuilder({
upsert: options.upsert,
ignore: options.ignore,
tableName: tableName,
items: items,
insertRules: options.enforceRules ? obj.insertRules : null,
updateRules: options.enforceRules ? obj.updateRules : null
}, function (err, insertInfo) {
if (err) return insertCb(err);
insertBuilder({
upsert: options.upsert,
ignore: options.ignore,
tableName: tableName,
items: items,
insertRules: options.enforceRules ? obj.insertRules : null,
updateRules: options.enforceRules ? obj.updateRules : null
}, function (err, insertInfo) {
if (err) return insertCb(err);

query(insertInfo.sql, insertInfo.values, function queryCb(err, result) {
if(err) return insertCb(err);
//InnoDB guarantees sequential numbers for AUTO INCREMENT when doing bulk inserts
var startingInsertId = result.insertId-1;

// $insertId -> insertId
items = _.map(items, function (item) {
if (item.$insertId) {
item.insertId = item.$insertId;
delete item.$insertId;
}
else if(options.insertMode === insertModes.identity) {
item.insertId = ++startingInsertId;
}
return item;
});
insertCb(err, items);

query(insertInfo.sql, insertInfo.values, function queryCb(err, result) {
if (err) {
if (err.code === 'ER_LOCK_DEADLOCK' && --deadlockRetriesRemaining > 0) {
console.log('ER_LOCK_DEADLOCK returned, retrying, (' + deadlockRetriesRemaining + ') tries remaining');
return setTimeout(insertItems, 10);
}

return insertCb(err);
}

// InnoDB guarantees sequential numbers for AUTO INCREMENT when doing bulk inserts
var startingInsertId = result.insertId - 1;

// $insertId -> insertId
items = _.map(items, function (item) {
if (item.$insertId) {
item.insertId = item.$insertId;
delete item.$insertId;
}
else if (options.insertMode === insertModes.identity) {
item.insertId = ++startingInsertId;
}
return item;
});
insertCb(err, items);
});
}
);
});
}
}

function update(tableName, items, updateCb, options) {
Expand Down Expand Up @@ -234,7 +247,7 @@ module.exports = function (settings) {
var startId = hi + lo;
cb(null, startId);
});
};
}

function hilo() {
var maxLo = defaultMaxLo;
Expand All @@ -252,13 +265,13 @@ module.exports = function (settings) {
if (lo <= maxLo) {
var result = hi + lo;
lo++;
if(settings.debugging.hilo)
if (settings.debugging.hilo)
console.log('Handing out id ' + result);
return cb(result);
}

deferredCallbacks.push(cb);
if(settings.debugging.hilo)
if (settings.debugging.hilo)
console.log('Deferring while waiting for a new ID', deferredCallbacks.length, queryPending);

if (!queryPending) {
Expand All @@ -267,7 +280,7 @@ module.exports = function (settings) {
if (err) return cb(err);

var hival = result[0].NextHi;
if(settings.debugging.hilo)
if (settings.debugging.hilo)
console.log('New id range', hival);

lo = hival == 0 ? 1 : 0;
Expand All @@ -278,7 +291,7 @@ module.exports = function (settings) {

var runnableCallbacks = deferredCallbacks;
deferredCallbacks = [];
if(settings.debugging.hilo)
if (settings.debugging.hilo)
console.log('Running deferred', runnableCallbacks.length);
_.each(runnableCallbacks, function (cb) {
computeNextKey(mysql, cb);
Expand Down Expand Up @@ -379,7 +392,7 @@ module.exports = function (settings) {

logging: false,
preFillPool: preFillPool,
reserveHiLoIds:reserveHiLoIds
reserveHiLoIds: reserveHiLoIds
};
return obj;
};
2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -9,7 +9,7 @@
"keywords": [
"mysql"
],
"version": "0.1.20",
"version": "0.1.21",
"bugs": {
"url": "https://github.com/Mindflash/mysqlutil/issues"
},
Expand Down

0 comments on commit 9e557de

Please sign in to comment.