ioredis; atomic locking with redlock; support Node 6.x [Fixes #185, Fixes #370] #379
Conversation
Force-pushed from aaf20c1 to 17ae9b1.
});

this._initializing = Promise.join.apply(null, initializers)
  .then(function(){
here we could also use Promise.all(initializers) for more succinct code.
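For reference, a minimal sketch of that form (assuming initializers is the array of promises built in the hunk below; the .then body is illustrative):

this._initializing = Promise.all(initializers)
  .then(function () {
    // every client has finished selecting the redis DB at this point
  });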
this.eclient.select(redisDB)
).then(function(){
  var initializers = [];
  [this.client, this.bclient, this.eclient].forEach(function (client) {
use .map instead of forEach for shorter and more semantically correct code.
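A sketch of the .map version (illustrative, not the exact PR code):

var initializers = [this.client, this.bclient, this.eclient].map(function (client) {
  return client.select(redisDB);
});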
console.log('Worker spawned: #', worker.id);
  }
});

var queue;
great improvement!
Disturbed.prototype.on.apply(this, args);
var promise = Disturbed.prototype.on.apply(this, args);
var _this = this;
promise.catch(function(err){ _this.emit('error', err); });
return this;
I changed the API in disturbed so that it will handle local events as well, which will break this. I'll fix it ASAP.
@@ -304,6 +304,7 @@ var scripts = {
 * Takes a lock
 */
takeLock: function(queue, job, token, renew){
  console.log('attempting lock for job id #', job.jobId)
  var lockCall;
we may need a debug flag to switch these debug logs on and off.
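One common way to do that (purely a suggestion on my part; the debug package is not part of this PR) would be:

var debuglog = require('debug')('bull:scripts');
// ...
debuglog('attempting lock for job id #%s', job.jobId);

and then enable the output only when needed with DEBUG=bull:* on the command line.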
This log is removed in a later commit, I'll make sure I have no such console.* statements in the final PR.
return execScript.apply(scripts, args).then(function(result){
return job.takeLock(null, !!job.lock)
  .then(function(lock) { return lock ? execScript.apply(scripts, args) : -2; }, function(err) { if (!(err instanceof Redlock.LockError)) { throw err; } })
  .then(function(result){
    switch (result){
this could be formatted into several lines or refactored into a separate function
You're right, I'll clean this up.
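For illustration, the same chain split across lines might look roughly like this (a sketch based on the hunk above, not the final code):

return job.takeLock(null, !!job.lock)
  .then(function (lock) {
    return lock ? execScript.apply(scripts, args) : -2;
  }, function (err) {
    if (!(err instanceof Redlock.LockError)) {
      throw err;
    }
  })
  .then(function (result) {
    switch (result) {
      // ... handle the result codes as before
    }
  });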
  throw new Error('Unable to renew nonexisting lock');
} else if (renew) {
  return lock.extend(queue.LOCK_RENEW_TIME);
} else if (lock) {
I think the if here is not needed; lock must be defined if we reach this point.
In this case, if the lock is defined we can return null early and not need to call redlock.
Yes, I understand that; what I mean is that lock will always be defined at line 311, so you don't need the if, the else is enough.
I definitely don't need else after return or throw. Updated in this latest push.
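Roughly what the guard clauses look like without the else branches (a sketch; it also folds in the Promise.resolve suggestion below):

if (renew && !lock) {
  throw new Error('Unable to renew nonexisting lock');
}
if (renew) {
  return lock.extend(queue.LOCK_RENEW_TIME);
}
if (lock) {
  return Promise.resolve(null);
}
// fall through: acquire a fresh lock via redlock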
  return lock.extend(queue.LOCK_RENEW_TIME);
} else if (lock) {
  return new Promise(function(resolve) {
    resolve(null);
just use return Promise.resolve(null);
@@ -315,7 +315,8 @@ var scripts = {
  });
}
var redlock = new Redlock(queue.clients, queue.redlock);
return redlock.lock(job.lockKey(), queue.LOCK_RENEW_TIME);
return redlock.lock(job.lockKey(), queue.LOCK_RENEW_TIME)
  .then(job.queue.client.hset(job.queue.toKey(job.jobId), "lockAcquired", 1));
},
the problem here is that getting the lock and setting the lockAcquired property is not an atomic operation anymore...
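To spell out the concern: the diff above issues two separate commands, so a crash between them can leave the lock held with lockAcquired never set (a sketch; note the hset also needs to be wrapped in a function so it runs after the lock resolves):

return redlock.lock(job.lockKey(), queue.LOCK_RENEW_TIME)   // 1st round trip: SET <lockKey> <token> PX <ttl> NX
  .then(function (lock) {
    // 2nd round trip: nothing guarantees this runs if the process dies in between
    return job.queue.client.hset(job.queue.toKey(job.jobId), 'lockAcquired', 1);
  });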
var lockAcquired = 'and redis.call("HSET", "' + queue.toKey(job.jobId) + '", "lockAcquired", "1")';
var success = 'then return 1 else return 0 end';
var opts = {
  lockScript: function(lockScript) {
ok, so here we can actually "monkey patch" the redlock code. I was actually wondering how we could solve the issue and at the same time ensure that the job is in the active list while getting a redlock atomically. But how robust is this when new versions of redlock are released? Isn't there a risk that our code would stop working?
I wrote the code which allows node-redlock to extend its script functionality, so hopefully it won't change much. I can tell the author would like to keep it stable.
mike-marcacci/node-redlock#25
node-redlock uses the same lock key naming scheme as bull (:lock), so I don't think that would change either.
(Bull's test suite would catch such a breaking change immediately of course.)
  return null;
});
return scripts.moveUnlockedJobsToWait(this).then(function(responses){
  var handleFailedJobs = responses[0].map(function(jobId){
use .spread instead of .then so that the results don't need to be accessed by indexing into an array.
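For reference, the .spread form would look roughly like this (Bluebird-specific; parameter names are illustrative, since I'm only guessing what the second element holds):

return scripts.moveUnlockedJobsToWait(this).spread(function (failedJobIds, remainingJobIds) {
  var handleFailedJobs = failedJobIds.map(function (jobId) {
    // ... same body as before
  });
  // ...
});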
Ooh, good catch.
I tried this, and it seems that .spread requires the promise to resolve to an array, but in bull scripts.moveUnlockedJobsToWait sometimes resolves to undefined:
Cluster
Worker spawned: # 1
Worker spawned: # 2
Worker spawned: # 3
Worker spawned: # 4
Worker spawned: # 5
Worker spawned: # 6
Worker spawned: # 7
Failed to handle unlocked job in active: TypeError: Cannot read property 'map' of undefined
at /home/charles/source/bull/lib/queue.js:576:40
at tryCatcher (/home/charles/source/bull/node_modules/bluebird/js/release/util.js:16:23)
at Promise._settlePromiseFromHandler (/home/charles/source/bull/node_modules/bluebird/js/release/promise.js:507:35)
at Promise._settlePromise (/home/charles/source/bull/node_modules/bluebird/js/release/promise.js:567:18)
at Promise._settlePromise0 (/home/charles/source/bull/node_modules/bluebird/js/release/promise.js:612:10)
at Promise._settlePromises (/home/charles/source/bull/node_modules/bluebird/js/release/promise.js:691:18)
Great PR. A lot of good stuff all over the place. Please check my comments. I am sure @bradvogel will also give you some valuable feedback.
Thanks @manast. I really appreciate all the Promise API improvements; I'm not as familiar with that API. I'll rebase this PR, clean it up, and re-push.
Force-pushed from 17ae9b1 to c1b18b5.
I rebased and cleaned up all the commits based on your feedback. The previous set of commits is saved at nextorigin/bull/commits/issue-185-dirty. Now there are two major commits, the ioredis refactor and the redlock refactor; almost all the rest are test improvements. All tests pass on every run here, so I hope that is reflected shortly in Travis 😃
With the lock and promise improvements, this PR might fix #333 as well. Leaving this note for future reference. EDIT: Also #283 (similar error there; I ran into that issue during testing), and #278 is related.
What about all the scripts that look for the lock key?
"lodash": "^4.16.6",
"mocha": "^3.1.2",
"node-uuid": "^1.4.7",
"redis": "^2.6.3",
"redlock": "nextorigin/node-redlock",
I don't think Bull should have this dependency. Please try to upstream your fix into node-redlock.
@bradvogel definitely, that's in process here:
mike-marcacci/node-redlock#25
lockCall = 'redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])';
var redlock;
if (ensureActive) {
  var isJobInList = this._isJobInList.replace(/KEYS\[1\]/, '"' + job.queue.toKey('active') + '"')
This is too fragile to be running string replacement. Please make _isJobInList a function that generates the appropriate string.
You're right, I'll clean that up.
if (renew && !lock) {
  throw new Error('Unable to renew nonexisting lock');
} else if (renew) {
  return lock.extend(queue.LOCK_RENEW_TIME);
The lock renewer runs on a _this.LOCK_RENEW_TIME / 2 interval, so this will keep extending the lock farther and farther on each renewal. So if it gets stalled, it could potentially stay locked for many minutes. Is there a way to extend it to the absolute time (currentTime + queue.LOCK_RENEW_TIME)?
The redlock extend method uses PEXPIRE and should be functionally equivalent to the SET ... PX approach bull uses now. It only sets the expiry relative to the current time; it does not add to the existing TTL, so it's typical Redis expire behavior.
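In ioredis terms, a small illustration of that behavior (the key name and TTL here are illustrative):

// PEXPIRE always sets the TTL relative to "now"; it never accumulates.
client.set('lock:key', 'token', 'PX', 30000);   // TTL = 30s
client.pexpire('lock:key', 30000);              // TTL is again 30s from this moment, not 60s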
Oh yes, you're right. Filed mike-marcacci/node-redlock#26 to clarify their docs.
@bradvogel they actually work exactly the same; redlock's method is very similar to bull's current method, and it works with the existing scripts. Also, in my previous runs where I was logging heavily, I logged that process in particular so I could ensure it was working.
Ok, I see. I verified moveUnlockedJobsToWait still works locally. Thanks!
Force-pushed from c1b18b5 to 5c1e9d9.
_isJobInList: function(keyVar, argVar, operator) {
  keyVar = keyVar || 'KEYS[1]';
  argVar = argVar || 'ARGV[1]';
  operator = operator || 'return';
I've rewritten ._isJobInList as a function; this is much cleaner.
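For example, it can now be called roughly like this (a sketch of the intended usage, not the exact call sites):

// default form: a standalone script fragment that returns whether ARGV[1] is in KEYS[1]
var standalone = scripts._isJobInList();
// embedded form: inlined into the lock script with the active list key baked in
var isJobInList = scripts._isJobInList('"' + queue.toKey('active') + '"', 'ARGV[1]', 'and');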
@bradvogel For sure, I appreciate the push for code quality. I've rewritten ._isJobInList as a function.
btw, I'm not completely familiar with Redis cluster, but a potential hazard that might cause issues with cluster is how we concatenate job keys in some scripts: https://github.com/nextorigin/bull/blob/issue-185/lib/scripts.js#L439. It's my understanding that if keys aren't passed in to scripts via the KEYS array, Redis Cluster can't know which node the script's keys live on, so those scripts may misbehave.
@bradvogel good call; I'll make a note of that. Perhaps we can ask for some user stories for bull+redis cluster usage from those issues. I run a lot of jobs, and have been using it with Sentinel+Redishappy+Consul for failover, but not a redis cluster per se.
// to do this on subsequent calls.
return job.takeLock(_this.token, renew, !renew).then(function(locked){
  if(locked){
return job.takeLock(renew, true).then(function(lock){
Mismerge. The second parameter should be !renew. Also include the comment.
Here takeLock returns early and relies on redlock to extend the lock; ensureActive is only considered for new locks. So I'm not sure the second parameter matters on a renew?
Oh, yes. That's fine.
Please let me know if everybody is happy, so that I can manually merge this in the morning, Swedish time.
The bull Queue instance is an EventEmitter which emits errors bound to the error event of the stream.Writable ioredis client. EventEmitters throw if the error event is not handled. Bull is durable and will recover from lost writes by failing/retrying jobs where necessary. Therefore the error can be safely ignored, since this is a test case where we rapidly destroy queues. The buffer waiting to be written to the stream is just a publish event for a waiting job:
"*3 $7 publish $31 waiting@test queue added before $200 ["7867c958-8fff-4d5f-b54b-3b0962bebf23",{"data":{"bar2":"baz2"},"opts":{"jobId":"3"},"progress":0,"delay":0,"timestamp":1478967231496,"attempts":1,"attemptsMade":0,"stacktrace":[],"returnvalue":null}] "
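Since the Queue is an EventEmitter, the test-side fix is simply to attach a handler so the 'error' event doesn't throw (a sketch; the queue variable comes from the test helpers):

queue.on('error', function (err) {
  // expected while tearing the queue down rapidly in tests; safe to ignore
});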
otherwise `npm test -- --inspect --debug-brk` hangs the cluster workers waiting for the debug go-ahead, making it impossible to test mocha sections using describe.only/it.only
… if the redis client fails to pub/sub events Fixes unhandledRejection errors as seen in https://travis-ci.org/nextorigin/bull/jobs/175337242#L730
… to an 'error' event handler
As seen in https://travis-ci.org/nextorigin/bull/builds/175347564 Actual stacktrace: "Error: Connection is closed. at close (/home/charles/source/bull/node_modules/ioredis/lib/redis/event_handler.js:101:21) at Socket.<anonymous> (/home/charles/source/bull/node_modules/ioredis/lib/redis/event_handler.js:76:14) at Socket.g (events.js:291:16) at emitOne (events.js:96:13) at Socket.emit (events.js:188:7) at TCP._handle.close [as _onclose] (net.js:498:12) From previous event: at Command.initPromise (/home/charles/source/bull/node_modules/ioredis/lib/command.js:63:18) at new Command (/home/charles/source/bull/node_modules/ioredis/lib/command.js:51:8) at Redis.quit (/home/charles/source/bull/node_modules/ioredis/lib/commander.js:131:29) at Queue.disconnect (/home/charles/source/bull/lib/queue.js:290:18) at /home/charles/source/bull/lib/queue.js:311:20 From previous event: at /home/charles/source/bull/lib/queue.js:310:8 From previous event: at Queue.close (/home/charles/source/bull/lib/queue.js:302:44) at /home/charles/source/bull/test/utils.js:38:18 From previous event: at Object.cleanupQueues (/home/charles/source/bull/test/utils.js:36:18) at Context.<anonymous> (/home/charles/source/bull/test/test_queue.js:231:20) at callFn (/home/charles/source/bull/node_modules/mocha/lib/runnable.js:343:21) at Hook.Runnable.run (/home/charles/source/bull/node_modules/mocha/lib/runnable.js:335:7) at next (/home/charles/source/bull/node_modules/mocha/lib/runner.js:309:10) at Immediate.<anonymous> (/home/charles/source/bull/node_modules/mocha/lib/runner.js:339:5) at runCallback (timers.js:637:20) at tryOnImmediate (timers.js:610:5) at processImmediate [as _immediateCallback] (timers.js:582:5)"
Looks brilliant! Does this introduce a new locking mechanism? If so, are there docs on it? I'll need to update fsprojects/oxen to stay compatible with bull. :)
I'll give it one more thorough review in a few hours. @manast do you think this should be released as 2.0? While technically none of the APIs documented in the readme have changed with this PR, anyone using undocumented internals could still be affected.
Must be from today's new commits on master. I can rebase again first thing tomorrow.
I guess it is safer to release it as 2.0, to avoid the risk of breaking users' code. I can write a notice explaining that the changes are mostly internal but large enough to motivate a major release.
} else {
  lockCall = 'redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2], "NX")';
  redlock = new Redlock(queue.clients, queue.redlock);
I personally think it's cleaner to monkey patch Redlock (e.g. redlock.lockScript = ...) for now, until PR mike-marcacci/node-redlock#25 is resolved. To mitigate against Redlock changing, just lock the version in the package.json (so "redlock": "2.0.2").
Unfortunately it's a private variable without my patch. I've petitioned the dev for an answer; I can fork it if necessary, as I maintain forks of other packages too.
Yeah, that's what I mean: overwrite the private variable. Lock the version down in package.json (e.g. "redlock": "2.0.2") to make sure that variable won't change on us. Then in code:
var redlock = new Redlock(..)
redlock.lockScript = redlock.lockScript.replace(...)
This is much safer because it still uses the real redlock module, with a locked version that can't update out from under us. It's clearer in package.json, so other devs can see what version we're using. You can even add a comment in package.json (http://stackoverflow.com/questions/14221579/how-do-i-add-comments-to-package-json-for-npm-install) to indicate why it's locked to a specific version.
Pointing this to nextorigin/node-redlock is dangerous because if you or anyone on your team accidentally pushed to that master branch, it would break Bull for all users. I trust that you wouldn't do that, but it's unnecessary risk.
We can also just wait and hold merging this PR until node-redlock#25 is resolved.
cc @manast
@albertjan it uses a different locking mechanism (redlock) that is supposed to work in a distributed environment (needed when running redis cluster). I'm wondering, though, whether we are going to announce that bull works with redis cluster; I have not made any tests myself, and as mentioned before, our Lua scripts may not work well in such an environment.
@albertjan Part of the reason to implement ioredis is to start exploring redis sentinel and cluster support; however, as described in the Redis distlock documentation, bull will lose lock guarantees in a failover scenario unless redlock is implemented. node-redlock manages those cluster guarantees as well. There are some .NET libraries for redlock listed on the Redis distlock page. Bull is now doing up to 3 operations in one block to get a lock: checking that the job is still in the active list, setting the lock key (SET ... PX ... NX), and setting the lockAcquired flag (HSET).
Unfortunately it looks like none of the .NET libs have the lock script as a publicly accessible or modifiable variable; I have a PR open against node-redlock for that functionality.
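For reviewers, roughly how those pieces get assembled into the single script that redlock evaluates (a rough reconstruction from the snippets in this PR, not the verbatim code):

// isJobInList, lockAcquired and success are the string fragments shown earlier in the diff
var lockScript = [
  'if', isJobInList,                                                // job must still be in the active list
  'and redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2], "NX")',   // take the lock
  lockAcquired,                                                     // and redis.call("HSET", <job key>, "lockAcquired", "1")
  success                                                           // then return 1 else return 0 end
].join(' ');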
Force-pushed from 5c1e9d9 to d84a0c3.
Rebased on master and re-pushed. Fingers crossed for node-redlock#25.
Force-pushed from d84a0c3 to d22b891.
…ll active jobs to finish during close/pause
…queue to cleanup after itself when waiting for jobs
Force-pushed from d22b891 to 10b6882.
@bradvogel I totally agree about pulling packages from GitHub; I would have released a new package, like I did with stream-combiner2-withopts. The lockScript was private, as in trapped-in-a-closure private, but thankfully the PR was merged. I swear, 90% of the time I have to maintain a forked package it's just because some option is trapped in a closure. The only change in this latest push is to update redlock to the upstream release now that the PR is merged.
Hello,
This PR changes the redis dependency from node_redis to ioredis, fixing #185 (and completing PR #188). ioredis uses promises "all-the-way-down", exposing errors in the test suite. These tests are fixed and passing, including the race condition described in #370.
There are several commits here, to demonstrate the bugs and fixes made to the test suite. I am happy to squash this down, but right now each commit contains a description of the issues solved with the updated tests.
As part of solving #370, node-redlock is now used to implement atomic locking with a guaranteed owner. node-redlock is a promise-based lib which keeps track of its own secret token/value for each job/resource, which makes node-redlock valuable whether or not it's used with redis clustering.
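For reviewers unfamiliar with node-redlock, the shape of the integration is roughly (a sketch assembled from the diff, not the exact code):

var Redlock = require('redlock');

var redlock = new Redlock(queue.clients, queue.redlock);    // one or more ioredis clients plus options
return redlock.lock(job.lockKey(), queue.LOCK_RENEW_TIME)   // acquire; redlock generates and tracks the secret token
  .then(function (lock) {
    // the lock renewer later extends the same lock object instead of re-acquiring it
    return lock.extend(queue.LOCK_RENEW_TIME);
  });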
Thanks for your work, looking forward to your feedback.