/
work_queue.rb
95 lines (79 loc) · 2.41 KB
/
work_queue.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
require 'set'
require_relative 'task'
require_relative 'queue_worker'
module TaskQueue
  # A queue that executes tasks in the order in which they were received.
  #
  # Tasks are buffered in a FIFO +Queue+ and handed to a fixed pool of
  # +QueueWorker+ instances by a dedicated distributor thread. The distributor
  # parks on a condition variable whenever there is either no pending task or
  # no available worker, and is signalled when a task is added or a worker is
  # returned to the pool. The condition is always re-checked while holding
  # +@worker_list_mutex+, so a notification can never be lost.
  class WorkQueue
    attr_reader :name

    # @param name [Object] identifier for this queue, exposed through {#name}
    # @param number_of_workers [Integer] how many QueueWorker instances to create
    def initialize(name:, number_of_workers: 1)
      @name = name
      @queue = Queue.new
      @available_workers = Set.new
      @busy_workers = Set.new
      # Guards both worker sets and serialises every push/pop on @queue so the
      # distributor sees a consistent "task pending AND worker available" state.
      @worker_list_mutex = Mutex.new
      # Signalled whenever a task is enqueued or a worker becomes available.
      @work_signal = ConditionVariable.new

      number_of_workers.times do
        @available_workers.add(QueueWorker.new(worker_delegate: self))
      end

      start_task_distributor
    end

    # Returns +worker+ to the available pool and notifies the distributor.
    # NOTE(review): presumably invoked by QueueWorker (this object is passed to
    # it as +worker_delegate+) once a task has finished - confirm against
    # QueueWorker.
    #
    # @param worker [QueueWorker] the worker that finished its task
    def worker_completed_task(worker:)
      @worker_list_mutex.synchronize do
        @busy_workers.delete(worker)
        @available_workers.add(worker)
        # Signal while holding the lock so the distributor cannot miss it.
        @work_signal.signal
      end
    end

    # Moves one worker from the available pool to the busy pool.
    #
    # @return [QueueWorker, nil] the checked-out worker, or nil when none is available
    def available_worker
      @worker_list_mutex.synchronize { checkout_worker }
    end

    # Assigns pending tasks to available workers, in FIFO order, until either
    # the queue is empty or every worker is busy. Never blocks waiting for a
    # task: a worker is only checked out once a task exists for it.
    #
    # @return [nil]
    def hand_out_work
      while (assignment = next_assignment)
        worker, task = assignment
        # Called outside the lock so a worker that reports completion from
        # within #process cannot deadlock on @worker_list_mutex.
        worker.process(task: task)
      end
    end

    # Starts the background thread that distributes tasks to workers.
    #
    # @return [nil]
    # @raise [StandardError] re-raises any error hit while starting the thread,
    #   after printing it
    def start_task_distributor
      @task_distributor_thread = Thread.new do
        # Process-wide setting: an unhandled exception in any thread aborts
        # the whole process instead of silently killing just that thread.
        Thread.abort_on_exception = true
        loop do
          wait_for_work
          hand_out_work
        end
      end
      nil
    rescue StandardError => e
      puts e
      raise
    end

    # Marks +task+ as submitted, enqueues it and notifies the distributor.
    #
    # @param task [#submitted=] the task to execute; it is later passed to a
    #   worker's +process(task:)+
    def add_task_async(task:)
      task.submitted = true
      @worker_list_mutex.synchronize do
        @queue << task
        @work_signal.signal
      end
    end

    private

    # Blocks the calling thread until there is at least one pending task and
    # one available worker. The loop also absorbs spurious wakeups.
    def wait_for_work
      @worker_list_mutex.synchronize do
        @work_signal.wait(@worker_list_mutex) while @queue.empty? || @available_workers.empty?
      end
    end

    # Atomically pairs the next pending task with an available worker.
    #
    # @return [Array(QueueWorker, Object), nil] [worker, task], or nil when no
    #   pairing is currently possible
    def next_assignment
      @worker_list_mutex.synchronize do
        next if @queue.empty?

        worker = checkout_worker
        # Every pop happens under the lock, so the queue is still non-empty
        # here and this pop cannot block.
        [worker, @queue.pop] if worker
      end
    end

    # Moves one worker from the available set to the busy set.
    # Caller must hold @worker_list_mutex.
    #
    # @return [QueueWorker, nil]
    def checkout_worker
      worker = @available_workers.first
      return nil if worker.nil?

      @available_workers.delete(worker)
      @busy_workers.add(worker)
      worker
    end
  end
end