Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

gff mapping refactored to sane interface

  • Loading branch information...
commit ca2369e409446c587e977f0c0d3875ed8b72b6a7 1 parent 55a1629
@JoseBlanca authored
Showing with 37 additions and 66 deletions.
  1. +30 −46 franklin/gff.py
  2. +7 −20 test/gff_test.py
View
76 franklin/gff.py
@@ -44,21 +44,13 @@ def _deescape(string):
class GffFile(object):
'A GFF object for reading and writing'
- def __init__(self, fpath, mode='r', feature_mappers=None,
- feature_filters=None):
+ def __init__(self, fpath, mode='r'):
'''It inits the class,
Accepted modes are r and w.
'''
self._mode = mode
self._fpath = fpath
- if not feature_mappers:
- feature_mappers = []
- self.feature_mappers = feature_mappers
-
- if not feature_filters:
- feature_filters = []
- self.feature_filters = feature_filters
if mode not in ('r', 'w'):
msg = 'Mode should be r or w. Mode not supported: %s' % mode
@@ -158,24 +150,7 @@ def _create_feature(self, line):
feature['phase'] = phase
feature['attributes'] = attributes
- self._apply_feature_mappers(feature)
- feature = self._apply_feature_filters(feature)
- if feature:
- return feature
-
- def _apply_feature_mappers(self, feature):
- 'It applies all mappers to the given feature'
- for mapper in self.feature_mappers:
- feature = mapper(feature)
-
- def _apply_feature_filters(self, feature):
- 'It applies all mappers to the given feature'
- for filters in self.feature_filters:
- feature = filter(filters, [feature])
- if feature:
- if isinstance(feature, list):
- feature = feature[0]
- return feature
+ return feature
def _get_items(self):
'It yields the items in the GFF file'
@@ -215,8 +190,15 @@ def _get_features(self):
yield item
features = property(_get_features)
- def write(self, kind, item):
- 'It writes a line'
+ def write(self, item):
+ '''It writes a line.
+
+ The item should be a tuple with the kind and the information about the
+ feature
+ '''
+ if item is None:
+ return
+ kind, item = item
if self._fhand.tell() == 0:
if not self.version:
self.version = _DEFAULT_WRITE_VERSION
@@ -226,11 +208,8 @@ def write(self, kind, item):
elif kind == COMMENT:
self._fhand.write('#' + item + '\n')
elif kind == FEATURE:
- self._apply_feature_mappers(item)
- item = self._apply_feature_filters(item)
- if item:
- feature_line = self._feature_to_str(item) + '\n'
- self._fhand.write(feature_line)
+ feature_line = self._feature_to_str(item) + '\n'
+ self._fhand.write(feature_line)
elif kind == FASTA:
self._fhand.write('##FASTA\n')
write_seqs_in_file(item, self._fhand, format='fasta')
@@ -340,15 +319,17 @@ def write_gff(out_fpath, items):
writer = GffFile(fpath=out_fpath, mode='w')
for kind, item in items:
- writer.write(kind, item)
+ writer.write((kind, item))
writer.flush()
def create_feature_type_filter(types):
'it creates a filter that filters by type of the feature'
- def feature_type_filter(feature):
+ def feature_type_filter(item):
'The real filter'
- kind = feature['type']
+ if item[0] != FEATURE:
+ return item
+ kind = item[1]['type']
if kind in types:
return True
else:
@@ -449,14 +430,17 @@ def _add_dbxref_to_feature(feature, dbxref_db, dbxref_id):
dbxrefs = ','.join(dbxrefs)
feature['attributes']['Dbxref'] = dbxrefs
-def modify_gff3(ingff3_fpath, outgff3_fpath, mappers=None, filters=None):
- 'It modifies the gff with the goven mappers'
- in_gff = GffFile(fpath=ingff3_fpath, feature_mappers=mappers,
- feature_filters=filters)
+def modify_gff(ingff3_fpath, outgff3_fpath, mappers=None, filters=None):
+ 'It modifies the gff features with the given mappers and filters'
+ in_gff = GffFile(fpath=ingff3_fpath)
out_gff = GffFile(fpath=outgff3_fpath, mode='w')
-
- for kind, item in in_gff.items:
- out_gff.write(kind, item)
+ for item in in_gff.items:
+ for mapper in mappers:
+ item = mapper(item)
+ for filter_ in filters:
+ item = filter_(item)
+ if item is not None:
+ out_gff.write(item)
out_gff.flush()
class SeqGffWriter(object):
@@ -474,11 +458,11 @@ def __init__(self, fhand, default_type=None, source='.'):
def write(self, sequence):
'It does the real write of the features'
seq_feature = self._get_seq_feature(sequence)
- self._writer.write(FEATURE, seq_feature)
+ self._writer.write((FEATURE, seq_feature))
#subfeature
for feature in self._get_sub_features(sequence):
- self._writer.write(FEATURE, feature)
+ self._writer.write((FEATURE, feature))
self.num_features += 1
def _get_seq_sofa_type(self, sequence):
View
27 test/gff_test.py
@@ -74,7 +74,7 @@ def test_write_gff(self):
gff_in_fhand = self._create_gff_file()
gff = GffFile(fpath=gff_in_fhand.name)
for kind, item in gff.items:
- gff_out.write(kind, item)
+ gff_out.write((kind, item))
gff_out.flush()
result = open(gff_out_fhand.name).read()
@@ -92,33 +92,20 @@ def test_items_in_gff():
assert features[1]['id'] == 'ctg 0'
assert features[98]['name'] == 'Cm13_B04'
- def test_gff_mappers(self):
- 'It test that the mappers in gff are runs'
- database = 'database'
- relations = {'gene00001': 'acc1'}
- dbxref_mapper = create_dbxref_feature_mapper(database, relations)
-
- fhand = self._create_gff_file()
- gff = GffFile(fpath=fhand.name, feature_mappers=[dbxref_mapper])
- items = list(gff.items)
- assert items[1][1]['attributes']['Dbxref'] == 'database:acc1'
-
def test_gff_filters(self):
'It test that we can use filters for features in gff'
types = ['gene']
fhand = self._create_gff_file()
type_filter = create_feature_type_filter(types)
- gff = GffFile(fpath=fhand.name, feature_filters=[type_filter])
- items = list(gff.items)
+ gff = GffFile(fpath=fhand.name,)
+ items = filter(type_filter, gff.items)
assert items[1][1]['id'] == 'gene00001'
-
-
types = ['contig']
fhand = self._create_gff_file()
type_filter = create_feature_type_filter(types)
- gff = GffFile(fpath=fhand.name, feature_filters=[type_filter])
- items = list(gff.items)
+ gff = GffFile(fpath=fhand.name)
+ items = filter(type_filter, gff.items)
assert len(items) == 2
class GffMappersTest(unittest.TestCase):
@@ -228,10 +215,10 @@ def test_feature_type_filter():
'score': '.', 'type': 'contig', 'id':'ctg0', 'strand': '.'}
type_filter = create_feature_type_filter(['contig'])
- assert filter(type_filter, [feature])
+ assert filter(type_filter, [(FEATURE, feature)])
type_filter = create_feature_type_filter(['gene'])
- assert not filter(type_filter, [feature])
+ assert not filter(type_filter, [(FEATURE, feature)])
class GffOutTest(unittest.TestCase):
'It tests the gff writer'
Please sign in to comment.
Something went wrong with that request. Please try again.