In [0]:
@udf(DoubleType())
def cosine_similarity(v1, v2):
    """Spark UDF: cosine similarity between two ML vectors.

    Returns ``v1.dot(v2) / (||v1||_2 * ||v2||_2)`` as a Python float.

    Edge cases:
    * either input NULL -> None (SQL NULL) instead of an AttributeError that fails the task;
    * either vector with zero L2 norm -> 0.0 instead of NaN / division by zero
      (a NaN would silently break the similarity sorting done downstream).
    """
    if v1 is None or v2 is None:
        return None
    norms_product = float(v1.norm(2)) * float(v2.norm(2))
    if norms_product == 0.0:
        return 0.0
    return float(v1.dot(v2)) / norms_product


@udf(DoubleType())
def euqlidean_dist(v1, v2):
    """Spark UDF returning ``1 - ||v1 - v2||_2``.

    Despite the name this is a *similarity-like* score: identical vectors give 1.0
    and the value decreases (and can become negative) as the distance grows.
    """
    squared = v1.squared_distance(v2)
    distance = float(squared ** 0.5)
    return 1 - distance


def get_similarities(embeddings_df, metric=cosine_similarity, cache=True, n_partitions=100):
    """Compute ``metric`` for every unordered pair of distinct rows of ``embeddings_df``.

    Parameters
    ----------
    embeddings_df : Spark DataFrame with "index" and "embedding" columns.
    metric : two-argument column UDF applied to the two embeddings (default: cosine_similarity).
    cache : persist the self-join while it is evaluated; it is unpersisted before returning.
    n_partitions : number of partitions the pairs are spread over before applying ``metric``.

    Returns
    -------
    list of Rows with fields "index1", "index2", "similarity" (one per pair with
    index1 < index2, assuming "index" is unique), collected to the driver.
    """
    left = embeddings_df.withColumnRenamed("index", "index1").withColumnRenamed("embedding", "embedding1")
    right = embeddings_df.withColumnRenamed("index", "index2").withColumnRenamed("embedding", "embedding2")
    # Strict "<" keeps each unordered pair once and excludes self-pairs.
    pairs_df = left.join(right, left["index1"] < right["index2"])

    if cache:
        pairs_df.persist()
    try:
        sim_df = pairs_df.repartition(n_partitions) \
                         .withColumn("similarity", metric(F.col("embedding1"), F.col("embedding2"))) \
                         .select("index1", "index2", "similarity")

        print("Calculating similarities...")
        return sim_df.collect()
    finally:
        if cache:
            # The cached join is only needed for this collect; previously it was never
            # unpersisted, so every call (one per category) left its cross-join cached.
            pairs_df.unpersist()


def similarities_to_dict(sim_data):
    """Turn pair rows into a symmetric nested lookup ``{a: {b: similarity}}``.

    Every row with fields "index1", "index2" and "similarity" is stored in both
    directions, so ``result[a][b] == result[b][a]``. Progress is shown with tqdm.
    """
    nested = {}
    for record in tqdm(sim_data):
        first = record["index1"]
        second = record["index2"]
        value = record["similarity"]
        nested.setdefault(first, {})[second] = value
        nested.setdefault(second, {})[first] = value
    return nested


def recalculate_mean_sim(elems_sim_dict, old_mean_dist, elems, new_elem):
    """Incrementally update a cluster's mean pairwise similarity after adding one element.

    A cluster of ``n`` elements has ``n * (n - 1) / 2`` unordered pairs. Adding
    ``new_elem`` adds ``n`` pairs (one with each existing element), giving
    ``(n + 1) * n / 2`` pairs in total.

    Parameters
    ----------
    elems_sim_dict : nested mapping ``{a: {b: similarity}}``; a pair may be stored
        in either orientation.
    old_mean_dist : current mean pairwise similarity of ``elems``.
    elems : the current members of the cluster.
    new_elem : the element being added.

    Returns
    -------
    float : the mean pairwise similarity of ``elems + [new_elem]``.

    Raises
    ------
    KeyError : if a pair is missing in both orientations.
    ZeroDivisionError : if ``elems`` is empty (no pairs).
    """
    n = len(elems)
    old_pairs = n * (n - 1) / 2
    new_pairs = (n + 1) * n / 2
    new_mean_dist = old_mean_dist * old_pairs
    for old_elem in elems:
        try:
            new_mean_dist += elems_sim_dict[new_elem][old_elem]
        except KeyError:
            # Only the other orientation of the pair may be stored. Other errors
            # (e.g. a malformed entry) are no longer swallowed.
            new_mean_dist += elems_sim_dict[old_elem][new_elem]
    new_mean_dist /= new_pairs
    return new_mean_dist


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
[0;32m<command-504828327775848>[0m in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0;31m [0;34m@[0m[0mudf[0m [0;34m([0m[0mDoubleType[0m[0;34m([0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      2[0m [0;32mdef[0m [0mcosine_similarity[0m[0;34m([0m[0mv1[0m[0;34m,[0m [0mv2[0m[0;34m)[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[1;32m      3[0m     [0;32mreturn[0m [0mfloat[0m[0;34m([0m[0mv1[0m[0;34m.[0m[0mdot[0m[0;34m([0m[0mv2[0m[0;34m)[0m [0;34m/[0m [0;34m([0m[0;34m([0m[0mv1[0m[0;34m.[0m[0mnorm[0m[0;34m([0m[0;36m2[0m[0;34m)[0m [0;34m*[0m [0mv2[0m[0;34m.[0m[0mnorm[0m[0;34m([0m[0;36m2[0m[0;34m)[0m[0;34m)[0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      4[0m [0;34m[0m[0m
[1;32m      5[0m [0;34m[0m

In [0]:
# Greedy, threshold-based clustering over a precomputed sentence-similarity graph.
def cluster_sentences(sent_to_ind_map, sim_dict, min_sim=0.85, min_sim_threshold=0.8, min_sim_dec_step=0.01,
                      min_mean_sim=0.75, min_mean_sim_threshold=0.7, min_mean_sim_dec_step=0.01):
    """Greedily cluster sentence indexes using pairwise similarities.

    The indexes (keys of ``sent_to_ind_map``) are swept repeatedly ("epochs"). For an
    unclustered index:

    * if its most similar *unclustered* neighbour reaches ``min_sim``: when cluster
      ``cluster_num`` does not exist yet, a new two-element cluster is opened with that
      neighbour; otherwise the neighbour is offered to cluster ``cluster_num`` and is
      accepted if the cluster's mean pairwise similarity stays >= ``min_mean_sim``
      (if not accepted, ``cluster_num`` is closed);
    * otherwise the index itself is offered, under the same mean-similarity condition,
      to the cluster of its most similar *clustered* (non-noise) neighbour, if that
      neighbour's similarity reaches ``min_sim``.

    After an epoch that clustered nothing new, ``min_mean_sim`` is lowered by
    ``min_mean_sim_dec_step``. Once it is below ``min_mean_sim_threshold``, ``min_sim``
    is lowered by ``min_sim_dec_step`` every epoch. When ``min_sim`` falls below
    ``min_sim_threshold``, all remaining indexes go to the noise cluster -1 and the
    loop stops.

    Parameters
    ----------
    sent_to_ind_map : mapping whose keys are the indexes to cluster.
    sim_dict : nested mapping ``{a: {b: similarity}}`` (e.g. from similarities_to_dict).
        Indexes without an entry have no neighbours; neighbours that are not keys of
        ``sent_to_ind_map`` are ignored.
    min_sim, min_sim_threshold, min_sim_dec_step : pairwise gate and its relaxation schedule.
    min_mean_sim, min_mean_sim_threshold, min_mean_sim_dec_step : cluster mean-similarity
        gate and its relaxation schedule.

    Returns
    -------
    (clusters, indexes_dict, noise)
        clusters : ``{cluster_id: {"mean_dist": mean pairwise similarity, "indexes": [...]}}``;
            id -1 holds the noise indexes (with "mean_dist" 0).
        indexes_dict : ``{index: {"cluster": cluster_id}}``.
        noise : always ``{}`` (kept for interface compatibility; noise lives in clusters[-1]).
    """
    indexes_dict = {index: {"cluster": None} for index in sent_to_ind_map}
    print(f"Sentences to cluster: {len(indexes_dict)}")
    indexes_iter = iter(indexes_dict)

    clusters = {}
    noise = {}  # never filled; see the docstring
    cluster_num = 0  # id of the cluster currently open / to be opened next
    get_next = True  # False -> reuse current_index instead of advancing the iterator
    iters = 0  # completed epochs
    clustered = 0  # indexes assigned to any cluster, including -1

    prev_epoch_clustered = 0
    start = time.time()
    while clustered < len(indexes_dict):
        try:
            if get_next:
                current_index = next(indexes_iter)
            # NOTE(review): this also fires when get_next is False, i.e. right after
            # current_index was set to a sentence that was just clustered below, so the
            # "continue growing from the newest member" chaining never actually runs;
            # the next index is simply fetched.
            if indexes_dict[current_index]["cluster"] is not None:
                get_next = True
                continue

            neighbours = sim_dict.get(current_index, {})
            sorted_sim = sorted(((value, key) for key, value in neighbours.items()
                                 if key in indexes_dict and indexes_dict[key]["cluster"] is None), reverse=True)
            if len(sorted_sim) == 0:
                get_next = True
                continue

            max_current_sim, closest_to_current = sorted_sim[0]
            # Case: no unclustered sentence close enough to the current sentence
            if max_current_sim < min_sim:
                get_next = True
                sorted_sim_clustered = sorted(((value, key) for key, value in neighbours.items()
                                               if key in indexes_dict
                                               and indexes_dict[key]["cluster"] is not None
                                               and indexes_dict[key]["cluster"] != -1), reverse=True)

                # Subcase: but there may be a clustered sentence close enough to the current one
                if len(sorted_sim_clustered) > 0:
                    new_mean_sim = 1
                    i = 0

                    # Find a cluster to which the current sentence is similar enough.
                    # NOTE(review): a rejected cluster leaves new_mean_sim < min_mean_sim,
                    # which ends this loop, so only the closest clustered neighbour's
                    # cluster is ever tried.
                    while new_mean_sim >= min_mean_sim and i < len(sorted_sim_clustered):
                        max_current_sim, closest_to_current = sorted_sim_clustered[i]
                        i += 1
                        # No clustered sentence similar enough
                        if max_current_sim < min_sim:
                            break

                        closest_cluster_num = indexes_dict[closest_to_current]["cluster"]
                        new_mean_sim = recalculate_mean_sim(sim_dict, clusters[closest_cluster_num]["mean_dist"],
                                                            clusters[closest_cluster_num]["indexes"], current_index)
                        # Subcase: found a cluster close enough to the sentence
                        if new_mean_sim >= min_mean_sim:
                            clustered += 1
                            clusters[closest_cluster_num]["mean_dist"] = new_mean_sim
                            clusters[closest_cluster_num]["indexes"].append(current_index)
                            indexes_dict[current_index]["cluster"] = closest_cluster_num
                            break

            # Case: there is an unclustered sentence close enough to the current sentence
            else:
                # Subcase: cluster_num not opened yet -> open it with the pair
                if cluster_num not in clusters:
                    clustered += 2
                    clusters[cluster_num] = {"mean_dist": max_current_sim, "indexes": [current_index, closest_to_current]}
                    indexes_dict[current_index]["cluster"] = cluster_num
                    indexes_dict[closest_to_current]["cluster"] = cluster_num
                    current_index = closest_to_current
                    get_next = False

                # Subcase: cluster_num is open -> offer the current sentence's closest
                # unclustered neighbours to it.
                # NOTE(review): because of the check at the top of the loop, current_index is
                # always unclustered here, i.e. NOT a member of cluster_num; candidates are
                # judged only by their mean similarity to cluster_num's members. Also, a
                # rejected candidate leaves new_mean_sim < min_mean_sim, which ends the loop,
                # so at most the first candidate is evaluated, and none when sorted_sim has a
                # single entry (i starts at 1).
                else:
                    new_mean_sim = 1
                    i = 1
                    while new_mean_sim >= min_mean_sim and i < len(sorted_sim):
                        new_mean_sim = recalculate_mean_sim(sim_dict, clusters[cluster_num]["mean_dist"],
                                                            clusters[cluster_num]["indexes"], closest_to_current)
                        # Candidate accepted
                        if new_mean_sim >= min_mean_sim:
                            clustered += 1
                            clusters[cluster_num]["mean_dist"] = new_mean_sim
                            clusters[cluster_num]["indexes"].append(closest_to_current)
                            indexes_dict[closest_to_current]["cluster"] = cluster_num
                            current_index = closest_to_current
                            get_next = False
                            break

                        # Not accepted; fetch the next candidate
                        max_current_sim, closest_to_current = sorted_sim[i]
                        i += 1
                        if max_current_sim < min_sim:
                            break

                    # Nothing was added: cluster_num is complete; the next pair opens a new one
                    if indexes_dict[closest_to_current]["cluster"] is None:
                        get_next = True
                        cluster_num += 1

        except StopIteration:
            # End of an epoch: relax the thresholds if nothing new was clustered.
            iters += 1
            if time.time() - start > 5:
                print(f'''Iterations: {iters}''', end=('\r'))
                start = time.time()

            if clustered == prev_epoch_clustered:
                min_mean_sim -= min_mean_sim_dec_step
            stop = False
            if round(min_mean_sim, 2) < min_mean_sim_threshold:
                min_sim -= min_sim_dec_step
                if round(min_sim, 2) < min_sim_threshold:
                    # Thresholds exhausted: everything still unclustered becomes noise (-1).
                    for key in [key for key, value in indexes_dict.items() if value["cluster"] is None]:
                        clustered += 1
                        indexes_dict[key]["cluster"] = -1
                        if -1 not in clusters:
                            clusters[-1] = {"mean_dist": 0, "indexes": [key]}
                        else:
                            clusters[-1]["indexes"].append(key)
                    stop = True
            if stop:
                break
            else:
                prev_epoch_clustered = clustered
                # Sweep the same keys, in the same order, as the first epoch. This was previously
                # iter(sim_dict), whose key order follows the caller's construction order (for
                # similarities_to_dict: Spark collect order) and whose keys need not match.
                indexes_iter = iter(indexes_dict)

    print(f'''Iterations: {iters}''', end=('\r'))
    noise_count = len(clusters[-1]["indexes"]) if -1 in clusters else 0
    print(f'''\nClustered: {clustered}, Noise: {noise_count}, Total clusters: {len(clusters)}''')
    return clusters, indexes_dict, noise

In [0]:
def get_similarities_dict(embeddings_dict, metric=cosine_similarity, cache=False, save_pickle=True, from_pickle=True, pickle_folder="", 
                      pickle_path="/Workspace/Users/vladklim@campus.technion.ac.il/Project/similarities_dicts/"):
    """Build ``{category: similarity dict}`` for every category, with per-category pickle caching.

    For each category, in sorted order:

    1. if ``from_pickle``, try to load ``similarities_<category>.pickle`` (lower-cased,
       spaces replaced by underscores) from ``pickle_path/pickle_folder``;
    2. if loading is disabled or fails, compute the dict with ``get_similarities``
       (using ``metric`` and ``cache``) and ``similarities_to_dict``;
    3. if it was computed and ``save_pickle`` is set, write that category's dict to
       the same file.

    A category whose computation fails is reported and skipped; the remaining
    categories are still processed (previously the first failure stopped the loop).

    NOTE: earlier versions pickled the whole accumulated mapping into each category
    file. Such files load as that whole mapping rather than one category's dict and
    should be regenerated.

    Returns
    -------
    dict : category -> ``{index: {index: similarity}}``.
    """
    folder = os.path.join(pickle_path, pickle_folder)
    if save_pickle:
        os.makedirs(folder, exist_ok=True)

    similarities_dict = {}
    for category in sorted(embeddings_dict):
        print(f"\nCategory: {category}")
        category_file = os.path.join(folder, f'''similarities_{category.lower().replace(" ", "_")}.pickle''')

        loaded = False
        if from_pickle:
            try:
                # NOTE: pickle.load can execute arbitrary code; only load files this notebook wrote.
                with open(category_file, "rb") as f:
                    similarities_dict[category] = pickle.load(f)
                print(f"{category} - succsessfully loaded from pickle.")
                loaded = True
            except Exception:
                print(f"{category} - failed to load from pickle, performing calcuclations...")
        if loaded:
            continue

        try:
            similarities_data = get_similarities(embeddings_dict[category], metric=metric, cache=cache)
            similarities_dict[category] = similarities_to_dict(similarities_data)
            print(f"{category} - successfully calculated similarities.")
        except Exception as e:
            print(e)
            print(f"{category} - failed to calculate, skipping...")
            continue

        if save_pickle:
            try:
                with open(category_file, "wb") as f:
                    # Only this category's dict: that is what the loader above expects.
                    pickle.dump(similarities_dict[category], f)
                print(f"{category} - successfully saved to pickle.")
            except Exception:
                print(f"{category} - failed to save to pickle, try to save from output.")
    print("\nFinished.")

    return similarities_dict


def get_clusters_dict(sent_to_ind_map, similarities_dict, save_pickle=True, from_pickle=True, 
                      min_sim=0.85, min_sim_threshold=0.8, min_sim_dec_step=0.01,
                      min_mean_sim=0.75, min_mean_sim_threshold=0.7, min_mean_sim_dec_step=0.01,
                      pickle_name="clusters_dict.pickle", 
                      pickle_path="/Workspace/Users/vladklim@campus.technion.ac.il/Project/clusters_dicts/"):
    """Run cluster_sentences for every category, with optional pickle caching of the whole result.

    If ``from_pickle`` is set and ``pickle_path/pickle_name`` loads successfully, the
    loaded object is returned as-is and nothing is computed. Otherwise every category of
    ``similarities_dict`` (sorted) is clustered with ``sent_to_ind_map[category]`` and the
    given thresholds, and the result is pickled if ``save_pickle`` is set.

    Returns
    -------
    dict : category -> ``(clusters, indexes_dict, noise)`` as returned by cluster_sentences.
    """
    if save_pickle:
        os.makedirs(pickle_path, exist_ok=True)
    pickle_file = os.path.join(pickle_path, pickle_name)

    clusters_dict = {}
    if from_pickle:
        try:
            # NOTE: pickle.load can execute arbitrary code; only load files this notebook wrote.
            with open(pickle_file, "rb") as f:
                clusters_dict = pickle.load(f)
            print(f"Succsessfully loaded from pickle.")
            return clusters_dict
        except Exception:
            # Previously a bare except, which also swallowed KeyboardInterrupt.
            print(f"Failed to load from pickle, performing calcuclations...")

    print("Clustering started...")
    for category in sorted(similarities_dict):
        print(f"\nCategory: {category}")
        clusters_dict[category] = cluster_sentences(sent_to_ind_map[category], similarities_dict[category],
                    min_sim=min_sim, min_sim_threshold=min_sim_threshold, min_sim_dec_step=min_sim_dec_step,
                    min_mean_sim=min_mean_sim, min_mean_sim_threshold=min_mean_sim_threshold, min_mean_sim_dec_step=min_mean_sim_dec_step)

    if save_pickle:
        try:
            with open(pickle_file, "wb") as f:
                pickle.dump(clusters_dict, f)
            print(f"Successfully saved to pickle.")
        except Exception:
            print(f"Failed to save to pickle, try to save from output.")

    print("\nFinished")

    return clusters_dict


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
[0;32m<command-504828327775850>[0m in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0;31m def get_similarities_dict(embeddings_dict, metric=cosine_similarity, cache=False, save_pickle=True, from_pickle=True, pickle_folder="", 
[0m[1;32m      2[0m                       pickle_path="/Workspace/Users/vladklim@campus.technion.ac.il/Project/similarities_dicts/"):
[1;32m      3[0m     [0;32mif[0m [0msave_pickle[0m [0;32mand[0m [0;32mnot[0m [0mos[0m[0;34m.[0m[0mpath[0m[0;34m.[0m[0mexists[0m[0;34m([0m[0mos[0m[0;34m.[0m[0mpath[0m[0;34m.[0m[0mjoin[0m[0;34m([0m[0mpickle_path[0m[0;34m,[0m [0mpickle_folder[0m[0;34m)[0m[0;34m)[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[1;32m      4[0m         [0mos[0m[0;34m.[0m[0mmakedirs[0m[0;34m([0m[0mos[0m[0;34m.[0m[0mpath[0

In [0]:
@udf(VectorUDT())
def elementwise_avg(vectors):
    """Spark UDF: element-wise mean of a list of ML vectors (used with F.collect_list("embedding")).

    Returns a DenseVector whose i-th entry is the mean of the i-th entries.

    * Returns None (SQL NULL) when ``vectors`` is NULL or empty. collect_list over
      only-NULL values yields an empty list, which previously raised IndexError on
      ``vectors[0]``.
    * All vectors must have the same size: np.stack raises ValueError otherwise.
      Previously, longer vectors were silently truncated to the first vector's size.
    """
    if not vectors:
        return None
    # toArray() is available on both DenseVector and SparseVector.
    stacked = np.stack([vector.toArray() for vector in vectors])
    return Vectors.dense(stacked.mean(axis=0))

In [0]:
# Restrict the similarity / clustering run below to these categories.
# NOTE(review): titles_embeddings_dict is built outside this section of the notebook.
categories = ["Automation Machinery Manufacturing"]
titles_embeddings_dict_trunc = {key: value for key, value in titles_embeddings_dict.items() if key in categories}

In [0]:
# Compute pairwise title similarities for the selected categories (from_pickle=False forces
# recomputation) and save one pickle per category under the "titles" folder.
titles_similarities_dict = get_similarities_dict(titles_embeddings_dict_trunc, pickle_folder="titles", save_pickle=True, from_pickle=False)

In [0]:
# Greedy clustering of the titles of each category; the result is pickled as
# titles_clusters_dict.pickle (from_pickle=False forces recomputation).
# NOTE(review): job_titles_toi is built by a later cell of this notebook (the one that also
# builds jobs_titles_data); on a fresh kernel run that cell before this one.
titles_clusters_dict = get_clusters_dict(job_titles_toi, titles_similarities_dict, pickle_name="titles_clusters_dict.pickle", 
                                        min_sim=0.85, min_sim_threshold=0.75, min_sim_dec_step=0.01,
                                        min_mean_sim=0.8, min_mean_sim_threshold=0.7, min_mean_sim_dec_step=0.01, 
                                        save_pickle=True, from_pickle=False)

In [0]:
# Attach the clustering result of one category to its title embeddings. Then compute each
# cluster's centroid (element-wise mean embedding) and each title's cosine similarity to
# its cluster's centroid.
category = "Automation Machinery Manufacturing"
# Second element of the (clusters, indexes_dict, noise) tuple returned by cluster_sentences:
# {index: {"cluster": cluster_id}}.
indexes_clusters = titles_clusters_dict[category][1]
# .get: an index missing from the clustering result gets a NULL cluster instead of a
# KeyError that fails the whole Spark job.
get_cluster_udf = udf(lambda index: indexes_clusters.get(index, {}).get('cluster'), IntegerType())
# drop() removes "centroid"/"sim_to_centroid" left by a previous run of this cell (it is a
# no-op when they are absent). This lets a re-run proceed without creating duplicate,
# ambiguous columns in the join below.
titles_embeddings_dict[category] = titles_embeddings_dict[category] \
                                        .drop("centroid", "sim_to_centroid") \
                                        .withColumn("cluster", get_cluster_udf(F.col("index"))).withColumn("category", F.lit(category))

centroids_df = titles_embeddings_dict[category].groupBy("cluster", "category") \
                                            .agg(elementwise_avg(F.collect_list("embedding")).alias("centroid"))

titles_embeddings_dict[category] = titles_embeddings_dict[category].join(centroids_df, ['cluster', 'category'], 'left_outer') \
                                        .withColumn("sim_to_centroid", cosine_similarity(F.col("embedding"), F.col("centroid"))) \
                                        .select("index", "sentence", "cluster", "sim_to_centroid", "category", "embedding", "centroid")
titles_embeddings_dict[category].persist()
titles_embeddings_dict[category].display()

In [0]:
def plot_jobs_hist_by_industry():
    """Plot histograms of the number of parsed jobs per company.

    The first panel covers all companies, followed by one panel per distinct value of
    the "industries" column. Reads the global Spark DataFrame ``tech_comps``, which
    needs "industries" and "jobs_parsed" columns.
    """
    industries = tech_comps.select("industries").distinct().rdd.flatMap(lambda x: x).collect()

    # squeeze=False keeps `axs` an array even when there are no industries. With a single
    # subplot, plt.subplots would otherwise return a bare Axes and axs[0] would fail.
    fig, axs = plt.subplots(len(industries) + 1, figsize=(10, 25), squeeze=False)
    axs = axs[:, 0]

    df = tech_comps.withColumn("jobs_number", F.size(F.col("jobs_parsed")))
    axs[0].hist(df.select("jobs_number").toPandas()["jobs_number"], bins=20, alpha=0.7)
    axs[0].set_title("All")

    for ax, industry in zip(axs[1:], industries):
        # eqNullSafe: a NULL industry selects its rows, whereas `== None` matched nothing.
        data = df.filter(F.col("industries").eqNullSafe(industry)).select("jobs_number").toPandas()
        ax.hist(data["jobs_number"], bins=50, alpha=0.7)
        ax.set_title(f"{industry}")

    plt.tight_layout()
    plt.show()

plot_jobs_hist_by_industry()

## Clustering

In [0]:
# Per industry:
#   jobs_titles_data[industry] -> [(company url, job position, "<url>_<position>", job title), ...]
#   job_titles_toi[industry]   -> {title index: title} over the industry's distinct titles
# Distinct titles are enumerated in first-occurrence order (dict.fromkeys) rather than via
# list(set(...)). String hashing is randomized per Python process, so set order changed between
# runs, and with it the index -> title mapping that downstream results (e.g. pickled clusters)
# are keyed by.
jobs_titles_data = {}
job_titles_toi = {}
for industry in set(comps_industries.values()):
    industry_jobs = [value_dict_pair for value_dict_pair in jobs_dict.items() if value_dict_pair[1]["industry"] == industry]
    jobs_titles_data[industry] = [(url, i, f'{url}_{i}', job["title"])
                                  for url, value_dict in industry_jobs
                                  for i, job in enumerate(value_dict["jobs"])]
    distinct_titles = dict.fromkeys(job["title"] for _, value_dict in industry_jobs for job in value_dict["jobs"])
    job_titles_toi[industry] = dict(enumerate(distinct_titles))
sum(len(titles) for titles in job_titles_toi.values())

In [0]:
def optics_clustering(embeddings_df, 
                    metric='cosine', min_samples=3, pca=False, pca_k=50, max_eps=np.inf, xi=0.05, cluster_method="xi"):
    """Cluster the distinct embeddings with HDBSCAN and print cluster-quality scores.

    Despite the name, the OPTICS call is commented out. HDBSCAN is used with fixed settings
    (min_cluster_size=5, min_samples=1, 'leaf' selection, cluster_selection_epsilon=0.75).
    ``min_samples``, ``max_eps``, ``xi`` and ``cluster_method`` are currently unused.

    Parameters
    ----------
    embeddings_df : Spark DataFrame with "index" and "embedding" columns.
    metric : metric passed to HDBSCAN and to the silhouette score.
    pca, pca_k : if ``pca``, cluster the ``pca_k``-dimensional projection
        ("embedding_pca") from get_PCA_components instead.

    Returns
    -------
    array of HDBSCAN labels (-1 marks noise), one per distinct embedding, in the order of
    the collected embeddings. Previously the function returned None.
    """
    df, embedding_col = (embeddings_df, "embedding") if not pca else (get_PCA_components(embeddings_df, k=pca_k), "embedding_pca")
    # Keyed by embedding, so identical embeddings collapse into one point (the last index wins).
    indexed_embeddings = {row[embedding_col]: row["index"] for row in 
                          df.select("index", embedding_col).collect()}
    embeddings_list = list(indexed_embeddings.keys())

    X = np.array(embeddings_list)
    # clusters = OPTICS(min_samples=min_samples, metric=metric, 
    #                   n_jobs=-1, max_eps=max_eps, xi=xi, cluster_method=cluster_method).fit_predict(X)
    clusters = hdbscan.HDBSCAN(min_cluster_size=5, min_samples=1, cluster_selection_method='leaf',
                            metric=metric, cluster_selection_epsilon=0.75, prediction_data=True).fit_predict(X)

    # The silhouette, Calinski-Harabasz and Davies-Bouldin scores all raise ValueError unless
    # 2 <= number of distinct labels <= n_samples - 1 (e.g. when every point is noise).
    n_labels = len(set(clusters))
    if 2 <= n_labels <= len(X) - 1:
        eval_score = metrics.silhouette_score(X, clusters, metric=metric)
        print(f"Silhouette score: {eval_score}")
        eval_score = metrics.calinski_harabasz_score(X, clusters)
        print(f"Calinski-Harabasz score: {eval_score}")
        eval_score = metrics.davies_bouldin_score(X, clusters)
        print(f"Davies-Bouldin score: {eval_score}")
    else:
        print(f"Skipping evaluation scores: {n_labels} distinct label(s) for {len(X)} points.")
    return clusters

In [0]:
# Centroid (element-wise mean embedding) of each (cluster, category) and each title's score
# against its own cluster's centroid. This overwrites the centroids_df of the earlier centroid cell.
# NOTE: euqlidean_dist returns 1 - Euclidean distance, so despite its name the
# "euclidean_dist" column is a similarity: higher means closer to the centroid.
# NOTE(review): titles_clusters is defined outside this section; it must provide "index",
# "sentence", "cluster", "soft_cluster", "category" and "embedding".
centroids_df = titles_clusters.groupBy("cluster", "category") \
                                            .agg(elementwise_avg(F.collect_list("embedding")).alias("centroid"))
titles_clusters_centroids = titles_clusters.join(centroids_df, ['cluster', 'category'], 'left_outer') \
                                        .withColumn("euclidean_dist", euqlidean_dist(F.col("embedding"), F.col("centroid"))) \
                                        .select("index", "sentence", "cluster", "soft_cluster", "euclidean_dist", "category", "embedding", "centroid")
titles_clusters_centroids.persist()

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# For every non-noise centroid, find the closest other centroid.
# euqlidean_dist returns 1 - Euclidean distance, so the "euclidean_dist" column is a
# similarity: LARGER means CLOSER. Ordering is descending, so rank 1 is the centroid itself
# (distance 0 -> score 1, unless another centroid is identical) and rank 2 is its nearest
# neighbour. The previous ascending order selected the second FARTHEST centroid.
df = titles_clusters_centroids.select("centroid", "cluster").distinct().filter(F.col("cluster") != F.lit(-1))
df_cross = df.selectExpr("centroid as centroid1", "cluster as cluster1") \
            .crossJoin(df.selectExpr("centroid as centroid2", "cluster as cluster2"))
df_similarity = df_cross.withColumn("euclidean_dist", euqlidean_dist(F.col("centroid1"), F.col("centroid2")))

window = Window.partitionBy("centroid1").orderBy(F.col("euclidean_dist").desc())
df_most_similar = df_similarity.withColumn("rn", row_number().over(window)).filter(F.col("rn") == 2).drop("rn")
df_most_similar.select("cluster1", "cluster2", "euclidean_dist").orderBy("cluster1").display()


In [0]:
# Load the pickled students data (skills / education, judging by the file name) into a Spark DataFrame.
# NOTE(review): pickle.load executes arbitrary code from the file; only load files from a trusted source.
# NOTE(review): hard-coded per-user workspace path; consider a configurable data directory.
# NOTE(review): despite its name, pickle_rdd is whatever object was pickled; it must be
# something spark.createDataFrame accepts (e.g. a list of rows or a pandas DataFrame) - confirm.
pickle_path = "/Workspace/Users/shalom2552@campus.technion.ac.il/project/students_skills_education_df.pickle"
with open(pickle_path, "rb") as f:
    pickle_rdd = pickle.load(f)
students_df = spark.createDataFrame(pickle_rdd)
students_df.display()

In [0]:
# Builds hierarchical name "layers" for clusters; the innermost layer is the sentence itself.
# layer_0 names each cluster by its `max_` most frequent words. layer_1 groups the layer_0 names
# and names each group by its `max_ - 1` most frequent words, and so on down to `min_` words.
# Words are preprocessed before layering: the filter_nouns UDF (defined elsewhere), keeping
# only letters and "+#/", a minimum length, and optional lemmatization. `start_from` lets naming
# start from the n-th most frequent word, which sometimes gives more meaningful names.
def create_layers(clusters_df, min_=2, max_=5, min_word_len=2, start_from=0, lemmatize=True, soft=True):
    """Attach word-frequency based name layers layer_0 ... layer_{max_ - min_} to each cluster.

    Parameters
    ----------
    clusters_df : Spark DataFrame with a "sentence_cleaned" column and the cluster column
        ("soft_cluster" if ``soft`` else "cluster").
    min_, max_ : layer i uses ``max_ - i`` words, for i in 0 .. ``max_ - min_``.
    min_word_len : words shorter than this are dropped (first layer only).
    start_from : offset passed to get_first_n together with the word count.
    lemmatize : lemmatize words with NLTK's WordNetLemmatizer (first layer only).
    soft : use "soft_cluster" instead of "cluster".

    Returns
    -------
    ``clusters_df`` left-joined on the cluster column with the layer columns. A group with
    fewer than ``max_ + start_from - i`` distinct words gets "Unlabelled" at layer_0, or keeps
    the previous layer's name at later layers. Cluster ids in [min, max] that end up with no
    words get "Unlabelled" at layer_0.
    """
    df = clusters_df.withColumn("sentence_cleaned", filter_nouns(F.col("sentence_cleaned")))
    cluster_col = "soft_cluster" if soft else "cluster"
    current_sent_col = "sentence_cleaned"
    current_cluster_col = cluster_col

    # The UDF gets its own name. Previously it was called `lemmatize` and shadowed the
    # boolean argument, so `if lemmatize:` tested the function object and was always true.
    if lemmatize:
        lemmatizer = nltk.stem.WordNetLemmatizer()

        @udf(StringType())
        def lemmatize_udf(word):
            return lemmatizer.lemmatize(word)

    for i in range(max_ - min_ + 1):
        words = df.withColumn("word", F.explode(F.split(F.col(current_sent_col), r"\s+|,\s*")))

        if i == 0:
            words = words.withColumn("word", F.regexp_replace("word", "[^a-zA-Z+#/]", "")) \
                            .filter(F.length(F.col("word")) >= min_word_len)
            if lemmatize:
                words = words.withColumn("word", lemmatize_udf("word"))

        word_counts = words.groupBy("word", current_cluster_col).count()

        # collect_list does not guarantee element order after an orderBy, so rank explicitly:
        # count descending, ties broken by word (descending).
        top_words_per_cluster = word_counts.groupBy(current_cluster_col) \
            .agg(F.sort_array(F.collect_list(F.struct("count", "word")), asc=False).alias("ranked")) \
            .withColumn("words", F.col("ranked.word")) \
            .select(current_cluster_col, F.when((F.size(F.col("words")) >= max_ + start_from - i),
                        F.concat_ws(", ", get_first_n(F.col("words"), F.lit(max_ - i), F.lit(start_from)))) \
            .otherwise("Unlabelled" if i == 0 else F.col(current_sent_col)).alias(f"layer_{i}"))

        if i == 0:
            # Cluster ids whose words were all filtered out have no row yet: label them "Unlabelled".
            bounds = clusters_df.agg(F.min(cluster_col).alias("min_cluster"),
                                     F.max(cluster_col).alias("max_cluster")).collect()[0]
            remaining_clusters = {row[cluster_col] for row in top_words_per_cluster.select(cluster_col).distinct().collect()}
            clusters_to_supplement = [c for c in range(bounds["min_cluster"], bounds["max_cluster"] + 1)
                                      if c not in remaining_clusters]
            if clusters_to_supplement:
                top_words_per_cluster = top_words_per_cluster.union(
                    spark.createDataFrame([(c, "Unlabelled") for c in clusters_to_supplement], [cluster_col, f"layer_{i}"]))

            df = top_words_per_cluster
            df.persist()
        else:
            df = df.join(top_words_per_cluster, [current_cluster_col], "left_outer")

        current_sent_col = current_cluster_col = f"layer_{i}"

    return clusters_df.join(df, [cluster_col], "left_outer")


## Functions

In [0]:
# Preprocessing functions
import re
from nltk import pos_tag
from nltk.tokenize import word_tokenize


def remove_non_english_chars(text):
    """Strip every non-ASCII character from ``text`` and trim surrounding whitespace.

    Note: accented Latin letters are removed too (e.g. "café" -> "caf").
    """
    non_ascii = re.compile(r'[^\x00-\x7F]+')
    ascii_only = non_ascii.sub('', text)
    return ascii_only.strip()
    

# Removes (...), [...] and {...} groups from the text in a single pass. A group is removed only
# if it contains no bracket of its own kind, so "(also [...] and {...})" disappears entirely,
# while in "(a (b) c)" only the inner "(b)" is removed.
def remove_between_brackets(text):
    bracket_groups = re.compile(r"\([^()]*\)|\[[^\[\]]*\]|\{[^{}]*\}")
    return bracket_groups.sub("", text)


# Replaces the punctuation characters listed below with spaces, then collapses runs of
# whitespace into single spaces and trims the ends. '#', '+', ',', '.' and '@' are not in
# the list and are therefore kept.
def clean_text(text):
    punctuation = '!"$%&\'()*-/:;<=>?[\\]^_`{|}~'
    to_space = str.maketrans(dict.fromkeys(punctuation, ' '))
    return ' '.join(text.translate(to_space).split())


def filter_nouns_adjectives(sentence):
    """Keep only tokens tagged as nouns (N*) or adjectives (J*) by nltk.pos_tag.

    Tokens are re-joined with single spaces. If nothing would remain, the original
    sentence is returned unchanged.
    """
    kept = [token for token, tag in pos_tag(word_tokenize(sentence)) if tag.startswith(('N', 'J'))]
    return ' '.join(kept) or sentence


def filter_nouns_adjectives_verbs(sentence):
    """Keep only tokens tagged as nouns (N*), adjectives (J*) or verbs (V*) by nltk.pos_tag.

    Tokens are re-joined with single spaces. If nothing would remain, the original
    sentence is returned unchanged.
    """
    kept = [token for token, tag in pos_tag(word_tokenize(sentence)) if tag.startswith(('N', 'J', 'V'))]
    return ' '.join(kept) or sentence
    

# Applies the functions in prep_pipeline (str -> str callables, e.g.
# [remove_non_english_chars, remove_between_brackets]) to the sentence, in list order,
# after optional lower-casing. The order can matter. The caller's list is not modified.
def optional_preprocess(sentence, prep_pipeline, lower=True):
    steps = [lambda text: text.lower()] + prep_pipeline if lower else prep_pipeline
    result = sentence
    for step in steps:
        result = step(result)
    return result

    
# Sentence embeddings
def embed_sentences(data, pretrained_embeddings, prep_pipeline=[], 
                    remove_stopwords=True, clean=True, lower=True, normalize=False):
    """Embed the "sentence" column of ``data`` with a Spark NLP sentence-embeddings annotator.

    Steps:
    1. optional_preprocess (``prep_pipeline``, ``lower``);
    2. optional character cleaning;
    3. optional English stop-word removal, keeping "it";
    4. embedding of the distinct cleaned sentences;
    5. conversion to an ML DenseVector;
    6. optional L2 normalization.

    Parameters
    ----------
    data : Spark DataFrame with at least "index" and "sentence" columns.
    pretrained_embeddings : annotator configured here with setInputCols(["document"]) and
        setOutputCol("sentence_embeddings"). Its annotations are assumed to carry an
        ``embeddings`` field (e.g. a Spark NLP sentence-embeddings model) - not checked here.
    prep_pipeline : list of str -> str functions; not mutated, so the shared default is safe.
    remove_stopwords, clean, lower, normalize : toggle the corresponding step.

    Returns
    -------
    Spark DataFrame with columns "sentence_cleaned", "index" and "embedding" (VectorUDT),
    built from the first embedding annotation of each cleaned sentence and inner-joined on
    "sentence_cleaned".
    """
    # Optional preprocessing / cleaning
    optional_preprocess_udf = udf(lambda sentence: optional_preprocess(sentence, prep_pipeline, lower=lower), StringType())
    data = data.withColumn("sentence_cleaned", optional_preprocess_udf(F.col("sentence")))
    if clean:
        # Keep letters, '+', '#', '/', '_', '-' and spaces. The hyphen must be last in the
        # class: the previous "/-_" was a range (0x2F-0x5F) that also kept digits and
        # ":;<=>?@[\]^".
        data = data.withColumn("sentence_cleaned", F.regexp_replace(F.col("sentence_cleaned"), r"[^a-zA-Z+#/_ -]", ""))

    # Optional stop-word removal. Some stop words are worth keeping: e.g. "it" can stand for
    # "IT" (Information Technologies), which is important in our case.
    keep_stopwords = ["it"]
    if remove_stopwords:
        english_stopwords = [word for word in stopwords.words('english') if word not in keep_stopwords]

        tokenizer = ftr.Tokenizer(inputCol="sentence_cleaned", outputCol="tokenized")
        data = tokenizer.transform(data)

        remover = ftr.StopWordsRemover(inputCol="tokenized", outputCol="filtered", stopWords=english_stopwords)
        data = remover.transform(data).withColumn("sentence_cleaned", F.concat_ws(" ", F.col("filtered")))

    # The embedding part (each distinct cleaned sentence is embedded once)
    documentAssembler = DocumentAssembler().setInputCol("sentence_cleaned").setOutputCol("document")
    embeddings = pretrained_embeddings.setInputCols(["document"]).setOutputCol("sentence_embeddings")
    pipeline = Pipeline().setStages([
        documentAssembler,
        embeddings
    ])

    distinct_data = data.select("sentence_cleaned").distinct()

    result = pipeline.fit(distinct_data).transform(distinct_data)
    result = result.select("sentence_cleaned", "sentence_embeddings") \
            .withColumn("embedding", F.expr("transform(sentence_embeddings, x -> x.embeddings)")).drop("sentence_embeddings")

    # Convert the embedding (first annotation) to a Spark ML vector
    array_to_vector_udf = udf(lambda x: Vectors.dense(x[0]), VectorUDT())
    result = result.select("sentence_cleaned", "embedding").withColumn("embedding", array_to_vector_udf(F.col("embedding")))

    # Optional L2 normalization. It runs after the vector conversion because the ML Normalizer
    # expects a vector column; the previous version read a non-existent "embedding_raw" column.
    if normalize:
        normalizer = ftr.Normalizer(p=2.0, inputCol="embedding_raw", outputCol="embedding")
        result = normalizer.transform(result.withColumnRenamed("embedding", "embedding_raw")) \
                           .select("sentence_cleaned", "embedding")

    embeddings_df = data.select("index", "sentence_cleaned").join(result, "sentence_cleaned", "inner")

    return embeddings_df


# Builds a dict of embedding DataFrames keyed by category. In this project the category was
# the industry, but in the end the data was not split by category; the function is still used,
# with the single category "all", to avoid rewriting the code.
def get_sentence_embeddings_dict(sent_dict, pretrained_embeddings, prep_pipeline=[], cache=True, n_partitions=100,
                                 remove_stopwords=True, clean=True, lower=True, normalize=False):
    """Embed the sentences of every category in ``sent_dict``.

    Each value of ``sent_dict`` is a Spark DataFrame with at least "index" and "sentence".
    Its ("index", "sentence") projection is embedded with embed_sentences. The result is
    inner-joined back onto the full frame on "index" and repartitioned into ``n_partitions``
    partitions. Each resulting frame is persisted when ``cache`` is set.

    Returns {category: DataFrame}.
    """
    embeddings_by_category = {}

    for category, sentences_df in sent_dict.items():
        embedded = embed_sentences(sentences_df.select("index", "sentence"), pretrained_embeddings,
                                   prep_pipeline=prep_pipeline, remove_stopwords=remove_stopwords,
                                   clean=clean, lower=lower, normalize=normalize)
        joined = sentences_df.join(embedded, "index", "inner").repartition(n_partitions)
        if cache:
            joined.persist()
        embeddings_by_category[category] = joined

    return embeddings_by_category

In [0]:
# Fits a k-component PCA on the vector column `inputCol` and returns embeddings_df with the
# projected vectors added as `outputCol`.
def get_PCA_components(embeddings_df, inputCol="embedding", outputCol="embedding_pca", k=50):
    pca_model = PCA(k=k, inputCol=inputCol, outputCol=outputCol).fit(embeddings_df)
    return pca_model.transform(embeddings_df)


def _soft_cluster_assignments(clusterer, clusters, probs):
    """Return (soft_clusters, soft_probs) from HDBSCAN membership vectors, or the hard labels if unavailable."""
    # Soft clusters keep the clustered points and also attach each noise point to the cluster
    # it most probably belongs to.
    try:
        memberships = hdbscan.all_points_membership_vectors(clusterer)
    except Exception as e:
        # Prediction data is not generated when prediction_data=False, nor for
        # non-vector-space inputs such as a precomputed distance matrix.
        print(f"Soft clustering unavailable, falling back to hard clusters:\n{e}")
        return list(clusters), list(probs)
    return [np.argmax(x) for x in memberships], [np.max(x) for x in memberships]


def _report_clustering_scores(X, clusters, clusterer, metric):
    """Print internal validation scores; a score that cannot be computed is reported, not raised."""
    # DBCV is the most relevant score for HDBSCAN, but it is relative: useful for tuning, not
    # for comparing models across different kinds of data.
    scorers = [
        ("Silhouette score", lambda: metrics.silhouette_score(X, clusters, metric=metric)),
        ("Calinski-Harabasz score", lambda: metrics.calinski_harabasz_score(X, clusters)),
        ("Davies-Bouldin score", lambda: metrics.davies_bouldin_score(X, clusters)),
    ]
    for score_name, scorer in scorers:
        try:
            print(f"{score_name}: {scorer()}")
        except ValueError as e:
            # e.g. fewer than two distinct labels
            print(f"{score_name}:\n{e}")
    try:
        print(f"DBCV relative score: {clusterer.relative_validity_}")
    except Exception as e:
        print(f"DBCV:\n{e}")
    print(f"Number of clusters: {len(set(clusters) - {-1})}")
    print(f"Number of unique sentences clustered: {sum(1 for c in clusters if c != -1)}")


def hdbscan_clustering(embeddings_df, metric='precomputed', min_samples=2, min_cluster_size=5, cluster_selection_method='leaf',
                           cluster_selection_epsilon=0.75, prediction_data=True, pca=False, pca_k=50):
    """Cluster the distinct embeddings of ``embeddings_df`` with HDBSCAN (``hdbscan`` package).

    Args:
        embeddings_df: Spark DataFrame with an "embedding" vector column.
        metric: metric forwarded to HDBSCAN and to the silhouette score. With "precomputed",
            a pairwise cosine *distance* matrix (1 - cosine similarity, zero diagonal) is built
            and clustered. HDBSCAN cannot produce prediction data for it, so soft clusters then
            fall back to the hard labels.
        min_samples, min_cluster_size, cluster_selection_method, cluster_selection_epsilon,
        prediction_data: forwarded to ``hdbscan.HDBSCAN``.
        pca: if True, cluster a ``pca_k``-component PCA projection ("embedding_pca") of the
            embeddings instead of the raw vectors (not used in the final version).
        pca_k: number of PCA components when ``pca`` is True.

    Returns:
        (df_clustered, clusterer):
            - df_clustered: the input rows inner-joined with "cluster", "soft_cluster",
              "probability" and "soft_probability".
            - clusterer: the fitted HDBSCAN model, kept for predicting clusters of new points.
    """
    if pca:
        embedding_col = "embedding_pca"
        # Project each distinct embedding once, then attach the projection to every row.
        pca_distinct = get_PCA_components(embeddings_df.select("embedding").distinct(), k=pca_k)
        df = embeddings_df.join(pca_distinct, "embedding", "inner")
    else:
        df, embedding_col = embeddings_df, "embedding"

    # Identical sentences share an embedding, so each distinct vector is clustered only once.
    embeddings_list = [row[embedding_col] for row in df.select(embedding_col).distinct().collect()]
    X = np.array(embeddings_list)

    if metric == "precomputed":
        # HDBSCAN's "precomputed" metric expects DISTANCES (0 = identical), not similarities.
        X = metrics.pairwise.cosine_distances(X, X)
        np.fill_diagonal(X, 0)

    clusterer = hdbscan.HDBSCAN(min_cluster_size=min_cluster_size, min_samples=min_samples,
                                cluster_selection_method=cluster_selection_method, metric=metric,
                                cluster_selection_epsilon=cluster_selection_epsilon, prediction_data=prediction_data,
                                gen_min_span_tree=True).fit(X)

    clusters = clusterer.labels_
    probs = clusterer.probabilities_
    soft_clusters, soft_probs = _soft_cluster_assignments(clusterer, clusters, probs)

    _report_clustering_scores(X, clusters, clusterer, metric)

    # Map each distinct vector to its cluster assignment and join back onto all rows.
    clusters_map = [(embedding, int(cluster), int(soft_cluster), float(probability), float(soft_probability))
                    for embedding, cluster, soft_cluster, probability, soft_probability
                    in zip(embeddings_list, clusters, soft_clusters, probs, soft_probs)]
    clusters_map_df = spark.createDataFrame(
        clusters_map, [embedding_col, "cluster", "soft_cluster", "probability", "soft_probability"])

    df_clustered = df.join(clusters_map_df, embedding_col, "inner")

    return df_clustered, clusterer


def get_tsne_embeddings(df_clustered, inputCol="embedding", metric='cosine', 
                            random_state=None, early_exaggeration=12, perplexity=20):
    """Attach a 2-D t-SNE projection ("vector_TSNE") to every row of ``df_clustered``.

    t-SNE turns high-dimensional vectors into 2-D points for visualisation. It is fitted once
    on the distinct values of ``inputCol``. When those vectors have more than 50 dimensions
    they are first reduced to 50 with PCA (recommended before t-SNE), and t-SNE runs on the
    reduced vectors.

    Returns:
        ``df_clustered`` inner-joined with the projection, with the intermediate
        "embedding_pca" column dropped.
    """
    distinct_vectors = df_clustered.select(inputCol).distinct()
    result = df_clustered
    tsne_input_col = inputCol

    needs_pca = len(df_clustered.first()[inputCol]) > 50
    if needs_pca:
        distinct_vectors = get_PCA_components(distinct_vectors, inputCol=inputCol, k=50)
        result = result.join(distinct_vectors, inputCol, "inner")
        tsne_input_col = "embedding_pca"

    vectors = distinct_vectors.select(tsne_input_col).rdd \
        .map(lambda row: tuple(row[tsne_input_col].toArray())) \
        .collect()

    tsne = TSNE(perplexity=perplexity, early_exaggeration=early_exaggeration,
                metric=metric, n_jobs=-1, random_state=random_state)
    projected = tsne.fit_transform(vectors)

    rows = []
    for source_vector, point in zip(vectors, projected):
        rows.append(Row(**{tsne_input_col: Vectors.dense(source_vector), "vector_TSNE": Vectors.dense(point.tolist())}))
    vectors_df = spark.createDataFrame(rows, [tsne_input_col, "vector_TSNE"])

    return result.join(vectors_df, tsne_input_col, "inner").drop("embedding_pca")


def generate_n_unique_colors(indexes, saturation=1, lightness="random"):
    """Map every element of ``indexes`` to a distinct "#rrggbb" colour.

    Hues are spread evenly around the HLS colour wheel, rotated by a random offset and assigned
    in random order. Neighbouring indexes therefore rarely get similar colours, and no two
    indexes share a hue. Independent random hues could collide.

    Args:
        indexes: iterable of hashable keys (e.g. cluster ids).
        saturation: HLS saturation in [0, 1], or "random" for a value in [0.9, 1].
        lightness: HLS lightness in [0, 1], or "random" for a value in [0.2, 0.3].

    Returns:
        dict mapping each index to a colour code.
    """
    saturation = random.uniform(0.9, 1) if saturation == "random" else saturation
    lightness = random.uniform(0.2, 0.3) if lightness == "random" else lightness

    indexes = list(indexes)
    n = len(indexes)
    if n == 0:
        return {}

    offset = random.uniform(0, 1)
    hues = [(offset + k / n) % 1.0 for k in range(n)]
    random.shuffle(hues)

    colors = {}
    for index, hue in zip(indexes, hues):
        r, g, b = colorsys.hls_to_rgb(hue, lightness, saturation)
        # round (not truncate) each channel to 0..255
        colors[index] = "#{:02x}{:02x}{:02x}".format(*(int(round(c * 255)) for c in (r, g, b)))
    return colors


def _cluster_centroids(x_coords, y_coords, clusters, probs, unique_clusters, cent_prob):
    """Label position per cluster: mean of members with probability > cent_prob, else of all members."""
    clusters_arr = np.asarray(clusters)
    probs_arr = np.asarray(probs, dtype=float)
    centroids = {}
    for cluster in unique_clusters:
        members = clusters_arr == cluster
        confident = members & (probs_arr > cent_prob)
        # Without the fallback, a cluster with no confident member gets a NaN position.
        mask = confident if confident.any() else members
        centroids[cluster] = (np.mean(x_coords[mask]), np.mean(y_coords[mask]))
    return centroids


def _smooth_cluster_hulls(vectors, clusters, unique_clusters, n_points=100):
    """Cubic-interpolated convex-hull outline (n_points x 2) per cluster, or None when no hull exists."""
    points = np.asarray(vectors)
    clusters_arr = np.asarray(clusters)
    smooth_hulls = {}
    for cluster in unique_clusters:
        cluster_points = points[clusters_arr == cluster]
        try:
            hull = ConvexHull(cluster_points)
        except Exception:
            # Too few or collinear points: Qhull cannot build a 2-D hull.
            smooth_hulls[cluster] = None
            continue
        # Close the polygon by repeating its first vertex, then interpolate a smooth outline.
        outline = np.vstack([cluster_points[hull.vertices], cluster_points[hull.vertices[0]]])
        t = np.linspace(0, 1, len(outline))
        interp = interp1d(t, outline.T, kind='cubic', axis=1)
        smooth_hulls[cluster] = interp(np.linspace(0, 1, n_points)).T
    return smooth_hulls


def visualize_tsne_clusters(vectors, clusters, probs, subtitle, soft_clusters=None, soft_probs=None, show_soft_clusters=True,
                            show_noise=False, annotate=True, show_simplices=True, clusters_to_show=[], fig_size=(10, 10), s=200, fontsize=10, satur_accent=1, cent_prob=0.99):
    """Scatter-plot 2-D (e.g. t-SNE) points coloured by cluster, for analysing clusters.

    Less polished than the datamapplot visualisation used for the final figures.

    Args:
        vectors: sequence of 2-D points, aligned with ``clusters`` and ``probs``.
        clusters: hard cluster label per point (-1 = noise).
        probs: membership probability per point; a point's colour is desaturated by it.
        subtitle: second line of the plot title.
        soft_clusters, soft_probs: optional soft assignment per point. When given and
            ``show_soft_clusters`` is True, noise points are drawn in their soft cluster's colour.
        show_noise: draw noise points in grey.
        annotate: write each cluster id at its centroid (mean of members with probability
            > ``cent_prob``, or of all members if none qualify).
        show_simplices: shade a smoothed convex hull around each cluster.
        clusters_to_show: clusters to highlight (others are drawn white and underneath);
            empty means all.
        fig_size, s, fontsize: figure size, marker size, label font size.
        satur_accent: multiplier on soft probabilities when desaturating soft-cluster points.
    """
    x_coords, y_coords = zip(*vectors)
    x_coords, y_coords = np.array(x_coords), np.array(y_coords)

    # Empty selection means "highlight everything"; a set keeps membership tests O(1).
    shown = set(clusters_to_show) if len(clusters_to_show) > 0 else set(clusters)
    unique_clusters = sorted(set(clusters) - {-1})

    centroids = _cluster_centroids(x_coords, y_coords, clusters, probs, unique_clusters, cent_prob)
    smooth_hulls = _smooth_cluster_hulls(vectors, clusters, unique_clusters)

    color_palette = {cluster: (color if cluster in shown else 'white')
                     for cluster, color in generate_n_unique_colors(unique_clusters).items()}
    color_palette[-1] = (0.5, 0.5, 0.5)  # noise is grey
    # Highlighted clusters are drawn above the others.
    zorders = {cluster: (2 if cluster in shown else 0) for cluster in set(clusters)}
    alphas = {"cluster": 0.1, "soft_cluster": 0.1, "noise": 0.03}

    # Points fade towards grey as their membership probability drops.
    cluster_member_colors = [sns.desaturate(color_palette[c], p) for c, p in zip(clusters, probs)]

    plt.figure(figsize=fig_size)
    if show_noise:
        for x, y, cluster, color in zip(x_coords, y_coords, clusters, cluster_member_colors):
            if cluster == -1:
                plt.scatter(x, y, color=color, alpha=alphas["noise"], s=s/2, zorder=1)

    if show_soft_clusters and soft_clusters is not None and soft_probs is not None:
        alphas["cluster"] = alphas["soft_cluster"] * 2
        for x, y, cluster, soft_cluster, soft_prob in zip(x_coords, y_coords, clusters, soft_clusters, soft_probs):
            if cluster == -1 and soft_cluster in shown:
                color = sns.desaturate(color_palette[soft_cluster], min(1, soft_prob * satur_accent))
                plt.scatter(x, y, color=color, alpha=alphas["soft_cluster"], s=s, zorder=3)

    for x, y, cluster, color in zip(x_coords, y_coords, clusters, cluster_member_colors):
        if cluster != -1:
            plt.scatter(x, y, color=color, alpha=alphas["cluster"], s=s, zorder=zorders[cluster])

    for cluster in unique_clusters:
        smooth_points = smooth_hulls[cluster]
        if show_simplices and smooth_points is not None:
            plt.fill(smooth_points[:, 0], smooth_points[:, 1], color=color_palette[cluster], alpha=0.05, zorder=zorders[cluster])
        if annotate:
            x, y = centroids[cluster]
            plt.text(x, y, str(cluster), color=color_palette[cluster], ha='center', va='center',
                     fontsize=fontsize, zorder=zorders[cluster])

    plt.title(f"2D Approximate Relative Visual Representation of Clusters ([0-{max(clusters)}])\n({subtitle})", 
              fontsize=fig_size[0] + 3)
    plt.xticks([])
    plt.yticks([])
    plt.show()

In [0]:
@udf (VectorUDT())
def elementwise_avg(vectors):
    """Spark UDF: element-wise mean of an array of equal-length vectors, as a DenseVector.

    A null or empty array yields null instead of raising an IndexError. The IndexError would
    otherwise fail the whole Spark job.
    """
    if vectors is None or len(vectors) == 0:
        return None

    num_vectors = len(vectors)
    avg_values = [0.0] * len(vectors[0])

    for vector in vectors:
        for i in range(len(avg_values)):
            # accumulate x / n term by term
            avg_values[i] += vector[i] / num_vectors

    return Vectors.dense(avg_values)


@udf (DoubleType())
def cosine_similarity(v1, v2):
    """Spark UDF: cosine similarity v1·v2 / (||v1||·||v2||) of two ML vectors, as a float.

    NOTE(review): redefines the identical ``cosine_similarity`` UDF at the top of the notebook.
    """
    norms_product = v1.norm(2) * v2.norm(2)
    return float(v1.dot(v2) / norms_product)


@udf (DoubleType())
def euqlidean_dist(v1, v2):
    """Spark UDF: Euclidean distance ||v1 - v2|| between two ML vectors, as a float.

    NOTE(review): this redefines the ``euqlidean_dist`` UDF at the top of the notebook.
    That version returns ``1 - distance`` (a similarity) instead. The definition executed last
    is the one in scope.
    """
    squared = v1.squared_distance(v2)
    return float(squared ** 0.5)


@udf (ArrayType(StringType()))
def get_first_n(arr, n, start_from):
    """Spark UDF: the slice ``arr[start_from : start_from + n]`` as a list."""
    stop = n + start_from
    return list(arr)[start_from:stop]


# Gemini answered the prompt with a list rendered as a string; this UDF turns it back into an array.
@udf (ArrayType(StringType()))
def string_to_list(list_string):
    """Spark UDF: parse a bracketed list string such as "[a, b, c]" into ["a", "b", "c"].

    The first and last characters (the brackets) are dropped, the rest is split on ", ", and
    each item is whitespace-stripped. A null input gives null. A list with nothing between the
    brackets gives an empty array rather than [""].
    """
    if list_string is None:
        return None
    inner = list_string[1:-1]
    if not inner.strip():
        return []
    return [s.strip() for s in inner.split(r', ')]


@udf (StringType())
def filter_nouns(sentence):
    """Spark UDF: keep only the tokens whose NLTK POS tag starts with "N" (nouns), space-joined.

    Falls back to the original sentence when no noun is found.
    """
    nouns = [token for token, tag in pos_tag(word_tokenize(sentence)) if tag.startswith('N')]
    return ' '.join(nouns) or sentence

In [0]:
def save_to_pickle(data, file_name, dest_dir="/Workspace/Users/vladklim@campus.technion.ac.il/Project/models/"):
    """Pickle ``data`` to ``dest_dir/file_name``.

    ``dest_dir`` (and any missing parents) is created if needed. It used to create the
    hard-coded default directory regardless of ``dest_dir``.
    NOTE: the default ``dest_dir`` is a personal Databricks workspace path.
    """
    Path(dest_dir).mkdir(parents=True, exist_ok=True)
    file_path = os.path.join(dest_dir, file_name)
    with open(file_path, "wb") as f:
        pickle.dump(data, f)


# Databricks did not allow downloading files larger than 10 MB, so large files are split into parts.
def split(source, dest_dir, files_name, write_size=10**7):
    """Split ``source`` into consecutive chunks of at most ``write_size`` bytes.

    Chunks are written to ``dest_dir`` as ``files_name + "1"``, ``files_name + "2"``, ... .
    ``dest_dir`` and any missing parents are created if needed. Regular files already in
    ``dest_dir`` are deleted first so no stale parts from a previous split remain;
    subdirectories are left alone. Prints the number of parts written.
    """
    os.makedirs(dest_dir, exist_ok=True)
    for entry in os.listdir(dest_dir):
        entry_path = os.path.join(dest_dir, entry)
        if os.path.isfile(entry_path):
            os.remove(entry_path)

    part_num = 0
    with open(source, 'rb') as input_file:
        # iter(callable, sentinel) yields chunks until read() returns b'' (end of file)
        for chunk in iter(lambda: input_file.read(write_size), b''):
            part_num += 1
            file_path = os.path.join(dest_dir, files_name + str(part_num))
            with open(file_path, 'wb') as dest_file:
                dest_file.write(chunk)

    print(f"Partitions created: {part_num}")


# Reassembles the parts written by split() into a single file.
def join(source_dir, dest_file, read_size=10**7):
    """Concatenate the regular files of ``source_dir`` into ``dest_file``.

    Parts are ordered by the integer suffix of their names (part1, part2, ..., part10).
    ``os.listdir`` order is arbitrary, and "part10" < "part2" as strings. Files without a
    numeric suffix come after the numbered parts, by name. ``dest_file`` itself is skipped if
    it lives in ``source_dir``. Each part is copied in chunks of ``read_size`` bytes.
    """
    import re
    import shutil

    def part_key(name):
        match = re.search(r"(\d+)$", name)
        return (0, int(match.group(1)), name) if match else (1, 0, name)

    dest_abspath = os.path.abspath(dest_file)
    # List the parts before opening dest_file so a freshly created output is not included.
    part_paths = [os.path.join(source_dir, name)
                  for name in sorted(os.listdir(source_dir), key=part_key)]
    part_paths = [p for p in part_paths if os.path.isfile(p) and os.path.abspath(p) != dest_abspath]

    with open(dest_file, 'wb') as output_file:
        for path in part_paths:
            with open(path, 'rb') as input_file:
                shutil.copyfileobj(input_file, output_file, read_size)

In [0]:
def create_layers(clusters_df, min_=2, max_=5, min_word_len=2, start_from=0, lemmatize=True, soft=True):
    """Build hierarchical name "layers" for each cluster from its most frequent words.

    The innermost layer is the sentence itself (not produced here). ``layer_0`` holds the
    ``max_`` most frequent words of the cluster, comma separated. ``layer_1`` holds the
    ``max_ - 1`` most frequent, and so on down to ``min_`` words.

    Before counting, each sentence goes through these steps:
      1. Reduced to its nouns.
      2. Split on whitespace/commas.
      3. Stripped of characters other than letters and ``+#/``.
      4. Filtered to words of at least ``min_word_len`` characters.
      5. Optionally lemmatized.

    Skipping the ``start_from`` most frequent words sometimes yields more meaningful names.

    Args:
        clusters_df: Spark DataFrame with "sentence_cleaned" and the cluster column.
        min_, max_: smallest / largest number of words in a layer name.
        min_word_len: minimum word length kept.
        start_from: number of top-frequency words skipped.
        lemmatize: lemmatize words with NLTK's WordNetLemmatizer before counting.
        soft: group by "soft_cluster" (True) or "cluster" (False).

    Returns:
        ``clusters_df`` left-joined with columns ``layer_0`` ... ``layer_{max_ - min_}``
        (persisted). Clusters with too few words, or with none, are labelled "Unlabelled".
    """
    cluster_col = "soft_cluster" if soft else "cluster"
    sent_col = "sentence_cleaned"
    top_words_col = f"top_{max_}_words"

    nouns_df = clusters_df.withColumn(sent_col, filter_nouns(F.col(sent_col)))
    words = nouns_df.withColumn("word", F.explode(F.split(F.col(sent_col), r"\s+|,\s*")))
    words = words.withColumn("word", F.regexp_replace("word", "[^a-zA-Z+#/]", "")) \
                 .filter(F.length(F.col("word")) >= min_word_len)

    if lemmatize:
        lemmatizer = nltk.stem.WordNetLemmatizer()

        # Named lemmatize_udf so it does not shadow the boolean ``lemmatize`` flag
        # (the shadowed flag made lemmatization unconditional).
        @udf (StringType())
        def lemmatize_udf(word):
            return lemmatizer.lemmatize(word)

        words = words.withColumn("word", lemmatize_udf("word"))

    word_counts = words.groupBy("word", cluster_col).count()

    # collect_list does not preserve a preceding orderBy after a shuffle, so rank inside each
    # group: sort (count, word) structs descending, then keep only the words.
    top_words_per_cluster = word_counts.groupBy(cluster_col) \
        .agg(F.sort_array(F.collect_list(F.struct("count", "word")), asc=False).alias("ranked")) \
        .withColumn("words", F.col("ranked").getField("word")) \
        .select(cluster_col,
                F.when(F.size(F.col("words")) >= max_ + start_from,
                       get_first_n(F.col("words"), F.lit(max_), F.lit(start_from)))
                 .otherwise(F.array(F.lit("Unlabelled"))).alias(top_words_col))

    # Clusters whose words were all filtered out vanish from the aggregation; add them back.
    bounds = clusters_df.agg(F.min(cluster_col).alias("min_cluster"),
                             F.max(cluster_col).alias("max_cluster")).first()
    if bounds["min_cluster"] is not None:
        labelled = {row[cluster_col] for row in top_words_per_cluster.select(cluster_col).distinct().collect()}
        missing = [c for c in range(bounds["min_cluster"], bounds["max_cluster"] + 1) if c not in labelled]
        if missing:
            # The label must be an ARRAY to match the array<string> column in the union.
            top_words_per_cluster = top_words_per_cluster.union(
                spark.createDataFrame([(c, ["Unlabelled"]) for c in missing], [cluster_col, top_words_col]))

    top_words_per_cluster.persist()

    layers_df = top_words_per_cluster
    for i in range(max_ - min_ + 1):
        layers_df = layers_df.withColumn(
            f"layer_{i}", F.concat_ws(", ", get_first_n(F.col(top_words_col), F.lit(max_ - i), F.lit(0))))

    result = clusters_df.join(layers_df.drop(top_words_col), [cluster_col], "left_outer")
    result.persist()

    return result


In [0]:
@udf (StringType())
def extract_job_function(s):
    """Spark UDF: the text between "Job function=" and ", Seniority level" in ``s``.

    Returns the first match, or "" when there is no match or ``s`` is null. Without the null
    check, ``re`` would raise TypeError and fail the Spark job.
    """
    if s is None:
        return ""
    match = re.search(r"Job function=(.*?), Seniority level", s)
    return match.group(1) if match else ""


@udf (ArrayType(StringType()))
def split_job_functions(s):
    """Spark UDF: split a job-function string such as "A, B, and C" on ", and ", " and " or ", ".

    An empty or null string yields ["Unlabelled"].
    """
    if not s:
        return ["Unlabelled"]
    return re.split(r', and | and |, ', s)

#### Predicting clusters for the remaining dataset with the trained clusterer<br>
 (This takes a long time — in the end we did not run it and worked only with the 10,000-row sample.)

In [0]:
# Attach the sample's cluster assignments to all skill embeddings. Rows left without an
# assignment by the outer join get cluster -1 (noise) and probability 0.
skills_clusters_full = (
    skills_embeddings_dict["all"]
    .join(skills_clusters.drop("sentence", "sentence_cleaned", "embedding"), "index", "outer")
    .fillna(-1, subset=["cluster", "soft_cluster"])
    .fillna(0, subset=["probability", "soft_probability"])
)

In [0]:
# Embeddings still marked as noise. Using the embedding as the dict key deduplicates them
# (for a repeated embedding, the last index collected is kept).
unclustered_skills_embeddings = skills_clusters_full.filter(F.col("soft_cluster") == -1).select("index", "embedding")
indexed_skills_embeddings = dict((row["embedding"], row["index"]) for row in unclustered_skills_embeddings.collect())
skills_embeddings_list = list(indexed_skills_embeddings)
skills_to_predict = np.array(skills_embeddings_list)

In [0]:
# Soft-assign each still-unclustered skill embedding to its most probable cluster.
# The membership vectors are computed in this cell: previously they came from a cell further
# down, so Restart & Run All failed here with a NameError.
skills_membership_vectors = hdbscan.prediction.membership_vector(skills_clusterer, skills_to_predict)
skills_soft_clusters = [np.argmax(x) for x in skills_membership_vectors]
skills_soft_probs = [np.max(x) for x in skills_membership_vectors]

# embedding -> (soft cluster, soft probability)
skills_clusters_map = {embedding: (int(soft_cluster), float(soft_probability))
                       for embedding, soft_cluster, soft_probability
                       in zip(skills_embeddings_list, skills_soft_clusters, skills_soft_probs)}

# Lookups by embedding; null for embeddings that were not predicted.
get_soft_cluster_udf = F.udf(lambda embedding: skills_clusters_map.get(embedding, (None, None))[0],
                             IntegerType())
get_soft_prob_udf = F.udf(lambda embedding: skills_clusters_map.get(embedding, (None, None))[1],
                          FloatType())

In [0]:
# Soft membership of every unclustered skill embedding in each cluster of the trained clusterer.
skills_membership_vectors = hdbscan.prediction.membership_vector(
    skills_clusterer,
    skills_to_predict,
)

In [0]:
# Replace soft_cluster / soft_probability with the predicted soft assignment wherever one exists.
skills_clusters_full_denoised = (
    skills_clusters_full
    .withColumn("soft_cluster", F.coalesce(get_soft_cluster_udf(F.col("embedding")), F.col("soft_cluster")))
    .withColumn("soft_probability", F.coalesce(get_soft_prob_udf(F.col("embedding")), F.col("soft_probability")))
    .drop("embedding", "sentence_cleaned")
)
skills_clusters_full_denoised.persist()

Out[82]: DataFrame[index: int, sentence: string, cluster: bigint, soft_cluster: bigint, probability: double, soft_probability: double]

In [0]:
# Re-attach the soft probability from before prediction (as soft_prob_initial), the embedding
# and the cleaned sentence.
skills_clusters_full_denoised_fix = skills_clusters_full_denoised.join(
    skills_clusters_full.selectExpr("index", "soft_probability as soft_prob_initial", "embedding", "sentence_cleaned"),
    "index",
    "left_outer",
)

In [0]:
# Final noise clean-up of the predicted soft assignments.
# NOTE: this cell consumes its own input (soft_prob_initial is renamed away), so re-run the
# join cell before re-running it.
skills_clusters_full_denoised_fix = (
    skills_clusters_full_denoised_fix
    # Where the initial soft probability is 0, use the predicted soft probability as "probability".
    .withColumn("probability",
                F.when(F.col("soft_prob_initial") == 0, F.col("soft_probability"))
                 .otherwise(F.col("probability")))
    # Restore the initial soft probability under the "soft_probability" name.
    .drop("soft_probability")
    .withColumnRenamed("soft_prob_initial", "soft_probability")
    # Mark as noise (-1): a tiny non-zero soft probability (< 0.004) or a probability below 0.2.
    .withColumn("soft_cluster",
                F.when(((F.col("soft_probability") > 0) & (F.col("soft_probability") < 0.004))
                       | (F.col("probability") < 0.2), F.lit(-1))
                 .otherwise(F.col("soft_cluster")))
)

In [0]:
# Number of rows in the denoised skills table.
skills_clusters_full_denoised_fix.count()

Out[210]: 75588