In [1]:
# Importing libraries

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("fraud-detection").getOrCreate()

24/12/27 17:25:30 WARN Utils: Your hostname, jayaraj-VMware-Virtual-Platform resolves to a loopback address: 127.0.1.1; using 192.168.3.128 instead (on interface ens33)
24/12/27 17:25:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/12/27 17:25:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/12/27 17:25:42 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [2]:
from pyspark.sql.functions import col
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, StringIndexer
from pyspark.ml.linalg import Vectors
from sklearn.ensemble import IsolationForest
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, f1_score, roc_curve
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import os

In [3]:
# Load dataset
file_path = "/home/jayaraj/Documents/Spark-VsCode/Fraud-Detection/dataset/spam.csv" 
os.makedirs("output/visualizations", exist_ok=True)
data = spark.read.csv(file_path, header=True, inferSchema=True)
df = data
data.show(5)

+----+--------------------+
| res|             message|
+----+--------------------+
|spam|You’ve been selec...|
|spam|Your bank account...|
| ham|A/c 3XXXXX3438 de...|
| ham|No we sell it all...|
| ham|You should know n...|
+----+--------------------+
only showing top 5 rows



In [4]:
data.printSchema()

root
 |-- res: string (nullable = true)
 |-- message: string (nullable = true)



In [5]:
# Perform basic data analysis
print("Data Summary:")
data.describe().show()

Data Summary:
+-------+----+--------------------+
|summary| res|             message|
+-------+----+--------------------+
|  count| 157|                 157|
|   mean|NULL|                NULL|
| stddev|NULL|                NULL|
|    min| ham|"Indians r poor b...|
|    max|spam|says the  &lt;#&g...|
+-------+----+--------------------+



In [7]:
from pyspark.sql.functions import count, when

# Count missing values for each column
missing_values = data.select([count(when(col(c).isNull(), c)).alias(c) for c in data.columns])
missing_values.show()

+---+-------+
|res|message|
+---+-------+
|  0|      0|
+---+-------+



In [8]:
# Visualize class distribution
class_distribution = data.groupBy("res").count().toPandas()

plt.figure(figsize=(6, 6))
sns.barplot(x="res", y="count", data=class_distribution, palette="viridis")
plt.title("Class Distribution")
plt.xlabel("Class (Spam/Non-Spam)")
plt.ylabel("Count")
plt.savefig("output/visualizations/class_distribution.png")
plt.close()


Passing `palette` without assigning `hue` is deprecated and will be removed in v0.14.0. Assign the `x` variable to `hue` and set `legend=False` for the same effect.

  sns.barplot(x="res", y="count", data=class_distribution, palette="viridis")


In [9]:
# Preprocessing: Convert `res` to numeric labels
indexer = StringIndexer(inputCol="res", outputCol="label")
data = indexer.fit(data).transform(data)
data.show(5)

+----+--------------------+-----+
| res|             message|label|
+----+--------------------+-----+
|spam|You’ve been selec...|  0.0|
|spam|Your bank account...|  0.0|
| ham|A/c 3XXXXX3438 de...|  1.0|
| ham|No we sell it all...|  1.0|
| ham|You should know n...|  1.0|
+----+--------------------+-----+
only showing top 5 rows



In [10]:
# Tokenize Messages
tokenizer = Tokenizer(inputCol="message", outputCol="words")
data = tokenizer.transform(data)

# Vectorize Messages using HashingTF
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=100)
data = hashingTF.transform(data)

# Compute TF-IDF (IDF)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf_model = idf.fit(data)
data = idf_model.transform(data)
data.show(5)

                                                                                

+----+--------------------+-----+--------------------+--------------------+--------------------+
| res|             message|label|               words|         rawFeatures|            features|
+----+--------------------+-----+--------------------+--------------------+--------------------+
|spam|You’ve been selec...|  0.0|[you’ve, been, se...|(100,[14,35,36,43...|(100,[14,35,36,43...|
|spam|Your bank account...|  0.0|[your, bank, acco...|(100,[0,5,9,10,34...|(100,[0,5,9,10,34...|
| ham|A/c 3XXXXX3438 de...|  1.0|[a/c, 3xxxxx3438,...|(100,[3,10,14,16,...|(100,[3,10,14,16,...|
| ham|No we sell it all...|  1.0|[no, we, sell, it...|(100,[1,3,4,21,22...|(100,[1,3,4,21,22...|
| ham|You should know n...|  1.0|[you, should, kno...|(100,[2,3,8,10,22...|(100,[2,3,8,10,22...|
+----+--------------------+-----+--------------------+--------------------+--------------------+
only showing top 5 rows



In [11]:
data = data.select("features", "label")
data.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(100,[14,35,36,43...|  0.0|
|(100,[0,5,9,10,34...|  0.0|
|(100,[3,10,14,16,...|  1.0|
|(100,[1,3,4,21,22...|  1.0|
|(100,[2,3,8,10,22...|  1.0|
+--------------------+-----+
only showing top 5 rows



In [12]:
# Convert features column to numpy arrays
features = np.array(data.rdd.map(lambda row: row['features'].toArray()).collect())
labels = np.array(data.rdd.map(lambda row: row['label']).collect())

                                                                                

In [14]:
# Train-Test Split using Scikit-learn
X_train, X_test, y_train, y_test = train_test_split(\
                                                    features, \
                                                    labels, \
                                                    test_size=0.3, \
                                                    random_state=42)


In [15]:
isolation_forest = IsolationForest(n_estimators=100, contamination=0.1, random_state=42)
isolation_forest.fit(X_train)

In [16]:
y_pred = isolation_forest.predict(X_test)
y_pred = [0 if x == 1 else 1 for x in y_pred]  # Convert to 0 (inliers) and 1 (outliers)

In [17]:
# Evaluate the model
report = classification_report(y_test, y_pred)
print("Classification Report:\n", report)

Classification Report:
               precision    recall  f1-score   support

         0.0       0.58      1.00      0.74        25
         1.0       1.00      0.22      0.36        23

    accuracy                           0.62        48
   macro avg       0.79      0.61      0.55        48
weighted avg       0.78      0.62      0.55        48



In [18]:
# Confusion Matrix
conf_matrix = confusion_matrix(y_test, y_pred)

# Plot confusion matrix
plt.figure(figsize=(6, 6))
sns.heatmap(conf_matrix, annot=True, cmap="coolwarm", fmt="d", cbar=False, xticklabels=["Normal", "Fraud"], yticklabels=["Normal", "Fraud"])
plt.title("Confusion Matrix")
plt.ylabel("True Label")
plt.xlabel("Predicted Label")
plt.savefig("output/visualizations/confusion_matrix.png")
plt.close()

In [19]:
# F1-Score
f1 = f1_score(y_test, y_pred)
print(f"F1-Score: {f1}")

F1-Score: 0.35714285714285715


In [20]:
# ROC-AUC
roc_auc = roc_auc_score(y_test, y_pred)
print(f"ROC-AUC: {roc_auc}")

ROC-AUC: 0.6086956521739131


In [21]:
# Save ROC Curve
fpr, tpr, _ = roc_curve(y_test, y_pred)
plt.figure()
plt.plot(fpr, tpr, color="blue", label=f"ROC Curve (AUC = {roc_auc:.2f})")
plt.title("ROC Curve")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.legend()
plt.savefig("output/visualizations/roc_curve.png")
plt.close()

In [22]:
# Plotting pie chart for class distribution
plt.figure(figsize=(6, 6))
labels_pie = ['Normal', 'Fraud']
sizes_pie = [np.sum(y_test == 0), np.sum(y_test == 1)]
plt.pie(sizes_pie, labels=labels_pie, autopct='%1.1f%%', startangle=90, colors=["lightblue", "salmon"])
plt.title("Class Distribution (Test Set)")
plt.savefig("output/visualizations/class_distribution_pie.png")
plt.close()

In [23]:
from pyspark.ml.feature import PCA


In [24]:
pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
pca_model = pca.fit(data)
pca_data = pca_model.transform(data)

24/12/27 17:32:57 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


In [25]:
# Extract PCA features
pca_data = pca_data.select("pcaFeatures")
pca_rdd = pca_data.rdd.map(lambda x: x[0].toArray())
pca_vectors = pca_rdd.collect()

                                                                                

In [26]:
# Separate PCA values for plotting
x_vals = [vec[0] for vec in pca_vectors]
y_vals = [vec[1] for vec in pca_vectors]

In [27]:
# Plot PCA Scatter Plot
plt.figure(figsize=(8, 6))
plt.scatter(x_vals, y_vals, c="blue", alpha=0.5)
plt.title("PCA Scatter Plot")
plt.xlabel("PCA1")
plt.ylabel("PCA2")
plt.savefig("output/visualizations/pca_scatter.png")
plt.close()

In [28]:
# Plot bar chart for evaluation metrics
metrics = {"F1-Score": f1, "ROC-AUC": roc_auc}
metrics_items = list(metrics.items())
metrics_labels, metrics_values = zip(*metrics_items)

In [29]:
plt.figure(figsize=(8, 6))
sns.barplot(x=metrics_labels, y=metrics_values, palette="Blues_d")
plt.title("Evaluation Metrics")
plt.savefig("output/visualizations/metrics_bar_chart.png")
plt.close()


Passing `palette` without assigning `hue` is deprecated and will be removed in v0.14.0. Assign the `x` variable to `hue` and set `legend=False` for the same effect.

  sns.barplot(x=metrics_labels, y=metrics_values, palette="Blues_d")


In [60]:
print("Model training, evaluation, and visualizations completed.")

Model training, evaluation, and visualizations completed.


In [None]:
# Save the Model
# joblib.dump(isolation_forest, "output/isolation_forest_model.joblib")