In [3]:
%%init_spark
launcher.master="yarn"
launcher.num_executors=6
launcher.executor_cores=2
launcher.executor_memory='5000m'
launcher.packages=["com.github.master:spark-stemming_2.10:0.2.0"]


Data Exploration: extracting the review text and stars from review.json, and then finding the distribution of stars.


In [2]:
//Data Exploration
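val explore=spark.read.json("/hadoop-user/data/review.json").select("Text","Stars").filter($"Text"=!="").
filter($"Stars">=1).toDF("reviews","stars").createOrReplaceTempView("temp")
//explore is Unit because the chain ends with createOrReplaceTempView; the data is queried through the "temp" view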
//sql("select * from temp").show(20)

//finding distribution of stars attribute
sql("select stars,count(reviews) as no_of_reviews from temp group by stars order by stars ASC").show()

Intitializing Scala interpreter ...

Spark Web UI available at http://C570BD-HM-Master:8088/proxy/application_1543561993902_0002
SparkContext available as 'sc' (version = 2.3.2, master = yarn, app id = application_1543561993902_0002)
SparkSession available as 'spark'


2018-11-30 02:54:26 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-11-30 02:54:28 WARN  Client:66 - Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
2018-11-30 02:54:36 WARN  Client:66 - Same path resource file:///home/administrator/.ivy2/jars/com.github.master_spark-stemming_2.10-0.2.0.jar added multiple times to distributed cache.
+-----+-------------+
|stars|no_of_reviews|
+-----+-------------+
|    1|       731363|
|    2|       438161|
|    3|       615481|
|    4|      1223316|
|    5|      2253348|
+-----+-------------+



explore: Unit = ()


# Feature Engineering

Feature Engineering: assigning a binary rating according to the stars given by the user: 1 for 4 or 5 stars (stars > 3) and 0 for 1 to 3 stars.

In [3]:
//Feature Engineering 
val FE=sql("select stars,reviews,IF(stars>3,1,0) as ratings from temp").toDF()
FE.createOrReplaceTempView("Data")
//FE.show(20)
//finding distribution of rating
//sql("select * from Data")
sql("select ratings,Count(ratings) as total from Data Group by ratings").show()

+-------+-------+
|ratings|  total|
+-------+-------+
|      1|3476664|
|      0|1785005|
+-------+-------+



FE: org.apache.spark.sql.DataFrame = [stars: bigint, reviews: string ... 1 more field]


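Since 1785005/3476664 is about 0.5134, we down-sample the data set with stratified sampling: the smaller class (rating 0) is kept as it is, and the larger class (rating 1) is sampled with fraction 0.5134, so that both classes end up about the same size.
Because the data set is so big, we only keep about 10% of it by dividing both fractions by 10. Hence we map 1 -> 0.05134 and 0 -> 0.1.

PS: Don't write (1785005/3476664) in the fractions map. Both numbers are Int literals, so Scala performs integer division and the result is 0. Use the decimal value 0.05134 instead (or make one operand a Double, e.g. 1785005.0/3476664). It took me 3 days to figure out this error.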
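The trap in the PS is plain Scala integer division, so it can be reproduced without Spark. This minimal sketch (the object and method names are made up for illustration) shows the truncated result and then derives the sampling fractions from the class counts in the table above:

```scala
object SamplingFractions {
  // Class counts from the ratings distribution shown above
  val positives: Int = 3476664 // ratings = 1
  val negatives: Int = 1785005 // ratings = 0

  // Int / Int is integer division in Scala: 1785005 / 3476664 truncates to 0
  val wrongRatio: Int = negatives / positives

  // Converting one operand to Double keeps the fraction (about 0.5134)
  val ratio: Double = negatives.toDouble / positives

  // Fractions for sampleBy when keeping roughly `keep` of the minority class:
  // the majority class (1) is scaled by `ratio` so both classes end up the same size
  def fractions(keep: Double): Map[Int, Double] =
    Map(1 -> keep * ratio, 0 -> keep)

  // Expected rows per class after sampling (sampleBy is random, so real counts vary a little)
  def expectedCounts(keep: Double): Map[Int, Double] = {
    val f = fractions(keep)
    Map(1 -> positives * f(1), 0 -> negatives * f(0))
  }
}
```

Both expected counts come out at about 178,500 rows, which is in line with the sampled counts of 178574 and 178418 shown below.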

In [4]:
// DownSampling Data using Stratified Sampling
val factor=Map(1 -> 0.05134, 0-> 0.1)
val DownSampleData=FE.stat.sampleBy("ratings",factor,111)
DownSampleData.groupBy("ratings").count().show()

+-------+------+
|ratings| count|
+-------+------+
|      1|178574|
|      0|178418|
+-------+------+



factor: scala.collection.immutable.Map[Int,Double] = Map(1 -> 0.05134, 0 -> 0.1)
DownSampleData: org.apache.spark.sql.DataFrame = [stars: bigint, reviews: string ... 1 more field]


# Extracting TFIDF Vector

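Now we see that the data set is balanced. Tokenization is the next step.
We are going to tokenize the reviews, remove the punctuation from the tokens, remove stop words and perform stemming, and then we will turn the stemmed words into bag-of-words count vectors (CountVectorizer) and weight them by IDF to get the TF-IDF vectors.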
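The cleaning steps can be mimicked in a few lines of plain Scala, which is handy for checking what happens to a single review. This is an approximation under stated assumptions: the tokenizer imitates `RegexTokenizer`'s default whitespace split with `setToLowercase(true)` and `setMinTokenLength(2)`, the stop-word list is a tiny hypothetical one (Spark's default English list is much longer), and stemming is left out.

```scala
object TextCleaning {
  // Imitates RegexTokenizer's default whitespace split ("\\s+") together with
  // setToLowercase(true) and setMinTokenLength(2), as configured in the notebook
  def tokenize(text: String, minTokenLength: Int = 2): Seq[String] =
    text.toLowerCase.split("\\s+").toSeq.filter(_.length >= minTokenLength)

  // Same logic as the removePunc UDF: strip ASCII punctuation from every token
  def removePunc(words: Seq[String]): Seq[String] =
    words.map(_.replaceAll("\\p{Punct}", ""))

  // Tiny, hypothetical stop-word list; StopWordsRemover's default English list is much longer
  val stopWords: Set[String] = Set("the", "was", "and", "it", "is", "a", "we")

  def removeStopWords(words: Seq[String]): Seq[String] =
    words.filterNot(stopWords.contains)

  // Tokenize, strip punctuation, drop stop words (stemming is not included here)
  def clean(text: String): Seq[String] =
    removeStopWords(removePunc(tokenize(text)))
}
```

For example, `TextCleaning.clean("The food was GREAT!!! ... and cheap")` keeps `food`, `great`, an empty string and `cheap`. The empty string comes from the `...` token, which consists only of punctuation; the same effect explains the leading empty token in the `stemmed_words` column further down.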

In [5]:
import org.apache.spark.ml.feature._

val tokenized = new RegexTokenizer().setMinTokenLength(2).setToLowercase(true).
setInputCol("reviews").setOutputCol("words")

//Defining a UDF to remove punctuation from a sequence of words
import org.apache.spark.sql.functions.udf

//Tokens made only of punctuation become empty strings (they show up as empty elements in stemmed_words below);
//appending .filter(_.nonEmpty) would drop them
def removePunc(words:Seq[String]):Seq[String]={
 words.map(_.replaceAll("\\p{Punct}",""))
}

spark.udf.register("removePuncUDF",removePunc(_:Seq[String]) )

//use the removePuncUDF to remove all punctuation from the words column
val puncRemover = new SQLTransformer().
setStatement("SELECT removePuncUDF(words) as punc_free_words, ratings from __THIS__ ")

val stopWordRemover=new StopWordsRemover().setInputCol("punc_free_words").setOutputCol("cleaned_words")

import org.apache.spark.mllib.feature.Stemmer
val stemmer = new Stemmer().setInputCol("cleaned_words").setOutputCol("stemmed_words")

val vectorizer = new CountVectorizer().setInputCol("stemmed_words").setOutputCol("BOW").setMinDF(100)

val tfidf = new IDF().setInputCol("BOW").setOutputCol("TFIDF_vector")


import org.apache.spark.ml.feature._
tokenized: org.apache.spark.ml.feature.RegexTokenizer = regexTok_abc701eff4e7
import org.apache.spark.sql.functions.udf
removePunc: (words: Seq[String])Seq[String]
puncRemover: org.apache.spark.ml.feature.SQLTransformer = sql_b7f93e541c48
stopWordRemover: org.apache.spark.ml.feature.StopWordsRemover = stopWords_153ba24d1948
import org.apache.spark.mllib.feature.Stemmer
stemmer: org.apache.spark.mllib.feature.Stemmer = stemmer_b6dee9900620
vectorizer: org.apache.spark.ml.feature.CountVectorizer = cntVec_44df2b3bdfe2
tfidf: org.apache.spark.ml.feature.IDF = idf_2ff562cda607
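`CountVectorizer` turns each list of stemmed words into a vector of raw term counts over a vocabulary of terms that appear in at least `minDF` documents, and `IDF` rescales every count by the term's inverse document frequency. Spark ML documents the IDF as log((m + 1) / (df + 1)), where m is the number of documents and df the number of documents containing the term; terms with df below `minDocFreq` get weight 0. The plain-Scala sketch below computes the same quantities on toy data. The names are illustrative, and the vocabulary is sorted alphabetically for readability, whereas `CountVectorizer` orders it by term frequency across the corpus.

```scala
object TfIdfSketch {
  type Doc = Seq[String]

  // Document frequency: in how many documents does each term appear?
  def docFreq(docs: Seq[Doc]): Map[String, Int] =
    docs.flatMap(_.distinct).groupBy(identity).map { case (t, occ) => t -> occ.size }

  // Vocabulary as CountVectorizer.setMinDF would keep it: terms with df >= minDF
  def vocabulary(docs: Seq[Doc], minDF: Int): Seq[String] =
    docFreq(docs).collect { case (t, df) if df >= minDF => t }.toSeq.sorted

  // IDF as documented for Spark ML: log((m + 1) / (df + 1)), or 0 if df < minDocFreq
  def idf(docs: Seq[Doc], vocab: Seq[String], minDocFreq: Int = 0): Map[String, Double] = {
    val m = docs.size
    val df = docFreq(docs)
    vocab.map { t =>
      val d = df.getOrElse(t, 0)
      t -> (if (d >= minDocFreq) math.log((m + 1.0) / (d + 1.0)) else 0.0)
    }.toMap
  }

  // TF-IDF vector of one document: raw term count times the term's IDF
  def tfidf(doc: Doc, vocab: Seq[String], idfs: Map[String, Double]): Seq[Double] = {
    val counts = doc.groupBy(identity).map { case (t, occ) => t -> occ.size }
    vocab.map(t => counts.getOrElse(t, 0) * idfs(t))
  }
}
```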


# Building Logistic regression Model for prediction

Now, to predict the ratings, we are going to build a Logistic Regression model exactly as we learnt in the lab.

In [32]:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml._
import org.apache.spark.ml.regression._

//Creating the logistic regression model; it is fit later as the last stage of the pipeline
val LR= new LogisticRegression().setLabelCol("ratings").setFeaturesCol("TFIDF_vector")

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml._
import org.apache.spark.ml.regression._
LR: org.apache.spark.ml.classification.LogisticRegression = logreg_52f0174b7a32
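A fitted binary `LogisticRegression` model scores a TF-IDF vector with a linear margin w·x + b and maps it through the sigmoid. The `probability` column holds [P(rating = 0), P(rating = 1)], and with the default threshold of 0.5 the prediction is 1 exactly when P(rating = 1) > 0.5. A minimal sketch with made-up weights (in the notebook they are learned from the training data):

```scala
object LogisticSketch {
  // Linear margin w.x + b over a dense feature vector
  def margin(weights: Array[Double], intercept: Double, x: Array[Double]): Double =
    weights.zip(x).map { case (w, v) => w * v }.sum + intercept

  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // Probabilities in the same order as the "probability" column: (P(0), P(1))
  def probability(weights: Array[Double], intercept: Double, x: Array[Double]): (Double, Double) = {
    val p1 = sigmoid(margin(weights, intercept, x))
    (1.0 - p1, p1)
  }

  // Class label with the default threshold of 0.5
  def predict(weights: Array[Double], intercept: Double, x: Array[Double], threshold: Double = 0.5): Double =
    if (probability(weights, intercept, x)._2 > threshold) 1.0 else 0.0
}
```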


# Creating Pipeline

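We are going to build a pipeline for the machine learning algorithms: all the steps that have to run in order are put into the pipeline as stages. Every stage of a pipeline is either a transformer (e.g. RegexTokenizer, SQLTransformer, StopWordsRemover), which only transforms the data, or an estimator (e.g. CountVectorizer, IDF, LogisticRegression), which is fit on the data and produces a model.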

In [33]:
//Creating a Pipeline and add the transformation we did so far to this pipeline
val pipeline = new Pipeline().setStages(Array(tokenized,puncRemover,stopWordRemover,stemmer,vectorizer,tfidf,LR))

pipeline: org.apache.spark.ml.Pipeline = pipeline_740b9d512678
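To see what `Pipeline.fit` does with this mix of transformers and estimators, here is a toy re-implementation over a plain `Seq[Double]`. It is entirely hypothetical and not Spark's API. Stages run in order; a transformer is applied as it is, while an estimator is first fit on the output of the stages before it and then replaced by the model it produces. The result is a list of transformers, which is what a `PipelineModel` is.

```scala
object MiniPipeline {
  type Data = Seq[Double]

  sealed trait Stage
  // A transformer maps data to data without learning anything
  final case class Transformer(name: String, f: Data => Data) extends Stage
  // An estimator learns from data and produces a transformer (its "model")
  final case class Estimator(name: String, fit: Data => Transformer) extends Stage

  // Like Pipeline.fit: each estimator is fit on the output of the previous stages,
  // and its fitted model is then used to transform the data for the next stage
  def fit(stages: Seq[Stage], data: Data): Seq[Transformer] =
    stages.foldLeft((data, Vector.empty[Transformer])) { case ((current, fitted), stage) =>
      val model = stage match {
        case tr: Transformer => tr
        case est: Estimator  => est.fit(current)
      }
      (model.f(current), fitted :+ model)
    }._2

  // Like PipelineModel.transform: apply every fitted stage in order
  def transform(model: Seq[Transformer], data: Data): Data =
    model.foldLeft(data)((d, t) => t.f(d))
}
```

For example, fitting a squaring transformer followed by a centering estimator on `Seq(1.0, 2.0, 3.0)` makes the estimator learn the mean of the squared values (14/3), not of the raw values. The same ordering is why `IDF` in our pipeline is fit on the output of `CountVectorizer`.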


Now that we have built the pipeline, we will train and test our model. For that, we split the down-sampled data into training and testing sets: here we take 80% of the data for training
and 20% for testing, as described in the labs, although the ratio can be changed as required.

In [34]:
//Split the data randomly into 80% training and 20% testing. The training data is used to build the model and the testing data is used to test it
val Array(training,testing)=DownSampleData.randomSplit(Array(0.8,0.2),111)

training: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [stars: bigint, reviews: string ... 1 more field]
testing: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [stars: bigint, reviews: string ... 1 more field]
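`randomSplit` does not cut the data at exactly 80%: it normalizes the weights and assigns each row independently to one split using a seeded random number, so the split sizes are only approximately 80/20, and reusing seed 111 on the same `DownSampleData` gives the same split (which is why the test rows shown for the different models below match). A simplified single-machine sketch of that idea, with hypothetical names:

```scala
import scala.util.Random

object RandomSplitSketch {
  // Assign every row to exactly one split: draw u ~ U[0, 1) per row and pick the
  // split whose cumulative (normalized) weight interval contains u
  def randomSplit[T](rows: Seq[T], weights: Seq[Double], seed: Long): Seq[Seq[T]] = {
    val total = weights.sum
    val bounds = weights.scanLeft(0.0)(_ + _).map(_ / total) // e.g. 0.0, 0.8, 1.0
    val rng = new Random(seed)
    val assigned = rows.map { r =>
      val u = rng.nextDouble()
      val j = bounds.indexWhere(_ > u)
      val idx = if (j == -1) weights.size - 1 else j - 1 // guard against rounding at the top
      (idx, r)
    }
    weights.indices.map(i => assigned.filter(_._1 == i).map(_._2))
  }
}
```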


Now we train the model on the training data.

In [35]:
//Fitting the pipeline to the training data
val pipeline_model = pipeline.fit(training)
//training.show(100)

pipeline_model: org.apache.spark.ml.PipelineModel = pipeline_740b9d512678


Now we test the model on the test data.

In [42]:
// Make predictions.
val predictions = pipeline_model.transform(testing)

// Select example rows to display.
predictions.select("ratings", "prediction","Probability","stemmed_words").show(15)

+-------+----------+--------------------+--------------------+
|ratings|prediction|         Probability|       stemmed_words|
+-------+----------+--------------------+--------------------+
|      0|       0.0|[0.99999999712446...|[, reader, pleas,...|
|      0|       0.0|[0.99123550037204...|[appar, arriv, 10...|
|      0|       0.0|[0.97723998163800...|[star, walk, ladi...|
|      0|       0.0|[0.95353909262916...|[star, make, sele...|
|      0|       0.0|[0.96426454732683...|[1st, time, order...|
|      0|       0.0|[0.99847967495729...|[2nd, time, ive, ...|
|      0|       0.0|[0.99999854435074...|[6252017, 1030pm,...|
|      0|       0.0|[0.99619023934215...|[good, friend, mi...|
|      0|       0.0|[0.99999989216620...|[back, decid, wri...|
|      0|       0.0|[0.99997896231672...|[alamo, hand, car...|
|      0|       0.0|[0.96378236075468...|[avoid, appar, me...|
|      0|       0.0|[0.99997228741567...|[absolut, aw, exp...|
|      0|       0.0|[0.75213313181340...|[absolut, vile

predictions: org.apache.spark.sql.DataFrame = [punc_free_words: array<string>, ratings: int ... 7 more fields]


# Cross Validation and AUC evaluators and Tuning of Hyper Parameters for Logistic Regression

We use 3-fold cross-validation, with the area under the ROC curve (AUC) as the metric, to tune the hyperparameters and evaluate the model.
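In k-fold cross-validation the training data is split into k folds; for each hyperparameter combination the model is trained on k - 1 folds and scored on the remaining one, the k scores are averaged, and the combination with the best average (the largest AUC here) is then refit on the whole training set. The sketch below shows the selection part in plain Scala. `trainAndScore` is a hypothetical callback standing in for "fit the pipeline and evaluate the AUC", and unlike Spark's `CrossValidator`, which samples fold membership per row, it shuffles the rows into folds of (nearly) equal size.

```scala
import scala.util.Random

object CrossValidationSketch {
  // Shuffle row indices and deal them into k disjoint folds
  def kFolds(n: Int, k: Int, seed: Long): Seq[Seq[Int]] = {
    val shuffled = new Random(seed).shuffle((0 until n).toVector)
    (0 until k).map(f => shuffled.zipWithIndex.collect { case (row, i) if i % k == f => row })
  }

  // For each candidate parameter: train on k - 1 folds, score on the held-out fold,
  // average the k scores, and return the parameter with the largest mean score
  def selectBest[P, R](rows: IndexedSeq[R], params: Seq[P], k: Int, seed: Long)
                      (trainAndScore: (P, Seq[R], Seq[R]) => Double): (P, Double) = {
    val folds = kFolds(rows.size, k, seed)
    val meanScores = params.map { p =>
      val scores = folds.map { valIdx =>
        val valSet = valIdx.toSet
        val train = rows.indices.filterNot(valSet).map(i => rows(i))
        val valid = valIdx.map(i => rows(i))
        trainAndScore(p, train, valid)
      }
      p -> scores.sum / k
    }
    meanScores.maxBy(_._2)
  }
}
```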

In [37]:
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._


val LR_new= new LogisticRegression().setLabelCol("ratings").setFeaturesCol("TFIDF_vector")

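//Note: tfidf is a pipeline stage outside cv_LR, so the CrossValidator never applies the tfidf.minDocFreq
//values below; every (regParam, elasticNetParam) pair is simply evaluated twice. To tune minDocFreq as well,
//the CrossValidator's estimator would have to be a Pipeline containing all the stages.
val paramGrid_LR =new ParamGridBuilder()
             .addGrid(LR_new.regParam, Array(0.01, 0.5, 2.0))
             .addGrid(LR_new.elasticNetParam, Array(0.0, 0.5, 1.0))
             .addGrid(tfidf.minDocFreq, Array(5,10))
             .build()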

val evaluator_LR = new BinaryClassificationEvaluator().
setRawPredictionCol("rawPrediction").setLabelCol("ratings").setMetricName("areaUnderROC")

val cv_LR = new CrossValidator().setEstimator(LR_new).setEvaluator(evaluator_LR).setEstimatorParamMaps(paramGrid_LR).setNumFolds(3)

//Creating a Pipeline and add the transformation we did so far to this pipeline
val pipeline_LR = new Pipeline().setStages(Array(tokenized,puncRemover,stopWordRemover,stemmer,vectorizer,tfidf,cv_LR))

//Split the data randomly into 80% training and 20% testing. The training data is used to build the model and the testing data is used to test it
val Array(trainingLR,testingLR)=DownSampleData.randomSplit(Array(0.8,0.2),111)

//Fitting the pipeline to the training data
val pipeline_modelLR = pipeline_LR.fit(trainingLR)

// Make predictions.
val predictionsLR = pipeline_modelLR.transform(testingLR)

// Select example rows to display.
predictionsLR.select("ratings", "prediction","stemmed_words").show(15)

val AUC = evaluator_LR.evaluate(predictionsLR)
println(s"Area under ROC curve(AUC) for LR on test data = $AUC")

+-------+----------+--------------------+
|ratings|prediction|       stemmed_words|
+-------+----------+--------------------+
|      0|       0.0|[, reader, pleas,...|
|      0|       0.0|[appar, arriv, 10...|
|      0|       0.0|[star, walk, ladi...|
|      0|       0.0|[star, make, sele...|
|      0|       0.0|[1st, time, order...|
|      0|       0.0|[2nd, time, ive, ...|
|      0|       0.0|[6252017, 1030pm,...|
|      0|       0.0|[good, friend, mi...|
|      0|       0.0|[back, decid, wri...|
|      0|       0.0|[alamo, hand, car...|
|      0|       0.0|[avoid, appar, me...|
|      0|       0.0|[absolut, aw, exp...|
|      0|       0.0|[absolut, vile, c...|
|      0|       0.0|[absolut, terribl...|
|      0|       0.0|[experienc, excel...|
+-------+----------+--------------------+
only showing top 15 rows

Area under ROC curve(AUC) for LR on test data = 0.9438427887341851


import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
LR_new: org.apache.spark.ml.classification.LogisticRegression = logreg_72f55f220286
paramGrid_LR: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	logreg_72f55f220286-elasticNetParam: 0.0,
	idf_0f05023ff858-minDocFreq: 5,
	logreg_72f55f220286-regParam: 0.01
}, {
	logreg_72f55f220286-elasticNetParam: 0.0,
	idf_0f05023ff858-minDocFreq: 5,
	logreg_72f55f220286-regParam: 0.5
}, {
	logreg_72f55f220286-elasticNetParam: 0.0,
	idf_0f05023ff858-minDocFreq: 5,
	logreg_72f55f220286-regParam: 2.0
}, {
	logreg_72f55f220286-elasticNet...

"Area under ROC curve(AUC) for LR on test data = 0.9438427887341851" is the AUC of the logistic regression model on the test data, i.e. an AUC of about 94.38%.

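AUC has a handy interpretation: it is the probability that a randomly chosen positive review (rating 1) receives a higher score than a randomly chosen negative one (rating 0), with ties counting as half. So 0.5 corresponds to random guessing and 1.0 to a perfect ranking, and 0.9438 means the logistic regression ranks a random positive above a random negative about 94% of the time. A brute-force plain-Scala sketch of that definition (names are illustrative; `BinaryClassificationEvaluator` computes the same area from the ROC curve far more efficiently):

```scala
object AucSketch {
  // Probability that a random positive is scored above a random negative (ties = 1/2).
  // O(P * N) pairs, so only suitable for small inputs.
  def auc(scoresAndLabels: Seq[(Double, Double)]): Double = {
    val pos = scoresAndLabels.collect { case (s, 1.0) => s }
    val neg = scoresAndLabels.collect { case (s, 0.0) => s }
    require(pos.nonEmpty && neg.nonEmpty, "need both classes")
    val wins = for (p <- pos; n <- neg) yield
      if (p > n) 1.0 else if (p == n) 0.5 else 0.0
    wins.sum / (pos.size.toDouble * neg.size)
  }
}
```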
# Building Random Forest Model for Prediction

Now, to predict the ratings, we are going to build a Random Forest model exactly as we learnt in the lab, using
RandomForestClassifier. Everything is almost the same as what we did for logistic regression; only the last stage of the pipeline changes.

In [20]:
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._

//Building Random Forest Model
val Random_Forest = new RandomForestClassifier().setLabelCol("ratings").setFeaturesCol("TFIDF_vector")

import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
Random_Forest: org.apache.spark.ml.classification.RandomForestClassifier = rfc_e37cc74bfb0e
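Each tree in the forest ends in a leaf holding class counts. As I understand Spark ML's `RandomForestClassificationModel`, every tree's leaf counts are normalized to a class distribution, the distributions are summed over the trees, and the sum is normalized again to give the `probability` column; the prediction is the most probable class. A sketch of that averaging with hypothetical leaf counts:

```scala
object ForestVoteSketch {
  // leafCounts: one array of class counts per tree, taken at the leaf the example reaches.
  // Each tree's counts are normalized to a distribution, the distributions are summed,
  // and the sum is normalized again to give class probabilities.
  def probability(leafCounts: Seq[Array[Double]]): Array[Double] = {
    val numClasses = leafCounts.head.length
    val votes = Array.fill(numClasses)(0.0)
    leafCounts.foreach { counts =>
      val total = counts.sum
      if (total > 0) counts.indices.foreach(i => votes(i) += counts(i) / total)
    }
    val s = votes.sum
    votes.map(_ / s)
  }

  // The predicted class is the one with the highest probability
  def predict(leafCounts: Seq[Array[Double]]): Int = {
    val p = probability(leafCounts)
    p.indices.maxBy(i => p(i))
  }
}
```

Averaging many shallow trees that are each only mildly confident tends to give probabilities close to 0.5, which is one plausible reason the random forest probabilities in the output below stay roughly between 0.5 and 0.7.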


In [24]:
//Creating a Pipeline and add the transformation we did so far to this pipeline
val pipeline_RF = new Pipeline().setStages(Array(tokenized,puncRemover,stopWordRemover,stemmer,vectorizer,tfidf,Random_Forest))

pipeline_RF: org.apache.spark.ml.Pipeline = pipeline_624ae8d41db6


In [25]:
//Split the data randomly into 80% training and 20% testing.
val Array(trainingRF,testingRF)=DownSampleData.randomSplit(Array(0.8,0.2),111)

trainingRF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [stars: bigint, reviews: string ... 1 more field]
testingRF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [stars: bigint, reviews: string ... 1 more field]


In [26]:
//Fitting the pipeline to the training data
val pipeline_model_RF = pipeline_RF.fit(trainingRF)

pipeline_model_RF: org.apache.spark.ml.PipelineModel = pipeline_624ae8d41db6


In [41]:
// Make predictions.
val predictionsRF = pipeline_model_RF.transform(testingRF)

// Select example rows to display.
predictionsRF.select("ratings", "prediction","Probability","stemmed_words").show(15)

+-------+----------+--------------------+--------------------+
|ratings|prediction|         Probability|       stemmed_words|
+-------+----------+--------------------+--------------------+
|      0|       0.0|[0.69062083508328...|[, reader, pleas,...|
|      0|       0.0|[0.53001695905751...|[appar, arriv, 10...|
|      0|       0.0|[0.51313455334488...|[star, walk, ladi...|
|      0|       0.0|[0.52943907439286...|[star, make, sele...|
|      0|       0.0|[0.51825493182364...|[1st, time, order...|
|      0|       0.0|[0.60036818904750...|[2nd, time, ive, ...|
|      0|       0.0|[0.63456268714765...|[6252017, 1030pm,...|
|      0|       0.0|[0.52757052689521...|[good, friend, mi...|
|      0|       0.0|[0.64726390682647...|[back, decid, wri...|
|      0|       0.0|[0.69224997290498...|[alamo, hand, car...|
|      0|       0.0|[0.50649827481452...|[avoid, appar, me...|
|      0|       0.0|[0.57741458539382...|[absolut, aw, exp...|
|      0|       1.0|[0.49996505819774...|[absolut, vile

predictionsRF: org.apache.spark.sql.DataFrame = [punc_free_words: array<string>, ratings: int ... 7 more fields]


# Cross Validation and AUC evaluators and Tuning of Hyper Parameters for Random Forest

In [31]:
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._

val rf1 = new RandomForestClassifier().setLabelCol("ratings").setFeaturesCol("TFIDF_vector")
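//Note: as in the LR cell, tfidf sits outside cv_rf in the pipeline, so the CrossValidator never applies the
//tfidf.minDocFreq values; each (maxDepth, numTrees) pair is simply evaluated twice
val paramGrid1 =new ParamGridBuilder()
             .addGrid(rf1.maxDepth, Array(2, 5))
             .addGrid(rf1.numTrees, Array(5, 20))
             .addGrid(tfidf.minDocFreq, Array(5,10))
             .build()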

val evaluator1 = new BinaryClassificationEvaluator().
setRawPredictionCol("rawPrediction").setLabelCol("ratings").setMetricName("areaUnderROC")


val cv_rf = new CrossValidator().setEstimator(rf1).setEvaluator(evaluator1).
setEstimatorParamMaps(paramGrid1).setNumFolds(3)

val pipeline_rf1 = new Pipeline().setStages(Array(tokenized,puncRemover,stopWordRemover,stemmer,vectorizer,tfidf,cv_rf))

val Array(training11,testing11)=DownSampleData.randomSplit(Array(0.8,0.2),111)

//Fit the training data to the pipeline
val pipelineModel_rf1 = pipeline_rf1.fit(training11)

// Make predictions.
val predictions11 = pipelineModel_rf1.transform(testing11)
val AUC = evaluator1.evaluate(predictions11)
println(s"Area under ROC curve(AUC) for RF on test data = $AUC")

Area under ROC curve(AUC) for RF on test data = 0.8443359946215225


import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
rf1: org.apache.spark.ml.classification.RandomForestClassifier = rfc_1e3567b216fc
paramGrid1: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	rfc_1e3567b216fc-maxDepth: 2,
	idf_0f05023ff858-minDocFreq: 5,
	rfc_1e3567b216fc-numTrees: 5
}, {
	rfc_1e3567b216fc-maxDepth: 2,
	idf_0f05023ff858-minDocFreq: 5,
	rfc_1e3567b216fc-numTrees: 20
}, {
	rfc_1e3567b216fc-maxDepth: 5,
	idf_0f05023ff858-minDocFreq: 5,
	rfc_1e3567b216fc-numTrees: 5
}, {
	rfc_1e3567b216fc-maxDepth: 5,
	idf_0f05023ff858-minDocFreq: 5,
	rfc_1e3567b216fc-numTrees: 20
}, {
	rfc_1e3567b216fc-maxD...

"Area under ROC curve(AUC) for RF on test data = 0.8443359946215225" is the AUC of the random forest model on the test data, i.e. about 84.4%, lower than for logistic regression.

In [12]:
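//For testing purposes only: first GBT regression attempt
//It applies StringIndexer to the raw review text, so every distinct review becomes its own category;
//this run failed with java.lang.OutOfMemoryError (see below) and is replaced by the TF-IDF based GBT model further down

//GBT Regression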

import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._




val indexer=new StringIndexer().setInputCol("reviews").setOutputCol("words_indexer")

//Now let's assemble everyting together in a feature vector
val vectorizer1=new VectorAssembler().
setInputCols(Array("words_indexer")).setOutputCol("features")

// Create a GBT model.
val gbt = new GBTRegressor()
  .setLabelCol("ratings")
  .setFeaturesCol("features")



//Create ParamGrid for Cross Validation
val paramGrid = new ParamGridBuilder()
             .addGrid(gbt.maxDepth, Array(2,5))
             .addGrid(gbt.maxIter, Array(10, 20,100))
             .build()

val evaluator = new BinaryClassificationEvaluator().
setRawPredictionCol("rawPrediction").setLabelCol("ratings").setMetricName("areaUnderROC")

val cv = new CrossValidator().setEstimator(gbt).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(3)


val pipeline = new Pipeline().setStages(Array(indexer, vectorizer1,cv))


val Array(training,testing)=DownSampleData.randomSplit(Array(0.8,0.2),111)

//Fit the training data to the pipeline
val pipelineModel = pipeline.fit(training)

// Make predictions.
val predictions = pipelineModel.transform(testing)

// Select example rows to display.
predictions.select("prediction", "ratings", "features").show(5)

val AUC = evaluator.evaluate(predictions)
println(s"Area Under the ROC Curve on test data = $AUC")



java.lang.OutOfMemoryError:  Java heap space

In [1]:
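//GBT Classifier (experimental: this cell ran in a fresh local session where DownSampleData was not defined, hence the error below)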

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._



val labelIndexer = new StringIndexer()
  .setInputCol("ratings")
  .setOutputCol("indexedLabel")
  .fit(DownSampleData)

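// Note: VectorIndexer expects a Vector input column, but "reviews" is a plain string, so this stage
// would fail even with DownSampleData defined; the text first has to be turned into a feature vector
// (e.g. with the TF-IDF stages used above).
val featureIndexer = new VectorIndexer()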
  .setInputCol("reviews")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(DownSampleData)


// Split the data into training and test sets (20% held out for testing).
val Array(trainingData, testData) = DownSampleData.randomSplit(Array(0.8, 0.2))

// Train a GBT model.
val gbt = new GBTClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10)

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

//Create ParamGrid for Cross Validation
val paramGrid = new ParamGridBuilder()
             .addGrid(gbt.maxDepth, Array(2,5))
             .addGrid(gbt.maxIter, Array(10, 20,100))
             .build()

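// The GBT model is trained on indexedLabel (StringIndexer orders labels by frequency, so index 0 need not be
// rating 0), so the evaluator should use the same label column.
val evaluator = new BinaryClassificationEvaluator().
setRawPredictionCol("rawPrediction").setLabelCol("indexedLabel").setMetricName("areaUnderROC")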

val cv = new CrossValidator().setEstimator(gbt).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(3)

// Chain indexers and GBT in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, cv, labelConverter))

// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display (TFIDF_vector is not produced by this pipeline, so the indexed features are shown instead).
predictions.select("predictedLabel", "ratings", "indexedFeatures").show(5)



val AUC = evaluator.evaluate(predictions)
println(s"Area Under the ROC Curve on test data = $AUC")


Intitializing Scala interpreter ...

Spark Web UI available at http://C570BD-HM-Master:4040
SparkContext available as 'sc' (version = 2.3.2, master = local[*], app id = local-1543567979390)
SparkSession available as 'spark'


2018-11-30 02:52:58 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


<console>: 41: error: not found: value DownSampleData

# Building Gradient Boosted Tree Regression Model for Rating Prediction

Now we are going to predict the ratings with a Gradient-Boosted Tree (GBT) model using GBTRegressor, trained on the same TF-IDF vectors as the previous models. (The testing-only attempt above, which converted the raw review strings to indices with StringIndexer and assembled them into a vector with VectorAssembler, ran out of memory.) We follow almost the same steps as in the labs, with only a slight change: first we create the GBT model, then the pipeline, then we split the data, train and test the model, and finally we evaluate it.
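Gradient boosting builds the model as a sum of small trees: each new tree is fit to the residuals of the trees built so far (the negative gradient of the squared loss) and added with a small weight, the step size. Because the output of `GBTRegressor` is such a sum rather than a probability, its predictions are unbounded real numbers, so values below 0 or above 1 are possible. The sketch below is a generic one-feature illustration with decision stumps and hypothetical names; Spark's implementation differs in details such as the weight of the first tree and how the trees are grown.

```scala
object BoostingSketch {
  // A regression stump: a single split on x with a constant prediction on each side
  final case class Stump(threshold: Double, left: Double, right: Double) {
    def apply(x: Double): Double = if (x <= threshold) left else right
  }

  private def mean(v: Seq[Double]): Double = if (v.isEmpty) 0.0 else v.sum / v.size

  // Pick the stump with the smallest squared error against the targets
  def fitStump(xs: Seq[Double], targets: Seq[Double]): Stump = {
    val data = xs.zip(targets)
    xs.distinct.sorted.map { t =>
      val (l, r) = data.partition(_._1 <= t)
      val stump = Stump(t, mean(l.map(_._2)), mean(r.map(_._2)))
      val sse = data.map { case (x, y) => math.pow(y - stump(x), 2) }.sum
      (stump, sse)
    }.minBy(_._2)._1
  }

  // Squared-loss boosting: every new stump is fit to the current residuals y - F(x)
  // (the negative gradient up to a constant factor) and added with weight stepSize
  def fit(xs: Seq[Double], ys: Seq[Double], numTrees: Int, stepSize: Double): Double => Double = {
    def predict(stumps: Seq[Stump], x: Double): Double = stumps.map(s => stepSize * s(x)).sum
    val stumps = (1 to numTrees).foldLeft(Vector.empty[Stump]) { (acc, _) =>
      val residuals = xs.zip(ys).map { case (x, y) => y - predict(acc, x) }
      acc :+ fitStump(xs, residuals)
    }
    x => predict(stumps, x)
  }
}
```

With a step size of 0.5 on a perfectly separable toy set, the prediction for the positive side approaches 1 as 1 - 0.5^n after n trees, which shows why more iterations (`maxIter`) keep refining the fit.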

In [8]:
import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._

// Create a GBT model.
val gbt = new GBTRegressor().setLabelCol("ratings").setFeaturesCol("TFIDF_vector")



import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
gbt: org.apache.spark.ml.regression.GBTRegressor = gbtr_0299445ceac1


In [9]:
//making pipeline for GBT Regression
val pipeline_gbt = new Pipeline().setStages(Array(tokenized,puncRemover,stopWordRemover,stemmer,vectorizer,tfidf,gbt))


pipeline_gbt: org.apache.spark.ml.Pipeline = pipeline_701c6ce0731a


In [10]:
//Splitting data into training and testing sets
val Array(training_gbt,testing_gbt)=DownSampleData.randomSplit(Array(0.8,0.2),111)


training_gbt: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [stars: bigint, reviews: string ... 1 more field]
testing_gbt: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [stars: bigint, reviews: string ... 1 more field]


In [11]:
//Fit the training data to the pipeline
val pipelineModel_gbt = pipeline_gbt.fit(training_gbt)


pipelineModel_gbt: org.apache.spark.ml.PipelineModel = pipeline_701c6ce0731a


In [12]:
// Make predictions.
val predictions_gbt = pipelineModel_gbt.transform(testing_gbt)

// Select example rows to display.
predictions_gbt.select("ratings", "prediction","stemmed_words").show(15)


+-------+--------------------+--------------------+
|ratings|          prediction|       stemmed_words|
+-------+--------------------+--------------------+
|      0|-0.11536258114046004|[, reader, pleas,...|
|      0| 0.21778032076752749|[appar, arriv, 10...|
|      0| 0.25089040713640864|[star, walk, ladi...|
|      0| 0.11048811651336213|[star, make, sele...|
|      0| 0.18723617479923235|[1st, time, order...|
|      0| 0.07677282414354611|[2nd, time, ive, ...|
|      0|-0.06641511722031082|[6252017, 1030pm,...|
|      0|  0.4352886560959326|[good, friend, mi...|
|      0|-0.09895209726060106|[back, decid, wri...|
|      0| 0.04591080097629264|[alamo, hand, car...|
|      0|  0.3683313337001561|[avoid, appar, me...|
|      0| 0.10615398411210614|[absolut, aw, exp...|
|      0| 0.33517389492886956|[absolut, vile, c...|
|      0|-0.13450946429206584|[absolut, terribl...|
|      0|  0.6332569431092511|[experienc, excel...|
+-------+--------------------+--------------------+
only showing

predictions_gbt: org.apache.spark.sql.DataFrame = [punc_free_words: array<string>, ratings: int ... 5 more fields]


# Cross Validation and AUC evaluators and Tuning of Hyper Parameters for Gradient-Boosted Trees (GBT)

In [18]:
import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._

// Create a GBT model.
val gbt1 = new GBTRegressor().setLabelCol("ratings").setFeaturesCol("TFIDF_vector")


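//Create ParamGrid for Cross Validation
//The grid must use gbt1's params: params of the other model (gbt) are ignored by the CrossValidator's estimator gbt1
val paramGrid_gbt = new ParamGridBuilder()
             .addGrid(gbt1.maxDepth, Array(2,5))
             .addGrid(gbt1.maxIter, Array(10, 20,100))
             .build()

//Evaluators for GBT
//GBTRegressor has no rawPrediction column, so its numeric prediction is used as the score
val evaluator_gbt = new BinaryClassificationEvaluator().
setRawPredictionCol("prediction").setLabelCol("ratings").setMetricName("areaUnderROC")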

//cross validation with 3 folds
val cv_gbt = new CrossValidator().setEstimator(gbt1).
setEvaluator(evaluator_gbt).setEstimatorParamMaps(paramGrid_gbt).setNumFolds(3)

//making pipeline for GBT Regression
val pipeline_gbt1 = new Pipeline().setStages(Array(tokenized,puncRemover,stopWordRemover,stemmer,vectorizer,tfidf,cv_gbt))

//Splitting data into training and testing sets
val Array(training_gbt1,testing_gbt1)=DownSampleData.randomSplit(Array(0.8,0.2),111)

//Fit the training data to the pipeline
val pipelineModel_gbt1 = pipeline_gbt1.fit(training_gbt1)

// Make predictions.
val predictions_gbt1 = pipelineModel_gbt1.transform(testing_gbt1)

// Select example rows to display.
predictions_gbt1.select("ratings", "prediction","stemmed_words").show(15)

val AUC_gbt = evaluator_gbt.evaluate(predictions_gbt1)
println(s"Area under ROC curve(AUC) for GBT on test data = $AUC_gbt")


So, the AUC value for GBT is 0.3419, i.e. 34.19%, which is below 0.5 and therefore worse than random guessing.

# Conclusion

Comparing the AUC of all three models, we can clearly see that logistic regression has the highest AUC, 94.38% (random forest: 84.4%, GBT: 34.19%). So we conclude that Logistic Regression did the best job of predicting the ratings from the reviews given by the users.

# Ensembling the prediction of all models

Now we are going to take the predictions generated by the three models above (logistic regression, random forest and gradient-boosted trees), combine them, and compute a "prediction_ensemble" column that is a majority vote of the three prediction columns. That is, if two or more of the models generate the same prediction, that prediction is used in the prediction_ensemble column; otherwise, if none of the predictions are the same, the rating value 1 is used for the prediction_ensemble column.

In [16]:
predictionsLR.toDF().createOrReplaceTempView("LR_Output")
predictionsRF.toDF().createOrReplaceTempView("RF_Output")
predictions_gbt.toDF().createOrReplaceTempView("GBT_Output")

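Here we gather the predictions from all 3 models and compute the ensembled prediction as described in the assignment. The logic I am following: the logistic regression and random forest predictions are always either 1 or 0, so if 2 or more models predict 1, the sum of the 3 predictions is greater than 1, and otherwise it is at most 1 and the ensembled prediction is 0. The GBT model is a regressor whose prediction is a continuous score, so it first has to be converted to 1 or 0 (thresholded at 0.5) for this rule to be a majority vote.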

PS: Respected ma'am, I don't know whether my approach to ensembling the predictions is right or not, but after trying many other things, I preferred to follow the simplest approach I understood from the instructions given in Assignment 5. However, I was also trying something like this: https://spark.apache.org/docs/1.6.2/mllib-ensembles.html, and I don't know whether that was right or not.
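The voting rule can be checked in isolation. With three 0/1 votes there is always a majority (either at least two 1s or at least two 0s), so the "no two predictions agree" case never occurs, and `sum > 1` is the same test as `sum >= 2`. The GBT model here is a regressor, though, so its continuous prediction has to be turned into a 0/1 label first; the sketch below assumes a threshold of 0.5 (a hypothetical choice, and the names are illustrative):

```scala
object EnsembleSketch {
  // Hypothetical threshold for turning the GBT regressor's continuous prediction into 0/1
  def toLabel(score: Double, threshold: Double = 0.5): Int = if (score > threshold) 1 else 0

  // Majority vote of three 0/1 predictions: 1 when at least two models predict 1.
  // For 0/1 inputs, "sum > 1" and "sum >= 2" are the same test.
  def majority(lr: Int, rf: Int, gbt: Int): Int = if (lr + rf + gbt > 1) 1 else 0

  // Combine the raw outputs of the three models
  def ensemble(lrPrediction: Double, rfPrediction: Double, gbtScore: Double): Int =
    majority(lrPrediction.toInt, rfPrediction.toInt, toLabel(gbtScore))
}
```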

In [None]:
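// LR and RF predictions are 0/1; the GBT regressor's prediction is a continuous score, so it is thresholded at 0.5 first.
// Note: the join matches rows by identical TFIDF_vector, so duplicate reviews can produce extra joined rows.
val ensemble=spark.sql("select a.ratings As Original_Ratings, " +
"CASE WHEN (a.prediction + b.prediction + (CASE WHEN c.prediction > 0.5 THEN 1 ELSE 0 END)) > 1 THEN 1 ELSE 0 END AS prediction_ensemble " +
"from LR_Output a, RF_Output b, GBT_Output c " +
"where a.TFIDF_vector=b.TFIDF_vector AND b.TFIDF_vector=c.TFIDF_vector").toDF()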

Converting the ensemble DataFrame into an RDD and then evaluating the ensembled prediction with the area under the ROC curve.
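Since `prediction_ensemble` is a hard 0/1 label rather than a score, the ROC curve has only one point between (0, 0) and (1, 1), namely (FPR, TPR), and the area under it reduces to (TPR + TNR) / 2, i.e. the balanced accuracy. A small sketch of that computation (hypothetical names), which is what `BinaryClassificationMetrics.areaUnderROC` should amount to for such input:

```scala
object HardLabelAuc {
  // With 0/1 predictions the ROC curve is (0,0) -> (FPR, TPR) -> (1,1),
  // so the area under it equals (TPR + TNR) / 2
  def auc(predsAndLabels: Seq[(Double, Double)]): Double = {
    val tp = predsAndLabels.count { case (p, l) => p == 1.0 && l == 1.0 }
    val fn = predsAndLabels.count { case (p, l) => p == 0.0 && l == 1.0 }
    val tn = predsAndLabels.count { case (p, l) => p == 0.0 && l == 0.0 }
    val fp = predsAndLabels.count { case (p, l) => p == 1.0 && l == 0.0 }
    require(tp + fn > 0 && tn + fp > 0, "need both classes")
    val tpr = tp.toDouble / (tp + fn)
    val tnr = tn.toDouble / (tn + fp)
    (tpr + tnr) / 2
  }
}
```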

In [None]:
//Importing the required libraries
import spark.implicits._
import org.apache.spark.mllib.evaluation._

//Converting the ensemble dataframe to an rdd of the form (prediction_ensemble, ratings)
val predictionsAndLabels=ensemble.selectExpr(
  "cast(prediction_ensemble as Double) prediction_ensemble",
  "cast(Original_Ratings as Double) ratings").rdd.
map(row =>(row.getAs[Double]("prediction_ensemble"),row.getAs[Double]("ratings")))

//Compute the area under the ROC curve of the ensemble predictions
val metrics= new BinaryClassificationMetrics( predictionsAndLabels)
println(metrics.areaUnderROC)


*********** THE END ****************