# Project Data Lab

## Imports

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, size, explode, regexp_replace, when, collect_list, expr, udf
from pyspark.sql.functions import *
from pyspark.sql.types import ArrayType, FloatType
from sentence_transformers import SentenceTransformer
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
import numpy as np
from scipy.spatial.distance import cosine
import re
import pandas as pd
import matplotlib.pyplot as plt
from sentence_transformers import SentenceTransformer
from sklearn.manifold import TSNE
from matplotlib import cm
from pyspark.ml.feature import PCA
from pyspark.ml import Pipeline
from pyspark.ml.functions import array_to_vector
from pyspark.ml.linalg import Vectors
from pyspark.sql.window import Window

2025-02-07 03:30:45.046792: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-02-07 03:30:45.125510: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-02-07 03:30:45.468806: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


## Parameters:

In [0]:
# Number of profiles to (approximately) sample from the full people dataset;
# also embedded in the names of the parquet outputs written later.
N = 20_000
# Seed passed to DataFrame.sample when drawing the profile sample.
seed = 42
# Number of top-ranked people kept per job listing.
k = 10
# Lower bound enforced on the sentence encoder's max_seq_length.
max_sentence_length = 512
# When True, run the diagnostic cells that print null / empty-value percentages.
show_null = False
# When True, persist the intermediate and final DataFrames as parquet on DBFS.
save_to_dbfs = True
# When True, run the PCA + t-SNE visualization cell.
to_visualize = False
# NOTE(review): the option name suggests a sampling *rate* for range exchanges,
# not a random seed — passing `seed` here looks unintended; confirm against the Spark docs.
spark.conf.set("spark.sql.execution.rangeExchange.sampleRate", str(seed))

In [0]:
people = spark.read.parquet("/dbfs/linkedin_people_train_data")
# Work on a random sample of roughly N profiles so the embedding steps stay tractable.
n = N
total_people = people.count()
# sample() without replacement needs a fraction in [0, 1]: clamp it for datasets
# smaller than N and avoid dividing by zero when the dataset is empty.
sample_fraction = min(1.0, n / total_people) if total_people > 0 else 0.0
# The sample size is approximate: each row is kept independently with this probability.
people_df = people.sample(False, sample_fraction, seed=seed)

In [0]:
# Position in `models_list` of the SentenceTransformer checkpoint to load; the same
# index is embedded in the names of the parquet outputs written later.
index_model = 0
models_list = ['all-MiniLM-L6-v2', # MiniLM
                'all-distilroberta-v1', # DistilRoBERTa
                '', # NOTE(review): empty name — presumably a placeholder; index 2 looks unusable, confirm
                'multi-qa-distilbert-cos-v1' # DistilBERT
                ]   

In [0]:
def extract_all(column_name):
    """
    Adds an `all_<column_name>s` array column to the global `people_df`.

    The new column concatenates the `column_name` field of every `experience`
    entry with the same field of `current_company`, then drops entries that
    are NULL or the literal string 'null'.

    NOTE(review): when `experience` is NULL the CONCAT presumably yields NULL
    too, leaving a NULL array for that row — confirm this is intended.

    @param column_name: The struct field to collect (e.g. 'title')
    @return: None (the global `people_df` is rebound to the extended DataFrame)
    """
    global people_df
    collected_values_sql = f"""
            FILTER(
                CONCAT(
                    transform(experience, x -> x['{column_name}']),
                    ARRAY(current_company['{column_name}'])
                ),
                x -> x is not null and x != 'null'
            )
        """
    target_column = f"all_{column_name}s"
    people_df = people_df.withColumn(target_column, F.expr(collected_values_sql))

# Collect every company id and every job title found in each person's history.
for struct_field in ('company_id', 'title'):
    extract_all(struct_field)

columns = ['id', 'all_company_ids', 'all_titles']
# people_df = people_df.select(*[col(c) for c in columns]) 

# people_df.display()

## Part 1 - Map Users Into Meta Industries:
1. Get all of the user's titles. 
2. Encode and compute their average embedding.
3. Assign the meta industry with the highest similarity score. 

In [0]:
if show_null:
    # Share of sampled profiles whose collected title list is empty.
    total_count = people_df.count()
    empty_experience_count = people_df.filter(F.size("all_titles") == 0).count()

    if total_count > 0:
        percentage_empty = (empty_experience_count / total_count) * 100
    else:
        percentage_empty = 0

    print(f"Percentage of people with no experience: {percentage_empty:.2f}%")

In [0]:

# Maps each meta-industry name to representative job titles. The titles of one
# industry are embedded and averaged to form that industry's reference vector.
# NOTE(review): 'Librarian' appears under both 'Education and Training' and
# 'Miscellaneous' — confirm the overlap is intended.
meta_industries_jobs = {
    'Healthcare and Medical': [
        'Doctor',
        'Nurse',
        'Pharmacist',
        'Medical Technician',
        'Surgeon',
        'Physical Therapist',
        'Radiologist',
        'Dentist',
        'Psychologist',
        'Paramedic'
    ],
    'Media and Entertainment': [
        'Journalist',
        'Film Director',
        'Actor',
        'Graphic Designer',
        'Photographer',
        'Video Editor',
        'Music Producer',
        'Animator',
        'Public Relations Specialist',
        'Content Creator'
    ],
    'Technology': [
        'Software Engineer',
        'Data Scientist',
        'Cybersecurity Analyst',
        'IT Support Specialist',
        'Web Developer',
        'AI/ML Engineer',
        'DevOps Engineer',
        'Database Administrator',
        'Game Developer',
        'Network Administrator'
    ],
    'Government and Public Policy': [
        'Policy Analyst',
        'Legislative Assistant',
        'Public Relations Officer',
        'City Planner',
        'Government Program Manager',
        'Diplomat',
        'Civil Servant',
        'Economist',
        'Social Worker',
        'Law Enforcement Officer'
    ],
    'Real Estate and Construction': [
        'Real Estate Agent',
        'Architect',
        'Construction Manager',
        'Civil Engineer',
        'Surveyor',
        'Property Manager',
        'Interior Designer',
        'Urban Planner',
        'Structural Engineer',
        'Quantity Surveyor'
    ],
    'Financial and Investment': [
        'Financial Analyst',
        'Investment Banker',
        'Accountant',
        'Auditor',
        'Insurance Agent',
        'Loan Officer',
        'Stockbroker',
        'Tax Consultant',
        'Risk Manager',
        'Wealth Manager'
    ],
    'Transportation and Logistics': [
        'Logistics Coordinator',
        'Supply Chain Manager',
        'Truck Driver',
        'Freight Broker',
        'Warehouse Manager',
        'Customs Officer',
        'Air Traffic Controller',
        'Delivery Driver',
        'Fleet Manager',
        'Import/Export Coordinator'
    ],
    'Services': [
        'Customer Service Representative',
        'Event Planner',
        'Travel Agent',
        'Hotel Manager',
        'Personal Trainer',
        'Hair Stylist',
        'Security Guard',
        'Housekeeper',
        'Barista',
        'Waiter/Waitress'
    ],
    'Education and Training': [
        'Teacher',
        'Professor',
        'Tutor',
        'School Administrator',
        'Curriculum Developer',
        'Instructional Designer',
        'Corporate Trainer',
        'Education Consultant',
        'Librarian',
        'Special Education Teacher'
    ],
    'Manufacturing': [
        'Production Manager',
        'Quality Control Inspector',
        'Assembly Line Worker',
        'Industrial Engineer',
        'Machine Operator',
        'Supply Chain Analyst',
        'Mechanical Technician',
        'Warehouse Worker',
        'Manufacturing Engineer',
        'Materials Manager'
    ],
    'Retail and Consumer Goods': [
        'Retail Sales Associate',
        'Store Manager',
        'Merchandiser',
        'Buyer',
        'Cashier',
        'E-commerce Manager',
        'Inventory Specialist',
        'Customer Support Specialist',
        'Brand Manager',
        'Visual Merchandiser'
    ],
    # Catch-all bucket: one or two titles per niche industry (named in the trailing comments).
    'Miscellaneous': [
    "Ranch Manager",  # Ranching
    "Livestock Handler",  # Ranching
    "Petroleum Engineer",  # Oil and Gas
    "Oil Rig Worker",  # Oil and Gas
    "Mining Engineer",  # Mining
    "Geologist",  # Mining
    "Sports Coach",  # Spectator Sports
    "Event Coordinator",  # Spectator Sports
    "Nanotechnology Research Scientist",  # Nanotechnology Research
    "Materials Engineer",  # Nanotechnology Research
    "Chief Executive Officer (CEO)",  # Executive Offices
    "Military Officer",  # Armed Forces
    "Utility Engineer",  # Utilities
    "Non-Profit Director",  # Non-profit Organizations
    "Recreational Facility Manager",  # Recreational Facilities
    "Clergy Member",  # Religious Institutions
    "Fisheries Biologist",  # Fisheries
    "Factory Worker",  # Manufacturing
    "Librarian",  # Libraries
    "Writer"  # Artists and Writers
]
}
# [
#     # # Ranching & Farming
#     # 'Rancher',
#     # 'Farm Manager',
#     # 'Agricultural Technician',
#     # 'Livestock Veterinarian',
#     # 'Dairy Farmer',
#     # 'Crop Consultant',

#     # # Spectator Sports
#     # 'Professional Athlete',
#     # 'Sports Coach',
#     # 'Referee',
#     # 'Sports Commentator',
#     # 'Athletic Trainer',
#     # 'Team Manager',

#     # # Mining & Oil and Gas
#     # 'Mining Engineer',
#     # 'Geologist',
#     # 'Drilling Operator',
#     # 'Petroleum Engineer',
#     # 'Oil Rig Worker',
#     # 'Mineral Inspector',

#     # # Executive Offices
#     # 'Chief Executive Officer (CEO)',
#     # 'Chief Operating Officer (COO)',
#     # 'Chief Financial Officer (CFO)',
#     # 'Executive Assistant',
#     # 'Board Member',
#     # 'Corporate Strategist',

#     # # Nanotechnology Research
#     # 'Nanotechnology Engineer',
#     # 'Materials Scientist',
#     # 'Lab Technician',
#     # 'Biomedical Engineer',
#     # 'Research Scientist',
#     # 'Microelectronics Technician',

#     # # Non-profit Organizations & Civic/Social Organizations
#     # 'Non-profit Director',
#     # 'Fundraising Coordinator',
#     # 'Community Outreach Specialist',
#     # 'Volunteer Coordinator',
#     # 'Grant Writer',
#     # 'Social Worker',

#     # # Utilities
#     # 'Electrical Engineer',
#     # 'Power Plant Operator',
#     # 'Water Treatment Specialist',
#     # 'Energy Consultant',
#     # 'Gas Technician',
#     # 'Renewable Energy Specialist',

#     # # Armed Forces
#     # 'Army Officer',
#     # 'Naval Aviator',
#     # 'Infantry Soldier',
#     # 'Military Intelligence Analyst',
#     # 'Logistics Officer',
#     # 'Cybersecurity Specialist',

#     # # Recreational Facilities
#     # 'Park Ranger',
#     # 'Recreation Director',
#     # 'Lifeguard',
#     # 'Fitness Instructor',
#     # 'Outdoor Guide',
#     # 'Camp Director',

#     # # Fisheries
#     # 'Fisherman',
#     # 'Aquaculture Technician',
#     # 'Marine Biologist',
#     # 'Fishery Officer',
#     # 'Seafood Processor',
#     # 'Commercial Diver',

#     # # Religious Institutions
#     # 'Clergy Member',
#     # 'Chaplain',
#     # 'Theologian',
#     # 'Religious Counselor',
#     # 'Missionary',
#     # 'Worship Director',

#     # # Artists and Writers
#     # 'Painter',
#     # 'Sculptor',
#     # 'Author',
#     # 'Poet',
#     # 'Illustrator',
#     # 'Screenwriter',

#     # # Libraries
#     # 'Librarian',
#     # 'Archivist',
#     # 'Library Technician',
#     # 'Information Specialist',
#     # 'Digital Curator',
#     # 'Research Assistant',

#     # # Alternative Dispute Resolution
#     # 'Mediator',
#     # 'Arbitrator',
#     # 'Conflict Resolution Specialist',
#     # 'Legal Consultant',
#     # 'Conciliation Officer',
#     # 'Ombudsman'
#     'Miscellaneous'
# ]

In [0]:
# Meta-industry names, in the insertion order of `meta_industries_jobs`.
meta_industries = [industry_name for industry_name in meta_industries_jobs]

# Load the sentence encoder selected by `index_model`.
model = SentenceTransformer(models_list[index_model])
# Raise the encoder's sequence limit when it is below the configured minimum.
if max_sentence_length > model.get_max_seq_length():
    model.max_seq_length = max_sentence_length
def average_embedding(titles):
    """
    Encodes every title with the global `model` and averages the vectors.
    @param titles: A list of titles (may be empty or None)
    @return: The element-wise mean embedding as a plain list, or [] when
             there are no titles
    """
    if titles:
        # titles = [titles[0]] + titles # TODO Check if this is necessary. We double the most recent title to give more weight 
        title_embeddings = model.encode(titles)
        return np.mean(title_embeddings, axis=0).tolist()
    return []

# One averaged embedding per meta industry, aligned index-for-index with `meta_industries`.
industry_embeddings = [
    average_embedding(job_titles) for job_titles in meta_industries_jobs.values()
]



In [0]:
# Spark UDF wrapper around `average_embedding`, returning array<float>.
avg_embedding_udf = F.udf(average_embedding, ArrayType(FloatType()))

# Keep only people with at least one title, then embed and average their titles.
people_df = (
    people_df
    .filter(F.size(F.col('all_titles')) > 0)
    .withColumn('title_vector', avg_embedding_udf(F.col('all_titles')))
)

In [0]:
def closest_meta_industry(title_vector):
    """
    Picks the meta industry whose reference embedding is most similar to the
    given vector (cosine similarity against the global `industry_embeddings`).
    @param title_vector: The averaged title embedding (may be empty or None)
    @return: The name of the most similar meta industry, or None when no
             vector is given
    """
    if title_vector:
        # scipy's cosine() is a distance, so 1 - distance is the similarity.
        scores = []
        for industry_embedding in industry_embeddings:
            scores.append(1 - cosine(title_vector, industry_embedding))
        best_index = np.argmax(scores)
        return meta_industries[best_index]
    return None

# Declare the return type with a DDL string: StringType belongs to pyspark.sql.types,
# so `F.StringType()` only resolved through an undocumented re-export from
# pyspark.sql.functions and is not guaranteed to exist.
closest_meta_industry_udf = F.udf(closest_meta_industry, "string")
# Label each person with the meta industry closest to their averaged title embedding.
people_df = people_df.withColumn('meta_industry', closest_meta_industry_udf(F.col('title_vector')))

# people_df.display()

## Part 2 - Compute User Summary Vector:
1. Get all relevant columns
2. Generate a string that combines the relevant columns that are not null into a summary
3. Embed that summary into a representative vector

In [0]:
# Columns carried forward from the labelled profiles into the summary stage.
selected_columns = [
    "id",
    "name",
    "city",
    "education",
    "educations_details",
    "current_company",
    "position",
    "experience",
    "meta_industry",
    "title_vector",
]

people_selected = people_df.select(*selected_columns)

# Promote the `current_company` fields used by the summary to top-level
# "current_company:<field>" columns, then drop the struct itself.
people_transformed = people_selected
for company_field in ("industry", "name", "title"):
    people_transformed = people_transformed.withColumn(
        f"current_company:{company_field}",
        col(f"current_company.{company_field}"),
    )
people_transformed = people_transformed.drop("current_company")

In [0]:
# Extracting values from lists of dicts
education_keys = ["degree", "start_year", "end_year", "field", "meta", "title"]
experience_keys = ["company", "description", "duration", "duration_short", "end_date", "location", "positions", "start_date", "subtitle", "title"]

# Exploding and collecting list values for education
for key in education_keys:
    people_transformed = people_transformed.withColumn(f"education:{key}", 
        expr(f"transform(education, x -> x['{key}'])"))

# Exploding and collecting list values for experience
for key in experience_keys:
    people_transformed = people_transformed.withColumn(f"experience:{key}", 
        expr(f"transform(experience, x -> x['{key}'])"))

people_transformed = people_transformed.drop("experience", "education")
# people_transformed.display()


In [0]:
if show_null:
    # Scalar (non-array) columns of `people_transformed`. 'current_company:company_id'
    # used to be listed here, but that column is never created for this DataFrame,
    # so filtering on it failed with an unresolved-column error.
    non_list_columns = ['name', 'city', 'educations_details', 'position',
                        'current_company:industry',
                        'current_company:name', 'current_company:title']
    # Array columns produced by flattening `education` and `experience`.
    list_columns = [f'education:{key}' for key in education_keys] + [f'experience:{key}' for key in experience_keys]

    # The row count does not change inside the loop and every count() is a full
    # Spark job, so compute it once.
    total_count = people_transformed.count()

    # Percentage of rows holding a non-null value, per scalar column.
    for column in non_list_columns:
        # Backticks keep the ':' in the column name from being parsed as syntax.
        non_null_count = people_transformed.filter(F.col(f"`{column}`").isNotNull()).count()
        non_null_percentage = (non_null_count / total_count) * 100 if total_count > 0 else 0
        print(f"Percentage of non-null rows in '{column}': {non_null_percentage:.2f}%")

    print('-'*50)


In [0]:
if show_null:
    # The row count is loop-invariant and every count() is a full Spark job,
    # so compute it once instead of once per column.
    total_count = people_transformed.count()

    # Percentage of rows whose list has at least one non-null element, per array
    # column. `list_columns` is defined by the previous diagnostic cell.
    for column in list_columns:
        has_non_null_element = F.size(F.expr(f"filter(`{column}`, x -> x is not null)")) > 0
        non_null_count = people_transformed.filter(has_non_null_element).count()
        non_null_percentage = (non_null_count / total_count) * 100 if total_count > 0 else 0
        print(f"Percentage of non-null rows in '{column}': {non_null_percentage:.2f}%")


In [0]:
import ast
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# Names mirroring the parameters of `summarize_row`, in call order.
# NOTE(review): this list is not referenced in the visible code, and its underscore
# names differ from the real "prefix:key" DataFrame columns — confirm it is still needed.
columns = [
    "name", "city", "educations_details", "position", 
    "current_company_industry", "current_company_name", "current_company_title", 
    "education_degree", "education_start_year", "education_end_year", 
    "education_field", "education_meta", "education_title", 
    "experience_company", "experience_description", "experience_duration", 
    "experience_duration_short", "experience_end_date", "experience_location", 
    "experience_positions", "experience_start_date", "experience_subtitle", 
    "experience_title"
]

def _as_list(value):
    """Coerces a column value to a list: sequences pass through, list-literal strings are parsed, anything else gives []."""
    if value is None:
        return []
    if isinstance(value, str):
        # Backward compatibility: callers may still pass the repr of a list.
        try:
            value = ast.literal_eval(value)
        except (SyntaxError, ValueError):
            return []
        if not isinstance(value, (list, tuple)):
            return []
    try:
        return list(value)
    except TypeError:
        return []


def _is_present(item):
    """Tells whether a value is meaningful: truthy and not the placeholder string 'null'."""
    return bool(item) and item != "null"


def _entry(values, index):
    """Returns values[index] when it exists and is meaningful, otherwise None."""
    if index >= len(values):
        return None
    item = values[index]
    return item if _is_present(item) else None


def summarize_row(name, city, education_details, position, current_company_industry,
                  current_company_name, current_company_title, education_degree,
                  education_start_year, education_end_year, education_field,
                  education_meta, education_title, experience_company, experience_description,
                  experience_duration, experience_duration_short, experience_end_date,
                  experience_location, experience_positions, experience_start_date,
                  experience_subtitle, experience_title):
    """
    Generates a first-person text summary of one profile.

    Scalar parameters are plain values; the `education_*` and `experience_*`
    parameters are parallel lists with one element per education / experience
    entry. They may be lists, tuples or list-literal strings; values that cannot
    be read as a list are treated as empty.

    `education_details`, `current_company_industry`, `education_field`,
    `education_meta` and `experience_positions` are accepted to keep the
    signature stable but are not used in the summary.

    Returns: The summary string, or None when nothing could be summarized.
    """
    summary_parts = []

    if name:
        summary_parts.append(f"My name is {name}")
    if city:
        summary_parts.append(f"I live in {city}")

    # Education: one clause per entry that has both a degree and a title.
    education_summary = []
    for degree, start, end, title in zip(
        _as_list(education_degree),
        _as_list(education_start_year),
        _as_list(education_end_year),
        _as_list(education_title),
    ):
        if _is_present(degree) and _is_present(title):
            clause = f"I earned a {degree} in {title}"
            # Only mention the period when both years are known, so the text
            # never contains "from None to None".
            if _is_present(start) and _is_present(end):
                clause += f" from {start} to {end}"
            education_summary.append(clause)
    if education_summary:
        summary_parts.append(", ".join(education_summary))

    if position:
        summary_parts.append(f"I currently work as {position}")
    if current_company_name and current_company_title:
        summary_parts.append(f"at {current_company_name} as {current_company_title}")

    # Experience: the title list drives the iteration; every other list is
    # indexed defensively so a shorter list cannot raise IndexError.
    titles = _as_list(experience_title)
    labelled_fields = (
        ("Company", _as_list(experience_company)),
        ("Title", titles),
        ("Duration", _as_list(experience_duration)),
        ("Short Duration", _as_list(experience_duration_short)),
        ("Start Date", _as_list(experience_start_date)),
        ("End Date", _as_list(experience_end_date)),
        ("Location", _as_list(experience_location)),
        ("Subtitle", _as_list(experience_subtitle)),
        ("Description", _as_list(experience_description)),
    )

    experience_summary = []
    for i in range(len(titles)):
        exp_parts = []
        for label, values in labelled_fields:
            item = _entry(values, i)
            if item is not None:
                exp_parts.append(f"{label}: {item}")
        if exp_parts:
            experience_summary.append(" | ".join(exp_parts))

    if experience_summary:
        summary_parts.append("My professional experience includes: " + ". ".join(experience_summary))

    return '. '.join(summary_parts) + "." if summary_parts else None
# Spark UDF wrapper around `summarize_row`, returning a string.
summarize_udf = udf(summarize_row, StringType())

# Add the summary column; the inputs are listed in the positional order that
# `summarize_row` expects.
people_transformed = people_transformed.withColumn(
    "summary",
    summarize_udf(*[
        col(input_column)
        for input_column in (
            "name",
            "city",
            "educations_details",
            "position",
            "`current_company:industry`",
            "`current_company:name`",
            "`current_company:title`",
            "`education:degree`",
            "`education:start_year`",
            "`education:end_year`",
            "`education:field`",
            "`education:meta`",
            "`education:title`",
            "`experience:company`",
            "`experience:description`",
            "`experience:duration`",
            "`experience:duration_short`",
            "`experience:end_date`",
            "`experience:location`",
            "`experience:positions`",
            "`experience:start_date`",
            "`experience:subtitle`",
            "`experience:title`",
        )
    ]),
)

In [0]:
from functools import partial

def embed_summary(summary, embed_model):
    """
    Encodes a single summary string into its embedding.
    @param summary: The summary text (may be empty or None)
    @param embed_model: Object whose `encode` method maps a list of texts to
                        a sequence of vectors
    @return: The embedding as a plain list, or [] when there is no summary
    """
    if summary:
        # encode() works on batches, so wrap the text and unwrap the single result.
        batch = embed_model.encode([summary])
        return batch[0].tolist()
    return []

# Bind the already-loaded encoder so the resulting callable only takes the summary text.
embed_summary_partial = partial(embed_summary, embed_model=model)
# Spark UDF wrapper around the bound function, returning array<float>.
embed_summary_udf = F.udf(f=embed_summary_partial, returnType=ArrayType(FloatType()))



In [0]:
people_transformed = people_transformed.withColumn('summary_emb', embed_summary_udf(col('summary')))

# people_transformed.display()


## Part 3 - Visualize The Clustering


### Converting the array into a vector column:

In [0]:
# people_features = people_transformed.withColumn("features", array_to_vector("summary_emb")) # this df will be used in the summary part

# K=50
# pca = PCA(k=K, inputCol="features", outputCol="pca_features")

# # Create a pipeline
# pipeline = Pipeline(stages=[pca])

# # Apply the pipeline to your DataFrame
# people_features_selected = people_features.select('id','meta_industry','summary_emb','features')
# model = pipeline.fit(people_features_selected)
# people_features_pca = model.transform(people_features_selected)

# # The DataFrame will now contain a new column 'pca_features' with the PCA result
# # people_features_pca.display()

In [0]:
if to_visualize:
    # PCA: reduce the summary embeddings to K dimensions before running t-SNE.
    people_features = people_transformed.withColumn("features", array_to_vector("summary_emb")) # this df will be used in the summary part

    K=50
    pca = PCA(k=K, inputCol="features", outputCol="pca_features")
    pipeline = Pipeline(stages=[pca])
    # Use a dedicated name for the fitted pipeline: rebinding `model` here would
    # replace the global SentenceTransformer that `average_embedding` reads.
    pca_model = pipeline.fit(people_features)
    people_features_pca = pca_model.transform(people_features)
    # people_features_pca.display()


    # t-SNE: project the PCA features to 2-D on the driver.
    pca_df = people_features_pca.select("id", "meta_industry", "pca_features").toPandas()
    X = np.array(pca_df['pca_features'].apply(lambda x: np.array(x.toArray())).tolist())

    tsne = TSNE(n_components=2, random_state=seed, n_jobs=-1)
    X_tsne = tsne.fit_transform(X)

    # Adding the t-SNE results back to the DataFrame
    pca_df['TSNE_1'] = X_tsne[:, 0]
    pca_df['TSNE_2'] = X_tsne[:, 1]

    # Mapping meta_industry to a color. `plt.get_cmap` replaces the deprecated
    # `cm.get_cmap`, and 'tab20' has enough distinct colours for the 12 meta
    # industries ('tab10' reused colours after the tenth).
    industry_categories = pca_df['meta_industry'].unique()
    palette = plt.get_cmap('tab20')
    color_map = {industry: palette(i % palette.N) for i, industry in enumerate(industry_categories)}

    # Plotting the t-SNE results
    plt.figure(figsize=(10, 8))

    # Plot each industry with a different color
    for industry in industry_categories:
        subset = pca_df[pca_df['meta_industry'] == industry]
        plt.scatter(subset['TSNE_1'], subset['TSNE_2'], label=industry, color=color_map[industry])

    # Add labels and legend
    plt.title("T-SNE Visualization of Worker Dataset after PCA by Meta Industry")
    plt.xlabel("T-SNE Component 1")
    plt.ylabel("T-SNE Component 2")
    plt.legend(title="Meta Industry", bbox_to_anchor=(1.05, 1), loc='upper left')

    # Show the plot
    plt.show()


## Part 4 - Embed Job Listings: 
1. Get the scraped data
2. Find a meta_industry, similarly to the technique used for the users
3. Create a summary string, similarly to how we did it for the users
4. Embed the summary 

In [0]:
# Load the scraped job listings and co-locate rows of the same job type.
job_listings = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("dbfs:/FileStore/tables/dvirPeleg/monster_jobs.csv")
    .repartition("type")
)

# Build a free-text summary of every listing from its descriptive columns.
job_listings = job_listings.withColumn(
    "summary",
    F.concat_ws(
        " ",
        F.lit("We are"), F.col("company"),
        F.lit(", located in"), F.col("location"),
        F.lit("and looking for a"), F.col("title"),
        F.lit("in the"), F.col("type"),
        F.lit("industry."), F.col("description")
    )
)

# Embed the summary, derive the meta industry from the embedding of
# "<title> <type>", assign a job id and also embed the bare title.
job_listings = (
    job_listings
    .withColumn("summary_emb", embed_summary_udf(F.col("summary")))
    .withColumn("title_emb", embed_summary_udf(F.concat_ws(" ", F.col("title"), F.col("type"))))
    .withColumn('meta_industry', closest_meta_industry_udf(F.col('title_emb')))
    .withColumn('job_id', F.monotonically_increasing_id())
    .withColumn('only_title_emb', embed_summary_udf(F.col('title')))
)


## Part 4.5 - Save the embeddings and meta_industry

In [0]:
def check_path_exists(path):
    """
    Reports whether `path` can be listed through `dbutils.fs.ls`.
    @param path: The path to probe
    @return: True when the listing succeeds, False when it raises any Exception
    """
    try:
        dbutils.fs.ls(path)
    except Exception:
        return False
    else:
        return True


In [0]:
if save_to_dbfs:
    job_list_name = f'job_list_new_misc_model_index_{index_model}'
    people_name = f'people_new_misc_model_index_{index_model}_N_{N}'
    # Write each DataFrame as parquet unless its target already exists on DBFS.
    for table_name, table_df in ((job_list_name, job_listings), (people_name, people_transformed)):
        target_path = f"dbfs:/FileStore/tables/dvirPeleg/{table_name}"
        if check_path_exists(target_path):
            print(f"File {table_name} already exists")
            continue
        table_df.write.format("parquet").mode("overwrite").save(target_path)

## Part 5 - Find Top k

In [0]:
# Names of the parquet outputs for the current model index and sample size.
job_list_name = f'job_list_new_misc_model_index_{index_model}'
people_name = f'people_new_misc_model_index_{index_model}_N_{N}'

# Reload both DataFrames from DBFS so the ranking runs on the stored embeddings.
job_listings = spark.read.parquet(f"dbfs:/FileStore/tables/dvirPeleg/{job_list_name}")
people_transformed = spark.read.parquet(f"dbfs:/FileStore/tables/dvirPeleg/{people_name}")

In [0]:
def cosine_similarity(v1, v2):
    """
    Computes the cosine similarity between two vectors.

    Inputs that cannot be compared — a missing vector, an empty vector, vectors
    of different shapes, or a zero-norm vector — score 0.0 instead of raising,
    so one bad row cannot abort a whole Spark job.

    @param v1: The first vector (sequence of numbers, or None)
    @param v2: The second vector (sequence of numbers, or None)
    @return: The cosine similarity as a Python float, or 0.0 when undefined
    """
    if v1 is None or v2 is None:
        return 0.0
    a = np.asarray(v1)
    b = np.asarray(v2)
    # An empty embedding against a full-size one would make np.dot raise.
    if a.size == 0 or a.shape != b.shape:
        return 0.0
    denominator = np.linalg.norm(a) * np.linalg.norm(b)
    # Also covers a product that underflows to zero.
    if denominator == 0:
        return 0.0
    return float(np.dot(a, b) / denominator)

# Spark UDF wrapper around `cosine_similarity`, returning a float.
cosine_similarity_udf = udf(cosine_similarity, FloatType())

# Give the two embedding columns distinct names so both can be referenced after the join.
job_listings = job_listings.withColumnRenamed("summary_emb", "summary_emb_job")
people_transformed = people_transformed.withColumnRenamed("summary_emb", "summary_emb_user")

# Pair every job with every person assigned to the same meta industry.
df_joined = job_listings.join(people_transformed, on="meta_industry")

# Score each (job, person) pair by the similarity of their summary embeddings.
df_with_similarity = df_joined.withColumn(
    "similarity",
    cosine_similarity_udf(F.col("summary_emb_job"), F.col("summary_emb_user")),
)

# Rank the candidates of each job from most to least similar and keep the best k.
window_spec = Window.partitionBy("job_id").orderBy(F.col("similarity").desc())
df_ranked = df_with_similarity.withColumn("rank", F.row_number().over(window_spec))
df_top_k = df_ranked.filter(F.col("rank") <= k)


In [0]:
# Collect the top-k candidates of every job as ONE list of structs and sort it
# by rank. Separate collect_list calls give no ordering guarantee, and each one
# silently drops its own nulls, which could misalign the resulting lists.
# Functions are referenced through `F` so that no mid-notebook import shadows
# Python's builtin `slice`.
df_aggregated = (
    df_top_k
    .groupBy("job_id")
    .agg(
        F.sort_array(
            # `rank` is the first struct field, so it drives the sort order.
            F.collect_list(F.struct("rank", "id", "similarity", "title_vector"))
        ).alias("ranked")
    )
    .select(
        "job_id",
        # Selecting a field of an array of structs yields an array of that field.
        F.col("ranked.id").alias("most_compatible"),
        F.col("ranked.similarity").alias("similarity_most_compatible"),
        F.col("ranked.title_vector").alias("title_vector"),
    )
)

# Join the aggregated data back to the job listings DataFrame. Listings without
# any candidate keep null lists (left join). No extra slicing is required:
# `df_top_k` already holds at most k rows per job.
final_df = job_listings.join(df_aggregated, on="job_id", how="left")

In [0]:
# Preview the ranked candidates with the job and person fields needed to judge the match.
(
    df_top_k
    .select('job_id', 'title', 'type', 'meta_industry', 'id', 'name', 'position', 'rank', 'similarity')
    .display()
)

job_id,title,type,meta_industry,id,name,position,rank,similarity
0,Production Worker (Sewing),production,Manufacturing,jeff-doering-76396b53,Jeff Doering,Railroad Manufacture Professional,1,0.6164059
0,Production Worker (Sewing),production,Manufacturing,alexander-smith-53561171,Alexander Smith,Tooling / Operations Manager Magna International,2,0.5983672
0,Production Worker (Sewing),production,Manufacturing,i%c5%9f%c4%b1l-sivasl%c4%b1-%c3%b6kten-6739136b,Işıl Sivaslı Ökten,Metallurgist at The Lincoln Electric Company,3,0.59034383
0,Production Worker (Sewing),production,Manufacturing,moldprosolutions,Jeffery Williams,Process Development Supervisor at New Concept Technology,4,0.5721084
0,Production Worker (Sewing),production,Manufacturing,brian-ward-31213510a,Brian Ward,Manufacturing Engineer,5,0.5718131
0,Production Worker (Sewing),production,Manufacturing,jeremy-stadtmueller-b880898,Jeremy Stadtmueller,Director of Product Management - Design Engineering and Documentation,6,0.5620077
0,Production Worker (Sewing),production,Manufacturing,thien-nguyen-28b8939a,Thien Nguyen,"Automotive&Welder Operator, Electrical&Forklift drivers/Machine Operator, Outfitter/Assembly and Production/Warehouse.",7,0.5598625
0,Production Worker (Sewing),production,Manufacturing,channingshattuck,Channing Shattuck,Design Engineer,8,0.5596207
0,Production Worker (Sewing),production,Manufacturing,curtis-krick-08ba084,Curtis Krick,Head of Sales - Advanced Manufacturing & Plastics (North America) at Kistler Instrument Corp.,9,0.55225563
0,Production Worker (Sewing),production,Manufacturing,my-nguyen-36aaa2211,My Nguyen,Graduate from University of Connecticut,10,0.5516612


In [0]:
if save_to_dbfs:
    final_df_name = f'job_listings_most_compatible_people_K_{k}_N_{N}_model_{index_model}'
    # Probe the same DBFS location the table is written to. The check used to
    # receive only the bare table name, so it never matched the saved table.
    final_df_path = f"dbfs:/FileStore/tables/dvirPeleg/{final_df_name}"
    if not check_path_exists(final_df_path):
        print(f"Creating new table: {final_df_name}")
        final_df.write.format("parquet").mode("overwrite").save(final_df_path)
    else:
        print(f"Table {final_df_name} already exists")

In [0]:
# Preview each job with its most compatible people and their similarity scores.
(
    final_df
    .select("job_id", "most_compatible", "similarity_most_compatible")
    .display()
)