In [1]:
%%init_spark
launcher.master="yarn"
launcher.num_executors=4
launcher.executor_cores=2
launcher.executor_memory='6000m'
launcher.packages=["com.github.master:spark-stemming_2.10:0.2.0"]


# 1) Data Exploration: -Loading data and Extracting 'stars' and respective review's 'text'

# Finding distribution of stars over each text


In [2]:
val review_df=spark.read.json("/hadoop-user/data/yelp/review.json")
              .select(($"stars").alias("stars"),($"text").alias("text"))

review_df.cache()
review_df.printSchema()
review_df.show(5)
review_df.count()

Intitializing Scala interpreter ...

Spark Web UI available at http://bdu-hm19:8088/proxy/application_1544057569429_0001
SparkContext available as 'sc' (version = 2.3.1, master = yarn, app id = application_1544057569429_0001)
SparkSession available as 'spark'


root
 |-- stars: long (nullable = true)
 |-- text: string (nullable = true)

+-----+--------------------+
|stars|                text|
+-----+--------------------+
|    5|Love the staff, l...|
|    5|Super simple plac...|
|    5|Small unassuming ...|
|    5|Lester's is locat...|
|    4|Love coming here....|
+-----+--------------------+
only showing top 5 rows



review_df: org.apache.spark.sql.DataFrame = [stars: bigint, text: string]
res0: Long = 5261669


# 2) Feature Engineering: -Creating column 'rating' for categorizing stars into two groups.

In [3]:
import org.apache.spark.ml.feature._
import org.apache.spark.ml.linalg._
val splits = Array(Double.NegativeInfinity,4, Double.PositiveInfinity)

val bucketizer = new Bucketizer()
  .setInputCol("stars")
  .setOutputCol("rating")
  .setSplits(splits)

// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(review_df)
bucketedData.show

+-----+--------------------+------+
|stars|                text|rating|
+-----+--------------------+------+
|    5|Love the staff, l...|   1.0|
|    5|Super simple plac...|   1.0|
|    5|Small unassuming ...|   1.0|
|    5|Lester's is locat...|   1.0|
|    4|Love coming here....|   1.0|
|    4|Had their chocola...|   1.0|
|    5|Cycle Pub Las Veg...|   1.0|
|    4|Who would have gu...|   1.0|
|    4|Always drove past...|   1.0|
|    3|Not bad!! Love th...|   0.0|
|    5|Love this place!
...|   1.0|
|    4|This is currently...|   1.0|
|    3|Server was a litt...|   0.0|
|    1|I thought Tidy's ...|   0.0|
|    3|Wanted to check o...|   0.0|
|    5|This place is awe...|   1.0|
|    4|a must stop when ...|   1.0|
|    1|I too have been t...|   0.0|
|    3|Came here with my...|   0.0|
|    3|Came here for a b...|   0.0|
+-----+--------------------+------+
only showing top 20 rows



import org.apache.spark.ml.feature._
import org.apache.spark.ml.linalg._
splits: Array[Double] = Array(-Infinity, 4.0, Infinity)
bucketizer: org.apache.spark.ml.feature.Bucketizer = bucketizer_1d7665d54ff8
bucketedData: org.apache.spark.sql.DataFrame = [stars: bigint, text: string ... 1 more field]


# -Finding disrtibution of rating

In [4]:
bucketedData.createOrReplaceTempView("ratingData")
spark.sql("select rating,count(text) from ratingData group by rating").show()


+------+-----------+
|rating|count(text)|
+------+-----------+
|   0.0|    1785005|
|   1.0|    3476664|
+------+-----------+



# -Using sampleBy() to down sample the data.

In [7]:
val fractionMapKey = Map(0.0->0.001,1.0->(0.00050))
bucketedData.stat.sampleBy("rating",fractionMapKey,50).groupBy("rating").count().show()


+------+-----+
|rating|count|
+------+-----+
|   0.0| 1788|
|   1.0| 1771|
+------+-----+



fractionMapKey: scala.collection.immutable.Map[Double,Double] = Map(0.0 -> 0.001, 1.0 -> 5.0E-4)


# -Finding TFIDF vector of text review.
# -Spliting data into test and training, creating pipeline

In [9]:
//Storing Down Sampled data
val sampled_data = bucketedData.stat.sampleBy("rating",fractionMapKey,50)


import org.apache.spark.ml.feature._

val tokenizer = new RegexTokenizer().setMinTokenLength(3).setToLowercase(true).setInputCol("text").setOutputCol("text_words")

//Defining a udf to remove punctuations from a sequence of words
import org.apache.spark.sql.functions.udf

def removePunc(words:Seq[String]):Seq[String]={
 return words.map(_.replaceAll("\\p{Punct}"," "))
}
//val removePuncUDF=udf(removePunc(_:Seq[String]))
spark.udf.register("removePuncUDF",removePunc(_:Seq[String]) )

//use the removePuncUDF to remove all punctuations from boilerplate_wordss
val puncRemover = new SQLTransformer().setStatement("SELECT removePuncUDF(text_words) as review_words, rating from __THIS__ ")

val stopWordRemover=new StopWordsRemover().setInputCol("review_words").setOutputCol("filtered_words")

import org.apache.spark.mllib.feature.Stemmer
val stemmer = new Stemmer().setInputCol("filtered_words").setOutputCol("stemmed_words")

val vectorizer = new CountVectorizer().setInputCol("stemmed_words").setOutputCol("review_BOW").setMinDF(100)
val tfidf = new IDF().setInputCol("review_BOW").setOutputCol("review_TFIDF")

import org.apache.spark.ml.{Pipeline, PipelineModel}

val pipeline = new Pipeline().setStages(Array(tokenizer,puncRemover,stopWordRemover, stemmer, vectorizer, tfidf))
val Array(training,testing)=sampled_data.randomSplit(Array(0.8,0.2),111)
val preprocessing_pipeline= pipeline.fit(training)
                                        
val training_cleaned=preprocessing_pipeline.transform(training)
val testing_cleaned=preprocessing_pipeline.transform(testing)

sampled_data: org.apache.spark.sql.DataFrame = [stars: bigint, text: string ... 1 more field]
import org.apache.spark.ml.feature._
tokenizer: org.apache.spark.ml.feature.RegexTokenizer = regexTok_5cf1fd84b479
import org.apache.spark.sql.functions.udf
removePunc: (words: Seq[String])Seq[String]
puncRemover: org.apache.spark.ml.feature.SQLTransformer = sql_baaf90d57ce4
stopWordRemover: org.apache.spark.ml.feature.StopWordsRemover = stopWords_40bdcd5ea7df
import org.apache.spark.mllib.feature.Stemmer
stemmer: org.apache.spark.mllib.feature.Stemmer = stemmer_6e573d5c01f2
vectorizer: org.apache.spark.ml.feature.CountVectorizer = cntVec_018d16e5f9e3
tfidf: org.apache.spark.ml.feature.IDF = idf_a770a1ad8552
import org.apache.spark.ml.{Pipeline, PipelineModel}
pipeline: org.apache.spark.ml.Pipe...

# 3) Machine Learning models:-Creating Gradient boosted classifier model


In [11]:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
// Train a GBT model.
val gbt = new GBTClassifier().setLabelCol("rating").setFeaturesCol("review_TFIDF")

//Setting param grid parameters
val paramGrid =new ParamGridBuilder()
             .addGrid(gbt.maxDepth, Array(2, 5))
             .build()

val evaluator = new BinaryClassificationEvaluator().setRawPredictionCol("rawPrediction").setLabelCol("rating").setMetricName("areaUnderROC")

val cv_gb = new CrossValidator().setEstimator(gbt).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(3)

//Fit the training data to the pipeline
val model = cv_gb.fit(training_cleaned)

// Make predictions.
val predictions_gbt = model.transform(testing_cleaned)

// Select example rows to display.
predictions.select("rating", "prediction", "probability", "stemmed_words").show(5)
val AUC = evaluator.evaluate(predictions)
println(s"Area under ROC curve(AUC) for GBT on test data = $AUC")

+------+----------+--------------------+--------------------+
|rating|prediction|         probability|       stemmed_words|
+------+----------+--------------------+--------------------+
|   0.0|       0.0|[0.66052134867120...|[came, locat, boc...|
|   0.0|       0.0|[0.70749349361436...|[i v, times , wel...|
|   0.0|       0.0|[0.78836886859910...|[give, zero , wou...|
|   0.0|       0.0|[0.68090435030942...|[went, 11pm, sund...|
|   0.0|       0.0|[0.69306604895833...|[place, crap , do...|
+------+----------+--------------------+--------------------+
only showing top 5 rows

Area under ROC curve(AUC) for GBT on test data = 0.7967578497750715


import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
gbt: org.apache.spark.ml.classification.GBTClassifier = gbtc_a10209091230
paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	gbtc_a10209091230-maxDepth: 2
}, {
	gbtc_a10209091230-maxDepth: 5
})
evaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_793803026bb5
cv_gb: org.apache.spark.ml.tuning.CrossValidator = cv_8e4e...

# -Creating Logistic regression model

In [13]:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._

//Building logistic regression model
val lr = new LogisticRegression().setLabelCol("rating").setFeaturesCol("review_TFIDF")
val paramGrid =new ParamGridBuilder()
             .addGrid(lr.regParam, Array(0.5, 2.0))
             .addGrid(lr.elasticNetParam, Array(0.2,0.7))
             .build()
val evaluator = new BinaryClassificationEvaluator().setRawPredictionCol("rawPrediction").setLabelCol("rating").setMetricName("areaUnderROC")
val cv = new CrossValidator().setEstimator(lr).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(3)

//Fit the training data to the pipeline
val model = cv.fit(training_cleaned)

// Make predictions.
val predictions_lr = model.transform(testing_cleaned)

// Select example rows to display.
predictions.select("rating", "prediction", "probability", "stemmed_words").show(5)

val AUC = evaluator.evaluate(predictions)
println(s"Area under ROC curve(AUC) for LR on test data = $AUC")

+------+----------+--------------------+--------------------+
|rating|prediction|         probability|       stemmed_words|
+------+----------+--------------------+--------------------+
|   0.0|       0.0|[0.50387068266630...|[came, locat, boc...|
|   0.0|       0.0|[0.50387068266630...|[i v, times , wel...|
|   0.0|       0.0|[0.50387068266630...|[give, zero , wou...|
|   0.0|       0.0|[0.50387068266630...|[went, 11pm, sund...|
|   0.0|       0.0|[0.50387068266630...|[place, crap , do...|
+------+----------+--------------------+--------------------+
only showing top 5 rows

Area under ROC curve(AUC) for LR on test data = 0.588911104042653


import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_3be6a861253d
paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	logreg_3be6a861253d-elasticNetParam: 0.2,
	logreg_3be6a861253d-regParam: 0.5
}, {
	logreg_3be6a861253d-elasticNetParam: 0.7,
	logreg_3be6a861253d-regParam: 0.5
}, {
	logreg_3be6a861253d-elasticNetParam: 0.2,
	logreg_3be6a861253d-regParam: 2.0
}, {
	logreg_3be6a861253d-elasticNetParam: 0.7,
	logreg_3be6a861253d-regParam: 2.0
})
evaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_a78e7e32e0c8
cv: org.apache.spark.ml.tuning.CrossValidator = cv_d38e...

# -Creating Random Forest model

In [14]:
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._

//Building Random Forest model
val rf = new RandomForestClassifier().setLabelCol("rating").setFeaturesCol("review_TFIDF")
val paramGrid =new ParamGridBuilder()
             .addGrid(rf.maxDepth, Array(2, 5))
             .addGrid(rf.numTrees, Array(5, 10))
             .build()

val evaluator = new BinaryClassificationEvaluator().setRawPredictionCol("rawPrediction").setLabelCol("rating").setMetricName("areaUnderROC")

val cv_rf = new CrossValidator().setEstimator(rf).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(3)

val model = cv_rf.fit(training_cleaned)

val predictions_rf = model.transform(testing_cleaned)

// Select example rows to display.
predictions.select("rating", "prediction", "probability", "stemmed_words").show(5)
val AUC = evaluator.evaluate(predictions)
println(s"Area under ROC curve(AUC) for RF on test data = $AUC")

+------+----------+--------------------+--------------------+
|rating|prediction|         probability|       stemmed_words|
+------+----------+--------------------+--------------------+
|   0.0|       0.0|[0.50387068266630...|[came, locat, boc...|
|   0.0|       0.0|[0.50387068266630...|[i v, times , wel...|
|   0.0|       0.0|[0.50387068266630...|[give, zero , wou...|
|   0.0|       0.0|[0.50387068266630...|[went, 11pm, sund...|
|   0.0|       0.0|[0.50387068266630...|[place, crap , do...|
+------+----------+--------------------+--------------------+
only showing top 5 rows

Area under ROC curve(AUC) for RF on test data = 0.588911104042653


import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_884e228e416b
paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	rfc_884e228e416b-maxDepth: 2,
	rfc_884e228e416b-numTrees: 5
}, {
	rfc_884e228e416b-maxDepth: 2,
	rfc_884e228e416b-numTrees: 10
}, {
	rfc_884e228e416b-maxDepth: 5,
	rfc_884e228e416b-numTrees: 5
}, {
	rfc_884e228e416b-maxDepth: 5,
	rfc_884e228e416b-numTrees: 10
})
evaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_24c88c948c2a
cv_rf: org.apache.spark.ml.tuning.CrossValidator = cv_a260af710...

# Creating views for prediction dataframes of all three models

In [20]:
predictions_gbt.createOrReplaceTempView("predictionsGbt_view")
predictions_lr.createOrReplaceTempView("predictionsLR_view")
predictions_rf.createOrReplaceTempView("predictionsRF_view")



# Ensembling prediction from all three models

In [32]:
//Sql  query to select if A=B or A=C then A else B
val ensemble = spark.sql("select CASE WHEN gbt.prediction = lr.prediction OR gbt.prediction = rf.prediction THEN  gbt.prediction ELSE rf.prediction END AS prediction_ensemble, gbt.rating FROM predictionsGbt_view gbt INNER JOIN predictionsLR_view lr ON gbt.rating =lr.rating INNER JOIN predictionsRF_view rf ON lr.rating=rf.rating")

ensemble.show()

+-------------------+------+
|prediction_ensemble|rating|
+-------------------+------+
|                0.0|   0.0|
|                0.0|   0.0|
|                0.0|   0.0|
|                0.0|   0.0|
|                0.0|   0.0|
|                0.0|   0.0|
|                0.0|   0.0|
|                0.0|   0.0|
|                0.0|   0.0|
|                0.0|   0.0|
|                0.0|   0.0|
|                0.0|   0.0|
|                0.0|   0.0|
|                0.0|   0.0|
|                0.0|   0.0|
|                0.0|   0.0|
|                0.0|   0.0|
|                0.0|   0.0|
|                0.0|   0.0|
|                0.0|   0.0|
+-------------------+------+
only showing top 20 rows



ensemble: org.apache.spark.sql.DataFrame = [prediction_ensemble: double, rating: double]


# Area under ROC curve for ensembled model.

In [34]:
import spark.implicits._
import org.apache.spark.mllib.evaluation._

//Converting the ensemble dataframe to an rdd of the form (prediction_ensemble, rating)
val predictionsAndLabels = ensemble.selectExpr("cast(prediction_ensemble as Double) prediction_ensemble","cast(rating as Double) rating")
.rdd.map(row=>(row.getAs[Double]("prediction_ensemble"),row.getAs[Double]("rating")))

//Compute the accuracy of the ensemble predictions
val metrics = new BinaryClassificationMetrics(predictionsAndLabels)

println(metrics.areaUnderROC)

0.742132331646631


import spark.implicits._
import org.apache.spark.mllib.evaluation._
predictionsAndLabels: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[9232] at map at <console>:102
metrics: org.apache.spark.mllib.evaluation.BinaryClassificationMetrics = org.apache.spark.mllib.evaluation.BinaryClassificationMetrics@32c3ccdc
