# What is Spark MLlib?

Apache Spark’s Machine Learning Library (MLlib) is designed for simplicity, scalability, and easy integration with other tools. Because it builds on Spark's scalability, language support, and speed, data scientists can focus on their data problems and models rather than on the complexities of distributed data, such as infrastructure and configuration.

MLlib consists of common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering, and dimensionality reduction, along with the underlying optimization primitives. It integrates with other Spark components such as Spark SQL, Spark Streaming, and DataFrames, and it is preinstalled in the Databricks runtime. The library can be used from Java, Scala, and Python as part of Spark applications, so it fits into complete workflows.

MLlib supports preprocessing, data munging, model training, and prediction at scale. Models trained in MLlib can also be used to make predictions in Structured Streaming.

(https://databricks.com/glossary/what-is-machine-learning-library)

# Loading Dataset

In [0]:
data = spark.sql('SELECT * FROM spam')
data.display()

label,message,length,punct
ham,"Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...",111,9
ham,Ok lar... Joking wif u oni...,29,6
spam,Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's,155,6
ham,U dun say so early hor... U c already then say...,49,6
ham,"Nah I don't think he goes to usf, he lives around here though",61,2
spam,"FreeMsg Hey there darling it's been 3 week's now and no word back! I'd like some fun you up for it still? Tb ok! XxX std chgs to send, £1.50 to rcv",147,8
ham,Even my brother is not like to speak with me. They treat me like aids patent.,77,2
ham,As per your request 'Melle Melle (Oru Minnaminunginte Nurungu Vettam)' has been set as your callertune for all Callers. Press *9 to copy your friends Callertune,160,6
spam,WINNER!! As a valued network customer you have been selected to receivea £900 prize reward! To claim call 09061701461. Claim code KL341. Valid 12 hours only.,157,6
spam,Had your mobile 11 months or more? U R entitled to Update to the latest colour mobiles with camera for Free! Call The Mobile Update Co FREE on 08002986030,154,2


In [0]:
# check schema
data.printSchema()

In [0]:
# check the number of rows in the dataset
data.count()

In [0]:
# check missing values: count nulls per column
# (isnan only applies to numeric columns and does not detect nulls)
from pyspark.sql.functions import when, count, col
data.select([count(when(col(c).isNull(), c)).alias(c) for c in data.columns]).show()

In [0]:
# compare the mean of the numeric columns (length, punct) per label
# to see whether spam and ham separate clearly
# note: the label column has some quality issues, e.g. values such as 'ham"""'
data.groupBy('label').mean().show()

# Processing the text data

In [0]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer

In [0]:
# algorithms cannot work directly with string labels (ham/spam),
# so StringIndexer converts them to numeric indices (0.0 for the most frequent label)
label_encoder = StringIndexer(inputCol='label', outputCol='labelEncoded')
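
To make the encoding concrete, here is a minimal plain-Python sketch of the frequency-based ordering that `StringIndexer` applies by default: the most frequent label receives index 0.0. The helper name `fit_label_index` is hypothetical, the alphabetical tie-break is an assumption of this sketch, and the labels are only those of the ten rows displayed in the preview above, not the full table.

```python
from collections import Counter

def fit_label_index(labels):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(labels)
    # sort by descending frequency; break ties alphabetically (assumption)
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: float(i) for i, lab in enumerate(ordered)}

# labels of the ten rows shown in the dataset preview
preview_labels = ["ham", "ham", "spam", "ham", "ham",
                  "spam", "ham", "ham", "spam", "spam"]

label_index = fit_label_index(preview_labels)
print(label_index)
```

With ham being the more frequent label, it maps to 0.0 and spam to 1.0, which agrees with the `labelEncoded` values shown later in the notebook.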

In [0]:
# Tokenizer lowercases the message and splits it on whitespace into tokens
tokenizer = Tokenizer(inputCol='message', outputCol='textToken')

In [0]:
# StopWordsRemover drops common English words from the token list
stopper = StopWordsRemover(inputCol='textToken', outputCol='stopperToken')

In [0]:
# CountVectorizer learns a vocabulary and turns each token list into a sparse vector of token counts
count_vec = CountVectorizer(inputCol='stopperToken', outputCol='countVector')
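
The three stages defined so far can be mimicked in plain Python to see what each one produces. This is a rough sketch, not Spark's implementation: `STOP_WORDS` is a tiny hypothetical stop list (Spark ships a much longer English one), the function names are made up for illustration, and the alphabetical tie-break in the vocabulary is an assumption.

```python
from collections import Counter

STOP_WORDS = {"so", "then", "the", "a", "to"}  # hypothetical, for illustration only

def tokenize(text):
    # like Tokenizer: lowercase, then split on whitespace (punctuation stays attached)
    return text.lower().split()

def remove_stop_words(tokens):
    return [t for t in tokens if t not in STOP_WORDS]

def build_vocabulary(docs):
    # most frequent terms get the lowest indices; ties broken alphabetically (assumption)
    totals = Counter(t for doc in docs for t in doc)
    ordered = sorted(totals, key=lambda t: (-totals[t], t))
    return {term: i for i, term in enumerate(ordered)}

def count_vector(tokens, vocab):
    # sparse representation: {vocabulary index: count}
    counts = Counter(vocab[t] for t in tokens if t in vocab)
    return dict(sorted(counts.items()))

messages = [
    "Ok lar... Joking wif u oni...",
    "U dun say so early hor... U c already then say...",
]
docs = [remove_stop_words(tokenize(m)) for m in messages]
vocab = build_vocabulary(docs)
vectors = [count_vector(d, vocab) for d in docs]
print(vocab)
print(vectors)
```

Because splitting happens on whitespace only, tokens such as `say` and `say...` end up as separate vocabulary entries, which is one reason the real vocabulary in this notebook is large.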

In [0]:
# IDF rescales the count vector into TF-IDF weights
tf_idf = IDF(inputCol='countVector', outputCol='tfidfVector')
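
For reference, Spark's `IDF` stage uses the smoothed weight `idf(t) = log((m + 1) / (df(t) + 1))`, where `m` is the number of documents and `df(t)` is the number of documents containing term `t`. The sketch below evaluates it in plain Python. The corpus size of 5,574 is an assumption (the row count is not shown in this notebook's output); with it, a term that appears in a single message gets a weight of about 7.9329, in line with the largest values in the transformed data displayed further down.

```python
import math

def idf(doc_freq, num_docs):
    """Smoothed inverse document frequency: log((m + 1) / (df + 1))."""
    return math.log((num_docs + 1) / (doc_freq + 1))

def tf_idf(term_count, doc_freq, num_docs):
    # TF-IDF is the raw term count scaled by the IDF weight
    return term_count * idf(doc_freq, num_docs)

NUM_DOCS = 5574  # assumed corpus size, not taken from the notebook output

rare = idf(1, NUM_DOCS)       # term seen in a single message
common = idf(2000, NUM_DOCS)  # term seen in many messages
print(rare, common)
```

Rare terms receive large weights and common terms small ones, so frequent filler words contribute little to the feature vector even when they are not on the stop list.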

In [0]:
# VectorAssembler merges the listed input columns into a single 'features' vector column;
# with only one input column here, the result holds the same values as tfidfVector
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['tfidfVector'], outputCol='features')

In [0]:
# add all stages to a pipeline, in the order they should run
from pyspark.ml import Pipeline
data_pipeline = Pipeline(stages=[label_encoder, tokenizer, stopper, count_vec, tf_idf, assembler])

In [0]:
# fit the data pipeline
data_pipeline_fit = data_pipeline.fit(data)

In [0]:
# see the transformed data
final_data = data_pipeline_fit.transform(data)
final_data = final_data.select('features','labelEncoded')
final_data.display()

features,labelEncoded
"Map(vectorType -> sparse, length -> 13494, indices -> List(7, 11, 31, 61, 71, 344, 622, 754, 1401, 1579, 3592, 7100, 7203, 7497, 8488, 12310), values -> List(3.1126188501633374, 3.2055125970560336, 3.822026551595063, 4.207206988531722, 4.32198250312415, 5.407171771460119, 5.917997395226109, 6.141140946540319, 6.680137447273006, 6.8342881271002645, 7.52743530766021, 7.9329004157683745, 7.9329004157683745, 7.9329004157683745, 7.9329004157683745, 7.9329004157683745))",0.0
"Map(vectorType -> sparse, length -> 13494, indices -> List(0, 24, 296, 457, 2605, 4503), values -> List(2.0180469710322324, 3.5761915890787823, 5.330210730323991, 5.7356758384321544, 7.239753235208429, 7.52743530766021))",0.0
"Map(vectorType -> sparse, length -> 13494, indices -> List(2, 13, 19, 29, 89, 154, 193, 310, 458, 512, 628, 854, 951, 2005, 2020, 2356, 3026, 3276, 3944, 4168, 5020), values -> List(2.704469176684504, 3.3377805656337842, 3.5634525633013525, 3.6702205387270586, 4.421354976937353, 4.841857962410058, 5.099687071712158, 11.070010285940008, 5.681608617161879, 5.7356758384321544, 5.917997395226109, 6.228152323529949, 6.323462503334274, 7.016609683894219, 7.016609683894219, 15.05487061532042, 7.239753235208429, 7.239753235208429, 7.52743530766021, 7.52743530766021, 7.52743530766021))",1.0
"Map(vectorType -> sparse, length -> 13494, indices -> List(0, 69, 79, 128, 147, 328, 2917, 4487), values -> List(4.036093942064465, 4.256599743861298, 4.32198250312415, 4.674803877746892, 4.734227298217693, 5.407171771460119, 7.239753235208429, 7.52743530766021))",0.0
"Map(vectorType -> sparse, length -> 13494, indices -> List(36, 134, 311, 543, 1319, 3320, 4167), values -> List(3.7977338590260183, 4.7140245909001735, 5.367951058306837, 5.792834252272104, 6.546606054648484, 7.52743530766021, 7.52743530766021))",0.0
"Map(vectorType -> sparse, length -> 13494, indices -> List(10, 60, 139, 281, 356, 368, 488, 1357, 1626, 2087, 2670, 4442, 4546, 4637, 4639, 5139, 5228, 8489), values -> List(3.2278848948105665, 4.148710781850113, 4.734227298217693, 5.258751766341845, 5.49055338039917, 5.49055338039917, 5.7356758384321544, 6.680137447273006, 6.8342881271002645, 7.016609683894219, 7.239753235208429, 7.52743530766021, 7.52743530766021, 7.52743530766021, 7.52743530766021, 7.52743530766021, 7.52743530766021, 7.9329004157683745))",1.0
"Map(vectorType -> sparse, length -> 13494, indices -> List(10, 53, 103, 237, 617, 745, 12144, 12700), values -> List(6.455769789621133, 4.0410801176577476, 4.5151737321550085, 5.129540034861839, 5.917997395226109, 6.061098238866783, 7.9329004157683745, 7.9329004157683745))",0.0
"Map(vectorType -> sparse, length -> 13494, indices -> List(125, 184, 407, 909, 923, 946, 1167, 1578, 1741, 2561, 2839, 2892, 2908, 3111, 3160), values -> List(4.60069590559317, 5.015129683684095, 5.581525158604896, 6.228152323529949, 13.668576254200529, 6.323462503334274, 6.546606054648484, 6.8342881271002645, 6.8342881271002645, 7.239753235208429, 7.239753235208429, 7.239753235208429, 7.239753235208429, 7.239753235208429, 7.239753235208429))",0.0
"Map(vectorType -> sparse, length -> 13494, indices -> List(1, 47, 118, 144, 293, 321, 351, 498, 539, 579, 786, 1105, 1304, 2599, 3997, 4910, 5091, 5144), values -> List(2.3645559120072774, 8.002149566088097, 4.618714411095849, 4.7140245909001735, 5.293843086153116, 5.367951058306837, 5.447993765980374, 5.7356758384321544, 5.792834252272104, 5.853458874088538, 6.141140946540319, 6.4288230189921, 6.546606054648484, 7.239753235208429, 7.52743530766021, 7.52743530766021, 7.52743530766021, 7.52743530766021))",1.0
"Map(vectorType -> sparse, length -> 13494, indices -> List(0, 1, 13, 27, 38, 197, 279, 369, 391, 412, 873, 1061, 1182, 1345, 1843, 2941, 3424), values -> List(2.0180469710322324, 2.3645559120072774, 3.3377805656337842, 3.7060666705001943, 7.81509745006645, 5.04252865787221, 5.258751766341845, 11.585668504544207, 5.535005142970004, 5.581525158604896, 6.228152323529949, 6.4288230189921, 6.546606054648484, 6.546606054648484, 6.8342881271002645, 7.239753235208429, 7.52743530766021))",1.0


## Train Test Split

In [0]:
# 70/30 split; the seed makes the split reproducible across runs
train, test = final_data.randomSplit([0.7, 0.3], seed=42)

In [0]:
train.count(), test.count()
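
`randomSplit` assigns each row to a split at random, so the resulting sizes are only approximately 70/30, and they change between runs unless a seed is passed. The following plain-Python sketch imitates that behavior; `random_split` is a hypothetical helper, not Spark's implementation.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row independently to a split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # cumulative boundaries, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        r = rng.random()
        for i, bound in enumerate(bounds):
            # the last split catches any rounding slack in the final boundary
            if r < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(1000))
train_rows, test_rows = random_split(rows, [0.7, 0.3], seed=42)
print(len(train_rows), len(test_rows))
```

Every row lands in exactly one split, but the counts vary around 700 and 300 rather than hitting them exactly, which is why the notebook checks `train.count()` and `test.count()` after splitting.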

# Naive Bayes

In [0]:
from pyspark.ml.classification import NaiveBayes

In [0]:
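# multinomial Naive Bayes (the default model type) on the TF-IDF features
nb = NaiveBayes(featuresCol='features', labelCol='labelEncoded')

In [0]:
# train on the training split
nb_fit = nb.fit(train)

In [0]:
# score the held-out test split
results = nb_fit.transform(test)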

In [0]:
results.display()

features,labelEncoded,rawPrediction,probability,prediction
"Map(vectorType -> sparse, length -> 13494, indices -> List(0, 1, 2, 12, 91, 101, 121, 137, 160, 215, 317, 413, 522, 583, 1697, 2387, 3206, 5259, 11174), values -> List(2.0180469710322324, 2.3645559120072774, 2.704469176684504, 3.2415525335392306, 4.39194109173106, 4.482912869936787, 4.674803877746892, 4.6557556827761974, 4.864847480634757, 5.070699534838906, 5.367951058306837, 5.581525158604896, 5.792834252272104, 5.853458874088538, 6.8342881271002645, 7.016609683894219, 7.239753235208429, 7.52743530766021, 7.9329004157683745))",1.0,"Map(vectorType -> dense, length -> 2, values -> List(-962.2341389177699, -748.3713175535122))","Map(vectorType -> dense, length -> 2, values -> List(1.3199479238464482E-93, 1.0))",1.0
"Map(vectorType -> sparse, length -> 13494, indices -> List(0, 1, 2, 15, 47, 78, 99, 106, 125, 191, 358, 481, 685, 723, 806, 1724, 3507, 4509, 11768), values -> List(2.0180469710322324, 2.3645559120072774, 2.704469176684504, 3.368552224300538, 4.001074783044048, 4.295314256041989, 4.451660326432682, 4.5151737321550085, 4.60069590559317, 4.988461436601934, 5.630315322774329, 5.7356758384321544, 5.986990266713061, 6.061098238866783, 6.141140946540319, 6.8342881271002645, 7.52743530766021, 7.52743530766021, 7.9329004157683745))",1.0,"Map(vectorType -> dense, length -> 2, values -> List(-1005.4469205079394, -729.6495209192633))","Map(vectorType -> dense, length -> 2, values -> List(1.669979866903231E-120, 1.0))",1.0
"Map(vectorType -> sparse, length -> 13494, indices -> List(0, 1, 2, 20, 47, 78, 91, 99, 125, 159, 252, 378, 481, 685, 723, 1387, 1661, 1724, 12245), values -> List(2.0180469710322324, 2.3645559120072774, 2.704469176684504, 3.532297395521557, 4.001074783044048, 4.295314256041989, 4.39194109173106, 4.451660326432682, 4.60069590559317, 4.888377978044951, 5.192060391843174, 5.49055338039917, 5.7356758384321544, 5.986990266713061, 6.061098238866783, 6.680137447273006, 7.239753235208429, 6.8342881271002645, 7.9329004157683745))",1.0,"Map(vectorType -> dense, length -> 2, values -> List(-971.4648020310957, -678.6425171236345))","Map(vectorType -> dense, length -> 2, values -> List(6.743688266325221E-128, 1.0))",1.0
"Map(vectorType -> sparse, length -> 13494, indices -> List(0, 1, 2, 41, 171, 172, 217, 306, 606, 1707, 2284, 2429, 2971, 6505, 9296, 9452, 10790), values -> List(2.0180469710322324, 2.3645559120072774, 2.704469176684504, 3.9255672305359033, 4.912475529624012, 4.937168142214383, 5.129540034861839, 5.330210730323991, 5.917997395226109, 6.8342881271002645, 7.016609683894219, 7.239753235208429, 7.239753235208429, 7.9329004157683745, 7.9329004157683745, 7.9329004157683745, 7.9329004157683745))",0.0,"Map(vectorType -> dense, length -> 2, values -> List(-923.8845327280821, -1002.6560535308072))","Map(vectorType -> dense, length -> 2, values -> List(1.0, 6.165427344561446E-35))",0.0
"Map(vectorType -> sparse, length -> 13494, indices -> List(0, 1, 4, 13, 47, 86, 98, 99, 102, 103, 226, 251, 270, 668, 717, 1507, 6560, 8229, 10794, 12957, 13304), values -> List(2.0180469710322324, 2.3645559120072774, 2.892706319430574, 3.3377805656337842, 4.001074783044048, 4.39194109173106, 4.451660326432682, 4.451660326432682, 4.4671645129686475, 4.5151737321550085, 5.099687071712158, 5.224850214666164, 5.224850214666164, 5.986990266713061, 12.646925006668548, 6.680137447273006, 7.9329004157683745, 7.9329004157683745, 7.9329004157683745, 7.9329004157683745, 7.9329004157683745))",1.0,"Map(vectorType -> dense, length -> 2, values -> List(-1208.360824485604, -1030.374175250754))","Map(vectorType -> dense, length -> 2, values -> List(5.027827683122864E-78, 1.0))",1.0
"Map(vectorType -> sparse, length -> 13494, indices -> List(0, 1, 5, 15, 20, 46, 67, 70, 73, 81, 90, 218, 504, 645, 676, 5497, 10048, 11339, 12414), values -> List(4.036093942064465, 2.3645559120072774, 2.751116865476289, 3.368552224300538, 3.532297395521557, 4.126237925998055, 4.308559482792009, 4.269338769638728, 4.256599743861298, 4.406539891152213, 4.3775523542789605, 5.099687071712158, 5.7356758384321544, 5.986990266713061, 5.986990266713061, 7.9329004157683745, 7.9329004157683745, 7.9329004157683745, 7.9329004157683745))",0.0,"Map(vectorType -> dense, length -> 2, values -> List(-832.3627603610817, -939.560712052788))","Map(vectorType -> dense, length -> 2, values -> List(1.0, 2.783050641237032E-47))",0.0
"Map(vectorType -> sparse, length -> 13494, indices -> List(0, 1, 7, 8, 139, 266, 504, 623, 833, 1071, 1197, 2243, 2559, 3998, 4687, 5135, 8431, 8744), values -> List(2.0180469710322324, 2.3645559120072774, 3.1126188501633374, 3.145408672986328, 4.734227298217693, 10.735902116613675, 5.7356758384321544, 5.917997395226109, 6.141140946540319, 6.4288230189921, 6.546606054648484, 7.016609683894219, 7.239753235208429, 7.52743530766021, 7.52743530766021, 7.52743530766021, 7.9329004157683745, 7.9329004157683745))",0.0,"Map(vectorType -> dense, length -> 2, values -> List(-1016.0906181041435, -1124.6625364231104))","Map(vectorType -> dense, length -> 2, values -> List(1.0, 7.0439292371295E-48))",0.0
"Map(vectorType -> sparse, length -> 13494, indices -> List(0, 1, 9, 14, 23, 35, 53, 119, 1459, 5063), values -> List(2.0180469710322324, 2.3645559120072774, 15.288106206380252, 6.586657606125901, 3.5698017909800117, 3.743245673741949, 4.0410801176577476, 9.20139181118634, 6.680137447273006, 7.52743530766021))",0.0,"Map(vectorType -> dense, length -> 2, values -> List(-421.3007550098556, -524.8547671047238))","Map(vectorType -> dense, length -> 2, values -> List(1.0, 1.0642997698972957E-45))",0.0
"Map(vectorType -> sparse, length -> 13494, indices -> List(0, 1, 10, 20, 49, 73, 81, 258, 787, 846, 1412, 2063, 2692, 3638, 9115, 10443, 11935), values -> List(2.0180469710322324, 2.3645559120072774, 3.2278848948105665, 3.532297395521557, 4.030927746193729, 4.256599743861298, 4.406539891152213, 5.330210730323991, 6.141140946540319, 6.141140946540319, 6.680137447273006, 7.016609683894219, 7.239753235208429, 7.52743530766021, 7.9329004157683745, 7.9329004157683745, 7.9329004157683745))",1.0,"Map(vectorType -> dense, length -> 2, values -> List(-879.2760071776355, -894.844327057856))","Map(vectorType -> dense, length -> 2, values -> List(0.9999998267135883, 1.7328641151097486E-7))",0.0
"Map(vectorType -> sparse, length -> 13494, indices -> List(0, 1, 12, 33, 43, 46, 67, 90, 281, 2017, 3196), values -> List(2.0180469710322324, 2.3645559120072774, 3.2415525335392306, 3.7508502731271682, 3.9165173950159855, 4.126237925998055, 4.308559482792009, 4.3775523542789605, 5.258751766341845, 7.016609683894219, 7.239753235208429))",0.0,"Map(vectorType -> dense, length -> 2, values -> List(-351.88632951270375, -416.782627377429))","Map(vectorType -> dense, length -> 2, values -> List(1.0, 6.54479339554582E-29))",0.0
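
The `probability` column is derived from `rawPrediction`: the raw values behave like per-class log scores, and normalizing them with a numerically stable softmax gives the class probabilities. The sketch below reproduces the second-to-last displayed row in plain Python; `softmax_from_log_scores` is a hypothetical helper name.

```python
import math

def softmax_from_log_scores(log_scores):
    """Convert per-class log scores into probabilities that sum to 1."""
    # subtract the max first so exp() does not underflow to 0 for every class
    top = max(log_scores)
    exps = [math.exp(s - top) for s in log_scores]
    total = sum(exps)
    return [e / total for e in exps]

# rawPrediction copied from the second-to-last row of the results above
raw_prediction = [-879.2760071776355, -894.844327057856]

probability = softmax_from_log_scores(raw_prediction)
predicted_class = probability.index(max(probability))
print(probability, predicted_class)
```

The predicted class is 0 (ham) although the row's label is 1.0 (spam): a gap of about 15.6 between the two log scores is already enough to push the probability of the losing class down to roughly 1.7e-7.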


In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [0]:
# accuracy
accuracy = MulticlassClassificationEvaluator(labelCol = 'labelEncoded', metricName='accuracy')
accuracy.evaluate(results)

In [0]:
# f1
f1 = MulticlassClassificationEvaluator(labelCol = 'labelEncoded', metricName='f1')
f1.evaluate(results)

In [0]:
# log loss (lower is better)
log_loss = MulticlassClassificationEvaluator(labelCol='labelEncoded', metricName='logLoss')
log_loss.evaluate(results)
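
To show what accuracy and log loss measure, the sketch below recomputes them in plain Python for the ten rows displayed in the results table only. These numbers describe that small sample, not the full test set, and the helper names are hypothetical. Probabilities are clipped away from 0 and 1 with a small epsilon (an assumption mirroring the evaluator's `eps` parameter) so that the logarithm stays finite.

```python
import math

# (true label, predicted probability of class 1.0) for the ten displayed rows
rows = [
    (1, 1.0), (1, 1.0), (1, 1.0), (0, 6.165427344561446e-35), (1, 1.0),
    (0, 2.783050641237032e-47), (0, 7.0439292371295e-48),
    (0, 1.0642997698972957e-45), (1, 1.7328641151097486e-7),
    (0, 6.54479339554582e-29),
]

def accuracy(rows, threshold=0.5):
    # a row counts as correct when the thresholded probability matches the label
    hits = sum((p >= threshold) == bool(y) for y, p in rows)
    return hits / len(rows)

def log_loss(rows, eps=1e-15):
    total = 0.0
    for y, p in rows:
        p_true = p if y == 1 else 1.0 - p          # probability given to the true class
        p_true = min(max(p_true, eps), 1.0 - eps)  # clip so log() stays finite
        total += -math.log(p_true)
    return total / len(rows)

sample_accuracy = accuracy(rows)
sample_log_loss = log_loss(rows)
print(sample_accuracy, sample_log_loss)
```

Nine of the ten rows are classified correctly, yet the single confident mistake contributes almost all of the sample's log loss. This is why log loss penalizes overconfident errors far more heavily than accuracy does.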