Skip to content

Commit

Permalink
FEATURE: add support for multiple workers
Browse files Browse the repository at this point in the history
Previously mini scheduler was tied to a single runner. This was somewhat
arbitrary as all the internals were designed with multi runner support.

This unlocks the ability to run multiple threads for the jobs.
  • Loading branch information
SamSaffron committed Aug 29, 2019
1 parent 5235fff commit 7cf0f99
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 9 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 0.12.0 - 29-08-2019

- Add support for multiple workers which allows avoiding queue starvation

# 0.11.0 - 24-06-2019

- Correct situation where distributed mutex could end in a tight loop when
Expand Down
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,20 @@ In a Rails application, create files needed in your application to configure min

An initializer is created named `config/initializers/mini_scheduler.rb` which lists all the configuration options.

## Configuring MiniScheduler

By default each instance of MiniScheduler will run with a single worker. To amend this behavior:

```
if Sidekiq.server? && defined?(Rails)
Rails.application.config.after_initialize do
MiniScheduler.start(workers: 5)
end
end
```

This is useful for cases where you have extremely long running tasks that you would prefer did not starve.

## Usage

Create jobs with a recurring schedule like this:
Expand Down
4 changes: 2 additions & 2 deletions lib/mini_scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ def self.skip_schedule(&blk)
@skip_schedule
end

def self.start
def self.start(workers: 1)
schedules = Manager.discover_schedules

Manager.discover_queues.each do |queue|
manager = Manager.new(queue: queue)
manager = Manager.new(queue: queue, workers: workers)

schedules.each do |schedule|
if schedule.queue == queue
Expand Down
16 changes: 10 additions & 6 deletions lib/mini_scheduler/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module MiniScheduler
class Manager
attr_accessor :random_ratio, :redis, :enable_stats, :queue
attr_accessor :random_ratio, :redis, :enable_stats, :queue, :workers

class Runner
def initialize(manager)
Expand All @@ -29,9 +29,12 @@ def initialize(manager)
sleep (@manager.keep_alive_duration / 2)
end
end
@thread = Thread.new do
while !@stopped
process_queue
@threads = []
manager.workers.times do
@threads << Thread.new do
while !@stopped
process_queue
end
end
end
end
Expand Down Expand Up @@ -128,10 +131,10 @@ def stop!

kill_thread = Thread.new do
sleep 0.5
@thread.kill
@threads.each(&:kill)
end

@thread.join
@threads.each(&:join)
kill_thread.kill
kill_thread.join
end
Expand Down Expand Up @@ -171,6 +174,7 @@ def self.without_runner

def initialize(options = nil)
@queue = options && options[:queue] || "default"
@workers = options && options[:workers] || 1
@redis = MiniScheduler.redis
@random_ratio = 0.1
unless options && options[:skip_runner]
Expand Down
2 changes: 1 addition & 1 deletion lib/mini_scheduler/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module MiniScheduler
VERSION = "0.11.0"
VERSION = "0.12.0"
end
75 changes: 75 additions & 0 deletions spec/mini_scheduler/slow_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# frozen_string_literal: true
# encoding: utf-8

if ENV["SLOW"]

class FastJob
extend ::MiniScheduler::Schedule
every 1.second

def self.runs=(val)
@runs = val
end

def self.runs
@runs ||= 0
end

def perform
sleep 0.001
self.class.runs += 1
end
end

class SlowJob
extend ::MiniScheduler::Schedule
every 5.second

def self.runs=(val)
@runs = val
end

def self.runs
@runs ||= 0
end

def perform
sleep 5
self.class.runs += 1
end
end

describe MiniScheduler::Manager do

let(:redis) { MockRedis.new }

it "can correctly operate with multiple workers" do
MiniScheduler.configure do |config|
config.redis = redis
end

manager = MiniScheduler::Manager.new(enable_stats: false, workers: 2)

sched = manager.schedule_info(FastJob)
# we jitter start times, this bypasses it
sched.next_run = Time.now + 0.1
sched.schedule!

sched = manager.schedule_info(SlowJob)
# we jitter start times, this bypasses it
sched.next_run = Time.now + 0.1
sched.schedule!

10.times do
manager.tick
sleep 1
end

manager.stop!

expect(FastJob.runs).to be > 5
expect(SlowJob.runs).to be > 0

end
end
end

0 comments on commit 7cf0f99

Please sign in to comment.