In [None]:
import json
from collections import Counter
from itertools import permutations

import gensim
import numpy as np
import pandas as pd

In [None]:
class DataExtractionBase:
    """Minimal extractor interface.

    Every accessor defined here is a placeholder that returns ``None``;
    subclasses provide the real implementations.
    """

    def __init__(self, link: str) -> None:
        """Initialise the extractor; ``link`` is not used by the base class."""
        # No category columns are known at this level.
        self.category_types = None

    def get_ids(self, sheet_name: str):
        """Placeholder for fetching the ID column of ``sheet_name``."""
        return None

    def get_column_name(self, category: str):
        """Placeholder for resolving the column name of ``category``."""
        return None

    def get_series(self, sheet_name: str, category: str):
        """Placeholder for fetching the ``category`` column of ``sheet_name``."""
        return None


class DataExtractionPDTexts(DataExtractionBase):
    """Extractor for a workbook with the sheets ``healthy`` and ``general_massive``."""

    def __init__(self, link: str) -> None:
        """Load both sheets of the Excel workbook located at ``link``."""
        super().__init__(link)
        # Passing a list of sheet names makes read_excel return a dict keyed by
        # sheet name, so the workbook is opened once instead of twice.
        sheets = pd.read_excel(link, sheet_name=['healthy', 'general_massive'])
        self.dataset_norm = sheets['healthy']
        self.dataset_pd = sheets['general_massive']
        self.category_types = ['tokens', 'tokens_without_stops', 'lemmas', 'lemmas_without_stops']

    def get_ids(self, sheet_name: str = 'healthy') -> pd.Series:
        """
        Getting ID column

        sheet_name: 'healthy' selects the ``speakerID`` column of the healthy
        sheet; any other value selects the ``ID`` column of the PD sheet.
        """
        if sheet_name == 'healthy':
            return self.dataset_norm['speakerID']
        return self.dataset_pd['ID']

    def get_series(self,
                   sheet_name: str,
                   category: str) -> pd.Series:
        """
        Getting one of 8 columns:
          from one of the 2 pages of the dataset
          from one of the 4 categories

        sheet_name: 'healthy' selects the healthy sheet; any other value
                    (e.g. 'pd') selects the PD sheet
        category: tokens | tokens_without_stops | lemmas | lemmas_without_stops
        """
        if sheet_name == 'healthy':
            return self.dataset_norm[category]

        return self.dataset_pd[category]

In [None]:
class ClustersDataBase:
    """Base holder for cluster tables and the metrics computed over them.

    ``healthy_data`` and ``impediment_data`` start as ``None`` and
    ``impediment_type`` as an empty string; ``save_excel`` requires all three
    to have been set (by a subclass) before it is called.
    """

    def __init__(self,
                 extractor: "DataExtractionBase",
                 model: "gensim.models.fasttext.FastTextKeyedVectors") -> None:
        """Store the extractor and the embedding model used by the metrics."""
        self.extractor = extractor
        self.model = model
        self.healthy_data = None
        self.impediment_data = None
        self.impediment_type = ''

    def get_df(self, sheet):
        """Placeholder; the base implementation returns ``None``."""
        pass

    def add_column(self,
                   sheet_name: str,
                   category: str,
                   clusters: pd.Series) -> None:
        """Placeholder; the base implementation does nothing."""
        pass

    @staticmethod
    def avg_cluster_size(row: pd.Series) -> float:
        """
        Get average cluster size in a row

        ``row`` is an iterable of cells, each cell being a list of clusters
        (lists of words). Returns NaN when the row holds no clusters at all,
        instead of dividing by zero.
        """
        clusters_sizes = [len(cluster) for cell in row for cluster in cell]
        if not clusters_sizes:
            return np.nan
        return sum(clusters_sizes) / len(clusters_sizes)

    def avg_cluster_distance(self, cluster_sequence):
        """
        Count average cluster distance

        The value is the mean cosine similarity (dot product of unit vectors)
        between the centroids of each pair of consecutive clusters.
        Returns NaN for an empty sequence or a single cluster.
        """
        if not cluster_sequence:
            return np.nan

        # Normalise every centroid once; consecutive pairs reuse them.
        unit_centroids = [
            gensim.matutils.unitvec(
                sum(self.model[word] for word in cluster) / len(cluster)
            )
            for cluster in cluster_sequence
        ]

        distances = [
            np.dot(current, following)
            for current, following in zip(unit_centroids, unit_centroids[1:])
        ]

        if not distances:
            return np.nan

        return sum(distances) / len(distances)

    def silhouette_score(self, cluster_sequence):
        """
        Mean silhouette-style coefficient over all words of a cell.

        a -- summed similarity of a word to the other words of its cluster,
             divided by the cluster length;
        b -- mean similarity of the word to the words of the next cluster
             (the previous one for the last cluster).
        Returns NaN when there are no words.
        """
        silhouette_coefs = []
        last_idx = len(cluster_sequence) - 1

        for idx, cluster in enumerate(cluster_sequence):
            # For the last cluster idx - 1 is used; with a single cluster this
            # wraps around to the cluster itself.
            neighbour = cluster_sequence[idx + 1 if idx != last_idx else idx - 1]

            for word_1 in cluster:
                # NOTE(review): the word itself is excluded from the sum but the
                # divisor is the full cluster length -- confirm this is intended.
                a = sum(self.model.similarity(word_1, word_2)
                        for word_2 in cluster if word_1 != word_2) / len(cluster)
                b = sum(self.model.similarity(word_1, word_2)
                        for word_2 in neighbour) / len(neighbour)

                denominator = max(a, b)
                # A zero denominator would raise (or yield inf/nan with NumPy
                # scalars); score such words as 0 instead.
                s = (b - a) / denominator if denominator != 0 else 0.0
                silhouette_coefs.append(s)

        if silhouette_coefs:
            return sum(silhouette_coefs) / len(silhouette_coefs)
        return np.nan

    @staticmethod
    def cluster_t_score(f_n, f_c, f_nc, N):
        """
        t-score of a word pair: (f_nc - f_n * f_c / N) / sqrt(f_nc).

        f_n, f_c -- frequencies of the two words;
        f_nc -- frequency of the pair; N -- corpus size.
        Returns 0 when the pair never occurs.
        """
        if f_nc == 0:
            return 0
        numerator = f_nc - f_n * f_c / N
        denominator = np.sqrt(f_nc)
        return numerator / denominator

    def avg_cluster_t_score(self, cell, column_clusters):
        """
        Sum of pairwise t-scores over all ordered word pairs inside each
        cluster of ``cell``.

        Frequencies are taken over the whole column: every word of every
        cluster of every cell in ``column_clusters``, flattened in order.
        Words are counted as whole tokens and the corpus size is the number of
        tokens; pair frequency counts adjacent tokens in either order.

        NOTE(review): despite the name, the result is a sum, not a mean --
        confirm which one is wanted.
        """
        tokens = [word
                  for column_cell in column_clusters
                  for cluster in column_cell
                  for word in cluster]
        N = len(tokens)
        word_counts = Counter(tokens)
        # Adjacent token pairs across the flattened column.
        pair_counts = Counter(zip(tokens, tokens[1:]))

        cell_t_scores = []
        for cluster in cell:
            for word_1, word_2 in permutations(cluster, 2):
                f_nc = pair_counts[(word_1, word_2)] + pair_counts[(word_2, word_1)]
                cell_t_scores.append(
                    self.cluster_t_score(word_counts[word_1], word_counts[word_2], f_nc, N)
                )

        return sum(cell_t_scores)

    def save_excel(self, path) -> None:
        """
        Saving data with clusters to an Excel file

        Writes ``healthy_data`` to the sheet 'healthy' and ``impediment_data``
        to a sheet named after ``impediment_type``, both without the index.
        """
        with pd.ExcelWriter(path) as writer:
            self.healthy_data.to_excel(writer, sheet_name='healthy', index=False)
            self.impediment_data.to_excel(writer, sheet_name=self.impediment_type, index=False)


class ClustersDataPDTexts(ClustersDataBase):
    """Cluster tables for the PD-texts dataset: one frame per sheet, keyed by speaker ID."""

    def __init__(self,
                 extractor: DataExtractionPDTexts,
                 model: gensim.models.fasttext.FastTextKeyedVectors) -> None:
        """Seed both frames with the ID columns supplied by ``extractor``."""
        super().__init__(extractor, model)
        self.id_healthy = extractor.get_ids('healthy')
        self.id_impediment = extractor.get_ids('general_massive')
        self.healthy_data = pd.DataFrame(self.id_healthy)
        self.impediment_data = pd.DataFrame(self.id_impediment)
        self.impediment_type = 'PD'

    def get_df(self, sheet):
        """Return the healthy frame for 'healthy', the impediment frame otherwise."""
        if sheet == 'healthy':
            return self.healthy_data
        return self.impediment_data

    def _add_metric(self, sheet_name: str, category: str, prefix: str, metric) -> None:
        """Apply ``metric`` to every cell of ``category`` and store it as ``{prefix}_{category}``."""
        frame = self.get_df(sheet_name)
        frame[f'{prefix}_{category}'] = frame[category].apply(metric)

    def add_column(self,
                   sheet_name: str,
                   category: str,
                   clusters: pd.Series) -> None:
        """
        Adding a column with clusters
        """
        self.get_df(sheet_name)[category] = clusters

    def count_num_switches(self,
                           sheet_name: str,
                           category: str) -> None:
        """
        Count number of switches for each cell

        NOTE(review): an empty cell yields -1 -- confirm whether 0 or NaN
        would be preferable for missing data.
        """
        self._add_metric(sheet_name, category, 'Switch_number',
                         lambda cell: len(cell) - 1)

    def count_mean_cluster_size(self,
                                sheet_name: str,
                                category: str) -> None:
        """
        Count mean cluster size for each row

        Empty cells get NaN.
        """
        def cell_mean_size(cell):
            # avg_cluster_size iterates a *row of cells*; wrapping the single
            # cell makes it measure cluster lengths rather than word lengths.
            return self.avg_cluster_size([cell]) if cell else np.nan

        self._add_metric(sheet_name, category, 'Mean_cluster_size', cell_mean_size)

    def count_mean_distances(self,
                             sheet_name: str,
                             category: str):
        """
        Counting distances for all columns
        """
        self._add_metric(sheet_name, category, 'Mean_distance',
                         self.avg_cluster_distance)

    def count_mean_silhouette_score(self,
                                    sheet_name: str,
                                    category: str):
        """
        Counting silhouette scores for all columns
        """
        self._add_metric(sheet_name, category, 'Silhouette_score',
                         self.silhouette_score)

    def count_cluster_t_scores(self,
                               sheet_name: str,
                               category: str):
        """
        Counting cluster t-scores for all columns
        """
        # The whole column serves as the frequency corpus for every cell.
        column = self.get_df(sheet_name)[category]
        self._add_metric(sheet_name, category, 'Mean_cluster_t_score',
                         lambda cell: self.avg_cluster_t_score(cell, column))

In [None]:
class Clusterizer:
    """Splits a tagged word sequence into clusters and scores the clustering."""

    def __init__(self, model: "gensim.models.fasttext.FastTextKeyedVectors") -> None:
        """Keep the embedding model used for all similarity computations."""
        self._model = model

    def get_cosine_similarity(self, w1, w2):
        """
        Getting cosine similarity depending on model

        gensim FastText keyed vectors use their own ``similarity``; any other
        model must provide ``get_word_vector`` and the cosine is computed here.
        """
        if isinstance(self._model, gensim.models.fasttext.FastTextKeyedVectors):
            return self._model.similarity(w1, w2)

        v1 = self._model.get_word_vector(w1)
        v2 = self._model.get_word_vector(w2)

        return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))

    def cluster(self, word_sequence: list[str]) -> list[list[str]]:
        """
        An implementation of words clustering algorithm
        for verbal fluency test results,
        from N. Lundin et al., 2022.
        b -- current word
        c -- next word
        d -- next word after the next

        The sequence must start with 'BOS' (it initialises the similarity
        window) and every processed word must be followed by at least two
        more items; processing stops at 'PEOS'.
        """
        words_by_clusters = []
        cluster = []

        for idx, word in enumerate(word_sequence):
            if word == 'PEOS':
                break

            if word == 'BOS':
                # Prime S(B,C) and S(C,D) for the first real word.
                b_c_sim = self.get_cosine_similarity(word, word_sequence[idx + 1])
                c_d_sim = self.get_cosine_similarity(word_sequence[idx + 1],
                                                     word_sequence[idx + 2])
                continue

            cluster.append(word)
            a_b_sim = b_c_sim   # S(A,B) equals S(B,C) from previous iteration
            b_c_sim = c_d_sim   # S(B,C) equals S(C,D) from previous iteration
            c_d_sim = self.get_cosine_similarity(word_sequence[idx + 1], word_sequence[idx + 2])

            if a_b_sim > b_c_sim and b_c_sim < c_d_sim:  # a condition of a switch
                words_by_clusters.append(cluster)
                cluster = []

        if cluster:
            words_by_clusters.append(cluster)
        return words_by_clusters

    @staticmethod
    def _custom_similarity(embedding_1, embedding_2):
        """Cosine similarity of two vectors (dot product of their unit vectors)."""
        # TODO: something needs to be done about this -- does it work for fastText?
        return np.dot(gensim.matutils.unitvec(embedding_1),
                      gensim.matutils.unitvec(embedding_2))

    def davies_bouldin_index(self, cluster_sequence: list[list[str]]) -> float:
        """
        The Davies Bouldin index implementation,
        based on https://scikit-learn.org/stable/modules/clustering.html#davies-bouldin-index
        Si -- the average distance between each point of cluster i
        and the centroid of that cluster – also known as cluster diameter;
        Sj -- the average distance between each point of cluster j
        and the centroid of that cluster – also known as cluster diameter;
        Dij -- the distance between cluster centroids i and j;
        Rij -- similarity between clusters i and j.

        All "distances" here are cosine similarities. Returns ``None`` when no
        pair of distinct clusters exists (e.g. zero or one cluster).
        """
        centroids_dict = {}
        for cluster in cluster_sequence:
            centroid = sum(self._model[word] for word in cluster) / len(cluster)
            centroids_dict[tuple(cluster)] = centroid

        Si_values_dict = {}
        for cluster in cluster_sequence:
            cluster_centroid = centroids_dict[tuple(cluster)]
            Si = sum(self._custom_similarity(self._model[word], cluster_centroid)
                     for word in cluster) / len(cluster)
            Si_values_dict[tuple(cluster)] = Si

        Rij_max_values = []
        for cluster_1 in cluster_sequence:
            Rij_values = []
            # Scatter of the cluster currently being compared, not of whichever
            # cluster the previous loop happened to end on.
            Si = Si_values_dict[tuple(cluster_1)]

            for cluster_2 in cluster_sequence:
                if cluster_2 == cluster_1:
                    continue
                Sj = Si_values_dict[tuple(cluster_2)]
                Dij = self._custom_similarity(centroids_dict[tuple(cluster_1)], centroids_dict[tuple(cluster_2)])
                Rij = (Si + Sj) / Dij
                Rij_values.append(Rij)

            if Rij_values:
                Rij_max_values.append(max(Rij_values))

        return sum(Rij_max_values) / len(Rij_max_values) if Rij_max_values else None

    def silhouette_score(self, cluster_sequence: list[list[str]]) -> float:
        """
        The Silhouette score implementation,
        based on https://scikit-learn.org/stable/modules/clustering.html#silhouette-coefficient
        a -- the mean distance between a sample and all other points in the same class;
        b -- the mean distance between a sample and all other points in the next nearest cluster.

        Here the "next nearest" cluster is the following one in the sequence
        (the previous one for the last cluster). Returns NaN when the sequence
        contains no words.
        """
        silhouette_coefs = []
        last_idx = len(cluster_sequence) - 1

        for idx, cluster in enumerate(cluster_sequence):
            neighbour = cluster_sequence[idx + 1 if idx != last_idx else idx - 1]

            for word_1 in cluster:
                # NOTE(review): the word itself is skipped but the divisor is
                # the full cluster length -- confirm this is intended.
                a = sum(self.get_cosine_similarity(word_1, word_2)
                        for word_2 in cluster if word_1 != word_2) / len(cluster)
                b = sum(self.get_cosine_similarity(word_1, word_2)
                        for word_2 in neighbour) / len(neighbour)

                if a == 0 or b == 0:
                    s = 0
                else:
                    s = (b - a) / max(a, b)
                silhouette_coefs.append(s)

        if not silhouette_coefs:
            return np.nan
        return sum(silhouette_coefs) / len(silhouette_coefs)

    @staticmethod
    def evaluate_clustering(DB_values_page: list[float], silhouette_values: list[float]) -> None:
        """
        The computation of all clustering metrics
        to evaluate given clustering model.

        Prints the mean of each list; an empty list is reported as NaN.
        """
        nan = float('nan')
        mean_DB_index_value = (sum(DB_values_page) / len(DB_values_page)
                               if DB_values_page else nan)
        mean_silhouette_score_value = (sum(silhouette_values) / len(silhouette_values)
                                       if silhouette_values else nan)
        print('The performance of this clustering algorithm: ')
        print(f'Mean value of Davies Bouldin index: {mean_DB_index_value}')
        print(f'Mean value of Silhouette score: {mean_silhouette_score_value}')

In [None]:
class Vectorizer:
    """Collects word vectors from ``model`` as plain lists and builds tagged sequences."""

    def __init__(self, model: "gensim.models.fasttext.FastTextKeyedVectors") -> None:
        """Store the model and pre-fill the cache with the three service tags."""
        self.model = model
        self._vectors_dictionary = {
            tag: self.model[tag].tolist() for tag in ('BOS', 'EOS', 'PEOS')
        }

    def update_dict(self, words: str) -> None:
        """
        Updating the dictionary during each cell vectorising

        ``words`` is a single string of words separated by ', '.
        """
        for one_word in words.split(', '):
            if one_word not in self._vectors_dictionary:
                self._vectors_dictionary[one_word] = self.model[one_word].tolist()

    def update_json(self, path: str = "/content/vectors.json") -> None:
        """
        Updating and saving the json file

        path: destination file; defaults to the previously hard-coded location.
        """
        # ensure_ascii=False emits raw non-ASCII characters, so the file
        # encoding must be fixed rather than left to the platform default.
        with open(path, "w", encoding="utf-8") as fp:
            json.dump(self._vectors_dictionary, fp, ensure_ascii=False)

    def get_dictionary(self) -> dict:
        """
        In case we need to get the dictionary
        """
        return self._vectors_dictionary

    @staticmethod
    def get_sequence(words_string: str) -> list[str]:
        """
        Getting a list of tokens + tags of beginning and ending
        BOS -- Beginning of Sentence
        PEOS -- pre-End of Sentence
        EOS -- End of Sentence
        """
        return ['BOS'] + words_string.split(', ') + ['PEOS', 'EOS']

In [None]:
project_path = r'' # add your own path to the project folder


def main():
    """Cluster every cell of the dataset, compute the metrics and save them.

    Reads the model and the workbook from under ``project_path``, clusters
    each cell of every category column on both sheets, prints the mean
    Davies-Bouldin index and silhouette score, and writes the resulting
    tables to ``result/pd_texts/clusters_metrics_dataset.xlsx``.
    """
    # Local import: only the script entry point builds file-system paths.
    from pathlib import Path

    # Path joins work on every OS, unlike hard-coded backslashes.
    root = Path(project_path)

    # defining classes
    model_path = root / 'models' / 'geowac' / 'model.model'
    geowac_model = gensim.models.KeyedVectors.load(str(model_path))
    extractor = DataExtractionPDTexts(str(root / 'data' / 'control_pd_preprocessed.xlsx'))
    vectoriser = Vectorizer(geowac_model)
    cluster_saver = ClustersDataPDTexts(extractor, geowac_model)
    clusters_getter = Clusterizer(geowac_model)

    # general principle: clustering one cell at a time
    DB_values_page = []
    silhouette_values_page = []

    for page in ['healthy', 'pd']:
        for category in extractor.category_types:
            sequence_series = extractor.get_series(page, category)  # getting words lists from a column
            clusters_list = []  # a list of lists of clusters for current column

            for words_string in sequence_series:
                if not isinstance(words_string, str):  # dealing with NaNs or other non-string values
                    clusters_list.append([])
                    continue

                # string of words converted to list with special tags
                tokens_sequence = vectoriser.get_sequence(words_string)

                # converting list of words to list of clusters
                cell_clusters = clusters_getter.cluster(tokens_sequence)
                clusters_list.append(cell_clusters)

                # calculating Davies Bouldin index for each cell;
                # None means the index is undefined for this cell
                DB_value = clusters_getter.davies_bouldin_index(cell_clusters)
                if DB_value is not None:
                    DB_values_page.append(DB_value)

                # calculating Silhouette score for each cell
                silhouette_values_page.append(
                    clusters_getter.silhouette_score(cell_clusters))

            # adding clusters column in a table
            cluster_saver.add_column(page, category,
                                     pd.Series(clusters_list))

            # counting metrics
            cluster_saver.count_num_switches(page, category)
            cluster_saver.count_mean_cluster_size(page, category)
            cluster_saver.count_mean_distances(page, category)
            cluster_saver.count_mean_silhouette_score(page, category)
            cluster_saver.count_cluster_t_scores(page, category)

    clusters_getter.evaluate_clustering(DB_values_page, silhouette_values_page)

    result_path = root / 'result' / 'pd_texts' / 'clusters_metrics_dataset.xlsx'
    # Create the output folder up front so the save cannot fail on a missing directory.
    result_path.parent.mkdir(parents=True, exist_ok=True)
    cluster_saver.save_excel(str(result_path))


# Run the pipeline only when this file is executed as a script.
if __name__ == '__main__':
    main()