#### Import Packages and Connect to MongoDB

In [None]:
%pip install pymongo
%pip install affinda==2.1.0
%pip install sentence_transformers

In [None]:
from pyspark.sql.functions import split
from pyspark.ml.feature import CountVectorizer, Tokenizer
from pyspark.ml.feature import Word2Vec
from pyspark.ml.clustering import KMeans
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import udf,col
from pyspark.sql.types import FloatType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import ClusteringEvaluator

import numpy as np
import os
import pymongo
import pandas as pd

In [None]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if one exists, otherwise create a new one.
ss = SparkSession.builder.getOrCreate()
# SparkContext that belongs to the session above.
sc = ss.sparkContext

In [2]:
from user_definition import *

In [3]:
# NOTE(review): every line below assigns a name to itself, so it changes no
# value and raises NameError if the name does not already exist. The names are
# not defined in any earlier cell shown here; presumably they are supplied by
# the `from user_definition import *` cell — confirm.
mongo_username = mongo_username
mongo_password = mongo_password
mongo_ip_address = mongo_ip_address
database_name = database_name
collection_name = collection_name

In [None]:
class MongoDBCollection:
    '''
    Thin convenience wrapper around a single pymongo collection.

    For pymongo, see more details in the following.
    https://pymongo.readthedocs.io
    '''

    def __init__(self,
                 username,
                 password,
                 ip_address,
                 database_name,
                 collection_name):
        '''
        Using username, password, ip_address, database_name and
        collection_name, initiate the instance's attributes: username,
        password, ip_address, database_name, collection_name, client, db,
        collection and connection_string.

        username and password are inserted into the mongodb+srv:// URI
        exactly as given. pymongo expects reserved characters in credentials
        to be percent-escaped, so pass values that are already escaped.
        '''
        self.username = username
        self.password = password

        self.ip_address = ip_address
        self.database_name = database_name
        self.collection_name = collection_name

        self.client = pymongo.MongoClient(f"mongodb+srv://{username}:{password}@{ip_address}")
        self.db = self.client[database_name]
        self.collection = self.db[collection_name]

        # URI that additionally names the database and collection.
        # NOTE: it embeds the password, so avoid printing or logging it.
        self.connection_string = f"mongodb+srv://{username}:{password}@{ip_address}/{database_name}.{collection_name}"

    def return_db(self):
        '''
        Return db which is the database in the client
        '''
        return self.db

    def return_collection(self):
        '''
        Return the collection which belongs to the db.
        '''
        return self.collection

    def return_num_docs(self, query):
        '''
        Return the number of documents satisfying the given query.
        '''
        return self.collection.count_documents(query)

    def drop_collection(self):
        '''
        Drop the collection and return whatever pymongo's drop() returns.
        '''
        return self.collection.drop()

    def find(self, query, projection=None):
        '''
        Return an iterable (a generator) over the documents matching query.
        projection is optional; when omitted, None is passed to pymongo.
        '''
        yield from self.collection.find(query, projection)

    def insert_one(self, doc):
        '''
        Insert the given document and return pymongo's result object.
        '''
        return self.collection.insert_one(doc)

    def insert_many(self, docs):
        '''
        Insert the given documents and return pymongo's result object.
        '''
        return self.collection.insert_many(docs)

    def update_many(self, filter, update):
        '''
        Update documents satisfying filter with update and return pymongo's
        result object. Both filter and update are dictionaries.
        '''
        return self.collection.update_many(filter, update)

In [None]:
# Wrapper around the job-postings collection, built from the connection
# settings defined above.
collection = MongoDBCollection(
    username=mongo_username,
    password=mongo_password,
    ip_address=mongo_ip_address,
    database_name=database_name,
    collection_name=collection_name,
)

In [None]:
# Count the documents in the collection; the empty filter places no
# restriction on which documents are counted.
collection.return_num_docs({})

In [None]:
# Fetch title and description of every posting whose description is a string.
test = list(collection.collection.find(
    {'job_description': {"$type": 'string'}},
    {'job_description': 1, "job_title": 1, "_id": 0},
))
jd = [posting['job_description'] for posting in test]
# The filter only constrains job_description, so a posting may lack job_title;
# .get() yields a null title for it instead of raising KeyError.
jt = [posting.get('job_title') for posting in test]
rdd = ss.sparkContext.parallelize(list(zip(jt, jd)))

# Each RDD element is already a (title, text) tuple, so no extra map is needed.
df = rdd.toDF(['title', 'text'])
df.show(5)

# 1. Resume parser and job predictions

In [None]:
# Parses a resume and picks the important fields from it

from pathlib import Path

from affinda import AffindaAPI, TokenCredential

# NOTE(review): self-assignment — changes no value and raises NameError unless
# affinda_api already exists; presumably it is supplied by the star import
# from user_definition — confirm.
affinda_api = affinda_api

class ResumeParser:
    '''
    Parse resume PDFs through the Affinda client and turn the parsed result
    into a plain-text description that can be fed to an embedding model.
    '''

    def __init__(self):
        '''
        Create the Affinda client from the module-level affinda_api token.
        '''
        self.client = AffindaAPI(credential=TokenCredential(token=affinda_api))

    def parse_pdf(self, pdf_file_path):
        '''
        Send the file at pdf_file_path to Affinda and return the parsed
        resume as a dictionary.
        '''
        file_pth = Path(pdf_file_path)

        with open(file_pth, "rb") as f:
            resume = self.client.create_resume(file=f)

        return resume.as_dict()

    def format_resume(self, resume_dict):
        '''
        Reduce the Affinda output to the fields we need.

        Returns a dictionary with the keys 'name', 'total_years_experience',
        'education' (list of {'organization', 'degree'}), 'work' (list of
        {'company', 'title', 'job_description'}) and 'skills' (list of names).

        Fields that are absent or None in resume_dict['data'] fall back to an
        empty string / empty list (0 for total_years_experience), so that a
        sparsely parsed resume does not raise. A resume_dict without a 'data'
        key still raises KeyError.
        '''
        data = resume_dict['data']

        parsed = {}
        parsed['name'] = (data.get('name') or {}).get('raw') or ''
        parsed['total_years_experience'] = data.get('total_years_experience') or 0

        parsed['education'] = []
        for e in data.get('education') or []:
            education_dict = {}
            education_dict['organization'] = e.get('organization') or ''
            # 'accreditation' may be missing entirely for an education entry.
            education_dict['degree'] = (e.get('accreditation') or {}).get('input_str') or ''
            parsed['education'].append(education_dict)

        parsed['work'] = []
        for w in data.get('work_experience') or []:
            work_dict = {}
            work_dict['company'] = w.get('organization') or ''
            work_dict['title'] = w.get('job_title') or ''
            # Always a string: construct_embed_string calls .split() on it.
            work_dict['job_description'] = w.get('job_description') or ''
            parsed['work'].append(work_dict)

        # Skills without a name carry no information, so they are skipped.
        parsed['skills'] = [s['name'] for s in data.get('skills') or [] if s.get('name')]

        return parsed

    def construct_embed_string(self, pdf_file_path):
        '''
        Parse the PDF at pdf_file_path and return one multi-line string that
        describes the candidate: name and experience, education, work history
        with responsibilities, and skills.
        '''
        formatted = self.format_resume(self.parse_pdf(pdf_file_path))

        # Collect the pieces and join once instead of repeated string +=.
        parts = [
            f"The candidate's name is {formatted['name']}, and he has a total of {formatted['total_years_experience']} years of experience \n"
        ]
        for e in formatted['education']:
            parts.append(f"The candidate has a degree in {e['degree']} from {e.get('organization', '')}\n")

        parts.append('\n\n')
        for w in formatted['work']:
            parts.append("The candidate worked at ")
            parts.append(f"{w['company']} as a {w['title']}. Responsibilities included \n")
            for jr in w['job_description'].split('\n'):
                parts.append(jr + '\n')
            parts.append('\n \n')

        parts.append('\n\n')
        parts.append('The candidate is skilled in the following \n')
        for s in formatted['skills']:
            parts.append(s + '\n')

        return ''.join(parts)

# Sentence embeddings using Sentence Transformers

In [None]:
from sentence_transformers import SentenceTransformer
# Load two pre-trained SentenceTransformer checkpoints by name; both are used
# further down to encode the text built from each resume.
llm_small = SentenceTransformer('multi-qa-MiniLM-L6-cos-v1')
llm_large = SentenceTransformer('multi-qa-mpnet-base-dot-v1')

In [4]:
# Paths of the pre-computed embeddings for our data sets (one .npy file per
# sentence-transformer model) and of the text file holding the matching ids.
# This cell only defines the locations; the files are read in later cells.

LLM_SMALL_EMBEDDINGS_PATH = '/dbfs/FileStore/shared_uploads/pnuthanakalva@dons.usfca.edu/embeddings_multi_qa_MiniLM_L6_cos_v1.npy'
LLM_LARGE_EMBEDDINGS_PATH = '/dbfs/FileStore/shared_uploads/pnuthanakalva@dons.usfca.edu/embeddings_multi_qa_mpnet_base_dot_v1.npy'
IDS_LIST_PATH = '/dbfs/FileStore/shared_uploads/pnuthanakalva@dons.usfca.edu/ids.txt'

In [None]:
# Load both pre-computed embedding files into memory with NumPy.
# NOTE(review): presumably each holds one row per entry of the ids file, in
# the same order — confirm against the code that produced them.
llm_small_embeddings = np.load(LLM_SMALL_EMBEDDINGS_PATH)
llm_large_embeddings = np.load(LLM_LARGE_EMBEDDINGS_PATH)

In [None]:
# Read the ids file into a list, one entry per line. Line endings are kept
# exactly as they appear in the file.
with open(IDS_LIST_PATH, 'r') as file:
    ids = list(file)

In [None]:
RESUME_PATH = '/dbfs/FileStore/shared_uploads/apeddyreddy@dons.usfca.edu/input'
resume_parser = ResumeParser()
# Sorted so that the rows of the result table come out in a stable order.
resumes = sorted(os.listdir(RESUME_PATH))
# Never populated in this cell; kept only so the names stay defined.
resume_embeddings_small = []
resume_embeddings_large = []


def _closest_job(model, job_embeddings, resume_text):
    '''
    Return the job posting document that scores highest against resume_text.

    resume_text is encoded with model, multiplied against every row of
    job_embeddings, and the id at the arg-max position of ids is looked up
    in the MongoDB collection. Raises LookupError when no document has that id.
    '''
    resume_embedding = model.encode(resume_text)
    # matrix @ vector gives one score per row of job_embeddings; no need to
    # hard-code the embedding width in a reshape.
    scores = np.matmul(job_embeddings, resume_embedding)
    closest_id = ids[int(np.argmax(scores))].strip('\n')
    # Drop only a trailing '_1' / '_2'; splitting on '_1' would also cut ids
    # that merely contain '_1' somewhere in the middle.
    # NOTE(review): presumably the suffix marks a part of a split posting — confirm.
    if closest_id.endswith(('_1', '_2')):
        closest_id = closest_id[:-2]
    job = collection.collection.find_one({'_id': closest_id}, {'_id': False, 'job_title': True})
    if job is None:
        raise LookupError(f"No job posting found for _id {closest_id!r}")
    return job


predicted_rows = []
for resume in resumes:
    if not resume.endswith('.pdf'):
        continue
    resume_full = os.path.join(RESUME_PATH, resume)
    resume_string = resume_parser.construct_embed_string(resume_full)
    closest_job_small = _closest_job(llm_small, llm_small_embeddings, resume_string)
    closest_job_large = _closest_job(llm_large, llm_large_embeddings, resume_string)
    predicted_rows.append([resume, closest_job_small.get('job_title'), closest_job_large.get('job_title')])

# Build the table once instead of growing a DataFrame row by row.
predicted_output = pd.DataFrame(predicted_rows, columns=["Resume Candidate's Background", "Small Sentence Transformer Predicted Job Posting", "Large Sentence Transformer predicted Job Posting"])

**Outcome**

In [None]:
predicted_output

Unnamed: 0,Resume Candidate's Background,Small Sentence Transformer Predicted Job Posting,Large Sentence Transformer predicted Job Posting
0,10_digital_marketing.pdf,Director of Digital Marketing,Senior Java Software Engineer
1,11_engineer.pdf,(Entry Level) Java developer/Coder/Data Scientist,C#/Python Developer
2,12_env_studies.pdf,"Assistant Professor, Environmental Science",Assistant or Associate Professor - Wildlife Ec...
3,13_HR.pdf,Talent Acquisition and Retention Manager,Digital Marketing Functional Partner
4,14_law.pdf,Human Resource/Recruiting Specialist,Assistant/Associate Professor of Psychology
5,15_management.pdf,Marketing Manager,Digital Advertising Manager
6,16_mass_media.pdf,"Director of Development, Journalism",Assistant Professor (Political Science)
7,17_operations.pdf,Regional Marketing Operations Manager POS & Gi...,Senior Java Software Engineer
8,18_political_science.pdf,Assistant Professor (Political Science),Faculty (Associate or Advanced Assistant Profe...
9,19_politics.pdf,Faculty (Associate or Advanced Assistant Profe...,Assistant Professor


# 2. Cluster embeddings of job postings to group similar job postings

In [None]:
# One pandas column per embedding dimension.
small_embeddings_df = pd.DataFrame(llm_small_embeddings)
# Use `ss`, the session this notebook creates itself; the bare name `spark`
# is never defined here and only exists where the platform injects it.
small_embeds_sparkdf = ss.createDataFrame(small_embeddings_df)
# Pack all per-dimension columns into the single "features" vector column.
va = VectorAssembler(outputCol="features", inputCols=small_embeds_sparkdf.columns)
modified_df = va.transform(small_embeds_sparkdf)

In [None]:
# Fixed seed so that cluster assignments are reproducible across runs.
KMEANS_SEED = 42

k_values = range(2, 11)
best_k = 2
best_silhouette_score = -1
kmeans = model = predictions = None
# The evaluator does not depend on k, so build it once outside the loop.
evaluator = ClusteringEvaluator()
for k in k_values:
    candidate_kmeans = KMeans().setK(k).setSeed(KMEANS_SEED)
    candidate_model = candidate_kmeans.fit(modified_df)
    candidate_predictions = candidate_model.transform(modified_df)
    silhouette = evaluator.evaluate(candidate_predictions)
    # Keep the winning estimator, model and predictions so the best k does not
    # have to be fitted a second time after the search.
    if model is None or silhouette > best_silhouette_score:
        best_silhouette_score = silhouette
        best_k = k
        kmeans = candidate_kmeans
        model = candidate_model
        predictions = candidate_predictions

print(best_k, best_silhouette_score)

In [None]:
# Bring the clustered rows to the driver as a pandas DataFrame.
df_pandas = predictions.toPandas()

# For every cluster, take the members at positions 14 and 15 of that cluster's
# row-index list and map those row indices to the matching entries of ids.
index_labels = {
    cluster: [
        ids[row]
        for row in df_pandas.index[df_pandas['prediction'] == cluster].tolist()[14:16]
    ]
    for cluster in range(best_k)
}
    

In [None]:
cluster_jobs = {}
random_clusters = [0,2,4]
for k in random_clusters:
    # The number of clusters is chosen at run time, so a hard-coded cluster id
    # may not exist; skip it instead of failing with KeyError.
    if k not in index_labels:
        continue
    jobs = []
    for val in index_labels[k]:
        closest_id = val.strip('\n')
        # Drop only a trailing '_1' / '_2'; splitting on '_1' would also cut
        # ids that merely contain '_1' somewhere in the middle.
        if closest_id.endswith(('_1', '_2')):
            closest_id = closest_id[:-2]
        closest_job_small = collection.collection.find_one({'_id': closest_id}, {'_id': False, 'job_title': True})
        if closest_job_small is None:
            raise LookupError(f"No job posting found for _id {closest_id!r}")
        # append, not +=: extending a list with a dict adds the dict's keys
        # ('job_title') rather than the document itself.
        jobs.append(closest_job_small)
    cluster_jobs[k] = jobs

**Clustering Outcome:**

cluster 0 : [{'job_title': 'Data Reporting and Analytics Consultant III'}, {'job_title': 'Data Scientist I'}]

cluster 2 : [{'job_title': 'International Multimedia Journalist - I (News Center)'}, {'job_title': 'Digital Reporter I'}]

cluster 4 : [{'job_title': 'Fitness Coach (Personal Trainer)'}, {'job_title': 'Fitness Trainer'}]