In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.2.3'
# spark_version = 'spark-<enter version>'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease
Hit:2 http://archive.ubuntu.com/ubuntu focal InRelease
Get:3 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
Hit:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  Release
Get:8 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
Hit:9 http://ppa.launchpad.net/cran/libgit2/ubuntu focal InRelease
Hit:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu focal InRelease
Get:12 http://archive.ubuntu.com/ubuntu focal-backports InRelease [108 kB]
Hit:13 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu focal InRelease
Get:14 http://archive.ubuntu.com/ubuntu focal-updates/universe 

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NLPCons").getOrCreate()

In [3]:
from pyspark import SparkFiles
# Load in data into a DataFrame

url = "/content/reviews_nlp_input.csv" #enter correct address here

spark.sparkContext.addFile(url)

df = spark.read \
    .option("delimiter", "|") \
    .option("multiline", "true") \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .option("header", "true") \
    .csv(url)

# Show DataFrame
df.show()

+---+--------------------+-----------+--------------------+--------------------+--------------------+--------------+-----------------+--------------+-------------------+----------+-------------+-----------+---------+----------+-------+--------------------+--------------------+--------------------+
|_c0|                firm|date_review|           job_title|             current|            location|overall_rating|work_life_balance|culture_values|diversity_inclusion|career_opp|comp_benefits|senior_mgmt|recommend|ceo_approv|outlook|            headline|                pros|                cons|
+---+--------------------+-----------+--------------------+--------------------+--------------------+--------------+-----------------+--------------+-------------------+----------+-------------+-----------+---------+----------+-------+--------------------+--------------------+--------------------+
|  0|AFH-Wealth-Manage...| 2020-10-01| Office Administr...|Former Employee, ...|Bromsgrove, Engla...|  

Transform DataFrame to fit review_rating table

In [4]:
cons_df = df.select(["cons", "overall_rating", "date_review"])
cons_df.show()

+--------------------+--------------+-----------+
|                cons|overall_rating|date_review|
+--------------------+--------------+-----------+
|Poor pay, huge ga...|             2| 2020-10-01|
|Salaries are much...|             1| 2021-02-05|
|Management can be...|             4| 2021-02-07|
|-Low Salary\n-Mid...|             3| 2021-02-07|
|-Unachievable bon...|             1| 2021-05-12|
|Borderline bullyi...|             1| 2021-05-13|
|Communication bet...|             5| 2021-05-13|
|started to become...|             3| 2020-10-14|
|It’s like a schoo...|             3| 2020-11-25|
|First the minor p...|             1| 2020-12-04|
|Awful cronyism in...|             1| 2020-12-08|
|Pension contribut...|             4| 2020-12-11|
|Poor salary, no f...|             2| 2020-12-21|
|Feel like one of ...|             4| 2021-01-05|
|Pension contribut...|             4| 2021-01-09|
|Under staffed, po...|             2| 2021-01-14|
|Pension contribut...|             4| 2021-01-25|


In [5]:
from pyspark.sql.functions import regexp_extract, length
cons_df = df.withColumnRenamed("overall_rating", "label").select(["label", "date_review", "cons"])
cons_df = cons_df.withColumn('cons_length', length(cons_df['cons'])).dropna()
cons_df.cache()
cons_df.show()

+-----+-----------+--------------------+-----------+
|label|date_review|                cons|cons_length|
+-----+-----------+--------------------+-----------+
|    2| 2020-10-01|Poor pay, huge ga...|        151|
|    1| 2021-02-05|Salaries are much...|        406|
|    4| 2021-02-07|Management can be...|         33|
|    3| 2021-02-07|-Low Salary\n-Mid...|        104|
|    1| 2021-05-12|-Unachievable bon...|        522|
|    1| 2021-05-13|Borderline bullyi...|         55|
|    5| 2021-05-13|Communication bet...|         44|
|    3| 2020-10-14|started to become...|         36|
|    3| 2020-11-25|It’s like a schoo...|        167|
|    1| 2020-12-04|First the minor p...|       1244|
|    1| 2020-12-08|Awful cronyism in...|        120|
|    4| 2020-12-11|Pension contribut...|         99|
|    2| 2020-12-21|Poor salary, no f...|         43|
|    4| 2021-01-05|Feel like one of ...|         43|
|    4| 2021-01-09|Pension contribut...|        152|
|    2| 2021-01-14|Under staffed, po...|      

Create Data Pipeline

In [6]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
# Create all the features to the data set
tokenizer = Tokenizer(inputCol="cons", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="token_text", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [7]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors (merge idf_token and review_length)
clean_up = VectorAssembler(inputCols=['idf_token', 'cons_length'], outputCol='features')

In [8]:
# Create and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[tokenizer, stopremove, hashingTF, idf, clean_up])

Transform DataFrame

In [9]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(cons_df)
cleaned = cleaner.transform(cons_df)

In [10]:
# Show label of ham spam and resulting features
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    2|(262145,[2120,218...|
|    1|(262145,[8145,912...|
|    4|(262145,[50617,10...|
|    3|(262145,[27576,42...|
|    1|(262145,[3672,113...|
|    1|(262145,[3112,386...|
|    5|(262145,[29440,10...|
|    3|(262145,[21570,27...|
|    3|(262145,[8804,610...|
|    1|(262145,[154,619,...|
|    1|(262145,[8804,122...|
|    4|(262145,[154,6555...|
|    2|(262145,[27576,71...|
|    4|(262145,[16487,21...|
|    4|(262145,[2325,655...|
|    2|(262145,[15494,90...|
|    4|(262145,[6555,339...|
|    1|(262145,[7400,153...|
|    1|(262145,[619,1546...|
|    3|(262145,[5923,343...|
+-----+--------------------+
only showing top 20 rows



In [11]:
cleaned.printSchema()

root
 |-- label: string (nullable = true)
 |-- date_review: string (nullable = true)
 |-- cons: string (nullable = true)
 |-- cons_length: integer (nullable = true)
 |-- token_text: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- stop_tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- hash_token: vector (nullable = true)
 |-- idf_token: vector (nullable = true)
 |-- features: vector (nullable = true)



In [12]:
from pyspark.sql.functions import col
cleaned = cleaned.withColumn("label", col("label").cast("int"))

Run NaiveBayes

In [13]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes() #labelCol='label', featuresCol='features'
predictor = nb.fit(training)

In [14]:
training.show()

+-----+-----------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|label|date_review|                cons|cons_length|          token_text|         stop_tokens|          hash_token|           idf_token|            features|
+-----+-----------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    1| 2020-09-13| Had to pay for food|         19|[had, to, pay, fo...|         [pay, food]|(262144,[27576,10...|(262144,[27576,10...|(262145,[27576,10...|
|    1| 2020-09-14|Franchise de la S...|        488|[franchise, de, l...|[franchise, de, l...|(262144,[22959,23...|(262144,[22959,23...|(262145,[22959,23...|
|    1| 2020-09-14|Long hours, no wo...|         64|[long, hours,, no...|[long, hours,, wo...|(262144,[53814,87...|(262144,[53814,87...|(262145,[53814,87...|
|    1| 2020-09-14|You become the ps...|         34|

In [15]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

+-----+-----------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|date_review|                cons|cons_length|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+-----+-----------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    1| 2020-09-12|Abusive work envi...|        454|[abusive, work, e...|[abusive, work, e...|(262144,[1455,169...|(262144,[1455,169...|(262145,[1455,169...|[-3635.5319674551...|[1.0,5.7617850082...|       0.0|
|    1| 2020-09-12|All else is a con...|        431|[all, else, is, a...|[else, con., lot,...|(262144,[629,5923...|(262144,[629,5923...|(262145,[629,5923...

Predict accuracy of the model

In [16]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting cons was: %f" % acc)

Accuracy of model at predicting cons was: 0.166016
