Skip to content

Commit

Permalink
Heartbeat, which enables the client to find out, if the QueueDispatch…
Browse files Browse the repository at this point in the history
…er is still running
  • Loading branch information
philkman committed Apr 17, 2015
1 parent 825ef1d commit 3c67abc
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 12 deletions.
31 changes: 23 additions & 8 deletions lib/queue_dispatcher/acts_as_task_queue.rb
Expand Up @@ -52,6 +52,27 @@ def acts_as_task_queue_config
end


# Check if a certain PID is still running and is a ruby process
def pid_running?(pid)
ps = pid ? Sys::ProcTable.ps(pid) : nil
if ps
# Asume, that if the command of the 'ps'-output is 'ruby', the process is still running
ps.comm == 'ruby'
else
false
end
end


# Check if QueueDispatcher is running.
def qd_running?
hb_tqs = TaskQueue.where(state: 'heartbeat')
running = false
hb_tqs.each { |tq| running = true if pid_running?(tq.pid) && tq.updated_at > 1.minute.ago }
running
end


# Are there any running task_queues?
def any_running?
running = false
Expand All @@ -66,7 +87,7 @@ def get_next_pending

transaction do
# Find next task_queue which is not running and not in state error
order(:id).lock(true).all.each { |tq| task_queue = tq unless task_queue || tq.pid_running? || tq.state == 'error' }
order(:id).lock(true).all.each { |tq| task_queue = tq unless task_queue || tq.pid_running? || tq.state == 'error' || tq.state == 'heartbeat' }

# Update pid inside the atomic transaction to be sure, the next call of this method will not give the same queue a second time
task_queue.update_attribute :pid, $$ if task_queue
Expand Down Expand Up @@ -179,13 +200,7 @@ def task_states

# Return true, if the command of the process with pid 'self.pid' is 'ruby'
def pid_running?
ps = self.pid ? Sys::ProcTable.ps(self.pid) : nil
if ps
# Asume, that if the command of the 'ps'-output is 'ruby', the process is still running
ps.comm == 'ruby'
else
false
end
self.class.pid_running?(self.pid)
end


Expand Down
2 changes: 1 addition & 1 deletion lib/queue_dispatcher/version.rb
@@ -1,3 +1,3 @@
module QueueDispatcher
VERSION = "2.3.0"
VERSION = "2.4.0"
end
23 changes: 20 additions & 3 deletions script/queue_worker_dispatcher
Expand Up @@ -35,7 +35,8 @@ $daemon = {
:background => false, # background mode
:work => true, # daemon work flag
:logger_msg_prefix => nil, # Prefix for logging
:worker_pids => [] # Remember PIDs
:worker_pids => [], # Remember PIDs
:heartbeat_task_queue_id => nil # Remember ID of the Heartbeat-TaskQueue
}

$worker = {
Expand Down Expand Up @@ -74,6 +75,7 @@ end

# Clean up before a daemon terminates
def daemon_clean_up
TaskQueue.where(id: $daemon[:heartbeat_task_queue_id]).destroy_all
File.delete($daemon[:pid_file])
end

Expand Down Expand Up @@ -137,14 +139,15 @@ def daemon_start

# Asume, that if the command of the 'ps'-output is 'ruby', the process is still running
if ps && (ps.comm == 'ruby')
daemon_log :msg => "Process already running!", :sev => :error
daemon_log :msg => 'Process already running!', :sev => :error
exit
else
File.delete($daemon[:pid_file])
end
end

daemon_log :msg => "Starting process..."
# Start daemon
daemon_log :msg => 'Starting process...'
if $daemon[:background]
Spawnling.new { daemon_runner }
else
Expand All @@ -153,11 +156,25 @@ def daemon_start
end


# update heartbeat
def daemon_update_heartbeat
hb_tq = TaskQueue.find_by(id: $daemon[:heartbeat_task_queue_id])
if hb_tq
hb_tq.touch
else
hb_tq = TaskQueue.create(name: 'QD_HeartBeat', state: 'heartbeat', pid: Process.pid)
$daemon[:heartbeat_task_queue_id] = hb_tq.id
end
end


# Start an amount of workers
def spawn_and_monitor_workers
daemon_log :msg => "Spawning #{$daemon[:worker_count]} workers..."

while $daemon[:work]
daemon_update_heartbeat

# (Re)start workers
while $daemon[:worker_pids].count < $daemon[:worker_count] do
sp = Spawnling.new(:argv => $worker[:process_prefix]) do
Expand Down

0 comments on commit 3c67abc

Please sign in to comment.