# Final Project

In [1]:
import os
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import *       # for datatype conversion
from pyspark.sql.functions import *   # for col() function
import pandas as pd

sc = SparkContext.getOrCreate()

import pyspark.sql.types as typ
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg




spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Hotel_Reviews") \
    .config("spark.executor.memory", '2g') \
    .config('spark.executor.cores', '1') \
    .config('spark.cores.max', '1') \
    .config("spark.driver.memory",'1g') \
    .getOrCreate()

sc = spark.sparkContext
sqlCtx = SQLContext(sc)

In [2]:
path_to_data=os.path.join('data/Hotel_Reviews.csv')
df=spark.read.csv(path_to_data,header=True,inferSchema = True)

In [3]:
from pyspark.sql.functions import lit,col
reviews_pos=df.select(['Positive_Review']).withColumnRenamed("Positive_Review", "Review")
reviews_neg=df.select(['Negative_Review']).withColumnRenamed("Negative_Review", "Review")
reviews_pos = reviews_pos.withColumn("target",lit(1))
reviews_neg = reviews_neg.withColumn("target",lit(0))
reviews_final = reviews_pos.union(reviews_neg)
#reviews_final.dropna(inplace=True)
#reviews_final.reset_index(drop=True,inplace=True)
reviews_final_temp = reviews_final.filter("Review != 'No Positive'")
my_df.dropna(inplace=True)
my_df.reset_index(drop=True,inplace=True)

In [4]:
train_set, val_set, test_set = reviews_final_temp.randomSplit([0.95, 0.025, 0.025], seed = 314)


In [5]:
###hashing TF, IDF, then logistic regression

from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

tokenizer = Tokenizer(inputCol="Review", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
val_df = pipelineFit.transform(val_set)
train_df.show(100)


+--------------------+------+--------------------+--------------------+--------------------+-----+
|              Review|target|               words|                  tf|            features|label|
+--------------------+------+--------------------+--------------------+--------------------+-----+
|                    |     1|                  []|       (65536,[],[])|       (65536,[],[])|  1.0|
|                    |     1|                  []|       (65536,[],[])|       (65536,[],[])|  1.0|
|                    |     1|                  []|       (65536,[],[])|       (65536,[],[])|  1.0|
|                    |     1|                  []|       (65536,[],[])|       (65536,[],[])|  1.0|
|                    |     1|                  []|       (65536,[],[])|       (65536,[],[])|  1.0|
|                    |     1|                  []|       (65536,[],[])|       (65536,[],[])|  1.0|
|                    |     1|                  []|       (65536,[],[])|       (65536,[],[])|  1.0|
|         

In [6]:
lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)

In [7]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.9783991696145778

In [8]:
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
accuracy

0.9395276717557252

In [10]:
###CountVectorizer, IDF, and then Logistic Regression
from pyspark.ml.feature import CountVectorizer

tokenizer = Tokenizer(inputCol="Review", outputCol="words")
cv = CountVectorizer(vocabSize=2**16, inputCol="words", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "target", outputCol = "label")
lr = LogisticRegression(maxIter=100)
pipeline = Pipeline(stages=[tokenizer, cv, idf, label_stringIdx, lr])

pipelineFit = pipeline.fit(train_set)
predictions = pipelineFit.transform(val_set)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
roc_auc = evaluator.evaluate(predictions)

print ("Accuracy Score: ", format(accuracy))
print ("ROC-AUC: ", format(roc_auc))

Accuracy Score:  0.9401638040712468
ROC-AUC:  0.9782662298284932


In [14]:
##### N-gram
###### extract around 16,000 features from unigram, bigram, trigram. 
###### This means around 48,000 features in total were extracted. 
###### Then implementing Chi-Squared feature selection to reduce the number of features to 16,000 in total.

from pyspark.ml.feature import NGram, VectorAssembler
from pyspark.ml.feature import ChiSqSelector

def ngrams_function(inputCol=["Review","target"], n=3):
    tokenizer = [Tokenizer(inputCol="Review", outputCol="words")]
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in range(1, n + 1)
    ]

    cv = [
        CountVectorizer(vocabSize=5460,inputCol="{0}_grams".format(i),
            outputCol="{0}_tf".format(i))
        for i in range(1, n + 1)
    ]
    idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5) for i in range(1, n + 1)]

    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in range(1, n + 1)],
        outputCol="features"
    )]
    label_stringIdx = [StringIndexer(inputCol = "target", outputCol = "label")]
    lr = [LogisticRegression(maxIter=100)]
    return Pipeline(stages=tokenizer + ngrams + cv + idf+ assembler + label_stringIdx+lr)

In [16]:
trigram_pipelineFit = ngrams_function().fit(train_set)
predictions = trigram_pipelineFit.transform(val_set)
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(val_set.count())
roc_auc = evaluator.evaluate(predictions)
# print accuracy, roc_auc
print ("Accuracy Score: ", format(accuracy))
print ("ROC-AUC: ", format(roc_auc))

Accuracy Score:  0.9522503180661578
ROC-AUC:  0.9859813965327662


In [18]:
###### fit the model on test set

test_predictions = trigram_pipelineFit.transform(test_set)
test_accuracy = test_predictions.filter(test_predictions.label == test_predictions.prediction).count() / float(test_set.count())
test_roc_auc = evaluator.evaluate(test_predictions)
print ("Accuracy Score: ", format(accuracy))
print ("ROC-AUC: ", format(roc_auc))



Accuracy Score:  0.9522503180661578
ROC-AUC:  0.9859813965327662


In [16]:
##### build recommendation engine for hotel ratings ,"Hotel_Name","Reviewer_Score","Review_Date"
recomm_input = df.select(["Reviewer_Nationality","Hotel_Name","Reviewer_Score","Review_Date"])
recomm_input.show(5)

+--------------------+-----------+--------------+-----------+
|Reviewer_Nationality| Hotel_Name|Reviewer_Score|Review_Date|
+--------------------+-----------+--------------+-----------+
|             Russia |Hotel Arena|           2.9|   8/3/2017|
|            Ireland |Hotel Arena|           7.5|   8/3/2017|
|          Australia |Hotel Arena|           7.1|  7/31/2017|
|     United Kingdom |Hotel Arena|           3.8|  7/31/2017|
|        New Zealand |Hotel Arena|           6.7|  7/24/2017|
+--------------------+-----------+--------------+-----------+
only showing top 5 rows



In [17]:
(training, test) = recomm_input.randomSplit([0.8, 0.2])

In [19]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

als = ALS(maxIter=5, regParam=0.01, userCol="Reviewer_Nationality", itemCol="Hotel_Name", ratingCol="Reviewer_Score",
          coldStartStrategy="drop")
model = als.fit(training)

IllegalArgumentException: 'requirement failed: Column Reviewer_Nationality must be of type numeric but was actually of type string.'

In [None]:
##### build recommendation engine for hotel ratings 
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=long(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)