In [1]:
import numpy as np

# Построение филогенетических деревьев алгоритмом W(U)PGMA

In [2]:
def wupgma(dist, clust_names, key="w"):
    n = dist.shape[0]
    dist_dynamic = fill_dist_matrix(dist)
    clust_size = np.ones((n))
    clust_heights = np.zeros((n))
    clust_str = clust_names.copy()
    
    union_dist_i, union_dist_j = 0, 0
    i, j = 0, 0
    for _ in range(n - 1):
        i, j = np.unravel_index(dist_dynamic.argmin(), dist_dynamic.shape)
        
        if key == "w":
            union_dist_i, union_dist_j, dist_dynamic = \
              recount_dist_wupgma(i, j, 0.5, 0.5, dist_dynamic, clust_heights)
                
        elif key == "u":
            s = clust_size[i] + clust_size[j]
            coef_i = clust_size[i] / s
            coef_j = clust_size[j] / s
            union_dist_i, union_dist_j, dist_dynamic = \
              recount_dist_wupgma(i, j, coef_i, coef_j, dist_dynamic, clust_heights)
        
        clust_str[i] = "({}:{}, {}:{})".format(clust_str[i], 
                                               round(union_dist_i, 3), clust_str[j], round(union_dist_j, 3)) 
        clust_size[i] += 1
        clust_heights[i] += union_dist_i
        
    return clust_str[i]

In [3]:
def recount_dist_wupgma(i, j, coef_i, coef_j, dist, heights):
    dist_i = dist[i, j] / 2 - heights[i]
    dist_j = dist[i, j] / 2 - heights[j]
    
    for k in range(dist.shape[0]):
        if k != j and k != i:
            dist[i, k] = dist[k, i] = dist[k, i] * coef_i + dist[k, j] * coef_j
    
    for k in range(dist.shape[0]):
        dist[k, j] = dist[j, k] = np.inf
        
    return dist_i, dist_j, dist

# Построение филогенетических деревьев алгоритмом Neighbor Joining

In [4]:
def neighbor_joining(dist, clust_names):
    n = dist.shape[0]
    dist_dynamic = fill_dist_matrix(dist)
    clust_str = clust_names.copy()
    
    a, b = 0, 0
    for i in range(n - 2):
        a, b, dist_a, dist_b = minimize_dist(dist_dynamic, n - i)
        dist_dynamic = recount_dist_nj(dist_dynamic, a, b)
            
        if i != (n - 3):
            clust_str[a] = "({}:{}, {}:{})".format(clust_str[a], 
                                               round(dist_a, 3), clust_str[b], round(dist_b, 3))
        else:
            j, k = np.unravel_index(dist_dynamic.argmin(), dist_dynamic.shape) 
            c = j if a == k or b == k else k
            
            clust_str[a] = "({}:{}, {}:{}, {}:{})".format(clust_str[a], round(dist_a, 3), 
                                                          clust_str[b], round(dist_b, 3),
                                                          clust_str[c], round(dist_dynamic[j][k], 3))
            
    return clust_str[a]

In [5]:
# helpful functions for NJ
def count_sum_dists(dist):
    n = dist.shape[0]
    sum_dists = np.full((n,), np.inf)
    for i in range(n):
        sum_i = np.inf
        for j in range(n):
            if dist[i, j] != np.inf:
                sum_i = dist[i, j] if sum_i == np.inf else sum_i + dist[i, j]

        sum_dists[i] = sum_i
        
    return sum_dists

def minimize_dist(dist, N):
    n = dist.shape[0]
    sum_dists = count_sum_dists(dist)
    to_minimize = lambda x, y: dist[x, y] - (sum_dists[x] - dist[x, y] + sum_dists[y] - dist[x, y]) / (N - 2)
    
    a, b, d = 0, 0, np.inf
    for i in range(n):
        for j in range(i + 1, n):
            if dist[i, j] != np.inf and to_minimize(i, j) <= d:
                d = to_minimize(i, j)
                a = i
                b = j
                
    dist_a = 0.5 * (dist[a, b] + (sum_dists[a] - sum_dists[b]) / (N - 2))
    dist_b = 0.5 * (dist[a, b] + (sum_dists[b] - sum_dists[a]) / (N - 2))
    
    return a, b, dist_a, dist_b

def recount_dist_nj(dist, a, b):
    n = dist.shape[0]
    for k in range(n):
        if k != a and k != b:
            dist[a, k] = dist[k, a] = 0.5 * (dist[b, k] + dist[a, k] - dist[a, b])

    for k in range(n):
        dist[b, k] = dist[k, b] = np.inf

    return dist

In [6]:
def fill_dist_matrix(dist):
    new_m = dist.astype(float).copy()
    
    for i in range(dist.shape[0]):
        new_m[i, i] = np.inf
        for j in range(i):
            new_m[i, j] = new_m[j, i]
            
    return new_m

# Тесты 1-2

In [7]:
def test1():
    a = np.asarray([[0, 16, 16, 10],
                    [0, 0, 8, 8],
                    [0, 0, 0, 4],
                    [0, 0, 0, 0]])
    print("WPGMA: {}".format(wupgma(a, ["A", "B", "C", "D"], "w")))
    print("UPGMA: {}".format(wupgma(a, ["A", "B", "C", "D"], "u")))
    print("NJ: {}".format(neighbor_joining(a, ["A", "B", "C", "D"])))

In [8]:
test1()

WPGMA: (A:7.25, (B:4.0, (C:2.0, D:2.0):2.0):3.25)
UPGMA: (A:7.0, (B:4.0, (C:2.0, D:2.0):2.0):3.0)
NJ: (B:5.5, (C:3.5, D:0.5):0.5, A:10.5)


In [9]:
def test2():
    a = np.asarray([[0, 5, 4, 7, 6, 8],
                    [0, 0, 7, 10, 9, 11],
                    [0, 0, 0, 7, 6, 8],
                    [0, 0, 0, 0, 5, 9],
                    [0, 0, 0, 0, 0, 8],
                    [0, 0, 0, 0, 0, 0]])
    print("WPGMA: {}".format(wupgma(a, ["A", "B", "C", "D", "E", "F"], "w")))
    print("UPGMA: {}".format(wupgma(a, ["A", "B", "C", "D", "E", "F"], "u")))
    print("NJ: {}".format(neighbor_joining(a, ["A", "B", "C", "D", "E", "F"])))

In [10]:
test2()

WPGMA: ((((A:2.0, C:2.0):1.0, B:3.0):1.0, (D:2.5, E:2.5):1.5):0.5, F:4.5)
UPGMA: ((((A:2.0, C:2.0):1.0, B:3.0):0.75, (D:2.5, E:2.5):1.25):0.65, F:4.4)
NJ: (((A:1.0, B:4.0):1.0, C:2.0):1.0, (D:3.0, E:2.0):1.0, F:5.0)
