In [2]:
import os
# Find the latest version of spark 2.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-2.4.7'
# spark_version = 'spark-2.<enter version>'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()



0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:4 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:7 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:11 http://ppa.launchpad.net/marutter/c2d4u3.5/ubuntu bionic InRelease [15.4 kB]
Get:13 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Get:14 http://security.ubuntu.com/ubuntu bionic-securit

In [3]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("YelpReview").getOrCreate()

In [4]:
from pyspark.sql.functions import col, udf,length
from pyspark.sql.types import StringType

### Data cleaning

In [5]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://usc-bootcamp-yelpreview-text-analysis.s3.us-east-2.amazonaws.com/reviews.csv"
spark.sparkContext.addFile(url)
raw_df = spark.read.csv(SparkFiles.get("reviews.csv"), sep=",", header=True)

# Show DataFrame
raw_df.show()

+--------------------+-------------+------------+
|             reviews|       rating|review_count|
+--------------------+-------------+------------+
|Panda Express was...|5 star rating|          63|
|The dude and I ca...|5 star rating|          63|
|I ordered 5 total...|1 star rating|          63|
|I always order Pa...|3 star rating|          63|
|Decided to try Pa...|5 star rating|          63|
|I've never had a ...|4 star rating|          63|
|The family meal d...|1 star rating|          63|
|Quality has sever...|2 star rating|          63|
|Paid for a bowl a...|1 star rating|          63|
|Order a bowl with...|1 star rating|          63|
|Went through the ...|1 star rating|          63|
|When I think of p...|2 star rating|          63|
|Horrible is a und...|1 star rating|          63|
|Yes the drive thr...|2 star rating|          63|
|Okay..so Panda is...|4 star rating|          63|
|Going through Dri...|2 star rating|          63|
|My entrees were a...|2 star rating|          63|


In [6]:

raw_df.groupBy("rating").count().show()

+-------------+-----+
|       rating|count|
+-------------+-----+
|3 star rating| 1967|
|4 star rating| 2222|
|1 star rating| 7959|
|2 star rating| 2566|
|5 star rating| 2906|
+-------------+-----+



In [7]:
# new column function - reduce dimension of rating column into 3 categories
def rating_category(rating:str)->str:
  """create new column 
  """
  if rating in ["1 star rating"]:
      return "bad"
  elif rating in ["2 star rating", "3 star rating"]:
      return "descent"
  else: 
      return "good"

assert rating_category("1 star rating")=="bad"


In [8]:
# Store a user defined function
convert_rating = udf(rating_category, StringType())
convert_rating

<function __main__.rating_category>

In [9]:
# add new column
selected_df = raw_df.withColumn("output_label", convert_rating(col("rating")))
selected_df = selected_df.withColumn("length", length(selected_df["reviews"]))
selected_df.show()

+--------------------+-------------+------------+------------+------+
|             reviews|       rating|review_count|output_label|length|
+--------------------+-------------+------------+------------+------+
|Panda Express was...|5 star rating|          63|        good|   334|
|The dude and I ca...|5 star rating|          63|        good|   770|
|I ordered 5 total...|1 star rating|          63|         bad|   151|
|I always order Pa...|3 star rating|          63|     descent|   628|
|Decided to try Pa...|5 star rating|          63|        good|   261|
|I've never had a ...|4 star rating|          63|        good|   640|
|The family meal d...|1 star rating|          63|         bad|   129|
|Quality has sever...|2 star rating|          63|     descent|   350|
|Paid for a bowl a...|1 star rating|          63|         bad|   158|
|Order a bowl with...|1 star rating|          63|         bad|   151|
|Went through the ...|1 star rating|          63|         bad|   675|
|When I think of p..

### Feature Transformation

In [10]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer


In [11]:
# create all features to the dataset
label_encoder = StringIndexer(inputCol="output_label", outputCol="label")
tokenizer = Tokenizer(inputCol="reviews", outputCol="token")
stop_word_remover = StopWordsRemover(inputCol="token", outputCol="filtered_token")
hasher = HashingTF(inputCol="filtered_token", outputCol="hashed_token")
idf = IDF(inputCol="hashed_token", outputCol="idf_token")


In [12]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
vectorizer = VectorAssembler(inputCols = ["idf_token", "length"], outputCol = "features")


### Create a Pipeline to Automate The Data Transformations

In [13]:
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[label_encoder, tokenizer, stop_word_remover, hasher, idf, vectorizer])


In [14]:
# fit and transform data with pipeline
pipeline_model = pipeline.fit(selected_df)
cleaned_df = pipeline_model.transform(selected_df)
cleaned_df.show()

+--------------------+-------------+------------+------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|             reviews|       rating|review_count|output_label|length|label|               token|      filtered_token|        hashed_token|           idf_token|            features|
+--------------------+-------------+------------+------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|Panda Express was...|5 star rating|          63|        good|   334|  1.0|[panda, express, ...|[panda, express, ...|(262144,[2711,610...|(262144,[2711,610...|(262145,[2711,610...|
|The dude and I ca...|5 star rating|          63|        good|   770|  1.0|[the, dude, and, ...|[dude, came, pand...|(262144,[9090,131...|(262144,[9090,131...|(262145,[9090,131...|
|I ordered 5 total...|1 star rating|          63|         bad|   151|  0.0|[i, ordered, 5, t...

### Create training and testing dataset

In [15]:
from pyspark.ml.classification import NaiveBayes

# Break data down into a training set and a testing set
training, testing = cleaned_df.randomSplit([0.7, 0.3], seed = 43)

### Fit and predict NaiveBaye model

In [16]:
# Create a Naive Bayes model and fit training data
model = NaiveBayes()
predictor = model.fit(training)


In [17]:
# Tranform the model with the testing data
test_results = predictor.transform(testing)
test_results.show(5)

+--------------------+-------------+------------+------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|             reviews|       rating|review_count|output_label|length|label|               token|      filtered_token|        hashed_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------------------+-------------+------------+------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|"""I never want t...|2 star rating|          29|     descent|  1410|  2.0|["""i, never, wan...|["""i, never, wan...|(262144,[14,4200,...|(262144,[14,4200,...|(262145,[14,4200,...|[-6053.7857632465...|[4.13879883391400...|       2.0|
|"""No soup for yo...|1 star rating|         150|         bad|  

In [18]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.671084
