In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Waiting for headers] [1 InRelease 14.2 kB/88.7 kB 16%] [Connecting to cloud                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:4 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:7 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:8 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Get:9 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic I

In [2]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Hashing").getOrCreate()

In [3]:
 from pyspark.ml.feature import HashingTF, IDF, Tokenizer

In [4]:
# Read in CSV
from pyspark import SparkFiles
df = spark.read.csv(SparkFiles.get("/content/news.csv"),sep=",", escape='"', encoding="utf-8", quote='"',  header=True)
df.show(5)

+--------------------+--------------------+-------+----------+------------+
|               title|                text|subject|      date|news_outcome|
+--------------------+--------------------+-------+----------+------------+
|['donald', 'trump...|['donald', 'trump...|   News|31/12/2017|           1|
|['drunk', 'braggi...|['house', 'intell...|   News|31/12/2017|           1|
|['sheriff', 'davi...|['friday', 'revea...|   News|30/12/2017|           1|
|['trump', 'obsess...|['christmas', 'da...|   News|29/12/2017|           1|
|['pope', 'francis...|['pope', 'francis...|   News|25/12/2017|           1|
+--------------------+--------------------+-------+----------+------------+
only showing top 5 rows



In [107]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature 
df = df.withColumn('length', length(df['title']))
df.show(3)

+--------------------+--------------------+-------+----------+------------+------+
|               title|                text|subject|      date|news_outcome|length|
+--------------------+--------------------+-------+----------+------------+------+
|['donald', 'trump...|['donald', 'trump...|   News|31/12/2017|           1|    87|
|['drunk', 'braggi...|['house', 'intell...|   News|31/12/2017|           1|    91|
|['sheriff', 'davi...|['friday', 'revea...|   News|30/12/2017|           1|    97|
+--------------------+--------------------+-------+----------+------------+------+
only showing top 3 rows



In [108]:
df.printSchema()

root
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- date: string (nullable = true)
 |-- news_outcome: string (nullable = true)
 |-- length: integer (nullable = true)



In [109]:
#changing the title type to array
from pyspark.sql.functions import udf, col, split

tolist_udf = udf(lambda x: x.replace("[","").replace("]","").replace("'",""))

In [110]:
df_2 = df.withColumn("title", tolist_udf(col("title")))
df_2 = df_2.withColumn("label", tolist_udf(col("news_outcome")))

In [111]:
from pyspark.sql.types import IntegerType
df_2 = df_2.withColumn("label", df_2["label"].cast(IntegerType()))

In [112]:
df_3 = df_2.select(split(col("title"),",").alias("TitleArray"), "label", "length") \
    .drop("title")
df_3.printSchema()

root
 |-- TitleArray: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- label: integer (nullable = true)
 |-- length: integer (nullable = true)



Hashing Term Frequency

In [113]:
# Run the hashing term frequency
hashing = HashingTF(inputCol="TitleArray", outputCol="TitleHashedValues", numFeatures=pow(2,5))

In [114]:
hashed_df = hashing.transform(df_3)

In [115]:
hashed_df.show(3, truncate=False)

+--------------------------------------------------------------------------------------+-----+------+-------------------------------------------------------------------+
|TitleArray                                                                            |label|length|TitleHashedValues                                                  |
+--------------------------------------------------------------------------------------+-----+------+-------------------------------------------------------------------+
|[donald,  trump,  send,  embarrassing,  new,  year,  eve,  message,  disturb]         |1    |87    |(32,[4,9,11,13,22,25,28,30],[1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0])     |
|[drunk,  bragging,  trump,  staffer,  start,  russian,  collusion,  investigation]    |1    |91    |(32,[0,2,10,14,16,17,22,29],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])     |
|[sheriff,  david,  clarke,  become,  internet,  joke,  threaten,  poke,  people,  eye]|1    |97    |(32,[3,5,6,7,10,13,16,22,24],[1.0,1.0,1.0,1.0,1.0

In [116]:
text_df = df_2.withColumn("text", tolist_udf(col("text")))

In [117]:
text_df2 = text_df.select(split(col("text"),",").alias("TextArray"), "label", "length") \
    .drop("text")
text_df2.printSchema()

root
 |-- TextArray: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- label: integer (nullable = true)
 |-- length: integer (nullable = true)



In [118]:
# Run the hashing term frequency - text
hashing = HashingTF(inputCol="TextArray", outputCol="TextHashedValues", numFeatures=pow(2,5))

In [119]:
hashed_df_text = hashing.transform(text_df2)

In [120]:
hashed_df_text.show(3)

+--------------------+-----+------+--------------------+
|           TextArray|label|length|    TextHashedValues|
+--------------------+-----+------+--------------------+
|[donald,  trump, ...|    1|    87|(32,[0,1,2,3,4,6,...|
|[house,  intellig...|    1|    91|(32,[0,1,2,3,4,5,...|
|[friday,  reveal,...|    1|    97|(32,[0,1,2,3,4,5,...|
+--------------------+-----+------+--------------------+
only showing top 3 rows



Fitting IDF on the data set

In [121]:
# Fit the IDF on the data set 
idf = IDF(inputCol="TitleHashedValues", outputCol="TitleFeatures")
idfModel = idf.fit(hashed_df)
rescaledData = idfModel.transform(hashed_df)

In [122]:
rescaledData.show(3)

+--------------------+-----+------+--------------------+--------------------+
|          TitleArray|label|length|   TitleHashedValues|       TitleFeatures|
+--------------------+-----+------+--------------------+--------------------+
|[donald,  trump, ...|    1|    87|(32,[4,9,11,13,22...|(32,[4,9,11,13,22...|
|[drunk,  bragging...|    1|    91|(32,[0,2,10,14,16...|(32,[0,2,10,14,16...|
|[sheriff,  david,...|    1|    97|(32,[3,5,6,7,10,1...|(32,[3,5,6,7,10,1...|
+--------------------+-----+------+--------------------+--------------------+
only showing top 3 rows



In [123]:
# Fit the IDF on the data set - text
idf2 = IDF(inputCol="TextHashedValues", outputCol="TextFeatures")
idfModel2 = idf2.fit(hashed_df_text)
rescaledData2 = idfModel2.transform(hashed_df_text)

In [124]:
rescaledData2.show(3)

+--------------------+-----+------+--------------------+--------------------+
|           TextArray|label|length|    TextHashedValues|        TextFeatures|
+--------------------+-----+------+--------------------+--------------------+
|[donald,  trump, ...|    1|    87|(32,[0,1,2,3,4,6,...|(32,[0,1,2,3,4,6,...|
|[house,  intellig...|    1|    91|(32,[0,1,2,3,4,5,...|(32,[0,1,2,3,4,5,...|
|[friday,  reveal,...|    1|    97|(32,[0,1,2,3,4,5,...|(32,[0,1,2,3,4,5,...|
+--------------------+-----+------+--------------------+--------------------+
only showing top 3 rows



Naive Bayes

In [125]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['TitleFeatures', 'length'], outputCol='features')

In [137]:
cleaner = clean_up.transform(rescaledData)

In [131]:
cleaner.show(2)

+--------------------+-----+------+--------------------+--------------------+--------------------+
|          TitleArray|label|length|   TitleHashedValues|       TitleFeatures|            features|
+--------------------+-----+------+--------------------+--------------------+--------------------+
|[donald,  trump, ...|    1|    87|(32,[4,9,11,13,22...|(32,[4,9,11,13,22...|(33,[4,9,11,13,22...|
|[drunk,  bragging...|    1|    91|(32,[0,2,10,14,16...|(32,[0,2,10,14,16...|(33,[0,2,10,14,16...|
+--------------------+-----+------+--------------------+--------------------+--------------------+
only showing top 2 rows



In [133]:
 from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = cleaner.randomSplit([0.7, 0.3])
# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [134]:
 # Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

+--------------------+-----+------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|          TitleArray|label|length|   TitleHashedValues|       TitleFeatures|            features|       rawPrediction|         probability|prediction|
+--------------------+-----+------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|[1,  million,  do...|    1|    94|(32,[1,7,11,15,21...|(32,[1,7,11,15,21...|(33,[1,7,11,15,21...|[-92.468299837790...|[0.70809060732337...|       0.0|
|[10,  second,  sa...|    1|   110|(32,[6,9,11,14,15...|(32,[6,9,11,14,15...|(33,[6,9,11,14,15...|[-95.156499655143...|[0.49189732071980...|       1.0|
|[10,  u,  navy,  ...|    1|   100|(32,[3,5,10,11,13...|(32,[3,5,10,11,13...|(33,[3,5,10,11,13...|[-97.878722368196...|[0.52663515834801...|       0.0|
|[100,  fed,  hill...|    1|    99|(32,[2,3,9,10,17,...|(32,[2,3,9,10,17,...|(33,[2,3,9,

In [135]:
 # Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.638448


In [142]:
# Create feature vectors
clean_up2 = VectorAssembler(inputCols=['TextFeatures', 'length'], outputCol='features')
cleaner_text = clean_up2.transform(rescaledData2)

In [144]:
cleaner_text.show(2)

+--------------------+-----+------+--------------------+--------------------+--------------------+
|           TextArray|label|length|    TextHashedValues|        TextFeatures|            features|
+--------------------+-----+------+--------------------+--------------------+--------------------+
|[donald,  trump, ...|    1|    87|(32,[0,1,2,3,4,6,...|(32,[0,1,2,3,4,6,...|[0.90043953270749...|
|[house,  intellig...|    1|    91|(32,[0,1,2,3,4,5,...|(32,[0,1,2,3,4,5,...|[0.65486511469635...|
+--------------------+-----+------+--------------------+--------------------+--------------------+
only showing top 2 rows



In [145]:
 from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = cleaner_text.randomSplit([0.7, 0.3])
# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [146]:
 # Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

+---------+-----+------+----------------+--------------------+--------------------+--------------------+--------------------+----------+
|TextArray|label|length|TextHashedValues|        TextFeatures|            features|       rawPrediction|         probability|prediction|
+---------+-----+------+----------------+--------------------+--------------------+--------------------+--------------------+----------+
|       []|    1|    35| (32,[28],[1.0])|(32,[28],[0.05212...|(33,[28,32],[0.05...|[-8.5429441258376...|[0.17449378052233...|       1.0|
|       []|    1|    47| (32,[28],[1.0])|(32,[28],[0.05212...|(33,[28,32],[0.05...|[-11.119183442055...|[0.11334766184906...|       1.0|
|       []|    1|    56| (32,[28],[1.0])|(32,[28],[0.05212...|(33,[28,32],[0.05...|[-13.051362929219...|[0.08060483914375...|       1.0|
|       []|    1|    58| (32,[28],[1.0])|(32,[28],[0.05212...|(33,[28,32],[0.05...|[-13.480736148588...|[0.07460786932016...|       1.0|
|       []|    1|    59| (32,[28],[1.0])|

In [147]:
 # Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.606742
