This repository has been archived by the owner on Mar 1, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 8
/
consensus.py
executable file
·128 lines (113 loc) · 4.32 KB
/
consensus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/env python
import os
import sys
import errno
import ctypes
import argparse
# Locate the library file.
LIBFILE = 'libconsensus.so'
script_dir = os.path.dirname(os.path.realpath(__file__))
library_path = os.path.join(script_dir, LIBFILE)
if not os.path.isfile(library_path):
library_path = os.path.join(script_dir, '..', 'lib', LIBFILE)
if not os.path.isfile(library_path):
ioe = IOError('Library file "'+LIBFILE+'" not found.')
ioe.errno = errno.ENOENT
raise ioe
consensus = ctypes.cdll.LoadLibrary(library_path)
consensus.get_consensus.restype = ctypes.c_char_p
consensus.get_consensus_duplex.restype = ctypes.c_char_p
consensus.build_consensus_duplex_simple.restype = ctypes.c_char_p
ARG_DEFAULTS = {'alignment':sys.stdin}
DESCRIPTION = "Get the consensus of a set of aligned sequences."
def make_argparser():
parser = argparse.ArgumentParser(description=DESCRIPTION)
parser.set_defaults(**ARG_DEFAULTS)
parser.add_argument('alignment', type=argparse.FileType('r'),
help='The aligned sequences, in FASTA format (but no multi-line sequences).')
return parser
def main(argv):
parser = make_argparser()
args = parser.parse_args(argv[1:])
sequences = []
line_num = 0
for line in args.alignment:
line_num += 1
if line_num % 2 == 0:
sequences.append(line.rstrip('\r\n'))
cons = get_consensus(sequences)
print(cons)
# N.B.: The quality scores must be aligned with their accompanying sequences.
def get_consensus(align, quals=[], cons_thres=-1.0, qual_thres=' ', gapped=False):
cons_thres_c = ctypes.c_double(cons_thres)
qual_thres_c = ctypes.c_char(qual_thres)
n_seqs = len(align)
if gapped:
gapped_c = 1
else:
gapped_c = 0
assert not quals or len(quals) == n_seqs, 'Different number of sequences and quals.'
seq_len = None
for seq in (align + quals):
if seq_len is None:
seq_len = len(seq)
else:
assert seq_len == len(seq), ('All sequences in the alignment must be the same length: '
'{}bp != {}bp.\nAlignment:\n{}'.format(seq_len, len(seq),
'\n'.join(align)))
align_c = (ctypes.c_char_p * n_seqs)()
for i, seq in enumerate(align):
align_c[i] = ctypes.c_char_p(seq)
quals_c = (ctypes.c_char_p * n_seqs)()
for i, qual in enumerate(quals):
quals_c[i] = ctypes.c_char_p(qual)
if not quals:
quals_c = 0
return consensus.get_consensus(align_c, quals_c, n_seqs, seq_len, cons_thres_c, qual_thres_c,
gapped_c)
# N.B.: The quality scores must be aligned with their accompanying sequences.
def get_consensus_duplex(align1, align2, quals1=[], quals2=[], cons_thres=-1.0, qual_thres=' ',
method='iupac'):
assert method in ('iupac', 'freq')
cons_thres_c = ctypes.c_double(cons_thres)
qual_thres_c = ctypes.c_char(qual_thres)
n_seqs1 = len(align1)
n_seqs2 = len(align2)
assert (not quals1 and not quals2) or (quals1 and quals2)
assert not quals1 or len(quals1) == n_seqs1
assert not quals2 or len(quals2) == n_seqs2
seq_len = None
for seq in (align1 + align2 + quals1 + quals2):
if seq_len is None:
seq_len = len(seq)
else:
assert seq_len == len(seq), 'All sequences in the alignment must be the same length.'
align1_c = (ctypes.c_char_p * n_seqs1)()
for i, seq in enumerate(align1):
align1_c[i] = ctypes.c_char_p(seq)
align2_c = (ctypes.c_char_p * n_seqs1)()
for i, seq in enumerate(align2):
align2_c[i] = ctypes.c_char_p(seq)
quals1_c = (ctypes.c_char_p * n_seqs1)()
for i, seq in enumerate(quals1):
quals1_c[i] = ctypes.c_char_p(seq)
quals2_c = (ctypes.c_char_p * n_seqs1)()
for i, seq in enumerate(quals2):
quals2_c[i] = ctypes.c_char_p(seq)
if not quals1:
quals1_c = 0
if not quals2:
quals2_c = 0
return consensus.get_consensus_duplex(align1_c, align2_c, quals1_c, quals2_c, n_seqs1, n_seqs2,
seq_len, cons_thres_c, qual_thres_c, method)
def build_consensus_duplex_simple(cons1, cons2, gapped=False):
assert len(cons1) == len(cons2)
cons1_c = ctypes.c_char_p(cons1)
cons2_c = ctypes.c_char_p(cons2)
if gapped:
gapped_c = 1
else:
gapped_c = 0
return consensus.build_consensus_duplex_simple(cons1_c, cons2_c, gapped_c)
if __name__ == '__main__':
sys.exit(main(sys.argv))