Skip to content

Commit

Permalink
Replace Pool map with starmap.
Browse files Browse the repository at this point in the history
Need a lot of testing to avoid issues.
  • Loading branch information
ArnaudBelcour committed Aug 19, 2020
1 parent 0ac502f commit 4384094
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 71 deletions.
73 changes: 49 additions & 24 deletions mpwt/mpwt_workflow.py
Expand Up @@ -15,7 +15,7 @@
from mpwt import utils
from mpwt.pwt_wrapper import run_pwt, run_pwt_dat, run_move_pgdb
from mpwt.results_check import check_dat, check_pwt, permission_change
from mpwt.pathologic_input import check_input_and_existing_pgdb, create_mpwt_input, pwt_input_files, create_only_dat_lisp, create_dat_creation_script, read_taxon_id
from mpwt.pathologic_input import check_input_and_existing_pgdb, create_mpwt_input, pwt_input_files, create_only_dat_lisp, create_dat_creation_script, read_taxon_id, retrieve_complete_id
from multiprocessing import Pool

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -149,13 +149,22 @@ def multiprocess_pwt(input_folder=None, output_folder=None, patho_inference=None
# Launch PathoLogic inference on species with no PGDBs.
if run_patho_dat_ids:
# Create the list containing all the data used by the multiprocessing call.
multiprocess_inputs = create_mpwt_input(run_ids=run_patho_dat_ids, input_folder=input_folder, pgdbs_folder_path=pgdbs_folder_path,
patho_hole_filler=patho_hole_filler, patho_operon_predictor=patho_operon_predictor,
dat_extraction=dat_extraction, output_folder=output_folder, size_reduction=size_reduction,
only_dat_creation=None, taxon_file=taxon_file)
multiprocess_pwt_input_files = []
multiprocess_run_pwts = []
multiprocess_run_pwt_dats = []
multiprocess_run_move_pgdbs = []
for run_id in run_ids:
input_folder_path = input_folder + '/' + run_id + '/'
species_pgdb_folder = pgdbs_folder_path + run_id.lower() + 'cyc/'
input_run_move_pgdbs = [run_id, species_pgdb_folder]
input_run_move_pgdbs.extend([dat_extraction, output_folder, size_reduction])
multiprocess_pwt_input_files.append([input_folder_path, taxon_file])
multiprocess_run_pwts.append([input_folder_path, patho_hole_filler, patho_operon_predictor])
multiprocess_run_pwt_dats.append([input_folder_path])
multiprocess_run_move_pgdbs.append(input_run_move_pgdbs)

logger.info('~~~~~~~~~~Creation of input data from Genbank/GFF/PF~~~~~~~~~~')
input_error_status = mpwt_pool.map(pwt_input_files, multiprocess_inputs)
input_error_status = mpwt_pool.starmap(pwt_input_files, multiprocess_pwt_input_files)
if any(input_error_status):
sys.exit('Error during PathoLogic input files creation.')

Expand All @@ -167,11 +176,11 @@ def multiprocess_pwt(input_folder=None, output_folder=None, patho_inference=None
# Launch PathoLogic.
if patho_inference:
logger.info('~~~~~~~~~~Inference on the data~~~~~~~~~~')
error_status = mpwt_pool.map(run_pwt, multiprocess_inputs)
error_status = mpwt_pool.starmap(run_pwt, multiprocess_run_pwts)

# Check PathoLogic build.
logger.info('~~~~~~~~~~Check inference~~~~~~~~~~')
passed_inferences = check_pwt(multiprocess_inputs, patho_log)
passed_inferences = check_pwt(multiprocess_run_pwts, patho_log)
if any(error_status):
if ignore_error:
logger.critical('Error during inference. Process stopped. Look at the command log. Also by using --log argument, you can have additional information.')
Expand All @@ -183,7 +192,10 @@ def multiprocess_pwt(input_folder=None, output_folder=None, patho_inference=None
steps.append('PathoLogic inference')
logger.info('----------End of PathoLogic inference: {0:.2f}s----------'.format(times[-1] - times[-2]))
else:
multiprocess_inputs = []
multiprocess_pwt_input_files = []
multiprocess_run_pwts = []
multiprocess_run_pwt_dats = []
multiprocess_run_move_pgdbs = []
passed_inferences = []

# Create path for lisp if there is no folder given.
Expand All @@ -197,16 +209,24 @@ def multiprocess_pwt(input_folder=None, output_folder=None, patho_inference=None

# Create a lisp script file for each PGDB in the ptools-local folder.
dat_run_ids = create_only_dat_lisp(pgdbs_folder_path, tmp_folder)
multiprocess_run_pwt_dats = []
multiprocess_run_move_pgdbs = []
for dat_run_id in dat_run_ids:
input_folder_path = input_folder + '/' + dat_run_id + '/'
species_pgdb_folder = pgdbs_folder_path + dat_run_id.lower() + 'cyc/'
input_run_move_pgdbs = [dat_run_id, species_pgdb_folder]
if only_dat_creation:
input_run_move_pgdbs = retrieve_complete_id(input_run_move_pgdbs)
input_run_move_pgdbs.extend([dat_extraction, output_folder, size_reduction])
multiprocess_run_pwt_dats.append([input_folder_path])
multiprocess_run_move_pgdbs.append(input_run_move_pgdbs)

multiprocess_inputs = create_mpwt_input(run_ids=dat_run_ids, input_folder=tmp_folder, pgdbs_folder_path=pgdbs_folder_path,
patho_hole_filler=patho_hole_filler, patho_operon_predictor=patho_operon_predictor,
dat_extraction=dat_extraction, output_folder=output_folder, size_reduction=size_reduction,
only_dat_creation=only_dat_creation, taxon_file=taxon_file)
# Add species that have data in PGDB but are not present in output folder.
# Or if ignore_error has been used, select only PathoLogic build that have succeed + species in input with PGDB and not in output.
if input_folder:
if ignore_error:
multiprocess_inputs = []
multiprocess_run_pwt_dats = []
multiprocess_run_move_pgdbs = []
if run_patho_dat_ids:
if passed_inferences:
tmp_run_dat_ids = list(set(passed_inferences).intersection(set(run_patho_dat_ids)))
Expand All @@ -220,23 +240,28 @@ def multiprocess_pwt(input_folder=None, output_folder=None, patho_inference=None
if run_dat_ids:
for run_dat_id in run_dat_ids:
create_dat_creation_script(run_dat_id, input_folder + "/" + run_dat_id + "/" + "dat_creation.lisp")
multiprocess_dat_inputs = create_mpwt_input(run_ids=run_dat_ids, input_folder=input_folder, pgdbs_folder_path=pgdbs_folder_path,
patho_hole_filler=patho_hole_filler, patho_operon_predictor=patho_operon_predictor,
dat_extraction=dat_extraction, output_folder=output_folder, size_reduction=size_reduction,
only_dat_creation=None, taxon_file=taxon_file)
multiprocess_inputs.extend(multiprocess_dat_inputs)
input_folder_path = input_folder + '/' + run_dat_id + '/'
species_pgdb_folder = pgdbs_folder_path + run_dat_id.lower() + 'cyc/'
input_run_move_pgdbs = [run_dat_id, species_pgdb_folder]
multiprocess_run_pwt_dats.append([input_folder_path])
input_run_move_pgdbs.extend([dat_extraction, output_folder, size_reduction])
multiprocess_run_move_pgdbs.append(input_run_move_pgdbs)

if not multiprocess_run_pwt_dats:
logger.critical('No PGDB to export to move to output folder.')
return

if not multiprocess_inputs:
if not multiprocess_run_move_pgdbs:
logger.critical('No PGDB to export in dat format or to move to output folder.')
return

# Create BioPAX/attributes-values dat files.
if (input_folder and dat_creation) or dat_creation:
logger.info('~~~~~~~~~~Creation of the .dat files~~~~~~~~~~')
dat_error_status = mpwt_pool.map(run_pwt_dat, multiprocess_inputs)
dat_error_status = mpwt_pool.starmap(run_pwt_dat, multiprocess_run_pwt_dats)
logger.info('~~~~~~~~~~Check .dat~~~~~~~~~~')
for multiprocess_input in multiprocess_inputs:
check_dat(multiprocess_input)
for multiprocess_run_move_pgdb in multiprocess_run_move_pgdbs:
check_dat(multiprocess_run_move_pgdb[0], multiprocess_run_move_pgdb[1])
if any(dat_error_status):
if ignore_error:
logger.critical('Error during dat creation. Process stopped. Look at the command log. Also by using --log argument, you can have additional information.')
Expand All @@ -257,7 +282,7 @@ def multiprocess_pwt(input_folder=None, output_folder=None, patho_inference=None
# Move PGDBs or attribute-values/dat files.
if output_folder:
logger.info('~~~~~~~~~~Moving result files~~~~~~~~~~')
mpwt_pool.map(run_move_pgdb, multiprocess_inputs)
mpwt_pool.starmap(run_move_pgdb, multiprocess_run_move_pgdbs)
# Give access to the file for user outside the container.
permission_change(output_folder)

Expand Down
11 changes: 4 additions & 7 deletions mpwt/pathologic_input.py
Expand Up @@ -566,17 +566,14 @@ def read_taxon_id(run_folder):
return taxon_ids


def pwt_input_files(multiprocess_input):
def pwt_input_files(run_folder, taxon_file):
"""
Check if files needed by Pathway Tools are available, if not create them.
Check if there is a pathologic.log from a previous run. If yes, delete it.
Args:
multiprocess_input (dict): multiprocess dictionary input
"""
run_folder = multiprocess_input['species_input_folder_path']
taxon_file = multiprocess_input['taxon_file']

required_files = set(['organism-params.dat', 'genetic-elements.dat', 'dat_creation.lisp'])
files_in = set(next(os.walk(run_folder))[2])

Expand Down Expand Up @@ -675,13 +672,13 @@ def retrieve_complete_id(pgdb_id_folder):
Retrieve the ID of the PGDB from the genetic-elements.dat file.
Args:
pgdb_id_folder (tuple): second tuple argument is the pathname to the PGDB
        pgdb_id_folder (list): second element is the pathname to the PGDB
Returns:
tuple: (new PGDB ID (according to input file), pathname to PGDB folder)
list: (new PGDB ID (according to input file), pathname to PGDB folder)
"""
with open(pgdb_id_folder[1] + '/1.0/input/genetic-elements.dat') as organism_file:
for line in organism_file:
if 'ANNOT-FILE' in line and ';;' not in line:
pgdb_id_complete = line.split('\t')[1].replace('.gff','').replace('.gbk','').strip()

return (pgdb_id_complete, pgdb_id_folder[1])
return [pgdb_id_complete, pgdb_id_folder[1]]
30 changes: 12 additions & 18 deletions mpwt/pwt_wrapper.py
Expand Up @@ -89,7 +89,7 @@ def check_log(species_input_folder_path, log_filename, error_status, log_errors)
return error_status


def run_pwt(multiprocess_input):
def run_pwt(species_input_folder_path, patho_hole_filler, patho_operon_predictor):
"""
Create PGDB using files created during 'create_dats_and_lisp' ('organism-params.dat' and 'genetic-elements.dat').
With verbose run check_output to retrieve the output of subprocess (and show when Pathway Tools has been killed).
Expand All @@ -98,14 +98,12 @@ def run_pwt(multiprocess_input):
pathway-tools -no-web-cel-overview -no-cel-overview -no-patch-download -disable-metadata-saving -nologfile -patho
Args:
multiprocess_input (dictionary): contains multiprocess input (mpwt argument: input folder, output folder, ...)
species_input_folder_path (str): path to input folder
patho_hole_filler (bool): boolean to use or not PathoLogic Hole Filler
patho_operon_predictor (bool): boolean to use or not PathoLogic Operon Predictor
Returns:
boolean: True if there is an error during Pathway Tools run
"""
species_input_folder_path = multiprocess_input['species_input_folder_path']
patho_hole_filler = multiprocess_input['patho_hole_filler']
patho_operon_predictor = multiprocess_input['patho_operon_predictor']

cmd_options = ['-no-web-cel-overview', '-no-cel-overview', '-no-patch-download', '-disable-metadata-saving', '-nologfile']

cmd_pwt = ['pathway-tools', *cmd_options, '-patho', species_input_folder_path]
Expand Down Expand Up @@ -162,7 +160,7 @@ def run_pwt(multiprocess_input):
return error_status


def run_pwt_dat(multiprocess_input):
def run_pwt_dat(species_input_folder_path):
"""
Create dat file using a lisp script created during 'create_dats_and_lisp'.
Kill the subprocess when the command reach the Navigator Window opening proposition.
Expand All @@ -171,12 +169,10 @@ def run_pwt_dat(multiprocess_input):
pathway-tools -no-patch-download -disable-metadata-saving -nologfile -load
Args:
multiprocess_input (dictionary): contains multiprocess input (mpwt argument: input folder, output folder, ...)
species_input_folder_path (str): path to input folder
Returns:
boolean: True if there is an error during lisp script execution
"""
species_input_folder_path = multiprocess_input['species_input_folder_path']

lisp_path = species_input_folder_path + 'dat_creation.lisp'
cmd_options = ['-no-patch-download', '-disable-metadata-saving', '-nologfile']
cmd_dat = ['pathway-tools', *cmd_options, '-load', lisp_path]
Expand Down Expand Up @@ -225,21 +221,19 @@ def run_pwt_dat(multiprocess_input):
return error_status


def run_move_pgdb(move_data):
def run_move_pgdb(pgdb_folder_dbname, pgdb_folder_path, dat_extraction, output_folder, size_reduction):
"""
Move the result files inside the shared folder containing the input data.
pgdb_folder_dbname: ID of the species.
pgdb_folder_path: path to the PGDB of the species (in ptools-local).
Args:
move_data (dictionary): contains multiprocess input (PGDB ID, ptools-local PGDB pathname, ...)
pgdb_folder_dbname (str): species ID
pgdb_folder_path (str): path to species PGDB folder
dat_extraction (bool): to extract or not the attribute-values files (.dat files)
output_folder (str): path to output folder
size_reduction (bool): to compress or not the results
"""
pgdb_folder_dbname = move_data['pgdb_folders'][0]
pgdb_folder_path = move_data['pgdb_folders'][1]
dat_extraction = move_data['dat_extraction']
output_folder = move_data['output_folder']
size_reduction = move_data['size_reduction']

output_species = output_folder + '/' + pgdb_folder_dbname +'/'

if dat_extraction:
Expand Down
18 changes: 9 additions & 9 deletions mpwt/results_check.py
Expand Up @@ -14,13 +14,13 @@
logger = logging.getLogger(__name__)


def check_pwt(multiprocess_inputs, patho_log_folder):
def check_pwt(multiprocess_run_pwts, patho_log_folder):
"""
Check PathoLogic's log.
Create two log files (log_error.txt which contains Pathway Tools log and resume_inference.tsv which contains summary of metabolic networks).
Args:
multiprocess_inputs (list): list of dictionary contaning multiprocess input data
        multiprocess_run_pwts (list): list of lists containing multiprocess input data
patho_log_folder (str): pathname to the PathoLogic log folder.
Returns:
Expand All @@ -42,8 +42,8 @@ def check_pwt(multiprocess_inputs, patho_log_folder):
failed_inferences = []
passed_inferences = []

for multiprocess_input in multiprocess_inputs:
species_input_folder_path = multiprocess_input['species_input_folder_path']
for multiprocess_run_pwt in multiprocess_run_pwts:
species_input_folder_path = multiprocess_run_pwt[0]
species = species_input_folder_path.split('/')[-2]
patho_log = species_input_folder_path + '/pathologic.log'

Expand Down Expand Up @@ -140,17 +140,17 @@ def check_pwt(multiprocess_inputs, patho_log_folder):

return passed_inferences

def check_dat(multiprocess_input):
def check_dat(run_dat_id, species_pgdb_folder):
"""
Check dats creation.
Args:
multiprocess_input (dictionary): contains multiprocess input (mpwt argument: input folder, output folder, ...)
run_dat_id (str): species ID
species_pgdb_folder (str): path to species PGDB folder
"""
pgdb_folder = multiprocess_input['pgdb_folders']
pgdb_folder_dbname = pgdb_folder[0].lower() + 'cyc'
pgdb_folder_dbname = run_dat_id.lower() + 'cyc'

dats_path = pgdb_folder[1] +'/1.0/data/'
dats_path = species_pgdb_folder +'/1.0/data/'

dat_files = ["classes.dat", "compound-links.dat", "compounds.dat", "dnabindsites.dat", "enzrxns.dat", "gene-links.dat", "genes.dat", "pathway-links.dat",
"pathways.dat", "promoters.dat", "protein-features.dat", "protein-links.dat", "proteins.dat", "protligandcplxes.dat", "pubs.dat",
Expand Down
24 changes: 11 additions & 13 deletions mpwt/utils.py
Expand Up @@ -246,15 +246,15 @@ def create_pathologic_file(input_folder, output_folder, number_cpu=None):
if not os.path.exists(output_folder):
os.makedirs(output_folder)

multiprocessing_dict = {'input_path': input_path, 'output_path': output_path,
'output_folder': output_folder, 'input_name': input_name}
if taxon_ids:
if input_name in taxon_ids:
multiprocessing_dict['taxon_id'] = taxon_ids[input_name]
taxon_id = taxon_ids[input_name]
else:
taxon_id = None

multiprocessing_input_data.append(multiprocessing_dict)
multiprocessing_input_data.append([input_path, output_path, output_folder, input_name, taxon_id])

check_boolean = mpwt_pool.map(run_create_pathologic_file, multiprocessing_input_data)
check_boolean = mpwt_pool.starmap(run_create_pathologic_file, multiprocessing_input_data)

mpwt_pool.close()
mpwt_pool.join()
Expand All @@ -275,21 +275,20 @@ def write_taxon_id_file(input_name, taxon_id, output_folder):
taxon_writer.writerow([input_name, taxon_id])


def run_create_pathologic_file(multiprocessing_input_data):
def run_create_pathologic_file(input_path, output_path, output_folder, input_name, taxon_id):
"""
Create PathoLogic files from a Genbank or a GFF file.
Args:
multiprocess_input (dictionary): contains multiprocess input (input folder, output_path, output folder and input_name)
input_path (str): path to species input folder
output_path (str): path to output species folder
output_folder (str): path to output folder
input_name (str): species name
        taxon_id (str): taxon ID of the species, or None if taxon_id.tsv does not exist
"""
input_path = multiprocessing_input_data['input_path']
output_folder = multiprocessing_input_data['output_folder']
output_path = multiprocessing_input_data['output_path']
input_name = multiprocessing_input_data['input_name']
taxon_id = None
# Add taxon ID in taxon_id.tsv if available.
if input_path.endswith('.gbk') or input_path.endswith('.gbff'):

logger.info('Creating PathoLogic file for ' + input_path)
if not os.path.exists(output_path):
os.makedirs(output_path)
Expand Down Expand Up @@ -517,7 +516,6 @@ def run_create_pathologic_file(multiprocessing_input_data):
element_file.write('//\n\n')

elif all([True if '.pf' in species_file or '.fasta' in species_file or '.fsa' in species_file else False for species_file in os.listdir(input_path)]):
taxon_id = multiprocessing_input_data['taxon_id']
write_taxon_id_file(input_name, taxon_id, output_folder)
shutil.copytree(input_path, output_path)

Expand Down

0 comments on commit 4384094

Please sign in to comment.