In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Hit:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:6 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:8 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:11 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:12 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:13 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease [21.3 kB]
Get:14 http://security.ubuntu.

In [2]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NaiveBayes").getOrCreate()

#from pyspark.context import SparkContext

#sc=SparkContext("local", "test")

In [3]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://project4ak.s3.amazonaws.com/tags_df5.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("tags_df5.csv"), sep=",", header=True)

# Show DataFrame
df.show()

+---+-----+------+----------------+-------------+--------------------+----------+
|_c0|index|target|       cat_codes|trending_days|                tags|category_e|
+---+-----+------+----------------+-------------+--------------------+----------+
|  0| 2098|     0|          Gaming|            1|PokÃ©mon: Twiligh...|       5.0|
|  1| 2099|     0|           Music|            1|Mo3|Dallas|Texas|...|       7.0|
|  2| 2100|     0|  People & Blogs|            1|Cinemassacre Chan...|      10.0|
|  3| 2101|     0| News & Politics|            1|              [None]|       8.0|
|  4| 2102|     0|          Comedy|            1|ryan george|comed...|       1.0|
|  5| 2103|     0| News & Politics|            1|7NEWS|Australia|N...|       8.0|
|  6| 2104|     0|          Comedy|            1|chevy|mahk|zebra ...|       1.0|
|  7| 2105|     0|   Entertainment|            1|              [None]|       3.0|
|  8| 2106|     0|   Entertainment|            1|bradley martyn|st...|       3.0|
|  9| 2097|     

In [4]:
df=df.withColumn('category_e',df['category_e'].cast("float").alias('category_e'))

In [5]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature 
data_df = df.withColumn('length', length(df['tags']))
data_df.show()

+---+-----+------+----------------+-------------+--------------------+----------+------+
|_c0|index|target|       cat_codes|trending_days|                tags|category_e|length|
+---+-----+------+----------------+-------------+--------------------+----------+------+
|  0| 2098|     0|          Gaming|            1|PokÃ©mon: Twiligh...|       5.0|    47|
|  1| 2099|     0|           Music|            1|Mo3|Dallas|Texas|...|       7.0|    33|
|  2| 2100|     0|  People & Blogs|            1|Cinemassacre Chan...|      10.0|    90|
|  3| 2101|     0| News & Politics|            1|              [None]|       8.0|     6|
|  4| 2102|     0|          Comedy|            1|ryan george|comed...|       1.0|   448|
|  5| 2103|     0| News & Politics|            1|7NEWS|Australia|N...|       8.0|   138|
|  6| 2104|     0|          Comedy|            1|chevy|mahk|zebra ...|       1.0|   116|
|  7| 2105|     0|   Entertainment|            1|              [None]|       3.0|     6|
|  8| 2106|     0|   

### Feature Transformations


In [6]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
# Create all the features to the data set
trending_days = StringIndexer(inputCol='target',outputCol='label')
tokenizer = Tokenizer(inputCol="tags", outputCol="token_tags")
stopremove = StopWordsRemover(inputCol='token_tags',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')


In [7]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token','length', 'category_e'], outputCol='features')

In [8]:
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[trending_days, tokenizer, stopremove, hashingTF, idf, clean_up])

In [9]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)

In [10]:
# Show label and resulting features
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(262146,[62357,13...|
|  1.0|(262146,[56612,25...|
|  1.0|(262146,[22784,26...|
|  1.0|(262146,[10705,26...|
|  1.0|(262146,[7178,395...|
|  1.0|(262146,[7123,497...|
|  1.0|(262146,[16108,11...|
|  1.0|(262146,[10705,26...|
|  1.0|(262146,[45233,47...|
|  1.0|(262146,[33291,17...|
|  1.0|(262146,[2614,898...|
|  1.0|(262146,[76201,85...|
|  1.0|(262146,[208989,2...|
|  1.0|(262146,[26459,27...|
|  1.0|(262146,[15483,43...|
|  1.0|(262146,[9738,312...|
|  1.0|(262146,[110334,1...|
|  1.0|(262146,[10705,26...|
|  1.0|(262146,[21209,28...|
|  1.0|(262146,[4151,328...|
+-----+--------------------+
only showing top 20 rows



In [11]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [12]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = 'features', labelCol='label', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(training)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: (262146,[],[])
Intercept: 0.2764010420145615


In [13]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(50)

+-----+-----+------+--------------------+-------------+--------------------+----------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  _c0|index|target|           cat_codes|trending_days|                tags|category_e|length|label|          token_tags|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+-----+-----+------+--------------------+-------------+--------------------+----------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    1| 2099|     0|               Music|            1|Mo3|Dallas|Texas|...|       7.0|    33|  1.0|[mo3|dallas|texas...|[mo3|dallas|texas...|(262144,[56612,25...|(262144,[56612,25...|(262146,[56612,25...|[-221.46133057342.

In [14]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.621882
