Skip to content

Commit

Permalink
Change process reporting text display
Browse files Browse the repository at this point in the history
  • Loading branch information
nsheff committed May 13, 2019
1 parent 6681d6d commit ea914f3
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 25 deletions.
72 changes: 49 additions & 23 deletions pypiper/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ def __init__(
self.last_timestamp = self.starttime # time of the last call to timestamp()

self.locks = []
self.procs = {}
self.running_procs = {}
self.completed_procs = {}

self.wait = True # turn off for debugging

Expand Down Expand Up @@ -918,6 +919,7 @@ def make_hash(o):
# stop_times = []
processes = []
running_processes = []
completed_processes = []
start_time = time.time()
for i in range(len(param_list)):
running_processes.append(i)
Expand All @@ -927,7 +929,7 @@ def make_hash(o):
else:
param_list[i]["stdin"] = processes[i - 1].stdout
processes.append(psutil.Popen(preexec_fn=os.setpgrp, **param_list[i]))
self.procs[processes[-1].pid] = {
self.running_procs[processes[-1].pid] = {
"proc_name": get_proc_name(param_list[i]["args"]),
"start_time": start_time,
"container": container,
Expand All @@ -943,6 +945,7 @@ def make_hash(o):

local_maxmems = [-1] * len(running_processes)
returncodes = [None] * len(running_processes)
proc_wrapup_text = [None] * len(running_processes)

if not self.wait:
print("</pre>")
Expand All @@ -958,33 +961,38 @@ def proc_wrapup(i):
returncode = processes[i].returncode
current_pid = processes[i].pid

info = "{pid}: {ret} ({mem}); ".format(
info = "PID: {pid};\tCommand: {cmd};\tReturn code: {ret};\tMemory used: {mem}".format(
pid=current_pid,
cmd=self.running_procs[current_pid]["proc_name"],
ret=processes[i].returncode,
mem=display_memory(local_maxmems[i]))

# report process profile
self._report_profile(self.procs[current_pid]["proc_name"], lock_file,
time.time() - self.procs[current_pid]["start_time"], local_maxmems[i],
current_pid, self.procs[current_pid]["args_hash"],
self.procs[current_pid]["local_proc_id"])
self._report_profile(self.running_procs[current_pid]["proc_name"], lock_file,
time.time() - self.running_procs[current_pid]["start_time"], local_maxmems[i],
current_pid, self.running_procs[current_pid]["args_hash"],
self.running_procs[current_pid]["local_proc_id"])

# Remove this as a running subprocess
del self.procs[current_pid]
self.running_procs[current_pid]["info"] = info
self.running_procs[current_pid]["returncode"] = returncode
self.completed_procs[current_pid] = self.running_procs[current_pid]
del self.running_procs[current_pid]
running_processes.remove(i)

completed_processes.append(i)
proc_wrapup_text[i] = info
returncodes[i] = returncode
return info


sleeptime = .0001
info = "Process returns and memory:"

while running_processes:
for i in running_processes:
local_maxmems[i] = max(local_maxmems[i], (get_mem_child_sum(processes[i])))
self.peak_memory = max(self.peak_memory, local_maxmems[i])
if not self._attend_process(processes[i], sleeptime):
info += proc_wrapup(i)
proc_wrapup_text[i] = proc_wrapup(i)

# the sleeptime is extremely short at the beginning and gets longer exponentially
# (+ constant to prevent copious checks at the very beginning)
Expand All @@ -993,11 +1001,17 @@ def proc_wrapup(i):

# All jobs are done, print a final closing and job info
stop_time = time.time()
info += " Elapsed: " + str(datetime.timedelta(seconds=self.time_elapsed(start_time))) + "."
info += " Peak memory: {pipe}.".format(pipe=display_memory(self.peak_memory))

proc_message = "Command completed. {info}"
info = "Elapsed time: " + str(datetime.timedelta(seconds=self.time_elapsed(start_time))) + "."
info += " Running peak memory: {pipe}.".format(pipe=display_memory(self.peak_memory))
# if len(proc_wrapup_text) == 1:
# info += " {}".format(proc_wrapup_text[0])

for i in completed_processes:
info += "\n {}".format(self.completed_procs[processes[i].pid]["info"])

print("</pre>")
print(info)
print(proc_message.format(info=info))

for rc in returncodes:
if rc != 0:
Expand Down Expand Up @@ -1031,7 +1045,7 @@ def _wait_for_process(self, p, shell=False):

self.peak_memory = max(self.peak_memory, local_maxmem)

del self.procs[p.pid]
del self.running_procs[p.pid]

info = "Process " + str(p.pid) + " returned: (" + str(p.returncode) + ")."
if not shell:
Expand All @@ -1052,25 +1066,37 @@ def _wait_for_lock(self, lock_file):
"""
sleeptime = .5
first_message_flag = False
long_message_flag = False
dot_count = 0
totaltime = 0
recover_file = self._recoverfile_from_lockfile(lock_file)
while os.path.isfile(lock_file):
if first_message_flag is False:
self.timestamp("Waiting for file lock: " + lock_file)
print("This indicates that another process may be executing this "
"command, or the pipeline was not properly shut down. If the "
"pipeline was not properly shut down last time, "
"you should restart it in 'recover' mode (-R) to indicate that "
"this step should be restarted.")
self._set_status_flag(WAIT_FLAG)
first_message_flag = True
else:
sys.stdout.write(".")
dot_count = dot_count + 1
if dot_count % 60 == 0:
print("") # linefeed
# prevents the issue of pypier waiting for the lock file to be gone infinitely
# prevents the issue of pypiper waiting for the lock file to be gone infinitely
# in case the recovery flag is sticked by other pipeline when it's interrupted
if os.path.isfile(recover_file):
sys.stdout.write(" Dynamic recovery flag found")
break
time.sleep(sleeptime)
totaltime += sleeptime
sleeptime = min(sleeptime + 2.5, 60)
if sleeptime > 3600 and not long_message_flag:
long_message_flag = True



if first_message_flag:
self.timestamp("File unlocked.")
Expand Down Expand Up @@ -1709,17 +1735,17 @@ def _exit_handler(self):
def _terminate_running_subprocesses(self):

# make a copy of the list to iterate over since we'll be removing items
for pid in self.procs.copy():
proc_dict = self.procs[pid]
for pid in self.running_procs.copy():
proc_dict = self.running_procs[pid]

# Close the preformat tag that we opened when the process was spawned.
# record profile of any running processes before killing
elapsed_time = time.time() - self.procs[pid]["start_time"]
elapsed_time = time.time() - self.running_procs[pid]["start_time"]
process_peak_mem = self._memory_usage(pid, container=proc_dict["container"])/1e6
self._report_profile(self.procs[pid]["proc_name"], None, elapsed_time, process_peak_mem, pid,
self.procs[pid]["args_hash"], self.procs[pid]["local_proc_id"])
self._report_profile(self.running_procs[pid]["proc_name"], None, elapsed_time, process_peak_mem, pid,
self.running_procs[pid]["args_hash"], self.running_procs[pid]["local_proc_id"])
self._kill_child_process(pid, proc_dict["proc_name"])
del self.procs[pid]
del self.running_procs[pid]


def _kill_child_process(self, child_pid, proc_name=None):
Expand Down
4 changes: 2 additions & 2 deletions tests/pipeline_manager/test_pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ def test_me(self):
self.assertTrue(self.pp.time_elapsed(stamp) > 1)

print("Wait for subprocess...")
for p in self.pp.procs.copy():
self.pp._wait_for_process(self.pp.procs[p]["p"])
for p in self.pp.running_procs.copy():
self.pp._wait_for_process(self.pp.running_procs[p]["p"])
self.pp2.wait = True
self.pp.wait = True

Expand Down

0 comments on commit ea914f3

Please sign in to comment.