In [0]:
from pyspark.sql.functions import lit
from pyspark.sql.functions import col, udf, lower, substring, broadcast
from pyspark.sql.types import StringType, ArrayType, FloatType
from gensim.models import Word2Vec
from scipy.spatial.distance import cosine
import numpy as np
import re

In [0]:
# Load the LinkedIn job-postings table stored as Parquet on DBFS.
# `spark` is presumably the SparkSession injected by the Databricks runtime (not defined here).
file_type = "parquet"
file_location_4 = "/FileStore/tables/linkedin_124k_kaggle.parquet"
linkedin_124k_kaggle = spark.read.load(file_location_4, format=file_type)

In [0]:
# Load the O*NET-SOC 2019 classification table from CSV: first row is the header and
# Spark infers the column types.
# NOTE(review): `file_location_3` is not defined in any visible cell of this notebook, so
# this line raises NameError on a fresh kernel — define it next to `file_location_4`.
classification = spark.read.csv(file_location_3, header=True, inferSchema=True)


In [0]:

# Stopwords list for cleaning descriptions.
# clean_text() lowercases the text and strips punctuation *before* looking tokens up in this
# set, so every entry must be lowercase with no punctuation or it can never match. That is
# why the hyphenated phrase is stored as "cuttingedge" (what "cutting-edge" becomes).
# NOTE(review): domain words such as "data", "analysis", "leadership" and "support" are
# removed as well; that may discard useful signal when matching to O*NET titles — confirm.
stopwords = set([
    # Conjunctions and prepositions
    "a", "an", "the", "and", "or", "but", "nor", "so", "yet", "for", "because", "since", "although", "though",
    "whereas", "while", "unless", "until", "whether", "if", "then", "else", "beside", "between", "among", "through",
    "against", "along", "upon", "before", "after", "above", "below", "within", "without", "during", "across", "off",
    "via", "per", "amid", "amongst", "throughout", "whereby", "wherein",

    # Question words
    "what", "who", "whom", "which", "whose", "where", "when", "why", "how", "whenever", "wherever",
    "whichever", "whoever", "whomever",

    # Numbers (both words and single digits)
    "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten", "eleven", "twelve",
    "thirteen", "fourteen", "fifteen", "sixteen", "seventeen", "eighteen", "nineteen", "twenty",
    "thirty", "forty", "fifty", "sixty", "seventy", "eighty", "ninety", "hundred", "thousand",
    "million", "billion", "trillion", "1", "2", "3", "4", "5", "6", "7", "8", "9", "0",

    # Time-related words ("then" is already listed with the conjunctions)
    "now", "later", "soon", "yesterday", "today", "tomorrow", "tonight", "afterwards",
    "earlier", "eventually", "recently", "previously", "lately", "once", "twice", "thrice",
    "daily", "weekly", "monthly", "yearly", "always", "never", "sometimes", "often", "rarely",
    "frequently", "occasionally", "asap", "immediately", "currently", "ongoing", "permanent", "temporary",

    # Directions and locations
    "up", "down", "left", "right", "front", "back", "forward", "backward", "toward", "away",
    "inside", "outside", "upward", "downward", "north", "south", "east", "west", "northeast",
    "northwest", "southeast", "southwest", "international", "regional", "global", "local",

    # Common terms in job descriptions
    "job", "position", "title", "career", "industry", "field", "department", "division",
    "team", "section", "branch", "office", "company", "organization", "firm", "business",
    "enterprise", "corporation", "startup", "venture", "client", "customer", "stakeholder",
    "partner", "investor", "shareholder", "personnel", "staff", "employee", "applicant",
    "candidate", "recruit", "hire", "vacancy", "opening",

    # General verbs (to avoid affecting relevant skills)
    "is", "are", "was", "were", "be", "been", "being", "have", "has", "had", "having",
    "do", "does", "did", "doing", "can", "could", "shall", "should", "will", "would",
    "may", "might", "must", "ought", "need", "dare",

    # Technical terms found in reports, documents, and publications
    # ("section" is already listed with the job-description terms)
    "figure", "table", "chapter", "paragraph", "appendix", "reference", "citation",
    "footnote", "index", "abstract", "conclusion", "introduction", "summary", "discussion",
    "results", "methodology", "analysis", "data", "survey", "experiment", "variable",
    "statistic", "finding", "trend", "observation", "estimation", "approximation",
    "prediction", "evaluation",

    # Common phrases in job advertisements
    "opportunity", "competitive", "growth", "develop", "training", "support", "exciting", "rewarding",
    "innovative", "cuttingedge", "leading", "established", "recognized", "renowned", "progressive",
    "collaborative", "inclusive", "diverse", "teamwork", "mentoring", "coaching", "partnership",
    "leadership", "visionary", "impactful", "driven", "dedicated", "empowered", "engaging"
])

# Function to clean text and remove stopwords
def clean_text(text, stop_words=None):
    """Normalise free text into a space-separated string of lowercase, stopword-free tokens.

    Steps:
      1. lowercase;
      2. delete hyphens and apostrophes so compounds/contractions stay one token
         ("e-commerce" -> "ecommerce", "don't" -> "dont");
      3. replace every other punctuation character with a space, so words that were only
         separated by punctuation ("python/sql", "python,sql", "experience.must") become
         separate tokens instead of being glued together;
      4. drop tokens found in ``stop_words``.

    Args:
        text: Raw text. None or "" yields "".
        stop_words: Set of tokens to drop. Defaults to the module-level ``stopwords`` set.

    Returns:
        The cleaned text (possibly the empty string).
    """
    if not text:
        return ""
    if stop_words is None:
        stop_words = stopwords

    text = text.lower()
    # Join across ASCII hyphens and straight/curly apostrophes.
    text = re.sub(r"['\u2019-]", "", text)
    # Any remaining punctuation acts as a word separator.
    text = re.sub(r"[^\w\s]", " ", text)
    return " ".join(word for word in text.split() if word not in stop_words)

# Spark wrapper around clean_text; produces a string column.
clean_text_udf = udf(clean_text, StringType())

# For each table: clean the free-text description, then keep only its first 600 characters.
# NOTE(review): the 600-character cut is character-based, so the last word can be split
# into a partial token.
linkedin_124k_kaggle = (
    linkedin_124k_kaggle
    .withColumn("clean_description", clean_text_udf(lower(col("description"))))
    .withColumn("short_description", substring(col("clean_description"), 1, 600))
)
classification = (
    classification
    .withColumn("clean_onet_description", clean_text_udf(lower(col("O*NET-SOC 2019 Description"))))
    .withColumn("short_onet_description", substring(col("clean_onet_description"), 1, 600))
)

# Train Word2Vec on all descriptions from both tables. Training runs on the driver, so each
# distinct truncated description is collected and split into a token list (a gensim "sentence").
# No broadcast() hint is applied to `classification`: broadcast() only affects joins, and
# this pipeline never joins on it.
job_sentences = [row["short_description"].split() for row in linkedin_124k_kaggle.select("short_description").distinct().collect()]
onet_sentences = [row["short_onet_description"].split() for row in classification.select("short_onet_description").distinct().collect()]

# NOTE(review): with workers=8 gensim training is not bit-for-bit reproducible, and the row
# order of distinct().collect() is not guaranteed either, so matched titles may vary slightly
# between runs.
word2vec_model = Word2Vec(job_sentences + onet_sentences, vector_size=100, window=5, min_count=1, workers=8)

# Generate vectors for descriptions in both tables
def get_vector(description, wv=None):
    """Return the mean word embedding of a whitespace-tokenised description.

    Args:
        description: Cleaned description string. None or "" is treated as having no tokens
            (previously None raised AttributeError inside the Spark UDF).
        wv: Word-vector lookup supporting ``token in wv``, ``wv[token]`` and
            ``wv.vector_size``. Defaults to ``word2vec_model.wv``.

    Returns:
        list[float]: element-wise mean of the vectors of in-vocabulary tokens, or a zero
        vector of length ``wv.vector_size`` when no token is in the vocabulary.
    """
    if wv is None:
        wv = word2vec_model.wv
    tokens = description.split() if description else []
    vectors = [wv[token] for token in tokens if token in wv]
    if not vectors:
        return np.zeros(wv.vector_size).tolist()  # no known words -> zero vector
    return np.mean(vectors, axis=0).tolist()

# Spark wrapper around get_vector; produces an array<float> column.
get_vector_udf = udf(get_vector, ArrayType(FloatType()))

classification = classification.withColumn(
    "onet_vector", get_vector_udf(col("short_onet_description"))
)
linkedin_124k_kaggle = linkedin_124k_kaggle.withColumn(
    "description_vector", get_vector_udf(col("short_description"))
)

# Bring every O*NET title and its embedding to the driver as a {title: vector} dict,
# used as the candidate set for matching. If a title repeats, the last row wins.
onet_vectors = dict(
    classification.select("O*NET-SOC 2019 Title", "onet_vector").rdd.collect()
)

def find_best_match(description_vector, candidates=None):
    """Return the candidate title whose vector is closest to ``description_vector`` by cosine distance.

    Args:
        description_vector: Embedding of one job description (sequence of floats).
        candidates: Mapping ``{title: vector}`` to search. Defaults to the module-level
            ``onet_vectors`` dict.

    Returns:
        The title with the smallest cosine distance; ties go to the earliest title in
        ``candidates`` iteration order. Returns "Other" when the description vector is
        None, empty or all zeros, when there are no candidates, or when no candidate has a
        defined distance (e.g. every candidate vector is all zeros).
    """
    if candidates is None:
        candidates = onet_vectors
    if description_vector is None or len(description_vector) == 0 or not candidates:
        return "Other"

    query = np.asarray(description_vector, dtype=np.float64)
    query_norm = np.linalg.norm(query)
    if query_norm == 0:
        # Cosine distance is undefined for a zero vector.
        return "Other"

    titles = list(candidates)
    matrix = np.asarray([candidates[title] for title in titles], dtype=np.float64)
    # One matrix-vector product replaces a per-title scipy call. Zero-norm candidates produce
    # nan/inf here and are excluded below.
    with np.errstate(divide="ignore", invalid="ignore"):
        distances = 1.0 - (matrix @ query) / (np.linalg.norm(matrix, axis=1) * query_norm)
    distances[~np.isfinite(distances)] = np.inf

    best = int(np.argmin(distances))  # argmin returns the first minimum on ties
    if np.isinf(distances[best]):
        return "Other"
    return titles[best]

# Spark wrapper around find_best_match; produces a string column.
find_best_match_udf = udf(find_best_match, StringType())

# Cache the matched DataFrame. The distinct count on "title_elad_new" and the show() below both
# need the matching column, and without caching each of those actions would re-run the whole
# Python-UDF pipeline (cleaning, embedding, matching) from scratch.
linkedin_124k_kaggle = (
    linkedin_124k_kaggle
    .withColumn("title_elad_new", find_best_match_udf(col("description_vector")))
    .cache()
)

# Check unique values before and after matching
columns_to_check = ["title", "title_elad_new"]

for col_name in columns_to_check:
    unique_count = linkedin_124k_kaggle.select(col(col_name)).distinct().count()
    print(f"Column: {col_name} | Unique values: {unique_count}")

# Display sample results
linkedin_124k_kaggle.select("title", "title_elad_new").distinct().show(50, truncate=False)
