In [1]:
%matplotlib inline

In [2]:
import numpy as np
import random
import ete3

from collections import Counter
from scipy import stats
import glob

from Bio import Phylo
from io import StringIO

In [3]:
out_dir = '../Data/'

# Uniform tree sampling

**First the functions**

In [4]:
def build_dicts(n):
    ###The dictionary that will hold the number of possible trees with a given number of leaves
    n_trees_dict = {}
    ###It has a few quirks that need to be added manually
    n_trees_dict[np.nan] = np.nan
    n_trees_dict[1] = 1
    
    ###Establishing the arrays that hold the number of downstream children on the left and right
    ###sides of a bifurcation
    l_side = [1]
    r_side = [np.nan]
    ###A pointer dictionary that tracks where in those arrays trees with a given number of leaves reside
    pointer_dict = {}
    pointer_dict[1] = (0, 0) #i.e. the first index contains trees related to 1 downstream leaf
    
    ###Build up the quantities
    for tree in range(2, n+1):
        starting_index = len(l_side)
        range_val = tree//2+1
        totals = []
        for i in range(1, range_val):
            j = tree-i
            l_side.append(j)
            r_side.append(i)
            if i!=j:
                totals.append(n_trees_dict[i]*n_trees_dict[j])
            else:
                totals.append((((n_trees_dict[i]**2)-n_trees_dict[i])//2)+n_trees_dict[i])
        ending_index = len(l_side)-1
        totals = sum(totals)
        n_trees_dict[tree] = totals
        pointer_dict[tree] = (starting_index, ending_index)
    return l_side, r_side, n_trees_dict, pointer_dict 


def build_tree_full(lside_arr, rside_arr, n_trees_dict, pointer_dict, n_leaves):
    '''
    Maybe this is where my recursive algorithm could/should lie?
    '''
    #Start by first looking at the end of the df
    n_trees = n_trees_dict[n]
    #Establishing the index of the tree that I want to grab
    tree_index = random.randint(0, n_trees-1)
    #Getting the number of descendants and index within that set
    l_children, r_children, n_within = select_index(pointer_dict,\
                                                    lside_arr, rside_arr, n_trees_dict, tree_index, n)

    l_index, r_index = select_descendants(l_children, r_children, n_within, n_trees_dict)
    
    l_l_children, l_r_children, l_n_within = select_index(pointer_dict,\
                                                lside_arr, rside_arr, n_trees_dict, l_index, l_children)
    
    r_l_children, r_r_children, r_n_within = select_index(pointer_dict,\
                                                lside_arr, rside_arr, n_trees_dict, r_index, r_children)

    my_tree = ete3.Tree()
    l_split = my_tree.add_child(name=(l_l_children, l_r_children, l_n_within)) 
    r_split = my_tree.add_child(name=(r_l_children, r_r_children, r_n_within)) 
    my_tree = build_recursive_full(my_tree, my_tree, lside_arr, rside_arr, n_trees_dict, pointer_dict)
    return my_tree

def select_index(pointer_dict, lside_arr, rside_arr, n_trees_dict, tree_index, n):
    """
    The real problem is that I'm generating these values at each call
    but like if I don't, I'll have to store them and for large n (>100)
    this gets positively enormous/prohibitive.
    
    So instead, I generate this on the fly each time around? 
    """
    if n == 1:
        return np.nan, np.nan, 0
    lside_temp = np.array(lside_arr[pointer_dict[n][0]:pointer_dict[n][1]+1], dtype=object)
    rside_temp = np.array(rside_arr[pointer_dict[n][0]:pointer_dict[n][1]+1], dtype=object)
    lside_trees = np.array([n_trees_dict[i] for i in lside_temp], dtype=object)
    rside_trees = np.array([n_trees_dict[i] for i in rside_temp], dtype=object)
    multiplied = lside_trees*rside_trees
    multiplied[lside_temp==rside_temp] = 0
    triangularized = ((lside_trees**2 - lside_trees)//2)+lside_trees
    triangularized[lside_temp!=rside_temp] = 0
    combined = np.cumsum(multiplied + triangularized)

    line_index = np.argmax(combined>tree_index)
    if line_index == 0:
        index_within = tree_index
    else:
        index_within = tree_index - combined[line_index-1]
    l_children = lside_temp[line_index]
    r_children = rside_temp[line_index]
    return l_children, r_children, index_within


    
def select_descendants(l_children, r_children, index_within, n_trees_dict):
    """
    Technically not just selecting the square with this update...
    """
    if l_children != r_children:
        l_trees = n_trees_dict[l_children]
        r_trees = n_trees_dict[r_children]
        lside = (index_within)%l_trees
        rside = ((index_within)//l_trees)
    else:
        l_trees = n_trees_dict[l_children]
        lside, rside = triu_indices_AJH(index_within, l_trees)
    return lside, rside



def build_recursive_full(my_tree, node, lside_arr, rside_arr, n_trees_dict, pointer_dict):
    if node.children == []:
        return my_tree
    for child_node in node.children:
        if child_node.name != (np.nan, np.nan, 0):
            l_children, r_children, n_within = child_node.name
            
            l_index, r_index = select_descendants(l_children, r_children, n_within, n_trees_dict)
            
            l_l_children, l_r_children, l_n_within = select_index(pointer_dict,\
                                            lside_arr, rside_arr, n_trees_dict, l_index, l_children)
    
            r_l_children, r_r_children, r_n_within = select_index(pointer_dict,\
                                                lside_arr, rside_arr, n_trees_dict, r_index, r_children)

            l_split = child_node.add_child(name=(l_l_children, l_r_children, l_n_within)) 
            r_split = child_node.add_child(name=(r_l_children, r_r_children, r_n_within))     
            my_tree = build_recursive_full(my_tree, child_node,\
                                           lside_arr, rside_arr, n_trees_dict, pointer_dict)
    return my_tree




In [5]:
####This might actually work!
def triu_indices_AJH(i, mat_len):
    """
    This function gets the triangular indices from a square matrix (including the diagonal). 
    It was developed with a bit of guess and check and using some functions/solutions I found 
    online but it looks robust. Should be tested more extensively but my test against numpy triu_indices()
    works well (and this of course skips the matrix building step).
    
    Critical differences to some fxns found online were making sure it runs on integer math to handle
    enormous numbers.
    
    Relies on isqrt, which should be the python implementation in >3.8 but Newton's method works (slowly)
    
    Inputs:
        i - the linear index of interest
        m - the dimensions of one side of the square matrix
        
    Outputs
        row index
        column index
    """
    ii = (mat_len*(mat_len+1))//2-1-i
    K = (isqrt(8*ii+1)-1)//2
    row = mat_len-1-K
    return row, i - (mat_len*(mat_len-1)//2) + ((mat_len-row)*((mat_len-row)-1))//2


def triu_accuracy_test(max_n_to_test):
    """
    Tests the accuracy of my triangle indices test compared to numpy.
    The advantage of mine is not having to actually build the matrix and instead work directly 
    with the dimensions.
    
    Will run slowly so am arbitrarily limiting max_n_to_test to 200
    """
    if max_n_to_test > 100:
        max_n_to_test = 100
        print("Setting max variable to test to 100, test takes a long time otherwise.")
    for n in range(1, max_n_to_test+1):
        for k in range(((n**2-n)//2)+n):
            i, j = triu_indices_AJH(k, n)
            assert np.triu_indices(n, k=0)[0][k] == i
            assert np.triu_indices(n, k=0)[1][k] == j, print('###',np.triu_indices(n, k=0)[1][k], j)

def isqrt(n):
    """
    Integer square root. Need to upgrade to python 3.8 for math.isqrt()
    """
    x = n
    y = (x + 1) // 2
    while y < x:
        x = y
        y = (x + n // x) // 2
    return x

**Now run it**

In [6]:
n = 32
l_side, r_side, n_trees_dict, pointer_dict = build_dicts(n)
tree_list = []
for i in range(100):
    tree = build_tree_full(l_side, r_side, n_trees_dict, pointer_dict, n)
    tree.name = ''
    for node in tree.get_descendants():
        node.name = ''
    assert len(tree.get_leaves()) == n, len(tree.get_leaves())
    tree_list.append(tree.write('newick', format=100))

In [7]:
###This is just a simple test of uniformity to ensure everything is working as planned
# counter_dict = Counter(tree_list)
# print(len(counter_dict.values()))
# print(stats.chisquare(list(counter_dict.values())))

In [8]:
for i, tree in enumerate(tree_list):
    with open(out_dir+'{}_uniformTopology_{}.newick'.format(n, i), 'w') as outfile:
        outfile.write(tree)

# Birth-death tree sampling

In [9]:
def random_insert(listy, item):
    '''
    I preliminarily tested 2 options for inserting and randomizing new items:
    
    1.
    listy = list(range(1000))
    random_insert(listy, 'a')
    random_insert(listy, 'b')
    
    2.
    listy = list(range(1000))
    listy.extend(['a', 'b'])
    random.shuffle(listy)
    
    Obviously the former doesn't randomize the whole list but I found the implementation using random_insert 
    to be absurdly quicker. As long as the list is grown from an initially randomly shuffled I think that
    this would produce a fully random output. So this function is currently used below.
    
    '''
    listy.insert(random.randrange(len(listy)+1), item)

def get_bd_topology(n_extant_taxa):
    '''
    Docs
    
    Restrictions beta>=mu
    
    
    
    Algorithm notes:
    1. Code is definitely worse than O(n). But don't think that it's quadratic.
    2. Should perhaps test random insertion of the two new children to avoid the 
        re-shuffling step... but are two random insertions worse than one random re-shuffling?
    '''
    my_tree = ete3.Tree() # Instantiate an empty tree
    #Add two leaves to start the first bifurcation
    A = my_tree.add_child(name="", dist=0.) 
    B = my_tree.add_child(name="", dist=0.) 
    #And put those leaves in a list of who to draw from
    all_leaves = [A, B]
    random.shuffle(all_leaves)
    for i in range(n_extant_taxa-2):
        #Choose a random leaf, add two children to it
        choice = all_leaves[-1]
        new_A = choice.add_child(name="", dist=0.)
        new_B = choice.add_child(name="", dist=0.)
        #Remove the leaf and randomly add its children to our running list of extant
        all_leaves = all_leaves[:-1]
        random_insert(all_leaves, new_A)
        random_insert(all_leaves, new_B)
    return my_tree

def rotate_tree(my_tree):
    a, b = my_tree.get_children()
    if len(a.get_descendants()) < len(b.get_descendants()):
        my_tree.swap_children()
    for node in my_tree.get_descendants():
        if not node.is_leaf():
            a, b = node.get_children()
            if len(a.get_descendants()) < len(b.get_descendants()):
                node.swap_children()
    return my_tree

In [10]:
n = 32
tree_list = []
for i in range(100):
    tree = get_bd_topology(n)
    assert len(tree.get_leaves()) == n, len(tree.get_leaves())
    tree = rotate_tree(tree)
    assert len(tree.get_leaves()) == n, len(tree.get_leaves())
    tree_list.append(tree.write('newick', format=100))

In [11]:
for i, tree in enumerate(tree_list):
    with open(out_dir+'{}_bdTopology_{}.newick'.format(n, i), 'w') as outfile:
        outfile.write(tree)

# Test topologies

In [19]:
test_trees = []
for tree_file in glob.glob(out_dir+'*32_bdTopology*'):
    test_trees.append(ete3.Tree(tree_file, format=100))
test_tree_strings = [tree.write('newick', format=100) for tree in test_trees]
print(len(test_trees))

100


**Visualizing trees**

In [20]:
counter_dict = Counter(test_tree_strings)
unique_trees_str = list(counter_dict.keys())
if len(unique_trees_str) < 10:
    for newick in unique_trees_str:
        print(counter_dict[newick])
        Phylo.draw(Phylo.read(StringIO(newick), 'newick'))

**Ensure tree is bifurcating with no polytomies**

In [21]:
for tree in test_trees:
    assert set([len(tree.children)]+\
               [len(i.children) for i in tree.get_descendants() if i.is_leaf()==False]) == {2}