#### Import Packages and Connect to MongoDB

In [0]:
%pip install pymongo
%pip install affinda==2.1.0
%pip install sentence_transformers

In [0]:
from pyspark.sql.functions import split
from pyspark.ml.feature import CountVectorizer, Tokenizer
from pyspark.ml.feature import Word2Vec
from pyspark.ml.clustering import KMeans
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import udf,col
from pyspark.sql.types import FloatType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import ClusteringEvaluator

import os
import pymongo
import pandas as pd

In [0]:
from pyspark.sql import SparkSession

ss = SparkSession.builder.getOrCreate()
sc = ss.sparkContext

In [0]:
mongo_username = 'mongoadmin'
mongo_password = 'pDru4hbGjnIGRCmm'
mongo_ip_address = 'msds697-cluster.ogkse.mongodb.net'
database_name = 'msds697'
collection_name = 'job_postings'

In [0]:
class MongoDBCollection:
    def __init__(self,
                 username,
                 password,
                 ip_address,
                 database_name,
                 collection_name):
        '''
        Using ip_address, database_name, collection_name,
        initiate the instance's attributes including ip_address,
        database_name, collection_name, client, db and collection.

        For pymongo, see more details in the following.
        https://pymongo.readthedocs.io
        '''
        self.username = username
        self.password = password
        
        self.ip_address = ip_address
        self.database_name = database_name
        self.collection_name = collection_name

        self.client = pymongo.MongoClient(f"mongodb+srv://{username}:{password}@{ip_address}")
        self.db = self.client[database_name]
        self.collection = self.db[collection_name]
        
        self.connection_string = f"mongodb+srv://{username}:{password}@{ip_address}/{database_name}.{collection_name}"

    def return_db(self):
        '''
        Return db which is the database in the client
        '''
        return self.db

    def return_collection(self):
        '''
        Return db which belongs to the db.
        '''
        return self.collection

    def return_num_docs(self, query):
        '''
        Return the number of documents satisfying the given query.
        '''
        return self.collection.count_documents(query)

    def drop_collection(self):
        '''
        Drop the collection
        '''
        return self.collection.drop()

    def find(self, query, projection):
        '''
        Return an iteratatable using query and projection.
        '''
        for item in self.collection.find(query, projection):
            yield item

    def insert_one(self, doc):
        '''
        Insert the given document
        '''
        self.collection.insert_one(doc)

    def insert_many(self, docs):
        '''
        Insert the given documents
        '''
        self.collection.insert_many(docs)

    def update_many(self, filter, update):
        '''
        Update documents satisfying filter with update.
        Both filter and update are dictionaries.
        '''
        self.collection.update_many(filter, update)

In [0]:
collection = MongoDBCollection(mongo_username,mongo_password, mongo_ip_address, database_name,collection_name)

In [0]:
collection.return_num_docs({})

In [0]:
test = list(collection.collection.find({'job_description':{"$type":'string'}},{'job_description':1 ,"job_title":1,"_id":0}))
jd = [i['job_description'] for i in test]
jt = [i['job_title'] for i in test]
rdd = ss.sparkContext.parallelize(zip(jt,jd))

df = rdd.map(lambda x: (x[0],x[1] )).toDF(['title','text'])
df.show(5)

#### Task 1. Generate Word2Vec Embeddings in Spark ML and Predict Relevant Job Postings for Given Resume
(Harrison)

In [0]:
from pyspark.ml.feature import Word2Vec, StopWordsRemover, Tokenizer

tokenizer = Tokenizer(inputCol="text", outputCol="words")
stopwords = StopWordsRemover(inputCol="words", outputCol="filtered_words")

df = tokenizer.transform(df)
df = stopwords.transform(df)
df.count()

In [0]:
#vectorSize = embedding dimension, set to k=100
word2Vec = Word2Vec(vectorSize=100, seed=42, inputCol="filtered_words", outputCol="model")
word2Vec.setMaxIter(10)
word2Vec.getMaxIter()
word2Vec.clear(word2Vec.maxIter)
model = word2Vec.fit(df)
df_word2vec = model.transform(df)

In [0]:
df_word2vec.show(5)

##### Find_Closest_Embeddings Function to return closest embedding

In [0]:
import numpy as np
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StructType, StructField, DoubleType

def get_cosine_similarity(vector_a, vector_b):
    dot_product = np.dot(vector_a, vector_b)
    norm_a = np.linalg.norm(vector_a)
    norm_b = np.linalg.norm(vector_b)
    return dot_product / (norm_a * norm_b)

def get_k_closest_embeddings(df, query_embedding, k=10):
    cosine_similarity_udf = udf(lambda x: float(get_cosine_similarity(query_embedding, x)), DoubleType())
    df_with_similarity = df.withColumn("cosine_similarity", cosine_similarity_udf(df["model"]))
    sorted_df = df_with_similarity.sort("cosine_similarity", ascending=False)
    k_closest_embeddings = sorted_df.limit(k).na.drop()
    return k_closest_embeddings

##### Generated Word2Vec Predicted Job Postings

In [0]:
##From Task2
from pathlib import Path
import os
from affinda import AffindaAPI, TokenCredential

# affinda_api = 'abf33df87b93a7fb90d72d65676d2e10949ffdec'
affinda_api = '28ba8894334a3acccdcda99a92733ac9b71d2245'

class ResumeParser:
    def __init__(self):
        self.client = AffindaAPI(credential=TokenCredential(token=affinda_api))

    def parse_pdf(self, pdf_file_path):
        # Create resume with file
        file_pth = Path(pdf_file_path)

        with open(file_pth, "rb") as f:
            resume = self.client.create_resume(file=f)

        return resume.as_dict()

    def format_resume(self, resume_dict):
        #Takes the output from Affinda and formats to a format that we need
        parsed = {}
        parsed['name'] = resume_dict['data']['name']['raw']
        parsed['total_years_experience'] = resume_dict['data']['total_years_experience']
        parsed['education'] = []
        for e in resume_dict['data']['education']:
            education_dict = {}
            education_dict['organization'] = e.get('organization', '')
            education_dict['degree'] = e['accreditation']['input_str']
            parsed['education'].append(education_dict)

        parsed['work'] = []
        for w in resume_dict['data']['work_experience']:
            work_dict = {}
            work_dict['company'] = w.get('organization', '')
            work_dict['title'] = w.get('job_title', '')
            work_dict['job_description'] = w.get('job_description', '')
            parsed['work'].append(work_dict)

        parsed['skills'] = []
        for s in resume_dict['data']['skills']:
            parsed['skills'].append(s['name'])

        return parsed
    
    def construct_embed_string(self, pdf_file_path):
        formatted = self.format_resume(self.parse_pdf(pdf_file_path))
        embed_string = ''
        embed_string += f"The candidate's name is {formatted['name']}, and he has a total of {formatted['total_years_experience']} years of experience \n"
        for e in formatted['education']:
            embed_string += f"The candidate has a degree in {e['degree']} from {e.get('organization', '')}\n"
        
        embed_string += '\n\n'
        for w in formatted['work']:
            embed_string += "The candidate worked at "
            embed_string += f"{w['company']} as a {w['title']}. Responsibilities included \n"
            for jr in w['job_description'].split('\n'):
                embed_string += jr + '\n'
            embed_string += '\n \n'
        
        embed_string += '\n\n'
        embed_string += 'The candidate is skilled in the following \n'
        for s in formatted['skills']:
            embed_string += s + '\n'
        
        return embed_string

In [0]:
RESUME_PATH = '/dbfs/FileStore/shared_uploads/apeddyreddy@dons.usfca.edu/input'
resume_parser = ResumeParser()
resumes = os.listdir(RESUME_PATH)

resume_strings=[]
resume_title = []
for resume in resumes:
    resume_full = os.path.join(RESUME_PATH, resume)
    resume_string = resume_parser.construct_embed_string(resume_full)
    resume_strings.append(resume_string)
    resume_title.append(resume)
    
rdd = ss.sparkContext.parallelize(zip(resume_title,resume_strings))
df1 = rdd.map(lambda x: (x[0],x[1] )).toDF(['resume_title','resume_str'])

In [0]:
tokenizer = Tokenizer(inputCol="resume_str", outputCol="words")
df1 = tokenizer.transform(df1)

word2Vec = Word2Vec(vectorSize=100, seed=42, inputCol="words", outputCol="model")
word2Vec.setMaxIter(10)
word2Vec.getMaxIter()
word2Vec.clear(word2Vec.maxIter)
model_res = word2Vec.fit(df1)
df_word2vec_res = model_res.transform(df1)

In [0]:
results = []

for i in range(df_word2vec_res.count()):
    title = df_word2vec_res.select('resume_title').rdd.collect()[i]['resume_title']
    query_embedding = df_word2vec_res.select('model').rdd.collect()[i]
    result = get_k_closest_embeddings(df_word2vec, query_embedding, 5).rdd.collect()[0]['title']
    results.append((title, result))

**Outcome** 

As shown in the table, Word2Vec matching does not generate accurate job posting recommendations for resume input. We hope the use of a more advanced pre-trained model could give us better embeddings for generating job posting recommendations.

In [0]:
df = pd.DataFrame(results, columns=["Resume Candidate's Background", "Word2Vec Predicted Job Posting"])
df

Unnamed: 0,Resume Candidate's Background,Word2Vec Predicted Job Posting
0,10_digital_marketing.pdf,Staff Data Scientist
1,11_engineer.pdf,Staff Data Scientist
2,12_env_studies.pdf,Applied Scientist - Computer Vision & Image Pr...
3,13_HR.pdf,Applied Scientist - Computer Vision & Image Pr...
4,14_law.pdf,Applied Scientist - Computer Vision & Image Pr...
5,15_management.pdf,Staff Data Scientist
6,16_mass_media.pdf,Staff Data Scientist
7,17_operations.pdf,Applied Scientist - Computer Vision & Image Pr...
8,18_political_science.pdf,Staff Data Scientist
9,19_politics.pdf,Applied Scientist - Computer Vision & Image Pr...


#### Task 2 and 3. Resume Parser and Job posting prediction

In [0]:
from pathlib import Path
import os
from affinda import AffindaAPI, TokenCredential

# affinda_api = 'abf33df87b93a7fb90d72d65676d2e10949ffdec'
#other key
affinda_api = '28ba8894334a3acccdcda99a92733ac9b71d2245'

class ResumeParser:
    def __init__(self):
        self.client = AffindaAPI(credential=TokenCredential(token=affinda_api))

    def parse_pdf(self, pdf_file_path):
        # Create resume with file
        file_pth = Path(pdf_file_path)

        with open(file_pth, "rb") as f:
            resume = self.client.create_resume(file=f)

        return resume.as_dict()

    def format_resume(self, resume_dict):
        #Takes the output from Affinda and formats to a format that we need
        parsed = {}
        parsed['name'] = resume_dict['data']['name']['raw']
        parsed['total_years_experience'] = resume_dict['data']['total_years_experience']
        parsed['education'] = []
        for e in resume_dict['data']['education']:
            education_dict = {}
            education_dict['organization'] = e.get('organization', '')
            education_dict['degree'] = e['accreditation']['input_str']
            parsed['education'].append(education_dict)

        parsed['work'] = []
        for w in resume_dict['data']['work_experience']:
            work_dict = {}
            work_dict['company'] = w.get('organization', '')
            work_dict['title'] = w.get('job_title', '')
            work_dict['job_description'] = w.get('job_description', '')
            parsed['work'].append(work_dict)

        parsed['skills'] = []
        for s in resume_dict['data']['skills']:
            parsed['skills'].append(s['name'])

        return parsed
    
    def construct_embed_string(self, pdf_file_path):
        formatted = self.format_resume(self.parse_pdf(pdf_file_path))
        embed_string = ''
        embed_string += f"The candidate's name is {formatted['name']}, and he has a total of {formatted['total_years_experience']} years of experience \n"
        for e in formatted['education']:
            embed_string += f"The candidate has a degree in {e['degree']} from {e.get('organization', '')}\n"
        
        embed_string += '\n\n'
        for w in formatted['work']:
            embed_string += "The candidate worked at "
            embed_string += f"{w['company']} as a {w['title']}. Responsibilities included \n"
            for jr in w['job_description'].split('\n'):
                embed_string += jr + '\n'
            embed_string += '\n \n'
        
        embed_string += '\n\n'
        embed_string += 'The candidate is skilled in the following \n'
        for s in formatted['skills']:
            embed_string += s + '\n'
        
        return embed_string

In [0]:
from sentence_transformers import SentenceTransformer
llm_small = SentenceTransformer('multi-qa-MiniLM-L6-cos-v1')
llm_large = SentenceTransformer('multi-qa-mpnet-base-dot-v1')

In [0]:
LLM_SMALL_EMBEDDINGS_PATH = '/dbfs/FileStore/shared_uploads/pnuthanakalva@dons.usfca.edu/embeddings_multi_qa_MiniLM_L6_cos_v1.npy'
LLM_LARGE_EMBEDDINGS_PATH = '/dbfs/FileStore/shared_uploads/pnuthanakalva@dons.usfca.edu/embeddings_multi_qa_mpnet_base_dot_v1.npy'
IDS_LIST_PATH = '/dbfs/FileStore/shared_uploads/pnuthanakalva@dons.usfca.edu/ids.txt'

In [0]:
import pickle
import numpy as np
llm_small_embeddings = np.load(LLM_SMALL_EMBEDDINGS_PATH)
llm_large_embeddings = np.load(LLM_LARGE_EMBEDDINGS_PATH)

In [0]:
with open(IDS_LIST_PATH, 'r') as file:
    ids = []
    for line in file:
        ids.append(line)

In [0]:
import pandas as pd

In [0]:
RESUME_PATH = '/dbfs/FileStore/shared_uploads/apeddyreddy@dons.usfca.edu/input'
resume_parser = ResumeParser()
resumes = os.listdir(RESUME_PATH)
resume_embeddings_small = []
resume_embeddings_large = []

predicted_output = pd.DataFrame(columns=["Resume Candidate's Background", "Small Sentence Transformer Predicted Job Posting", "Large Sentence Transformer predicted Job Posting"])

for resume in resumes:
    if(resume.endswith('.pdf')):
        resume_full = os.path.join(RESUME_PATH, resume)
        resume_string = resume_parser.construct_embed_string(resume_full)
        resume_embedding_small = llm_small.encode(resume_string)
        closest_id = ids[np.argmax(np.matmul(llm_small_embeddings, resume_embedding_small.reshape(384, -1)))].strip('\n')
        if (closest_id.endswith('_1')) or (closest_id.endswith('_2')):
            closest_id = closest_id.split('_1')[0].split('_2')[0]
        closest_job_small = list(collection.collection.find({'_id':closest_id}, {'_id':False,'job_title':True}))[0]
        resume_embedding_large = llm_large.encode(resume_string)
        closest_id = ids[np.argmax(np.matmul(llm_large_embeddings, resume_embedding_large.reshape(768, -1)))].strip('\n')
        if (closest_id.endswith('_1')) or (closest_id.endswith('_2')):
            closest_id = closest_id.split('_1')[0].split('_2')[0]
        closest_job_large = list(collection.collection.find({'_id':closest_id}, {'_id':False, 'job_title':True}))[0]
        predicted_output.loc[len(predicted_output.index)] = [resume, closest_job_small['job_title'], closest_job_large['job_title']] 

**Outcome**

In [0]:
predicted_output

Unnamed: 0,Resume Candidate's Background,Small Sentence Transformer Predicted Job Posting,Large Sentence Transformer predicted Job Posting
0,10_digital_marketing.pdf,Director of Digital Marketing,Senior Java Software Engineer
1,11_engineer.pdf,(Entry Level) Java developer/Coder/Data Scientist,C#/Python Developer
2,12_env_studies.pdf,"Assistant Professor, Environmental Science",Assistant or Associate Professor - Wildlife Ec...
3,13_HR.pdf,Talent Acquisition and Retention Manager,Digital Marketing Functional Partner
4,14_law.pdf,Human Resource/Recruiting Specialist,Assistant/Associate Professor of Psychology
5,15_management.pdf,Marketing Manager,Digital Advertising Manager
6,16_mass_media.pdf,"Director of Development, Journalism",Assistant Professor (Political Science)
7,17_operations.pdf,Regional Marketing Operations Manager POS & Gi...,Senior Java Software Engineer
8,18_political_science.pdf,Assistant Professor (Political Science),Faculty (Associate or Advanced Assistant Profe...
9,19_politics.pdf,Faculty (Associate or Advanced Assistant Profe...,Assistant Professor


#### Task 4. Cluster embeddings of job postings to group similar job postings
(Ajayeswar Reddy Peddyreddy)

In [0]:
small_embeddings_df = pd.DataFrame(llm_small_embeddings)
small_embeds_sparkdf = spark.createDataFrame(small_embeddings_df)
va = VectorAssembler(outputCol="features", inputCols=small_embeds_sparkdf.columns)
modified_df = va.transform(small_embeds_sparkdf)

In [0]:
k_values = range(2, 11)
best_k = 2
best_silhouette_score = -1
for k in k_values:
    kmeans = KMeans().setK(k)
    model = kmeans.fit(modified_df)
    predictions = model.transform(modified_df) 
    evaluator = ClusteringEvaluator()
    silhouette = evaluator.evaluate(predictions)
    if silhouette > best_silhouette_score:
        best_silhouette_score = silhouette
        best_k = k

print(best_k, best_silhouette_score)
kmeans = KMeans().setK(best_k)
model = kmeans.fit(modified_df)
predictions = model.transform(modified_df)

In [0]:
df_pandas = predictions.toPandas()
index_labels = {}
for i in range(best_k):
    temp = df_pandas.index[df_pandas['prediction'] == i].tolist()[14:16]
    index_labels[i] = [ids[x] for x in temp]
    

In [0]:
cluster_jobs = {}
random_clusters = [0,2,4]
for k in random_clusters:
    jobs = []
    for val in index_labels[k]:
        closest_id = val.strip('\n')
        if (closest_id.endswith('_1')) or (closest_id.endswith('_2')):
            closest_id = closest_id.split('_1')[0].split('_2')[0]
        closest_job_small = list(collection.collection.find({'_id':closest_id}, {'_id':False,'job_title':True}))[0]
        jobs += closest_job_small
    cluster_jobs[k] = jobs

**Clustering Outcome:**

cluster 0 : [{'job_title': 'Data Reporting and Analytics Consultant III'}, {'job_title': 'Data Scientist I'}]

cluster 2 : [{'job_title': 'International Multimedia Journalist - I (News Center)'}, {'job_title': 'Digital Reporter I'}]

cluster 4 : [{'job_title': 'Fitness Coach (Personal Trainer)'}, {'job_title': 'Fitness Trainer'}]

#### Task 5. Standardization of Job Titles and Then Classification Models for Job Descriptions 
(Bharadwaj and Harrison)

In [0]:
! pip install fuzzywuzzy
from fuzzywuzzy import process
from fuzzywuzzy import fuzz

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

In [0]:
titles = [i['title'] for i in list(df_word2vec.select('title').rdd.collect())]
candidate_labels = ['Data Scientist', 'Data Analyst', 'Data Engineer', 'Machine Learning Engineer', 'Software Engineer', 'Other']

matches = {}

for title in titles:
    best_label, score = process.extractOne(title.lower(), candidate_labels, scorer=fuzz.token_sort_ratio)
    matches[title] = best_label

data = [(k, v) for k, v in matches.items()]

temp_df = ss.createDataFrame(data, [ "title", "std_title"])
data = df_word2vec.join(temp_df, 'title', 'inner')
data = data.select('model','std_title')
data.show(2)

In [0]:
labelIndexer = StringIndexer(inputCol='std_title', outputCol='indexedLabel').fit(data)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# create a random forest classifier to train a multi-class classification model
rf = RandomForestClassifier(labelCol='indexedLabel', featuresCol='model', numTrees=10)

# train the model on the training data
model = Pipeline(stages=[labelIndexer, rf]).fit(trainingData)

In [0]:
# make predictions on the test data and evaluate the model's performance
predictions = model.transform(testData)
evaluator = MulticlassClassificationEvaluator(labelCol='indexedLabel', predictionCol='prediction', metricName='fMeasureByLabel')
accuracy = evaluator.evaluate(predictions)
print('Test set accuracy = %.2f%%' % (accuracy * 100))

Outcome - Job titles are standardized using fuzzy match. Using standardized job titles, we trained the embeddings to classify the job description into standardized job title. We are now able to classify any new job description into the standardized job title. This allows us to validate the posted job title with the true job title (predicted by the above classification model)