diff --git a/utilities/logic/async_wrapper.py b/utilities/logic/async_wrapper.py index 3a9b44d4cd0..b7cd19aa18e 100644 --- a/utilities/logic/async_wrapper.py +++ b/utilities/logic/async_wrapper.py @@ -70,7 +70,7 @@ def daemonize_self(): os.dup2(dev_null.fileno(), sys.stderr.fileno()) -def _run_module(wrapped_cmd, jid, job_path): +def _run_module(wrapped_cmd, jid, job_path, write_fd): tmp_job_path = job_path + ".tmp" jobfile = open(tmp_job_path, "w") @@ -84,6 +84,9 @@ def _run_module(wrapped_cmd, jid, job_path): try: cmd = shlex.split(wrapped_cmd) script = subprocess.Popen(cmd, shell=False, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # Signal parent process that we have started, then close write side of pipe. + os.write(write_fd, "OK") + os.close(write_fd) (outdata, stderr) = script.communicate() result = json.loads(outdata) if stderr: @@ -129,11 +132,6 @@ def _run_module(wrapped_cmd, jid, job_path): jid = "%s.%d" % (sys.argv[1], os.getpid()) time_limit = sys.argv[2] wrapped_module = sys.argv[3] - if len(sys.argv) >= 5: - argsfile = sys.argv[4] - cmd = "%s %s" % (wrapped_module, argsfile) - else: - cmd = wrapped_module step = 5 # setup job output directory @@ -148,6 +146,27 @@ def _run_module(wrapped_cmd, jid, job_path): "failed" : 1, "msg" : "could not create: %s" % jobdir })) + + # Save the module and possible arguments so that they are available while + # the forked child process is running the module. Existing (non-async) code will attempt + # to clean up the files if we don't rename them. An alternative would be to make a copy + # of them but rename should be quicker. + saved_wrapped_module = job_path + ".module" + os.rename(wrapped_module, saved_wrapped_module) + + if len(sys.argv) >= 5: + argsfile = sys.argv[4] + saved_argsfile = job_path + ".arguments" + os.rename(argsfile, saved_argsfile) + cmd = "%s %s" % (saved_wrapped_module, saved_argsfile) + else: + saved_argsfile = None + cmd = saved_wrapped_module + + # Setup a pipe for parent/child synchronization so that child can let parent + # know when it (child) has started and the parent can continue. + read_fd, write_fd = os.pipe() + # immediately exit this process, leaving an orphaned process # running which immediately forks a supervisory timing process @@ -156,18 +175,22 @@ def _run_module(wrapped_cmd, jid, job_path): if pid: # Notify the overlord that the async process started - # we need to not return immmediately such that the launched command has an attempt - # to initialize PRIOR to ansible trying to clean up the launch directory (and argsfile) - # this probably could be done with some IPC later. Modules should always read - # the argsfile at the very first start of their execution anyway + # Close write side of pipe - no longer needed in parent. + os.close(write_fd) notice("Return async_wrapper task started.") print(json.dumps({ "started" : 1, "finished" : 0, "ansible_job_id" : jid, "results_file" : job_path })) sys.stdout.flush() - time.sleep(1) + # Read some dummy bytes ("OK") from child process. Child process writes these once it has + # started. Once child has started we can reliably continue (allowing attempted module and + # arguments file removal). + dummy_bytes_read = os.read(read_fd, 3) sys.exit(0) else: # The actual wrapper process + # Close read side of pipe - no longer needed in child process. + os.close(read_fd) + # Daemonize, so we keep on running daemonize_self() @@ -176,6 +199,8 @@ def _run_module(wrapped_cmd, jid, job_path): sub_pid = os.fork() if sub_pid: + # Watchdog does not need write side of pipe. + os.close(write_fd) # the parent stops the process after the time limit remaining = int(time_limit) @@ -183,23 +208,32 @@ def _run_module(wrapped_cmd, jid, job_path): os.setpgid(sub_pid, sub_pid) notice("Start watching %s (%s)"%(sub_pid, remaining)) - time.sleep(step) - while os.waitpid(sub_pid, os.WNOHANG) == (0, 0): - notice("%s still running (%s)"%(sub_pid, remaining)) - time.sleep(step) - remaining = remaining - step - if remaining <= 0: - notice("Now killing %s"%(sub_pid)) - os.killpg(sub_pid, signal.SIGKILL) - notice("Sent kill to group %s"%sub_pid) - time.sleep(1) - sys.exit(0) + while True: + # On FreeBSD (shippable automated test server), waitpid can sometimes + # return (0,-512) if waitpid is called quickly from parent. So, only check + # the process id (waitpid_result[0]) for 0 value. Do not rely on + # waitpid_result[1] here. + waitpid_result = os.waitpid(sub_pid, os.WNOHANG) + if remaining > 0 and waitpid_result[0] == 0: + notice("%s still running (%s)"%(sub_pid, remaining)) + # could probably use higher resolution than 5 seconds here + time.sleep(step) + remaining = remaining - step + else: + break + if remaining <= 0 and waitpid_result[0] == 0: + notice("Now killing %s"%(sub_pid)) + os.killpg(sub_pid, signal.SIGKILL) + notice("Sent kill to group %s"%sub_pid) + os.remove(saved_wrapped_module) + if saved_argsfile is not None: + os.remove(saved_argsfile) notice("Done in kid B.") sys.exit(0) else: # the child process runs the actual module notice("Start module (%s)"%os.getpid()) - _run_module(cmd, jid, job_path) + _run_module(cmd, jid, job_path, write_fd) notice("Module complete (%s)"%os.getpid()) sys.exit(0)