1454b3b Jul 20, 2016
@peterjc @cbrueffer @chapmanb @bosborne @bow
575 lines (508 sloc) 22.3 KB
# Copyright 2002 by Andrew Dalke. All rights reserved.
# Revisions 2007-2016 copyright by Peter Cock. All rights reserved.
# Revisions 2008-2009 copyright by Cymon J. Cox. All rights reserved.
# This code is part of the Biopython distribution and governed by its
# license. Please see the LICENSE file that should have been included
# as part of this package.
# Note that BioSQL (including the database schema and scripts) is
# available and licensed separately. Please consult
"""Implementations of Biopython-like Seq objects on top of BioSQL.
This allows retrival of items stored in a BioSQL database using
a biopython-like SeqRecord and Seq interface.
Note: Currently we do not support recording per-letter-annotations
(like quality scores) in BioSQL.
from Bio._py3k import unicode
from Bio import Alphabet
from Bio.Seq import Seq, UnknownSeq
from Bio.SeqRecord import SeqRecord, _RestrictedDict
from Bio import SeqFeature
class DBSeq(Seq):
"""BioSQL equivalent of the Biopython Seq object."""
def __init__(self, primary_id, adaptor, alphabet, start, length):
"""Create a new DBSeq object referring to a BioSQL entry.
You wouldn't normally create a DBSeq object yourself, this is done
for you when retreiving a DBSeqRecord object from the database.
self.primary_id = primary_id
self.adaptor = adaptor
self.alphabet = alphabet
self._length = length
self.start = start
def __len__(self):
return self._length
def __getitem__(self, index): # Seq API requirement
# Note since Python 2.0, __getslice__ is deprecated
# and __getitem__ is used instead.
# See
if isinstance(index, int):
# Return a single letter as a string
i = index
if i < 0:
if -i > self._length:
raise IndexError(i)
i = i + self._length
elif i >= self._length:
raise IndexError(i)
return self.adaptor.get_subseq_as_string(self.primary_id,
self.start + i,
self.start + i + 1)
if not isinstance(index, slice):
raise ValueError("Unexpected index type")
# Return the (sub)sequence as another DBSeq or Seq object
# (see the Seq obect's __getitem__ method)
if index.start is None:
i = 0
i = index.start
if i < 0:
# Map to equavilent positive index
if -i > self._length:
raise IndexError(i)
i = i + self._length
elif i >= self._length:
# Trivial case, should return empty string!
i = self._length
if index.stop is None:
j = self._length
j = index.stop
if j < 0:
# Map to equavilent positive index
if -j > self._length:
raise IndexError(j)
j = j + self._length
elif j >= self._length:
j = self._length
if i >= j:
# Trivial case, empty string.
return Seq("", self.alphabet)
elif index.step is None or index.step == 1:
# Easy case - can return a DBSeq with the start and end adjusted
return self.__class__(self.primary_id, self.adaptor, self.alphabet,
self.start + i, j - i)
# Tricky. Will have to create a Seq object because of the stride
full = self.adaptor.get_subseq_as_string(self.primary_id,
self.start + i,
self.start + j)
return Seq(full[::index.step], self.alphabet)
def tostring(self):
"""Returns the full sequence as a python string (DEPRECATED).
You are now encouraged to use str(my_seq) instead of
import warnings
warnings.warn("This method is obsolete; please use str(my_seq) "
"instead of my_seq.tostring().",
return self.adaptor.get_subseq_as_string(self.primary_id,
self.start + self._length)
def __str__(self):
"""Returns the full sequence as a python string."""
return self.adaptor.get_subseq_as_string(self.primary_id,
self.start + self._length)
data = property(tostring, doc="Sequence as string (DEPRECATED)")
def toseq(self):
"""Returns the full sequence as a Seq object."""
# Note - the method name copies that of the MutableSeq object
return Seq(str(self), self.alphabet)
def __add__(self, other):
# Let the Seq object deal with the alphabet issues etc
return self.toseq() + other
def __radd__(self, other):
# Let the Seq object deal with the alphabet issues etc
return other + self.toseq()
def _retrieve_seq_len(adaptor, primary_id):
# The database schema ensures there will be only one matching row
seqs = adaptor.execute_and_fetchall(
"SELECT length FROM biosequence WHERE bioentry_id = %s", (primary_id,))
if not seqs:
return None
assert len(seqs) == 1
given_length, = seqs[0]
return int(given_length)
def _retrieve_seq(adaptor, primary_id):
# The database schema ensures there will be only one matching
# row in the table.
# If an UnknownSeq was recorded, seq will be NULL,
# but length will be populated. This means length(seq)
# will return None.
seqs = adaptor.execute_and_fetchall(
"SELECT alphabet, length, length(seq) FROM biosequence"
" WHERE bioentry_id = %s", (primary_id,))
if not seqs:
assert len(seqs) == 1
moltype, given_length, length = seqs[0]
length = int(length)
given_length = int(length)
assert length == given_length
have_seq = True
except TypeError:
assert length is None
seqs = adaptor.execute_and_fetchall(
"SELECT alphabet, length, seq FROM biosequence"
" WHERE bioentry_id = %s", (primary_id,))
assert len(seqs) == 1
moltype, given_length, seq = seqs[0]
assert seq is None or seq == ""
length = int(given_length)
have_seq = False
del seq
del given_length
moltype = moltype.lower() # might be upper case in database
# We have no way of knowing if these sequences will use IUPAC
# alphabets, and we certainly can't assume they are unambiguous!
if moltype == "dna":
alphabet = Alphabet.generic_dna
elif moltype == "rna":
alphabet = Alphabet.generic_rna
elif moltype == "protein":
alphabet = Alphabet.generic_protein
elif moltype == "unknown":
# This is used in BioSQL/ and would happen
# for any generic or nucleotide alphabets.
alphabet = Alphabet.single_letter_alphabet
raise AssertionError("Unknown moltype: %s" % moltype)
if have_seq:
return DBSeq(primary_id, adaptor, alphabet, 0, int(length))
return UnknownSeq(length, alphabet)
def _retrieve_dbxrefs(adaptor, primary_id):
"""Retrieve the database cross references for the sequence."""
_dbxrefs = []
dbxrefs = adaptor.execute_and_fetchall(
"SELECT dbname, accession, version"
" FROM bioentry_dbxref join dbxref using (dbxref_id)"
" WHERE bioentry_id = %s"
" ORDER BY rank", (primary_id,))
for dbname, accession, version in dbxrefs:
if version and version != "0":
v = "%s.%s" % (accession, version)
v = accession
_dbxrefs.append("%s:%s" % (dbname, v))
return _dbxrefs
def _retrieve_features(adaptor, primary_id):
sql = "SELECT seqfeature_id,, rank" \
" FROM seqfeature join term type on (type_term_id = type.term_id)" \
" WHERE bioentry_id = %s" \
" ORDER BY rank"
results = adaptor.execute_and_fetchall(sql, (primary_id,))
seq_feature_list = []
for seqfeature_id, seqfeature_type, seqfeature_rank in results:
# Get qualifiers [except for db_xref which is stored separately]
qvs = adaptor.execute_and_fetchall(
"SELECT name, value"
" FROM seqfeature_qualifier_value join term using (term_id)"
" WHERE seqfeature_id = %s"
" ORDER BY rank", (seqfeature_id,))
qualifiers = {}
for qv_name, qv_value in qvs:
qualifiers.setdefault(qv_name, []).append(qv_value)
# Get db_xrefs [special case of qualifiers]
qvs = adaptor.execute_and_fetchall(
"SELECT dbxref.dbname, dbxref.accession"
" FROM dbxref join seqfeature_dbxref using (dbxref_id)"
" WHERE seqfeature_dbxref.seqfeature_id = %s"
" ORDER BY rank", (seqfeature_id,))
for qv_name, qv_value in qvs:
value = "%s:%s" % (qv_name, qv_value)
qualifiers.setdefault("db_xref", []).append(value)
# Get locations
results = adaptor.execute_and_fetchall(
"SELECT location_id, start_pos, end_pos, strand"
" FROM location"
" WHERE seqfeature_id = %s"
" ORDER BY rank", (seqfeature_id,))
locations = []
# convert to Python standard form
# Convert strand = 0 to strand = None
# re: comment in
# Biopython uses None when we don't know strand information but
# BioSQL requires something (non null) and sets this as zero
# So we'll use the strand or 0 if Biopython spits out None
for location_id, start, end, strand in results:
if start:
start -= 1
if strand == 0:
strand = None
if strand not in (+1, -1, None):
raise ValueError("Invalid strand %s found in database for "
"seqfeature_id %s" % (strand, seqfeature_id))
if end < start:
import warnings
from Bio import BiopythonWarning
warnings.warn("Inverted location start/end (%i and %i) for "
"seqfeature_id %s" % (start, end, seqfeature_id),
locations.append((location_id, start, end, strand))
# Get possible remote reference information
remote_results = adaptor.execute_and_fetchall(
"SELECT location_id, dbname, accession, version"
" FROM location join dbxref using (dbxref_id)"
" WHERE seqfeature_id = %s", (seqfeature_id,))
lookup = {}
for location_id, dbname, accession, version in remote_results:
if version and version != "0":
v = "%s.%s" % (accession, version)
v = accession
# subfeature remote location db_ref are stored as a empty string
# when not present
if dbname == "":
dbname = None
lookup[location_id] = (dbname, v)
feature = SeqFeature.SeqFeature(type=seqfeature_type)
# Store the key as a private property
feature._seqfeature_id = seqfeature_id
feature.qualifiers = qualifiers
if len(locations) == 0:
elif len(locations) == 1:
location_id, start, end, strand = locations[0]
# See Bug 2677, we currently don't record the location_operator
# For consistency with older versions Biopython, default to "".
feature.location_operator = \
_retrieve_location_qualifier_value(adaptor, location_id)
dbname, version = lookup.get(location_id, (None, None))
feature.location = SeqFeature.FeatureLocation(start, end)
feature.strand = strand
feature.ref_db = dbname
feature.ref = version
locs = []
for location in locations:
location_id, start, end, strand = location
dbname, version = lookup.get(location_id, (None, None))
locs.append(SeqFeature.FeatureLocation(start, end,
# Locations are typically in biological in order (see negative
# strands below), but because of remote locations for
# sub-features they are not necessarily in numerical order:
strands = set(l.strand for l in locs)
if len(strands) == 1 and -1 in strands:
# Evil hack time for backwards compatibility
# TODO - Check if BioPerl and (old) Biopython did the same,
# we may have an existing incompatibility lurking here...
locs = locs[::-1]
feature.location = SeqFeature.CompoundLocation(locs, "join")
# TODO - See Bug 2677 - we don't yet record location operator,
# so for consistency with older versions of Biopython default
# to assuming its a join.
return seq_feature_list
def _retrieve_location_qualifier_value(adaptor, location_id):
value = adaptor.execute_and_fetch_col0(
"SELECT value FROM location_qualifier_value"
" WHERE location_id = %s", (location_id,))
return value[0]
except IndexError:
return ""
def _retrieve_annotations(adaptor, primary_id, taxon_id):
annotations = {}
annotations.update(_retrieve_qualifier_value(adaptor, primary_id))
annotations.update(_retrieve_reference(adaptor, primary_id))
annotations.update(_retrieve_taxon(adaptor, primary_id, taxon_id))
annotations.update(_retrieve_comment(adaptor, primary_id))
# Convert values into strings in cases of unicode from the database.
# BioSQL could eventually be expanded to be unicode aware.
str_anns = {}
for key, val in annotations.items():
if isinstance(val, list):
val = [_make_unicode_into_string(x) for x in val]
elif isinstance(val, unicode):
val = str(val)
str_anns[key] = val
return str_anns
def _make_unicode_into_string(text):
if isinstance(text, unicode):
return str(text)
return text
def _retrieve_qualifier_value(adaptor, primary_id):
qvs = adaptor.execute_and_fetchall(
"SELECT name, value"
" FROM bioentry_qualifier_value JOIN term USING (term_id)"
" WHERE bioentry_id = %s"
" ORDER BY rank", (primary_id,))
qualifiers = {}
for name, value in qvs:
if name == "keyword":
name = "keywords"
# See handling of "date" in
elif name == "date_changed":
name = "date"
elif name == "secondary_accession":
name = "accessions"
qualifiers.setdefault(name, []).append(value)
return qualifiers
def _retrieve_reference(adaptor, primary_id):
# XXX dbxref_qualifier_value
refs = adaptor.execute_and_fetchall(
"SELECT start_pos, end_pos, "
" location, title, authors,"
" dbname, accession"
" FROM bioentry_reference"
" JOIN reference USING (reference_id)"
" LEFT JOIN dbxref USING (dbxref_id)"
" WHERE bioentry_id = %s"
" ORDER BY rank", (primary_id,))
references = []
for start, end, location, title, authors, dbname, accession in refs:
reference = SeqFeature.Reference()
# If the start/end are missing, reference.location is an empty list
if (start is not None) or (end is not None):
if start is not None:
start -= 1 # python counting
reference.location = [SeqFeature.FeatureLocation(start, end)]
# Don't replace the default "" with None.
if authors:
reference.authors = authors
if title:
reference.title = title
reference.journal = location
if dbname == 'PUBMED':
reference.pubmed_id = accession
elif dbname == 'MEDLINE':
reference.medline_id = accession
if references:
return {'references': references}
return {}
def _retrieve_taxon(adaptor, primary_id, taxon_id):
a = {}
common_names = adaptor.execute_and_fetch_col0(
"SELECT name FROM taxon_name WHERE taxon_id = %s"
" AND name_class = 'genbank common name'", (taxon_id,))
if common_names:
a['source'] = common_names[0]
scientific_names = adaptor.execute_and_fetch_col0(
"SELECT name FROM taxon_name WHERE taxon_id = %s"
" AND name_class = 'scientific name'", (taxon_id,))
if scientific_names:
a['organism'] = scientific_names[0]
ncbi_taxids = adaptor.execute_and_fetch_col0(
"SELECT ncbi_taxon_id FROM taxon WHERE taxon_id = %s", (taxon_id,))
if ncbi_taxids and ncbi_taxids[0] and ncbi_taxids[0] != "0":
a['ncbi_taxid'] = ncbi_taxids[0]
# Old code used the left/right values in the taxon table to get the
# taxonomy lineage in one SQL command. This was actually very slow,
# and would fail if the (optional) left/right values were missing.
# The following code is based on a contribution from Eric Gibert, and
# relies on the taxon table's parent_taxon_id field only (ignoring the
# optional left/right values). This means that it has to make a
# separate SQL query for each entry in the lineage, but it does still
# appear to be *much* faster. See Bug 2494.
taxonomy = []
while taxon_id:
name, rank, parent_taxon_id = adaptor.execute_one(
"SELECT, taxon.node_rank, taxon.parent_taxon_id"
" FROM taxon, taxon_name"
" WHERE taxon.taxon_id=taxon_name.taxon_id"
" AND taxon_name.name_class='scientific name'"
" AND taxon.taxon_id = %s", (taxon_id,))
if taxon_id == parent_taxon_id:
# If the taxon table has been populated by the BioSQL script
# this is how top parent nodes are stored.
# Personally, I would have used a NULL parent_taxon_id here.
if rank != "no rank":
# For consistency with older versions of Biopython, we are only
# interested in taxonomy entries with a stated rank.
# Add this to the start of the lineage list.
taxonomy.insert(0, name)
taxon_id = parent_taxon_id
if taxonomy:
a['taxonomy'] = taxonomy
return a
def _retrieve_comment(adaptor, primary_id):
qvs = adaptor.execute_and_fetchall(
"SELECT comment_text FROM comment"
" WHERE bioentry_id=%s"
" ORDER BY rank", (primary_id,))
comments = [comm[0] for comm in qvs]
# Don't want to add an empty list...
if comments:
return {"comment": comments}
return {}
class DBSeqRecord(SeqRecord):
"""BioSQL equivalent of the Biopython SeqRecord object."""
def __init__(self, adaptor, primary_id):
self._adaptor = adaptor
self._primary_id = primary_id
(self._biodatabase_id, self._taxon_id,,
accession, version, self._identifier,
self._division, self.description) = self._adaptor.execute_one(
"SELECT biodatabase_id, taxon_id, name, accession, version,"
" identifier, division, description"
" FROM bioentry"
" WHERE bioentry_id = %s", (self._primary_id,))
if version and version != "0": = "%s.%s" % (accession, version)
else: = accession
# We don't yet record any per-letter-annotations in the
# BioSQL database, but we should set this property up
# for completeness (and the __str__ method).
# We do NOT want to load the sequence from the DB here!
length = _retrieve_seq_len(adaptor, primary_id)
self._per_letter_annotations = _RestrictedDict(length=length)
def __get_seq(self):
if not hasattr(self, "_seq"):
self._seq = _retrieve_seq(self._adaptor, self._primary_id)
return self._seq
def __set_seq(self, seq):
# TODO - Check consistent with self._per_letter_annotations
self._seq = seq
def __del_seq(self):
del self._seq
seq = property(__get_seq, __set_seq, __del_seq, "Seq object")
def __get_dbxrefs(self):
if not hasattr(self, "_dbxrefs"):
self._dbxrefs = _retrieve_dbxrefs(self._adaptor, self._primary_id)
return self._dbxrefs
def __set_dbxrefs(self, dbxrefs):
self._dbxrefs = dbxrefs
def __del_dbxrefs(self):
del self._dbxrefs
dbxrefs = property(__get_dbxrefs, __set_dbxrefs, __del_dbxrefs,
"Database cross references")
def __get_features(self):
if not hasattr(self, "_features"):
self._features = _retrieve_features(self._adaptor,
return self._features
def __set_features(self, features):
self._features = features
def __del_features(self):
del self._features
features = property(__get_features, __set_features, __del_features,
def __get_annotations(self):
if not hasattr(self, "_annotations"):
self._annotations = _retrieve_annotations(self._adaptor,
if self._identifier:
self._annotations["gi"] = self._identifier
if self._division:
self._annotations["data_file_division"] = self._division
return self._annotations
def __set_annotations(self, annotations):
self._annotations = annotations
def __del_annotations(self):
del self._annotations
annotations = property(__get_annotations, __set_annotations,
__del_annotations, "Annotations")