Skip to content

Commit

Permalink
Merge branch 'develop' of github.com:OptimalBits/bull into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Nov 17, 2019
2 parents 6f49f69 + 01ce4d7 commit 4f001e0
Show file tree
Hide file tree
Showing 14 changed files with 393 additions and 5,956 deletions.
70 changes: 41 additions & 29 deletions .eslintrc.yml
Original file line number Diff line number Diff line change
@@ -1,29 +1,41 @@
env:
node: true

parserOptions:
ecmaVersion: 8

extends:
- eslint:recommended
- plugin:node/recommended

rules:
valid-jsdoc: 0
func-style: 0
no-use-before-define: 0
camelcase: 1
no-unused-vars: 2
no-alert: 2
no-console: [2, { allow: ['warn', 'error'] }]
no-underscore-dangle: 0

strict: [2, 'global']
no-var: 2
prefer-arrow-callback: 2
prefer-const: 2
no-inner-declarations: 0
object-shorthand: [2, 'consistent-as-needed']
newline-per-chained-call: 2

node/no-deprecated-api: 0
env:
node: true

parserOptions:
ecmaVersion: 8

extends:
- eslint:recommended
- plugin:mocha/recommended
- plugin:node/recommended

plugins:
- mocha
- node

rules:
valid-jsdoc: 0
func-style: 0
no-use-before-define: 0
camelcase: 1
no-unused-vars: 2
no-alert: 2
no-console: [2, { allow: ['warn', 'error'] }]
no-underscore-dangle: 0

strict: [2, 'global']
no-var: 2
prefer-arrow-callback: 2
prefer-const: 2
no-inner-declarations: 0
object-shorthand: [2, 'consistent-as-needed']
newline-per-chained-call: 2

mocha/no-exclusive-tests: 2
mocha/no-hooks-for-single-case: 0
mocha/no-mocha-arrows: 0
mocha/no-setup-in-describe: 0
mocha/no-sibling-hooks: 0
mocha/no-skipped-tests: 0

node/no-deprecated-api: 0
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ node_modules
tmp
coverage
*.rdb
.vscode
.vscode
package-lock.json
12 changes: 12 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ language: node_js

sudo: false

cache:
yarn: true

# test on node.js versions
node_js:
- '12'
Expand All @@ -11,6 +14,15 @@ node_js:
services:
- redis-server

before_install:
# Use a specific version of yarn in CI. This ensures yarn.lock format doesn't change.
- curl -o- -L https://yarnpkg.com/install.sh | bash -s -- --version 1.19.1
- export PATH="$HOME/.yarn/bin:$PATH"

install:
# ensure unexpected changes to yarn.lock break the build
- yarn install --frozen-lockfile --non-interactive

script:
- npm run prettier -- --list-different
- npm run test
Expand Down
17 changes: 15 additions & 2 deletions REFERENCE.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- [Queue#add](#queueadd)
- [Queue#pause](#queuepause)
- [Queue#resume](#queueresume)
- [Queue#whenCurrentJobsFinished](#queuewhencurrentjobsfinished)
- [Queue#count](#queuecount)
- [Queue#empty](#queueempty)
- [Queue#clean](#queueclean)
Expand Down Expand Up @@ -149,7 +150,7 @@ process(name: string, concurrency: number, processor: ((job, done?) => Promise<a

Defines a processing function for the jobs in a given Queue.

The callback is called everytime a job is placed in the queue. It is passed an instance of the job as first argument.
The callback is called every time a job is placed in the queue. It is passed an instance of the job as first argument.

If the callback signature contains the second optional `done` argument, the callback will be passed a `done` callback to be called after the job has been completed. The `done` callback can be called with an Error instance, to signal that the job did not complete successfully, or with a result as second argument (e.g.: `done(null, result);`) when the job is successful. Errors will be passed as a second argument to the "failed" event;
results, as a second argument to the "completed" event.
Expand Down Expand Up @@ -301,11 +302,13 @@ interface BackoffOpts {
### Queue#pause

```ts
pause(isLocal?: boolean): Promise
pause(isLocal?: boolean, doNotWaitActive?: boolean): Promise
```

Returns a promise that resolves when the queue is paused. A paused queue will not process new jobs until resumed, but current jobs being processed will continue until they are finalized. The pause can be either global or local. If global, all workers in all queue instances for a given queue will be paused. If local, just this worker will stop processing new jobs after the current lock expires. This can be useful to stop a worker from taking new jobs prior to shutting down.

If `doNotWaitActive` is `true`, `pause` will *not* wait for any active jobs to finish before resolving. Otherwise, `pause` *will* wait for active jobs to finish. See [Queue#whenCurrentJobsFinished](#queuewhencurrentjobsfinished) for more information.

Pausing a queue that is already paused does nothing.

---
Expand All @@ -322,6 +325,16 @@ Resuming a queue that is not paused does nothing.

---

### Queue#whenCurrentJobsFinished

```ts
whenCurrentJobsFinished(): Promise<Void>
```

Returns a promise that resolves when all jobs currently being processed by this worker have finished.

---

### Queue#count

```ts
Expand Down
58 changes: 17 additions & 41 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,11 @@ Queue.prototype.pause = function(isLocal, doNotWaitActive) {
});
}

if (!this.bclientInitialized) {
// bclient not yet initialized, so no jobs to wait for
return;
}

if (doNotWaitActive) {
// Force reconnection of blocking connection to abort blocking redis call immediately.
return redisClientDisconnect(this.bclient).then(() => {
Expand Down Expand Up @@ -1174,49 +1179,20 @@ Queue.prototype.clean = function(grace, type, limit) {
* @returns {Promise}
*/
Queue.prototype.whenCurrentJobsFinished = function() {
return new Promise((resolve, reject) => {
if (!this.bclientInitialized) {
// bclient not yet initialized, so no jobs to wait for
return resolve();
}

//
// Force reconnection of blocking connection to abort blocking redis call immediately.
//
const forcedReconnection = redisClientDisconnect(this.bclient).then(() => {
return this.bclient.connect();
});
if (!this.bclientInitialized) {
// bclient not yet initialized, so no jobs to wait for
return Promise.resolve();
}

Promise.all(this.processing)
.then(() => {
return forcedReconnection;
})
.then(resolve, reject);

/*
this.bclient.disconnect();
this.bclient.once('end', function(){
console.error('ENDED!');
setTimeout(function(){
this.bclient.connect();
}, 0);
});
//
// Force reconnection of blocking connection to abort blocking redis call immediately.
//
const forcedReconnection = redisClientDisconnect(this.bclient).then(() => {
return this.bclient.connect();
});

/*
var stream = this.bclient.connector.stream;
if(stream){
stream.on('finish', function(){
console.error('FINISHED!');
this.bclient.connect();
});
stream.on('error', function(err){
console.error('errir', err);
this.bclient.connect();
});
this.bclient.connect();
}
*/
//this.bclient.connect();
return Promise.all([this.processing[0]]).then(() => {
return forcedReconnection;
});
};

Expand Down
Loading

0 comments on commit 4f001e0

Please sign in to comment.