In [1]:
# Install PySpark, findspark to locate Spark, and streamlit for the dashboard
!pip install pyspark findspark streamlit -q

# Install ngrok to expose the Streamlit dashboard from Colab
!pip install pyngrok -q

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.1/10.1 MB[0m [31m25.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.9/6.9 MB[0m [31m30.1 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
import pandas as pd
import os

# Download the dataset
!wget https://archive.ics.uci.edu/ml/machine-learning-databases/00228/smsspamcollection.zip
!unzip -o smsspamcollection.zip

# The file is tab-separated, so we'll rename it and read it with pandas
os.rename('SMSSpamCollection', 'SMSSpamCollection.tsv')

# Load and preview the data
df = pd.read_csv('SMSSpamCollection.tsv', sep='\t', names=['label', 'message'])
print("Dataset loaded successfully!")
df.head()

--2025-09-28 07:04:40--  https://archive.ics.uci.edu/ml/machine-learning-databases/00228/smsspamcollection.zip
Resolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252
Connecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified
Saving to: ‘smsspamcollection.zip’

smsspamcollection.z     [ <=>                ] 198.65K  --.-KB/s    in 0.1s    

2025-09-28 07:04:40 (1.32 MB/s) - ‘smsspamcollection.zip’ saved [203415]

Archive:  smsspamcollection.zip
  inflating: SMSSpamCollection       
  inflating: readme                  
Dataset loaded successfully!


Unnamed: 0,label,message
0,ham,"Go until jurong point, crazy.. Available only ..."
1,ham,Ok lar... Joking wif u oni...
2,spam,Free entry in 2 a wkly comp to win FA Cup fina...
3,ham,U dun say so early hor... U c already then say...
4,ham,"Nah I don't think he goes to usf, he lives aro..."


In [3]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# 1. Create a SparkSession
spark = SparkSession.builder.appName("SpamClassifierTraining").getOrCreate()

# 2. Load the data into a Spark DataFrame
# We'll also convert the 'ham'/'spam' label to a numeric 0/1 format
from pyspark.sql.functions import when
data = spark.read.csv('SMSSpamCollection.tsv', sep='\t', inferSchema=True).toDF('label_str', 'message')
data = data.withColumn('label', when(data.label_str == 'spam', 1.0).otherwise(0.0))

# 3. Define the ML Pipeline stages
# Stage 1: Tokenizer - splits sentences into words
tokenizer = Tokenizer(inputCol="message", outputCol="words")
# Stage 2: HashingTF - converts words to numeric feature vectors
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="rawFeatures")
# Stage 3: IDF - down-weights common words
idf = IDF(inputCol=hashingTF.getOutputCol(), outputCol="features")
# Stage 4: Logistic Regression - the classification algorithm
lr = LogisticRegression(featuresCol="features", labelCol="label")

# 4. Create the full pipeline
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, lr])

# 5. Train the model
# We'll split the data to train on 70% and test on 30%
(train_data, test_data) = data.randomSplit([0.7, 0.3], seed=42)
model = pipeline.fit(train_data)
print("✅ Model trained successfully!")

# 6. Save the trained model pipeline for later use
model.write().overwrite().save("./spam_model")
print("✅ Model saved to 'spam_model' directory!")

spark.stop()

✅ Model trained successfully!
✅ Model saved to 'spam_model' directory!


In [16]:
%%writefile dashboard.py
import streamlit as st
import pandas as pd
import time
import glob # Import the glob library to find files

st.set_page_config(layout="wide")
st.title("Live Spam Comment Detection Dashboard 🛡️")

# Create placeholders for metrics and charts
placeholder = st.empty()

# Path to the output directory from PySpark
output_dir_path = 'predictions/'

while True:
    # Find all the part-*.csv files generated by Spark
    csv_files = glob.glob(f"{output_dir_path}/*.csv")

    if not csv_files:
        with placeholder.container():
            st.warning("Waiting for data from the stream...")
    else:
        # Read and concatenate all CSV files into a single DataFrame
        df_list = [pd.read_csv(file, names=['message', 'prediction_label']) for file in csv_files]
        df = pd.concat(df_list, ignore_index=True)

        # Calculate metrics
        total_comments = len(df)
        spam_count = len(df[df['prediction_label'] == 'Spam'])
        ham_count = total_comments - spam_count
        spam_percentage = (spam_count / total_comments) * 100 if total_comments > 0 else 0

        with placeholder.container():
            # Display KPIs
            kpi1, kpi2, kpi3 = st.columns(3)
            kpi1.metric(label="Total Comments Received", value=f"{total_comments}")
            kpi2.metric(label="Spam Detected", value=f"{spam_count}") # Cleaned up label
            kpi3.metric(label="Spam Rate", value=f"{spam_percentage:.2f}%")

            # Display charts
            fig_col1, fig_col2 = st.columns(2)
            with fig_col1:
                st.subheader("Spam vs. Ham Distribution")
                # Use a dictionary for simpler DataFrame creation
                chart_data = pd.DataFrame({
                    'Count': [spam_count, ham_count]
                }, index=['Spam', 'Ham'])
                st.bar_chart(chart_data)

            with fig_col2:
                st.subheader("Latest Comments")
                # Show the last 10 comments with a cleaner index
                st.dataframe(df.tail(10).reset_index(drop=True))

    time.sleep(2) # Refresh every 2 seconds

Overwriting dashboard.py


In [17]:
from pyngrok import ngrok
import subprocess
import time

# 👉 Add your authtoken here
# Get it from https://dashboard.ngrok.com/get-started/your-authtoken
ngrok.set_auth_token("33BWZMZHRETGn0nLti6YekVbGbN_84GgQLPK1pDgvfaZn5C1N")

# Terminate any existing ngrok tunnels
ngrok.kill()

# Start the streamlit app in the background using subprocess
# This is the non-blocking, reliable way to run it
process = subprocess.Popen(['streamlit', 'run', 'dashboard.py', '--server.port', '8501'])

# Give Streamlit a few seconds to start up before connecting ngrok
print("Starting Streamlit server...")
time.sleep(5) # You might need to increase this if your app is large

# Connect to the Streamlit port with ngrok
public_url = ngrok.connect(8501, name="streamlit-dashboard")
print("Streamlit server is running!")
print(f"🚀 Click this public URL to view your dashboard: {public_url}")

Starting Streamlit server...
Streamlit server is running!
🚀 Click this public URL to view your dashboard: NgrokTunnel: "https://ungleaming-jocelyn-unpositively.ngrok-free.dev" -> "http://localhost:8501"


In [18]:
# Directory to simulate incoming data
!mkdir -p stream_data
# Directory for PySpark to write its output
!mkdir -p predictions

In [19]:
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel
from pyspark.sql.functions import col, when, udf
from pyspark.sql.types import StringType

# 1. Restart SparkSession for the streaming job
spark = SparkSession.builder.appName("SpamDetectorStreaming").getOrCreate()

# 2. Load the saved model
model = PipelineModel.load("./spam_model")

# 3. Define the schema for the incoming text data
schema = "message STRING"

# 4. Set up the read stream to watch the 'stream_data' directory
# It will process one text file at a time as it appears
stream_df = spark.readStream \
    .schema(schema) \
    .text("stream_data") \
    .withColumnRenamed("value", "message")

# 5. Use the model to make predictions on the streaming data
predictions = model.transform(stream_df)

# 6. Convert numeric predictions (0.0/1.0) back to string labels (Ham/Spam)
label_converter = udf(lambda p: "Spam" if p == 1.0 else "Ham", StringType())
final_df = predictions.withColumn("prediction_label", label_converter(col("prediction"))) \
                      .select("message", "prediction_label")

# 7. Start the write stream
# It will write the output to a CSV file in the 'predictions' directory
# This is what the dashboard reads
query = final_df.writeStream \
    .outputMode("append") \
    .format("csv") \
    .option("path", "predictions") \
    .option("checkpointLocation", "checkpoint") \
    .start()

print("🚀 Streaming job started! Now, let's add some data...")

🚀 Streaming job started! Now, let's add some data...


In [20]:
import time
import uuid

# Sample comments to simulate
comments = [
    "URGENT! You have won a 1 week FREE membership in our prize jackpot.",
    "Hey, are we still on for the meeting tomorrow?",
    "Congratulations! You've been selected to receive a $1000 gift card.",
    "Can you please send me the report? Thanks.",
    "WINNER!! As a valued network customer you have been selected to receive a prize.",
    "lol you are so funny"
]

# This loop will drop a new file every 3 seconds
for comment in comments:
    # Use a unique filename each time
    file_name = f"stream_data/comment_{uuid.uuid4()}.txt"
    with open(file_name, 'w') as f:
        f.write(comment)
    print(f"Sent comment: '{comment}'")
    time.sleep(3)

Sent comment: 'URGENT! You have won a 1 week FREE membership in our prize jackpot.'
Sent comment: 'Hey, are we still on for the meeting tomorrow?'
Sent comment: 'Congratulations! You've been selected to receive a $1000 gift card.'
Sent comment: 'Can you please send me the report? Thanks.'
Sent comment: 'WINNER!! As a valued network customer you have been selected to receive a prize.'
Sent comment: 'lol you are so funny'
