This repository has been archived by the owner on Nov 9, 2023. It is now read-only.
/
core_microbiome.py
85 lines (73 loc) · 3.34 KB
/
core_microbiome.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#!/usr/bin/env python
# File created on 08 Jun 2012
from __future__ import division
__author__ = "Greg Caporaso"
__copyright__ = "Copyright 2011, The QIIME project"
__credits__ = ["Greg Caporaso", "Jai Ram Rideout"]
__license__ = "GPL"
__version__ = "1.8.0"
__maintainer__ = "Greg Caporaso"
__email__ = "gregcaporaso@gmail.com"
from numpy import array
from biom.exception import TableException
from biom.table import table_factory
def get_filter_to_core_f(table,
                         sample_ids=None,
                         fraction_for_core=1.):
    """ return function that filters a table to its core observations

        table: the biom-format table object to filter (only its
         SampleIds attribute is read here)
        sample_ids: collection of sample ids of interest for the core
         computation (default: all samples are of interest)
        fraction_for_core: the fraction of the sample_ids that
         an observation must have a non-zero count for to be
         considered a core observation; must be within [0,1]

        The returned function takes (values, obs_ids, obs_md) and
         returns a true value when the observation has a non-zero
         count in at least fraction_for_core of the samples of
         interest. Only values is inspected; obs_ids and obs_md are
         accepted and ignored.

        Raises ValueError if fraction_for_core is outside of [0,1].
    """
    if not (0. <= fraction_for_core <= 1.):
        # call-style raise is valid on both Python 2 and Python 3
        raise ValueError(
            "invalid fraction_for_core passed to core filter: "
            "%1.2f is outside of range [0,1]." % fraction_for_core)

    # generate the position mask, which contains True at SampleIds
    # positions that contain an id in sample_ids. dtype is forced to
    # bool so that an empty sample list still yields a boolean mask.
    if sample_ids is None:
        position_mask = array([True] * len(table.SampleIds), dtype=bool)
    else:
        # build the lookup set once rather than scanning sample_ids
        # for every sample in the table
        ids_of_interest = set(sample_ids)
        position_mask = array(
            [s in ids_of_interest for s in table.SampleIds], dtype=bool)

    # determine the number of sample_ids that must have a non-zero
    # value for an OTU to be considered part of the core
    min_count = fraction_for_core * position_mask.sum()

    def f(values, obs_ids, obs_md):
        # count the sample ids with non-zero observation
        # counts that are in sample_ids. if that is greater than
        # or equal to the minimum required count, return True
        # NOTE(review): assumes values is a numpy array ordered like
        # table.SampleIds -- confirm against the biom filter API
        return ((values != 0) & position_mask).sum() >= min_count

    return f
def filter_table_to_core(table,
                         sample_ids=None,
                         fraction_for_core=1.):
    """ filter a table to its core observations

        table: the biom-format table object to filter
        sample_ids: list of sample ids of interest for the core
         computation (default: all samples are of interest)
        fraction_for_core: the fraction of the sample_ids that
         an observation must have a non-zero count for to be
         considered a core observation

        Returns whatever table.filterObservations returns when given
         the core-observation predicate built for these arguments.
    """
    # build the per-observation predicate and hand it straight to the
    # table's own observation filter
    return table.filterObservations(
        get_filter_to_core_f(table,
                             sample_ids=sample_ids,
                             fraction_for_core=fraction_for_core))
def core_observations_across_sample_ids(table,
                                        sample_ids=None,
                                        fraction_for_core=1.):
    """ get the list of core observation ids in table

        table: the biom-format table object to filter
        sample_ids: list of sample ids of interest for the core
         computation (default: all samples are of interest)
        fraction_for_core: the fraction of the sample_ids that
         an observation must have a non-zero count for to be
         considered a core observation

        Returns a list of the ObservationIds of the filtered table,
         or an empty list if a TableException is raised while
         filtering.
    """
    try:
        core_table = filter_table_to_core(table,
                                          sample_ids,
                                          fraction_for_core)
        return list(core_table.ObservationIds)
    except TableException:
        # NOTE(review): presumably biom raises this when no
        # observations survive the filter -- confirm; it is treated
        # here as "no core observations"
        return []