Fixes #34. Closes #33. Mostly changes kmerdb/parse.py to incorporate metadata additions to the database, if relevant (--all-metadata). Otherwise, the database is much simpler and still suitable for calculations; read associations aren't necessary for a functional .kdb file. Changes kmerdb/database.py to include proper defaults for the text fields. Changes kmerdb/kmer.py to include a stderr statement if running on fasta. All of this updates the 'kmerdb profile' command to produce a full set of k-mer/read or k-mer/sequence interactions as metadata for each k-mer. The new metadata fields are seqids, starts, and reverses, of type str, int, and bool respectively.
MatthewRalston committed Jan 23, 2021
1 parent f7563a3 commit 7b96e13
Showing 7 changed files with 91 additions and 64 deletions.
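For orientation, here is a minimal sketch (not code from this commit) of the per-k-mer record shape that 'kmerdb profile --all-metadata' is described as producing; the field names come from the commit message, and the values are hypothetical:

# Hypothetical example of one k-mer's record under --all-metadata.
example_kmer_record = {
    'count': 3,                           # total observations of this k-mer
    'seqids': ['seq1', 'seq1', 'seq2'],   # str: source sequence/read ids
    'starts': [0, 14, 7],                 # int: offsets within each sequence
    'reverses': [False, True, False],     # bool: observed on the reverse complement?
}
# Each metadata list should eventually match the count in length.
assert len(example_kmer_record['seqids']) == example_kmer_record['count']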
1 change: 0 additions & 1 deletion .travis.yml
@@ -1,6 +1,5 @@
language: python
python:
- "3.6.4"
- "3.7.4"
- "3.8.4"
before_install:
16 changes: 10 additions & 6 deletions kmerdb/__init__.py
@@ -865,7 +865,11 @@ def profile(arguments):

logger.debug("debugging the result of parsefile...")


logger.info("===============================")
logger.info("Outer scope")
result = db.conn.execute("SELECT * FROM kmers WHERE id = ?", 63).fetchone()
logger.info(result)
logger.info("===============================")
metadata=OrderedDict({
"version": VERSION,
"metadata_blocks": 1,
@@ -895,8 +899,6 @@ def profile(arguments):
print(kmer_dbrecs_per_file)


raise RuntimeError("Still depressing nothing is even here yet")

# raise RuntimeError("HOW DOES THIS HAVE NONE")
if len(kmer_dbrecs_per_file):
i = kmer_dbrecs_per_file[0][0] - 1 # Remove 1 for the Sqlite zero-based indexing
@@ -911,10 +913,12 @@
# metadata now has three additional properties, based on the total number of times this k-mer occurred. Eventually the dimension of these new properties should match the count.
if arguments.all_metadata:


logger.debug("LAST ASPECT")
seqids = [x[1] for x in kmer_dbrecs_per_file if x[1] is not None]
starts = [x[2] for x in kmer_dbrecs_per_file if x[2] is not None]
reverses = [x[3] for x in kmer_dbrecs_per_file if x[3] is not None]

seqids = [x[2] for x in kmer_dbrecs_per_file]
starts = [x[3] for x in kmer_dbrecs_per_file]
reverses = [x[4] for x in kmer_dbrecs_per_file]
if "seqids" in kmer_metadata.keys():
kmer_metadata["seqids"] += seqids
else:
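The hunk above grows the per-k-mer metadata lists block by block; a self-contained sketch of that accumulation pattern, assuming rows of (id, count, seqid, start, reverse) as indexed by x[2], x[3], and x[4] above:

def accumulate_metadata(kmer_metadata, kmer_dbrecs_per_file):
    # Columns follow the indexing in the hunk above: x[2]=seqid, x[3]=start, x[4]=reverse.
    seqids = [x[2] for x in kmer_dbrecs_per_file]
    starts = [x[3] for x in kmer_dbrecs_per_file]
    reverses = [x[4] for x in kmer_dbrecs_per_file]
    for key, values in (("seqids", seqids), ("starts", starts), ("reverses", reverses)):
        if key in kmer_metadata:
            kmer_metadata[key] += values   # extend lists accumulated from earlier blocks
        else:
            kmer_metadata[key] = values    # first block seen for this k-mer
    return kmer_metadata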
6 changes: 3 additions & 3 deletions kmerdb/database.py
@@ -95,9 +95,9 @@ def __make_empty_database(self):
for x in range(self._max_records):
null_profile.append({
'count': 0,
'starts': None,
'reverses': None,
'seqids': None
'starts': '[]',
'reverses': '[]',
'seqids': '[]'
})

with self._engine.connect() as conn:
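The change from None to '[]' defaults matters because these text columns are later decoded as JSON; a quick illustration of the difference, assuming the columns hold JSON-encoded lists:

import json
json.loads('[]')       # [] — an empty default that decodes cleanly
json.loads('[0, 14]')  # [0, 14] — a populated 'starts' column
# json.loads(None)     # TypeError — why a NULL/None default was problematic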
24 changes: 15 additions & 9 deletions kmerdb/kmer.py
@@ -39,7 +39,7 @@ class Kmers:
:ivar k: The choice of k to shred with
:ivar strand_specific: Include k-mers from forward strand only
"""
def __init__(self, k, strand_specific=True):
def __init__(self, k, strand_specific=True, fasta=False):
"""
:param k: The choice of k to shred with
@@ -53,8 +53,11 @@ def __init__(self, k, strand_specific=True):
raise TypeError("kmerdb.kmer.Kmers.__init__() expects an int as its first positional argument")
elif type(strand_specific) is not bool:
raise TypeError("kmerdb.kmer.Kmers.__init__() expects a bool as its second positional argument")
elif type(fasta) is not bool:
raise TypeError("kmerdb.kmer.Kmers.__init__() expects a bool as its third positional argument")
self.k = k
self.strand_specific = strand_specific
self.fasta = fasta

def shred(self, seqRecord):
"""
@@ -65,13 +68,19 @@ def shred(self, seqRecord):
:rtype:
"""


if not isinstance(seqRecord, Bio.SeqRecord.SeqRecord):
raise TypeError("kmerdb.kmer.Kmers expects a Bio.SeqRecord.SeqRecord object as its first positional argument")
seqlen = len(str(seqRecord.seq))
if seqlen < self.k:
logger.error("Offending sequence ID: {0}".format(seqRecord.id))
raise ValueError("kmerdb expects that each input sequence is longer than k.")
kmers = []
starts = []
reverses = []
# Each of the n-k+1 string slices become the k-mers
for c in range(len(seqRecord.seq) - self.k + 1):
for c in range(seqlen - self.k + 1):
s = seqRecord.seq[c:(c+self.k)]
kmers.append(str(s))
reverses.append(False)
@@ -80,13 +89,10 @@
kmers.append(str(s.reverse_complement()))
reverses.append(True)
starts.append(c)

sys.stderr.write(" --- ~~~ --- ~~~ shredded ~~~ --- ~~~ ---\n")
sys.stderr.write("a {0}bp long sequence was shredded into L-k+1 {1} total and {1} unique k-mers\n\n".format(len(seqRecord.seq), len(seqRecord.seq)-self.k+1, len(kmers)))
output = {'id': seqRecord.id, 'kmers': list(filter(lambda x: x is not None, map(kmer_to_id, kmers))), "seqids": repeat(seqRecord.id, len(starts)), "starts": starts, 'reverses': reverses}
#logger.debug(output['seqids'])
return output
#return list(filter(lambda x: x is not None, map(kmer_to_id, kmers)))
if self.fasta:
sys.stderr.write(" --- ~~~ --- ~~~ shredded ~~~ --- ~~~ ---\n")
sys.stderr.write("a {0}bp long sequence was shredded into L-k+1 {1} total and {2} unique k-mers\n\n".format(len(seqRecord.seq), len(str(seqRecord.seq))-self.k+1, len(list(set(kmers)))))
return {'id': seqRecord.id, 'kmers': list(map(lambda x: kmer_to_id(x), kmers)), "seqids": repeat(seqRecord.id, len(starts)), "starts": starts, 'reverses': reverses}


def kmer_to_id(s):
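A minimal usage sketch of the updated class, assuming Biopython is installed; fasta=True only toggles the stderr shredding summary shown above:

from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from kmerdb import kmer

shredder = kmer.Kmers(8, strand_specific=True, fasta=True)
record = SeqRecord(Seq("ACGTACGTACGT"), id="seq1")
result = shredder.shred(record)
# result['kmers'] holds integer k-mer ids, result['starts'] their offsets,
# and result['reverses'] whether each k-mer came from the reverse complement.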
86 changes: 43 additions & 43 deletions kmerdb/parse.py
@@ -83,56 +83,49 @@ def parsefile(filepath:str, k:int, p:int=1, b:int=50000, n:int=1000, stranded:bo
# Build fasta/fastq parser object to stream reads into memory
logger.debug("Constructing SeqParser object...")
seqprsr = seqparser.SeqParser(filepath, b, k)
fasta = not seqprsr.fastq
logger.debug("Constructing multiprocessing pool with {0} processors".format(p))
pool = Pool(processes=p) # A multiprocessing pool of depth 'p'
Kmer = kmer.Kmers(k, strand_specific=stranded) # A wrapper class to shred k-mers with
Kmer = kmer.Kmers(k, strand_specific=stranded, fasta=fasta) # A wrapper class to shred k-mers with
# Look inside the seqprsr object for the type of file
s = "fastq" if seqprsr.fastq else "fasta"


recs = [r for r in seqprsr] # A block of exactly 'b' reads-per-block to process in parallel
if s == "fastq":
if not fasta:
logger.debug("Read exactly b=={0}=={1} records from the {2} seqparser object".format(b, len(recs), s))
assert len(recs) == b, "The seqparser should return exactly {0} records at a time".format(b)
else:
logger.debug("Read {0} sequences from the {1} seqparser object".format(len(recs), s))
logger.debug("Skipping the block size assertion for fasta files")
logger.info("Read {0} sequences from the {1} seqparser object".format(len(recs), "fasta" if fasta else "fastq"))

while len(recs): # While the seqprsr continues to produce blocks of reads
# Run each read through the shred method
list_of_dicts = pool.map(Kmer.shred, recs)

logger.info("Shredded up {0} sequences over {1} parallel cores, like a cheesesteak".format(len(list_of_dicts), p))

logger.info("Everything in list_of_dicts is perfect, everything past here is garbage")
#logger.debug("Everything in list_of_dicts is perfect, everything past here is garbage")

# Flatmap to 'kmers', the dictionary of {'id': read_id, 'kmers': [ ... ]}
kmer_ids = list(chain.from_iterable(map(lambda x: x['kmers'], list_of_dicts)))
logger.debug("Flatmapped {0} kmer ids for these {1} sequence ids".format(len(kmer_ids), len(list_of_dicts)))

reads = list(chain.from_iterable(map(lambda x: x['seqids'], list_of_dicts)))


logger.debug("Flatmapped and matched {0} kmers with these {1} sequence ids".format(len(reads), len(list_of_dicts)))


starts = list(chain.from_iterable(map(lambda x: x['starts'], list_of_dicts)))
logger.debug("Flatmapped {0} kmers for their {1} offsets".format(len(kmer_ids), len(starts)))

reverses = list(chain.from_iterable(map(lambda x: x['reverses'], list_of_dicts)))
logger.debug("Flatmapped {0} reverse? bools for these {1} k-mers that were shredded".format(len(reverses), len(kmer_ids)))
logger.debug("Flatmapped {0} kmers for their metadata aggregation".format(len(kmer_ids), len(starts)))
# Assert that all list lengths are equal before adding metadata to k-mers
if all_metadata is True and len(kmer_ids) == len(reads) and len(reads) == len(starts) and len(starts) == len(reverses):
N = len(starts)

data = list(zip(kmer_ids, reads, starts, reverses))

logger.error("Appended {0} records to rows".format(N))
logger.debug("Appended {0} records to rows".format(N))

rows += data

logger.warning("Dumping all metadata into the .kdb file eventually")
logger.warning("Deferring to dump metadata to SQLite3 later...")
logger.warning("Dumping all metadata into the .kdb file eventually. This could be expensive...")

elif len(kmer_ids) == len(reads) and len(reads) == len(starts) and len(starts) == len(reverses) and not all_metadata:
pass
else:
pass # If we're not doing metadata, don't do it
else: # Raise an error if the numbers of items per list are not equal
logger.error(len(kmer_ids))
logger.error(len(reads))
logger.error(len(starts))
@@ -155,40 +148,47 @@ def parsefile(filepath:str, k:int, p:int=1, b:int=50000, n:int=1000, stranded:bo
recs = [r for r in seqprsr] # The next block of exactly 'b' reads
# This will be logged redundantly with the sys.stderr.write method calls at line 141 and 166 of seqparser.py (in the _next_fasta() and _next_fastq() methods)
#sys.stderr("\n")
logger.debug("Read {0} more records from the {1} seqparser object".format(len(recs), s))
logger.info("Read {0} more records from the {1} seqparser object".format(len(recs), "fasta" if fasta else "fastq"))
if all_metadata:
sys.stderr.write("Writing all metadata keys for each k-mer's relationships to reads into the SQLite3 database...\n")

unique_kmer_ids = list(set(list(map(lambda x: x[0], rows))))

executor = ThreadPoolExecutor()
futures = []
def submit(db, kmers, kid):
#logger.info(" === M E S S A G E ===")
#logger.debug("=====================================")
#logger.info("beginning to process all records of the {0} k-mer".format(kid))
db.conn.execute("UPDATE kmers SET seqids = ?, starts = ?, reverses = ? WHERE id = ?", json.dumps(list(map(lambda y: y[1], kmers))), json.dumps(list(map(lambda y: y[2], kmers))), json.dumps(list(map(lambda y: y[2], kmers))), kid)
#logger.info("Transaction completed for the {0} kmer.".format(kid))
def submit(conn, kmers, kid):
logger.debug(" === M E S S A G E ===")
logger.debug("=====================================")
logger.debug("beginning to process all records of the {0} k-mer".format(kid))
with conn.begin():
conn.execute("UPDATE kmers SET seqids = ?, starts = ?, reverses = ? WHERE id = ?", json.dumps(list(map(lambda y: y[1], kmers))), json.dumps(list(map(lambda y: y[2], kmers))), json.dumps(list(map(lambda y: y[2], kmers))), kid+1)

return kid

# If we round up to the nearest page, we should have 'pages' number of pages
# and we'd read n on each page.
pages = int(float(len(unique_kmer_ids))/n) + 1
sys.stderr.write("Submitting {0} k-mers to the SQLite3 database for on-disk metadata aggregation and k-mer counting\n\n".format(n))
for page in range(int(float(len(unique_kmer_ids))/pages)+1):
logger.info("PAGE {0}".format(page))
futures = []
with db.conn.begin():
for i, kid in enumerate(unique_kmer_ids[page*n:(page*n)+n]):
kmers = [x for x in rows if x[0] == kid]
future = executor.submit(submit, db, kmers, kid)
futures.append(future)
kid = 0
for i in range(len(unique_kmer_ids)):
kid = unique_kmer_ids[i] # For SQLite 1-based indexing
logger.debug("Beginning to commit {0} to the database".format(kid))
sys.stderr.write("\n")
kmers = [x for x in rows if x[0] == kid]
logger.debug("Located {0} relationships involving k-mer {1}".format(len(kmers), kid))
submit(db.conn, kmers, kid)


logger.info("Transaction completed for the {0} kmer.".format(kid))

logger.debug("===================================")
logger.debug("Example record with metadata:")
result = db.conn.execute("SELECT * FROM kmers WHERE id = ?", kid).fetchone()
logger.debug(result)
logger.debug("===================================")

seqprsr.nullomers = db._get_nullomers() # Calculate nullomers at the end
seqprsr.total_kmers = db._get_sum_counts() # The total number of k-mers processed
finally:
sys.stderr.write("\n")
logger.info("Finished loading records from '{0}' into '{1}'...".format(filepath, temp.name))
#db._engine.dispose()

sys.stderr.write("\n\n\nFinished counting k-mers{0} from '{1}' into '{2}'...\n\n\n".format(' and metadata' if all_metadata else '', filepath, temp.name))

return db, seqprsr.header_dict()


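The metadata write-back above reduces to grouping the accumulated rows by k-mer id and storing JSON-encoded lists; a simplified sketch using the sqlite3 module directly (the commit itself goes through SQLAlchemy connections, so the exact API differs):

import json
import sqlite3

def write_metadata(conn, rows):
    # rows: (kmer_id, seqid, start, reverse) tuples, as zipped in parsefile.
    grouped = {}
    for kmer_id, seqid, start, reverse in rows:
        grouped.setdefault(kmer_id, []).append((seqid, start, reverse))
    for kmer_id, group in grouped.items():
        conn.execute(
            "UPDATE kmers SET seqids = ?, starts = ?, reverses = ? WHERE id = ?",
            (json.dumps([g[0] for g in group]),
             json.dumps([g[1] for g in group]),
             json.dumps([g[2] for g in group]),
             kmer_id + 1))  # +1 for SQLite's 1-based row ids, as in submit() above
    conn.commit()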
2 changes: 1 addition & 1 deletion setup.py
@@ -125,7 +125,7 @@ def can_import(module_name):
],
packages=find_packages(exclude=["tests", "*.tests", "*.tests.*", "tests.*"]),
package_dir={'kmerdb': 'kmerdb'},
package_data={'kmerdb': ['examples/example_report/*.Rmd', 'examples/example_report1/*.Rmd'], 'CITATION': ['CITATION']},
package_data={'kmerdb': ['CITATION']},
# If your package is a single module, use this instead of 'packages':
#py_modules=['kmerdb'],
#scripts=['bin/kmerdb', 'bin/kmerdb_report.R'],
20 changes: 19 additions & 1 deletion test/test_parsefile.py
@@ -89,6 +89,24 @@ def test_b_is_int_only(self):
with self.assertRaises(TypeError):
parse.parsefile(self.fasta, self.k, b={'hello': 'world'})

def test_n_is_int_only(self):
"""
parsefile throws a TypeError if it receives anything but an int
"""
with self.assertRaises(TypeError):
parse.parsefile(self.fasta, self.k, n=None)
with self.assertRaises(TypeError):
parse.parsefile(self.fasta, self.k, n=True)
with self.assertRaises(TypeError):
parse.parsefile(self.fasta, self.k, n='hello')
with self.assertRaises(TypeError):
parse.parsefile(self.fasta, self.k, n=1.0)
with self.assertRaises(TypeError):
parse.parsefile(self.fasta, self.k, n=[1])
with self.assertRaises(TypeError):
parse.parsefile(self.fasta, self.k, n={'hello': 'world'})


def test_stranded_is_bool_only(self):
"""
parsefile throws a TypeError if it receives anything but a bool
@@ -117,7 +135,7 @@ def test_returns_database_and_header_dictionary(self):
'mononucleotides': {'A': 1016563, 'C': 418556, 'G': 404380, 'T': 1033834},
'nullomers': 0,
'sha256': '61b5cf99700219774f05beb9203f0388d95273b227ae7ae37164f1c9a53665ca',
'total_kmers': 5746658,
'total_kmers': 2873329,
'total_reads': 2,
'unique_kmers': 64}
, headerDict)
