# STEP2&3_Skill_LDA_Analysis&Job_Recommender

# STEP2_Skill_LDA_Analysis

## STEP2_1_SetUp

In [1]:
%%configure -f
{
    "conf": {
        "spark.jars.packages": "com.johnsnowlabs.nlp:spark-nlp_2.12:4.3.1",
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type":"native",
        "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
    }
}

In [2]:
# Install notebook-scoped Python packages on the cluster.
# Every package is pinned so that a rebuilt cluster resolves the same versions.
sc.install_pypi_package("pandas==1.0.5", "https://pypi.org/simple")
sc.install_pypi_package("scipy==1.4.1", "https://pypi.org/simple")
sc.install_pypi_package("matplotlib==3.2.1", "https://pypi.org/simple")
# Keep the Python wrapper on the same release as the JVM jar requested in the
# %%configure cell (com.johnsnowlabs.nlp:spark-nlp_2.12:4.3.1); an unpinned
# install can pull a newer wrapper that no longer matches that jar.
sc.install_pypi_package("spark-nlp==4.3.1", "https://pypi.org/simple")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1716570043646_0005,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting pandas==1.0.5
  Using cached https://files.pythonhosted.org/packages/af/f3/683bf2547a3eaeec15b39cef86f61e921b3b187f250fcd2b5c5fb4386369/pandas-1.0.5-cp37-cp37m-manylinux1_x86_64.whl
Collecting python-dateutil>=2.6.1 (from pandas==1.0.5)
  Using cached https://files.pythonhosted.org/packages/ec/57/56b9bcc3c9c6a792fcbaf139543cee77261f3651ca9da0c93f5c1221264b/python_dateutil-2.9.0.post0-py2.py3-none-any.whl
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-1.0.5 python-dateutil-2.9.0.post0

Collecting scipy==1.4.1
  Using cached https://files.pythonhosted.org/packages/dd/82/c1fe128f3526b128cfd185580ba40d01371c5d299fcf7f77968e22dfcc2e/scipy-1.4.1-cp37-cp37m-manylinux1_x86_64.whl
Installing collected packages: scipy
Successfully installed scipy-1.4.1

Collecting matplotlib==3.2.1
  Using cached https://files.pythonhosted.org/packages/b2/c2/71fcf957710f3ba1f09088b35776a799ba7dd95f7c2b195ec800933b276b/matplotlib-3.2.1-cp37-cp37m-manylinux1_x86_64.

In [3]:
import sys
import os
import json
import re
import string
import csv

import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import udf, col, lower, desc, split, trim, expr
from pyspark.sql.types import DoubleType, StringType, BooleanType, ArrayType
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, HashingTF, IDF, CountVectorizer
from pyspark.ml.linalg import Vectors, VectorUDT, SparseVector
from pyspark.ml import Pipeline
from pyspark.ml.clustering import LDA

import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.annotator import Tokenizer, LemmatizerModel, Normalizer, StopWordsCleaner
from sparknlp.common import RegexRule
from sparknlp.base import DocumentAssembler

import matplotlib.pyplot as plt

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Get the stopwords list from the S3 bucket; each line of the text file becomes one entry
stopwords_df = spark.read.text("s3://qixin-finalproj-bucket/stopwords.txt")

# Strip surrounding whitespace and skip blank lines so that a padded or empty
# line in the file does not turn into a stop word that can never match a token.
stopwords_list = [
    word
    for word in (row.value.strip() for row in stopwords_df.collect())
    if word
]

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## STEP2_2_Data_Processing

In [5]:
# Load the ner_skills sample from parquet and keep only rows that actually carry skills.
# The null check is spelled out for readers; comparing a null to "" already evaluates
# to null, which a filter treats as "drop the row".
df = (
    spark.read.parquet('s3://qixin-finalproj-bucket/ner_skills_sample.parquet')
    .where(col("job_skills").isNotNull() & (col("job_skills") != ""))
)

# Preview the skills column (show() prints up to 20 rows by default), untruncated
df.select("job_skills").show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [6]:
# Text-processing pipeline for the `job_skills` column:
#   document -> token -> stop-word removal -> normalization -> lemma
# followed by a bag-of-words count vector and its TF-IDF weighting.

# Initialize a DocumentAssembler to convert input column to document type
document_assembler = DocumentAssembler() \
    .setInputCol("job_skills") \
    .setOutputCol("document")

# Initialize the Tokenizer.
# NOTE: this is Spark NLP's annotator; it is imported after (and therefore shadows)
# pyspark.ml.feature.Tokenizer, which has a different API.
tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

# Initialize StopWordsCleaner (case-insensitive match against the custom list)
stopwords_cleaner = StopWordsCleaner() \
    .setInputCols(["token"]) \
    .setOutputCol("clean_tokens") \
    .setCaseSensitive(False) \
    .setStopWords(stopwords_list)

# Normalizer to remove punctuation and non-letter characters.
# Lowercasing is switched on because CountVectorizer treats "Python" and "python"
# as different terms; without it, mixed-case text (for example free-form user
# input fed through this same fitted pipeline) would miss the vocabulary entry
# of its lowercase form.
normalizer = Normalizer() \
    .setInputCols(["clean_tokens"]) \
    .setOutputCol("normalized_token") \
    .setLowercase(True)

# Load a pretrained LemmatizerModel (default pretrained model)
lemmatizer = LemmatizerModel.pretrained() \
    .setInputCols(["normalized_token"]) \
    .setOutputCol("lemma")

# UDF to extract text from lemma annotations: keeps only each annotation's
# `result` field, giving a plain array of strings that Spark ML can consume
extract_lemma_udf = udf(lambda xs: [x.result for x in xs], ArrayType(StringType()))

# Initialize the CountVectorizer and IDF estimator.
# minDF=100 keeps only terms that occur in at least 100 documents.
vectorizer = CountVectorizer(inputCol="lemma_text", outputCol="vector", minDF=100)
idf = IDF(inputCol="vector", outputCol="tfidf")

# Define and create pipeline (Spark NLP stages only; vectorization is fitted separately
# because it needs the `lemma_text` column produced by the UDF above)
stages = [document_assembler, tokenizer, stopwords_cleaner, normalizer, lemmatizer]
pipeline = Pipeline(stages=stages)

# Fit the pipeline to the initial DataFrame
pipeline_model = pipeline.fit(df)

# Transform the DataFrame using the fitted pipeline and adjust the 'lemma' column
df = pipeline_model.transform(df)
df = df.withColumn("lemma_text", extract_lemma_udf(col("lemma")))

# Apply CountVectorizer and IDF to the adjusted DataFrame.
# The fitted models are kept so that new text can be vectorized against the same vocabulary.
vectorizer_model = vectorizer.fit(df)
df = vectorizer_model.transform(df)
idf_model = idf.fit(df)
df = idf_model.transform(df)

# Show the final DataFrame structure and some rows to verify the results
df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]
+----------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    job_id|          job_skills|__index_level_0__|            document|               token|        clean_tokens|    normalized_token|               lemma|          lemma_text|              vector|               tfidf|
+----------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|3801187557|human resources m...|          1132159|[[document, 0, 54...|[[token, 0, 4, hu...|[[token, 0, 4, hu...|[[token, 0, 4, hu...|[[token, 0, 4, hu...|[human, resource,...|(3332,[0,11,14,15...|(3332,[0,11,14,15...|
|3785121992|loss prev

## STEP2_3_LDA_Topic_Analysis

In [9]:
# Use the log-likelihood and log-perplexity to decide the number of topics:
# fit one LDA model per candidate k and record both metrics on the same DataFrame.
# NOTE(review): on a sparkmagic/EMR kernel plt.show() may not render the remote
# figure by itself (`%matplot plt` is the usual way) — confirm for this setup.

num_topics = range(2, 15, 2)
log_likelihoods = []
log_perplexities = []

for k in num_topics:
    model = LDA(k=k, maxIter=10, seed=42, featuresCol="vector").fit(df)
    log_likelihoods.append(model.logLikelihood(df))
    log_perplexities.append(model.logPerplexity(df))

# Plotting the metrics side by side
fig, (ax_likelihood, ax_perplexity) = plt.subplots(1, 2, figsize=(12, 6))

ax_likelihood.plot(num_topics, log_likelihoods, marker='o')
ax_likelihood.set_title('Log-Likelihood vs Number of Topics')
ax_likelihood.set_xlabel('Number of Topics')
ax_likelihood.set_ylabel('Log-Likelihood')

ax_perplexity.plot(num_topics, log_perplexities, marker='o')
ax_perplexity.set_title('Log Perplexity vs Number of Topics')
ax_perplexity.set_xlabel('Number of Topics')
ax_perplexity.set_ylabel('Log Perplexity')

fig.tight_layout()
plt.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [49]:
# Final model: 8 topics, the value settled on after trial.
# Note that this rebinds `num_topics` from the candidate range to a single integer.
num_topics = 8
lda = LDA(
    featuresCol="vector",
    k=num_topics,
    maxIter=10,
    seed=42,
)
lda_model = lda.fit(df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [56]:
# Vocabulary learned by the fitted CountVectorizerModel (position -> term)
vocab = vectorizer_model.vocabulary


def indices_to_terms(termIndices):
    """Translate a sequence of vocabulary positions into the corresponding terms."""
    terms = []
    for position in termIndices:
        terms.append(vocab[int(position)])
    return terms


# Spark wrapper around the lookup above; returns an array of strings
indices_to_terms_udf = udf(indices_to_terms, ArrayType(StringType()))

# Describe every topic by its 15 highest-weighted terms and print them untruncated
topics = lda_model.describeTopics(maxTermsPerTopic=15)
topics_with_words = topics.withColumn("terms", indices_to_terms_udf(col("termIndices")))
topics_with_words.select("topic", "terms", "termWeights").show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|topic|terms                                                                                                                                                    |termWeights                                                                                                                                                                                                                                                                                                                             |
+-----+-----------

# STEP3_Job_Recommender

In [51]:
# Sample user input: a comma-separated skills string, wrapped in a one-row
# DataFrame whose single column is named like the training column (`job_skills`)
user_input_sample = "data analysis, Python, business analytics"
user_skills = user_input_sample
user_skills_df = spark.createDataFrame([(user_skills,)], ["job_skills"])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [52]:
# Run the user input through exactly the same fitted stages as the job postings:
#   1. fitted NLP pipeline (tokens -> lemmas)
#   2. lemma annotations flattened into `lemma_text`
#   3. fitted CountVectorizer, then fitted IDF
user_skills_df = idf_model.transform(
    vectorizer_model.transform(
        pipeline_model.transform(user_skills_df)
        .withColumn("lemma_text", extract_lemma_udf(col("lemma")))
    )
)

# Show the results
user_skills_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|          job_skills|            document|               token|        clean_tokens|    normalized_token|               lemma|          lemma_text|              vector|               tfidf|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|data analysis, Py...|[[document, 0, 40...|[[token, 0, 3, da...|[[token, 0, 3, da...|[[token, 0, 3, da...|[[token, 0, 3, da...|[data, analysis, ...|(3332,[12,22,38,3...|(3332,[12,22,38,3...|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+

In [53]:
# Cosine similarity for vector objects exposing `.dot(other)` and `.norm(p)`.
# NOTE: a later cell defines another `cosine_similarity` that works on plain lists;
# once that cell has run, it replaces this definition.
def cosine_similarity(v1, v2):
    """
    Return the cosine of the angle between two vectors.

    Both arguments must provide `dot(other)` and `norm(2)`. If either vector
    has zero length the similarity is defined as 0.0 instead of dividing by zero.
    """
    numerator = float(v1.dot(v2))
    magnitude_1 = float(v1.norm(2))
    magnitude_2 = float(v2.norm(2))
    denominator = magnitude_1 * magnitude_2

    # Guard clause: a zero vector has no direction, so report "no similarity"
    if denominator == 0:
        return 0.0

    return numerator / denominator

# Expose the similarity function to Spark as a UDF that returns a double
cosine_similarity_udf = udf(f=cosine_similarity, returnType=DoubleType())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [54]:
# Obtain the Spark session under the recommender's application name.
# NOTE(review): the notebook already has a live session, and getOrCreate() hands
# back an existing session when there is one — confirm the app name takes effect.
spark = (
    SparkSession.builder
    .appName("Skill-based Job Recommender")
    .getOrCreate()
)


def sparse_to_array(v):
    """Expand a vector into a plain Python list of floats (one entry per dimension)."""
    dense = v.toArray()
    return dense.tolist()


# Spark wrapper for the conversion above; yields an array<double> column
sparse_to_array_udf = udf(f=sparse_to_array, returnType=ArrayType(DoubleType()))

# Add a dense-array copy of the TF-IDF vector to both the user row and the job postings
tfidf_as_array = sparse_to_array_udf(col("tfidf"))
user_skills_df = user_skills_df.withColumn("tfidf_array", tfidf_as_array)
df = df.withColumn("tfidf_array", tfidf_as_array)

# Register the job postings (now including `tfidf_array`) as a temporary view for SQL access
df.createOrReplaceTempView("df_posts_skills_tfidf_view")

# Cosine similarity over two plain numeric sequences (used through a UDF on array columns)
def cosine_similarity(arr1, arr2):
    """
    Return the cosine similarity of two numeric sequences.

    Elements are paired positionally (extra elements of the longer sequence do
    not contribute to the dot product). If either sequence has zero magnitude,
    0.0 is returned instead of dividing by zero.
    """
    inner = sum(a * b for a, b in zip(arr1, arr2))
    magnitude_1 = sum(value ** 2 for value in arr1) ** 0.5
    magnitude_2 = sum(value ** 2 for value in arr2) ** 0.5

    # A zero-length vector has no direction: treat it as dissimilar to everything
    if not (magnitude_1 and magnitude_2):
        return 0.0
    return inner / (magnitude_1 * magnitude_2)
# Two-argument UDF (array column vs. array column); kept defined for any code that uses it
cosine_similarity_udf = udf(cosine_similarity, DoubleType())

# Create function to recommend jobs by comparing user skills with job skills using cosine similarity
def recommend_jobs(user_skill_df, top_n=5):
    """
    Rank the job postings by cosine similarity to a user's skill vector.

    Parameters
    ----------
    user_skill_df : DataFrame
        DataFrame whose first row holds the user's TF-IDF values in a
        `tfidf_array` column.
    top_n : int, optional
        Number of best-matching postings to return (default 5).

    Returns
    -------
    DataFrame
        Rows of the `df_posts_skills_tfidf_view` temporary view with an added
        `similarity_score` column, sorted by descending score and limited to
        `top_n` rows.

    Raises
    ------
    ValueError
        If `user_skill_df` contains no rows.
    """
    user_row = user_skill_df.select("tfidf_array").first()
    if user_row is None:
        raise ValueError("user_skill_df is empty; expected one row with a 'tfidf_array' column")
    user_vector = list(user_row["tfidf_array"])

    # Capture the user's vector in the UDF closure instead of turning every
    # element into its own literal column: the values are shipped once with the
    # function, and no extra pyspark.sql.functions helpers (array/lit) are needed.
    score_against_user = udf(
        lambda job_vector: cosine_similarity(job_vector, user_vector),
        DoubleType(),
    )

    result_df = spark.sql("SELECT * FROM df_posts_skills_tfidf_view")
    result_df = result_df.withColumn("similarity_score", score_against_user(col("tfidf_array")))
    return result_df.orderBy(col("similarity_score").desc()).limit(top_n)

# Score every posting against the sample user and print the five best matches
top_jobs = recommend_jobs(user_skill_df=user_skills_df)
top_jobs.show(n=20, truncate=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+
|    job_id|          job_skills|__index_level_0__|            document|               token|        clean_tokens|    normalized_token|               lemma|          lemma_text|              vector|               tfidf|         tfidf_array|  similarity_score|
+----------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+
|3770289289|business analysis...|          1171344|[[document, 0, 42...|[[token, 0, 7, bu...|[[token, 0, 7, bu...|[[token, 0, 7, bu...|[[token, 0, 7, bu...|[business, analys...|(3332,[0,2,12,20,...|(3332,[0,2,12,20,...|[