-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
hmmerdomtab.py
363 lines (294 loc) · 13.3 KB
/
hmmerdomtab.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
# Copyright 2012 by Wibowo Arindrarto. All rights reserved.
# This code is part of the Biopython distribution and governed by its
# license. Please see the LICENSE file that should have been included
# as part of this package.
"""Bio.SearchIO parser for HMMER domain table output format."""
from itertools import chain
from Bio._py3k import _as_bytes, _bytes_to_string
from Bio.SearchIO._objects import QueryResult, Hit, HSP
from Bio.SearchIO._index import SearchIndexer
from hmmertab import HmmerTabIndexer
def hmmer_domtab_hmmhit_iterator(handle):
"""Generator function to parse HMMER domain table output as QueryResults.
handle -- Handle to the file.
This iterator assumes that the HMM coordinates are search hit coordinates.
"""
for qresult in HmmerDomtabIterator(handle, hmm_as_hit=True):
yield qresult
def hmmer_domtab_hmmquery_iterator(handle):
"""Generator function to parse HMMER domain table output as QueryResults.
handle -- Handle to the file.
This iterator assumes that the HMM coordinates are search query
coordinates.
"""
for qresult in HmmerDomtabIterator(handle, hmm_as_hit=False):
yield qresult
def read_forward(handle, strip=True):
"""Reads through whitespaces, returns the first non-whitespace line."""
while True:
line = handle.readline()
# return the line if it has characters
if line and line.strip():
if strip:
return line.strip()
else:
return line
# or if has no characters (EOF)
elif not line:
return line
class HmmerDomtabIterator(object):
"""Parser for the HMMER domain table format that assumes HMM profile
coordinates are hit coordinates."""
def __init__(self, handle, hmm_as_hit):
self.handle = handle
self.line = read_forward(self.handle)
self.hmm_as_hit = hmm_as_hit
def __iter__(self):
# stop iterating if it's an empty file
if not self.line:
raise StopIteration
# if line starts with '#', it's a header line
# and we want to read through that
else:
if self.line.startswith('#'):
while True:
self.line = read_forward(self.handle)
# break out of loop when it's not the header lines anymore
if not self.line.startswith('#'):
break
# stop iterating if we only have headers
if not self.line:
raise StopIteration
for qresult in self.parse_qresult():
yield qresult
def parse_result_row(self):
"""Returns a dictionary of parsed row values."""
assert self.line
cols = filter(None, self.line.strip().split(' '))
# if len(cols) > 22, we have extra description columns
# combine them all into one string in the 19th column
if len(cols) > 22:
cols[22] = ' '.join(cols[22:])
else:
cols[22] = ''
# assign parsed column data into qresult, hit, and hsp dicts
qresult = {}
qresult['id'] = cols[3] # query name
qresult['acc'] = cols[4] # query accession
qresult['seq_len'] = cols[5] # qlen
hit = {}
hit['id'] = cols[0] # target name
hit['acc'] = cols[1] # target accession
hit['seq_len'] = cols[2] # tlen
hit['evalue'] = cols[6] # evalue
hit['bitscore'] = cols[7] # score
hit['bias'] = cols[8] # bias
hit['desc'] = cols[22] # description of target
hsp = {}
hsp['domain_index'] = cols[9] # # (domain number)
# not parsing cols[10] since it's basically len(hit)
hsp['evalue_cond'] = cols[11] # c-evalue
hsp['evalue'] = cols[12] # i-evalue
hsp['bitscore'] = cols[13] # score
hsp['bias'] = cols[14] # bias
hsp['hit_from'] = int(cols[15]) - 1 # hmm from
hsp['hit_to'] = int(cols[16]) - 1 # hmm to
hsp['query_from'] = int(cols[17]) - 1 # ali from
hsp['query_to'] = int(cols[18]) - 1 # ali to
hsp['env_from'] = int(cols[19]) - 1 # env from
hsp['env_to'] = int(cols[20]) - 1 # env to
hsp['acc_avg'] = cols[21] # acc
# switch hmm<-->ali coordinates if hmm is not hit
if not self.hmm_as_hit:
hsp['hit_to'], hsp['query_to'] = \
hsp['query_to'], hsp['hit_to']
hsp['hit_from'], hsp['query_from'] = \
hsp['query_from'], hsp['hit_from']
return {'qresult': qresult, 'hit': hit, 'hsp': hsp}
def parse_qresult(self):
"""Generator function that returns QueryResult objects."""
qid_cache = None
hid_cache = None
# flag for denoting whether the query is the same as the previous line
# or not
same_query = False
while True:
# only parse the result row if it's not EOF
if self.line:
parsed = self.parse_result_row()
qresult_id = parsed['qresult']['id']
# a new qresult is created whenever qid_cache != qresult_id
if qid_cache != qresult_id:
# append the last hit and yield qresult if qid_cache is filled
if qid_cache is not None:
qresult.append(hit)
yield qresult
same_query = False
qid_cache = qresult_id
qresult = QueryResult(qresult_id)
for attr, value in parsed['qresult'].items():
setattr(qresult, attr, value)
# when we've reached EOF, try yield any remaining qresult and break
elif not self.line:
qresult.append(hit)
yield qresult
break
# otherwise, we must still be in the same query, so set the flag
elif not same_query:
same_query = True
hit_id = parsed['hit']['id']
# a new hit is created whenever hid_cache != hit_id
if hid_cache != hit_id:
# if we're in the same query, append the previous line's hit
if same_query:
qresult.append(hit)
hid_cache = hit_id
hit = Hit(hit_id, qresult_id)
for attr, value in parsed['hit'].items():
setattr(hit, attr, value)
# each line is basically a different HSP, so we always add it to
# any hit object we have
hsp = HSP(hit_id, qresult_id)
for attr, value in parsed['hsp'].items():
setattr(hsp, attr, value)
hit.append(hsp)
self.line = read_forward(self.handle)
class HmmerDomtabHmmhitIndexer(HmmerTabIndexer):
"""Indexer class for HMMER domain table output that assumes HMM profile
coordinates are hit coordinates."""
def __init__(self, *args, **kwargs):
HmmerTabIndexer.__init__(self, *args, **kwargs)
# set parser for on-the-fly parsing
self._parser = hmmer_domtab_hmmhit_iterator
self._query_id_idx = 3
class HmmerDomtabHmmqueryIndexer(HmmerTabIndexer):
"""Indexer class for HMMER domain table output that assumes HMM profile
coordinates are query coordinates."""
def __init__(self, *args, **kwargs):
HmmerTabIndexer.__init__(self, *args, **kwargs)
# set parser for on-the-fly parsing
self._parser = hmmer_domtab_hmmquery_iterator
self._query_id_idx = 3
class HmmerDomtabHmmhitWriter(object):
"""Writer for hmmer-domtab output format which writes hit coordinates
as HMM profile coordinates."""
def __init__(self, handle):
self.handle = handle
self.hmm_as_hit = True
def write_file(self, qresults):
"""Writes to the handle.
Returns a tuple of how many QueryResult, Hit, and HSP objects were written.
"""
handle = self.handle
qresult_counter, hit_counter, hsp_counter = 0, 0, 0
try:
first_qresult = qresults.next()
except StopIteration:
handle.write(self.build_header())
else:
# write header
handle.write(self.build_header(first_qresult))
# and then the qresults
for qresult in chain([first_qresult], qresults):
if qresult:
handle.write(self.build_row(qresult))
qresult_counter += 1
hit_counter += len(qresult)
hsp_counter += sum([len(hit) for hit in qresult])
return qresult_counter, hit_counter, hsp_counter
def build_header(self, first_qresult=None):
"""Returns the header string of a domain HMMER table output."""
# calculate whitespace required
# adapted from HMMER's source: src/p7_tophits.c#L1157
if first_qresult is not None:
#qnamew = max(20, len(first_qresult.id))
qnamew = 20
tnamew = max(20, len(first_qresult[0].id))
qaccw = max(10, len(first_qresult.acc))
taccw = max(10, len(first_qresult[0].acc))
else:
qnamew, tnamew, qaccw, taccw = 20, 20, 10, 10
header = ''
header += "#%*s %22s %40s %11s %11s %11s\n" % \
(tnamew+qnamew-1+15+taccw+qaccw, "", "--- full sequence ---", \
"-------------- this domain -------------", "hmm coord", \
"ali coord", "env coord")
header += "#%-*s %-*s %5s %-*s %-*s %5s %9s %6s %5s %3s %3s %9s " \
"%9s %6s %5s %5s %5s %5s %5s %5s %5s %4s %s\n" % (tnamew-1, \
" target name", taccw, "accession", "tlen", qnamew, \
"query name", qaccw, "accession", "qlen", "E-value", "score", \
"bias", "#", "of", "c-Evalue", "i-Evalue", "score", "bias", \
"from", "to", "from", "to", "from", "to", "acc", \
"description of target")
header += "#%*s %*s %5s %*s %*s %5s %9s %6s %5s %3s %3s %9s %9s " \
"%6s %5s %5s %5s %5s %5s %5s %5s %4s %s\n" % (tnamew-1, \
"-------------------", taccw, "----------", "-----", \
qnamew, "--------------------", qaccw, "----------", \
"-----", "---------", "------", "-----", "---", "---", \
"---------", "---------", "------", "-----", "-----", "-----", \
"-----", "-----", "-----", "-----", "----", \
"---------------------")
return header
def build_row(self, qresult):
"""Returns a string or one row or more of the QueryResult object."""
rows = ''
# calculate whitespace required
# adapted from HMMER's source: src/p7_tophits.c#L1083
qnamew = max(20, len(qresult.id))
tnamew = max(20, len(qresult[0].id))
qaccw = max(10, len(qresult.acc))
taccw = max(10, len(qresult[0].acc))
# try to get qresult accession
try:
qresult_acc = qresult.acc
except AttributeError:
qresult_acc = '-'
for hit in qresult:
# try to get hit accession
try:
hit_acc = hit.acc
except AttributeError:
hit_acc = '-'
for hsp in hit:
if self.hmm_as_hit:
hmm_to = hsp.hit_to + 1
hmm_from = hsp.hit_from + 1
ali_to = hsp.query_to + 1
ali_from = hsp.query_from + 1
else:
hmm_to = hsp.query_to + 1
hmm_from = hsp.query_from + 1
ali_to = hsp.hit_to + 1
ali_from = hsp.hit_from + 1
rows += "%-*s %-*s %5d %-*s %-*s %5d %9.2g %6.1f %5.1f %3d %3d" \
" %9.2g %9.2g %6.1f %5.1f %5d %5d %5ld %5ld %5d %5d %4.2f %s\n" % \
(tnamew, hit.id, taccw, hit_acc, hit.seq_len, qnamew, qresult.id, \
qaccw, qresult_acc, qresult.seq_len, hit.evalue, hit.bitscore, \
hit.bias, hsp.domain_index, len(hit), hsp.evalue_cond, hsp.evalue, \
hsp.bitscore, hsp.bias, hmm_from, hmm_to, ali_from, ali_to, \
hsp.env_from + 1, hsp.env_to + 1, hsp.acc_avg, hit.desc)
return rows
class HmmerDomtabHmmqueryWriter(HmmerDomtabHmmhitWriter):
"""Writer for hmmer-domtab output format which writes query coordinates
as HMM profile coordinates."""
def __init__(self, handle):
self.handle = handle
self.hmm_as_hit = False
def _test():
"""Run the Bio.SearchIO.HmmerIO.hmmerdomtab module's doctests.
This will try and locate the unit tests directory, and run the doctests
from there in order that the relative paths used in the examples work.
"""
import doctest
import os
test_dir = 'Tests'
if os.path.isdir(os.path.join('..', '..', test_dir)):
print "Runing doctests..."
cur_dir = os.path.abspath(os.curdir)
os.chdir(os.path.join('..', '..', test_dir))
doctest.testmod()
os.chdir(cur_dir)
print "Done"
# if not used as a module, run the doctest
if __name__ == "__main__":
_test()