Skip to content
This repository has been archived by the owner on Mar 17, 2023. It is now read-only.

add multiprocessing to sencha translate #84

Closed
wants to merge 30 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
c7c1565
add multiprocessing to sencha translate
pranathivemuri Jul 28, 2020
9b73336
fix file handle writing
pranathivemuri Jul 28, 2020
a18d80b
format translate file per black
pranathivemuri Jul 28, 2020
3732bb3
fix read fasta sequence error
pranathivemuri Jul 28, 2020
3bdf475
trying to fix description key not found error
pranathivemuri Jul 28, 2020
ce9db77
wip
pranathivemuri Jul 28, 2020
5395d73
correct endswith
pranathivemuri Jul 28, 2020
1f94c3d
correct endswith logic
pranathivemuri Jul 28, 2020
6c6aa75
figuring out nodegraph pickling
pranathivemuri Jul 29, 2020
d305c44
black formatting
pranathivemuri Jul 29, 2020
d144757
wip
pranathivemuri Jul 29, 2020
5cbae27
parallel processing works
pranathivemuri Aug 3, 2020
3739103
Wip
pranathivemuri Aug 3, 2020
1b6ef7a
fix tests
pranathivemuri Aug 3, 2020
6a959df
black reformatting
pranathivemuri Aug 3, 2020
3ca6bb0
fix tests
pranathivemuri Aug 3, 2020
65bf138
imap unordered
pranathivemuri Aug 3, 2020
264cc54
print time
pranathivemuri Aug 3, 2020
b1128fe
lint
pranathivemuri Aug 3, 2020
fce3bb8
add pathos dependency
pranathivemuri Aug 3, 2020
f7e74d9
map chunksize instead
pranathivemuri Aug 3, 2020
c6b71bf
add linting
pranathivemuri Aug 3, 2020
4a27bb8
fix printing
pranathivemuri Aug 6, 2020
7f946f5
fix linting
pranathivemuri Aug 6, 2020
9fb4aca
named tuple to dictionary, to save time
pranathivemuri Aug 6, 2020
c3715f4
linting
pranathivemuri Aug 6, 2020
721e155
adding comment on global explanation
pranathivemuri Aug 6, 2020
9b1dd98
wip
pranathivemuri Aug 6, 2020
a9f771b
linting again
pranathivemuri Aug 6, 2020
79b66ef
use default chunksize
pranathivemuri Aug 10, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
click
httmock
pathos
khmer
networkx
numpy
Expand Down
2 changes: 1 addition & 1 deletion sencha/constants_translate.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import itertools
from collections import namedtuple, defaultdict
from collections import namedtuple

from sencha.sequence_encodings import ALPHABET_ALIASES

Expand Down
13 changes: 8 additions & 5 deletions sencha/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,14 @@ def per_read_false_positive_coding_rate(
def load_nodegraph(*args, **kwargs):
    """Wrapper to load a khmer-style bloom filter called a 'nodegraph'.

    Tries the sourmash loader first, then falls back to the khmer API,
    handling the function rename between khmer 2.1.1 and khmer 3+.
    """
    try:
        return sourmash.load_nodegraph(*args, **kwargs)
    except Exception:
        # NOTE(review): was a bare `except:` — narrowed to Exception so
        # KeyboardInterrupt/SystemExit are not swallowed by the fallback.
        try:
            # khmer 2.1.1
            return khmer.load_nodegraph(*args, **kwargs)
        except AttributeError:
            # khmer 3+/master branch renamed it to Nodegraph.load
            return khmer.Nodegraph.load(*args, **kwargs)


def maybe_read_peptide_file(peptide_file):
Expand Down
146 changes: 93 additions & 53 deletions sencha/translate.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
Partition reads into coding, noncoding, and low-complexity bins
"""
import sys
import time
import warnings


import click
import numpy as np
import pandas as pd
import screed
from tqdm import tqdm
from pathos import multiprocessing
from sourmash._minhash import hash_murmur
from sencha.log_utils import get_logger
from sencha.sequence_encodings import encode_peptide
Expand Down Expand Up @@ -115,15 +115,27 @@ def write_fasta(file_handle, description, sequence):
class Translate:
def __init__(self, args):
"""Constructor"""
self.args = args
for key in args:
setattr(self, key, args[key])
if self.long_reads:
raise NotImplementedError("Not implemented! ... yet :)")
self.jaccard_threshold = get_jaccard_threshold(
self.jaccard_threshold, self.alphabet
)
self.peptide_bloom_filter = maybe_make_peptide_bloom_filter(
# Use global variable so the functions that are inside multiprocessing
# don't try to access the nodegraph by self.peptide_bloom_filter
# and hence avoiding the pickling error while serialization of the NodeGraph by multiprocessing
# which doesn't exist for nodegraph currently
# Con is that a global variable declaration inside a class definition
# is advised against, and the docs say it may be enforced as an error in the future
# NB PV: This has been said in the docs since Python 2.7, but it hasn't changed, so I
# suggest keeping the global variable declaration here for now
# https://docs.python.org/3.8/reference/simple_stmts.html#the-global-statement
# 1) One solution is to create and declare peptide_bloom_filter inside the
# if __name__ == "__main__" block, but that would mean switching from click to argparse
# 2) Another solution is to make nodegraph serializable
global peptide_bloom_filter
peptide_bloom_filter = maybe_make_peptide_bloom_filter(
self.peptides,
self.peptide_ksize,
self.alphabet,
Expand All @@ -137,13 +149,13 @@ def __init__(self, args):
if not self.peptides_are_bloom_filter:
self.peptide_bloom_filter_filename = maybe_save_peptide_bloom_filter(
self.peptides,
self.peptide_bloom_filter,
peptide_bloom_filter,
self.alphabet,
self.save_peptide_bloom_filter,
)
else:
self.peptide_bloom_filter_filename = self.peptides
self.peptide_ksize = self.peptide_bloom_filter.ksize()
self.peptide_ksize = peptide_bloom_filter.ksize()
self.nucleotide_ksize = 3 * self.peptide_ksize

def maybe_write_fasta(self, file_handle, description, sequence):
Expand Down Expand Up @@ -188,15 +200,15 @@ def score_single_translation(self, translation):
hashes = [hash_murmur(kmer) for kmer in kmers]
n_kmers = len(kmers)
n_kmers_in_peptide_db = sum(
1 for h in hashes if self.peptide_bloom_filter.get(h) > 0
1 for h in hashes if peptide_bloom_filter.get(h) > 0
)
if self.verbose:
logger.info("\ttranslation: \t".format(encoded))
logger.info("\tkmers:", " ".join(kmers))

if self.verbose:
kmers_in_peptide_db = {
(k, h): self.peptide_bloom_filter.get(h) for k, h in zip(kmers, hashes)
(k, h): peptide_bloom_filter.get(h) for k, h in zip(kmers, hashes)
}
# Print keys (kmers) only
logger.info("\tK-mers in peptide database:")
Expand All @@ -208,6 +220,12 @@ def score_single_translation(self, translation):

def check_peptide_content(self, description, sequence):
"""Predict whether a nucleotide sequence could be protein-coding"""
fasta_seqs = {
"low_complexity_peptide": [],
"coding_nucleotide": [],
"noncoding_nucleotide": [],
"coding_peptide": [],
}
with warnings.catch_warnings():
warnings.simplefilter("ignore")
translations = TranslateSingleSeq(
Expand Down Expand Up @@ -243,10 +261,11 @@ def check_peptide_content(self, description, sequence):
encoded, self.peptide_ksize
)
if is_kmer_low_complex:
self.maybe_write_fasta(
self.file_handles["low_complexity_peptide"],
description + " translation_frame: {}.format(frame)",
translation,
fasta_seqs["low_complexity_peptide"].append(
[
description + " translation_frame: {} ".format(frame),
translation,
]
)
scoring_lines.append(
constants_translate.SingleReadScore(
Expand All @@ -269,10 +288,8 @@ def check_peptide_content(self, description, sequence):
description, frame
) + "jaccard: {}".format(fraction_in_peptide_db)
if fraction_in_peptide_db > self.jaccard_threshold:
write_fasta(sys.stdout, seqname, translation)
self.maybe_write_fasta(
self.file_handles["coding_nucleotide"], seqname, sequence
)
fasta_seqs["coding_peptide"].append([seqname, translation])
fasta_seqs["coding_nucleotide"].append([seqname, sequence])
scoring_lines.append(
constants_translate.SingleReadScore(
fraction_in_peptide_db,
Expand All @@ -282,9 +299,7 @@ def check_peptide_content(self, description, sequence):
)
)
else:
self.maybe_write_fasta(
self.file_handles["noncoding_nucleotide"], seqname, sequence
)
fasta_seqs["noncoding_nucleotide"].append([seqname, sequence])
scoring_lines.append(
constants_translate.SingleReadScore(
fraction_in_peptide_db,
Expand All @@ -295,7 +310,11 @@ def check_peptide_content(self, description, sequence):
frame,
)
)
return scoring_lines
lines = [
self.get_coding_score_line(description, x, y, z, frame)
for x, y, z, frame in scoring_lines
]
return lines, fasta_seqs

def check_nucleotide_content(self, description, n_kmers, sequence):
    """If passes, then this read can move on to checking protein translations.

    Reads whose nucleotide sequence is low complexity or too short don't
    pass thresholds to be checked for protein-coding-ness.

    Returns
    -------
    lines : list
        One formatted coding-score line per reading frame (1, 2, 3, -1, -2, -3).
    fasta_seqs : dict
        Maps "low_complexity_nucleotide" to a list of [description, sequence]
        pairs for the caller to write out (keeps file I/O out of worker processes).
    """
    fasta_seqs = {"low_complexity_nucleotide": []}
    if n_kmers > 0:
        jaccard = np.nan
        special_case = "Low complexity nucleotide"
        fasta_seqs["low_complexity_nucleotide"].append([description, sequence])
    else:
        jaccard = np.nan
        n_kmers = np.nan
        special_case = "Read length was shorter than 3 * peptide " "k-mer size"
    # Fix: local variable was misspelled "tranlated_scores"
    translated_scores = [
        constants_translate.SingleReadScore(jaccard, n_kmers, special_case, i)
        for i in [1, 2, 3, -1, -2, -3]
    ]
    lines = [
        self.get_coding_score_line(description, x, y, z, frame)
        for x, y, z, frame in translated_scores
    ]
    return lines, fasta_seqs

def maybe_score_single_read(self, record):
    """Check if read is low complexity/too short, otherwise score it.

    Parameters
    ----------
    record : tuple
        (description, sequence) pair for a single read. A plain tuple is
        used (rather than a namedtuple) to keep per-read overhead low
        during multiprocessing.

    Returns
    -------
    dict
        {"scores": formatted score lines,
         "fasta_seqs": category -> list of [description, sequence] pairs}
    """
    # Idiomatic tuple unpacking instead of record[0]/record[1] indexing
    description, sequence = record[0], record[1]
    # Check if the nucleotide sequence is low complexity first
    if evaluate_is_fastp_low_complexity(sequence):
        scores, fasta_seqs = self.check_nucleotide_content(
            description, np.nan, sequence
        )
    else:
        scores, fasta_seqs = self.check_peptide_content(description, sequence)
    return {"scores": scores, "fasta_seqs": fasta_seqs}

def get_coding_score_line(self, description, jaccard, n_kmers, special_case, frame):
if special_case is not None:
Expand All @@ -340,32 +367,34 @@ def get_coding_score_line(self, description, jaccard, n_kmers, special_case, fra

def score_reads_per_file(self, reads):
"""Assign a coding score to each read. Where the magic happens."""

scoring_lines = []

with screed.open(reads) as records:
for record in tqdm(records):
description = record["name"]
sequence = record["sequence"]
records = []
with screed.open(reads) as seqfile:
for read in seqfile:
description = read.name
if self.verbose:
logger.info(description)

for (
jaccard,
n_kmers,
special_case,
frame,
) in self.maybe_score_single_read(description, sequence):
if self.verbose:
logger.info(
"Jaccard: {}, n_kmers = {} for frame {}".format(
jaccard, n_kmers, frame
)
records.append(tuple((description, read.sequence)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah ok so it's not a named tuple, depending on memory usage this might actually be a better solution, otherwise we would have to store the extra space overhead of the named tuple. Would be interesting to look into what the storage differences actually are between named tuple and tuple, if it's fairly negligible it may still be good to have here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you brought up a great point - https://stackoverflow.com/questions/49419132/namedtuple-slow-compared-to-tuple-dictionary-class I am not sure how right the estimates in this link are but something to think about

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh nice thanks for the reference! Best to keep as is then 👍

n_jobs = self.processes

pool = multiprocessing.Pool(processes=n_jobs)

results = pool.map(self.maybe_score_single_read, records)
pool.close()
pool.join()
for result in results:
for fasta_file_handle, seqs in result["fasta_seqs"].items():
if fasta_file_handle == "coding_peptide":
for description, seq in seqs:
write_fasta(sys.stdout, description, seq)
else:
for description, seq in seqs:
self.maybe_write_fasta(
self.file_handles[fasta_file_handle], description, seq
)
line = self.get_coding_score_line(
description, jaccard, n_kmers, special_case, frame
)
scoring_lines.append(line)
scoring_lines = []
for result in results:
for line in result["scores"]:
scoring_lines.append(line)
# Concatenate all the lines into a single dataframe
scoring_df = pd.DataFrame(
scoring_lines, columns=constants_translate.SCORING_DF_COLUMNS
Expand Down Expand Up @@ -444,7 +473,7 @@ def get_coding_scores_all_files(self):
@click.option(
"--csv",
default=False,
help="Name of csv file to write with all sequence reads and " "their coding scores",
help="Name of csv file to write with all sequence reads and" "their coding scores",
)
@click.option(
"--parquet",
Expand Down Expand Up @@ -495,6 +524,12 @@ def get_coding_scores_all_files(self):
"(TAG, TAA, TGA)",
)
@click.option("--verbose", is_flag=True, help="Print more output")
@click.option(
"--processes",
default=4,
type=int,
help="number of processes to parallel process on",
) # noqa
def cli(
peptides,
reads,
Expand All @@ -514,6 +549,7 @@ def cli(
n_tables=constants_index.DEFAULT_N_TABLES,
long_reads=False,
verbose=False,
processes=4,
):
"""Writes coding peptides from reads to standard output

Expand Down Expand Up @@ -582,6 +618,8 @@ def cli(
verbose : bool
Whether or not to print lots of stuff. Can specify multiple, e.g. -vv
if you really like having everything in stdout
processes: int
multiprocessing number of processes to run on

\b
Returns
Expand All @@ -590,6 +628,7 @@ def cli(
Outputs a fasta-formatted sequence of translated peptides
"""
# \b above prevents re-wrapping of paragraphs
startt = time.time()
translate_obj = Translate(locals())
translate_obj.set_coding_scores_all_files()
coding_scores = translate_obj.get_coding_scores_all_files()
Expand All @@ -606,6 +645,7 @@ def cli(
assemble_summary_obj.maybe_write_csv(coding_scores)
assemble_summary_obj.maybe_write_parquet(coding_scores)
assemble_summary_obj.maybe_write_json_summary(coding_scores)
print("time taken to translate is {:.5f} seconds".format(time.time() - startt))


if __name__ == "__main__":
Expand Down
1 change: 0 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import pandas as pd
import pytest
import warnings

"""
conftest.py contains fixtures or functions-turned-variables that can be
Expand Down
4 changes: 2 additions & 2 deletions tests/test_translate.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ def translate_class(tmpdir, reads, peptide_fasta):
n_tables=constants_index.DEFAULT_N_TABLES,
long_reads=False,
verbose=True,
processes=4,
)
translate_obj = translate.Translate(args)
return translate_obj
return translate.Translate(args)


@pytest.fixture
Expand Down