In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.1.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:7 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease [15.9 kB]
Get:12 http://archive.ubuntu.com/ubuntu bionic-backports 

In [2]:
# Start Spark session by importing the library and setting the spark variable to the code below
# This creates a Spark application called "DataFrameBasics."
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataFrameBasics").getOrCreate()

In [3]:
dataframe = spark.createDataFrame([(0, "Here is our dataframe"),
                                   (1, "We are making one from scratch"),
                                   (2, "This will look very similar to a PandasDF")], ["id", "words"])
dataframe.show()

+---+--------------------+
| id|               words|
+---+--------------------+
|  0|Here is our dataf...|
|  1|We are making one...|
|  2|This will look ve...|
+---+--------------------+



In [4]:
# Spark also lets us import data directly into a DataFrame. To do this, we import SparkFiles from the pyspark library that allows us to retrieve files.
# The next three lines of code tell Spark to pull data from Amazon's Simple Storage Service (S3), a cloud-based data storage service. This boilerplate code can be used to read other public files hosted on Amazon's services.

# Read in data from S3 Buckets
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/dataviz-curriculum/day_1/food.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("food.csv"), sep=",", header=True)

df.show()

+-------+-----+
|   food|price|
+-------+-----+
|  pizza|    0|
|  sushi|   12|
|chinese|   10|
+-------+-----+



In [5]:
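# When reading a CSV, Spark takes the column names from the header but, unless told otherwise (for example with a schema or inferSchema=True), reads every column as a string. We can check the schema by running the following code: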

# Print our schema
df.printSchema()

root
 |-- food: string (nullable = true)
 |-- price: string (nullable = true)



In [6]:
df.columns

['food', 'price']

In [7]:
df.describe()

DataFrame[summary: string, food: string, price: string]

In [8]:
# We need to convert the price column to a number.
# We can define our own schema and then apply it to the data. We'll start by importing the data types with the following code:

# Import struct fields that we can use
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [9]:
# Next, create the schema by creating a StructType, which is one of Spark's complex types, like an array or map. The StructField will define the column name, the data type held, and a Boolean to define whether null values will be included or not:
# Next we need to create the list of struct fields
schema = [StructField("food", StringType(), True), StructField("price", IntegerType(), True),]
schema

[StructField(food,StringType,true), StructField(price,IntegerType,true)]

In [10]:
# Next, enter the code that will pass the schema just created as fields in a StructType. All this will be stored in a variable called final
# Pass in our fields
final = StructType(fields=schema)
final

StructType(List(StructField(food,StringType,true),StructField(price,IntegerType,true)))

In [11]:
# Now that we have a predefined schema, we can read in the data again, only this time passing in our own schema. 
# Read our data with our new schema
dataframe = spark.read.csv(SparkFiles.get("food.csv"), schema=final, sep=",", header=True)
dataframe.printSchema()

root
 |-- food: string (nullable = true)
 |-- price: integer (nullable = true)
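
To see why the column type matters, here is a minimal plain-Python sketch (standard library only, not Spark code) of applying a schema to CSV rows. The inline `raw_csv` string mirrors the food data shown earlier, and the `schema` dictionary of cast functions is a hypothetical stand-in for the StructType, introduced only for illustration.

```python
import csv
import io

# Inline copy of the food data shown earlier (stands in for food.csv).
raw_csv = "food,price\npizza,0\nsushi,12\nchinese,10\n"

# Without a schema, every CSV value is read as a string.
rows = list(csv.DictReader(io.StringIO(raw_csv)))

# Hypothetical stand-in for the StructType: column name -> cast function.
schema = {"food": str, "price": int}
typed_rows = [
    {name: cast(row[name]) for name, cast in schema.items()}
    for row in rows
]

# In plain Python, "doubling" a string repeats it instead of doing arithmetic.
string_doubled = rows[1]["price"] * 2
# After casting, the same operation is real multiplication.
integer_doubled = typed_rows[1]["price"] * 2
```

The point of the sketch is only that values read as text need an explicit type before numeric work is reliable; Spark's own casting rules are not modeled here.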



In [12]:
# Add new column
dataframe.withColumn('newprice', dataframe['price']).show()
# Update column name
dataframe.withColumnRenamed('price','newerprice').show()
# Double the price
dataframe.withColumn('doubleprice',dataframe['price']*2).show()
# Add a dollar to the price
dataframe.withColumn('add_one_dollar',dataframe['price']+1).show()
# Half the price
dataframe.withColumn('half_price',dataframe['price']/2).show()

+-------+-----+--------+
|   food|price|newprice|
+-------+-----+--------+
|  pizza|    0|       0|
|  sushi|   12|      12|
|chinese|   10|      10|
+-------+-----+--------+

+-------+----------+
|   food|newerprice|
+-------+----------+
|  pizza|         0|
|  sushi|        12|
|chinese|        10|
+-------+----------+

+-------+-----+-----------+
|   food|price|doubleprice|
+-------+-----+-----------+
|  pizza|    0|          0|
|  sushi|   12|         24|
|chinese|   10|         20|
+-------+-----+-----------+

+-------+-----+--------------+
|   food|price|add_one_dollar|
+-------+-----+--------------+
|  pizza|    0|             1|
|  sushi|   12|            13|
|chinese|   10|            11|
+-------+-----+--------------+

+-------+-----+----------+
|   food|price|half_price|
+-------+-----+----------+
|  pizza|    0|       0.0|
|  sushi|   12|       6.0|
|chinese|   10|       5.0|
+-------+-----+----------+



In [13]:
# In the last section, you might have noticed that when calling a column with Spark, no real results were shown. 
# As you might recall, Spark uses lazy evaluation. This means that Spark takes a list of instructions and formulates the best way to fulfill them, 
# and then waits until you tell Spark to complete them. The process of listing and reading the instructions is called transformation, 
# and your directive to complete them is called an action.

# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataFrameFunctions").getOrCreate()


In [14]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/dataviz-curriculum/day_1/wine.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("wine.csv"), sep=",", header=True)

# Show DataFrame
df.show()

+-------+--------------------+--------------------+------+-----+------------------+--------------------+-----------------+------------------+--------------------+
|country|         description|         designation|points|price|          province|            region_1|         region_2|           variety|              winery|
+-------+--------------------+--------------------+------+-----+------------------+--------------------+-----------------+------------------+--------------------+
|     US|This tremendous 1...|   Martha's Vineyard|    96|  235|        California|         Napa Valley|             Napa|Cabernet Sauvignon|               Heitz|
|  Spain|Ripe aromas of fi...|Carodorum Selecci...|    96|  110|    Northern Spain|                Toro|             null|     Tinta de Toro|Bodega Carmen Rod...|
|     US|Mac Watson honors...|Special Selected ...|    96|   90|        California|      Knights Valley|           Sonoma|   Sauvignon Blanc|            Macauley|
|     US|This spent 20

In [15]:
# Transformations
# with the data loaded in, let's perform some transformations
# Order a DataFrame by descending values
#  All we're doing is telling Spark that we want this DataFrame to be organized in this particular way, and Spark says, "Okay, got it—just let me know when you want me to do this."

df.orderBy(df["points"].desc())

DataFrame[country: string, description: string, designation: string, points: string, price: string, province: string, region_1: string, region_2: string, variety: string, winery: string]

In [16]:
# Actions
# Actions direct Spark to perform the computation instructions and return a result.
# orderBy() and desc() are transformations telling Spark how to organize the data. Spark will read these transformations as instructions, but it won't act on them just yet.
# show() is an action that gives the go-ahead for Spark to run all of those transformations and to produce a result.

df.orderBy(df["points"].desc()).show(5)

+-------+--------------------+--------------------+------+-----+----------+-----------+--------+--------------------+--------------------+
|country|         description|         designation|points|price|  province|   region_1|region_2|             variety|              winery|
+-------+--------------------+--------------------+------+-----+----------+-----------+--------+--------------------+--------------------+
|     US|This is an absolu...|           IX Estate|    99|  290|California|Napa Valley|    Napa|           Red Blend|              Colgin|
| France|98-100 Barrel sam...|       Barrel sample|    99| null|  Bordeaux|   Pauillac|    null|Bordeaux-style Re...|Château Pontet-Canet|
|     US|There are incredi...|Elevation 1147 Es...|    99|  150|California|Napa Valley|    Napa|  Cabernet Sauvignon|        David Arthur|
| France|A magnificent Cha...|Dom Pérignon Oeno...|    99|  385| Champagne|  Champagne|    null|     Champagne Blend|      Moët & Chandon|
|  Italy|Even better than .
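
The difference between a transformation and an action can be illustrated with a small plain-Python analogy. This is not Spark code and not how Spark is implemented; the helper names `lazy_sort_desc` and `show`, and the sample `wines` rows, are made up for this sketch. The "transformation" only returns a recipe, and the work happens when the "action" runs it.

```python
executed = []  # records when real work actually happens

def lazy_sort_desc(rows, key):
    """'Transformation': return a recipe for sorting, but do no work yet."""
    def run():
        executed.append("sort")
        return sorted(rows, key=lambda r: r[key], reverse=True)
    return run

def show(plan, n):
    """'Action': execute the recipe and return the first n rows."""
    return plan()[:n]

# Hypothetical sample rows, loosely modeled on the wine data.
wines = [
    {"winery": "A", "points": 96},
    {"winery": "B", "points": 99},
    {"winery": "C", "points": 90},
]

plan = lazy_sort_desc(wines, "points")   # nothing is sorted yet
work_before_action = list(executed)      # still an empty list
top_two = show(plan, 2)                  # the sort runs only now
```

Deferring the work like this is what gives an engine the chance to look at the whole list of instructions before deciding how to carry them out.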

In [17]:
# More Functions

#import functions

#avg
from pyspark.sql.functions import avg
df.select(avg("points")).show()

#filter
df.filter("price>20").show(5)

# filter and select certain columns; Both filter and select are separate transformations, and show is again the action.
df.filter("price>20").select(["points", "country"]).show(5)



+-----------------+
|      avg(points)|
+-----------------+
|87.88834105383143|
+-----------------+

+-------+--------------------+--------------------+------+-----+--------------+-----------------+-----------------+------------------+--------------------+
|country|         description|         designation|points|price|      province|         region_1|         region_2|           variety|              winery|
+-------+--------------------+--------------------+------+-----+--------------+-----------------+-----------------+------------------+--------------------+
|     US|This tremendous 1...|   Martha's Vineyard|    96|  235|    California|      Napa Valley|             Napa|Cabernet Sauvignon|               Heitz|
|  Spain|Ripe aromas of fi...|Carodorum Selecci...|    96|  110|Northern Spain|             Toro|             null|     Tinta de Toro|Bodega Carmen Rod...|
|     US|Mac Watson honors...|Special Selected ...|    96|   90|    California|   Knights Valley|           Sonoma|   S

In [18]:
# NLP - Tokenized data
from pyspark.ml.feature import Tokenizer

In [19]:
# We'll create a small DataFrame that will show the pre-tokenized data, using the following code
#create sample dataframe
dataframe = spark.createDataFrame([
                                   (0, "Spark is great."),
                                   (1, "We are learning Spark."),
                                   (2, "Spark is better than Hadoop no doubt.")
], ["id", "sentence"])
dataframe.show()

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  0|     Spark is great.|
|  1|We are learning S...|
|  2|Spark is better t...|
+---+--------------------+



In [20]:
# The tokenizer function takes input and output parameters. The input passes the name of the column that we want to have tokenized, and the output takes the name that we want the column called.
# Tokenize sentences
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
tokenizer

Tokenizer_ebd747bfdbf7

In [21]:
# The tokenizer that we created uses a transform method that takes a DataFrame as input. This is a transformation, 
# so to reveal the results, we'll call show(truncate=False) as our action to display the results without shortening the output, as shown below:
tokenizeddf = tokenizer.transform(dataframe)
tokenizeddf.show(truncate=False)

+---+-------------------------------------+---------------------------------------------+
|id |sentence                             |words                                        |
+---+-------------------------------------+---------------------------------------------+
|0  |Spark is great.                      |[spark, is, great.]                          |
|1  |We are learning Spark.               |[we, are, learning, spark.]                  |
|2  |Spark is better than Hadoop no doubt.|[spark, is, better, than, hadoop, no, doubt.]|
+---+-------------------------------------+---------------------------------------------+
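
As a rough mental model, the tokenizer output above matches what you get in plain Python by lowercasing each sentence and splitting it on whitespace. The sketch below is an approximation for illustration only (the function name `simple_tokenize` is made up, and edge cases such as repeated spaces may differ from Spark's Tokenizer).

```python
sentences = [
    "Spark is great.",
    "We are learning Spark.",
    "Spark is better than Hadoop no doubt.",
]

def simple_tokenize(sentence):
    """Lowercase the text, then split it on whitespace."""
    return sentence.lower().split()

tokenized = [simple_tokenize(s) for s in sentences]

# Punctuation stays attached to the word, as in the output above ("great.").
token_counts = [len(tokens) for tokens in tokenized]
```

Note that punctuation is not stripped, which is why "great." and "doubt." keep their trailing periods.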



In [22]:
# User-defined functions (UDFs) are functions created by the user to add custom output columns.
# For the example below, we can create a function that will enhance our tokenizer by returning a word count for each line. 
# Start by creating a Python function that takes a list of words as its input, then returns the length of that list. 

# Create a function to return the length of a list
def word_list_length(word_list):
    return len(word_list)

In [23]:
# We'll import the udf function, the col function to select a column to be passed into a function, and the type IntegerType, which
# will be used in our udf to define the data type of the output.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

In [24]:
# Using the udf function, we can create our function to be passed in. The udf will take in the name of the function as a parameter 
# and the output data type, which is the IntegerType that we just imported.

# Create a user defined function
count_tokens = udf(word_list_length, IntegerType())

In [25]:
# Now we can redo the tokenizer process. Only this time, after the DataFrame has outputted the tokenized values, 
# we can use our own created function to return the number of tokens created. This will give us another data point to use in the future, if needed.

# create our tokenizer
tokenizer2 = Tokenizer(inputCol="sentence", outputCol="words")
tokenizeddf2 = tokenizer2.transform(dataframe)

#select the needed columns and don't truncate the results
tokenizeddf2.withColumn("tokens", count_tokens(col("words"))).show(truncate=False)




+---+-------------------------------------+---------------------------------------------+------+
|id |sentence                             |words                                        |tokens|
+---+-------------------------------------+---------------------------------------------+------+
|0  |Spark is great.                      |[spark, is, great.]                          |3     |
|1  |We are learning Spark.               |[we, are, learning, spark.]                  |4     |
|2  |Spark is better than Hadoop no doubt.|[spark, is, better, than, hadoop, no, doubt.]|7     |
+---+-------------------------------------+---------------------------------------------+------+



In [26]:
# Stop Words
# Stop words are very common words (such as "is", "this", and "to") that carry little meaning for most NLP tasks.
# Removing them from the data can improve a language model because the remaining words are the ones that distinguish one text from another.
# Let's take a look at the stop word removal code.

# create a dataframe
sentenceData = spark.createDataFrame([
                                      (0, ["big", "data", "is", "super", "powerful"]),
                                      (1, ["this", "is", "going", "to", "be", "epic"])
], ["id", "raw"])
sentenceData.show(truncate=False)


+---+--------------------------------+
|id |raw                             |
+---+--------------------------------+
|0  |[big, data, is, super, powerful]|
|1  |[this, is, going, to, be, epic] |
+---+--------------------------------+



In [27]:
# Import stop words library
from pyspark.ml.feature import StopWordsRemover

In [28]:
# Run the Remover
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")

In [29]:
# transform and show data
remover.transform(sentenceData).show(truncate=False)

+---+--------------------------------+----------------------------+
|id |raw                             |filtered                    |
+---+--------------------------------+----------------------------+
|0  |[big, data, is, super, powerful]|[big, data, super, powerful]|
|1  |[this, is, going, to, be, epic] |[going, epic]               |
+---+--------------------------------+----------------------------+
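
Conceptually, stop word removal is a filter against a list of known words. The plain-Python sketch below reproduces the result above with a tiny, hand-picked stop-word set; `STOP_WORDS` here is an assumption for illustration and is far shorter than the default English list that StopWordsRemover ships with.

```python
# Small illustrative stop-word set (not Spark's default list).
STOP_WORDS = {"is", "this", "to", "be"}

def remove_stop_words(tokens):
    """Keep only the tokens that are not in the stop-word set."""
    return [t for t in tokens if t.lower() not in STOP_WORDS]

raw = [
    ["big", "data", "is", "super", "powerful"],
    ["this", "is", "going", "to", "be", "epic"],
]

filtered = [remove_stop_words(tokens) for tokens in raw]
```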



In [30]:
# Term Frequency-Inverse Document Frequency Weight
# Term frequency (TF) measures the frequency of a word occurring in a document.
# Inverse document frequency (IDF) measures the significance of a word across a set of documents.

from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover

In [31]:
# This is a dataset of tweets from people directed at an airline. We'll load in the data the same way we did when working with DataFrames earlier
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/dataviz-curriculum/day_2/airlines.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("airlines.csv"), sep=",", header=True)

# Show DataFrame
df.show()

+--------------------+
|      Airline Tweets|
+--------------------+
|@VirginAmerica pl...|
|@VirginAmerica se...|
|@VirginAmerica do...|
|@VirginAmerica Ar...|
|@VirginAmerica aw...|
+--------------------+



In [32]:
# Tokenize DataFrame
tokened = Tokenizer(inputCol="Airline Tweets", outputCol="words")
tokened_transformed = tokened.transform(df)
tokened_transformed.show()

+--------------------+--------------------+
|      Airline Tweets|               words|
+--------------------+--------------------+
|@VirginAmerica pl...|[@virginamerica, ...|
|@VirginAmerica se...|[@virginamerica, ...|
|@VirginAmerica do...|[@virginamerica, ...|
|@VirginAmerica Ar...|[@virginamerica, ...|
|@VirginAmerica aw...|[@virginamerica, ...|
+--------------------+--------------------+



In [33]:
# Remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
removed_frame = remover.transform(tokened_transformed)
removed_frame.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------+
|Airline Tweets                                                                                                                         |words                                                                                                                                                          |filtered                                                                                       |
+---------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------

In [34]:
# The HashingTF function takes an input column, an output column, and a numFeatures parameter,
# which specifies the number of buckets that the hashed words are mapped into. This number should be larger than the number of unique words to limit hash collisions.
# By default, the value is 2^18, or 262,144. A power of two should be used so that indexes are evenly mapped. Here we supply
# the numFeatures argument with its default value for demonstration. This argument can normally be left out and will use the same value by default.

# RUN THE HASHING TERM FREQUENCY
hashing = HashingTF(inputCol="filtered", outputCol="hashedValues", numFeatures=pow(2,18))

#TRANSFORM INTO A DATAFRAME; scroll right to see the hashed result;
hashed_df = hashing.transform(removed_frame) 
hashed_df.show(truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
|Airline Tweets                                                                                                                         |words                                                                                                                                                          |filtered                                                                                       |hashedValues                                                             

In [35]:
# With our words successfully converted to numbers, we can plug it all into an IDFModel, 
# which will scale the values while down-weighting based on document frequency.

# fit the IDF to the dataset 
idf = IDF(inputCol="hashedValues", outputCol="features")
idfModel = idf.fit(hashed_df)
rescaledData = idfModel.transform(hashed_df)

#display the DF, scroll to the right to see the new output column
rescaledData.select("words", "features").show(truncate=False)

# Remember that computers can't just read text and analyze it. With the process we have just completed,
# we have turned raw text into numeric values that a computer can work with.
# The next section will put all of this together and show how we can use text data to predict the corresponding rating.

+---------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|words                                                                                                                                                          |features                                                                                                                                                                                                                                                                                                        |
+-----------------------------------------------------------------
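
The arithmetic behind the hashing and IDF steps can be sketched in plain Python. This is an illustration under stated assumptions, not Spark's implementation: `zlib.crc32` stands in for the hash function Spark uses, `NUM_FEATURES` is kept tiny for readability, the three token lists are made up, and the IDF formula log((m + 1) / (df + 1)) is the smoothed form described in the Spark MLlib documentation.

```python
import math
import zlib
from collections import Counter

NUM_FEATURES = 16  # tiny power of two for readability; Spark's default is 2**18

def bucket(term):
    """Map a term to a bucket index (crc32 is a stand-in hash for this sketch)."""
    return zlib.crc32(term.encode("utf-8")) % NUM_FEATURES

def hashing_tf(tokens):
    """Term frequency per bucket: how many tokens landed in each index."""
    return Counter(bucket(t) for t in tokens)

# Hypothetical, already-filtered token lists.
docs = [["late", "flight"], ["great", "flight"], ["late", "late", "bag"]]

tf_vectors = [hashing_tf(d) for d in docs]
m = len(docs)

# Document frequency: in how many documents does each bucket appear?
doc_freq = Counter(b for vec in tf_vectors for b in vec)

# Smoothed inverse document frequency: rarer buckets get larger weights.
idf = {b: math.log((m + 1) / (doc_freq[b] + 1)) for b in doc_freq}

# TF-IDF: scale each term frequency by its bucket's IDF weight.
tfidf_vectors = [{b: count * idf[b] for b, count in vec.items()} for vec in tf_vectors]
```

With so few buckets, two different words can collide in the same index, which is exactly why a large numFeatures value is preferred in practice.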

In [36]:
# Set Up the Pipeline
# We'll put the pipeline to use by employing a sample Yelp dataset. 
# This will allow you to practice with a smaller set of reviews using similar data that you'll be working on for your client.

# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/dataviz-curriculum/day_2/yelp_reviews.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("yelp_reviews.csv"), sep=",", header=True)

# Show DataFrame
df.show()

+--------+--------------------+
|   class|                text|
+--------+--------------------+
|positive|Wow... Loved this...|
|negative|  Crust is not good.|
|negative|Not tasty and the...|
|positive|Stopped by during...|
|positive|The selection on ...|
|negative|Now I am getting ...|
|negative|Honeslty it didn'...|
|negative|The potatoes were...|
|positive|The fries were gr...|
|positive|      A great touch.|
|positive|Service was very ...|
|negative|  Would not go back.|
|negative|The cashier had n...|
|positive|I tried the Cape ...|
|negative|I was disgusted b...|
|negative|I was shocked bec...|
|positive| Highly recommended.|
|negative|Waitress was a li...|
|negative|This place is not...|
|negative|did not like at all.|
+--------+--------------------+
only showing top 20 rows



In [37]:
#  import all the functions that will be used in our NLP process, or pipeline
# Import functions
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

In [38]:
# Next, create a new column that uses the length function to store the length of each review's text as a future feature.
from pyspark.sql.functions import length
# Create a length column to be used as a future feature
data_df = df.withColumn('length', length(df['text']))
data_df.show()

+--------+--------------------+------+
|   class|                text|length|
+--------+--------------------+------+
|positive|Wow... Loved this...|    24|
|negative|  Crust is not good.|    18|
|negative|Not tasty and the...|    41|
|positive|Stopped by during...|    87|
|positive|The selection on ...|    59|
|negative|Now I am getting ...|    46|
|negative|Honeslty it didn'...|    37|
|negative|The potatoes were...|   111|
|positive|The fries were gr...|    25|
|positive|      A great touch.|    14|
|positive|Service was very ...|    24|
|negative|  Would not go back.|    18|
|negative|The cashier had n...|    99|
|positive|I tried the Cape ...|    59|
|negative|I was disgusted b...|    62|
|negative|I was shocked bec...|    50|
|positive| Highly recommended.|    19|
|negative|Waitress was a li...|    38|
|negative|This place is not...|    51|
|negative|did not like at all.|    20|
+--------+--------------------+------+
only showing top 20 rows



In [39]:
# Create all the features to the data set
pos_neg_to_num = StringIndexer(inputCol='class',outputCol='label')
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

We'll create all the transformations to be applied in our pipeline. Note that the StringIndexer encodes a string column to a column of label indexes. Here we are working with positive and negative reviews, which will be converted to 0 and 1. This will form our labels, which we'll delve into in the ML unit. The label is what we're trying to predict: does the text of a review tell us whether it was positive or negative?

Also note that we don't need to run all of these completely as we did before. By creating all the functions now, we can then use them all in the pipeline later.

We'll create a feature vector containing the output from the IDFModel (the last stage in the pipeline) and the length. This will combine all the raw features to train the ML model that we'll be using. 
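
To make the StringIndexer step less abstract, here is a plain-Python sketch of the idea of turning string classes into numeric labels. It assumes the default frequency-based ordering (the most frequent value gets index 0.0, with ties broken alphabetically); the function name `fit_string_indexer` and the short `classes` list are made up for illustration.

```python
from collections import Counter

def fit_string_indexer(values):
    """Assign indexes by descending frequency, breaking ties alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(index) for index, value in enumerate(ordered)}

# Hypothetical class column with more negative than positive reviews.
classes = ["positive", "negative", "negative", "positive", "negative"]

mapping = fit_string_indexer(classes)
labels = [mapping[c] for c in classes]
```

Under that assumption, whichever class appears more often receives 0.0, so which string maps to which number depends on the data rather than on the meaning of the words.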

In [40]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

Now it's time to create our pipeline, the easiest step. We'll import the pipeline from pyspark.ml, and then store a list of the stages created earlier. It's important to list the stages in the order they need to be executed. As we mentioned before, the output from one stage will then be passed off to another stage.

In [41]:
# Create and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

All the stages of the pipeline have been set up. You may have noticed that we didn't need to run each individual step ourselves, which is the perk of setting up the pipeline! Now your data is ready to be run through the pipeline. Then we can run it through a machine learning model.
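
The idea that each stage hands its output to the next one can be sketched with ordinary Python functions. This is a simplified analogy, not the Spark Pipeline API: the stage functions, the `STOP` set, and `run_pipeline` are made-up names, and the sketch ignores the fit step that stages such as IDF need.

```python
from functools import reduce

STOP = {"is", "not", "the", "a"}  # tiny illustrative stop-word set

def tokenize(row):
    """Stage 1: add a token_text field."""
    return {**row, "token_text": row["text"].lower().split()}

def remove_stop(row):
    """Stage 2: read token_text, add stop_tokens."""
    return {**row, "stop_tokens": [t for t in row["token_text"] if t not in STOP]}

def add_count(row):
    """Stage 3: read stop_tokens, add n_tokens."""
    return {**row, "n_tokens": len(row["stop_tokens"])}

# Order matters: each stage relies on a field produced by the one before it.
stages = [tokenize, remove_stop, add_count]

def run_pipeline(row, stages):
    """Feed the row through every stage in order."""
    return reduce(lambda acc, stage: stage(acc), stages, row)

result = run_pipeline({"text": "Crust is not good."}, stages)
```

Swapping `remove_stop` ahead of `tokenize` would fail because `token_text` would not exist yet, which mirrors why the stage order in the pipeline list is important.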

In [42]:
# Run the Model
# After our pipeline has been set up, we'll fit it to our DataFrame and then use the fitted pipeline to transform the data.

# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)

Our labels and features that we created early on in the process are numerical representations of positive and negative reviews. The features will be used in our model to predict whether a given review will be positive or negative. These features are the result of all the work we have been doing with the pipeline.

In [43]:
# show label and resulting features
cleaned.select(["label", "features"]).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(262145,[177414,2...|
|  0.0|(262145,[49815,23...|
|  0.0|(262145,[109649,1...|
|  1.0|(262145,[53101,68...|
|  1.0|(262145,[15370,77...|
|  0.0|(262145,[98142,13...|
|  0.0|(262145,[59172,22...|
|  0.0|(262145,[63420,85...|
|  1.0|(262145,[53777,17...|
|  1.0|(262145,[221827,2...|
|  1.0|(262145,[43756,22...|
|  0.0|(262145,[127310,1...|
|  0.0|(262145,[407,3153...|
|  1.0|(262145,[18098,93...|
|  0.0|(262145,[23071,12...|
|  0.0|(262145,[129941,1...|
|  1.0|(262145,[19633,21...|
|  0.0|(262145,[27707,65...|
|  0.0|(262145,[20891,27...|
|  0.0|(262145,[8287,208...|
+-----+--------------------+
only showing top 20 rows
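
The vector size of 262145 in the output above is the 2^18 hashed features plus one extra slot for the length column. A minimal plain-Python sketch of that kind of concatenation follows; the `assemble` helper and the sample index/value pairs are made up for illustration and do not reproduce the VectorAssembler API.

```python
def assemble(sparse_size, sparse_values, extras):
    """Append dense extra values after a sparse vector.

    Returns the new size and an {index: value} dict of non-zero entries.
    """
    combined = dict(sparse_values)
    for offset, value in enumerate(extras):
        if value != 0:
            combined[sparse_size + offset] = float(value)
    return sparse_size + len(extras), combined

idf_size = 2 ** 18  # 262144 hashed features

# Hypothetical IDF entries for one review, plus its text length of 24.
size, values = assemble(idf_size, {20298: 0.4, 177414: 1.2}, [24])
```

The length lands at index 262144, the first position after the hashed features.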



Now let's run our ML model on the data. One of the basics of ML is that data gets broken into training data and testing data. Training data is the data passed to our model so that it learns to predict results. The testing data is used to test our predictions. We can do this with the randomSplit method, which takes in a list of the fraction of data we want split into each group. A common convention is 70% for training and 30% for testing.

To split the data into a training set and a testing set, run the following code:

In [44]:
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3], 21)

The array supplied to randomSplit is the percentage of the data that will be broken into training and testing respectively. So 70% to training and 30% to testing. The second number supplied is called a seed. The seed number here, 21, is arbitrary. But as long as the same seed is used, the result will be the same each time. Using a seed number ensures reproducible results.
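
The effect of the seed can be demonstrated with a small plain-Python sketch. It is an analogy rather than Spark's algorithm: the `random_split` helper is a made-up name, and it assigns each row independently, so the split sizes are only approximately 70/30.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one of two groups using a seeded random generator."""
    rng = random.Random(seed)
    cutoff = weights[0] / sum(weights)
    first, second = [], []
    for row in rows:
        (first if rng.random() < cutoff else second).append(row)
    return first, second

rows = list(range(100))

# Same seed, same split, every time.
train_a, test_a = random_split(rows, [0.7, 0.3], seed=21)
train_b, test_b = random_split(rows, [0.7, 0.3], seed=21)
```

Because the generator starts from the same seed, both calls make identical random draws and therefore produce identical groups.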

The ML model we'll use is Naive Bayes, which we'll import and then fit using the training dataset. Naive Bayes is a group of classifier algorithms based on Bayes' theorem. Bayes' theorem provides a way to determine the probability of an event based on new conditions or information that might be related to the event.
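
For intuition, here is a toy multinomial Naive Bayes classifier in plain Python. It is a teaching sketch under simplifying assumptions (raw word counts, add-one smoothing, a four-review training set invented for this example) and is not the pyspark.ml implementation. For each class it combines the prior probability of the class with the probability of each word given that class, and picks the class with the higher score.

```python
import math
from collections import Counter

# Hypothetical, already-tokenized training reviews.
train = [
    ("positive", ["great", "fries"]),
    ("positive", ["great", "service"]),
    ("negative", ["not", "good"]),
    ("negative", ["not", "tasty"]),
]

def fit(examples, smoothing=1.0):
    """Count classes and words per class."""
    class_counts = Counter(label for label, _ in examples)
    word_counts = {label: Counter() for label in class_counts}
    for label, tokens in examples:
        word_counts[label].update(tokens)
    vocab = {t for _, tokens in examples for t in tokens}
    return class_counts, word_counts, vocab, smoothing

def predict(model, tokens):
    """Return the class with the highest log-probability score."""
    class_counts, word_counts, vocab, smoothing = model
    total = sum(class_counts.values())
    scores = {}
    for label, count in class_counts.items():
        denom = sum(word_counts[label].values()) + smoothing * len(vocab)
        score = math.log(count / total)  # log prior
        for t in tokens:
            if t in vocab:  # ignore words never seen in training
                score += math.log((word_counts[label][t] + smoothing) / denom)
        scores[label] = score
    return max(scores, key=scores.get)

model = fit(train)
guess_positive = predict(model, ["great", "fries"])
guess_negative = predict(model, ["not", "tasty"])
```

The "naive" part is the assumption that words are independent of each other given the class, which is what lets the per-word probabilities simply be multiplied (added, in log space).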

In [45]:
from pyspark.ml.classification import NaiveBayes
# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [46]:
# Once the model has been trained, we'll use it to transform our testing data.
# Transform the testing data with the trained model
test_results = predictor.transform(testing)
test_results.show(5)

# scroll all the way to the right to view the prediction column

+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|   class|                text|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|negative|"The burger... I ...|    86|  0.0|["the, burger...,...|["the, burger...,...|(262144,[20298,21...|(262144,[20298,21...|(262145,[20298,21...|[-820.60780566975...|[0.99999999999995...|       0.0|
|negative|              #NAME?|     6|  0.0|            [#name?]|            [#name?]|(262144,[197050],...|(262144,[197050],...|(262145,[197050,2...|[-73.489435340867...|[0.07515735596910.

This prediction column will contain 1.0 if the model thinks the review is positive and 0.0 if it thinks it's negative, matching the label column created by StringIndexer. Future data sets can now be run through this model to determine whether a review is positive or negative without the class having been supplied in the data.

How useful is this model? Should we just blindly trust that it will be right every time? There is one last step in the process to answer these questions.

It's often not enough to simply train and use a machine learning model for predictions without knowing how well the model performs at its prediction task. The last step is to import the BinaryClassificationEvaluator, which will score how well our model determines if a review will be positive or negative based solely on the text within a review.

The BinaryClassificationEvaluator uses two arguments, labelCol and rawPredictionCol. The labelCol takes the labels which were the result of using StringIndexer to convert our positive and negative strings to numeric labels. The rawPredictionCol takes in numerical predictions from the output of running the Naive Bayes model.

The performance of a model can be measured based on the difference between its predicted values and actual values. This is what the BinaryClassificationEvaluator does. We will dive more into accuracy, precision, and sensitivity when we get to Machine Learning. Run the following code:

In [47]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
acc_eval = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction")
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.700298
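
One caveat when reading this number: BinaryClassificationEvaluator reports area under the ROC curve (areaUnderROC) by default, which is not the same quantity as plain accuracy. The plain-Python sketch below computes both on a small made-up set of labels and hard 0/1 predictions to show that they can differ; the helper names and data are illustrative only.

```python
def accuracy(labels, preds):
    """Fraction of predictions that match the true label."""
    return sum(l == p for l, p in zip(labels, preds)) / len(labels)

def area_under_roc(labels, scores):
    """Probability that a random positive outscores a random negative (ties count half)."""
    pos = [s for l, s in zip(labels, scores) if l == 1]
    neg = [s for l, s in zip(labels, scores) if l == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

# Hypothetical test set: 3 positive and 7 negative reviews.
labels = [1, 1, 1, 0, 0, 0, 0, 0, 0, 0]
preds = [1, 1, 0, 0, 0, 0, 0, 0, 0, 1]

acc_value = accuracy(labels, preds)        # 8 of 10 correct
auc_value = area_under_roc(labels, preds)  # 16 of 21 positive/negative pairs, counting ties as half
```

On this toy data the accuracy is 0.8 while the area under the ROC curve is about 0.762, so the two metrics should not be used interchangeably when describing a model.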


The model's score isn't perfect, but it's not too low either: 0.700298. Note that BinaryClassificationEvaluator reports the area under the ROC curve by default, so this number is not plain accuracy. Machine learning isn't a guarantee, and tweaking our models as well as the data used is part of the process. One of the ways to do this is to add more data; when you keep adding data, eventually you grow from your local storage to something much larger, thus leading to big data!