-
Notifications
You must be signed in to change notification settings - Fork 16
/
_methods.py
117 lines (95 loc) · 4.58 KB
/
_methods.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# ----------------------------------------------------------------------------
# Copyright (c) 2016-2017, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
from warnings import warn
from functools import reduce
from operator import or_
import numpy as np
import skbio
from q2_types.feature_table import BIOMV210Format
from q2_types.tree import NewickFormat
import q2_state_unifrac as qsu
from q2_state_unifrac._meta import CONSOLIDATIONS
def unweighted(table: BIOMV210Format,
               phylogeny: NewickFormat,
               threads: int = 1,
               variance_adjusted: bool = False) -> skbio.DistanceMatrix:
    """Run ``qsu.ssu`` with the ``'unweighted'`` method on a table and tree.

    Parameters
    ----------
    table : BIOMV210Format
        Feature table; forwarded to ``qsu.ssu`` as ``str(table)``
        (presumably a file path -- confirm against the format class).
    phylogeny : NewickFormat
        Tree; forwarded to ``qsu.ssu`` as ``str(phylogeny)``.
    threads : int, optional
        Forwarded unchanged to ``qsu.ssu``.
    variance_adjusted : bool, optional
        Forwarded unchanged to ``qsu.ssu``.

    Returns
    -------
    skbio.DistanceMatrix
        Whatever ``qsu.ssu`` returns for this call.
    """
    table_str = str(table)
    tree_str = str(phylogeny)
    # The fifth positional argument is the slot `generalized` fills with
    # its alpha; for this method it is pinned to 1.0.
    fixed_alpha = 1.0
    return qsu.ssu(table_str, tree_str, 'unweighted',
                   variance_adjusted, fixed_alpha, threads)
def weighted_normalized(table: BIOMV210Format,
                        phylogeny: NewickFormat,
                        threads: int = 1,
                        variance_adjusted: bool = False
                        ) -> skbio.DistanceMatrix:
    """Run ``qsu.ssu`` with the ``'weighted_normalized'`` method.

    Parameters
    ----------
    table : BIOMV210Format
        Feature table; forwarded to ``qsu.ssu`` as ``str(table)``
        (presumably a file path -- confirm against the format class).
    phylogeny : NewickFormat
        Tree; forwarded to ``qsu.ssu`` as ``str(phylogeny)``.
    threads : int, optional
        Forwarded unchanged to ``qsu.ssu``.
    variance_adjusted : bool, optional
        Forwarded unchanged to ``qsu.ssu``.

    Returns
    -------
    skbio.DistanceMatrix
        Whatever ``qsu.ssu`` returns for this call.
    """
    table_str = str(table)
    tree_str = str(phylogeny)
    # The fifth positional argument is the slot `generalized` fills with
    # its alpha; for this method it is pinned to 1.0.
    fixed_alpha = 1.0
    return qsu.ssu(table_str, tree_str, 'weighted_normalized',
                   variance_adjusted, fixed_alpha, threads)
def weighted_unnormalized(table: BIOMV210Format,
                          phylogeny: NewickFormat,
                          threads: int = 1,
                          variance_adjusted: bool = False) -> skbio.DistanceMatrix:  # noqa
    """Run ``qsu.ssu`` with the ``'weighted_unnormalized'`` method.

    Parameters
    ----------
    table : BIOMV210Format
        Feature table; forwarded to ``qsu.ssu`` as ``str(table)``
        (presumably a file path -- confirm against the format class).
    phylogeny : NewickFormat
        Tree; forwarded to ``qsu.ssu`` as ``str(phylogeny)``.
    threads : int, optional
        Forwarded unchanged to ``qsu.ssu``.
    variance_adjusted : bool, optional
        Forwarded unchanged to ``qsu.ssu``.

    Returns
    -------
    skbio.DistanceMatrix
        Whatever ``qsu.ssu`` returns for this call.
    """
    table_str = str(table)
    tree_str = str(phylogeny)
    # The fifth positional argument is the slot `generalized` fills with
    # its alpha; for this method it is pinned to 1.0.
    fixed_alpha = 1.0
    return qsu.ssu(table_str, tree_str, 'weighted_unnormalized',
                   variance_adjusted, fixed_alpha, threads)
def generalized(table: BIOMV210Format,
                phylogeny: NewickFormat,
                threads: int = 1,
                alpha: float = 1.0,
                variance_adjusted: bool = False) -> skbio.DistanceMatrix:
    """Run ``qsu.ssu`` with the ``'generalized'`` method and a given alpha.

    When ``alpha == 1.0`` the call is redirected to `weighted_normalized`
    and a ``Warning`` is emitted saying so.

    Parameters
    ----------
    table : BIOMV210Format
        Feature table; forwarded as ``str(table)`` on the ``qsu.ssu`` path,
        or as-is to `weighted_normalized`.
    phylogeny : NewickFormat
        Tree; forwarded the same way as `table`.
    threads : int, optional
        Forwarded unchanged.
    alpha : float, optional
        Passed to ``qsu.ssu`` as its fifth positional argument unless it
        equals 1.0.
    variance_adjusted : bool, optional
        Forwarded unchanged.

    Returns
    -------
    skbio.DistanceMatrix
        The result of ``qsu.ssu`` or of `weighted_normalized`.
    """
    # Common case first: any alpha other than 1.0 goes straight to the
    # generalized implementation.
    if alpha != 1.0:
        return qsu.ssu(str(table), str(phylogeny), 'generalized',
                       variance_adjusted, alpha, threads)

    message = ("alpha of 1.0 is weighted-normalized UniFrac. "
               "Weighted-normalized is being used instead as it is more "
               "optimized.")
    warn(message, Warning)
    return weighted_normalized(table, phylogeny, threads, variance_adjusted)
# Lookup of the distance functions defined above, keyed by function name.
# `meta` resolves its `method` argument against these keys after replacing
# '-' with '_'.
METHODS = {func.__name__: func
           for func in (unweighted,
                        weighted_normalized,
                        weighted_unnormalized,
                        generalized)}
def meta(tables: tuple, phylogenies: tuple, weights: tuple=None,
         consolidation: str=None, method: str=None,
         threads: int=1, variance_adjusted: bool=False,
         alpha: float=None) -> skbio.DistanceMatrix:
    """Compute one distance matrix per (table, tree) pair and consolidate.

    Tables and phylogenies are paired by position. The selected method is
    run on every pair, and the resulting distance matrices are combined by
    the selected consolidation function into a single matrix over the
    sorted union of all sample IDs.

    Parameters
    ----------
    tables : tuple
        Feature tables, one per phylogeny. Each is passed as the first
        argument of the selected method.
    phylogenies : tuple
        Trees, one per table. Each is passed as the second argument of the
        selected method.
    weights : tuple, optional
        One weight per (table, tree) pair. Defaults to 1 for every pair.
        Weights are divided by their sum before being handed to the
        consolidation function.
    consolidation : str
        Key into ``CONSOLIDATIONS``; '-' is treated as '_'.
    method : str
        Key into ``METHODS``; '-' is treated as '_'.
    threads : int, optional
        Forwarded to the selected method.
    variance_adjusted : bool, optional
        Forwarded to the selected method.
    alpha : float, optional
        Forwarded to the selected method; only accepted when the selected
        method is `generalized`.

    Returns
    -------
    skbio.DistanceMatrix
        Built from the consolidation result, with the sorted union of the
        per-pair IDs as its IDs.

    Raises
    ------
    ValueError
        If no tables or no trees are given, if the numbers of tables,
        trees and weights disagree, if `method` or `consolidation` is
        missing or unrecognized, or if `alpha` is given for a method other
        than 'generalized'.
    """
    if not len(tables):
        raise ValueError("No tables specified.")

    if not len(phylogenies):
        raise ValueError("No trees specified.")

    if len(tables) != len(phylogenies):
        raise ValueError("Number of trees and tables must be the same.")

    if weights is None:
        weights = tuple(1 for _ in phylogenies)
    else:
        if len(weights) != len(phylogenies):
            raise ValueError("Number of weights does not match number of "
                             "trees and tables.")

    if method is None:
        raise ValueError("No method specified.")
    method_ = METHODS.get(method.replace('-', '_'))
    if method_ is None:
        raise ValueError("Method (%s) unrecognized. Available methods are: %s"
                         % (method, ', '.join(METHODS.keys())))

    if consolidation is None:
        raise ValueError("No consolidation specified.")
    consolidation_ = CONSOLIDATIONS.get(consolidation.replace('-', '_'))
    if consolidation_ is None:
        raise ValueError("Consolidation (%s) unrecognized. Available "
                         "consolidations are: %s"
                         % (consolidation, ', '.join(CONSOLIDATIONS.keys())))

    # Compare the resolved callable, not the name string: `method` is a str
    # and can never be identical to the `generalized` function, so testing
    # it would reject alpha for every method, including 'generalized'.
    if alpha is not None and method_ is not generalized:
        raise ValueError("The alpha parameter can only be set when the method "
                         "is set as 'generalized', the selected method is "
                         "'%s'." % method)

    kwargs = {'threads': threads, 'variance_adjusted': variance_adjusted}
    # Only `generalized` accepts alpha, so pass it only when it was given.
    if alpha is not None:
        kwargs['alpha'] = alpha

    # Normalize the weights so they sum to 1.
    weights = np.array(weights, float)/sum(weights)

    dms = [method_(table, tree, **kwargs)
           for table, tree in zip(tables, phylogenies)]

    # The per-pair matrices may cover different samples; the output is
    # indexed by the sorted union of all IDs.
    all_ids = sorted(reduce(or_, [set(dm.ids) for dm in dms]))
    dm = consolidation_(dms, [dm.ids for dm in dms], weights, all_ids)

    return skbio.DistanceMatrix(dm, ids=all_ids)