### Initializing Spark Session

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Fraud detection').getOrCreate()


In [None]:
sms = spark.read.csv("spam.csv", header=True, inferSchema = True )

sms.printSchema()

root
 |-- id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



In [None]:
from pyspark.sql.functions import length
sms = sms.withColumn('length', length(sms['text']))
sms.show()

+---+--------------------+-----+------+
| id|                text|label|length|
+---+--------------------+-----+------+
|  1|Go until jurong p...|    0|   111|
|  2|Ok lar... Joking ...|    0|    29|
|  3|Free entry in 2 a...|    1|   155|
|  4|U dun say so earl...|    0|    49|
|  5|Nah I don't think...|    0|    61|
|  6|FreeMsg Hey there...|    1|   147|
|  7|Even my brother i...|    0|    77|
|  8|As per your reque...|    0|   160|
|  9|WINNER!! As a val...|    1|   157|
| 10|Had your mobile 1...|    1|   154|
| 11|I'm gonna be home...|    0|   109|
| 12|SIX chances to wi...|    1|   136|
| 13|URGENT! You have ...|    1|   155|
| 14|I've been searchi...|    0|   196|
| 15|I HAVE A DATE ON ...|    0|    35|
| 16|XXXMobileMovieClu...|    1|   149|
| 17|Oh k...i'm watchi...|    0|    26|
| 18|Eh u remember how...|    0|    81|
| 19|Fine if that��s t...|    0|    58|
| 20|England v Macedon...|    1|   155|
+---+--------------------+-----+------+
only showing top 20 rows



<b>Next, I create a new column 'length' which signifies the length of the SMS.</b>

In [None]:
sms.groupBy('label').avg('length').show()

+-----+------------------+
|label|       avg(length)|
+-----+------------------+
| NULL|              74.5|
|    1|138.45917001338688|
|    0| 71.04167530582625|
+-----+------------------+



<b>This is interesting, spam messages are twice as long as regular messages.</b>

In [None]:
sms.printSchema()

root
 |-- id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- length: integer (nullable = true)



### Text Preprocessing

In [None]:
from pyspark.sql.functions import regexp_replace, lower

wrangled = sms.withColumn('text', regexp_replace(sms.text, '[_():;,.!?\\-]', ' '))

wrangled = wrangled.withColumn('text', regexp_replace(wrangled.text, '[0-9]', ' '))

wrangled = wrangled.withColumn('text', regexp_replace(wrangled.text, ' +', ' '))

wrangled = wrangled.withColumn('text', lower(wrangled['text']))


<b>Above, we remove anything other that letters (eg- punctuations,numbers and symbols)</b>

In [None]:
wrangled.groupBy("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
| NULL|    4|
|    1|  747|
|    0| 4823|
+-----+-----+



<b>There are a total of 5574 SMS, of which only 747 have been labelled as spam.This dataset is highly imbalanced.As a classifier just predicting all the messages as not spam will get a accuracy of 87%.</b>

In [None]:
wrangled.show()

+---+--------------------+-----+------+
| id|                text|label|length|
+---+--------------------+-----+------+
|  1|go until jurong p...|    0|   111|
|  2|ok lar joking wif...|    0|    29|
|  3|free entry in a w...|    1|   155|
|  4|u dun say so earl...|    0|    49|
|  5|nah i don't think...|    0|    61|
|  6|freemsg hey there...|    1|   147|
|  7|even my brother i...|    0|    77|
|  8|as per your reque...|    0|   160|
|  9|winner as a value...|    1|   157|
| 10|had your mobile m...|    1|   154|
| 11|i'm gonna be home...|    0|   109|
| 12|six chances to wi...|    1|   136|
| 13|urgent you have w...|    1|   155|
| 14|i've been searchi...|    0|   196|
| 15|i have a date on ...|    0|    35|
| 16|xxxmobilemovieclu...|    1|   149|
| 17|oh k i'm watching...|    0|    26|
| 18|eh u remember how...|    0|    81|
| 19|fine if that��s t...|    0|    58|
| 20|england v macedon...|    1|   155|
+---+--------------------+-----+------+
only showing top 20 rows



###  Pipeline

In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml import Pipeline

tokenizer = Tokenizer(inputCol='text', outputCol='words')
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol='terms')
vectorizer = CountVectorizer(inputCol=remover.getOutputCol(), outputCol="raw_features", vocabSize=10000)
idf = IDF(inputCol="raw_features", outputCol="features")

pipeline = Pipeline(stages=[tokenizer, remover, vectorizer, idf])


<b>First we tokenize the text into individual tokens,then remove stopwords.After that I perform hashing(hashing provides a fast and space-efficient way to map a huge number of words present in the SMS messages onto a smaller, finite number of values.At last I create a TF-IDF matrix which gives relatively higher importance to words that are rare across documents.<br>
Next, I create a pipeline which wraps all of the above steps. </b>

In [None]:
pipeline_model = pipeline.fit(wrangled)

In [None]:
sms_transformed = pipeline_model.transform(wrangled)

In [None]:
sms_train, sms_test = sms_transformed.randomSplit([0.7, 0.3], seed=13)

### Class Weights (Handling Imbalanced Data)

<b>Since where we have 87% positives (label == 0) in the dataset, so theoretically we want to "under-sample" the positive class. So that The logistic loss objective function should treat the negative class (label == 1) with higher weight.</b>

The Inverse Class Frequency Method
The Inverse Class Frequency method calculates class weights based on the number of samples in each class. For a binary classification problem with classes 0 and 1, the formula for calculating class weights is as follows:

weight_0 = total_samples / (2 * class_0_samples)
weight_1 = total_samples / (2 * class_1_samples)

By dividing the total number of samples by twice the number of samples in each class, we ensure that the sum of the weights for both classes is the same, helping to balance the impact on the model.


In [None]:
dataset_size=float(sms_train.select("label").count())
numPositives=sms_train.select("label").where('label == 1').count()
per_ones=(float(numPositives)/float(dataset_size))*100
numNegatives=float(dataset_size-numPositives)
print('The number of ones are {}'.format(numPositives))
print('Percentage of ones are {}'.format(per_ones))
print('The number of zeros are {}'.format(numNegatives))

The number of ones are 524
Percentage of ones are 13.357124649502932
The number of zeros are 3399.0


In [None]:
BalancingRatio= numNegatives/dataset_size
print('BalancingRatio = {}'.format(BalancingRatio))

BalancingRatio = 0.8664287535049707


In [None]:
from pyspark.sql.functions import when
sms_train=sms_train.withColumn("classWeights", when(sms_train.label == 1,BalancingRatio).otherwise(1-BalancingRatio))
sms_train.select("classWeights","label","features").show(5, truncate=False)

+-------------------+-----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|classWeights       |label|features                                                                                                                                                                                                                                                                                                                                                                 |
+-------------------+-----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

<b>Here we give a weight of ~0.87 to spam messages and ~0.13 to non spam messages.</b>

### Model Building and Evaluation

In [None]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Logistic Regression (already weighted)
lr = LogisticRegression(labelCol="label", featuresCol="features", weightCol="classWeights", maxIter=10)

# Random Forest (supports weightCol)
rf = RandomForestClassifier(labelCol="label", featuresCol="features", weightCol="classWeights", numTrees=100)

# Gradient Boosted Tree (no weightCol support)
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=100)

# Drop rows with null labels before fitting the models
sms_train_cleaned = sms_train.dropna(subset=["label"])

# Fit models
lr_model = lr.fit(sms_train_cleaned)
rf_model = rf.fit(sms_train_cleaned)
gbt_model = gbt.fit(sms_train_cleaned.drop("classWeights"))  # remove classWeights or it will error

# Make predictions
lr_pred = lr_model.transform(sms_test)
rf_pred = rf_model.transform(sms_test)
gbt_pred = gbt_model.transform(sms_test)

# Drop rows with null labels and null raw predictions from prediction dataframes before evaluating
lr_pred_cleaned = lr_pred.dropna(subset=["label", "rawPrediction"])
rf_pred_cleaned = rf_pred.dropna(subset=["label", "rawPrediction"])
gbt_pred_cleaned = gbt_pred.dropna(subset=["label", "rawPrediction"])

# Evaluate
evaluator = BinaryClassificationEvaluator(labelCol="label")
print("Logistic Regression AUC:", evaluator.evaluate(lr_pred_cleaned))
print("Random Forest AUC:", evaluator.evaluate(rf_pred_cleaned))
print("GBT AUC:", evaluator.evaluate(gbt_pred_cleaned))

Logistic Regression AUC: 0.9914022016146067
Random Forest AUC: 0.9864418124510964
GBT AUC: 0.9812190270283859


In [None]:
def evaluate_metrics(prediction_df, model_name="Model"):
    total = prediction_df.count()
    TP = prediction_df.filter('label = 1 AND prediction = 1').count()
    TN = prediction_df.filter('label = 0 AND prediction = 0').count()
    FP = prediction_df.filter('label = 0 AND prediction = 1').count()
    FN = prediction_df.filter('label = 1 AND prediction = 0').count()

    accuracy = (TP + TN) / total if total != 0 else 0
    precision = TP / (TP + FP) if (TP + FP) != 0 else 0
    recall = TP / (TP + FN) if (TP + FN) != 0 else 0
    f1 = (2 * precision * recall) / (precision + recall) if (precision + recall) != 0 else 0

    print(f"🔍 {model_name}")
    print(f"Accuracy : {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall   : {recall:.4f}")
    print(f"F1 Score : {f1:.4f}")
    print("-" * 30)

    return {
        "Model": model_name,
        "Accuracy": accuracy,
        "Precision": precision,
        "Recall": recall,
        "F1 Score": f1
    }


<b>Wow! we get a roc-auc score of 97% with our baseline model.But it is a fact that F1 score is a better evaluation metric than roc-auc when dealing with imbalanced datasets (see [here](https://www.kaggle.com/lct14558/imbalanced-data-why-you-should-not-use-roc-curve)),so we will consider F1 score as well.</b>

In [None]:
# Logistic Regression
lr_metrics = evaluate_metrics(lr_pred, "Logistic Regression")

# Random Forest
rf_metrics = evaluate_metrics(rf_pred, "Random Forest")

# GBT (no class weighting)
gbt_metrics = evaluate_metrics(gbt_pred, "Gradient Boosted Tree")


🔍 Logistic Regression
Accuracy : 0.9794
Precision: 0.9897
Recall   : 0.8610
F1 Score : 0.9209
------------------------------
🔍 Random Forest
Accuracy : 0.9649
Precision: 0.9882
Recall   : 0.7534
F1 Score : 0.8550
------------------------------
🔍 Gradient Boosted Tree
Accuracy : 0.9727
Precision: 0.9590
Recall   : 0.8386
F1 Score : 0.8947
------------------------------


In [None]:
all_results = [lr_metrics, rf_metrics, gbt_metrics]
import pandas as pd
df = pd.DataFrame(all_results)
print(df)


                   Model  Accuracy  Precision    Recall  F1 Score
0    Logistic Regression  0.979406   0.989691  0.860987  0.920863
1          Random Forest  0.964870   0.988235  0.753363  0.854962
2  Gradient Boosted Tree  0.972744   0.958974  0.838565  0.894737


<b>As you can see we achieved a roc-auc score of 97% but a F1 score of just 90%.With hyper parameter using Grid search I think we can achieve much better results!</b>

<b>To conclude after all the hyper parameter tuning we end up witha model with a roc-auc score of 98% and F1 score of 93%.<br>It is worth mentioning that without adding the class weights we will end up with a model with an F1 score of 0 as precision was 0 but a roc-auc score of nearly 100%.This is because of the highly imbalanced data the model blindly predicts all the messages to be not spam.So it is very important to address the problem of imbalanced datasets.  </b>