<font size=5>

Classification with Scala ML and MLLIB

</font>

<font size=5> This dataset has no column names, so we will assign proper ones.  

    George Jen  -- Jen Tek LLC
    
</font>

In [178]:
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.util.LongAccumulator
import org.apache.log4j._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql._



In [179]:
val spark = SparkSession.builder.getOrCreate()


spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@13145723


In [180]:
val sc = spark.sparkContext


sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@243cced9


In [181]:

val df=spark.read.option("sep", "\t").csv("datasets/SMSSpamCollection")

df: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string]


<font size=5> Show 3 rows to get an idea of the dataset: _c0 looks like the label and _c1 looks like the feature (the message text). </font> 

In [182]:
df.show(3, false)

+----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|_c0 |_c1                                                                                                                                                        |
+----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                            |
|ham |Ok lar... Joking wif u oni...                                                                                                                              |
|spam|Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's|
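+----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+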

<font size=5> Note: "spam" marks junk messages and "ham" marks legitimate (OK) ones. Rename column _c0 to status and _c1 to message.  </font>

In [183]:

val df_renamed = df.withColumnRenamed("_c0", "status").withColumnRenamed("_c1", "message")

df_renamed: org.apache.spark.sql.DataFrame = [status: string, message: string]


In [184]:
df_renamed.show(3, false)

+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|status|message                                                                                                                                                    |
+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|ham   |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                            |
|ham   |Ok lar... Joking wif u oni...                                                                                                                              |
|spam  |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's|
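+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+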

<font size=5> 

Encode the status column as numbers: ham becomes 1.0 and spam becomes 0.0. The machine learning algorithms need numeric fields, so we also rename the status column to label.
    
</font>

In [186]:
df_renamed.createOrReplaceTempView("temp")
val df_select = spark.sql("select case status when \"ham\" then 1.0  else 0 end as label, message from temp")
df_select.show(3, false)

+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|message                                                                                                                                                    |
+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|1.0  |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                            |
|1.0  |Ok lar... Joking wif u oni...                                                                                                                              |
|0.0  |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's|
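+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+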

df_select: org.apache.spark.sql.DataFrame = [label: decimal(11,1), message: string]


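<font size=5> 1.0 means OK (ham) and 0.0 means junk (spam). Note that the CASE expression yields a decimal(11,1) label column, as the schema above shows; the spark.ml steps below accept it, and the mllib part casts it to double explicitly. </font>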

<font size=5>
Tokenize the messages.
Tokenization is the process of taking text (such as a sentence) and breaking it into individual terms (usually words). Let’s tokenize the messages and create a list of words for each message.
</font>
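To make the tokenizer's behavior concrete, here is a plain-Scala sketch (no Spark needed) of what, as far as we can tell from the Spark source, spark.ml Tokenizer does: lowercase the text and split it on whitespace characters, so punctuation stays attached to the words. The regexTokenize helper is a hypothetical alternative, roughly what RegexTokenizer with setPattern("\\W+") would produce; it drops punctuation.

```scala
// Mimics spark.ml Tokenizer (our reading of its source): lowercase, then split on
// single whitespace characters. Punctuation stays attached ("lar..." is one token).
def simpleTokenize(text: String): Seq[String] =
  text.toLowerCase.split("\\s").toSeq

// Hypothetical alternative: split on runs of non-word characters and drop empty
// tokens, similar in spirit to RegexTokenizer with pattern "\\W+".
def regexTokenize(text: String): Seq[String] =
  text.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq

val message = "Ok lar... Joking wif u oni..."
val simple = simpleTokenize(message)
val regex = regexTokenize(message)
println(simple.mkString("[", ", ", "]"))
println(regex.mkString("[", ", ", "]"))
```

The second variant produces a smaller vocabulary because "lar..." and "lar" collapse into one term, which can matter for the CountVectorizer step that follows.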

In [187]:

import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
val tokenizer = new Tokenizer().setInputCol("message").setOutputCol("words")
val wordsData = tokenizer.transform(df_select)
wordsData.show(3, false)

+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|message                                                                                                                                                    |words                                                                                                                                                                                   |
+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------

import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
tokenizer: org.apache.spark.ml.feature.Tokenizer = tok_f9575cbf17b4
wordsData: org.apache.spark.sql.DataFrame = [label: decimal(11,1), message: string ... 1 more field]


In [11]:
import org.apache.spark.ml.feature.{CountVectorizer}
val count = new CountVectorizer().setInputCol("words").setOutputCol("rawFeatures")
val model = count.fit(wordsData)
val featurizedData = model.transform(wordsData)
featurizedData.show(3,false)

+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|message                                                                                                                                                    |words                                                                                                                                                                                   |rawFeatures                                                                                         

import org.apache.spark.ml.feature.CountVectorizer
count: org.apache.spark.ml.feature.CountVectorizer = cntVec_fe334b054098
model: org.apache.spark.ml.feature.CountVectorizerModel = cntVec_fe334b054098
featurizedData: org.apache.spark.sql.DataFrame = [label: decimal(11,1), message: string ... 2 more fields]


<font size=5> CountVectorizer converts a collection of text documents to vectors of token counts. 
    
See:
https://spark.apache.org/docs/latest/ml-features#countvectorizer


</font>
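A plain-Scala sketch of the idea behind CountVectorizer: build a vocabulary ordered by how often each term occurs in the whole corpus, then represent each document as a sparse map from vocabulary index to count. The toy documents are made up, and the alphabetical tie-breaking is our own assumption for determinism; Spark's CountVectorizer also has options such as vocabSize and minDF that are not modeled here.

```scala
// Sketch of the CountVectorizer idea in plain Scala (no Spark).
// Assumption: ties in corpus frequency are broken alphabetically.
val docs: Seq[Seq[String]] = Seq(
  Seq("free", "entry", "free"),
  Seq("ok", "lar", "free"),
  Seq("ok", "joking")
)

// 1. Vocabulary: every distinct term, ordered by total count across the corpus (descending).
val termCounts: Map[String, Int] =
  docs.flatten.groupBy(identity).map { case (term, occ) => term -> occ.size }
val vocabulary: Vector[String] =
  termCounts.toVector.sortBy { case (term, n) => (-n, term) }.map(_._1)
val index: Map[String, Int] = vocabulary.zipWithIndex.toMap

// 2. Each document becomes a sparse vector: vocabulary index -> count in that document.
def vectorize(doc: Seq[String]): Map[Int, Double] =
  doc.groupBy(identity).map { case (term, occ) => index(term) -> occ.size.toDouble }

val vectors = docs.map(vectorize)
println(vocabulary)
vectors.foreach(println)
```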


In [188]:
import org.apache.spark.ml.feature.{IDF}
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)
val rescaledData = idfModel.transform(featurizedData)
val rescaledData_select = rescaledData.select("label", "features")  // keep only the columns needed for training


import org.apache.spark.ml.feature.IDF
idf: org.apache.spark.ml.feature.IDF = idf_9c988901769a
idfModel: org.apache.spark.ml.feature.IDFModel = idf_9c988901769a
rescaledData: org.apache.spark.sql.DataFrame = [label: decimal(11,1), message: string ... 3 more fields]
rescaledData_select: org.apache.spark.sql.DataFrame = [label: decimal(11,1), features: vector]


<font size=5>
Apply term frequency - inverse document frequency (TF-IDF).

IDF down-weights terms that appear in many documents of the corpus. When text is used as a feature, this usually improves performance because the most common, and therefore least informative, words are weighted down. Here CountVectorizer supplies the term frequencies and IDF rescales them.

</font>
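The IDF weight Spark uses, according to its feature-extraction documentation, is idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) is the number of documents containing term t; TF-IDF is then the term count times idf(t). The plain-Scala sketch below applies that formula to made-up token lists. Note that a term occurring in every document ("u" here) gets weight 0.

```scala
// TF-IDF with the smoothed IDF formula from the Spark docs:
//   idf(t) = ln((m + 1) / (df(t) + 1)), m = number of documents,
//   df(t) = number of documents containing term t.
// Term frequency is the raw count, as produced by CountVectorizer.
val docs: Seq[Seq[String]] = Seq(
  Seq("free", "entry", "free", "u"),
  Seq("ok", "lar", "free", "u"),
  Seq("ok", "joking", "u")
)

val m = docs.size
val docFreq: Map[String, Int] =
  docs.flatMap(_.distinct).groupBy(identity).map { case (t, occ) => t -> occ.size }
val idf: Map[String, Double] =
  docFreq.map { case (t, df) => t -> math.log((m + 1.0) / (df + 1.0)) }

// TF-IDF of one document: raw count of each term times that term's IDF.
def tfidf(doc: Seq[String]): Map[String, Double] =
  doc.groupBy(identity).map { case (t, occ) => t -> occ.size * idf(t) }

docs.map(tfidf).foreach(println)
```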

<font size=5>
Randomly split the DataFrame into 80% training (trainDF) and 20% testing (testDF).
    
</font>
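The split is random per row, so the two parts are only approximately 80% and 20%. The hypothetical randomSplit helper below is a plain-Scala sketch of the idea (normalize the weights, draw one uniform number per row, assign the row to the matching cumulative interval); it is not Spark's implementation.

```scala
import scala.util.Random

// Hypothetical helper illustrating a weighted random split: each row draws one
// uniform number in [0, 1) and goes to the split whose cumulative interval contains it.
def randomSplit[A](rows: Seq[A], weights: Seq[Double], seed: Long): Seq[Seq[A]] = {
  val total = weights.sum
  val upperBounds = weights.scanLeft(0.0)(_ + _).tail.map(_ / total) // e.g. 0.8, 1.0
  val rng = new Random(seed)
  val assigned = rows.map { row =>
    val u = rng.nextDouble()
    val split = upperBounds.indexWhere(u < _) match {
      case -1 => weights.size - 1 // guard against floating-point round-off at the top end
      case i  => i
    }
    split -> row
  }
  weights.indices.map(i => assigned.filter(_._1 == i).map(_._2))
}

val rows = 1 to 5000
val splits = randomSplit(rows, Seq(0.8, 0.2), seed = 0L)
val train = splits(0)
val test = splits(1)
println(s"train = ${train.size}, test = ${test.size}")
```

Every row lands in exactly one split, but the sizes vary slightly from the nominal 80/20 ratio.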


In [190]:
val seed = 0  // fixed random seed
// randomSplit returns an Array of DataFrames; destructure it into the two splits
val Array(trainDF, testDF) = rescaledData_select.randomSplit(Array(0.8, 0.2), seed)

seed: Int = 0
trainDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: decimal(11,1), features: vector]
testDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: decimal(11,1), features: vector]


<font size=5>
Try different classifiers.

Logistic regression classifier

Logistic regression is a popular method for predicting a categorical response. It is a special case of generalized linear models that predicts the probability of the outcomes. In spark.ml, logistic regression can predict a binary outcome with binomial logistic regression, or a multiclass outcome with multinomial logistic regression. Use the family parameter to choose between these two algorithms, or leave it unset and Spark will infer the correct variant.

</font>
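To show what "predicts the probability of the outcomes" means, here is a minimal binomial logistic regression in plain Scala: P(label = 1 | x) = sigmoid(w . x + b), trained by batch gradient descent on the log-loss. The two features (counts of "free" and "ok") and the four examples are invented for illustration; Spark itself uses a quasi-Newton optimizer with optional regularization rather than this loop.

```scala
// Minimal binomial logistic regression (plain Scala, no Spark).
def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

def dot(a: Array[Double], b: Array[Double]): Double =
  a.indices.map(i => a(i) * b(i)).sum

def trainLogistic(xs: Seq[Array[Double]], ys: Seq[Double],
                  stepSize: Double, iters: Int): (Array[Double], Double) = {
  val w = Array.fill(xs.head.length)(0.0)
  var b = 0.0
  for (_ <- 1 to iters) {
    // Gradient of the average log-loss: (p - y) * x for the weights, (p - y) for the bias.
    val errors = xs.zip(ys).map { case (x, y) => (x, sigmoid(dot(w, x) + b) - y) }
    for (j <- w.indices)
      w(j) -= stepSize * errors.map { case (x, e) => e * x(j) }.sum / xs.size
    b -= stepSize * errors.map(_._2).sum / xs.size
  }
  (w, b)
}

// Invented data: features = (count of "free", count of "ok"); 1.0 = ham, 0.0 = spam.
val xs = Seq(Array(0.0, 2.0), Array(0.0, 1.0), Array(2.0, 0.0), Array(1.0, 0.0))
val ys = Seq(1.0, 1.0, 0.0, 0.0)
val (w, b) = trainLogistic(xs, ys, stepSize = 0.5, iters = 200)

def probability(x: Array[Double]): Double = sigmoid(dot(w, x) + b)
// A 0.5 threshold turns the probability into a 0/1 label.
def predict(x: Array[Double]): Double = if (probability(x) > 0.5) 1.0 else 0.0

println(s"P(ham | 3 x ok) = ${probability(Array(0.0, 3.0))}")
```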

In [15]:
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.{DataFrame, SparkSession}






In [191]:

import scala.collection.mutable
val maxIter=100
val lr = new LogisticRegression()
      .setFeaturesCol("features")
      .setLabelCol("label")
      .setMaxIter(maxIter)

import scala.collection.mutable
maxIter: Int = 100
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_340c1dcb9166


In [192]:
val model_lr = lr.fit(trainDF)


model_lr: org.apache.spark.ml.classification.LogisticRegressionModel = LogisticRegressionModel: uid = logreg_340c1dcb9166, numClasses = 2, numFeatures = 13587


In [193]:
val prediction_lr = model_lr.transform(testDF)

prediction_lr: org.apache.spark.sql.DataFrame = [label: decimal(11,1), features: vector ... 3 more fields]


In [194]:
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator



In [195]:
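// Note: the raw-prediction column is set to "prediction" (hard 0/1 labels), so this AUC
// describes a single operating point; the default "rawPrediction" column would give a
// threshold-independent AUC.
val my_eval_aoc = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setRawPredictionCol("prediction").setMetricName("areaUnderROC")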

my_eval_aoc.evaluate(prediction_lr)

my_eval_aoc: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_abb7e81e9417
res84: Double = 0.8988486413185208


In [196]:
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
// "f1" is the per-class F1 score averaged over both classes, weighted by class frequency
val my_mc_f1 = new MulticlassClassificationEvaluator().setPredictionCol("prediction").setLabelCol("label").setMetricName("f1")
my_mc_f1.evaluate(prediction_lr)

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
my_mc_f1: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_b4ba79f37c9d
res85: Double = 0.9708789304513041


In [197]:
val my_mc_accu = new MulticlassClassificationEvaluator().setPredictionCol("prediction").setLabelCol("label").setMetricName("accuracy")
my_mc_accu.evaluate(prediction_lr)

my_mc_accu: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_276829470e67
res86: Double = 0.9721739130434782


In [198]:
// Count (label, prediction) pairs on the test set: the confusion matrix
val train_fit_lr = prediction_lr.select("label","prediction")
train_fit_lr.groupBy("label","prediction").count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       1.0|   31|
|  0.0|       0.0|  123|
|  1.0|       1.0|  995|
|  1.0|       0.0|    1|
+-----+----------+-----+



train_fit_lr: org.apache.spark.sql.DataFrame = [label: decimal(11,1), prediction: double]
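The three metrics above can be recomputed by hand from this confusion matrix, which also shows why the "AUC" is lower than the accuracy: the evaluator was given the 0/1 prediction column, so the ROC curve has only one interior point (FPR, TPR) and its area is (1 + TPR - FPR) / 2. A plain-Scala sketch using the counts printed above (label 1.0 = ham is the positive class); the weighted-F1 definition is our reading of the evaluator's "f1" metric. By our arithmetic, the results should match res84 to res86 up to floating-point rounding.

```scala
// (label, prediction) -> count, copied from the confusion matrix above.
val counts: Map[(Double, Double), Long] = Map(
  (0.0, 1.0) -> 31L, (0.0, 0.0) -> 123L, (1.0, 1.0) -> 995L, (1.0, 0.0) -> 1L)

def n(label: Double, prediction: Double): Double =
  counts.getOrElse((label, prediction), 0L).toDouble

val total = counts.values.sum.toDouble
val accuracy = (n(0.0, 0.0) + n(1.0, 1.0)) / total

// Per-class F1 = 2TP / (2TP + FP + FN), weighted by how many test rows carry that label.
def f1(c: Double): Double = {
  val other = 1.0 - c
  val tp = n(c, c); val fp = n(other, c); val fn = n(c, other)
  2 * tp / (2 * tp + fp + fn)
}
def support(c: Double): Double = n(c, 0.0) + n(c, 1.0)
val weightedF1 = Seq(0.0, 1.0).map(c => support(c) * f1(c)).sum / total

// ROC from hard predictions: (0,0) -> (FPR, TPR) -> (1,1); trapezoid area.
val tpr = n(1.0, 1.0) / support(1.0)
val fpr = n(0.0, 1.0) / support(0.0)
val aucFromHardPredictions = (1.0 + tpr - fpr) / 2.0

println(f"accuracy = $accuracy%.4f, weighted F1 = $weightedF1%.4f, AUC = $aucFromHardPredictions%.4f")
```

The 31 spam messages classified as ham (FPR of about 0.20) are what pull the single-point AUC down to about 0.90, even though accuracy is about 0.97.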


<font size=5>
Naive Bayes
Naive Bayes classifiers are a family of simple probabilistic classifiers that apply Bayes' theorem with strong (naive) independence assumptions between the features. The spark.ml implementation supports multinomial naive Bayes (the default) and Bernoulli naive Bayes.
</font>
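A plain-Scala sketch of multinomial naive Bayes with additive (Laplace) smoothing: each class c gets a log prior log P(c) and per-feature log probabilities log P(j | c), and a document x is scored as log P(c) plus the sum over j of x_j * log P(j | c); the highest-scoring class wins. The smoothing of 1.0 mirrors, to our knowledge, Spark's default; the toy data is invented. Feature values must be non-negative, which counts and TF-IDF weights are.

```scala
// Multinomial naive Bayes sketch (plain Scala, no Spark).
final case class NBModel(labels: Vector[Double],
                         logPrior: Vector[Double],
                         logTheta: Vector[Vector[Double]]) {
  def predict(x: Vector[Double]): Double = {
    val scores = labels.indices.map { c =>
      logPrior(c) + x.indices.map(j => x(j) * logTheta(c)(j)).sum
    }
    labels(scores.indexOf(scores.max))
  }
}

def fitNB(data: Seq[(Double, Vector[Double])], smoothing: Double = 1.0): NBModel = {
  val numFeatures = data.head._2.size
  val byLabel = data.groupBy(_._1).toVector.sortBy(_._1)
  val labels = byLabel.map(_._1)
  // Smoothed class priors: (n_c + lambda) / (n + lambda * numClasses), in log space.
  val logPrior = byLabel.map { case (_, rows) =>
    math.log((rows.size + smoothing) / (data.size + smoothing * labels.size))
  }
  // Smoothed per-class feature probabilities, in log space.
  val logTheta = byLabel.map { case (_, rows) =>
    val featureSums = (0 until numFeatures).map(j => rows.map(_._2(j)).sum)
    val totalSum = featureSums.sum
    featureSums.map(s => math.log((s + smoothing) / (totalSum + smoothing * numFeatures))).toVector
  }
  NBModel(labels, logPrior, logTheta)
}

// Invented data: features = (count of "free", count of "ok"); 1.0 = ham, 0.0 = spam.
val data: Seq[(Double, Vector[Double])] = Seq(
  (0.0, Vector(2.0, 0.0)), (0.0, Vector(1.0, 0.0)),
  (1.0, Vector(0.0, 2.0)), (1.0, Vector(0.0, 1.0)), (1.0, Vector(1.0, 1.0)))
val model = fitNB(data)
println(model.predict(Vector(3.0, 0.0)))
```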

In [199]:
import org.apache.spark.ml.classification.{NaiveBayes}
val nb = new NaiveBayes()
val Model_nb = nb.fit(trainDF)

import org.apache.spark.ml.classification.NaiveBayes
nb: org.apache.spark.ml.classification.NaiveBayes = nb_573cfc12507b
Model_nb: org.apache.spark.ml.classification.NaiveBayesModel = NaiveBayesModel (uid=nb_573cfc12507b) with 2 classes


In [200]:
val predictions_nb = Model_nb.transform(testDF)
predictions_nb.select("label", "prediction").show(5)

+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
+-----+----------+
only showing top 5 rows



predictions_nb: org.apache.spark.sql.DataFrame = [label: decimal(11,1), features: vector ... 3 more fields]


In [201]:
my_eval_aoc.evaluate(predictions_nb)

res89: Double = 0.945880926302613


In [202]:
my_mc_f1.evaluate(predictions_nb)

res90: Double = 0.9392549536435162


In [203]:
my_mc_accu.evaluate(predictions_nb)

res91: Double = 0.9347826086956522


<font size=5>

Now let's try a random forest classifier to see how it performs on the same data.
    
    
</font>

In [204]:
import org.apache.spark.ml.classification.{RandomForestClassifier}






In [205]:
// Random forest with the default number of trees (20), each limited to depth 3
val rf = new RandomForestClassifier()
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setProbabilityCol("probability")
  .setRawPredictionCol("rawPrediction")
  .setMaxDepth(3)
val Model_rf = rf.fit(trainDF)

rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_79242216eea3
Model_rf: org.apache.spark.ml.classification.RandomForestClassificationModel = RandomForestClassificationModel (uid=rfc_79242216eea3) with 20 trees


In [206]:
val predictions_rf = Model_rf.transform(testDF)
predictions_rf.select("label", "prediction").show(5,false)

+-----+----------+
|label|prediction|
+-----+----------+
|0.0  |1.0       |
|0.0  |1.0       |
|0.0  |1.0       |
|0.0  |1.0       |
|0.0  |1.0       |
+-----+----------+
only showing top 5 rows



predictions_rf: org.apache.spark.sql.DataFrame = [label: decimal(11,1), features: vector ... 3 more fields]


In [207]:
my_eval_aoc.evaluate(predictions_rf)

res93: Double = 0.5032467532467533


<font size=5>

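An area under the ROC curve (AUC) of about 0.5 means the model does no better than random guessing, so we do not want to use this random forest. The AUC ranges from 0 to 1, and the closer it is to 1, the better the classifier. The rows shown above suggest that the depth-3 forest predicts 1.0 (ham) even for spam messages.

We will give up on random forest for this classification.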

</font>
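AUC can be read as the probability that a randomly chosen positive example scores higher than a randomly chosen negative one, with ties counting one half. The plain-Scala sketch below computes it that way. It shows why a model that gives every message the same score lands at exactly 0.5, and why feeding hard 0/1 predictions (as my_eval_aoc does via setRawPredictionCol("prediction")) can understate AUC compared with continuous scores. All scores are made up.

```scala
// AUC as a pairwise ranking statistic: fraction of (positive, negative) pairs in which
// the positive is scored higher; ties count 0.5. O(P * N), for illustration only.
def auc(scoreAndLabels: Seq[(Double, Double)]): Double = {
  val pos = scoreAndLabels.collect { case (s, 1.0) => s }
  val neg = scoreAndLabels.collect { case (s, 0.0) => s }
  val wins = for (p <- pos; n <- neg) yield
    if (p > n) 1.0 else if (p == n) 0.5 else 0.0
  wins.sum / (pos.size * neg.size)
}

// (score, label) pairs.
val constant = Seq((1.0, 1.0), (1.0, 0.0), (1.0, 1.0), (1.0, 0.0)) // same score for all
val scored   = Seq((0.9, 1.0), (0.6, 1.0), (0.55, 0.0), (0.1, 0.0)) // continuous scores
val hard     = scored.map { case (s, y) => (if (s > 0.5) 1.0 else 0.0, y) } // thresholded

println(s"constant = ${auc(constant)}, scored = ${auc(scored)}, hard = ${auc(hard)}")
```

The same model ranks the examples perfectly (AUC 1.0) but scores only 0.75 once its outputs are collapsed to 0/1 labels.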

<font size=5>

Apache Spark has two machine learning libraries: spark.ml and spark.mllib.

spark.ml works on DataFrames, whereas spark.mllib requires the data as an RDD (Resilient Distributed Dataset); on top of that, the mllib training and testing data must consist of LabeledPoint records. The RDD-based mllib API has been in maintenance mode since Spark 2.0, and spark.ml is the primary API.
    
In practice, it is a lot harder to prepare data for machine learning in mllib than in ml.

Now we switch from ml to mllib, still in Scala.
    
    
    
</font>

In [None]:

import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.ml.linalg.{Vector=>MLVector, Vectors=>MLVectors}
import org.apache.spark.mllib.linalg.{Vector=>MLLibVector, Vectors=>MLLibVectors}




In [209]:
val trainDF_2=trainDF.selectExpr("label as first","features").selectExpr("cast(first as double)","features")

trainDF_2: org.apache.spark.sql.DataFrame = [first: double, features: vector]


In [210]:
val trainDF_rdd=trainDF_2.rdd

trainDF_rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[936] at rdd at <console>:110


<font size=5>
The cell below maps each Row of the RDD to a LabeledPoint (a label plus a feature vector). Because the features were produced by spark.ml, each vector is converted to the mllib vector type with Vectors.fromML.
    
    
    
</font>

In [211]:

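// Convert each Row into a LabeledPoint: the Double label plus the features converted
// from a spark.ml vector (sparse or dense) to a spark.mllib vector.
val trainDF_lp : RDD[LabeledPoint] = trainDF_rdd.map(row => LabeledPoint(row.getAs[Double]("first"),
  MLLibVectors.fromML(row.getAs[MLVector]("features"))))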

trainDF_lp: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[937] at map at <console>:110


In [212]:
trainDF_lp.take(2).foreach(println)

(0.0,(13587,[0,1,2,3,6,8,13,14,17,47,48,51,72,86,90,150,154,357,538,663,1885,4133,7302],[2.3966175115908523,1.2464154437187664,5.473359676673107,1.5612885685365177,1.9565495064704403,1.970607245960672,2.3215987939063383,2.3607463835906097,5.083096366506297,3.137109870171633,6.265972305975542,3.2278848948105665,3.649313853907745,3.7508502731271682,3.7356984681065657,4.295314256041989,4.39194109173106,5.192060391843174,5.581525158604896,5.792834252272104,6.8342881271002645,7.52743530766021,7.9329004157683745]))
(0.0,(13587,[0,1,2,3,6,13,40,58,68,83,95,99,100,131,206,212,285,338,368,528,549,1550,1587,2036,2207,10256,12569,12601],[3.5949262673862785,2.492830887437533,1.3683399191682768,1.5612885685365177,1.9565495064704403,2.3215987939063383,3.0313362167264803,3.2981714275387386,3.5634525633013525,3.822026551595063,7.74491481044391,3.8553629718626548,3.9255672305359033,4.231598441655881,4.5656045857819,4.60069590559317,5.04252865787221,5.192060391843174,5.224850214666164,5.581525158604896,

<font size=5>

Train an SVMWithSGD model on the RDD of LabeledPoints (100 iterations).

</font>
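SVMWithSGD fits a linear SVM by minimizing the hinge loss with stochastic gradient descent. The plain-Scala sketch below does full-batch sub-gradient descent on the L2-regularized hinge loss, mapping 0/1 labels to -1/+1 first. The step size stepSize / sqrt(t), regParam = 0.01, and the absence of an intercept are meant to resemble, as far as we know, the defaults behind SVMWithSGD.train(data, numIterations); the data is invented and this is not Spark's code.

```scala
// Linear SVM via sub-gradient descent on the L2-regularized hinge loss (plain Scala).
def trainSvm(data: Seq[(Double, Array[Double])], iters: Int,
             stepSize: Double = 1.0, regParam: Double = 0.01): Array[Double] = {
  val w = Array.fill(data.head._2.length)(0.0)
  for (t <- 1 to iters) {
    val grad = Array.fill(w.length)(0.0)
    for ((label, x) <- data) {
      val y = 2.0 * label - 1.0 // 0/1 -> -1/+1
      val ym = y * w.indices.map(j => w(j) * x(j)).sum
      // Hinge loss max(0, 1 - y * w.x) has sub-gradient -y * x when y * w.x < 1.
      if (ym < 1.0) for (j <- w.indices) grad(j) -= y * x(j) / data.size
    }
    val step = stepSize / math.sqrt(t.toDouble) // decaying step size
    for (j <- w.indices) w(j) -= step * (grad(j) + regParam * w(j))
  }
  w
}

// Raw margin w.x: what a threshold-cleared model would return as its score;
// with a threshold of 0.0, a positive margin maps to label 1.0.
def rawMargin(w: Array[Double], x: Array[Double]): Double =
  w.indices.map(j => w(j) * x(j)).sum

// Invented data: features = (count of "free", count of "ok"); 1.0 = ham, 0.0 = spam.
val data = Seq((0.0, Array(2.0, 0.0)), (0.0, Array(1.0, 0.0)),
               (1.0, Array(0.0, 2.0)), (1.0, Array(0.0, 1.0)))
val w = trainSvm(data, iters = 100)
println(w.mkString("w = [", ", ", "]"))
```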

In [213]:

import org.apache.spark.mllib.classification.{SVMWithSGD,SVMModel}







In [214]:
val model_mllib_svm = SVMWithSGD.train(trainDF_lp, 100)


model_mllib_svm: org.apache.spark.mllib.classification.SVMModel = org.apache.spark.mllib.classification.SVMModel: intercept = 0.0, numFeatures = 13587, numClasses = 2, threshold = 0.0


In [215]:
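// Clear the threshold so predict() returns raw margins instead of 0/1 labels;
// BinaryClassificationMetrics needs these continuous scores to trace the ROC curve
model_mllib_svm.clearThreshold()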

res95: model_mllib_svm.type = org.apache.spark.mllib.classification.SVMModel: intercept = 0.0, numFeatures = 13587, numClasses = 2, threshold = None


<font size=5>

Evaluate the SVM model on the testing data, which also needs to be converted to LabeledPoint format.

</font>

In [216]:
val testDF_2=testDF.selectExpr("label as first","features").selectExpr("cast(first as double)","features")

testDF_2: org.apache.spark.sql.DataFrame = [first: double, features: vector]


In [217]:
val testDF_rdd=testDF_2.rdd

testDF_rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[1146] at rdd at <console>:111


In [218]:
// Same conversion as for the training data: Row -> LabeledPoint with an mllib vector
val testDF_lp : RDD[LabeledPoint] = testDF_rdd.map(row => LabeledPoint(row.getAs[Double]("first"),
  MLLibVectors.fromML(row.getAs[MLVector]("features"))))

testDF_lp: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[1147] at map at <console>:111


In [219]:
// Compute raw scores on the test set.
val scoreAndLabels = testDF_lp.map { point =>
  val score = model_mllib_svm.predict(point.features)
  (score, point.label)
}


scoreAndLabels: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[1148] at map at <console>:114


In [220]:
// Get evaluation metrics.
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()

println(s"Area under ROC = $auROC")


Area under ROC = 0.967069577009336


import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
metrics: org.apache.spark.mllib.evaluation.BinaryClassificationMetrics = org.apache.spark.mllib.evaluation.BinaryClassificationMetrics@5732aa7f
auROC: Double = 0.967069577009336


<font size=5>

While we are at it, let's try LogisticRegressionWithLBFGS from mllib, following the same steps as for the SVM above.

    
</font>

In [221]:

import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionModel}




In [222]:
val model_mllib_lr_BFGS = new LogisticRegressionWithLBFGS().setNumClasses(2).run(trainDF_lp)


model_mllib_lr_BFGS: org.apache.spark.mllib.classification.LogisticRegressionModel = org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0, numFeatures = 13587, numClasses = 2, threshold = 0.5


In [223]:
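// Compute scores on the test set.
// Note: this model still has its default threshold of 0.5, so predict() returns 0/1 labels
// here; calling model_mllib_lr_BFGS.clearThreshold() first would return probabilities and
// give a threshold-independent AUC, comparable to the SVM scores above.
val scoreAndLabels = testDF_lp.map { point =>
  val score = model_mllib_lr_BFGS.predict(point.features)
  (score, point.label)
}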


scoreAndLabels: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[1218] at map at <console>:116


In [224]:
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()

println(s"Area under ROC = $auROC")

Area under ROC = 0.8895777916862253


metrics: org.apache.spark.mllib.evaluation.BinaryClassificationMetrics = org.apache.spark.mllib.evaluation.BinaryClassificationMetrics@4f22febf
auROC: Double = 0.8895777916862253


<font size=5>
This concludes the Scala ML/MLlib classification exercise.
</font>