# Qualifications analysis for data jobs
#### Aleksandar Banov



Problem Overview:
Can we predict the job title based on the job_descriptions alone for Data Scientist, Data Analyst, Data Engineer jobs.

**This notebook** 

### Setup Spark 
- **Load Spark with the respective path** 
Set path where Spark is installed (differences in between paths with windows / linux) 

- **Create Spark Session** 
Either take the existing sesseion or create a new one if there is none. Create session based on the parameters.

- **Load SparkContext** 
Context as main entry point for Spark functionality, which we will need later

In [1]:
# Import the findspark module 
import findspark

# Initialize via the full spark path
#findspark.init("/usr/local/spark/")
findspark.init()

In [2]:
# Import the SparkSession module
from pyspark.sql import SparkSession

# Import the collections module
import collections

# Gets an existing :class:`SparkSession` or, if there is no existing one, creates a
# new one based on the options set in this builder.
spark = SparkSession.builder \
   .master("local[8]") \
   .appName("RatingsHistogram") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()

# Main entry point for Spark functionality. A SparkContext represents the
# connection to a Spark cluster, and can be used to create :class:`RDD` and
# broadcast variables on that cluster.
sc = spark.sparkContext


### Read in the datasets and do the preprocecssing steps

- **Load CSV file.**
Read the CSV file with the preprocessed scraped Karriere data

- **Select non-duplicates**
Using the pyspark distinct() function.

The **karriere_3_classes** dataset that we scraped from Karriere.at with Selenium. After that we preprocessed and cleaned the data.

In [3]:
# reading in the data with the proper options
df3 = spark.read.format("com.databricks.spark.csv") \
    .options(header='True', inferSchema='True', delimiter=',') \
    .csv("karriere_3_classes.csv")

df3.show()

# assigining it a proper name
data = df3

+------------+--------------------+
|   job_title|     job_description|
+------------+--------------------+
|Data Analyst|We are Unito - Au...|
|Data Analyst|Group leader: in ...|
|Data Analyst|(Senior) Data Ana...|
|Data Analyst|Secure informatio...|
|Data Analyst|Nice code Valley ...|
|Data Analyst|Full-time; Advanc...|
|Data Analyst|International Loc...|
|Data Analyst|"Data analyst (m/...|
|Data Analyst|Moving Healthcare...|
|Data Analyst|With 12 Bachelor ...|
|Data Analyst|Data Management /...|
|Data Analyst|Win2day is the ga...|
|Data Analyst|Are you open to n...|
|Data Analyst|"Data analyst (f/...|
|Data Analyst|The right way for...|
|Data Analyst|Delivery Hero Aus...|
|Data Analyst|Internship: Finan...|
|Data Analyst|"Capgemini Invent...|
|Data Analyst|Data analyst; for...|
|Data Analyst|Moving Healthcare...|
+------------+--------------------+
only showing top 20 rows



In [4]:
# selecting only non-duplicates
data = data.distinct()

# look at how many jobs we got in total
data.count()

292

#### From the intitial 1300 rows after preprocessing steps we only end up with around 300. This is not quite satisfying as our models would benefit quite a bit from more observations but lets try and build them and see what happens.

In [6]:
from pyspark.sql.functions import col

# this is similar to the pandas value_counts() function but we have to spell it out quite a bit more for PySpark
data.groupBy("job_title").count().orderBy(col("count").desc()).show()

+--------------+-----+
|     job_title|count|
+--------------+-----+
|  Data Analyst|  178|
| Data Engineer|   60|
|Data Scientist|   54|
+--------------+-----+



#### We notice that  our dataset is also not quite the most balanced one but lets continue further.

### Feature engineering

We have two columns job_titles (Data Scientist, Data Engineer or a Data Analyst) and a column with the respective job_descriptions

We are going to utilize the **MLlib** for this task. Machine learning algorithms do not understand texts so we have to convert them into numeric values during this stage. We need to extract the relevant features and characteristics from the raw data that will act as inputs into our model and will be used in making predictions.

These features are in form of a tokenizer, vectorizer, and extractor.

The steps look like this:

- Tokenizer - RegexTokenizer - which allows more advanced tokenization based on regular expression (regex) matching.
- StopWordsRemover - drops all the stop words from the input sequences plus any additionally specified ones.
- Extractor - CountVectorizer - which will select the top vocabSize words ordered by term frequency across the corpus

In [8]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="job_description", outputCol="words", pattern="\\W")

# stop words
add_stopwords = ["http","https"] # we add these further stopwords because there's sometimes a direct link to apply
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

Here we encode the label column with the StringIndexer instead of doing it manually. Build the pipeline and fit it to our training data.

In [9]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# encodes a string column (job_title) of labels to a column of label indices.
label_stringIdx = StringIndexer(inputCol = "job_title", outputCol = "label")

#creates a pipeline object with all the previous steps
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

# Fit the pipeline to the documents.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(5)

+--------------+--------------------+--------------------+--------------------+--------------------+-----+
|     job_title|     job_description|               words|            filtered|            features|label|
+--------------+--------------------+--------------------+--------------------+--------------------+-----+
| Data Engineer|Data Engineer (40...|[data, engineer, ...|[data, engineer, ...|(2179,[0,1,2,3,4,...|  1.0|
|Data Scientist|(Senior) Data Ana...|[senior, data, an...|[senior, data, an...|(2179,[0,1,2,3,4,...|  2.0|
|  Data Analyst|"Do your life's b...|[do, your, life, ...|[do, your, life, ...|(2179,[0,1,2,3,4,...|  0.0|
| Data Engineer|Data Engineer (m/...|[data, engineer, ...|[data, engineer, ...|(2179,[0,1,2,3,4,...|  1.0|
|  Data Analyst|"We are Navax, a ...|[we, are, navax, ...|[we, are, navax, ...|(2179,[0,1,2,3,4,...|  0.0|
+--------------+--------------------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



In [10]:
# split the data and set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 42)

Training Dataset Count: 209
Test Dataset Count: 83


### Logistic Regression
Next up is building  and fitting the model. We are going to use Logistic Regression which is a generalized linear model that predicts the probability of the categorical outcomes.

In [11]:
building lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

#fitting the model
lrModel = lr.fit(trainingData)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

# calculate the predictions
predictions = lrModel.transform(testData)

predictions.filter(predictions['prediction'] == 0) \
    .select("job_description","job_title","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+------------+------------------------------+-----+----------+
|               job_description|   job_title|                   probability|label|prediction|
+------------------------------+------------+------------------------------+-----+----------+
|Financial data analyst (f/m...|Data Analyst|[0.9938094788656089,0.00336...|  0.0|       0.0|
|Senior Analyst (F/M/X)*; FO...|Data Analyst|[0.9897014781985672,0.00594...|  0.0|       0.0|
|Senior Project Leader for A...|Data Analyst|[0.9881477222898951,0.00585...|  0.0|       0.0|
|Financial Analyst - Revenue...|Data Analyst|[0.9875541052733894,0.00834...|  0.0|       0.0|
|Moving Healthcare Forward. ...|Data Analyst|[0.9870463255340176,0.00530...|  0.0|       0.0|
|Business Analyst - Recyclin...|Data Analyst|[0.9821995138369595,0.00450...|  0.0|       0.0|
|Financial Crime Systems Ana...|Data Analyst|[0.9805037326823108,0.01320...|  0.0|       0.0|
|Pallas Capital Advisory AG ...|Data Analyst|[0.978090878928

In [12]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#evaluating the model's accuracy 
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.5345124339723426

#### The we get an accuracy of a bit over 50% which is not good. Let's see if we can undertake some steps to increase the score of our model.

### Further feature engineering with TF-IDF

Term frequency-inverse document frequency (TF-IDF) is a feature vectorization method widely used in text mining to reflect the importance of a term to a document in the corpus.
There are several variants on the definition of term frequency and document frequency. In MLlib TF and IDF are separate  so we have to add them as separete steps. Because both HashingTF and CountVectorizer can be used to generate the term frequency vectors we get rid of the CountVectorizer and substitute it for the HashingTF in the pipeline.

In [13]:
from pyspark.ml.feature import HashingTF, IDF

# HashingTF takes sets of terms and converts those sets into fixed-length feature vectors.
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)

# IDF takes feature vectors and scales each feature. It down-weights features which appear frequently in a corpus.
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms

# adding the new steps to the old pipeline
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])

# fitting the pipeline to the data
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

# again splitting the data into train/test
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 42)

# instantiate the linear model
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

lrModel = lr.fit(trainingData)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

# calculate the predictions
predictions = lrModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("job_description","job_title","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+------------+------------------------------+-----+----------+
|               job_description|   job_title|                   probability|label|prediction|
+------------------------------+------------+------------------------------+-----+----------+
|Financial data analyst (f/m...|Data Analyst|[0.9946893163710926,0.00238...|  0.0|       0.0|
|Financial Analyst - Revenue...|Data Analyst|[0.9933732996713391,0.00484...|  0.0|       0.0|
|Design a piece of future wi...|Data Analyst|[0.9919549235358895,0.00401...|  0.0|       0.0|
|Moving Healthcare Forward. ...|Data Analyst|[0.9905219683864447,0.00424...|  0.0|       0.0|
|Senior Analyst (F/M/X)*; FO...|Data Analyst|[0.9894460095413148,0.00647...|  0.0|       0.0|
|Business Analyst - Recyclin...|Data Analyst|[0.9891197033068023,0.00218...|  0.0|       0.0|
|Job-ID: 146085; Security An...|Data Analyst|[0.9884380111637738,0.00949...|  0.0|       0.0|
|Senior Project Leader for A...|Data Analyst|[0.984909215029

In [14]:
#evaluating the model's accuracy again
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.5525550477773162

#### Just a few percentage points increase from the first model.

### CV and tunning the model

As we know crossValidator begins by splitting the dataset into a set of folds which are used as separate training and test datasets. In our case with k=5 folds, CrossValidator will generate 5 (training, test) dataset pairs, each of which uses 4/5 of the data for training and 1/5 for testing and evaluate the final score.

We use paramGrid to help us construct the parameter grid and tune the Logistic Regression model.

In [15]:
# creates a pipeline object with all the previous steps
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

# fits and transforms the pipeline to the data
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

# split train/test
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)

# instantiate the linear model
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5])
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2])
             .build())

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=5)
cvModel = cv.fit(trainingData)

# calculate the predictions
predictions = cvModel.transform(testData)

# Evaluate best model
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.8194553564205611

#### Wow! So after tuning the model we got a significant boost in our evaluation score. A score of 82% is not that bad considering that this is a 3-class problem and  the amount of observations we have are below 300.

### Advanced models

We decided to try out more advanced models in order to explore what Spark NLP has to offer.

First we find spark as usual.

In [1]:
# Import the findspark module 
import findspark

# Initialize via the full spark path
#findspark.init("/usr/local/spark/")
findspark.init()

#prints the home of spark
findspark.find()

'C:\\Users\\Asus\\Documents\\spark-3.1.3-bin-hadoop3.2'

Here we initialize an Spark NLP session and import relevant libraries.

In [2]:
import sparknlp

# reassigning spark to sparknlp
spark = sparknlp.start()

# for training on GPU
sparknlp.start(gpu=True)

from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline

# we use pandas only to visualize the evaluation metrics of the models
import pandas as pd

# printing the versions for future reference 
print("Spark NLP version", sparknlp.version())
print("Apache Spark version:", spark.version)

Spark NLP version 4.0.1
Apache Spark version: 3.1.3


### Data

Since we plan to build more advanced models that require deep learning we decided to use a dataset with more observations that our scraped data from Karriere.

You can find the dataset that we used [here](https://www.kaggle.com/code/inigoml/data-exploration/data?select=indeed_job_dataset.csv). As the previous one it has 3 labels (Data Scientist, Data Engineer, Data Analyst) and the split bewteeen the jobs is decent enough.

In [3]:
# reading in the data with the proper options
df3 = spark.read \
    .options(header='True', inferSchema='True', delimiter=',') \
    .csv("DA_DE_DS.csv")

df3.show()

# assigining it a proper name
data = df3


+--------------+--------------------+
|     job_title|     job_description|
+--------------+--------------------+
|data_scientist|" POSITION SUMMAR...|
|data_scientist| What do we need?...|
|data_scientist| Validate, analyz...|
|data_scientist| Full time, Washi...|
|data_scientist| Assist in consul...|
|data_scientist| Collecting and c...|
|data_scientist| With demand sens...|
|data_scientist| Masters degree i...|
|data_scientist| Duties   Summary...|
|data_scientist| The Department o...|
|data_scientist| Salary Commensur...|
|data_scientist| Mid Data Scienti...|
|data_scientist| WHY CATALINA   ,...|
|data_scientist| Located along Fl...|
|data_scientist|    Achievement N...|
|data_scientist| Implement large-...|
|data_scientist| ExxonMobil Resea...|
|data_scientist| Working at MIT o...|
|data_scientist| We've made a lot...|
|data_scientist| Raytheon is an E...|
+--------------+--------------------+
only showing top 20 rows



In [4]:
# selecting only non-duplicates
data = data.distinct()

# look at how many jobs we got in total
data.count()

4787

In [5]:
from pyspark.sql.functions import col

# this is similar to the pandas value_counts() function but we have to spell it out quite a bit more for PySpark
data.groupBy("job_title").count().orderBy(col("count").desc()).show()

+--------------+-----+
|     job_title|count|
+--------------+-----+
|data_scientist| 2155|
|  data_analyst| 1486|
| data_engineer| 1146|
+--------------+-----+



The split of the data isn't perfect but it will do the trick. Let's now do the test/train split and start building the model.

In [5]:

# set seed for reproducibility
(trainDataset, testDataset) = data.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainDataset.count()))
print("Test Dataset Count: " + str(testDataset.count()))


Training Dataset Count: 3338
Test Dataset Count: 1449


### Building the pipeline

A standard text classification problem follows these steps:

- Text preprocessing and cleaning
- Feature engineering
- Feature vectorization

and finally 
- Training a model with ML and DL algorithms.

In approaching the problem we could have used any text preprocessing and feature vectorization steps that are offered by Spark NLP and then build a model from MLlib. However we wanted to try some of the new pretrained DL models that Spark NLP offers.

Below we will utilize the ClassifierDL annotator which uses a deep learning model (DNNs) that has been built inside TensorFlow and supports up to 100 classes. ClassifierDL uses the state-of-the-art Universal Sentence Encoder as an input for text classifications. 

The Universal Sentence Encoder part of a group of text embedding methods which are crucial for building any Deep Learning model in Natural Language Processing. The text embedding converts text (words or sentences) into a numerical vector as we always have to boil everything down to a mathematical representation. The Universal Sentence Encoder goes past standard techniques of converting a word to a vector, along with words the context of the whole sentence will be captured in a vector.

**Now** before putting the data into a transformer and an annotator we need to get it ready for Spark NLP. **DocumentAssembler()** prepares data into a format that is processable by Spark NLP and is the entry point for every Spark NLP pipeline.

This is because the annotator in Spark NLP accepts certain types of columns and outputs new columns in another type. This is why in Spark NLP we constaintly have to **setInputCols** and **setOutputCol**.

In [6]:
# actual content is inside job_description column
document = DocumentAssembler()\
    .setInputCol("job_description")\
    .setOutputCol("document")


# we can also use sentence detector here 
# if we want to train on and get predictions for each sentence

# Transformer
# downloading pretrained embeddings
use = UniversalSentenceEncoder.pretrained()\
 .setInputCols(["document"])\
 .setOutputCol("sentence_embeddings")

# the labels are are in the job_title column
classsifierdl = ClassifierDLApproach()\
  .setInputCols(["sentence_embeddings"])\
  .setOutputCol("class")\
  .setLabelColumn("job_title")\
  .setMaxEpochs(5)\
  .setEnableOutputLogs(True)

# building the pipeline
use_clf_pipeline = Pipeline(
    stages = [
        document,
        use,
        classsifierdl
    ])

tfhub_use download started this may take some time.
Approximate size to download 923,7 MB
[OK!]


In [7]:
# fitting the pipeline to the data
use_pipelineModel = use_clf_pipeline.fit(trainDataset)

In [8]:
# transforming to the data to make predictions
preds = use_pipelineModel.transform(testDataset)

preds.select('job_title','job_description',"class.result").show(10, truncate = 80)

+--------------+--------------------------------------------------------------------------------+----------------+
|     job_title|                                                                 job_description|          result|
+--------------+--------------------------------------------------------------------------------+----------------+
|  data_analyst| Enterprise Data Governance is a Corporate-wide initiative focused on the est...|  [data_analyst]|
| data_engineer|    Design, access and implementation of extensive databases and analytical t...|[data_scientist]|
| data_engineer|    Implement their knowledge of cloud, serverless, and hybrid-cloud technolo...|  [data_analyst]|
| data_engineer| C.H. Robinson helps drive corporate strategy and provides business value usi...| [data_engineer]|
| data_engineer| Castlight is looking for seasoned data engineers who have a penchant for pro...| [data_engineer]|
|data_scientist| Onboard with the Grand Rounds team in San Francisco, setup your

We briefly use pandas here to create the evaluation matrix.

In [9]:
from sklearn.metrics import classification_report, accuracy_score

# get the predictions in a pandas dataframe
df= use_pipelineModel.transform(testDataset).select('job_title','job_description',"class.result").toPandas()

# get the first element of each field of the result column
df['result'] = df['result'].apply(lambda x:x[0])

# print evaluation matrix and the accuracy score
print(classification_report(df.job_title, df.result))
print(accuracy_score(df.job_title, df.result))

                precision    recall  f1-score   support

  data_analyst       0.77      0.74      0.75       455
 data_engineer       0.85      0.76      0.80       347
data_scientist       0.80      0.87      0.83       647

      accuracy                           0.80      1449
     macro avg       0.81      0.79      0.80      1449
  weighted avg       0.80      0.80      0.80      1449

0.8019323671497585


Alright! Looks good! And all that took just a few lines of code and we didn't do any text preprocessing or cleaning steps along the way like we had to before with the MLlib model.

### Bert pipeline

Because the BERT model was the initial plan of the project we decided to implement it here. Lets see if we gain any extra accuracy!

We are going to use sentence-level embeddings using BERT. BERT (Bidirectional Encoder Representations from Transformers) provides dense vector representations for natural language by using a deep, pre-trained neural network with the Transformer architecture.

We pretty much have exactly the same code as above but here we just use a different transformer - **BertSentenceEmbeddings** instead of the **UniversalSentenceEncoder**

In [10]:
gain # actual content is inside job_description column
document = DocumentAssembler()\
    .setInputCol("job_description")\
    .setOutputCol("document")
    

# we can also use sentence detector here 
# if we want to train on and get predictions for each sentence

# Transformer
# downloading pretrained embeddings
bert_sent = BertSentenceEmbeddings.pretrained('sent_small_bert_L8_512')\
 .setInputCols(["document"])\
 .setOutputCol("sentence_embeddings")

# the labels are are in the job_title column
classsifierdl = ClassifierDLApproach()\
  .setInputCols(["sentence_embeddings"])\
  .setOutputCol("class")\
  .setLabelColumn("job_title")\
  .setMaxEpochs(5)\
  .setEnableOutputLogs(True)

# building the pipeline
use_clf_pipeline = Pipeline(
    stages = [
        document,
        use,
        classsifierdl
    ])

sent_small_bert_L8_512 download started this may take some time.
Approximate size to download 149,1 MB
[OK!]


**Note:** Here we specified the pretraining that we want the **BertSentenceEmbeddings** to have. We can see that the size of the **sent_small_bert_L8_512** is considerably smaller than what we had above with the UniversalSentenceEncoder which was approx. 1GB.

In [11]:
# fitting the pipeline to the data
useBERT_pipelineModel = use_clf_pipeline.fit(trainDataset)

In [12]:
# transforming to the data to make predictions
preds = useBERT_pipelineModel.transform(testDataset)

preds.select('job_title','job_description',"class.result").show(10, truncate = 80)

+--------------+--------------------------------------------------------------------------------+----------------+
|     job_title|                                                                 job_description|          result|
+--------------+--------------------------------------------------------------------------------+----------------+
|  data_analyst| Enterprise Data Governance is a Corporate-wide initiative focused on the est...|  [data_analyst]|
| data_engineer|    Design, access and implementation of extensive databases and analytical t...|[data_scientist]|
| data_engineer|    Implement their knowledge of cloud, serverless, and hybrid-cloud technolo...|  [data_analyst]|
| data_engineer| C.H. Robinson helps drive corporate strategy and provides business value usi...| [data_engineer]|
| data_engineer| Castlight is looking for seasoned data engineers who have a penchant for pro...| [data_engineer]|
|data_scientist| Onboard with the Grand Rounds team in San Francisco, setup your

In [13]:
from sklearn.metrics import classification_report, accuracy_score

# get the predictions in a pandas dataframe
df= useBERT_pipelineModel.transform(testDataset).select('job_title','job_description',"class.result").toPandas()

# get the first element of each field of the result column
df['result'] = df['result'].apply(lambda x:x[0])

# print evaluation matrix and the accuracy score
print(classification_report(df.job_title, df.result))
print(accuracy_score(df.job_title, df.result))

                precision    recall  f1-score   support

  data_analyst       0.76      0.75      0.76       455
 data_engineer       0.80      0.82      0.81       347
data_scientist       0.84      0.83      0.84       647

      accuracy                           0.81      1449
     macro avg       0.80      0.80      0.80      1449
  weighted avg       0.81      0.81      0.81      1449

0.8053830227743272


Alright! We just got a few extra decimal points to add to the accuracy and neatly round it up to 81%

### Build a pipeline that includes text preprocessing with GloVe

We saw how easy it was to build, train and predict with Spark NLP pipelines. We didn't need any text preprocessing beforehand. Nevertheless let's try and introduce some into our pipeline and see what are the results. 

We will at first apply several text preprocessing steps (normalize, remove stopwords and then find lemmas out of words)and then get word embeddings per token (lemma of a token) and then average the word embeddings in each sentence to get a sentence embeddings per row. Then we will utilize Glove word embeddings - GloVe (Global Vectors) is a model for distributed word representation. This is achieved by mapping words into a meaningful space where the distance between words is related to semantic similarity. One benefit of GloVe is that it is the result of directly modeling relationships, instead of getting them as a side effect of training a language model.

In [14]:
# actual content is inside job_description column
# prepares data into a format that is processable by Spark NLP
documentAssembler = DocumentAssembler()\
    .setInputCol("job_description")\
    .setOutputCol("document")

# Tokenizes raw text in document type columns into TokenizedSentence
tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

# Annotator that cleans out tokens. Requires stems - tokens.
#Removes all dirty characters from text following a regex pattern and transforms words based on a provided dictionary
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized")

# This annotator takes a sequence of strings and drops all the stop words from the input sequences.
stopwords_cleaner = StopWordsCleaner()\
    .setInputCols("normalized")\
    .setOutputCol("cleanTokens")\
    .setCaseSensitive(False)

# Class to find lemmas out of words with the objective of returning a base dictionary word.
# Meaning that it retrieves the significant part of a word. 
lemma = LemmatizerModel.pretrained('lemma_antbnc') \
    .setInputCols(["cleanTokens"]) \
    .setOutputCol("lemma")

# Word Embeddings lookup annotator that maps tokens to vectors.
word_embeddings = WordEmbeddingsModel().pretrained() \
      .setInputCols(["document",'lemma'])\
      .setOutputCol("embeddings")\
      .setCaseSensitive(False)

# Converts the results from WordEmbeddings into sentence or document embeddings
# It does that by either summing up or averaging all the word embeddings in a sentence or a document
# We have setPoolingStrategy to Average so it does the former
embeddingsSentence = SentenceEmbeddings() \
      .setInputCols(["document", "embeddings"]) \
      .setOutputCol("sentence_embeddings") \
      .setPoolingStrategy("AVERAGE")

# the labels are are in the job_title column
classsifierdl = ClassifierDLApproach()\
    .setInputCols(["sentence_embeddings"])\
    .setOutputCol("class")\
    .setLabelColumn("job_title")\
    .setMaxEpochs(5)\
    .setEnableOutputLogs(True)

# building the pipeline
clf_pipeline_mix = Pipeline(stages=[
    documentAssembler,
    tokenizer,
    normalizer,
    stopwords_cleaner,
    lemma,
    word_embeddings,
    embeddingsSentence,
    classsifierdl
 ])

lemma_antbnc download started this may take some time.
Approximate size to download 907,6 KB
[OK!]
glove_100d download started this may take some time.
Approximate size to download 145,3 MB
[OK!]


In [15]:
# fitting the pipeline to the data
clf_pipelineModel_long = clf_pipeline_mix.fit(trainDataset)

In [16]:
# transforming to the data to make predictions
preds = clf_pipelineModel_long.transform(testDataset)

preds.select('job_title','job_description',"class.result").show(10, truncate = 80)

+--------------+--------------------------------------------------------------------------------+----------------+
|     job_title|                                                                 job_description|          result|
+--------------+--------------------------------------------------------------------------------+----------------+
|  data_analyst| Enterprise Data Governance is a Corporate-wide initiative focused on the est...|  [data_analyst]|
| data_engineer|    Design, access and implementation of extensive databases and analytical t...| [data_engineer]|
| data_engineer|    Implement their knowledge of cloud, serverless, and hybrid-cloud technolo...|  [data_analyst]|
| data_engineer| C.H. Robinson helps drive corporate strategy and provides business value usi...| [data_engineer]|
| data_engineer| Castlight is looking for seasoned data engineers who have a penchant for pro...| [data_engineer]|
|data_scientist| Onboard with the Grand Rounds team in San Francisco, setup your

In [17]:
from sklearn.metrics import classification_report, accuracy_score

# get the predictions in a pandas dataframe
df= clf_pipelineModel_long.transform(testDataset).select('job_title','job_description',"class.result").toPandas()

# get the first element of each field of the result column
df['result'] = df['result'].apply(lambda x:x[0])

# get the first element of each field of the result column
print(classification_report(df.job_title, df.result))
print(accuracy_score(df.job_title, df.result))

                precision    recall  f1-score   support

  data_analyst       0.79      0.66      0.72       455
 data_engineer       0.79      0.71      0.75       347
data_scientist       0.76      0.88      0.82       647

      accuracy                           0.77      1449
     macro avg       0.78      0.75      0.76      1449
  weighted avg       0.77      0.77      0.77      1449

0.772256728778468


Finished! We got an accuracy of 77% which is a bit lower than the accuracy from the previous pipelines despite the fact that here we did quite a lot of text preprocessing steps. Turns out that while doing text preprocessing we are hurting the model's performance. That could be due to the fact that the Spark NLP models are already trained on unclean text data. And our feature engineering steps may just be including more noise that worsens the model's performance 

**Sources:**


In building the MLlib models we found usefull:
- [Section Article](https://www.section.io/engineering-education/multiclass-text-classification-with-pyspark/)
- [MLlib Doc](https://spark.apache.org/docs/1.1.0/mllib-feature-extraction.html)
- [Medium Article](https://towardsdatascience.com/multi-class-text-classification-with-pyspark-7d78d022ed35)
- [Regex](https://www.codepicky.com/regex/#:~:text=Regex%20is%20one%20of%20the,in%20long%20pieces%20of%20text.&text=Regex%20or%20Regexp%2C%20short%20for,typically%20used%20on%20larger%20texts.)


In building the Spark NLP models we found usefull:
- [Medium Article on Document Assembler](https://medium.com/spark-nlp/spark-nlp-101-document-assembler-500018f5f6b5)
- [Text Preprocessing](https://colab.research.google.com/github/JohnSnowLabs/spark-nlp-workshop/blob/master/tutorials/Certification_Trainings/Public/2.Text_Preprocessing_with_SparkNLP_Annotators_Transformers.ipynb)
- [Spark NLP Doc](https://nlp.johnsnowlabs.com/docs/en/quickstart)
- [Medium Article](https://towardsdatascience.com/text-classification-in-spark-nlp-with-bert-and-universal-sentence-encoders-e644d618ca32)
- [Word Embeddings](https://machinelearningmastery.com/what-are-word-embeddings/)
- [Spark NLP Examples](https://github.com/JohnSnowLabs/spark-nlp-workshop/blob/master/tutorials/Certification_Trainings/Public/5.1_Text_classification_examples_in_SparkML_SparkNLP.ipynb)


We saw that the models can predict with a decent certainty what describes a Data Scientist vs Data Analyst or a Data Engineering role. Let us further investigate the 2 datasets that we have - one we scraped, the other we combined and cleaned with PySpark. Please check up next.