Skip to content

Commit

Permalink
addresses #26
Browse files Browse the repository at this point in the history
use profile to determine total runtime
  • Loading branch information
stolarczyk committed Jun 19, 2019
1 parent f45551a commit ac64fcb
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 10 deletions.
1 change: 1 addition & 0 deletions pypiper/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
CHECKPOINT_EXTENSION = ".checkpoint"
PIPELINE_CHECKPOINT_DELIMITER = "_"
STAGE_NAME_SPACE_REPLACEMENT = "-"
PROFILE_COLNAMES = ['pid', 'hash', 'cid', 'runtime', 'mem', 'cmd', 'lock']
44 changes: 34 additions & 10 deletions pypiper/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import subprocess
import sys
import time
import pandas as _pd

if sys.version_info < (3, 3):
from collections import Iterable
Expand All @@ -31,9 +32,10 @@
from .exceptions import PipelineHalt, SubprocessError
from .flags import *
from .utils import \
check_shell, check_shell_pipes, checkpoint_filepath, clear_flags, flag_name, \
check_shell, checkpoint_filepath, clear_flags, flag_name, \
is_multi_target, make_lock_name, pipeline_filepath, \
CHECKPOINT_SPECIFICATIONS, split_by_pipes, get_proc_name, parse_cmd
CHECKPOINT_SPECIFICATIONS, get_proc_name, parse_cmd
from .const import PROFILE_COLNAMES
from ._version import __version__
import __main__

Expand Down Expand Up @@ -509,7 +511,8 @@ def start_pipeline(self, args=None, multi=False):
myfile.write("# Pipeline started at " + time.strftime("%m-%d %H:%M:%S", time.localtime(self.starttime)) + "\n\n")

with open(self.pipeline_profile_file, "a") as myfile:
myfile.write("# Pipeline started at " + time.strftime("%m-%d %H:%M:%S", time.localtime(self.starttime)) + "\n\n")
myfile.write("# Pipeline started at " + time.strftime("%m-%d %H:%M:%S", time.localtime(self.starttime))
+ "\n\n" + "# " + "\t".join(PROFILE_COLNAMES) + "\n")

def _set_status_flag(self, status):
"""
Expand Down Expand Up @@ -1166,7 +1169,8 @@ def timestamp(self, message="", checkpoint=None,
print(msg)
self.last_timestamp = time.time()

def time_elapsed(self, time_since):
@staticmethod
def time_elapsed(time_since):
"""
Returns the number of seconds that have elapsed since the time_since parameter.
Expand Down Expand Up @@ -1327,7 +1331,8 @@ def _report_command(self, cmd, procs=None):
# Filepath functions
###################################

def _create_file(self, file):
@staticmethod
def _create_file(file):
"""
Creates a file, but will not fail if the file already exists.
This is vulnerable to race conditions; use this for cases where it
Expand All @@ -1338,7 +1343,8 @@ def _create_file(self, file):
with open(file, 'w') as fout:
fout.write('')

def _create_file_racefree(self, file):
@staticmethod
def _create_file_racefree(file):
"""
Creates a file, but fails if the file already exists.
Expand Down Expand Up @@ -1390,7 +1396,8 @@ def _recoverfile_from_lockfile(self, lockfile):
lockfile = self._make_lock_path(lockfile)
return lockfile.replace(LOCK_PREFIX, "recover." + LOCK_PREFIX)

def make_sure_path_exists(self, path):
@staticmethod
def make_sure_path_exists(path):
"""
Creates all directories in a path if it does not exist.
Expand Down Expand Up @@ -1613,6 +1620,20 @@ def halt(self, checkpoint=None, finished=False, raise_error=True):
if raise_error:
raise PipelineHalt(checkpoint, finished)

def get_elapsed_time(self):
"""
Parse the pipeline profile file, collect the unique and last duplicated runtimes and sum them up. In case the
profile is not found, an estimate is calculated (which is correct only in case the pipeline was not rerun)
:return int: sum of runtimes in seconds
"""
if os.path.isfile(self.pipeline_profile_file):
df = _pd.read_csv(self.pipeline_profile_file, sep="\t", comment="#", names=PROFILE_COLNAMES)
df['runtime'] = _pd.to_timedelta(df['runtime'])
unique_df = df[~df.duplicated('cid', keep='last').values]
return sum(unique_df['runtime'].apply(lambda x: x.total_seconds()))
return self.time_elapsed(self.starttime)

def stop_pipeline(self, status=COMPLETE_FLAG):
"""
Terminate the pipeline.
Expand All @@ -1626,9 +1647,11 @@ def stop_pipeline(self, status=COMPLETE_FLAG):
self._cleanup()
self.report_result("Time", str(datetime.timedelta(seconds=self.time_elapsed(self.starttime))))
self.report_result("Success", time.strftime("%m-%d-%H:%M:%S"))

print("\n##### [Epilogue:]")
print("* " + "Total elapsed time".rjust(20) + ": "
+ str(datetime.timedelta(seconds=self.time_elapsed(self.starttime))))
# print("* " + "Total elapsed time".rjust(20) + ": "
# + str(datetime.timedelta(seconds=self.time_elapsed(self.starttime))))
print("* " + "Total elapsed time".rjust(20) + ": " + str(datetime.timedelta(seconds=self.get_elapsed_time())))
# print("Peak memory used: " + str(memory_usage()["peak"]) + "kb")
print("* " + "Peak memory used".rjust(20) + ": " + str(round(self.peak_memory, 2)) + " GB")
if self.halted:
Expand Down Expand Up @@ -1787,7 +1810,8 @@ def pskill(proc_pid, sig=signal.SIGINT):
child_pid=child_pid, proc_string=proc_string, note=note)
print(msg)

def _atexit_register(self, *args):
@staticmethod
def _atexit_register(*args):
""" Convenience alias to register exit functions without having to import atexit in the pipeline. """
atexit.register(*args)

Expand Down
1 change: 1 addition & 0 deletions requirements/reqs-pypiper.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
attmap>=0.12.5
pyyaml>=5
psutil
pandas

0 comments on commit ac64fcb

Please sign in to comment.