Possible to push (unattempted) jobs back on a queue? #1546

Open
chris72205 opened this issue Nov 6, 2019 · 9 comments
@chris72205

We have a use case where we process jobs and rate limit them based on some property (property A) in the job payload. For this reason, we cannot easily use the built-in Rate Limiter (as far as I know) without creating different queues for each variation of property A.

Is there a way to put a job back on the queue without affecting its retry count? I'm thinking of something similar to a nack in RabbitMQ. Something else I initially considered was simply creating a new job, but then we lose all of the retry functionality and history of the original one.
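For context, the built-in limiter only throttles a queue as a whole, something roughly like this (queue name and numbers made up):

const Queue = require('bull');

// The built-in rate limiter is per queue: at most `max` jobs are processed
// every `duration` ms, regardless of what's in the job payload.
const emailQueue = new Queue('emails', {
  limiter: {
    max: 10,        // example values only
    duration: 1000,
  },
});

// so limiting per value of property A would mean one queue per distinct value,
// e.g. new Queue(`emails:${propertyA}`, ...)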

@chris72205
Author

After looking into this a little more, moveToDelayed seems to do what I want, but it's not documented, not present in the TS definition, and I have to pass ignoreLock as true to get it to work. I'm not familiar enough with the code base to understand the importance of locking here or whether passing ignoreLock will cause problems elsewhere.
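Roughly what I'm trying from inside the processor (simplified sketch; readyToProcess is our own check, and moveToDelayed is internal and undocumented, so this may have side effects as noted below):

queue.process(async (job) => {
  if (!readyToProcess(job.data)) {
    // push the job back ~5s without touching attemptsMade; ignoreLock must be true
    await job.moveToDelayed(Date.now() + 5000, true);
    return; // the processor still "finishes" here, which is where the trouble starts
  }
  // ...normal processing...
});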

@manast
Member

manast commented Nov 6, 2019

It is not documented for good reason, since it is an internal API. If you call that method from the processor you will most likely create undesired side effects, since the job will still try to complete while at the same time it has been moved to the delayed set instead of being in the active list.
What you are asking is certainly possible to implement as a new feature, though. Currently, when a job is processed there are only two possible outcomes: either it completes or it fails. We would need a third possibility, one that just cancels the job, and then you are free to move it to the delayed set or do something else.

@chris72205
Author

Gotcha. So this third outcome is something you're potentially open to adding to the project?

Something like this?

        --> priorities      --> completed
       /     |            /
    job -> wait -> active ---------------> rejected
       \     ^            \                   |
        v    |             --> failed         |
        delayed    <--------------------------+

@gabegorelick
Contributor

I've wanted this for a long time. Would be great to see it in Bull.

@theoephraim

theoephraim commented Jun 30, 2020

+1 - this is very much needed to handle more complex rate limiting scenarios.
Alternatively, it would help if we could adjust the job data (a new delay to use and the retry count) while failing the job.

@chris72205 - did you ever find a better workaround?

@chris72205
Author

@theoephraim yes and no... the workaround I settled on is a bit hacky but ultimately worked. I ended up adding an "internal" property to our jobs that we use to store some meta info about the job itself (number of retries, previous errors, etc.) and checking it when jobs emit a failed event to determine whether the job should, in fact, be removed... or retried again. It works all right, but it's more of a stopgap until we can come back around and (hopefully) update this library or switch to something else. The flow I settled on looks something like this:

queue.on('failed', (job, error) => {
    if(job.attemptsMade === somethingHoldingOurMaxAttemptsSetting) {
        // job would be thrown away by Bull so we decide if we should spin up a new one

        if(notActuallyExhausted(job)) {
            // notActuallyExhausted(job) is some method that looks through our job._internal property
            // and decides how many times it actually failed vs how many times we "rejected" it due to
            // not being ready to process it

            // job is not actually exhausted, so we create a new one and retain some info about the previous one(s)
            queue.add(
                job.name, // same name
                {
                    ...job.data, // same data
                    _internal: { ... }, // populate with existing metadata and existing errors + new errors from this job
                },
                job.opts, // same options
            )
        }

        // we remove the existing job since that's what _should_ happen or we've created a new one so we
        // don't need this one anyways
        job.remove();
    }
});

I also added some logic in the backoffStrategies option to better handle retry delays when we're simply not ready to process a job. It's not perfect, but it works for the most part.
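For reference, the custom backoff piece looks roughly like this (the strategy name, error check, and delays are just examples):

const Queue = require('bull');

const queue = new Queue('work', {
  settings: {
    backoffStrategies: {
      // Bull calls a custom strategy with the attempt count and the error that failed the job
      notReadyAware: (attemptsMade, err) => {
        if (err && err.message === 'NOT_READY') {
          // we "rejected" the job because we weren't ready for it yet:
          // retry soon without ramping the delay up
          return 5000;
        }
        // real failure: fall back to exponential backoff
        return Math.pow(2, attemptsMade) * 1000;
      },
    },
  },
});

// jobs opt in via their options
queue.add(jobData, {
  attempts: 10,
  backoff: { type: 'notReadyAware' },
});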

@manast
Member

manast commented Jul 1, 2020

Do you think this will help in resolving this issue? #1753

@theoephraim

@manast - that could help for some simple scenarios where jobs can be bucketed easily.

In my case, I have several different rate limits that need to be checked (per user, per org, and then per API token being used), so a single bucket doesn't work. Additionally, I may make an API request and get back a "you've been rate limited" response. In any of these cases, I just want to retry the job after some delay, but not count it as an "attempt" in the normal retry logic or use the normal exponential backoff setup.

@theoephraim

@chris72205 thanks for taking the time to explain your solution - very helpful. I already have a wrapper around each job handler, so was able to do something similar and re-enqueue a new job, but I'll see if hooking into the backoff strategies or error event handler may simplify some things. Cheers!
