Permalink
Browse files

Some code style upgrades in Daemon

  • Loading branch information...
lmrodriguezr committed Nov 29, 2017
1 parent 04de407 commit f923499845c779d9c0169b1117245195cb7a18ed
Showing with 47 additions and 39 deletions.
  1. +3 −0 .codeclimate.yml
  2. +44 −39 lib/miga/daemon.rb
@@ -3,6 +3,9 @@ exclude_paths:
engines:
rubocop:
enabled: true
checks:
Rubocop/Style/AndOr:
enabled: false
duplication:
enabled: true
config:
@@ -1,27 +1,27 @@
# @package MiGA
# @license Artistic-2.0

require "miga/project"
require "daemons"
require "date"
require 'miga/project'
require 'daemons'
require 'date'

##
# MiGA Daemons handling job submissions.
class MiGA::Daemon < MiGA::MiGA

##
# When was the last time a daemon for the MiGA::Project +project+ was seen
# active? Returns DateTime.
def self.last_alive(project)
f = File.expand_path("daemon/alive", project.path)
f = File.expand_path('daemon/alive', project.path)
return nil unless File.exist? f
DateTime.parse(File.read(f))
end

# Shutdown all spawned daemons before exit.
$_MIGA_DAEMON_LAIR = []
END { $_MIGA_DAEMON_LAIR.each{ |d| d.terminate } }
END { $_MIGA_DAEMON_LAIR.each(&:terminate) }

# MiGA::Project in which the daemon is running.
attr_reader :project
# Options used to setup the daemon.
@@ -40,8 +40,8 @@ def initialize(project)
$_MIGA_DAEMON_LAIR << self
@project = project
@runopts = JSON.parse(
File.read(File.expand_path("daemon/daemon.json", project.path)),
{:symbolize_names=>true})
File.read(File.expand_path('daemon/daemon.json', project.path)),
symbolize_names: true)
@jobs_to_run = []
@jobs_running = []
@loop_i = -1
@@ -57,7 +57,7 @@ def last_alive
##
# Returns Hash containing the default options for the daemon.
def default_options
{ dir_mode: :normal, dir: File.expand_path("daemon", project.path),
{ dir_mode: :normal, dir: File.expand_path('daemon', project.path),
multiple: false, log_output: true }
end

@@ -68,53 +68,56 @@ def default_options
def runopts(k, v=nil, force=false)
k = k.to_sym
unless v.nil?
v = [:latency, :maxjobs, :ppn].include?(k) ? v.to_i :
[:shutdown_when_done].include?(k) ? !!v : v
if [:latency, :maxjobs, :ppn].include?(k)
v = v.to_i
elsif [:shutdown_when_done].include?(k)
v = !!v
end
raise "Daemon's #{k} cannot be set to zero." if !force and v==0
@runopts[k] = v
end
if k==:kill and v.nil?
case @runopts[:type].to_s
when "bash" ; return "kill -9 '%s'"
when "qsub" ; return "qdel '%s'"
else ; return "canceljob '%s'"
when 'bash' then return "kill -9 '%s'"
when 'qsub' then return "qdel '%s'"
else return "canceljob '%s'"
end
end
@runopts[k]
end

##
# Returns Integer indicating the number of seconds to sleep between checks.
def latency() runopts(:latency) ; end
def latency() runopts(:latency); end

##
# Returns Integer indicating the maximum number of concurrent jobs to run.
def maxjobs() runopts(:maxjobs) ; end
def maxjobs() runopts(:maxjobs); end

##
# Returns Integer indicating the number of CPUs per job.
def ppn() runopts(:ppn) ; end
def ppn() runopts(:ppn); end

##
# Returns Boolean indicating if the daemon should shutdown when processing is
# complete.
def shutdown_when_done?() !!runopts(:shutdown_when_done) ; end
def shutdown_when_done?() !!runopts(:shutdown_when_done); end

##
# Initializes the daemon with +opts+.
def start(opts=[]) daemon("start", opts) ; end
def start(opts=[]) daemon("start", opts); end

##
# Stops the daemon with +opts+.
def stop(opts=[]) daemon("stop", opts) ; end
def stop(opts=[]) daemon("stop", opts); end

##
# Restarts the daemon with +opts+.
def restart(opts=[]) daemon("restart", opts) ; end
def restart(opts=[]) daemon("restart", opts); end

##
# Returns the status of the daemon with +opts+.
def status(opts=[]) daemon("status", opts) ; end
def status(opts=[]) daemon("status", opts); end

##
# Launches the +task+ with options +opts+ (as command-line arguments).
@@ -164,11 +167,10 @@ def check_datasets
# project-level tasks
def check_project
return if project.dataset_names.empty?
if project.done_preprocessing?(false)
to_run = project.next_distances(true)
to_run = project.next_inclade(true) if to_run.nil?
queue_job(to_run) unless to_run.nil?
end
return unless project.done_preprocessing?(false)
to_run = project.next_distances(true)
to_run = project.next_inclade(true) if to_run.nil?
queue_job(to_run) unless to_run.nil?
end

##
@@ -177,21 +179,25 @@ def check_project
# scheduler (or to bash) see #flush!.
def queue_job(job, ds=nil)
return nil unless get_job(job, ds).nil?
ds_name = (ds.nil? ? "miga-project" : ds.name)
ds_name = (ds.nil? ? 'miga-project' : ds.name)
say "Queueing ", ds_name, ":#{job}"
vars = { "PROJECT"=>project.path, "RUNTYPE"=>runopts(:type),
"CORES"=>ppn, "MIGA"=>MiGA::MiGA.root_path }
vars = {
'PROJECT' => project.path,
'RUNTYPE' => runopts(:type),
'CORES' => ppn,
'MIGA' => MiGA::MiGA.root_path
}
vars["DATASET"] = ds.name unless ds.nil?
log_dir = File.expand_path("daemon/#{job}", project.path)
Dir.mkdir(log_dir) unless Dir.exist? log_dir
task_name = "#{project.metadata[:name][0..9]}:#{job}:#{ds_name}"
to_run = {ds: ds, job: job, task_name: task_name,
cmd: sprintf(runopts(:cmd),
# 1: script
MiGA::MiGA.script_path(job, miga:vars["MIGA"], project:project),
MiGA::MiGA.script_path(job, miga:vars['MIGA'], project:project),
# 2: vars
vars.keys.map { |k|
sprintf(runopts(:var), k, vars[k]) }.join(runopts(:varsep)),
vars.keys.map { |k| sprintf(runopts(:var), k, vars[k]) }.
join(runopts(:varsep)),
# 3: CPUs
ppn,
# 4: log file
@@ -279,7 +285,7 @@ def say(*opts)
##
# Terminates a daemon.
def terminate
say "Terminating daemon..."
say 'Terminating daemon...'
report_status
k = runopts(:kill)
@jobs_running.each do |i|
@@ -294,17 +300,17 @@ def terminate

def launch_job(job)
# Execute job
if runopts(:type) == "bash"
if runopts(:type) == 'bash'
# Local job
job[:pid] = spawn job[:cmd]
Process.detach job[:pid] unless [nil, "", 0].include?(job[:pid])
Process.detach job[:pid] unless [nil, '', 0].include?(job[:pid])
else
# Schedule cluster job
job[:pid] = `#{job[:cmd]}`.chomp
end

# Check if registered
if [nil, "", 0].include?(job[:pid])
if [nil, '', 0].include? job[:pid]
job[:pid] = nil
@jobs_to_run << job
say "Unsuccessful #{job[:task_name]}, rescheduling."
@@ -313,5 +319,4 @@ def launch_job(job)
say "Spawned pid:#{job[:pid]} for #{job[:task_name]}."
end
end

end

0 comments on commit f923499

Please sign in to comment.