In [0]:
import pandas as pd
from pyspark.sql.types import *
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.types import StringType, ArrayType,BooleanType
from langdetect import detect, DetectorFactory
import tensorflow_hub as hub
from sklearn.cluster import DBSCAN
import google.generativeai as genai
import seaborn as sns
import matplotlib.pyplot as plt
import torch
from scipy.cluster.hierarchy import linkage, fcluster, dendrogram
import tensorflow
from sklearn.metrics import silhouette_score
from pyspark.sql.functions import col, explode, count, desc, collect_list, udf, concat_ws, row_number
from pyspark.sql.window import Window
import re
import matplotlib.pyplot as plt
import google.generativeai as genai
import time
from sklearn.metrics import davies_bouldin_score
from sklearn.metrics import calinski_harabasz_score
from sklearn.metrics import silhouette_samples
from sklearn.metrics import pairwise_distances
from transformers import AutoTokenizer, AutoModelForTokenClassification
import json
import ast
# Many cells below use numpy as `np`. These imports come after the wildcard
# pyspark imports so nothing from those can shadow them.
import numpy as np
from typing import Iterator
# Reuse the notebook's active Spark session if one exists; otherwise start one.
spark = SparkSession.builder.getOrCreate()

**1. Preparing the Data**

In [0]:
# Load both Glassdoor job-posting exports with pandas, convert them to Spark
# DataFrames and stack them. union() matches columns by position, so both files
# are assumed to share the same column order — TODO confirm.
# NOTE(review): absolute per-user workspace paths; consider a configurable DATA_DIR.
pandas_df = pd.read_parquet("/Workspace/Users/noor.abu@campus.technion.ac.il/data/glassdoor_jobs.parquet")
pandas_df2= pd.read_parquet("/Workspace/Users/noor.abu@campus.technion.ac.il/data/glassdoor_jobs (1).parquet")
df = spark.createDataFrame(pandas_df)
df2 = spark.createDataFrame(pandas_df2)
df = df.union(df2)
# Row count before de-duplication.
print(df.count())
# Drop rows that are exact duplicates across all columns, then show the new count.
df = df.drop_duplicates()
print(df.count())

In [0]:
# Lookup used by map_state. It maps the text that remains of a Glassdoor
# "Location" value after the first-comma split below to a canonical state or
# territory name. That text is a two-letter code, a full state name, or one of
# the county/metro/area names seen in the data; "Remote" and "United States" map
# to themselves. Postings whose key is not listed are dropped later
# (na.drop on "Full State Name").
# The duplicate "California" entry was removed; dict literals keep only the last
# occurrence anyway, so the mapping is unchanged.
state_dict = {
    "AL": "Alabama", "AK": "Alaska", "AZ": "Arizona", "AR": "Arkansas",
    "CA": "California", "CO": "Colorado", "CT": "Connecticut", "DE": "Delaware",
    "FL": "Florida", "GA": "Georgia", "HI": "Hawaii", "ID": "Idaho",
    "IL": "Illinois", "IN": "Indiana", "IA": "Iowa", "KS": "Kansas",
    "KY": "Kentucky", "LA": "Louisiana", "ME": "Maine", "MD": "Maryland",
    "MA": "Massachusetts", "MI": "Michigan", "MN": "Minnesota", "MS": "Mississippi",
    "MO": "Missouri", "MT": "Montana", "NE": "Nebraska", "NV": "Nevada",
    "NH": "New Hampshire", "NJ": "New Jersey", "NM": "New Mexico", "NY": "New York",
    "NC": "North Carolina", "ND": "North Dakota", "OH": "Ohio", "OK": "Oklahoma",
    "OR": "Oregon", "PA": "Pennsylvania", "RI": "Rhode Island", "SC": "South Carolina",
    "SD": "South Dakota", "TN": "Tennessee", "TX": "Texas", "UT": "Utah",
    "VT": "Vermont", "VA": "Virginia", "WA": "Washington", "WV": "West Virginia",
    "WI": "Wisconsin", "WY": "Wyoming",
    "Minnesota": "Minnesota",
    "DC": "District of Columbia",
    "Georgia": "Georgia",
    "Oahu Island": "Hawaii",
    "Utah": "Utah",
    "PR": "Puerto Rico",
    "Dimmit": "Texas",
    "Missouri": "Missouri",
    "Prince George's": "Maryland",
    "Island of Hawai‘i": "Hawaii",
    "Indiana": "Indiana",
    "Tennessee": "Tennessee",
    "Wisconsin": "Wisconsin",
    "South Carolina": "South Carolina",
    "Virginia": "Virginia",
    "Kentucky": "Kentucky",
    "Harris": "Texas",
    "Nebraska": "Nebraska",
    "Oregon": "Oregon",
    "Mississippi": "Mississippi",
    "Ohio": "Ohio",
    "Illinois": "Illinois",
    "Texas": "Texas",
    "Arizona": "Arizona",
    "Nevada": "Nevada",
    "Mecklenburg": "North Carolina",
    "Kansas": "Kansas",
    "North Carolina": "North Carolina",
    "Washington State": "Washington",
    "New Jersey": "New Jersey",
    "Pennsylvania": "Pennsylvania",
    "Henderson": "Nevada",
    "Arkansas": "Arkansas",
    "Connecticut": "Connecticut",
    "Alaska": "Alaska",
    "Long Island-Queens": "New York",
    "Maine": "Maine",
    "Remote": "Remote",
    "United States": "United States",
    "Iowa": "Iowa",
    "New York State": "New York",
    "Puerto Rico": "Puerto Rico",
    "Michigan": "Michigan",
    "Montana": "Montana",
    "Colorado": "Colorado",
    "Minneapolis-Saint Paul": "Minnesota",
    "Dallas-Fort Worth": "Texas",
    "Alabama": "Alabama",
    "New Mexico": "New Mexico",
    "Redstone Arsenal": "Alabama",
    "Bell": "California",
    "Cuyahoga": "Ohio",
    "Oakland": "California",
    "California": "California",
    "GU": "Guam",
    "Bergen": "New Jersey",
    "Maryland": "Maryland",
    "Massachusetts": "Massachusetts",
    "Florida": "Florida",
    "St Louis Park": "Minnesota",
    "Ford Island": "Hawaii",
}
# Keep only the text after the first comma of "Location" (presumably the state
# part of values like "City, ST"); values without a comma are kept unchanged.
# Only element [1] is kept, so any further comma-separated parts are discarded.
df_updated = df.withColumn(
    "Location",
    when(col("Location").contains(","), trim(split(col("Location"), ",")[1]))
    .otherwise(col("Location"))
)
def map_state(location):
    """Translate a location token into its canonical state name.

    Looks ``location`` up in the module-level ``state_dict``; unknown tokens
    (including None) yield None.
    """
    if location in state_dict:
        return state_dict[location]
    return None

state_udf = udf(map_state, StringType())

# Add the canonical state name and drop postings whose location could not be mapped.
df_updated = df_updated.withColumn("Full State Name", state_udf(df_updated["Location"]))
df_updated = df_updated.na.drop(subset=["Full State Name"])
df_updated.display()


**2. Cleaning the Data**

In [0]:
# Normalize job titles: lower-case them, replace every non-letter with a space,
# tokenize, and remove generic English stop words plus the employment-terms
# noise words listed below.
df_updated = df_updated.withColumn("job_title_lower", lower(col("Job Title")))
# Raw string: "\s" inside a normal literal is an invalid escape (SyntaxWarning on
# newer Pythons). The pattern value is unchanged.
df_updated = df_updated.withColumn("job_title_cleaned", regexp_replace(col("job_title_lower"), r"[^a-zA-Z\s]", " "))
# NOTE(review): "Part-time" can never match, because hyphens were replaced by
# spaces above. It is kept for reference only.
custom_stop_words = ["per", "hr", "hour", "full", "time", "annual", "salary", "k", "fulltime","days","hours","hrs","day","parttime","part","years","year","am","pm","Part-time"]

# NOTE: this binds the Spark ML Tokenizer to `tokenizer`. The name is rebound to
# a Hugging Face tokenizer in the skill-extraction section.
tokenizer = Tokenizer(inputCol="job_title_cleaned", outputCol="job_title_tokens")
df_updated = tokenizer.transform(df_updated)

stop_words_remover = StopWordsRemover(inputCol="job_title_tokens", outputCol="cleaned_tokens")
stop_words_remover.setStopWords(stop_words_remover.getStopWords() + custom_stop_words)
df_updated = stop_words_remover.transform(df_updated)

def tokens_to_string(tokens):
    """Join a token list back into one space-separated string.

    Returns "" for a missing (None) token list, so the Spark UDF does not fail
    on rows whose job title was null.
    """
    if tokens is None:
        return ""
    return " ".join(tokens)

combine_udf = udf(tokens_to_string, StringType())
# Re-join the cleaned tokens into "cleaned_job_title" and keep only the columns used downstream.
df_updated = df_updated.withColumn("cleaned_job_title", combine_udf(col("cleaned_tokens"))).select("Job Title", "Location", "Description", "Full State Name", "cleaned_job_title")
def detect_language(text):
    """Return langdetect's language code for ``text`` (e.g. "en"), or "unknown".

    langdetect is non-deterministic unless ``DetectorFactory.seed`` is fixed.
    The seed is set here, not only once on the driver, because this function
    runs inside a Spark UDF on the executors.
    """
    DetectorFactory.seed = 0
    try:
        return detect(text)
    except Exception:
        # Empty, None or feature-less text cannot be classified.
        return "unknown"

lang_udf = udf(detect_language, StringType())


# Detect the language of each description and keep English postings only.
df_updated = df_updated.withColumn("language", lang_udf(col("Description")))
df_updated = df_updated.filter(col("language") == "en")


**3. Creating Clusters**

In [0]:
@pandas_udf(ArrayType(FloatType()))
def generate_embeddings(job_titles: Iterator[pd.Series]) -> Iterator[pd.Series]:
    """Embed job titles with the Universal Sentence Encoder (v4 from TF-Hub).

    This is an iterator-of-Series pandas UDF, so the model is loaded once per
    Spark task and reused for every Arrow batch. A Series-to-Series UDF would
    reload it for each batch.

    Yields, for each incoming batch, a Series with one embedding (a list of
    floats) per title.
    """
    encoder = hub.load("https://tfhub.dev/google/universal-sentence-encoder/4")
    for batch in job_titles:
        embeddings = encoder(batch.tolist()).numpy()
        yield pd.Series(embeddings.tolist())

# Embed every cleaned job title, then bring the result to the driver for SciPy.
df_with_embeddings = df_updated.withColumn("embeddings", generate_embeddings(df_updated["cleaned_job_title"]))

pandas_df = df_with_embeddings.select("Job Title","Full State Name","Description","cleaned_job_title", "embeddings").toPandas()
# (n_titles, embedding_dim) matrix for SciPy's hierarchical clustering.
embeddings_np = np.array(pandas_df["embeddings"].tolist())
# NOTE(review): Ward linkage needs O(n^2) memory on the driver; watch the data size.
linkage_matrix = linkage(embeddings_np, method='ward')

# Dendrogram of the top 5 merge levels, to help pick the cut threshold below.
plt.figure(figsize=(10, 7))
dendrogram(linkage_matrix, truncate_mode='level', p=5)
plt.title("Dendrogram")
plt.xlabel("Job Titles")
plt.ylabel("Distance")
plt.show()

# Cut the tree at linkage distance 3; each title gets a flat cluster id.
threshold = 3 
pandas_df['cluster'] = fcluster(linkage_matrix, threshold, criterion='distance')

df_with_clusters = spark.createDataFrame(pandas_df)
# Number of clusters produced by the cut.
print(df_with_clusters.select("cluster").distinct().count())


In [0]:
import os

# Read the Gemini API key from the environment instead of hard-coding it in the
# notebook. The placeholder string is only a fallback.
genai.configure(api_key=os.environ.get("GOOGLE_API_KEY", "API key"))
model = genai.GenerativeModel("gemini-1.5-flash")
# One row per cluster: all cleaned job titles of the cluster joined with ", ".
cluster_descriptions = df_with_clusters.groupBy("cluster").agg(
    concat_ws(", ", collect_list("cleaned_job_title")).alias("job_titles")
)

cluster_descriptions_pd = cluster_descriptions.toPandas()

def generate_cluster_names(job_titles_list, delay=5, max_chars=1024):
    """Ask the Gemini model for a short, general name for each cluster.

    Args:
        job_titles_list: one string per cluster (its job titles joined by ", ").
        delay: seconds to wait between consecutive requests (rate limiting).
        max_chars: the job-title text is truncated to this many characters
            before it is put into the prompt.

    Returns:
        A list of names aligned with ``job_titles_list``. A failed request
        yields "Error: <message>"; an empty response yields "Unknown Cluster".

    Uses the module-level ``model`` (a ``genai.GenerativeModel``).
    """
    titles = list(job_titles_list)
    cluster_names = []
    for index, job_titles in enumerate(titles):
        # Slicing is a no-op for shorter strings, so no length check is needed.
        job_titles = job_titles[:max_chars]

        prompt = f"Here are job titles: {job_titles}. Provide a short and general name for this cluster:"
        try:
            response = model.generate_content(prompt)
            cluster_name = response.text.strip() if response and response.candidates else "Unknown Cluster"
        except Exception as e:
            cluster_name = f"Error: {str(e)}"
        cluster_names.append(cluster_name)

        # Pause only between requests; sleeping after the last one is wasted time.
        if index < len(titles) - 1:
            time.sleep(delay)
    return cluster_names
# One Gemini request per cluster, paced by the function's delay between requests.
cluster_descriptions_pd["cluster_name"] = generate_cluster_names(cluster_descriptions_pd["job_titles"].tolist())

cluster_descriptions_with_names = spark.createDataFrame(cluster_descriptions_pd)

cluster_descriptions_with_names.display()

In [0]:
# Reload previously saved cluster names (avoids re-querying the LLM) and attach
# them to every clustered posting.
pd_df = pd.read_csv("/PATH/cluster_with_names.csv")
cluster_descriptions_with_names = spark.createDataFrame(pd_df)
df_with_clusters_named = cluster_descriptions_with_names.join(df_with_clusters, on="cluster")

In [0]:
# Alternative entry point: load the already-joined postings + cluster names from CSV.
# NOTE: list columns such as "embeddings" come back as strings after the CSV round trip.
pd_df = pd.read_csv("/PATH/JOINED_DATA.csv")
df_with_clusters_named = spark.createDataFrame(pd_df)

**How Good Is Our Clustering?**

In [0]:
# Schema for the parsed (title, cluster, embedding) rows.
schema = StructType([
    StructField("cleaned_job_title", StringType(), True),
    StructField("cluster", IntegerType(), True),
    StructField("embeddings", ArrayType(FloatType()), True)
])

# "embeddings" is text here, presumably like "[0.1, -0.2, ...]"; strip the
# brackets and parse the comma-separated floats.
parsed_rdd = df_with_clusters_named.rdd.map(
    lambda row: (
        row['cleaned_job_title'], 
        row['cluster'], 
        np.fromstring(row['embeddings'].strip("[]"), sep=",").tolist()
    )
)
parsed_df = spark.createDataFrame(parsed_rdd, schema)

# Feature matrix and cluster labels on the driver.
# NOTE(review): these are two separate collects of the same lineage; row
# alignment relies on Spark returning rows in the same order both times.
features = np.array(parsed_df.select("embeddings").rdd.map(lambda row: row[0]).collect())
clusters = np.array(parsed_df.select("cluster").rdd.map(lambda row: row[0]).collect())

features = np.vstack(features)

# Calinski-Harabasz: ratio of between-cluster to within-cluster dispersion
# (higher is better).
calinski_harabasz = calinski_harabasz_score(features, clusters)
print("Calinski-Harabasz Index:", calinski_harabasz)

# Intra-cluster cohesion: for every cluster, the mean Euclidean distance between
# *distinct* pairs of its members, then averaged over clusters (lower is tighter).
# - Distances are computed per cluster, not as one N x N matrix over all rows,
#   so memory is bounded by the largest cluster.
# - Self-pairs (the zero diagonal) are excluded. Counting them pulls every
#   average toward 0 and gives singleton clusters a perfect 0.
# - Singleton clusters have no pairs and are skipped.
intra_cluster_distances = []
for cluster in np.unique(clusters):
    cluster_indices = np.where(clusters == cluster)[0]
    n_points = len(cluster_indices)
    if n_points < 2:
        continue
    cluster_distances = pairwise_distances(features[cluster_indices])
    avg_distance = cluster_distances.sum() / (n_points * (n_points - 1))
    intra_cluster_distances.append(avg_distance)

intra_cluster_cohesion = np.mean(intra_cluster_distances) if intra_cluster_distances else float("nan")
print("Intra-Cluster Cohesion (Overall Average Within-Cluster Distance):", intra_cluster_cohesion)

**Extracting Skills from Description**

In [0]:
# Prepare the text fed to the skill-extraction models: flatten newlines/tabs,
# lower-case, and replace every non-letter with spaces.
df_with_clusters_named = df_with_clusters_named.withColumn("Description", regexp_replace(col("Description"), r"[\n\r\t]", "  "))
df_with_clusters_named = df_with_clusters_named.withColumn("description_lower", lower(col("Description")))
df_with_clusters_named = df_with_clusters_named.withColumn("Description_cleaned", regexp_replace(col("description_lower"), r"[^a-zA-Z\s]", "  "))

# The helper column name must match exactly; drop() silently ignores unknown names.
df_with_clusters_named = df_with_clusters_named.drop("description_lower")
df_with_clusters_named.display()

# JobBERT token-classification model for skill spans (labels compared against
# "B"/"I" in extract_skills). NOTE: this rebinds the module-level `tokenizer`
# (previously the Spark ML Tokenizer) and `model` (previously the Gemini model).
tokenizer = AutoTokenizer.from_pretrained("jjzha/jobbert_skill_extraction")
model = AutoModelForTokenClassification.from_pretrained("jjzha/jobbert_skill_extraction")
def chunk_text(text, max_length=512):
    """Split ``text`` into decoded strings of at most ``max_length - 2`` tokens each.

    Two positions per window are left free, presumably for the [CLS]/[SEP]
    tokens added when each chunk is re-encoded. Uses the module-level
    ``tokenizer``.
    """
    pieces = tokenizer.tokenize(text)
    window = max_length - 2
    return [
        tokenizer.convert_tokens_to_string(pieces[start:start + window])
        for start in range(0, len(pieces), window)
    ]

def extract_skills(description):
    """Extract skill phrases from a job description with the token-classification model.

    The description is split into model-sized chunks (``chunk_text``). Each
    chunk is tagged by the module-level ``model``/``tokenizer``, and the
    predicted "B"/"I"/other labels are turned into phrases:

    * "B" starts a new phrase; "I" continues the open phrase, or starts one if
      none is open; any other label closes the open phrase, so words separated
      by untagged text are not glued together.
    * WordPiece continuations ("##xyz") are attached to the previous word of
      the open phrase instead of being dropped ("program ##ming" -> "programming").
    * Special tokens (e.g. [CLS], [SEP], padding) are ignored.

    Returns a list of phrases with single spaces between words; [] for
    missing or blank input.
    """
    if not isinstance(description, str) or not description.strip():
        return []
    special_tokens = set(tokenizer.all_special_tokens)
    skills = []
    for chunk in chunk_text(description):
        inputs = tokenizer(chunk, return_tensors="pt", truncation=True, padding=True, max_length=512)
        with torch.no_grad():
            outputs = model(**inputs)
        predicted_class_ids = torch.argmax(outputs.logits, dim=2)
        tokens = tokenizer.convert_ids_to_tokens(inputs["input_ids"][0])
        predicted_classes = [model.config.id2label[class_id.item()] for class_id in predicted_class_ids[0]]

        words = []  # words of the phrase currently being built
        for token, predicted_class in zip(tokens, predicted_classes):
            if token in special_tokens:
                continue
            if token.startswith("##"):
                # Sub-word piece: extend the last word of the open phrase, if any.
                if words:
                    words[-1] += token[2:]
                continue
            if predicted_class == "B" or (predicted_class == "I" and not words):
                if words:
                    skills.append(" ".join(words))
                words = [token]
            elif predicted_class == "I":
                words.append(token)
            else:
                # Outside any skill span: close the open phrase.
                if words:
                    skills.append(" ".join(words))
                words = []
        if words:
            skills.append(" ".join(words))
    return skills
extract_skills_udf = udf(extract_skills, ArrayType(StringType()))

# Run the skill model over every cleaned description. This is transformer
# inference per row, so the result is materialized once and cached to CSV.
result_df = df_with_clusters_named.withColumn("skills1", extract_skills_udf("Description_cleaned"))
df = result_df.toPandas()
df.to_csv("/PATH/extract_skills.csv")

In [0]:
# Reload the postings with their "skills1" column from the cached CSV
# (NOTE: this rebinds `df_with_clusters` to that table).
pd_df = pd.read_csv("/PATH/extract_skills.csv")
df_with_clusters = spark.createDataFrame(pd_df)
# Second JobBERT model (knowledge extraction); rebinds the module-level
# `tokenizer`/`model` used by chunk_text/extract_skills.
tokenizer = AutoTokenizer.from_pretrained("jjzha/jobbert_knowledge_extraction")
model = AutoModelForTokenClassification.from_pretrained("jjzha/jobbert_knowledge_extraction")
def chunk_text(text, max_length=512):
    """Split ``text`` into decoded strings of at most ``max_length - 2`` tokens each.

    Two positions per window are left free, presumably for the [CLS]/[SEP]
    tokens added when each chunk is re-encoded. Uses the module-level
    ``tokenizer`` (here the knowledge-extraction tokenizer).
    """
    pieces = tokenizer.tokenize(text)
    window = max_length - 2
    return [
        tokenizer.convert_tokens_to_string(pieces[start:start + window])
        for start in range(0, len(pieces), window)
    ]

def extract_skills(description):
    """Extract knowledge/skill phrases from a job description with the token-classification model.

    The description is split into model-sized chunks (``chunk_text``). Each
    chunk is tagged by the module-level ``model``/``tokenizer`` (here the
    knowledge-extraction model), and the predicted "B"/"I"/other labels are
    turned into phrases:

    * "B" starts a new phrase; "I" continues the open phrase, or starts one if
      none is open; any other label closes the open phrase.
    * WordPiece continuations ("##xyz") are attached to the previous word of
      the open phrase instead of being dropped.
    * Special tokens (e.g. [CLS], [SEP], padding) are ignored.

    Returns a list of phrases with single spaces between words; [] for
    missing or blank input.
    """
    if not isinstance(description, str) or not description.strip():
        return []
    special_tokens = set(tokenizer.all_special_tokens)
    skills = []
    for chunk in chunk_text(description):
        inputs = tokenizer(chunk, return_tensors="pt", truncation=True, padding=True, max_length=512)
        with torch.no_grad():
            outputs = model(**inputs)
        predicted_class_ids = torch.argmax(outputs.logits, dim=2)
        tokens = tokenizer.convert_ids_to_tokens(inputs["input_ids"][0])
        predicted_classes = [model.config.id2label[class_id.item()] for class_id in predicted_class_ids[0]]

        words = []  # words of the phrase currently being built
        for token, predicted_class in zip(tokens, predicted_classes):
            if token in special_tokens:
                continue
            if token.startswith("##"):
                # Sub-word piece: extend the last word of the open phrase, if any.
                if words:
                    words[-1] += token[2:]
                continue
            if predicted_class == "B" or (predicted_class == "I" and not words):
                if words:
                    skills.append(" ".join(words))
                words = [token]
            elif predicted_class == "I":
                words.append(token)
            else:
                # Outside any span: close the open phrase.
                if words:
                    skills.append(" ".join(words))
                words = []
        if words:
            skills.append(" ".join(words))
    return skills
extract_skills_udf = udf(extract_skills, ArrayType(StringType()))

# Build on `df_with_clusters`, the table reloaded from extract_skills.csv in this
# cell, which already holds the materialized "skills1" column. Building on the
# lazy `result_df` instead would re-run its "skills1" UDF at toPandas() time,
# after `tokenizer`/`model` were rebound to the knowledge model, so "skills1"
# would silently be recomputed with the wrong model.
result_df2 = df_with_clusters.withColumn("skills2", extract_skills_udf("Description_cleaned"))
df = result_df2.toPandas()
df.to_csv("/PATH/extract_skills_full.csv")


In [0]:

def combine_skills_udf(skills1, skills2):
    """Merge two stringified skill lists into one list without duplicates.

    ``skills1``/``skills2`` are list columns as read back from CSV, e.g.
    "['python', 'sql']". The single-quoted items are extracted with a regex.
    A value that is missing or not a string (e.g. a pandas NaN) contributes
    no skills, but does not discard the other column. Items keep first-seen
    order (skills1 first), so results are deterministic; empty items are dropped.
    """
    def extract_skills(skills):
        if not isinstance(skills, str) or skills == "[]":
            return []
        return [skill.strip() for skill in re.findall(r"'(.*?)'", skills)]

    combined = extract_skills(skills1) + extract_skills(skills2)
    return [skill for skill in dict.fromkeys(combined) if skill]

combine_skills = udf(combine_skills_udf, ArrayType(StringType()))

df = pd.read_csv("/PATH/extract_skills_full.csv")
spark_df = spark.createDataFrame(df)

# Attach each posting's state. The reloaded skills table should already carry
# "Full State Name", because it derives from the clustered postings that keep
# that column (verify against the CSV). Joining it with df_updated on the
# non-unique cleaned_job_title would then duplicate the column (an ambiguous
# reference in the groupBy below) and multiply rows for titles posted more than
# once, inflating the counts. Join only if the column is missing, and then use a
# de-duplicated (title, state) lookup.
if "Full State Name" in spark_df.columns:
    combined = spark_df
else:
    title_states = df_updated.select("cleaned_job_title", "Full State Name").distinct()
    combined = spark_df.join(title_states, on="cleaned_job_title")

df_combined = combined.withColumn(
    "combined_skills",
    combine_skills(col("skills1"), col("skills2"))
)

skills_exploded = df_combined.withColumn("skill", explode(col("combined_skills")))
skills_count = skills_exploded.groupBy("Full State Name", "skill").agg(count("skill").alias("count"))

# Rank skills per state by frequency. Ties are broken alphabetically so the
# top-5 selection is reproducible.
window_spec = Window.partitionBy("Full State Name").orderBy(desc("count"), col("skill"))
ranked_skills = skills_count.withColumn("rank", row_number().over(window_spec))
top_skills_per_state = ranked_skills.filter(col("rank") <= 5).orderBy("Full State Name", "rank")

# collect_list() after groupBy() does not keep the orderBy() above. Collect
# (rank, skill) pairs, sort them by rank, and keep only the skill names.
state_skills_combined = top_skills_per_state.groupBy("Full State Name") \
    .agg(sort_array(collect_list(struct(col("rank"), col("skill")))).getField("skill").alias("Top_Skills"))

state_skills_combined.display()


In [0]:
def combine_skills_udf(skills1, skills2):
    """Merge two stringified skill lists into one list without duplicates.

    ``skills1``/``skills2`` are list columns as read back from CSV, e.g.
    "['python', 'sql']". The single-quoted items are extracted with a regex.
    A value that is missing or not a string (e.g. a pandas NaN) contributes
    no skills, but does not discard the other column. Items keep first-seen
    order (skills1 first), so results are deterministic; empty items are dropped.
    """
    def extract_skills(skills):
        if not isinstance(skills, str) or skills == "[]":
            return []
        return [skill.strip() for skill in re.findall(r"'(.*?)'", skills)]

    combined = extract_skills(skills1) + extract_skills(skills2)
    return [skill for skill in dict.fromkeys(combined) if skill]

# NOTE(review): this rebinds the Python function name to the Spark UDF. Re-running
# this line without first re-running the `def` above would wrap a UDF in a UDF.
combine_skills_udf = udf(combine_skills_udf, ArrayType(StringType()))
spark_df = spark_df.withColumn(
    "combined_skills",
    combine_skills_udf(col("skills1"), col("skills2"))
)
# All skills of all postings in each cluster, flattened into one (non-unique) list.
df_cluster_skills = (
    spark_df.groupBy("cluster")
    .agg(collect_list("combined_skills").alias("skills_nested_array"))
    .withColumn("skills", flatten("skills_nested_array"))
    .select("cluster", "skills")
)
df_cluster_skills.display()


**Profiles Data**

In [0]:
# Load LinkedIn profiles, normalize placeholder strings to real NULLs, and drop
# columns the analysis does not use.
profiles = spark.read.parquet('/dbfs/linkedin_people_train_data')
profiles = profiles.replace(["N/A", "None","null",""], None)
selected_profiles = profiles.drop('avatar','country_code','current_company:company_id'
,'current_company:name','educations_details', 'followers', 'following','groups', 'people_also_viewed', 'posts', 'recommendations_count', 'timestamp', 'url')

In [0]:
def total_degrees_fileds(education):
    """Collect degree names and fields of study from a profile's education entries.

    Returns a ``(degrees, fields)`` pair of lists, skipping placeholder values
    (None, "null", "N/A", ...). For a missing education value it returns
    ``(None, None)``, so callers that index the result with ``[0]``/``[1]`` get a
    null instead of a TypeError.
    """
    Nulls = [None, "Null", "null", "None", "", "N/A"]
    if education in Nulls:
        return None, None
    degree = []
    field = []
    for entry in education:
        if entry["degree"] not in Nulls:
            degree.append(entry["degree"])
        if entry["field"] not in Nulls:
            field.append(entry["field"])
    return degree, field

# Two UDFs over the same parser: one keeps the degrees, the other the fields.
degree_udf = udf(lambda edu: total_degrees_fileds(edu)[0], ArrayType(StringType()))
field_udf = udf(lambda edu: total_degrees_fileds(edu)[1], ArrayType(StringType()))
selected_profiles = selected_profiles.withColumn("degree", degree_udf(selected_profiles["education"])).withColumn("field", field_udf(selected_profiles["education"]))
def positions(experience):
    """Flatten a profile's experience entries into a list of job titles.

    An entry with a non-empty "positions" list contributes the title of each
    position; otherwise the entry's own "title" is used. Placeholder titles
    (None, "null", "N/A", ...) are skipped in both cases. Returns None for a
    missing experience value.
    """
    Nulls = [None, "Null", "null", "None", "", "N/A"]
    if experience in Nulls:
        return None
    titles = []
    for entry in experience:
        sub_positions = entry["positions"]
        if not sub_positions or sub_positions in Nulls:
            candidates = [entry["title"]]
        else:
            candidates = [position["title"] for position in sub_positions]
        titles.extend(title for title in candidates if title not in Nulls)
    return titles

# Flatten experience into a title list, then drop the raw nested columns.
positions_udf = udf(lambda exp: positions(exp), ArrayType(StringType()))
selected_profiles = selected_profiles.withColumn("previous_positions", positions_udf(selected_profiles["experience"]))
selected_profiles = selected_profiles.drop('experience').drop('education')
def total_languages(languages):
    """Return the titles of a profile's language entries, skipping placeholders.

    Returns None for a missing value. Results are collected in a separate list;
    re-binding ``languages`` to a new list would discard the input and always
    yield [].
    """
    Nulls = [None, "Null", "null", "None", "", "N/A"]
    if languages in Nulls:
        return None
    titles = []
    for entry in languages:
        if entry["title"] not in Nulls:
            titles.append(entry["title"])
    return titles

# List of language titles per profile.
language_udf = udf(lambda exp: total_languages(exp), ArrayType(StringType()))
selected_profiles = selected_profiles.withColumn("total_languages", language_udf(selected_profiles["languages"]))
def total_volunteering(volunteer_experience):
    """Titles of a profile's volunteering entries with placeholders removed; None if missing."""
    placeholders = [None, "Null", "null", "None", "", "N/A"]
    if volunteer_experience in placeholders:
        return None
    return [item["title"] for item in volunteer_experience if item["title"] not in placeholders]

volunteer_udf = udf(lambda exp: total_volunteering(exp), ArrayType(StringType()))
# The volunteering column must go through volunteer_udf; language_udf runs the
# language parser instead.
selected_profiles = selected_profiles.withColumn("total_volunteering", volunteer_udf(selected_profiles["volunteer_experience"]))
def total_courses(courses):
    """Titles of a profile's course entries with placeholders removed; None if missing."""
    placeholders = [None, "Null", "null", "None", "", "N/A"]
    if courses in placeholders:
        return None
    return [course["title"] for course in courses if course["title"] not in placeholders]
courses_udf = udf(lambda course: total_courses(course), ArrayType(StringType()))
# NOTE(review): the column name below appears to start with a Cyrillic "с", not an
# ASCII "c". The same spelling is dropped later, so it presumably matches the
# dataset schema. Check the schema before "fixing" it.
selected_profiles = selected_profiles.withColumn("total_courses", courses_udf(selected_profiles["сourses"]))

def total_certifications(certifications):
    """Titles of a profile's certification entries with placeholders removed; None if missing."""
    Nulls = [None, "Null", "null", "None", "", "N/A"]
    if certifications in Nulls:
        return None
    titles = []
    for certification in certifications:
        if certification["title"] not in Nulls:
            titles.append(certification["title"])
    return titles

certifications_udf = udf(lambda certification: total_certifications(certification), ArrayType(StringType()))
# Certifications are parsed with their own UDF (not courses_udf).
selected_profiles = selected_profiles.withColumn("total_certifications", certifications_udf(selected_profiles["certifications"]))
selected_profiles = selected_profiles.drop('languages').drop('volunteer_experience')
selected_profiles = selected_profiles.drop('сourses')
selected_profiles.display()

In [0]:
# concat() returns NULL as soon as any input array is NULL. A profile missing a
# single section (e.g. no volunteering) would then lose all its experiences or
# qualifications. Substitute an empty array<string> for missing arrays and skip
# a missing "position".
_empty_strings = array().cast(ArrayType(StringType()))

def _or_empty(column_name):
    """The array column, or an empty array<string> where it is NULL."""
    return coalesce(col(column_name), _empty_strings)

selected_profiles = selected_profiles.withColumn("all_experiences", concat(
    _or_empty("previous_positions"),
    when(col("position").isNotNull(), array("position")).otherwise(_empty_strings),
    _or_empty("total_volunteering"),
))
selected_profiles = selected_profiles.withColumn("qualifications", concat(
    _or_empty("total_languages"),
    _or_empty("total_courses"),
    _or_empty("degree"),
    _or_empty("field"),
    _or_empty("total_certifications"),
))

selected_profiles = selected_profiles.drop('previous_positions').drop('position').drop('total_volunteering').drop('total_languages').drop('degree').drop('field').drop('total_courses').drop('total_certifications').drop('certifications')
selected_profiles.display()

**Matching Sampled Profiles to the Clusters**

In [0]:
import os

# ~1.5% random sample of profiles to send to the LLM. No seed is set, so every
# run draws a different sample; the numbered CSV outputs below rely on that.
sampled_df = selected_profiles.sample(withReplacement=False, fraction=0.015)

# Gemini API keys, tried in rotation by match_clusters_with_genai when a key hits
# its quota. They come from a comma-separated environment variable instead of
# being hard-coded; the placeholder is only a fallback.
apis = [key.strip() for key in os.environ.get("GEMINI_API_KEYS", "").split(",") if key.strip()] or ["API key"]
broadcast_api_key = spark.sparkContext.broadcast(apis[0])
current_key_index = 0
def match_clusters_with_genai(person_experience, person_qualifications, clusters, delay=8):
    """Ask Gemini which job clusters fit a person's experience and qualifications.

    Args:
        person_experience: list of past job titles (may be None or empty).
        person_qualifications: list of qualifications (may be None or empty).
        clusters: text describing the available clusters and their job titles.
        delay: seconds to pause after a successful request (rate limiting).

    Returns:
        The model's comma-separated answer, split into stripped, non-empty
        items (expected to be cluster numbers). On failure, a one-element list
        ``["Error during API call: ..."]``. Errors that look like quota or
        rate-limit errors rotate to the next key in ``apis`` and retry; once
        every key has been tried, the last such error is returned.

    Note: ``current_key_index`` is module state. Changes made while running
    inside a Spark UDF are not sent back to the driver.
    """
    global current_key_index
    experience_text = person_experience if person_experience else "No experience provided"
    qualifications_text = person_qualifications if person_qualifications else "No qualifications provided"
    prompt = f"""
You are a clustering assistant. Match the following job experiences and qualifications to the most relevant clusters. 
If no experiences or qualifications are provided, assign the person to clusters that require no experience or qualifications.

Person's job experiences: {experience_text}
Person's qualifications: {qualifications_text}
Available clusters and their jobs: {clusters}

Respond with a comma-separated list of cluster numbers. If no exact match is found, include the most relevant clusters.
"""

    last_error = None
    for _ in range(len(apis)):
        genai.configure(api_key=apis[current_key_index])
        model = genai.GenerativeModel("gemini-1.5-flash")
        try:
            response = model.generate_content(prompt)
            matches = response.text.strip() if response and response.candidates else ""
        except Exception as e:
            error_message = str(e)
            if "exhausted" in error_message or "request limit per minute" in error_message:
                # Quota hit: rotate to the next key and retry. Log only the
                # key's position, never the key itself.
                current_key_index = (current_key_index + 1) % len(apis)
                print(f"Switching to API key #{current_key_index}")
                last_error = e
                continue
            return [f"Error during API call: {e}"]
        time.sleep(delay)
        return [match.strip() for match in matches.split(",") if match.strip()]
    return [f"Error during API call: {last_error}"]
        

match_udf = udf(
    lambda experience, qualifications, cluster_jobs: match_clusters_with_genai(experience, qualifications, cluster_jobs),
    ArrayType(StringType())
)

# Build one text catalogue of the clusters. df_with_clusters_named has one row per
# posting, so collectAsMap keeps a single job_titles value per cluster id.
clusters_df = df_with_clusters_named.select("cluster", "job_titles")
clusters_dict = clusters_df.rdd.map(lambda row: (row["cluster"], row["job_titles"])).collectAsMap()
clusters_str = ", ".join([f"Cluster {k}: {v}" for k, v in clusters_dict.items()])

# The full catalogue is passed as a literal into every prompt (one LLM call per sampled profile).
result_df = sampled_df.withColumn(
    "matched_clusters",
    match_udf(col("all_experiences"), col("qualifications"), lit(clusters_str)) 
)
df = result_df.toPandas()
df.to_csv("/PATH/profiles_with_clusters_sample_6.csv")# change the number each time you run

**Adding Skills to Each Sampled Profile**

In [0]:
# Reload one saved sample. "matched_clusters" is a stringified list after the CSV round trip.
df1 = pd.read_csv("/PATH/profiles_with_clusters_sample_1.csv")#change the number to the sample you want to work with
df1 = spark.createDataFrame(df1)
df1.display()
def extract_clusters_udf(clusters):
    """Parse a stringified list such as "['1', ' 2']" into ['1', '2'].

    Empty or missing input, and values the regex cannot scan (non-strings),
    yield [].
    """
    try:
        if clusters and clusters != "[]":
            return [item.strip() for item in re.findall(r"'(.*?)'", clusters)]
        return []
    except Exception:
        return []

# Parse the stringified cluster list into an array<string> column.
extract_clusters = udf(extract_clusters_udf, ArrayType(StringType()))
df_extracted_clusters = df1.withColumn("extracted_clusters", extract_clusters(col("matched_clusters")))

df_extracted_clusters.display()

In [0]:

skills_exploded = df_cluster_skills.withColumn("skills_exploded", explode(col("skills")))
skills_count = skills_exploded.groupBy("cluster", "skills_exploded").agg(count("skills_exploded").alias("count"))

# Rank skills per cluster by frequency. Ties are broken alphabetically so the
# top-5 selection is reproducible.
window_spec = Window.partitionBy("cluster").orderBy(desc("count"), col("skills_exploded"))
ranked_skills = skills_count.withColumn("rank", row_number().over(window_spec))
top_skills_per_cluster = ranked_skills.filter(col("rank") <= 5).orderBy("cluster", "rank")

# collect_list() after groupBy() does not keep the orderBy() above. Collect
# (rank, skill) pairs, sort them by rank, and keep only the skill names.
top_cluster_skills = top_skills_per_cluster.groupBy("cluster") \
    .agg(sort_array(collect_list(struct(col("rank"), col("skills_exploded")))).getField("skills_exploded").alias("Top_Skills"))

top_cluster_skills.display()

In [0]:
# One row per (profile, matched cluster), enriched with that cluster's top skills.
# NOTE(review): extracted cluster ids are strings while top_cluster_skills.cluster
# is presumably numeric; the join relies on Spark's implicit cast — verify.
df_exploded = df_extracted_clusters.withColumn("cluster", explode(col("extracted_clusters")))
df_joined = df_exploded.join(top_cluster_skills, on="cluster", how="left")

# Per profile: a list of per-cluster Top_Skills arrays (nested; flattened later).
df_skills = df_joined.groupBy("id").agg(collect_list("Top_Skills").alias("skills"))
df_result = df_extracted_clusters.join(df_skills, on="id", how="left")

used_profiles = df_result.select("id", "about", "city", "name", "all_experiences", "qualifications", "skills")
used_profiles.display()

In [0]:
def city_to_state(city, candidates=None):
    """Map a free-text profile location to one of the state names.

    Args:
        city: location text such as "Charleston, West Virginia, United States".
        candidates: names to search for; defaults to the values of the
            module-level ``state_dict``.

    Specific names are tried longest first (ties alphabetically), so "West
    Virginia" wins over "Virginia" and "District of Columbia" over
    "Washington". The catch-alls "Remote" and "United States" are used only
    when no specific name occurs. Returns "United States" for empty input and
    None when nothing matches.
    """
    if not city:
        return "United States"
    names = state_dict.values() if candidates is None else candidates
    generic = ("Remote", "United States")
    specific = sorted({name for name in names if name not in generic}, key=lambda name: (-len(name), name))
    for state in specific + [name for name in generic if name in names]:
        if state in city:
            return state
    return None
        
# Replace the free-text city with a state name (None when no state name is found).
city_to_state_udf = udf(city_to_state, StringType())
used_profiles = used_profiles.withColumn("state", city_to_state_udf(col("city")))
used_profiles = used_profiles.drop("city")


In [0]:
def extract_list_udf(input_str):
    """Turn a stringified list like "['a', ' b']" into ['a', 'b'].

    Missing/empty input, "[]", or a non-string value produce [].
    """
    try:
        if input_str and input_str != "[]":
            # Items are the substrings enclosed in single quotes.
            return [piece.strip() for piece in re.findall(r"'(.*?)'", input_str)]
        return []
    except Exception:
        return []

# Register the UDF
extract_list = udf(extract_list_udf, ArrayType(StringType()))
# qualifications/all_experiences are stringified lists here (from the CSV); parse them.
used_profiles = (
    used_profiles.withColumn("qualifications_list", extract_list(col("qualifications")))
       .withColumn("experience_list", extract_list(col("all_experiences")))
)
# "skills" is a list of per-cluster skill lists: flatten it and drop duplicates.
used_profiles = used_profiles.withColumn("flattened_skills", array_distinct(flatten(col("skills"))))
used_profiles = used_profiles.drop("skills")
used_profiles = used_profiles.withColumnRenamed("flattened_skills", "skills")
used_profiles.display()

In [0]:
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import pandas as pd

             
# Sentence-embedding model used to score profile/skill similarity.
bert_model = SentenceTransformer('all-MiniLM-L6-v2')


def get_average_embedding(text_list):
    """Mean sentence embedding of ``text_list``; a zero vector when it is empty or None.

    Uses the module-level SentenceTransformer ``bert_model``.
    """
    if text_list:
        vectors = bert_model.encode(text_list, convert_to_tensor=False)
        return np.mean(vectors, axis=0)
    dimension = bert_model.get_sentence_embedding_dimension()
    return np.zeros(dimension)


def calculate_similarity(experience, qualification, added_skills):
    """Score how well the added skills fit a profile.

    Each argument is expected to be a list of strings; anything else is
    treated as an empty list. Returns 0.0 when there are no skills, or when
    both the experience and qualification lists are empty. Otherwise it
    returns the larger of two means: the mean cosine similarity between the
    skill embeddings and the averaged experience embedding, and the same
    against the averaged qualification embedding.
    """
    def as_list(value):
        return value if isinstance(value, list) else []

    experience = as_list(experience)
    qualification = as_list(qualification)
    added_skills = as_list(added_skills)

    if not added_skills or (not experience and not qualification):
        return 0.0

    experience_vector = get_average_embedding(experience)
    qualification_vector = get_average_embedding(qualification)
    skill_vectors = bert_model.encode(added_skills, convert_to_tensor=False)
    experience_score = float(np.mean(cosine_similarity([experience_vector], skill_vectors)[0]))
    qualification_score = float(np.mean(cosine_similarity([qualification_vector], skill_vectors)[0]))

    # Explicit comparison: the builtin max may be shadowed by pyspark's wildcard import.
    if experience_score > qualification_score:
        return experience_score
    return qualification_score

  




similarity_udf = udf(calculate_similarity, DoubleType())


# Keep profiles that have skills, qualifications and experiences. The latter two
# are stringified lists at this point, so empty ones are compared as text.
profiles_with_skills = used_profiles.filter(
    col("skills").isNotNull()&
    col("qualifications").isNotNull() & 
    col("all_experiences").isNotNull()&
   ( col("qualifications") != "[]") &
   (col("all_experiences") != "[]") &
   (col("all_experiences") != "[--]"))

# Per-profile similarity between the added skills and the parsed experience /
# qualification lists.
result_df = profiles_with_skills.withColumn(
    "similarity_score", 
    similarity_udf(
        profiles_with_skills["experience_list"], 
        profiles_with_skills["qualifications_list"], 
        profiles_with_skills["skills"]
    )
)

# Mean score over all kept profiles (Spark's mean aggregate).
average_similarity = result_df.agg(mean("similarity_score").alias("avg_similarity_score")).collect()[0]["avg_similarity_score"]


result_df.display()  
print(f"Average Similarity Score: {average_similarity:.4f}")

**Courses Data**

In [0]:
# Load the Coursera and edX course catalogues as Spark DataFrames.
coursera_courses = pd.read_csv("/PATH/coursera_course_dataset.csv")
coursera_courses = spark.createDataFrame(coursera_courses)
coursera_courses.display()
edx_courses = pd.read_csv("/PATH/data/edx.csv")
edx_courses = spark.createDataFrame(edx_courses)
edx_courses.display()

In [0]:
def convert_to_list(associatedskills):
    """Split a comma-separated skills string into a list of trimmed, non-empty skills.

    Returns None for missing or empty input, for a value that is not a string
    (e.g. a NaN coming from pandas), or when no non-blank item remains.
    Trimming matters because the items are embedded word by word later;
    leading spaces would otherwise be part of every skill after the first.
    """
    if not isinstance(associatedskills, str) or not associatedskills:
        return None
    skills = [skill.strip() for skill in associatedskills.split(",")]
    return [skill for skill in skills if skill] or None

# Replace edX's comma-separated "associatedskills" text with an array column.
convert_to_list_udf = udf(convert_to_list, ArrayType(StringType()))
edx_courses = edx_courses.withColumn("associatedskills_list", convert_to_list_udf(col("associatedskills")))

edx_courses = edx_courses.drop("associatedskills")
edx_courses.display()

**Recommending Courses Based on the Top Skills in Each State**

In [0]:
# Shell command: download the medium English spaCy pipeline, loaded below with
# spacy.load("en_core_web_md"). Needed once per environment.
!python -m spacy download en_core_web_md

In [0]:
# Profiles with no matched cluster skills, plus a sample of up to 200 of them.
profiles_without_skills = used_profiles.filter(col("skills").isNull())
profiles_without_skills_sampled = profiles_without_skills.limit(200)
profiles_without_skills_sampled.display()
import spacy
from pyspark.sql.functions import udf, col, broadcast, collect_list, explode
from pyspark.sql.types import ArrayType, FloatType, StringType
from sklearn.metrics.pairwise import cosine_similarity

# spaCy pipeline whose .vector is used to embed skill phrases.
nlp = spacy.load("en_core_web_md")

def compute_vectors(skill_list):
    """spaCy vector (as a list of floats) for every skill; [] when there are no skills.

    Uses the module-level ``nlp`` pipeline.
    """
    if not skill_list:
        return []
    vectors = []
    for skill in skill_list:
        vectors.append(nlp(skill).vector.tolist())
    return vectors

compute_vectors_udf = udf(compute_vectors, ArrayType(ArrayType(FloatType())))

# Embed each state's top skills and each edX course's skills.
state_skills_combined = state_skills_combined.withColumn(
    "skill_vectors", compute_vectors_udf(col("Top_Skills"))
)
edx_courses = edx_courses.withColumn(
    "course_skill_vectors", compute_vectors_udf(col("associatedskills_list"))
)


# Broadcast-join hints for the joins below.
state_skills_combined_broadcast = broadcast(state_skills_combined)
edx_courses_broadcast = broadcast(edx_courses)


def find_similar_skills(skill_vectors, course_skill_vectors, threshold=0.8):
    """Return True if any skill vector is close to any course skill vector.

    Closeness is cosine similarity >= ``threshold``. All pairs are compared in
    one vectorized matrix product. Zero vectors (e.g. words without a spaCy
    vector) get similarity 0 with everything. Returns False, not [], when
    either list is empty or missing, since the UDF is declared BooleanType.
    """
    if not skill_vectors or not course_skill_vectors:
        return False
    skills = np.asarray(skill_vectors, dtype=float)
    courses = np.asarray(course_skill_vectors, dtype=float)
    skill_norms = np.linalg.norm(skills, axis=1, keepdims=True)
    course_norms = np.linalg.norm(courses, axis=1, keepdims=True)
    # Avoid division by zero; a zero vector then has similarity 0.
    skill_norms[skill_norms == 0] = 1.0
    course_norms[course_norms == 0] = 1.0
    similarities = (skills / skill_norms) @ (courses / course_norms).T
    return bool((similarities >= threshold).any())

find_similar_skills_udf = udf(find_similar_skills, BooleanType())


# Candidate (profile, course) pairs: each profile is joined to its state's top
# skills and cross-joined with every edX course. A pair is kept when one of the
# course's skills is similar to one of the state's top skills.
# Uses the 200-profile sample prepared above. The cross join runs a Python UDF
# per pair, which is presumably why the sample was taken.
recommendations = (
    profiles_without_skills_sampled
    .join(state_skills_combined_broadcast, col("Full State Name") == col("state"))
    .crossJoin(edx_courses_broadcast)
    .withColumn(
        "matched_skills",
        find_similar_skills_udf(col("skill_vectors"), col("course_skill_vectors"))
    )
    .filter(col("matched_skills"))
)


recommendations_grouped = recommendations.groupBy("id").agg(
    collect_list("title").alias("recommended_courses")
)
df = recommendations_grouped.toPandas()
df.to_csv("/PATH/recommendations_grouped_edx.csv")

In [0]:
def convert_to_list(associatedskills):
    """Split a comma-separated skills string into a list of trimmed, non-empty skills.

    Returns None for missing or empty input, for a value that is not a string
    (e.g. a NaN coming from pandas), or when no non-blank item remains.
    Trimming keeps leading spaces out of every skill after the first, before
    the skills are embedded.
    """
    if not isinstance(associatedskills, str) or not associatedskills:
        return None
    skills = [skill.strip() for skill in associatedskills.split(",")]
    return [skill for skill in skills if skill] or None

# Parse Coursera's comma-separated "Skills" text into an array column.
convert_to_list_udf = udf(convert_to_list, ArrayType(StringType()))
coursera_courses = coursera_courses.withColumn("associatedskills_list", convert_to_list_udf(col("Skills")))



In [0]:
import spacy
from pyspark.sql.functions import udf, col, broadcast, collect_list, explode
from pyspark.sql.types import ArrayType, FloatType, StringType
from sklearn.metrics.pairwise import cosine_similarity

# spaCy pipeline whose .vector is used to embed skill phrases (reloaded so this cell stands alone).
nlp = spacy.load("en_core_web_md")

def compute_vectors(skill_list):
    """spaCy vector (as a list of floats) for every skill; [] when there are no skills.

    Uses the module-level ``nlp`` pipeline.
    """
    if not skill_list:
        return []
    vectors = []
    for skill in skill_list:
        vectors.append(nlp(skill).vector.tolist())
    return vectors

compute_vectors_udf = udf(compute_vectors, ArrayType(ArrayType(FloatType())))

# Embed each state's top skills and each Coursera course's skills.
state_skills_combined = state_skills_combined.withColumn(
    "skill_vectors", compute_vectors_udf(col("Top_Skills"))
)
coursera_courses = coursera_courses.withColumn(
    "course_skill_vectors", compute_vectors_udf(col("associatedskills_list"))
)

# Broadcast-join hints for the joins below.
state_skills_combined_broadcast = broadcast(state_skills_combined)
coursera_courses_broadcast = broadcast(coursera_courses)


def find_similar_skills(skill_vectors, course_skill_vectors, threshold=0.8):
    """Return True if any skill vector is close to any course skill vector.

    Closeness is cosine similarity >= ``threshold``. All pairs are compared in
    one vectorized matrix product. Zero vectors (e.g. words without a spaCy
    vector) get similarity 0 with everything. Returns False, not [], when
    either list is empty or missing, since the UDF is declared BooleanType.
    """
    if not skill_vectors or not course_skill_vectors:
        return False
    skills = np.asarray(skill_vectors, dtype=float)
    courses = np.asarray(course_skill_vectors, dtype=float)
    skill_norms = np.linalg.norm(skills, axis=1, keepdims=True)
    course_norms = np.linalg.norm(courses, axis=1, keepdims=True)
    # Avoid division by zero; a zero vector then has similarity 0.
    skill_norms[skill_norms == 0] = 1.0
    course_norms[course_norms == 0] = 1.0
    similarities = (skills / skill_norms) @ (courses / course_norms).T
    return bool((similarities >= threshold).any())

find_similar_skills_udf = udf(find_similar_skills, BooleanType())


# Candidate (profile, course) pairs: each profile is joined to its state's top
# skills and cross-joined with every Coursera course. A pair is kept when one of
# the course's skills is similar to one of the state's top skills.
# Uses the 200-profile sample prepared earlier. The cross join runs a Python UDF
# per pair, which is presumably why the sample was taken.
recommendations = (
    profiles_without_skills_sampled
    .join(state_skills_combined_broadcast, col("Full State Name") == col("state"))
    .crossJoin(coursera_courses_broadcast)
    .withColumn(
        "matched_skills",
        find_similar_skills_udf(col("skill_vectors"), col("course_skill_vectors"))
    )
    .filter(col("matched_skills"))
)


recommendations_grouped = recommendations.groupBy("id").agg(
    collect_list("title").alias("recommended_courses")
)
df = recommendations_grouped.toPandas()
df.to_csv("/PATH/recommendations_grouped_coursea.csv")
