# This file runs on Google Colab 
## The data input source is AWS RDS PostgresSQL 
### Running this file end-to-end can take 10-15 minutes

In [1]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.1'
spark_version = 'spark-3.0.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:6 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:7 http://security.ubuntu.com/ubuntu bionic-security InRelease
Hit:8 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
Hit:12 http://archive.ubuntu.com/ubuntu bionic-backports InRelease
Hit:13 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Reading package lists... Done


In [2]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Hotel_Reviews_NLP_from_RDS").getOrCreate()

In [3]:
# Importing data from RDS (AWS)
import psycopg2
connection = psycopg2.connect(
    host = 'sample-hotel-reviews.cfxm7dziqs2d.us-east-2.rds.amazonaws.com',
    port = 5432,
    user = 'postgres',
    password = 'Postgres$123',
    database = 'PySpark_NLP'
    )
cursor=connection.cursor()

# using pandas to execute SQL queries
import pandas as pd
sql = """
SELECT *
FROM public."db_for_nlp"
"""
pandas_df = pd.read_sql(sql, con=connection, index_col='review_id')

# Show DataFrame
pandas_df

  """)


Unnamed: 0_level_0,review,reviewer_sentiment
review_id,Unnamed: 1_level_1,Unnamed: 2_level_1
0,I am so angry that i made this post available...,negative
1,No real complaints the hotel was great great ...,positive
2,Rooms are nice but for elderly a bit difficul...,positive
3,My room was dirty and I was afraid to walk ba...,negative
4,You When I booked with your company on line y...,positive
...,...,...
512338,no trolly or staff to help you take the lugga...,positive
512339,The hotel looks like 3 but surely not 4 Brea...,positive
512340,The ac was useless It was a hot week in vienn...,negative
512341,The rooms are enormous and really comfortable...,positive


In [4]:
# converting PD dataframe to PySpark DataFrame

import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Create a Spark DataFrame from a pandas DataFrame using Arrow
df = spark.createDataFrame(pandas_df)
df.show()

  PyArrow >= 0.15.1 must be installed; however, your version was 0.14.1.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


+--------------------+------------------+
|              review|reviewer_sentiment|
+--------------------+------------------+
| I am so angry th...|          negative|
| No real complain...|          positive|
| Rooms are nice b...|          positive|
| My room was dirt...|          negative|
| You When I booke...|          positive|
| Backyard of the ...|          positive|
| Cleaner did not ...|          negative|
| Apart from the p...|          positive|
| Even though the ...|          positive|
| The aircondition...|          positive|
| Nothing all grea...|          positive|
| 6 30 AM started ...|          positive|
| The floor in my ...|          negative|
| This hotel is be...|          positive|
| Rooms a bit smal...|          positive|
| The staff in the...|          positive|
| This hotel is aw...|          positive|
| Very steep steps...|          positive|
| We did not like ...|          positive|
| Public areas are...|          positive|
+--------------------+------------

In [5]:
# Import functions
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

In [6]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature
data_df = df.withColumn('length', length(df['Review']))
data_df.show()

+--------------------+------------------+------+
|              review|reviewer_sentiment|length|
+--------------------+------------------+------+
| I am so angry th...|          negative|  1913|
| No real complain...|          positive|   611|
| Rooms are nice b...|          positive|   301|
| My room was dirt...|          negative|  1221|
| You When I booke...|          positive|   774|
| Backyard of the ...|          positive|   186|
| Cleaner did not ...|          negative|   235|
| Apart from the p...|          positive|   157|
| Even though the ...|          positive|   162|
| The aircondition...|          positive|   312|
| Nothing all grea...|          positive|   568|
| 6 30 AM started ...|          positive|   430|
| The floor in my ...|          negative|   152|
| This hotel is be...|          positive|   329|
| Rooms a bit smal...|          positive|    56|
| The staff in the...|          positive|   229|
| This hotel is aw...|          positive|   413|
| Very steep steps..

In [7]:
# Create all the features to the data set
convert_review_output_to_numbers = StringIndexer(inputCol='reviewer_sentiment',outputCol='label')
tokenizer = Tokenizer(inputCol="review", outputCol="token_text")
stopremove = StopWordsRemover(inputCol='token_text',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="stop_tokens", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [8]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [9]:
# Create and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[convert_review_output_to_numbers, tokenizer, stopremove, hashingTF, idf, clean_up])

In [10]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)

# Show "Combined_Review" and resulting features
cleaned.select(['label', 'features']).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(262145,[2437,302...|
|  0.0|(262145,[4714,514...|
|  0.0|(262145,[22346,23...|
|  1.0|(262145,[1797,230...|
|  0.0|(262145,[14870,20...|
|  0.0|(262145,[9781,304...|
|  1.0|(262145,[21641,34...|
|  0.0|(262145,[25789,43...|
|  0.0|(262145,[22815,31...|
|  0.0|(262145,[2437,216...|
|  0.0|(262145,[9129,181...|
|  0.0|(262145,[1696,383...|
|  1.0|(262145,[1729,216...|
|  0.0|(262145,[15370,23...|
|  0.0|(262145,[22346,33...|
|  0.0|(262145,[6957,304...|
|  0.0|(262145,[5765,218...|
|  0.0|(262145,[3280,110...|
|  0.0|(262145,[329,9129...|
|  0.0|(262145,[11941,17...|
+-----+--------------------+
only showing top 20 rows



In [11]:
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3], 21)

In [12]:
from pyspark.ml.classification import NaiveBayes
# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [13]:
# Transform the data with the testing data
test_results = predictor.transform(testing)
test_results.show(10)

+--------------------+------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|              review|reviewer_sentiment|length|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------------------+------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|                    |          positive|     1|  0.0|                  []|                  []|      (262144,[],[])|      (262144,[],[])|(262145,[262144],...|[-0.3992652917160...|[0.95933263647863...|       0.0|
|                    |          positive|     1|  0.0|                  []|                  []|      (262144,[],[])|      (262144,[],[])|(262145,[2

In [14]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" %acc)

Accuracy of model at predicting reviews was: 0.916450
