In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.1.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com] [Conn                                                                               Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.152)] [Waiting for headers] [Wa                                                                               Hit:3 http://security.ubuntu.com/ubuntu bionic-security InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.152)] [Waiting for headers] [Co                                                                               Hit:4 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:6 https://developer.download.nvidia.com/compute/machine-le

In [2]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Tokens").getOrCreate()

In [3]:
from pyspark.ml.feature import Tokenizer

In [4]:
dataframe = spark.createDataFrame([
                                   (0, "Spark is great"),
                                   (1, "We are learning Spark"),
                                   (2, "Spark is better than Hadoop no doubt")
                                   ],
                                   ["id","sentence"])

dataframe.show()
                                    


+---+--------------------+
| id|            sentence|
+---+--------------------+
|  0|      Spark is great|
|  1|We are learning S...|
|  2|Spark is better t...|
+---+--------------------+



In [5]:
# Tokenize sentences
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
tokenizer

Tokenizer_a199f9f71197

In [6]:
tokenizer_df = tokenizer.transform(dataframe)
tokenizer_df.show(truncate=False)

+---+------------------------------------+--------------------------------------------+
|id |sentence                            |words                                       |
+---+------------------------------------+--------------------------------------------+
|0  |Spark is great                      |[spark, is, great]                          |
|1  |We are learning Spark               |[we, are, learning, spark]                  |
|2  |Spark is better than Hadoop no doubt|[spark, is, better, than, hadoop, no, doubt]|
+---+------------------------------------+--------------------------------------------+



In [7]:
# Create a function to return the length of a list
def word_list_length(word_list):
    return len(word_list)

In [8]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

In [9]:
# Create a user defined function
count_tokens = udf(word_list_length, IntegerType())

In [10]:
# Tokenize sentences
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
tokenizer

Tokenizer_8598dd00755d

In [11]:
tokenizer_df = tokenizer.transform(dataframe)
tokenizer_df.withColumn("tokens",count_tokens(col("words"))).show(truncate=False)

+---+------------------------------------+--------------------------------------------+------+
|id |sentence                            |words                                       |tokens|
+---+------------------------------------+--------------------------------------------+------+
|0  |Spark is great                      |[spark, is, great]                          |3     |
|1  |We are learning Spark               |[we, are, learning, spark]                  |4     |
|2  |Spark is better than Hadoop no doubt|[spark, is, better, than, hadoop, no, doubt]|7     |
+---+------------------------------------+--------------------------------------------+------+



In [12]:
sentencedata = spark.createDataFrame([
                                   (0, ["Big","data","is","super","powerful"]),
                                   (1, ["This","is","going","to","be","epic"])
                                   ],
                                   ["id","raw"])

sentencedata.show(truncate=False)

+---+--------------------------------+
|id |raw                             |
+---+--------------------------------+
|0  |[Big, data, is, super, powerful]|
|1  |[This, is, going, to, be, epic] |
+---+--------------------------------+



In [13]:
# Import stop words library
from pyspark.ml.feature import StopWordsRemover

In [14]:
# Run the Remover
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")

In [15]:
remover.transform(sentencedata).show(truncate=False)

+---+--------------------------------+----------------------------+
|id |raw                             |filtered                    |
+---+--------------------------------+----------------------------+
|0  |[Big, data, is, super, powerful]|[Big, data, super, powerful]|
|1  |[This, is, going, to, be, epic] |[going, epic]               |
+---+--------------------------------+----------------------------+



In [16]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover

In [17]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/dataviz-curriculum/day_2/airlines.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("airlines.csv"), sep=",", header=True)

# Show DataFrame
df.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------+
|Airline Tweets                                                                                                                         |
+---------------------------------------------------------------------------------------------------------------------------------------+
|@VirginAmerica plus you've added commercials to the experience... tacky.                                                               |
|@VirginAmerica seriously would pay $30 a flight for seats that didn't have this playing. it's really the only bad thing about flying VA|
|@VirginAmerica do you miss me? Don't worry we'll be together very soon.                                                                |
|@VirginAmerica Are the hours of operation for the Club at SFO that are posted online current?                                          |
|@VirginAmerica awaiting my return

In [18]:
# Tokenize DataFrame
tokened = Tokenizer(inputCol="Airline Tweets", outputCol="words")
tokened_transformed = tokened.transform(df)
tokened_transformed.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Airline Tweets                                                                                                                         |words                                                                                                                                                          |
+---------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
|@VirginAmerica plus you've added commercials to the experience... tacky.                                 

In [19]:
# Remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
removed_frame = remover.transform(tokened_transformed)
removed_frame.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------+
|Airline Tweets                                                                                                                         |words                                                                                                                                                          |filtered                                                                                       |
+---------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------

In [20]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("TF-IDF").getOrCreate()

In [21]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover

In [22]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/dataviz-curriculum/day_2/airlines.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("airlines.csv"), sep=",", header=True)

# Show DataFrame
df.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------+
|Airline Tweets                                                                                                                         |
+---------------------------------------------------------------------------------------------------------------------------------------+
|@VirginAmerica plus you've added commercials to the experience... tacky.                                                               |
|@VirginAmerica seriously would pay $30 a flight for seats that didn't have this playing. it's really the only bad thing about flying VA|
|@VirginAmerica do you miss me? Don't worry we'll be together very soon.                                                                |
|@VirginAmerica Are the hours of operation for the Club at SFO that are posted online current?                                          |
|@VirginAmerica awaiting my return

In [23]:
# Tokenize DataFrame
tokened = Tokenizer(inputCol="Airline Tweets", outputCol="words")
tokened_transformed = tokened.transform(df)
tokened_transformed.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Airline Tweets                                                                                                                         |words                                                                                                                                                          |
+---------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
|@VirginAmerica plus you've added commercials to the experience... tacky.                                 

In [24]:
# Remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
removed_frame = remover.transform(tokened_transformed)
removed_frame.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------+
|Airline Tweets                                                                                                                         |words                                                                                                                                                          |filtered                                                                                       |
+---------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------

In [25]:
hashing = HashingTF(inputCol="filtered", outputCol="hashedValues", numFeatures=pow(2,18))

hashed_frame = hashing.transform(removed_frame)
hashed_frame.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
|Airline Tweets                                                                                                                         |words                                                                                                                                                          |filtered                                                                                       |hashedValues                                                             

In [26]:
idf = IDF(inputCol="hashedValues", outputCol="features")
idfModel = idf.fit(hashed_frame)
rescaledData = idfModel.transform(hashed_frame)

rescaledData.select("words", "features").show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|words                                                                                                                                                          |features                                                                                                                                                                                                                                                                                                        |
+-----------------------------------------------------------------

In [27]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Yelp_NLP").getOrCreate()

In [28]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/dataviz-curriculum/day_2/yelp_reviews.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("yelp_reviews.csv"), sep=",", header=True)

# Show DataFrame
df.show(truncate=False)

+--------+---------------------------------------------------------------------------------------------------------------+
|class   |text                                                                                                           |
+--------+---------------------------------------------------------------------------------------------------------------+
|positive|Wow... Loved this place.                                                                                       |
|negative|Crust is not good.                                                                                             |
|negative|Not tasty and the texture was just nasty.                                                                      |
|positive|Stopped by during the late May bank holiday off Rick Steve recommendation and loved it.                        |
|positive|The selection on the menu was great and so were the prices.                                                    |
|negative|Now I 

In [29]:
# Import functions
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

In [30]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature
data_df = df.withColumn('length', length(df['text']))
data_df.show(truncate= False)

+--------+---------------------------------------------------------------------------------------------------------------+------+
|class   |text                                                                                                           |length|
+--------+---------------------------------------------------------------------------------------------------------------+------+
|positive|Wow... Loved this place.                                                                                       |24    |
|negative|Crust is not good.                                                                                             |18    |
|negative|Not tasty and the texture was just nasty.                                                                      |41    |
|positive|Stopped by during the late May bank holiday off Rick Steve recommendation and loved it.                        |87    |
|positive|The selection on the menu was great and so were the prices.                     

In [31]:
# Create all the features to the data set
pos_neg_to_num = StringIndexer(inputCol='class',outputCol='label')
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')


In [32]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')


In [33]:
# Create and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [34]:
data_prep_pipeline

Pipeline_f7de5280a8ca

In [35]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)

In [36]:
cleaned.show()

+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|   class|                text|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|
+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|positive|Wow... Loved this...|    24|  1.0|[wow..., loved, t...|[wow..., loved, p...|(262144,[177414,2...|(262144,[177414,2...|(262145,[177414,2...|
|negative|  Crust is not good.|    18|  0.0|[crust, is, not, ...|      [crust, good.]|(262144,[49815,23...|(262144,[49815,23...|(262145,[49815,23...|
|negative|Not tasty and the...|    41|  0.0|[not, tasty, and,...|[tasty, texture, ...|(262144,[109649,1...|(262144,[109649,1...|(262145,[109649,1...|
|positive|Stopped by during...|    87|  1.0|[stopped, by, dur...|[stopped, late, m...|(262144,[53101

In [37]:
cleaned.select(['label', 'features']).show(truncate=False)

+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                                                                                                                                                                                     |
+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1.0  |(262145,[177414,216221,239331,262144],[6.215607598755275,4.51085950651685,3.8642323415917974,24.0])                                                            

In [38]:
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3], 21)

In [39]:
from pyspark.ml.classification import NaiveBayes
# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [40]:
test_results = predictor.transform(testing)
test_results.show(5)

+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|   class|                text|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|negative|"The burger... I ...|    86|  0.0|["the, burger...,...|["the, burger...,...|(262144,[20298,21...|(262144,[20298,21...|(262145,[20298,21...|[-820.60780566975...|[0.99999999999995...|       0.0|
|negative|              #NAME?|     6|  0.0|            [#name?]|            [#name?]|(262144,[197050],...|(262144,[197050],...|(262145,[197050,2...|[-73.489435340867...|[0.07515735596910.

In [41]:
test_results = predictor.transform(testing)
test_results.show()

+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|   class|                text|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|negative|"The burger... I ...|    86|  0.0|["the, burger...,...|["the, burger...,...|(262144,[20298,21...|(262144,[20298,21...|(262145,[20298,21...|[-820.60780566975...|[0.99999999999995...|       0.0|
|negative|              #NAME?|     6|  0.0|            [#name?]|            [#name?]|(262144,[197050],...|(262144,[197050],...|(262145,[197050,2...|[-73.489435340867...|[0.07515735596910.

In [42]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
accEval = BinaryClassificationEvaluator(labelCol='label', rawPredictionCol='prediction')
acc = accEval.evaluate(test_results)
print("Accuracy of the prediction was: %f" %acc)

Accuracy of the prediction was: 0.700298


In [43]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://bbora-bucket.s3.amazonaws.com/user_data.csv"
spark.sparkContext.addFile(url)
df_bhas = spark.read.csv(SparkFiles.get("user_data.csv"), sep=",", header=True)

# Show DataFrame
df_bhas.show(truncate=False)

+---+----------+---------+-----------+----------------------+--------------------+------------+
|id |first_name|last_name|active_user|street_address        |state               |username    |
+---+----------+---------+-----------+----------------------+--------------------+------------+
|1  |Cletus    |Lithcow  |FALSE      |78309 Riverside Way   |Virginia            |ibearham0   |
|2  |Caz       |Felgat   |FALSE      |83 Hazelcrest Place   |Alabama             |wwaller1    |
|3  |Kerri     |Crowson  |FALSE      |112 Eliot Pass        |North Carolina      |ichesnut2   |
|4  |Freddie   |Caghy    |FALSE      |15 Merchant Way       |New York            |tsnarr3     |
|5  |Sadella   |Deuss    |FALSE      |079 Acker Avenue      |Tennessee           |fwherrit4   |
|6  |Fraser    |Korneev  |TRUE       |76084 Novick Court    |Minnesota           |fstappard5  |
|7  |Demott    |Rapson   |TRUE       |86320 Dahle Park      |District of Columbia|lhambling6  |
|8  |Robert    |Poile    |FALSE      |15