Skip to content

Commit

Permalink
tag follow command processes with f; dont increment ID for any follow
Browse files Browse the repository at this point in the history
see #151
  • Loading branch information
stolarczyk committed May 14, 2019
1 parent 0299a7b commit 0823a0d
Showing 1 changed file with 21 additions and 12 deletions.
33 changes: 21 additions & 12 deletions pypiper/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,6 @@ def run(self, cmd, target=None, lock_name=None, shell=None, nofail=False, clean=

process_return_code = 0
local_maxmem = 0
has_follow = False

# Decide how to do follow-up.
if not follow:
Expand All @@ -655,9 +654,12 @@ def run(self, cmd, target=None, lock_name=None, shell=None, nofail=False, clean=
call_follow = lambda: None
else:
# Wrap the follow-up function so that the log shows what's going on.
# additionally, the in_follow attribute is set to enable proper command count handling
def call_follow():
print("Follow:")
self.in_follow = True
follow()
self.in_follow = False


# The while=True loop here is unlikely to be triggered, and is just a
Expand Down Expand Up @@ -693,7 +695,6 @@ def call_follow():
# Normally we don't run the follow, but if you want to force. . .
if self.force_follow:
call_follow()

# Increment process count
increment_info_pattern = "Skipped command: `{}`\nCommand ID incremented by: `{}`. Current ID: `{}`\n"
if isinstance(cmd, list):
Expand All @@ -705,13 +706,6 @@ def call_follow():
count = len(parse_cmd(cmd, shell))
self.proc_count += count
print(increment_info_pattern.format(str(cmd), count, self.proc_count))
if has_follow:
# TODO: all the follow functions increment the proc_count,
# whereas only ones that use the run method do it originally. Needs correction.
self.proc_count += 1
print(
"Command had a follow function. Skipped. \nCommand ID incremented by: `1`. Current ID: `{}`\n".format(
self.proc_count))
break # Do not run command

# Scenario 1: Lock file exists, but we're supposed to overwrite target; Run process.
Expand Down Expand Up @@ -916,14 +910,12 @@ def make_hash(o):
param_list = parse_cmd(cmd, shell)
proc_name = get_proc_name(cmd)

# stop_times = []
processes = []
running_processes = []
completed_processes = []
start_time = time.time()
for i in range(len(param_list)):
running_processes.append(i)
self.proc_count += 1
if i == 0:
processes.append(psutil.Popen(preexec_fn=os.setpgrp, **param_list[i]))
else:
Expand All @@ -935,7 +927,7 @@ def make_hash(o):
"container": container,
"p": processes[-1],
"args_hash": make_hash(param_list[i]["args"]),
"local_proc_id": self.proc_count
"local_proc_id": self.process_counter()
}

self._report_command(cmd, [x.pid for x in processes])
Expand Down Expand Up @@ -1022,6 +1014,23 @@ def proc_wrapup(i):

return [returncodes, local_maxmems]

def process_counter(self):
"""
Increments process counter with regard to the follow state:
if currently executed command is a follow function of another one, the counter is not incremented.
:return str | int: current counter state, a number if the counter has beed incremented or a number
of the previous process plus "f" otherwise
"""
try:
if self.in_follow:
return str(self.proc_count) + "f"
else:
self.proc_count += 1
return self.proc_count
except AttributeError:
self.proc_count += 1
return self.proc_count

###################################
# Waiting functions
Expand Down

0 comments on commit 0823a0d

Please sign in to comment.