This repository has been archived by the owner on Nov 9, 2023. It is now read-only.
/
supervised_learning.py
602 lines (524 loc) · 22.4 KB
/
supervised_learning.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
#!/usr/bin/env python
__author__ = "Dan Knights"
__copyright__ = "Copyright 2010, The QIIME Project"
__credits__ = ["Dan Knights"]
__license__ = "GPL"
__version__ = "1.2.1"
__maintainer__ = "Dan Knights"
__email__ = "daniel.knights@colorado.edu"
__status__ = "Release"
import subprocess
from os import remove, path, devnull
from os.path import join
from sys import stdout
from time import sleep
from tempfile import mkdtemp
from cogent.app.util import get_tmp_filename
from qiime.util import get_qiime_project_dir
from qiime.format import format_otu_table
from cogent.app.util import CommandLineApplication, CommandLineAppResult, \
FilePath, ResultPath, ApplicationError
from cogent.app.parameters import Parameters
from qiime.parse import parse_otu_table, parse_mapping_file
from numpy import array, set_printoptions, nan
def R_format_table(input_filepath, output_filepath=None, write_to_file=True):
    """Formats OTU table or mapping file for R. Removes '#' from header line.

    input_filepath: path to a tab-delimited OTU table or mapping file.
    output_filepath: destination path; if None and write_to_file is True,
        a temporary filename is generated.
    write_to_file: if True, writes the formatted table to output_filepath
        and returns that path; else returns the list of formatted lines.

    Note: the proper way to do this is to use, e.g., parse_otu_table, but
    that is very slow for large tables. This works whether there are
    comment lines or not, and whether the header has a '#' at the
    beginning or not. This method would break if there were a comment
    line before the header that happened to have the same number of
    delimiters (i.e. tabs) as the data rows.
    """
    # Preprocessing pass: count columns in the first non-comment (data)
    # row, so the header can be told apart from other '#' comment lines
    # by its column count.
    # NOTE: mode 'r' replaces the deprecated 'U' flag ('U' was removed in
    # Python 3.11). Every line below goes through strip(), so '\r\n'
    # input is still handled; old Mac-style '\r'-only newlines are not.
    fin = open(input_filepath, 'r')
    try:
        line = fin.readline()
        while line.startswith('#'):
            line = fin.readline()
        num_columns = len(line.strip().split('\t'))
    finally:
        fin.close()

    # Open the output file (or collect lines in memory).
    if write_to_file:
        if output_filepath is None:
            output_filepath = get_tmp_filename(
                prefix='table_R_format', suffix='.txt')
        fout = open(output_filepath, 'w')
    else:
        lines = []

    # Copy all lines, stripping the comment character only from the
    # header (the '#' line whose column count matches the data rows).
    fin = open(input_filepath, 'r')
    try:
        for line in fin:
            if line.startswith('#'):
                if len(line.strip().split('\t')) == num_columns:
                    line = line[1:]
            if write_to_file:
                fout.write(line.strip() + '\n')
            else:
                lines.append(line.strip())
    finally:
        fin.close()

    if write_to_file:
        fout.close()
        return output_filepath
    else:
        return lines
class RSupervisedLearner(CommandLineApplication):
    """R Supervised Learner application controller

    Runs R with a source script (from qiime/support_files/R), and
    passes in an OTU table and mapping file. Causes R to run a supervised
    classifier to predict labels from a given category from the mapping file
    using the provided OTUs.

    NOTE(review): an identical class with this same name is defined again
    later in this module; that later definition shadows this one at import
    time, so this copy is dead code. One of the two should be deleted.
    """
    # Input files are handed to R as filesystem paths, not piped contents.
    _input_handler = '_input_as_path'
    # Executable name used when building the base command.
    _command = "R"
    _options ={}
    # '--vanilla' skips site/user profiles and saved workspaces;
    # '--slave' suppresses R's startup banner and command echo.
    _R_parameters = {
        'flags': '--vanilla --slave'
    }
    # The name of the R script (located under qiime/support_files/R/)
    _R_script = 'error.est.r'
    _parameters = {}
    _parameters.update(_options)
    _parameters.update(_R_parameters)

    def getHelp(self):
        """Returns documentation string"""
        help_str =\
        """
        Runs a supervised classifier with an OTU table as predictors and one
        column from a mapping file as the category labels.
        Outputs:
        predictions.txt: the labels predicted by the classifier for the given
        samples. Each sample is predicted by a model that was trained
        without it.
        probabilities.txt: the label probabilities for each of the given
        samples. (if available)
        summary.txt: a summary of the results, including the expected
        generalization error of the classifier
        features.txt: a list of discriminative OTUs with their associated
        importance scores (if available)
        params.txt: a list of any non-default parameters used in training
        the model.
        For an overview of the application of supervised classification to
        microbiota, see PubMed ID 21039646.
        """
        return help_str

    def __call__(self, predictor_fp, response_fp, response_name,
                 model_names, output_dir=None, remove_tmp=True,
                 param_file=None, filter=None,
                 filter_min=10,filter_max=100,
                 filter_step=10, filter_reps=10,
                 verbose=False):
        """Run the application with the specified kwargs on data

        predictor_fp: filepath of the OTU table (the predictors); the
            input handler is applied before it is added to the command.
        response_fp: filepath of the mapping file (the response labels).
        response_name: mapping file category whose labels are predicted.
        model_names: list of model-name strings; one result per model.
        output_dir: directory for R's output; a fresh temporary directory
            is created when None.
        remove_tmp: if True, removes tmp files
        param_file: optional file of non-default model parameters,
            forwarded to the R script via '--params'.
        filter, filter_min, filter_max, filter_step, filter_reps: when
            filter is not None, OTU-filtering options forwarded to R.
        verbose: if True, prints the command and streams R's output.

        returns a dict of CommandLineAppResult objects, one for each machine
        learning model, keyed by the model name
        """
        input_handler = self.InputHandler
        suppress_stdout = self.SuppressStdout
        suppress_stderr = self.SuppressStderr
        # Set up capture destinations for the subprocess's stdout/stderr.
        if suppress_stdout:
            # NOTE(review): os.devnull is a *path string*; Popen's stdout
            # argument requires a file object or descriptor, so this path
            # will fail if SuppressStdout is set — should be
            # open(devnull, 'w'). TODO confirm and fix.
            outfile = devnull
        else:
            outfilepath = FilePath(self.getTmpFilename(self.TmpDir))
            outfile = open(outfilepath,'w')
        if suppress_stderr:
            # NOTE(review): same path-string problem as stdout above.
            errfile = devnull
        else:
            errfilepath = FilePath(self.getTmpFilename(self.TmpDir))
            errfile = open(errfilepath, 'w')
        # Input handler converts the provided filepaths (default:
        # '_input_as_path') before they are placed on the command line.
        predictor_fp = getattr(self,input_handler)(predictor_fp)
        response_fp = getattr(self,input_handler)(response_fp)
        # create random output dir if needed
        if output_dir is None:
            output_dir = mkdtemp(prefix='R_output_')
        # NOTE(review): rflags is computed but never used in this method.
        rflags = self.RParameters['flags']
        rscript = self._get_R_script_path()
        base_command = self._get_base_command()
        # Split the inherited base command into its leading 'cd <dir>;'
        # part and the R invocation that follows it.
        cd_command, base_command = base_command.split(';')
        cd_command += ';'
        R_source_dir = self._get_R_script_dir()
        # Build up the command, consisting of a BaseCommand followed by
        # input and output (file) specifications. The script is piped
        # into R on stdin: 'cd ...; cat <script> | <R command> --args ...'
        pre_command = 'cat'
        args = ['--sourcedir', R_source_dir,
                '-i', predictor_fp,
                '-m', response_fp,
                '-c', response_name,
                '-o', output_dir,
                '--models', ','.join(model_names)]
        if verbose:
            args += ['--verbose']
        if not param_file is None:
            args += ['--params', param_file]
        if not filter is None:
            args += ['--filter', filter,
                     '--filter_min', str(filter_min),
                     '--filter_max', str(filter_max),
                     '--filter_reps', str(filter_reps),
                     '--filter_step', str(filter_step)]
        # NOTE(review): param_file is reassigned here but never read
        # again afterwards — dead code.
        if param_file is None:
            param_file = ''
        command = self._commandline_join(
            [ cd_command, pre_command, '%s |' %(rscript), base_command,
              '--args'
            ] + args
            )
        if verbose:
            print "Command: ", command
        if self.HaltExec:
            raise AssertionError, "Halted exec with command:\n" + command
        # run command, wait for output, get exit status
        proc = subprocess.Popen(command, shell=True, stdout=outfile, stderr=errfile)
        if verbose:
            # Tail the captured stdout file until the process exits.
            # NOTE(review): if SuppressStdout is also set, outfilepath is
            # never assigned and this raises NameError — TODO confirm.
            print '\nR output\n'
            tmpoutfile = open(outfilepath,'U')
            while proc.poll() is None:
                stdout.write(tmpoutfile.readline())
                sleep(0.01)
            tmpoutfile.close()
        proc.wait()
        exit_status = proc.returncode
        # Determine if error should be raised due to exit status of
        # application
        if not self._accept_exit_status(exit_status):
            # Exit status 2 is used by the R script to signal a missing
            # library, per the message below.
            if exit_status == 2:
                raise ApplicationError, \
                    'R library not installed: \n' + \
                    ''.join(open(errfilepath,'r').readlines()) + '\n'
            else:
                raise ApplicationError, \
                    'Unacceptable application exit status: %s, command: %s'\
                    % (str(exit_status),command) +\
                    ' Program output: \n\n%s\n'\
                    %(''.join(open(errfilepath,'r').readlines()))
        # open the stdout and stderr if not being suppressed
        out = None
        if not suppress_stdout:
            out = open(outfilepath,"r")
        err = None
        if not suppress_stderr:
            err = open(errfilepath,"r")
        # Build one CommandLineAppResult per model, each pointing at that
        # model's output subdirectory.
        result = {}
        for i, model in enumerate(model_names):
            subdir = join(output_dir, model)
            # don't attempt to open the out/err files more than once
            if i == 1:
                out = err = None
            try:
                result[model] = CommandLineAppResult(
                    out, err, exit_status,
                    result_paths=self._get_result_paths(subdir))
            except ApplicationError, ae:
                # Re-raise with the command and captured output attached
                # for easier debugging.
                msg = str(ae) + \
                    '\n\ncommand: %s'\
                    % (command) +\
                    ' \n\nProgram stdout:\n%s'\
                    %(''.join(open(outfilepath,'r').readlines())) +\
                    ' \n\nProgram stderr:\n%s'\
                    %(''.join(open(errfilepath,'r').readlines()))
                raise ApplicationError, msg
        # Clean up the input file if one was created
        if remove_tmp:
            if self._input_filename:
                remove(self._input_filename)
                self._input_filename = None
        return result

    def _get_result_paths(self, output_dir):
        """Returns the filepaths for all result files

        output_dir: per-model subdirectory where the R script wrote its
            output. Returns a dict of ResultPath objects keyed by short
            result name.
        """
        # Short result names mapped to the filenames the R script writes.
        files = {
            'features': 'feature_importance_scores.txt',
            'summary': 'summary.txt',
            'cv_probabilities': 'cv_probabilities.txt',
            'mislabeling': 'mislabeling.txt',
            'params': 'params.txt',
        }
        result_paths = {}
        for name, file in files.iteritems():
            result_paths[name] = ResultPath(
                Path=path.join(output_dir, file), IsWritten=True)
        return result_paths

    def _get_R_script_dir(self):
        """Returns the path to the qiime R source directory
        """
        qiime_dir = get_qiime_project_dir()
        script_dir = path.join(qiime_dir,'qiime','support_files','R')
        return script_dir

    def _get_R_script_path(self):
        """Returns the path to the R script to be executed
        """
        return path.join(self._get_R_script_dir(), self._R_script)

    def _commandline_join(self, tokens):
        """Formats a list of tokens as a shell command

        Drops empty tokens and joins the remainder with the inherited
        command delimiter.
        """
        commands = filter(None, map(str, tokens))
        return self._command_delimiter.join(commands).strip()

    def _accept_exit_status(self,exit_status):
        """ Return False to raise an error due to exit_status !=0 of application
        """
        if exit_status != 0:
            return False
        return True

    @property
    def RParameters(self):
        # The subset of user-adjustable parameters declared in
        # _R_parameters (currently just 'flags').
        return self.__extract_parameters('R')

    def __extract_parameters(self, name):
        """Extracts parameters in self._<name>_parameters from self.Parameters

        Allows the program to conveniently access a subset of user-
        adjusted parameters, which are stored in the Parameters
        attribute.
        Relies on the convention of providing dicts named according to
        "_<name>_parameters" and "_<name>_synonyms". The main
        parameters object is expected to be initialized with the
        contents of these dicts. This method will throw an exception
        if either convention is not adhered to.
        """
        parameters = getattr(self, '_' + name + '_parameters')
        result = Parameters(parameters)
        for key in result.keys():
            result[key] = self.Parameters[key]
        return result
class RSupervisedLearner(CommandLineApplication):
"""R Supervised Learner application controller
Runs R with a source script (from qiime/support_files/R), and
passes in an OTU table and mapping file. Causes R to run a supervised
classifier to predict labels from a given category from the mapping file
using the provided OTUs.
"""
_input_handler = '_input_as_path'
_command = "R"
_options ={}
_R_parameters = {
'flags': '--vanilla --slave'
}
# The name of the R script (located under qiime/support_files/R/)
_R_script = 'error.est.r'
_parameters = {}
_parameters.update(_options)
_parameters.update(_R_parameters)
def getHelp(self):
"""Returns documentation string"""
help_str =\
"""
Runs a supervised classifier with an OTU table as predictors and one
column from a mapping file as the category labels.
Outputs:
predictions.txt: the labels predicted by the classifier for the given
samples. Each sample is predicted by a model that was trained
without it.
probabilities.txt: the label probabilities for each of the given
samples. (if available)
summary.txt: a summary of the results, including the expected
generalization error of the classifier
features.txt: a list of discriminative OTUs with their associated
importance scores (if available)
params.txt: a list of any non-default parameters used in training
the model.
For an overview of the application of supervised classification to
microbiota, see PubMed ID 21039646.
"""
return help_str
def __call__(self, predictor_fp, response_fp, response_name,
model_names, output_dir=None, remove_tmp=True,
param_file=None, filter=None,
filter_min=10,filter_max=100,
filter_step=10, filter_reps=10,
verbose=False):
"""Run the application with the specified kwargs on data
data: A file nameinput_handler will be called on this data before it
is passed as part of the command-line argument, so by creating
your own input handlers you can customize what kind of data
you want your application to accept
remove_tmp: if True, removes tmp files
returns a dict of CommandLineAppResult objects, one for each machine
learning model, keyed by the model name
"""
input_handler = self.InputHandler
suppress_stdout = self.SuppressStdout
suppress_stderr = self.SuppressStderr
if suppress_stdout:
outfile = devnull
else:
outfilepath = FilePath(self.getTmpFilename(self.TmpDir))
outfile = open(outfilepath,'w')
if suppress_stderr:
errfile = devnull
else:
errfilepath = FilePath(self.getTmpFilename(self.TmpDir))
errfile = open(errfilepath, 'w')
predictor_fp = getattr(self,input_handler)(predictor_fp)
response_fp = getattr(self,input_handler)(response_fp)
# create random output dir if needed
if output_dir is None:
output_dir = mkdtemp(prefix='R_output_')
rflags = self.RParameters['flags']
rscript = self._get_R_script_path()
base_command = self._get_base_command()
cd_command, base_command = base_command.split(';')
cd_command += ';'
R_source_dir = self._get_R_script_dir()
# Build up the command, consisting of a BaseCommand followed by
# input and output (file) specifications
pre_command = 'cat'
args = ['--sourcedir', R_source_dir,
'-i', predictor_fp,
'-m', response_fp,
'-c', response_name,
'-o', output_dir,
'--models', ','.join(model_names)]
if verbose:
args += ['--verbose']
if not param_file is None:
args += ['--params', param_file]
if not filter is None:
args += ['--filter', filter,
'--filter_min', str(filter_min),
'--filter_max', str(filter_max),
'--filter_reps', str(filter_reps),
'--filter_step', str(filter_step)]
if param_file is None:
param_file = ''
command = self._commandline_join(
[ cd_command, pre_command, '%s |' %(rscript), base_command,
'--args'
] + args
)
if verbose:
print "Command: ", command
if self.HaltExec:
raise AssertionError, "Halted exec with command:\n" + command
# run command, wait for output, get exit status
proc = subprocess.Popen(command, shell=True, stdout=outfile, stderr=errfile)
if verbose:
print '\nR output\n'
tmpoutfile = open(outfilepath,'U')
while proc.poll() is None:
stdout.write(tmpoutfile.readline())
sleep(0.01)
tmpoutfile.close()
proc.wait()
exit_status = proc.returncode
# Determine if error should be raised due to exit status of
# appliciation
if not self._accept_exit_status(exit_status):
if exit_status == 2:
raise ApplicationError, \
'R library not installed: \n' + \
''.join(open(errfilepath,'r').readlines()) + '\n'
else:
raise ApplicationError, \
'Unacceptable application exit status: %s, command: %s'\
% (str(exit_status),command) +\
' Program output: \n\n%s\n'\
%(''.join(open(errfilepath,'r').readlines()))
# open the stdout and stderr if not being suppressed
out = None
if not suppress_stdout:
out = open(outfilepath,"r")
err = None
if not suppress_stderr:
err = open(errfilepath,"r")
result = {}
for i, model in enumerate(model_names):
subdir = join(output_dir, model)
# don't attempt to open the out/err files more than once
if i == 1:
out = err = None
try:
result[model] = CommandLineAppResult(
out, err, exit_status,
result_paths=self._get_result_paths(subdir))
except ApplicationError, ae:
msg = str(ae) + \
'\n\ncommand: %s'\
% (command) +\
' \n\nProgram stdout:\n%s'\
%(''.join(open(outfilepath,'r').readlines())) +\
' \n\nProgram stderr:\n%s'\
%(''.join(open(errfilepath,'r').readlines()))
raise ApplicationError, msg
# Clean up the input file if one was created
if remove_tmp:
if self._input_filename:
remove(self._input_filename)
self._input_filename = None
return result
def _get_result_paths(self, output_dir):
"""Returns the filepaths for all result files"""
files = {
'features': 'feature_importance_scores.txt',
'summary': 'summary.txt',
'cv_probabilities': 'cv_probabilities.txt',
'mislabeling': 'mislabeling.txt',
'params': 'params.txt',
}
result_paths = {}
for name, file in files.iteritems():
result_paths[name] = ResultPath(
Path=path.join(output_dir, file), IsWritten=True)
return result_paths
def _get_R_script_dir(self):
"""Returns the path to the qiime R source directory
"""
qiime_dir = get_qiime_project_dir()
script_dir = path.join(qiime_dir,'qiime','support_files','R')
return script_dir
def _get_R_script_path(self):
"""Returns the path to the R script to be executed
"""
return path.join(self._get_R_script_dir(), self._R_script)
def _commandline_join(self, tokens):
"""Formats a list of tokens as a shell command
"""
commands = filter(None, map(str, tokens))
return self._command_delimiter.join(commands).strip()
def _accept_exit_status(self,exit_status):
""" Return False to raise an error due to exit_status !=0 of application
"""
if exit_status != 0:
return False
return True
@property
def RParameters(self):
return self.__extract_parameters('R')
def __extract_parameters(self, name):
"""Extracts parameters in self._<name>_parameters from self.Parameters
Allows the program to conveniently access a subset of user-
adjusted parameters, which are stored in the Parameters
attribute.
Relies on the convention of providing dicts named according to
"_<name>_parameters" and "_<name>_synonyms". The main
parameters object is expected to be initialized with the
contents of these dicts. This method will throw an exception
if either convention is not adhered to.
"""
parameters = getattr(self, '_' + name + '_parameters')
result = Parameters(parameters)
for key in result.keys():
result[key] = self.Parameters[key]
return result
class RSupervisedLearnerFilter(RSupervisedLearner):
    """Application controller for the R supervised learner's filter mode.

    Behaves exactly like RSupervisedLearner, but expects the output files
    written by the OTU-filtering variant of the R workflow rather than
    the standard classifier outputs.
    """

    def _get_result_paths(self, output_dir):
        """Returns the filepaths for all result files

        output_dir: directory where the R script wrote its filter-mode
            output. Returns a dict of ResultPath objects keyed by short
            result name.
        """
        # Short result names mapped to the filenames the filter workflow
        # produces.
        expected = {
            'params': 'params.txt',
            'filter_summary': 'filter_summary.txt',
            'filter_errors': 'filter_errors.txt',
            'filter_features': 'filter_features.txt',
            'otu_subset': 'otu_subset_table.txt',
        }
        paths = {}
        for key, fname in expected.iteritems():
            paths[key] = ResultPath(
                Path=path.join(output_dir, fname), IsWritten=True)
        return paths