
Retrying jobs with a single queue and multiple consumers #2512

Closed
olivermrbl opened this issue Jan 5, 2023 · 2 comments

@olivermrbl

I've been grooming issues (both open and closed) and StackOverflow to find an answer to my question, but without any luck. Therefore, I apologize upfront if this has in fact been answered previously. In that case, the link to the solution is highly appreciated.

Description

I want to configure retries on jobs, but I can't figure out the best approach when a single job has multiple consumers. The job option { attempts: 5 } does work, but my issue is that on a retry, all consumers process the job again, instead of only the single consumer that threw an error.

It might be the intended behavior; in that case, I would appreciate alternative ways of configuring my queue, processor, and consumer setup.

Minimal, working test code to reproduce the issue

As this is part of a larger application, I can't provide reproducible code. Instead, I'll outline the high-level flow of event processing (in pseudocode that won't compile as-is):

// Initializing the queue
const queue = new Bull(`bull-events-queue`, { ... })

// Emitting a job
queue.add({ jobName, data }, { attempts: 5, removeOnComplete: true })

// Registering a consumer
// It is important to stress that we typically have multiple consumers per job
consume(jobName, consumer) {
    const existingConsumers = this.consumers_.get(jobName) ?? [] // consumers are stored on the class itself
    this.consumers_.set(jobName, [...existingConsumers, consumer])
}

// Processing jobs
worker_ = async (job) => {
    const { jobName, data } = job.data
    const consumers = this.consumers_.get(jobName) || []

    return await Promise.all(
      consumers.map(async (consumer) => {
        return consumer(data, jobName).catch((err) => {
          throw err // this triggers the retry mechanism of Bull
        })
      })
    )
  }

// Example scenario:
consume("test.event", () => console.log("hello"));
consume("test.event", () => { throw new Error("Some error") }); // this consumer happens to fail for some reason
consume("test.event", () => console.log("hello 3"));

queue.add({ jobName: "test.event", data: { key: "value" } }, { attempts: 5, removeOnComplete: true })

In this case, all three consumers would run up to 5 times due to the attempts option, even though only a single consumer is actually failing. It should be obvious that this can lead to data inconsistencies, as consumers that already completed process the job again.

Does Bull natively support some kind of idempotency mechanism for this use case, or would I have to build that myself?
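To illustrate the alternative I'm considering (not a Bull built-in, just a sketch with illustrative names and a stand-in `queue`): fan out one job per consumer at enqueue time, so that Bull's retry mechanism only re-runs the consumer that failed.

```javascript
// Sketch: one job per consumer, so a retry targets only the failing consumer.
// `queue` is a stand-in for the real Bull queue; only the add() shape matters.
const consumers = new Map(); // jobName -> [{ id, handler }]

function consume(jobName, id, handler) {
  const existing = consumers.get(jobName) ?? [];
  consumers.set(jobName, [...existing, { id, handler }]);
}

// Instead of one job processed by all consumers, enqueue one job per consumer.
function emit(queue, jobName, data) {
  const subs = consumers.get(jobName) ?? [];
  return subs.map(({ id }) =>
    queue.add(
      { jobName, consumerId: id, data },
      { attempts: 5, removeOnComplete: true }
    )
  );
}

// The worker dispatches to exactly one consumer, so a thrown error
// only retries that consumer's job.
async function worker(job) {
  const { jobName, consumerId, data } = job.data;
  const sub = (consumers.get(jobName) ?? []).find((s) => s.id === consumerId);
  if (!sub) throw new Error(`No consumer ${consumerId} registered for ${jobName}`);
  return sub.handler(data, jobName);
}
```

The trade-off is N jobs on the queue instead of one, but each job then has an independent retry counter.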

Looking forward to hearing from you.

Bull version

3.29.3

Additional information

olivermrbl commented Jan 5, 2023

Update: For now, I've built a custom solution that stores the ids of completed consumers on the job itself:

worker_ = async (job) => {
  const { jobName, data } = job.data
  const consumers = this.consumers_.get(jobName) || []

  // Pull the already completed consumers from the job data
  const completedConsumers = job.data.completed_consumer_ids || []

  // Filter the already completed consumers out of the full list
  const consumersForCurrentAttempt = consumers.filter(
    (cons) => cons.id && !completedConsumers.includes(cons.id)
  )

  const completedConsumersInCurrentAttempt: string[] = []

  await Promise.all(
    consumersForCurrentAttempt.map(async ({ id, consumer }) => {
      return consumer(data, jobName)
        .then(() => {
          // For every consumer that completes successfully, add its id to the list of completed consumers
          completedConsumersInCurrentAttempt.push(id)
        })
        .catch((err) => {
          this.logger_.warn(
            `An error occurred while processing ${jobName}: ${err}`
          )
          return err
        })
    })
  )

  // If not every consumer in the current attempt completed, some of them failed.
  // Note: compare against the consumers of the *current attempt*, not all
  // consumers, otherwise a fully successful retry would still be rejected.
  // Therefore, we update the job data and reject to trigger a retry.
  if (
    completedConsumersInCurrentAttempt.length !==
    consumersForCurrentAttempt.length
  ) {
    const updatedCompletedConsumers = [
      ...completedConsumers,
      ...completedConsumersInCurrentAttempt,
    ]

    await job.update({
      ...job.data,
      completed_consumer_ids: updatedCompletedConsumers,
    })

    const errorMessage = `One or more consumers of ${jobName} failed. Retrying...`

    this.logger_.warn(errorMessage)

    return Promise.reject(new Error(errorMessage))
  }

  return Promise.resolve(undefined)
}
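The bookkeeping above can be isolated into pure helpers so it can be unit-tested without a queue. This is a sketch with illustrative names; `completedIds` stands in for `job.data.completed_consumer_ids`.

```javascript
// Sketch: the per-attempt bookkeeping as pure functions, testable without Bull.

// Consumers still owed a run on this attempt.
function pendingConsumers(allConsumers, completedIds) {
  return allConsumers.filter(({ id }) => id && !completedIds.includes(id));
}

// Run the pending consumers; collect ids that succeeded and errors that occurred.
async function runAttempt(allConsumers, completedIds, data) {
  const pending = pendingConsumers(allConsumers, completedIds);
  const succeeded = [];
  const errors = [];
  await Promise.all(
    pending.map(async ({ id, consumer }) => {
      try {
        await consumer(data);
        succeeded.push(id);
      } catch (err) {
        errors.push({ id, err });
      }
    })
  );
  return {
    // carry the cumulative list forward, to be written back to job.data
    completedIds: [...completedIds, ...succeeded],
    // done only when every consumer pending in THIS attempt succeeded
    done: succeeded.length === pending.length,
    errors,
  };
}
```

The worker would then call `runAttempt`, persist `completedIds` via `job.update`, and reject when `done` is false.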

@olivermrbl

Now, the next big issue arises when we want to run multiple instances, i.e. scale horizontally. We can then no longer simply rely on the consumers registered on the class, because each registry is scoped to its server instance. I will get back if I find a solution.

Let me know if anyone has ideas :)
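One observation that may help: `job.data` (and therefore `completed_consumer_ids`) lives in Redis and is shared across instances, so what's actually needed is a consumer id that is stable across instances. A sketch, assuming every instance registers the same consumers at startup (`subscribe` and the explicit `name` argument are illustrative, not Bull APIs):

```javascript
// Sketch: instance-independent consumer ids. An id derived from
// jobName plus an explicit consumer name is identical on every
// instance, so completed_consumer_ids stored on the job stays
// meaningful no matter which instance picks up the retry.
const registry = new Map(); // jobName -> [{ id, consumer }]

function subscribe(jobName, name, consumer) {
  const id = `${jobName}:${name}`; // deterministic, not tied to this process
  const existing = registry.get(jobName) ?? [];
  if (existing.some((c) => c.id === id)) {
    throw new Error(`Consumer ${id} already registered`);
  }
  registry.set(jobName, [...existing, { id, consumer }]);
  return id;
}
```

With ids like these, any instance can filter out already-completed consumers from a retried job, exactly as in the worker above.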
