In [1]:
%%init_spark
launcher.master="yarn"
launcher.num_executors=6
launcher.executor_cores=2
launcher.executor_memory='6000m'
launcher.packages=["com.github.master:spark-stemming_2.10:0.2.0"]

In [2]:
//Data Exploration
val reviewsDF=spark.read.json("/hadoop-user/data/review.json").select("text", "stars")



Intitializing Scala interpreter ...

Spark Web UI available at http://bd-hm:8088/proxy/application_1574735054766_0001
SparkContext available as 'sc' (version = 2.4.4, master = yarn, app id = application_1574735054766_0001)
SparkSession available as 'spark'


reviewsDF: org.apache.spark.sql.DataFrame = [text: string, stars: double]


In [3]:
reviewsDF.show(3)
reviewsDF.cache()


+--------------------+-----+
|                text|stars|
+--------------------+-----+
|Total bill for th...|  1.0|
|I *adore* Travis ...|  5.0|
|I have to say tha...|  5.0|
+--------------------+-----+
only showing top 3 rows



res0: reviewsDF.type = [text: string, stars: double]


In [4]:
reviewsDF.createOrReplaceTempView("reviews")
//the number of reviews for each star value
spark.sql("select stars,count(text) as count from reviews group by stars").show(6)

+-----+-------+
|stars|  count|
+-----+-------+
|  1.0|1002159|
|  4.0|1468985|
|  3.0| 739280|
|  2.0| 542394|
|  5.0|2933082|
+-----+-------+



In [5]:
//Feature Engineering 
//Bucketing to create a new column “rating” with values 0 (if the star rating is 1,2, or 3) and 1 (if the star rating is 4 or 5)
import org.apache.spark.ml.feature.Bucketizer

val splits = Array(0,4,5.toDouble)


val bucketizer = new Bucketizer()
  .setInputCol("stars")
  .setOutputCol("rating")
  .setSplits(splits)

// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(reviewsDF)
bucketedData.show(3)

+--------------------+-----+------+
|                text|stars|rating|
+--------------------+-----+------+
|Total bill for th...|  1.0|   0.0|
|I *adore* Travis ...|  5.0|   1.0|
|I have to say tha...|  5.0|   1.0|
+--------------------+-----+------+
only showing top 3 rows



import org.apache.spark.ml.feature.Bucketizer
splits: Array[Double] = Array(0.0, 4.0, 5.0)
bucketizer: org.apache.spark.ml.feature.Bucketizer = bucketizer_852744a0bdc1
bucketedData: org.apache.spark.sql.DataFrame = [text: string, stars: double ... 1 more field]


In [6]:
bucketedData.createOrReplaceTempView("bucketed_reviews")
// the count of reviews for each rating value
spark.sql("select rating,count(text) as count from bucketed_reviews group by rating").show(3)



+------+-------+
|rating|  count|
+------+-------+
|   0.0|2283833|
|   1.0|4402067|
+------+-------+



In [7]:
//Stratified Sampling and get a sample of only 10% of reviews in each rating category after down-sampling

val fractionKeyMap = Map(0 -> 0.1, 1-> 0.1*0.5)

val StratifiedSample= bucketedData.stat.sampleBy("rating", fractionKeyMap, 7L)


StratifiedSample.show(3);

+--------------------+-----+------+
|                text|stars|rating|
+--------------------+-----+------+
|This place has go...|  1.0|   0.0|
|So good! They did...|  4.0|   1.0|
|Met an old close ...|  4.0|   1.0|
+--------------------+-----+------+
only showing top 3 rows



fractionKeyMap: scala.collection.immutable.Map[Int,Double] = Map(0 -> 0.1, 1 -> 0.05)
StratifiedSample: org.apache.spark.sql.DataFrame = [text: string, stars: double ... 1 more field]


In [8]:
//Extracting TFIDF vectors from the review Text

import org.apache.spark.ml.feature._
import org.apache.spark.mllib.feature.Stemmer

//tokenising
val tokenizer = new RegexTokenizer().setMinTokenLength(3).setToLowercase(true).setInputCol("text").setOutputCol("words")

//Defining a udf to remove punctuations from a sequence of words
import org.apache.spark.sql.functions.udf

def removePunc(words:Seq[String]):Seq[String]={
 return words.map(_.replaceAll("\\p{Punct}"," "))
}

//val removePuncUDF=udf(removePunc(_:Seq[String]))
spark.udf.register("removePuncUDF",removePunc(_:Seq[String]) )

//use the removePuncUDF to remove all punctuations from words
val puncRemover = new SQLTransformer().setStatement("SELECT removePuncUDF(words) as text, rating from __THIS__ ")

//removing stop words using spark's StopWordRemover
val stopWordRemover=new StopWordsRemover().setInputCol("text").setOutputCol("filtered_text")

////Stemming using a third party package
val stemmer = new Stemmer().setInputCol("filtered_text").setOutputCol("stemmed_text")


val vectorizer = new CountVectorizer().setInputCol("stemmed_text").setOutputCol("BOW").setMinDF(100)

//final tfidf vector
val tfidf = new IDF().setInputCol("BOW").setOutputCol("text_TFIDF")




import org.apache.spark.ml.feature._
import org.apache.spark.mllib.feature.Stemmer
tokenizer: org.apache.spark.ml.feature.RegexTokenizer = regexTok_9b07384e9a4f
import org.apache.spark.sql.functions.udf
removePunc: (words: Seq[String])Seq[String]
puncRemover: org.apache.spark.ml.feature.SQLTransformer = sql_15288d7b21dc
stopWordRemover: org.apache.spark.ml.feature.StopWordsRemover = stopWords_450a33bb42e6
stemmer: org.apache.spark.mllib.feature.Stemmer = stemmer_069028e38708
vectorizer: org.apache.spark.ml.feature.CountVectorizer = cntVec_229899540b4d
tfidf: org.apache.spark.ml.feature.IDF = idf_c078406b1a0d


In [9]:
//Building Machine Learning pipelines
//Logistic Regression
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
val lr = new LogisticRegression().setLabelCol("rating").setFeaturesCol("text_TFIDF")
val paramGrid =new ParamGridBuilder()
             .addGrid(lr.regParam, Array(0.01, 0.5, 2.0))
             .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
             
             .build()
val evaluator = new BinaryClassificationEvaluator().setRawPredictionCol("rawPrediction").setLabelCol("rating").setMetricName("areaUnderROC")
val cv = new CrossValidator().setEstimator(lr).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(3)

//creating a pipeline
val pipeline = new Pipeline().setStages(Array(tokenizer,puncRemover,stopWordRemover, stemmer, vectorizer, tfidf,cv))

//dividing data into training and test data
val Array(training,testing)=StratifiedSample.randomSplit(Array(0.8,0.2),111)


//Fit the training data to the pipeline
val pipelineModel = pipeline.fit(training)

// Make predictions.
val predictions = pipelineModel.transform(testing)

// Select example rows to display.
predictions.select("rating", "prediction", "probability", "stemmed_text").show(5)

//Evaluating our model
val AUC = evaluator.evaluate(predictions)
println(s"Area under ROC curve(AUC) for LR on test data = $AUC")




+------+----------+--------------------+--------------------+
|rating|prediction|         probability|        stemmed_text|
+------+----------+--------------------+--------------------+
|   1.0|       1.0|[0.04664469386640...|[ original , take...|
|   0.0|       0.0|[0.91037025362698...|[       , hairdre...|
|   1.0|       0.0|[0.67643752066498...|[ 10 99, champagn...|
|   0.0|       0.0|[0.99594554906796...|[ 148, dinner, gu...|
|   0.0|       0.0|[0.99680105550227...|[small, drip , ta...|
+------+----------+--------------------+--------------------+
only showing top 5 rows

Area under ROC curve(AUC) for LR on test data = 0.948347173215654


import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_8ee311034225
paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	logreg_8ee311034225-elasticNetParam: 0.0,
	logreg_8ee311034225-regParam: 0.01
}, {
	logreg_8ee311034225-elasticNetParam: 0.5,
	logreg_8ee311034225-regParam: 0.01
}, {
	logreg_8ee311034225-elasticNetParam: 1.0,
	logreg_8ee311034225-regParam: 0.01
}, {
	logreg_8ee311034225-elasticNetParam: 0.0,
	logreg_8ee311034225-regParam: 0.5
}, {
	logreg_8ee311034225-elasticNetParam: 0.5,
	logreg_8ee311034225-regParam: 0.5
}, {
	logreg_8...

In [10]:
//Random Forest Classifier
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._

val rf = new RandomForestClassifier().setLabelCol("rating").setFeaturesCol("text_TFIDF")
val paramGrid =new ParamGridBuilder()
             .addGrid(rf.maxDepth, Array(2, 5))
             .addGrid(rf.numTrees, Array(5, 20))
             
             .build()

val evaluator = new BinaryClassificationEvaluator().setRawPredictionCol("rawPrediction").setLabelCol("rating").setMetricName("areaUnderROC")


val cv_rf = new CrossValidator().setEstimator(rf).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(3)

val pipeline_rf = new Pipeline().setStages(Array(tokenizer,puncRemover,stopWordRemover, stemmer, vectorizer, tfidf,cv_rf))

val Array(training,testing)=StratifiedSample.randomSplit(Array(0.8,0.2),111)

//Fit the training data to the pipeline
val pipelineModel_rf = pipeline_rf.fit(training)

// Make predictions.
val predictions = pipelineModel_rf.transform(testing)
predictions.select("rating", "prediction", "probability", "stemmed_text").show(5)

//Evaluating the model
val AUC = evaluator.evaluate(predictions)
println(s"Area under ROC curve(AUC) for RF on test data = $AUC")

+------+----------+--------------------+--------------------+
|rating|prediction|         probability|        stemmed_text|
+------+----------+--------------------+--------------------+
|   1.0|       1.0|[0.48017184576031...|[ original , take...|
|   0.0|       0.0|[0.50467679605114...|[       , hairdre...|
|   1.0|       0.0|[0.56551560174407...|[ 10 99, champagn...|
|   0.0|       0.0|[0.58987161890653...|[ 148, dinner, gu...|
|   0.0|       0.0|[0.55671834440989...|[small, drip , ta...|
+------+----------+--------------------+--------------------+
only showing top 5 rows

Area under ROC curve(AUC) for RF on test data = 0.8059054542735092


import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_83716b7d4aac
paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	rfc_83716b7d4aac-maxDepth: 2,
	rfc_83716b7d4aac-numTrees: 5
}, {
	rfc_83716b7d4aac-maxDepth: 5,
	rfc_83716b7d4aac-numTrees: 5
}, {
	rfc_83716b7d4aac-maxDepth: 2,
	rfc_83716b7d4aac-numTrees: 20
}, {
	rfc_83716b7d4aac-maxDepth: 5,
	rfc_83716b7d4aac-numTrees: 20
})
evaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_6df6878e3773
cv_rf: org.apache.spark.ml.tuning.CrossValidator = cv_8a73b0f4e...

In [11]:
//Gradient Boosted classification Tree
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._



// Create a GBT model.
val gbt = new GBTClassifier()
  .setLabelCol("rating")
  .setFeaturesCol("text_TFIDF")



//Create ParamGrid for Cross Validation
val paramGrid = new ParamGridBuilder()
             .addGrid(gbt.maxDepth, Array(2,5))
             .addGrid(gbt.maxIter, Array(10, 20,100))
             .build()
val evaluator = new BinaryClassificationEvaluator().setRawPredictionCol("rawPrediction").setLabelCol("rating").setMetricName("areaUnderROC")

val cv = new CrossValidator().setEstimator(gbt).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(3)

//creating a pipeline
val pipeline_rf = new Pipeline().setStages(Array(tokenizer,puncRemover,stopWordRemover, stemmer, vectorizer, tfidf,cv))

//dividing data into training and test data
val Array(training,testing)=StratifiedSample.randomSplit(Array(0.8,0.2),111)

//Fit the training data to the pipeline
val pipelineModel = pipeline.fit(training)

// Make predictions.
val predictions = pipelineModel.transform(testing)

// Select example rows to display.
predictions.select("rating", "prediction", "probability", "stemmed_text").show(5)

//evaluating the model
//By mistake , I write RF in the printing statement.
val AUC = evaluator.evaluate(predictions)
println(s"Area under ROC curve(AUC) for RF on test data = $AUC")



+------+----------+--------------------+--------------------+
|rating|prediction|         probability|        stemmed_text|
+------+----------+--------------------+--------------------+
|   1.0|       1.0|[0.04664469386652...|[ original , take...|
|   0.0|       0.0|[0.91037025362635...|[       , hairdre...|
|   1.0|       0.0|[0.67643752066415...|[ 10 99, champagn...|
|   0.0|       0.0|[0.99594554906797...|[ 148, dinner, gu...|
|   0.0|       0.0|[0.99680105550221...|[small, drip , ta...|
+------+----------+--------------------+--------------------+
only showing top 5 rows

Area under ROC curve(AUC) for RF on test data = 0.9483471732156529


import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
gbt: org.apache.spark.ml.classification.GBTClassifier = gbtc_57d739f404b4
paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	gbtc_57d739f404b4-maxDepth: 2,
	gbtc_57d739f404b4-maxIter: 10
}, {
	gbtc_57d739f404b4-maxDepth: 5,
	gbtc_57d739f404b4-maxIter: 10
}, {
	gbtc_57d739f404b4-maxDepth: 2,
	gbtc_57d739f404b4-maxIter: 20
}, {
	gbtc_57d739f404b4-maxDepth: 5,
	gbtc_57d739f404b4-maxIter: 20
}, {
	gbtc_57d739f404b4-maxDepth: 2,
	gbtc_57d739f404b4-maxIter: 100
}, {
	gbtc_57d739f404b4-maxDepth: 5,
	gbtc_57d739f404b4-maxIter: 100
})
eval...

In [None]:
//From the AUC , we can say that LR and GBT performed better than RF in predicting the "rating" in the test set.
// values of different model in First run are:
//LR:0.948347173215654
//RF:0.8059054542735092
//GBT:0.9483471732156529


In [None]:
//the last part of adding "average_star" with tfidf is in the new notebook, Assignment5 Part2.