In [None]:
# Copyright 2021, Battelle Energy Alliance, LLC

import os
import re
import json
from collections import namedtuple
import numpy as np
import pandas as pd
import copy
import time
from statistics import mode


# Change working directory if not the project directory
current_dir = os.getcwd()
folders = re.split('\/', current_dir)
if folders[len(folders)-1] == 'split':
    os.chdir(os.path.abspath(os.path.join('..')))

# Load environment variables from .env file    
!pip install python-dotenv
%load_ext dotenv
%dotenv
import settings
#%pwd

In [None]:
with open(os.getenv("ML_ADAPTER_OBJECT_LOCATION"), 'r') as fp:
    data = json.load(fp)

In [None]:
ProtoTuple = namedtuple('ProtoTuple', ['proto_dist', 'proto_index'])
CompareClusterProto = namedtuple('CompareClusterProto', ['min_index', 'proto'])

In [None]:
def distance_matrix(matrix: np.ndarray):
    """
    This function determines the euclidean distances of a matrix
    """
    from scipy import spatial
    dist_vector = spatial.distance.pdist(matrix, 'euclidean')
    square_distance_matrix = spatial.distance.squareform(dist_vector)
    return square_distance_matrix

In [None]:
class Cluster():
    """
    This class contains all of the information for a given cluster.

    Args:
        id (str): The unique identifier of the cluster.  As clusters are merged, the index increases in size.
        previous_link_id (list): This list contains the pair of previous clusters ids that were merged. When empty, there is no prior cluster (single point of information).
        next_link_id (list): This list contains the next cluster (filled in after applying hierarchical clustering).
        proto (ProtoTuple): This includes the proto_dist and proto_index.
        sample_indices (int list): A list of ints that describe which sample indicies are contained within this cluster.
    """
    # using slots to save time and memory
    __slots__ = ('id', 'previous_id', 'next_id', 'proto', 'sample_indices')

    def __init__(self) -> None:
        self.id = 1
        self.previous_id = []
        self.next_id = -1
        self.proto = None
        self.sample_indices = []

In [None]:
def minimax_dist(dist_matrix: np.ndarray, cluster_index: np.ndarray) -> ProtoTuple:
    """
    This function is used to determine the minimax distance between two different clusters.
    
    Args:
        dist_matrix (ndarray): This is an numpy array containing the distances between all samples.
        cluster_index (ndarray): A set of indices within the distance matrix.
    
    Return:
        minimax_dist (ProtoTuple): The minimax radius and prototype index as a list.
    """
    # create a matrix that is just the current samples
    sub_matrix = dist_matrix[cluster_index[:, None], cluster_index]
    # grab the sum per row
    sum_per_row = np.apply_along_axis(np.nansum, 0, sub_matrix)
    # determine the sample that is closest to all other samples
    min_index = np.nanargmin(sum_per_row)
    # find the radius determined by the maximum from the min centriod
    proto_dist = np.nanmax(sub_matrix[min_index])
    # since a sub_matrix was created, determine the proper index
    proto_index = cluster_index[min_index]
    minimax_dist = ProtoTuple(proto_dist, proto_index)

    return minimax_dist

In [None]:
def compare_clusters(dist_matrix:np.ndarray, cluster_list:list, top_of_stack:Cluster) -> CompareClusterProto:
    """
    This function is used to loop through each cluster to determine the cluster that is the closest cluster to the top of the stack.

    Args:
        dist_matrix (ndarray): This is an numpy array containing the distances between all samples.
        cluster_list (list): A list of cluster objects to compare tothe top of the stack in the nearest neighbor function.
        top_of_stack (Cluster): The current top of the stack for the nearest neighbor function.
    
    Return:
        CompareClusterProto (CompareClusterProto): The index of the closest cluster and prototype associated with the combination of the closest cluster and the top of the stack.

    """
    proto_dist = None
    min_index = None
    proto_index = None
    # looping over all clusters
    for i in cluster_list:
        # if the value of i is the top of the stack, then skip
        if top_of_stack.id == i.id:
            continue

        # getting the appropriate row indices for the top of the stack and the iterator i
        cluster_index = np.array(top_of_stack.sample_indices + i.sample_indices)
        single_dist = minimax_dist(dist_matrix, cluster_index)
        # The if portion initializes the proto dist and index based on the first value of i
        # The else portion checks to see if any iterations of i are closer to the top of the stack and replaces the proto index if closer to the top of the stack
        if proto_dist is None:
            proto_dist = single_dist.proto_dist
            proto_index = single_dist.proto_index
            min_index = i.id
        else:
            if proto_dist > single_dist.proto_dist:
                proto_dist = single_dist.proto_dist
                proto_index = single_dist.proto_index
                min_index = i.id

    return CompareClusterProto(min_index, ProtoTuple(proto_dist, proto_index))

In [None]:
def create_active_clusters(dist_matrix):
    """
        This function takes a distance matrix and creates an object
    """
    active_clusters = []
    for i in range(dist_matrix.shape[1]):
        new_obj = Cluster()
        current_sample = int(i)
        new_obj.id = current_sample
        new_obj.proto = ProtoTuple(0, current_sample)
        new_obj.sample_indices.append(current_sample)
        active_clusters.append(new_obj)

    return active_clusters

In [None]:
def nearest_neighbor(dist_matrix: np.ndarray):
    """

    This function is used to determine the minimax distance between two different clusters.

    From Wikipedia:
    Initialize the set of active clusters to consist of n one-point clusters, one for each input point.
    Let S be a stack data structure, initially empty, the elements of which will be active clusters.
    While there is more than one cluster in the set of clusters:
        If S is empty, choose an active cluster arbitrarily and push it onto S.
        Let C be the active cluster on the top of S. Compute the distances from C to all other clusters, and let D be the nearest other cluster.
        If D is already in S, it must be the immediate predecessor of C. Pop both clusters from S and merge them.
        Otherwise, if D is not already in S, push it onto S.

    Args:
        dist_matrix (ndarray): This is an numpy array containing the distances between all samples.

    
    Return:
        retired_clusters (Cluster list): A list of all of the clusters obtained by prototypical clustering.

    """
    # dist_matrix should be the distance within each different active cluster
    # create a set of active clusters for each sample in the dist_matrix
    # active_clusters is a list of Cluster
    active_clusters = create_active_clusters(dist_matrix)
    stack = []
    retired_clusters = []
    len_active_clusters = len(active_clusters)
    num_clusters = len(active_clusters)
    while len_active_clusters > 1:
        # grab the last active cluster
        if len(stack) == 0:
            stack.append(active_clusters[0])

        stack_ids = [i.id for i in stack]
        active_ids = [i.id for i in active_clusters]
        closest_cluster = compare_clusters(dist_matrix, active_clusters, stack[-1])
        chosen_cluster = active_clusters[int(np.where(np.array(active_ids) == closest_cluster.min_index)[0])]
        if chosen_cluster.id in stack_ids:
            # merge closest and last stack
            num_clusters += 1
            new_cluster = Cluster()
            new_cluster.id = num_clusters
            new_cluster.previous_id = [chosen_cluster.id, stack[-1].id]
            new_cluster.proto = closest_cluster.proto
            new_cluster.sample_indices = chosen_cluster.sample_indices + stack[-1].sample_indices
            # remove clusters from active_clusters and add merged
            active_clusters = [i for i in active_clusters if i.id not in new_cluster.previous_id]
            active_clusters.append(new_cluster)
            # remove clusters from stack
            stack[-1].next_id = num_clusters
            stack[-2].next_id = num_clusters
            retired_clusters.append(stack.pop())
            retired_clusters.append(stack.pop())
            len_active_clusters -= 1
        else:
            stack.append(chosen_cluster)

    retired_clusters.append(active_clusters[0])
    return retired_clusters

In [None]:
def create_dendrogram(retired_clusters):
    """
    Plots a dendrogram from the hierarchical clustering algorithm
    
    Args:
        retired_clusters (Cluster list): A list of all of the clusters obtained by prototypical clustering.
    """
    import networkx as nx
    import matplotlib.pyplot as plt

    G = nx.Graph()
    for clust in retired_clusters:
        G.add_node(clust.id)
    for clust in retired_clusters:
        if clust.previous_id:
            for prev in clust.previous_id:
                G.add_edge(prev, clust.id)
        if clust.next_id != -1:
            G.add_edge(clust.next_id, clust.id)
    ax1 = plt.subplot()
    nx.draw(G, with_labels=True, font_weight='bold')
    plt.show()

In [None]:
def get_clusters(retired_clusters: list, k_clusters: int):
    """
    This function cuts the tree to a specified number of clusters
    
    Args:
        retired_clusters (Cluster list): A list of all of the clusters obtained by prototypical clustering.
        k_clusters (int): the number of clusters desired
    
    Return:
        clusters (Cluster list): A list of chosen clusters after cutting of the tree 
    """
    clusters = list()
    cluster_options = list()
    while len(clusters) < k_clusters:
        # Delete last cluster in retired clusters
        retired_clust = retired_clusters.pop()
        # Add previous ids clusters to cluster options list
        for i in range(len(retired_clust.previous_id)):
            for j in reversed(range(len(retired_clusters))):
                if retired_clusters[j].id == retired_clust.previous_id[i]:
                    cluster_options.append(retired_clusters[j])
                    break

        # Pick the cluster with the max distance
        max_dist = None
        max_index = None
        for i in range(len(cluster_options)):
            dist = cluster_options[i].proto[0]
            # Initialize max_dist to first cluster's distance
            if max_dist == None:
                max_dist = dist
                max_index = i
            # Update max_dist when new distance is greater
            elif max_dist < dist:
                max_dist = dist
                max_index = i
        # Add max distance cluster to clusters list
        clusters.append(cluster_options[max_index])
        cluster_options.pop(max_index)
    return clusters

In [None]:
def prune_clusters(clusters: list):
    """
    This function prunes clusters to contain unique sample indices for every cluster (no repeats)
    
    Args:
        clusters (Cluster list): A list of chosen clusters after cutting of the tree 
    
    Return:
        pruned_clusters (Cluster list): A list of pruned clusters where each cluster has a unique set of sample indices
        prototype_ids (integer list): A list of row indices of the prototype (center point in a cluster)
    """
    pruned_clusters = list()
    # Order the clusters by id
    cluster_ids = dict()
    prototype_ids = list()
    count = 0
    for i in clusters:
        cluster_ids[count] = i.id
        count += 1
    cluster_ids = dict(sorted(cluster_ids.items(), key=lambda item: item[1]))
    ordered_clusters = list()
    for index in cluster_ids.keys():
        ordered_clusters.append(clusters[index])

    assigned_indices = list()
    for clust in ordered_clusters:
        # Create a pruned sample indices list filtering out repeated indices
        sample_indices = list()
        for index in clust.sample_indices:
            if index not in assigned_indices:
                sample_indices.append(index)
        if sample_indices:
            # Add indices to assigned indices list
            assigned_indices.extend(sample_indices)
            # Create cluster and add the cluster to a list
            cluster = Cluster()
            cluster.id = clust.id
            cluster.previous_id = clust.previous_id
            cluster.next_id = clust.next_id
            cluster.proto = clust.proto
            cluster.sample_indices = sample_indices
            pruned_clusters.append(cluster)
    for i in pruned_clusters:
        prototype_ids.append(i.proto[1])
    return pruned_clusters, prototype_ids

In [None]:
def assign_clusters(dataset: np.ndarray, sample_index: list, clusters: list):
    """
    This function assigns each row in the dataset to a cluster
    
    Args:
        dataset (ndarray): dataset to assign clusters to
        sample_index (integer list): a list of indices of the sample dataset used in the prototypical clustering
        clusters (Cluster list): A list of pruned clusters where each cluster has a unique set of sample indices
    
    Return:
        assignments (DataFrame): contains information about the cluster assigned to each row in the dataset
            columns: cluster_id, prototype_id, assigned_id
                cluster_id: the id of the Cluster object
                prototype_id: the row index of the prototype (center point in a cluster)
                assigned_id: assign a numerical id beginning at 0 ranging to the number of clusters
    """
    assignments = pd.DataFrame(columns=["cluster_id", "prototype_id", "assigned_id"])
    N_dataset = dataset.shape[0]
    p_dataset = dataset.shape[1]
    N_clusters = len(clusters)

    # Create a dictionary of cluster ids where the (key, value) is (cluster id, assigned id)
    cluster_ids = dict()
    for i in range(len(clusters)):
        cluster_ids[clusters[i].id] = i

    # Create arrays for each column
    cluster_id = np.zeros(N_dataset, dtype=np.int64)
    prototype_id = np.zeros(N_dataset, dtype=np.int64)
    assigned_id = np.zeros(N_dataset, dtype=np.int64)

    # Populate array with predetermined cluster assignments by prototypical clustering
    for clust in clusters:
        for index in clust.sample_indices:
            cluster_id[sample_index[index]] = clust.id
            prototype_id[sample_index[index]] = clust.proto[1]
            assigned_id[sample_index[index]] = cluster_ids[clust.id]

    # Create an array of each cluster's prototype row in the dataset
    prototypes = np.zeros((N_clusters, p_dataset))
    for i in range(N_clusters):
        prototype = dataset[sample_index[clusters[i].proto[1]], :]
        prototypes[i, :] = prototype

    # Determine the Euclidean distances between the prototypes and the unselected dataset
    distances = np.zeros((N_dataset, N_clusters))
    for i in range(N_clusters):
        # Repeat prototype row for entire length of dataset
        temp_distances = np.repeat(np.reshape(prototypes[i], (-1, p_dataset)), repeats=N_dataset, axis=0)
        # Determine the Euclidean distances e.g. (x - x1)^2
        temp_distances = np.power((temp_distances - dataset), 2)
        # Sum up the distances in a row of the distance matrix
        distances[:, i] = np.sum(temp_distances, axis=1)

    # Assign a prototype with the minumum distance to every row in the dataset
    minimum_distance = np.argmin(distances, axis=1)
    for i in range(N_dataset):
        if cluster_id[i] == 0 and prototype_id[i] == 0 and assigned_id[i] == 0:
            cluster_id[i] = clusters[minimum_distance[i]].id
            prototype_id[i] = clusters[minimum_distance[i]].proto[1]
            assigned_id[i] = cluster_ids[clusters[minimum_distance[i]].id]

    # Populate assignments datafram
    assignments["cluster_id"] = cluster_id.tolist()
    assignments["prototype_id"] = prototype_id.tolist()
    assignments["assigned_id"] = assigned_id.tolist()
    return assignments

In [None]:
def get_training_testing_sets(dataset: pd.DataFrame, dist_matrix: np.ndarray, prototype_ids: list, assignments: pd.DataFrame, test_size: float):
    """
    This function assigns clusters to the training and testing set
    
    Args:
        dataset (DataFrame): dataset to assign training and testing sets
        dist_matrix (ndarray): This is an numpy array containing the distances between all samples.
        prototype_ids (integer list): A list of row indices of the prototype (center point in a cluster)
        assignments (DataFrame): contains information about the cluster assigned to each row in the dataset
            columns: cluster_id, prototype_id, assigned_id
                cluster_id: the id of the Cluster object
                prototype_id: the row index of the prototype (center point in a cluster)
                assigned_id: assign a numerical id beginning at 0 ranging to the number of clusters
        test_size (float): a precentage of the testing set size (decimal form)
    
    Return:
        training_set (DataFrame): A DataFrame of the training set
        testing_set (DataFrame): A DataFrame of the testing set


    """
    training_set = pd.DataFrame(columns=dataset.columns)
    testing_set = pd.DataFrame(columns=dataset.columns)
    training_set_clusters = list()
    testing_set_clusters = list()
    testing_percent = 0.0

    # Determine the cluster that is furthest distance away from all other clusters
    iterations = 0
    while not testing_set_clusters:
        subset_prototype_ids = list(set(prototype_ids) - set(training_set_clusters))
        prototype_dist = dist_matrix[np.ix_(subset_prototype_ids, subset_prototype_ids)]
        max_distance = np.argmax(prototype_dist, axis=1)
        max_dist_id = prototype_ids[mode(max_distance)]

        # Get the cluster from the dataset
        max_dist_assignments = assignments[assignments['prototype_id'] == max_dist_id]
        max_dist_indices = max_dist_assignments.index.tolist()
        cluster = dataset.iloc[max_dist_indices, :]
        cluster_percentage = float(len(cluster)) / len(dataset)

        # Find a cluster whose size is less than the testing size
        if cluster_percentage < test_size:
            testing_set = testing_set.append(cluster)
            testing_set_clusters.append(max_dist_id)
            testing_percent = float(len(testing_set)) / len(dataset)
        else:
            training_set = training_set.append(cluster)
            prototype_ids.remove(max_dist_id)
            training_set_clusters.append(max_dist_id)

        # Catch if infinite loop
        iterations += 1
        if iterations > len(prototype_ids):
            break

    # Determine the distances between prototypes and initialize previously selected prototypes to extremely high number (for min)
    assigned_indices = list()

    for index, val in enumerate(prototype_ids):
        for clust in training_set_clusters:
            if val == clust:
                assigned_indices.append(index)
        for clust in testing_set_clusters:
            if val == clust:
                assigned_indices.append(index)

    test_prototype_dist = dist_matrix[np.ix_(assigned_indices, prototype_ids)]
    for i in range(len(assigned_indices)):
            test_prototype_dist[i, assigned_indices] = 1e10

    # Add clusters to the testing set until bigger than the testing size
    iterations = 0
    while testing_percent < test_size:
        # Find prototype with the minimum distance from other prototypes in the testing set
        minimum_index = np.unravel_index(np.argmin(test_prototype_dist, axis=None), test_prototype_dist.shape)

        # Add the prototype cluster that is the closest distance to the previously selected prototypes to the testing set
        min_dist_assignments = assignments[assignments['assigned_id'] == minimum_index[1]]
        min_dist_indices = min_dist_assignments.index.tolist()
        cluster = dataset.iloc[min_dist_indices, :]
        testing_set = testing_set.append(cluster)
        testing_percent = float(len(testing_set)) / len(dataset)
        
        # Update the distance matrix and list of selected prototypes
        assigned_indices.append(minimum_index[1])
        test_prototype_dist = dist_matrix[np.ix_(assigned_indices, prototype_ids)]
        for i in range(len(assigned_indices)):
            test_prototype_dist[i, assigned_indices] = 1e10
        
        # Catch if infinite loop
        iterations += 1
        if iterations > len(prototype_ids):
            break
    
    # Assign clusters to training set
    test_index = testing_set.index.tolist()
    dataset_index = dataset.index.tolist()
    training_indices = list(set(dataset_index) - set(test_index))
    training_set = dataset.iloc[training_indices, :]
    
    return training_set, testing_set

In [None]:
def main():
    # Get environment variables
    start = time.time()
    split = json.loads(os.getenv("SPLIT"))
    N = split["hierarchical_clustering"]["N"]
    max_clusters = split["hierarchical_clustering"]["max_clusters"]
    test_size = split["hierarchical_clustering"]["test_size"]
    
    # Determines the euclidean distances of a dataset
    split_file = data["DATASET"]
    dataset = pd.read_csv(split_file)
    
    # Filter dataset to contain only numeric columns
    dataset_numeric = pd.read_csv(split_file)
    for col in dataset.columns:
        is_numeric = pd.api.types.is_numeric_dtype(dataset[col])
        if not is_numeric:
            dataset_numeric.drop(col, axis=1, inplace=True)
    
    dataset_array = dataset_numeric.to_numpy()
    if dataset_numeric.shape[0] > N:
        X = dataset_numeric.sample(n=N, random_state=1)
    else:
        X = dataset
    X_index = X.index.tolist()
    X = X.to_numpy()
    dist_matrix = distance_matrix(X)
    
    # Perform hierarchical clustering by determining the minimax distance between two different clusters.
    retired_clusters = nearest_neighbor(dist_matrix)
    retired_clusters_copy = copy.deepcopy(retired_clusters)
    
    # Cut the tree to a specified number of clusters
    clusters = get_clusters(retired_clusters_copy, max_clusters)

    # Prunes clusters to contain unique sample indices for every cluster (no repeats)
    pruned_clusters, prototype_ids = prune_clusters(clusters)
    
    # Assign each row in the dataset to a cluster
    assignments = assign_clusters(dataset_array, X_index, pruned_clusters)
    
    # Assign clusters to the training and testing set
    training_set, testing_set = get_training_testing_sets(dataset, dist_matrix, prototype_ids, assignments, test_size)

    # Write the training and testing sets to files
    training_set.to_csv('data/training_set.csv')
    testing_set.to_csv('data/testing_set.csv')
    end = time.time()
    print("Time elapsed: ", end - start)

In [None]:
main()