First, configure the Spark shell to run on YARN:

In [1]:
%%init_spark
launcher.master="yarn"
launcher.num_executors=6
launcher.executor_cores=2
launcher.executor_memory='6000m'
launcher.packages=["com.github.master:spark-stemming_2.10:0.2.0"]

Loading and exploring the data. Having copied the data (amazon_alexa.tsv) to HDFS, we load it into Spark, print a few rows, inspect the schema, and count the rows.

In [2]:
val reviewsDF=spark.read.option("header","true").option("delimiter","\t").option("inferschema", "true").option("escape","\"").csv("/hadoop-user/data/amazon_alexa.tsv")
reviewsDF.cache()
reviewsDF.show(3)
reviewsDF.printSchema()
reviewsDF.count

Intitializing Scala interpreter ...

Spark Web UI available at http://bd-hm:8088/proxy/application_1575160893525_0011
SparkContext available as 'sc' (version = 2.4.4, master = yarn, app id = application_1575160893525_0011)
SparkSession available as 'spark'


+------+---------+----------------+--------------------+--------+
|rating|     date|       variation|    verified_reviews|feedback|
+------+---------+----------------+--------------------+--------+
|     5|31-Jul-18|Charcoal Fabric |       Love my Echo!|       1|
|     5|31-Jul-18|Charcoal Fabric |           Loved it!|       1|
|     4|31-Jul-18|  Walnut Finish |Sometimes while p...|       1|
+------+---------+----------------+--------------------+--------+
only showing top 3 rows

root
 |-- rating: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- variation: string (nullable = true)
 |-- verified_reviews: string (nullable = true)
 |-- feedback: integer (nullable = true)



reviewsDF: org.apache.spark.sql.DataFrame = [rating: int, date: string ... 3 more fields]
res0: Long = 3150


Let's take a quick look at the categorical variables. First, register the DataFrame as a temporary view so that it can be queried with SQL:

In [3]:
reviewsDF.createOrReplaceTempView("reviews")
reviewsDF.show(3)

+------+---------+----------------+--------------------+--------+
|rating|     date|       variation|    verified_reviews|feedback|
+------+---------+----------------+--------------------+--------+
|     5|31-Jul-18|Charcoal Fabric |       Love my Echo!|       1|
|     5|31-Jul-18|Charcoal Fabric |           Loved it!|       1|
|     4|31-Jul-18|  Walnut Finish |Sometimes while p...|       1|
+------+---------+----------------+--------------------+--------+
only showing top 3 rows



Counting the rows for each value of variation and feedback in reviewsDF (the verified_reviews query is commented out) and viewing the output as tables:

In [7]:
spark.sql("select variation, count(variation) from reviews group by variation").show(50)
//spark.sql("select verified_reviews, count(verified_reviews) from reviews group by verified_reviews").show(3)
spark.sql("select feedback, count(feedback) from reviews group by feedback").show()

+--------------------+----------------+
|           variation|count(variation)|
+--------------------+----------------+
|Heather Gray Fabric |             157|
|          Black  Dot|             516|
|         Oak Finish |              14|
|Configuration: Fi...|             350|
|   Sandstone Fabric |              90|
|         White  Show|              85|
|         White  Plus|              78|
|         White  Spot|             109|
|         Black  Spot|             241|
|         Black  Show|             265|
|      Walnut Finish |               9|
|               White|              91|
|    Charcoal Fabric |             430|
|          White  Dot|             184|
|         Black  Plus|             270|
|               Black|             261|
+--------------------+----------------+

+--------+---------------+
|feedback|count(feedback)|
+--------+---------------+
|       1|           2893|
|       0|            257|
+--------+---------------+



Indexing the unordered categorical variable, variation. A small demonstration DataFrame lists the 16 variation values.

In [8]:
import org.apache.spark.ml.feature._
val df=Seq(("Charcoal Fabric"),("Walnut Finish"),("Heather Gray Fabric"),("Oak Finish"),("Sandstone Fabric"),
          ("Configuration: Fire TV Stick"),("White  Show"),("White  Plus"),("White  Spot"),("Black  Spot"),
          ("Black  Show"),("White"),("White  Dot"),("Black  Plus"),("Black Dot"),("Black")).toDF("variation")
df.show()
val indexer=new StringIndexer().setOutputCol("index").setInputCol("variation")
val indexed=indexer.fit(df).transform(df)

+--------------------+
|           variation|
+--------------------+
|     Charcoal Fabric|
|       Walnut Finish|
| Heather Gray Fabric|
|          Oak Finish|
|    Sandstone Fabric|
|Configuration: Fi...|
|         White  Show|
|         White  Plus|
|         White  Spot|
|         Black  Spot|
|         Black  Show|
|               White|
|          White  Dot|
|         Black  Plus|
|           Black Dot|
|               Black|
+--------------------+



import org.apache.spark.ml.feature._
df: org.apache.spark.sql.DataFrame = [variation: string]
indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_2fba33d92ceb
indexed: org.apache.spark.sql.DataFrame = [variation: string, index: double]
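
To make the indexing step concrete, here is a minimal plain-Scala sketch (no Spark needed) of frequency-based label indexing, which is the default ordering of StringIndexer: the most frequent label receives index 0.0. The helper name indexLabels and the alphabetical tie-break are assumptions for illustration only. The four counts are taken from the variation table printed earlier.

```scala
// Plain-Scala sketch of frequency-descending label indexing.
// `indexLabels` is a hypothetical helper, not part of Spark.
def indexLabels(values: Seq[String]): Map[String, Double] = {
  val counts = values.groupBy(identity).map { case (label, xs) => (label, xs.size) }
  counts.toSeq
    .sortBy { case (label, n) => (-n, label) } // most frequent first; ties alphabetical (assumption)
    .zipWithIndex
    .map { case ((label, _), i) => (label, i.toDouble) }
    .toMap
}

// Four labels with the counts shown in the variation table.
val sample = Seq.fill(516)("Black  Dot") ++ Seq.fill(430)("Charcoal Fabric ") ++
  Seq.fill(14)("Oak Finish ") ++ Seq.fill(9)("Walnut Finish ")

val labelIndex = indexLabels(sample)
labelIndex.toSeq.sortBy(_._2).foreach { case (label, i) => println(s"$i -> '$label'") }
```

In the demonstration DataFrame above every label occurs exactly once, so all frequencies tie and the assigned indices carry no meaning beyond being distinct.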


One-hot encoding the indexed variable, variation.

In [9]:
val encoder= new OneHotEncoderEstimator().setInputCols(Array("index")).setOutputCols(Array("codedvec"))
val encoded=encoder.fit(indexed).transform(indexed)
encoded.show()
encoded.printSchema()

+--------------------+-----+---------------+
|           variation|index|       codedvec|
+--------------------+-----+---------------+
|     Charcoal Fabric|  2.0| (15,[2],[1.0])|
|       Walnut Finish| 12.0|(15,[12],[1.0])|
| Heather Gray Fabric|  9.0| (15,[9],[1.0])|
|          Oak Finish|  0.0| (15,[0],[1.0])|
|    Sandstone Fabric| 13.0|(15,[13],[1.0])|
|Configuration: Fi...|  3.0| (15,[3],[1.0])|
|         White  Show| 11.0|(15,[11],[1.0])|
|         White  Plus| 14.0|(15,[14],[1.0])|
|         White  Spot|  5.0| (15,[5],[1.0])|
|         Black  Spot|  7.0| (15,[7],[1.0])|
|         Black  Show|  1.0| (15,[1],[1.0])|
|               White|  4.0| (15,[4],[1.0])|
|          White  Dot| 10.0|(15,[10],[1.0])|
|         Black  Plus|  6.0| (15,[6],[1.0])|
|           Black Dot| 15.0|     (15,[],[])|
|               Black|  8.0| (15,[8],[1.0])|
+--------------------+-----+---------------+

root
 |-- variation: string (nullable = true)
 |-- index: double (nullable = false)
 |-- codedvec: 

encoder: org.apache.spark.ml.feature.OneHotEncoderEstimator = oneHotEncoder_36a441c88949
encoded: org.apache.spark.sql.DataFrame = [variation: string, index: double ... 1 more field]
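
The row with index 15.0 is encoded as (15,[],[]), an all-zero vector. A minimal plain-Scala sketch of why: with the last category dropped (the default dropLast = true of OneHotEncoderEstimator), 16 categories map to vectors of length 15 and the last index has no slot of its own. The helper oneHot is hypothetical and returns a dense array for readability, whereas Spark produces sparse vectors.

```scala
// Plain-Scala sketch of one-hot encoding with the last category dropped.
// `oneHot` is a hypothetical helper, not Spark's implementation.
def oneHot(index: Int, numCategories: Int, dropLast: Boolean = true): Array[Double] = {
  require(index >= 0 && index < numCategories, s"index $index out of range")
  val size = if (dropLast) numCategories - 1 else numCategories
  val vec = Array.fill(size)(0.0)
  if (index < size) vec(index) = 1.0 // the dropped category stays all zeros
  vec
}

val numCategories = 16                   // distinct variation values in the example above
val charcoal = oneHot(2, numCategories)  // index 2.0 in the table
val blackDot = oneHot(15, numCategories) // index 15.0, the dropped category
println(charcoal.mkString("[", ", ", "]"))
println(blackDot.mkString("[", ", ", "]"))
```

Dropping one category keeps the encoded columns from summing to one in every row, which matters for linear models with an intercept.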


Using weighted logistic regression, since the data is imbalanced: the minority class (feedback = 0) is far smaller than the majority class. The cell below generates weightedDataset, which adds a class-weight column to reviewsDF.

In [10]:
    // Re-balancing (weighting) of records to be used in the logistic loss objective function
    val numNegatives = reviewsDF.filter(reviewsDF("feedback") === 0).count
    val datasetSize = reviewsDF.count
    val balancingRatio = (datasetSize - numNegatives).toDouble / datasetSize

    val calculateWeights = udf { feedback: Double =>
      if (feedback == 0.0) {
        1 * balancingRatio
      }
      else {
        (1 * (1.0 - balancingRatio))
      }
    }

    val weightedDataset = reviewsDF.withColumn("classWeightCol", calculateWeights(reviewsDF("feedback")))
    weightedDataset.show(5)

+------+---------+----------------+--------------------+--------+-------------------+
|rating|     date|       variation|    verified_reviews|feedback|     classWeightCol|
+------+---------+----------------+--------------------+--------+-------------------+
|     5|31-Jul-18|Charcoal Fabric |       Love my Echo!|       1|0.08158730158730154|
|     5|31-Jul-18|Charcoal Fabric |           Loved it!|       1|0.08158730158730154|
|     4|31-Jul-18|  Walnut Finish |Sometimes while p...|       1|0.08158730158730154|
|     5|31-Jul-18|Charcoal Fabric |I have had a lot ...|       1|0.08158730158730154|
|     5|31-Jul-18|Charcoal Fabric |               Music|       1|0.08158730158730154|
+------+---------+----------------+--------------------+--------+-------------------+
only showing top 5 rows



numNegatives: Long = 257
datasetSize: Long = 3150
balancingRatio: Double = 0.9184126984126985
calculateWeights: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,Some(List(DoubleType)))
weightedDataset: org.apache.spark.sql.DataFrame = [rating: int, date: string ... 4 more fields]
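
The weighting can be checked by hand. The following plain-Scala sketch repeats the arithmetic with the counts printed above and shows that, under this scheme, both classes contribute the same total weight to the loss. The names totalNegative and totalPositive are introduced here for illustration.

```scala
// Worked arithmetic for the class weights, using the counts printed above.
val numNegatives = 257L
val datasetSize = 3150L
val numPositives = datasetSize - numNegatives            // 2893
val balancingRatio = numPositives.toDouble / datasetSize // share of the majority class

val negativeWeight = balancingRatio       // weight for feedback == 0
val positiveWeight = 1.0 - balancingRatio // weight for feedback == 1

// Total weight contributed by each class.
val totalNegative = numNegatives * negativeWeight
val totalPositive = numPositives * positiveWeight

println(f"negative weight = $negativeWeight%.4f, positive weight = $positiveWeight%.4f")
println(f"total negative = $totalNegative%.2f, total positive = $totalPositive%.2f")
```

Each rare negative review therefore counts roughly eleven times as much as a positive one, which offsets the 2893 to 257 imbalance.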


Feature engineering stages: tokenizing, removing punctuation and stop words, stemming, vectorizing words with CountVectorizer, generating TF-IDF vectors, indexing and one-hot encoding variation (indexer, encoder), and assembling all features with a VectorAssembler (vectorizer_all).

In [11]:
import org.apache.spark.ml.feature._

val tokenizer = new RegexTokenizer().setMinTokenLength(2).setToLowercase(true).setInputCol("verified_reviews").setOutputCol("words")

//Defining a udf to remove punctuations from a sequence of words
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.DataFrame

//df1 = df.withColumnRenamed('sum("session")', "session")
//val reviewerDF=reviewsDF.withColumn("classWeightCol", reviewsDF("classWeightCol"))

// Replaces each punctuation character with a space, so "can't" becomes "can t"
def removePunc(words: Seq[String]): Seq[String] =
  words.map(_.replaceAll("\\p{Punct}", " "))

//val removePuncUDF=udf(removePunc(_:Seq[String]))
spark.udf.register("removePuncUDF",removePunc(_:Seq[String]) )

val puncRemover = new SQLTransformer().setStatement("SELECT removePuncUDF(words) as words,variation,classWeightCol,feedback from __THIS__ ")

val stopWordRemover=new StopWordsRemover().setInputCol("words").setOutputCol("filtered_words")

import org.apache.spark.mllib.feature.Stemmer
val stemmer = new Stemmer().setInputCol("filtered_words").setOutputCol("stemmed_words")

val vectorizer = new CountVectorizer().setMinDF(100).setInputCol("stemmed_words").setOutputCol("stemmed_BOW")

val tfidf = new IDF().setInputCol("stemmed_BOW").setOutputCol("reviews_TFIDF") 

val indexer=new StringIndexer().setOutputCol("index").setInputCol("variation")

val encoder= new OneHotEncoderEstimator().setInputCols(Array("index")).setOutputCols(Array("codedvec"))

val vectorizer_all=new VectorAssembler().setInputCols(Array("codedvec","reviews_TFIDF")).setOutputCol("features")

import org.apache.spark.ml.feature._
tokenizer: org.apache.spark.ml.feature.RegexTokenizer = regexTok_1a99d07c7083
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.DataFrame
removePunc: (words: Seq[String])Seq[String]
puncRemover: org.apache.spark.ml.feature.SQLTransformer = sql_e9d17e889d9f
stopWordRemover: org.apache.spark.ml.feature.StopWordsRemover = stopWords_f99d4c4e278c
import org.apache.spark.mllib.feature.Stemmer
stemmer: org.apache.spark.mllib.feature.Stemmer = stemmer_e5e76c7b9f08
vectorizer: org.apache.spark.ml.feature.CountVectorizer = cntVec_6c1b3833c3d4
tfidf: org.apache.spark.ml.feature.IDF = idf_2ef2f0451390
indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_49ccaab31863
encoder: org.apache.spark.ml.feature.OneHotEncoderEstimator = oneHotEn...
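
To see what the first two stages do to a single review, here is a plain-Scala imitation that does not need Spark. It assumes the RegexTokenizer default of splitting on whitespace, and the input sentence is made up. The last variant, which deletes punctuation instead of replacing it with a space, is an alternative for comparison and not what the cell above runs.

```scala
// Plain-Scala imitation of tokenizing and punctuation removal on one review.
// The sentence is an invented example.
val review = "I can't turn it on!"

val tokens = review.toLowerCase
  .split("\\s+")
  .filter(_.length >= 2) // setMinTokenLength(2) drops the one-letter token "i"
  .toSeq

// Same replacement as removePunc: each punctuation character becomes a space.
val withSpaces = tokens.map(_.replaceAll("\\p{Punct}", " "))

// Alternative (assumption, not used in the notebook): delete punctuation instead.
val deleted = tokens.map(_.replaceAll("\\p{Punct}", ""))

println(withSpaces.mkString("[", ", ", "]"))
println(deleted.mkString("[", ", ", "]"))
```

Because the replacement inserts a space inside the token, later stages see strings such as "can t" as a single word that is unlikely to match any stop word.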

Building, Tuning, and Evaluating a weighted Logistic Regression model

In [12]:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._

val lr = new LogisticRegression().setWeightCol("classWeightCol").setLabelCol("feedback").setFeaturesCol("features")
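// Note: cv below wraps only lr, so the tfidf.minDocFreq entry in this grid does not appear to
// reach the IDF stage; tuning it would require making the whole pipeline the estimator of cv.
val paramGrid =new ParamGridBuilder()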
             .addGrid(lr.regParam, Array(0.01, 0.5, 2.0))
             .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
             .addGrid(tfidf.minDocFreq, Array(5,10))
             .build()
val evaluator = new BinaryClassificationEvaluator().setRawPredictionCol("rawPrediction").setLabelCol("feedback").setMetricName("areaUnderROC")
val cv = new CrossValidator().setEstimator(lr).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(5)


val pipeline = new Pipeline().setStages(Array(tokenizer,puncRemover,stopWordRemover, stemmer,vectorizer,tfidf,indexer, encoder,vectorizer_all, cv))

val Array(training,testing)=weightedDataset.randomSplit(Array(0.8,0.2),111)

//Fit the training data to the pipeline
val pipelineModel = pipeline.fit(training)

// Make predictions.
val predictions = pipelineModel.transform(testing)

// Select example rows to display.
predictions.select("feedback", "prediction", "probability", "stemmed_words").show(5)

val AUC = evaluator.evaluate(predictions)
println(s"Area under ROC curve(AUC) for LR on test data = $AUC")

+--------+----------+--------------------+--------------------+
|feedback|prediction|         probability|       stemmed_words|
+--------+----------+--------------------+--------------------+
|       0|       0.0|[0.84551088820414...|[item, never, wor...|
|       0|       0.0|[0.79036624675199...|[cant, seem, get,...|
|       0|       1.0|[0.29785282846358...|[love, product, d...|
|       0|       0.0|[0.70373145846611...|                  []|
|       0|       0.0|[0.96318669237066...|[can t, turn,   3...|
+--------+----------+--------------------+--------------------+
only showing top 5 rows

Area under ROC curve(AUC) for LR on test data = 0.8147730526918672


import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_1dd746a0b885
paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	logreg_1dd746a0b885-elasticNetParam: 0.0,
	idf_2ef2f0451390-minDocFreq: 5,
	logreg_1dd746a0b885-regParam: 0.01
}, {
	logreg_1dd746a0b885-elasticNetParam: 0.0,
	idf_2ef2f0451390-minDocFreq: 10,
	logreg_1dd746a0b885-regParam: 0.01
}, {
	logreg_1dd746a0b885-elasticNetParam: 0.5,
	idf_2ef2f0451390-minDocFreq: 5,
	logreg_1dd746a0b885-regParam: 0.01
}, {
	logreg_1dd746a0b885-elasticNetParam: 0.5,
	idf_2ef2f0451390-minDocFreq: 10...
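
The printed paramGrid is truncated, so it helps to count what the cross-validator has to do. A plain-Scala sketch of how the three addGrid calls expand into combinations, and how many model fits result with 5 folds (the variable names are introduced here for illustration):

```scala
// Plain-Scala sketch of how a parameter grid expands into combinations,
// mirroring the three addGrid calls above.
val regParams = Seq(0.01, 0.5, 2.0)
val elasticNetParams = Seq(0.0, 0.5, 1.0)
val minDocFreqs = Seq(5, 10)

val grid = for {
  reg <- regParams
  enet <- elasticNetParams
  mdf <- minDocFreqs
} yield (reg, enet, mdf)

val numFolds = 5
// One fit per (combination, fold), plus a final refit of the best combination
// on the full training set.
val totalFits = grid.size * numFolds + 1
println(s"${grid.size} combinations, $totalFits model fits")
```

The cost grows multiplicatively with every added parameter, which is worth keeping in mind before widening the grid.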

Second round of feature engineering, this time without the class-weight column (classWeightCol): tokenizing, removing punctuation and stop words, stemming, vectorizing words with CountVectorizer, generating TF-IDF vectors, indexing and one-hot encoding variation (indexer, encoder), and assembling all features with a VectorAssembler (vectorizer_all).

In [10]:
import org.apache.spark.ml.feature._

val tokenizer = new RegexTokenizer().setMinTokenLength(2).setToLowercase(true).setInputCol("verified_reviews").setOutputCol("words")

//Defining a udf to remove punctuations from a sequence of words
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.DataFrame

// Replaces each punctuation character with a space, so "can't" becomes "can t"
def removePunc(words: Seq[String]): Seq[String] =
  words.map(_.replaceAll("\\p{Punct}", " "))

//val removePuncUDF=udf(removePunc(_:Seq[String]))
spark.udf.register("removePuncUDF",removePunc(_:Seq[String]) )

val puncRemover = new SQLTransformer().setStatement("SELECT removePuncUDF(words) as words,variation,feedback from __THIS__ ")

val stopWordRemover=new StopWordsRemover().setInputCol("words").setOutputCol("filtered_words")

import org.apache.spark.mllib.feature.Stemmer
val stemmer = new Stemmer().setInputCol("filtered_words").setOutputCol("stemmed_words")

val vectorizer = new CountVectorizer().setMinDF(100).setInputCol("stemmed_words").setOutputCol("stemmed_BOW")

val tfidf = new IDF().setInputCol("stemmed_BOW").setOutputCol("reviews_TFIDF") 

val indexer=new StringIndexer().setOutputCol("index").setInputCol("variation")

val encoder= new OneHotEncoderEstimator().setInputCols(Array("index")).setOutputCols(Array("codedvec"))

val vectorizer_all=new VectorAssembler().setInputCols(Array("codedvec","reviews_TFIDF")).setOutputCol("features")

import org.apache.spark.ml.feature._
tokenizer: org.apache.spark.ml.feature.RegexTokenizer = regexTok_10a4c5665649
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.DataFrame
removePunc: (words: Seq[String])Seq[String]
puncRemover: org.apache.spark.ml.feature.SQLTransformer = sql_afac762c6feb
stopWordRemover: org.apache.spark.ml.feature.StopWordsRemover = stopWords_fd85ce1019cc
import org.apache.spark.mllib.feature.Stemmer
stemmer: org.apache.spark.mllib.feature.Stemmer = stemmer_a45077bb5be6
vectorizer: org.apache.spark.ml.feature.CountVectorizer = cntVec_eeac5fb05921
tfidf: org.apache.spark.ml.feature.IDF = idf_1422f5b60202
indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_35b115fe9888
encoder: org.apache.spark.ml.feature.OneHotEncoderEstimator = oneHotEn...
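
For reference, the IDF stage rescales the term counts from CountVectorizer. A plain-Scala sketch of the smoothed weighting documented for Spark's IDF, idf(t) = log((m + 1) / (df(t) + 1)) with m the number of documents, follows. The three tiny documents and the helper names smoothedIdf and tfidfWeights are made up for illustration.

```scala
// Plain-Scala sketch of smoothed IDF: idf(t) = log((m + 1) / (df(t) + 1)).
// The documents are invented; helper names are hypothetical.
val docs: Seq[Seq[String]] = Seq(
  Seq("love", "echo"),
  Seq("love", "it"),
  Seq("never", "work")
)
val m = docs.size

// Document frequency: in how many documents each term occurs.
val docFreq: Map[String, Int] =
  docs.flatMap(_.distinct).groupBy(identity).map { case (t, xs) => (t, xs.size) }

def smoothedIdf(term: String): Double =
  math.log((m + 1.0) / (docFreq.getOrElse(term, 0) + 1.0))

// TF-IDF of one document: raw term count times idf.
def tfidfWeights(doc: Seq[String]): Map[String, Double] =
  doc.groupBy(identity).map { case (t, xs) => (t, xs.size * smoothedIdf(t)) }

println(tfidfWeights(docs.head))
```

Terms that appear in many reviews, such as "love" here, receive a smaller weight than terms that appear in few.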

Splitting the DataFrame reviewsDF into training and testing data, using seed 111.

In [11]:
val Array(training,testing)=reviewsDF.randomSplit(Array(0.8,0.2),111)
training.count()

training: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [rating: int, date: string ... 3 more fields]
testing: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [rating: int, date: string ... 3 more fields]
res7: Long = 2520


Upsampling the minority class in the training split, because the dataset is imbalanced

In [12]:
val newMinorclass=training.filter(training("feedback") === 0)
val deMinorsDF=newMinorclass.count()
val noOfSamples = 20
var minsampleDF = newMinorclass.sample(true, 1D*deMinorsDF / noOfSamples)
minsampleDF.show(3)
println(minsampleDF.count())

+------+--------+---------+--------------------+--------+
|rating|    date|variation|    verified_reviews|feedback|
+------+--------+---------+--------------------+--------+
|     1|1-Jul-18|    White|This item did not...|       0|
|     1|1-Jul-18|    White|This item did not...|       0|
|     1|1-Jul-18|    White|This item did not...|       0|
+------+--------+---------+--------------------+--------+
only showing top 3 rows

2116


newMinorclass: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [rating: int, date: string ... 3 more fields]
deMinorsDF: Long = 209
noOfSamples: Int = 20
minsampleDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [rating: int, date: string ... 3 more fields]
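
The sampling fraction deserves a quick check. With replacement, the expected size of a sample is the fraction times the number of input rows; the observed 2116 rows differ from that expectation because the draw is random. The plain-Scala sketch below repeats the arithmetic with the counts printed above. The balancingFraction at the end is an alternative way to choose the fraction and is an assumption, not what the cell above uses.

```scala
// Worked arithmetic for the upsampling step, using the counts printed above.
val trainingSize = 2520L
val minorityCount = 209L                         // feedback == 0 rows in training
val majorityCount = trainingSize - minorityCount // 2311

// Fraction used in the cell above: 209 / 20.
val usedFraction = 1.0 * minorityCount / 20
// With replacement, the expected sample size is fraction * input rows.
val expectedSampled = usedFraction * minorityCount

// Alternative (assumption): derive the fraction from the class gap so that
// the originals plus the sampled copies roughly match the majority class.
val balancingFraction = (majorityCount - minorityCount).toDouble / minorityCount

println(f"used fraction = $usedFraction%.2f, expected sampled rows = $expectedSampled%.0f")
println(f"balancing fraction = $balancingFraction%.2f")
```

Adding the 2116 sampled rows to the 209 originals gives 2325 minority rows against 2311 majority rows, so the training set ends up close to balanced.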


In [13]:
val trainingsDF = training.union(minsampleDF)

trainingsDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [rating: int, date: string ... 3 more fields]


Checking the class counts in the upsampled training set

In [14]:
trainingsDF.createOrReplaceTempView("trainings")
spark.sql("select feedback,count(feedback) from trainings group by feedback ").show()

+--------+---------------+
|feedback|count(feedback)|
+--------+---------------+
|       1|           2311|
|       0|           2325|
+--------+---------------+



Building, Tuning, and Evaluating a RandomForest model

In [15]:
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._

val rf = new RandomForestClassifier().setLabelCol("feedback").setFeaturesCol("features")
val paramGrid =new ParamGridBuilder()
             .addGrid(rf.maxDepth, Array(2, 5))
             .addGrid(rf.numTrees, Array(5, 20))
             .build()

val evaluator = new BinaryClassificationEvaluator().setRawPredictionCol("rawPrediction").setLabelCol("feedback").setMetricName("areaUnderROC")

val cv_rf = new CrossValidator().setEstimator(rf).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(3)

val pipeline_rf = new Pipeline().setStages(Array(tokenizer,puncRemover,stopWordRemover, stemmer,vectorizer,tfidf,indexer, encoder,vectorizer_all, cv_rf))

//Fit the training data to the pipeline
val pipelineModel_rf = pipeline_rf.fit(trainingsDF)

// Make predictions.
val predictions = pipelineModel_rf.transform(testing)

predictions.select("feedback", "prediction", "probability", "stemmed_words").show(5) 

val AUC = evaluator.evaluate(predictions)
println(s"Area under ROC curve(AUC) for RF on test data = $AUC")

+--------+----------+--------------------+--------------------+
|feedback|prediction|         probability|       stemmed_words|
+--------+----------+--------------------+--------------------+
|       0|       0.0|[0.59859082272866...|[item, never, wor...|
|       0|       0.0|[0.73957681464467...|[cant, seem, get,...|
|       0|       1.0|[0.39954812810485...|[love, product, d...|
|       0|       0.0|[0.53617399265660...|                  []|
|       0|       0.0|[0.60464527348659...|[can t, turn,   3...|
+--------+----------+--------------------+--------------------+
only showing top 5 rows

Area under ROC curve(AUC) for RF on test data = 0.810889175257732


import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_a2b963fe9922
paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	rfc_a2b963fe9922-maxDepth: 2,
	rfc_a2b963fe9922-numTrees: 5
}, {
	rfc_a2b963fe9922-maxDepth: 2,
	rfc_a2b963fe9922-numTrees: 20
}, {
	rfc_a2b963fe9922-maxDepth: 5,
	rfc_a2b963fe9922-numTrees: 5
}, {
	rfc_a2b963fe9922-maxDepth: 5,
	rfc_a2b963fe9922-numTrees: 20
})
evaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_656f673ba702
cv_rf: org.apache.spark.ml.tuning.CrossValidator = cv_edf1a820d...
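
Since all models here are compared by area under the ROC curve, a small plain-Scala sketch of what that number measures may help. It uses the pairwise definition: the fraction of (positive, negative) pairs in which the positive example gets the higher score, with ties counted as one half. This is an equivalent way to think about the metric, not Spark's implementation, and the scores and labels are made up.

```scala
// Plain-Scala sketch of AUC as a ranking statistic.
// `auc` is a hypothetical helper; the data below is invented.
def auc(scored: Seq[(Double, Int)]): Double = {
  val pos = scored.collect { case (s, 1) => s }
  val neg = scored.collect { case (s, 0) => s }
  require(pos.nonEmpty && neg.nonEmpty, "need both classes")
  // 1 if the positive outranks the negative, 0.5 on a tie, 0 otherwise.
  val wins = pos.flatMap(p => neg.map(n => if (p > n) 1.0 else if (p == n) 0.5 else 0.0))
  wins.sum / (pos.size * neg.size)
}

// (score for class 1, true label)
val scored = Seq((0.9, 1), (0.8, 1), (0.7, 0), (0.6, 1), (0.2, 0))
val exampleAuc = auc(scored)
println(f"AUC = $exampleAuc%.3f")
```

A value of 0.5 corresponds to random ranking and 1.0 to perfect separation, and the metric does not depend on the 0.5 decision threshold used for the prediction column.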

Building, Tuning, and Evaluating a GBT model

In [16]:
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
// Create a GBT model.
val gbt = new GBTClassifier().setLabelCol("feedback").setFeaturesCol("features")
//Create ParamGrid for Cross Validation
val paramGrid = new ParamGridBuilder()
             .addGrid(gbt.maxDepth, Array(2,5))
             .addGrid(gbt.maxIter, Array(10, 20,100))
             .build()

val evaluator = new BinaryClassificationEvaluator()
  .setRawPredictionCol("rawPrediction")
  .setLabelCol("feedback")
  .setMetricName("areaUnderROC")

val cv = new CrossValidator().setEstimator(gbt).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(3)

val pipeline = new Pipeline().setStages(Array(tokenizer,puncRemover,stopWordRemover, stemmer,vectorizer,tfidf,indexer, encoder,vectorizer_all, cv))

//Fit the training data to the pipeline
val pipelineModel = pipeline.fit(trainingsDF)

// Make predictions.
val predictions = pipelineModel.transform(testing)

// Select example rows to display.
predictions.select("feedback", "prediction", "probability", "stemmed_words").show(5)

val AUC = evaluator.evaluate(predictions)
println(s"Area under ROC curve(AUC) for GBT on test data = $AUC")

+--------+----------+--------------------+--------------------+
|feedback|prediction|         probability|       stemmed_words|
+--------+----------+--------------------+--------------------+
|       0|       0.0|[0.59454606810298...|[item, never, wor...|
|       0|       0.0|[0.95670287869975...|[cant, seem, get,...|
|       0|       1.0|[0.03149227055179...|[love, product, d...|
|       0|       0.0|[0.67108377333154...|                  []|
|       0|       1.0|[0.34449974663627...|[can t, turn,   3...|
+--------+----------+--------------------+--------------------+
only showing top 5 rows

Area under ROC curve(AUC) for GBT on test data = 0.8274806701030928


import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
gbt: org.apache.spark.ml.classification.GBTClassifier = gbtc_3be9670c32ba
paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	gbtc_3be9670c32ba-maxDepth: 2,
	gbtc_3be9670c32ba-maxIter: 10
}, {
	gbtc_3be9670c32ba-maxDepth: 2,
	gbtc_3be9670c32ba-maxIter: 20
}, {
	gbtc_3be9670c32ba-maxDepth: 2,
	gbtc_3be9670c32ba-maxIter: 100
}, {
	gbtc_3be9670c32ba-maxDepth: 5,
	gbtc_3be9670c32ba-maxIter: 10
}, {
	gbtc_3be9670c32ba-maxDepth: 5,
	gbtc_3be9670c32ba-maxIter: 20
}, {
	gbtc_3be9670c32ba-maxDepth: 5,
	gbtc_3be9670c32ba-maxIter: 100
})
eval...