In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.cluster import AgglomerativeClustering
from sklearn.decomposition import PCA
from scipy.cluster.hierarchy import dendrogram, linkage

import os

In [2]:
# Root folder for every figure written by this notebook; ClusterAnalysis
# creates one sub-folder per scenario underneath it.
out_dir = "Clustering"

In [None]:
# Collect the scenario files in a deterministic order.
#
# os.listdir() returns entries in arbitrary order, yet the processing loop
# pairs these paths with `scenarios_description` purely by position, so the
# listing is sorted. Hidden entries (e.g. ".ipynb_checkpoints", ".DS_Store")
# and sub-directories are skipped because they would shift the pairing and
# cannot be read as CSV anyway.
# NOTE(review): assumes the file names sort in the same order as the entries
# of `scenarios_description` (plain lexicographic sort) - confirm against the
# actual contents of ./Scenarios.
scenario_dir = "./Scenarios"
scenario_datasets = [
    f"{scenario_dir}/{entry}"
    for entry in sorted(os.listdir(scenario_dir))
    if not entry.startswith(".")
    and os.path.isfile(os.path.join(scenario_dir, entry))
]
scenario_datasets

In [4]:
# Preprocessing recipe behind each scenario dataset.
#
# Every scenario number exists in two variants that differ only in "scaling":
# the "_N" variant uses the scaling listed in the table below, the "_S"
# variant always uses "Standard".

# Encoding used by scenarios 5-8: label-encode everything except ChestPainType.
_ONEHOT_CHEST_PAIN = {
    "Sex": "Label",
    "ExerciseAngina": "Label",
    "ST_Slope": "Label",
    "RestingECG": "Label",
    "ChestPainType": "OneHot",
}

# Extra keys that only scenario 2 carries.
_SCENARIO_2_EXTRAS = {
    "impute_zeros": {"RestingBP": "mean", "Cholesterol": "mean"},
    "oldpeak_abs": True,
}

# (number, encoding, remove_outliers, remove_errors, extra keys, "_N" scaling)
_SCENARIO_TABLE = (
    (1, "Label encoding", False, False, {}, "MinMax"),
    (2, "Label encoding", False, False, _SCENARIO_2_EXTRAS, "MinMax"),
    (3, "Label encoding", True, True, {}, "None"),
    (4, "Label encoding", "replace_with_mean", True, {}, "MinMax"),
    (5, _ONEHOT_CHEST_PAIN, False, False, {}, "MinMax"),
    (6, _ONEHOT_CHEST_PAIN, False, True, {}, "None"),
    (7, _ONEHOT_CHEST_PAIN, True, True, {}, "MinMax"),
    (8, _ONEHOT_CHEST_PAIN, "replace_with_mean", True, {}, "None"),
)


def _build_scenarios(table):
    """Expand each table row into its "_N" and "_S" description dicts.

    Dict-valued settings are copied so that no two descriptions share a
    mutable object.
    """

    def _fresh(value):
        return dict(value) if isinstance(value, dict) else value

    descriptions = []
    for number, encoding, outliers, errors, extras, n_scaling in table:
        for suffix, scaling in (("N", n_scaling), ("S", "Standard")):
            entry = {
                "name": f"Scenario {number}_{suffix}",
                "encoding": _fresh(encoding),
                "remove_outliers": outliers,
                "remove_errors": errors,
            }
            # Optional keys sit between "remove_errors" and "scaling".
            for key, value in extras.items():
                entry[key] = _fresh(value)
            entry["scaling"] = scaling
            descriptions.append(entry)
    return descriptions


scenarios_description = _build_scenarios(_SCENARIO_TABLE)


In [8]:
class ClusterAnalysis:
    """Hierarchical-clustering plots for one scenario dataset.

    For every linkage method in ``self.methods`` and distance metric in
    ``self.types`` a dendrogram is saved, and for every linkage method a
    pair plot coloured by cluster label is saved. All figures are written
    to ``<out_dir>/<name>/``.

    Parameters
    ----------
    data : pandas.DataFrame
        Feature table to cluster. It is passed as-is to scipy / scikit-learn
        and is never modified by the plotting methods.
    name : str
        Scenario name; used as the output sub-folder name.
    """

    # scipy's distance routines call the Manhattan metric "cityblock".
    _SCIPY_METRIC_NAMES = {"manhattan": "cityblock"}

    def __init__(self, data, name):
        self.data = data
        self.methods = ["complete", "average", "single"]
        self.types = ["euclidean", "manhattan", "cosine", "correlation"]

        self.save_dir = f"{out_dir}/{name}"
        os.makedirs(self.save_dir, exist_ok=True)

    def _reduce_dimensions(self, method="PCA", n_components=2):
        """Optionally project ``self.data`` onto its principal components.

        When ``method == "PCA"`` the data is REPLACED by a DataFrame with
        columns ``PC1 .. PC<n_components>``; any other ``method`` leaves the
        data untouched. Returns ``self.data`` in both cases.
        """
        if method == "PCA":
            pca = PCA(n_components=n_components)
            self.data = pd.DataFrame(pca.fit_transform(self.data))
            self.data.columns = [f"PC{i}" for i in range(1, n_components + 1)]
        return self.data

    def plot_dendrogram(self, method="single", type="euclidean"):
        """Save the dendrogram for one (linkage method, distance metric) pair.

        Parameters
        ----------
        method : str
            Linkage method forwarded to ``scipy.cluster.hierarchy.linkage``.
        type : str
            Distance metric. ``"manhattan"`` is translated to scipy's
            ``"cityblock"``; other names are forwarded unchanged.

        Raises
        ------
        ValueError
            Propagated from ``linkage``, e.g. when the metric yields
            non-finite distances for this data.
        """
        # Previously the metric was only used in the title / file name, so
        # every "type" produced the same Euclidean dendrogram.
        metric = self._SCIPY_METRIC_NAMES.get(type, type)
        # Build the linkage first: if it fails, no figure has been opened.
        Z = linkage(self.data, method=method, metric=metric)

        fig = plt.figure(figsize=(25, 10))
        try:
            plt.title('Hierarchical Clustering Dendrogram - Method: {} - Type: {}'.format(method, type))
            dendrogram(
                Z,
                leaf_rotation=45.,
                leaf_font_size=3.,
            )
            fig.savefig(f"{self.save_dir}/Dendrogram_{method}_{type}.png")
        finally:
            # Many figures are produced per scenario; release each one.
            plt.close(fig)

    def plot_clusters(self, method="single", n_clusters=2):
        """Save a pair plot of the data coloured by agglomerative cluster.

        Parameters
        ----------
        method : str
            Linkage criterion for ``AgglomerativeClustering``.
        n_clusters : int
            Number of clusters to cut the hierarchy into (default 2, the
            scikit-learn default that was used implicitly before).

        Returns
        -------
        The cluster label of every row of ``self.data``.
        """
        model = AgglomerativeClustering(n_clusters=n_clusters, linkage=method)
        model.fit(self.data)
        labels = model.labels_

        # Attach the labels to a COPY: writing them into self.data would make
        # every later clustering treat the "Cluster" column as a feature.
        labelled = self.data.assign(Cluster=labels)
        sns.pairplot(labelled, hue="Cluster")
        # The pair plot's figure is the current pyplot figure at this point.
        plt.savefig(f"{self.save_dir}/Clusters_{method}.png")
        plt.close()
        return labels

    def run(self):
        """Produce every dendrogram and cluster plot for this dataset.

        One dendrogram is written per (method, metric) pair and one cluster
        plot per method. A metric that cannot be used on this data (linkage
        raises ``ValueError``) is reported and skipped so the remaining
        plots are still produced. No dimensionality reduction is applied.
        """
        for method in self.methods:
            for metric in self.types:
                try:
                    self.plot_dendrogram(method, metric)
                except ValueError as err:
                    print(f"  Skipping dendrogram (method={method}, type={metric}): {err}")
            # The cluster plot does not depend on the metric, so it is drawn
            # once per method instead of once per (method, metric) pair.
            self.plot_clusters(method)

In [None]:
# Files and descriptions are paired purely by position; zip() would silently
# drop the surplus (and hide a mispairing) if the two lists differ in length.
if len(scenario_datasets) != len(scenarios_description):
    raise ValueError(
        f"Found {len(scenario_datasets)} scenario files but "
        f"{len(scenarios_description)} scenario descriptions; "
        "they are matched by position and must have the same length."
    )

for scenario, scenario_description in zip(scenario_datasets, scenarios_description):
    print(f"Scenario Name: {scenario_description['name']}")
    # Show which file was paired with this description so a mismatch is visible.
    print(f"  File: {scenario}")
    print(f"  Encoding: {scenario_description['encoding']}")
    print(f"  Remove Outliers: {scenario_description['remove_outliers']}")
    print(f"  Remove Errors: {scenario_description.get('remove_errors', 'Not Specified')}")
    print(f"  Impute Zeros: {scenario_description.get('impute_zeros', 'None')}")
    print(f"  Oldpeak Absolute: {scenario_description.get('oldpeak_abs', 'Not Specified')}")
    print(f"  Scaling: {scenario_description['scaling']}")

    # Load the dataset and write all its clustering figures to disk.
    data = pd.read_csv(scenario)
    clusterer = ClusterAnalysis(data, scenario_description['name'])
    clusterer.run()