# Spam SMS detection
- using **Spark** cluster computing

In [1]:
from pyspark.sql import SparkSession

# Start (or reuse) a local-mode Spark session for this notebook.
spark = (
    SparkSession.builder
    .master('local')
    .appName('spam sms analysis')
    .getOrCreate()
)


In [2]:
# Read the semicolon-separated SMS file. It has no header row, so Spark
# assigns the positional column names _c0, _c1, _c2.
sms = spark.read.csv(
    'sms.csv',
    sep=';',
    header=False,
)
sms.show()

+---+--------------------+---+
|_c0|                 _c1|_c2|
+---+--------------------+---+
|  1|Sorry, I'll call ...|  0|
|  2|Dont worry. I gue...|  0|
|  3|Call FREEPHONE 08...|  1|
|  4|Win a 1000 cash p...|  1|
|  5|Go until jurong p...|  0|
|  6|Ok lar... Joking ...|  0|
|  7|Free entry in 2 a...|  1|
|  8|U dun say so earl...|  0|
|  9|Nah I don't think...|  0|
| 10|FreeMsg Hey there...|  1|
| 11|Even my brother i...|  0|
| 12|As per your reque...|  0|
| 13|WINNER!! As a val...|  1|
| 14|Had your mobile 1...|  1|
| 15|I'm gonna be home...|  0|
| 16|SIX chances to wi...|  1|
| 17|URGENT! You have ...|  1|
| 18|I've been searchi...|  0|
| 19|I HAVE A DATE ON ...|  0|
| 20|XXXMobileMovieClu...|  1|
+---+--------------------+---+
only showing top 20 rows



In [3]:
# Swap Spark's positional column names for descriptive ones, one rename at a time.
for positional, descriptive in (('_c0', 'id'), ('_c1', 'text'), ('_c2', 'label')):
    sms = sms.withColumnRenamed(positional, descriptive)
sms.show()

+---+--------------------+-----+
| id|                text|label|
+---+--------------------+-----+
|  1|Sorry, I'll call ...|    0|
|  2|Dont worry. I gue...|    0|
|  3|Call FREEPHONE 08...|    1|
|  4|Win a 1000 cash p...|    1|
|  5|Go until jurong p...|    0|
|  6|Ok lar... Joking ...|    0|
|  7|Free entry in 2 a...|    1|
|  8|U dun say so earl...|    0|
|  9|Nah I don't think...|    0|
| 10|FreeMsg Hey there...|    1|
| 11|Even my brother i...|    0|
| 12|As per your reque...|    0|
| 13|WINNER!! As a val...|    1|
| 14|Had your mobile 1...|    1|
| 15|I'm gonna be home...|    0|
| 16|SIX chances to wi...|    1|
| 17|URGENT! You have ...|    1|
| 18|I've been searchi...|    0|
| 19|I HAVE A DATE ON ...|    0|
| 20|XXXMobileMovieClu...|    1|
+---+--------------------+-----+
only showing top 20 rows



In [4]:
from pyspark.sql.functions import regexp_replace 

# Character class matching a comma or a hyphen
REGEX = '[,\\-]'

# Overwrite 'text' with a copy in which every match is replaced by a space
cleaned_text = regexp_replace(sms['text'], REGEX, ' ')
sms = sms.withColumn('text', cleaned_text)

sms.show()

+---+--------------------+-----+
| id|                text|label|
+---+--------------------+-----+
|  1|Sorry  I'll call ...|    0|
|  2|Dont worry. I gue...|    0|
|  3|Call FREEPHONE 08...|    1|
|  4|Win a 1000 cash p...|    1|
|  5|Go until jurong p...|    0|
|  6|Ok lar... Joking ...|    0|
|  7|Free entry in 2 a...|    1|
|  8|U dun say so earl...|    0|
|  9|Nah I don't think...|    0|
| 10|FreeMsg Hey there...|    1|
| 11|Even my brother i...|    0|
| 12|As per your reque...|    0|
| 13|WINNER!! As a val...|    1|
| 14|Had your mobile 1...|    1|
| 15|I'm gonna be home...|    0|
| 16|SIX chances to wi...|    1|
| 17|URGENT! You have ...|    1|
| 18|I've been searchi...|    0|
| 19|I HAVE A DATE ON ...|    0|
| 20|XXXMobileMovieClu...|    1|
+---+--------------------+-----+
only showing top 20 rows



In [5]:
# Inspect the column types; the CSV reader was not given a schema, so check what it produced.
sms.dtypes

[('id', 'string'), ('text', 'string'), ('label', 'string')]

In [6]:
from pyspark.sql.types import IntegerType

# Cast the label to an integer column and store it back under the same name.
label_as_int = sms['label'].cast(IntegerType())
sms = sms.withColumn('label', label_as_int)

In [7]:
# Drop the row id; the modelling cells below only read 'text' and 'label'.
sms = sms.drop('id')

In [8]:
# Confirm the schema after the cast and the drop.
sms.dtypes

[('text', 'string'), ('label', 'int')]

# Text column preprocessing

In [11]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF

# Lower-case each message and split it on whitespace into a 'words' array
tokenizer = Tokenizer(inputCol='text', outputCol='words')

# Filter stop words out of 'words', producing 'terms'
remover = StopWordsRemover(inputCol='words', outputCol='terms')

# Hash 'terms' into term-frequency vectors, then rescale them by inverse document frequency
hasher = HashingTF(inputCol='terms', outputCol='hash')
idf = IDF(inputCol='hash', outputCol='features')


# Model & pipeline

In [12]:
# Create a logistic regression object and add everything to a pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# Default-configured logistic regression as the final estimator
logistic = LogisticRegression()

# Chain the feature stages and the classifier so they are fitted together
lr_stages = [tokenizer, remover, hasher, idf, logistic]
pipeline = Pipeline(stages=lr_stages)

In [13]:
# Random split with weights 0.8 (train) and 0.2 (test); a fixed seed is passed to randomSplit.
sms_train, sms_test = sms.randomSplit([0.8, 0.2], seed=43)

In [14]:
# Fit every pipeline stage on the training split
pipe_model = pipeline.fit(sms_train)

# Apply the fitted pipeline to the held-out test split
predictions = pipe_model.transform(sms_test)

In [15]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Accuracy evaluator comparing the 'prediction' column with 'label'
evaluator = MulticlassClassificationEvaluator(
    metricName='accuracy',
    labelCol='label',
    predictionCol='prediction',
)

accuracy = evaluator.evaluate(predictions)
print(f'Accuracy: {accuracy:.4f}')

Accuracy: 0.9844


In [23]:
# The classifier is the last of the five fitted pipeline stages
lr_model = pipe_model.stages[-1]

# Show the fitted model's summary line, then its intercept
print(lr_model)
print(lr_model.intercept)

LogisticRegressionModel: uid=LogisticRegression_ebd449f2ee22, numClasses=2, numFeatures=262144
-20.96207418483443


In [25]:
# Coefficient vector of the fitted logistic-regression stage
lr_coefficients = pipe_model.stages[-1].coefficients
print(lr_coefficients)

(262144,[87,153,161,167,173,189,212,238,247,299,329,353,415,421,423,432,443,469,475,489,504,511,531,535,537,545,591,632,636,640,645,699,724,774,789,810,816,822,876,929,956,960,968,991,1057,1074,1094,1133,1154,1212,1242,1264,1269,1272,1279,1298,1303,1307,1315,1316,1322,1328,1371,1386,1415,1454,1509,1546,1559,1579,1583,1619,1652,1689,1690,1696,1706,1728,1765,1770,1797,1845,1876,1884,1921,1922,1924,1968,1997,2005,2050,2098,2122,2139,2147,2149,2160,2162,2181,2195,2306,2307,2325,2328,2338,2340,2354,2366,2409,2410,2437,2448,2507,2548,2574,2603,2613,2622,2638,2641,2677,2699,2701,2731,2751,2829,2848,2855,2856,2932,2958,2968,3023,3050,3066,3077,3091,3128,3153,3186,3190,3261,3277,3282,3283,3284,3298,3358,3378,3386,3388,3436,3450,3503,3509,3513,3524,3530,3534,3611,3613,3622,3654,3672,3763,3785,3789,3880,3910,3924,3928,3938,3944,3982,3993,4003,4022,4060,4075,4093,4106,4117,4146,4159,4167,4184,4214,4235,4240,4246,4254,4310,4314,4333,4360,4364,4379,4409,4475,4504,4522,4553,4556,4562,4588,4604,4631,4

# Cross-validation

In [31]:
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

# No grids are added, so build() yields a single empty param map and the
# pipeline is cross-validated with its current settings only.
params = ParamGridBuilder().build()

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=params,
    evaluator=evaluator,
    numFolds=5,
)

In [32]:
# Run the cross-validation on the training split. Note that `cv` is rebound
# from the CrossValidator to the fitted model returned by fit(), which the
# following cell relies on when it calls cv.transform.
cv = cv.fit(sms_train)

In [33]:
# Score the cross-validated model on the held-out test split with the accuracy evaluator
evaluator.evaluate(cv.transform(sms_test))

0.9844322344322345

# Grid search

In [47]:
from pyspark.ml.tuning import ParamGridBuilder

# Build the grid from the LogisticRegression instance that is already the
# final stage of `pipeline`. A Param is tied to the estimator object that
# owns it, so a grid built from a freshly constructed LogisticRegression()
# would not be applied to the pipeline's stage, and every candidate would be
# fitted with the default settings.
logistic = pipeline.getStages()[-1]
assert isinstance(logistic, LogisticRegression), \
    'pipeline must end with the logistic regression stage'

# Candidate values: 3 regularisation strengths x 3 elastic-net mixes x 3 iteration caps
params = (
    ParamGridBuilder()
    .addGrid(logistic.regParam, [0.01, 0.1, 1.0])
    .addGrid(logistic.elasticNetParam, [0.0, 0.5, 1.0])
    .addGrid(logistic.maxIter, [10, 50, 100])
    .build()
)

print('Number of models to be tested: ', len(params))

Number of models to be tested:  27


In [48]:
# 5-fold cross-validation of `pipeline` over every param map in `params`
grid_search = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=params,
    evaluator=evaluator,
    numFolds=5,
)

# `cv` ends up bound to the fitted model, which the next cell inspects
cv = grid_search.fit(sms_train)

evaluator.evaluate(cv.transform(sms_test))

0.9844322344322345

In [49]:
# Display the model that the fitted cross-validator exposes as its best one
cv.bestModel

PipelineModel_8eef2c6b415b

# Random Forest

In [19]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Random forest reading the pipeline's 'features' vector and the 'label' column
random_forest = RandomForestClassifier(labelCol='label', featuresCol='features')

# Grid: number of trees x maximum tree depth x number of split bins
rf_grid = ParamGridBuilder()
rf_grid = rf_grid.addGrid(random_forest.numTrees, [10, 50, 100])
rf_grid = rf_grid.addGrid(random_forest.maxDepth, [5, 10, 20])
rf_grid = rf_grid.addGrid(random_forest.maxBins, [32, 64])
params = rf_grid.build()

# Report how many parameter combinations the grid contains
print('Number of models to be tested: ', len(params))


Number of models to be tested:  18


In [None]:
# Same feature stages as the earlier pipeline, with the random forest as the final estimator
rf_stages = [tokenizer, remover, hasher, idf, random_forest]
pipeline = Pipeline(stages=rf_stages)

# 5-fold cross-validation over the random-forest grid
rf_search = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=params,
    evaluator=evaluator,
    numFolds=5,
)
cv = rf_search.fit(sms_train)

evaluator.evaluate(cv.transform(sms_test))

# Gradient-boosted trees

In [None]:
# GBTClassifier is not imported anywhere earlier in the notebook
from pyspark.ml.classification import GBTClassifier

# Gradient-boosted trees reading the pipeline's 'features' vector and the 'label' column
gbt = GBTClassifier(featuresCol='features', labelCol='label')

# Create the parameter grid for GBTClassifier
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [5, 10, 15])       # Maximum depth of each tree
             .addGrid(gbt.maxIter, [20, 50, 100])      # Number of iterations (trees in the ensemble)
             .addGrid(gbt.stepSize, [0.01, 0.1, 0.2])  # Step size (learning rate)
             .build())

# Print the number of models to be tested
print('Number of models to be tested: ', len(paramGrid))

pipeline = Pipeline(stages=[tokenizer, remover, hasher, idf, gbt])

# Pass the GBT grid built in this cell. `params` still holds the grid of an
# earlier estimator, whose Params do not belong to `gbt`.
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5)
cv = cv.fit(sms_train)

evaluator.evaluate(cv.transform(sms_test))