
Debounced Jobs #1034

Open
tophep opened this issue Aug 27, 2018 · 29 comments


@tophep

tophep commented Aug 27, 2018


Description

Let's say I want to queue a job every time one of my users updates their profile. But if a user makes many rapid updates I don't want to queue a bunch of jobs that repeat the same work. I'd rather wait until the user is done with the flurry of updates.

This was previously discussed here but it was unrelated to the main thread and there was no resolution.

A simple solution could be to use a custom Id with a delay. When another job needs to be queued, delete the old job and re-queue with delay again. The caveat (touched on in thread above) is when the job is active and being processed and another job needs to be queued. @manast Any thoughts on how to handle this edge case? cc @lxcid
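
A rough sketch of that delete-and-requeue approach, assuming Bull v3's getJob/getState/remove/add APIs (the queue name, id scheme and delay value below are only illustrative):

  import Bull from 'bull';

  const profileQueue = new Bull('profile-updates');

  // Sketch only: replace any not-yet-started job for this user and restart
  // the debounce window. Does not handle the "already active" edge case.
  async function queueProfileUpdate(userId: string, data: object, debounceMs = 5000) {
    const jobId = `profile-update:${userId}`;
    const existing = await profileQueue.getJob(jobId);

    if (existing) {
      const state = await existing.getState();
      if (state === 'delayed' || state === 'waiting') {
        // Not started yet: drop it so the new job restarts the delay.
        await existing.remove();
      }
      // If it is already active (or still kept around as completed/failed),
      // Bull ignores the add below because the jobId still exists --
      // exactly the edge case raised above.
    }

    return profileQueue.add(data, { jobId, delay: debounceMs });
  }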

Separately, it would be convenient to be able to increase the delay of existing jobs (instead of delete + enqueue).



@manast
Member

manast commented Aug 27, 2018

Well I guess that should be like any other debouncing, i.e. if the job already started it will continue working, and the job added after that should be either ignored or queued.

@sebasmagri

Any chance to look at this again?

This would solve a very common use case I have. A possible solution I've been looking at is to have deduplication set up on jobId + state, so that if there's a job already waiting, there will not be new waiting jobs for the same jobId, but if there's a task already active for the same jobId we can still create a task that will be executed after that.
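
For reference, a rough user-land sketch of that jobId + state dedup idea, assuming Bull v3 (the `dedupeKey` field and queue wiring are illustrative, not an existing Bull feature):

  import Bull from 'bull';

  // If a job for this key is still waiting/delayed, reuse it; if the only
  // existing job is already active, queue a follow-up one. Scanning the
  // waiting/delayed jobs is O(n), so this is only a sketch.
  async function addDeduped(queue: Bull.Queue, dedupeKey: string, data: object) {
    const pending = await queue.getJobs(['waiting', 'delayed']);
    const existing = pending.find((job) => job.data.dedupeKey === dedupeKey);
    if (existing) {
      return existing;
    }
    return queue.add({ ...data, dedupeKey });
  }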

This requires concurrency to be 1, however, and doesn't consider failure/completion states. It would also require some additional checks.

I have also looked at a double-queue setup, one for debouncing and a second one for the actual processing, but that still sounds very hacky.

So a native solution for this would be perfect.

Thoughts? Any direction in which we could help?

@shivgarg5676

Did you guys find any good solution to this problem? I am facing the same situation where the same job can be queued multiple times and I only need to process one. Currently, I am storing all the jobs in Redis myself, batching them up, and sending one job to the queue after 10 seconds. But a native solution would be better.
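
A loose sketch of that batching workaround, assuming Bull v3 and ioredis (the key names, queue name and 10-second window are illustrative):

  import Bull from 'bull';
  import Redis from 'ioredis';

  const redis = new Redis();
  const batchQueue = new Bull('batched-work');

  // Buffer every payload in a Redis list; only the first call inside the
  // window actually schedules a job, because re-adding an existing jobId
  // is a no-op in Bull.
  async function enqueueBatched(key: string, payload: object) {
    await redis.rpush(`batch:${key}`, JSON.stringify(payload));
    await batchQueue.add(
      { key },
      { jobId: `batch:${key}`, delay: 10_000, removeOnComplete: true },
    );
  }

  batchQueue.process(async (job) => {
    // Drain everything buffered for this key and handle it as one batch.
    const raw = await redis.lrange(`batch:${job.data.key}`, 0, -1);
    await redis.del(`batch:${job.data.key}`);
    const items = raw.map((item) => JSON.parse(item));
    // ...process `items` in one go...
  });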

@GriffinSauce

GriffinSauce commented Dec 23, 2019

I have the same challenge, in my case with chat message notifications (but once we have a working system it could be applied to many things). The same applies: allow the user to trigger a bunch of jobs, but only process the last one.

So I think it comes down to the fact that there is a unique context (i.e. one chat, or one user in @tophep's case) and you want jobs to behave differently based on other queued jobs in that same context.

The opaqueness of Redis values + those two dimensions (jobs + contexts) kind of forces you to work with multiple queues or "collections" here, I think. Either that, or do some wackiness with key prefixes / suffixes?

My current idea involves saving the jobId to debounce:[chatId] and creating a new delayed job (that will check that value to decide whether it should actually do anything). That still feels like it's rife with potential issues, though.
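
For what it's worth, a sketch of that "record the latest jobId and bail out otherwise" idea, assuming Bull v3 and ioredis (the queue name, 30-second delay and key naming are illustrative):

  import Bull from 'bull';
  import Redis from 'ioredis';

  const redis = new Redis();
  const notificationQueue = new Bull('chat-notifications');

  async function notifyDebounced(chatId: string, data: object) {
    const job = await notificationQueue.add({ ...data, chatId }, { delay: 30_000 });
    // Last writer wins: only the most recently added job is allowed to run.
    await redis.set(`debounce:${chatId}`, String(job.id));
    return job;
  }

  notificationQueue.process(async (job) => {
    const latest = await redis.get(`debounce:${job.data.chatId}`);
    if (latest !== String(job.id)) {
      // A newer job superseded this one; skip the work.
      return;
    }
    // ...send the notification...
  });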

@manast
Member

manast commented Dec 25, 2019

@issuehunt-oss

issuehunt-oss bot commented Nov 25, 2020

@mauricedoepke has funded $250.00 to this issue.


issuehunt-oss bot added the 💵 Funded on Issuehunt label on Nov 25, 2020
@mauricedoepke

@manast Would be great if we could get that feature.

@manast
Member

manast commented Nov 25, 2020

@mauricedoepke I am going to look into it; it was a long time ago so I barely remember it anymore.

@manast
Member

manast commented Nov 26, 2020

@mauricedoepke just to make clear the requirements for this issue:

  1. A job added with a debounce parameter will wait X milliseconds before starting to process.
  2. If a new job is added before X milliseconds have passed since the previous job, then the old job is replaced by the new one.
  3. If the old job has already started and a new job is added, the new job will not replace the old one, it will just be added to the queue.

Are these the correct expectations? In other words, quite standard debounce functionality.

@mauricedoepke

mauricedoepke commented Nov 27, 2020

@manast Yes, that's correct. We mean exactly the same thing.

@majidsajadi

@mauricedoepke just to make clear the requirements for this issue:

  1. A job added with a debounce parameter will wait X milliseconds before starting to process.
  2. If a new job is added before X milliseconds have passed since the previous job, then the old job is replaced by the new one.
  3. If the old job has already started and a new job is added, the new job will not replace the old one, it will just be added to the queue.

Are these the correct expectations? In other words, quite standard debounce functionality.

I have a question about the requirements.
By "new job", do you mean a job with the same name and data, or can the new job have different data? If the data is different from the old job, should we replace the old one with the new one?

@mauricedoepke

@majidsajadi

I think we should define "new job" by name only. It should be able to have different data and still replace a previous job with the same name.

In my case it will be for debouncing user notifications. This way I can have a queue named "notifications" with a job named by the userId, and put all the necessary data for the notification in the job data.

This way I can insert the newest data, but it will still debounce by the userId (job name).

@manast
Member

manast commented Dec 8, 2020

We will need to use the jobId option for this, since that is the proven mechanism we have today to avoid duplicated jobs. In your case it should work since you can use userId as jobId.
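
In user land, that jobId-based suggestion might look roughly like this (Bull v3; the queue name, 5-second delay and removeOnComplete choice are illustrative):

  import Bull from 'bull';

  const notificationsQueue = new Bull('notifications');

  // While a job with jobId === userId is still in the queue, further adds
  // for the same user are ignored by Bull's duplicate-id check.
  async function queueNotification(userId: string, payload: object) {
    return notificationsQueue.add(
      { userId, payload },
      { jobId: userId, delay: 5_000, removeOnComplete: true },
    );
  }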

@majidsajadi

majidsajadi commented Dec 10, 2020

In this case we can delay a job with the delay option for a certain amount of time and set userId as the jobId (overwriting the default id). When creating a new job, we just check whether a job with the provided id already exists. We could add an 'overwrite' option for this.

Or we could add a 'debounce' option that works like 'delay' but overwrites any existing job with the same jobId.

If I'm on the right track, I would like to tackle this issue.
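
Purely as a hypothetical API shape (neither option exists in Bull today; these are just the names proposed above):

  // Hypothetical options, not an existing Bull API:
  await queue.add(data, { jobId: userId, delay: 10_000, overwrite: true });
  // or
  await queue.add(data, { jobId: userId, debounce: 10_000 });
  // With either option, an existing waiting/delayed job with the same jobId
  // would be replaced (and its delay restarted) instead of the new add
  // being silently ignored.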

@veedeo

veedeo commented Dec 31, 2020

Adding a job with an existing ID does nothing (by default); it would be great to have an option to "overwrite" the job instead.
Waiting for this feature.
Also, note that overwrites should be atomic, as there might be many publishers.

@manast
Member

manast commented Jan 2, 2021

@veedeo this is feasible, but only if the overwrite happens before the job has actually started to be processed. It should be as easy as having an extra "overwrite" option: when it is set, instead of ignoring the new job we just add it normally, replacing the old one (as long as the old job is still in the wait or delayed sets).

@Nytelife26

I do intend to work on this. So, let me get this straight:

  1. Atomic overwriting / upserting of jobs (that have not yet started to process) [would also align with taskforces/bullmq#240]
  2. Integrate delay functionality for debouncing, where all jobs of the same ID queued within a certain delay will overwrite the prior one before it is allowed to process
  3. If the job has already started processing or is finished, just add another one to the queue as normal, following the same process as before

@jamesholcomb

Also, the delay specified by queue.add(name, data, {delay: x}) would override the existing delay.

@Nytelife26

Ah, so, @jamesholcomb, the plan is to make overwriting jobs extend the delay as well, correct? That makes sense now that I think about it.

@jamesholcomb

That would satisfy my use case. For instance, a job is scheduled 3 months out, and a user makes a change to some job data that requires it to be moved up to tomorrow.

@Nytelife26

Well, it only makes sense for atomic overwriting to overwrite all properties, so that works fine. Obviously this is my first contribution to Bull, and so I am unsure of the design philosophy, but it sounds like the logical solution to me.

@janat08

janat08 commented Jan 27, 2021

I'd like to work on this. @manast do you promise to review and pay within a week?

@janat08

janat08 commented Jan 30, 2021

Because my experience with bounties is far from ideal, and I don't have finances to manage.

@mauricedoepke

@tophep @sebasmagri @shivgarg5676 @GriffinSauce

Are any of you maybe interested in throwing some dollars into the IssueHunt as well, to make it more appealing for someone to add this feature?

@issuehunt-oss

issuehunt-oss bot commented Mar 1, 2021

@mauricedoepke has cancelled funding for this issue. (Cancelled amount: $250.00) See it on IssueHunt

issuehunt-oss bot removed the 💵 Funded on Issuehunt label on Mar 1, 2021
@sevetseh28

Any updates on this super handy feature? :) Thanks!

@KirillSuhodolov

Hey everyone, what is the current status of this feature?
@mauricedoepke could we restore the funding? I am interested in donating.

@AntonPuko

AntonPuko commented May 5, 2022

Here is our workaround for debounced jobs with the current Bull. Maybe it will help somebody.

  // Debounce by id prefix: remove any previously queued job for the same
  // (jobName, debounceKey) pair, then re-add a delayed job with a fresh
  // unique id. `queuesByName`, `QueueName`, `JobName`, `uuid` and `requestId`
  // come from the surrounding application code.
  async addDebounced<TData extends Record<string, unknown>>({
    data,
    debounceKey,
    debounceTimeout,
    jobName,
    options,
    queueName,
  }: {
    data?: TData;
    debounceKey: string;
    debounceTimeout: number;
    jobName: JobName;
    options?: Omit<JobOptions, 'delay' | 'jobId'>;
    queueName?: QueueName;
  }): Promise<Bull.Job<TData>> {
    const queue = queuesByName[queueName || QueueName.AsyncQueue];

    // All debounced variants of this job share the same id prefix, so they
    // can be matched with a glob pattern.
    const jobIdPrefix = `Job:${jobName}:${debounceKey}`;
    const jobId = `${jobIdPrefix}:${uuid()}`;

    // Drop whatever is still queued for this key...
    await queue.removeJobs(`${jobIdPrefix}:*`);

    // ...and queue a new job, delayed by the debounce timeout.
    return queue.add(
      jobName,
      {
        ...data,
        _context: { initialEnvironment: process.env.ENVIRONMENT, requestId },
      },
      {
        ...options,
        delay: debounceTimeout,
        jobId,
      },
    );
  }
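
A hypothetical call site for the helper above (`jobService` stands for whatever object exposes addDebounced; the enum member, key and 30-second timeout are made up for illustration):

  await jobService.addDebounced({
    queueName: QueueName.AsyncQueue,
    jobName: JobName.SyncProfile, // hypothetical enum member
    debounceKey: userId,          // e.g. the user whose profile changed
    debounceTimeout: 30_000,      // debounce window in ms
    data: { userId },
  });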

@jmbeach

jmbeach commented Jul 1, 2022

@AntonPuko that won't affect active jobs, right? It just removes the ones that are waiting?
