# Setup the spark context

In [1]:
import os
os.environ["PYSPARK_PYTHON"]="/usr/bin/python3.6"

In [2]:
from pyspark import SparkConf, SparkContext

SparkURL = "spark://10.5.0.5:7077"

c = SparkConf()
c.setMaster(SparkURL)

SparkContext.setSystemProperty('spark.executor.memory', '6g')
_SC = SparkContext(conf=c)

# Load Dataframe

In [3]:
import pandas as pd

data = pd.read_csv("/home/nielsvm/Documents/BD-HotelReviewVisualizer/Data/Hotel_Reviews.csv")

# Build session & store data

In [4]:
from pyspark.sql import SparkSession

ss = SparkSession(_SC)
df = ss.createDataFrame(data)

In [5]:
keepcols = ["Negative_Review", "Positive_Review"]
df = df.select([col for col in data.columns if col in keepcols])

In [6]:
df.printSchema()

root
 |-- Negative_Review: string (nullable = true)
 |-- Positive_Review: string (nullable = true)



In [7]:
from pyspark.sql.functions import lit

neg_review = df.select("Negative_Review")
neg_review = neg_review.selectExpr("Negative_Review as text")

pos_review = df.select("Positive_Review")
pos_review = pos_review.selectExpr("Positive_Review as text")

neg_review = neg_review.withColumn("positive", lit(0))
pos_review = pos_review.withColumn("positive", lit(1))

all_reviews = neg_review.union(pos_review)

# Data Cleanup & Preparing for ML algorithms

In [8]:
from pyspark.ml.classification import (LogisticRegression, NaiveBayes,RandomForestClassifier)
from pyspark.ml.feature import (CountVectorizer, RegexTokenizer,StopWordsRemover)

In [9]:
regexTokenizer = RegexTokenizer(
    inputCol="text",
    outputCol = "words",
    pattern="\\W")

stopwordsRemover = StopWordsRemover(
    inputCol="words", 
    outputCol="filtered")

countVectors = CountVectorizer(
    inputCol="filtered", 
    outputCol="features", 
    vocabSize=10000,
    minDF=5)


In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import (OneHotEncoder, StringIndexer, VectorAssembler)

labels = StringIndexer(inputCol="positive", outputCol="label")

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors,labels])

In [11]:
pipelineFit = pipeline.fit(all_reviews)
dataset = pipelineFit.transform(all_reviews)
dataset.show(5)

+--------------------+--------+--------------------+--------------------+--------------------+-----+
|                text|positive|               words|            filtered|            features|label|
+--------------------+--------+--------------------+--------------------+--------------------+-----+
| I am so angry th...|       0|[i, am, so, angry...|[angry, made, pos...|(10000,[0,3,16,22...|  0.0|
|         No Negative|       0|      [no, negative]|          [negative]|   (10000,[6],[1.0])|  0.0|
| Rooms are nice b...|       0|[rooms, are, nice...|[rooms, nice, eld...|(10000,[11,13,24,...|  0.0|
| My room was dirt...|       0|[my, room, was, d...|[room, dirty, afr...|(10000,[0,1,3,10,...|  0.0|
| You When I booke...|       0|[you, when, i, bo...|[booked, company,...|(10000,[0,1,13,20...|  0.0|
+--------------------+--------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



In [12]:
(train_df, test_df) = dataset.randomSplit([0.7, 0.3], seed=100)

In [13]:
print("Training Set: {}\nTesting Set: {}".format(train_df.count(), test_df.count()))

Training Set: 722707
Testing Set: 308769


# Logistic Regression

In [14]:
lr = LogisticRegression(maxIter=40, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(train_df)

In [15]:
predictions = lrModel.transform(test_df)

In [16]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.9357390680385913

# Naive Bayes

In [17]:
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(smoothing=1)
model = nb.fit(train_df)

In [18]:
predictions = model.transform(test_df)

In [19]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.9273088736529471

# Random Forest

In [20]:
trainrf_df, _ = train_df.randomSplit([0.05, 0.95], seed=100)
testrf_df, _ = test_df.randomSplit([0.05, 0.95], seed=100)
trainrf_df.count()

35807

In [21]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    numTrees = 50,
    maxDepth = 2,
    maxBins = 32)

rfModel = rf.fit(trainrf_df)

In [22]:
predictions = rfModel.transform(testrf_df)

In [23]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.8031973896423081