In [1]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.1'
spark_version = 'spark-3.0.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:4 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ Packages [41.5 kB]
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Get:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:10 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:12 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:13 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:14 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
G

In [3]:
# Create the SparkSession for this notebook, or reuse one that already exists.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("fetch_NLP")
    .getOrCreate()
)

In [4]:
# Build a small in-memory DataFrame of sample passages.
# Each row is an (id, sentence) pair; the column names are given by `schema`.
df = spark.createDataFrame(
    [
        (0, "The easiest way to earn points with Fetch Rewards is to just shop for the products you already love. If you have any participating brands on your receipt, you'll get points based on the cost of the products. You don't need to clip any coupons or scan individual barcodes. Just scan each grocery receipt after you shop and we'll find the savings for you."),
        (1, "The easiest way to earn points with Fetch Rewards is to just shop for the items you already buy. If you have any eligible brands on your receipt, you will get points based on the total cost of the products. You do not need to cut out any coupons or scan individual UPCs. Just scan your receipt after you check out and we will find the savings for you."),
        (2, "We are always looking for opportunities for you to earn more points, which is why we also give you a selection of Special Offers. These Special Offers are opportunities to earn bonus points on top of the regular points you earn every time you purchase a participating brand. No need to pre-select these offers, we'll give you the points whether or not you knew about the offer. We just think it is easier that way."),
    ],
    schema=['id', 'sentence'],
)

# Preview the DataFrame (long sentences are truncated in the printed table).
df.show()

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  0|The easiest way t...|
|  1|The easiest way t...|
|  2|We are always loo...|
+---+--------------------+



In [24]:
# Import functions
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover, StringIndexer

In [25]:
from pyspark.sql.functions import length

# Add a `length` column holding the character count of each sentence;
# it is combined with the text features later on.
sentence_col = df['sentence']
data_df = df.withColumn('length', length(sentence_col))
data_df.show()

+---+--------------------+------+
| id|            sentence|length|
+---+--------------------+------+
|  0|The easiest way t...|   353|
|  1|The easiest way t...|   351|
|  2|We are always loo...|   414|
+---+--------------------+------+



In [26]:
# Tokenize: split each sentence into a list of words, stored in a new
# `words` column next to the original text.
tokened = Tokenizer(
    inputCol="sentence",
    outputCol="words",
)
tokened_transformed = tokened.transform(df)
tokened_transformed.show()

+---+--------------------+--------------------+
| id|            sentence|               words|
+---+--------------------+--------------------+
|  0|The easiest way t...|[the, easiest, wa...|
|  1|The easiest way t...|[the, easiest, wa...|
|  2|We are always loo...|[we, are, always,...|
+---+--------------------+--------------------+



In [10]:
# Drop stop words (common, low-information words) from the token lists,
# writing the remaining tokens to a `filtered` column.
remover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered",
)
removed_frame = remover.transform(tokened_transformed)
# Print full, untruncated columns so the filtered tokens can be inspected.
removed_frame.show(truncate=False)

+---+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------

In [12]:
# Hash the filtered tokens into a term-frequency vector with 2**18 buckets.
hashing = HashingTF(
    inputCol="filtered",
    outputCol="hashedValues",
    numFeatures=2 ** 18,
)

# Apply the hashing transformer and display the untruncated result.
hashed_df = hashing.transform(removed_frame)
hashed_df.show(truncate=False)

+---+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------

In [13]:
# Fit an IDF model on the hashed term frequencies, then use it to rescale
# them into a `features` column.
idf = IDF(
    inputCol="hashedValues",
    outputCol='features',
)
idfModel = idf.fit(hashed_df)
rescaledData = idfModel.transform(hashed_df)

# Show the original tokens alongside the rescaled feature vectors.
rescaledData.select("words", "features").show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [14]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Assembler that merges the `idf_token` vector and the numeric `length`
# column into a single `features` vector. It is only defined here; it runs
# as the last stage of the data-preparation pipeline.
clean_up = VectorAssembler(
    inputCols=['idf_token', 'length'],
    outputCol='features',
)

In [27]:
# Define the individual feature-engineering stages. Each stage reads the
# column written by the previous one:
#   id -> label, sentence -> token_text -> stop_tokens -> hash_token -> idf_token
pos_neg_to_num = StringIndexer(
    inputCol='id',
    outputCol='label',
)
tokenizer = Tokenizer(
    inputCol="sentence",
    outputCol="token_text",
)
stopremove = StopWordsRemover(
    inputCol='token_text',
    outputCol='stop_tokens',
)
hashingTF = HashingTF(
    inputCol="stop_tokens",
    outputCol='hash_token',
)
# Note: this rebinds the name `idf` used by the earlier stand-alone IDF cell.
idf = IDF(
    inputCol='hash_token',
    outputCol='idf_token',
)

In [28]:
# Chain the feature-engineering stages into one data-preparation Pipeline.
# Order matters: each stage consumes the output column of an earlier stage.
from pyspark.ml import Pipeline

data_prep_pipeline = Pipeline(
    stages=[
        pos_neg_to_num,
        tokenizer,
        stopremove,
        hashingTF,
        idf,
        clean_up,
    ]
)

In [29]:
# Fit the pipeline on the length-augmented data, then apply the fitted
# pipeline to the same data to produce the `label` and `features` columns.
cleaner = data_prep_pipeline.fit(dataset=data_df)
cleaned = cleaner.transform(dataset=data_df)

In [32]:
# Display only the model inputs: the numeric label and the assembled features.
label_and_features = cleaned.select('label', 'features')
label_and_features.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(262145,[2306,767...|
|  1.0|(262145,[2306,963...|
|  2.0|(262145,[991,1537...|
+-----+--------------------+



In [30]:
# Randomly split the prepared data into training (70%) and testing (30%)
# sets; the fixed seed keeps the split repeatable.
# NOTE(review): the dataset has only three rows, and the test-results table
# shown further down is empty -- confirm this split leaves anything to test on.
training, testing = cleaned.randomSplit(weights=[0.7, 0.3], seed=21)

In [33]:
from pyspark.ml.classification import NaiveBayes

# Instantiate a Naive Bayes classifier with its default settings and
# train it on the training split.
nb = NaiveBayes()
predictor = nb.fit(dataset=training)

In [34]:
# Apply the trained model to the testing split; the result adds the
# rawPrediction, probability and prediction columns.
test_results = predictor.transform(dataset=testing)
test_results.show()

+---+--------+------+-----+----------+-----------+----------+---------+--------+-------------+-----------+----------+
| id|sentence|length|label|token_text|stop_tokens|hash_token|idf_token|features|rawPrediction|probability|prediction|
+---+--------+------+-----+----------+-----------+----------+---------+--------+-------------+-----------+----------+
+---+--------+------+-----+----------+-----------+----------+---------+--------+-------------+-----------+----------+

