Skip to content

Commit

Permalink
add command ID tracking, addresses #142
Browse files Browse the repository at this point in the history
This is not functional yet. When the pipeline is rerun, every follow function increments the command number, whereas originally only those that use the run method increment it. Needs correction; see #151.
  • Loading branch information
stolarczyk committed May 10, 2019
1 parent 143d257 commit 3335c8b
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 9 deletions.
34 changes: 25 additions & 9 deletions pypiper/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from .utils import \
check_shell, check_shell_pipes, checkpoint_filepath, clear_flags, flag_name, \
is_multi_target, make_lock_name, pipeline_filepath, \
CHECKPOINT_SPECIFICATIONS, split_by_pipes, get_proc_name
CHECKPOINT_SPECIFICATIONS, split_by_pipes, get_proc_name, parse_cmd
from ._version import __version__
import __main__

Expand Down Expand Up @@ -641,6 +641,7 @@ def run(self, cmd, target=None, lock_name=None, shell=None, nofail=False, clean=

process_return_code = 0
local_maxmem = 0
has_follow = False

# Decide how to do follow-up.
if not follow:
Expand Down Expand Up @@ -691,8 +692,24 @@ def call_follow():
if self.force_follow:
call_follow()

# Increment process count here

# Increment process count
increment_info_pattern = "Skipped command: `{}`\nCommand ID incremented by: `{}`. Current ID: `{}`\n"
if isinstance(cmd, list):
for c in cmd:
count = len(parse_cmd(c, shell))
self.proc_count += count
print(increment_info_pattern.format(str(c), count, self.proc_count))
else:
count = len(parse_cmd(cmd, shell))
self.proc_count += count
print(increment_info_pattern.format(str(cmd), count, self.proc_count))
if has_follow:
# TODO: all the follow functions increment the proc_count,
# whereas only ones that use the run method do it originally. Needs correction.
self.proc_count += 1
print(
"Command had a follow function. Skipped. \nCommand ID incremented by: `1`. Current ID: `{}`\n".format(
self.proc_count))
break # Do not run command

# Scenario 1: Lock file exists, but we're supposed to overwrite target; Run process.
Expand Down Expand Up @@ -894,10 +911,7 @@ def make_hash(o):
if container:
cmd = "docker exec " + container + " " + cmd

param_list = [make_dict(c) for c in split_by_pipes(cmd)] \
if not shell and check_shell_pipes(cmd) else [dict(args=cmd, stdout=None, shell=True)]


param_list = parse_cmd(cmd, shell)
proc_name = get_proc_name(cmd)

# stop_times = []
Expand All @@ -917,7 +931,7 @@ def make_hash(o):
"start_time": start_time,
"container": container,
"p": processes[-1],
"args_hash": make_hash(param_list[i]),
"args_hash": make_hash(param_list[i]["args"]),
"local_proc_id": self.proc_count
}

Expand Down Expand Up @@ -951,7 +965,8 @@ def proc_wrapup(i):
# report process profile
self._report_profile(self.procs[current_pid]["proc_name"], lock_file,
time.time() - self.procs[current_pid]["start_time"], local_maxmems[i],
current_pid, self.procs[current_pid]["args_hash"], self.procs[current_pid]["local_proc_id"])
current_pid, self.procs[current_pid]["args_hash"],
self.procs[current_pid]["local_proc_id"])

# Remove this as a running subprocess
del self.procs[current_pid]
Expand Down Expand Up @@ -1152,6 +1167,7 @@ def _report_profile(self, command, lock_name, elapsed_time, memory, pid, args_ha
rel_lock_name = lock_name if lock_name is None else os.path.relpath(lock_name, self.outfolder)
message_raw = str(pid) + "\t" + \
str(args_hash) + "\t" + \
str(proc_count) + "\t" + \
str(command) + "\t" + \
str(datetime.timedelta(seconds = round(elapsed_time, 2))) + "\t " + \
str(memory) + "\t" + \
Expand Down
18 changes: 18 additions & 0 deletions pypiper/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import os
import sys
import re
from subprocess import PIPE
from shlex import split

if sys.version_info < (3, ):
CHECK_TEXT_TYPES = (str, unicode)
Expand Down Expand Up @@ -532,6 +534,22 @@ def is_multi_target(target):
format(target, type(target)))


def parse_cmd(cmd, shell):
    """
    Build the list of Popen keyword-argument dicts for a command.

    If the shell is not requested and the pipe check on the command passes,
    the command is split by pipes and one dict is produced per segment, each
    with its stdout set to PIPE. Otherwise a single dict is returned that
    runs the whole, unsplit command through the shell.

    :param str cmd: the command
    :param bool shell: if the command should be run in the shell rather than
        in a subprocess
    :return list[dict]: list of dicts of commands, each with the keys
        'args', 'stdout' and 'shell'
    """
    # Guard clause: either the shell was requested explicitly, or the command
    # did not pass the pipe check; hand the whole string to the shell.
    if shell or not check_shell_pipes(cmd):
        return [{"args": cmd, "stdout": None, "shell": True}]

    popen_kwargs = []
    for segment in split_by_pipes(cmd):
        if check_shell(segment, shell):
            # This segment is kept as a single string and run via the shell.
            popen_kwargs.append(
                {"args": segment, "stdout": PIPE, "shell": True})
        else:
            # This segment is tokenized with shlex and run without a shell.
            popen_kwargs.append(
                {"args": split(segment), "stdout": PIPE, "shell": False})
    return popen_kwargs


def parse_cores(cores, pm, default):
"""
Framework to finalize number of cores for an operation.
Expand Down

0 comments on commit 3335c8b

Please sign in to comment.