In [1]:
# Install Java, Spark, and Findspark
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!tar xf spark-2.4.6-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.152)] [Connecting to security.u                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (91.189.91.38)] [Co0% [1 InRelease gpgv 21.3 kB] [Waiting for headers] [Connecting to security.ubu                                                                               Hit:3 http://ppa.launchpad.net/marutter/c2d4u3.5/ubuntu bionic InRelease
0% [1 InRelease gpgv 21.3 kB] [Waiting for headers] [Connecting to security.ubu                                                                               Get:4 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
0% [1 InRelease gpgv 21.3 kB] [4 InRelease 14.2 kB/88.7 kB 16%] [Connecting to                                                   

In [2]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Hashing").getOrCreate()

In [3]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover

In [4]:
from pyspark import SparkFiles

df = spark.read.csv(SparkFiles.get("/content/indeed_data_processed.csv"), sep=",", header=True)

df = df.na.drop()

df = df[["jobclass", "jobdescription"]]

# Show DataFrame
df.show()

+--------------+--------------------+
|      jobclass|      jobdescription|
+--------------+--------------------+
|data scientist|Preventable illne...|
|data scientist|As Data Scientist...|
|data scientist|Data Scientist If...|
|data scientist|Medidata is leadi...|
|data scientist|POSITION SUMMARY ...|
|data scientist|Ph D in STEM plus...|
|data scientist|Power the Possibi...|
|data scientist|Job Overview The ...|
|data scientist|Why choose betwee...|
|data scientist|TITLE Data Scient...|
|data scientist|Please make sure ...|
|data scientist|Join a team recog...|
|data scientist|We are Applied In...|
|data scientist|Organization Acce...|
|data scientist|"About the Team A...|
|data scientist|SUMMARY The CMC D...|
|data scientist|Formation provide...|
|data scientist|Job Description T...|
|data scientist|Organization Acce...|
|data scientist|At Varen our perf...|
+--------------+--------------------+
only showing top 20 rows



In [5]:

from pyspark.sql.functions import length
# Create a length column to be used as a future feature 
data_df = df.withColumn('length', length(df['jobdescription']))
data_df.show()

+--------------+--------------------+------+
|      jobclass|      jobdescription|length|
+--------------+--------------------+------+
|data scientist|Preventable illne...|  3339|
|data scientist|As Data Scientist...|  2628|
|data scientist|Data Scientist If...|  1460|
|data scientist|Medidata is leadi...|  4802|
|data scientist|POSITION SUMMARY ...|  3672|
|data scientist|Ph D in STEM plus...|  2858|
|data scientist|Power the Possibi...|  4314|
|data scientist|Job Overview The ...|  1927|
|data scientist|Why choose betwee...|  2471|
|data scientist|TITLE Data Scient...|  1350|
|data scientist|Please make sure ...|  6638|
|data scientist|Join a team recog...|  4474|
|data scientist|We are Applied In...|  3080|
|data scientist|Organization Acce...|  3067|
|data scientist|"About the Team A...|  4429|
|data scientist|SUMMARY The CMC D...|  2196|
|data scientist|Formation provide...|  3969|
|data scientist|Job Description T...|  3476|
|data scientist|Organization Acce...|  3621|
|data scie

In [6]:

from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
# Create all the features to the data set
pos_neg_to_num = StringIndexer(inputCol='jobclass',outputCol='label')
tokenizer = Tokenizer(inputCol="jobdescription", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [7]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [8]:

# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [9]:

# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)

In [10]:

# Show label and resulting features
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(262145,[1836,388...|
|  0.0|(262145,[167,813,...|
|  0.0|(262145,[619,1836...|
|  0.0|(262145,[666,788,...|
|  0.0|(262145,[619,666,...|
|  0.0|(262145,[619,966,...|
|  0.0|(262145,[168,619,...|
|  0.0|(262145,[966,1079...|
|  0.0|(262145,[4525,622...|
|  0.0|(262145,[966,1836...|
|  0.0|(262145,[1079,183...|
|  0.0|(262145,[966,1836...|
|  0.0|(262145,[966,1836...|
|  0.0|(262145,[966,1232...|
|  0.0|(262145,[1115,183...|
|  0.0|(262145,[966,1667...|
|  0.0|(262145,[966,1836...|
|  0.0|(262145,[619,1079...|
|  0.0|(262145,[966,1232...|
|  0.0|(262145,[1836,337...|
+-----+--------------------+
only showing top 20 rows



In [11]:
cleaned.select('label').distinct().show()

+-----+
|label|
+-----+
|  0.0|
|  1.0|
|  4.0|
|  3.0|
|  2.0|
|  6.0|
|  5.0|
+-----+



In [12]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [13]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(40)

+------------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    jobclass|      jobdescription|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+------------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|data analyst|"Eaton’s Electric...|  5695|  1.0|["eaton’s, electr...|["eaton’s, electr...|(262144,[619,1232...|(262144,[619,1232...|(262145,[619,1232...|[-18013.518581535...|[1.0,2.3965111456...|       0.0|
|data analyst|"REQUIRED QUALIFI...|  4685|  1.0|["required, quali...|["required, quali...|(262144,[619,2362...|(262144,[619,2362...|(262145,[619,2362...|[-17128.617873883..

In [14]:

# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting jobclass was: %f" % acc)

Accuracy of model at predicting jobclass was: 0.944408
