Skip to content

Commit

Permalink
Implement "skipLocked()" and "noWait()" (knex#2961)
Browse files Browse the repository at this point in the history
  • Loading branch information
ricmzn authored and felixmosh committed Jul 13, 2019
1 parent c35b58e commit 19e1316
Show file tree
Hide file tree
Showing 8 changed files with 312 additions and 6 deletions.
10 changes: 10 additions & 0 deletions src/dialects/mysql/query/compiler.js
Expand Up @@ -48,6 +48,16 @@ assign(QueryCompiler_MySQL.prototype, {
return 'lock in share mode';
},

// Only supported on MySQL 8.0+
skipLocked() {
return 'skip locked';
},

// Supported on MySQL 8.0+ and MariaDB 10.3.0+
noWait() {
return 'nowait';
},

// Compiles a `columnInfo` query.
columnInfo() {
const column = this.single.columnInfo;
Expand Down
8 changes: 8 additions & 0 deletions src/dialects/postgres/query/compiler.js
Expand Up @@ -103,6 +103,14 @@ assign(QueryCompiler_PG.prototype, {
);
},

skipLocked() {
return 'skip locked';
},

noWait() {
return 'nowait';
},

// Compiles a columnInfo query
columnInfo() {
const column = this.single.columnInfo;
Expand Down
56 changes: 50 additions & 6 deletions src/query/builder.js
Expand Up @@ -25,6 +25,8 @@ const {
} = require('lodash');
const saveAsyncStack = require('../util/save-async-stack');

const { lockMode, waitMode } = require('./constants');

// Typically called from `knex.builder`,
// start a new query building chain.
function Builder(client) {
Expand Down Expand Up @@ -951,10 +953,8 @@ assign(Builder.prototype, {
// Sets the values for a `select` query, informing that only the first
// row should be returned (limit 1).
first() {
const { _method } = this;

if (!includes(['pluck', 'first', 'select'], _method)) {
throw new Error(`Cannot chain .first() on "${_method}" query!`);
if (!this._isSelectQuery()) {
throw new Error(`Cannot chain .first() on "${this._method}" query!`);
}

const args = new Array(arguments.length);
Expand Down Expand Up @@ -1082,18 +1082,52 @@ assign(Builder.prototype, {

// Set a lock for update constraint.
forUpdate() {
this._single.lock = 'forUpdate';
this._single.lock = lockMode.forUpdate;
this._single.lockTables = helpers.normalizeArr.apply(null, arguments);
return this;
},

// Set a lock for share constraint.
forShare() {
this._single.lock = 'forShare';
this._single.lock = lockMode.forShare;
this._single.lockTables = helpers.normalizeArr.apply(null, arguments);
return this;
},

// Skips locked rows when using a lock constraint.
skipLocked() {
if (!this._isSelectQuery()) {
throw new Error(`Cannot chain .skipLocked() on "${this._method}" query!`);
}
if (!this._hasLockMode()) {
throw new Error(
'.skipLocked() can only be used after a call to .forShare() or .forUpdate()!'
);
}
if (this._single.waitMode === waitMode.noWait) {
throw new Error('.skipLocked() cannot be used together with .noWait()!');
}
this._single.waitMode = waitMode.skipLocked;
return this;
},

// Causes error when acessing a locked row instead of waiting for it to be released.
noWait() {
if (!this._isSelectQuery()) {
throw new Error(`Cannot chain .noWait() on "${this._method}" query!`);
}
if (!this._hasLockMode()) {
throw new Error(
'.noWait() can only be used after a call to .forShare() or .forUpdate()!'
);
}
if (this._single.waitMode === waitMode.skipLocked) {
throw new Error('.noWait() cannot be used together with .skipLocked()!');
}
this._single.waitMode = waitMode.noWait;
return this;
},

// Takes a JS object of methods to call and calls them
fromJS(obj) {
each(obj, (val, key) => {
Expand Down Expand Up @@ -1180,6 +1214,16 @@ assign(Builder.prototype, {
_clearGrouping(grouping) {
this._statements = reject(this._statements, { grouping });
},

// Helper function that checks if the builder will emit a select query
_isSelectQuery() {
return includes(['pluck', 'first', 'select'], this._method);
},

// Helper function that checks if the query has a lock mode set
_hasLockMode() {
return includes([lockMode.forShare, lockMode.forUpdate], this._single.lock);
},
});

Object.defineProperty(Builder.prototype, 'or', {
Expand Down
22 changes: 22 additions & 0 deletions src/query/compiler.js
Expand Up @@ -48,6 +48,7 @@ const components = [
'limit',
'offset',
'lock',
'waitMode',
];

assign(QueryCompiler.prototype, {
Expand Down Expand Up @@ -542,6 +543,27 @@ assign(QueryCompiler.prototype, {
}
},

// Compiles the wait mode on the locks.
waitMode() {
if (this.single.waitMode) {
return this[this.single.waitMode]();
}
},

// Fail on unsupported databases
skipLocked() {
throw new Error(
'.skipLocked() is currently only supported on MySQL 8.0+ and PostgreSQL 9.5+'
);
},

// Fail on unsupported databases
noWait() {
throw new Error(
'.noWait() is currently only supported on MySQL 8.0+, MariaDB 10.3.0+ and PostgreSQL 9.5+'
);
},

// On Clause
// ------

Expand Down
13 changes: 13 additions & 0 deletions src/query/constants.js
@@ -0,0 +1,13 @@
/**
* internal constants, do not use in application code
*/
module.exports = {
lockMode: {
forShare: 'forShare',
forUpdate: 'forUpdate',
},
waitMode: {
skipLocked: 'skipLocked',
noWait: 'noWait',
},
};
126 changes: 126 additions & 0 deletions test/integration/builder/selects.js
Expand Up @@ -1247,5 +1247,131 @@ module.exports = function(knex) {
});
});
});

it('forUpdate().skipLocked() with order by should return the first non-locked row', async function() {
// Note: this test doesn't work properly on MySQL - see https://bugs.mysql.com/bug.php?id=67745
if (knex.client.driverName !== 'pg') {
return;
}

const rowName = 'row for skipLocked() test #1';
await knex('test_default_table').insert([
{ string: rowName, tinyint: 1 },
{ string: rowName, tinyint: 2 },
]);

const res = await knex.transaction(async (trx) => {
// lock the first row in the test
await trx('test_default_table')
.where({ string: rowName })
.orderBy('tinyint', 'asc')
.first()
.forUpdate();

// try to lock the next available row
return await knex('test_default_table')
.where({ string: rowName })
.orderBy('tinyint', 'asc')
.forUpdate()
.skipLocked()
.first();
});

// assert that we got the second row because the first one was locked
expect(res.tinyint).to.equal(2);
});

it('forUpdate().skipLocked() should return an empty set when all rows are locked', async function() {
if (
knex.client.driverName !== 'pg' &&
knex.client.driverName !== 'mysql'
) {
return;
}

const rowName = 'row for skipLocked() test #2';
await knex('test_default_table').insert([
{ string: rowName, tinyint: 1 },
{ string: rowName, tinyint: 2 },
]);

const res = await knex.transaction(async (trx) => {
// lock all of the test rows
await trx('test_default_table')
.where({ string: rowName })
.forUpdate();

// try to aquire the lock on one more row (which isn't available)
return await knex('test_default_table')
.where({ string: rowName })
.forUpdate()
.skipLocked()
.limit(1);
});

expect(res).to.be.empty;
});

it('forUpdate().noWait() should throw immediately when a row is locked', async function() {
if (
knex.client.driverName !== 'pg' &&
knex.client.driverName !== 'mysql'
) {
return;
}

const rowName = 'row for noWait() test';
await knex('test_default_table').insert([
{ string: rowName, tinyint: 1 },
{ string: rowName, tinyint: 2 },
]);

const promise = knex.transaction(async (trx) => {
// select and lock only the first row from this test
// note: MySQL may lock both rows depending on how the results are fetched
await trx('test_default_table')
.where({ string: rowName })
.orderBy('tinyint', 'asc')
.first()
.forUpdate();

// try to lock it again (and fail)
await trx('test_default_table')
.where({ string: rowName })
.orderBy('tinyint', 'asc')
.forUpdate()
.noWait()
.first();
});

// catch the expected errors
promise.catch((err) => {
switch (knex.client.driverName) {
case 'pg':
expect(err.message).to.contain('could not obtain lock on row');
break;
case 'mysql':
case 'mysql2':
// mysql
expect(err.message).to.contain(
'lock(s) could not be acquired immediately'
);
// mariadb
// TODO: detect if test is being run on mysql or mariadb to check for the correct error message
// expect(err.message).to.contain('Lock wait timeout exceeded');
break;
default:
// unsupported database
throw err;
}
});

// fail the test if the transaction succeeds
promise.then(() => {
expect(
'The query should have been cancelled when trying to select a locked row with .noWait()'
).to.be.false;
});
});
});
};
80 changes: 80 additions & 0 deletions test/unit/query/builder.js
Expand Up @@ -6998,6 +6998,86 @@ describe('QueryBuilder', function() {
);
});

it('lock for update with skip locked #1937', function() {
testsql(
qb()
.select('*')
.from('foo')
.first()
.forUpdate()
.skipLocked(),
{
mysql: {
sql: 'select * from `foo` limit ? for update skip locked',
bindings: [1],
},
pg: {
sql: 'select * from "foo" limit ? for update skip locked',
bindings: [1],
},
}
);
});

it('lock for update with nowait #1937', function() {
testsql(
qb()
.select('*')
.from('foo')
.first()
.forUpdate()
.noWait(),
{
mysql: {
sql: 'select * from `foo` limit ? for update nowait',
bindings: [1],
},
pg: {
sql: 'select * from "foo" limit ? for update nowait',
bindings: [1],
},
}
);
});

it('noWait and skipLocked require a lock mode to be set', function() {
expect(function() {
qb()
.select('*')
.noWait()
.toString();
}).to.throw(
'.noWait() can only be used after a call to .forShare() or .forUpdate()!'
);
expect(function() {
qb()
.select('*')
.skipLocked()
.toString();
}).to.throw(
'.skipLocked() can only be used after a call to .forShare() or .forUpdate()!'
);
});

it('skipLocked conflicts with noWait and vice-versa', function() {
expect(function() {
qb()
.select('*')
.forUpdate()
.noWait()
.skipLocked()
.toString();
}).to.throw('.skipLocked() cannot be used together with .noWait()!');
expect(function() {
qb()
.select('*')
.forUpdate()
.skipLocked()
.noWait()
.toString();
}).to.throw('.noWait() cannot be used together with .skipLocked()!');
});

it('allows insert values of sub-select, #121', function() {
testsql(
qb()
Expand Down

0 comments on commit 19e1316

Please sign in to comment.