In [None]:
# default_exp internal

In [None]:
#hide
%load_ext autoreload
%autoreload 2

In [None]:
#export
import csv
import matplotlib.pyplot as plt
from nbdev.imports import *
from nbdev.export import *
import numpy as np
import pandas as pd
from scipy.cluster import hierarchy
from scipy.cluster.hierarchy import ward, cut_tree, dendrogram
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score, pairwise_distances
from textblob import TextBlob

# Internal

> Internal MedTop methods

## Data processing

In [None]:
#export
def get_phrase(sent, window_size, feature_names, include_input_in_tfidf, tdm, token_averages):
    """
    Finds the most expressive phrase in a sentence.

    Every contiguous run of `window_size` tokens in `sent.tokens` is scored and the
    highest-scoring run is returned.

    Parameters:
        sent: object exposing `tokens`, `pos_tags` and `doc_id`. `pos_tags` is indexed
            in parallel with `tokens`, and element `[1]` of each entry is read as the
            part-of-speech tag (presumably `(token, tag)` pairs; verify against caller).
        window_size: number of tokens in each candidate phrase.
        feature_names: mapping from token to its index in `tdm` / `token_averages`.
        include_input_in_tfidf: when truthy, a token's score is `tdm[token_ix, sent.doc_id]`;
            otherwise it is `token_averages[token_ix]`.
        tdm: matrix indexed as `[token_ix, doc_id]`.
        token_averages: sequence indexed by `token_ix`.

    Returns:
        The slice of `sent.tokens` with the highest score, or `None` when the sentence
        has fewer than `window_size` tokens. On ties the earliest phrase wins.
    """
    adj_adv_pos = frozenset(("JJ", "JJR", "JJS", "RB", "RBR", "RBS"))
    top_phrase = None
    top_score = -1

    # Iterate phrases (sub-sentences of length window_size)
    for start in range(len(sent.tokens) - window_size + 1):
        window = slice(start, start + window_size)
        phrase = sent.tokens[window]
        phrase_pos = sent.pos_tags[window]

        # Phrases with a stronger sentiment (positive or negative) get a larger weight
        weight = 1 + abs(TextBlob(" ".join(phrase)).sentiment.polarity)
        score = 0

        for i, token in enumerate(phrase):
            # Skip tokens not in feature_names. Membership is tested on the mapping
            # itself so the lookup does not rebuild a list of the whole vocabulary.
            if token not in feature_names:
                continue

            pos = phrase_pos[i][1]
            token_ix = feature_names[token]

            # Token score comes from the TF-IDF matrix if include_input_in_tfidf is set,
            # otherwise from the token averages
            if include_input_in_tfidf:
                token_score = tdm[token_ix, sent.doc_id]
            else:
                token_score = token_averages[token_ix]

            # Scale token_score by 3x if the token is an adjective or adverb
            score += (token_score * 3) if pos in adj_adv_pos else token_score

        # Keep the best phrase seen so far (strict '>' keeps the earliest on ties)
        phrase_score = score * weight
        if phrase_score > top_score:
            top_phrase = phrase
            top_score = phrase_score

    return top_phrase

In [None]:
#export
def get_vector_tfidf(sent, dictionary, term_matrix):
    """
    Build a vector for a sentence's phrase from rows of a term matrix.

    `dictionary.doc2bow(sent.phrase)` yields entries whose first element is used as a
    row index into `term_matrix`; the selected rows are summed along axis 0.
    """
    row_ids = []
    for entry in dictionary.doc2bow(sent.phrase):
        row_ids.append(entry[0])

    selected_rows = term_matrix[row_ids]
    return selected_rows.sum(axis=0)

def get_vector_w2v(sent, model):
    """
    Create a word vector for a given sentence using a Word2Vec model.

    `model` may be either a full model exposing its vectors as `model.wv`, or the
    keyed vectors themselves (e.g. what `w2v_pretrained` returns). Tokens of
    `sent.phrase` that are missing from the vocabulary are ignored; the vectors of
    the remaining tokens are summed along axis 0.
    """
    # Always look vectors up on the keyed-vectors object: subscripting the model
    # itself (`model[tokens]`) is not supported by newer gensim releases.
    keyed_vectors = getattr(model, "wv", model)

    # Newer gensim exposes the vocabulary as `key_to_index`; older releases use `vocab`.
    vocab = getattr(keyed_vectors, "key_to_index", None)
    if vocab is None:
        vocab = keyed_vectors.vocab

    tokens = [token for token in sent.phrase if token in vocab]
    return keyed_vectors[tokens].sum(axis=0)

In [None]:
#export
def w2v_pretrained(bin_file):
    """
    Load a pre-trained Word2Vec model from a bin file.

    `bin_file` is passed to `KeyedVectors.load_word2vec_format` with `binary=True`,
    and the loaded keyed vectors are returned.
    """
    # Imported locally: this module's import cell does not import gensim, so the
    # previous module-level reference `gensim.models...` raised NameError when called.
    from gensim.models import KeyedVectors

    return KeyedVectors.load_word2vec_format(bin_file, binary=True)

## Clustering

In [None]:
#export
def get_silhouette_score_hac(phrase_vecs, linkage_matrix, height):
    """
    Cut the dendrogram described by `linkage_matrix` at `height`, label each phrase
    vector with the cluster it falls into, and return the silhouette score of that
    labelling over `phrase_vecs`.
    """
    membership = cut_tree(linkage_matrix, height=height)
    # One column per requested height; a single height means column 0 holds the labels
    labels = list(membership[:, 0])
    return silhouette_score(phrase_vecs, labels)

def get_tree_height(root):
    """
    Gets the height of a binary tree.

    An empty tree (`root is None`) has height 1 and every level of nodes adds 1,
    so a single leaf has height 2. Nodes are expected to expose `left` and `right`.

    The tree is walked level by level instead of recursively so that very deep,
    unbalanced trees do not exceed Python's recursion limit.
    """
    height = 1
    level = [] if root is None else [root]

    while level:
        height += 1
        # Collect the children that actually exist to form the next level
        level = [
            child
            for node in level
            for child in (node.left, node.right)
            if child is not None
        ]

    return height

def get_linkage_matrix(phrase_vecs, dist_metric):
    """
    Creates a linkage matrix by calculating distance between phrase vectors.

    Parameters:
        phrase_vecs: collection of phrase vectors, one per phrase.
        dist_metric: name of a metric accepted by `pairwise_distances`
            (including "cosine").

    Returns:
        The linkage matrix produced by `ward` from the pairwise distance matrix.
    """
    # "cosine" is handled by pairwise_distances itself (it yields 1 - cosine similarity),
    # so no separate branch is needed. The earlier branch relied on `cosine_similarity`,
    # which this module never imports.
    dist = pairwise_distances(phrase_vecs, metric=dist_metric)

    # NOTE(review): `ward` receives the square distance matrix as-is. SciPy documents
    # 2-D input as a set of observation vectors rather than as distances (condensed
    # form is expected for distances) -- confirm this is intended before changing it,
    # since downstream height selection is tuned to the current output.
    return ward(dist)

def get_optimal_height(data, linkage_matrix, show_dendrogram = False, show_chart = True, save_chart = False, chart_file = "HACSilhouette.png"):
    """
    Clusters the top phrase vectors and plots the silhouette coefficients for a range
    of dendrogram cut heights. Returns the optimal height (highest silhouette coefficient).

    Parameters:
        data: object whose `vec` attribute holds the phrase vectors.
        linkage_matrix: linkage matrix describing the hierarchical clustering.
        show_dendrogram: when true, draw and show the clustering dendrogram.
        show_chart: when true, show the silhouette-score-by-height chart.
        save_chart: when true, save the silhouette-score-by-height chart to `chart_file`.
            This works whether or not `show_chart` is set.
        chart_file: destination of the saved chart.

    Returns:
        The candidate height with the highest silhouette score (the lowest such height
        on ties).
    """
    # Candidate cut heights run from 2 up to the height of the tree (inclusive)
    max_h = get_tree_height(hierarchy.to_tree(linkage_matrix)) + 1
    h_range = range(2, max_h)
    phrase_vecs = list(data.vec)
    # NOTE(review): a cut that leaves every vector in one cluster is expected to make
    # the silhouette computation fail -- confirm the candidate heights stay in range.
    h_scores = [get_silhouette_score_hac(phrase_vecs, linkage_matrix, h) for h in h_range]

    # Optionally display the clustering dendrogram
    if show_dendrogram:
        dendrogram(linkage_matrix)
        plt.show()

    # Optionally display and/or save the graph of silhouette score by height
    if show_chart or save_chart:
        fig, ax = plt.subplots()
        ax.plot(h_range, h_scores)
        ax.set_xlabel("Cut height")
        ax.set_ylabel("Silhouette score")

        # Save before showing: once a figure has been shown it may no longer hold
        # the plot, which previously produced an empty image on disk.
        if save_chart:
            fig.savefig(chart_file, dpi=300)
        if show_chart:
            plt.show()
        plt.close(fig)

    # The optimal height is the one with the highest silhouette score
    optimal_height = h_range[np.argmax(h_scores)]
    return optimal_height

def get_cluster_assignments_hac(data, dist_metric, height = None, show_dendrogram = False, show_chart = False):
    """
    Cluster the phrase vectors in `data.vec` with Hierarchical Agglomerative Clustering.

    The linkage matrix is built with `dist_metric` and cut at `height`. When `height`
    is not given, it is chosen by `get_optimal_height`. Returns a list holding one
    cluster label per phrase vector.
    """
    vectors = list(data.vec)
    linkage_matrix = get_linkage_matrix(vectors, dist_metric)

    # Fall back to the best-scoring height when the caller did not pick one
    cut_height = height
    if cut_height is None:
        cut_height = get_optimal_height(data, linkage_matrix, show_dendrogram, show_chart)

    membership = cut_tree(linkage_matrix, height=cut_height)
    return list(membership[:, 0])

In [None]:
#export
def get_silhouette_score_kmeans(phrase_vecs, k, random_state = 42):
    """
    Assigns clusters to a list of word vectors for a given `k` and calculates the
    silhouette score of the clustering.

    Parameters:
        phrase_vecs: the vectors to cluster.
        k: number of K-means clusters.
        random_state: seed passed to `KMeans`. Defaults to 42, the same seed
            `get_cluster_assignments_kmeans` uses for the final clustering, so the
            score for a given `k` is reproducible and describes the clustering that
            will actually be produced.

    Returns:
        The silhouette score of the K-means labelling of `phrase_vecs`.
    """
    kmeans = KMeans(n_clusters=k, random_state=random_state).fit(phrase_vecs)
    cluster_assignments = kmeans.predict(phrase_vecs)
    return silhouette_score(phrase_vecs, cluster_assignments)
    
def get_optimal_k(data, show_chart = True, save_chart = False, chart_file = "KmeansSilhouette.png"):
    """
    Calculates the optimal k-value (highest silhouette coefficient). Optionally plots
    a chart of silhouette score by k-value and/or saves it to disk.

    Parameters:
        data: object whose `vec` attribute holds the phrase vectors.
        show_chart: when true, plot the silhouette-score-by-k chart and leave the
            figure open so it is displayed.
        save_chart: when true, save the chart to `chart_file`. This works whether or
            not `show_chart` is set.
        chart_file: destination of the saved chart.

    Returns:
        The k in `range(2, min(len(vectors), 100))` with the highest silhouette score
        (the smallest such k on ties).

    Raises:
        ValueError: if there are fewer than three phrase vectors, so that no candidate
            k exists.
    """
    phrase_vecs = list(data.vec)
    # Candidate k-values stop below the number of vectors and are capped below 100
    max_k = min(len(phrase_vecs), 100)
    k_range = range(2, max_k)
    if len(k_range) == 0:
        raise ValueError(
            f"Cannot choose an optimal k: at least 3 phrase vectors are required, got {len(phrase_vecs)}"
        )

    score = [get_silhouette_score_kmeans(phrase_vecs, k) for k in k_range]

    # Optionally display and/or save the graph of silhouette score by k-value
    if show_chart or save_chart:
        fig, ax = plt.subplots()
        ax.plot(k_range, score)
        ax.set_xlabel("k")
        ax.set_ylabel("Silhouette score")

        if save_chart:
            fig.savefig(chart_file, dpi=300)
        # A chart that was only meant to be saved is closed so it is not displayed too
        if not show_chart:
            plt.close(fig)

    # optimal_k is the k value with the highest silhouette score
    optimal_k = k_range[np.argmax(score)]
    return optimal_k

def get_cluster_assignments_kmeans(data, k = None, show_chart = False):
    """
    Cluster the phrase vectors in `data.vec` with K-means.

    When `k` is not given, it is chosen by `get_optimal_k`. The model is fitted with a
    fixed `random_state` of 42 and its prediction for the same vectors is returned.
    """
    vectors = list(data.vec)

    # Fall back to the best-scoring k when the caller did not pick one
    n_clusters = get_optimal_k(data, show_chart) if k is None else k

    fitted = KMeans(n_clusters=n_clusters, random_state=42).fit(vectors)
    return fitted.predict(vectors)

## Export

In [None]:
#export   
def df_to_disk(df, file_name, mode="w", header=True):
    """
    Write `df` to `file_name` as a tab-delimited, UTF-8 encoded file.

    The index is not written, values are never quoted, and a backslash is used as the
    escape character. `mode` and `header` are forwarded to `DataFrame.to_csv`.
    A confirmation message is printed only when the file is written with mode "w".
    """
    csv_options = {
        "sep": '\t',
        "mode": mode,
        "header": header,
        "encoding": 'utf-8',
        "index": False,
        "quoting": csv.QUOTE_NONE,
        "quotechar": "",
        "escapechar": "\\",
    }
    df.to_csv(file_name, **csv_options)

    # Appends stay silent; only a fresh write is announced
    if mode != "w":
        return
    print(f"Results saved to {file_name}")
    
    
def sentences_to_disk(data, file_name = 'output/DocumentSentenceList.txt'):
    """
    Write the raw sentences to `file_name`.

    Only the "id" and "text" columns of `data` are written, via `df_to_disk`.
    """
    wanted_columns = ["id", "text"]
    sentences = data[wanted_columns].copy()
    df_to_disk(sentences, file_name)
    
    
def write_cluster(cluster_rows, file_name, mode = 'a', header=False):
    """
    Write the rows of a single cluster to `file_name` through `df_to_disk`.

    By default the rows are appended and no header row is written.
    """
    write_options = dict(mode=mode, header=header)
    df_to_disk(cluster_rows, file_name, **write_options)
    
    
def clusters_to_disk(data, doc_df, cluster_df, file_name = 'output/TopicClusterResults.txt'):
    """
    Writes the sentences and phrases to a file organized by cluster and document.

    Parameters:
        data: DataFrame with at least the columns "cluster", "doc_id", "sent_id",
            "phrase" and "text".
        doc_df: DataFrame indexed by document id with a "file" column.
        cluster_df: DataFrame indexed by cluster with a "topics" column holding an
            iterable of strings per cluster.
        file_name: destination file. It is overwritten.

    The file starts with one header row naming the written columns
    (cluster, doc_id, file, sent_id, phrase, text). Then, for each cluster in ascending
    order, a "Cluster: ...; Keywords: [...]" line is followed by that cluster's rows
    sorted by document and sentence.
    """
    # Create a dataframe containing the data to be saved to disk
    df = data[["cluster", "doc_id", "sent_id", "phrase", "text"]].copy()
    file_names = [doc_df.loc[doc_id, "file"] for doc_id in data.doc_id]
    df.insert(loc=2, column='file', value=file_names)
    df = df.sort_values(by=["cluster", "doc_id", "sent_id"])

    # Write the document header. It is built from the columns that are actually
    # written below (not from `data.columns`) so the header matches the rows.
    header_only = pd.DataFrame(columns=df.columns)
    write_cluster(header_only, file_name, mode = 'w', header=True)

    # Write each cluster, in ascending cluster order so the output is deterministic
    for c in sorted(set(df.cluster)):
        # Write a cluster header containing the main topics for the cluster
        keywords = ', '.join(cluster_df.loc[c, 'topics'])
        with open(file_name, encoding="utf-8", mode = 'a') as file:
            file.write(f"Cluster: {c}; Keywords: [{keywords}]\n")

        # Write the sentences in the cluster
        write_cluster(df[df.cluster == c], file_name)

In [None]:
#hide
# Export the project's notebooks to Python modules; the cell output lists each
# notebook that was converted. Keep this as the last cell of the notebook.
from nbdev.export import notebook2script
notebook2script()

Converted Core.ipynb.
Converted index.ipynb.
Converted internal.ipynb.
Converted preprocessing.ipynb.
Converted Sandbox.ipynb.
