From 19e1316c2a2b4447190eddcf3fe1c06a2b7475c5 Mon Sep 17 00:00:00 2001 From: Ricardo Maes Date: Sat, 6 Jul 2019 09:05:53 -0300 Subject: [PATCH] Implement "skipLocked()" and "noWait()" (#2961) --- src/dialects/mysql/query/compiler.js | 10 ++ src/dialects/postgres/query/compiler.js | 8 ++ src/query/builder.js | 56 +++++++++-- src/query/compiler.js | 22 +++++ src/query/constants.js | 13 +++ test/integration/builder/selects.js | 126 ++++++++++++++++++++++++ test/unit/query/builder.js | 80 +++++++++++++++ types/index.d.ts | 3 + 8 files changed, 312 insertions(+), 6 deletions(-) create mode 100644 src/query/constants.js diff --git a/src/dialects/mysql/query/compiler.js b/src/dialects/mysql/query/compiler.js index e517a84cdb..2565c3ff60 100644 --- a/src/dialects/mysql/query/compiler.js +++ b/src/dialects/mysql/query/compiler.js @@ -48,6 +48,16 @@ assign(QueryCompiler_MySQL.prototype, { return 'lock in share mode'; }, + // Only supported on MySQL 8.0+ + skipLocked() { + return 'skip locked'; + }, + + // Supported on MySQL 8.0+ and MariaDB 10.3.0+ + noWait() { + return 'nowait'; + }, + // Compiles a `columnInfo` query. columnInfo() { const column = this.single.columnInfo; diff --git a/src/dialects/postgres/query/compiler.js b/src/dialects/postgres/query/compiler.js index 461e6a5e9f..dae870dde5 100644 --- a/src/dialects/postgres/query/compiler.js +++ b/src/dialects/postgres/query/compiler.js @@ -103,6 +103,14 @@ assign(QueryCompiler_PG.prototype, { ); }, + skipLocked() { + return 'skip locked'; + }, + + noWait() { + return 'nowait'; + }, + // Compiles a columnInfo query columnInfo() { const column = this.single.columnInfo; diff --git a/src/query/builder.js b/src/query/builder.js index b385d7e4c3..496cb663af 100644 --- a/src/query/builder.js +++ b/src/query/builder.js @@ -25,6 +25,8 @@ const { } = require('lodash'); const saveAsyncStack = require('../util/save-async-stack'); +const { lockMode, waitMode } = require('./constants'); + // Typically called from `knex.builder`, // start a new query building chain. function Builder(client) { @@ -951,10 +953,8 @@ assign(Builder.prototype, { // Sets the values for a `select` query, informing that only the first // row should be returned (limit 1). first() { - const { _method } = this; - - if (!includes(['pluck', 'first', 'select'], _method)) { - throw new Error(`Cannot chain .first() on "${_method}" query!`); + if (!this._isSelectQuery()) { + throw new Error(`Cannot chain .first() on "${this._method}" query!`); } const args = new Array(arguments.length); @@ -1082,18 +1082,52 @@ assign(Builder.prototype, { // Set a lock for update constraint. forUpdate() { - this._single.lock = 'forUpdate'; + this._single.lock = lockMode.forUpdate; this._single.lockTables = helpers.normalizeArr.apply(null, arguments); return this; }, // Set a lock for share constraint. forShare() { - this._single.lock = 'forShare'; + this._single.lock = lockMode.forShare; this._single.lockTables = helpers.normalizeArr.apply(null, arguments); return this; }, + // Skips locked rows when using a lock constraint. + skipLocked() { + if (!this._isSelectQuery()) { + throw new Error(`Cannot chain .skipLocked() on "${this._method}" query!`); + } + if (!this._hasLockMode()) { + throw new Error( + '.skipLocked() can only be used after a call to .forShare() or .forUpdate()!' + ); + } + if (this._single.waitMode === waitMode.noWait) { + throw new Error('.skipLocked() cannot be used together with .noWait()!'); + } + this._single.waitMode = waitMode.skipLocked; + return this; + }, + + // Causes error when acessing a locked row instead of waiting for it to be released. + noWait() { + if (!this._isSelectQuery()) { + throw new Error(`Cannot chain .noWait() on "${this._method}" query!`); + } + if (!this._hasLockMode()) { + throw new Error( + '.noWait() can only be used after a call to .forShare() or .forUpdate()!' + ); + } + if (this._single.waitMode === waitMode.skipLocked) { + throw new Error('.noWait() cannot be used together with .skipLocked()!'); + } + this._single.waitMode = waitMode.noWait; + return this; + }, + // Takes a JS object of methods to call and calls them fromJS(obj) { each(obj, (val, key) => { @@ -1180,6 +1214,16 @@ assign(Builder.prototype, { _clearGrouping(grouping) { this._statements = reject(this._statements, { grouping }); }, + + // Helper function that checks if the builder will emit a select query + _isSelectQuery() { + return includes(['pluck', 'first', 'select'], this._method); + }, + + // Helper function that checks if the query has a lock mode set + _hasLockMode() { + return includes([lockMode.forShare, lockMode.forUpdate], this._single.lock); + }, }); Object.defineProperty(Builder.prototype, 'or', { diff --git a/src/query/compiler.js b/src/query/compiler.js index 08262575e2..31e175239c 100644 --- a/src/query/compiler.js +++ b/src/query/compiler.js @@ -48,6 +48,7 @@ const components = [ 'limit', 'offset', 'lock', + 'waitMode', ]; assign(QueryCompiler.prototype, { @@ -542,6 +543,27 @@ assign(QueryCompiler.prototype, { } }, + // Compiles the wait mode on the locks. + waitMode() { + if (this.single.waitMode) { + return this[this.single.waitMode](); + } + }, + + // Fail on unsupported databases + skipLocked() { + throw new Error( + '.skipLocked() is currently only supported on MySQL 8.0+ and PostgreSQL 9.5+' + ); + }, + + // Fail on unsupported databases + noWait() { + throw new Error( + '.noWait() is currently only supported on MySQL 8.0+, MariaDB 10.3.0+ and PostgreSQL 9.5+' + ); + }, + // On Clause // ------ diff --git a/src/query/constants.js b/src/query/constants.js new file mode 100644 index 0000000000..0b080715dd --- /dev/null +++ b/src/query/constants.js @@ -0,0 +1,13 @@ +/** + * internal constants, do not use in application code + */ +module.exports = { + lockMode: { + forShare: 'forShare', + forUpdate: 'forUpdate', + }, + waitMode: { + skipLocked: 'skipLocked', + noWait: 'noWait', + }, +}; diff --git a/test/integration/builder/selects.js b/test/integration/builder/selects.js index 53c809f2b6..4c74ad015c 100644 --- a/test/integration/builder/selects.js +++ b/test/integration/builder/selects.js @@ -1247,5 +1247,131 @@ module.exports = function(knex) { }); }); }); + + it('forUpdate().skipLocked() with order by should return the first non-locked row', async function() { + // Note: this test doesn't work properly on MySQL - see https://bugs.mysql.com/bug.php?id=67745 + if (knex.client.driverName !== 'pg') { + return; + } + + const rowName = 'row for skipLocked() test #1'; + await knex('test_default_table').insert([ + { string: rowName, tinyint: 1 }, + { string: rowName, tinyint: 2 }, + ]); + + const res = await knex.transaction(async (trx) => { + // lock the first row in the test + await trx('test_default_table') + .where({ string: rowName }) + .orderBy('tinyint', 'asc') + .first() + .forUpdate(); + + // try to lock the next available row + return await knex('test_default_table') + .where({ string: rowName }) + .orderBy('tinyint', 'asc') + .forUpdate() + .skipLocked() + .first(); + }); + + // assert that we got the second row because the first one was locked + expect(res.tinyint).to.equal(2); + }); + + it('forUpdate().skipLocked() should return an empty set when all rows are locked', async function() { + if ( + knex.client.driverName !== 'pg' && + knex.client.driverName !== 'mysql' + ) { + return; + } + + const rowName = 'row for skipLocked() test #2'; + await knex('test_default_table').insert([ + { string: rowName, tinyint: 1 }, + { string: rowName, tinyint: 2 }, + ]); + + const res = await knex.transaction(async (trx) => { + // lock all of the test rows + await trx('test_default_table') + .where({ string: rowName }) + .forUpdate(); + + // try to aquire the lock on one more row (which isn't available) + return await knex('test_default_table') + .where({ string: rowName }) + .forUpdate() + .skipLocked() + .limit(1); + }); + + expect(res).to.be.empty; + }); + + it('forUpdate().noWait() should throw immediately when a row is locked', async function() { + if ( + knex.client.driverName !== 'pg' && + knex.client.driverName !== 'mysql' + ) { + return; + } + + const rowName = 'row for noWait() test'; + await knex('test_default_table').insert([ + { string: rowName, tinyint: 1 }, + { string: rowName, tinyint: 2 }, + ]); + + const promise = knex.transaction(async (trx) => { + // select and lock only the first row from this test + // note: MySQL may lock both rows depending on how the results are fetched + await trx('test_default_table') + .where({ string: rowName }) + .orderBy('tinyint', 'asc') + .first() + .forUpdate(); + + // try to lock it again (and fail) + await trx('test_default_table') + .where({ string: rowName }) + .orderBy('tinyint', 'asc') + .forUpdate() + .noWait() + .first(); + }); + + // catch the expected errors + promise.catch((err) => { + switch (knex.client.driverName) { + case 'pg': + expect(err.message).to.contain('could not obtain lock on row'); + break; + case 'mysql': + case 'mysql2': + // mysql + expect(err.message).to.contain( + 'lock(s) could not be acquired immediately' + ); + // mariadb + // TODO: detect if test is being run on mysql or mariadb to check for the correct error message + // expect(err.message).to.contain('Lock wait timeout exceeded'); + break; + default: + // unsupported database + throw err; + } + }); + + // fail the test if the transaction succeeds + promise.then(() => { + expect( + 'The query should have been cancelled when trying to select a locked row with .noWait()' + ).to.be.false; + }); + }); }); }; diff --git a/test/unit/query/builder.js b/test/unit/query/builder.js index 2c45bc76f3..cf38dc103f 100644 --- a/test/unit/query/builder.js +++ b/test/unit/query/builder.js @@ -6998,6 +6998,86 @@ describe('QueryBuilder', function() { ); }); + it('lock for update with skip locked #1937', function() { + testsql( + qb() + .select('*') + .from('foo') + .first() + .forUpdate() + .skipLocked(), + { + mysql: { + sql: 'select * from `foo` limit ? for update skip locked', + bindings: [1], + }, + pg: { + sql: 'select * from "foo" limit ? for update skip locked', + bindings: [1], + }, + } + ); + }); + + it('lock for update with nowait #1937', function() { + testsql( + qb() + .select('*') + .from('foo') + .first() + .forUpdate() + .noWait(), + { + mysql: { + sql: 'select * from `foo` limit ? for update nowait', + bindings: [1], + }, + pg: { + sql: 'select * from "foo" limit ? for update nowait', + bindings: [1], + }, + } + ); + }); + + it('noWait and skipLocked require a lock mode to be set', function() { + expect(function() { + qb() + .select('*') + .noWait() + .toString(); + }).to.throw( + '.noWait() can only be used after a call to .forShare() or .forUpdate()!' + ); + expect(function() { + qb() + .select('*') + .skipLocked() + .toString(); + }).to.throw( + '.skipLocked() can only be used after a call to .forShare() or .forUpdate()!' + ); + }); + + it('skipLocked conflicts with noWait and vice-versa', function() { + expect(function() { + qb() + .select('*') + .forUpdate() + .noWait() + .skipLocked() + .toString(); + }).to.throw('.skipLocked() cannot be used together with .noWait()!'); + expect(function() { + qb() + .select('*') + .forUpdate() + .skipLocked() + .noWait() + .toString(); + }).to.throw('.noWait() cannot be used together with .skipLocked()!'); + }); + it('allows insert values of sub-select, #121', function() { testsql( qb() diff --git a/types/index.d.ts b/types/index.d.ts index 3836052c9b..079b240940 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -1360,6 +1360,9 @@ declare namespace Knex { forShare(...tableNames: string[]): QueryBuilder; forShare(tableNames: string[]): QueryBuilder; + skipLocked(): QueryBuilder; + noWait(): QueryBuilder; + toSQL(): Sql; on(event: string, callback: Function): QueryBuilder;