In [None]:
import os, math, re, warnings
from collections import Counter

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim

from torch.utils.data import Dataset, DataLoader
from datasets import load_dataset
from sklearn.metrics import classification_report
from sklearn.linear_model import LogisticRegression
from sklearn.feature_extraction.text import TfidfVectorizer
from scipy.sparse import hstack, csr_matrix

warnings.filterwarnings("ignore")

# ============================================================
# Global Setup
# ============================================================
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print("Using device:", device)

# ============================================================
# Dataset Loading + Label Normalization
# ============================================================
HF_DATASET = "mshenoda/spam-messages"

def load_data():
    print(f"Loading dataset: {HF_DATASET}")
    ds = load_dataset(HF_DATASET)

    cols = ds["train"].column_names
    if "text" in cols: text_col = "text"
    elif "message" in cols: text_col = "message"
    else:
        text_col = next((c for c in cols if ds["train"].features[c].dtype == "string"), cols[0])

    print("Using text column:", text_col)

    label_map = {"ham": 0, "spam": 1, "0": 0, "1": 1}

    def normalize(example):
        lab = example.get("label")
        if isinstance(lab, (int, float)):
            example["label"] = int(lab)
        else:
            example["label"] = label_map.get(str(lab).lower(), 0)
        return example

    ds = ds.map(normalize)

    if "validation" in ds:
        train, val, test = ds["train"], ds["validation"], ds["test"]
    else:
        split = ds["train"].train_test_split(test_size=0.2, seed=SEED)
        train, val, test = split["train"], split["test"], split["test"]

    return (
        [x[text_col] for x in train], [int(x["label"]) for x in train],
        [x[text_col] for x in val],   [int(x["label"]) for x in val],
        [x[text_col] for x in test],  [int(x["label"]) for x in test]
    )

train_texts, train_labels, val_texts, val_labels, test_texts, test_labels = load_data()

# ============================================================
# Tokenizer / Utilities
# ============================================================
def tokenize(text):
    return re.findall(r"\w+|[!?.]", str(text).lower())

# ============================================================
# Model 1: Naive Bayes
# ============================================================
def train_naive_bayes(texts, labels):
    vocab = set()
    for t in texts:
        vocab.update(tokenize(t))
    vocab = sorted(vocab)

    wc_spam = Counter()
    wc_ham = Counter()

    spam_docs = sum(1 for l in labels if l == 1)
    ham_docs = len(labels) - spam_docs
    total_docs = len(labels)

    for txt, lab in zip(texts, labels):
        toks = tokenize(txt)
        (wc_spam if lab == 1 else wc_ham).update(toks)

    P_spam = spam_docs / total_docs
    P_ham = ham_docs / total_docs

    alpha = 1
    V = len(vocab)

    total_spam = sum(wc_spam.values()) + alpha * V
    total_ham  = sum(wc_ham.values()) + alpha * V

    log_spam = {w: math.log((wc_spam[w] + alpha) / total_spam) for w in vocab}
    log_ham  = {w: math.log((wc_ham[w]  + alpha) / total_ham) for w in vocab}

    unk_spam = math.log(alpha / total_spam)
    unk_ham = math.log(alpha / total_ham)

    def predict(text):
        toks = tokenize(text)
        s_spam = math.log(P_spam + 1e-12)
        s_ham  = math.log(P_ham + 1e-12)
        for t in toks:
            s_spam += log_spam.get(t, unk_spam)
            s_ham  += log_ham.get(t,  unk_ham)
        return 1 if s_spam > s_ham else 0

    return predict

print("\n=== Model 1: Naive Bayes ===")
nb_model = train_naive_bayes(train_texts, train_labels)
nb_preds = [nb_model(t) for t in test_texts]
print(classification_report(test_labels, nb_preds, digits=4))

# ============================================================
# Model 2: Logistic Regression
# ============================================================
def extract_features(txt):
    t = str(txt)
    return {
        "len": len(t),
        "exclaim": t.count("!"),
        "digits": sum(c.isdigit() for c in t),
        "upper": sum(c.isupper() for c in t),
        "urls": len(re.findall(r"http|www|\\.com", t.lower())),
        "spam_kw": sum(k in t.lower() for k in ["free", "win", "cash", "urgent", "click"])
    }

print("\n=== Model 2: Logistic Regression ===")

df_train = pd.DataFrame([extract_features(t) for t in train_texts])
df_test  = pd.DataFrame([extract_features(t) for t in test_texts])

tfidf = TfidfVectorizer(ngram_range=(1,2), min_df=2)
X_train_tfidf = tfidf.fit_transform(train_texts)
X_test_tfidf  = tfidf.transform(test_texts)

X_train = hstack([X_train_tfidf, csr_matrix(df_train.values)])
X_test  = hstack([X_test_tfidf,  csr_matrix(df_test.values)])

lr = LogisticRegression(max_iter=2000, solver="saga", n_jobs=-1)
lr.fit(X_train, train_labels)
lr_preds = lr.predict(X_test)

print(classification_report(test_labels, lr_preds, digits=4))

# ============================================================
# Shared Embedding Vocab (CNN & LSTM)
# ============================================================
print("\nPreparing vocab for neural models...")

vocab = {"<PAD>":0, "<UNK>":1}
for t in train_texts:
    for tok in tokenize(t):
        if tok not in vocab:
            vocab[tok] = len(vocab)

MAX_LEN = 40

def encode(text):
    toks = tokenize(text)
    ids = [vocab.get(t, 1) for t in toks[:MAX_LEN]]
    return ids + [0]*(MAX_LEN - len(ids))

class TextDataset(Dataset):
    def __init__(self, texts, labels):
        self.X = [encode(t) for t in texts]
        self.y = labels
    def __len__(self): return len(self.X)
    def __getitem__(self, idx):
        return torch.tensor(self.X[idx]), torch.tensor(self.y[idx])

train_dl = DataLoader(TextDataset(train_texts, train_labels), batch_size=32, shuffle=True)
test_dl  = DataLoader(TextDataset(test_texts, test_labels),  batch_size=32)

# ============================================================
# Model 3: CNN
# ============================================================
print("\n=== Model 3: CNN ===")

class TextCNN(nn.Module):
    def __init__(self, vocab_size):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, 100, padding_idx=0)
        self.convs = nn.ModuleList([
            nn.Conv1d(100, 100, 3),
            nn.Conv1d(100, 100, 4),
            nn.Conv1d(100, 100, 5)
        ])
        self.fc = nn.Linear(300, 2)

    def forward(self, x):
        x = self.embed(x).transpose(1,2)
        pools = [torch.relu(c(x)).max(2)[0] for c in self.convs]
        return self.fc(torch.cat(pools, 1))

cnn = TextCNN(len(vocab)).to(device)
opt = optim.Adam(cnn.parameters(), lr=1e-3)
crit = nn.CrossEntropyLoss()

for _ in range(2):
    cnn.train()
    for X, y in train_dl:
        X, y = X.to(device), y.to(device)
        opt.zero_grad()
        loss = crit(cnn(X), y)
        loss.backward()
        opt.step()

cnn.eval()
cnn_preds = []
with torch.no_grad():
    for X, y in test_dl:
        preds = cnn(X.to(device)).argmax(1).cpu().tolist()
        cnn_preds += preds

print(classification_report(test_labels, cnn_preds, digits=4))

# ============================================================
# Model 4: BiLSTM
# ============================================================
print("\n=== Model 4: BiLSTM ===")

class BiLSTM(nn.Module):
    def __init__(self, vocab_size):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, 100)
        self.lstm = nn.LSTM(100, 128, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(256, 2)

    def forward(self, x):
        x = self.embed(x)
        _, (h, _) = self.lstm(x)
        h_final = torch.cat([h[-2], h[-1]], 1)
        return self.fc(h_final)

lstm = BiLSTM(len(vocab)).to(device)
opt_l = optim.Adam(lstm.parameters(), lr=1e-3)

for _ in range(2):
    lstm.train()
    for X, y in train_dl:
        X, y = X.to(device), y.to(device)
        opt_l.zero_grad()
        loss = crit(lstm(X), y)
        loss.backward()
        opt_l.step()

lstm.eval()
lstm_preds = []
with torch.no_grad():
    for X, y in test_dl:
        lstm_preds += lstm(X.to(device)).argmax(1).cpu().tolist()

print(classification_report(test_labels, lstm_preds, digits=4))



Using device: cpu
Loading dataset: mshenoda/spam-messages
Using text column: text


Map: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 47392/47392 [00:00<00:00, 71031.24 examples/s]
Map: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 5923/5923 [00:00<00:00, 71800.60 examples/s]
Map: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 5926/5926 [00:00<00:00, 68002.47 examples/s]



=== Model 1: Naive Bayes ===
              precision    recall  f1-score   support

           0     0.9749    0.8954    0.9334      3595
           1     0.8567    0.9644    0.9074      2331

    accuracy                         0.9225      5926
   macro avg     0.9158    0.9299    0.9204      5926
weighted avg     0.9284    0.9225    0.9232      5926


=== Model 2: Logistic Regression ===
              precision    recall  f1-score   support

           0     0.5939    0.5752    0.5844      3595
           1     0.3752    0.3934    0.3841      2331

    accuracy                         0.5037      5926
   macro avg     0.4846    0.4843    0.4843      5926
weighted avg     0.5079    0.5037    0.5056      5926


Preparing vocab for neural models...

=== Model 3: CNN ===
              precision    recall  f1-score   support

           0     0.9640    0.9744    0.9692      3595
           1     0.9599    0.9438    0.9518      2331

    accuracy                         0.9624      5926


In [1]:
%%writefile app.py
print("Hello from Streamlit!")
import streamlit as st
import numpy as np
import math
import re
import torch
import torch.nn as nn
import torch.optim as optim
from collections import Counter
from sklearn.linear_model import LogisticRegression
from sklearn.feature_extraction.text import TfidfVectorizer
from scipy.sparse import hstack, csr_matrix
from datasets import load_dataset

# ==========================================================
#                  BASIC UTILITIES
# ==========================================================
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def tokenize(text):
    return re.findall(r"\w+|[!?.]", str(text).lower())

def softmax(logits):
    a = np.array(logits)
    e = np.exp(a - np.max(a))
    return e / e.sum()

# ==========================================================
#                  LOAD DATASET
# ==========================================================
@st.cache_resource
def load_data():
    ds = load_dataset("mshenoda/spam-messages")
    cols = ds["train"].column_names

    text_col = "text" if "text" in cols else cols[0]
    label_map = {"ham": 0, "spam": 1, "0": 0, "1": 1}

    def normalize(example):
        lab = example.get("label")
        if isinstance(lab, (int, float)):
            example["label"] = int(lab)
        else:
            example["label"] = label_map.get(str(lab).lower(), 0)
        return example

    ds = ds.map(normalize)

    split = ds["train"].train_test_split(test_size=0.2, seed=SEED)
    train, test = split["train"], split["test"]

    return (
        [x[text_col] for x in train],
        [int(x["label"]) for x in train],
        [x[text_col] for x in test],
        [int(x["label"]) for x in test]
    )

# ==========================================================
#                 NAIVE BAYES IMPLEMENTATION
# ==========================================================
class NaiveBayes:
    def __init__(self):
        self.vocab = None
        self.log_spam = {}
        self.log_ham = {}
        self.P_spam = 0.5
        self.P_ham = 0.5
        self.unk_spam = 0.0
        self.unk_ham = 0.0

    def fit(self, texts, labels, alpha=1):
        vocab = set()
        for t in texts: vocab.update(tokenize(t))
        vocab = sorted(vocab)
        wc_spam, wc_ham = Counter(), Counter()

        spam_docs = sum(1 for l in labels if l == 1)
        ham_docs  = len(labels) - spam_docs
        total_docs = len(labels)

        for txt, lab in zip(texts, labels):
            toks = tokenize(txt)
            (wc_spam if lab == 1 else wc_ham).update(toks)

        self.P_spam = spam_docs / total_docs
        self.P_ham  = ham_docs  / total_docs

        V = len(vocab)
        total_spam = sum(wc_spam.values()) + alpha * V
        total_ham  = sum(wc_ham.values()) + alpha * V

        self.log_spam = {w: math.log((wc_spam[w] + alpha) / total_spam) for w in vocab}
        self.log_ham  = {w: math.log((wc_ham[w] + alpha) / total_ham)  for w in vocab}

        self.unk_spam = math.log(alpha / total_spam)
        self.unk_ham  = math.log(alpha / total_ham)
        self.vocab = set(vocab)

    def predict_proba(self, text):
        toks = tokenize(text)
        s_spam = math.log(self.P_spam + 1e-12)
        s_ham  = math.log(self.P_ham  + 1e-12)

        for t in toks:
            s_spam += self.log_spam.get(t, self.unk_spam)
            s_ham  += self.log_ham.get(t,  self.unk_ham)

        probs = softmax([s_ham, s_spam])
        return probs

# ==========================================================
#                   CNN MODEL
# ==========================================================
class TextCNN(nn.Module):
    def __init__(self, vocab_size):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, 100, padding_idx=0)
        self.convs = nn.ModuleList([
            nn.Conv1d(100, 100, 3),
            nn.Conv1d(100, 100, 4),
            nn.Conv1d(100, 100, 5)
        ])
        self.fc = nn.Linear(300, 2)

    def forward(self, x):
        x = self.embed(x).transpose(1,2)
        pools = [torch.relu(c(x)).max(2)[0] for c in self.convs]
        return self.fc(torch.cat(pools, 1))

# ==========================================================
#                   BiLSTM MODEL
# ==========================================================
class BiLSTM(nn.Module):
    def __init__(self, vocab_size, embed_dim=100, hidden=128):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.lstm = nn.LSTM(embed_dim, hidden, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden * 2, 2)

    def forward(self, x):
        x = self.embed(x)
        _, (h, _) = self.lstm(x)
        h = torch.cat([h[0], h[1]], dim=1)
        return self.fc(h)

# ==========================================================
#         VOCAB + ENCODING FUNCTION
# ==========================================================
@st.cache_resource
def build_vocab(texts):
    vocab = {"<PAD>": 0, "<UNK>": 1}
    for t in texts:
        for tok in tokenize(t):
            if tok not in vocab:
                vocab[tok] = len(vocab)
    return vocab

MAX_LEN = 40

def encode(text, vocab):
    toks = tokenize(text)
    ids = [vocab.get(t, 1) for t in toks[:MAX_LEN]]
    return ids + [0] * (MAX_LEN - len(ids))

# ==========================================================
#                 TRAIN ALL MODELS
# ==========================================================
@st.cache_resource
def train_all_models():
    train_texts, train_labels, _, _ = load_data()

    # ---- NB ----
    nb = NaiveBayes()
    nb.fit(train_texts, train_labels)

    # ---- LR ----
    tfidf = TfidfVectorizer(ngram_range=(1,2), min_df=2)
    X_train_tfidf = tfidf.fit_transform(train_texts)

    extra = np.array([[
        len(t), t.count("!"), sum(c.isdigit() for c in t),
        sum(c.isupper() for c in t),
        len(re.findall(r"http|www|\.com", t.lower())),
        sum(k in t.lower() for k in ["free", "win", "cash", "urgent", "click"])
    ] for t in train_texts])

    X_lr = hstack([X_train_tfidf, csr_matrix(extra)])
    lr = LogisticRegression(max_iter=2000, solver="saga", n_jobs=-1)
    lr.fit(X_lr, train_labels)

    # ---- Vocab ----
    vocab = build_vocab(train_texts)

    # ---- CNN ----
    cnn = TextCNN(len(vocab)).to(device)
    cnn_opt = optim.Adam(cnn.parameters(), lr=1e-3)
    loss_fn = nn.CrossEntropyLoss()

    encoded = [encode(t, vocab) for t in train_texts]
    ds = list(zip(encoded, train_labels))

    cnn.train()
    for epoch in range(1):
        np.random.shuffle(ds)
        for i in range(0, len(ds), 64):
            batch = ds[i:i+64]
            Xb = torch.tensor([b[0] for b in batch], dtype=torch.long).to(device)
            yb = torch.tensor([b[1] for b in batch], dtype=torch.long).to(device)

            cnn_opt.zero_grad()
            loss = loss_fn(cnn(Xb), yb)
            loss.backward()
            cnn_opt.step()
    cnn.eval()

    # ---- BiLSTM ----
    lstm = BiLSTM(len(vocab)).to(device)
    lstm_opt = optim.Adam(lstm.parameters(), lr=1e-3)

    lstm.train()
    for epoch in range(1):
        np.random.shuffle(ds)
        for i in range(0, len(ds), 64):
            batch = ds[i:i+64]
            Xb = torch.tensor([b[0] for b in batch], dtype=torch.long).to(device)
            yb = torch.tensor([b[1] for b in batch], dtype=torch.long).to(device)
            lstm_opt.zero_grad()
            loss = loss_fn(lstm(Xb), yb)
            loss.backward()
            lstm_opt.step()
    lstm.eval()

    return {
        "nb": nb,
        "lr": lr,
        "tfidf": tfidf,
        "cnn": cnn,
        "lstm": lstm,
        "vocab": vocab
    }

models = train_all_models()

# ==========================================================
#           PREDICT ACROSS ALL FOUR MODELS
# ==========================================================
def predict_all_models(text):
    results = []
    vocab = models["vocab"]

    # NB
    probs = models["nb"].predict_proba(text)
    results.append(("Naive Bayes", int(np.argmax(probs)), float(max(probs)), probs.tolist()))

    # LR
    tfidf = models["tfidf"]
    lr = models["lr"]
    feat = tfidf.transform([text])
    extra = np.array([[len(text), text.count("!"), sum(c.isdigit() for c in text), sum(c.isupper() for c in text), len(re.findall(r"http|www|\.com", text.lower())), sum(k in text.lower() for k in ["free", "win", "cash", "urgent", "click"])]])
    Xlr = hstack([feat, csr_matrix(extra)])
    probs = lr.predict_proba(Xlr)[0]
    results.append(("Logistic Regression", int(np.argmax(probs)), float(max(probs)), probs.tolist()))

    # CNN
    ids = torch.tensor([encode(text, vocab)], dtype=torch.long).to(device)
    with torch.no_grad():
        logits = models["cnn"](ids).cpu().numpy()[0]
        probs = softmax(logits)
    results.append(("CNN", int(np.argmax(probs)), float(max(probs)), probs.tolist()))

    # LSTM
    with torch.no_grad():
        logits = models["lstm"](ids).cpu().numpy()[0]
        probs = softmax(logits)
    results.append(("BiLSTM", int(np.argmax(probs)), float(max(probs)), probs.tolist()))

    return results

# ==========================================================
#                  STREAMLIT UI
# ==========================================================

st.set_page_config(page_title="AI Spam Detector", page_icon="üì©", layout="centered")

# Styling
st.markdown("""
<style>
.main-title { font-size: 42px; font-weight: 900; text-align: center; color: #4A90E2; }
.sub-text { text-align: center; font-size: 18px; color: #555; }
.result-box { padding: 20px; border-radius: 12px; background: #f7f9fc; border: 1px solid #d8e3f0; margin-bottom: 20px; }
.winner { padding: 25px; border-radius: 15px; background: #E3F7E0; border: 2px solid #9ED89E; }
</style>
""", unsafe_allow_html=True)

st.markdown('<div class="main-title">üì© AI Spam Detection Demo</div>', unsafe_allow_html=True)
st.markdown('<div class="sub-text">Test one message across all four models and pick the most confident one.</div>', unsafe_allow_html=True)

st.markdown("---")

text = st.text_area("‚úçÔ∏è Enter a message to classify:", height=160)

if st.button("üöÄ Classify Message", use_container_width=True):
    if not text.strip():
        st.warning("Please type a message first.")
    else:
        with st.spinner("Running all models..."):
            results = predict_all_models(text)

        st.subheader("üìä Model Results")

        for name, pred, conf, probs in results:
            st.markdown(f"<div class='result-box'>", unsafe_allow_html=True)
            st.markdown(f"### üîπ {name}")
            st.write(f"Prediction: **{pred}**  (0=ham, 1=spam)")
            st.write(f"Confidence: **{conf:.4f}**")
            st.write(f"Probabilities ‚Üí Ham: {probs[0]:.4f} | Spam: {probs[1]:.4f}")
            st.markdown("</div>", unsafe_allow_html=True)

        best = max(results, key=lambda x: x[2])

        st.markdown("<div class='winner'>", unsafe_allow_html=True)
        st.markdown(f"## üèÜ Best Model: **{best[0]}**")
        st.write(f"Final Prediction: **{best[1]}**")
        st.write(f"Highest Confidence: **{best[2]:.4f}**")
        st.markdown("</div>", unsafe_allow_html=True)


Writing app.py
