In [1]:
import random
import numpy as np
import itertools

In [2]:
def traverse(tree, nodes=None):

    # traverses tree and adds all nodes to a list
    if nodes is None:
        nodes = []

    if isinstance(tree, tuple) or isinstance(tree, list):
	# Left subtree
        traverse(tree[0], nodes)
	# Right subtree
        traverse(tree[1], nodes)
	
    else:
		# Reached edge node
        nodes.append(tree)
		
    return nodes

# def xor(str1, str2):
#     L = len(str1)
#     s = ''
#     for i in range(L):
#         if str1[i] != str2[i]:
#             s += str(1)
#         else:
#             s += str(0)
#     return s

def traverse_xor(tree, subtrees=None):

    if subtrees is None:
        subtrees = []

    # traverses the tree and adds all subtrees to a list
    if isinstance(tree, tuple) or isinstance(tree, list):
        if tree not in subtrees:
            subtrees.append(tree)
	# Left subtree
        traverse_xor(tree[0], subtrees)
	# Right subtree
        traverse_xor(tree[1], subtrees)
		
    return subtrees

# def tokenize(tree):
#     tree = str(tree)
#     return tree.replace('(', ' ( ').replace(')', ' ) ').replace('[', ' [ ').replace(']', ' ] ').replace(',', '').split()

def tree_xor(subtrees, bit_dict):
    vals = dict()
    for x in subtrees:
        if isinstance(x[0], int):
            val_1 = bit_dict[x[0]]
        else:
            val_1 = vals[x[0]]
        
        if isinstance(x[1], int):
            val_2 = bit_dict[x[1]]
        else:
            val_2 = vals[x[1]]
        vals[x] = val_1 ^ val_2

    new_vals = dict()
    for elem in vals:
        new_vals[vals[elem]] = 'in'

    return new_vals
        
def distance(tree1, tree2):
    """
    Returns distance between two phylogenetic trees

        Arguments:
            tree1 - first list of lists or tuple of tuples, with integer member elements
            tree2 - second list of lists or tuple of tuples, with integer member elements
        Returns:
            dist - integer distance between 'tree1' and 'tree2'
    """
    BITS = 32
    bit_dict = dict()

    # OLD XOR FUNCTION
    # for node in traverse(tree1):
    #     s = ''
    #     for i in range(BITS):
    #         n = random.randint(0, 1)
    #         s += str(n)
    #     bit_dict[node] = s

    for node in traverse(tree1):
        s = random.randint(0, 2**BITS - 1)
        bit_dict[node] = s
    
    subtrees1 = reversed(traverse_xor(tree1))
    edge_dict = tree_xor(subtrees1, bit_dict)

    for node in traverse(tree2):
        if node not in bit_dict:
            s = random.randint(0, 2**BITS - 1)
            bit_dict[node] = s
    
    subtrees2 = reversed(traverse_xor(tree2))
    edge_dict2 = tree_xor(subtrees2, bit_dict)
    
    dist = 0
    for elem in edge_dict:
        if elem not in edge_dict2:
            dist += 1

    return dist

In [3]:
T1 = (0, (1, (2, (3, 4))))
T2 = (0, (1, (3, (2, 4))))
bit_dict = {0: 15, 1: 14, 2: 12, 3: 8, 4: 0}
print(traverse_xor(T1))
print(traverse_xor(T2))
print(tree_xor(reversed(traverse_xor(T1)), bit_dict))
print(tree_xor(reversed(traverse_xor(T2)), bit_dict))

[(0, (1, (2, (3, 4)))), (1, (2, (3, 4))), (2, (3, 4)), (3, 4)]
[(0, (1, (3, (2, 4)))), (1, (3, (2, 4))), (3, (2, 4)), (2, 4)]
{8: 'in', 4: 'in', 10: 'in', 5: 'in'}
{12: 'in', 4: 'in', 10: 'in', 5: 'in'}


In [4]:
def tokenize(tree):
    tree = str(tree)
    tree = tree.replace('(', '').replace(')', '').replace(',', '').split()
    return [int(x) for x in tree]

def cluster(treeList, minClusters, maxClusters):
    """
    Return each clustering step from 'maxClusters' to 'minClusters' size from input 'treeList'

        Arguments:
            treeList - list of all trees to cluster
            minClusters - integer size of minimum number of clusters
            maxClusters - integer size of maximum number of clusters
        Returns:
            clusterings - list of clusters with information on numbers of trees, diameters (Robinson-Foulds distance), and average diameter
                [[[numbers of trees], [diameters], average diameter],   <- max number of clusters
                .
                .
                .
                [[numbers of trees], [diameters], average diameter]]   <- min number of clusters
    """
    # initial clustering -- each tree is its own cluster
    pairwise_dist = dict()
    L = len(treeList)

    cluster_list = range(L)
    
    cluster_dist = dict()
    transform = dict()
    clusterings = []
    
    for k in range(L - minClusters):
        cluster_dist = dict()
        for x in cluster_list:
            for y in cluster_list:
                if (x != y) and ((x, y) not in cluster_dist or (y, x) not in cluster_dist):
                    if x in transform:
                        x_new = transform[x]
                        Lx = len(x_new)
                    else:
                        x_new = x
                        Lx = 1
                    
                    if y in transform:
                        y_new = transform[y]
                        Ly = len(y_new)
                    else:
                        y_new = y
                        Ly = 1
                    dist = 0

                    if isinstance(x, tuple) and isinstance(y, tuple):
                        for ell in x_new:
                            for elm in y_new:
                                dist += distance(treeList[ell], treeList[elm])
                    
                    elif isinstance(x, int) and isinstance(y, int):
                        dist = distance(treeList[x], treeList[y])

                    elif isinstance(x, tuple) and isinstance(y, int):
                        for ell in x_new:
                            dist += distance(treeList[ell], treeList[y])

                    elif isinstance(y, tuple) and isinstance(x, int):
                        for elm in y_new:
                            dist += distance(treeList[elm], treeList[x])
                    
                    dist /= (Lx * Ly)
                    cluster_dist[(x, y)] = dist
                    cluster_dist[(y, x)] = dist

        pairwise_dist.update(cluster_dist)

        new_cluster = min(cluster_dist, key=cluster_dist.get)
        
        cluster_list.remove(new_cluster[0])
        cluster_list.remove(new_cluster[1])
        cluster_list.append(new_cluster)

        transform[new_cluster] = tokenize(new_cluster)

        if k >= L - maxClusters - 1:
            # res = [[number of trees], [diameters], average diameter]
            count_res = []
            diam_res = []
    
            for cluster in cluster_list:
                if not isinstance(cluster, int):
                    count_res.append(len(transform[cluster]))
                    trans = transform[cluster]
                    pairs = list(itertools.combinations(trans, 2))
                    diam = int(max(pairwise_dist[pair] for pair in pairs))
                    diam_res.append(diam)
                else:
                    count_res.append(1)
                    diam_res.append(0)

            avg_diam = sum(diam_res)/len(diam_res)

            res = [count_res, diam_res, avg_diam]
            clusterings.append(res)

    return clusterings

In [5]:
def specificity(cluster, threshold):
    """
    Calculate the specificity of 'cluster' given a target 'threshold'

        Arguments:
            cluster - list of trees
            threshold - float value of threshold
        Returns:
            spec - float specificity of 'cluster' given 'threshold'
    """
    BITS = 32
    edge_list = list()
    K = len(cluster)

    bit_dict = dict()
    for tree in cluster:
        for node in traverse(tree):
            if node not in bit_dict:
                s = random.randint(0, 2**BITS - 1)
                bit_dict[node] = s
        
        subtrees = reversed(traverse_xor(tree))
        edge_dict = tree_xor(subtrees, bit_dict)
        for i in edge_dict:
            edge_list.append(i)

    num_partitions = len(edge_list)/K

    edge_set = set()

    for edge in edge_list:
        if (edge_list.count(edge)/K) >= threshold:
            edge_set.add(edge)
    
    num_consensus = len(edge_set)
    spec = num_consensus/num_partitions

    return spec

In [None]:
## COALESCENT SIMULATION

trials = 1000
N = 500
klist = [2, 3, 4, 5]

def coalescent_simulation(N, k):
    gens_between_events = []
    
    for _ in range(1000):
        alleles = list(range(2 * N))
        generations = 0
        
        while len(alleles) > k:
            sample_parents = random.sample(alleles, k)  # sample parents uniformly at random with replacement
            alleles = list(set(alleles) - set(sample_parents))
            generations += 1

        gens_between_events.append(generations)
    
    return gens_between_events

for k in klist:
    results = coalescent_simulation(N, k)
    mean_generations = np.mean(results)
    std_dev = np.std(results)
    print(f"k = {k}:")
    print(f"mean gens between events: {mean_generations}")
    print(f"Standard deviation: {std_dev}\n")