# Importing modules and file

In [1]:
# Import findspark
import findspark

In [2]:
# Locate the local Spark installation and make `pyspark` importable;
# this has to run before the `import pyspark` cell that follows.
findspark.init()

In [3]:
# Import Pyspark
import pyspark
#import numpy as np

In [4]:
# Spark application settings: application name plus a local master
# ('local[*]' runs Spark on this machine using all available cores)
conf = (
    pyspark.SparkConf()
    .setAppName('hva-data-scientist')
    .setMaster('local[*]')
)

In [5]:
# PySparkSQL
from pyspark.sql import SQLContext, HiveContext

In [6]:
# Start (or reuse) the SparkContext. `getOrCreate` keeps this cell re-runnable:
# calling the SparkContext constructor a second time in the same kernel raises
# an error, because only one context may be active at a time.
sc = pyspark.SparkContext.getOrCreate(conf=conf)
# SQL entry point used by the rest of the notebook to read and create DataFrames
sqlContext = HiveContext(sc)

In [7]:
# Import Dataframe Functions
from pyspark.sql import functions as fn

In [8]:
# Load the hotel reviews CSV into a DataFrame, using the first row as column names
df = (
    sqlContext.read
    .format("csv")
    .option("header", "true")
    .load("hotel-reviews.csv")
)

In [9]:
# Print the DataFrame schema. Every column comes back as `string`, because the
# CSV reader is only told about the header row and no schema is applied.
df.printSchema()

root
 |-- Hotel_Address: string (nullable = true)
 |-- Additional_Number_of_Scoring: string (nullable = true)
 |-- Review_Date: string (nullable = true)
 |-- Average_Score: string (nullable = true)
 |-- Hotel_Name: string (nullable = true)
 |-- Reviewer_Nationality: string (nullable = true)
 |-- Negative_Review: string (nullable = true)
 |-- Review_Total_Negative_Word_Counts: string (nullable = true)
 |-- Total_Number_of_Reviews: string (nullable = true)
 |-- Positive_Review: string (nullable = true)
 |-- Review_Total_Positive_Word_Counts: string (nullable = true)
 |-- Total_Number_of_Reviews_Reviewer_Has_Given: string (nullable = true)
 |-- Reviewer_Score: string (nullable = true)
 |-- Tags: string (nullable = true)
 |-- days_since_review: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lng: string (nullable = true)



In [10]:
# Show the first record to inspect the raw field values
df.first()

Row(Hotel_Address=' s Gravesandestraat 55 Oost 1092 AA Amsterdam Netherlands', Additional_Number_of_Scoring='194', Review_Date='8/3/2017', Average_Score='7.7', Hotel_Name='Hotel Arena', Reviewer_Nationality=' Russia ', Negative_Review=' I am so angry that i made this post available via all possible sites i use when planing my trips so no one will make the mistake of booking this place I made my booking via booking com We stayed for 6 nights in this hotel from 11 to 17 July Upon arrival we were placed in a small room on the 2nd floor of the hotel It turned out that this was not the room we booked I had specially reserved the 2 level duplex room so that we would have a big windows and high ceilings The room itself was ok if you don t mind the broken window that can not be closed hello rain and a mini fridge that contained some sort of a bio weapon at least i guessed so by the smell of it I intimately asked to change the room and after explaining 2 times that i booked a duplex btw it cost

# Structuring dataframe for analysis


In [11]:
# Peek at the raw reviewer scores; these are turned into a binary
# sentiment label in the cells that follow
df.select(['Hotel_Name', 'Reviewer_Score']).show(n=10)

+-----------+--------------+
| Hotel_Name|Reviewer_Score|
+-----------+--------------+
|Hotel Arena|           2.9|
|Hotel Arena|           7.5|
|Hotel Arena|           7.1|
|Hotel Arena|           3.8|
|Hotel Arena|           6.7|
|Hotel Arena|           6.7|
|Hotel Arena|           4.6|
|Hotel Arena|            10|
|Hotel Arena|           6.5|
|Hotel Arena|           7.9|
+-----------+--------------+
only showing top 10 rows



In [12]:
# Turn Reviewer_Score into a binary sentiment label:
#   1 (positive) when the score is >= 5.5, otherwise 0 (negative).
# The column is read from the CSV as a string, so it is cast explicitly instead
# of relying on Spark's implicit string-to-number coercion in the comparison.
# Note: rows with a missing (null) score also fall into the 0 branch.
df = df.withColumn(
    'Reviewer_Score',
    fn.when(fn.col('Reviewer_Score').cast('double') >= 5.5, 1).otherwise(0),
)

In [13]:
# The column now holds a label rather than a score, so rename it:
# Reviewer_Score --> Sentiment
df = df.withColumnRenamed(existing='Reviewer_Score', new='Sentiment')

In [14]:
# Confirm that the scores were replaced by 0/1 sentiment labels
df.select(['Hotel_Name', 'Sentiment']).show(n=10)

+-----------+---------+
| Hotel_Name|Sentiment|
+-----------+---------+
|Hotel Arena|        0|
|Hotel Arena|        1|
|Hotel Arena|        1|
|Hotel Arena|        0|
|Hotel Arena|        1|
|Hotel Arena|        1|
|Hotel Arena|        0|
|Hotel Arena|        1|
|Hotel Arena|        1|
|Hotel Arena|        1|
+-----------+---------+
only showing top 10 rows



In [55]:
# Inspect the complete first record again after the conversion to a sentiment label
df.first()

Row(Hotel_Address=' s Gravesandestraat 55 Oost 1092 AA Amsterdam Netherlands', Additional_Number_of_Scoring='194', Review_Date='8/3/2017', Average_Score='7.7', Hotel_Name='Hotel Arena', Reviewer_Nationality=' Russia ', Negative_Review=' I am so angry that i made this post available via all possible sites i use when planing my trips so no one will make the mistake of booking this place I made my booking via booking com We stayed for 6 nights in this hotel from 11 to 17 July Upon arrival we were placed in a small room on the 2nd floor of the hotel It turned out that this was not the room we booked I had specially reserved the 2 level duplex room so that we would have a big windows and high ceilings The room itself was ok if you don t mind the broken window that can not be closed hello rain and a mini fridge that contained some sort of a bio weapon at least i guessed so by the smell of it I intimately asked to change the room and after explaining 2 times that i booked a duplex btw it cost

In [56]:
# Import PySpark function collection
from pyspark.sql.functions import col

In [57]:
# Join the negative and positive parts into a single review text.
# `concat_ws` is used instead of `concat` because `concat` yields NULL as soon
# as one of its inputs is NULL, which would discard the other half of the
# review and hand a NULL text to the tokenizer; `concat_ws` skips NULL inputs.
# For rows where both parts are present the resulting text is unchanged.
df_with_text = df.withColumn(
    'Review_Text',
    fn.concat_ws(' ', fn.col('Negative_Review'), fn.col('Positive_Review')),
)

In [58]:
# Verify that the Review_Text column was added to the schema
df_with_text.printSchema()

root
 |-- Hotel_Address: string (nullable = true)
 |-- Additional_Number_of_Scoring: string (nullable = true)
 |-- Review_Date: string (nullable = true)
 |-- Average_Score: string (nullable = true)
 |-- Hotel_Name: string (nullable = true)
 |-- Reviewer_Nationality: string (nullable = true)
 |-- Negative_Review: string (nullable = true)
 |-- Review_Total_Negative_Word_Counts: string (nullable = true)
 |-- Total_Number_of_Reviews: string (nullable = true)
 |-- Positive_Review: string (nullable = true)
 |-- Review_Total_Positive_Word_Counts: string (nullable = true)
 |-- Total_Number_of_Reviews_Reviewer_Has_Given: string (nullable = true)
 |-- Sentiment: integer (nullable = false)
 |-- Tags: string (nullable = true)
 |-- days_since_review: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lng: string (nullable = true)
 |-- Review_Text: string (nullable = true)



In [59]:
# Keep only the columns that are needed for the sentiment analysis
df_stripped = df_with_text.select([
    'Negative_Review',
    'Positive_Review',
    'Review_Text',
    'Sentiment',
])

In [60]:
# Preview the stripped DataFrame (tabular view, long text values are truncated)
df_stripped.show()

+--------------------+--------------------+--------------------+---------+
|     Negative_Review|     Positive_Review|         Review_Text|Sentiment|
+--------------------+--------------------+--------------------+---------+
| I am so angry th...| Only the park ou...| I am so angry th...|        0|
|         No Negative| No real complain...|No Negative  No r...|        1|
| Rooms are nice b...| Location was goo...| Rooms are nice b...|        1|
| My room was dirt...| Great location i...| My room was dirt...|        0|
| You When I booke...| Amazing location...| You When I booke...|        1|
| Backyard of the ...| Good restaurant ...| Backyard of the ...|        1|
| Cleaner did not ...| The room is spac...| Cleaner did not ...|        0|
| Apart from the p...| Good location Se...| Apart from the p...|        1|
| Even though the ...|         No Positive| Even though the ...|        1|
| The aircondition...| The room was big...| The aircondition...|        1|
|  Nothing all great | Ro

In [61]:
# Inspect the first stripped record in full, untruncated form
df_stripped.first()

Row(Negative_Review=' I am so angry that i made this post available via all possible sites i use when planing my trips so no one will make the mistake of booking this place I made my booking via booking com We stayed for 6 nights in this hotel from 11 to 17 July Upon arrival we were placed in a small room on the 2nd floor of the hotel It turned out that this was not the room we booked I had specially reserved the 2 level duplex room so that we would have a big windows and high ceilings The room itself was ok if you don t mind the broken window that can not be closed hello rain and a mini fridge that contained some sort of a bio weapon at least i guessed so by the smell of it I intimately asked to change the room and after explaining 2 times that i booked a duplex btw it costs the same as a simple double but got way more volume due to the high ceiling was offered a room but only the next day SO i had to check out the next day before 11 o clock in order to get the room i waned to Not the

# Sentiment Analysis

In [None]:
# Download the stop-word list that is filtered out of the reviews.
import requests

# Fail fast instead of hanging indefinitely on an unresponsive server, and
# raise on an HTTP error so that an error page is never silently parsed as
# a list of stop words.
response = requests.get(
    'http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words',
    timeout=30,
)
response.raise_for_status()

# The list is whitespace-separated, one word per token
stop_words = response.text.split()
stop_words[0:10]

In [None]:
# Importing RegexTokenizer
from pyspark.ml.feature import RegexTokenizer

In [None]:
# Tokenizer that extracts runs of letters from Review_Text into the `words`
# column; gaps=False makes the pattern match the tokens themselves rather
# than the separators between them
tokenizer = RegexTokenizer(
    gaps=False,
    pattern="\\p{L}+",
    inputCol="Review_Text",
    outputCol="words",
)

In [None]:
# Import StopwordsRemover to remove stopwords from the tokenized words
from pyspark.ml.feature import StopWordsRemover

In [None]:
# Stop-word filter: drops the downloaded stop words (case-insensitively)
# from `words` and writes the remaining tokens to `filtered`
sw_filter = StopWordsRemover(
    stopWords=stop_words,
    caseSensitive=False,
    inputCol="words",
    outputCol="filtered",
)

In [None]:
# Import CountVectorizer 
from pyspark.ml.feature import CountVectorizer

In [None]:
# CountVectorizer turning the filtered tokens into term-count vectors (`tf`).
#   minDF=5     - a term must occur in at least 5 different reviews to enter
#                 the vocabulary
#   minTF=1     - within a review, terms counted fewer than once are ignored
#                 (i.e. nothing is dropped at this level)
#   vocabSize   - the vocabulary is capped at 2**17 terms
cv = CountVectorizer(
    minTF=1.,
    minDF=5.,
    vocabSize=2**17,
    inputCol="filtered",
    outputCol="tf",
)

In [None]:
# Import Pipeline to create a Pipeline
from pyspark.ml import Pipeline

In [None]:
# Chain Tokenizer -> Stopwords Filter -> CountVectorizer and fit the chain on
# the stripped reviews, which yields a fitted PipelineModel
cv_pipeline = (
    Pipeline()
    .setStages([tokenizer, sw_filter, cv])
    .fit(df_stripped)
)

In [None]:
# Check which columns the fitted pipeline adds to the DataFrame
cv_pipeline.transform(df_stripped).printSchema()

In [None]:
# Look at the first 5 transformed rows to sanity-check the pipeline output
cv_pipeline.transform(df_stripped).show(5)

In [None]:
# Import Term frequency–inverse Document Frequency (TFIDF)
from pyspark.ml.feature import IDF

In [None]:
# IDF stage: rescales the raw term counts in `tf` into TF-IDF weights (`tfidf`)
idf = IDF(inputCol='tf', outputCol='tfidf')

In [None]:
# Extend the fitted count-vector pipeline with the IDF stage and fit it.
# NOTE(review): this is fitted on all of df_stripped, i.e. including the rows
# that are later split off as validation/testing data.
idf_pipeline = (
    Pipeline()
    .setStages([cv_pipeline, idf])
    .fit(df_stripped)
)

In [None]:
# Randomly partition the reviews into training (60%), validation (30%) and
# testing (10%) sets; the fixed seed is there to make the split repeatable
training_df, validation_df, testing_df = df_stripped.randomSplit(
    weights=[0.6, 0.3, 0.1],
    seed=0,
)

In [None]:
# Row counts of the three splits, as a quick check on the proportions
[split.count() for split in (training_df, validation_df, testing_df)]

In [None]:
# Import LogisticRegression
from pyspark.ml.classification import LogisticRegression

In [None]:
# Logistic regression on the TF-IDF features with Sentiment as the label;
# regParam=0 switches regularisation off, at most 100 iterations are run
lr = LogisticRegression(
    labelCol='Sentiment',
    featuresCol='tfidf',
    regParam=0.0,
    maxIter=100,
    elasticNetParam=0.,
)

In [None]:
# Build one end-to-end pipeline from the *unfitted* stages and train it on the
# training split only.
# Previously the feature stages were taken from `idf_pipeline`, which had been
# fitted on the complete data set, so the vocabulary and IDF weights were
# learned from the validation and testing rows as well. Fitting every stage on
# `training_df` keeps the held-out splits truly unseen and makes the validation
# score an honest estimate.
model = Pipeline(stages=[tokenizer, sw_filter, cv, idf, lr]).fit(training_df)

In [None]:
# Accuracy on the validation split: mark each row 1.0 when the prediction
# equals the Sentiment label (0.0 otherwise) and average those marks
(
    model.transform(validation_df)
    .select(fn.expr('float(prediction = Sentiment)').alias('correct'))
    .select(fn.avg('correct'))
    .show()
)

# Analyze your own review

In [None]:
# Obtain a SparkSession for analysing user input.
# NOTE(review): `spark` is not referenced by the prediction cell that follows
# (it builds its DataFrame through `sqlContext`) - confirm whether it is needed.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("user_input_analysis")
    .getOrCreate()
)

In [None]:
# TODO: this still needs to be turned into a function, something like:
#
#     def userinputanalysis(userinput):
#         ...

# Ask for a review. Surrounding whitespace is stripped so that a blank or
# whitespace-only entry counts as "no input" instead of being scored.
userinput = input("Enter the review you would like to predict: ").strip()

if userinput:
    # Single-row DataFrame carrying the text in the column the model reads
    df_userinput = sqlContext.createDataFrame([(userinput,)], ['Review_Text'])
    result_df = model.transform(df_userinput).select('Review_Text', 'prediction')
    result_df.show()

    # The model emits the predicted class as a float (1.0 positive, 0.0 negative);
    # compare it numerically rather than through its string representation
    prediction = result_df.first()['prediction']
    result = str(prediction)
    print("Result is " + result)

    if prediction == 1.0:
        print("Your review is positive!")
    else:
        print("Your review is negative!")
else:
    print("No user input was given")
