In [1]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [2]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NaiveBayes").getOrCreate()


In [3]:
 from pyspark.sql import SparkSession 

 spark = SparkSession.builder.master("local").getOrCreate() 
 print(spark.sparkContext.version)

2.4.5


In [4]:
import pandas as pd
from pyspark.sql import SQLContext
sc =spark.sparkContext

In [5]:
sqlContext = SQLContext(sc)

In [6]:
# text processing
from nltk.corpus import stopwords
import re as re
from pyspark.ml.feature import CountVectorizer, IDF

In [7]:
# building the model
from pyspark.mllib.linalg import Vector, Vectors
from pyspark.mllib.clustering import LDA, LDAModel

In [9]:
from pyspark import SparkFiles
from pyspark.sql.types import StructField,IntegerType, StructType,StringType
newDF=[ StructField("review_id",IntegerType(),True),
        StructField("property_id",IntegerType(),True),
       StructField("number_of_reviews",IntegerType(),True),
       StructField("review_scores_rating",IntegerType(),True),
       StructField("review_scores_accuracy",IntegerType(),True),
       StructField("review_scores_checkin", IntegerType(),True),
       StructField("review_scores_communication",IntegerType(),True),
       StructField("review_scores_value",IntegerType(),True),
       StructField("review_scores_location",IntegerType(),True),
        StructField("comments",StringType(),True)
       ]
finalStruct=StructType(fields=newDF)
url ="https://data-analytics-airbnb.s3.us-east-2.amazonaws.com/Data/review.csv"
spark.sparkContext.addFile(url)
df=spark.read.csv(SparkFiles.get("review.csv"), sep=",",schema=finalStruct,header=True, timestampFormat="yyyy/MM/dd")
# df_clean = df.dropna()
df = df.dropna()
df.show()
df.printSchema()

+---------+-----------+-----------------+--------------------+----------------------+---------------------+---------------------------+-------------------+----------------------+--------------------+
|review_id|property_id|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_checkin|review_scores_communication|review_scores_value|review_scores_location|            comments|
+---------+-----------+-----------------+--------------------+----------------------+---------------------+---------------------------+-------------------+----------------------+--------------------+
|        1|   14078522|               14|                 100|                    10|                   10|                         10|                 10|                    10|My wife, son and ...|
|        2|   14078522|               14|                 100|                    10|                   10|                         10|                 10|                    10|We loved staying ...|


In [10]:
# url ="https://data-analytics-airbnb.s3.us-east-2.amazonaws.com/Data/review.csv"
# spark.sparkContext.addFile(url)
# df=spark.read.csv(SparkFiles.get("review.csv"), sep=",",header=True, schema=finalStruct, timestampFormat="yyyy/MM/dd")
# df.show()
# df_clean = df.dropna()
from pyspark.ml.feature import Bucketizer

df_buck = Bucketizer(splits=[0, 41, 71, 100],inputCol="review_scores_rating", outputCol="label").setHandleInvalid("keep").transform(df)
# df_buck = bucketizer.setHandleInvalid("keep").transform(df)
df_buck.show(truncate=False)

+---------+-----------+-----------------+--------------------+----------------------+---------------------+---------------------------+-------------------+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|review_id|property_id|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_checkin|review_scores_communication|review_scores_value|r

In [None]:
# # Read in data from S3 Buckets
# from pyspark import SparkFiles
# # from pyspark.sql.types import StructField,IntegerType, StructType,StringType
# from pyspark.ml.feature import Bucketizer
# url ="https://data-analytics-airbnb.s3.us-east-2.amazonaws.com/Data/review.csv"
# spark.sparkContext.addFile(url)
# df = spark.read.csv(SparkFiles.get("review.csv"), sep=",", header=True, inferSchema=True, timestampFormat="yyyy/MM/dd")

# # Show DataFrame
# df_clean = df.dropna()


# df_new = df_clean.withColumn("review_scores_rating", df_clean['review_scores_rating'].cast('float'))
# df_new = df_clean.withColumn("number_of_reviews", df_clean['number_of_reviews'].cast('float'))
# df_new_2 = df_clean.withColumn("review_scores_accuracy", df_clean['review_scores_accuracy'].cast('float'))
# df_new_3 = df_clean.withColumn("review_scores_communication", df_clean['review_scores_communication'].cast('float'))
# df_new_4 = df_clean.withColumn("review_scores_value", df_clean['review_scores_value'].cast('float'))
# df_new_5 = df_clean.withColumn("review_scores_location", df_clean['review_scores_location'].cast('float'))
# bucketizer = Bucketizer(splits=[ 0, 40, 70, 100],inputCol="review_scores_rating", outputCol="label")
# df_buck = bucketizer.setHandleInvalid("keep").transform(df_new)
# df_buck.show(truncate=False)

In [11]:
# why this happend?
df_buck.orderBy(df["review_scores_rating"]).show(50)

+---------+-----------+-----------------+--------------------+----------------------+---------------------+---------------------------+-------------------+----------------------+-----------------------------------+-----+
|review_id|property_id|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_checkin|review_scores_communication|review_scores_value|review_scores_location|                           comments|label|
+---------+-----------+-----------------+--------------------+----------------------+---------------------+---------------------------+-------------------+----------------------+-----------------------------------+-----+
|   416524|   16181951|                1|                  20|                     2|                    2|                          2|                  2|                     2|               Norma never respo...|  0.0|
|   456377|    9617789|                1|                  20|                     6|                    2|         

In [12]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature 
data_df = df_buck.withColumn('length', length(df_buck['comments']))
data_df.show()

+---------+-----------+-----------------+--------------------+----------------------+---------------------+---------------------------+-------------------+----------------------+--------------------+-----+------+
|review_id|property_id|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_checkin|review_scores_communication|review_scores_value|review_scores_location|            comments|label|length|
+---------+-----------+-----------------+--------------------+----------------------+---------------------+---------------------------+-------------------+----------------------+--------------------+-----+------+
|        1|   14078522|               14|                 100|                    10|                   10|                         10|                 10|                    10|My wife, son and ...|  2.0|   658|
|        2|   14078522|               14|                 100|                    10|                   10|                         10|             

### Feature Transformations


In [13]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
# Create all the features to the data set
# pos_neg_to_num = StringIndexer(inputCol='review_scores_rating',outputCol='label')
tokenizer = Tokenizer(inputCol="comments", outputCol="token_comments")
tokenized = tokenizer.transform(data_df)
tokenized.show()

+---------+-----------+-----------------+--------------------+----------------------+---------------------+---------------------------+-------------------+----------------------+--------------------+-----+------+--------------------+
|review_id|property_id|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_checkin|review_scores_communication|review_scores_value|review_scores_location|            comments|label|length|      token_comments|
+---------+-----------+-----------------+--------------------+----------------------+---------------------+---------------------------+-------------------+----------------------+--------------------+-----+------+--------------------+
|        1|   14078522|               14|                 100|                    10|                   10|                         10|                 10|                    10|My wife, son and ...|  2.0|   658|[my, wife,, son, ...|
|        2|   14078522|               14|                 100|  

In [29]:
from pyspark.sql.functions import col,udf,lower, regexp_replace

def get_rid_of_punctuations(reviews):
  punctuations = ['\.', '!']
  review_lower = lower(reviews)
  for i in range(len(punctuations)):
    review_lower = regexp_replace(review_lower, punctuations[i], "")
    return review_lower
    df.withColumn("Reviews_updated", get_rid_of_punctuations(col('Reviews'))).show(truncate=False)

In [30]:
# Remove stop words
remover = StopWordsRemover(inputCol='token_comments',outputCol='stop_tokens')
# Transform new DataFrame
removed_frame = remover.transform(tokenized)
removed_frame.show(truncate=False)

+---------+-----------+-----------------+--------------------+----------------------+---------------------+---------------------------+-------------------+----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+------------------------------------------------------------------------------------------------------------------------------------------------------

In [31]:
# Show simplified review
removed_frame.select("stop_tokens").show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|stop_tokens                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
+---------------------

In [32]:
# Run the hashing term frequency
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')

In [33]:
# Transform into a DF
hashed_df = hashingTF.transform(removed_frame)
hashed_df.show()

+---------+-----------+-----------------+--------------------+----------------------+---------------------+---------------------------+-------------------+----------------------+--------------------+-----+------+--------------------+--------------------+--------------------+
|review_id|property_id|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_checkin|review_scores_communication|review_scores_value|review_scores_location|            comments|label|length|      token_comments|         stop_tokens|          hash_token|
+---------+-----------+-----------------+--------------------+----------------------+---------------------+---------------------------+-------------------+----------------------+--------------------+-----+------+--------------------+--------------------+--------------------+
|        1|   14078522|               14|                 100|                    10|                   10|                         10|                 10|                 

In [34]:
# Fit the IDF on the data set 
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [35]:
print(idf)

IDF_10590eeaa1a3


In [36]:
# this command won't run
idfModel = idf.fit(hashed_df)

In [38]:
rescaledData = idfModel.transform(hashed_df)

In [39]:
# Display the DataFrame
rescaledData.select("token_comments", "idf_token").show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [40]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')
output = clean_up.transform(rescaledData)

In [None]:
# Create a and run a data processing Pipeline
# from pyspark.ml import Pipeline
# data_prep_pipeline = Pipeline(stages=[tokenizer, stopremove, hashingTF, idf, clean_up])

In [None]:
# Fit and transform the pipeline
# cleaner = data_prep_pipeline.fit(data_df)
# cleaned = cleaner.transform(data_df)

In [41]:
# Show label and resulting features
output.show()
# final_df = output.select(['points','country', 'winery','price']).show()

+---------+-----------+-----------------+--------------------+----------------------+---------------------+---------------------------+-------------------+----------------------+--------------------+-----+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|review_id|property_id|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_checkin|review_scores_communication|review_scores_value|review_scores_location|            comments|label|length|      token_comments|         stop_tokens|          hash_token|           idf_token|            features|
+---------+-----------+-----------------+--------------------+----------------------+---------------------+---------------------------+-------------------+----------------------+--------------------+-----+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|        1|   14078522|               14|     

In [42]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = output.randomSplit([0.8, 0.2])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [None]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show()

+---------+-----------+-----------------+--------------------+----------------------+---------------------+---------------------------+-------------------+----------------------+-----------------------------+-----+------+----------------------------+----------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|review_id|property_id|number_of_reviews|review_scores_rating|review_scores_accuracy|review_scores_checkin|review_scores_communication|review_scores_value|review_scores_location|                     comments|label|length|              token_comments|                 stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+---------+-----------+-----------------+--------------------+----------------------+---------------------+---------------------------+-------------------+----------------------+-----------------------------+

In [None]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.990380
