diff --git a/changelog.md b/changelog.md
index 2b659cec..4829b503 100644
--- a/changelog.md
+++ b/changelog.md
@@ -9,6 +9,7 @@

 ### Bug fixes

+- Fixed forks with the same source process name.
 - Fixed `inspect` issue when tasks took more than a day in duration.

 ## 1.3.1
diff --git a/flowcraft/generator/pipeline_parser.py b/flowcraft/generator/pipeline_parser.py
index 55f87cb9..18a49968 100644
--- a/flowcraft/generator/pipeline_parser.py
+++ b/flowcraft/generator/pipeline_parser.py
@@ -83,6 +83,7 @@ def remove_inner_forks(text):

     return text

+
 def empty_tasks(p_string):
     """
     Function to check if pipeline string is empty or has an empty string
@@ -141,7 +142,8 @@ def brackets_insanity_check(p_string):
         raise SanityError(
             "A different number of '(' and ')' was specified. There are "
             "{} extra '{}'. The number of '(' and ')'should be equal.".format(
-                str(abs(p_string.count(FORK_TOKEN) - p_string.count(CLOSE_TOKEN))),
+                str(abs(
+                    p_string.count(FORK_TOKEN) - p_string.count(CLOSE_TOKEN))),
                 max_bracket))


@@ -261,7 +263,7 @@ def inner_fork_insanity_checks(pipeline_string):

     # first lets get all forks to a list.
     list_of_forks = []  # stores forks
-    left_indexes = [] # stores indexes of left brackets
+    left_indexes = []  # stores indexes of left brackets

     # iterate through the string looking for '(' and ')'.
     for pos, char in enumerate(pipeline_string):
@@ -337,8 +339,8 @@ def insanity_checks(pipeline_str):


 def parse_pipeline(pipeline_str):
-    """Parses a pipeline string into a dictionary with the connections between
-    process
+    """Parses a pipeline string into a list of dictionaries with the
+    connections between processes

     Parameters
     ----------
@@ -368,16 +370,25 @@ def parse_pipeline(pipeline_str):
     pipeline_links = []
     lane = 1

+    # Add unique identifiers to each process to allow a correct connection
+    # between forks that share the same process name
+    pipeline_str_modified, identifiers_to_tags = add_unique_identifiers(
+        pipeline_str)
+
     # Get number of forks in the pipeline
-    nforks = pipeline_str.count(FORK_TOKEN)
+    nforks = pipeline_str_modified.count(FORK_TOKEN)
     logger.debug("Found {} fork(s)".format(nforks))

     # If there are no forks, connect the pipeline as purely linear
     if not nforks:
         logger.debug("Detected linear pipeline string : {}".format(
             pipeline_str))
-        linear_pipeline = ["__init__"] + pipeline_str.split()
+        linear_pipeline = ["__init__"] + pipeline_str_modified.split()
         pipeline_links.extend(linear_connection(linear_pipeline, lane))
+        # Remove the unique identifiers that were used to correctly assign
+        # fork parents with a possibly identical process name
+        pipeline_links = remove_unique_identifiers(identifiers_to_tags,
+                                                   pipeline_links)
         return pipeline_links

     for i in range(nforks):
@@ -386,7 +397,7 @@ def parse_pipeline(pipeline_str):
         # Split the pipeline at each fork start position. fields[-1] will
         # hold the process after the fork. fields[-2] will hold the processes
         # before the fork.
-        fields = pipeline_str.split(FORK_TOKEN, i + 1)
+        fields = pipeline_str_modified.split(FORK_TOKEN, i + 1)

         # Get the processes before the fork. This may be empty when the
         # fork is at the beginning of the pipeline.
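The net effect of the hunks above: parse_pipeline now counts and splits forks on a tagged copy of the pipeline string, in which every process occurrence carries a positional suffix, and the tags are stripped again before the links are returned. A minimal sketch of the tagging step (assuming flowcraft is importable; the exact spacing of the returned string is illustrative):

    import flowcraft.generator.pipeline_parser as ps

    # Two lanes with identical process names no longer collide, because
    # each occurrence gets its own positional suffix.
    modified, ids = ps.add_unique_identifiers("A (B C | B C)")
    # modified ~ " A_0 ( B_1 C_2 | B_3 C_4 ) "
    # ids      ~ {"A_0": "A", "B_1": "B", "C_2": "C", "B_3": "B", "C_4": "C"}

The identifiers_to_tags mapping returned here is what remove_unique_identifiers later uses to restore the original names in the parsed links.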
@@ -431,6 +442,8 @@ def parse_pipeline(pipeline_str):

         lane += len(fork_sink)

+    pipeline_links = remove_unique_identifiers(identifiers_to_tags,
+                                               pipeline_links)
     return pipeline_links


@@ -498,7 +511,6 @@ def get_lanes(lanes_str):
     # Flag used to determined whether the cursor is inside or outside the
     # right fork
     infork = 0
-
     for i in lanes_str:

         # Nested fork started
@@ -565,7 +577,7 @@ def linear_connection(plist, lane):

 def fork_connection(source, sink, source_lane, lane):
     """Makes the connection between a process and the first processes in the
-    lanes to wich it forks.
+    lanes to which it forks.

     The ``lane`` argument should correspond to the lane of the source
     process. For each lane in ``sink``, the lane counter will increase.
@@ -640,3 +652,105 @@ def linear_lane_connection(lane_list, lane):
         lane += 1

     return res
+
+
+def add_unique_identifiers(pipeline_str):
+    """Returns the pipeline string with unique identifiers and a dictionary
+    with references between the unique keys and the original values
+
+    Parameters
+    ----------
+    pipeline_str : str
+        Pipeline string
+
+    Returns
+    -------
+    str
+        Pipeline string with unique identifiers
+    dict
+        Match between the unique process identifiers and the original names
+    """
+
+    # Add a space at the beginning and end of the pipeline string so the
+    # regex can also match the final process in linear pipelines
+    pipeline_str_modified = " {} ".format(pipeline_str)
+
+    # Regex to get all process names: catch all words without spaces that
+    # are not fork tokens or pipes
+    reg_find_proc = r"[^\s{}{}{}]+".format(LANE_TOKEN, FORK_TOKEN, CLOSE_TOKEN)
+    process_names = re.findall(reg_find_proc, pipeline_str_modified)
+
+    identifiers_to_tags = {}
+    """
+    dict: Matches new process names (identifiers) with original process
+    names
+    """
+
+    new_process_names = []
+    """
+    list: New process names used to replace in the pipeline string
+    """
+
+    # Assign the new process names by appending a numeric id to the end of
+    # each process name
+    for index, val in enumerate(process_names):
+        if "=" in val:
+            parts = val.split("=")
+            new_id = "{}_{}={}".format(parts[0], index, parts[1])
+        else:
+            new_id = "{}_{}".format(val, index)
+
+        # add the new process name with its id
+        new_process_names.append(new_id)
+        # map the new process name to the original process name
+        identifiers_to_tags[new_id] = val
+
+    # Add a space between forks, pipes and the process names for the
+    # replace regex to work
+    match_result = lambda match: " {} ".format(match.group())
+
+    # force a space around each token so that the replacement regex below
+    # can be applied
+    find = r'[{}{}{}]+'.format(FORK_TOKEN, LANE_TOKEN, CLOSE_TOKEN)
+    pipeline_str_modified = re.sub(find, match_result, pipeline_str_modified)
+
+    # Replace the original process names by the unique identifiers. The
+    # trailing [^_] avoids re-matching names that were already replaced,
+    # and backslashes are doubled so they are matched literally and stay
+    # consistent with the identifiers_to_tags keys
+    for index, val in enumerate(process_names):
+        find = r'{}[^_]'.format(val).replace("\\", "\\\\")
+        pipeline_str_modified = re.sub(find, new_process_names[index] + " ",
+                                       pipeline_str_modified, 1)
+
+    return pipeline_str_modified, identifiers_to_tags
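One detail worth flagging in the hunk above: the "=" branch keeps process directives intact by splicing the positional id between the process name and its directive string. A quick sketch, with a made-up directive value (assuming flowcraft is importable):

    import flowcraft.generator.pipeline_parser as ps

    modified, ids = ps.add_unique_identifiers("A fastqc={'version':'0.11.7'}")
    # The id lands before the "=", roughly:
    #   " A_0 fastqc_1={'version':'0.11.7'} "
    # and ids maps the tagged name back to the full original token:
    #   {"A_0": "A",
    #    "fastqc_1={'version':'0.11.7'}": "fastqc={'version':'0.11.7'}"}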
+
+
+def remove_unique_identifiers(identifiers_to_tags, pipeline_links):
+    """Removes the unique identifiers and restores the original process
+    names in the already parsed pipeline
+
+    Parameters
+    ----------
+    identifiers_to_tags : dict
+        Match between unique process identifiers and process names
+    pipeline_links : list
+        Parsed pipeline list with unique identifiers
+
+    Returns
+    -------
+    list
+        Pipeline list with the original process names
+    """
+
+    # Replace the unique identifiers with the original process names
+    for val in pipeline_links:
+        if val["input"]["process"] != "__init__":
+            val["input"]["process"] = identifiers_to_tags[
+                val["input"]["process"]]
+        if val["output"]["process"] != "__init__":
+            val["output"]["process"] = identifiers_to_tags[
+                val["output"]["process"]]
+
+    return pipeline_links
diff --git a/flowcraft/tests/test_pipeline_parser.py b/flowcraft/tests/test_pipeline_parser.py
index ec843a05..54d658dc 100644
--- a/flowcraft/tests/test_pipeline_parser.py
+++ b/flowcraft/tests/test_pipeline_parser.py
@@ -1,4 +1,5 @@
 import os
+import json

 import flowcraft.generator.pipeline_parser as ps
 from flowcraft.tests.data_pipelines import pipelines as pipes
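The hunk below pins this behaviour down with two tests: test_unique_id_len checks the identifier scheme itself, and test_remove_id checks that stripping the identifiers restores the expected links. Condensed, the round trip they exercise looks like this (a sketch mirroring the linear case in test_remove_id):

    import flowcraft.generator.pipeline_parser as ps

    _, ids = ps.add_unique_identifiers("A B")
    links = [
        {"input": {"process": "__init__", "lane": 1},
         "output": {"process": "A_0", "lane": 1}},
        {"input": {"process": "A_0", "lane": 1},
         "output": {"process": "B_1", "lane": 1}},
    ]
    clean = ps.remove_unique_identifiers(ids, links)
    # "A_0"/"B_1" become "A"/"B" again; "__init__" entries are untouched.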
@@ -258,3 +259,89 @@ def test_parse_pipeline_file():
         res = ps.parse_pipeline(p_path)
         print(res)
         assert res == expected
+
+def test_unique_id_len():
+
+    pip_list = [
+        "A B C",
+        "A (B C (D | E)| B C (D | E))",
+        "A (B C (D | E)| C (D | E))",
+        "A (B C (D | E)| B (D | E))",
+    ]
+
+    res_list = [
+        "A_0 B_1 C_2",
+        "A_0 (B_1 C_2 (D_3 | E_4)| B_5 C_6 (D_7 | E_8))",
+        "A_0 (B_1 C_2 (D_3 | E_4)| C_5 (D_6 | E_7))",
+        "A_0 (B_1 C_2 (D_3 | E_4)| B_5 (D_6 | E_7))",
+    ]
+
+    for x, pip_str in enumerate(pip_list):
+        res_str, res_ids = ps.add_unique_identifiers(pip_str)
+        assert res_str.replace(" ", "") == res_list[x].replace(" ", "")
+
+def test_remove_id():
+
+    pip_list = [
+        "A B C",
+        "A (B C (D | E)| B C (D | E))",
+    ]
+
+    pipeline_mod_links = [
+        [{'input': {'process': '__init__', 'lane': 1},
+          'output': {'process': 'A_0', 'lane': 1}},
+         {'input': {'process': 'A_0', 'lane': 1},
+          'output': {'process': 'B_1', 'lane': 1}},
+         {'input': {'process': 'B_1', 'lane': 1},
+          'output': {'process': 'C_2', 'lane': 1}}],
+        [{'input': {'process': '__init__', 'lane': 1},
+          'output': {'process': 'A_0', 'lane': 1}},
+         {'input': {'process': 'A_0', 'lane': 1},
+          'output': {'process': 'B_1', 'lane': 2}},
+         {'input': {'process': 'A_0', 'lane': 1},
+          'output': {'process': 'B_5', 'lane': 3}},
+         {'input': {'process': 'B_1', 'lane': 2},
+          'output': {'process': 'C_2', 'lane': 2}},
+         {'input': {'process': 'B_5', 'lane': 3},
+          'output': {'process': 'C_6', 'lane': 3}},
+         {'input': {'process': 'C_2', 'lane': 2},
+          'output': {'process': 'D_3', 'lane': 4}},
+         {'input': {'process': 'C_2', 'lane': 2},
+          'output': {'process': 'E_4', 'lane': 5}},
+         {'input': {'process': 'C_6', 'lane': 3},
+          'output': {'process': 'D_7', 'lane': 6}},
+         {'input': {'process': 'C_6', 'lane': 3},
+          'output': {'process': 'E_8', 'lane': 7}}]
+    ]
+
+    pipeline_exp_links = [
+        [{'input': {'process': '__init__', 'lane': 1},
+          'output': {'process': 'A', 'lane': 1}},
+         {'input': {'process': 'A', 'lane': 1},
+          'output': {'process': 'B', 'lane': 1}},
+         {'input': {'process': 'B', 'lane': 1},
+          'output': {'process': 'C', 'lane': 1}}],
+        [{'input': {'process': '__init__', 'lane': 1},
+          'output': {'process': 'A', 'lane': 1}},
+         {'input': {'process': 'A', 'lane': 1},
+          'output': {'process': 'B', 'lane': 2}},
+         {'input': {'process': 'A', 'lane': 1},
+          'output': {'process': 'B', 'lane': 3}},
+         {'input': {'process': 'B', 'lane': 2},
+          'output': {'process': 'C', 'lane': 2}},
+         {'input': {'process': 'B', 'lane': 3},
+          'output': {'process': 'C', 'lane': 3}},
+         {'input': {'process': 'C', 'lane': 2},
+          'output': {'process': 'D', 'lane': 4}},
+         {'input': {'process': 'C', 'lane': 2},
+          'output': {'process': 'E', 'lane': 5}},
+         {'input': {'process': 'C', 'lane': 3},
+          'output': {'process': 'D', 'lane': 6}},
+         {'input': {'process': 'C', 'lane': 3},
+          'output': {'process': 'E', 'lane': 7}}]
+    ]
+
+    for x, pip_str in enumerate(pip_list):
+        res_str, res_ids = ps.add_unique_identifiers(pip_str)
+        res = ps.remove_unique_identifiers(res_ids, pipeline_mod_links[x])
+        assert json.dumps(res) == json.dumps(pipeline_exp_links[x])
\ No newline at end of file
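Taken together with the changelog entry at the top, the user-visible fix can be exercised end to end (a sketch; the expected links are the ones spelled out in test_remove_id above):

    import flowcraft.generator.pipeline_parser as ps

    links = ps.parse_pipeline("A (B C (D | E)| B C (D | E))")
    # Both "C" processes fork into (D | E); before this patch the second
    # fork could be attached to the wrong "C". With the positional ids the
    # links are built on the tagged string and the plain names are restored
    # at the end, matching pipeline_exp_links in test_remove_id.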