# CSPB-3104 Programming Assignment 9



1) (5 points) Implement Kruskal's algorithm 

Input: An edge list with weights: [(0,1,1), (0,2,2),(1,2,1)]  
Output: A minimum spanning tree in the form of an edge list with weights: [(0, 1, 1), (1, 2, 1)] 

Note: Edge lists are lists of triples (i, j, w), with i < j, which represents an edge between nodes i and j with weight w.  Edges are undirected in this notebook, and you should always return edges in the form (i, j, w), where i < j. Make sure to sort your final edge list in natural order, ie (0, 2, 1) before (1,2,1), (0,1,0) before (0,2,0).

Hint: Look into Python's Set class


In [None]:
class UnionFind:
    def __init__(self, n):
        self.parent = list(range(n))  # each node is its own parent initially
        self.rank = [0] * n           # rank to keep the trees flat

    def find(self, u):
        if self.parent[u] != u:
            self.parent[u] = self.find(self.parent[u])  # path compression
        return self.parent[u]

    def union(self, u, v):
        root_u = self.find(u)
        root_v = self.find(v)
        
        if root_u != root_v:
            # union by rank to keep the tree balanced
            if self.rank[root_u] > self.rank[root_v]:
                self.parent[root_v] = root_u
            elif self.rank[root_u] < self.rank[root_v]:
                self.parent[root_u] = root_v
            else:
                self.parent[root_v] = root_u
                self.rank[root_u] += 1

In [7]:
def kruskal(edge_list):
    # sort the edges by weight in ascending order
    edge_list.sort(key=lambda x: x[2])
    
    # initialize Union-Find for cycle detection
    # find the number of nodes by finding the maximum node index
    nodes = set()
    for edge in edge_list:
        nodes.add(edge[0])
        nodes.add(edge[1])
    uf = UnionFind(len(nodes))
    
    mst_edges = []  # list to store the edges of the MST
    
    # iterate over the sorted edges and build the MST
    for u, v, w in edge_list:
        # ensure edges are in the form (i, j) where i < j
        if u > v:
            u, v = v, u
            
        # only add the edge if it doesn't form a cycle
        if uf.find(u) != uf.find(v):
            uf.union(u, v)
            mst_edges.append((u, v, w))
            
            # stop if added enough edges for an MST
            if len(mst_edges) == len(nodes) - 1:
                break
    
    # sort the final MST edges as specified
    mst_edges.sort()
    return mst_edges


2. (5 points) Implement Prim's algorithm

Input: An edge list with weights: [(0,1,1), (0,2,2),(1,2,1)]  
Output: A minimum spanning tree in the form of an edge list with weights: [(0, 1, 1), (1, 2, 1)] 

Note: Edge lists are lists of triples (i, j, w), with i < j, which represents an edge between nodes i and j with weight w.  Edges are undirected in this notebook, and you should always return edges in the form (i, j, w), where i < j. Make sure to sort your final edge list in natural order, ie (0, 2, 1) before (1,2,1), (0,1,0) before (0,2,0).

Hint: You can use heapq for the priority queue.

In [8]:
import heapq

def prim(edge_list):
    # convert the edge list into an adjacency list
    graph = {}
    for u, v, w in edge_list:
        if u not in graph:
            graph[u] = []
        if v not in graph:
            graph[v] = []
        graph[u].append((w, v))
        graph[v].append((w, u))

    # arbitrarily choose a starting node (node 0 in this case)
    start_node = 0
    mst_edges = []  # resulting MST edges
    visited = set()  # track visited nodes

    # priority queue (min-heap) for edges, initialized with the edges from start_node
    min_heap = []
    visited.add(start_node)

    # add all edges from the start node to the heap
    for edge in graph[start_node]:
        weight, neighbor = edge
        heapq.heappush(min_heap, (weight, start_node, neighbor))

    # if haven't visited all nodes, keep adding edges to the MST
    while min_heap and len(visited) < len(graph):
        weight, u, v = heapq.heappop(min_heap)
        
        # skip if the node is already in the MST
        if v in visited:
            continue

        # add the edge to the MST
        mst_edges.append((u, v, weight))
        visited.add(v)

        # add all edges from the newly added node to the heap
        for edge in graph[v]:
            next_weight, neighbor = edge
            if neighbor not in visited:
                heapq.heappush(min_heap, (next_weight, v, neighbor))

    # sort the edges in the MST as specified (i < j lexicographically)
    mst_edges = [(min(u, v), max(u, v), w) for u, v, w in mst_edges]
    mst_edges.sort()
    
    return mst_edges


3) (15 points)  Finding the most likely mutation tree

You're given a list of bacteria RNA fragments, all from related bacteria which have mutated into separate strains over time.  Your goal is to come up with the most likely sequence of mutations that led to this state of affairs.  

The chance that one bacteria mutated into another depends on the number of differences in their RNA strings. 
The more differences in their RNA strings, the more unlikely it is that the bacteria mutated into each other.  (In fact, exponentially more unlikely -- the probability that k locations changed at the same time is $2^{-k}$).

If we construct a fully connected graph whose nodes represent RNA fragments and each edge has weight $2^{-k}$, where k is the number of differences between RNA strings, then a spanning tree which *maximizes* the *product* of edge weights will be the __most likely mutation tree__.  (Each mututation is assumed to be independent, so the chance that all the mutations in the spanning tree happen is the product of their respective probabilities)

Write a function that takes a list of RNA fragments, constructs an edge list with weights, then returns the most likely mutation tree, along with its probability.  

Note: your algorithm should construct a graph and then run your implementation of Kruskal's algorithm on it.  The difficulty lies in determining the correct graph, so that a minimum sum spanning tree in your graph corresponds to a maximum product spanning tree in the graph described above.

Input: ["adad","adac","acad", "cdac","addd"]  
Output: ([('adad', 'adac', 0.5),
  ('adad', 'acad', 0.5),
  ('adad', 'addd', 0.5),
  ('adac', 'cdac', 0.5)],
 0.0625)

In [9]:
# Given a list of RNA fragments, returns a spanning tree which maximizes the probability of mutation

import math
from itertools import combinations

# helper function to calculate Hamming distance
def hamming_distance(s1, s2):
    return sum(c1 != c2 for c1, c2 in zip(s1, s2)) # counts number of differing characters between two strings of equal len

# kruskal's algo using Union-Find
class UnionFind:
    def __init__(self, n):
        self.parent = list(range(n))
        self.rank = [0] * n
    
    def find(self, u):
        if self.parent[u] != u:
            self.parent[u] = self.find(self.parent[u])  # path compression
        return self.parent[u]
    
    def union(self, u, v):
        root_u = self.find(u)
        root_v = self.find(v)
        
        if root_u != root_v:
            # union by rank to keep the tree balanced
            if self.rank[root_u] > self.rank[root_v]:
                self.parent[root_v] = root_u
            elif self.rank[root_u] < self.rank[root_v]:
                self.parent[root_u] = root_v
            else:
                self.parent[root_v] = root_u
                self.rank[root_u] += 1

# mutation tree function
def mutation_tree(rna_fragments):
    n = len(rna_fragments)
    edges = []
    
    # construct the edge list with transformed weights
    for (i, frag1), (j, frag2) in combinations(enumerate(rna_fragments), 2):
        k = hamming_distance(frag1, frag2)
        weight = 2 ** -k  # probability weight
        edges.append((k, i, j, weight))  # using Hamming distance k as transformed weight
    
    # sort edges by the hamming distance for kruskal's algo (minimizing k)
    edges.sort()

    # initialize UF and the MST
    uf = UnionFind(n)
    mst_edges = []
    mst_probability = 1.0

    # run Kruskal's algo, construct the MST
    for k, i, j, prob in edges:
        if uf.find(i) != uf.find(j):
            uf.union(i, j)
            mst_edges.append((rna_fragments[i], rna_fragments[j], prob))
            mst_probability *= prob  # multiply the original probability

            # stop if added enough edges for an MST
            if len(mst_edges) == n - 1:
                break

    return mst_edges, mst_probability


Testing below

----

In [10]:
## DO NOT EDIT TESTING CODE FOR YOUR ANSWER ABOVE
# Press shift enter to test your code. Ensure that your code has been saved first by pressing shift+enter on the previous cell.
from IPython.core.display import display, HTML
def kruskal_test():
    failed = False
    test_cases = [ 
        ([(0,1,1), (0,2,2),(1,2,1)], [(0, 1, 1), (1, 2, 1)]),
        ([(0,1,2), (0,4,1), (1,2,1), (1,4,2), (2,3,1), (3,4,1)], 
         [(0, 4, 1), (1, 2, 1), (2, 3, 1), (3, 4, 1)]),
        ([(0,1,1), (0,2,2), (0,3,1), (1,4,1), (1,5,2), (2,4,2), 
          (2,6,2), (3,5,2), (3,6,1), (4,7,2), (5,7,2), (6,7,1)], 
          [(0, 1, 1), (0, 2, 2), (0, 3, 1), (1, 4, 1), (1, 5, 2), (3, 6, 1), (6, 7, 1)]),
        ([(0,1,2), (0,2,2), (0,3,1), (1,4,1), (1,5,1), (2,4,2), 
          (2,6,1), (3,5,2), (3,6,2), (4,7,2), (5,7,2), (6,7,1)], 
         [(0, 1, 2), (0, 2, 2), (0, 3, 1), (1, 4, 1), (1, 5, 1), (2, 6, 1), (6, 7, 1)]) 
    ]
    for (test_graph, solution) in test_cases:
        output = kruskal(test_graph)
        if (solution != output):
            s1 = '<font color=\"red\"> Failed - test case: Inputs: graph =' + str(test_graph) + "<br>"
            s2 = '  <b> Expected Output: </b> ' + str(solution) + ' Your code output: ' + str(output)+ "<br>"
            display(HTML(s1+s2))
            failed = True
            
    if failed:
        display(HTML('<font color="red"> One or more tests failed. </font>'))
    else:
        display(HTML('<font color="green"> All tests succeeded! </font>'))
kruskal_test()

In [11]:
## DO NOT EDIT TESTING CODE FOR YOUR ANSWER ABOVE
# Press shift enter to test your code. Ensure that your code has been saved first by pressing shift+enter on the previous cell.
from IPython.core.display import display, HTML
def prim_test():
    failed = False
    test_cases = [ 
        ([(0,1,1), (0,2,2),(1,2,1)], [(0, 1, 1), (1, 2, 1)]),
        ([(0,1,2), (0,4,1), (1,2,1), (1,4,2), (2,3,1), (3,4,1)], 
         [(0, 4, 1), (1, 2, 1), (2, 3, 1), (3, 4, 1)]),
        ([(0,1,1), (0,2,2), (0,3,1), (1,4,1), (1,5,2), (2,4,2), 
          (2,6,2), (3,5,2), (3,6,1), (4,7,2), (5,7,2), (6,7,1)], 
          [(0, 1, 1), (0, 2, 2), (0, 3, 1), (1, 4, 1), (1, 5, 2), (3, 6, 1), (6, 7, 1)]),
        ([(0,1,2), (0,2,2), (0,3,1), (1,4,1), (1,5,1), (2,4,2), 
          (2,6,1), (3,5,2), (3,6,2), (4,7,2), (5,7,2), (6,7,1)], 
         [(0, 1, 2), (0, 2, 2), (0, 3, 1), (1, 4, 1), (1, 5, 1), (2, 6, 1), (6, 7, 1)]) 
    ]
    for (test_graph, solution) in test_cases:
        output = prim(test_graph)
        if (solution != output):
            s1 = '<font color=\"red\"> Failed - test case: Inputs: graph =' + str(test_graph) + "<br>"
            s2 = '  <b> Expected Output: </b> ' + str(solution) + ' Your code output: ' + str(output)+ "<br>"
            display(HTML(s1+s2))
            failed = True
            
    if failed:
        display(HTML('<font color="red"> One or more tests failed. </font>'))
    else:
        display(HTML('<font color="green"> All tests succeeded! </font>'))
prim_test()

In [12]:
## DO NOT EDIT TESTING CODE FOR YOUR ANSWER ABOVE
# Press shift enter to test your code. Ensure that your code has been saved first by pressing shift+enter on the previous cell.
from IPython.core.display import display, HTML
def mutation_test():
    failed = False
    test_cases = [ 
        (["TAT", "CAT", "CAC"],([('TAT', 'CAT', 0.5), ('CAT', 'CAC', 0.5)], 0.25)),
        (["ACATA", "ATCTA", "GTCTA", "GTATA", "GCATA"], 
        ([('ACATA', 'GCATA', 0.5), ('ATCTA', 'GTCTA', 0.5), ('GTCTA', 'GTATA', 0.5), ('GTATA', 'GCATA', 0.5)], 0.0625)),
        (["GATTACA", "CGACTCA", "CATTACA", "CGACATA", "CGTTACA", "CGACACA", "CATTACG", "CGATACA"], 
         ([('GATTACA', 'CATTACA', 0.5), ('CGACTCA', 'CGACACA', 0.5), ('CATTACA', 'CGTTACA', 0.5), 
           ('CATTACA', 'CATTACG', 0.5), ('CGACATA', 'CGACACA', 0.5), ('CGTTACA', 'CGATACA', 0.5), ('CGACACA', 'CGATACA', 0.5)], 0.0078125)),
        (["CATTACA", "GATTACA", "CTTTACA", "CTGGTGA", "CTGTACA", "CTGGTCA", "CTGGTGC", "CTGGACA"], 
        ([('CATTACA', 'GATTACA', 0.5), ('CATTACA', 'CTTTACA', 0.5), ('CTTTACA', 'CTGTACA', 0.5), 
          ('CTGGTGA', 'CTGGTCA', 0.5), ('CTGGTGA', 'CTGGTGC', 0.5), ('CTGTACA', 'CTGGACA', 0.5), ('CTGGTCA', 'CTGGACA', 0.5)], 0.0078125))
    ]
    for (test_graph, solution) in test_cases:
        output = mutation_tree(test_graph)
        if (solution != output):
            s1 = '<font color=\"red\"> Failed - test case: Inputs: graph =' + str(test_graph) + "<br>"
            s2 = '  <b> Expected Output: </b> ' + str(solution) + ' Your code output: ' + str(output)+ "<br>"
            display(HTML(s1+s2))
            failed = True
            
    if failed:
        display(HTML('<font color="red"> One or more tests failed. </font>'))
    else:
        display(HTML('<font color="green"> All tests succeeded! </font>'))
mutation_test()