**Title**: Career Optimizer Model

**Description**: Here we run job title normalization (text classification and clustering), CV skill extraction, and a filtering-based recommendation model on the dataset, in order to recommend to a candidate the skills they need to move into their desired career.

**Author**: Benedict Ibe

**Date Created**: 15/08/2023

In [77]:
import nltk

In [78]:
%%pyspark

# Switch this session's time parser policy to LEGACY.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Load the candidate skill-level table under friendlier aliases;
# the skill name is lower-cased on the way in.
candidate_skilllevel = spark.sql("SELECT \
Id                                          as Cand_ID, \
recruit_candidatecontact                    as Candidate_contactno, \
crimson_proficiency                         as Cand_proficiency, \
crimson_requirement                         as Cand_requirement, \
crimson_experience                          as Cand_experience, \
crimson_experienceperiod                    as Cand_exp_period, \
crimson_skill                               as Cand_skill, \
crimson_name                                as Cand_name, \
crimson_skilllevelid                        as Cand_skilllevel_ID, \
lower(crimson_skillname)                    as Cand_skill_name, \
crimson_level                               as Cand_level \
FROM dataverse_edensmithcon_org87f26120.crimson_skilllevel")

In [79]:
# Preview the candidate skill-level rows.
candidate_skilllevel.show()

In [80]:
%%pyspark

# Switch this session's time parser policy to LEGACY.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Load the candidate work-history table under friendlier aliases;
# the free-text description is lower-cased on the way in.
candidate_workhist = spark.sql("SELECT \
Id                                  as ID , \
recruit_candidatecontact            as Candidate_contact,\
crimson_startdate                   as Candidate_workstrt_date,\
crimson_enddate                     as Candidate_workend_date, \
crimson_workhistoryid               as WorkHistory_ID,\
lower(crimson_description)          as Candidate_work_description,\
crimson_jobtitle                    as Candidate_job_title ,\
crimson_name                        as Cand_workplace \
FROM dataverse_edensmithcon_org87f26120.crimson_workhistory")

In [81]:
from pyspark.sql.functions import to_date

In [82]:
# Preview the candidate work-history rows.
candidate_workhist.show()

In [83]:
# Add a date-typed copy of the work start timestamp.
df_hist = candidate_workhist.withColumn(
    "Cnd_start_date",
    to_date(candidate_workhist["Candidate_workstrt_date"]),
)
df_hist.show()

In [84]:
# Add a date-typed copy of the work end timestamp.
df_hist2 = df_hist.withColumn(
    "Cnd_end_date",
    to_date(df_hist["Candidate_workend_date"]),
)
df_hist2.show()

In [85]:
# Keep only the identifiers, the job title and the two derived date columns.
df_hist3 = df_hist2.select(
    'ID',
    'WorkHistory_ID',
    'Candidate_contact',
    'Candidate_job_title',
    'Cnd_start_date',
    'Cnd_end_date',
)
df_hist3.show()

In [86]:
#Import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import datediff, floor
from pyspark.sql.functions import lit

GETTING THE NUMBER OF MONTHS BETWEEN THE START TIME AND THE END TIME

In [87]:
from pyspark.sql.functions import months_between

# Duration = whole calendar months between the start and end of each role.
# months_between is calendar-aware; dividing a day count by 7 and then 4 treats
# every month as 28 days and overstates long tenures (about 13 "months" per year).
# Rows with a missing start or end date keep a null Duration.
df_hist4 = df_hist3.withColumn(
    "Duration",
    floor(months_between(df_hist3["Cnd_end_date"], df_hist3["Cnd_start_date"])),
)
df_hist4.show()

DEFINE THE PARAMETER TO SEARCH

In [88]:
# Pattern used by the job-title search in the next cell (passed to Column.rlike).
job_titleSearch = 'data'

SEARCH THE JOB TITLES FOR THE PATTERN DEFINED IN THE PARAMETER

In [89]:
# Job titles in df_hist4 still carry their original casing (lower-casing only happens
# later, in df_clean), so match case-insensitively; otherwise 'data' misses
# titles such as 'Data Engineer'.
title_pattern = "(?i)" + job_titleSearch
df_hist5 = (
    df_hist4
    .filter(df_hist4.Candidate_job_title.rlike(title_pattern))
    .sort(df_hist4.Candidate_job_title.asc())
)
df_hist5.show(50)

In [90]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, lower
from pyspark.ml.feature import Tokenizer, Word2Vec
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

REMOVE SPECIAL CHARACTERS AS A PREPROCESSING STEP

In [91]:
# Strip every character that is not a letter, digit or whitespace from the job title,
# then lower-case the result.
title_without_symbols = regexp_replace(df_hist4["Candidate_job_title"], "[^a-zA-Z0-9\\s]", "")
df_clean = df_hist4.withColumn("Candidate_job_title", lower(title_without_symbols))
df_clean.show()

TREAT NULL VALUES

In [92]:
# Discard rows whose job title is null; these rows are what the classifiers score later.
df_clean2 = df_clean.dropna(subset=["Candidate_job_title"])

In [93]:
# Spot-check one specific cleaned title (the analyst programmer case).
is_analyst_programmer = df_clean.Candidate_job_title == "analystprogrammer  programmer"
df_clean3 = df_clean.where(is_analyst_programmer)
df_clean3.show(100, truncate=False)

LOAD THE SAMPLE DATA TO BE USED FOR TRAINING

In [94]:
%%pyspark
# Read the labelled salaries CSV (first row is the header) used as training data.
salaries_path = 'abfss://landing@esgadls2.dfs.core.windows.net/salaries_clean.csv'
df_salaries = spark.read.format('csv').option('header', True).load(salaries_path)
df_salaries.show()

In [95]:
# The classifier only needs the raw title and its category label.
training_columns = ['job_title', 'job_title_category']
df_train = df_salaries.select(*training_columns)
df_train.show()

In [96]:
from pyspark.sql.functions import col, when

In [97]:
# Rows without a job title cannot be tokenized, so drop them.
dfTrain_clean2 = df_train.dropna(subset=["job_title"])

In [98]:
# Normalise the category label from keywords found in the job title.
# Rules are evaluated in order and the first match wins; titles matching no rule
# keep their existing job_title_category.
title_rules = [
    ("analyst", "Data Analyst"),
    ("data engineer", "Data Engineer"),
    ("data architect", "Data Architect"),
    ("data scientist", "Data Science"),
    ("database", "Data Management"),
    ("visualization", "Data Analyst"),
]

category_expr = None
for title_regex, category in title_rules:
    title_matches = col("job_title").rlike(title_regex)
    if category_expr is None:
        category_expr = when(title_matches, category)
    else:
        category_expr = category_expr.when(title_matches, category)

df_train2 = dfTrain_clean2.withColumn(
    "job_title_category",
    category_expr.otherwise(col("job_title_category")),
)

In [99]:
# Inspect up to 200 rows of the relabelled training data.
df_train2.show(200)

USE RANDOM FOREST TEXT CLASSIFICATION TO CATEGORIZE THE JOB_TITLE COLUMN

In [100]:
from pyspark.ml.feature import CountVectorizer, StringIndexer, IndexToString
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col

Encode the column values from categorical to numerical using string indexer, so as to be suitable for running the model

In [101]:
# Fit a StringIndexer that maps each job_title_category string to a numeric indexedLabel.
label_indexer = StringIndexer(inputCol="job_title_category", outputCol="indexedLabel")
Indexer = label_indexer.fit(df_train2)

Transform the training data with the encoded data

In [102]:
# Apply the fitted indexer so the training data gains the numeric indexedLabel column.
trainingDataIndexed = Indexer.transform(df_train2)

Store the encoded data (indexedLabel) into the labelCopy column created

In [103]:
# Duplicate indexedLabel into labelCopy; the random forest and its evaluators
# read labelCopy, while the logistic-regression and decision-tree models read indexedLabel.
training_data_indexed = trainingDataIndexed.withColumn("labelCopy", col("indexedLabel"))

Tokenize the training data

In [104]:
# Tokenizer stage: reads job_title and writes the token list to the words column.
# It is only configured here; it runs as the first stage of the pipelines below.
tokenizer = Tokenizer(inputCol="job_title", outputCol="words")

Convert the tokenized job titles into term-count vectors stored in a column called features

In [105]:
# CountVectorizer stage: turns the tokenizer's output column into the
# features vector column consumed by the classifiers.
df_countVectorizer = CountVectorizer(inputCol=tokenizer.getOutputCol(), outputCol="features")

In [106]:
# Random forest classifier over the term-count features.
# A fixed seed makes tree bootstrapping and feature sub-sampling repeatable,
# so the reported accuracy does not drift between notebook runs.
df_rf = RandomForestClassifier(
    labelCol="labelCopy",
    featuresCol="features",
    numTrees=45,
    maxDepth=30,
    seed=42,
)
df_rf

Convert the encoded column back to the original string format, this works opposite with StringIndexer

In [107]:
# IndexToString stage: maps the numeric prediction back to its category string
# (predictedLabel) using the label order learned by the fitted StringIndexer.
Converter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=Indexer.labels)

***Define a pipeline to pass the preparatory/pre-processing steps***

In [108]:
# Random-forest pipeline: tokenize -> count-vectorize -> classify -> decode prediction.
# Label indexing is not a stage here; labels were indexed on the training data beforehand.
pipeline = Pipeline(stages=[tokenizer, df_countVectorizer, df_rf, Converter])

In [109]:
# Split the labelled data 70/30 into train and test sets.
# The seed pins the random assignment so every model in this notebook is
# trained and evaluated on the same rows across runs.
(train_set, test_set) = training_data_indexed.randomSplit([0.7, 0.3], seed=42)

Train the model on the training dataset

In [110]:
# Fit the random-forest pipeline on the training split.
model = pipeline.fit(train_set)

Make predictions with the trained model on the test dataset

In [111]:
# Score the held-out test split with the fitted random-forest pipeline.
predictions_test = model.transform(test_set)
predictions_test.show()

EVALUATING OUR MODEL

In [112]:
# Accuracy evaluator comparing the prediction column against labelCopy.
# NOTE(review): the name `evaluator` is rebound in later cells with labelCol="indexedLabel".
evaluator = MulticlassClassificationEvaluator(labelCol="labelCopy", predictionCol="prediction",
                                              metricName="accuracy")

Get the model accuracy

In [113]:
# Accuracy of the random-forest pipeline on the test split.
accuracy = evaluator.evaluate(predictions_test)
print(f"Test Accuracy = {accuracy:g}")

In [114]:
# Score the cleaned candidate job titles; the column is renamed to job_title
# because that is the input column the pipeline's tokenizer reads.
predictions2 = model.transform(df_clean2.withColumnRenamed("Candidate_job_title", "job_title"))

In [115]:
# Preview the random-forest predictions for the candidate titles.
predictions2.show()

In [116]:
# Narrow the random-forest output to the columns needed to review the predictions.
rf_review_columns = ['ID', 'WorkHistory_ID', 'job_title', 'predictedLabel', 'Duration']
predictions3 = predictions2.select(*rf_review_columns)
predictions3.show()

# Hyperparameter tuning of the random forest model

Specify the grid values

In [117]:
# Search grid for the random forest: 3 tree counts x 3 depths = 9 candidates.
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

rf_grid_builder = ParamGridBuilder()
rf_grid_builder = rf_grid_builder.addGrid(df_rf.numTrees, [15, 30, 45])
rf_grid_builder = rf_grid_builder.addGrid(df_rf.maxDepth, [10, 20, 30])
paramGrid = rf_grid_builder.build()

In [118]:
# 5-fold cross-validation over the random-forest pipeline and grid, scored by
# accuracy against labelCopy; the seed fixes the fold assignment.
cross_validator = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol="labelCopy", metricName="accuracy"),
                          numFolds=5, seed=42)

In [119]:
# Run the cross-validated grid search on the training split.
new_model = cross_validator.fit(train_set)

In [120]:
# Despite the name, this is the best fitted pipeline model found by the search.
best_param = new_model.bestModel

In [121]:
# Pull the winning hyperparameters from the fitted classifier. The pipeline stages are
# [tokenizer, CountVectorizer, classifier, IndexToString], so stages[-2] is the classifier.
best_rf_model = best_param.stages[-2]

# NOTE(review): getNumTrees is read without call parentheses while getMaxDepth() is
# called. PySpark exposes getNumTrees as a property on fitted tree-ensemble models,
# so this appears intentional; confirm against the Spark version on the cluster.
bestParameter = {
    "numTrees": best_rf_model.getNumTrees,
    "maxDepth": best_rf_model.getMaxDepth(),
}



In [122]:
# Best max depth on its own; NOTE(review): not referenced again in the visible notebook.
maxDepthValue = best_rf_model.getMaxDepth()

In [123]:
# Show the best random-forest hyperparameters found by cross-validation.
print(bestParameter)

# Run a logistic regression model

In [124]:
from pyspark.ml.classification import LogisticRegression

In [125]:
# Initialize the logistic regression model (trained on indexedLabel).
df_lr = LogisticRegression(labelCol="indexedLabel", featuresCol="features")

In [126]:
# Logistic-regression pipeline: same tokenizer, vectorizer and decoder stages as the
# random-forest pipeline, with the classifier swapped.
pipeline_lr = Pipeline(stages=[tokenizer, df_countVectorizer, df_lr, Converter])

Train the model

In [127]:
# Fit the logistic-regression pipeline on the training split.
model_lr = pipeline_lr.fit(train_set)

Test the model

In [128]:
# Score the held-out test split with the logistic-regression pipeline.
predictions_test_lr = model_lr.transform(test_set)

Evaluate the model to get the accuracy

In [129]:
# Accuracy evaluator against indexedLabel.
# NOTE(review): this rebinds `evaluator`, which previously scored against labelCopy.
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction",
                                              metricName="accuracy")

In [130]:
# Accuracy of the logistic-regression pipeline on the test split.
accuracy_lr = evaluator.evaluate(predictions_test_lr)
print(f"Test Accuracy (Logistic Regression) = {accuracy_lr:g}")

In [131]:
# Score the cleaned candidate job titles with the logistic-regression pipeline
# (column renamed to job_title, the tokenizer's input column).
predictions_lr = model_lr.transform(df_clean2.withColumnRenamed("Candidate_job_title", "job_title"))

In [132]:
# Preview the logistic-regression predictions for the candidate titles.
predictions_lr.show()

In [133]:
# Narrow the logistic-regression output to the review columns; Candidate_contact is
# kept because the join with the skill table is made on it.
lr_review_columns = ['ID', 'Candidate_contact', 'WorkHistory_ID', 'job_title', 'predictedLabel', 'Duration']
predictions_lr2 = predictions_lr.select(*lr_review_columns)
predictions_lr2.show()

# Trying the Decision tree model

In [134]:
from pyspark.ml.classification import DecisionTreeClassifier

In [135]:
# Decision tree classifier over the term-count features, trained on indexedLabel.
df_dt = DecisionTreeClassifier(
    labelCol="indexedLabel",
    featuresCol="features",
    minInstancesPerNode=1,
    maxDepth=30,
    impurity='gini',
)

In [136]:
# Decision-tree pipeline: same tokenizer, vectorizer and decoder stages as the
# other pipelines, with the classifier swapped.
pipeline_dt = Pipeline(stages=[tokenizer, df_countVectorizer, df_dt, Converter])

In [137]:
# Fit the decision-tree pipeline on the training split.
model_dt = pipeline_dt.fit(train_set)

In [138]:
# Score the held-out test split with the decision-tree pipeline.
predictions_test_dt = model_dt.transform(test_set)

In [139]:
# Accuracy of the decision-tree pipeline on the test split, scored against indexedLabel.
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy_dt = evaluator.evaluate(predictions_test_dt)

In [140]:
# Report the decision-tree test accuracy.
print(f"Test Accuracy (Decision Tree) = {accuracy_dt:g}")

In [141]:
# Score the cleaned candidate job titles with the decision-tree pipeline
# (column renamed to job_title, the tokenizer's input column).
predictions_dt = model_dt.transform(df_clean2.withColumnRenamed("Candidate_job_title", "job_title"))
predictions_dt.show()

In [142]:
# Narrow the decision-tree output to the same review columns used for logistic regression.
dt_review_columns = ['ID', 'Candidate_contact', 'WorkHistory_ID', 'job_title', 'predictedLabel', 'Duration']
predictions_dt2 = predictions_dt.select(*dt_review_columns)
predictions_dt2.show()

# Decision Tree hyperparameter tuning

In [143]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [144]:
# Search grid for the decision tree: 3 depths x 3 leaf sizes x 2 impurity measures = 18 candidates.
dt_grid_builder = ParamGridBuilder()
dt_grid_builder = dt_grid_builder.addGrid(df_dt.maxDepth, [10, 20, 30])
dt_grid_builder = dt_grid_builder.addGrid(df_dt.minInstancesPerNode, [1, 2, 4])
dt_grid_builder = dt_grid_builder.addGrid(df_dt.impurity, ["gini", "entropy"])
dt_paramGrid = dt_grid_builder.build()

In [145]:
# 3-fold cross-validation over the decision-tree pipeline and grid, scored by accuracy.
# labelCopy is a copy of indexedLabel (the column the tree is trained on), so the
# evaluator sees the same labels. The seed fixes the fold assignment, matching the
# random-forest cross-validator, so the selected parameters are repeatable.
crossvalidator = CrossValidator(estimator=pipeline_dt,
                          estimatorParamMaps=dt_paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol="labelCopy", metricName="accuracy"),
                          numFolds=3, seed=42)

In [146]:
# Run the cross-validated grid search for the decision tree on the training split.
cvModel_DT = crossvalidator.fit(train_set)

In [147]:
# Despite the name, this is the best fitted decision-tree pipeline model from the search.
best_param_dt = cvModel_DT.bestModel

In [148]:
# Pull the winning hyperparameters from the fitted classifier. The pipeline stages are
# [tokenizer, CountVectorizer, classifier, IndexToString], so stages[-2] is the classifier.
best_dt_model = best_param_dt.stages[-2]

bestParameter_dt = {
    "minInstancesPerNode": best_dt_model.getMinInstancesPerNode(),
    "maxDepth": best_dt_model.getMaxDepth(),
    "impurity": best_dt_model.getImpurity(),
}

In [149]:
# Show the best decision-tree hyperparameters found by cross-validation.
print(bestParameter_dt)

# Join the two tables at their candidate contact

In [153]:
# Inner-join the logistic-regression predictions to the skill-level table on the
# candidate contact identifier (named differently on each side).
same_candidate = predictions_lr2.Candidate_contact == candidate_skilllevel.Candidate_contactno
join_df = predictions_lr2.join(candidate_skilllevel, on=same_candidate, how="inner")
join_df.show()

In [154]:
# Keep the columns used by the seniority, clustering and recommendation steps.
profile_columns = [
    'Candidate_contact',
    'Cand_ID',
    'job_title',
    'predictedLabel',
    'Cand_skill_name',
    'Cand_experience',
    'Cand_level',
    'Duration',
]
new_df = join_df.select(*profile_columns)
new_df.show()

In [155]:
# Derive a Seniority column from keywords in the job title (case-insensitive).
# Branches are checked top to bottom and the first match wins, so a title containing
# both "senior" and "lead" is labelled "Senior". Titles with no keyword get the
# literal string "None", not a null.
# NOTE(review): the patterns are unanchored substrings, so e.g. "mid" also matches
# inside longer words; confirm this is acceptable for the title data.
new_df2 = new_df.withColumn("Seniority",
                   when(col("job_title").rlike("(?i)senior"), "Senior")
                   .when(col("job_title").rlike("(?i)junior"), "Junior")
                   .when(col("job_title").rlike("(?i)mid"), "Mid")
                   .when(col("job_title").rlike("(?i)head"), "Head")
                   .when(col("job_title").rlike("(?i)lead"), "Lead")
                   .otherwise("None"))



In [156]:
# Preview the joined profile rows with the derived Seniority column, untruncated.
new_df2.show(truncate = False)

# Approach 2: Using Clustering

In [157]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, Word2Vec
from pyspark.ml.clustering import KMeans
from pyspark.sql.functions import col

In [158]:
# Initialize the tokenizer for the clustering approach.
# NOTE(review): this rebinds `tokenizer` with the same configuration as the earlier cell.
tokenizer = Tokenizer(inputCol="job_title", outputCol="words")

In [159]:
# Tokenize the job titles of the joined profile rows into the words column.
df_tokenized = tokenizer.transform(new_df2)

In [160]:
# Embed the tokenized job titles with Word2Vec (100-dimensional vectors, no minimum
# token frequency). The seed makes the embedding, and therefore the clusters built
# on it, repeatable between runs.
word2Vec = Word2Vec(vectorSize=100, minCount=0, inputCol="words", outputCol="features", seed=42)

# Use a dedicated name for the fitted embedding so the fitted random-forest pipeline
# held in `model` is not overwritten by an unrelated object.
w2v_model = word2Vec.fit(df_tokenized)
df_vectorized = w2v_model.transform(df_tokenized)

In [161]:
from pyspark.ml.clustering import KMeans

In [162]:
# Cluster the embedded titles into 10 groups with a fixed seed.
kmeans = KMeans(k=10, seed=1)
km_model = kmeans.fit(df_vectorized)
df_clusters = km_model.transform(df_vectorized)

In [163]:
# Show each job title next to its k-means cluster id (the prediction column).
df_clusters.select("job_title", "prediction").show()

In [164]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, Word2Vec
from pyspark.ml.clustering import BisectingKMeans
from pyspark.sql.functions import col

In [165]:
# Bisecting k-means on the same embeddings; set k to the number of broad
# job categories expected.
Bisect = BisectingKMeans(k=10, seed=1)
Bisect_model = Bisect.fit(df_vectorized)
df_Bis_clusters = Bisect_model.transform(df_vectorized)

In [166]:
# Show up to 50 job titles next to their bisecting k-means cluster id.
df_Bis_clusters.select("job_title", "prediction").show(50)

In [167]:
# Re-display the raw work-history rows for reference.
candidate_workhist.show()

In [168]:
# Re-display the raw skill-level rows for reference.
candidate_skilllevel.show()

In [169]:
from pyspark.sql.functions import when, col, lit, regexp_extract

Aggregate for each job_category

In [170]:
# Count identical profile rows, listed by predicted job category in descending order.
category_group_columns = [
    'predictedLabel',
    'Candidate_contact',
    'job_title',
    'Cand_skill_name',
    'Cand_experience',
    'Cand_level',
    'Duration',
    'Seniority',
]
grouped_new = (
    new_df2
    .groupBy(*category_group_columns)
    .count()
    .sort('predictedLabel', ascending=False)
)
grouped_new.show()

Aggregate for each ID

In [171]:
# Count identical profile rows, listed by Cand_ID in descending order. This frame is
# the lookup used later for a single candidate's profile and for the target career.
candidate_group_columns = [
    'Candidate_contact',
    'Cand_ID',
    'job_title',
    'predictedLabel',
    'Cand_skill_name',
    'Cand_experience',
    'Cand_level',
    'Duration',
    'Seniority',
]
grouped_ID = (
    new_df2
    .groupBy(*candidate_group_columns)
    .count()
    .sort('Cand_ID', ascending=False)
)
grouped_ID.show(100, truncate=False)

In [172]:
from pyspark.sql import functions as F

# Average Duration (months) per candidate, predicted category, skill and seniority.
# NOTE(review): `aggregate_df` is rebound by a later cell with a different grouping.
aggregate_df = new_df2.groupBy('Candidate_contact','Cand_ID', 'predictedLabel', 'Cand_skill_name', 'Seniority').agg(F.mean("Duration").alias("Average_Months"))

aggregate_df.show()

In [173]:
# Candidate to analyse; compared against the Candidate_contact column in later cells.
# NOTE(review): the same value is assigned again further down the notebook.
candidateID = '9effb87c-d3f5-eb11-94ef-0022481a5643'

# Resume Parser
A resume skill extractor that extracts data related skills from a cv

In [174]:
cv = """
IFEANYI FRANKLIN NWOSU
DATA SCIENTIST | ANALYST | SQL DEVELOPER
07876582032 | Manchester, England | github.com/franklinn008 | franklinnwosu008@gmail.com
linkedin.com/in/Ifeanyi-nwosu-523977169
PROFESSIONAL PROFILE
Experienced data professional recognised for driving data-driven decision-making and optimising
organisational objectives. Proficient in Python, Jupyter, Excel, R, T-SQL, and data analysis tools (Apache
Spark Databricks, PowerBI, Tableau, Microsoft Azure). Proven track record of delivering efficient results
through innovative solutions. Committed to continuous self-development in the evolving field of data
science
EDUCATION
MSc Data Science |University of Salford | September 2022 – September 2023
• Advanced Skill Development: Actively pursued self-development in the evolving field of data
science. Developed comprehensive skills in R, PowerBI, and Tableau, enabling effective data
analysis and visualization.
• Machine Learning Expertise: Gained practical experience in Python for Machine Learning and
Data Mining. Successfully developed predictive models, extracting valuable insights from
extensive datasets and driving data-driven decisions.
• Database Mastery: Enhanced proficiency in Advanced Databases through mastery of TSQL and
SSMS. Efficiently managed databases conducted complex queries and ensured data integrity.
• Big Data Handling: Explored Big Data tools like Apache Data Bricks, enabling the handling and
processing of massive datasets with speed and scalability.
MSc Architecture |Nnamdi Azikiwe University| September 2015 - March 2018
• Architectural Proficiency: Demonstrated proficiency in architectural software, including AutoCAD,
Revit, SketchUp, Lumion, and Artlantis, streamlining design and presentation processes.
• Critical Thinking & Soft Skills: Showcased critical thinking, effective communication, time
management, adaptability, and attention to detail in job evaluations, emphasizing task
prioritization.
• Integration of Expertise: Integrated technical expertise in architectural software, essential soft
skills, and data analysis proficiency for successful project execution, consistently exceeding project
expectations.
BSc (Hons.) Architecture |Nnamdi Azikiwe University | September 2011 - July 2015
SSCE |Special Science School | September 2003 - June 2009
FSLC |Osumenyi central school | September 1997 - July 2003
WORK EXPERIENCE/PROJECTS
Data Specialist | Disruptive Learning Solutions (CINQ Game Developers) | France| June 2023 – To Date
• Data Cleaning & Transformation: Applied advanced data cleaning and transformation techniques
to ensure data accuracy, consistency and readiness for analysis.
• Machine Learning Model Development: Developed predictive machine learning models using
techniques such as predictive analytics and early action analysis to predict task time and failures
accurately.
• Data-Driven Insights: Delivered actionable insights through data analysis, enabling informed
decision-making in business and educational settings.
• User Engagement Optimization: Improved user engagement and satisfaction by analyzing user
behaviour and feedback, achieving a significant 25% increase.
• Cross-functional collaboration: Collaborated with cross-functional teams to integrate data-driven
features, ensuring holistic and effective solutions.
• Data Visualization: Created compelling data visualizations to convey insights, facilitating
transparent and data-informed decision-making.
Data Scientist/Analyst | DArTech Group | Manchester & Abuja| September 2022 – To Date
• Leadership & Innovation: Spearheaded the development of a Python-based classification model,
harnessing ensemble methods and hyperparameter optimization to significantly enhance auction
verification accuracy by 20%, reducing false positives and optimizing resources.
• Strategic Insights: Applied Python for customer segmentation clustering, identifying distinct
customer groups based on purchasing patterns. Conducted sentiment analysis on extensive hotel
reviews using NLP techniques, leading to informed marketing strategies and tailored customer
experiences.
• Operational Efficiency: Engineered Python scripts automating complex tasks such as determining
rollercoaster eligibility, lifespan calculations, BMI calculations, and tip bill calculations, resulting in
a 30% reduction in time and effort.
• Data Visualization Mastery: Created captivating and interactive data visualizations using Tableau,
PowerBI, Seaborn, and Matplotlib, integrating features like interactive filters, drill-down
functionality, and custom visualizations. This enabled data-driven decision-making, contributing
to a 15% increase in revenue.
• Statistical Proficiency: Conducted in-depth statistical analysis in R on world development
indicators, employing techniques such as hypothesis testing and regression analysis to uncover
critical patterns and relationships.
• Efficiency Enhancement: Analyzed large-scale clinical trial data using distributed computing
frameworks, including RDD, DataFrame, and SQL within the Databricks platform, improving
analytical efficiency by 25%.
• Database Excellence: Architected and developed a robust library management system and
comprehensive pharmaceutical database using T-SQL. Ensured data integrity, retrieval efficiency,
and security while streamlining operations.
Facility/Building Data Manager | DIFF GLOBAL LINKS, [Medical Centre] | February 2021 – August 2022
• Strategic Data Management: Conducted meticulous data assessments to identify data sources,
gaps, and integration opportunities within facility management processes, laying the groundwork
for data-driven decision-making.
• Standardization & Efficiency: Introduced data management frameworks, standardizing data
collection and storage, including SQL-based data management. This initiative empowered data
analytics to transform complex data into actionable insights, leading to a 10% increase in energy
efficiency
Data Science Intern | Early Code Tech Institute Abuja | May 2022 – August 2022
• Hands-On Learning: Gained hands-on experience in data cleaning, preprocessing, and exploratory
data analysis techniques, elevating proficiency in data wrangling and data quality assessment.
• Applied Knowledge: Developed skills in statistical analysis, machine learning algorithms, and
predictive modelling, directly applying data science methodologies to real-world challenges and
facilitating data-driven decisions.
• Effective Communication: Acquired proficiency in data visualization and storytelling, effectively
communicating insights derived from data analysis to stakeholders and presenting findings with
clarity and impact.
REFERENCES:
Available on request
"""

In [175]:
# Display the raw CV text.
# NOTE(review): the CV contains personal contact details; clear this output before sharing.
cv

In [176]:
import re

In [177]:
#Create a function to extract known skills from the cv text
def resume_skills(text, skill_list):
    """Return the skills from ``skill_list`` that occur in ``text``.

    Matching is case-insensitive and on whole terms: a skill only matches when it is
    not directly preceded or followed by a word character, so "java" does not match
    inside "JavaScript". Lookarounds are used instead of ``\\b`` so that skills which
    start or end with a symbol (for example "c++" or "c#") can still match.

    Parameters
    ----------
    text : str
        The CV / resume text to search. Empty or ``None`` yields an empty result.
    skill_list : iterable
        Candidate skill names. Entries that are not strings, or are empty or
        whitespace-only, are skipped (a null skill would otherwise raise and an
        empty one would match any text).

    Returns
    -------
    list
        The matching skills, in ``skill_list`` order, spelled as in ``skill_list``.
    """
    if not text:
        return []

    found = []
    for Candskill in skill_list:
        if not isinstance(Candskill, str) or not Candskill.strip():
            continue
        pattern = r"(?<!\w){}(?!\w)".format(re.escape(Candskill))
        if re.search(pattern, text, re.IGNORECASE):
            found.append(Candskill)
    return found

**Edensmith's skills list library**

In [178]:
# Load the skills reference table (Delta format).
crimson_skills_raw = spark.read.load(
    'abfss://enriched@esgadls2.dfs.core.windows.net/nurture/skills/skills_database/v1',
    format='delta',
)
crimson_skills_raw.show(10)

# Keep a single lower-cased skill-name column. Nulls are dropped because a null
# skill cannot be turned into a search pattern, and duplicates are dropped so each
# skill is searched for, and reported, only once.
crimson_skills = (
    crimson_skills_raw
    .select(lower(col('crimson_name')).alias('skill_name'))
    .where(col('skill_name').isNotNull())
    .distinct()
)
crimson_skills.show(10)

# Plain Python list of skill names, used as the vocabulary for CV skill extraction.
skill_exists = [row['skill_name'] for row in crimson_skills.collect()]

In [179]:
# Display the collected skill vocabulary.
skill_exists

Extract the skills from the cv

In [180]:
# Extract the known skills mentioned in the CV. A notebook cell always runs as the
# main module, so no `__main__` guard is needed; without it, `extracted_skills` is
# unconditionally defined for the cells that follow.
extracted_skills = resume_skills(cv, skill_exists)

if extracted_skills:
    print('Skills_extracted', extracted_skills)
else:
    print('No skills matched')

In [181]:
# Display the skills found in the CV.
extracted_skills

In [182]:
# Lower-case every extracted skill name.
extracted_skills2 = list(map(str.lower, extracted_skills))
print(extracted_skills2)

**View the career history of the Candidate selected from the database by his candidate ID**

In [183]:
# Rows of grouped_ID whose Candidate_contact equals the selected candidate ID.
df_ID = grouped_ID.filter(grouped_ID.Candidate_contact == candidateID)
df_ID.show(50, truncate=False)

## GROUPING AND AGGREGATING
grouping to get the average duration of time for each candidate

In [184]:
from pyspark.sql import functions as F

# Mean Duration per candidate / category / title / skill / experience / level / seniority.
# NOTE(review): the column is aliased "TotalMonths" but is computed with F.mean, i.e. it
# is an average, not a total. This also rebinds `aggregate_df` from an earlier cell.
aggregate_df = new_df2.groupBy('Candidate_contact', 'Cand_ID', 'predictedLabel', 'job_title', 'Cand_skill_name', 'Cand_experience', 'Cand_level', 'Seniority').agg(F.mean("Duration").alias("TotalMonths"))

aggregate_df.show()



**CHECKING FOR THE RELATIONSHIP BETWEEN THE CANDIDATE EXPERIENCE, CANDIDATE SKILL LEVEL, AND NUMBER OF MONTHS/DURATION**

In [185]:
from pyspark.sql.functions import corr

In [186]:
import pandas as pd
from scipy.stats import spearmanr

In [187]:
# Bring the joined profile rows into pandas for the correlation analysis.
# NOTE(review): toPandas() collects the whole frame to the driver; confirm it fits in memory.
df_ID2 = new_df2.toPandas()
df_ID2

In [188]:
# Keep only the three columns whose pairwise correlation is examined.
df_ID3 = df_ID2[['Cand_level', 'Duration', 'Cand_experience']]
df_ID3

In [189]:
# Drop rows with any missing value before computing correlations.
# NOTE(review): this rebinds `df_clean`, which earlier in the notebook is the Spark
# DataFrame of cleaned job titles, to a pandas DataFrame. Re-running the earlier
# cleaning cells after this one will then fail; consider a distinct name here and
# in the correlation cell that reads it.
df_clean = df_ID3.dropna()
df_clean

In [190]:
# Pairwise correlation matrix of the remaining columns (pandas default method).
correlation_matrix = df_clean.corr()
print(correlation_matrix)

In [191]:
import seaborn as sns
import matplotlib.pyplot as plt

In [192]:
# Render the correlation matrix as an annotated heatmap on an explicit 8x6 figure.
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', ax=ax)
plt.show()

# Define the parameters to be analysed

In [193]:
# Target career category; compared against predictedLabel when selecting the target rows.
target_jobCategory = 'Data Science'

In [194]:
# Target seniority; compared against the derived Seniority column.
target_level = 'Senior'

In [195]:
# Candidate to analyse (same value as assigned earlier in the notebook).
candidateID = '9effb87c-d3f5-eb11-94ef-0022481a5643'

In [196]:
# Skills the selected candidate already holds. The skill-gap and recommendation cells
# exclude these via `~col('Cand_skill_name').isin(candSkill)`, so this must be a list
# of skill names; assigning the candidate ID here would exclude nothing.
candSkill = [
    row['Cand_skill_name']
    for row in (
        grouped_ID
        .filter(grouped_ID.Candidate_contact == candidateID)
        .select('Cand_skill_name')
        .distinct()
        .collect()
    )
    # A NULL inside a NOT IN list makes the predicate NULL for every non-matching row,
    # which would filter out all recommendations, so nulls are left out.
    if row['Cand_skill_name'] is not None
]

In [197]:
# Rows of grouped_ID whose Candidate_contact equals the selected candidate ID.
df_ID = grouped_ID.filter(grouped_ID.Candidate_contact == candidateID)
df_ID.show(50, truncate=False)

In [198]:
# Distinct skill names recorded for the selected candidate.
df_ID_Skills = df_ID.select('Cand_skill_name').distinct()

In [199]:
# Current skills of the candidate, gathered into a single Python list.
from pyspark.sql.functions import collect_list
df_ID_list = df_ID_Skills.agg(collect_list("Cand_skill_name")).first()[0]
print(df_ID_list)

In [200]:
# Profile rows that sit in the target career: matching predicted category and seniority.
in_target_category = grouped_ID.predictedLabel == target_jobCategory
at_target_level = grouped_ID.Seniority == target_level
df_target = grouped_ID.filter(in_target_category & at_target_level)
df_target.show(50, truncate=False)

In [201]:
# Keep only the skill attributes and duration of the target-career rows.
df_target2 = df_target.select('Cand_skill_name', 'Cand_experience', 'Cand_level', 'Duration')
df_target2.show()

GET THE TOP MOST COMMON SKILLS

In [202]:
# Per skill in the target career: number of occurrences plus the average level and
# average experience, most frequent skill first.
skill_statistics = [
    F.count('Cand_skill_name').alias('skill_count'),
    F.mean("Cand_level").alias("avg_skill_level"),
    F.mean("Cand_experience").alias("avg_experience"),
]
df_target3 = (
    df_target2
    .groupBy('Cand_skill_name')
    .agg(*skill_statistics)
    .orderBy('skill_count', ascending=False)
)

df_target3.show()

MOST COMMON SKILLS OF THE TARGET CAREER

In [203]:
# Most common target skills: those occurring more than 50 times.
most_important_skills = df_target3.filter(df_target3.skill_count > 50)
most_important_skills.show()

In [204]:
# Mid-frequency target skills: 20 to 50 occurrences inclusive. These bounds sit
# exactly between the "least" band (< 20) and the "most" band (> 50), so every skill
# falls in one band only. An upper bound of 100 would repeat skills already in the
# top band, and a strict lower bound would leave a count of exactly 20 in no band.
mid_important_skills = df_target3.filter(
    (df_target3.skill_count >= 20) & (df_target3.skill_count <= 50)
)
mid_important_skills.show()

In [205]:
# Least common target skills: those occurring fewer than 20 times.
least_important_skills = df_target3.filter(df_target3.skill_count < 20)
least_important_skills.show()

In [206]:
# Skill gap: target-career rows whose skill is not among the candidate's own skills.
# NOTE(review): the exclusion only works if `candSkill` holds the candidate's skill
# names; verify its definition in the parameter cells above.
skill_gap = df_target2.filter(~col('Cand_skill_name').isin(candSkill)).distinct()
skill_gap.show(truncate = False)

In [207]:
# Remove duplicated entries so each missing skill appears once.
skill_gap2 = skill_gap.dropDuplicates(['Cand_skill_name'])
skill_gap2.show(truncate = False)

RECOMMENDATION ANALYSIS

In [208]:
# Most recommended skills to learn: the most common target skills, excluding
# whatever `candSkill` lists, most frequent first.
not_already_held = ~col('Cand_skill_name').isin(candSkill)
top_skills = (
    most_important_skills
    .filter(not_already_held)
    .distinct()
    .orderBy('skill_count', ascending=False)
)
top_skills.show(10, truncate=False)

Mid Important Skills

In [209]:
# Mid-priority skills to learn: the mid-frequency target skills, excluding
# whatever `candSkill` lists, most frequent first.
not_already_held = ~col('Cand_skill_name').isin(candSkill)
mid_skills = (
    mid_important_skills
    .filter(not_already_held)
    .distinct()
    .orderBy('skill_count', ascending=False)
)
mid_skills.show(10, truncate=False)

Least Important skill

In [210]:
# Least recommended skills to learn: the least common target skills, excluding
# whatever `candSkill` lists, most frequent first.
least_skills = least_important_skills.filter(~col('Cand_skill_name')\
.isin(candSkill)).distinct().orderBy('skill_count', ascending = False)
least_skills.show(10,truncate = False)

In [211]:
# Bring the top recommended skills into pandas for plotting.
top_skillsPD = top_skills.toPandas()
top_skillsPD

In [212]:
# Bring the mid-priority recommended skills into pandas.
mid_skillsPD = mid_skills.toPandas()

In [213]:
# Bring the least recommended skills into pandas.
least_skillsPD = least_skills.toPandas()

In [214]:
# Horizontal bar chart of the ten most frequent recommended skills.
import seaborn as sns
import matplotlib.pyplot as plt

sns.set_style('whitegrid')
skill_gap_ax = sns.barplot(data=top_skillsPD.head(10), x='skill_count', y='Cand_skill_name')
skill_gap_ax.set_xlabel('Frequency')
skill_gap_ax.set_ylabel('Cand_skill_name')
sns.despine(left=True, bottom=True)

In [215]:
# Reload the skills reference table (Delta format).
crimson_skills_raw = spark.read.load(
    'abfss://enriched@esgadls2.dfs.core.windows.net/nurture/skills/skills_database/v1',
    format='delta',
)
crimson_skills_raw.show(10)

# Keep a single lower-cased skill-name column. Nulls are dropped because a null
# skill cannot be turned into a search pattern, and duplicates are dropped so each
# skill appears only once.
crimson_skills = (
    crimson_skills_raw
    .select(lower(col('crimson_name')).alias('skill_name'))
    .where(col('skill_name').isNotNull())
    .distinct()
)
crimson_skills.show(10)

# Plain Python list of skill names.
skill_exists = [row['skill_name'] for row in crimson_skills.collect()]

In [216]:
account_name = 'esgadls2' # fill in your primary account name

container_name = 'enriched' # fill in your container name

table_name  = 'career_recommender_joinedDF'

version_num = 'v1'

# No leading slash: the URL template below already supplies the separator, so a
# leading slash here would produce '...core.windows.net//recruitment/...'.
relative_path = 'recruitment/%s/%s' % (table_name, version_num) # fill in your relative folder path

adls_path = 'abfss://%s@%s.dfs.core.windows.net/%s' % (container_name, account_name, relative_path)

# Create (or overwrite) the metastore table from the DataFrame's schema and write the data to it

join_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("esgmain.%s" %(table_name))

# Write (or overwrite) the same data as a Delta dataset at the ADLS path

join_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(adls_path)

In [217]:
# Read the saved Delta dataset back from ADLS to confirm the export.
df_retrieve = spark.read.load('abfss://enriched@esgadls2.dfs.core.windows.net/recruitment/career_recommender_joinedDF/v1', format='delta'

)

In [218]:
# Preview the data read back from storage. The frame loaded in the previous cell is
# `df_retrieve`; `career_recommender_joinedDF` is only the table name, not a variable.
df_retrieve.show()