From c33508f351aee2a3b0fb998640f1b1664ce14b55 Mon Sep 17 00:00:00 2001
From: bfrgoncalves
Date: Thu, 4 Oct 2018 02:35:36 +0100
Subject: [PATCH 1/4] Prototype for unique process name identifiers. Allows adding processes with the same name.

---
 flowcraft/generator/pipeline_parser.py | 97 +++++++++++++++++++++++++-
 1 file changed, 94 insertions(+), 3 deletions(-)

diff --git a/flowcraft/generator/pipeline_parser.py b/flowcraft/generator/pipeline_parser.py
index 55f87cb9..eeeea410 100644
--- a/flowcraft/generator/pipeline_parser.py
+++ b/flowcraft/generator/pipeline_parser.py
@@ -337,8 +337,8 @@ def insanity_checks(pipeline_str):
 
 def parse_pipeline(pipeline_str):
-    """Parses a pipeline string into a dictionary with the connections between
-    process
+    """Parses a pipeline string into a list of dictionaries with the connections
+    between processes
 
     Parameters
     ----------
@@ -368,6 +368,10 @@ def parse_pipeline(pipeline_str):
     pipeline_links = []
     lane = 1
 
+    # Add unique identifiers to each process to allow a correct connection
+    # between forks
+    pipeline_str, identifiers_to_tags = add_unique_identifiers(pipeline_str)
+
     # Get number of forks in the pipeline
     nforks = pipeline_str.count(FORK_TOKEN)
     logger.debug("Found {} fork(s)".format(nforks))
@@ -378,6 +382,9 @@ def parse_pipeline(pipeline_str):
             pipeline_str))
         linear_pipeline = ["__init__"] + pipeline_str.split()
         pipeline_links.extend(linear_connection(linear_pipeline, lane))
+        # Removes the unique identifiers used to correctly assign fork parents
+        # when processes share the same name
+        pipeline_links = remove_unique_identifiers(identifiers_to_tags, pipeline_links)
         return pipeline_links
 
     for i in range(nforks):
@@ -431,6 +438,7 @@ def parse_pipeline(pipeline_str):
 
         lane += len(fork_sink)
 
+    pipeline_links = remove_unique_identifiers(identifiers_to_tags, pipeline_links)
     return pipeline_links
 
 
@@ -498,7 +506,6 @@ def get_lanes(lanes_str):
     # Flag used to determined whether the cursor is inside or outside the
     # right fork
     infork = 0
-
     for i in lanes_str:
 
         # Nested fork started
@@ -640,3 +647,87 @@ def linear_lane_connection(lane_list, lane):
         lane += 1
 
     return res
+
+
+def add_unique_identifiers(pipeline_str):
+    """Returns the pipeline string with unique identifiers and a dictionary
+    with references between the unique keys and the original values
+
+    Parameters
+    ----------
+    pipeline_str : str
+        Pipeline string
+
+    Returns
+    -------
+    str
+        Pipeline string with unique identifiers
+    dict
+        Mapping between unique process values and original names
+    """
+    identifiers_to_tags = {}
+
+    # Add space at beginning and end of pipeline to allow regex mapping of final
+    # process in linear pipelines
+    pipeline_str = " {} ".format(pipeline_str)
+
+    # Regex to get all process names. Catches all words without spaces that
+    # are not fork tokens or pipes
+    process_names = re.findall(r"[^\s()|]+", pipeline_str)
+    new_process_names = []
+
+    # Assigns the new process names by appending a numeric id at the end of
+    # the process name
+    for index, val in enumerate(process_names):
+        if "=" in val:
+            parts = val.split("=")
+            new_id = "{}_{}={}".format(parts[0], index, parts[1])
+            new_process_names.append(new_id)
+        else:
+            new_id = "{}_{}".format(val, index)
+            new_process_names.append(new_id)
+
+        identifiers_to_tags[new_id] = val
+
+    # Add space between forks, pipes and the process names for the replace
+    # regex to work
+    match_result = lambda match: " {} ".format(match.group())
+
+    find = r'[)(|]+'
+    pipeline_str = re.sub(find, match_result, pipeline_str)
+
+    # Replace the original process names with the unique identifiers
+    for index, val in enumerate(process_names):
+        find = r'{}[^_)(|]'.format(val)
+        find = find.replace("\\", "\\\\")
+        pipeline_str = re.sub(find, new_process_names[index] + " ",
+                              pipeline_str, 1)
+
+    return pipeline_str, identifiers_to_tags
+
+
+def remove_unique_identifiers(identifiers_to_tags, pipeline_links):
+    """Removes the unique identifiers and adds the original process names back
+    to the already parsed pipelines
+
+    Parameters
+    ----------
+    identifiers_to_tags : dict
+        Mapping between unique process identifiers and process names
+    pipeline_links : list
+        Parsed pipeline list with unique identifiers
+
+    Returns
+    -------
+    list
+        Pipeline list with original identifiers
+    """
+
+    # Replaces the unique identifiers with the original process names
+    for index, val in enumerate(pipeline_links):
+        if val["input"]["process"] != "__init__":
+            val["input"]["process"] = identifiers_to_tags[val["input"]["process"]]
+        if val["output"]["process"] != "__init__":
+            val["output"]["process"] = identifiers_to_tags[val["output"]["process"]]
+
+    return pipeline_links
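
Taken together, the two helpers introduced by this patch implement a
rename/restore round trip around the existing parsing logic. A minimal
sketch of the intended behaviour (hypothetical interactive use; the exact
whitespace of the rewritten string is an implementation detail, and the
suffix scheme shown matches the unit tests added in PATCH 3/4):

    import flowcraft.generator.pipeline_parser as ps

    # Every process occurrence receives a positional suffix, so the two "B"
    # lanes become distinguishable before the fork structure is parsed
    modified, ids = ps.add_unique_identifiers("A (B | B)")
    # modified is roughly "A_0 ( B_1 | B_2 )"
    # ids == {'A_0': 'A', 'B_1': 'B', 'B_2': 'B'}

    # remove_unique_identifiers(ids, ...) later strips the suffixes from the
    # parsed connection dictionaries, restoring the original process names
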
From 47f1b32a670ff12cc6a957146f0edf0ce5525918 Mon Sep 17 00:00:00 2001
From: Tiago Jesu
Date: Thu, 4 Oct 2018 12:02:45 +0100
Subject: [PATCH 2/4] linted and documented code

Co-authored-by: tiagofilipe12
Co-authored-by: bfrgoncalves
---
 flowcraft/generator/pipeline_parser.py | 54 +++++++++++++++++---------
 1 file changed, 35 insertions(+), 19 deletions(-)

diff --git a/flowcraft/generator/pipeline_parser.py b/flowcraft/generator/pipeline_parser.py
index eeeea410..fd8fe286 100644
--- a/flowcraft/generator/pipeline_parser.py
+++ b/flowcraft/generator/pipeline_parser.py
@@ -369,18 +369,19 @@ def parse_pipeline(pipeline_str):
     lane = 1
 
     # Add unique identifiers to each process to allow a correct connection
-    # between forks
-    pipeline_str, identifiers_to_tags = add_unique_identifiers(pipeline_str)
+    # between forks with the same processes
+    pipeline_str_modified, identifiers_to_tags = add_unique_identifiers(
+        pipeline_str)
 
     # Get number of forks in the pipeline
-    nforks = pipeline_str.count(FORK_TOKEN)
+    nforks = pipeline_str_modified.count(FORK_TOKEN)
     logger.debug("Found {} fork(s)".format(nforks))
 
     # If there are no forks, connect the pipeline as purely linear
     if not nforks:
         logger.debug("Detected linear pipeline string : {}".format(
             pipeline_str))
-        linear_pipeline = ["__init__"] + pipeline_str.split()
+        linear_pipeline = ["__init__"] + pipeline_str_modified.split()
         pipeline_links.extend(linear_connection(linear_pipeline, lane))
         # Removes the unique identifiers used to correctly assign fork parents
         # when processes share the same name
@@ -393,7 +394,7 @@ def parse_pipeline(pipeline_str):
     # Split the pipeline at each fork start position. fields[-1] will
     # hold the process after the fork. fields[-2] will hold the processes
     # before the fork.
-    fields = pipeline_str.split(FORK_TOKEN, i + 1)
+    fields = pipeline_str_modified.split(FORK_TOKEN, i + 1)
 
     # Get the processes before the fork. This may be empty when the
     # fork is at the beginning of the pipeline.
@@ -572,7 +573,7 @@ def linear_connection(plist, lane):
 
 def fork_connection(source, sink, source_lane, lane):
     """Makes the connection between a process and the first processes in the
-    lanes to wich it forks.
+    lanes to which it forks.
 
     The ``lane`` argument should correspond to the lane of the source
     process. For each lane in ``sink``, the lane counter will increase.
@@ -665,16 +666,25 @@ def add_unique_identifiers(pipeline_str):
     dict
         Mapping between unique process values and original names
     """
-    identifiers_to_tags = {}
 
     # Add space at beginning and end of pipeline to allow regex mapping of final
-    # process in linear pipelines
-    pipeline_str = " {} ".format(pipeline_str)
+    # process in linear pipelines
+    pipeline_str_modified = " {} ".format(pipeline_str)
 
     # Regex to get all process names. Catches all words without spaces that
     # are not fork tokens or pipes
-    process_names = re.findall(r"[^\s()|]+", pipeline_str)
+    process_names = re.findall(r"[^\s()|]+", pipeline_str_modified)
+
+    identifiers_to_tags = {}
+    """
+    Dictionary to match new process names (identifiers) with original process
+    names
+    """
+
+    new_process_names = []
+    """
+    List of new process names used to replace in the pipeline string
+    """
 
     # Assigns the new process names by appending a numeric id at the end of
     # the process name
@@ -682,28 +692,34 @@ def add_unique_identifiers(pipeline_str):
         if "=" in val:
             parts = val.split("=")
             new_id = "{}_{}={}".format(parts[0], index, parts[1])
-            new_process_names.append(new_id)
         else:
             new_id = "{}_{}".format(val, index)
-            new_process_names.append(new_id)
 
+        # Add the new process name with its id
+        new_process_names.append(new_id)
+        # Match the new process name with the original process name
         identifiers_to_tags[new_id] = val
 
     # Add space between forks, pipes and the process names for the replace
     # regex to work
     match_result = lambda match: " {} ".format(match.group())
 
+    # Force a space between each token so that the regex modification can
+    # be applied
     find = r'[)(|]+'
-    pipeline_str = re.sub(find, match_result, pipeline_str)
+    pipeline_str_modified = re.sub(find, match_result, pipeline_str_modified)
 
     # Replace the original process names with the unique identifiers
     for index, val in enumerate(process_names):
-        find = r'{}[^_)(|]'.format(val)
-        find = find.replace("\\", "\\\\")
-        pipeline_str = re.sub(find, new_process_names[index] + " ",
-                              pipeline_str, 1)
-
-    return pipeline_str, identifiers_to_tags
+        # Regex to replace process names that have not yet been assigned an
+        # id. Escape characters are required to match the dict keys
+        # (identifiers_to_tags), since python keys with escape characters
+        # must be escaped
+        find = r'{}[^_]'.format(val).replace("\\", "\\\\")
+        pipeline_str_modified = re.sub(find, new_process_names[index] + " ",
+                                       pipeline_str_modified, 1)
+
+    return pipeline_str_modified, identifiers_to_tags
 
 
 def remove_unique_identifiers(identifiers_to_tags, pipeline_links):
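
The two regex passes documented above can be reproduced in isolation. A
standalone sketch (the process names are real flowcraft components, but are
used here purely for illustration):

    import re

    s = " fastqc (trimmomatic | spades) "

    # First pass: pad every fork/pipe token with spaces, so that each
    # process name ends up delimited by whitespace on both sides
    s = re.sub(r'[)(|]+', lambda m: " {} ".format(m.group()), s)
    # s now reads, ignoring the extra spaces: " fastqc ( trimmomatic | spades ) "

    # Second pass (one sub() call per process name): rename the first
    # occurrence that still lacks a suffix. The trailing [^_] guard prevents
    # re-matching a name that has already received its _<index> suffix
    s = re.sub(r'trimmomatic[^_]', "trimmomatic_1 ", s, 1)
    # s now contains "trimmomatic_1" in place of the first "trimmomatic"
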
From 2cddd9aa26e90014c461251fec875ddfc1f84885 Mon Sep 17 00:00:00 2001
From: Tiago Jesu
Date: Thu, 4 Oct 2018 14:21:40 +0100
Subject: [PATCH 3/4] added unit tests and included review suggestions

Co-authored-by: tiagofilipe12
Co-authored-by: bfrgoncalves
---
 flowcraft/generator/pipeline_parser.py  | 27 +++++---
 flowcraft/tests/test_pipeline_parser.py | 87 +++++++++++++++++++++++++
 2 files changed, 104 insertions(+), 10 deletions(-)

diff --git a/flowcraft/generator/pipeline_parser.py b/flowcraft/generator/pipeline_parser.py
index fd8fe286..18a49968 100644
--- a/flowcraft/generator/pipeline_parser.py
+++ b/flowcraft/generator/pipeline_parser.py
@@ -83,6 +83,7 @@ def remove_inner_forks(text):
 
     return text
 
+
 def empty_tasks(p_string):
     """
     Function to check if pipeline string is empty or has an empty string
@@ -141,7 +142,8 @@ def brackets_insanity_check(p_string):
         raise SanityError(
             "A different number of '(' and ')' was specified. There are "
             "{} extra '{}'. The number of '(' and ')'should be equal.".format(
-                str(abs(p_string.count(FORK_TOKEN) - p_string.count(CLOSE_TOKEN))),
+                str(abs(
+                    p_string.count(FORK_TOKEN) - p_string.count(CLOSE_TOKEN))),
                 max_bracket))
 
 
@@ -261,7 +263,7 @@ def inner_fork_insanity_checks(pipeline_string):
 
     # first lets get all forks to a list.
     list_of_forks = []  # stores forks
-    left_indexes = [] # stores indexes of left brackets
+    left_indexes = []  # stores indexes of left brackets
 
     # iterate through the string looking for '(' and ')'.
     for pos, char in enumerate(pipeline_string):
@@ -385,7 +387,8 @@ def parse_pipeline(pipeline_str):
         pipeline_links.extend(linear_connection(linear_pipeline, lane))
         # Removes the unique identifiers used to correctly assign fork parents
         # when processes share the same name
-        pipeline_links = remove_unique_identifiers(identifiers_to_tags, pipeline_links)
+        pipeline_links = remove_unique_identifiers(identifiers_to_tags,
+                                                   pipeline_links)
         return pipeline_links
 
     for i in range(nforks):
@@ -439,7 +442,8 @@ def parse_pipeline(pipeline_str):
 
         lane += len(fork_sink)
 
-    pipeline_links = remove_unique_identifiers(identifiers_to_tags, pipeline_links)
+    pipeline_links = remove_unique_identifiers(identifiers_to_tags,
+                                               pipeline_links)
     return pipeline_links
 
 
@@ -673,17 +677,18 @@ def add_unique_identifiers(pipeline_str):
 
     # Regex to get all process names. Catches all words without spaces that
     # are not fork tokens or pipes
-    process_names = re.findall(r"[^\s()|]+", pipeline_str_modified)
+    reg_find_proc = r"[^\s{}{}{}]+".format(LANE_TOKEN, FORK_TOKEN, CLOSE_TOKEN)
+    process_names = re.findall(reg_find_proc, pipeline_str_modified)
 
     identifiers_to_tags = {}
     """
-    Dictionary to match new process names (identifiers) with original process
+    dict: Matches new process names (identifiers) with original process
    names
    """
 
     new_process_names = []
     """
-    List of new process names used to replace in the pipeline string
+    list: New process names used to replace in the pipeline string
     """
 
     # Assigns the new process names by appending a numeric id at the end of
@@ -706,7 +711,7 @@ def add_unique_identifiers(pipeline_str):
 
     # Force a space between each token so that the regex modification can
     # be applied
-    find = r'[)(|]+'
+    find = r'[{}{}{}]+'.format(FORK_TOKEN, LANE_TOKEN, CLOSE_TOKEN)
     pipeline_str_modified = re.sub(find, match_result, pipeline_str_modified)
 
     # Replace the original process names with the unique identifiers
@@ -742,8 +747,10 @@ def remove_unique_identifiers(identifiers_to_tags, pipeline_links):
 
     # Replaces the unique identifiers with the original process names
     for index, val in enumerate(pipeline_links):
         if val["input"]["process"] != "__init__":
-            val["input"]["process"] = identifiers_to_tags[val["input"]["process"]]
+            val["input"]["process"] = identifiers_to_tags[
+                val["input"]["process"]]
         if val["output"]["process"] != "__init__":
-            val["output"]["process"] = identifiers_to_tags[val["output"]["process"]]
+            val["output"]["process"] = identifiers_to_tags[
+                val["output"]["process"]]
 
     return pipeline_links

diff --git a/flowcraft/tests/test_pipeline_parser.py b/flowcraft/tests/test_pipeline_parser.py
index ec843a05..54d658dc 100644
--- a/flowcraft/tests/test_pipeline_parser.py
+++ b/flowcraft/tests/test_pipeline_parser.py
@@ -1,4 +1,5 @@
 import os
+import json
 
 import flowcraft.generator.pipeline_parser as ps
 from flowcraft.tests.data_pipelines import pipelines as pipes
@@ -258,3 +259,89 @@ def test_parse_pipeline_file():
         res = ps.parse_pipeline(p_path)
         print(res)
         assert res == expected
+
+def test_unique_id_len():
+
+    pip_list = [
+        "A B C",
+        "A (B C (D | E)| B C (D | E))",
+        "A (B C (D | E)| C (D | E))",
+        "A (B C (D | E)| B (D | E))",
+    ]
+
+    res_list = [
+        "A_0 B_1 C_2",
+        "A_0 (B_1 C_2 (D_3 | E_4)| B_5 C_6 (D_7 | E_8))",
+        "A_0 (B_1 C_2 (D_3 | E_4)| C_5 (D_6 | E_7))",
+        "A_0 (B_1 C_2 (D_3 | E_4)| B_5 (D_6 | E_7))",
+    ]
+
+    for x, pip_str in enumerate(pip_list):
+        res_str, res_ids = ps.add_unique_identifiers(pip_str)
+        assert res_str.replace(" ", "") == res_list[x].replace(" ", "")
+
+def test_remove_id():
+
+    pip_list = [
+        "A B C",
+        "A (B C (D | E)| B C (D | E))",
+    ]
+
+    pipeline_mod_links = [
+        [{'input': {'process': '__init__', 'lane': 1},
+          'output': {'process': 'A_0', 'lane': 1}},
+         {'input': {'process': 'A_0', 'lane': 1},
+          'output': {'process': 'B_1', 'lane': 1}},
+         {'input': {'process': 'B_1', 'lane': 1},
+          'output': {'process': 'C_2', 'lane': 1}}],
+        [{'input': {'process': '__init__', 'lane': 1},
+          'output': {'process': 'A_0', 'lane': 1}},
+         {'input': {'process': 'A_0', 'lane': 1},
+          'output': {'process': 'B_1', 'lane': 2}},
+         {'input': {'process': 'A_0', 'lane': 1},
+          'output': {'process': 'B_5', 'lane': 3}},
+         {'input': {'process': 'B_1', 'lane': 2},
+          'output': {'process': 'C_2', 'lane': 2}},
+         {'input': {'process': 'B_5', 'lane': 3},
+          'output': {'process': 'C_6', 'lane': 3}},
+         {'input': {'process': 'C_2', 'lane': 2},
+          'output': {'process': 'D_3', 'lane': 4}},
+         {'input': {'process': 'C_2', 'lane': 2},
+          'output': {'process': 'E_4', 'lane': 5}},
+         {'input': {'process': 'C_6', 'lane': 3},
+          'output': {'process': 'D_7', 'lane': 6}},
+         {'input': {'process': 'C_6', 'lane': 3},
+          'output': {'process': 'E_8', 'lane': 7}}]
+    ]
+
+    pipeline_exp_links = [
+        [{'input': {'process': '__init__', 'lane': 1},
+          'output': {'process': 'A', 'lane': 1}},
+         {'input': {'process': 'A', 'lane': 1},
+          'output': {'process': 'B', 'lane': 1}},
+         {'input': {'process': 'B', 'lane': 1},
+          'output': {'process': 'C', 'lane': 1}}],
+        [{'input': {'process': '__init__', 'lane': 1},
+          'output': {'process': 'A', 'lane': 1}},
+         {'input': {'process': 'A', 'lane': 1},
+          'output': {'process': 'B', 'lane': 2}},
+         {'input': {'process': 'A', 'lane': 1},
+          'output': {'process': 'B', 'lane': 3}},
+         {'input': {'process': 'B', 'lane': 2},
+          'output': {'process': 'C', 'lane': 2}},
+         {'input': {'process': 'B', 'lane': 3},
+          'output': {'process': 'C', 'lane': 3}},
+         {'input': {'process': 'C', 'lane': 2},
+          'output': {'process': 'D', 'lane': 4}},
+         {'input': {'process': 'C', 'lane': 2},
+          'output': {'process': 'E', 'lane': 5}},
+         {'input': {'process': 'C', 'lane': 3},
+          'output': {'process': 'D', 'lane': 6}},
+         {'input': {'process': 'C', 'lane': 3},
+          'output': {'process': 'E', 'lane': 7}}]
+    ]
+
+    for x, pip_str in enumerate(pip_list):
+        res_str, res_ids = ps.add_unique_identifiers(pip_str)
+        res = ps.remove_unique_identifiers(res_ids, pipeline_mod_links[x])
+        assert json.dumps(res) == json.dumps(pipeline_exp_links[x])
\ No newline at end of file
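
For the forked case in test_remove_id, the identifier map returned alongside
the rewritten string pairs every suffixed name with its original tag (a
sketch inferred from the assertions above, using the test module's ps alias;
dict ordering is irrelevant):

    res_str, res_ids = ps.add_unique_identifiers("A (B C (D | E)| B C (D | E))")
    # res_ids == {'A_0': 'A', 'B_1': 'B', 'C_2': 'C', 'D_3': 'D', 'E_4': 'E',
    #             'B_5': 'B', 'C_6': 'C', 'D_7': 'D', 'E_8': 'E'}

    # remove_unique_identifiers(res_ids, links) then maps every suffixed
    # process in the parsed links back through this dictionary
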
From 5b6d973262acd6e48a69f390b7f7009a5ec29eb3 Mon Sep 17 00:00:00 2001
From: Tiago Jesu
Date: Thu, 4 Oct 2018 14:22:57 +0100
Subject: [PATCH 4/4] updated changelog

Co-authored-by: tiagofilipe12
Co-authored-by: bfrgoncalves
---
 changelog.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/changelog.md b/changelog.md
index 0ce02190..7007c04c 100644
--- a/changelog.md
+++ b/changelog.md
@@ -7,6 +7,10 @@
 - Added removal of duplicate IDs from `reads_download` component input.
 - Added seed parameter to `downsample_fastq` component.
 
+### Bug fixes
+
+- Fixed forks with the same source process name.
+
 ## 1.3.1
 
 ### Features
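
With the whole series applied, a pipeline whose fork lanes repeat a process
name parses into distinct, correctly wired connections. A minimal end-to-end
check (hypothetical session; output abridged and inferred from the
expectations in test_remove_id):

    import flowcraft.generator.pipeline_parser as ps

    links = ps.parse_pipeline("A (B C | B C)")
    # The two "B C" lanes are kept apart during parsing, and the returned
    # connections carry the original names, e.g.
    #   {'input': {'process': 'A', 'lane': 1}, 'output': {'process': 'B', 'lane': 2}}
    #   {'input': {'process': 'A', 'lane': 1}, 'output': {'process': 'B', 'lane': 3}}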