Fixing phylodb download and changing memory allocation
akrinos committed Aug 19, 2020
1 parent 5b72073 commit 9b0933d
Showing 11 changed files with 133 additions and 32 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,4 +1,5 @@
 *out
+.coverage*
 .ipynb_checkpoints/
 .snakemake/
 *output/
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
-0.0.1
+0.0.15
77 changes: 77 additions & 0 deletions code_of_conduct.md
@@ -0,0 +1,77 @@

# Contributor Covenant Code of Conduct

## Our Pledge

In the interest of fostering an open and welcoming environment, we as
contributors and maintainers pledge to make participation in our project and
our community a harassment-free experience for everyone, regardless of age, body
size, disability, ethnicity, sex characteristics, gender identity and expression,
level of experience, education, socio-economic status, nationality, personal
appearance, race, religion, or sexual identity and orientation.

## Our Standards

Examples of behavior that contributes to creating a positive environment
include:

* Using welcoming and inclusive language
* Being respectful of differing viewpoints and experiences
* Gracefully accepting constructive criticism
* Focusing on what is best for the community
* Showing empathy towards other community members

Examples of unacceptable behavior by participants include:

* The use of sexualized language or imagery and unwelcome sexual attention or
advances
* Trolling, insulting/derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or electronic
address, without explicit permission
* Other conduct which could reasonably be considered inappropriate in a
professional setting

## Our Responsibilities

Project maintainers are responsible for clarifying the standards of acceptable
behavior and are expected to take appropriate and fair corrective action in
response to any instances of unacceptable behavior.

Project maintainers have the right and responsibility to remove, edit, or
reject comments, commits, code, wiki edits, issues, and other contributions
that are not aligned to this Code of Conduct, or to ban temporarily or
permanently any contributor for other behaviors that they deem inappropriate,
threatening, offensive, or harmful.

## Scope

This Code of Conduct applies within all project spaces, and it also applies when
an individual is representing the project or its community in public spaces.
Examples of representing a project or community include using an official
project e-mail address, posting via an official social media account, or acting
as an appointed representative at an online or offline event. Representation of
a project may be further defined and clarified by project maintainers.

## Enforcement

Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported by contacting the project team at [akrinos@mit.edu](mailto:akrinos@mit.edu). All
complaints will be reviewed and investigated and will result in a response that
is deemed necessary and appropriate to the circumstances. The project team is
obligated to maintain confidentiality with regard to the reporter of an incident.
Further details of specific enforcement policies may be posted separately.

Project maintainers who do not follow or enforce the Code of Conduct in good
faith may face temporary or permanent repercussions as determined by other
members of the project's leadership.

## Attribution

This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4,
available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html

[homepage]: https://www.contributor-covenant.org

For answers to common questions about this code of conduct, see
https://www.contributor-covenant.org/faq
4 changes: 2 additions & 2 deletions docs/source/conf.py
@@ -19,8 +19,8 @@
 # -- Project information -----------------------------------------------------

 project = 'EUKulele'
-copyright = '2020, Arianna Krinos, Sarah Hu, Harriet Alexander'
-author = 'Arianna Krinos, Sarah Hu, Harriet Alexander'
+copyright = '2020, Arianna Krinos, Sarah Hu, Natalie Cohen, Harriet Alexander'
+author = '2020, Arianna Krinos, Sarah Hu, Natalie Cohen, Harriet Alexander'.replace('2020, ', '') if False else 'Arianna Krinos, Sarah Hu, Natalie Cohen, Harriet Alexander'

 # The full version, including alpha/beta/rc tags
 release = '1'
2 changes: 2 additions & 0 deletions scripts/create_protein_table.py
@@ -9,6 +9,8 @@
 python EUKulele/scripts/create_protein_table.py --infile_peptide EUKulele/tests/aux_data/mmetsp/reference-pep-trunc.pep.faa --infile_taxonomy EUKulele/tests/aux_data/mmetsp/taxonomy-table.txt --outfile_json EUKulele/tests/aux_data/mmetsp/protein-map.json --output EUKulele/tests/aux_data/mmetsp/tax-table.txt --delim "/" --strain_col_id strain_name --taxonomy_col_id taxonomy --column SOURCE_ID
+create_protein_table.py --infile_peptide reference.pep.fa --infile_taxonomy taxonomy-table.txt --outfile_json prot-map.json --output tax-table.txt --delim "/" --col_source_id strain_name --taxonomy_col_id taxonomy --column 2
+python /vortexfs1/omics/alexander/akrinos/remodeling/EUKulele/scripts/create_protein_table.py --infile_peptide /vortexfs1/omics/alexander/akrinos/EUKulele-Reference/phylodb_db/reference.pep.fa --infile_taxonomy /vortexfs1/omics/alexander/akrinos/EUKulele-Reference/phylodb_db/tax-table.txt --outfile_json /vortexfs1/omics/alexander/akrinos/EUKulele-Reference/phylodb_db/prot-map.json --output /vortexfs1/omics/alexander/akrinos/EUKulele-Reference/phylodb_db/taxonomy_table.txt --delim "\t" --strain_col_id strain_name --taxonomy_col_id taxonomy --column 2
 """

6 changes: 4 additions & 2 deletions scripts/download_database.sh
@@ -48,11 +48,13 @@ elif [[ $DATABASE == "eukprot" ]]; then
     echo "All reference files for EukProt downloaded to ${PWD}/$DATABASE"
 elif [[ $DATABASE == "phylodb" ]]; then
     # Download PhyloDB reference FASTA
-    wget -O ${PWD}/$DATABASE/$REF_FASTA $REF_FASTA_URL
+    wget -O ${PWD}/$DATABASE/$REF_FASTA.gz $REF_FASTA_URL
+    gunzip -f ${PWD}/$DATABASE/$REF_FASTA.gz
     ALLEXITS=$(($ALLEXITS + $?))

     # Download PhyloDB reference taxonomy table
-    wget -O ${PWD}/$DATABASE/$REF_TABLE $REF_TABLE_URL
+    wget -O ${PWD}/$DATABASE/$REF_TABLE.gz $REF_TABLE_URL
+    gunzip -f ${PWD}/$DATABASE/$REF_TABLE.gz
     ALLEXITS=$(($ALLEXITS + $?))

     # Download PhyloDB files from Google Drive
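The fix above matters because the PhyloDB reference files are served gzip-compressed: the old command saved the compressed bytes under the uncompressed filename, so downstream parsing of the FASTA and taxonomy table failed. A minimal Python sketch of the same download-then-decompress pattern — the URL and paths here are placeholders, not the script's real $REF_FASTA_URL and $REF_TABLE_URL values:

```python
import gzip
import os
import shutil
import urllib.request

def download_gzipped(url, dest_path):
    """Fetch a gzip-compressed reference file and decompress it in place."""
    gz_path = dest_path + ".gz"
    # Save the compressed bytes under a .gz name, mirroring `wget -O file.gz URL`.
    urllib.request.urlretrieve(url, gz_path)
    # Decompress to the final filename, mirroring `gunzip -f file.gz`.
    with gzip.open(gz_path, "rb") as fin, open(dest_path, "wb") as fout:
        shutil.copyfileobj(fin, fout)

# Hypothetical usage; the real URLs come from the script's environment variables.
os.makedirs("phylodb", exist_ok=True)
download_gzipped("https://example.org/reference.pep.fa.gz", "phylodb/reference.pep.fa")
```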
1 change: 0 additions & 1 deletion src/EUKulele/EUKulele_main.py
@@ -94,7 +94,6 @@ def main(args_in):
     parser.add_argument('--test', action='store_true', default=False)

     ## OTHER USER CHOICES ##
-    #cutoff_file = pkgutil.get_data(__name__, "tax-cutoffs.yaml")
     cutoff_file = "tax-cutoffs.yaml"
     parser.add_argument('--cutoff_file', default = cutoff_file)
     parser.add_argument('--filter_metric', default = "evalue", choices = ["pid", "evalue", "bitscore"])
4 changes: 2 additions & 2 deletions src/EUKulele/busco_runner.py
@@ -13,7 +13,7 @@
     MEM_AVAIL_GB = pd.read_csv("free.csv", sep = "\s+").free[0] / 10**3
 except:
     pass
-MAX_JOBS = math.floor(MEM_AVAIL_GB / 80)
+MAX_JOBS = math.floor(MEM_AVAIL_GB / 40) #80
 if MAX_JOBS == 0:
     MAX_JOBS = 1
@@ -45,7 +45,7 @@ def configRunBusco(output_dir, mets_or_mags, pep_ext, nt_ext, sample_dir, sample
     ## Run BUSCO on the full dataset ##
     busco_db = "eukaryota_odb10"
     busco_config_res = configure_busco(busco_db)
-    n_jobs_busco = min(multiprocessing.cpu_count(), len(samples), MAX_JOBS)
+    n_jobs_busco = min(multiprocessing.cpu_count(), len(samples), max(1, int(MAX_JOBS / len(samples))))
     print("Running busco...", flush=True)
     busco_res = Parallel(n_jobs=n_jobs_busco, prefer="threads")(delayed(run_busco)(sample_name,
                                                                                    os.path.join(output_dir,
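To make the allocation change concrete: each BUSCO job is now budgeted 40 GB instead of 80 GB, and the per-run job count divides the global cap across samples so that several samples cannot each claim the full memory budget. A small sketch of the revised arithmetic, assuming the same 40 GB-per-job figure; busco_jobs is an illustrative helper, not a function in the repository:

```python
import math
import multiprocessing

def busco_jobs(mem_avail_gb, n_samples, gb_per_job=40):
    """Budget gb_per_job GB per BUSCO job, split the cap across samples, floor at 1."""
    max_jobs = max(1, math.floor(mem_avail_gb / gb_per_job))
    return min(multiprocessing.cpu_count(), n_samples, max(1, int(max_jobs / n_samples)))

# With 160 GB free and 4 samples: floor(160 / 40) = 4 total jobs, so 1 job per sample.
print(busco_jobs(160, 4))
```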
9 changes: 6 additions & 3 deletions src/EUKulele/download_database.py
@@ -22,13 +22,16 @@ def downloadDatabase(database_name, alignment_choice):
     if database_name == "mmetsp":
         column_id = "SOURCE_ID"
         delimiter = "/"
+        sourceID = "Source_ID"
     elif (database_name == "eukprot"):
         column_id = 0
         delimiter = "\t"
         create_protein_table_args.append("--euk-prot")
+        sourceID = "EukProt_ID"
     elif (database_name == "phylodb"):
-        column_id = 0
+        column_id = 2
         delimiter = "\t"
+        sourceID = "strain_name"
         create_protein_table_args.append("--reformat_tax")
     else:
         print("Specified reference database, " + database_name + " is not supported.")
@@ -55,8 +58,8 @@

     create_protein_table_args.extend(["--infile_peptide",os.path.join(database_name,fasta_name),
                                       "--infile_taxonomy",orig_tax_name,"--output",tax_table,"--outfile_json",
-                                      protein_json,"--delim",str(delimiter),"--strain_col_id",
-                                      "strain_name","--taxonomy_col_id", "taxonomy","--column",
+                                      protein_json,"--delim",str(delimiter),"--col_source_id",
+                                      sourceID,"--taxonomy_col_id", "taxonomy","--column",
                                       str(column_id)])

     ## Run function to create protein table file from scripts/create_protein_table.py ##
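The growing per-database branch above can also be read as a lookup table, which makes the new sourceID wiring easier to scan. A sketch under that reading; DatabaseConfig and DATABASES are illustrative names, not part of EUKulele:

```python
from dataclasses import dataclass

@dataclass
class DatabaseConfig:
    column_id: object   # column name (mmetsp) or positional index (eukprot, phylodb)
    delimiter: str
    source_id: str      # value now passed to --col_source_id
    extra_args: tuple = ()

DATABASES = {
    "mmetsp":  DatabaseConfig("SOURCE_ID", "/", "Source_ID"),
    "eukprot": DatabaseConfig(0, "\t", "EukProt_ID", ("--euk-prot",)),
    "phylodb": DatabaseConfig(2, "\t", "strain_name", ("--reformat_tax",)),
}

config = DATABASES.get("phylodb")
if config is None:
    raise ValueError("Specified reference database is not supported.")
print(config.source_id)  # strain_name
```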
41 changes: 23 additions & 18 deletions src/EUKulele/manage_steps.py
@@ -23,16 +23,16 @@
     pass

 # 25 GB memory per GB file size
-def calc_max_jobs(size_in_bytes = 2147483648):
+def calc_max_jobs(num_files, size_in_bytes = 2147483648):
     size_in_gb = size_in_bytes / (1024*1024*1024)
     if size_in_gb == 0:
         size_in_gb = 0.01
-    MAX_JOBS = math.floor(MEM_AVAIL_GB / (25 * size_in_gb)) #48)
+    MAX_JOBS = math.floor(MEM_AVAIL_GB / (25 * size_in_gb * num_files)) #48)
     if MAX_JOBS == 0:
         MAX_JOBS = 1
     return MAX_JOBS

-MAX_JOBS = calc_max_jobs()
+MAX_JOBS = max(1, calc_max_jobs(1))

 # For DIAMOND: The program can be expected to use roughly six times this number of memory (in GB).
 # So for the default value of -b2.0, the memory usage will be about 12 GB.
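A worked example of the revised formula, using assumed numbers: with 100 GB of free memory, a 2 GB input file, and 3 files in flight, floor(100 / (25 × 2 × 3)) = 0, which the guard then raises to 1 so at least one job always runs. In sketch form:

```python
import math

def calc_max_jobs_sketch(mem_avail_gb, num_files, size_in_bytes=2147483648):
    """25 GB of memory per GB of input, now shared across num_files concurrent inputs."""
    size_in_gb = size_in_bytes / (1024 * 1024 * 1024)
    if size_in_gb == 0:
        size_in_gb = 0.01
    return max(1, math.floor(mem_avail_gb / (25 * size_in_gb * num_files)))

# 100 GB free, 2 GB file, 3 files: floor(100 / 150) = 0, clamped to 1.
print(calc_max_jobs_sketch(100, 3, size_in_bytes=2 * 1024**3))
```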
@@ -189,9 +189,11 @@ def manageTrandecode(met_samples, output_dir, rerun_rules, sample_dir,
     """

     print("Running TransDecoder for MET samples...", flush = True)
-    MAX_JOBS = max([calc_max_jobs(pathlib.Path(os.path.join(sample_dir, sample + nt_ext)).stat().st_size)
-                    for sample in met_samples])
-    n_jobs_align = min(multiprocessing.cpu_count(), len(met_samples), MAX_JOBS)
+
+    MAX_JOBS = min([calc_max_jobs(len(met_samples), pathlib.Path(os.path.join(sample_dir, sample + nt_ext)).stat().st_size) \
+                    for sample in met_samples \
+                    if os.path.isfile(os.path.join(sample_dir, sample + nt_ext))])
+    n_jobs_align = min(multiprocessing.cpu_count(), len(met_samples), max(1,MAX_JOBS))
     transdecoder_res = Parallel(n_jobs=n_jobs_align)(delayed(transdecodeToPeptide)(sample_name, output_dir,
                                                                                    rerun_rules, sample_dir,
                                                                                    mets_or_mags = "mets", transdecoder_orf_size = 100,
@@ -235,9 +237,8 @@ def manageAlignment(alignment_choice, samples, filter_metric, output_dir, ref_fa
     else:
         fastas = [os.path.join(sample_dir, sample + "." + pep_ext) for sample in samples]

-    MAX_JOBS = max([calc_max_jobs(pathlib.Path(sample).stat().st_size) for sample in fastas])
-    n_jobs_align = min(multiprocessing.cpu_count(), len(samples), MAX_JOBS)
-    print(MAX_JOBS)
+    MAX_JOBS = min([calc_max_jobs(len(fastas), pathlib.Path(sample).stat().st_size) for sample in fastas])
+    n_jobs_align = min(multiprocessing.cpu_count(), len(samples), max(1,MAX_JOBS))
     alignment_res = Parallel(n_jobs=n_jobs_align, prefer="threads")(delayed(alignToDatabase)(alignment_choice,
                                                                                              sample_name, filter_metric,
                                                                                              output_dir, ref_fasta,
@@ -333,12 +334,12 @@ def alignToDatabase(alignment_choice, sample_name, filter_metric, output_dir, re
             rc1 = subprocess.Popen(["diamond", alignment_method, "--db", align_db, "-q", fasta, "-o",
                                     diamond_out, "--outfmt", str(outfmt), "-k", str(k), "--min-score",
                                     str(bitscore), '-b3.0'], stdout = diamond_log, stderr = diamond_err).wait()
-            print("Diamond process exited.", flush = True)
+            print("Diamond process exited for sample " + str(sample_name) + ".", flush = True)
         elif filter_metric == "pid":
             rc1 = subprocess.Popen(["diamond", alignment_method, "--db", align_db, "-q", fasta, "-o",
                                     diamond_out, "--outfmt", str(outfmt), "-k", str(k), "--id",
                                     str(pid_cutoff), '-b3.0'], stdout = diamond_log, stderr = diamond_err).wait()
-            print("Diamond process exited.", flush = True)
+            print("Diamond process exited for sample " + str(sample_name) + ".", flush = True)
         else:
             rc1 = subprocess.Popen(["diamond", alignment_method, "--db", align_db, "-q", fasta, "-o",
                                     diamond_out, "--outfmt", str(outfmt), "-k", str(k), "-e",
@@ -349,7 +350,7 @@
             #rc1 = p.returncode
             #print(stderr)
             #print(stdout)
-            print("Diamond process exited.", flush = True)
+            print("Diamond process exited for sample " + str(sample_name) + ".", flush = True)
             if rc1 != 0:
                 print("Diamond did not complete successfully.")
                 os.system("rm -f " + diamond_out)
@@ -411,17 +412,20 @@ def manageTaxEstimation(output_dir, mets_or_mags, tax_tab, cutoff_file, consensu
     else:
         fastas = [os.path.join(sample_dir, sample + "." + pep_ext) for sample in samples]

-    MAX_JOBS = max([calc_max_jobs(pathlib.Path(sample).stat().st_size) for sample in fastas])
-    n_jobs_align = min(multiprocessing.cpu_count(), len(alignment_res), MAX_JOBS)
+    MAX_JOBS = min([calc_max_jobs(len(fastas), pathlib.Path(sample).stat().st_size) for sample in fastas])
+    n_jobs_align = min(multiprocessing.cpu_count(), len(alignment_res), max(1, MAX_JOBS))
     for t in range(len(alignment_res)):
-        curr_out = place_taxonomy(tax_tab, cutoff_file, consensus_cutoff,\
-                                  prot_tab, use_salmon_counts, names_to_reads,\
-                                  alignment_res[t], outfiles[t], rerun_rules)
         try:
+            sys.stdout = open(os.path.join("log", "tax_est_" + alignment_res[t].split("/")[-1].split(".")[0] + ".out"), "w")
+            sys.stderr = open(os.path.join("log", "tax_est_" + alignment_res[t].split("/")[-1].split(".")[0] + ".err"), "w")
+            curr_out = place_taxonomy(tax_tab, cutoff_file, consensus_cutoff,\
+                                      prot_tab, use_salmon_counts, names_to_reads,\
+                                      alignment_res[t], outfiles[t], rerun_rules)
         except:
-            print("Taxonomic estimation for core genes did not complete successfully. Check log file for details.")
+            print("Taxonomic estimation did not complete successfully. Check log file for details.")
+        sys.stdout = sys.__stdout__
+        sys.stderr = sys.__stderr__

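The new per-sample log handling above swaps sys.stdout and sys.stderr to files for the duration of each taxonomy-placement call, then restores the real streams from sys.__stdout__ and sys.__stderr__. A self-contained sketch of that pattern; run_logged and the file names are illustrative, not repository code:

```python
import os
import sys

os.makedirs("log", exist_ok=True)

def run_logged(task, name):
    """Route a task's prints to per-task log files, then restore the console."""
    out = open(os.path.join("log", name + ".out"), "w")
    err = open(os.path.join("log", name + ".err"), "w")
    try:
        sys.stdout, sys.stderr = out, err
        return task()
    finally:
        # Restore the original streams no matter what the task did.
        sys.stdout, sys.stderr = sys.__stdout__, sys.__stderr__
        out.close()
        err.close()

run_logged(lambda: print("hello from the task"), "tax_est_sample1")
```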
@@ -442,7 +446,7 @@ def manageCoreTaxEstimation(output_dir, mets_or_mags, tax_tab, cutoff_file, cons
     else:
         fastas = [os.path.join(sample_dir, sample + "." + pep_ext) for sample in samples]

-    MAX_JOBS = max([calc_max_jobs(pathlib.Path(sample).stat().st_size) for sample in fastas])
+    MAX_JOBS = min([calc_max_jobs(len(fastas), pathlib.Path(sample).stat().st_size) for sample in fastas])
     n_jobs_align = min(multiprocessing.cpu_count(), len(alignment_res), MAX_JOBS)
     for t in range(len(alignment_res)):
         try:
@@ -484,9 +488,10 @@ def manageCoreTaxVisualization(output_dir, mets_or_mags, sample_dir, pep_ext, nt
 def manageTaxAssignment(samples, mets_or_mags, output_dir, sample_dir, pep_ext, core = False):
     if mets_or_mags == "mags":
         print("Performing taxonomic assignment steps...", flush=True)
-        MAX_JOBS = max([calc_max_jobs(pathlib.Path(os.path.join(sample_dir, sample + "." + pep_ext)).stat().st_size)
+        MAX_JOBS = min([calc_max_jobs(len(samples),
+                                      pathlib.Path(os.path.join(sample_dir, sample + "." + pep_ext)).stat().st_size)
                         for sample in samples])
-        n_jobs_viz = min(multiprocessing.cpu_count(), len(samples), MAX_JOBS)
+        n_jobs_viz = min(multiprocessing.cpu_count(), len(samples), max(1,MAX_JOBS))
         try:
             if core:
                 assign_res = Parallel(n_jobs=n_jobs_viz, prefer="threads")(delayed(assignTaxonomy)(samp, output_dir,
18 changes: 15 additions & 3 deletions src/EUKulele/tax_placement.py
@@ -68,7 +68,7 @@ def lca(full_classifications):
     full_classifications_split = [[str(subtax).strip() for subtax in curr.split(";")] for curr in full_classifications]
     length_classes = [len(curr) for curr in full_classifications_split]
     if len(set(length_classes)) != 1:
-        print("Error: not all classifications are at the same taxonomic level.")
+        print("Error: not all classifications are at the same taxonomic level.", flush = True)
         sys.exit(1)
     for l in reversed(range(length_classes[0])):
         set_classifications = [curr[l] for curr in full_classifications_split]
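For readers new to this routine, lca walks the ranks of equally long, semicolon-delimited lineages from most to least specific and returns the deepest rank shared by every classification. A standalone sketch with made-up lineages; lca_sketch is illustrative, not the repository function:

```python
def lca_sketch(full_classifications):
    """Return the deepest taxonomic rank shared by all lineages."""
    split = [[t.strip() for t in c.split(";")] for c in full_classifications]
    if len({len(s) for s in split}) != 1:
        raise ValueError("not all classifications are at the same taxonomic level")
    # Scan from the most specific rank back toward the root.
    for level in reversed(range(len(split[0]))):
        names = {s[level] for s in split}
        if len(names) == 1:
            return names.pop()
    return ""  # no rank shared at all

print(lca_sketch(["Eukaryota;Haptophyta;Emiliania",
                  "Eukaryota;Haptophyta;Gephyrocapsa"]))  # Haptophyta
```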
@@ -81,7 +81,7 @@ def match_maker(dd, consensus_cutoff, tax_dict, use_counts, tax_cutoffs):
     md = dd.pident.max()
     transcript_name = set(list(dd["qseqid"]))
     if len(transcript_name) > 1:
-        print("More than 1 transcript name included in the group.")
+        print("More than 1 transcript name included in the group.", flush = True)
     transcript_name = list(transcript_name)[0]
     ds = list(set(dd[dd.pident==md]['ssqid_TAXID']))
     counts = list(set(dd[dd.pident==md]['counts']))
@@ -147,6 +147,19 @@ def apply_parallel(grouped_data, match_maker, consensus_cutoff, tax_dict, use_co
 def classify_taxonomy_parallel(df, tax_dict, namestoreads, pdict, consensus_cutoff, tax_cutoffs):
     chunksize = 10 ** 6
     counter = 0
+
+    ## Return an empty dataframe if no matches were made ##
+    if os.stat(str(df)).st_size == 0:
+        if namestoreads != 0:
+            return pd.DataFrame(columns=['transcript_name', 'classification_level', 'full_classification',
+                                         'classification', 'max_pid', 'counts', 'ambiguous'])
+        else:
+            return pd.DataFrame(columns=['transcript_name', 'classification_level', 'full_classification',
+                                         'classification', 'max_pid', 'ambiguous'])
+
     for chunk in pd.read_csv(str(df), sep = '\t', header = None, chunksize=chunksize):
         chunk.columns = ['qseqid', 'sseqid', 'pident', 'length', 'mismatch', 'gapopen', 'qstart',
                          'qend', 'sstart', 'send', 'evalue', 'bitscore']
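The new zero-size guard exists because a DIAMOND run in which no hits pass the filters leaves an empty output file, and pandas raises rather than returning an empty frame when asked to parse one. A minimal demonstration; the temporary path is illustrative:

```python
import os
import tempfile

import pandas as pd

# A zero-byte file stands in for a DIAMOND output with no hits.
path = os.path.join(tempfile.mkdtemp(), "empty_hits.tsv")
open(path, "w").close()

try:
    pd.read_csv(path, sep="\t", header=None)
except pd.errors.EmptyDataError:
    print("pandas cannot parse a zero-byte file; hence the os.stat size check")

# The guard sidesteps the exception and hands back an empty, well-formed table.
if os.stat(path).st_size == 0:
    result = pd.DataFrame(columns=["transcript_name", "classification_level",
                                   "full_classification", "classification",
                                   "max_pid", "ambiguous"])
```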
@@ -158,7 +171,6 @@ def classify_taxonomy_parallel(df, tax_dict, namestoreads, pdict, consensus_cuto
             chunk['counts'] = [0] * len(chunk.qseqid) # if no reads dict, each count is just assumed to be 0 and isn't recorded later
             use_counts = 0

-        print(chunk, flush=True)
         if counter == 0:
             outdf = apply_parallel(chunk.groupby('qseqid'), match_maker,
                                    consensus_cutoff, tax_dict, use_counts, tax_cutoffs)
