Skip to content
This repository has been archived by the owner on Oct 30, 2018. It is now read-only.

Commit

Permalink
speed up async module execution by removing some sleeps
Browse files Browse the repository at this point in the history
also, handle special case where status of waitpid under
FreeBSD is unreliable
  • Loading branch information
caseylucas committed Aug 23, 2016
1 parent e5c530a commit e1f42a4
Showing 1 changed file with 57 additions and 23 deletions.
80 changes: 57 additions & 23 deletions utilities/logic/async_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def daemonize_self():
os.dup2(dev_null.fileno(), sys.stderr.fileno())


def _run_module(wrapped_cmd, jid, job_path):
def _run_module(wrapped_cmd, jid, job_path, write_fd):

tmp_job_path = job_path + ".tmp"
jobfile = open(tmp_job_path, "w")
Expand All @@ -84,6 +84,9 @@ def _run_module(wrapped_cmd, jid, job_path):
try:
cmd = shlex.split(wrapped_cmd)
script = subprocess.Popen(cmd, shell=False, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Signal parent process that we have started, then close write side of pipe.
os.write(write_fd, "OK")
os.close(write_fd)
(outdata, stderr) = script.communicate()
result = json.loads(outdata)
if stderr:
Expand Down Expand Up @@ -129,11 +132,6 @@ def _run_module(wrapped_cmd, jid, job_path):
jid = "%s.%d" % (sys.argv[1], os.getpid())
time_limit = sys.argv[2]
wrapped_module = sys.argv[3]
if len(sys.argv) >= 5:
argsfile = sys.argv[4]
cmd = "%s %s" % (wrapped_module, argsfile)
else:
cmd = wrapped_module
step = 5

# setup job output directory
Expand All @@ -148,6 +146,27 @@ def _run_module(wrapped_cmd, jid, job_path):
"failed" : 1,
"msg" : "could not create: %s" % jobdir
}))

# Save the module and possible arguments so that they are available while
# the forked child process is running the module. Existing (non-async) code will attempt
# to clean up the files if we don't rename them. An alternative would be to make a copy
# of them but rename should be quicker.
saved_wrapped_module = job_path + ".module"
os.rename(wrapped_module, saved_wrapped_module)

if len(sys.argv) >= 5:
argsfile = sys.argv[4]
saved_argsfile = job_path + ".arguments"
os.rename(argsfile, saved_argsfile)
cmd = "%s %s" % (saved_wrapped_module, saved_argsfile)
else:
saved_argsfile = None
cmd = saved_wrapped_module

# Setup a pipe for parent/child synchronization so that child can let parent
# know when it (child) has started and the parent can continue.
read_fd, write_fd = os.pipe()

# immediately exit this process, leaving an orphaned process
# running which immediately forks a supervisory timing process

Expand All @@ -156,18 +175,22 @@ def _run_module(wrapped_cmd, jid, job_path):
if pid:
# Notify the overlord that the async process started

we need to not return immediately such that the launched command has an attempt
# to initialize PRIOR to ansible trying to clean up the launch directory (and argsfile)
# this probably could be done with some IPC later. Modules should always read
# the argsfile at the very first start of their execution anyway
# Close write side of pipe - no longer needed in parent.
os.close(write_fd)
notice("Return async_wrapper task started.")
print(json.dumps({ "started" : 1, "finished" : 0, "ansible_job_id" : jid, "results_file" : job_path }))
sys.stdout.flush()
time.sleep(1)
# Read some dummy bytes ("OK") from child process. Child process writes these once it has
# started. Once child has started we can reliably continue (allowing attempted module and
# arguments file removal).
dummy_bytes_read = os.read(read_fd, 3)
sys.exit(0)
else:
# The actual wrapper process

# Close read side of pipe - no longer needed in child process.
os.close(read_fd)

# Daemonize, so we keep on running
daemonize_self()

Expand All @@ -176,30 +199,41 @@ def _run_module(wrapped_cmd, jid, job_path):

sub_pid = os.fork()
if sub_pid:
# Watchdog does not need write side of pipe.
os.close(write_fd)
# the parent stops the process after the time limit
remaining = int(time_limit)

# set the child process group id to kill all children
os.setpgid(sub_pid, sub_pid)

notice("Start watching %s (%s)"%(sub_pid, remaining))
time.sleep(step)
while os.waitpid(sub_pid, os.WNOHANG) == (0, 0):
notice("%s still running (%s)"%(sub_pid, remaining))
time.sleep(step)
remaining = remaining - step
if remaining <= 0:
notice("Now killing %s"%(sub_pid))
os.killpg(sub_pid, signal.SIGKILL)
notice("Sent kill to group %s"%sub_pid)
time.sleep(1)
sys.exit(0)
while True:
# On FreeBSD (shippable automated test server), waitpid can sometimes
# return (0,-512) if waitpid is called quickly from parent. So, only check
# the process id (waitpid_result[0]) for 0 value. Do not rely on
# waitpid_result[1] here.
waitpid_result = os.waitpid(sub_pid, os.WNOHANG)
if remaining > 0 and waitpid_result[0] == 0:
notice("%s still running (%s)"%(sub_pid, remaining))
# could probably use higher resolution than 5 seconds here
time.sleep(step)
remaining = remaining - step
else:
break
if remaining <= 0 and waitpid_result[0] == 0:
notice("Now killing %s"%(sub_pid))
os.killpg(sub_pid, signal.SIGKILL)
notice("Sent kill to group %s"%sub_pid)
os.remove(saved_wrapped_module)
if saved_argsfile is not None:
os.remove(saved_argsfile)
notice("Done in kid B.")
sys.exit(0)
else:
# the child process runs the actual module
notice("Start module (%s)"%os.getpid())
_run_module(cmd, jid, job_path)
_run_module(cmd, jid, job_path, write_fd)
notice("Module complete (%s)"%os.getpid())
sys.exit(0)

Expand Down

0 comments on commit e1f42a4

Please sign in to comment.