Skip to content

Commit

Permalink
profile changes:
Browse files Browse the repository at this point in the history
- add unique key to each entry in the profile (hashed Popen args), #142
- add process number, #144
  • Loading branch information
stolarczyk committed May 9, 2019
1 parent a1fdec1 commit 9e39ace
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 7 deletions.
35 changes: 28 additions & 7 deletions pypiper/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from collections.abc import Iterable

from attmap import AttMapEcho
from hashlib import md5
from .exceptions import PipelineHalt, SubprocessError
from .flags import *
from .utils import \
Expand Down Expand Up @@ -872,6 +873,18 @@ def make_dict(command):
a, s = (command, True) if check_shell(command, shell) else (shlex.split(command), False)
return dict(args=a, stdout=subprocess.PIPE, shell=s)

def make_hash(o):
"""
Convert the object to string and hash it, return None in case of failure
:param o: object of any type, in our case it is a dict
:return str: hahsed string representation of the dict
"""
try:
hsh = md5(str(o).encode("utf-8")).hexdigest()[:10]
except:
hsh = None
return hsh

if container:
cmd = "docker exec " + container + " " + cmd

Expand All @@ -896,7 +909,8 @@ def make_dict(command):
"proc_name": get_proc_name(param_list[i]["args"]),
"start_time": start_time,
"container": container,
"p": processes[-1]
"p": processes[-1],
"args_hash": make_hash(param_list[i])
}

self._report_command(cmd, [x.pid for x in processes])
Expand Down Expand Up @@ -926,7 +940,9 @@ def proc_wrapup(i):
mem=display_memory(local_maxmems[i]))

# report process profile
self._report_profile(self.procs[current_pid]["proc_name"], lock_file, time.time() - self.procs[current_pid]["start_time"], local_maxmems[i])
self._report_profile(self.procs[current_pid]["proc_name"], lock_file,
time.time() - self.procs[current_pid]["start_time"], local_maxmems[i],
current_pid, self.procs[current_pid]["args_hash"])

# Remove this as a running subprocess
del self.procs[current_pid]
Expand Down Expand Up @@ -1120,12 +1136,14 @@ def time_elapsed(self, time_since):
return round(time.time() - time_since, 0)


def _report_profile(self, command, lock_name, elapsed_time, memory):
def _report_profile(self, command, lock_name, elapsed_time, memory, pid, args_hash):
"""
Writes a string to self.pipeline_profile_file.
"""
rel_lock_name = lock_name if lock_name is None else os.path.relpath(lock_name, self.outfolder)
message_raw = str(command) + "\t" + \
message_raw = str(pid) + "\t" + \
str(args_hash) + "\t" + \
str(command) + "\t" + \
str(datetime.timedelta(seconds = round(elapsed_time, 2))) + "\t " + \
str(memory) + "\t" + \
str(rel_lock_name)
Expand Down Expand Up @@ -1584,7 +1602,8 @@ def stop_pipeline(self, status=COMPLETE_FLAG):
self.report_result("Time", str(datetime.timedelta(seconds=self.time_elapsed(self.starttime))))
self.report_result("Success", time.strftime("%m-%d-%H:%M:%S"))
print("\n##### [Epilogue:]")
print("* " + "Total elapsed time".rjust(20) + ": " + str(datetime.timedelta(seconds=self.time_elapsed(self.starttime))))
print("* " + "Total elapsed time".rjust(20) + ": "
+ str(datetime.timedelta(seconds=self.time_elapsed(self.starttime))))
# print("Peak memory used: " + str(memory_usage()["peak"]) + "kb")
print("* " + "Peak memory used".rjust(20) + ": " + str(round(self.peak_memory, 2)) + " GB")
if self.halted:
Expand Down Expand Up @@ -1671,7 +1690,8 @@ def _terminate_running_subprocesses(self):
# record profile of any running processes before killing
elapsed_time = time.time() - self.procs[pid]["start_time"]
process_peak_mem = self._memory_usage(pid, container=proc_dict["container"])/1e6
self._report_profile(self.procs[pid]["proc_name"], None, elapsed_time, process_peak_mem)
self._report_profile(self.procs[pid]["proc_name"], None, elapsed_time, process_peak_mem, pid,
self.procs[pid]["args_hash"])
self._kill_child_process(pid, proc_dict["proc_name"])
del self.procs[pid]

Expand Down Expand Up @@ -1892,7 +1912,8 @@ def _cleanup(self, dry_run=False):
pass
else:
print("\nConditional flag found: " + str([os.path.basename(i) for i in flag_files]))
print("\nThese conditional files were left in place:\n\n- " + "\n- ".join(self.cleanup_list_conditional))
print("\nThese conditional files were left in place:\n\n- " +
"\n- ".join(self.cleanup_list_conditional))
# Produce a cleanup script.
no_cleanup_script = []
for cleandir in self.cleanup_list_conditional:
Expand Down
1 change: 1 addition & 0 deletions requirements/reqs-pypiper.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
attmap>=0.4
pyyaml>=5
psutil
hashlib

0 comments on commit 9e39ace

Please sign in to comment.