Permalink
Switch branches/tags
Nothing to show
Find file
Fetching contributors…
Cannot retrieve contributors at this time
executable file 3931 lines (3438 sloc) 118 KB
#!/usr/bin/perl
# grun - lightweight jobs queueing system
# Copyright (C) 2011 Erik Aronesty
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
use strict;
use Carp qw(carp croak confess cluck);
use Getopt::Long qw(GetOptions);
use Data::UUID;
use ZMQ::LibZMQ3;
use ZMQ::Constants ':all';
use JSON::XS;
use Time::HiRes;
use BSD::Resource;
use IO::File;
use POSIX qw(:sys_wait_h strftime);
use Socket qw(IPPROTO_TCP TCP_NODELAY TCP_KEEPIDLE TCP_KEEPINTVL TCP_KEEPCNT);
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use Safe;
use Cwd qw(abs_path cwd);
use List::Util qw(min max);
use File::Basename qw(dirname);
sub pretty_encode;   # forward declaration; defined later in the file
# $REVISION is filled in by svn keyword expansion; 0 if not expanded
our ($REVISION) = (q$LastChangedRevision$ =~ /(\d+)/);
our $VERSION = "0.9.$REVISION"; # 0.10 will be feature lock, after sql impl. 0.9.X is zmq & json::xs
# pseudo exit-status codes used for jobs that never produced a real one
my $STATUS_NEVERRUN=199;
my $STATUS_ORPHAN=-8;
my $STATUS_UNKNOWN=-9;
my $STATUS_EXECERR=-10;
my $PPID=$$;
my $WIN32 = ($^O =~ /Win32/);
# GNU time(1) --format string (single-quoted: \n is expanded by time itself)
my $TIMEFMT = 'command:%C\ncpu-real:%E\ncpu-user:%U\ncpu-sys:%S\nmem-max:%M\nmem-avg:%t\nctx-wait:%w\nfs-in:%I\nfs-out:%O';
my ($daemon, $killjob, $editjob);
my (%conf, %def, @metrics, @labels);
# defaults just run things locally, no master
$def{config} = "/etc/grun.conf"; # config file
$def{spool} = "/var/spool/grun"; # dir to place jobs
$def{port} = 5184; # listen/connect port
$def{bind} = '0.0.0.0'; # listen addr
$def{env} = ['PATH']; # list of environment vars to copy from submit through to exec
$def{default_memory} = 1000 * 1000; # default job memory (kB)
$def{default_priority} = 20; # default job priority (20 = always run)
$def{ping_secs} = 30; # how often to tell about load/mem/stats
$def{remove_secs} = '$ping_secs * 1000'; # expression string, evaluated after config load; don't even try kickstarting if the node is this old
$def{idle_load} = .3; # load level below which a node counts as idle — TODO confirm (old comment was copy-pasted from ping_secs)
$def{retry_secs} = 10; # how often to retry notifications
$def{bench_secs} = 86400; # how often to re-benchmark
$def{max_buf} = 1000000; # maximum buffer size, presumably bytes — verify (old comment was copy-pasted from retry_secs)
$def{expire_secs} = 14400; # remove jobs whose execution nodes haven't reported back in this amount of time
$def{io_keep} = 3600; # keep io for this long after a job with i/o is finished in a detached session
#$def{hard_factor} = 1.5; # hard limit factor
$def{max_sched} = 50; # how many different jobs to try and match before giving up on the rest (queue busy)
$def{spread_pct} = 5; # how often to "distribute jobs", versus "clump" them
$def{master} = 'localhost:5184'; # central scheduler
$def{services} = "queue exec"; # all can run
$def{pid_file} = "/var/run/grun.pid"; # pid file
$def{log_file} = "/var/log/grun.log"; # log file
# prefer env vars, fall back to hostname(1) output
$def{hostname} = $ENV{HOSTNAME} ? $ENV{HOSTNAME} : $ENV{COMPUTERNAME} ? $ENV{COMPUTERNAME} : `hostname`;
$def{log_types} = "note error warn"; # log all
$def{nfs_sync} = 1; # enable nfs sync support
chomp $def{hostname};
sub debugging;   # forward declaration
my $GRUN_PATH=abs_path($0);
my ($qinfo, $help, $config, $ver);
# require_order + passthrough: stop at the first non-option so the job's
# own arguments are left untouched in @ARGV
Getopt::Long::Configure qw(require_order no_ignore_case passthrough);
my $context = zmq_init();
# keep the original argv so a remote 'rest' command can re-exec us identically
my @ORIG_ARGV= @ARGV;
GetOptions("daemon"=>\$daemon, "CONF:s"=>\$config, "trace"=>\$def{trace}, "query"=>\$qinfo, "V"=>\$ver, "help"=>\$help) ||
    die usage();
(print "grun $VERSION\n") && exit(0) if $ver;
my @send_files; # files to send
my $safe = new Safe;
$def{config} = $config if $config;
init();
my $stream_quit = 0;
# -X / -Y are internal re-invocation modes (stream relay / job executor),
# used when grun re-execs itself; not user-facing options
if ($ARGV[0] eq '-X') {
    do_stream();
    exit(0);
}
if ($ARGV[0] eq '-Y') {
    do_execute();
    exit(0);
}
if ($ARGV[0] eq '-?') {
    shift @ARGV;
    $help = 1;
}
# "-CONF" with no value means "show help" (optional-value option)
$help = 1 if defined $config && !$config;
if ($help) {
    print usage();
    exit 0;
}
if (!$daemon) {
    # -k <id> works as long as -d wasn't specified
    GetOptions("kill"=>\$killjob, "trace"=>\$def{trace}, "edit|e"=>\$editjob) ||
        die usage();
}
if ($conf{debug_memory}) {
    # optional dependency, only loaded when memory debugging is configured
    eval {require Devel::Gladiator;};
    die $@ if $@;
}
my $gjobid = slurp("$conf{spool}/nextid");   # next job id counter from the spool
my $log_to_stderr = 0;
if ($qinfo) {
    # this is the code for grun -q: query commands (status/jobs/history/wait/...)
    Getopt::Long::Configure qw(no_require_order no_ignore_case passthrough);
    my %opt;
    GetOptions(\%opt, "silent", "inplace", "hosts=s", "debug") || die usage();
    my $cmd = shift @ARGV;
    die usage() if !$cmd;
    $log_to_stderr = 1 if $opt{debug};
    my @arg = @ARGV;
    $cmd =~ s/^-//;
    # resolve unambiguous prefixes, then truncate to the 4-char internal form
    # (config->conf, status->stat, history->hist, memory->memo, ...)
    my $tmp = substr bestunique($cmd, qw(config status jobs file history wait memory)), 0, 4;
    if (!$tmp) {
        die "Command $cmd is not available, for help type grun -query -?\n";
    }
    $cmd = $tmp;
    # some commands default to localhost, others default to queue host... this is confusing... fix?
    my @dest = $opt{hosts} ? expandnodes($opt{hosts}) :
        $cmd eq 'conf' ? [$conf{bind}, $conf{port}] :
        [$conf{master},$conf{master_port}];
    if ($cmd eq 'file' && @dest > 1) {
        die "Command $cmd cannot be run on multiple hosts";
    }
    my $ok=0;
    for my $d (@dest) {
        my ($host, $port) = @$d;
        if ($cmd eq 'wait') {
            # block until each listed job finishes; exit with the last
            # nonzero status seen
            my $st = 0;
            for (@arg) {
                my ($res) = waitmsg($host, $port, "jwait", $_);
                if ($res && defined $res->{status}) {
                    print "Job $_ status $res->{status}\n";
                    $st = $res->{status} if $res->{status};
                } else {
                    print "Job $_ status $STATUS_UNKNOWN\n";
                    $st = $STATUS_UNKNOWN;
                }
            }
            exit $st;
        } elsif ($cmd eq 'file') {
            # absolutize missing paths relative to cwd; transfer not implemented
            my $cwd = cwd;
            my @need;
            for (@arg) {
                next if -e $_;
                if ($_ !~ /^\//) {
                    $_ = "$cwd/$_";
                }
                push @need, $_;
            }
            die "not supported yet\n";
#            if (@need) {
#                my ($res, $error) = waitio({inplace=>$opt{inplace}}, $host, $port, "xcmd", 'file', @need);
#                die $error, "\n" if $error && !$opt{silent};
#                exit 1 if $error;
#            }
        } elsif ((!samehost($host,$conf{hostname}) || (!$ENV{_GRUN} && (!$conf{services}->{queue} || $cmd !~ /^stat|jobs|hist$/)))) {
            # remote (or non-queue-service) host: forward as an xcmd message.
            # NOTE(review): /^stat|jobs|hist$/ parses as (^stat)|(jobs)|(hist$);
            # harmless for the fixed 4-char command set, but fragile.
            # this could get ugly, if called a lot, may want to make more efficient
            warn ("waitmsg($host, $port, 'xcmd', $cmd, @arg, @{[%opt]})\n") if $opt{debug};
            my ($ret) = waitmsg($host, $port, "xcmd", $cmd, @arg, %opt);
            print $ret;
            $ok=1 if $ret;
        } else {
            # we ARE the queue host: answer from local spool, no network round-trip
            warn ("Using local queue status, host $host is $conf{bind}/$conf{hostip}, name is $conf{hostname} \n") if $opt{debug};
            my $ret;
            if ($cmd eq 'stat') {
                $ret = shownodes(@arg);
            } elsif ($cmd eq 'jobs') {
                $ret = showjobs(@arg);
            } elsif ($cmd eq 'hist') {
                $ret = showhist(@arg);
            }
            print $ret;
            $ok=1 if $ret;
        }
    }
    exit($ok ? 0 : 1);
}
my $gpid; # daemon pid, read from the pid file if one exists
if (open(my $pidfh, '<', $conf{pid_file})) {
    $gpid = <$pidfh>;
    close $pidfh;
}
if ($killjob) {
    # grun -k code: send a kill request for each job id/guid on the command line.
    # Exit status reflects whether every kill succeeded.
    my $sig = 15;     # default SIGTERM
    my $kforce = 0;
    Getopt::Long::Configure qw(no_require_order no_ignore_case);
    GetOptions("signal|n=i"=>\$sig, "force"=>\$kforce) || die usage();
    my $exit = 0;
    for my $job (@ARGV) {
        # numeric arguments are job ids; anything else is treated as a guid
        my @id;
        if ($job !~ /^\d/) {
            @id=(guid=>"$job");
        } else {
            @id=(jid=>$job);
        }
        my $err = kill_job(@id, sig=>$sig, force=>$kforce);
        if (!defined($err) && $@) {
            warn $@,"\n";
            $exit=-1;
        } else {
            # the master replies with a human-readable line; success is
            # signalled by "...aborted" or "...kill requested"
            my $ok = ($err =~ /^Job.*(aborted|kill requested)/);
            $err =~ s/\n$//;
            warn "$err\n" if $ok;
            $err = 'No remote response to jkill' if !$ok && !$err;
            warn "Error: $err\n" if !$ok;
            $exit=-1 if !$ok;
        }
    }
    # BUGFIX: previously 'exit 0;' discarded the accumulated failure status,
    # so scripts could never detect a failed kill
    exit $exit;
}
if ($editjob) {
    # grun -e code: collect key=value edit specs from the front of @ARGV,
    # then apply them (via a 'jedit' message to the master) to each job id.
    my %ed;
    while (@ARGV) {
        my $spec = $ARGV[0];
        # each argument may hold several comma-separated key=value pairs
        for my $pair (split /,/, $spec) {
            my ($key, $val) = $pair =~ /^([^=]+)(?:=(.*))?$/;
            # resolve unambiguous prefixes against the known edit keys
            my $nk = bestunique($key, qw(hold resume memory cpus state hosts), @metrics, @labels);
            $key = $nk if $nk;
            # 'hold'/'resume' are shorthand for state=hold / state=resume
            ($key, $val) = ('state', 'hold') if $key eq 'hold';
            ($key, $val) = ('state', 'resume') if $key eq 'resume';
            if ($key eq 'state') {
                $val = substr bestunique($val, qw(hold resume)), 0, 4;
                # BUGFIX: message previously started with a stray quote and
                # omitted the offending value
                die "State '$val' must be one of: h(old) r(esume)\n" unless $val;
            }
            $ed{$key}=$val;
        }
        shift @ARGV;
        # stop consuming edit specs at the first arg without '=': the rest
        # are job ids
        last unless $ARGV[0] =~ /=/;
    }
    my @jids = @ARGV;
    die usage() if !%ed || !@jids;
    my $ex = 0;
    for my $jid (@jids) {
        warn "Edit " . packdump(\%ed) . "\n";
        my ($err) = waitmsg($conf{master}, $conf{master_port}, 'jedit', $jid, %ed);
        my $ok = ($err =~ /^Job.*edited/);
        $err=~ s/\n$//;
        warn "$err\n" if $ok;
        $err = 'No remote response to jedit' if !$ok && !$err;
        warn "Error: $err\n" if !$ok;
        $ex = 1 if !$ok;
    }
    exit $ex;
}
# daemon globals: ZMQ router socket, select sets, shutdown flag, and the
# various wait tables (pid->job, jid->waiters, jid->io streamers, jid->start)
my ($router, $read_set, $write_set, $quit, %pid_jobs, %j_wait, %io_wait, %start_wait); # daemon globals
my %ZMQS; # hash of open sockets, keyed by destination
my %nodes; # hash of registered nodes, keyed by ip
if ($daemon) {
    startdaemon();
} else {
    grun_client();
}
####################
# client mode
my $make;            # set when -M (make semantics) is requested
# NFS sync bookkeeping: dirs to revalidate before/after the job, and a
# per-run cache of dirs already synced
my %sync_after;
my %sync_before;
my %sync_already;
# Client mode: parse job options from @ARGV, build a job spec, submit it to
# the master via a 'run' message, then (unless --no-wait) poll for placement,
# stream i/o back from the execution node, and exit with the job's status.
sub grun_client {
    my %jobs;
    my %opt;
    $opt{wait} = 1; # keep socket open until job is finished
    $opt{io} = 1; # copy io back on the socket, implies wait
    Getopt::Long::Configure qw(require_order no_ignore_case passthrough);
    GetOptions(\%opt, "file=s", "int|I", "memory|m=i", "hosts|h=s", "cpus|c=f", "io!", "wait!", "err_a|err-append|ea=s", "err|e=s", "out|o=s", "out_a|out-append|oa=s", "ouer|out-err|oe=s", "ouer_a|out-err-append|oea=s", "jobx|jobid|j=s", "verbose", "make|M", "debug|D", "env|E=s", "alert=s", "param|p=s@", "wait-exists|W", "priority|r=i");
    # --out / --out-append / --out-err / --out-err-append are mutually exclusive
    if ((!!$opt{out} + !!$opt{out_a} + !!$opt{ouer} + !!$opt{ouer_a})>1) {
        $config=undef;
        die "ERROR: Specify only one of --out, --out-append, --out-err or --out-err-append\n\n" . usage()
    }
    if ((!!$opt{err} + !!$opt{err_a} + !!$opt{ouer} + !!$opt{ouer_a})>1) {
        $config=undef;
        die "ERROR: Specify only one of --err, --err-append, --out-err or --out-err-append\n\n" . usage()
    }
    # normalize the append variants: fold them into out/err plus a boolean *_a flag
    if (my $t=$opt{out_a}?$opt{out_a}:$opt{ouer_a}) {
        $opt{out}=$t;
        $opt{out_a}=1;
        delete $opt{ouer_a};
    }
    if (my $t=$opt{err_a}?$opt{err_a}:$opt{ouer_a}) {
        $opt{err}=$t;
        $opt{err_a}=1;
        delete $opt{ouer_a};
    }
    if ($opt{ouer}) {
        # --out-err: same file for both streams
        $opt{out}=$opt{err}=$opt{ouer}; delete $opt{ouer};
    }
    # these are client-side only; strip them so they aren't sent to the master
    my $verbose = $opt{verbose}; delete $opt{verbose};
    my $env = $opt{env}; delete $opt{env};
    $make = $opt{make}; delete $opt{make};
    $log_to_stderr = 1 if $opt{debug};
    if ($< == 0) {
        die "Won't run a job as root\n";
    }
    if ($opt{err} && $opt{out}) {
        # both streams redirected to files: no need to stream i/o back
        $opt{io} = 0;
    }
    while ($ARGV[0] =~ /^--([\w-]+)=([\w=]+)/) {
        # allow arbitrary job options, that can later be referred to in match expressions
        # or in execution wrappers, etc
        $opt{$1} = $2;
        shift @ARGV;
    }
    if ($make || $verbose) {
        GetOptions(\%opt, "noexec");
    }
    my @cmd = @ARGV;
    if ($opt{file}) {
        # read options from a file
        funpack($opt{file}, \%opt);
        if ($opt{cmd}) {
            if (@ARGV) {
                die "Can't supply cmd: in the file and '@cmd' on the command line\n";
            }
            if ($opt{cmd} !~ /^[\w0-9:\/\t -]+$/) {
                # not simple: let bash handle it
                @cmd = ('bash', '-c', $opt{cmd});
            } else {
                # simple: split, pass as is to exec
                $opt{cmd} =~ s/^\s+//;
                $opt{cmd} =~ s/\s+$//;
                @cmd = split /\s+/, $opt{cmd};
            }
        }
    }
    # force exec in "same as current dir"
    $opt{cwd} = cwd;
    if (!$opt{cwd}) {
        die "Can't get current working directory, not executing unanchored remote command.\n";
    }
    if ($make) {
        # make semantics: scan the command for declared inputs/outputs.
        # %i:input %o:output  (#i:/#o: variants are stripped from the command)
        my (@i, @o);
        for (@cmd) {
            my @t = m/[#%]([io]):(\S+)/g;
            if (@t) {
                # NOTE(review): steps by 1 over (tag,name) pairs; harmless
                # because name elements never equal 'i'/'o' in practice
                for (my $i=0;$i<@t;++$i) {
                    push @o, $t[$i+1] if $t[$i] eq 'o';
                    push @i, $t[$i+1] if $t[$i] eq 'i';
                }
                s/%[io]:(\S+)/$1/g;
                s/#[io]:\S+//g;
            }
            # shell-style < and > redirections also imply inputs/outputs
            my @t = m/([<>])\s*(\S+)/g;
            if (@t) {
                for (my $i=0;$i<@t;$i+=2) {
                    push @i, $t[$i+1] if $t[$i] eq '<' && $t[$i+1]=~/^\s*\w/;
                    push @o, $t[$i+1] if $t[$i] eq '>' && $t[$i+1]=~/^\s*\w/;
                }
            }
            # %!> escapes a redirection that should not count as an output
            s/\%\!\>(>?\s*\S+)/>$1/g;
        }
        die "Unable to determine i/o for -M (make) semantics\n\n" . usage()
            if !@i || !@o;
        # run only if some output is missing or older than some input
        my $need=0;
        for my $i (@i) {
            syncfile($i);
            add_syncfile_before($i);
            for my $o (@o) {
                syncfile($o);
                if (! (-s $o) || (fmodtime($i) > fmodtime($o))) {
                    warn "# need $o\n" if $opt{noexec} || $verbose;
                    $need=1;
                }
            }
        }
        if (!$need) {
            warn "Skipping: @cmd\n";
            exit 0;
        }
        for (@o) {
            add_syncfile_after($_);
        }
        warn "+@cmd\n" if $opt{noexec} || $verbose;
    } else {
        # no make semantics: heuristically sync any path-looking arguments
        for (@cmd) {
            if (m{([^\s,:]+)/([^\s,:]+)}) {
                add_syncfile_after($_);
                add_syncfile_before($_);
            }
        }
        add_syncdir_after($opt{cwd});
        add_syncdir_before($opt{cwd});
    }
    if ($ARGV[0] =~ /^-/) {
        die "Unknown option $ARGV[0]\n";
    }
    #die pretty_encode \@cmd if $opt{debug};
    if (!@cmd) {
        die usage();
    }
    if ($cmd[$#cmd] =~ /\&$/) {
        # TODO: grun should wait for all kids, and disallow detaching, not just get rude here
        die "Not running background-only job. You might mean: grun \"command\" &.\n";
    }
    if ($conf{auto_profile}) {
        # auto_profile is a user-supplied perl snippet that can guess cpu/mem
        # requirements from the command line
        if (-e ($conf{auto_profile})) {
            my $cmd = join ' ', @cmd;
            # safe'ish eval, just so there aren't weird side effects
            # NOTE(review): duplicate cmd=> key; the arrayref (last one) wins
            my ($cpu, $mem, $prof) = evalctx(slurp($conf{auto_profile}) . ";\nreturn (\$cpu, \$mem, \%prof);", cmd=>$cmd, cmd=>\@cmd);
            $prof = $cpu if ref($cpu) eq 'HASH';
            # alias names
            $prof->{cpus}=$prof->{cpu} if !$prof->{cpus} && $prof->{cpu};
            $prof->{memory}=$prof->{mem} if !$prof->{memory} && $prof->{mem};
            if ($prof && ref($prof) eq 'HASH') {
                # profile values only fill in what the user didn't specify
                $prof->{memory} = $mem if defined($mem) && !ref($mem) && !$prof->{memory};
                $prof->{cpus} = $cpu if defined($cpu) && !ref($cpu) && !$prof->{cpus};
                $opt{memory}=$prof->{memory} if $prof->{memory} && !$opt{memory};
                $opt{cpus}=$prof->{cpus} if $prof->{cpus} && !$opt{cpus};
                $opt{hosts}=$prof->{hosts} if $prof->{hosts} && !$opt{hosts};
                $opt{priority}=$prof->{priority} if $prof->{priority} && !$opt{priority};
                for ((@metrics,@labels)) {
                    # as if the user entered it
                    next if !$_;
                    push @{$opt{"param"}}, "$_=" . $prof->{$_} if defined($prof->{$_});
                }
            } else {
                $opt{memory}=$mem if ($mem > $opt{memory});
                $opt{cpus}=$cpu if ($cpu > $opt{cpus});
            }
            if ($@) {
                die "Can't run $conf{auto_profile}: $@\n";
            }
        } else {
            die "Can't find $conf{auto_profile}: $!\n";
        }
    }
    # parse -p name=value pairs into a hash
    my %param;
    if ($opt{param}) {
        for (@{$opt{param}}) {
            if (/^([^=]+)=(.*)/) {
                $param{$1} = $2;
            } else {
                die "Parameter $_: should be name=value\n";
            }
        }
    }
    $opt{param} = \%param;
    $opt{priority} = $ENV{GRUN_PRIORITY}+0 if !$opt{priority} && $ENV{GRUN_PRIORITY} >= 1;
    $opt{priority} = $conf{default_priority} if !$opt{priority};
    # convert memory to kB
    if ($opt{memory}) {
        if ($opt{memory} =~ /kb?$/i) {
            # internal memory unit is kB
        } elsif ($opt{memory} =~ /gb?$/i) {
            # convert gB to kB
            $opt{memory} *= 1000000;
        } else {
            # convert mB to kB
            $opt{memory} *= 1000;
        }
    } else {
        $opt{memory} = $conf{default_memory};
    }
    # no socket io unless waiting
    if (!$opt{wait}) {
        delete $opt{io};
    }
    # copy env: '*' (from -E or config) means everything except _* and LS_COLORS;
    # otherwise copy the configured list plus any -E names
    if ($env eq '*' || ($conf{env} && ($conf{env}->[0] eq '*')) ) {
        for (keys %ENV) {
            if (! /^_|LS_COLORS/) {
                $opt{env}->{$_} = $ENV{$_};
            }
        }
    } else {
        for (@{$conf{env}}) {
            $opt{env}->{$_} = $ENV{$_} if defined $ENV{$_};
        }
        for (split /\s+/, $env) {
            $opt{env}->{$_} = $ENV{$_} if defined $ENV{$_};
        }
    }
    # identity/permissions travel with the job
    $opt{user} = getpwuid($>);
    $opt{group}=$);
    $opt{umask} = umask();
    $opt{env}->{USER} = $opt{user};
    if (!$opt{wait}) {
        # fire-and-forget: merge stderr into stdout locally
        open STDERR, ">&STDOUT";
    }
    $opt{memory} = $opt{memory} ? $opt{memory} : $conf{default_memory};
    $opt{cpus} = $opt{cpus} ? $opt{cpus} : 1;
    for (@metrics) {
        if ($conf{"default_$_"}) {
            $opt{$_} = $opt{$_} ? $opt{$_} : $conf{"default_$_"};
        }
    }
    if ($verbose) {
        printf STDERR "Memory: %d\n", $opt{memory};
        printf STDERR "CPUs: %d\n", $opt{cpus};
        printf STDERR "Hosts: %s\n", $opt{hosts} if $opt{hosts};
        for ((@metrics,@labels)) {
            printf STDERR proper($_) . ": %s\n", $opt{param}->{$_} ? $opt{param}->{$_} : 1;
        }
    }
    if ($opt{jobx}) {
        if ($opt{jobx} =~ /^\d/) {
            die "External job id's should start with a non-numeric\n";
        }
    }
    my %info;
    # signal handler: if the job was already submitted, forward the signal as
    # a jkill, then exit with the conventional 128+signal status.
    # NOTE(review): named sub closing over %info/%opt — fine because
    # grun_client runs once per process.
    sub client_sigh {
        my $signame = shift;
        $SIG{INT} = undef;
        $SIG{TERM} = undef;
        $SIG{PIPE} = undef;
        if ($info{jid}||$info{guid}) {
            if (!($signame eq 'PIPE')) {
                print STDERR "Aborting command, sending jkill for $info{jid}\n";
            }
            my $code = $signame eq 'INT' ? 2 : $signame eq 'PIPE' ? 13 : 15;
            my $err = kill_job(jid=>$info{jid}, guid=>$opt{guid}, sig=>$code, termio=>1);
            if (!($signame eq 'PIPE')) {
                if (!defined($err) && $@) {
                    warn $@,"\n";
                }
            }
            exit 128+$code;
        } else {
            die "Interrupted before job sent\n";
        }
    };
    # for testing -M make
    exit 0 if $opt{noexec};
    $SIG{INT} = \&client_sigh;
    $SIG{TERM} = \&client_sigh;
    $SIG{PIPE} = \&client_sigh;
    # finalize the job spec and submit it
    $opt{cmd} = \@cmd;
    $opt{hard_factor} = $conf{hard_factor};
    $opt{frompid} = $$;
    $opt{guid} = $opt{jobx} ? $opt{jobx} : create_guid();
    $opt{syncdirs} = [keys(%sync_before)];
    %info = waitmsg($conf{master}, $conf{master_port}, 'run', \%opt);
    if (!%info) {
        die ($@ ? $@ : "No response to 'run'") . "\n";
    }
    if ($info{error}) {
        print STDERR $info{error}, "\n";
        exit -1;
    }
    if (!$info{jid}) {
        print STDERR "Failed to submit job", "\n";
        exit -1;
    }
    my $save_jid = $info{jid};
    $0 = "GRUN:$save_jid";
    if ($verbose) {
        printf STDERR "Job_ID: $info{jid}\n";
    }
    if ($info{already}) {
        # a job with this external id (guid) already exists
        if ($opt{"wait-exists"}) {
            my ($res) = waitmsg($conf{master},$conf{master_port}, "jwait", $info{jid});
            my $st;
            if ($res && defined $res->{status}) {
                # NOTE(review): $_ is not set here — looks copy-pasted from the
                # 'grun -q wait' loop; probably should print $info{jid}
                print "Job $_ status $res->{status}\n";
                $st = $res->{status} if $res->{status};
            } else {
                print "Job $_ status $STATUS_UNKNOWN\n";
                $st = $STATUS_UNKNOWN;
            }
            exit $st;
        } else {
            print STDOUT "Job_ID: $info{jid} \n" if !$verbose;
            printf STDERR "Already running job named $opt{jobx}, try grun -q wait JOB\n";
            exit -1;
        }
    }
    if ($opt{wait}) {
        # wait for a job ip (i.e. for the master to place the job on a node)
        while (!defined($info{ip})) {
            my %tmp = waitmsg_retry($conf{retry_secs}*1000, $conf{master}, $conf{master_port}, 'jinfo', $info{jid});
            if ($tmp{error}) {
                print STDERR $tmp{error}, "\n";
                exit $tmp{status} != 0 ? $tmp{status} : -1;
            }
            if (!defined($tmp{ip})) {
                xlog("error","Had to retry after a retry... BUG in job $save_jid\n");
                sleep(5);
            } else {
                %info=%tmp;
            }
        }
        my $host = $info{hostname} ? $info{hostname} : $info{ip}; # support old ver which didn't have hostname
        if ($verbose) {
            printf STDERR "Host: %s\n", $host;
        }
        # look at all the work you can avoid if you don't wait
        my ($stat, $err, $diderr);
        # connect to executing node directly and ask for stderr, stdout, and status based on job id
        if (defined $info{status}) {
            $stat=$info{status};
            $err=$info{error};
        }
        while (!defined $stat && !defined $err) {
            # shadow watcher....
            if ($info{ip}=~/^\d/) {
                if ($opt{io} && !($opt{err} && $opt{out}) ) {
                    # stream stdout/stderr back while waiting for status
                    ($stat, $err, $diderr) = waitio($info{ip}, $info{port}, $info{jid}, \%opt);
                } else {
                    # status only
                    my ($key, $dat) = waitmsg_retry($conf{retry_secs}*1000, $info{ip}, $info{port}, 'xstat', $info{jid});
                    $stat=$dat->{status};
                    $err=$dat->{error};
                }
                # 65280 == 255<<8: raw wait() status for exit(-1)
                if ($stat == 65280 && !$err) {
                    print STDERR "Error: [$host] Command returned -1\n";
                }
            } else {
                $stat=$STATUS_UNKNOWN;
                $err="Error in grun protocol (ip=$info{ip}), unknown status!\n";
            }
            sleep 5 if (!defined $stat && !defined $err);
        }
        # send this command into the ether...if exec doesn't get it, clean up after timeout
        if ($opt{io}||$opt{wait}) {
            if ($info{ip}=~/^\d/) {
                sendcmd($info{ip}, $info{port}, 'xclean', $info{jid});
            }
        }
        # raw status 11 == killed by SIGSEGV
        if ($stat == 11) {
            $err = 'Segmentation fault';
        }
        if ($stat > 127) {
            # shift... but if that doesn't get you anything, set to -1 (unknown error)
            $stat = $stat>>8;
            $stat = -1 if !$stat;
        }
        syncafter();
        if ($err) {
            print STDERR "[$host] $err", "\n";
            $stat = -1 if !$stat; # unknown error if undefined
        } else {
            if ($stat != 0 && ! $diderr) {
                print STDERR "Error: [$host] Command returned $stat\n";
            }
        }
        unlink("$ENV{HOME}/.grun/jobx/$opt{jobx}") if $opt{jobx};
        exit($stat);
    } else {
        # aah... nicer
        print STDOUT "Job_ID: $info{jid} \n" if !$verbose;
    }
}
### nfs sync support
# Revalidate the NFS attribute cache for every directory passed in.
sub syncdirs {
    syncdir($_) for @_;
}
# Called after a job completes: reset the per-run dedup cache, then (when
# nfs_sync is enabled) re-sync every directory registered as an output dir.
sub syncafter {
    %sync_already = ();
    return unless $conf{nfs_sync};
    syncdir($_) for keys %sync_after;
}
# Register a directory (by absolute path) to be synced after the job runs.
# No-op unless nfs_sync is enabled.
sub add_syncdir_after {
    my $dir = shift;
    $sync_after{abs_path($dir)} = 1 if $conf{nfs_sync};
}
# Register a directory (by absolute path) to be synced before the job runs.
# No-op unless nfs_sync is enabled.
sub add_syncdir_before {
    my $dir = shift;
    $sync_before{abs_path($dir)} = 1 if $conf{nfs_sync};
}
# Register a file's containing directory (or the path itself, if it is a
# directory) for pre-job syncing. No-op unless nfs_sync is enabled.
sub add_syncfile_before {
    my $path = shift;
    return unless $conf{nfs_sync};
    my $dir = -d $path ? $path : dirname($path);
    add_syncdir_before($dir);
}
# Register a file's containing directory (or the path itself, if it is a
# directory) for post-job syncing. No-op unless nfs_sync is enabled.
sub add_syncfile_after {
    my $path = shift;
    return unless $conf{nfs_sync};
    my $dir = -d $path ? $path : dirname($path);
    add_syncdir_after($dir);
}
# Immediately sync the directory containing $path (or $path itself, if it is
# a directory). No-op unless nfs_sync is enabled.
sub syncfile {
    my $path = shift;
    return unless $conf{nfs_sync};
    my $dir = -d $path ? $path : dirname($path);
    syncdir($dir);
}
# this refreshes the NFS lookup cache, which is an issue when running scripts
# back to back on multiple nodes using NFS: opening and closing the directory
# forces the client to revalidate its cached attributes.
sub syncdir {
    my ($d) = @_;
    return if $sync_already{$d};
    # BUGFIX: mark the dir as synced — previously nothing ever set
    # %sync_already, so the dedup check above was dead and every call
    # re-opened the directory. The cache is reset by syncafter().
    $sync_already{$d} = 1;
    my $dh;
    # best-effort: a missing/unreadable directory simply has nothing to refresh
    closedir($dh) if opendir($dh, $d);
}
# Drain all pending messages from the ROUTER socket without blocking.
# Each message is a multipart [identity, empty delimiter, payload]; the
# payload is handed to process_message() and any response is routed back to
# the sender's identity. Returns true if at least one message was handled.
sub readsocks {
    my $did;
    # identity & data received
    my $cnt;
    if (!$router) {
        xlog("debug", "Readsocks called with no router: " . Carp::longmess() );
        return;
    }
    while (my $id= zmq_recvmsg($router,ZMQ_NOBLOCK)) {
        ++$cnt;
        $did=1;
        $id = zmq_msg_data($id);
        if (zmq_getsockopt($router,ZMQ_RCVMORE)) {
            # print "getting sep\n";
            # empty delimiter frame between identity and payload
            my $sep = zmq_recvmsg($router);
        }
        if (zmq_getsockopt($router,ZMQ_RCVMORE)) {
            # print "getting data\n";
            my $msg = zmq_recvmsg($router);
            my $data = zmq_msg_data($msg);
            my @resp = process_message($data, $id);
            if (@resp) {
                print("Got resp: @resp\n");
                replymsg($id, @resp);
            } else {
                # warn("No resp for $data\n");
            }
        }
        # a well-formed message has exactly 3 frames; toss anything extra
        while (zmq_getsockopt($router,ZMQ_RCVMORE)) {
            print("Discarding excess multipart data!\n");
        }
        # safety valve: yield back to the main loop under message floods
        if ($cnt > 100000) {
            xlog("error", "Getting flooded with messages");
            last;
        }
    }
    return $did;
}
# Look up a finished job in the history spool. Returns a job hashref with
# the (bulky) env stripped and host/ip/hostname fields normalized so callers
# can treat it like a live-job record, or undef when no history file exists.
sub jhiststat {
    my ($jid) = @_;
    my $jhistfile=jhistpath($jid);
    if (-e $jhistfile) {
        # job is toast... but maybe some streams are waiting
        my $job=unpack_file($jhistfile);
        delete $job->{env};
        # jobs that never ran have no host; fill placeholders so callers
        # don't trip on undefs
        $job->{host}="n/a" if !$job->{host} && !defined $job->{status};
        $job->{ip}=host2ip($job->{host}) if !$job->{ip} && ! defined $job->{status};
        # needs ip!
        $job->{ip}="n/a" if defined $job->{status} && ! $job->{ip};
        $job->{hostname}=$job->{host};
        return $job;
    }
    return undef;
}
# Send a reply to a specific ROUTER peer: identity frame, empty delimiter
# frame, then the packed payload — the standard ROUTER envelope.
sub replymsg {
    my ($id, @resp) = @_;
    if (debugging) {
        my $hid=unpack("h*",$id);
        xlog("debug", "Reply ($hid) " . packdump(\@resp) . "\n") if $conf{trace};
    }
    zmq_send($router, $id, length($id), ZMQ_SNDMORE);
    zmq_send($router, "", 0, ZMQ_SNDMORE);
    zmq_send($router, packref(\@resp));
}
# Dispatch a command to our own message handler, as if it had arrived off
# the wire (undef identity, so no reply gets routed anywhere).
sub selfcmd {
    process_message(packcmd(@_), undef);
}
# identity captured for debugging 'frep' replies (see process_message)
my $debugzid;
sub process_message {
my ($src_data, $zid) = @_;
my ($ip, $trace, $cmd, @args) = unpackcmd($src_data);
$trace=$conf{trace} if !$trace;
if (debugging|$trace) {
my $hid=defined($zid) ? unpack("h*",$zid) : "";
xlog($trace ? "trace" : "debug", "Received command ($hid) '$cmd' : $src_data\n") if $trace;
}
return ('error'=>$@) if ($@);
if ($cmd eq 'xcmd') {
# these commands 'query or interfere' with normal running of the server
# they are initiated by a user
# they are limped together like this for basically no reason
if ($args[0] eq 'relo') {
# reread config... maybe rebind stuff too
xlog("note", "Reload from remote command (ARGS: @ORIG_ARGV)");
eval{init();};
if ($@) {
return "Error: $conf{config}, $@";
} else {
return "Ok, reloaded from $conf{config}";
}
} elsif ($args[0] eq 'term') {
xlog("note", "Shutdown from remote command");
$quit = 1;
return 'Ok, shutdown initiated';
} elsif ($args[0] eq 'rest') {
xlog("note", "Restarting from remote command ($GRUN_PATH @ORIG_ARGV)");
$quit = 1;
zmq_unbind($router, "tcp://$conf{bind}:$conf{port}");
if (!fork) {
zmq_fork_undef();
exec($GRUN_PATH, @ORIG_ARGV);
}
return "Ok, restart initiated";
} elsif ($args[0] eq 'stat') {
shift @args;
return shownodes(@args);
} elsif ($args[0] eq 'hist') {
shift @args;
# if (@args && (@args > 1 || $args[0] !~ /^\d+$/)) {
# fork... do it in parallel
# forkandgo($zid, \&showhist, @args);
# return();
# } else {
# inline... do it now, less expensive than fork!
return showhist(@args);
# }
} elsif ($args[0] eq 'conf') {
return showconf();
} elsif ($args[0] eq 'memo') {
return showmem();
} elsif ($args[0] eq 'jobs') {
shift @args;
return showjobs(@args);
# } elsif ($args[0] eq 'file') {
# shift @args;
# xlog("note", "Sending file [@args] to remote");
# return ($SOCK_FILE, @args);
} else {
return "Error: unknown xcmd '$args[0]'";
}
} elsif ($cmd eq 'frep') {
# warn("router is $router, zid is $debugzid\n");
my ($dat) = @args;
if ($dat->{zid}) {
xlog("debug", "Sending response to $dat->{zid}");
replymsg(pack("h*",$dat->{zid}),$dat->{out},$dat->{more});
#replymsg($debugzid,$dat->{out});
}
} elsif ($cmd eq 'node') {
# this is the 'node ping'
if (ref($args[0]) eq 'HASH') { # bit of validation
my $node = $args[0];
$node->{ip}=$ip unless $node->{ip};
$ip=$node->{ip};
if ($ip) {
my $file = "$conf{spool}/nodes/$ip.reg";
open(F, ">$file") || return("Error: can't create $file : $!");
print F packfile($node);
close F;
# also stick in memory
if (!$nodes{$ip}) {
xlog("note", "Registering node $ip:$node->{port} $node->{hostname}");
}
$node->{ping} = time();
$node->{ex_ping} = $nodes{$ip}->{ping};
$node->{zid} = $zid;
$nodes{$ip} = $node;
} else {
xlog("note", "Can't register node with no ip");
}
# save execution node ping time for diagnostic (times out of sync or very slow transmission, etc)
} else {
return "Error: invalid node registration info";
}
return ();
} elsif ($cmd eq 'xedit') {
my ($jid, $ed) = @args;
if ($ed->{state} =~ /hold|resu/) {
my $job = unpack_file("$conf{spool}/jpids/$jid");
if ($job && $job->{pid}) {
my ($sig, $stat);
if ($ed->{state} eq 'hold') {
$sig=-19; $stat="susp";
} else {
$sig=-18; $stat="";
}
kill($sig, $job->{pid});
sendcmd($conf{master},$conf{master_port}, 'jedit', $job->{id}, state=>$stat);
}
}
} elsif ($cmd eq 'jedit') {
my ($jid, %ed) = @args;
my $fname;
if (!$jid || !%ed) {
xlog("error", "Invalid edit request (@args)");
return "Invalid edit request (@args)\n";
} elsif (! -e "$conf{spool}/jobs/$jid") {
if (! -e "$conf{spool}/jobs/$jid.ip") {
xlog("error", "Job $jid not found during jedit");
return "Job $jid not found during jedit";
} else {
my $job_ip = slurp("$conf{spool}/jobs/$jid.ip");
# send an 'xedit' to the running node
# needed only for certain edits....think about this!
$fname = "$conf{spool}/jobs/$jid:$job_ip.run";
if ($ed{state} =~ /hold|resu/) {
if ($nodes{$job_ip}) {
sendcmd_nowait($job_ip, $nodes{$job_ip}->{port}, 'xedit', $jid, \%ed);
}
}
# for now we allow edit-in-place.... for no effect in some cases, but not others
}
} else {
$fname = "$conf{spool}/jobs/$jid";
}
# assert($fname)
my $ref = unpack_file($fname);
for my $key (keys(%ed)) {
$ref->{$key} = $ed{$key};
}
burp("$conf{spool}/jobs/$jid.jedit", packfile($ref));
rename("$conf{spool}/jobs/$jid.jedit", $fname);
return "Job $jid edited\n";
} elsif ($cmd eq 'jwait') {
my ($job) = @args;
# user can specify guid or jid
my $jid = jid_from_opts($job=~/^\d/?{jid=>$job}:{guid=>$job});
if (!$jid) {
return {error=>"Job $job not found"};
}
if ( -e "$conf{spool}/jobs/$jid" || -e "$conf{spool}/jobs/$jid.ip" ) {
$j_wait{$jid}->{$zid}=time();
return ();
} else {
my $jhistfile=jhistpath($jid);
if (-e $jhistfile) {
# repeated acks are ok
my $job=unpack_file($jhistfile);
if ($job) {
return $job;
} else {
return {error=>"Invalid job history $jhistfile"};
}
} else {
return {error=>"Job not running, and has no history"};
}
}
} elsif ($cmd eq 'jkill') {
# this is the 'job kill command'
my ($job) = @args;
# user can specify guid or jid
my $jid = jid_from_opts($job);
if (!$jid) {
if ($job->{guid}) {
return "Job $job->{guid} not found\n";
} else {
return "No job specified for jkill\n";
}
}
if (! -e "$conf{spool}/jobs/$jid") {
if (! -e "$conf{spool}/jobs/$jid.ip") {
xlog("error", "Job $jid not running or queued, but kill requested");
return "Job $jid not running or queued";
} else {
# send xabort to correct node
my $job_ip = slurp("$conf{spool}/jobs/$jid.ip");
if (!$job_ip) {
xlog("error", "Job $jid empty ip!");
my $job = unpack_file("$conf{spool}/jobs/$jid");
$job->{error}="Unknown state";
archive_job($jid, $job, $STATUS_UNKNOWN, undef, undef);
return "Job $jid, unknown state";
} else {
if ($job->{force}) {
xlog("error", "Job $jid forced kill");
my $job = unpack_file("$conf{spool}/jobs/$jid:$job_ip.run");
if ($nodes{$job_ip}) {
sendcmd_nowait($job_ip, $nodes{$job_ip}->{port}, 'xabort', $jid, 9);
}
$job->{error}="Forced kill";
archive_job($jid, $job, $STATUS_UNKNOWN, undef, undef);
return "Job $jid forced kill\n";
} else {
if ($nodes{$job_ip}) {
return "Forward $jid $job_ip:" . $nodes{$job_ip}->{port};
} else {
return "Job $jid, node $job_ip not online, can't kill\n";
}
}
}
}
} else {
my $job = unpack_file("$conf{spool}/jobs/$jid");
$job->{error}="Killed";
archive_job($jid, $job, $STATUS_NEVERRUN, undef, undef);
return "Job $jid aborted";
}
} elsif ($cmd eq 'jstat') {
# sent by the execution node to say what the status of a job is
my %opts = %{$args[0]};
if (my $jid = $opts{id}) {
my $should = slurp("$conf{spool}/jobs/$opts{id}.ip");
if (! -e "$conf{spool}/jobs/$opts{id}:$ip.run") {
if (!$should) {
# this could be a repeat, and that's ok
xlog("debug", "Probably already got 'jstat' for $jid");
} else {
if ($opts{pid}<0) {
xlog("error", "Orphaned job report $jid, from $ip for $should, status: $opts{status}");
$ip=$should if $opts{status} == $STATUS_ORPHAN;
} else {
if (!($ip eq $should)) {
xlog("error", "Got a report ($opts{status},$opts{jrun}) for job $jid from $ip, should be $should");
} else {
xlog("error", "Got a report ($opts{status},$opts{jrun}) for job $jid, but there's no $jid:$ip.run file");
}
}
}
}
if ($opts{jrun}) {
# just a ping
xlog("trace", "Still alive $jid from $ip") if $trace;
touch("$conf{spool}/jobs/$jid:$ip.run")
if -e "$conf{spool}/jobs/$jid:$ip.run";
if ( ! -e "$conf{spool}/jobs/$jid.ok" ) {
# jexok didn't come through, but should have
xlog("error", "Writing $jid.ok, from jrun signal, may need to restart exec node");
burp("$conf{spool}/jobs/$jid.ok","jrun");
}
return(); # no dacks for jrun
} elsif (defined $opts{status}) {
# job is done
if ( -e "$conf{spool}/jobs/$jid:$ip.run" ) {
my $job = unpack_file("$conf{spool}/jobs/$jid:$ip.run");
if ($job) {
if (my $n=$nodes{$ip}) {
xlog("debug", "Clearing cpu: $job->{cpus}, memory: $job->{memory} from $ip");
$n->{a_cpus} -= $job->{cpus}; # deallocate
$n->{a_memory} -= $job->{memory}; # deallocate
for (@metrics) {
$n->{"a_$_"} -= $job->{param}->{$_};
}
# TODO: probably would be nice, in a jstat, to include metrics so all these aren't guessing
$n->{avail} += $job->{cpus}; # return to avail, until %node is updated
$n->{mem} += $job->{memory}; # return mem
$n->{avail} = max($n->{avail},min($n->{cpus},$n->{orig_cpus}-$n->{load})); # better guess
} else {
xlog("error", "Got a report for $jid, from $ip, but there's no node like that");
}
archive_job($jid, $job, $opts{status}, $ip, \%opts);
} else {
xlog("debug", "Bad job file $conf{spool}/jobs/$jid:$ip.run");
}
} else {
my $jhistfile=jhistpath($jid);
if (-e $jhistfile) {
# repeated acks are ok
xlog("debug", "Got a duplicate report from $conf{master} for job $jid ($jhistfile)");
} else {
xlog("error", "Got a report for an unknown job $jid, from $ip, status: $opts{status}");
}
}
sendcmd_nowait($ip, $opts{replyport}?$opts{replyport}:$conf{port}, 'dack', jid=>$jid);
} else {
xlog("error", "Got a report for a job $jid with no status info ($src_data)");
}
# return ack even if not exists
}
} elsif ($cmd eq 'dack') {
# ackknowledge receipt of status signal, so you don't have to do it again
my %dack = @args;
xlog("debug", "Got dack for $dack{jid}") if $trace;
if ($dack{jid}) {
if (!$dack{jrun}) {
if (!$io_wait{$dack{jid}} || !$io_wait{$dack{jid}}->{streams}) {
exec_clean($dack{jid});
} else {
xlog("debug", "Still waiting on i/o: " . packdump($io_wait{$dack{jid}}) . ", will clean later") if $trace;
# ready to clean up
burp("$conf{spool}/jstat/$dack{jid}.dack",1);
}
}
}
} elsif ($cmd eq 'jexok') {
my ($jex) = @args;
my $jid = $jex->{id};
xlog("debug", "Writing $jid.ok") if $trace;
touch("$conf{spool}/jobs/$jid:$ip.run");
burp("$conf{spool}/jobs/$jid.ok",packfile($jex));
} elsif ($cmd eq 'xexec') {
my ($opts) = @args;
execute_job($opts);
} elsif ($cmd eq 'xclean') {
# cleanup iowait stuff.. i got everything
my ($jid) = @args;
xlog("debug", "Client is finished with $jid\n");
delete $io_wait{$jid};
delete $start_wait{$jid};
if (-e "$conf{spool}/jstat/$jid.dack") {
exec_clean($jid);
}
} elsif ($cmd eq 'xabort') {
# kill job
my ($jid, $sig, $termio) = @args;
if ($io_wait{$jid} && $io_wait{$jid}->{streams}) {
xlog("debug", "Alerting streams that $jid is dead\n");;
for(values %{$io_wait{$jid}->{streams}}) {
replymsg($_, "quit");
}
delete $io_wait{$jid}->{streams};
touch("$conf{spool}/jstat/$jid.dumped");
}
my $job = unpack_file("$conf{spool}/jpids/$jid");
xlog("debug", "Found job $jid with pid $job->{pid}\n");;
if ((my $pid = $job->{pid}) > 1) {
$sig = 2 if $sig !~ /^\d+$/; # force sig to 2 if not a number
# kill the whole shebang
my $ok = kill(-$sig, $pid);
xlog("note", "Kill ($sig) job $jid (pgrp:$pid), return: $ok");
if ($ok) {
# report status as "killed"
selfcmd("sstat", {id=>$jid,status=>127+$sig,error=>"Job $jid aborted",dumped=>1});
return "Job $jid aborted";
} else {
return "Job $jid kill $sig failed : $!";
}
if ($io_wait{$jid} && $io_wait{$jid}->{zid}) {
# tell waiters it's dead, and streams are dumped
replymsg($io_wait{$jid}->{zid},"$jid:stat",{status=>127+$sig, error=>"Job $jid aborted", dumped=>1});
}
} else {
return "Job $jid not found for xabort";
}
} elsif ($cmd eq 'xio') {
my ($jid) = @args;
my $ready = 0;
my $known = 0;
if ($io_wait{$jid} && $io_wait{$jid}->{streams}) {
# definitely not dumped
$ready = 1;
for(values %{$io_wait{$jid}->{streams}}) {
replymsg($_, "ready");
}
delete $io_wait{$jid}->{streams};
$known = 1;
}
# set wait flag, if i/o wasn't dumped
my $stat=getjobstathash($jid);
# streams are ready, or job isn't done, or io wasn't dumped
if ($ready||!$stat||!$stat->{dumped}) {
my $end_time;
# job is done
if ($stat && $stat->{rtime}) {
$end_time = (stat("$conf{spool}/jstat/$jid.stat"))[9]+$stat->{rtime};
}
# finished a while ago
if ($stat && ($end_time < (time()-(5*$conf{ping_secs}))) ) {
xlog("error", "Dumping i/o for completed job $jid because streamer hasn't responded in $def{ping_secs} secs, end-time: $end_time");
touch("$conf{spool}/jstat/$jid.dumped");
$stat->{dumped}=1;
} else {
xlog("debug", "Creating io_wait hash entry for $jid, (E:$end_time, $stat)") unless $io_wait{$jid}->{zid} eq $zid;
$io_wait{$jid}->{type} = 'io';
$io_wait{$jid}->{zid} = $zid;
$io_wait{$jid}->{time} = time(); # toss this entry if it gets oldi
$known = 1;
}
}
if ($stat) {
if ($stat->{dumped} && $ready ) {
xlog("error", "Dumped stream $jid before ready received");
}
xlog("debug", "Returning $jid:stat for $jid");
return("$jid:stat", $stat);
}
if (!$known) {
# unknown... no iowait, no stat
xlog("debug", "Returning $jid:unk for $jid, no io_wait set");
return("$jid:unk");
} else {
# iowait ready
return()
}
} elsif ($cmd eq 'xstat') {
my ($jid) = @args;
if ($io_wait{$jid} && $io_wait{$jid}->{streams}) {
for(values %{$io_wait{$jid}->{streams}}) {
replymsg($_, "quit");
}
delete $io_wait{$jid}->{streams};
}
my $stat=getjobstathash($jid);
return("$jid:stat", $stat) if $stat;
# only set wait flag if not found
xlog("debug", "Creating io_wait hash 'xstat' entry for $jid");
$io_wait{$jid}->{type} = 'stat';
$io_wait{$jid}->{zid} = $zid;
$io_wait{$jid}->{time} = time(); # toss this entry if it gets old
return (); # wait for response
} elsif ($cmd eq 'jinfo') { # tell me what host a job is on
my ($jid) = @args;
if (!$jid) {
xlog("error", "Job '$jid' does not exist from $ip");
return(error=>"Job '$jid' does not exist from $ip");
}
if (! -e "$conf{spool}/jobs/$jid") {
if (! -e "$conf{spool}/jobs/$jid.ip") {
my $jhist=jhiststat($jid);
if (! $jhist) {
xlog("error", "Job '$jid' does not exist from $ip") if $jid;
return (error=>"Job '$jid' does not exist during jinfo.");
} else {
return(%$jhist);
}
} else {
my $job_ip = slurp("$conf{spool}/jobs/$jid.ip");
if (!$job_ip) {
xlog("error", "No ip for job $jid");
return ();
} else {
# route to correct node
if (!$nodes{$job_ip}) {
my $jhist=jhiststat($jid);
if (!$jhist) {
xlog("error", "Job $jid is linked to $job_ip which is not responding");
return (warn=>"Job $jid is linked to $job_ip which is not responding");
} else {
return(%$jhist);
}
} else {
return (jid=>$jid, ip=>$job_ip, port=>$nodes{$job_ip}->{port}, hostname=>$nodes{$job_ip}->{hostname});
}
}
}
} else {
# wait for job start
$start_wait{$jid}->{zid} = $zid; # set router id
$start_wait{$jid}->{time} = time(); # refresh timestamp
return (); # wait for response
}
} elsif ($cmd eq 'run') {
# user command, returns jid=>jobid [, error=>string]
if (!$conf{services}->{queue}) {
return (error=>"No queue service running on this host");
} else {
my $time = time();
++$gjobid;
burp("$conf{spool}/nextid", $gjobid);
# the job file
my $jid = $gjobid;
my $file = "$conf{spool}/jobs/$jid";
my $job = $args[0];
my $gfile = "$conf{spool}/guids/$job->{guid}";
# stick the ip the job came from in the options
$job->{time}=time();
$job->{fromip}=$ip;
$job->{trace}=$trace;
if ( -e $gfile) {
return (jid=>slurp($gfile), already=>1);
}
xlog("debug", "Created $file\n") if $trace;
open(G, ">$gfile") || return('error'=>"Can't create $gfile : $!");
open(F, ">$file") || return('error'=>"Can't create $file : $!");
print F packfile($job);
close F;
print G $jid;
close G;
return (jid=>$jid);
}
} elsif ($cmd eq 'sstat') {
my ($stat) = @args;
notifystat($stat);
} elsif ($cmd eq 'sready') {
my ($key) = @args;
my ($jid) = $key =~ /^(\d+)/;
if ($io_wait{$jid} && $io_wait{$jid}->{zid}) {
if (!($io_wait{$jid}->{type} eq 'io')) {
return('quit');
} else {
return('ready');
}
}
if (! -e "$conf{spool}/jpids/$jid" ) {
xlog("debug", "Dumping stream, $jid is dead\n");
delete $io_wait{$jid};
return('quit');
}
if ( -s "$conf{spool}/jstat/$jid.stat" ) {
if ( $conf{loop_num} > 4 ) {
# any time you check mod times or abandon things, remember to ensure your loop num is more than some small number
if (fmodtime("$conf{spool}/jstat/$jid.stat")<time()-$conf{ping_secs}*3) {
xlog("error", "Abandoning $jid streams, because no one wants them\n") if $trace;
delete $io_wait{$jid};
return('quit');
}
}
}
xlog("debug", "Not ready for $jid, deferring (creating io_hash entry)\n") if $trace;
$io_wait{$jid}->{streams}->{$key}=$zid;
$io_wait{$jid}->{time}=time();
return ();
} elsif ($cmd eq 'stream') {
my ($key, $data) = @args;
my ($jid) = ($key =~ /^(\d+)/);
if ($io_wait{$jid} && $io_wait{$jid}->{type} eq 'io' && $io_wait{$jid}->{zid}) {
replymsg($io_wait{$jid}->{zid},$key, $data);
} else {
xlog("debug", "Dumping stream $key, no wait $jid\n") if $trace;
if ($data !~ /:end/) {
burp("$conf{spool}/jstat/$jid.dumped",1) if length($data)>0;
} else {
touch("$conf{spool}/jstat/$jid.dumped");
}
}
return ();
} else {
return ('error'=>"Unknown command $cmd ($src_data)");
}
return ();
}
# Map a numeric job id to its history-file path. Ids are bucketed into
# per-10000 subdirectories (created on demand) so the jhist tree never
# collects one enormous flat directory.
sub jhistpath {
    my ($job_id) = @_;
    my $bucket = int($job_id / 10000);
    my $bucket_dir = join('/', $conf{spool}, 'jhist', $bucket);
    mkdir $bucket_dir unless -d $bucket_dir;
    return "$bucket_dir/$job_id";
}
# Handle a reaped exec child: if the pid belonged to a tracked job
# runner, relay any final on-disk status to the master and drop the
# tracking entry.
# $kid    - pid returned by waitpid
# $status - the wait status ($?); currently unused here
sub child_exit {
my ($kid, $status) = @_;
if ($pid_jobs{$kid}) {
my $jid=$pid_jobs{$kid}->{jid};
if ($jid) {
if (-s "$conf{spool}/jstat/$jid.stat") {
# the runner wrote a final status file; forward it (from_jstat=1)
notifystat(unpack_file("$conf{spool}/jstat/$jid.stat"), 1);
# refresh the jpids record so the periodic sweep doesn't treat it as stale
touch("$conf{spool}/jpids/$jid") if -e "$conf{spool}/jpids/$jid";
}
}
delete $pid_jobs{$kid};
}
}
# One pass of the master scheduler: reap exec children, re-account the
# resources of running jobs against their nodes (pass 1), then match
# queued jobs to nodes and dispatch them (pass 2). Returns true if any
# work was done this pass.
sub schedule {
my $did=0;
# nothing to schedule onto until at least one node has registered
return unless %nodes;
# reap finished local children without blocking
while ((my $kid = waitpid(-1, WNOHANG))>1) {
$did=1;
if ($conf{services}->{exec}) {
child_exit($kid, $?);
}
}
my $tcpu;
# reset per-node allocation counters; they are rebuilt from the
# running-job (.run) files below
for my $n (values %nodes) {
if (defined $n->{a_cpus}) {
$n->{a_cpus} = $n->{a_memory} = 0;
}
$tcpu+=$n->{cpus};
}
# pass 1 : deduct resources for running jobs
opendir(D,"$conf{spool}/jobs");
# random comparator = shuffle, so no job is systematically starved
my @D = sort {(.5-rand()) cmp (.5-rand())} readdir(D);
closedir(D);
if (($conf{loop_num}%100)==5) {
# occasionally check for expiration of start_waits, though it should never happen
for my $jid (keys(%start_wait)) {
if ($conf{expire_secs} && (time() > $start_wait{$jid}->{time}+$conf{expire_secs})) {
my $jhistfile=jhistpath($jid);
if ( -e $jhistfile ) {
$did=1;
my $job=unpack_file($jhistfile);
# possibly killed during messaging? in general this shouldn't happen anymore, so it's an error
xlog("error", "Job $jid, reply to jinfo after archive");
replymsg($start_wait{$jid}->{zid},jid=>$jid,status=>$job->{status}, error=>$job->{error}, hostname=>$job->{host}, ip=>"n/a");
}
}
}
}
my $jcnt=0;
my $jcpu=0;
my $jnee=0;
for my $jrun (@D) {
++$jnee unless ($jrun =~ /\.ip$/);
next unless ($jrun =~ /\.run$/);
--$jnee;
my $job=read_job($jrun);
# NOTE(review): "my $x = $1 if ..." is a fragile idiom; it relies on the
# match always succeeding for .run files -- confirm
my $job_ip=$1 if $jrun=~/\:(\S+)\.run$/;
# this should be config, min requirements
$job->{cpus} = 1 if $job->{cpus} == 0;
$job->{memory} = ($conf{default_memory}*1000) if $job->{memory} == 0;
for ((@metrics, @labels)) {
$job->{$_} = $conf{"default-$_"} if !$job->{$_} && $conf{"default-$_"};
}
# job is running, make sure it's resources are somewhat locked
my ($jid) = $jrun =~ m/^(\d+)/;
# check to see whether job was ever started
if ($conf{loop_num}>5 && (fmodtime($job->{file}) < (time()-$conf{ping_secs})) && ! -e "$conf{spool}/jobs/$jid.ok") {
# no .ok confirmation arrived in time: requeue the job and drop the node
rename("$conf{spool}/jobs/$jrun","$conf{spool}/jobs/$jid");
delete $nodes{$job_ip};
xlog("error", "No execution confirm. Deregister $job_ip as bad, put job $jid back in the queue");
}
# check to see whether job has expired
if ($conf{loop_num}>5 && $conf{expire_secs} && (fmodtime($job->{file}) < (time()-$conf{expire_secs}))) {
$did=1;
selfcmd('jstat', {pid=>-1, id=>$jid, status => $STATUS_ORPHAN, error=>"Orphaned job (" . (time()-fmodtime($job->{file})) . "s)"});
} else {
$job_ip = slurp("$conf{spool}/jobs/$jid.ip") unless $job_ip;
if ($job_ip =~ /\d/ && $nodes{$job_ip} && $nodes{$job_ip}->{ip}) {
# charge this running job's resources to its node
++$jcnt;
$jcpu+=$job->{cpus};
$nodes{$job_ip}->{a_cpus} += $job->{cpus};
$nodes{$job_ip}->{a_memory} += $job->{memory};
for (@metrics) {
$nodes{$job_ip}->{"a_$_"} += $job->{param}->{$_};
}
}
}
}
my @nlist = values %nodes;
# no attempt here to prioritize jobs, just match and go
my $cnt=0;
# cluster "fullness" = running cpus over total cpus
# NOTE(review): $tcpu of 0 (all nodes reporting zero cpus) would divide
# by zero here -- presumably nodes always report cpus; confirm
my $full = ($jcpu/$tcpu);
for my $jid (@D) {
next unless ($jid =~ /^\d+$/o);
my $job = read_job($jid);
next if $job->{state} eq 'hold';
$job->{priority} = $conf{default_priority} if !$job->{priority};
# probabilistically defer low-priority jobs as the cluster fills up
next if ($job->{priority} < 20) && ($job->{priority}/5 < (sqrt(rand())*$full));
$full+=($job->{priority}*$job->{cpus})/($tcpu*5);
# this should be config, min requirements
$job->{cpus} = 1 if $job->{cpus} == 0;
$job->{memory} = ($conf{default_memory}*1000) if $job->{memory} == 0;
my @dereg;
my @n;
my ($max_av, $max_n);
if ($cnt >= $conf{max_sched}) {
last;
}
if (!@nlist) {
xlog("debug", "No available nodes, not scheduling");
last;
}
++$cnt;
# spread_pct controls striping across nodes vs packing the first fit
my $spread = rand() > $conf{spread_pct}/100;
for (my $i = 0; $i < @nlist; ++$i) {
my $n = $nlist[$i];
# jobs need enough memory, cpu availability and disk space... that's it
if (!$n->{ip}) {
xlog("error", "Node has no ip! " . packdump($n));
next;
}
# free cpus = configured minus allocated, capped by live availability
my $cpus = $n->{cpus} - $n->{a_cpus};
$cpus = $n->{avail} if $n->{avail} < $cpus;
my $mem = $n->{tmem} - $n->{a_memory};
$mem = $n->{mem} if ($n->{tmem} == 0) || ($n->{mem} < $mem);
# See below, only log reason
# if (debugging()) {
# xlog("debug", "Sched $jid: $n->{ip}: jcpu:$job->{cpus}, norig: $n->{orig_cpus}, ncpu:$n->{cpus}, nall:$n->{a_cpus}, nav:$n->{avail}, cpus:$cpus , jmem:$job->{memory}, nmem:$n->{mem}, avmem:$mem");
# }
if ($cpus <= 0) {
# pull the node out of the list
xlog("debug", "Removing $n->{hostname} from node list, cpus are $cpus");
splice(@nlist, $i, 1);
--$i;
}
# did it ping recently?
if ($n->{ping} > (time()-$conf{ping_secs}*6)) {
# custom metric/label constraints must also be satisfied
my $ok = 1;
for (@metrics) {
if (($n->{"param-$_"} - $n->{"a_$_"}) < $job->{param}->{$_}) {
$ok =0;
}
}
for (@labels) {
if (!($n->{"param-$_"} eq $job->{param}->{$_})) {
$ok =0;
}
}
if ( ($mem >= $job->{memory}) &&
(($cpus+$conf{idle_load}) >= $job->{cpus}) &&
($n->{disk} >= $job->{disk}) &&
($ok)
) {
next if $job->{hosts} && $job->{hosts} !~ /$n->{hostname}/;
my $match = 1;
if ($conf{match}) {
$match = evalctx($conf{match}, node=>$n, job=>$job); # eval perl expression
if ($@) {
$match = 1; # permit all on error?
}
}
next unless $match;
if ($spread) {
# use *least* busy node
if ($n->{load} < $conf{idle_load}) {
# don't bother checking further, this node is bored
$max_n = $n;
last;
} else {
if ($n->{avail} > $max_av) {
$max_n = $n;
$max_av = $n->{avail};
}
}
} else {
# use *first* node
$max_n = $n;
$max_av = $n->{avail};
last;
}
} else {
# node can't run this job: record the first failing constraint
my $reason = "";
if (!($mem >= $job->{memory})) {
$reason = "memory ($mem)";
} elsif (!(($cpus+$conf{idle_load}) >= $job->{cpus})) {
$reason = "cpus ($cpus)";
} elsif (!(($n->{disk} >= $job->{disk}))) {
$reason = "disk ($n->{disk})";
} else {
for (@metrics) {
if (($n->{"param-$_"} - $n->{"a_$_"}) < $job->{param}->{$_}) {
$reason = "$_ (" . $n->{"a_$_"} . ")";
}
}
}
xlog("debug", "Sched $jid: $n->{ip}: $reason");
}
} else {
# stale ping: mark the node for deregistration
push @dereg, $n->{ip} if $n->{ip};
}
}
for my $ip (@dereg) {
xlog("note", "Deregister node '$ip', last ping was " . (time()-$nodes{$ip}->{ping}) . " seconds ago");
delete $nodes{$ip};
$did=1;
}
if ($max_n) {
$did=1;
xlog("debug", "Matched '$max_n->{ip}' to job $job->{file}") if $job->{trace};
# todo... change this... it's an ugly way of owning jobs
# claim the job by renaming it to <file>:<ip>.run
my $jmine = "$job->{file}:" . $max_n->{ip} . ".run";
touch($job->{file}) if -e $job->{file};
rename($job->{file}, $jmine);
if ( -e $jmine ) {
my $jptr = "$job->{file}" . ".ip";
burp($jptr, $max_n->{ip});
noderun($max_n, $jid, $job);
# TODO: handle metrics universally, allocated, total, and current
$max_n->{a_cpus} += $job->{cpus}; # allocate
$max_n->{a_memory} += $job->{memory};
for (@metrics) {
$max_n->{"a_$_"} += $job->{param}->{$_};
}
$max_n->{avail} -= $job->{cpus}; # assume being used
$max_n->{mem} -= $job->{memory}; # assume being used
++$jcnt;
} else {
xlog("error", "Rename failed for $jmine\n");
}
} else {
if ($conf{backup_grid}) {
# TODO: fork exec to backup grid, and add to list of pids to track... as if you're an exec node
}
xlog("debug", "Can't find node for $jid, $jcnt jobs running did=$did\n") if $conf{trace};
}
}
# kickstart nodes, if needed
$did|=kicknodes();
return $did;
}
# Load and decode a spool job file by name (the name may carry the
# ":ip.run" suffix for running jobs). Returns a hash ref with {file}
# set to the full path, or undef when the file is missing or does not
# decode to a hash (malformed files are moved to trash for inspection).
sub read_job {
    my ($jid) = @_;
    my $jfil = "$conf{spool}/jobs/$jid";
    # was "next unless -f" -- 'next' outside a loop is a runtime error
    # ("Exiting subroutine via next"); return cleanly instead
    return undef unless -f $jfil;
    my $ref = unpack_file($jfil);
    if (ref($ref) ne 'HASH') {
        # quarantine the bad file so the scheduler doesn't retry it forever,
        # and bail out before dereferencing a non-hash value
        xlog("error", "Invalid job file format ($ref): $jfil -> $conf{spool}/trash/$jid\n");
        rename($jfil, "$conf{spool}/trash/$jid");
        return undef;
    }
    $ref->{file} = $jfil;
    return $ref;
}
# Dispatch a scheduled job to its chosen execution node via 'xexec',
# first releasing any client blocked in jinfo/start_wait by telling it
# where the job landed.
sub noderun {
    my ($node, $job_id, $job) = @_;
    # the exec node reports status back to us on this port
    $job->{port} = $conf{port};
    $job->{id}   = $job_id;
    my $waiter = $start_wait{$job_id};
    if ($waiter) {
        # a client is waiting to learn the job's host; answer it now
        replymsg(
            $waiter->{zid},
            jid      => $job_id,
            ip       => $node->{ip},
            port     => $node->{port},
            hostname => $node->{hostname},
        );
        delete $start_wait{$job_id};
    }
    sendcmd($node->{ip}, $node->{port}, 'xexec', $job);
}
# Called at startup and again on SIGHUP to (re)load the configuration
# and, for daemons, ensure the spool directory tree exists.
sub init {
    $ENV{HOSTNAME} = `hostname`;
    chomp $ENV{HOSTNAME};
    readconf();
    $conf{version} = $VERSION;
    if ($daemon) {
        # create the spool tree (mkdir is a harmless no-op when present)
        mkdir $conf{spool};
        for my $subdir (qw(jobs jstat jhist nodes pids jpids trash guids)) {
            mkdir "$conf{spool}/$subdir";
        }
        # forget our node registration so we re-register after a reread
        delete $conf{node};
    }
    $conf{hostip} = host2ip($conf{hostname}) unless $conf{hostip};
    if (!$conf{hostip}) {
        die "Can't start server without knowing ip. $conf{hostname} does not resolve to ip, and no hostip defined\n";
    }
}
# Read memory figures (in kB) from /proc/meminfo.
# Returns (total, cached+free); returns (undef, undef) when
# /proc/meminfo can't be opened (non-Linux host).
sub getmem {
    my ($cache, $free, $tot);
    # lexical handle, 3-arg open, and an explicit failure path
    # (was an unchecked bareword 2-arg open)
    open(my $fh, '<', '/proc/meminfo') or return (undef, undef);
    while (<$fh>) {
        $tot   = $1 if /MemTotal:\s*(\d+)/i;
        $free  = $1 if /MemFree:\s*(\d+)/i;
        $cache = $1 if /Cached:\s*(\d+)/i;
        # was bitwise '&', which fails for values with disjoint bits
        # (e.g. 8 & 4 == 0) and forced a scan of the whole file
        last if $cache && $free;
    }
    close $fh;
    return ($tot, $cache + $free);
}
# Count distinct "processor : N" entries in /proc/cpuinfo.
# Returns the number of logical cores, or undef when /proc/cpuinfo is
# unreadable or contains no processor lines.
sub getcpus {
    my %seen;
    # lexical handle + 3-arg open; was an unchecked bareword open that
    # silently iterated nothing on failure
    open(my $fh, '<', '/proc/cpuinfo') or return undef;
    while (<$fh>) {
        $seen{$1} = 1 if /processor\s*:\s*(\d+)/i;
    }
    close $fh;
    return %seen ? scalar keys %seen : undef;
}
# Return a crude CPU benchmark score: iterations/second of a ~3 second
# busy loop of hash arithmetic. The score is cached in $spool/bench and
# only recomputed when $force is set, the cache is empty, or it is
# older than $conf{bench_secs}.
sub getbench {
    my ($force) = @_;
    my $cache_file = "$conf{spool}/bench";
    my $score = slurp($cache_file);
    my $stale = !$score
        || $force
        || fmodtime($cache_file) < (time() - $conf{bench_secs});
    if ($stale) {
        my $started = Time::HiRes::time();
        my $loops = 0;
        while (Time::HiRes::time() < $started + 3) {
            my %work = (1 .. 10000);
            # churn the hash with float math; only the loop count matters
            for my $k (keys %work) { $work{$k} *= 3.333 }
            for my $k (keys %work) { $work{$k} /= 2.222 }
            ++$loops;
        }
        my $ended = Time::HiRes::time();
        $score = $loops / ($ended - $started);
        burp($cache_file, $score);
    }
    return $score;
}
# Read an entire file and return its contents as a single string.
# Returns undef if the file cannot be opened.
sub slurp
{
    my ($file) = @_;
    # 3-arg open so mode characters in $file can't be interpreted
    # (was a 2-arg open); lexical handle instead of an IO::File object
    open(my $in, '<', $file) or return undef;
    local $/ = undef;    # slurp mode
    my $dat = <$in>;
    close($in);          # was closed twice ($in->close then close($in))
    return $dat;
}
# One pass of the exec-node service loop: ping the master with fresh
# node stats, reap finished children, and sweep the jpids directory to
# report live/dead jobs and suspend/resume work when the node is
# over/under loaded. Returns true when any work was done.
sub srvexec {
my $did=0;
# assure we can't flood on misconfig
$conf{ping_secs} = 5 if $conf{ping_secs} == 0;
if ($conf{services}->{exec} && (!$conf{node} || (time() > ($conf{node}->{ping}+$conf{ping_secs}-1)))) {
# ping master with stats
$conf{node}->{arch} = `arch`; chomp $conf{node}->{arch};
($conf{node}->{tmem},$conf{node}->{mem}) = getmem(); # free mem
$conf{node}->{load} = slurp("/proc/loadavg"); # load
$conf{node}->{orig_cpus} = getcpus();
$conf{node}->{cpus} = $conf{cpus} ? $conf{cpus} : $conf{node}->{orig_cpus}; # num cores
$conf{node}->{bench} = $conf{bench} ? $conf{bench} : getbench(); # benchmark score
$conf{node}->{avail} = min($conf{node}->{cpus}, $conf{node}->{orig_cpus} - $conf{node}->{load});
$conf{node}->{ping} = time();
$conf{node}->{port} = $conf{port};
$conf{node}->{ip} = $conf{hostip};
$conf{node}->{hostname} = $conf{hostname};
$conf{node}->{kernel} = `uname -rv`; chomp $conf{node}->{kernel};
$conf{node}->{arch} = `uname -m`; chomp $conf{node}->{arch};
for ((@labels,@metrics)) {
$conf{node}->{"param-$_"} = $conf{"param-$_"};
}
$did=1;
$conf{registered} = 1;
if (!sendcmd($conf{master}, $conf{master_port}, 'node', $conf{node})) {
$conf{registered} = 0;
}
}
# reap finished children without blocking
while ((my $kid = waitpid(-1, WNOHANG))>1) {
$did=1;
child_exit($kid, $?);
}
if (time() > ($conf{lastpidtime}+$conf{ping_secs})) {
# check for expiration of io_wait
if ($conf{loop_num}>5) {
for (keys(%io_wait)) {
if ($conf{expire_secs} && (time() > $io_wait{$_}->{time}+$conf{expire_secs})) {
delete $io_wait{$_};
}
}
}
$did=1;
opendir(D,"$conf{spool}/jpids") or die "Can't open jpids\n";
my $mjob;   # lowest-priority running job: candidate for suspension
my $oksusp; # number of running jobs while the node is overloaded
while(my $jid = readdir(D)) {
next unless $jid =~ /^\d/;
next unless fmodtime("$conf{spool}/jpids/$jid") < time()-$conf{ping_secs};
if (-s "$conf{spool}/jstat/$jid.stat") {
notifystat(unpack_file("$conf{spool}/jstat/$jid.stat"), 1);
next;
}
# been a while... check to see if it's alive
my $job = unpack_file("$conf{spool}/jpids/$jid");
my $pid = $job->{pid};
# there could be a fake pid for a jobs that "ran" but for whatever reason, never started
next unless $pid =~ /^\d+$/;
if ($conf{node}->{avail} < -($conf{node}->{cpus}/2)) {
# BUGFIX: was "!$mjob && ($job->{priority} < $mjob->{priority})",
# which compared against an undef priority and so could never
# select a job with a non-negative priority; track the minimum
if (!$mjob || ($job->{priority} < $mjob->{priority})) {
$mjob = $job;
}
++$oksusp;
}
if ($conf{node}->{avail} > 0) {
if ( -e "$conf{spool}/jstat/$job->{id}.held" ) {
kill(-18, $pid); # signal 18 (SIGCONT on Linux) to the whole group
xlog("note", "Resuming $job->{id}, because node is available");
# BUGFIX: was $mjob->{id}, but the job being resumed here is $job
sendcmd($conf{master},$conf{master_port}, 'jedit', $job->{id}, state=>'');
unlink("$conf{spool}/jstat/$job->{id}.held");
}
}
# wait for pids
my $alive = kill(0, $pid);
if ($alive) {
$io_wait{$jid}->{time}=time() if ($io_wait{$jid});
notifystat({id=>$jid, jrun=>1});
touch("$conf{spool}/jpids/$jid") if -e "$conf{spool}/jpids/$jid";
} else {
notifystat({id=>$jid, status=>$STATUS_ORPHAN, error=>"Unknown $jid exit code, really bad!", dumped=>1});
}
}
closedir D;
if ($mjob && $oksusp > 1) {
kill(-19, $mjob->{pid}); # signal 19 (SIGSTOP on Linux) to the whole group
touch("$conf{spool}/jstat/$mjob->{id}.held");
xlog("note", "Suspending $mjob->{id}, because node is busy");
sendcmd($conf{master},$conf{master_port}, 'jedit', $mjob->{id}, state=>'susp');
}
$conf{lastpidtime} = time();
}
}
# Set both atime and mtime of the given path(s) to the current time;
# returns utime's count of files successfully updated.
sub touch {
    my $now = time();
    return utime($now, $now, @_);
}
# Return a file's modification time (stat field 9); undef if stat fails.
sub fmodtime {
    my ($path) = @_;
    return (stat $path)[9];
}
# Report a job's status: wake any local client waiting on the job's
# i/o, then forward the status to the master as a 'jstat' command.
# $stat       - status hash ref; must at least carry {id}
# $from_jstat - true when the status was read from the on-disk .stat file
# $nowait     - use the non-blocking send to the master
sub notifystat {
my ($stat, $from_jstat, $nowait) = @_;
confess("stat is required\n") if (!$stat);
if (!$stat->{jrun}) {
if ($io_wait{$stat->{id}} && $io_wait{$stat->{id}}->{zid}) {
# tell the client that the job is done
replymsg($io_wait{$stat->{id}}->{zid},"$stat->{id}:stat",$stat);
} else {
# it's not a request to notify that came from the file
# and yet, the file is there, with, presumably, valid status info
# so which do we report?
# I assume, report what's in the file....ie: it was there first
# this can (rarely) happen if you kill a job after it completes successfully, for example
# TODO: the safer thing is to report the "worst case" ... ie: if either is an error, report error
# and if both agree ... then don't log anything here... not a problem
if ( ! $from_jstat && -s "$conf{spool}/jstat/$stat->{id}.stat") {
# log the problem for inspection
xlog("error", "Got alternative status for $stat->{id}, this may not be correct!");
# save the new status as an error file to inspect later
burp("$conf{spool}/jstat/$stat->{id}.stat-err", packfile($stat));
}
}
}
# already dack'ed
if ( -e "$conf{spool}/jstat/$stat->{id}.dack" ) {
xlog("debug", "Not notifying status for $stat->{id}, dack already set");
exec_clean($stat->{id});
return;
}
xlog("debug", "Notifying status " . packdump($stat) . ".\n") if ($conf{trace} || $stat->{trace});
# tell main queue about the status change
# the master acknowledges on this port (see the 'dack' handler)
$stat->{replyport}=$conf{port};
if ($nowait) {
sendcmd_nowait($conf{master},$conf{master_port}, 'jstat', $stat);
} else {
sendcmd($conf{master},$conf{master_port}, 'jstat', $stat);
}
}
# Read a simple "key: value" configuration-style file from disk and
# merge its entries into $dat (see gunpack for the parse rules).
sub funpack {
    my ($file, $dat) = @_;
    return gunpack(slurp($file), $dat);
}
# Parse simple "key: value" / "key = value" text into a hash ref
# (merging into $dat when given). Keys are lower-cased; whitespace
# around keys and values is stripped. Lines with no separator are
# skipped.
sub gunpack {
    my ($msg, $dat) = @_;
    $dat = {} if !$dat;
    for (split(/\n/, $msg)) {
        # non-greedy key match so spaces before the separator are not
        # captured into the key (previously "foo : bar" made key "foo ")
        my ($k, $v) = m/^\s*([^:=]+?)\s*[:=]\s*(.*?)\s*$/;
        # skip blank/malformed lines instead of creating a '' key
        # (which also triggered lc(undef) warnings)
        next unless defined $k;
        $dat->{lc $k} = $v;
    }
    return $dat;
}
# more complex config file support
# contains logic for turning delimited lists into array configs, etc.
# Rebuilds %conf from %def plus the config file, syntax-checks match/
# label rules in a Safe compartment, evaluates {code} values, expands
# $var references, classifies param-* keys into @metrics (numeric) vs
# @labels (string), and normalizes services/log_types/env/ports.
sub readconf {
%conf = %def;
_readconf("$conf{config}");
# defines happen at the end so defaults can get unpacked
for (keys %conf) {
next if ref $conf{$_};
if ($_ eq 'match' || $_ =~ /^label/) {
# match rules are evaluated during matching, but reval now just to test
my $test = $conf{$_};
# see http://www.perlmonks.org/?node_id=685699 for why this is OK
$test =~ s/`[^`]+`/1/g; # turn off backtics
# NOTE(review): this pattern only neutralizes single-character
# system() arguments -- confirm whether that is intended
$test =~ s/system\([^\)]\)/1/g; # turn off system calls
$safe->reval($test); # check syntax
if ($@) {
# report a problem with the rule
xlog("error", "Error testing match rule : $@");
}
$@='';
} elsif ( ! ($conf{$_} =~ s/^\{(.*)\}$/eval($1)/gei) ) {
# evaluate simple inline vars at configure-time
if ( $conf{$_} =~ m/\$([\w-]+)\{/) {
xlog("error", "Error, rule has a hash variable, which requires braces\n");
} else {
# expand $name to the value of another config var (or leave the name)
$conf{$_} =~ s/\$([\w-]+)/$conf{lc($1)}?$conf{lc($1)}:$1/gei;
}
}
if ($_=~ /^param-(.*)/) {
my $nm=$1;
# evaluates to the value for that param... if numeric is a "metric" otherwise is a "label"
# NOTE(review): @metrics/@labels are never cleared here, so a SIGHUP
# reread appends duplicates -- confirm whether that matters
if ($conf{$_} =~ /^[\d.]+$/) {
push @metrics, $nm;
} else {
push @labels, $nm;
}
}
}
# reorganize some conf vars into a hash
for my $k (qw(services log_types)) {
my $v;
$v = $conf{$k};
$conf{$k} = {};
for (split(/[\s,]+/,$v)) {
$conf{$k}->{$_} = 1;
}
}
# these low-level entries are controlled by trace bits in packets... not by user preference
$conf{log_types}->{trace}=1;
# stored as an array reference
$conf{env} = [split(/[\s,]+/,$conf{env})] unless ref($conf{env}) eq 'ARRAY';
# split up host/port
$conf{port} = $1 if $conf{bind} =~ s/:(\d+)$//;
# same for master (if different - has to be if there's a queue/exec on the same box)
$conf{master_port} = $1 if $conf{master} =~ s/:(\d+)$//;
$conf{master_port} = $conf{port} if !$conf{master_port};
}
# basic config reader, like funpack, but writes into the global %conf;
# recurses on "include: <file>" directives.
sub _readconf {
    my ($f) = @_;
    # BUGFIX: do NOT reset %conf here. readconf() seeds %conf from %def
    # before calling us; resetting on every call meant an "include"
    # directive (which recurses here) wiped out every setting parsed
    # before the include line.
    my $fh;
    # lexical handle + 3-arg open (was a bareword 2-arg open)
    if (!open($fh, '<', $f)) {
        xlog("error", "Can't open '$f'");
        die("Can't open config '$f'\n");
    }
    while (<$fh>) {
        next if /^\s*#/;    # comment lines
        my ($k, $v) = m/^\s*([^:]+)?\s*:\s*(.*?)\s*$/;
        # skip lines with no "key: value" shape instead of storing a '' key
        next unless defined $k;
        $k = lc($k);
        if ($k eq 'include') {
            _readconf($v);  # splice in another config file
        } else {
            $conf{$k} = $v;
        }
    }
    close $fh;
}
# log stuff. TODO: cache opened handles and test for operation... that way you won't have to reopen so many!
# Append a timestamped, tab-joined entry to the configured log file, or
# print to STDOUT when log_file is unset or '-'. The first argument is
# the log class and must be enabled in $conf{log_types}. Returns the
# formatted line (or nothing when the class is disabled).
sub xlog {
    my $m = join("\t", @_);
    my $class = $_[0];
    return unless ref($conf{log_types}) && $conf{log_types}->{$class};
    $m =~ s/\n/ /g;    # keep each entry on one physical line
    my $line = scalar(localtime) . "\t" . $m . "\n";
    my $log = $conf{"log_file"};
    if ($log && ! ($log eq '-')) {
        # lexical handle + checked 3-arg append open (was an unchecked
        # bareword 2-arg open that silently dropped messages on failure);
        # fall back to STDERR rather than lose the entry
        if (open(my $fh, '>>', $log)) {
            print $fh $line;
            close $fh;
        } else {
            print STDERR $line;
        }
        print STDERR $line if $log_to_stderr;
    } else {
        print $line;
    }
    return $line;
}
# wait for STDERR and STDOUT from a command
# Polls the exec node with 'xio' until the job's status arrives and all
# requested output streams have been fully relayed. Returns
# ($status, $error, $diderr) where $diderr is true if any stderr data
# was printed.
sub waitio {
my ($host, $port, $jobid, $opt) = @_;
my @resp;
my $stat;
my $err;
my $diderr;
my $sock = _sendcmd($host, $port, 'xio', $jobid);
my $stat_time;
$|=1;
# streams still open: one each for stdout/stderr, unless the caller
# redirected them with -out/-err (then the node writes them directly)
my $needio = !$opt->{err}+!$opt->{out};
my $start_wait=time();
my $unk;
while ((!defined $stat) || $needio) {
my $got = 0;
# wait up to 5 seconds for output
zmq_poll([{
socket=>$sock, events=>ZMQ_POLLIN, callback=> sub {
my ($ip, $trace, $key, $dat) = recvmsg($sock);
if ($ip) {
$got = 1;
# keys look like "<jid>:<type>[:<cmd>]", e.g. "42:err:end"
my ($jid, $type, $cmd) = split /:/, $key;
if ($type eq 'err') {
$diderr = 1 if $dat;
print STDERR $dat if $dat;
--$needio if $cmd eq 'end';
} elsif($type eq 'out') {
print STDOUT $dat if $dat;
--$needio if $cmd eq 'end';
} elsif($type eq 'stat') {
$stat = $dat->{status};
$err = $dat->{error};
$stat_time=time() if !$stat_time;
# NOTE(review): with expire_secs of 0 this fires immediately
# after the first status -- confirm expire_secs is always set
if (time()>($stat_time+$conf{expire_secs})) {
# i'm taking charge of dumping it
xlog("error", "Job $jobid, dumping i/o, and reporting failure... took too long");
$dat->{dumped}=1;
}
if ($dat->{dumped}) {
# don't wait longer for i/o if the i/o was lost
# todo... fail here if stat is 0?
if (!$stat) {
$stat=37;
$err="Failing because i/o was dumped";
}
$needio = 0;
}
# what does the client do with the times?
} elsif($type eq 'unk') {
# server doesn't know the job (yet): back off and retry
++$unk;
sleep(1);
} else {
xlog("error", "Job $jobid, got message ($jid, $type, $cmd) in response to xio");
}
} else {
xlog("error", "Job $jobid, got message ($key) in response to xio");
}
}}],$conf{retry_secs}*2*1000);
if (!$got) {
# if you didn't get anything, ask again
$sock = _sendcmd($host, $port, 'xio', $jobid);
if (time()>$start_wait+$conf{retry_secs}*7) {
# been a while...ask head node if this job is dead?
my %info = waitmsg($conf{master}, $conf{master_port}, 'jinfo', $jobid);
if ($info{status} =~ /\d/) {
$stat=$info{status};
$err=$info{error};
$needio = 0;
}
if ($unk > 200) {
# give up: the job apparently never made it into the system
$stat=$STATUS_UNKNOWN;
$err="Error, job submission failed";
$needio = 0;
}
# restart wait time
$start_wait=time();
}
}
}
return ($stat, $err, $diderr);
}
# Send one command and block for its first reply; returns just the
# message payload (the sender ip and trace flag are discarded).
sub waitmsg {
    my ($from_ip, $trace_flag, @payload) = recvmsg(_sendcmd(@_));
    return @payload;
}
# Send a command and poll for the reply, resending on every timeout.
# Normally unneeded, but if the router disappears the zmq identity is
# lost, so this recovers from a router restart mid-conversation.
sub waitmsg_retry {
    my $retry = shift @_;
    my ($ip, $trace, @msg);
    my $sock = _sendcmd(@_);
    my $done = 0;
    until ($done) {
        zmq_poll(
            [{
                socket   => $sock,
                events   => ZMQ_POLLIN,
                callback => sub {
                    $done = 1;
                    ($ip, $trace, @msg) = recvmsg($sock);
                },
            }],
            $retry,
        );
        # nothing arrived within $retry: resend and poll again
        $sock = _sendcmd(@_) unless $done;
    }
    return @msg;
}
# Fire-and-forget variant of sendcmd: queue a two-frame message (empty
# routing delimiter + JSON command) without blocking. Returns 1 if the
# message was queued, 0 when there is no socket or the send would block.
sub sendcmd_nowait {
my ($host, $port, @cmd) = @_;
return 0 unless my $sock = getsock($host, $port);
my $xcmd = packcmd(@cmd);
# 1 millisecond wait
# zmq_poll([
# {
# socket=>$sock, events=>ZMQ_POLLOUT, callback=> sub {}
# },
# ],1000);
if (zmq_send($sock, "", 0, ZMQ_SNDMORE|ZMQ_NOBLOCK)==0) {
# NOTE(review): the payload frame is sent without ZMQ_NOBLOCK --
# presumably safe once the first frame was accepted, since zmq
# multipart messages are queued atomically; confirm
if (zmq_send($sock, $xcmd)==-1) {
xlog("error","Can't send [@cmd] to $host:$port : $!", Carp::longmess());
return 0;
}
} else {
return 0;
}
return 1;
}
# Receive a two-part message (empty routing delimiter, then JSON
# payload) from a DEALER socket. Returns the unpacked command list
# (ip, trace, cmd, args...), or the empty list if either frame is
# missing or empty.
sub recvmsg {
    my ($sock) = @_;
    # first frame is the empty delimiter; the payload follows it
    if (my $msg = zmq_recvmsg($sock)) {
        $msg = zmq_recvmsg($sock);
        if ($msg) {
            my $buf = zmq_msg_data($msg);
            xlog("debug", "Client $$ got response: $buf") if $conf{trace};
            if ($buf) {
                return unpackcmd($buf);
            }
        }
    }
    # removed the unused @ret/$buf/$dat locals; an explicit empty list
    # makes the no-message result obvious
    return ();
}
# Send a command and report success: 1 when a socket was obtained and
# the send was attempted, undef otherwise.
sub sendcmd {
    return _sendcmd(@_) ? 1 : undef;
}
# Serialize a command list as JSON, prefixed with our host ip and the
# current trace flag. Dies if no hostip has been configured.
sub packcmd {
    confess("Need a defined hostip") unless $conf{hostip};
    return encode_json([ $conf{hostip}, $conf{trace}, @_ ]);
}
# Like packcmd, but takes the command parts as a single array ref.
sub packref {
    my ($parts) = @_;
    croak "All packrefs are arrays" unless ref($parts) eq 'ARRAY';
    return encode_json([ $conf{hostip}, $conf{trace}, @{$parts} ]);
}
# Decode a JSON-encoded command string back into its list form.
# Returns undef when decoding fails or the payload is not an array.
sub unpackcmd {
    my ($raw) = @_;
    my $decoded = eval { decode_json($raw) };
    return undef if $@ || ref($decoded) ne 'ARRAY';
    return @$decoded;
}
# JSON-encode a data structure for writing to a spool file. Hash refs
# get the current grun version stamped into them first; non-refs croak.
sub packfile {
    my ($data) = @_;
    croak unless ref $data;
    $data->{version} = $VERSION if ref $data eq 'HASH';
    return encode_json($data);
}
# Slurp a file and decode its contents as JSON. Returns the decoded
# structure, or undef (with a warning) when decoding fails.
sub unpack_file {
    my ($path) = @_;
    my $decoded = eval { decode_json(slurp($path)) };
    carp "$@" if $@;
    return $decoded;
}
### sends a command to a server/router, returns the socket to wait on
# Return a ZMQ_DEALER socket connected to tcp://$host:$port, caching
# one socket per endpoint in %ZMQS. Returns undef on failure.
sub getsock {
my ($host, $port) = @_;
my $sock;
if (!$ZMQS{"tcp://$host:$port"}) {
$sock = zmq_socket($context, ZMQ_DEALER);
if (!$daemon) {
# clients should block if messages are queued, not bang on nonexistant servers
zmq_setsockopt($sock, ZMQ_HWM, 50);
xlog("debug", "Set HWM to 50 for $host in pid $$\n");
}
# zmq_connect returns 0 on success, hence the negation
if (!zmq_connect($sock,"tcp://$host:$port")) {
$ZMQS{"tcp://$host:$port"} = $sock;
} else {
croak "Can't connect to tcp://$host:$port: $@\n";
}
} else {
# reuse the cached socket for this endpoint
$sock = $ZMQS{"tcp://$host:$port"};
}
if (!$sock) {
xlog("error",$@="Can't connect to $host:$port", Carp::longmess());
return undef;
}
return $sock;
}
# Send a command (empty delimiter frame, then the JSON payload) to
# host:port; returns the socket to poll for the reply, or undef when no
# socket could be obtained. Send failures are logged, not fatal.
sub _sendcmd {
    my ($host, $port, @cmd) = @_;
    my $sock = getsock($host, $port) or return undef;
    my $payload = packcmd(@cmd);
    zmq_send($sock, "", 0, ZMQ_SNDMORE);
    if (zmq_send($sock, $payload) == -1) {
        xlog("error", "Can't send [@cmd] to $host:$port : $!", Carp::longmess());
    }
    return $sock;
}
# Atomically write $dat to $f: write "$f.tmp" then rename into place.
# Retries until the write succeeds, logging each failure.
sub burp
{
    my ($f, $dat) = @_;
    do {
        eval {
            # 3-arg open so '>' can't be injected via the path
            # (was 2-arg ">$f.tmp"); lexical handle replaces IO::File
            open(my $h, '>', "$f.tmp") || die "$f: $!";
            print $h $dat;
            # buffered write errors surface at close; check it
            close($h) || die "$f: $!";
            rename("$f.tmp", $f) || die "$f: $!";
        };
        if ($@) {
            # log the actual exception; previously this logged $!,
            # which could be stale or unrelated to the failure
            xlog("error", $@);
            # WARNING: a persistent failure (e.g. bad permissions) still
            # loops forever, as before -- but sleep so it no longer
            # busy-spins at 100% cpu while failing
            sleep 1;
        }
    } while ($@);
}
# Build the context-sensitive help text. Which sections appear is
# selected by the mode flags ($make, $daemon, $qinfo, $editjob,
# $config) set during option parsing. Returns the help string.
sub usage {
my $u;
$u .= <<'EOF' unless $make;
Usage: grun <exec-options> command...
or: grun -d [<local-daemon-options>]
or: grun -k <jobid> [<jobid2...>]
or: grun -e key=val[,key2=val2...] <jobid> [<jobid2...>]
or: grun -q [<query-options>] <query-command>
Lightweight job queueing system
For more help, run grun -?, grun -d -?, grun -C -? or grun -q -?.
EOF
$u .= <<'EOF' unless $daemon || $qinfo || $editjob || $make;
Execution Options:
-f|ile FILE Read FILE for job options (mem, cpu, cmd, etc)
-m|em INT memory minimum in MB
-c|pu CPUS minimum number of cpus
-host N1,N2 specify certain hosts
-j|obid TEXT user-supplied job ID
-v|erbose print job id, execution node and stats to STDERR
-M|ake only run job if inputs are newer than outputs
-r|priority INT run job with the given priority
-noio disable io-processing, but wait for completion
-nowait no io and don't wait, just start the command
-e|rr FILE write stderr directly to FILE, no spool *
-o|ut FILE write stdout directly to FILE, no spool *
All options can be abbreviated to uniqueness.
* You can append error & output with -oa, -ea or -out-append -err-append,
or both with -oea, or --out-err-append.
If the command contains shell metacharacters, it's wrapped in a bash script
EOF
$u .= <<'EOF' if $make;
Make Semantics:
%i:file Input file, left in the command line
#i:file Input file, removed the command line before executing
%o:file Output file, left in the command line
#o:file Output file, removed the command line before executing
< file Input file, left in the command line
> file Output file, left in the command line
%!>file Output, but don't set as a dependency
For Example:
grun "gzip %i:x.foo #o>x.foo.gz"
If a command fails under Make Semantics, the output file(s) will be
set to FILE.failed.
NFS sync works better with make semantics.
EOF
$u .= <<'EOF' if $qinfo;
Query Options:
-a|ll Query all nodes
-n|odes ($master) List of nodes to query
Query Commands:
[-]status List nodes (q)
[-]jobs List jobs (q)
[-]history List prior jobs (q)
[-]conf Dump config from memory (q,e)
EOF
$u .= <<'EOF' if $daemon;
Daemon Options:
-h|osts (local) One or more hosts
-r|eload Reload config
-k|ill Kill running server
-R|ESTART Kill and restart a running server
Without an option, -d just starts the daemon on the local machine.
EOF
$u .= <<'EOF' if $editjob;
Edit Keys:
hold Hold job (no value needed)
resume Resume job
memory=N Memory in MB
cpus=N # of Cpus needed
EOF
$u .= <<'EOF' if !$make;
Common Options:
-C FILE (/etc/grun.conf) Config file location
-t|race Turn on debugging in the log file
-V Print version and exit
-? Show this help page
EOF
$u .= <<'EOF' if defined($config) && $config eq '';
Configuration File:
All config variables written as {value} are interpreted as perl code, and get evaluated at startup.
The "include" variable actually just includes the file specified, as if it were part of the original file.
All non-code configuration variables can include '$varname', which gets expanded to the value of another config var.
Be careful with match code. If it's slow, it will kill the performance of your main node.
Common variables:
master (localhost) Hostname[:port] of master node
spool (/var/spool/grun) Location for queue & io temp storage
log_file Location of the log
services Must be 'queue' and/or 'exec'
port Port to listen on (5184)
bind[:port] Address to bind to (0.0.0.0)
trace Turn tracing on for the whole server
Queue config vars:
env (PATH) List of environment variables to copy to the processes. An asterisk (*) means 'ALL'
expire_secs (0) If set, jobs that aren't pinged in time get (failed or retried)
expire_action (retry) Can be 'retry', 'fail'
idle_load (.3) If load is less than this amount, then considered idle
io_keep (3600) Time to keep unretrieved stdio files (0=forever)
log_file Where to send "xlog" output
pid_file (/var/run/grun.pid)
ping_secs (30) Nodes ping the master this often.
ping_expire (2*$ping_secs) Drop a node if it doesn't ping in time
Cli vars & defaults:
nfs_sync (1) Whether to force-sync the directory cache after a job is run
default_cpu (1) Default cpu reservation
default_memory (1m) Default memory for jobs
default_priority (20) Default priority for jobs
Execution node config vars:
match Perl code that must eval to TRUE for a node match
full_match (1) If jobs queue is full, this is evaluated
full_exec If full match returns true, then this command is run
wrap Job command wrapper
EOF
return $u;
}
# Render the in-memory configuration as pretty-printed JSON
# (used by the '-q conf' query command).
sub showconf {
return pretty_encode(\%conf);
}
# Show recent job history (grun -q hist).
# Options: -c count, --user U, --job N (repeatable), --fmt F, --long,
# --grep RE, --dump. A bare word positional arg is a username filter;
# leading bare numbers are job ids. Returns the formatted report string.
sub showhist {
    my %opt;
    $opt{fmt} = '%jid\t%user\t%stat\t%cwd\t%cmd\t%host\t%mtime\n';
    {
        local @ARGV = @_;
        GetOptions(\%opt, "count|c=i", "user=s","job=i@","resubmit","fmt|F=s","long","grep|g=s","dump");
        @_=@ARGV;
    }
    # positional args: a word is a user filter, leading numbers are job ids
    if ($_[0] =~ /[a-z]/ && !$opt{user}) {
        $opt{user} = $_[0]
    } else {
        while ($_[0] =~ /^\d+$/) {
            push @{$opt{job}}, $_[0];
            shift;
        }
    }
    my $r; # the result
    my $count = $opt{count};
    $count = 10 if !$count;
    my @J;
    if ($opt{job}) {
        # explicit job ids: load each history file directly
        for (@{$opt{job}}) {
            my $f = (jhistpath($_));
            if ( -e $f ) {
                my $t=unpack_file($f);
                # jid is the path basename
                # (fixed: was /\/(.+)$/, which captured everything after
                # the FIRST slash, i.e. most of the path)
                my ($jid) = $f =~ /\/([^\/]+)$/;
                $t->{jid}=$jid;
                my $mtime = (stat($f))[9];
                $t->{mtime}=$mtime;
                push @J, $t;
            } else {
                xlog("error", "History for job $_ requested, but file not found\n");
            }
        }
    } else {
        # no ids given: scan the $k newest jhist subdirectories for jobs
        my $k = 5;
        my @mx = ((-1) x $k);   # top-$k numeric dir names, descending
        opendir(D,"$conf{spool}/jhist");
        while(defined ($_=readdir(D))) {
            next unless /^\d+$/;
            # insert into the sorted top-$k list
            for (my $i = 0; $i < $k; ++$i) {
                if ($_ > $mx[$i]) {
                    # shift the tail down one slot, copying from the end so
                    # no entry is overwritten before it is read
                    # (fixed: the old forward copy duplicated $mx[$i] down
                    # the list and lost the intermediate values)
                    for (my $j = $k - 1; $j > $i; --$j) {
                        $mx[$j]=$mx[$j-1];
                    }
                    $mx[$i]=$_;
                    last;
                }
            }
        }
        closedir(D);
        for (my $i = 0; $i < $k; ++$i) {
            if (@J < $count) {
                opendir(D,"$conf{spool}/jhist/$mx[$i]");
                my @T = readdir(D);
                @T = sort {$b <=> $a} @T;   # newest jids first
                closedir(D);
                # prepend dir
                for my $jid (@T) {
                    next unless $jid=~/^\d/;
                    my $f = "$conf{spool}/jhist/$mx[$i]/$jid";
                    my $job=eval{unpack_file($f)};
                    next unless ref($job);
                    # support array/text formats
                    my @cmd=ref $job->{cmd} ? @{$job->{cmd}} : ($job->{cmd});
                    # user supplied filter
                    next if $opt{user} && ! ($job->{user} eq $opt{user});
                    next if $opt{grep} && "$job->{user}\t@cmd\t$job->{cwd}" !~ $opt{grep};
                    my $mtime = (stat($f))[9];
                    next unless $mtime;
                    $job->{jid}=$jid;
                    $job->{mtime}=$mtime;
                    push @J, $job;
                    last if @J >= $count;
                }
            }
        }
    }
    # format each collected record
    for my $job (@J) {
        my $jid=$job->{jid};
        my $mtime = $job->{mtime};
        my @cmd=ref $job->{cmd} ? @{$job->{cmd}} : ($job->{cmd});
        next if $opt{user} && ! ($job->{user} eq $opt{user});
        next if $opt{grep} && "$job->{user}\t@cmd\t$job->{cwd}" !~ $opt{grep};
        --$count;
        my %job=%{$job};
        # fall back to the usage record's status, then map well-known
        # exit/signal codes to symbolic names
        $job{status} = $job->{usage}->{status} if $job->{usage} && ! $job->{status};
        $job{status} = ($job{status} >> 8) if 0 == ($job{status} & 0xFF);
        # standard
        $job{status} = 'OK' if $job{status} eq 0;
        $job{status} = 'KILLED' if $job{status} eq 199;
        $job{status} = 'INT' if $job{status} eq 2;
        $job{status} = 'ORPHAN' if $job{status} eq $STATUS_ORPHAN;
        $job{status} = 'SIGSEGV' if $job{status} eq 11;
        $job{status} = 'SIGPIPE' if $job{status} eq 13;
        $job{status} = 'SIGFPE' if $job{status} eq 8;
        # linux specific
        $job{status} = 'NOTFOUND' if $job{status} eq 127;
        $job{status} = 'ASSERT' if $job{status} eq 134;
        $job{status} = 'OUTOFMEM' if $job{status} eq 137;
        $job{status} = 'ALTSEGV' if $job{status} eq 139;
        $job{stat} = $job{status};
        $job{wait} = $job{usage}->{start_time}-$job{time};
        # collapse multi-line commands for single-line display
        my $cmd = join(' ', @cmd);
        $cmd =~ s/\n\s+/\n/g;
        $cmd =~ s/^\s+//;
        $cmd =~ s/\s+$//;
        $cmd =~ s/\n/;/g;
        # resolve the exec node's ip to its registered hostname
        if ($nodes{$job{host}} && $nodes{$job{host}}->{hostname}) {
            $job{host}=$nodes{$job{host}}->{hostname}
        } elsif (-e (my $n="$conf{spool}/nodes/$job{host}.reg")) {
            my $node=unpack_file($n);
            $job{host}=$node->{hostname};
        }
        $job{jid}=$jid;
        $job{cmd}=$cmd;
        # $job{env}=join ':' . map { $_.'='.$job{env}{$_} } keys %{$job{env}};
        $job{mtime}=fmtime($mtime);
        # flatten usage_* sub-fields so they can be used in --fmt
        if (ref($job{usage})) {
            for (keys(%{$job{usage}})) {
                $job{"usage_".$_}=$job{usage}{$_};
            }
            delete $job{usage};
        }
        if ($opt{long}) {
            $r .= "----------\n";
            for(sort(keys(%job))) {
                $r .= "$_=" . (ref($job{$_})?packdump($job{$_}):$job{$_}) . "\n";
            }
        } elsif ($opt{dump}) {
            $r .= packdump($job) . "\n";
        } else {
            $r .= fmtstr($opt{fmt}, \%job);
        }
        last if $count == 0;
    }
    return $r;
}
# Format an epoch timestamp as "MM/DD HH:MM" in local time.
sub fmtime {
    my ($epoch) = @_;
    return strftime("%m/%d %H:%M", localtime($epoch));
}
# grun -q status
# Render a table of live nodes (grun -q status): one row per node that has
# pinged within 2*ping_secs. Columns: hostname (truncated to 15 chars),
# free/total memory in MB, load/cpu-count, benchmark score.
sub shownodes {
    my $r = sprintf "%-15s %-14s %-8s Bench\n", 'Hostname','Memory', 'Cpu';
    my @nodes = getnodes();
    for my $node (sort {$a->{hostname} cmp $b->{hostname}} @nodes) {
        # skip nodes outside the liveness window
        next unless $node->{ping} > time() - ($conf{ping_secs} * 2);
        chomp($node->{hostname});
        $node->{hostname} = substr($node->{hostname},0,15);
        # NOTE: an earlier revision also computed unreserved cpus/memory
        # ({cpus}-{a_cpus}, {tmem}-{a_memory}) here but never printed
        # them; that dead code has been removed.
        $r .= sprintf "%-15s %6dm/%-6d %4.1f/%-2d %3d\n",
            $node->{hostname}, $node->{mem}/1000, $node->{tmem}/1000,
            $node->{avail}, $node->{cpus}, $node->{bench};
    }
    return $r;
}
# grun -q jobs
# Show currently queued/running jobs (grun -q jobs).
# Options: --dump, --user U, --job N, --fmt F, --long, --debug.
# A positional numeric arg is a jid prefix filter; any other word is a
# username filter. Scans $conf{spool}/jobs and formats one row per job.
# Status column: (I) idle, (H) hold, (S) suspended, or the hostname of the
# node currently running the job.
sub showjobs {
    my %opt;
    $opt{fmt} = '%s:jid\t%s:user\t%s:stat\t%s:cwd\t%s:cmd\n';
    {
        local @ARGV = @_;
        GetOptions(\%opt, "dump|d", "user=s","job=i","fmt|F=s","long","debug");
        @_=@ARGV;
    }
    if ($_[0]) {
        my $user = shift @_;
        if ($user =~ /^\d+$/) {
            $opt{job}=$user;
        } else {
            $opt{user}=$user if !($user eq '');
        }
    }
    xlog("debug", "Show job for user:$opt{user}, job:$opt{job}\n");
    my $r;
    # $r .= sprintf "%s\t%s\t%s\t%s\t%s\n", 'JobID','User','Host','Cwd','Command';
    my $now = time();
    opendir(D,"$conf{spool}/jobs");
    while(my $jid=readdir(D)) {
        # .ip / .ok are bookkeeping side-files, not job records
        next if $jid =~ /\.ip$/;
        next if $jid =~ /\.ok$/;
        my $f = "$conf{spool}/jobs/$jid";
        next unless -f $f;
        my $job = unpack_file($f);
        if (ref($job)) {
            my %job=%{$job};
            next if $opt{user} && ! ($opt{user} eq $job{user});
            next if $opt{job} && ! ($jid =~ /^$opt{job}\b/);
            my $stat = '(I)';
            $stat = '(H)' if $job->{state} eq 'hold';
            # a "jid:ip.run" name means the job is running on node <ip>;
            # strip the suffix and report that node's hostname instead
            if ($jid =~ s/:([.\d]+?)\.run$//) {
                my $ip = $1;
                my $node = unpack_file("$conf{spool}/nodes/$ip.reg");
                $stat = $node->{hostname}; chomp $stat;
                $job->{host} = $stat;
                # ip file is created at start
                # run file is updated regularly to prevent orphans
                if ( -e "$conf{spool}/jobs/$jid.ip" ) {
                    $job->{start} = (stat("$conf{spool}/jobs/$jid.ip"))[9];
                }
                $stat .= ' (S)' if $job->{state} eq 'susp';
            } else {
                $stat = '(S)' if $job->{state} eq 'susp';
            }
            # trim for display
            my @cmd = @{$job->{cmd}};
            for (@cmd) {
                s/^\s+//g;
                s/\n\s*/;/g;
            }
            # queue wait time: until start if started, else until now
            $job->{wait} = ($job->{start}?$job->{start}:$now)-$job->{time};
            $job->{jid} = $jid;
            $job->{stat} = $stat;
            # fill scheduler defaults for display purposes
            $job->{cpus} = 1 if ! $job->{cpus};
            $job->{memory} = $conf{default_memory} if ! $job->{memory};
            $job->{priority} = $conf{default_priority} if ! $job->{priority};
            $job->{cmd} = join(' ', @cmd);
            if ($opt{long}) {
                $r .= "----------\n";
                for(sort(keys(%$job))) {
                    $r .= "$_=" . (ref($job->{$_})?packdump($job->{$_}):$job->{$_}) . "\n";
                }
            } elsif ($opt{dump}) {
                $r .= packdump($job) . "\n";
            } else {
                $r .= fmtstr($opt{fmt}, $job);
            }
        }
    }
    closedir(D);
    return $r;
}
# sprintf-style formatter driven by a hash of fields.
# The format string may contain:
#   %field        -> substituted as %s with $hash->{field}
#   %<cnv>:field  -> printf conversion <cnv> applied to the field value
#   %{perl code}  -> evaluated in the shared $safe compartment, where the
#                    record is visible as %i
# Literal \n, \t, \r sequences are expanded; hash-valued fields are
# JSON-dumped via packdump.
sub fmtstr {
    my ($fmt, $hash) = @_;
    # get list of fields
    my @fds = $fmt =~ m/%(?:[#0 +,I:-]*(?:\d+)?(?:\.\d+)?\w{1,2}:)?([\w-]+|(?:{[^{}]+}))/g;
    # expose the record to %{...} expressions as %i
    %{$safe->varglob("i")}=%{$hash};
    my @vals;
    for (@fds) {
        if ($_ =~ /^\{(.+)\}/) {
            push @vals, $safe->reval($1);
        } else {
            if (ref($hash->{$_}) eq 'HASH') {
                push @vals, packdump($hash->{$_});
            } else {
                push @vals, $hash->{$_};
            }
        }
    }
    undef %{$safe->varglob("i")};
    # replace formatted - with format-only
    $fmt =~ s/(%[#0 +,I:-]*(?:\d+)?(?:\.\d+)?\w{1,2}):([\w-]+|({[^{}]+}))/$1/g;
    my $fds=join '|', map quotemeta($_), @fds;
    # replace pure-fields with field-only
    $fmt =~ s/%($fds)/%s/g;
    # expand vars
    $fmt =~ s/\\n/\n/g;
    $fmt =~ s/\\t/\t/g;
    $fmt =~ s/\\r/\r/g;
    # format the rest
    return sprintf($fmt, @vals);
}
# Return the single candidate that begins with the given prefix.
# Yields undef when the prefix is ambiguous (two or more candidates match)
# and an empty result when nothing matches. Note: the prefix is used as a
# regex, so metacharacters are interpreted.
sub bestunique {
    my ($prefix, @candidates) = @_;
    my $match;
    for my $cand (@candidates) {
        next unless $cand =~ /^$prefix/;
        return undef if $match;    # second hit: ambiguous
        $match = $cand;
    }
    return $match;
}
# returns an array of [host,port], given a list of host[:port] names
# Expand a list of "host[:port]" specs (comma and/or space separated) into
# an array of [host, port] pairs. A "*" in the host is a wildcard matched
# against the hostnames of all registered nodes. Port defaults to
# $conf{port} when not given explicitly.
sub expandnodes {
    my @r;
    my @n;    # node list, loaded lazily and only for wildcard specs
    for (split(/[\s,]/, join ' ', @_)) {
        # Non-greedy host capture so an explicit ":port" suffix is honored.
        # (Fixed: the previous greedy /^(.*)(:\d+)?$/ swallowed the whole
        # spec into $1, so the port could never be captured.)
        my ($h, $p) = m/^(.*?)(?::(\d+))?$/;
        $p = $conf{port} if !$p;
        if ($h =~ s/\*/\.\*/g) {
            if (!@n) {
                @n=getnodes();
            }
            for (@n) {
                push @r, [$_->{hostname}, $p] if $_->{hostname} =~ /$h/;
            }
        } else {
            push @r, [$h, $p];
        }
    }
    return @r;
}
# Evaluate $expr inside a throwaway Safe compartment.
# @ctx is a flat list of name => value pairs installed as compartment
# variables: hash/array refs are copied in as %name/@name, anything else
# is bound as $name. %conf and %ENV are shared into the compartment.
# Honors the caller's list/scalar context. On eval failure, logs the error
# and returns undef.
sub evalctx {
    my ($expr, @ctx) = @_;
    my $msafe = new Safe;
    $msafe->permit(qw(:subprocess :filesys_read)); # allow backticks
    for(my $i = 0; $i < @ctx; $i+=2) {
        my ($name, $var) = ($ctx[$i], $ctx[$i+1]);
        # references to hashes/arrays become dereferenced hashes/arrays
        if (ref($var) eq 'HASH') {
            %{$msafe->varglob($name)}=%{$var};
        } elsif (ref($var) eq 'ARRAY') {
            @{$msafe->varglob($name)}=@{$var};
        } else {
            ${$msafe->varglob($name)}=$var;
        }
    }
    # NOTE(review): sharing %ENV read/write with user-supplied match code
    # is a potential footgun -- confirm this exposure is intended.
    $msafe->share(qw(%conf %ENV));
    # if ($conf{trace}) {
    # xlog("debug", "Evaluating {$expr}\n");
    # }
    my ($res, @res);
    if (wantarray) {
        @res = $msafe->reval($expr);
    } else {
        $res = $msafe->reval($expr);
    }
    # NOTE(review): $save always equals the current $@, and the error path
    # returns before the restore -- this save/restore is effectively a no-op.
    my $save = $@;
    if ($@) {
        xlog("error", "Error evaluating {$expr} : $@\n");
        return undef;
    }
    $@=$save;
    if (wantarray) {
        return @res;
    } else {
        return $res;
    }
}
# Periodically "kick" (attempt to restart) nodes that were alive within
# remove_secs but have missed several pings, by running the configured
# $conf{kickstart} command in a forked child. A {perl} kickstart value is
# evaluated via evalctx with the node record bound as %node.
# Returns 1 if any kick was launched this pass.
sub kicknodes {
    my $did=0;
    # NOTE(review): getnodes(cached=>1) actually forces a disk re-read
    # (see getnodes) -- the flag name reads backwards; confirm intent.
    my @nodes = getnodes(cached=>1);
    return if !$conf{kickstart};
    # rate-limit: at most one kick pass per 4 ping intervals
    return if (time() < ($conf{lastkicktime} + $conf{ping_secs} * 4));
    $conf{lastkicktime} = time();
    for my $node (@nodes) {
        # not just started
        if ($conf{loop_num} > 5) {
            # sometime in the last 24 hours?
            if ($node->{ping} > (time() - ($conf{remove_secs}))) {
                # but not in the last couple minutes
                if ($node->{ping} < (time() - ($conf{ping_secs} * 4)) ) {
                    # kick it
                    $did=1;
                    xlog("note", "Kicking node $node->{hostname}\n");
                    if (!fork) {
                        eval {
                            zmq_fork_undef();
                            # be sure this is gone
                            my $cmd = $conf{kickstart};
                            if ($cmd =~ /^\{(.*)\}$/) {
                                $cmd = evalctx($1, node=>$node);
                            }
                            # refuse to exec if the command still contains
                            # an unexpanded '$' variable
                            if ($cmd && $cmd !~ /\$/) {
                                exec("$cmd");
                            }
                        };
                        exit(0);
                    }
                }
            }
        }
    }
    return $did;
}
# Return the list of known node records.
# Without cached=>1 a non-empty in-memory %nodes cache is returned
# directly; with cached=>1 (or an empty cache) every registration file
# under $conf{spool}/nodes is unpacked from disk, stale entries included.
# Nodes that did not report 'avail' get cpus-load as an estimate.
sub getnodes {
    my (%opt) = @_;
    # serve from the in-memory cache when allowed and populated
    if (!$opt{cached} && %nodes) {
        return values %nodes;
    }
    # read all from disk, including old ones
    my @r;
    opendir(my $dh, "$conf{spool}/nodes") or return @r;
    # defined() guard: a file literally named "0" must not end the loop
    # (the old `while($_=readdir(D))` stopped on any falsy name)
    while (defined(my $name = readdir($dh))) {
        my $path = "$conf{spool}/nodes/$name";
        next unless -f $path;
        my $node = eval { unpack_file($path) };
        next unless $node;
        if (! defined $node->{avail}) {
            $node->{avail} = $node->{cpus} - $node->{load};
        }
        push @r, $node;
    }
    closedir $dh;
    return @r;
}
# Daemon control entry point (grun -d ...).
# Handles --kill/--restart/--reload locally or on remote --host/--all
# targets (via 'xcmd' messages), --benchmark, and otherwise daemonizes
# (unless --nofork) into the main service loop: bind the ZMQ router, then
# repeatedly run readsocks() plus schedule()/srvexec() per enabled service.
sub startdaemon {
    my ($dkill, $restart, $reload, $node, $all, $dobench, $nofork);
    GetOptions("kill"=>\$dkill, "restart|R"=>\$restart, "benchmark|B"=>\$dobench, "reload|r"=>\$reload, "host=s"=>\$node, "all"=>\$all, "nofork|F"=>\$nofork) ||
        die usage();
    $node = '*' if $all;
    # remote-control path: send one xcmd per target daemon, then exit
    if ($reload || $node) {
        my @n = $node ? expandnodes($node) : ([$conf{master},$conf{master_port}]);
        my $stat = 0;
        my $xcmd = $reload ? 'relo' : $restart ? 'rest' : $dkill ? 'term' : '';
        if (!$xcmd) {
            die usage();
        }
        for (@n) {
            my ($res) = waitmsg($_->[0], $_->[1], "xcmd", $xcmd);
            if (! defined $res) {
                print "$@\n";
                $stat = 1;
            } else {
                print "$res\n";
            }
        }
        exit $stat;
    }
    if ($dobench) {
        print STDERR "Bench\t" . int(getbench(1)) . "\n";
        exit 0;
    }
    if ($restart) {
        $daemon = 1;
    }
    killgpid() if $dkill || $restart;
    exit 0 if $dkill;
    sleep 1 if $restart;
    # start daemon
    if ($gpid) { # already running?
        if (kill(0, $gpid)) {
            die "Already running ($gpid), not starting twice\n";
        }
    }
    xlog("note", "Starting daemon as " . getpwuid($<));
    # warn if the daemon inherits restrictive ulimits (jobs inherit them)
    my $limit = `bash -c "ulimit -v -f -t -m -s"`;
    if (@{[($limit =~ m/unlimited$/mg)]}!=5) {
        my @n=qw(virt file time mem stack);
        my @v=$limit =~ m/(\w+)$/mg;
        my %d;
        map {$d{$n[$_]}=$v[$_] if ! ($v[$_] eq 'unlimited')} (0..4);
        warn "Note: ulimit is not unlimited for daemon (" . join(' ',%d) . ")\n";
    }
    # optionally self-impose limits from the 'ulimit' config var
    if ($conf{ulimit}) {
        if (eval {require BSD::Resource;}) {
            local @ARGV = split / /, $conf{ulimit};
            Getopt::Long::Configure qw(no_require_order no_ignore_case);
            my %opt;
            GetOptions(\%opt,"f=i","v=i","m=i","t=i");
            if (@ARGV) {
                die "Options @ARGV not supported by grun, use a regular ulimit wrapper";
            }
            no strict "subs";
            BSD::Resource::setrlimit(BSD::Resource::RLIMIT_FSIZE(),$opt{f},$opt{f}) if ($opt{f});
            BSD::Resource::setrlimit(BSD::Resource::RLIMIT_VMEM(),$opt{v},$opt{v}) if ($opt{v});
            BSD::Resource::setrlimit(BSD::Resource::RLIMIT_RSS(),$opt{m},$opt{m}) if ($opt{m});
            BSD::Resource::setrlimit(BSD::Resource::RLIMIT_CPU(),$opt{t},$opt{t}) if ($opt{t});
        } else {
            die "Please install BSD::Resource to use the 'ulimit' option in $def{config}\n";
        }
    }
    zmq_safe_term();
    # fork into the background (or stay in the foreground with --nofork)
    if ($nofork || (!($gpid=fork))) {
        zmq_fork_undef() if !$nofork;
        $log_to_stderr = 1 if $nofork;
        $gpid = $$ if $nofork;
        die "Can't fork child process\n" if (! defined $gpid);
        $context=zmq_init();
        open (P, ">$conf{pid_file}") || die "Can't open pidfile '$conf{pid_file}' : $!\n";
        print P $$;
        close P; # save pid
        if (!$nofork) {
            # detach stdio unless logging to the terminal
            open STDIN, '</dev/null';
            if (! -t STDOUT || ( $conf{log_file} && ! ($conf{log_file} eq '-'))) {
                open STDOUT, '>/dev/null';
                open STDERR, '>&STDOUT';
            }
        }
        POSIX::setsid();
        $SIG{INT} = sub { $quit = 1; };
        $SIG{TERM} = $SIG{INT};
        $SIG{HUP} = \&init;
        # zmq_bind returns nonzero (-1) on failure, hence "and die"
        $router = zmq_socket($context, ZMQ_ROUTER);
        zmq_bind($router, "tcp://$conf{bind}:$conf{port}")
            and die "Can't make ZMQ router on port $conf{port}: $!";
        $quit = 0;
        $conf{loop_num} = 0;
        # main service loop
        while (!$quit) {
            my $did=0;
            my $start = Time::HiRes::time();
            ++$conf{loop_num};
            eval {$did|=readsocks()};
            last if $quit;
            xlog("error", "Daemon $$ readsocks exception: $@") if $@;
            # theoretically these could be in separate threads, or forked off
            if ( $conf{services}->{queue} ) {
                eval {$did|=schedule()};
                xlog("error", "Daemon $$ schedule exception: $@") if $@;
            }
            if ( $conf{services}->{exec} ) {
                eval {$did|=srvexec()};
                xlog("error", "Daemon $$ srvexec exception: $@") if $@;
            };
            # nothing happened: sleep out the rest of the 250ms tick
            if (!$did) {
                my $elapsed = Time::HiRes::time()-$start;
                if ($elapsed < .25) {
                    # fractional sleep
                    # xlog("debug", "Did nothing, fractional sleep for " . (.25-$elapsed));
                    select(undef, undef, undef, .25-$elapsed);
                }
            }
            # print "HERE\n";
        }
        # unbind, drain one last time, then tear down ZMQ and the pidfile
        xlog("note", "Shutdown");
        zmq_unbind($router, "tcp://$conf{bind}:$conf{port}");
        sleep(1);
        eval {readsocks()};
        zmq_safe_term();
        unlink $conf{pid_file};
    }
    exit 0;
}
# Global destructor: release ZMQ sockets/context cleanly at process exit.
sub END {
    zmq_safe_term();
}
# Tear down all ZMQ state owned by this process: the bound router socket,
# any cached per-node sockets in %ZMQS, and finally the context itself.
# Sockets are given a short linger so queued messages can flush before
# zmq_term.
sub zmq_safe_term() {
    # if i'm the parent pid
    if ($router) {
        zmq_setsockopt($router, ZMQ_LINGER, 1);
        zmq_unbind($router,"tcp://$conf{bind}:$conf{port}");
        zmq_close($router);
        $router=undef;
    }
    # these are all the execution nodes... if any
    for (values(%ZMQS)) {
        zmq_setsockopt($_, ZMQ_LINGER, 1);
        zmq_close($_);
    }
    %ZMQS=();
    # terminating the context must come last, after all sockets are closed
    if ($context) {
        zmq_term($context);
        $context=undef;
    }
}
# Called in a freshly forked child: close socket/pipe file descriptors
# inherited from the parent and forget the parent's ZMQ handles so the
# child never uses them. Scans /proc/$$/fd and closes any fd > 2 whose
# lseek fails with ESPIPE (i.e. pipes/sockets; seekable files are kept).
# NOTE(review): POSIX::lseek is handed a numeric fd here -- confirm this
# perl's POSIX::lseek accepts raw fds rather than filehandles.
sub zmq_fork_undef() {
    # close all duped file descriptors
    opendir(D,"/proc/$$/fd");
    while(my $fd = readdir(D)) {
        if ($fd > 2 && (POSIX::lseek($fd, 0, POSIX::SEEK_CUR) == -1)) {
            if (POSIX::ESPIPE == POSIX::errno()) {
                POSIX::close($fd) if $fd > 2;
            }
        }
    }
    closedir(D);
    # kill access to any forked sockets
    $router=undef;
    %ZMQS=();
    $context=undef;
}
# Send SIGINT to the daemon whose pid is cached in $gpid; die when no pid
# is known or the signal cannot be delivered. Clears $gpid after a one
# second grace period.
sub killgpid {
    die "Can't find pid $conf{pid_file} for daemon\n" unless $gpid;
    kill(2, $gpid)
        or die "Can't kill -INT $gpid: $!\n";
    sleep 1;
    $gpid = 0;
}
# True when two host specifiers resolve to the same IP address.
# Trailing whitespace is stripped, names are resolved to dotted quads,
# and loopback/0.0.0.0 are normalized to this host's configured IP.
sub samehost {
    my ($host_a, $host_b) = @_;
    for ($host_a, $host_b) {
        s/\s+$//;                # drop trailing whitespace
        $_ = host2ip($_);        # names -> dotted quads
        # localhost = dig `hostname`
        s/^(0\.0\.0\.0|127\.0\.0\.1)$/$conf{hostip}/;
    }
    return $host_a eq $host_b;
}
# Resolve a hostname to its dotted-quad IPv4 string. Input that already
# looks like an IPv4 address is returned unchanged.
sub host2ip {
    my ($host) = @_;
    return $host if $host =~ /^(\d+\.){3}\d+$/;
    my $raw_addr = (gethostbyname($host))[4];
    return join(".", unpack("C4", $raw_addr));
}
# Serialize an arbitrary Perl structure to compact JSON (dump format).
sub packdump {
    my ($data) = @_;
    return encode_json($data);
}
# Ask the master to kill a job (jkill). When the master answers with a
# "Forward <jid>:<ip>[:<port>]" redirect, send an xabort directly to the
# execution node instead. Returns the final reply/error string.
sub kill_job {
    my %op = @_;
    my ($err) = waitmsg($conf{master}, $conf{master_port}, 'jkill', \%op);
    return $err unless $err =~ /Forward (\d+):?\s*([\d.]+):?(\d*)/i;
    my ($jid, $ip, $port) = ($1, $2, $3);
    $port ||= $conf{port};      # redirect may omit the port
    ($err) = waitmsg($ip, $port, 'xabort', $jid, $op{sig}, $op{termio});
    return $err;
}
# Title-case a string: upper-case the first non-space character of each
# word (word boundaries per \b).
sub proper {
    my ($text) = @_;
    $text =~ s/\b(\S)/\u$1/g;
    return $text;
}
# Launch a job on this execution node.
# $opts is the unpacked job record (cmd array, user/group, env, cwd,
# umask, memory, io/wait flags, id, ...). Flow: validate the user, write
# the per-job spool files (jpids pid record, jstat .job/.stat/.sh), fork,
# and in the child drop privileges and re-exec grun with -Y so a fresh
# interpreter supervises the command. Any pre-launch failure produces a
# synthetic status record instead. Finally the master is told via 'jexok'.
sub execute_job {
    my ($opts) = @_;
    my @cmd = @{$opts->{cmd}};
    my ($uid, $gid, $err, $ret);
    if ($opts->{user}) {
        (undef, undef, $uid, $gid) = getpwnam($opts->{user});
    }
    # refuse root unless explicitly configured, and unknown users always
    if (defined($uid) && !$conf{run_asroot} && !$uid) {
        $err="Won't run as root";
    }
    if (!defined($uid)) {
        $err="User $opts->{user} is unknown on this machine, not executing";
    }
    if ($opts->{group}) {
        $gid=$opts->{group};
    }
    # expecting someone to come pick up output?
    xlog("Creating io_wait for $opts->{id}") if $opts->{wait} || $opts->{io};
    $io_wait{$opts->{id}}->{type} = 'stat' if $opts->{wait}; # status wait
    $io_wait{$opts->{id}}->{type} = 'io' if $opts->{io}; # io wait
    $io_wait{$opts->{id}}->{time} = time() if $opts->{wait} || $opts->{io}; # io wait
    my $pid;
    # optional command wrapper prepended to every job
    if ($conf{wrap}) {
        @cmd = ($conf{wrap}, @cmd);
    }
    # jpids/<id> existing means this job was already launched here
    my $pfile = "$conf{spool}/jpids/$opts->{id}";
    if (-e $pfile) {
        xlog($err="Job file $pfile already exists, not executing.");
    } else {
        if (!open(PF, ">$pfile")) {
            xlog($err="Can't create $pfile: $!");
        }
    }
    my $bfile = "$conf{spool}/jstat/$opts->{id}";
    my $tfile = "$bfile.stat";
    my $ifile = "$bfile.job";
    $opts->{uid}=$uid;
    $opts->{gid}=$gid;
    # job info, saved
    burp($ifile, packfile($opts));
    chown($uid,0,$ifile);
    if (!open(XOUT, ">", "$tfile")) {
        $err=$!;
        $ret=$?;
    }
    chown($uid,0,$tfile);
    # leave the file open for fork
    if (!$err && !($pid=fork)) {
        if (! defined $pid) {
            $err = "Can't fork";
        } else {
            # --- child: becomes the job's supervisor process ---
            $0="GRUN:$opts->{id}";
            zmq_fork_undef();
            # restore signal to default ... regular kill
            $SIG{INT} = undef;
            $SIG{TERM} = undef;
            $ENV{USER} = $opts->{user};
            # kill me with a negative number, all kids die too
            # kill my parent.... i stay alive, and my IO is still ready to go
            my $pgid=POSIX::setsid();
            xlog("debug", "PGID set to $pgid\n");
            # copy in the umask & the environment
            umask $opts->{umask};
            for (keys(%{$opts->{env}})) {
                $ENV{$_}=$opts->{env}->{$_};
            }
            # expose scalar job options to the command as _GRUN_OPT_*
            for (keys(%{$opts})) {
                next if ref($opts->{$_});
                $ENV{"_GRUN_OPT_$_"} = $opts->{$_};
            }
            $ENV{"_GRUN"} = $opts->{id};
            $ENV{"SGE_TASK_ID"} = $opts->{id} if (!$ENV{SGE_TASK_ID});
            my ($err, $ret);
            # write the .sh script that -Y will run under bash
            my $shfile = "$bfile.sh";
            if (!open(SHFILE, ">", $shfile)) {
                $err=$!;
                $ret=$?;
            }
            chown($uid,0,$shfile);
            # optional hard memory cap via ulimit -v inside the script
            my $hard_factor = defined($opts->{hard_factor}) ? $opts->{hard_factor} : $conf{hard_factor};
            if ($hard_factor && $opts->{memory}) {eval{
                # add 4mb for o/s stuff to load
                print SHFILE "ulimit -v " . (4000+int(($opts->{memory} * $conf{hard_factor}))) . "\n"
                    if $opts->{memory};
            }}
            # quote multi-word args, but only when arg 0 has no spaces
            if ($cmd[0] !~ / /) {
                for (@cmd) {
                    if ($_ =~ / /) {
                        $_ =~ s/"/\\"/g;
                        $_ = '"' . $_ . '"';
                    }
                }
            }
            print SHFILE join(" ", @cmd), "\n";
            close SHFILE;
            xlog("debug", "Wrote $shfile\n") if $opts->{trace};
            # drop privileges to the job owner, then chdir to its cwd
            if (!$err) {
                xlog("debug", "Setting uid to $uid, gid to $gid.\n") if $opts->{trace};
                eval {
                    if ($gid) {
                        $) = $gid;
                        $( = $gid;
                    }
                    $> = $uid;
                    $< = $uid;
                    if ($opts->{cwd}) {
                        if (!chdir($opts->{cwd})) {
                            $err = "Can't cd to $opts->{cwd} : $!";
                            $ret = 103;
                        }
                    }
                };
            }
            if (!$err && $@) {
                $ret = 102;
                $err = "Error setting uid to $uid: $@\n";
            }
            xlog("debug", "About to launch (@cmd)\n") if $opts->{trace};
            if (!$err) {
                my $confarg = "-C $conf{config}"; # same config!
                # this frees all ram... yay, finally
                exec("$GRUN_PATH $confarg -Y $bfile");
                $err = "Can't exec: $!";
                $pid=-1;
            } else {
                eval {
                    # save output
                    # immediate reply if possible
                    xlog("debug", "Error before launch: $ret \"$err\" (@cmd)\n") if $opts->{trace};
                    my $out = {
                        id=>$opts->{id},
                        status=>$ret,
                        error=>$err,
                        dumped=>1,
                    };
                    print XOUT packfile($out);
                    close XOUT;
                    $context=zmq_init();
                    send_status_for_job($out);
                    printf STDERR "$err\n";
                };
                if ($@) {
                    xlog("error", "Error reporting error : $@\n");
                }
                exit $ret;
            }
        }
    }
    # --- parent: record outcome, notify waiters and the master ---
    # fake pid
    if ($err) {
        $ret = $STATUS_EXECERR if !$ret;
        $pid = 'e' . $opts->{id};
        xlog("note", "Error '$err' with job $opts->{id}, pid '$pid'");
        my $out = {
            id=>$opts->{id},
            status=>$ret,
            error=>$err,
            dumped=>1,
        };
        print XOUT packfile($out);
        close XOUT;
        xlog("debug", "FILE: $tfile");
        notifystat($out, 1, 1);
        # special case... exec failed
        exit 1 if $pid == -1;
    } else {
        xlog("note", "Started job $opts->{id}, pid $pid, '@cmd'") if !$err;
        close XOUT;
        # record pid for wait
        $pid_jobs{$pid}->{jid}=$opts->{id};
        $pid_jobs{$pid}->{time}=time();
    }
    $opts->{pid} = $pid;
    $opts->{err} = $err;
    print PF packfile($opts);
    close PF;
    # ok we've officially either a) started this job or b) notified of failure at this point....
    sendcmd_nowait($conf{master}, $conf{port}, 'jexok', { map { $_ => $opts->{$_} } qw/id pid uid gid err/ });
}
# Push a job's final status record ('sstat') to the local daemon over a
# throwaway DEALER socket. Runs inside forked children, hence the private
# socket and the context teardown at the end.
# NOTE(review): this connects to $conf{bind} (the listen address, possibly
# 0.0.0.0) -- confirm that is the intended target.
sub send_status_for_job {
    my ($stat, %op) = @_;
    # this could be in a fork... be safe
    my $sock = zmq_socket($context, ZMQ_DEALER);
    xlog("note", "Sending status for $stat->{id} as $stat->{status}\n") if $conf{trace};
    zmq_connect($sock,"tcp://$conf{bind}:$conf{port}");
    # empty delimiter frame first, then the packed command
    zmq_send($sock, "", 0, ZMQ_SNDMORE);
    zmq_send($sock, packcmd("sstat", $stat));
    zmq_close($sock);
    zmq_term($context);
}
# Prefer the job's explicit jid; otherwise resolve its guid to a jid.
sub jid_from_opts {
    my ($job) = @_;
    return $job->{jid} || jid_from_guid($job->{guid});
}
# Map a guid back to its jid via the spool's guid index file.
# Returns nothing when the index entry is missing or empty.
sub jid_from_guid {
    my ($guid) = @_;
    my $index = "$conf{spool}/guids/$guid";
    return slurp($index) if -s $index;
    return;
}
# Generate a fresh hex GUID (Data::UUID's "0x..." form, prefix stripped).
sub create_guid {
    my $hex = Data::UUID->new()->create_hex();
    return substr($hex, 2);
}
# Remove all per-job bookkeeping files on this execution node once the
# head node has collected the job's result. Safe to call twice: a missing
# jpids record just logs and returns.
sub exec_clean {
    my ($jid) = @_;
    my $pidfile = "$conf{spool}/jpids/$jid";
    if (!-e $pidfile) {
        xlog("debug", "Clean $jid which is already gone");
        return;
    }
    xlog("debug", "Cleaning $jid") if $conf{trace};
    # stop tracking this pid, head node got all the info
    my $job = unpack_file($pidfile);
    # nobody asked for the job info, so don't keep it around after the
    # exec node knows about it
    unlink($pidfile);
    unlink("$conf{spool}/jstat/$jid.$_")
        for qw(stat held stat-err sh job dack dumped);
}
# Signal handler for the streaming child (-X): flag shutdown and drop
# stdio. Deliberately does NOT exit -- the main loop still has to send
# the "quit" notification to the peer.
sub stream_sigh {
    my ($sig) = @_;
    xlog("debug", "stream $$ SIG $sig");
    # set the quit flag for the poll loops
    $stream_quit = 1;
    close($_) for \*STDIN, \*STDOUT, \*STDERR;
    # don't exit... you still need to send the "quit" signal
    # exit(-1);
}
# Streaming child entry point (grun -X [-I] <host:port> <key> [<data>]).
# Forwards this process's stdin to the submitting node over ZMQ:
#   1) send 'sready <key>' and wait (with escalating re-ask intervals,
#      giving up after an hour) until the peer replies ready/quit;
#   2) forward stdin in 4k blocks, or line-by-line with -I (interactive);
#   3) finish with a '<key>:end' marker and exit.
sub do_stream {
    # stream results back to execution top
    $SIG{INT} = \&stream_sigh;
    $SIG{TERM} = \&stream_sigh;
    $SIG{PIPE} = \&stream_sigh;
    # shift off the -X
    shift @ARGV;
    my $termio;
    # line by line?
    if ($ARGV[0] eq '-I') {
        $termio=1;
        shift @ARGV;
    }
    my $connect = shift @ARGV;
    my $key = shift @ARGV;
    my $data = shift @ARGV;
    my $sock = zmq_socket($context, ZMQ_DEALER);
    zmq_setsockopt($sock, ZMQ_LINGER, 1000);
    zmq_setsockopt($sock, ZMQ_HWM, 100);
    # zmq_setsockopt($sock, ZMQ_RCVTIMEO, 1000);
    xlog("debug", "stream $$ $connect $key");
    zmq_connect($sock,"tcp://$connect");
    xlog("debug", "stream sready $$ $connect $key");
    # announce ourselves; empty frame is the DEALER routing delimiter
    zmq_send($sock, "", 0, ZMQ_SNDMORE);
    zmq_send($sock, packcmd('sready', $key));
    my $ready=0;
    my $time=time();
    my $wait = 5;
    # phase 1: wait for the peer to say it's ready (or tell us to quit)
    while (!$stream_quit && !$ready) {
        my $got = 0;
        zmq_poll([{
            socket=>$sock, events=>ZMQ_POLLIN, callback=> sub {
                my $ignore = zmq_recvmsg($sock);
                my $msg = zmq_recvmsg($sock);
                my $data = zmq_msg_data($msg);
                $ready = $data=~/ready|quit/;
                $stream_quit = $data=~/quit/;
                $got = 1;
            }}],1000);
        if (!$stream_quit && !$got && (time() > ($time+$wait))) {
            # ask again ... are you ready for the stream
            zmq_send($sock, "", 0, ZMQ_SNDMORE);
            zmq_send($sock, packcmd('sready', $key));
            $time=time();
            $wait += 5;
            # give up entirely after an hour of silence
            if ($wait > 3600) {
                xlog("debug", "stream abandon $$ $key\n");
                exit(0);
            }
        }
    }
    xlog("debug", "stream response $$ $key $data\n");
    # phase 2: pump stdin to the peer; only read the next chunk after the
    # previous one was accepted by the socket
    my $sent = 1;
    while(!$stream_quit) {
        if ($sent) {
            if ($termio) {
                # line by line
                $_=<>;
            } else {
                # block by block
                read STDIN, $_, 4096;
            }
            if ($_ eq "") {
                $stream_quit = 1 ;
                last;
            }
            $sent=0;
        }
        zmq_poll([{
            socket=>$sock, events=>ZMQ_POLLOUT, callback=> sub {
                my $ret;
                if ($ret=zmq_send($sock, "", 0, ZMQ_SNDMORE)) {
                    xlog("debug", "stream error $ret $$ $key : $?/$!\n");
                } else {
                    if (($ret=zmq_send($sock, packcmd('stream', $key, $_))) && $? ) {
                        xlog("debug", "stream error $ret $$ $key : $?/$!\n");
                    } else {
                        # ok, that chunk of data went out
                        $sent=1;
                    }
                }
            }}]);
    }
    # let daddy know we're done
    zmq_send($sock, "", 0, ZMQ_SNDMORE);
    zmq_send($sock, packcmd("stream", "$key:end"));
    zmq_close($sock);
    zmq_term($context);
    xlog("debug", "stream exit $$ $key\n");
    exit(0);
}
# True when 'debug'-level logging is enabled in the configuration.
sub debugging {
    return $conf{log_types}{debug};
}
# Load a job's final status record from the spool: the normal ".stat"
# file first, falling back to ".stat-err". Any ".dumped" note is attached
# to the record. Returns undef when neither file has content.
sub getjobstathash {
    my ($jid)=@_;
    my $base = "$conf{spool}/jstat/$jid";
    # the success and error variants get identical handling
    for my $ext ('stat', 'stat-err') {
        next unless -s "$base.$ext";
        my $stat = unpack_file("$base.$ext");
        if ( $stat && -e "$base.dumped" ) {
            $stat->{dumped} = slurp("$base.dumped");
        }
        # matches the original: a non-empty file is returned even if
        # unpack_file yielded undef (no fallthrough to .stat-err)
        return $stat;
    }
    return undef;
}
# Run $sub->(@args) in a forked child and stream the return value back to
# the daemon as 'frep' messages addressed to the requester's zmq identity
# ($zid). Replies larger than 100k are chunked with more=>1 continuation
# flags. On fork failure the error is reported in-process.
sub forkandgo {
    my ($zid, $sub, @args) = @_;
    # (fixed: a stray debug print("FORKING\n") used to pollute stdout here)
    if (!(my $pid=fork)) {
        my $out="";
        my $err="";
        if (! defined $pid) {
            # fork failed: no child exists, report from this process
            $err = "Error: Can't fork";
            sendcmd($conf{bind}, $conf{port}, 'frep', {zid=>unpack("h*",$zid), out=>$err});
        } else {
            # child: detach from the parent's zmq state, run, reply, exit
            eval {
                zmq_fork_undef();
                $daemon=0;
                $context=zmq_init();
                $0="GRUN:fork-handler";
                # no accidental messages go out on this context
                $out=eval{&$sub(@args)};
                $err=$@;
                $out = $err if $err && !$out;
                # chunk oversized replies at 100k
                while(length($out)>100000) {
                    sendcmd($conf{bind}, $conf{port}, 'frep', {zid=>unpack("h*",$zid), out=>substr($out,0,100000,""), more=>1});
                }
                sendcmd($conf{bind}, $conf{port}, 'frep', {zid=>unpack("h*",$zid), out=>$out});
            };
            exit(0);
        }
    }
}
# Finalize a completed job: merge the usage/status info into the job
# record, answer any clients blocked in j_wait/start_wait, write the
# history file, and remove the live spool entries (.run/.ip/.ok/guid).
# $status is a numeric exit status; $ip identifies the execution node.
sub archive_job {
    my ($jid, $job, $status, $ip, $usage) = @_;
    # double-check
    carp "Need a job ref" unless ref($job);
    carp "Need a usage ref" unless !defined($usage) || ref($usage);
    $status+=0;                 # force numeric
    if ($usage) {
        # fold the usage record in, minus transport-only fields
        delete $usage->{ip};
        delete $usage->{id};
        $usage->{status} = $status if defined $status;
        $job->{usage} = $usage;
    } else {
        $job->{usage}->{status} = $status if defined($status);
    }
    $job->{status} = $status;
    if ($ip) {
        # prefer the registered hostname; fall back to the raw ip
        $job->{host} = $nodes{$ip} && $nodes{$ip}->{hostname} ? $nodes{$ip}->{hostname} : $ip;
        $job->{hostip} = $ip;
    }
    # wake anyone waiting on this job's completion
    for (keys(%{$j_wait{$jid}})) {
        replymsg($_, $job);
    }
    delete $j_wait{$jid};
    my $jhistfile=jhistpath($jid);
    xlog("debug", "Writing history for $jid to $jhistfile");
    open(ST, ">$jhistfile");
    print ST packfile($job);
    close ST;
    xlog("debug", "Unlinking $conf{spool}/jobs/$jid:$ip.run, $conf{spool}/jobs/$jid.ip, $conf{spool}/jobs/$jid");
    # recover the exec node's ip from the .ip side-file if not supplied
    if (!$ip) {
        $ip = slurp("$conf{spool}/jobs/$jid.ip");
    }
    unlink("$conf{spool}/jobs/$jid:$ip.run");
    unlink("$conf{spool}/jobs/$jid.ip");
    unlink("$conf{spool}/jobs/$jid.ok");
    unlink("$conf{spool}/jobs/$jid");
    if ($job->{guid}) {
        # can't query by guid anymore, except maybe in a history database
        unlink("$conf{spool}/guids/$job->{guid}");
    }
    if ($start_wait{$jid}) {
        # info needed for status/stdio collection from execution node
        replymsg($start_wait{$jid}->{zid},jid=>$jid, status=>$status, error=>$job->{error}, hostname=>$job->{host}, ip=>$job->{host}?$job->{host}:"n/a");
        delete $start_wait{$jid};
    }
}
my $coder;    # lazily-built, shared JSON::XS pretty-printer
# Pretty-print the arguments as canonical, ascii-safe JSON. Multiple
# arguments (or a single non-ref) are wrapped in an array; a single ref
# is encoded directly.
sub pretty_encode {
    $coder = JSON::XS->new->ascii->pretty->canonical->allow_blessed unless defined $coder;
    # $_[0] (scalar element), not @_[0] -- the old one-element slice
    # worked but warns under 'use warnings'
    if (@_ > 1 || !ref($_[0])) {
        return $coder->encode([@_]);
    }
    return $coder->encode($_[0]);
}
# Debug helper: snapshot the caller's package (our) and lexical (my)
# variables via PadWalker and render them as pretty JSON. Scalar/ref
# entries are dereferenced one level for readability.
# (PadWalker depths: peek_our(0)/peek_my(1) are relative to this frame --
# do not move these calls into a helper.)
sub showmem {
    require PadWalker;
    my $pkg_vars = PadWalker::peek_our(0);
    my $vars = PadWalker::peek_my(1);
    # package vars overwrite same-named lexicals in the snapshot
    $vars->{$_} = $pkg_vars->{$_} for keys %$pkg_vars;
    # unwrap one level of SCALAR/REF indirection
    for my $name (keys %$vars) {
        my $kind = ref $vars->{$name};
        $vars->{$name} = ${ $vars->{$name} } if $kind eq 'SCALAR' or $kind eq 'REF';
    }
    return pretty_encode($vars);
}
# Job supervisor entry point (grun -Y <base>): runs the previously written
# <base>.sh under bash with stdio redirected per the job options (explicit
# out/err files, /dev/null, or -X streamer children for live io), collects
# rusage for the children, writes the status record, reports it home via
# send_status_for_job, and exits with the job's code.
sub do_execute {
    # shift off the -Y
    shift @ARGV;
    my ($bfile) = @ARGV;
    my $opts = unpack_file("$bfile.job");
    my $uid = $opts->{uid};
    my $code = -1;
    my $start = Time::HiRes::time();
    my $shfile = "$bfile.sh";
    my @cmd = ("bash", $shfile);
    # keep the real stderr/stdout around so errors can still be reported
    open(OLDERR, ">&STDERR");
    open(OLDOUT, ">&STDOUT");
    close(STDERR);
    close(STDOUT);
    my ($out_pid, $err_pid);
    # forward INT/TERM to the streamer children (if any)
    $SIG{INT} = $SIG{TERM} = sub {
        kill 2, $out_pid;
        kill 2, $err_pid;
    };
    my ($err, $ret);
    my $ok=1;
    # wire up stdout/stderr according to the job's io options
    eval {
        if ($opts->{out}) {
            if ($opts->{out_a}) {
                $ok&&=open(STDOUT, ">>$opts->{out}");
            } else {
                $ok&&=open(STDOUT, ">$opts->{out}");
            }
            # same file for both streams: dup stderr onto stdout
            if ($opts->{out} eq $opts->{err}) {
                $ok&&=open(STDERR, ">&STDOUT");
            }
        }
        if ($opts->{err} && !(($opts->{out} eq $opts->{err}))) {
            if ($opts->{err_a}) {
                $ok&&=open(STDERR, ">>$opts->{err}");
            } else {
                $ok&&=open(STDERR, ">$opts->{err}");
            }
        }
        if ($opts->{io}) {
            # live streaming: pipe each unredirected stream through a
            # "grun -X" child that ships it back over ZMQ
            my $confarg = "-C $conf{config}"; # same config!
            my $streamarg = "-X"; # grun streamer
            $streamarg .= " -I" if $opts->{int}; # interactive mode
            if (!$opts->{out}) {
                my $cmd="/usr/bin/perl $GRUN_PATH $confarg $streamarg $conf{bind}:$conf{port} $opts->{id}:out";
                $out_pid=open(STDOUT, "|$cmd");
                $ok&&=$out_pid;
            }
            if (!$opts->{err}) {
                my $cmd="/usr/bin/perl $GRUN_PATH $confarg $streamarg $conf{bind}:$conf{port} $opts->{id}:err";
                $err_pid=open(STDERR, "|$cmd");
                $ok&&=$err_pid;
            }
        } else {
            # save disk and time, i never want i/o
            if (!$opts->{err}) {
                $ok&&=open(STDERR, ">/dev/null");
            }
            if (!$opts->{out}) {
                $ok&&=open(STDOUT, ">/dev/null");
            }
        }
    };
    # redirection failed: restore the real stdio and record status 109
    if ($@ || !$ok) {
        close(STDERR);
        close(STDOUT);
        open(STDERR, ">&OLDERR");
        open(STDOUT, ">&OLDOUT");
        if ($@) {
            $err=$@;
            $ret=109;
        } else {
            $err="Error opening i/o files: $!";
            $ret=109;
        }
        xlog("error", "$ret: $err\n");
    }
    my $elapsed=0;
    # deal with nfs sync, if needed
    syncdirs(@{$opts->{syncdirs}}) if $opts->{syncdirs};
    if (!$err) {
        # run the job; $code is the raw wait status from system()
        eval {
            $code = system(@cmd);
            $0="GRUN:$opts->{id}:$code";
        };
        $elapsed = Time::HiRes::time() - $start;
    } else {
        $code = $ret ? $ret : -1;
    }
    open(SAVERR, ">&STDERR");
    open(SAVOUT, ">&STDOUT");
    open(STDERR, ">&OLDERR");
    open(STDOUT, ">&OLDOUT");
    xlog("debug", "Job $opts->{id} original exit code is $code");
    # collapse the wait status: signal number if signalled, else exit code
    $code = ($code == -1) ? $code : ($code & 127) ? $code & 127 : ($code >> 8);
    xlog("debug", "Done running job $opts->{id}, code $code as user $uid") if $opts->{trace};
    my ($utime, $stime,
        $maxrss, $ixrss, $idrss, $isrss, $minflt, $majflt, $nswap,
        $inblock, $oublock, $msgsnd, $msgrcv,
        $nsignals, $nvcsw, $nivcsw);
    eval {
        ($utime, $stime,
        $maxrss, $ixrss, $idrss, $isrss, $minflt, $majflt, $nswap,
        $inblock, $oublock, $msgsnd, $msgrcv,
        $nsignals, $nvcsw, $nivcsw) = getrusage(RUSAGE_CHILDREN);
    };
    xlog("debug", "Really done with job $opts->{id} (@cmd = $@)\n") if $opts->{trace};
    # $msgsnd, $msgrcv, $nsignals not used in Linux....
    my $out = {
        id=>$opts->{id},
        status=>$code,
        start_time=>$start,
        utime=>$utime,
        stime=>$stime,
        rtime=>$elapsed,
        pid=>$$,
        maxrss=>$maxrss,
        minflt=>$minflt,
        majflt=>$majflt,
        nswap=>$nswap,
        inblock=>$inblock,
        oublock=>$oublock,
        nvcsw=>$nvcsw,
        nivcsw=>$nivcsw
    };
    $out->{error} = $err if $code && $err;
    xlog("debug", "Writing output job $opts->{id}\n") if $opts->{trace};
    # NOTE(review): XOUT is not opened anywhere in this process (it was
    # opened in execute_job before the exec that started us, and Perl
    # filehandles do not survive exec) -- confirm where this print lands.
    print XOUT packfile($out);
    close XOUT;
    xlog("debug", "Job $opts->{id} exit code $code\n") if $opts->{trace};
    send_status_for_job($out);
    exit $code;
    # NOTE(review): everything below is unreachable after the exit above.
    # special exit code 101 means couldn't run the command
    close STDOUT;
    close STDERR;
    exit(0);
}