In [None]:
import sys
import matplotlib.pyplot as plt
import numpy as np
import random
import os
dataset_path = os.path.join(os.getcwd(), "dataset")
sys.path.insert(0, dataset_path)
distance_based_clustering_path = os.path.join(os.getcwd(), "distance_based_clustering")
sys.path.insert(1, distance_based_clustering_path)
evaluate_path = os.path.join(os.getcwd(), "evaluate")
sys.path.insert(2, evaluate_path)
clustering_path = os.path.join(os.getcwd(), "distance_based_clustering", "clustering")
sys.path.insert(3, clustering_path)
import dbm
import dataset_yeast_mini
import fungi_mini
import fungi
import fungi_large
import vertebrates_large
import gapped
from column_distance import *
from distance_based_clustering.clustering import hierarchical_clustering
from distance_based_clustering.clustering import kmeans_self_defined_dist
from distance_based_clustering.clustering import kmeans_hier_init
from distance_based_clustering.distance_matrix_calculation import pearson_distance
from distance_based_clustering.distance_matrix_calculation import jensen_distance
from distance_based_clustering.distance_matrix_calculation import dist_from_col
from evaluate import ari_score
from evaluate import fmi_score
from evaluate import nmi_score
from evaluate import graph

In [None]:
# Helper Functions


def assign_unique_numbers_to_values(dataset):
    """
    Assign unique numbers to the values in the dataset dictionary.
    If two values are the same, they will be given the same number.

    :param dataset: dict, a dictionary with values that need unique numbers assigned.
    :return: dict, a dictionary with values replaced by unique numbers.
    """
    unique_values = set(dataset.values())  # Get the unique values
    value_to_number = {
        value: idx for idx, value in enumerate(unique_values)
    }  # Map each unique value to a number

    return {key: value_to_number[value] for key, value in dataset.items()}


def generate_labels_from_clusters(clusters):
    """
    Generate a dictionary of labels based on cluster membership.
    Items in the same list receive the same label, starting from 0.

    :param clusters: list of lists, where each sublist represents a cluster.
    :return: dict, a dictionary with items as keys and their respective labels as values.
    """
    label_dict = {}
    for label, cluster in enumerate(clusters):
        for item in cluster:
            label_dict[item] = label
    return label_dict


def create_cluster_dicts(clusters, dataset):
    """
    Create a list of dictionaries for each cluster.

    :param clusters: A list of clusters, each cluster being a list of item keys.
    :param dataset: The dataset containing the items and their corresponding data.
    :return: A list of dictionaries, each dictionary representing a cluster.
    """
    our_cluster = []
    for cluster in clusters:
        cluster_dict = {}
        for item in cluster:
            cluster_dict[item] = dataset.mmm[item]
        our_cluster.append(cluster_dict)
    return our_cluster

def create_random_subdatasets(original_dataset, subdataset_sizes, retain_cluster_structure=False):
    # Flatten the original dataset
    flattened_dataset = [item for cluster in original_dataset for item in cluster]

    subdatasets = []

    for size in subdataset_sizes:
        if size > len(flattened_dataset):
            raise ValueError(f"Requested subdataset size {size} is larger than the total number of elements.")

        # Randomly select elements for the subdataset
        subdataset = random.sample(flattened_dataset, size)

        # Reshape into the original format, if required
        if retain_cluster_structure:
            reshaped_subdataset = []
            temp = subdataset.copy()
            for cluster in original_dataset:
                cluster_size = len(cluster)
                reshaped_subdataset.append(temp[:cluster_size])
                temp = temp[cluster_size:]
            subdataset = reshaped_subdataset

        subdatasets.append(subdataset)

    return subdatasets

import numpy as np
import os


class TreeNode(object):
    """
    Data structure for the hierarchical tree. 
    This object is either an inner node or a leaf. 
    If it is an inner node:
    both left and right attributes should be another treenode object.
    height > 0. name should be "inner#". mn (member number)
    If it is a leaf:
    Both left and right attribute should be None.
    Height = 0. name should be a specific motif id. 
    mn = 1
    """
    def __init__(self, name, left_child, right_child, height, mn):
        self.left = left_child
        self.right = right_child
        self.name = name
        self.mn = mn
        self.height = height
        # if this is an innernode: 
        if self.left != None: 
            assert self.height > 0
            assert self.left.height <= self.height
            assert self.right.height <= self.height


# This algorithm will first build a tree represented by class "TreeNode", then traverse the treenode
# for heights from highest to the lowest until it finds k-1 nodes. After neglecting these top n-1 nodes, 
# the rest tree nodes just represents how to group the motifs into clusters. 

def wrap_dm(idlist, dm):
    """
    helper functions to first wrap the motif idlist into a leaf list, and to convert the distance matrix into a distance map. 
    Precondition: 
        idlist: a list of motif ids. 
        dm: a np array representing the distance matrix. it should have a shape of len(idlist) * len(idlist)
    Returns:
        nodemap: a map of treenodes: treenode.name -> treenode
        distance_map: a map of treenode.name -> treenode.name -> distance
    """
    nodemap = {id : TreeNode(id, None, None, 0, 1) for id in idlist}
    distance_map = {}
    for i in range(len(idlist)):
        motif_id1 = idlist[i]
        id1_map = {}
        for j in range(len(idlist)):
            id1_map[idlist[j]] = dm[i][j]
        distance_map[motif_id1] = id1_map
    return nodemap, distance_map

def shortest(dm):
    """
    helper function that finds the shortest distance between two different nodes in a distance map.
    Returns the shortest distance as well as the two TreeNode.names
    Precondition:
        dm: a map of treenode.name -> treenode.name -> distance
    """
    ret = (None, None, np.inf)
    for key1 in dm.keys():
        for key2 in dm[key1].keys():
            if key1 != key2 and dm[key1][key2] < ret[2]:
                ret = [key1, key2, dm[key1][key2]]
    return ret

def tree_construction(dm, idlist, dc = "complete"):
    """
    Returns a TreeNode object denoting the current tree
    Precondition: 
        idlist: a list of motif ids. 
        dm: a np array representing the distance matrix. it should have a shape of len(idlist) * len(idlist)
        dc: the way for distance update method. Should be one of "complete", "single", "UPGMA", "WPGMA:
    """
    nodemap, dm = wrap_dm(idlist,dm)
    # need to build treenode for len(list(nodemap.keys()))-1 times
    for iter in range(len(list(nodemap.keys()))-1):
        m1, m2, sd = shortest(dm)
        
        # create the new node and update the nodemap
        new_node_name = iter
        left_node = nodemap[m1]
        right_node = nodemap[m2]
        lmn = left_node.mn
        rmn = right_node.mn
        mn = lmn + rmn
        height = sd / 2
        new_node = TreeNode(name = new_node_name, left_child = left_node, right_child = right_node, height = height, mn = mn)
        nodemap[new_node_name] = new_node 
        del nodemap[m1]
        del nodemap[m2]

        # update the distance matrix
        del dm[m1]
        del dm[m2]
        new_node_distances = {new_node_name : 0}
        for m in dm.keys():
            distance_l = dm[m][m1]
            distance_r = dm[m][m2]
            del dm[m][m1]
            del dm[m][m2]
            if dc == "single":
                new_distance = min(distance_l, distance_r)
            elif dc == "complete":
                new_distance = max(distance_l, distance_r)
            elif dc == "UPGMA":
                new_distance = (distance_l * lmn + distance_r * rmn ) / mn
            elif dc == "WPGMA":
                new_distance = (distance_l + distance_r) / 2
            dm[m][new_node_name] = new_distance
            new_node_distances[m] = new_distance
        dm[new_node_name] = new_node_distances
    assert len(list(nodemap.keys())) == 1
    return list(nodemap.values())[0]
            
        

def cut_tree(tree, nclusters):
    """
    Cut the given tree object to give nclusters
    Precondition: 
        tree is a TreeNode object
        nclusters is an integer >= 1
    """
    height_map = {tree.height : [tree]}
    remaining_steps = nclusters - 1
    while remaining_steps > 0:
        # find the highest node and split it into two child tree nodes
        # there might still be a need to handle the really rare case that actually two nodes are at the same height
        highest = max(list(height_map.keys()))
        highest_node = height_map[highest][0]
        if len(height_map[highest]) > 1:
            height_map[highest].pop(0)
        else:
            del height_map[highest]
        left_node = highest_node.left
        right_node = highest_node.right
        if left_node.height not in height_map: 
            height_map[left_node.height] = [left_node]
        else:
            height_map[left_node.height].append(left_node)
        if right_node.height not in height_map: 
            height_map[right_node.height] = [right_node]
        else:
            height_map[right_node.height].append(right_node)
        remaining_steps -= 1
    node_clusters = []
    for height in height_map.keys():
        node_clusters += height_map[height]
    assert len(node_clusters) == nclusters
    # now it is just to get the motif names from the TreeNode objects
    def degradenode(treenode):
        """
        helper function to degrade the nodes from the treeNode objects
        """
        if treenode.left == None:
            return [treenode.name]
        return degradenode(treenode.left) + degradenode(treenode.right)
    clusters = []
    for node in node_clusters:
        clusters.append(degradenode(node))
    return clusters
    

def hc(nclusters, dm, idlist, distance_update = "UPGMA"):
    """
    Grow a tree, then cut it!
    Preconditions:
        nclusters: an integer >= 1
        dm: np array for the distance matrix
        idlist: the motif ids
        distance_update: one of ["complete", "single", "UPGMA", "WPGMA"]
    """
    return cut_tree(tree_construction(dm, idlist, dc = distance_update), nclusters)
    

In [None]:
###############################################################################

# Dataset: Gapped vertebrates

dataset = gapped.DNABindingMotifs()
# dataset.mmm is a dictionary with format {"id": Bio.motifs.Motif object}
# where each motif matrix can be retrieved by motif.pssm

print(dataset.mmm)
print("Finish printing dataset \n")

dm, idlist = dist_from_col.calculate_distance_matrix(
    dataset, Pearson_CC_Distance, "expand", bg=[0.25, 0.25, 0.25, 0.25], average=np.mean
)
# dm, idlist = jensen_distance.calculate_distance_matrix(dataset)

dataset_cc = dataset.cc
print(dataset_cc)
print("Finish printing dataset cc \n")

# base_dir = "nature_clusters"
# generator = graph.MotifGraphGenerator(dataset_cc, base_dir)
# cluster_paths = generator.generate_cluster_graphs()
# generator.generate_final_composite_graph(cluster_paths)

# Convert dataset_cc into a list of lists, where each inner list contains the keys of the corresponding dictionary
dataset_cc_list = [[key for key in d] for d in dataset_cc]
print(dataset_cc_list)
print("Finish printing dataset cc list \n")

# K-means clustering
# One advantage of K-means is it can customize number of clusters needed
num_clusters = len(dataset_cc_list)  # Define the number of clusters
print(num_clusters)
print("Finish printing num_clusters \n")

clusters = hc(num_clusters, dm, idlist, distance_update="UPGMA")
print(clusters)
print("Finish printing result by K-means \n")

our_cluster = create_cluster_dicts(clusters, dataset)

print(our_cluster)
print("Finish printing our cluster \n")

# base_dir_our = "predicted_clusters"
# generator_our = graph.MotifGraphGenerator(our_cluster, base_dir_our)
# cluster_paths_our = generator_our.generate_cluster_graphs()
# generator_our.generate_final_composite_graph(cluster_paths_our)


# clusters = kmeans_self_defined_dist.kmeans_motifs(dm, idlist, num_clusters)


dataset_dic = dict(sorted(generate_labels_from_clusters(dataset_cc_list).items()))
print(dataset_dic)
print("Finish printing true labels \n")

clusters_dic = dict(sorted(generate_labels_from_clusters(clusters).items()))
print(clusters_dic)
print("Finish printing predicted labels \n")

true_labels = list(dataset_dic.values())
predicted_labels = list(clusters_dic.values())

score_ari = ari_score.evaluate_clustering(true_labels, predicted_labels)
score_fmi = fmi_score.evaluate_clustering(true_labels, predicted_labels)
score_nmi = nmi_score.evaluate_clustering(true_labels, predicted_labels)



In [None]:
# ################################################################################

# # Dataset: fungi_large

dataset2 = fungi_large.DNABindingMotifs()
# dataset2.mmm is a dictionary with format {"id": Bio.motifs.Motif object}

print(dataset2.mmm)
print("Finish printing dataset2 \n")

dm2, idlist2 = dist_from_col.calculate_distance_matrix(
    dataset2, Euclidean_Distance, "expand", bg=[0.27, 0.23, 0.23, 0.27], average=np.mean
)

dataset2_cc = dataset2.cc
print(dataset2_cc)
print("Finish printing dataset2 cc \n")

# base_dir2 = "nature_clusters2"
# generator2 = graph.MotifGraphGenerator(dataset2_cc, base_dir2)
# cluster_paths2 = generator2.generate_cluster_graphs()
# generator2.generate_final_composite_graph(cluster_paths2)

# Convert dataset2_cc into a list of lists, where each inner list contains the keys of the corresponding dictionary
dataset2_cc_list = [[key for key in d] for d in dataset2_cc]
print(dataset2_cc_list)
print("Finish printing dataset2 cc list \n")



In [None]:
# Hierarchical clustering
# clusters = hierarchical_clustering.hierarchical_clustering(5, dm, idlist)
# # resulting cluters after using clustering method
# print(clusters)

num_clusters2 = len(dataset2_cc_list)  # Define the number of clusters
print(num_clusters2)
print("Finish printing num_clusters2 \n")

#clusters2 = hierarchical_clustering.hierarchical_clustering(num_clusters2, dm2, idlist2)
clusters2 = kmeans_hier_init.kmeans_motifs(dm2, idlist2, num_clusters2)
print(clusters2)
print("Finish printing clustering result \n")
our_cluster2 = create_cluster_dicts(clusters2, dataset2)

# base_dir_our2 = "predicted_clusters2"
# generator_our2 = graph.MotifGraphGenerator(our_cluster2, base_dir_our2)
# cluster_paths_our2 = generator_our2.generate_cluster_graphs()
# generator_our2.generate_final_composite_graph(cluster_paths_our2)


dataset2_dic = dict(sorted(generate_labels_from_clusters(dataset2_cc_list).items()))
print(dataset2_dic)
print("Finish printing true labels \n")

clusters2_dic = dict(sorted(generate_labels_from_clusters(clusters2).items()))
print(clusters2_dic)
print("Finish printing predicted labels \n")

true_labels2 = list(dataset2_dic.values())
predicted_labels2 = list(clusters2_dic.values())

score_ari2 = ari_score.evaluate_clustering(true_labels2, predicted_labels2)
score_fmi2 = fmi_score.evaluate_clustering(true_labels2, predicted_labels2)
score_nmi2 = nmi_score.evaluate_clustering(true_labels2, predicted_labels2)

In [None]:
###############################################################################

Dataset: fungi

dataset3 = fungi.DNABindingMotifs()
# dataset3.mmm is a dictionary with format {"id": Bio.motifs.Motif object}

print(dataset3.mmm)
print("Finish printing dataset3 \n")

dm3, idlist3 = dist_from_col.calculate_distance_matrix(
    dataset3, Kullback_Leibler_Distance, "expand", bg=[0.27, 0.23, 0.23, 0.27], average=np.mean
)

dataset3_cc = dataset3.cc
print(dataset3_cc)
print("Finish printing dataset3 cc \n")

# base_dir3 = "nature_clusters3"
# generator3 = graph.MotifGraphGenerator(dataset3_cc, base_dir3)
# cluster_paths3 = generator3.generate_cluster_graphs()
# generator3.generate_final_composite_graph(cluster_paths3)

In [None]:


# Convert dataset3_cc into a list of lists, where each inner list contains the keys of the corresponding dictionary
dataset3_cc_list = [[key for key in d] for d in dataset3_cc]
print(dataset3_cc_list)
print("Finish printing dataset3 cc list \n")


num_clusters3 = len(dataset3_cc_list)  # Define the number of clusters
print(num_clusters3)
print("Finish printing num_clusters3 \n")

cluster3 = hc(num_clusters3, dm3, idlist3, distance_update="UPGMA")
print(cluster3)
print("Finish printing result by hierarchical clustering \n")


# cluster3 = kmeans_hier_init.kmeans_motifs(dm3, idlist3, num_clusters3)
# print(cluster3)
# print("Finish printing result by K means \n")

our_cluster3 = create_cluster_dicts(cluster3, dataset3)

# base_dir_our3 = "predicted_clusters3"
# generator_our3 = graph.MotifGraphGenerator(our_cluster3, base_dir_our3)
# cluster_paths_our3 = generator_our3.generate_cluster_graphs()
# generator_our3.generate_final_composite_graph(cluster_paths_our3)


dataset3_dic = dict(sorted(generate_labels_from_clusters(dataset3_cc_list).items()))
print(dataset3_dic)
print("Finish printing true labels \n")

clusters3_dic = dict(sorted(generate_labels_from_clusters(cluster3).items()))
print(clusters3_dic)
print("Finish printing predicted labels \n")

true_labels3 = list(dataset3_dic.values())
predicted_labels3 = list(clusters3_dic.values())

score_ari3 = ari_score.evaluate_clustering(true_labels3, predicted_labels3)
score_fmi3 = fmi_score.evaluate_clustering(true_labels3, predicted_labels3)
score_nmi3 = nmi_score.evaluate_clustering(true_labels3, predicted_labels3)
    



print(score_ari3)
print(score_fmi3)
print(score_nmi3)


In [None]:
###############################################################################

Dataset: vertebrates_large

dataset4 = vertebrates_large.DNABindingMotifs()
# dataset4.mmm is a dictionary with format {"id": Bio.motifs.Motif object}

print(dataset4.mmm)
print("Finish printing dataset4 \n")

dm4, idlist4 = dist_from_col.calculate_distance_matrix(
    dataset4, Euclidean_Distance, "expand", bg=[0.25, 0.25, 0.25, 0.25], average=np.mean
)

dataset4_cc = dataset4.cc
print(dataset4_cc)
print("Finish printing dataset4 cc \n")

# base_dir4 = "nature_clusters4"
# generator4 = graph.MotifGraphGenerator(dataset4_cc, base_dir4)
# cluster_paths4 = generator4.generate_cluster_graphs()
# generator4.generate_final_composite_graph(cluster_paths4)

In [None]:


# Convert dataset4_cc into a list of lists, where each inner list contains the keys of the corresponding dictionary
dataset4_cc_list = [[key for key in d] for d in dataset4_cc]
print(dataset4_cc_list)
print("Finish printing dataset4 cc list \n")


num_clusters4 = len(dataset4_cc_list)  # Define the number of clusters
print(num_clusters4)
print("Finish printing num_clusters4 \n")

# cluster4 = hierarchical_clustering.hierarchical_clustering(num_clusters4, dm4, idlist4)
# print(cluster4)
# print("Finish printing result by hierarchical clustering \n")


cluster4 = kmeans_hier_init.kmeans_motifs(dm4, idlist4, num_clusters4)
print(cluster4)
print("Finish printing result by K means \n")

our_cluster4 = create_cluster_dicts(cluster4, dataset4)

# base_dir_our3 = "predicted_clusters3"
# generator_our3 = graph.MotifGraphGenerator(our_cluster3, base_dir_our3)
# cluster_paths_our3 = generator_our3.generate_cluster_graphs()
# generator_our3.generate_final_composite_graph(cluster_paths_our3)


dataset4_dic = dict(sorted(generate_labels_from_clusters(dataset4_cc_list).items()))
print(dataset4_dic)
print("Finish printing true labels \n")

clusters4_dic = dict(sorted(generate_labels_from_clusters(cluster4).items()))
print(clusters4_dic)
print("Finish printing predicted labels \n")

true_labels4 = list(dataset4_dic.values())
predicted_labels4 = list(clusters4_dic.values())

score_ari4 = ari_score.evaluate_clustering(true_labels4, predicted_labels4)
score_fmi4 = fmi_score.evaluate_clustering(true_labels4, predicted_labels4)
score_nmi4 = nmi_score.evaluate_clustering(true_labels4, predicted_labels4)
    



print(score_ari4)
print(score_fmi4)
print(score_nmi4)
