# Using Spark
* use spark.ml package 
* works on dataFrame 
* works as pipeline model

#### Read File

In [15]:
val data = spark.read
.format("csv")
.option("inferSchema",true)
.option("header",true)
.csv("data.csv")

data: org.apache.spark.sql.DataFrame = [content: string, sentiment: int]


In [16]:
data.show(5)

+--------------------+---------+
|             content|sentiment|
+--------------------+---------+
|Great fun!, Got t...|        1|
|Inspiring, I hope...|        1|
|Great CD, My love...|        1|
|First album I've ...|        1|
|Amazing!, I used ...|        1|
+--------------------+---------+
only showing top 5 rows



In [18]:
data.groupBy("sentiment").count().show()

+---------+-----+
|sentiment|count|
+---------+-----+
|        1| 3000|
|        0|30000|
+---------+-----+



In [30]:
//Renaming content-> text and sentiment->label 
val newNames = Seq("text","label")
val dataRenamed = data.toDF(newNames: _*)

newNames: Seq[String] = List(text, label)
dataRenamed: org.apache.spark.sql.DataFrame = [text: string, label: int]


In [31]:
dataRenamed.show(5)

+--------------------+-----+
|                text|label|
+--------------------+-----+
|Great fun!, Got t...|    1|
|Inspiring, I hope...|    1|
|Great CD, My love...|    1|
|First album I've ...|    1|
|Amazing!, I used ...|    1|
+--------------------+-----+
only showing top 5 rows



#### Split train and test datasets

In [32]:
//training test split
val Array(trainData, testData) = dataRenamed.randomSplit(Array(0.8,0.2), seed=321)

trainData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [text: string, label: int]
testData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [text: string, label: int]


In [33]:
trainData.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1| 2400|
|    0|23975|
+-----+-----+



In [34]:
trainData.count()

res15: Long = 26375


In [35]:
testData.count()

res16: Long = 6625


#### import packages

In [54]:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, StopWordsRemover, IDF, Tokenizer}

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, StopWordsRemover, IDF, Tokenizer}


#### Build a Pipeline 

In [37]:
//tokenizer
val tok = new Tokenizer().setInputCol("text").setOutputCol("words")
//remove stop words
val remove = new StopWordsRemover().setInputCol("words").setOutputCol("filtered").setCaseSensitive(false)
//hashign or TF
val hashedTF = new HashingTF().setNumFeatures(5000).setInputCol("filtered").setOutputCol("features")
//IDF
val idf = new IDF().setInputCol("features").setOutputCol("normalizedFeatures").setMinDocFreq(0)
// classifier 
val lr = new LogisticRegression().setRegParam(0.01).setThreshold(0.5)
// Pipeline all
val pipeLine = new Pipeline().setStages(Array(tok, remove, hashedTF, idf, lr))


tok: org.apache.spark.ml.feature.Tokenizer = tok_aa87c1b0a09b
remove: org.apache.spark.ml.feature.StopWordsRemover = stopWords_03d79aefc048
hashedTF: org.apache.spark.ml.feature.HashingTF = hashingTF_779abaed4d9f
idf: org.apache.spark.ml.feature.IDF = idf_9dcbac74602a
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_105391d32b73
pipeLine: org.apache.spark.ml.Pipeline = pipeline_6459801cb227


In [38]:
remove.getStopWords.take(5)


res17: Array[String] = Array(i, me, my, myself, we)


#### Fit a model 

In [39]:
val model = pipeLine.fit(trainData)

model: org.apache.spark.ml.PipelineModel = pipeline_6459801cb227


#### Predict

In [42]:
val predicted = model.transform(testData)

predicted: org.apache.spark.sql.DataFrame = [text: string, label: int ... 7 more fields]


In [46]:
predicted.select("text","label", "probability", "prediction").show(20)

+--------------------+-----+--------------------+----------+
|                text|label|         probability|prediction|
+--------------------+-----+--------------------+----------+
|!!!! Wrong MP3 fi...|    0|[0.99489302164069...|       0.0|
|&lt;3, I'm not a ...|    1|[0.28354425889542...|       1.0|
|&quot;Definitely ...|    0|[0.97141251894541...|       0.0|
|&quot;Takin' Back...|    0|[0.81497603297694...|       0.0|
|&quot;Twilight of...|    0|[0.88131922176733...|       0.0|
|*** JEAN HILL IS ...|    1|[0.82651317141594...|       0.0|
|--------IGNORANCE...|    0|[0.99964429894223...|       0.0|
|...a minor produc...|    0|[0.99992728796269...|       0.0|
|1 set insufficien...|    0|[0.60845008314316...|       0.0|
|1 time use only, ...|    0|[0.95895773992685...|       0.0|
|1*2*3* Rent, Don'...|    0|[0.96561619878039...|       0.0|
|1, and thats bein...|    0|[0.81198321856807...|       0.0|
|100 years of tort...|    0|[0.64021980280992...|       0.0|
|100 years to read...|  

#### Metrics

In [50]:
val evaluator = new BinaryClassificationEvaluator().setMetricName("areaUnderROC")
println("AOC : " + evaluator.evaluate(predicted))

AOC : 0.914860580912864


evaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_830a0f484f9e


In [96]:
//accuracy 
val actual_pred = predicted.select("label","prediction")
val diff = actual_pred.withColumn("diff",abs(actual_pred("label") -actual_pred("prediction")))
val misClassified = diff.select("diff").filter($"diff" > 0)

actual_pred: org.apache.spark.sql.DataFrame = [label: int, prediction: double]
diff: org.apache.spark.sql.DataFrame = [label: int, prediction: double ... 1 more field]
misClassified: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [diff: double]


In [104]:
val missed = misClassified.count()
val accuracy = 100.0*(1.0 -  missed.toDouble /( missed + diff.count()).toDouble)

missed: Long = 413
accuracy: Double = 94.13185564080705
