Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
nsheff committed May 13, 2019
2 parents b8bd830 + 3335c8b commit 5dd58ed
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 18 deletions.
50 changes: 32 additions & 18 deletions pypiper/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from .utils import \
check_shell, check_shell_pipes, checkpoint_filepath, clear_flags, flag_name, \
is_multi_target, make_lock_name, pipeline_filepath, \
CHECKPOINT_SPECIFICATIONS, split_by_pipes, get_proc_name
CHECKPOINT_SPECIFICATIONS, split_by_pipes, get_proc_name, parse_cmd
from ._version import __version__
import __main__

Expand Down Expand Up @@ -642,6 +642,7 @@ def run(self, cmd, target=None, lock_name=None, shell=None, nofail=False, clean=

process_return_code = 0
local_maxmem = 0
has_follow = False

# Decide how to do follow-up.
if not follow:
Expand Down Expand Up @@ -692,8 +693,24 @@ def call_follow():
if self.force_follow:
call_follow()

# Increment process count here

# Increment process count
increment_info_pattern = "Skipped command: `{}`\nCommand ID incremented by: `{}`. Current ID: `{}`\n"
if isinstance(cmd, list):
for c in cmd:
count = len(parse_cmd(c, shell))
self.proc_count += count
print(increment_info_pattern.format(str(c), count, self.proc_count))
else:
count = len(parse_cmd(cmd, shell))
self.proc_count += count
print(increment_info_pattern.format(str(cmd), count, self.proc_count))
if has_follow:
# TODO: all the follow functions increment the proc_count,
# whereas only ones that use the run method do it originally. Needs correction.
self.proc_count += 1
print(
"Command had a follow function. Skipped. \nCommand ID incremented by: `1`. Current ID: `{}`\n".format(
self.proc_count))
break # Do not run command

# Scenario 1: Lock file exists, but we're supposed to overwrite target; Run process.
Expand Down Expand Up @@ -895,10 +912,7 @@ def make_hash(o):
if container:
cmd = "docker exec " + container + " " + cmd

param_list = [make_dict(c) for c in split_by_pipes(cmd)] \
if not shell and check_shell_pipes(cmd) else [dict(args=cmd, stdout=None, shell=True)]


param_list = parse_cmd(cmd, shell)
proc_name = get_proc_name(cmd)

# stop_times = []
Expand All @@ -918,7 +932,7 @@ def make_hash(o):
"start_time": start_time,
"container": container,
"p": processes[-1],
"args_hash": make_hash(param_list[i]),
"args_hash": make_hash(param_list[i]["args"]),
"local_proc_id": self.proc_count
}

Expand Down Expand Up @@ -952,7 +966,8 @@ def proc_wrapup(i):
# report process profile
self._report_profile(self.procs[current_pid]["proc_name"], lock_file,
time.time() - self.procs[current_pid]["start_time"], local_maxmems[i],
current_pid, self.procs[current_pid]["args_hash"], self.procs[current_pid]["local_proc_id"])
current_pid, self.procs[current_pid]["args_hash"],
self.procs[current_pid]["local_proc_id"])

# Remove this as a running subprocess
del self.procs[current_pid]
Expand Down Expand Up @@ -1153,6 +1168,7 @@ def _report_profile(self, command, lock_name, elapsed_time, memory, pid, args_ha
rel_lock_name = lock_name if lock_name is None else os.path.relpath(lock_name, self.outfolder)
message_raw = str(pid) + "\t" + \
str(args_hash) + "\t" + \
str(proc_count) + "\t" + \
str(command) + "\t" + \
str(datetime.timedelta(seconds = round(elapsed_time, 2))) + "\t " + \
str(memory) + "\t" + \
Expand Down Expand Up @@ -1835,9 +1851,11 @@ def clean_add(self, regex, conditional=False, manual=False):
self.clean_initialized = True

if manual:
try:
filenames = glob.glob(regex)
for filename in filenames:
filenames = glob.glob(regex)
if not filenames:
print("No files match cleanup pattern: {}".format(regex))
for filename in filenames:
try:
with open(self.cleanup_file, "a") as myfile:
if os.path.isabs(filename):
relative_filename = os.path.relpath(filename, self.outfolder)
Expand All @@ -1856,11 +1874,8 @@ def clean_add(self, regex, conditional=False, manual=False):
myfile.write("rmdir " + relative_filename + "\n")
else:
print("File not added to cleanup: {}".format(relative_filename))
except Exception as e:
# TODO: print this message in debug
print("Could not add {} to cleanup".format(regex))
print(e)
pass
except Exception as e:
print("Error in clean_add on path {}: {}".format(filename, str(e)))
elif conditional:
self.cleanup_list_conditional.append(regex)
else:
Expand All @@ -1870,7 +1885,6 @@ def clean_add(self, regex, conditional=False, manual=False):
while regex in self.cleanup_list_conditional:
self.cleanup_list_conditional.remove(regex)


def _cleanup(self, dry_run=False):
"""
Cleans up (removes) intermediate files.
Expand Down
18 changes: 18 additions & 0 deletions pypiper/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import os
import sys
import re
from subprocess import PIPE
from shlex import split

if sys.version_info < (3, ):
CHECK_TEXT_TYPES = (str, unicode)
Expand Down Expand Up @@ -532,6 +534,22 @@ def is_multi_target(target):
format(target, type(target)))


def parse_cmd(cmd, shell):
"""
Create a list of Popen-distable dicts of commands. The commands are split by pipes, if possible
:param str cmd: the command
:param bool shell: if the command should be run in the shell rather that in a subprocess
:return list[dict]: list of dicts of commands
"""
def _make_dict(command):
a, s = (command, True) if check_shell(command, shell) else (split(command), False)
return dict(args=a, stdout=PIPE, shell=s)

return [_make_dict(c) for c in split_by_pipes(cmd)] if not shell and check_shell_pipes(cmd) \
else [dict(args=cmd, stdout=None, shell=True)]


def parse_cores(cores, pm, default):
"""
Framework to finalize number of cores for an operation.
Expand Down

0 comments on commit 5dd58ed

Please sign in to comment.