
Counting jobs in progress for Resque #4

Open
wants to merge 1 commit

4 participants

@sadowski

This macro only counts the jobs in the queues and skips those that are actively being worked on. We need to look at the workers to see which jobs are active; otherwise we could have the case where we spin up n workers for n jobs, each worker takes a job, there are 0 jobs left on the queue, and all the workers are spun down even though they are still working.

@sadowski sadowski Counting jobs in progress for Resque
The Macro is only counting the jobs in the queues and skips 
counting those that are being actively worked on. We need to look
at the workers to see which jobs are active, otherwise we could have
the case where we spin up n workers for n jobs, each worker takes a
job, there are 0 jobs left on the queue, and all the workers are spun
down even though they are in the process of working.
253b71b
@meskyanichi
Owner

Hi @sadowski - Thanks for the PR. The initial implementation actually had this kind of behaviour in place. Unfortunately, due to this issue with Resque, the in_progress variable would be inaccurate since it would think there are still workers actively processing jobs when there aren't.

I haven't received any real confirmation on what the situation is with Resque and the pruning of dead workers. You can read up in the comments. Some of them suggest that it should be fixed with sigterm-all, which is a Heroku Labs extension, but that it would become the default "soon" (which I'd think would already have happened by now, but I'm not sure).

Have you run this piece of code in production yourself yet?

Cheers

@meskyanichi
Owner

Actually this page now suggests that it has in fact become the default: https://devcenter.heroku.com/articles/labs-sigterm-all - although this person claims that it doesn't fix the issue for his application.

So it's a little hard to tell what's really going on at this point. You always have the choice to use whichever macros or custom logic you wish with HireFire, of course. But it might be wise to wait it out a little to confirm whether you do or do not run into any issues with dead workers.

I am definitely for adding the in_progress data to the queue size data, but will have to make sure that it doesn't get messed up by dead workers when it scales up or down.

@sadowski

We have something similar on our staging instances now, it should be going into production soon.

I see the 'dead worker' problem extremely rarely. Maybe 1 worker per month. When it happens I just go into the redis instance and clean out the keys for the dead worker so it stops showing up in the UI and stats.
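
For reference, this is roughly what that cleanup looks like through the Resque API rather than editing Redis keys by hand (just a sketch; the worker id string is a placeholder you'd copy from resque-web):

# look up the dead worker by its "hostname:pid:queues" id and remove its Redis keys
worker = Resque::Worker.find("dead-hostname:12345:*")
worker.unregister_worker if worker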

I could definitely see how an inaccurate count from Resque could cause problems with scaling. However, the solution in the default macro causes us to always lose jobs whenever the queue is empty but a worker is still working: the worker gets scaled down and drops whatever job it is currently working on.

We recently upgraded to Resque 1.22 and are using the TERM_CHILD env variable to allow Resque to correctly respond to Heroku's TERM signal. I would expect the dead worker problem to mostly go away now that Resque gets a chance to clean up after itself.
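
For anyone else setting this up, it's just an environment variable on the worker process. A minimal Procfile sketch (the queue list is a placeholder; you could also set TERM_CHILD=1 as a regular Heroku config var instead of inline):

worker: TERM_CHILD=1 QUEUES=* bundle exec rake resque:work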

This is definitely not as simple a problem as it appears. Thanks for the info and the quick response.

Cheers,

Steve

@meskyanichi
Owner

Oh yeah, I read about that Resque release a while ago, by Hone at Heroku I believe. Well, if you could run this code in production for a while, along with the TERM_CHILD flag, and it runs well, I would like to add your addition to the system. I'd have to update the documentation afterwards to ensure that people are in fact on the latest version of Resque with the flag enabled.

@meskyanichi
Owner

Just as a reference: http://hone.heroku.com/resque/2012/08/21/resque-signals.html since I managed to find the article.

You'll always want to ensure jobs get re-enqueued when they are SIGTERM'd. For example:

require 'resque/errors'

class DatabaseBackupJob
  def self.perform(id, foo, bar)
    # do backup work
  rescue Resque::TermException
    # write failure to database

    # re-enqueue the job so the job won't be lost when scaling down.
    Resque.enqueue(DatabaseBackupJob, id, foo, bar)
  end
end

I'm not sure whether Hone's patch covers this, but if not then you could probably get the job to re-enqueue itself by catching the SIGTERM, re-enqueuing the job, and then exiting cleanly. That is, if you don't want the job to get lost.

@barmstrong

Should this be the default behavior of perform? Or maybe an opt-in default. We would like to do this on almost all jobs, but it's obviously redundant to copy-paste this into every job. Maybe a monkey patch would work as well - anyone have a snippet they've used for this? Thanks!
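
Something roughly like this shared module is what I have in mind, purely as an untested sketch (ReenqueueOnTerm and the work method are made-up names, not part of Resque or resque-retry):

require 'resque/errors'

# Extend a job class with this and implement `work` instead of `perform`;
# on SIGTERM the job re-enqueues itself before the worker shuts down.
module ReenqueueOnTerm
  def perform(*args)
    work(*args)
  rescue Resque::TermException
    Resque.enqueue(self, *args)
  end
end

class DatabaseBackupJob
  extend ReenqueueOnTerm

  def self.work(id, foo, bar)
    # do backup work
  end
end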

Also, we are using resque-retry which I thought might handle this automatically by just adding extend Resque::Plugins::ExponentialBackoff to the top of the job, but it doesn't seem to be retrying on Resque::TermException or Resque::DirtyExit. Thanks!

@skyshard

There's a similar issue for Sidekiq: if all the jobs are being worked on, HireFire scales the dynos to 0. Sidekiq re-enqueues the jobs, but since they're long-running jobs, they never get to finish.

Then, after the dynos are scaled down, Sidekiq reinserts the incomplete jobs into their queues, and HireFire scales the dynos back up (this repeats forever).

@skyshard

@meskyanichi Is there any way to fix this for Sidekiq? I'm seeing HireFire scale down the worker process while it's still working on jobs fairly frequently.

@meskyanichi
Owner

For Resque this might be a bit hard, but for Sidekiq it might be possible actually.

A while ago Mike Perham released a public API that allows you to easily access queues/jobs/workers.

https://github.com/mperham/sidekiq/wiki/API

We might be able to rewrite the Sidekiq macro to take advantage of this API to do the following:

  1. Count the number of jobs in the queue(s) in question
  2. Count the number of active workers that are processing jobs from the queue(s) in question

Combine the data from those two points and the result should be > 0 if a worker is still processing a job, even when the queue is empty.

I'll have a look at this soon. If you're in a hurry, feel free to explore this solution. Looking at the examples on the API page it looks like this can be rolled fairly quickly. Just make sure the Sidekiq gem is up to date so it has all these API features.

@meskyanichi
Owner

I actually managed to quickly write an untested solution. It'll probably need some tweaking unless I'm lucky. I'm unfortunately not in a good position to try this out at the moment to see if it actually works.

Here's the code I rolled:

module HireFire
  module Macro
    module Sidekiq
      extend self

      # Counts the amount of jobs in the (provided) Sidekiq queue(s).
      #
      # @example Sidekiq Macro Usage
      #   HireFire::Macro::Sidekiq.queue # all queues
      #   HireFire::Macro::Sidekiq.queue("email") # only email queue
      #   HireFire::Macro::Sidekiq.queue("audio", "video") # audio and video queues
      #
      # @param [Array] queues provide one or more queue names, or none for "all".
      #
      # @return [Integer] the number of jobs in the queue(s).
      def queue(*queues)
        queues.flatten!
        queues = ::Sidekiq::Stats.new.queues.map { |name, _| name } if queues.empty?

        jobs = queues.inject(0) do |memo, name|
          queue = ::Sidekiq::Queue.new(name)
          memo += queue.size
          memo
        end

        workers = ::Sidekiq::Workers.new.select { |_, work|
          queues.include?(work["queue"]) && work["run_at"] <= Time.now
        }.count

        jobs + workers
      end
    end
  end
end

Using reference: https://github.com/mperham/sidekiq/wiki/API

You could simply stick this in your hirefire.rb initializer, like so:

HireFire::Resource.configure do |config|
  config.dyno(:worker) do
    queues = ["worker"]
    queues.flatten!
    queues = Sidekiq::Stats.new.queues.map { |name, _| name } if queues.empty?

    jobs = queues.inject(0) do |memo, name|
      queue = Sidekiq::Queue.new(name)
      memo += queue.size
      memo
    end

    workers = Sidekiq::Workers.new.select { |_, work|
      queues.include?(work["queue"]) && work["run_at"] <= Time.now
    }.count

    jobs + workers
  end
end

The code is practically the same, except that I hardcoded queues = ["worker"] since this wouldn't be used as a macro, but inline, for testing purposes. You could also omit the queues = ["worker"] assignment if you want it to automatically count all queues in Sidekiq. Feel free to try it out.

One thing that might fail, by the way, is the work["run_at"] <= Time.now check, because I'm not 100% sure this attribute always returns a timestamp. It might return nil if you don't specify a run_at when enqueuing the job. However, I think run_at always gets set (even if you don't specify it, it should default to Time.now), so there's a good chance this chunk of code will just work.
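
If run_at does turn out to be nil (or an epoch integer rather than a Time), a small and equally untested tweak to that one select should keep the comparison from blowing up (a nil run_at simply counts the worker as busy):

workers = Sidekiq::Workers.new.select { |_, work|
  # to_i turns nil into 0 and leaves integer timestamps alone
  queues.include?(work["queue"]) && work["run_at"].to_i <= Time.now.to_i
}.count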

Once I have time I'll properly test it out. Again, feel free to try it out for yourself and see how it works, even if it's just in development mode.

Cheers

@meskyanichi
Owner

@sadowski, @eckardt, @skyshard, @barmstrong so it has been quite a while since the "dead worker" issue came about and it might be that by now Heroku has fixed the issue with Resque. I also believe someone at Heroku took over the Resque project and thus has more control over getting it to work correctly with Heroku.

I think it'd be a good idea to create two separate branches, experimental-resque and experimental-sidekiq, which contain the behaviour of combining in_queue and in_progress jobs for Resque and Sidekiq respectively. This way you could try it out for a while and see if you run into any unexpected issues (e.g. dead workers).

Should everything run smoothly for a while (say a week or two), I would like to merge these changes into master so you can use the gem instead of a specific branch.

I'll take a look at the Sidekiq code ASAP and make both the experimental-resque and experimental-sidekiq branches available in this repository so you can simply add them to your Gemfile.

That said, have any of you already used this PR in production, and did it work well? If you've already tested it for a long time, please let me know and I'll consider adding it to the gem immediately rather than creating a separate branch for it.

Thanks!

@meskyanichi
Owner

I ended up creating a single branch (experimental) that includes the changes for both Resque and Sidekiq.

See ref: 684f135

Please try this out in your app with:

gem "hirefire-resource",
  :git    => "git://github.com/meskyanichi/hirefire-resource.git",
  :branch => "experimental"

Let me know how this works and whether things need to be tweaked. If all goes well for a while I'll merge it back to master.

Thanks!

@skyshard

I'm getting some stuck workers with Sidekiq: memory usage looks fine, but they aren't picking up jobs. Will investigate.

Commits on Sep 12, 2012
  1. @sadowski

    Counting jobs in progress for Resque

    sadowski authored
Showing with 4 additions and 1 deletion.
  1. +4 −1 lib/hirefire/macro/resque.rb
@@ -18,7 +18,10 @@ module Resque
def queue(*queues)
return ::Resque.info[:pending].to_i if queues.empty?
queues = queues.flatten.map(&:to_s)
- queues.inject(0) { |memo, queue| memo += ::Resque.size(queue); memo }
+ in_queues = queues.inject(0) { |memo, queue| memo += ::Resque.size(queue); memo }
+ in_progress = ::Resque::Worker.all.select{|worker| queues.include?(worker.job['queue']) }.count
+
+ in_queues + in_progress
end
end
end