<a href="https://colab.research.google.com/github/05BHARATHI/Real-Time-SMS-Spam-Detection-using-PySpark-and-LSTM/blob/main/Real_Time_SMS_Spam_Detection_using_PySpark_and_LSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count
import pandas as pd
import os
import time
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, LSTM, Dense, Dropout
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

In [3]:
df = pd.read_csv('spam.csv', encoding='latin-1')
df = df.iloc[:, :2]  # Keep only relevant columns
df.columns = ['label', 'message']

In [4]:
encoder = LabelEncoder()
df['label'] = encoder.fit_transform(df['label'])

In [5]:
streaming_dir = "streaming"
os.makedirs(streaming_dir, exist_ok=True)

In [6]:
chunk_size = 100  # Number of rows per chunk
for i in range(0, len(df), chunk_size):
    chunk = df.iloc[i:i+chunk_size]
    chunk.to_csv(f"{streaming_dir}/part_{i}.csv", index=False)
    time.sleep(1)

In [7]:
spark = SparkSession.builder \
    .appName("SimulatedSparkStreaming") \
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
    .getOrCreate()

In [8]:
df_streaming = spark.readStream \
    .schema("label STRING, message STRING") \
    .option("maxFilesPerTrigger", 1) \
    .csv(streaming_dir)

In [9]:
spam_count = df_streaming.groupBy("label").agg(count("label").alias("count"))

In [10]:
query = spam_count.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

In [11]:
max_words = 5000
max_len = 100

In [12]:
tokenizer = Tokenizer(num_words=max_words, oov_token="<OOV>")
tokenizer.fit_on_texts(df['message'])
sequences = tokenizer.texts_to_sequences(df['message'])
padded_sequences = pad_sequences(sequences, maxlen=max_len, padding='post')

In [13]:
X_train, X_test, y_train, y_test = train_test_split(padded_sequences, df['label'], test_size=0.2, random_state=42)

In [14]:
model = Sequential([
    Embedding(input_dim=max_words, output_dim=128, input_length=max_len),
    LSTM(64, return_sequences=True),
    Dropout(0.5),
    LSTM(32),
    Dense(32, activation='relu'),
    Dropout(0.5),
    Dense(1, activation='sigmoid')
])



In [15]:
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

In [16]:
model.fit(X_train, y_train, epochs=5, batch_size=32, validation_data=(X_test, y_test))

Epoch 1/5
[1m140/140[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m61s[0m 314ms/step - accuracy: 0.8134 - loss: 0.4883 - val_accuracy: 0.8655 - val_loss: 0.4008
Epoch 2/5
[1m140/140[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m77s[0m 282ms/step - accuracy: 0.8654 - loss: 0.4157 - val_accuracy: 0.8655 - val_loss: 0.3983
Epoch 3/5
[1m140/140[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 275ms/step - accuracy: 0.8685 - loss: 0.4106 - val_accuracy: 0.8655 - val_loss: 0.3950
Epoch 4/5
[1m140/140[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m43s[0m 288ms/step - accuracy: 0.8663 - loss: 0.4107 - val_accuracy: 0.8655 - val_loss: 0.3951
Epoch 5/5
[1m140/140[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m37s[0m 256ms/step - accuracy: 0.8540 - loss: 0.4315 - val_accuracy: 0.8655 - val_loss: 0.3950


<keras.src.callbacks.history.History at 0x7cb3f4e83a90>

In [17]:
loss, accuracy = model.evaluate(X_test, y_test)
print(f"Model Accuracy: {accuracy:.2f}")

[1m35/35[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 80ms/step - accuracy: 0.8760 - loss: 0.3751
Model Accuracy: 0.87


In [18]:
query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 