Permalink
7cc2de8 Sep 9, 2016
203 lines (183 sloc) 7.38 KB
"""Output Biopython SeqRecords and SeqFeatures to GFF3 format.
The target format is GFF3, the current GFF standard:
http://www.sequenceontology.org/gff3.shtml
"""
from six.moves import urllib
from Bio import SeqIO
class _IdHandler:
"""Generate IDs for GFF3 Parent/Child relationships where they don't exist.
"""
def __init__(self):
self._prefix = "biopygen"
self._counter = 1
self._seen_ids = []
def _generate_id(self, quals):
"""Generate a unique ID not present in our existing IDs.
"""
gen_id = self._get_standard_id(quals)
if gen_id is None:
while 1:
gen_id = "%s%s" % (self._prefix, self._counter)
if gen_id not in self._seen_ids:
break
self._counter += 1
return gen_id
def _get_standard_id(self, quals):
"""Retrieve standardized IDs from other sources like NCBI GenBank.
This tries to find IDs from known key/values when stored differently
than GFF3 specifications.
"""
possible_keys = ["transcript_id", "protein_id"]
for test_key in possible_keys:
if test_key in quals:
cur_id = quals[test_key]
if isinstance(cur_id, tuple) or isinstance(cur_id, list):
return cur_id[0]
else:
return cur_id
return None
def update_quals(self, quals, has_children):
"""Update a set of qualifiers, adding an ID if necessary.
"""
cur_id = quals.get("ID", None)
# if we have an ID, record it
if cur_id:
if not isinstance(cur_id, list) and not isinstance(cur_id, tuple):
cur_id = [cur_id]
for add_id in cur_id:
self._seen_ids.append(add_id)
# if we need one and don't have it, create a new one
elif has_children:
new_id = self._generate_id(quals)
self._seen_ids.append(new_id)
quals["ID"] = [new_id]
return quals
class GFF3Writer:
"""Write GFF3 files starting with standard Biopython objects.
"""
def __init__(self):
pass
def write(self, recs, out_handle, include_fasta=False):
"""Write the provided records to the given handle in GFF3 format.
"""
id_handler = _IdHandler()
self._write_header(out_handle)
fasta_recs = []
try:
recs = iter(recs)
except TypeError:
recs = [recs]
for rec in recs:
self._write_rec(rec, out_handle)
self._write_annotations(rec.annotations, rec.id, len(rec.seq), out_handle)
for sf in rec.features:
sf = self._clean_feature(sf)
id_handler = self._write_feature(sf, rec.id, out_handle,
id_handler)
if include_fasta and len(rec.seq) > 0:
fasta_recs.append(rec)
if len(fasta_recs) > 0:
self._write_fasta(fasta_recs, out_handle)
def _clean_feature(self, feature):
quals = {}
for key, val in feature.qualifiers.items():
if not isinstance(val, (list, tuple)):
val = [val]
val = [str(x) for x in val]
quals[key] = val
feature.qualifiers = quals
# Support for Biopython 1.68 and above, which removed sub_features
if not hasattr(feature, "sub_features"):
feature.sub_features = []
clean_sub = [self._clean_feature(f) for f in feature.sub_features]
feature.sub_features = clean_sub
return feature
def _write_rec(self, rec, out_handle):
# if we have a SeqRecord, write out optional directive
if len(rec.seq) > 0:
out_handle.write("##sequence-region %s 1 %s\n" % (rec.id, len(rec.seq)))
def _get_phase(self, feature):
if "phase" in feature.qualifiers:
phase = feature.qualifiers["phase"][0]
elif feature.type == "CDS":
phase = int(feature.qualifiers.get("codon_start", [1])[0]) - 1
else:
phase = "."
return str(phase)
def _write_feature(self, feature, rec_id, out_handle, id_handler,
parent_id=None):
"""Write a feature with location information.
"""
if feature.strand == 1:
strand = '+'
elif feature.strand == -1:
strand = '-'
else:
strand = '.'
# remove any standard features from the qualifiers
quals = feature.qualifiers.copy()
for std_qual in ["source", "score", "phase"]:
if std_qual in quals and len(quals[std_qual]) == 1:
del quals[std_qual]
# add a link to a parent identifier if it exists
if parent_id:
if not "Parent" in quals:
quals["Parent"] = []
quals["Parent"].append(parent_id)
quals = id_handler.update_quals(quals, len(feature.sub_features) > 0)
if feature.type:
ftype = feature.type
else:
ftype = "sequence_feature"
parts = [str(rec_id),
feature.qualifiers.get("source", ["feature"])[0],
ftype,
str(feature.location.nofuzzy_start + 1), # 1-based indexing
str(feature.location.nofuzzy_end),
feature.qualifiers.get("score", ["."])[0],
strand,
self._get_phase(feature),
self._format_keyvals(quals)]
out_handle.write("\t".join(parts) + "\n")
for sub_feature in feature.sub_features:
id_handler = self._write_feature(sub_feature, rec_id, out_handle,
id_handler, quals["ID"][0])
return id_handler
def _format_keyvals(self, keyvals):
format_kvs = []
for key in sorted(keyvals.keys()):
values = keyvals[key]
key = key.strip()
format_vals = []
if not isinstance(values, list) or isinstance(values, tuple):
values = [values]
for val in values:
val = urllib.parse.quote(str(val).strip(), safe=":/ ")
if ((key and val) and val not in format_vals):
format_vals.append(val)
format_kvs.append("%s=%s" % (key, ",".join(format_vals)))
return ";".join(format_kvs)
def _write_annotations(self, anns, rec_id, size, out_handle):
"""Add annotations which refer to an entire sequence.
"""
format_anns = self._format_keyvals(anns)
if format_anns:
parts = [rec_id, "annotation", "remark", "1", str(size if size > 1 else 1),
".", ".", ".", format_anns]
out_handle.write("\t".join(parts) + "\n")
def _write_header(self, out_handle):
"""Write out standard header directives.
"""
out_handle.write("##gff-version 3\n")
def _write_fasta(self, recs, out_handle):
"""Write sequence records using the ##FASTA directive.
"""
out_handle.write("##FASTA\n")
SeqIO.write(recs, out_handle, "fasta")
def write(recs, out_handle, include_fasta=False):
"""High level interface to write GFF3 files from SeqRecords and SeqFeatures.
If include_fasta is True, the GFF3 file will include sequence information
using the ##FASTA directive.
"""
writer = GFF3Writer()
return writer.write(recs, out_handle, include_fasta)