I - Change the text features into numeric using the suitable classes (StringIndexer,   Tokenizer,   StopWordsRemover,   CountVectorizer,IDF,  VectorAssembler).

In [0]:
from pyspark.ml.feature import StringIndexer, RegexTokenizer, StopWordsRemover, CountVectorizer, IDF, VectorAssembler
from pyspark.ml import Pipeline

In [0]:
rowData = spark.read.csv("/FileStore/shared_uploads/achraf.ben.yahya@efrei.net/SMSSpamCollection", sep="\t", inferSchema=True, header = False)
# Rename columns to type (spam or ham) and text for the sms content
rowData = rowData.withColumnRenamed("_c0", "type").withColumnRenamed("_c1", "text")

In [0]:
rowData.display()

type,text
ham,"Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat..."
ham,Ok lar... Joking wif u oni...
spam,Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's
ham,U dun say so early hor... U c already then say...
ham,"Nah I don't think he goes to usf, he lives around here though"
spam,"FreeMsg Hey there darling it's been 3 week's now and no word back! I'd like some fun you up for it still? Tb ok! XxX std chgs to send, £1.50 to rcv"
ham,Even my brother is not like to speak with me. They treat me like aids patent.
ham,As per your request 'Melle Melle (Oru Minnaminunginte Nurungu Vettam)' has been set as your callertune for all Callers. Press *9 to copy your friends Callertune
spam,WINNER!! As a valued network customer you have been selected to receivea £900 prize reward! To claim call 09061701461. Claim code KL341. Valid 12 hours only.
spam,Had your mobile 11 months or more? U R entitled to Update to the latest colour mobiles with camera for Free! Call The Mobile Update Co FREE on 08002986030


In [0]:
print("The dataset contains", rowData.count(), "columns")
print("The dataset contains", rowData.filter(rowData.type == 'spam').count(), "spam columns")
print("The dataset contains", rowData.filter(rowData.type == 'ham').count(), "ham columns")

The dataset contains 5574 columns
The dataset contains 747 spam columns
The dataset contains 4827 ham columns


Define the stages of the pipeline

In [0]:
# create a Tokenizer to split the text into words
tokenizer = RegexTokenizer(inputCol='text', outputCol='words', pattern='\\W')
tokenizer.setMinTokenLength(3)

Out[40]: RegexTokenizer_61f04a50e80d

In [0]:
# create a StopWordsRemover to Remove stop words from the words feature
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

In [0]:
# create a CountVectorizer to Convert each sms into a sparse vector of word counts
count_vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="raw_features")

In [0]:
# create an IDF to weight the bag of words features
idf = IDF(inputCol="raw_features", outputCol="features")

In [0]:
# create a StringIndexer to convert the text column into a numerical index
indexer = StringIndexer(inputCol="type", outputCol="label")

In [0]:
# create a VectorAssembler to combine the label and features columns into a single vector
vector_assembler = VectorAssembler(inputCols=["label", "features"], outputCol="final_features")

Create the pipeline, fit it to the data and transform the model

In [0]:
# fit and transform the data using the pre-processing pipeline
preprocessing_pipeline = Pipeline(stages=[tokenizer, stopwords_remover, count_vectorizer, idf, indexer, vector_assembler])
df = preprocessing_pipeline.fit(rowData).transform(rowData)

In [0]:
df.display()

type,text,words,filtered_words,raw_features,features,label,final_features
ham,"Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...","List(until, jurong, point, crazy, available, only, bugis, great, world, buffet, cine, there, got, amore, wat)","List(jurong, point, crazy, available, bugis, great, world, buffet, cine, got, amore, wat)","Map(vectorType -> sparse, length -> 8309, indices -> List(6, 42, 48, 210, 496, 577, 648, 1151, 1193, 2729, 5462, 7279), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 8309, indices -> List(6, 42, 48, 210, 496, 577, 648, 1151, 1193, 2729, 5462, 7279), values -> List(3.1879682874051243, 3.934699714099176, 3.9913186080986836, 5.04252865787221, 5.681608617161879, 5.917997395226109, 5.986990266713061, 6.546606054648484, 6.546606054648484, 7.52743530766021, 7.9329004157683745, 7.9329004157683745))",0.0,"Map(vectorType -> sparse, length -> 8310, indices -> List(7, 43, 49, 211, 497, 578, 649, 1152, 1194, 2730, 5463, 7280), values -> List(3.1879682874051243, 3.934699714099176, 3.9913186080986836, 5.04252865787221, 5.681608617161879, 5.917997395226109, 5.986990266713061, 6.546606054648484, 6.546606054648484, 7.52743530766021, 7.9329004157683745, 7.9329004157683745))"
ham,Ok lar... Joking wif u oni...,"List(lar, joking, wif, oni)","List(lar, joking, wif, oni)","Map(vectorType -> sparse, length -> 8309, indices -> List(205, 326, 1328, 1763), values -> List(1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 8309, indices -> List(205, 326, 1328, 1763), values -> List(4.988461436601934, 5.330210730323991, 6.680137447273006, 7.016609683894219))",0.0,"Map(vectorType -> sparse, length -> 8310, indices -> List(206, 327, 1329, 1764), values -> List(4.988461436601934, 5.330210730323991, 6.680137447273006, 7.016609683894219))"
spam,Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's,"List(free, entry, wkly, comp, win, cup, final, tkts, 21st, may, 2005, text, 87121, receive, entry, question, std, txt, rate, apply, 08452810075over18)","List(free, entry, wkly, comp, win, cup, final, tkts, 21st, may, 2005, text, 87121, receive, entry, question, std, txt, rate, apply, 08452810075over18)","Map(vectorType -> sparse, length -> 8309, indices -> List(2, 13, 14, 78, 138, 214, 247, 264, 333, 395, 489, 639, 714, 793, 910, 1741, 1762, 2240, 2287, 3866), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 8309, indices -> List(2, 13, 14, 78, 138, 214, 247, 264, 333, 395, 489, 639, 714, 793, 910, 1741, 1762, 2240, 2287, 3866), values -> List(3.1879682874051243, 3.4612616224048054, 3.514059807971776, 4.363367719287004, 4.694221963603994, 5.070699534838906, 5.129540034861839, 5.129540034861839, 11.070010285940008, 5.49055338039917, 5.681608617161879, 5.917997395226109, 6.061098238866783, 6.141140946540319, 6.323462503334274, 7.016609683894219, 7.016609683894219, 7.239753235208429, 7.239753235208429, 7.52743530766021))",1.0,"Map(vectorType -> sparse, length -> 8310, indices -> List(0, 3, 14, 15, 79, 139, 215, 248, 265, 334, 396, 490, 640, 715, 794, 911, 1742, 1763, 2241, 2288, 3867), values -> List(1.0, 3.1879682874051243, 3.4612616224048054, 3.514059807971776, 4.363367719287004, 4.694221963603994, 5.070699534838906, 5.129540034861839, 5.129540034861839, 11.070010285940008, 5.49055338039917, 5.681608617161879, 5.917997395226109, 6.061098238866783, 6.141140946540319, 6.323462503334274, 7.016609683894219, 7.016609683894219, 7.239753235208429, 7.239753235208429, 7.52743530766021))"
ham,U dun say so early hor... U c already then say...,"List(dun, say, early, hor, already, then, say)","List(dun, say, early, hor, already, say)","Map(vectorType -> sparse, length -> 8309, indices -> List(58, 62, 126, 242, 2832), values -> List(2.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 8309, indices -> List(58, 62, 126, 242, 2832), values -> List(8.25247585199611, 4.126237925998055, 4.674803877746892, 5.160311693528593, 7.52743530766021))",0.0,"Map(vectorType -> sparse, length -> 8310, indices -> List(59, 63, 127, 243, 2833), values -> List(8.25247585199611, 4.126237925998055, 4.674803877746892, 5.160311693528593, 7.52743530766021))"
ham,"Nah I don't think he goes to usf, he lives around here though","List(nah, don, think, goes, usf, lives, around, here, though)","List(nah, think, goes, usf, lives, around, though)","Map(vectorType -> sparse, length -> 8309, indices -> List(35, 108, 322, 335, 789, 858, 1915), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 8309, indices -> List(35, 108, 322, 335, 789, 858, 1915), values -> List(3.758513145872737, 4.5656045857819, 5.293843086153116, 5.367951058306837, 6.141140946540319, 6.228152323529949, 7.016609683894219))",0.0,"Map(vectorType -> sparse, length -> 8310, indices -> List(36, 109, 323, 336, 790, 859, 1916), values -> List(3.758513145872737, 4.5656045857819, 5.293843086153116, 5.367951058306837, 6.141140946540319, 6.228152323529949, 7.016609683894219))"
spam,"FreeMsg Hey there darling it's been 3 week's now and no word back! I'd like some fun you up for it still? Tb ok! XxX std chgs to send, £1.50 to rcv","List(freemsg, hey, there, darling, been, week, now, and, word, back, like, some, fun, you, for, still, xxx, std, chgs, send, rcv)","List(freemsg, hey, darling, week, word, back, like, fun, still, xxx, std, chgs, send, rcv)","Map(vectorType -> sparse, length -> 8309, indices -> List(5, 11, 24, 25, 36, 45, 209, 226, 273, 614, 793, 1625, 3610, 4951), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 8309, indices -> List(5, 11, 24, 25, 36, 45, 209, 226, 273, 614, 793, 1625, 3610, 4951), values -> List(3.1879682874051243, 3.347932937097802, 3.5761915890787823, 3.6288353225642043, 3.9165173950159855, 3.907548725033225, 4.962485950198674, 5.04252865787221, 5.192060391843174, 5.917997395226109, 6.141140946540319, 6.8342881271002645, 7.52743530766021, 7.9329004157683745))",1.0,"Map(vectorType -> sparse, length -> 8310, indices -> List(0, 6, 12, 25, 26, 37, 46, 210, 227, 274, 615, 794, 1626, 3611, 4952), values -> List(1.0, 3.1879682874051243, 3.347932937097802, 3.5761915890787823, 3.6288353225642043, 3.9165173950159855, 3.907548725033225, 4.962485950198674, 5.04252865787221, 5.192060391843174, 5.917997395226109, 6.141140946540319, 6.8342881271002645, 7.52743530766021, 7.9329004157683745))"
ham,Even my brother is not like to speak with me. They treat me like aids patent.,"List(even, brother, not, like, speak, with, they, treat, like, aids, patent)","List(even, brother, like, speak, treat, like, aids, patent)","Map(vectorType -> sparse, length -> 8309, indices -> List(5, 101, 244, 442, 561, 6624, 8269), values -> List(2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 8309, indices -> List(5, 101, 244, 442, 561, 6624, 8269), values -> List(6.375936574810249, 4.498913211283228, 5.099687071712158, 5.581525158604896, 5.792834252272104, 7.9329004157683745, 7.9329004157683745))",0.0,"Map(vectorType -> sparse, length -> 8310, indices -> List(6, 102, 245, 443, 562, 6625, 8270), values -> List(6.375936574810249, 4.498913211283228, 5.099687071712158, 5.581525158604896, 5.792834252272104, 7.9329004157683745, 7.9329004157683745))"
ham,As per your request 'Melle Melle (Oru Minnaminunginte Nurungu Vettam)' has been set as your callertune for all Callers. Press *9 to copy your friends Callertune,"List(per, your, request, melle, melle, oru, minnaminunginte, nurungu, vettam, has, been, set, your, callertune, for, all, callers, press, copy, your, friends, callertune)","List(per, request, melle, melle, oru, minnaminunginte, nurungu, vettam, set, callertune, callers, press, copy, friends, callertune)","Map(vectorType -> sparse, length -> 8309, indices -> List(122, 128, 452, 674, 872, 903, 969, 1408, 1542, 1984, 2073, 2215, 2434), values -> List(1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 8309, indices -> List(122, 128, 452, 674, 872, 903, 969, 1408, 1542, 1984, 2073, 2215, 2434), values -> List(4.5656045857819, 4.6557556827761974, 5.581525158604896, 5.986990266713061, 13.668576254200529, 6.323462503334274, 6.4288230189921, 14.479506470416858, 6.8342881271002645, 7.016609683894219, 7.239753235208429, 7.239753235208429, 7.239753235208429))",0.0,"Map(vectorType -> sparse, length -> 8310, indices -> List(123, 129, 453, 675, 873, 904, 970, 1409, 1543, 1985, 2074, 2216, 2435), values -> List(4.5656045857819, 4.6557556827761974, 5.581525158604896, 5.986990266713061, 13.668576254200529, 6.323462503334274, 6.4288230189921, 14.479506470416858, 6.8342881271002645, 7.016609683894219, 7.239753235208429, 7.239753235208429, 7.239753235208429))"
spam,WINNER!! As a valued network customer you have been selected to receivea £900 prize reward! To claim call 09061701461. Claim code KL341. Valid 12 hours only.,"List(winner, valued, network, customer, you, have, been, selected, receivea, 900, prize, reward, claim, call, 09061701461, claim, code, kl341, valid, hours, only)","List(winner, valued, network, customer, selected, receivea, 900, prize, reward, claim, call, 09061701461, claim, code, kl341, valid, hours)","Map(vectorType -> sparse, length -> 8309, indices -> List(0, 44, 59, 121, 275, 291, 298, 368, 417, 548, 660, 780, 846, 3164, 3365, 3904), values -> List(1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 8309, indices -> List(0, 44, 59, 121, 275, 291, 298, 368, 417, 548, 660, 780, 846, 3164, 3365, 3904), values -> List(2.312499550051224, 7.869399428198352, 4.183396339838003, 4.5485101524226, 5.160311693528593, 5.224850214666164, 5.224850214666164, 5.407171771460119, 5.535005142970004, 5.792834252272104, 5.986990266713061, 6.323462503334274, 6.228152323529949, 7.52743530766021, 7.52743530766021, 7.52743530766021))",1.0,"Map(vectorType -> sparse, length -> 8310, indices -> List(0, 1, 45, 60, 122, 276, 292, 299, 369, 418, 549, 661, 781, 847, 3165, 3366, 3905), values -> List(1.0, 2.312499550051224, 7.869399428198352, 4.183396339838003, 4.5485101524226, 5.160311693528593, 5.224850214666164, 5.224850214666164, 5.407171771460119, 5.535005142970004, 5.792834252272104, 5.986990266713061, 6.323462503334274, 6.228152323529949, 7.52743530766021, 7.52743530766021, 7.52743530766021))"
spam,Had your mobile 11 months or more? U R entitled to Update to the latest colour mobiles with camera for Free! Call The Mobile Update Co FREE on 08002986030,"List(had, your, mobile, months, more, entitled, update, the, latest, colour, mobiles, with, camera, for, free, call, the, mobile, update, free, 08002986030)","List(mobile, months, entitled, update, latest, colour, mobiles, camera, free, call, mobile, update, free, 08002986030)","Map(vectorType -> sparse, length -> 8309, indices -> List(0, 2, 28, 193, 201, 364, 389, 737, 808, 1036, 2982), values -> List(1.0, 2.0, 2.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 8309, indices -> List(0, 2, 28, 193, 201, 364, 389, 737, 808, 1036, 2982), values -> List(2.312499550051224, 6.375936574810249, 7.532470383933295, 4.988461436601934, 5.070699534838906, 11.363217234323757, 5.49055338039917, 6.061098238866783, 6.228152323529949, 6.4288230189921, 7.52743530766021))",1.0,"Map(vectorType -> sparse, length -> 8310, indices -> List(0, 1, 3, 29, 194, 202, 365, 390, 738, 809, 1037, 2983), values -> List(1.0, 2.312499550051224, 6.375936574810249, 7.532470383933295, 4.988461436601934, 5.070699534838906, 11.363217234323757, 5.49055338039917, 6.061098238866783, 6.228152323529949, 6.4288230189921, 7.52743530766021))"


II - Train 4 classifiers and compare them.

In [0]:
from pyspark.ml.classification import LogisticRegression

In [0]:
results = []
# split data with transformed test features to numeric into train and test sets
train, test = df.randomSplit([0.7, 0.3])
train.cache()
test.cache()

Out[49]: DataFrame[type: string, text: string, words: array<string>, filtered_words: array<string>, raw_features: vector, features: vector, label: double, final_features: vector]

In [0]:
lr = LogisticRegression(featuresCol='final_features', labelCol='label')

In [0]:
lr_model = lr.fit(train)

In [0]:
evaluation_result = lr_model.evaluate(test)

In [0]:
print("Accuracy:", evaluation_result.accuracy)
print("Precision:", evaluation_result.weightedPrecision)
print("Recall:", evaluation_result.weightedRecall)
print("F1 score:", evaluation_result.weightedFMeasure())

Accuracy: 0.9895577395577395
Precision: 0.9895661597706731
Recall: 0.9895577395577395
F1 score: 0.9894250285326066


In [0]:
results.append({'model': "LogisticRegression", 'accuracy': evaluation_result.accuracy})

In [0]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [0]:
# create a DecisionTreeClassifier model
dt = DecisionTreeClassifier(featuresCol="final_features", labelCol="label")

In [0]:
# fit the DecisionTreeClassifier model on the training data
dt_model = dt.fit(train)

In [0]:
# make predictions on the test data
predictions  = dt_model.transform(test)

In [0]:
# evaluate the model on the test data
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
accuracy1 = evaluator.evaluate(predictions)
print("Accuracy:", accuracy1)

Accuracy: 1.0


In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# create the parameter grid for cross-validation
paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [2, 5, 10]) \
    .addGrid(dt.maxBins, [10, 20, 30]) \
    .build()

In [0]:
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

In [0]:
cvModel = cv.fit(train)

In [0]:
# make predictions on the test data using the best model found by cross-validation
predictions_cv = cvModel.transform(test)

In [0]:
# evaluate the model on the test data
accuracy2 = evaluator.evaluate(predictions_cv)
# print the accuracy score
print("Accuracy: %.4f" % accuracy2)

Accuracy: 1.0000


In [0]:
results.append({'model': "DecisionTreeClassifier", 'accuracy': accuracy1})

In [0]:
results.append({'model': "DecisionTreeWithCrossVal", 'accuracy': accuracy2})

In [0]:
from pyspark.ml.classification import RandomForestClassifier

In [0]:
# create a RandomForestClassifier model
rf = RandomForestClassifier(featuresCol="final_features", labelCol="label")

In [0]:
# fit the RandomForestClassifier model on the training data
rf_model = rf.fit(train)

In [0]:
# make predictions on the test data
predictions = rf_model.transform(test)

In [0]:
# evaluate the model on the test data
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

Accuracy: 0.8771498771498771


In [0]:
results.append({'model': "RandomForestClassifier", 'accuracy': accuracy})

In [0]:
from pyspark.ml.classification import NaiveBayes

In [0]:
# create a NaiveBayes model
nb = NaiveBayes(featuresCol="final_features", labelCol="label")

In [0]:
# fit the NaiveBayes model on the training data
nb_model = nb.fit(train)

In [0]:
# make predictions on the test data
predictions = nb_model.transform(test)

In [0]:
# evaluate the model on the test data
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

Accuracy: 0.9441031941031941


In [0]:
results.append({'model': "NaiveBayes", 'accuracy': accuracy})

III - Tune at least one important hyper parameter using ParamGridBuilder and CrossValidator to improve model performance.

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# create a RandomForestClassifier model
rf = RandomForestClassifier(featuresCol="final_features", labelCol="label")

In [0]:
# create a parameter grid to search over
param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 30]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .build()

In [0]:
# create a cross-validator with the param_grid and evaluation metric
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=3)

In [0]:
# fit the cross-validator on the training data
cv_model = crossval.fit(train)

In [0]:
# make predictions on the test data using the best model from the cross-validation
best_rf_model = cv_model.bestModel
predictions = best_rf_model.transform(test)

In [0]:
# evaluate the model on the test data
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

Accuracy: 0.9385749385749386


In [0]:
results.append({'model': "TunedRandomForestClassifier", 'accuracy': accuracy})

In [0]:
# print the best model hyperparameters
print("Best numTrees:", best_rf_model.getNumTrees)
print("Best maxDepth:", best_rf_model.getOrDefault("maxDepth"))

Best numTrees: 20
Best maxDepth: 15


IV - Compare and comment the obtained results

In [0]:
results_df = spark.createDataFrame(results)

In [0]:
results_df.select("model", "accuracy").orderBy(results_df.accuracy.desc()).show(truncate=False)

+---------------------------+------------------+
|model                      |accuracy          |
+---------------------------+------------------+
|DecisionTreeClassifier     |1.0               |
|DecisionTreeWithCrossVal   |1.0               |
|LogisticRegression         |0.9895577395577395|
|NaiveBayes                 |0.9441031941031941|
|TunedRandomForestClassifier|0.9385749385749386|
|RandomForestClassifier     |0.8771498771498771|
+---------------------------+------------------+

