<a href="https://www.kaggle.com/code/sunethjayawardana/spam-classification-pyspark?scriptVersionId=160390797" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/spam-or-not-spam-dataset/spam_or_not_spam.csv


# This notebook is about building a spam classification using PySpark. Depending on the text the model can predict it as spam or not. The model uses NLP tool kit for predictions.

In [2]:
#Installing PySpark 
!pip install pyspark --quiet

# Import Libraries

In [3]:
#Apache Spark Libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, when
from pyspark.ml.feature import VectorAssembler
# Import the Decision Tree Classifier class
from pyspark.ml.classification  import DecisionTreeClassifier
# Import the logistic regression class
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.sql.functions import regexp_replace
from pyspark.ml import Pipeline

# Build Spark Session

In [4]:
#Building Spark Session
spark = (SparkSession.builder
                  .appName('flight')
                  .config("spark.executor.memory", "1G")
                  .config("spark.executor.cores","4")
                  .getOrCreate())

spark.sparkContext.setLogLevel('WARN')
spark.version

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/25 13:44:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


'3.5.0'

# Load Data

In [5]:
# Read data from CSV file
spam = spark.read.csv('/kaggle/input/spam-or-not-spam-dataset/spam_or_not_spam.csv',
                         sep=',',
                         header=True,
                         inferSchema=True,
                         nullValue='NA')

# Get number of records
print("The data contain %d records." % spam.count())

# View the first five records
spam.show(5)

# Check column data types
print(spam.dtypes)
spam.count()

The data contain 3000 records.
+--------------------+-----+
|               email|label|
+--------------------+-----+
| date wed NUMBER ...|    0|
|martin a posted t...|    0|
|man threatens exp...|    0|
|klez the virus th...|    0|
| in adding cream ...|    0|
+--------------------+-----+
only showing top 5 rows

[('email', 'string'), ('label', 'int')]


3000

In [6]:
# Drop rows with missing email values
spam = spam.dropna(subset=['email'])
# Remove punctuation (REGEX provided) and numbers
wrangled = spam.withColumn('email', regexp_replace(spam.email, '[_():;,.!?\\-]', ' '))
wrangled = wrangled.withColumn('email', regexp_replace(wrangled.email, '[0-9]', ' '))

# Merge multiple spaces
wrangled = wrangled.withColumn('email', regexp_replace(wrangled.email, ' +', ' '))

In [7]:
wrangled.show()

+--------------------+-----+
|               email|label|
+--------------------+-----+
| date wed NUMBER ...|    0|
|martin a posted t...|    0|
|man threatens exp...|    0|
|klez the virus th...|    0|
| in adding cream ...|    0|
| i just had to ju...|    0|
|the scotsman NUMB...|    0|
|martin adamson wr...|    0|
|the scotsman thu ...|    0|
|i have been tryin...|    0|
|hello have you se...|    0|
|yes great minds t...|    0|
|on mon aug NUMBER...|    0|
| from chris garri...|    0|
|spamassassin is h...|    0|
|hi all apologies ...|    0|
| in forteana y d ...|    0|
|in a nutshell sol...|    0|
|apols if this has...|    0|
|can someone expla...|    0|
+--------------------+-----+
only showing top 20 rows



# Initiating Pipeline
* split the text into tokens
* remove stop words
* applie the hashing trick
* convert the data from counts to IDF
* train a logistic regression model.

In [8]:
# Break text into tokens at non-word characters
tokenizer = Tokenizer(inputCol='email', outputCol='words')

# Remove stop words
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol='terms')

# Apply the hashing trick and transform to TF-IDF
hasher = HashingTF(inputCol=remover.getOutputCol(), outputCol="hash")
idf = IDF(inputCol=hasher.getOutputCol(), outputCol="features")

# Create a logistic regression object and add everything to a pipeline
logistic = LogisticRegression(featuresCol=idf.getOutputCol(),labelCol="label")
pipeline = Pipeline(stages=[tokenizer, remover, hasher,  idf, logistic])

In [9]:
train_data, test_data = wrangled.randomSplit([0.8, 0.2])
model = pipeline.fit(train_data)

24/01/25 13:45:05 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/01/25 13:45:07 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/01/25 13:45:08 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/01/25 13:45:10 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/01/25 13:45:10 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/01/25 13:45:11 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/01/25 13:45:11 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/01/25 13:45:11 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/01/25 13:45:11 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/01/25 13:45:12 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/01/25 13:45:12 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/01/25 13:45:12 WARN DAGSchedul

In [10]:
predictions = model.transform(test_data)

In [11]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
area_under_curve = evaluator.evaluate(predictions)
print("Area Under ROC:", area_under_curve)

24/01/25 13:45:20 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB
                                                                                

Area Under ROC: 0.9868070685064162
