In [97]:
from datasets import load_dataset
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum
import warnings
warnings.filterwarnings('ignore')

In [98]:
from pyspark.sql import SparkSession

# Restart SparkSession
spark = SparkSession.builder \
    .appName("SpamClassifier") \
    .master("local[*]") \
    .getOrCreate()

In [99]:
data = load_dataset("SetFit/enron_spam")

df = data["train"]

df

Repo card metadata block was not found. Setting CardData to empty.


Dataset({
    features: ['message_id', 'text', 'label', 'label_text', 'subject', 'message', 'date'],
    num_rows: 31716
})

In [100]:
# Save train and test splits as JSONL
df.to_json("train.jsonl", lines=True)

Creating json from Arrow format:   0%|          | 0/32 [00:00<?, ?ba/s]

100716400

In [101]:
df_path = "train.jsonl"

df = spark.read.json(df_path)

## Data Understanding

In [102]:
df.show(5)

+----+-----+----------+--------------------+----------+--------------------+--------------------+
|date|label|label_text|             message|message_id|             subject|                text|
+----+-----+----------+--------------------+----------+--------------------+--------------------+
|1119|    1|      spam|understanding oem...|     33214|any software just...|any software just...|
| 992|    0|       ham|19 th , 2 : 00 pm...|     11929|perspective on fe...|perspective on fe...|
|1094|    1|      spam|viagra at $ 1 . 1...|     19784|wanted to try ci ...|wanted to try ci ...|
| 976|    0|       ham|teco tap 30 . 000...|      2209|enron / hpl actua...|enron / hpl actua...|
|1108|    1|      spam|water past also ,...|     15880|looking for cheap...|looking for cheap...|
+----+-----+----------+--------------------+----------+--------------------+--------------------+
only showing top 5 rows



In [103]:
df.printSchema()

root
 |-- date: long (nullable = true)
 |-- label: long (nullable = true)
 |-- label_text: string (nullable = true)
 |-- message: string (nullable = true)
 |-- message_id: long (nullable = true)
 |-- subject: string (nullable = true)
 |-- text: string (nullable = true)



In [104]:
# Drop unnecessary columns
df = df.drop('text', 'label_text','date')

In [105]:
# Check Nulls
null_counts_df = df.select(
    *[ spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns ]
)

null_counts_df.show()

+-----+-------+----------+-------+
|label|message|message_id|subject|
+-----+-------+----------+-------+
|    0|      0|         0|      0|
+-----+-------+----------+-------+



## Data Cleaning

In [106]:
import re
import string
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
import spacy
import inflect
from nltk.corpus import stopwords

In [107]:
class TextCleaner:
    def __init__(self, subject_col: str, message_col: str, remove_stopwords=True):
        self.subject_col = subject_col
        self.message_col = message_col
        self.remove_stopwords = remove_stopwords
        self.stop_words = set(stopwords.words('english')) if remove_stopwords else None
        self.inflector = inflect.engine() 
        self.nlp = spacy.load('en_core_web_sm', disable=["ner", "parser"])  # Load spaCy model for lemmatization

    # Cleans a single text string 
    def clean_text(self, text: str) -> str:
        # Handle non-string values gracefully
        if not isinstance(text, str):  
            return str(text) if text is not None else ""

        # Lowercase the text
        text = text.lower()

        # Remove URLs
        text = re.sub(r'http\S+|www\S+|https\S+', '<URL>', text)

        # Remove email addresses
        text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', '<EMAIL>', text)

        # Remove HTML tags using regex
        text = re.sub(r'<.*?>', '', text)

        # Replace $ with 'dollar'
        text = text.replace('$', 'dollar')

        # Remove punctuation
        text = text.translate(str.maketrans('', '', string.punctuation))

        # Convert numbers to words
        text = ' '.join([self.inflector.number_to_words(word) if word.isdigit() else word for word in text.split()])

        # Remove extra whitespaces
        text = ' '.join(text.split())

        return text

    # Tokenizes and lemmatizes the text using spaCy
    def lemmatize_text(self, text: str) -> str:

        doc = self.nlp(text)
        tokens = [token.lemma_ for token in doc]  # Lemmatized tokens

        # Remove stop words (optional)
        if self.remove_stopwords:
            tokens = [word for word in tokens if word not in self.stop_words]

        return ' '.join(tokens)
    
    # Cleans the specified columns of the Spark DataFrame.
    def clean_dataframe(self, df):
        # Define UDFs
        clean_text_udf = udf(self.clean_text, StringType())
        lemmatize_text_udf = udf(self.lemmatize_text, StringType())

        # Apply UDFs to clean and lemmatize columns
        df = df.withColumn(self.subject_col, clean_text_udf(col(self.subject_col)))
        df = df.withColumn(self.message_col, clean_text_udf(col(self.message_col)))
        df = df.withColumn(f'{self.subject_col}_lem_tokens', lemmatize_text_udf(col(self.subject_col)))
        df = df.withColumn(f'{self.message_col}_lem_tokens', lemmatize_text_udf(col(self.message_col)))

        return df

In [108]:
# Initialize TextCleaner
cleaner = TextCleaner(subject_col="subject", message_col="message", remove_stopwords=True)

# Clean datasets
cleaned_df = cleaner.clean_dataframe(df)

In [109]:
cleaned_df.show(5)



+-----+--------------------+----------+--------------------+--------------------+--------------------+
|label|             message|message_id|             subject|  subject_lem_tokens|  message_lem_tokens|
+-----+--------------------+----------+--------------------+--------------------+--------------------+
|    1|understanding oem...|     33214|any software just...|software fifteen ...|understand oem so...|
|    0|nineteen th two z...|     11929|perspective on fe...|perspective ferc ...|nineteen th two z...|
|    1|viagra at dollar ...|     19784|wanted to try ci ...|want try ci four ...|viagra dollar one...|
|    0|teco tap thirty z...|      2209|enron hpl actuals...|enron hpl actual ...|teco tap thirty z...|
|    1|water past also b...|     15880|looking for cheap...|look cheap high q...|water past also b...|
+-----+--------------------+----------+--------------------+--------------------+--------------------+
only showing top 5 rows



                                                                                

## Data Processing

In [110]:
from pyspark.ml.feature import Tokenizer, CountVectorizer, IDF

In [111]:
class FeatureExtraction:
    def __init__(self, subject_col: str, message_col: str, ngram_range=(1, 2), max_features=1000):

        self.subject_col = subject_col
        self.message_col = message_col
        self.ngram_range = ngram_range
        self.max_features = max_features

        # TF-IDF components for subject
        self.subject_tokenizer = Tokenizer(inputCol=self.subject_col, outputCol=f"{self.subject_col}_tokens")
        self.subject_vectorizer = CountVectorizer(inputCol=f"{self.subject_col}_tokens",
                                                  outputCol=f"{self.subject_col}_raw_features",
                                                  vocabSize=self.max_features)
        self.subject_idf = IDF(inputCol=f"{self.subject_col}_raw_features", outputCol=f"{self.subject_col}_features")

        # TF-IDF components for message
        self.message_tokenizer = Tokenizer(inputCol=self.message_col, outputCol=f"{self.message_col}_tokens")
        self.message_vectorizer = CountVectorizer(inputCol=f"{self.message_col}_tokens",
                                                  outputCol=f"{self.message_col}_raw_features",
                                                  vocabSize=self.max_features)
        self.message_idf = IDF(inputCol=f"{self.message_col}_raw_features", outputCol=f"{self.message_col}_features")

    # Fit TF-IDF vectorizers to the data and add transformed features as new columns.
    def fit_transform(self, df):
        # Tokenize subject and message columns
        df = self.subject_tokenizer.transform(df)
        df = self.message_tokenizer.transform(df)

        # Fit and transform subject TF-IDF
        subject_cv_model = self.subject_vectorizer.fit(df)
        df = subject_cv_model.transform(df)
        subject_idf_model = self.subject_idf.fit(df)
        df = subject_idf_model.transform(df)

        # Fit and transform message TF-IDF
        message_cv_model = self.message_vectorizer.fit(df)
        df = message_cv_model.transform(df)
        message_idf_model = self.message_idf.fit(df)
        df = message_idf_model.transform(df)

        return df

    # Transform new data using the already fitted TF-IDF vectorizers and add features as columns.
    def transform(self, df):
        # Tokenize subject and message columns
        df = self.subject_tokenizer.transform(df)
        df = self.message_tokenizer.transform(df)

        # Transform subject TF-IDF
        df = self.subject_vectorizer.transform(df)
        df = self.subject_idf.transform(df)

        # Transform message TF-IDF
        df = self.message_vectorizer.transform(df)
        df = self.message_idf.transform(df)

        return df

In [112]:
feature_extractor = FeatureExtraction(subject_col="subject_lem_tokens",
                                      message_col="message_lem_tokens",
                                      ngram_range=(1, 2),
                                      max_features=1000)

In [113]:
feature_extracted_df = feature_extractor.fit_transform(cleaned_df)

                                                                                

In [114]:
feature_extracted_df.show(5)

[Stage 3292:>                                                       (0 + 1) / 1]

+-----+--------------------+----------+--------------------+--------------------+--------------------+-------------------------+-------------------------+-------------------------------+---------------------------+-------------------------------+---------------------------+
|label|             message|message_id|             subject|  subject_lem_tokens|  message_lem_tokens|subject_lem_tokens_tokens|message_lem_tokens_tokens|subject_lem_tokens_raw_features|subject_lem_tokens_features|message_lem_tokens_raw_features|message_lem_tokens_features|
+-----+--------------------+----------+--------------------+--------------------+--------------------+-------------------------+-------------------------+-------------------------------+---------------------------+-------------------------------+---------------------------+
|    1|understanding oem...|     33214|any software just...|software fifteen ...|understand oem so...|     [software, fiftee...|     [understand, oem,...|           (1000,[0,8

                                                                                

In [115]:
selected_columns = ["subject_lem_tokens_features", "message_lem_tokens_features", "label"]
df_for_model = feature_extracted_df.select(*selected_columns)

### EDA

In [116]:
from pyspark.sql.functions import explode, split, col

In [126]:
# Count spam and ham emails
feature_extracted_df.groupBy("label").count().show()

[Stage 3350:>                                                       (0 + 8) / 8]

+-----+-----+
|label|count|
+-----+-----+
|    0|15553|
|    1|16163|
+-----+-----+



                                                                                

In [127]:
# Create and apply tokenizer to the subject column
tokenizer = Tokenizer(inputCol="subject", outputCol="subject_tokens")
tokenized_df = tokenizer.transform(feature_extracted_df)

# Now split and explode the tokens
words = tokenized_df.withColumn("word", explode(col("subject_tokens")))

# Count unique words
unique_word_count = words.select("word").distinct().count()
print(f"Total number of unique words in subject: {unique_word_count}")

# Analyze spam words (label == 1)
spam_words = words.filter(col("label") == 1).groupBy("word").count().orderBy(col("count").desc())
spam_words.show(10, truncate=False)

# Analyze ham words (label == 0)
ham_words = words.filter(col("label") == 0).groupBy("word").count().orderBy(col("count").desc())
ham_words.show(10, truncate=False)

                                                                                

Total number of unique words in subject: 16247


                                                                                

+-------+-----+
|word   |count|
+-------+-----+
|and    |2501 |
|your   |1997 |
|you    |1744 |
|hundred|1535 |
|to     |1500 |
|the    |1444 |
|for    |1441 |
|a      |1200 |
|one    |1000 |
|dollar |973  |
+-------+-----+
only showing top 10 rows





+--------+-----+
|word    |count|
+--------+-----+
|re      |3712 |
|and     |2407 |
|two     |2284 |
|one     |2105 |
|for     |1959 |
|thousand|1409 |
|fw      |1176 |
|enron   |1064 |
|hundred |931  |
|to      |750  |
+--------+-----+
only showing top 10 rows



                                                                                

In [128]:
# Create and apply tokenizer to the message column
tokenizer = Tokenizer(inputCol="message", outputCol="message_tokens")
tokenized_df = tokenizer.transform(feature_extracted_df)

# Now split and explode the tokens
words = tokenized_df.withColumn("word", explode(col("message_tokens")))

# Count unique words
unique_word_count = words.select("word").distinct().count()
print(f"Total number of unique words in subject: {unique_word_count}")


# Analyze spam words (label == 1)
print(f"Most used words in spam emails")
spam_words = words.filter(col("label") == 1).groupBy("word").count().orderBy(col("count").desc())
spam_words.show(10, truncate=False)

# Analyze ham words (label == 0)
print(f"Most used words in non-spam emails")
ham_words = words.filter(col("label") == 0).groupBy("word").count().orderBy(col("count").desc())
ham_words.show(10, truncate=False)

                                                                                

Total number of unique words in subject: 137524
Most used words in spam emails


                                                                                

+-------+------+
|word   |count |
+-------+------+
|and    |118166|
|the    |101457|
|to     |77349 |
|of     |66602 |
|a      |48627 |
|hundred|48291 |
|you    |43171 |
|in     |43021 |
|one    |33661 |
|this   |32494 |
+-------+------+
only showing top 10 rows

Most used words in non-spam emails




+-------+------+
|word   |count |
+-------+------+
|the    |170997|
|and    |150786|
|to     |121485|
|of     |72201 |
|hundred|61400 |
|a      |61196 |
|two    |58535 |
|in     |56734 |
|enron  |55656 |
|one    |54259 |
+-------+------+
only showing top 10 rows



                                                                                

## Modelling

In [78]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import NaiveBayes, LogisticRegression, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pandas as pd

In [79]:
class SparkSpamClassifier:
    def __init__(self, label_col, feature_cols, test_size=0.2, seed=42):
        self.label_col = label_col
        self.feature_cols = feature_cols
        self.test_size = test_size
        self.seed = seed

        # Initialize classifiers
        self.models = {
            "NaiveBayes": NaiveBayes(featuresCol="features", labelCol=self.label_col),
            "LogisticRegression": LogisticRegression(featuresCol="features", labelCol=self.label_col, maxIter=100),
            "GradientBoostedTrees": GBTClassifier(featuresCol="features", labelCol=self.label_col)
        }
        self.trained_models = {}

    # Combine multiple feature columns into a single vector column.
    def combine_features(self, df):
        assembler = VectorAssembler(inputCols=self.feature_cols, outputCol="features")
        return assembler.transform(df)

    # Split the DataFrame into training and testing sets.
    def train_test_split(self, df):
        train_df, test_df = df.randomSplit([1 - self.test_size, self.test_size], seed=self.seed)
        print(f"Training data: {train_df.count()} rows, Test data: {test_df.count()} rows")
        return train_df, test_df
    
    # Train all models on the training data.
    def train(self, train_df):
        for model_name, model in self.models.items():
            print(f"Training {model_name}...")
            trained_model = model.fit(train_df)
            self.trained_models[model_name] = trained_model
        print("Training complete!")

    # Evaluate all trained models on the test data.
    def evaluate(self, test_df, mode='summary'):
        evaluator = MulticlassClassificationEvaluator(labelCol=self.label_col, predictionCol="prediction")
        results = {}

        for model_name, trained_model in self.trained_models.items():
            print(f"Evaluating {model_name}...")
            predictions = trained_model.transform(test_df)
            
            # Compute metrics
            accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
            precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
            recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
            f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
            
            metrics = {
                "predictions": predictions,
                "accuracy": accuracy,
                "precision": precision,
                "recall": recall,
                "f1_score": f1
            }
            results[model_name] = metrics

        # Create a summary DataFrame
        if mode == 'summary':
            summary_data = {
                "Model": [],
                "Accuracy": [],
                "Precision": [],
                "Recall": [],
                "F1 Score": []
            }
            for model_name, metrics in results.items():
                summary_data["Model"].append(model_name)
                summary_data["Accuracy"].append(metrics["accuracy"])
                summary_data["Precision"].append(metrics["precision"])
                summary_data["Recall"].append(metrics["recall"])
                summary_data["F1 Score"].append(metrics["f1_score"])

            summary_df = pd.DataFrame(summary_data)
            return summary_df

        elif mode == 'details':
            for model_name, metrics in results.items():
                print("-" * 50)
                print(f"{model_name} - Metrics:")
                print(f"  Accuracy: {metrics['accuracy']:.4f}")
                print(f"  Precision: {metrics['precision']:.4f}")
                print(f"  Recall: {metrics['recall']:.4f}")
                print(f"  F1 Score: {metrics['f1_score']:.4f}")
                print(f"  Predictions Schema: {metrics['predictions'].schema}")
                print("-" * 50)

        else:
            raise ValueError("Invalid mode. Use 'summary' or 'details'.")

        return results

In [80]:
classifier = SparkSpamClassifier(label_col="label", feature_cols=["subject_lem_tokens_features", "message_lem_tokens_features"])

In [82]:
combined_df = classifier.combine_features(df_for_model)

In [83]:
train_df, test_df = classifier.train_test_split(combined_df)



Training data: 25524 rows, Test data: 6192 rows


                                                                                

In [84]:
classifier.train(train_df)

Training NaiveBayes...


                                                                                

Training LogisticRegression...


                                                                                

Training GradientBoostedTrees...


                                                                                

Training complete!


## Evaluation

In [85]:
# Evaluate models on the test data
summary = classifier.evaluate(test_df, mode="summary")
print(summary)

Evaluating NaiveBayes...


                                                                                

Evaluating LogisticRegression...


                                                                                

Evaluating GradientBoostedTrees...


                                                                                

                  Model  Accuracy  Precision    Recall  F1 Score
0            NaiveBayes  0.968831   0.969180  0.968831  0.968823
1    LogisticRegression  0.971738   0.971745  0.971738  0.971737
2  GradientBoostedTrees  0.921996   0.928939  0.921996  0.921653


In [221]:
# More detailed result
classifier.evaluate(test_df, mode='details')

Evaluating MultinomialNB...
Evaluating LogisticRegression...
Evaluating LightGBM...
--------------------------------------------------

MultinomialNB - Confusion Matrix:
[[2960  124]
 [  99 3161]]

MultinomialNB - Classification Report:
              precision    recall  f1-score   support

           0       0.99      0.96      0.98      3084
           1       0.97      0.99      0.98      3260

    accuracy                           0.98      6344
   macro avg       0.98      0.98      0.98      6344
weighted avg       0.98      0.98      0.98      6344

MultinomialNB - Accuracy Score: 0.96

--------------------------------------------------

LogisticRegression - Confusion Matrix:
[[2983  101]
 [  37 3223]]

LogisticRegression - Classification Report:
              precision    recall  f1-score   support

           0       0.99      0.96      0.98      3084
           1       0.97      0.99      0.98      3260

    accuracy                           0.98      6344
   macro avg     