This repository has been archived by the owner on Nov 9, 2023. It is now read-only.
/
make_phylogeny.py
227 lines (190 loc) · 8.58 KB
/
make_phylogeny.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
#!/usr/bin/env python
__author__ = "Justin Kuczynski"
__copyright__ = "Copyright 2011, The QIIME Project"
__credits__ = ["Rob Knight", "Justin Kuczynski", "Daniel McDonald",
"Jai Ram Rideout"]
__license__ = "GPL"
__version__ = "1.6.0"
__maintainer__ = "Justin Kuczynski"
__email__ = "justinak@gmail.com"
__status__ = "Release"
"""Contains code for making a phylogeny from aligned sequences, using several techniques.
This module has the responsibility for taking a set of sequences and
returning an alignment. Mostly, it will be thin wrappers for code
already in cogent.app.*, to which wrappers for e.g. NAST need to be
added..
"""
from cogent import LoadSeqs, DNA
from cogent.parse.fasta import MinimalFastaParser
from qiime.util import FunctionWithParams
#app controllers that implement align_unaligned_seqs
import cogent.app.muscle_v38
import cogent.app.clustalw
import cogent.app.mafft
import cogent.app.raxml_v730
import cogent.app.fasttree
import cogent.app.fasttree_v1
import cogent.app.clearcut
class TreeBuilder(FunctionWithParams):
"""A TreeBuilder takes a aligned set of sequences and returns a tree.
This is an abstract class: subclasses should implement the __call__
method.
Note: sequence ids should be preserved during this process, i.e. the
description lines should be saved/restored if the phylogeny app is
destructive to them.
Specific wrappers need to be written for nontraditional approaches,
including:
(a) RAxML assignment, where sequences are assigned to internal nodes of
an existing tree
(b) BLAST-based assignment, where sequences are assigned to existing
nodes of a tree based on best blast hit (assigned only to terminal
nodes if using a single hit, but could easily imagine assigning to
an internal node based on a set of indistinguishably good hits).
(c) BLAST-like approach using VMATCH or other suffix array library, or
using oligonucleotide freqs like the RDP classifier does to assign
to an arbitrary internal node in an existing tree, etc.
"""
Name = 'TreeBuilder'
def __init__(self, params):
"""Return new TreeBuilder object with specified params.
Note: expect params to contain both generic and per-method (e.g. for
raxml vs. fasttree vs. whatever) params, so leaving it as a dict
rather than setting attributes. Some standard entries in params are:
Application: 3rd-party application used, if any, e.g. raxml
"""
self.Params = params
def __call__ (self, aln_path, result_path=None, log_path=None):
"""Returns tree from alignment.
Parameters:
aln_path: path to file of aligned sequences
result_path: path to file of results. If specified, should
dump the result to the desired path as fasta, otherwise should
return cogent.core.alignment.DenseAlignment object.
log_path: path to log, which should include dump of params.
"""
raise NotImplementedError, "TreeBuilder is an abstract class"
class CogentTreeBuilder(TreeBuilder):
"""Generic tree builder using Cogent tree methods."""
Name = 'CogentTreeBuilder'
def getResult(self, aln_path, *args, **kwargs):
"""Returns alignment from sequences.
Currently does not allow parameter tuning of program and uses
default parameters -- this is bad and should be fixed.
#TODO: allow command-line access to important aln params.
"""
module = self.Params['Module']
# standard qiime says we just consider the first word as the unique ID
# the rest of the defline of the fasta alignment often doesn't match
# the otu names in the otu table
seqs = LoadSeqs(aln_path, Aligned=True, label_to_name=lambda x: x.split()[0])
result = module.build_tree_from_alignment(seqs, moltype=DNA)
try:
root_method = kwargs['root_method']
if root_method == 'midpoint':
result = root_midpt(result)
elif root_method == 'tree_method_default':
pass
except KeyError:
pass
return result
def __call__(self, result_path=None, log_path=None, *args, **kwargs):
"""Calls superclass method to align seqs"""
return FunctionWithParams.__call__(self, result_path=result_path,
log_path=log_path, *args, **kwargs)
tree_method_constructors = {}
tree_module_names = {'muscle':cogent.app.muscle_v38,
'clustalw':cogent.app.clustalw,
#'mafft':cogent.app.mafft,
# current version of Mafft does not support tree building
'fasttree':cogent.app.fasttree,'fasttree_v1':cogent.app.fasttree_v1,
'raxml_v730':cogent.app.raxml_v730,'clearcut':cogent.app.clearcut
}
#def maxTipTipDistance(tree):
# """returns the max distance between any pair of tips
#
# Also returns the tip names that it is between as a tuple"""
# distmtx, tip_order = tree.tipToTipDistances()
# idx_max = divmod(distmtx.argmax(),distmtx.shape[1])
# max_pair = (tip_order[idx_max[0]].Name, tip_order[idx_max[1]].Name)
# return distmtx[idx_max], max_pair
#
#def decorate_max_tip_to_tip_distance(self):
# """Propagate tip distance information up the tree
#
# This method was originally implemented by Julia Goodrich with the intent
# of being able to determine max tip to tip distances between nodes on large
# trees efficiently. The code has been modified to track the specific tips
# the distance is between
# """
# for n in self.postorder():
# if n.isTip():
# n.MaxDistTips = [(0.0, n.Name), (0.0, n.Name)]
# else:
# n.MaxDistTips = [max(c.MaxDistTips) for c in n.Children][:2]
#
# max_dist = 0.0
# max_names = [None, None]
# for n in tree.nontips():
# tip_a, tip_b = n.MaxTipsTips
# dist = tip_a[0] + tip_b[0]
# if dist > max_dist:
# max_dist = dist
# max_names = [tip_a[1], tip_b[1]]
#
# return max_dist, max_names
def root_midpt(tree):
""" this was instead of PhyloNode.rootAtMidpoint(), which is slow and broke
this should be deprecated in a future release once the release version
of PyCogent's tree.rootAtMidpoint() is identical to this function
this fn doesn't preserve the internal node naming or structure,
but does keep tip to tip distances correct. uses unrootedDeepcopy()
"""
#max_dist, tip_names, int_node = getMaxTipTipDistance(tree)
max_dist, tip_names, int_node = tree.getMaxTipTipDistance()
half_max_dist = max_dist/2.0
if max_dist == 0.0: # only pathological cases with no lengths
return tree.unrootedDeepcopy()
tip1 = tree.getNodeMatchingName(tip_names[0])
tip2 = tree.getNodeMatchingName(tip_names[1])
lca = tree.getConnectingNode(tip_names[0],tip_names[1]) # last comm ancestor
if tip1.distance(lca) > half_max_dist:
climb_node = tip1
else:
climb_node = tip2
dist_climbed = 0.0
while dist_climbed + climb_node.Length < half_max_dist:
dist_climbed += climb_node.Length
climb_node = climb_node.Parent
# now midpt is either at on the branch to climb_node's parent
# or midpt is at climb_node's parent
# print dist_climbed, half_max_dist, 'dists cl hamax'
if dist_climbed + climb_node.Length == half_max_dist:
# climb to midpoint spot
climb_node = climb_node.Parent
if climb_node.isTip():
raise RuntimeError('error trying to root tree at tip')
else:
# print climb_node.Name, 'clmb node'
return climb_node.unrootedDeepcopy()
else:
# make a new node on climb_node's branch to its parent
tmp_node_name = "TMP_ROOT_NODE_NAME"
parent = climb_node.Parent
parent.removeNode(climb_node)
climb_node.Parent = None
new_node = parent.__class__()
new_node.Name = tmp_node_name
# adjust branch lengths
old_br_len = climb_node.Length
climb_node.Length = half_max_dist - dist_climbed
new_node.Length = old_br_len - climb_node.Length
if climb_node.Length < 0.0 or new_node.Length < 0.0:
raise RuntimeError('attempting to create a negative branch length!')
# re-attach tree
parent.append(new_node)
new_node.append(climb_node)
# reroot and remove the temporary node name
new_tree = tree.rootedAt(tmp_node_name)
new_root = new_tree.getNodeMatchingName(tmp_node_name)
new_root.Name = None
return new_tree