Merged
1 change: 1 addition & 0 deletions changelog.md
@@ -9,6 +9,7 @@

### Bug fixes

- Fixed forks with the same source process name.
- Fixed `inspect` issue when tasks took more than a day in duration.

## 1.3.1
132 changes: 123 additions & 9 deletions flowcraft/generator/pipeline_parser.py
@@ -83,6 +83,7 @@ def remove_inner_forks(text):

return text


def empty_tasks(p_string):
"""
Function to check if pipeline string is empty or has an empty string
@@ -141,7 +142,8 @@ def brackets_insanity_check(p_string):
raise SanityError(
"A different number of '(' and ')' was specified. There are "
"{} extra '{}'. The number of '(' and ')'should be equal.".format(
str(abs(p_string.count(FORK_TOKEN) - p_string.count(CLOSE_TOKEN))),
str(abs(
p_string.count(FORK_TOKEN) - p_string.count(CLOSE_TOKEN))),
max_bracket))


@@ -261,7 +263,7 @@ def inner_fork_insanity_checks(pipeline_string):

# first lets get all forks to a list.
list_of_forks = [] # stores forks
left_indexes = [] # stores indexes of left brackets
left_indexes = [] # stores indexes of left brackets

# iterate through the string looking for '(' and ')'.
for pos, char in enumerate(pipeline_string):
@@ -337,8 +339,8 @@ def insanity_checks(pipeline_str):


def parse_pipeline(pipeline_str):
"""Parses a pipeline string into a dictionary with the connections between
process
"""Parses a pipeline string into a list of dictionaries with the connections
between processes

Parameters
----------
@@ -368,16 +370,25 @@ def parse_pipeline(pipeline_str):
pipeline_links = []
lane = 1

# Add a unique identifier to each process so that forks containing
# identically named processes are connected correctly
pipeline_str_modified, identifiers_to_tags = add_unique_identifiers(
pipeline_str)

# Get number of forks in the pipeline
nforks = pipeline_str.count(FORK_TOKEN)
nforks = pipeline_str_modified.count(FORK_TOKEN)
logger.debug("Found {} fork(s)".format(nforks))

# If there are no forks, connect the pipeline as purely linear
if not nforks:
logger.debug("Detected linear pipeline string : {}".format(
pipeline_str))
linear_pipeline = ["__init__"] + pipeline_str.split()
linear_pipeline = ["__init__"] + pipeline_str_modified.split()
pipeline_links.extend(linear_connection(linear_pipeline, lane))
# Remove the unique identifiers that were added to correctly assign
# fork parents when processes share the same name
pipeline_links = remove_unique_identifiers(identifiers_to_tags,
pipeline_links)
return pipeline_links

for i in range(nforks):
@@ -386,7 +397,7 @@
# Split the pipeline at each fork start position. fields[-1] will
# hold the process after the fork. fields[-2] will hold the processes
# before the fork.
fields = pipeline_str.split(FORK_TOKEN, i + 1)
fields = pipeline_str_modified.split(FORK_TOKEN, i + 1)

# Get the processes before the fork. This may be empty when the
# fork is at the beginning of the pipeline.
@@ -431,6 +442,8 @@

lane += len(fork_sink)

pipeline_links = remove_unique_identifiers(identifiers_to_tags,
pipeline_links)
return pipeline_links


@@ -498,7 +511,6 @@ def get_lanes(lanes_str):
# Flag used to determine whether the cursor is inside or outside the
# right fork
infork = 0

for i in lanes_str:

# Nested fork started
@@ -565,7 +577,7 @@ def linear_connection(plist, lane):

def fork_connection(source, sink, source_lane, lane):
"""Makes the connection between a process and the first processes in the
lanes to wich it forks.
lanes to which it forks.

The ``lane`` argument should correspond to the lane of the source process.
For each lane in ``sink``, the lane counter will increase.
@@ -640,3 +652,105 @@ def linear_lane_connection(lane_list, lane):
lane += 1

return res


def add_unique_identifiers(pipeline_str):
"""Returns the pipeline string with unique identifiers and a dictionary with
references between the unique keys and the original values

Parameters
----------
pipeline_str : str
Pipeline string

Returns
-------
str
Pipeline string with unique identifiers
dict
Mapping between unique process identifiers and the original names
"""

# Add space at beginning and end of pipeline to allow regex mapping of final
# process in linear pipelines
pipeline_str_modified = " {} ".format(pipeline_str)

# Regex to get all process names: match any sequence of characters
# without spaces that is not a fork token or a pipe
reg_find_proc = r"[^\s{}{}{}]+".format(LANE_TOKEN, FORK_TOKEN, CLOSE_TOKEN)
process_names = re.findall(reg_find_proc, pipeline_str_modified)

identifiers_to_tags = {}
"""
dict: Matches new process names (identifiers) with original process
names
"""

new_process_names = []
"""
list: New process names used to replace in the pipeline string
"""

# Assign the new process names by appending a numeric id to the end of
# each process name
for index, val in enumerate(process_names):
if "=" in val:
parts = val.split("=")
new_id = "{}_{}={}".format(parts[0], index, parts[1])
else:
new_id = "{}_{}".format(val, index)

# add new process with id
new_process_names.append(new_id)
# map the new process name to the original process name
identifiers_to_tags[new_id] = val

# Add space between forks, pipes and the process names for the replace
# regex to work
match_result = lambda match: " {} ".format(match.group())

# Force a space around each token so that the replacement regex can
# be applied
find = r'[{}{}{}]+'.format(FORK_TOKEN, LANE_TOKEN, CLOSE_TOKEN)
pipeline_str_modified = re.sub(find, match_result, pipeline_str_modified)

# Replace original process names by the unique identifiers
for index, val in enumerate(process_names):
# regex to replace process names that have not yet been assigned an id.
# Backslashes are escaped so that the pattern matches the dict keys
# (identifiers_to_tags), since keys containing escape characters must
# themselves be escaped
find = r'{}[^_]'.format(val).replace("\\", "\\\\")
Review comment (Collaborator): Don't usually like regex, but this is very nice. Great job!

pipeline_str_modified = re.sub(find, new_process_names[index] + " ",
pipeline_str_modified, 1)

return pipeline_str_modified, identifiers_to_tags
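
The `[^_]` guard in that pattern is what makes repeated names safe to tag one at a time: once an occurrence has been rewritten to, say, `B_1`, its `B` is followed by `_` and no longer matches, so each single-count `re.sub` tags the leftmost still-untagged occurrence. A minimal standalone sketch of the mechanism, assuming the usual token values `(`, `|` and `)` (and using `re.escape` in place of the manual backslash doubling above):

```python
import re

# Space-padded pipeline string, as produced by the token-spacing step above.
s = " A ( B C | B C ) "
names = re.findall(r"[^\s|()]+", s)  # ['A', 'B', 'C', 'B', 'C']

for index, name in enumerate(names):
    # '[^_]' skips occurrences that are already tagged: in 'B_1' the 'B'
    # is followed by '_', so only the next raw 'B' can match. The matched
    # trailing character (a space) is put back by the replacement.
    pattern = r"{}[^_]".format(re.escape(name))
    s = re.sub(pattern, "{}_{} ".format(name, index), s, 1)

print(s)  # " A_0 ( B_1 C_2 | B_3 C_4 ) "
```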


def remove_unique_identifiers(identifiers_to_tags, pipeline_links):
"""Removes unique identifiers and add the original process names to the
already parsed pipelines

Parameters
----------
identifiers_to_tags : dict
Match between unique process identifiers and process names
pipeline_links: list
Parsed pipeline list with unique identifiers

Returns
-------
list
Pipeline list with original identifiers
"""

# Replaces the unique identifiers with the original process names
for index, val in enumerate(pipeline_links):
if val["input"]["process"] != "__init__":
val["input"]["process"] = identifiers_to_tags[
val["input"]["process"]]
if val["output"]["process"] != "__init__":
val["output"]["process"] = identifiers_to_tags[
val["output"]["process"]]

return pipeline_links
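
Taken together, the two helpers form a tag/untag round trip around the existing connection logic. A quick usage sketch; the exact whitespace of the tagged string is illustrative:

```python
import flowcraft.generator.pipeline_parser as ps

# Both lanes of the fork start with a process named B; the positional
# suffixes keep the two occurrences apart while connections are built.
modified, ids = ps.add_unique_identifiers("A (B C | B C)")
print(modified)                 # " A_0 ( B_1 C_2 | B_3 C_4 ) "
print(ids["B_1"], ids["B_3"])   # B B

# remove_unique_identifiers then maps tagged names in parsed links back
# to the original process names:
links = [{"input": {"process": "A_0", "lane": 1},
          "output": {"process": "B_1", "lane": 2}}]
print(ps.remove_unique_identifiers(ids, links))
# [{'input': {'process': 'A', 'lane': 1},
#   'output': {'process': 'B', 'lane': 2}}]
```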
87 changes: 87 additions & 0 deletions flowcraft/tests/test_pipeline_parser.py
@@ -1,4 +1,5 @@
import os
import json

import flowcraft.generator.pipeline_parser as ps
from flowcraft.tests.data_pipelines import pipelines as pipes
@@ -258,3 +259,89 @@ def test_parse_pipeline_file():
res = ps.parse_pipeline(p_path)
print(res)
assert res == expected

def test_unique_id_len():

pip_list = [
"A B C",
"A (B C (D | E)| B C (D | E))",
"A (B C (D | E)| C (D | E))",
"A (B C (D | E)| B (D | E))",
]

res_list = [
"A_0 B_1 C_2",
"A_0 (B_1 C_2 (D_3 | E_4)| B_5 C_6 (D_7 | E_8))",
"A_0 (B_1 C_2 (D_3 | E_4)| C_5 (D_6 | E_7))",
"A_0 (B_1 C_2 (D_3 | E_4)| B_5 (D_6 | E_7))",
]

for x, pip_str in enumerate(pip_list):
res_str, res_ids = ps.add_unique_identifiers(pip_str)
assert res_str.replace(" ", "") == res_list[x].replace(" ", "")

def test_remove_id():

pip_list = [
"A B C",
"A (B C (D | E)| B C (D | E))",
]

pipeline_mod_links = [
[{'input': {'process': '__init__', 'lane': 1},
'output': {'process': 'A_0', 'lane': 1}},
{'input': {'process': 'A_0', 'lane': 1},
'output': {'process': 'B_1', 'lane': 1}},
{'input': {'process': 'B_1', 'lane': 1},
'output': {'process': 'C_2', 'lane': 1}}],
[{'input': {'process': '__init__', 'lane': 1},
'output': {'process': 'A_0', 'lane': 1}},
{'input': {'process': 'A_0', 'lane': 1},
'output': {'process': 'B_1', 'lane': 2}},
{'input': {'process': 'A_0', 'lane': 1},
'output': {'process': 'B_5', 'lane': 3}},
{'input': {'process': 'B_1', 'lane': 2},
'output': {'process': 'C_2', 'lane': 2}},
{'input': {'process': 'B_5', 'lane': 3},
'output': {'process': 'C_6', 'lane': 3}},
{'input': {'process': 'C_2', 'lane': 2},
'output': {'process': 'D_3', 'lane': 4}},
{'input': {'process': 'C_2', 'lane': 2},
'output': {'process': 'E_4', 'lane': 5}},
{'input': {'process': 'C_6', 'lane': 3},
'output': {'process': 'D_7', 'lane': 6}},
{'input': {'process': 'C_6', 'lane': 3},
'output': {'process': 'E_8', 'lane': 7}}]
]

pipeline_exp_links = [
[{'input': {'process': '__init__', 'lane': 1},
'output': {'process': 'A', 'lane': 1}},
{'input': {'process': 'A', 'lane': 1},
'output': {'process': 'B', 'lane': 1}},
{'input': {'process': 'B', 'lane': 1},
'output': {'process': 'C', 'lane': 1}}],
[{'input': {'process': '__init__', 'lane': 1},
'output': {'process': 'A', 'lane': 1}},
{'input': {'process': 'A', 'lane': 1},
'output': {'process': 'B', 'lane': 2}},
{'input': {'process': 'A', 'lane': 1},
'output': {'process': 'B', 'lane': 3}},
{'input': {'process': 'B', 'lane': 2},
'output': {'process': 'C', 'lane': 2}},
{'input': {'process': 'B', 'lane': 3},
'output': {'process': 'C', 'lane': 3}},
{'input': {'process': 'C', 'lane': 2},
'output': {'process': 'D', 'lane': 4}},
{'input': {'process': 'C', 'lane': 2},
'output': {'process': 'E', 'lane': 5}},
{'input': {'process': 'C', 'lane': 3},
'output': {'process': 'D', 'lane': 6}},
{'input': {'process': 'C', 'lane': 3},
'output': {'process': 'E', 'lane': 7}}]
]

for x, pip_str in enumerate(pip_list):
res_str, res_ids = ps.add_unique_identifiers(pip_str)
res = ps.remove_unique_identifiers(res_ids, pipeline_mod_links[x])
assert json.dumps(res) == json.dumps(pipeline_exp_links[x])
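
As an end-to-end check, the second fixture above is what the public entry point should now produce for the duplicated fork, since `parse_pipeline` applies the tag/untag round trip internally. A short sketch of that check (lane numbers as in `test_remove_id`):

```python
import flowcraft.generator.pipeline_parser as ps

# Both lanes contain identically named processes; before this change the
# fork parents could be wired to the wrong lane.
res = ps.parse_pipeline("A (B C (D | E)| B C (D | E))")
for link in res:
    print("{} (lane {}) -> {} (lane {})".format(
        link["input"]["process"], link["input"]["lane"],
        link["output"]["process"], link["output"]["lane"]))
# Expected to match pipeline_exp_links[1] above, e.g.
# __init__ (lane 1) -> A (lane 1)
# A (lane 1) -> B (lane 2)
# A (lane 1) -> B (lane 3)
# ...
```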