# Load data

In [1]:
import pyspark
# Obtain the Spark session (named "app") that all later cells read data through.
spark = (
    pyspark.sql.SparkSession.builder
    .appName("app")
    .getOrCreate()
)

In [2]:
# Tab-separated SMS corpus; no header option is set, so Spark assigns the
# default column names (_c0, _c1).
sms_path = "/home/jovyan/udemy_pyspark/resources/Spark_for_Machine_Learning/Natural_Language_Processing/smsspamcollection/SMSSpamCollection.csv"
csv_reader = spark.read.options(inferSchema=True, sep="\t")
data = csv_reader.csv(sms_path)
data.show()

+----+--------------------+
| _c0|                 _c1|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
|spam|FreeMsg Hey there...|
| ham|Even my brother i...|
| ham|As per your reque...|
|spam|WINNER!! As a val...|
|spam|Had your mobile 1...|
| ham|I'm gonna be home...|
|spam|SIX chances to wi...|
|spam|URGENT! You have ...|
| ham|I've been searchi...|
| ham|I HAVE A DATE ON ...|
|spam|XXXMobileMovieClu...|
| ham|Oh k...i'm watchi...|
| ham|Eh u remember how...|
| ham|Fine if thats th...|
|spam|England v Macedon...|
+----+--------------------+
only showing top 20 rows



In [3]:
# Replace the default positional column names with descriptive ones.
for old_name, new_name in (("_c0", "class"), ("_c1", "text")):
    data = data.withColumnRenamed(old_name, new_name)
data.show()

+-----+--------------------+
|class|                text|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
| spam|FreeMsg Hey there...|
|  ham|Even my brother i...|
|  ham|As per your reque...|
| spam|WINNER!! As a val...|
| spam|Had your mobile 1...|
|  ham|I'm gonna be home...|
| spam|SIX chances to wi...|
| spam|URGENT! You have ...|
|  ham|I've been searchi...|
|  ham|I HAVE A DATE ON ...|
| spam|XXXMobileMovieClu...|
|  ham|Oh k...i'm watchi...|
|  ham|Eh u remember how...|
|  ham|Fine if thats th...|
| spam|England v Macedon...|
+-----+--------------------+
only showing top 20 rows



# Feature extraction

## Length of text

In [4]:
from pyspark.sql.functions import length
# Add the length of each message's text as a numeric "length" column.
text_length = length("text")
data = data.withColumn("length", text_length)
data.show()

+-----+--------------------+------+
|class|                text|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



In [5]:
# Per-class mean of the numeric columns: spam messages are, on average,
# longer than ham messages.
mean_by_class = data.groupBy("class").mean()
mean_by_class.show()

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



## More features

In [6]:
from pyspark.ml.feature import (
    Tokenizer,
    StopWordsRemover,
    CountVectorizer,
    IDF,
    StringIndexer,
    VectorAssembler,
)
from pyspark.ml import Pipeline

In [7]:
# Feature stages: split text into tokens, drop stop words, count terms,
# re-weight the counts with IDF, then append the message length.
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopword_remover = StopWordsRemover(inputCol="token_text", outputCol="stop_token")
count_vectorizer = CountVectorizer(inputCol="stop_token", outputCol="token_count")
idf = IDF(inputCol="token_count", outputCol="tf_idf")
assembler = VectorAssembler(inputCols=["tf_idf", "length"], outputCol="features")

# Target stage: encode the string class as a numeric "label" column.
label_indexer = StringIndexer(inputCol="class", outputCol="label")

pipeline = Pipeline(
    stages=[
        tokenizer,
        stopword_remover,
        count_vectorizer,
        idf,
        assembler,
        label_indexer,
    ]
)

# NaiveBayes()

In [8]:
# Fit every pipeline stage on the full dataset, then apply the fitted stages
# to that same dataset to obtain the "features" and "label" columns.
# NOTE(review): the fit happens before the train/test split further down, so
# the fitted stages have seen the rows later used for testing — confirm this
# is acceptable for the reported score.
fitted_pipeline = pipeline.fit(data)
feature_data = fitted_pipeline.transform(data)
feature_data.printSchema()
feature_data.head()

root
 |-- class: string (nullable = true)
 |-- text: string (nullable = true)
 |-- length: integer (nullable = true)
 |-- token_text: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- stop_token: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- token_count: vector (nullable = true)
 |-- tf_idf: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)



Row(class='ham', text='Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...', length=111, token_text=['go', 'until', 'jurong', 'point,', 'crazy..', 'available', 'only', 'in', 'bugis', 'n', 'great', 'world', 'la', 'e', 'buffet...', 'cine', 'there', 'got', 'amore', 'wat...'], stop_token=['go', 'jurong', 'point,', 'crazy..', 'available', 'bugis', 'n', 'great', 'world', 'la', 'e', 'buffet...', 'cine', 'got', 'amore', 'wat...'], token_count=SparseVector(13423, {7: 1.0, 11: 1.0, 31: 1.0, 61: 1.0, 72: 1.0, 344: 1.0, 625: 1.0, 731: 1.0, 1409: 1.0, 1598: 1.0, 4485: 1.0, 6440: 1.0, 8092: 1.0, 8838: 1.0, 11344: 1.0, 12979: 1.0}), tf_idf=SparseVector(13423, {7: 3.1126, 11: 3.2055, 31: 3.822, 61: 4.2072, 72: 4.322, 344: 5.4072, 625: 5.918, 731: 6.1411, 1409: 6.6801, 1598: 6.8343, 4485: 7.5274, 6440: 7.9329, 8092: 7.9329, 8838: 7.9329, 11344: 7.9329, 12979: 7.9329}), features=SparseVector(13424, {7: 3.1126, 11: 3.2055, 31: 3.822, 61: 4.2072,

# Train model

In [9]:
# Keep only the columns the classifier needs and split them roughly 70/30
# into training and test sets. The fixed seed makes the split (and hence the
# metric reported at the end) repeatable across re-runs; without it every
# run samples a different partition.
train_data, test_data = (
    feature_data
    .select("label", "features")
    .randomSplit([0.7, 0.3], seed=42)
)
train_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(13424,[0,1,2,7,8...|
|  0.0|(13424,[0,1,2,13,...|
|  0.0|(13424,[0,1,2,41,...|
|  0.0|(13424,[0,1,3,9,1...|
|  0.0|(13424,[0,1,4,50,...|
|  0.0|(13424,[0,1,7,8,1...|
|  0.0|(13424,[0,1,7,8,1...|
|  0.0|(13424,[0,1,7,15,...|
|  0.0|(13424,[0,1,9,14,...|
|  0.0|(13424,[0,1,9,14,...|
|  0.0|(13424,[0,1,11,32...|
|  0.0|(13424,[0,1,12,33...|
|  0.0|(13424,[0,1,14,18...|
|  0.0|(13424,[0,1,14,31...|
|  0.0|(13424,[0,1,14,78...|
|  0.0|(13424,[0,1,18,20...|
|  0.0|(13424,[0,1,20,27...|
|  0.0|(13424,[0,1,21,27...|
|  0.0|(13424,[0,1,21,27...|
|  0.0|(13424,[0,1,24,31...|
+-----+--------------------+
only showing top 20 rows



In [10]:
from pyspark.ml.classification import NaiveBayes 
# use other models such as LogisticRegression or RandomForestClassifier

# Train a Naive Bayes classifier, with its default parameters, on the training split.
nb_classifier = NaiveBayes()
model = nb_classifier.fit(train_data)

In [11]:
# Score the held-out split; the output adds rawPrediction, probability and
# prediction columns next to label and features.
results = model.transform(dataset=test_data)
results.show(n=20)

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|(13424,[0,1,5,15,...|[-1000.6230644835...|[1.0,7.2163496869...|       0.0|
|  0.0|(13424,[0,1,5,20,...|[-807.17522539058...|[1.0,1.2502394538...|       0.0|
|  0.0|(13424,[0,1,15,20...|[-666.05591848575...|[1.0,2.9806634934...|       0.0|
|  0.0|(13424,[0,1,17,19...|[-806.20232700415...|[1.0,1.7927402686...|       0.0|
|  0.0|(13424,[0,1,23,63...|[-1293.9083732758...|[1.0,2.4780250159...|       0.0|
|  0.0|(13424,[0,1,31,43...|[-339.23325515474...|[1.0,1.7425569725...|       0.0|
|  0.0|(13424,[0,1,72,10...|[-664.11081346361...|[1.0,8.7326595016...|       0.0|
|  0.0|(13424,[0,1,874,1...|[-97.561866276162...|[0.99999998060230...|       0.0|
|  0.0|(13424,[0,2,3,6,9...|[-3301.1886267519...|[1.0,3.6339239551...|       0.0|
|  0.0|(13424,[0

# Evaluate

In [12]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# F1 score of the predictions on the held-out split (displayed as the cell output).
f1_evaluator = MulticlassClassificationEvaluator(metricName="f1")
f1_evaluator.evaluate(results)

0.9219574751815383