In [None]:
import os
# Find the latest version of spark 3  from http://www.apache.org/dist/spark/

spark_version = 'spark-3.4.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()


0% [Working]            Get:1 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
0% [Connecting to archive.ubuntu.com (185.125.190.36)] [1 InRelease 14.2 kB/114                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease [3,622 B]
0% [Connecting to archive.ubuntu.com (185.125.190.36)] [1 InRelease 14.2 kB/1140% [Connecting to archive.ubuntu.com (185.125.190.36)] [1 InRelease 34.4 kB/1140% [Connecting to archive.ubuntu.com (185.125.190.36)] [Connecting to ppa.launc                                                                               Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
                                                                               0% [Waiting for headers] [Waiting for headers]                                              Hit:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal 

In [None]:
# Start (or attach to) the Spark session used by the tokenization examples
from pyspark.sql import SparkSession

session_builder = SparkSession.builder.appName("Tokens")
spark = session_builder.getOrCreate()

In [None]:
from pyspark.ml.feature import Tokenizer

In [None]:
# Toy corpus: one (id, sentence) row per example sentence
sentences = [
    (0, "spark is great"),
    (1, "we are learning spark"),
    (2, "Spark is better than Hadoop"),
    (3, "Even if this spark makes no sense"),
]
dataframe = spark.createDataFrame(sentences, ["id", "sentence"])
dataframe.show()

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  0|      spark is great|
|  1|we are learning s...|
|  2|Spark is better t...|
|  3|Even if this spar...|
+---+--------------------+



In [None]:
# Lower-cases each `sentence` and splits it on whitespace into a `words` array column
tokenizer = Tokenizer(outputCol='words', inputCol='sentence')

In [None]:
# Run the tokenizer over every sentence; truncate=False prints full cell contents
tokenized_df = tokenizer.transform(dataset=dataframe)
tokenized_df.show(truncate=False)

+---+---------------------------------+-----------------------------------------+
|id |sentence                         |words                                    |
+---+---------------------------------+-----------------------------------------+
|0  |spark is great                   |[spark, is, great]                       |
|1  |we are learning spark            |[we, are, learning, spark]               |
|2  |Spark is better than Hadoop      |[spark, is, better, than, hadoop]        |
|3  |Even if this spark makes no sense|[even, if, this, spark, makes, no, sense]|
+---+---------------------------------+-----------------------------------------+



In [None]:
# Create a function to return the length of a list
def word_list_length(word_list):
    """Return the number of tokens in ``word_list``.

    This function is wrapped as a Spark UDF, and a null array value is passed
    to a Python UDF as ``None``. Returning ``None`` in that case yields a null
    result instead of raising ``TypeError`` (which would fail the whole job).

    Args:
        word_list: a list of tokens, or ``None``.

    Returns:
        ``len(word_list)``, or ``None`` when ``word_list`` is ``None``.
    """
    if word_list is None:
        return None
    return len(word_list)

In [None]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

In [None]:
# Expose word_list_length to Spark as a UDF producing an IntegerType column
count_tokens = udf(word_list_length, returnType=IntegerType())

In [None]:
# Append a `tokens` column with the size of each `words` array, then display it all
word_counts_df = tokenized_df.withColumn('tokens', count_tokens(col('words')))
word_counts_df.show(truncate=False)

+---+---------------------------------+-----------------------------------------+------+
|id |sentence                         |words                                    |tokens|
+---+---------------------------------+-----------------------------------------+------+
|0  |spark is great                   |[spark, is, great]                       |3     |
|1  |we are learning spark            |[we, are, learning, spark]               |4     |
|2  |Spark is better than Hadoop      |[spark, is, better, than, hadoop]        |5     |
|3  |Even if this spark makes no sense|[even, if, this, spark, makes, no, sense]|7     |
+---+---------------------------------+-----------------------------------------+------+



Stop Words

In [None]:
# getOrCreate() returns the SparkSession already running in this kernel, so
# this reuses the existing session rather than starting a second one.
stop_words_builder = SparkSession.builder.appName('StopWords')
spark = stop_words_builder.getOrCreate()

In [None]:
# Two sentences supplied already tokenized; punctuation stays attached to the
# token it follows (e.g. 'floor.', 'air,').
raw_data = [
    (0, ['The', 'curious', 'cat', 'pounced', 'playfully', 'on', 'the',
         'shimmering', 'sunbeam', 'that', 'danced', 'across', 'the',
         'hardwood', 'floor.']),
    (1, ['The', 'aroma', 'of', 'freshly', 'baked', 'cookies', 'wafted',
         'through', 'the', 'air,', 'enticing', 'everyone', 'in', 'the',
         'vicinity', 'with', 'its', 'irresistible', 'sweetness.']),
]
sentenceData = spark.createDataFrame(raw_data, ['id', 'raw'])
sentenceData.show(truncate=False)

+---+-------------------------------------------------------------------------------------------------------------------------------------------------+
|id |raw                                                                                                                                              |
+---+-------------------------------------------------------------------------------------------------------------------------------------------------+
|0  |[The, curious, cat, pounced, playfully, on, the, shimmering, sunbeam, that, danced, across, the, hardwood, floor.]                               |
|1  |[The, aroma, of, freshly, baked, cookies, wafted, through, the, air,, enticing, everyone, in, the, vicinity, with, its, irresistible, sweetness.]|
+---+-------------------------------------------------------------------------------------------------------------------------------------------------+



In [None]:
# Import stop words library
from pyspark.ml.feature import StopWordsRemover

In [None]:
# Drop stop words from `raw` into `filtered`, then compare token counts before and after
remover = StopWordsRemover(outputCol='filtered', inputCol='raw')
remover_df = remover.transform(dataset=sentenceData)
(
    remover_df
    .withColumn('raw_tokens', count_tokens(col('raw')))
    .withColumn('filtered_tokens', count_tokens(col('filtered')))
    .show(truncate=False)
)


+---+-------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------+----------+---------------+
|id |raw                                                                                                                                              |filtered                                                                                              |raw_tokens|filtered_tokens|
+---+-------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------+----------+---------------+
|0  |[The, curious, cat, pounced, playfully, on, the, shimmering, sunbeam, that, danced, across, the, hardwood, floor.]                               |[cu

Term Frequency–Inverse Document Frequency (TF-IDF)

In [None]:
from pyspark.sql import SparkSession

# Attach to the running SparkSession (getOrCreate() does not start a new one if one exists)
spark = (
    SparkSession.builder
    .appName('TF-IDF')
    .getOrCreate()
)

In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover

In [None]:
# Fetch the airline tweets CSV into Spark's file cache, then read the local copy
from pyspark import SparkFiles

url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-online/v2/module_17/airlines.csv"
spark.sparkContext.addFile(url)
airlines_path = SparkFiles.get("airlines.csv")
df = spark.read.csv(airlines_path, sep=",", header=True)

# Preview the first rows
df.show()

+--------------------+
|      Airline Tweets|
+--------------------+
|@VirginAmerica pl...|
|@VirginAmerica se...|
|@VirginAmerica do...|
|@VirginAmerica Ar...|
|@VirginAmerica aw...|
+--------------------+



In [None]:
# Split each tweet into lower-cased whitespace tokens in a new `words` column
tokened = Tokenizer(outputCol='words', inputCol='Airline Tweets')
token_transformed = tokened.transform(dataset=df)
token_transformed.show()

+--------------------+--------------------+
|      Airline Tweets|               words|
+--------------------+--------------------+
|@VirginAmerica pl...|[@virginamerica, ...|
|@VirginAmerica se...|[@virginamerica, ...|
|@VirginAmerica do...|[@virginamerica, ...|
|@VirginAmerica Ar...|[@virginamerica, ...|
|@VirginAmerica aw...|[@virginamerica, ...|
+--------------------+--------------------+



In [None]:
# Strip stop words from the tweet tokens into a `filtered` column
remover = StopWordsRemover(outputCol='filtered', inputCol='words')
removed_frame = remover.transform(dataset=token_transformed)
removed_frame.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------+
|Airline Tweets                                                                                                                         |words                                                                                                                                                          |filtered                                                                                       |
+---------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------

In [None]:
# Hash each filtered token into one of 2**18 buckets and count term frequencies per tweet
hashing = HashingTF(numFeatures=2 ** 18, inputCol='filtered', outputCol='hashedValues')
hashed_df = hashing.transform(dataset=removed_frame)
hashed_df.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
|Airline Tweets                                                                                                                         |words                                                                                                                                                          |filtered                                                                                       |hashedValues                                                             

In [None]:
# Learn inverse document frequencies from the hashed counts, then rescale them into `features`
idf = IDF(outputCol='features', inputCol='hashedValues')
idfModel = idf.fit(dataset=hashed_df)
rescaledData = idfModel.transform(dataset=hashed_df)

rescaledData.select(['words', 'features']).show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|words                                                                                                                                                          |features                                                                                                                                                                                                                                                                                                        |
+-----------------------------------------------------------------

Yelp NLP

In [None]:
# Start (or attach to) the Spark session for the Yelp review classifier
from pyspark.sql import SparkSession

yelp_builder = SparkSession.builder.appName("Yelp_NLP")
spark = yelp_builder.getOrCreate()

In [None]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-online/v2/module_17/yelp_reviews.csv"
spark.sparkContext.addFile(url)

# The review file appears to use the standard CSV convention of escaping a quote
# inside a quoted field by doubling it (""). Spark's CSV reader defaults to a
# backslash escape, so without escape='"' such rows are read verbatim, keeping
# the surrounding and doubled quote characters in `text`.
df = spark.read.csv(
    SparkFiles.get("yelp_reviews.csv"),
    sep=",",
    header=True,
    escape='"',
)

# Show DataFrame
df.show()

+--------+--------------------+
|   class|                text|
+--------+--------------------+
|positive|Wow... Loved this...|
|negative|  Crust is not good.|
|negative|Not tasty and the...|
|positive|Stopped by during...|
|positive|The selection on ...|
|negative|Now I am getting ...|
|negative|Honeslty it didn'...|
|negative|The potatoes were...|
|positive|The fries were gr...|
|positive|      A great touch.|
|positive|Service was very ...|
|negative|  Would not go back.|
|negative|The cashier had n...|
|positive|I tried the Cape ...|
|negative|I was disgusted b...|
|negative|I was shocked bec...|
|positive| Highly recommended.|
|negative|Waitress was a li...|
|negative|This place is not...|
|negative|did not like at all.|
+--------+--------------------+
only showing top 20 rows



In [None]:
# Import functions
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

In [None]:
from pyspark.sql.functions import length

# Character count of each review, added as a `length` column
review_length = length(df['text'])
data_df = df.withColumn('length', review_length)
data_df.show()

+--------+--------------------+------+
|   class|                text|length|
+--------+--------------------+------+
|positive|Wow... Loved this...|    24|
|negative|  Crust is not good.|    18|
|negative|Not tasty and the...|    41|
|positive|Stopped by during...|    87|
|positive|The selection on ...|    59|
|negative|Now I am getting ...|    46|
|negative|Honeslty it didn'...|    37|
|negative|The potatoes were...|   111|
|positive|The fries were gr...|    25|
|positive|      A great touch.|    14|
|positive|Service was very ...|    24|
|negative|  Would not go back.|    18|
|negative|The cashier had n...|    99|
|positive|I tried the Cape ...|    59|
|negative|I was disgusted b...|    62|
|negative|I was shocked bec...|    50|
|positive| Highly recommended.|    19|
|negative|Waitress was a li...|    38|
|negative|This place is not...|    51|
|negative|did not like at all.|    20|
+--------+--------------------+------+
only showing top 20 rows



In [None]:
# Feature-engineering stages for the data-prep pipeline
# 'positive'/'negative' class strings -> numeric `label`
pos_neg_to_num = StringIndexer(outputCol='label', inputCol='class')
# review text -> lower-cased word tokens
tokenizer = Tokenizer(outputCol="token_text", inputCol="text")
# tokens with stop words removed
stopremove = StopWordsRemover(outputCol='stop_tokens', inputCol='token_text')
# hashed term-frequency vectors
hashingTF = HashingTF(outputCol='hash_token', inputCol="stop_tokens")
# IDF-weighted term frequencies
idf = IDF(outputCol='idf_token', inputCol='hash_token')

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Concatenate the IDF vector and the scalar `length` column into a single `features` vector
clean_up = VectorAssembler(outputCol='features', inputCols=['idf_token', 'length'])

In [None]:
from pyspark.ml import Pipeline

# Stages in execution order: index labels, tokenize, drop stop words,
# hash term frequencies, IDF-weight them, assemble the feature vector
prep_stages = [pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up]
data_prep_pipeline = Pipeline(stages=prep_stages)


In [None]:
# Split the raw reviews into training and testing sets *before* fitting the
# pipeline. StringIndexer and IDF learn from the rows they are fit on (label
# mapping, document frequencies). Fitting them on all rows and splitting
# afterwards lets the test reviews shape the IDF weights the model trains on,
# leaking test information into training.
training_raw, testing_raw = data_df.randomSplit([0.7, 0.3], seed=21)

# Fit the data-prep pipeline on the training rows only, then apply the fitted
# transformations to both sets.
cleaner = data_prep_pipeline.fit(training_raw)
training = cleaner.transform(training_raw)
testing = cleaner.transform(testing_raw)

# Whole dataset passed through the training-fit pipeline (lazy; kept for
# inspection, not used for training or evaluation).
cleaned = cleaner.transform(data_df)

In [None]:
from pyspark.ml.classification import NaiveBayes

# Naive Bayes classifier reading the assembled `features` and indexed `label` columns
nb = NaiveBayes(featuresCol='features', labelCol='label')
predictor = nb.fit(training)

In [None]:
# Score the held-out reviews and preview the first five predictions
test_results = predictor.transform(dataset=testing)
test_results.show(n=5)

+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|   class|                text|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|negative|"The burger... I ...|    86|  0.0|["the, burger...,...|["the, burger...,...|(262144,[20298,21...|(262144,[20298,21...|(262145,[20298,21...|[-820.60780566975...|[0.99999999999995...|       0.0|
|negative|              #NAME?|     6|  0.0|            [#name?]|            [#name?]|(262144,[197050],...|(262144,[197050],...|(262145,[197050,2...|[-73.489435340867...|[0.07515735596910.

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Accuracy: fraction of test reviews whose predicted label equals the true label.
# BinaryClassificationEvaluator only reports areaUnderROC/areaUnderPR, so it
# cannot produce accuracy; the multiclass evaluator's 'accuracy' metric does.
acc_eval = MulticlassClassificationEvaluator(
    labelCol='label', predictionCol='prediction', metricName='accuracy'
)
acc = acc_eval.evaluate(test_results)
print(f"Accuracy: {acc}")

# Area under the ROC curve, ranked by the model's class probabilities rather
# than the thresholded 0/1 prediction (which collapses the ROC curve to a single point).
auc_eval = BinaryClassificationEvaluator(
    labelCol='label', rawPredictionCol='probability', metricName='areaUnderROC'
)
auc = auc_eval.evaluate(test_results)
print(f"Area under ROC: {auc}")

0.7002978406552495
