<a href="https://colab.research.google.com/github/TWaugh12/Projects/blob/main/SpamTextClassifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import urllib.request, zipfile, io
import re
import sklearn
from sklearn.model_selection import train_test_split
from collections import Counter
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import numpy as np

In [None]:
def download_and_unzip(url, extract_to='.'):
  """Fetch a zip archive from `url` and unpack every member into `extract_to`.

  The whole archive is read into memory before extraction, so nothing is
  written to disk except the extracted members themselves.

  Args:
    url: Location of the zip archive; anything `urllib.request.urlopen` accepts.
    extract_to: Directory that receives the extracted files (default: current dir).
  """
  # Pull the complete payload first; the connection is closed as soon as it is read.
  with urllib.request.urlopen(url) as remote:
    archive_bytes = remote.read()

  # ZipFile needs a seekable file object, hence the in-memory BytesIO wrapper.
  with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive:
    archive.extractall(path=extract_to)

# The archive contains a text file with one message per line; the first word of
# each line is either 'spam' or 'ham', which labels the message as spam or not.

download_and_unzip(url='https://archive.ics.uci.edu/static/public/228/sms+spam+collection.zip', extract_to='.')



# Load and prepare data. Lower case, no punctuation.
# Read inside a `with` block so the file handle is closed deterministically, and
# pin the encoding so decoding does not depend on the platform's locale default.
with open('./SMSSpamCollection', encoding='utf-8') as f:
  data = [ln.strip() for ln in f]

# Every run of non-alphanumeric characters (whitespace included) collapses to a
# single space, so no separate "squeeze repeated spaces" pass is required.
data = [re.sub('[^A-Za-z0-9]+', ' ', line).lower() for line in data]

In [None]:
# Split data into training and testing
train_data, test_data = train_test_split(data, test_size=0.2, random_state=42)


# Create a vocabulary.
# Only the training split is used, so no information about the held-out messages
# leaks into the model's input space; words seen only at test time map to /UNK.
# The first token of every line is the spam/ham label, not message text, so it is
# skipped when counting.
words = [word for line in train_data for word in line.split()[1:]]
word_counts = Counter(words)
# Keep at most the 10000 most frequent words.
vocab = [word for word, _ in word_counts.most_common(10000)]

# Add /UNK and /PAD tokens
vocab.extend(["/UNK", "/PAD"])

# Create a dictionary for word to index mapping
vocabulary = {word: idx for idx, word in enumerate(vocab)}

print("Vocabulary size:", len(vocabulary))
print("Train length:", len(train_data))
print("Test length:", len(test_data))

Vocabulary size: 8747
Train length: 4459
Test length: 1115


In [None]:
class SMSDataset(Dataset):
    """Dataset of labelled SMS lines that yields fixed-length index tensors.

    Each element of `data` is a string whose first space-separated token is the
    label ('spam' or anything else, treated as ham) and whose remainder is the
    message text.

    Args:
        data: Sequence of "<label> <message>" strings.
        vocab: Mapping from word to integer index; must contain '/UNK' and
            should contain '/PAD'.
        max_len: Number of tokens every message is truncated / left-padded to.

    Attributes:
        spam_count: Number of lines labelled 'spam'.
        ham_count: Number of all other lines.
    """

    def __init__(self, data, vocab, max_len=30):
        self.data = data
        self.vocab = vocab
        self.max_len = max_len
        self.spam_count = 0
        self.ham_count = 0

        # Count spam and ham messages
        for line in self.data:
            label, _ = self._split_line(line)
            if label.lower() == 'spam':
                self.spam_count += 1
            else:
                self.ham_count += 1

    @staticmethod
    def _split_line(line):
        """Split a line into (label, message); message is '' when absent."""
        # partition never raises, unlike unpacking split(' ', 1), which fails with
        # ValueError on a line that holds a label but no message text.
        label, _, message = line.partition(' ')
        return label, message

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return (token_indices, label) as long tensors for the line at `idx`."""
        label, message = self._split_line(self.data[idx])

        # Converting label to integer (1 for spam, 0 for ham)
        label = 1 if label.lower() == 'spam' else 0

        processed_message = self.preprocess_message(message)

        return torch.tensor(processed_message, dtype=torch.long), torch.tensor(label, dtype=torch.long)

    def preprocess_message(self, message):
        """Convert message text to exactly `max_len` vocabulary indices.

        Long messages keep their first `max_len` words; short ones are
        left-padded with '/PAD'. Words missing from the vocabulary map to the
        '/UNK' index.
        """
        # Truncate to the first max_len words (a no-op for shorter messages).
        words = message.split()[:self.max_len]

        # Left-pad so the real words sit at the end of the sequence.
        words = ['/PAD'] * (self.max_len - len(words)) + words

        # Look the fallback index up once instead of once per word.
        unk_index = self.vocab['/UNK']
        return [self.vocab.get(word, unk_index) for word in words]


# Wrap each split in an SMSDataset sharing the same word-to-index mapping.
train_dataset, test_dataset = [
    SMSDataset(split, vocabulary) for split in (train_data, test_data)
]

In [None]:
class SpamRNN(nn.Module):
    """Binary spam classifier over sequences of token indices.

    The token embeddings of a sequence are averaged, passed through a single
    tanh state update that starts from an all-zero state, and projected to one
    value that is squashed with a sigmoid.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Width of each embedding vector.
        hidden_dim: Width of the hidden state.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        self.hidden_dim = hidden_dim

        # Token lookup table
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        # Input -> state, state -> state and state -> output projections
        self.inp2state = nn.Linear(embedding_dim, hidden_dim)
        self.state2state = nn.Linear(hidden_dim, hidden_dim)
        self.state2out = nn.Linear(hidden_dim, 1)

        # Re-initialise every linear layer: small Gaussian weights, zero biases.
        linear_layers = [layer for layer in self.modules() if isinstance(layer, nn.Linear)]
        for layer in linear_layers:
            nn.init.normal_(layer.weight, std=0.01)
            nn.init.zeros_(layer.bias)

    def initial_state(self, batch_size, device):
        """Return an all-zero (batch_size, hidden_dim) state on `device`."""
        return torch.zeros(batch_size, self.hidden_dim, device=device)

    def forward(self, sequence):
        """Return one sigmoid-activated score in (0, 1) per input sequence.

        The result has a trailing dimension of size 1. Because the sigmoid is
        already applied here, the output is a probability, not a logit.
        """
        # Mean of the token embeddings along dim 1.
        pooled = self.embedding(sequence).mean(dim=1)

        # Single state update starting from the zero state.
        start = self.initial_state(sequence.size(0), sequence.device)
        hidden = torch.tanh(self.inp2state(pooled) + self.state2state(start))

        return torch.sigmoid(self.state2out(hidden))


# Build the classifier; the embedding table gets one row per vocabulary entry.
embedding_dim = 128
vocab_size = len(vocabulary)
model = SpamRNN(vocab_size=vocab_size, embedding_dim=embedding_dim, hidden_dim=256)
print(model)

SpamRNN(
  (embedding): Embedding(8747, 128)
  (inp2state): Linear(in_features=128, out_features=256, bias=True)
  (state2state): Linear(in_features=256, out_features=256, bias=True)
  (state2out): Linear(in_features=256, out_features=1, bias=True)
)


In [None]:
# Hyperparameters
learning_rate = 0.001
epochs = 20
# Mini-batch size shared by both loaders. The loaders have always used 32, so the
# variable is set to that value and passed through rather than hard-coding it twice.
batch_size = 32
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# Evaluation does not benefit from shuffling; a fixed order keeps the examples
# taken from the test loader reproducible between runs.
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
# SpamRNN.forward already applies a sigmoid and returns probabilities, so the
# matching loss is BCELoss. BCEWithLogitsLoss would apply a second sigmoid, which
# keeps the loss pinned near log(2) and weakens the gradients.
criterion = nn.BCELoss()

In [None]:
def calculate_accuracy(loader, model):
    """Return the percentage (0-100) of examples in `loader` classified correctly.

    A model output above 0.5 counts as class 1, anything else as class 0. The
    model is switched to eval mode and left in that mode on return; gradients
    are disabled while scoring.

    Args:
        loader: Iterable of (inputs, labels) batches.
        model: Module called as `model(inputs)`.

    Raises:
        ZeroDivisionError: If `loader` yields no examples.
    """
    model.eval()
    n_seen, n_right = 0, 0
    with torch.no_grad():
        for batch_x, batch_y in loader:
            batch_x = batch_x.long()
            batch_y = batch_y.long()
            scores = model(batch_x)
            # Threshold, then drop singleton dims so predictions line up with labels.
            guesses = scores.gt(0.5).float().squeeze()
            n_right += guesses.eq(batch_y).sum().item()
            n_seen += batch_y.size(0)
    return 100 * n_right / n_seen

# Training Loop
# The epoch count comes from the `epochs` hyperparameter instead of a second
# hard-coded literal, so changing it in one place changes the loop and the log line.
for epoch in range(epochs):
    model.train()
    loss_sum = 0.0
    samples_seen = 0
    for inputs, labels in train_loader:
        inputs, labels = inputs.long(), labels.long()
        optimizer.zero_grad()
        outputs = model(inputs)
        # The model emits one value per message with a trailing dim of 1, so the
        # float targets are reshaped to match.
        loss = criterion(outputs, labels.unsqueeze(1).float())
        loss.backward()
        optimizer.step()

        # Weight each batch by its size so a smaller final batch does not skew the mean.
        loss_sum += loss.item() * labels.size(0)
        samples_seen += labels.size(0)

    # Report the mean loss over the epoch; the loss of the last mini-batch alone
    # is too noisy to show a trend.
    epoch_loss = loss_sum / max(samples_seen, 1)
    train_acc = calculate_accuracy(train_loader, model)
    test_acc = calculate_accuracy(test_loader, model)
    print(f'Epoch [{epoch+1}/{epochs}], Loss: {epoch_loss:.4f}, Train Acc: {train_acc:.2f}%, Test Acc: {test_acc:.2f}%')

Epoch [1/20], Loss: 0.6623, Train Acc: 96.46%, Test Acc: 96.41%
Epoch [2/20], Loss: 0.6251, Train Acc: 98.23%, Test Acc: 97.76%
Epoch [3/20], Loss: 0.6242, Train Acc: 98.79%, Test Acc: 98.12%
Epoch [4/20], Loss: 0.6595, Train Acc: 98.95%, Test Acc: 98.12%
Epoch [5/20], Loss: 0.6241, Train Acc: 99.01%, Test Acc: 98.12%
Epoch [6/20], Loss: 0.6243, Train Acc: 99.22%, Test Acc: 98.21%
Epoch [7/20], Loss: 0.6587, Train Acc: 99.37%, Test Acc: 98.83%
Epoch [8/20], Loss: 0.5896, Train Acc: 99.37%, Test Acc: 98.65%
Epoch [9/20], Loss: 0.6931, Train Acc: 99.37%, Test Acc: 98.48%
Epoch [10/20], Loss: 0.6241, Train Acc: 99.26%, Test Acc: 98.39%
Epoch [11/20], Loss: 0.6586, Train Acc: 99.39%, Test Acc: 98.39%
Epoch [12/20], Loss: 0.5896, Train Acc: 99.51%, Test Acc: 98.65%
Epoch [13/20], Loss: 0.6618, Train Acc: 99.51%, Test Acc: 98.74%
Epoch [14/20], Loss: 0.6241, Train Acc: 99.51%, Test Acc: 98.74%
Epoch [15/20], Loss: 0.6586, Train Acc: 99.51%, Test Acc: 98.74%
Epoch [16/20], Loss: 0.6584, Train

In [None]:
# Reverse lookup table: index -> word.
inverse_vocab = dict((idx, word) for word, idx in vocabulary.items())

def sms_from_indices(indices):
    """Turn a sequence of vocabulary indices back into a space-joined string.

    Indices that are not present in `inverse_vocab` are rendered as '/UNK'.
    """
    tokens = (inverse_vocab.get(token_id, '/UNK') for token_id in indices)
    return ' '.join(tokens)


# Move the model to evaluation mode
model.eval()

# Show predictions for the first 10 messages yielded by test_loader.
# Progress is tracked with an explicit counter rather than derived from a batch
# size, so the result does not depend on how the loader was configured, and
# iteration stops as soon as enough messages have been printed.
num_examples = 10
shown = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.long()
        outputs = model(inputs)
        # view(-1) always gives a 1-D tensor, even when the batch holds a single
        # message (a bare squeeze() would collapse that case to 0-D).
        predicted = (outputs > 0.5).long().view(-1)

        for token_ids, pred, truth in zip(inputs.tolist(), predicted.tolist(), labels.tolist()):
            if shown == num_examples:
                break
            sms_text = sms_from_indices(token_ids)
            print(f"SMS: {sms_text}")
            print(f"Prediction: {'Spam' if pred == 1 else 'Ham'}")
            print(f"Ground Truth: {'Spam' if truth == 1 else 'Ham'}")
            print("-----")
            shown += 1

        # Skip inference on further batches once enough examples were printed.
        if shown == num_examples:
            break

SMS: /PAD /PAD /PAD which is why i never wanted to tell you any of this which is why i m so short with you and on edge as of late
Prediction: Ham
Ground Truth: Ham
-----
SMS: /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD huh y lei
Prediction: Ham
Ground Truth: Ham
-----
SMS: /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD me too baby i promise to treat you well i bet you will take good care of me
Prediction: Ham
Ground Truth: Ham
-----
SMS: /PAD /PAD thanks for being there for me just to talk to on saturday you are very dear to me i cherish having you as a brother and role model
Prediction: Ham
Ground Truth: Ham
-----
SMS: /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD /PAD yo im right by yo work
Prediction: Ham
Ground Truth: Ham
-----
SMS: warner village 83118 c colin farrell in swat this wkend warner village get 1 fre