
Commit

Merge pull request #44 from MatthewRalston/metadata_parallelism_fix
Fixes #42: the metadata issue after the PostgreSQL update.
MatthewRalston committed Apr 14, 2021
2 parents f88511b + 4116f78 commit a7968d5
Showing 3 changed files with 101 additions and 22 deletions.
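
The heart of the fix in kmerdb/__init__.py is a re-mapping of tuple indices when per-k-mer records come back from the database. A minimal sketch of the idea, not part of the commit, with an assumed record layout of (kmer_id, count, start, reverse, seqid) inferred from the new indices:

    # Assumed layout per record: (kmer_id, count, start, reverse, seqid)
    kmer_dbrecs_per_file = [
        (13, 2, 101, False, "read_1"),
        (13, 2, 57, True, "read_9"),
    ]

    # Old (pre-fix) mapping: seqids <- x[2], starts <- x[3], reverses <- x[4]
    # New mapping after the PostgreSQL update:
    seqids = [x[4] for x in kmer_dbrecs_per_file]    # ["read_1", "read_9"]
    starts = [x[2] for x in kmer_dbrecs_per_file]    # [101, 57]
    reverses = [x[3] for x in kmer_dbrecs_per_file]  # [False, True]
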
31 changes: 25 additions & 6 deletions kmerdb/__init__.py
@@ -920,9 +920,28 @@ def profile(arguments):
# metadata now has three additional properties, based on the total number of times this k-mer occurred. Eventually the dimension of these new properties should match the count.
if arguments.all_metadata:

seqids = [x[2] for x in kmer_dbrecs_per_file]
starts = [x[3] for x in kmer_dbrecs_per_file]
reverses = [x[4] for x in kmer_dbrecs_per_file]

seqids = [x[4] for x in kmer_dbrecs_per_file]
starts = [x[2] for x in kmer_dbrecs_per_file]
reverses = [x[3] for x in kmer_dbrecs_per_file]


if len(reverses) == 0:
logger.error("REVERSES: {0}".format(reverses[0]))
raise RuntimeError("reverses: IS THIS INCORRECT?")
elif len(starts) == 0:
logger.error("STARTS: {0}".format(starts[0]))
raise RuntimeError("starts: IS THIS INCORRECT?")
elif len(seqids) == 0:
logger.error("SEQIDS: {0}".format(seqids[0]))
raise RuntimeError("seqids: IS THIS INCORRECT?")
elif len(seqids) == 1 and type(seqids) is list and type(seqids[0]) is list:
seqids = seqids[0]
elif len(starts) == 1 and type(starts) is list and type(starts[0]) is list:
starts = starts[0]
elif len(reverses) == 1 and type(reverses) is list and type(reverses[0]) is list:
reverses = reverses[0]

if "seqids" in kmer_metadata.keys():
kmer_metadata["seqids"] += seqids
else:
@@ -954,17 +973,17 @@ def profile(arguments):
logger.debug("Don't know how seqids became a dictionary, but this will not parse correctly. RuntimeError")
raise RuntimeError("The implicit type of the Text blob in the Postgres database has changed, and will not parse correctly in kmerdb, rerun with verbose")
elif type(kmer_metadata["reverses"]) is str:
raise TypeError("kmerdb profile could not decode strand information from its sQLite3 database.")
raise TypeError("kmerdb profile could not decode strand information from its PostgreSQL database.")
elif type(kmer_metadata["reverses"]) is list and all(type(x) is bool for x in kmer_metadata["reverses"]):
if arguments.verbose == 2:
sys.stderr.write("Parsed {0} reverse? bools associated with this k-mer.".format(len(kmer_metadata["seqids"])))
elif type(kmer_metadata["reverses"]) is dict:
logger.debug("Don't know how reverses became a dictionary, but this will not parse correctly. RuntimeError")
raise RuntimeError("The implicit type of the Text blob in the Postgres database has changed, and will not parse correctly in kmerdb, rerun with verbose")
elif not all(type(x) is bool for x in kmer_metadata["reverses"]):
#logger.error("kmer metadata: {0}".format(kmer_metadata))
logger.error("kmer metadata: {0}".format(kmer_metadata))
logger.error("number of k-mer elements: {0}".format(len(kmer_metadata.values())))
#logger.error(list(set(type(x) for x in kmer_metadata["reverses"])))
logger.error(list(set(type(x) for x in kmer_metadata["reverses"])))
raise TypeError("Not all reverse bools were boolean")
elif count == 0:
n += 1
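
The new guard block above also unwraps singly nested lists ([[...]] -> [...]) before the metadata is merged into kmer_metadata. A self-contained sketch of that accumulation pattern; the helper function is hypothetical and only mirrors the logic in profile():

    def merge_kmer_metadata(kmer_metadata, seqids, starts, reverses):
        # Hypothetical helper: unwrap singly nested lists, then extend (or
        # create) each per-k-mer metadata field, as the guard block above does.
        for key, values in (("seqids", seqids), ("starts", starts), ("reverses", reverses)):
            if isinstance(values, list) and len(values) == 1 and isinstance(values[0], list):
                values = values[0]
            if key in kmer_metadata:
                kmer_metadata[key] += values
            else:
                kmer_metadata[key] = values
        return kmer_metadata

    meta = {}
    meta = merge_kmer_metadata(meta, [["read_1"]], [[101]], [[False]])
    meta = merge_kmer_metadata(meta, ["read_9"], [57], [True])
    # meta == {"seqids": ["read_1", "read_9"], "starts": [101, 57], "reverses": [False, True]}
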
3 changes: 3 additions & 0 deletions kmerdb/database.py
@@ -51,6 +51,9 @@
# conn.execute(ins, [{'count': 0}, ...])





def histogram(conn):
res = conn.execute("SELECT COUNT(count) FROM kmers GROUP BY count ORDER BY count")
hist = array.array('H')
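
The only context shown for database.py is the count histogram query. As a rough illustration of what it computes, here is the same SQL run against a throwaway in-memory SQLite table rather than the project's PostgreSQL connection (the table contents are made up):

    import array
    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE kmers (id INTEGER PRIMARY KEY, count INTEGER)")
    conn.executemany("INSERT INTO kmers (count) VALUES (?)", [(0,), (0,), (1,), (1,), (1,), (3,)])

    res = conn.execute("SELECT COUNT(count) FROM kmers GROUP BY count ORDER BY count")
    hist = array.array('H')   # unsigned shorts, as in database.py
    for c in res:
        hist.append(c[0])
    # hist -> array('H', [2, 3, 1]): two k-mers occur 0 times, three occur once, one occurs 3 times
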
89 changes: 73 additions & 16 deletions kmerdb/parse.py
@@ -20,11 +20,16 @@
import sys
import yaml
import json
import time
from datetime import datetime
from math import ceil
from itertools import chain, repeat
from concurrent.futures import ThreadPoolExecutor

from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.attributes import flag_modified
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey, Sequence, JSON, Boolean

from psycopg2 import sql
import tempfile
@@ -39,6 +44,13 @@
logger = logging.getLogger(__file__)




Base = declarative_base()




def parsefile(filepath:str, k:int, connection_string:str, p:int=1, rows_per_batch:int=100000, b:int=50000, n:int=1000, stranded:bool=True, all_metadata:bool=False):
"""Parse a single sequence file in blocks/chunks with multiprocessing support
@@ -127,7 +139,7 @@ def parsefile(filepath:str, k:int, connection_string:str, p:int=1, rows_per_batc
num_sus = 0
for i, k in enumerate(kmer_ids):
if k is None:
sus.add(i)
sus.add(i) # This propagates the N removal, by adding those to the set for removal later.
num_sus += 1

logger.info("Created {0} gaps in the k-mer profile to clean it of N-content".format(num_sus))
@@ -156,22 +168,22 @@ def parsefile(filepath:str, k:int, connection_string:str, p:int=1, rows_per_batc
reads = list(chain.from_iterable(map(lambda x: x['seqids'], list_of_dicts)))
starts = list(chain.from_iterable(map(lambda x: x['starts'], list_of_dicts)))
reverses = list(chain.from_iterable(map(lambda x: x['reverses'], list_of_dicts)))
for i, x in enumerate(kmer_ids):
if i in sus:
for i, x in enumerate(kmer_ids): # This removes N content
if i in sus: # This is where we actually delete the N content, in case that is eventually supported.
kmer_ids[i] = None
reads[i] = None
starts[i] = None
reverses[i] = None
kmer_ids = list(filter(lambda k: k is not None, kmer_ids))
kmer_ids = list(filter(lambda k: k is not None, kmer_ids))
reads = list(filter(lambda r: r is not None, reads))
starts = list(filter(lambda s: s is not None, starts))
reverses = list(filter(lambda r: r is not None, reverses))
else:
for i, x in enumerate(kmer_ids):
for i, x in enumerate(kmer_ids): # Here we remove the k-mer ids where N-content is detected, in case they are needed, you can use kmer_ids prior to this point to build functionality.
if i in sus:
kmer_ids[i] = None
kmer_ids = list(filter(lambda k: k is not None, kmer_ids))
reads = []
reads = [] # I'm keeping this in, just in case for some reason the variable names are needed in the
starts = []
reverses = []
if None in kmer_ids:
@@ -191,8 +203,20 @@ def parsefile(filepath:str, k:int, connection_string:str, p:int=1, rows_per_batc
if all_metadata is True and len(kmer_ids) == len(reads) and len(reads) == len(starts) and len(starts) == len(reverses):
N = len(starts)

data = list(zip(kmer_ids, reads, starts, reverses))


data = list(zip(kmer_ids, reads, starts, reverses))
# Everything is in the right order
# logger.debug("num k-mer ids: {0}".format(len(kmer_ids)))
# logger.debug("K-mer id types: {0}".format(type(kmer_ids[0])))
# logger.debug("Example: {0}".format(kmer_ids[0]))
# logger.debug("Num reads: {0}".format(len(reads)))
# logger.debug("reads type: {0}".format(type(reads[0])))
# logger.debug("Example: {0}".format(reads[0]))
# logger.debug("Num reverses: {0}".format(len(reverses)))
# logger.debug("reverse type: {0}".format(type(reverses[0])))
# logger.debug("Example: {0}".format(reverses[0]))
# raise RuntimeError("Deciding whether to set a dictionary, or a 4x? array")
logger.debug("Appended {0} records to rows".format(N))

rows += data
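
The block above flattens the per-read metadata, blanks out the positions flagged in sus (N-containing k-mers), and zips what is left into (kmer_id, seqid, start, reverse) rows. A compact sketch of that pattern with made-up inputs:

    from itertools import chain

    list_of_dicts = [
        {"seqids": ["read_1", "read_1"], "starts": [0, 1], "reverses": [False, False]},
        {"seqids": ["read_2"], "starts": [0], "reverses": [True]},
    ]
    kmer_ids = [13, None, 42]   # None marks an N-containing k-mer
    sus = {1}                   # ...and its index is recorded in sus

    reads = list(chain.from_iterable(d["seqids"] for d in list_of_dicts))
    starts = list(chain.from_iterable(d["starts"] for d in list_of_dicts))
    reverses = list(chain.from_iterable(d["reverses"] for d in list_of_dicts))

    for i in sus:               # blank out the flagged positions everywhere
        kmer_ids[i] = reads[i] = starts[i] = reverses[i] = None

    keep = [i for i, k in enumerate(kmer_ids) if k is not None]
    rows = [(kmer_ids[i], reads[i], starts[i], reverses[i]) for i in keep]
    # rows == [(13, 'read_1', 0, False), (42, 'read_2', 0, True)]
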
@@ -258,27 +282,60 @@ def parsefile(filepath:str, k:int, connection_string:str, p:int=1, rows_per_batc
# If we round up to the nearest page, we should have 'pages' number of pages
# and we'd read n on each page.
kid = 0
Session = sessionmaker(bind=db._engine)
session = Session()
class Kmer(Base):
__tablename__ = db._tablename

id = Column(Integer, primary_key=True)
count = Column(Integer)
starts = Column(JSON)
reverses = Column(JSON)
seqids = Column(JSON)

for i in range(len(unique_kmer_ids)):
kid = unique_kmer_ids[i] # FOr SQLlite 1-based indexing
kid = kmer_ids[i] # FOr SQLlite 1-based indexing
logger.debug("Beginning to commit {0} to the database".format(kid))
sys.stderr.write("\n")
kmers = [x for x in rows if x[0] == kid]
logger.debug("Located {0} relationships involving k-mer {1}".format(len(kmers), kid))
logger.debug(" === M E S S A G E ===")
logger.debug("=====================================")
logger.debug("beginning to process all records of the {0} k-mer".format(kid))
with conn.begin():
conn.execute("UPDATE kmers SET seqids = ?, starts = ?, reverses = ? WHERE id = ?", json.dumps(list(map(lambda y: y[1], kmers))), json.dumps(list(map(lambda y: y[2], kmers))), json.dumps(list(map(lambda y: y[2], kmers))), kid+1)


logger.info("Transaction completed for the {0} kmer.".format(kid))

row = session.query(Kmer).filter_by(id=kid+1).first()
if row is None:
logger.error(kid+1)
raise RuntimeError("Could not locate k-mer with id {0} in the Postgres table".format(kid+1))


logger.debug("Before: {0}".format(len(row.reverses)))
row.seqids = list(map(lambda y: y[1], kmers))
row.starts = list(map(lambda y: y[2], kmers))
row.reverses = list(map(lambda y: y[3], kmers))
#flag_modified(Kmer, 'seqids')
#flag_modified(Kmer, 'starts')
#flag_modified(Kmer, 'reverses')


logger.debug("After: {0}".format(len(row.reverses)))
session.add(row)

if i % rows_per_batch == 0:
session.commit()
logger.info("Transaction completed for the {0} kmer.".format(kid+1))
session.commit()
logger.debug("Sleeping...")
time.sleep(200)
session.commit()

logger.debug("===================================")
logger.debug("Example record with metadata:")
result = db.conn.execute("SELECT * FROM kmers WHERE id = ?", kid).fetchone()
logger.debug(result)
result = session.query(Kmer).filter_by(id=kid+1).first()
logger.debug(result.__dict__)
logger.debug("===================================")

session.close()
seqprsr.nullomers = db._get_nullomers() # Calculate nullomers at the end
seqprsr.total_kmers = db._get_sum_counts() # The total number of k-mers processed
# Get nullomer ids
@@ -288,7 +345,7 @@ def parsefile(filepath:str, k:int, connection_string:str, p:int=1, rows_per_batc
logger.error("Type of res: {0}".format(type(res)))
logger.error("Types of values: {0}".format([type(x[0]) for x in res]))
logger.error("The result of the query was not a list of singleton tuples of ints")
raise ValueError("SQLite3 database query returned unexpected data types")
raise ValueError("PostgreSQL database query returned unexpected data types")
nullomers = list(map(lambda x: x[0], res))

finally:
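
The main change in parse.py swaps the raw UPDATE for an ORM round-trip: an ad hoc declarative Kmer model over the existing kmers table, a lookup by primary key, assignment of the JSON metadata columns, and a commit every rows_per_batch iterations. A trimmed-down, self-contained sketch of that pattern; the in-memory SQLite engine and the seeded row are placeholders (the real code binds db._engine and uses db._tablename):

    from sqlalchemy import create_engine, Column, Integer, JSON
    from sqlalchemy.orm import declarative_base, sessionmaker

    Base = declarative_base()

    class Kmer(Base):
        __tablename__ = "kmers"      # the real code uses db._tablename

        id = Column(Integer, primary_key=True)
        count = Column(Integer)
        seqids = Column(JSON)
        starts = Column(JSON)
        reverses = Column(JSON)

    engine = create_engine("sqlite:///:memory:")   # placeholder; kmerdb binds db._engine
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()

    session.add(Kmer(id=14, count=2, seqids=[], starts=[], reverses=[]))
    session.commit()

    # Per-k-mer update, mirroring the loop above: fetch the row, reassign the
    # JSON columns wholesale, and commit (batched in the real code).
    row = session.query(Kmer).filter_by(id=14).first()
    if row is None:
        raise RuntimeError("Could not locate k-mer with id 14")
    row.seqids = ["read_1", "read_9"]
    row.starts = [101, 57]
    row.reverses = [False, True]
    session.add(row)
    session.commit()
    session.close()

Reassigning each JSON column as a whole new list is why the commented-out flag_modified calls can stay disabled: SQLAlchemy marks an attribute dirty on assignment, whereas in-place mutation of a JSON value would require flag_modified.
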
