# Spam detector using pyspark

In [1]:
pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 66kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 49.7MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612242 sha256=4645b376bc700d032ca43627fd7a29726c26b6dd277adaa8418b0ca26803850f
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [49]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('NLP').getOrCreate()

In [50]:
from pyspark.sql.functions import length
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer, VectorAssembler
from pyspark.ml.classification import NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from sklearn.metrics import classification_report

## Data wrangling

In [51]:
data = spark.read.csv('SMSSpamCollection',inferSchema=True,sep='\t')
data.show(5)

+----+--------------------+
| _c0|                 _c1|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
+----+--------------------+
only showing top 5 rows



In [52]:
data = data.withColumnRenamed('_c0','target').withColumnRenamed('_c1','text')
data.show(5)

+------+--------------------+
|target|                text|
+------+--------------------+
|   ham|Go until jurong p...|
|   ham|Ok lar... Joking ...|
|  spam|Free entry in 2 a...|
|   ham|U dun say so earl...|
|   ham|Nah I don't think...|
+------+--------------------+
only showing top 5 rows



In [53]:
data = data.withColumn('length',length(data['text']))
data.show(5)

+------+--------------------+------+
|target|                text|length|
+------+--------------------+------+
|   ham|Go until jurong p...|   111|
|   ham|Ok lar... Joking ...|    29|
|  spam|Free entry in 2 a...|   155|
|   ham|U dun say so earl...|    49|
|   ham|Nah I don't think...|    61|
+------+--------------------+------+
only showing top 5 rows



In [54]:
data.groupBy('target').mean().show()

+------+-----------------+
|target|      avg(length)|
+------+-----------------+
|   ham|71.45431945307645|
|  spam|138.6706827309237|
+------+-----------------+



In [55]:
# NLP tools
tokenizer = Tokenizer(inputCol='text', outputCol='token_text')
stop_rm = StopWordsRemover(inputCol='token_text', outputCol='stop_token')
count_vect = CountVectorizer(inputCol='stop_token', outputCol=('c_vec'))
idf = IDF(inputCol='c_vec', outputCol='tf_idf')

# Label encoder
label_enc = StringIndexer(inputCol='target', outputCol='label')

# Vector assembler
featured_data = VectorAssembler(inputCols=['tf_idf','length'], outputCol='features')

# Pre-process data
pre_processor = Pipeline(stages=[label_enc, tokenizer, stop_rm, count_vect, idf, featured_data])

In [56]:
cleaner = pre_processor.fit(data)
clean_data = cleaner.transform(data)
clean_data.show(5)

+------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|target|                text|length|label|          token_text|          stop_token|               c_vec|              tf_idf|            features|
+------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|   ham|Go until jurong p...|   111|  0.0|[go, until, juron...|[go, jurong, poin...|(13423,[7,11,31,6...|(13423,[7,11,31,6...|(13424,[7,11,31,6...|
|   ham|Ok lar... Joking ...|    29|  0.0|[ok, lar..., joki...|[ok, lar..., joki...|(13423,[0,24,297,...|(13423,[0,24,297,...|(13424,[0,24,297,...|
|  spam|Free entry in 2 a...|   155|  1.0|[free, entry, in,...|[free, entry, 2, ...|(13423,[2,13,19,3...|(13423,[2,13,19,3...|(13424,[2,13,19,3...|
|   ham|U dun say so earl...|    49|  0.0|[u, dun, say, so,...|[u, dun, say, ear...|(13423,[0,70,80,1...|(13423,

In [57]:
clean_data.head(1)

[Row(target='ham', text='Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...', length=111, label=0.0, token_text=['go', 'until', 'jurong', 'point,', 'crazy..', 'available', 'only', 'in', 'bugis', 'n', 'great', 'world', 'la', 'e', 'buffet...', 'cine', 'there', 'got', 'amore', 'wat...'], stop_token=['go', 'jurong', 'point,', 'crazy..', 'available', 'bugis', 'n', 'great', 'world', 'la', 'e', 'buffet...', 'cine', 'got', 'amore', 'wat...'], c_vec=SparseVector(13423, {7: 1.0, 11: 1.0, 31: 1.0, 61: 1.0, 72: 1.0, 344: 1.0, 625: 1.0, 731: 1.0, 1409: 1.0, 1598: 1.0, 4485: 1.0, 6440: 1.0, 8092: 1.0, 8838: 1.0, 11344: 1.0, 12979: 1.0}), tf_idf=SparseVector(13423, {7: 3.1126, 11: 3.2055, 31: 3.822, 61: 4.2072, 72: 4.322, 344: 5.4072, 625: 5.918, 731: 6.1411, 1409: 6.6801, 1598: 6.8343, 4485: 7.5274, 6440: 7.9329, 8092: 7.9329, 8838: 7.9329, 11344: 7.9329, 12979: 7.9329}), features=SparseVector(13424, {7: 3.1126, 11: 3.2055, 31: 3.822, 61: 

In [58]:
clean_data_training = clean_data.select('features','label')

## Machine learning

In [59]:
train, test = clean_data_training.randomSplit([0.7,0.3])

In [62]:
# Evaluator precises the target column and the kind of metrics to use 
evaluator = MulticlassClassificationEvaluator(labelCol='label', metricName='f1')

# Define transformtions stages to throw in the pipeline
nb = NaiveBayes()

# Definition of pipeline
pipeline_nb = Pipeline(stages=[nb])

# Definition of the grid parameters
paramGrid = ParamGridBuilder().\
            addGrid(nb.modelType, ["multinomial"]).\
            build()

# Definition of the cross validator
cv = CrossValidator(
  estimator=pipeline_nb,
  estimatorParamMaps=paramGrid, 
  evaluator=evaluator, 
  numFolds=3)

# Train the model
spam_detector = cv.fit(train)

# Predict classes on test part
predictions = spam_detector.transform(test)
predictions.show(5)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(13424,[0,1,2,4,3...|  1.0|[-1223.5862404983...|[4.33774984620535...|       1.0|
|(13424,[0,1,2,5,5...|  1.0|[-933.08697008017...|[0.99999999999999...|       0.0|
|(13424,[0,1,2,7,8...|  0.0|[-799.04508012095...|[1.0,1.9306915449...|       0.0|
|(13424,[0,1,2,12,...|  1.0|[-1138.9496199184...|[1.97863320002813...|       1.0|
|(13424,[0,1,2,15,...|  1.0|[-1158.3840510377...|[1.25255646716027...|       1.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 5 rows



In [66]:
predictions_pd = predictions.toPandas()
print(classification_report(predictions_pd.prediction, predictions_pd.label))

              precision    recall  f1-score   support

         0.0       0.89      0.99      0.94      1352
         1.0       0.97      0.59      0.73       383

    accuracy                           0.90      1735
   macro avg       0.93      0.79      0.84      1735
weighted avg       0.91      0.90      0.90      1735

