In [None]:
pip install sentence_transformers transformers hdbscan umap-learn

In [None]:
from sklearn.cluster import AgglomerativeClustering
from sklearn.feature_extraction.text import CountVectorizer
import pandas as pd
import numpy as np
import re
import hdbscan
import umap.umap_ as umap
import re
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer

In [None]:
class candidate():
    """Extract a list of candidate strings (a "vocabulary") from a corpus.

    The extraction strategy is selected by ``mode``:

    * ``'n_gram'``         -- single words plus word n-grams found by
      ``CountVectorizer``.
    * ``'sentence_level'`` -- unique fragments obtained by splitting every
      document on ``punctuation``.
    * ``'document_level'`` -- the unique documents themselves.

    NOTE(review): the result is presumably meant to be passed as
    ``candidate_vocab`` to ``pipeline.centroid`` / ``pipeline.pipeline`` --
    confirm against the calling notebook cells.
    """

    def __init__(self, text,
                 mode: str = 'n_gram',
                 ngram_range = (2,4),
                 feature_1gram: int = 2000,
                 feature_mgram: int = 10000,
                 punctuation: str = '[?!,.]'
                 ):
        """Store the corpus and the extraction settings.

        Args:
            text: Collection of documents; it is converted with ``np.array``.
            mode: One of ``'n_gram'``, ``'sentence_level'`` or
                ``'document_level'``. It is only checked when
                ``build_vocab`` is called.
            ngram_range: ``(min_n, max_n)`` passed to the n-gram
                ``CountVectorizer``.
            feature_1gram: ``max_features`` for the single-word vectorizer.
            feature_mgram: ``max_features`` for the n-gram vectorizer.
            punctuation: Regular expression used by ``sentence_level`` to
                split documents into fragments.
        """
        self.corpus = np.array(text)
        self.mode = mode

        self.ngram_range = ngram_range
        self.feature_1gram = feature_1gram
        self.feature_mgram = feature_mgram
        self.punctuation = punctuation

    def lower(self):
        """Return the corpus cast to a unicode array and lower-cased."""
        return np.char.lower(self.corpus.astype('U'))

    def n_gram(self):
        """Return single-word and n-gram features of the lower-cased corpus.

        Two vectorizers are fitted: one with default settings capped at
        ``feature_1gram`` features, one over ``ngram_range`` capped at
        ``feature_mgram`` features.

        Returns:
            list: the single-word features followed by the n-gram features.
        """
        text = self.lower()

        word_vectorizer = CountVectorizer(max_features=self.feature_1gram)
        phrase_vectorizer = CountVectorizer(
            analyzer='word',
            ngram_range=self.ngram_range,
            max_features=self.feature_mgram,
        )
        word_vectorizer.fit(text)
        phrase_vectorizer.fit(text)

        words = word_vectorizer.get_feature_names_out()
        phrases = phrase_vectorizer.get_feature_names_out()
        return list(np.concatenate([words, phrases]))

    def sentence_level(self):
        """Split every document on ``punctuation`` and return unique fragments.

        Fragments are stripped of surrounding whitespace and empty fragments
        are dropped. The original casing is kept (the corpus is not
        lower-cased here).

        Returns:
            list: unique fragments in order of first occurrence.
        """
        # Compile once instead of re-resolving the pattern for every document.
        splitter = re.compile(self.punctuation)

        fragments = []
        for doc in self.corpus:
            pieces = (piece.strip() for piece in splitter.split(doc.strip()))
            fragments.extend(piece for piece in pieces if piece)

        # dict.fromkeys de-duplicates while keeping first-occurrence order, so
        # the result is reproducible across runs (set order depends on string
        # hash randomisation).
        return list(dict.fromkeys(fragments))

    def document_level(self):
        """Return the unique documents of the corpus.

        Returns:
            list: unique documents in order of first occurrence.
        """
        # Order-preserving de-duplication; see sentence_level for why a plain
        # set is avoided.
        return list(dict.fromkeys(self.corpus))

    def build_vocab(self):
        """Build the candidate list using the strategy named by ``self.mode``.

        Returns:
            list: the result of ``n_gram``, ``sentence_level`` or
            ``document_level``.

        Raises:
            ValueError: if ``self.mode`` is not one of the supported modes.
        """
        builders = {
            'n_gram': self.n_gram,
            'sentence_level': self.sentence_level,
            'document_level': self.document_level,
        }
        try:
            builder = builders[self.mode]
        except KeyError:
            raise ValueError(
                "unknown mode {!r}; expected one of {}".format(
                    self.mode, sorted(builders)
                )
            ) from None
        return builder()


class pipeline():
    """Embed a corpus, cluster the embeddings and name each cluster.

    The steps are: sentence-transformer encoding, UMAP dimensionality
    reduction, clustering (agglomerative or HDBSCAN) and, per cluster, a
    lookup of the candidate strings whose embeddings are closest to the
    cluster centroid.

    Side effects: ``encoding``, ``centroid`` and ``pipeline`` publish the
    module-level globals ``embedding``, ``frame`` and ``reduced_embedding``.
    They are kept for backward compatibility (presumably so later notebook
    cells can inspect the intermediate results); no method reads them back.
    """

    def __init__(self, text, model = 'all-mpnet-base-v2', device = 'cuda'):
        """Store the corpus and load the sentence encoder.

        Args:
            text: Documents to embed; passed unchanged to the encoder.
            model: Model name handed to ``SentenceTransformer``.
            device: Device handed to ``SentenceTransformer``.
        """
        self.corpus = text
        self.encoder = SentenceTransformer(model, device = device)

    def encoding(self):
        """Encode the corpus and return the embedding.

        Also publishes the result as the module-level global ``embedding``.
        """
        embedding = self.encoder.encode(self.corpus)
        globals()['embedding'] = embedding  # legacy side effect, see class docstring
        return embedding

    def DR(self, embedding, dimension = 5): # dimensionality reduction
        """Reduce ``embedding`` to ``dimension`` components with UMAP.

        ``random_state`` is fixed to 42.
        """
        reducer = umap.UMAP(random_state=42, n_components=dimension)
        return reducer.fit_transform(embedding)

    def clustering(self, embedding, min_cluster_size = 2):
        """Cluster ``embedding`` with HDBSCAN.

        Returns:
            tuple: ``(cluster_labels, outlier_scores)`` where the scores are
            the fitted clusterer's ``outlier_scores_``.
        """
        clusterer = hdbscan.HDBSCAN(min_cluster_size=min_cluster_size)
        cluster_labels = clusterer.fit_predict(embedding)
        return cluster_labels, clusterer.outlier_scores_

    def agglomerative_clustering(self, embedding, n_clusters = 5):
        """Cluster ``embedding`` with ``AgglomerativeClustering``.

        Returns:
            tuple: ``(cluster_labels, None)``. The second element mirrors the
            shape of ``clustering``'s return value; this method computes no
            outlier scores, so it is ``None``.
        """
        model = AgglomerativeClustering(n_clusters=n_clusters).fit(embedding)
        # The original returned the undefined name `_` here, which raises
        # NameError outside an interactive session.
        return model.labels_, None

    def centroid(self, embedding, candidate_vocab, cluster_labels, top_n = 3):
        """Pick, for every cluster, the candidates closest to its centroid.

        The centroid of a cluster is the mean of the embedding rows carrying
        that label. Every distinct value in ``cluster_labels`` gets a
        centroid, including HDBSCAN's noise label ``-1`` if it is present.

        Also publishes a DataFrame with columns ``text`` and ``label`` as the
        module-level global ``frame``.

        Args:
            embedding: One embedding row per corpus document.
            candidate_vocab: Candidate strings; they are encoded with the
                same encoder as the corpus.
            cluster_labels: One label per embedding row.
            top_n: Number of candidates to keep per cluster (default 3).

        Returns:
            dict: label -> list of up to ``top_n`` candidates, ordered by
            decreasing cosine similarity to the centroid. The lists are
            empty when ``candidate_vocab`` is empty.

        Raises:
            ValueError: if ``top_n`` is negative.
        """
        if top_n < 0:
            raise ValueError("top_n must be non-negative, got {!r}".format(top_n))

        frame = pd.DataFrame()
        frame['text'] = self.corpus
        frame['label'] = cluster_labels
        globals()['frame'] = frame  # legacy side effect, see class docstring

        labels = np.asarray(cluster_labels)
        vectors = np.asarray(embedding)
        keys = sorted(set(cluster_labels))

        if len(candidate_vocab) == 0:
            # Nothing to rank; skip encoding and similarity altogether.
            return {key: [] for key in keys}

        # Positional boolean masks keep row selection correct even when the
        # corpus carries a non-positional pandas index.
        centroid_matrix = np.vstack(
            [vectors[labels == key].mean(axis=0) for key in keys]
        )

        candidate_embedding = self.encoder.encode(candidate_vocab)
        # One batched call: row i holds the similarities of centroid i to
        # every candidate.
        similarity = cosine_similarity(centroid_matrix, candidate_embedding)

        centroid_keywords = {}
        for key, scores in zip(keys, similarity):
            # Reverse first, then truncate, so that top_n == 0 yields an
            # empty list rather than the whole ranking.
            best = scores.argsort()[::-1][:top_n]
            centroid_keywords[key] = [candidate_vocab[i] for i in best]

        return centroid_keywords

    def pipeline(self, candidate_vocab, dimension = 5, clustering_method = 'agglomerative',
                 min_cluster_size = 2, n_clusters = 5, top_n = 3):
        """Run encoding, reduction, clustering and keyword extraction.

        Clustering runs on the UMAP-reduced embedding; the centroids used for
        keyword extraction are computed on the full embedding. The reduced
        embedding is also published as the module-level global
        ``reduced_embedding``.

        Args:
            candidate_vocab: Candidate strings for naming clusters.
            dimension: Number of UMAP components.
            clustering_method: ``'agglomerative'`` or ``'hdbscan'``.
            min_cluster_size: Used only by ``'hdbscan'``.
            n_clusters: Used only by ``'agglomerative'``.
            top_n: Number of keywords per cluster.

        Returns:
            tuple: ``(cluster_labels, outlier_scores, centroid_keywords)``;
            ``outlier_scores`` is ``None`` for ``'agglomerative'``.

        Raises:
            ValueError: if ``clustering_method`` is not supported.
        """
        # Validate before the expensive encode/UMAP steps so that a typo
        # fails immediately and with a clear message.
        if clustering_method not in ('agglomerative', 'hdbscan'):
            raise ValueError(
                "unknown clustering_method {!r}; expected 'agglomerative' "
                "or 'hdbscan'".format(clustering_method)
            )

        embedding = self.encoding()
        reduced_embedding = self.DR(embedding, dimension = dimension)
        globals()['reduced_embedding'] = reduced_embedding  # legacy side effect

        if clustering_method == 'agglomerative':
            cluster_labels, outlier_scores = self.agglomerative_clustering(
                reduced_embedding, n_clusters = n_clusters
            )
        else:
            cluster_labels, outlier_scores = self.clustering(
                reduced_embedding, min_cluster_size = min_cluster_size
            )

        centroid_keywords = self.centroid(
            embedding, candidate_vocab, cluster_labels, top_n = top_n
        )
        return cluster_labels, outlier_scores, centroid_keywords