-
Notifications
You must be signed in to change notification settings - Fork 12
/
bionitio.py
236 lines (206 loc) · 8.18 KB
/
bionitio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
'''
Module : Main
Description : The main entry point for the program.
Copyright : (c) BIONITIO_AUTHOR, BIONITIO_DATE
License : BIONITIO_LICENSE
Maintainer : BIONITIO_EMAIL
Portability : POSIX
The program reads one or more input FASTA files. For each file it computes a
variety of statistics, and then prints a summary of the statistics as output.
'''
from argparse import ArgumentParser
from math import floor
import sys
import logging
import pkg_resources
from Bio import SeqIO
EXIT_FILE_IO_ERROR = 1
EXIT_COMMAND_LINE_ERROR = 2
EXIT_FASTA_FILE_ERROR = 3
DEFAULT_MIN_LEN = 0
DEFAULT_VERBOSE = False
HEADER = 'FILENAME\tNUMSEQ\tTOTAL\tMIN\tAVG\tMAX'
PROGRAM_NAME = "bionitio"
try:
PROGRAM_VERSION = pkg_resources.require(PROGRAM_NAME)[0].version
except pkg_resources.DistributionNotFound:
PROGRAM_VERSION = "undefined_version"
def exit_with_error(message, exit_status):
'''Print an error message to stderr, prefixed by the program name and 'ERROR'.
Then exit program with supplied exit status.
Arguments:
message: an error message as a string.
exit_status: a positive integer representing the exit status of the
program.
'''
logging.error(message)
print("{} ERROR: {}, exiting".format(PROGRAM_NAME, message), file=sys.stderr)
sys.exit(exit_status)
def parse_args():
'''Parse command line arguments.
Returns Options object with command line argument values as attributes.
Will exit the program on a command line error.
'''
description = 'Read one or more FASTA files, compute simple stats for each file'
parser = ArgumentParser(description=description)
parser.add_argument(
'--minlen',
metavar='N',
type=int,
default=DEFAULT_MIN_LEN,
help='Minimum length sequence to include in stats (default {})'.format(
DEFAULT_MIN_LEN))
parser.add_argument('--version',
action='version',
version='%(prog)s ' + PROGRAM_VERSION)
parser.add_argument('--log',
metavar='LOG_FILE',
type=str,
help='record program progress in LOG_FILE')
parser.add_argument('fasta_files',
nargs='*',
metavar='FASTA_FILE',
type=str,
help='Input FASTA files')
return parser.parse_args()
class FastaStats(object):
'''Compute various statistics for a FASTA file:
num_seqs: the number of sequences in the file satisfying the minimum
length requirement (minlen_threshold).
num_bases: the total length of all the counted sequences.
min_len: the minimum length of the counted sequences.
max_len: the maximum length of the counted sequences.
average: the average length of the counted sequences rounded down
to an integer.
'''
#pylint: disable=too-many-arguments
def __init__(self,
num_seqs=None,
num_bases=None,
min_len=None,
max_len=None,
average=None):
"Build an empty FastaStats object"
self.num_seqs = num_seqs
self.num_bases = num_bases
self.min_len = min_len
self.max_len = max_len
self.average = average
def __eq__(self, other):
"Two FastaStats objects are equal iff their attributes are equal"
if type(other) is type(self):
return self.__dict__ == other.__dict__
return False
def __repr__(self):
"Generate a printable representation of a FastaStats object"
return "FastaStats(num_seqs={}, num_bases={}, min_len={}, max_len={}, " \
"average={})".format(
self.num_seqs, self.num_bases, self.min_len, self.max_len,
self.average)
def from_file(self, fasta_file, minlen_threshold=DEFAULT_MIN_LEN):
'''Compute a FastaStats object from an input FASTA file.
Arguments:
fasta_file: an open file object for the FASTA file
minlen_threshold: the minimum length sequence to consider in
computing the statistics. Sequences in the input FASTA file
which have a length less than this value are ignored and not
considered in the resulting statistics.
Result:
A FastaStats object
'''
num_seqs = num_bases = 0
min_len = max_len = None
for seq in SeqIO.parse(fasta_file, "fasta"):
this_len = len(seq)
if this_len >= minlen_threshold:
if num_seqs == 0:
min_len = max_len = this_len
else:
min_len = min(this_len, min_len)
max_len = max(this_len, max_len)
num_seqs += 1
num_bases += this_len
if num_seqs > 0:
self.average = int(floor(float(num_bases) / num_seqs))
else:
self.average = None
self.num_seqs = num_seqs
self.num_bases = num_bases
self.min_len = min_len
self.max_len = max_len
return self
def pretty(self, filename):
'''Generate a pretty printable representation of a FastaStats object
suitable for output of the program. The output is a tab-delimited
string containing the filename of the input FASTA file followed by
the attributes of the object. If 0 sequences were read from the FASTA
file then num_seqs and num_bases are output as 0, and min_len, average
and max_len are output as a dash "-".
Arguments:
filename: the name of the input FASTA file
Result:
A string suitable for pretty printed output
'''
if self.num_seqs > 0:
num_seqs = str(self.num_seqs)
num_bases = str(self.num_bases)
min_len = str(self.min_len)
average = str(self.average)
max_len = str(self.max_len)
else:
num_seqs = num_bases = "0"
min_len = average = max_len = "-"
return "\t".join([filename, num_seqs, num_bases, min_len, average,
max_len])
def process_files(options):
'''Compute and print FastaStats for each input FASTA file specified on the
command line. If no FASTA files are specified on the command line then
read from the standard input (stdin).
Arguments:
options: the command line options of the program
Result:
None
'''
if options.fasta_files:
for fasta_filename in options.fasta_files:
logging.info("Processing FASTA file from %s", fasta_filename)
try:
fasta_file = open(fasta_filename)
except IOError as exception:
exit_with_error(str(exception), EXIT_FILE_IO_ERROR)
else:
with fasta_file:
stats = FastaStats().from_file(fasta_file, options.minlen)
print(stats.pretty(fasta_filename))
else:
logging.info("Processing FASTA file from stdin")
stats = FastaStats().from_file(sys.stdin, options.minlen)
print(stats.pretty("stdin"))
def init_logging(log_filename):
'''If the log_filename is defined, then
initialise the logging facility, and write log statement
indicating the program has started, and also write out the
command line from sys.argv
Arguments:
log_filename: either None, if logging is not required, or the
string name of the log file to write to
Result:
None
'''
if log_filename is not None:
logging.basicConfig(filename=log_filename,
level=logging.DEBUG,
filemode='w',
format='%(asctime)s %(levelname)s - %(message)s',
datefmt="%Y-%m-%dT%H:%M:%S%z")
logging.info('program started')
logging.info('command line: %s', ' '.join(sys.argv))
def main():
"Orchestrate the execution of the program"
options = parse_args()
init_logging(options.log)
print(HEADER)
process_files(options)
# If this script is run from the command line then call the main function.
if __name__ == '__main__':
main()