In [176]:
import re
import random
import numpy as np
import pandas as pd
from nltk.corpus import stopwords

import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data import random_split
from torch.nn.utils.rnn import pad_sequence

## Data Preprocessing

In [177]:
dataset = pd.read_csv('./dataset/SMSSpamCollection', sep='\t', header=None, names=['label', 'text'])
dataset.head(10)

Unnamed: 0,label,text
0,ham,"Go until jurong point, crazy.. Available only ..."
1,ham,Ok lar... Joking wif u oni...
2,spam,Free entry in 2 a wkly comp to win FA Cup fina...
3,ham,U dun say so early hor... U c already then say...
4,ham,"Nah I don't think he goes to usf, he lives aro..."
5,spam,FreeMsg Hey there darling it's been 3 week's n...
6,ham,Even my brother is not like to speak with me. ...
7,ham,As per your request 'Melle Melle (Oru Minnamin...
8,spam,WINNER!! As a valued network customer you have...
9,spam,Had your mobile 11 months or more? U R entitle...


In [178]:
stopwords = stopwords.words('english')
def clean_text(text):
    text = text.lower()
    text = re.sub(r'[^a-z\s]', '', text)
    text = ' '.join([word for word in text.split() if word not in stopwords])
    return text
dataset.text = dataset.text.apply(clean_text)
dataset.head(10)

Unnamed: 0,label,text
0,ham,go jurong point crazy available bugis n great ...
1,ham,ok lar joking wif u oni
2,spam,free entry wkly comp win fa cup final tkts st ...
3,ham,u dun say early hor u c already say
4,ham,nah dont think goes usf lives around though
5,spam,freemsg hey darling weeks word back id like fu...
6,ham,even brother like speak treat like aids patent
7,ham,per request melle melle oru minnaminunginte nu...
8,spam,winner valued network customer selected receiv...
9,spam,mobile months u r entitled update latest colou...


In [179]:
def tokenize(text, word_to_idx):
    tokens = []
    for word in text.split():
        tokens.append(word_to_idx[word])
    return tokens
words = set((' '.join(dataset.text)).split())
word_to_idx = {word: i for i, word in enumerate(words, 1)}
tokens = dataset.text.apply(lambda x: tokenize(x, word_to_idx))
tokens.head(10)


0    [5553, 6702, 7793, 1354, 3642, 1962, 5749, 267...
1                 [6123, 3202, 7377, 3030, 8240, 5360]
2    [4667, 8443, 2951, 4007, 6450, 3386, 6698, 584...
3    [8240, 8092, 828, 845, 4184, 8240, 2445, 127, ...
4      [1791, 2060, 5441, 2305, 149, 3981, 7079, 3867]
5    [4477, 5181, 1972, 829, 2668, 10, 4496, 134, 6...
6       [4143, 6546, 134, 3990, 1642, 134, 3878, 6495]
7    [4122, 3178, 7730, 7730, 2483, 2990, 4752, 498...
8    [2639, 4025, 4495, 4185, 1804, 7465, 5822, 216...
9    [4434, 2104, 8240, 275, 6558, 2968, 6038, 8002...
Name: text, dtype: object

In [180]:
def pad_and_truncate(messages, max_length=30):
    features = np.zeros((len(messages), max_length), dtype=int)
    for i, text in enumerate(messages):
        if len(text):
            features[i, -len(text):] = text[:max_length]
    return features
inputs = pad_and_truncate(tokens)
print(inputs)

[[   0    0    0 ... 4597 8031 7722]
 [   0    0    0 ... 3030 8240 5360]
 [   0    0    0 ... 7197 1656 7821]
 ...
 [   0    0    0 ... 2740 1531 1323]
 [   0    0    0 ... 4443 6783 4667]
 [   0    0    0 ... 1121 6931   56]]


In [181]:
labels = np.array((dataset.label == 'spam').astype(int))
print(labels)

[0 0 1 ... 0 0 0]


## Model

In [182]:
class SMSClassifier(nn.Module):
    def __init__(self, vocab_size, output_size=1, embedding_dim=50, hidden_dim=10, dropout=0.2):
        super(SMSClassifier, self).__init__()

        self.hidden_dim = hidden_dim

        # Embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        
        # GRU layer
        self.gru = nn.GRU(embedding_dim, hidden_dim, batch_first=True)
        
        # Dropout layer
        self.dropout = nn.Dropout(p=dropout)
        
        # Fully-connected layer
        self.fc = nn.Linear(hidden_dim, output_size)
        
        # Sigmoid layer
        self.sigmoid = nn.Sigmoid()

    def forward(self, x, h):
        # Apply embedding
        x = self.embedding(x)

        # Passing through the GRU
        out, h = self.gru(x, h)

        # Taking the output of the last time step
        out = out[:, -1, :]
        
        # Dropout and fully-connected layers
        out = self.dropout(out)
        sig_out = self.sigmoid(self.fc(out))

        return sig_out, h


## Training

In [183]:
# Hyperparameters
learning_rate = 0.0002
epochs = 10
batch_size = 4
vocab_size = int(inputs.max()) + 1

inputs= torch.tensor(inputs)
labels = torch.tensor(labels)

# Convert data to TensorDataset and then DataLoader for batching
dataset = TensorDataset(inputs, labels)
# data_loader = DataLoader(dataset, shuffle=True, batch_size=batch_size)

# Given the size of the dataset, calculate lengths for each split
total_size = len(dataset)
train_size = int(0.8 * total_size)
valid_size = (total_size - train_size) // 2
test_size = total_size - train_size - valid_size

# Use random_split to split the dataset
train_dataset, valid_dataset, test_dataset = random_split(dataset, [train_size, valid_size, test_size])

def collate_fn(batch):
    # Assuming each element in batch is a tuple (sequence, label)
    sequences = [item[0] for item in batch]
    labels = [item[1] for item in batch]
    
    # Pad sequences
    sequences_padded = pad_sequence(sequences, batch_first=True)

    # If this batch is smaller than the desired batch size, pad it with random sequences
    while len(sequences_padded) < batch_size:
        # Randomly select a sequence and its label from the dataset
        random_seq, random_label = random.choice(train_dataset)
        sequences_padded = torch.cat([sequences_padded, random_seq.unsqueeze(0)], dim=0)
        labels.append(random_label)
    
    return sequences_padded, torch.tensor(labels)

# Create separate data loaders for train, validation, and test sets
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size, collate_fn=collate_fn)
valid_loader = DataLoader(valid_dataset, shuffle=False, batch_size=batch_size, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size, collate_fn=collate_fn)

# Initialize the model, criterion, and optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SMSClassifier(vocab_size=vocab_size, embedding_dim=50, hidden_dim=10).to(device)
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(epochs):
    # Training Phase
    model.train()
    total_train_loss = 0
    correct_train_predictions = 0
    
    for batch_inputs, batch_labels in train_loader:
        # Move data to the device
        batch_inputs = batch_inputs.to(device)
        batch_labels = batch_labels.to(device)
        
        # Reset the hidden state or initialize it
        h = torch.zeros(1, batch_size, model.hidden_dim).to(device)
        
        # Zero the gradients
        optimizer.zero_grad()
        
        # Forward pass
        output, _ = model(batch_inputs, h)
        loss = criterion(output.squeeze(), batch_labels.float())
        
        # Backward pass and optimize
        loss.backward()
        optimizer.step()
        
        total_train_loss += loss.item()
        
        # Get the predictions
        predictions = torch.round(output.squeeze())  # Round to get 0 or 1
        correct_train_predictions += torch.sum(predictions == batch_labels.float()).item()
    
    avg_train_loss = total_train_loss / len(train_loader)
    train_accuracy = correct_train_predictions / len(train_dataset) * 100
    
    # Validation Phase
    model.eval()
    total_valid_loss = 0
    correct_valid_predictions = 0
    with torch.no_grad():
        for batch_inputs, batch_labels in valid_loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)
            
            h = torch.zeros(1, batch_size, model.hidden_dim).to(device)
            
            output, _ = model(batch_inputs, h)
            loss = criterion(output.squeeze(), batch_labels.float())
            
            total_valid_loss += loss.item()
            
            predictions = torch.round(output.squeeze())
            correct_valid_predictions += torch.sum(predictions == batch_labels.float()).item()
            
    avg_valid_loss = total_valid_loss / len(valid_loader)
    valid_accuracy = correct_valid_predictions / len(valid_dataset) * 100
    
    print(f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, Valid Loss: {avg_valid_loss:.4f}, Valid Accuracy: {valid_accuracy:.4f}")

Epoch 1/10, Train Loss: 0.5747, Train Accuracy: 68.5663, Valid Loss: 0.3733, Valid Accuracy: 87.4327
Epoch 2/10, Train Loss: 0.3206, Train Accuracy: 87.1887, Valid Loss: 0.2711, Valid Accuracy: 89.4075
Epoch 3/10, Train Loss: 0.2349, Train Accuracy: 90.7561, Valid Loss: 0.2000, Valid Accuracy: 93.1777
Epoch 4/10, Train Loss: 0.1733, Train Accuracy: 94.2787, Valid Loss: 0.1503, Valid Accuracy: 96.7684
Epoch 5/10, Train Loss: 0.1312, Train Accuracy: 96.3653, Valid Loss: 0.1206, Valid Accuracy: 97.1275
Epoch 6/10, Train Loss: 0.1044, Train Accuracy: 97.1730, Valid Loss: 0.0992, Valid Accuracy: 98.2047
Epoch 7/10, Train Loss: 0.0843, Train Accuracy: 97.8236, Valid Loss: 0.0864, Valid Accuracy: 98.5637
Epoch 8/10, Train Loss: 0.0711, Train Accuracy: 98.1602, Valid Loss: 0.0770, Valid Accuracy: 98.5637
Epoch 9/10, Train Loss: 0.0570, Train Accuracy: 98.6314, Valid Loss: 0.0696, Valid Accuracy: 98.7433
Epoch 10/10, Train Loss: 0.0485, Train Accuracy: 98.9904, Valid Loss: 0.0647, Valid Accurac

In [184]:
# Evaluate on the test set
model.eval()
total_test_loss = 0
correct_test_predictions = 0
with torch.no_grad():
    for batch_inputs, batch_labels in test_loader:
        batch_inputs = batch_inputs.to(device)
        batch_labels = batch_labels.to(device)
        
        h = torch.zeros(1, batch_size, model.hidden_dim).to(device)
        
        output, _ = model(batch_inputs, h)
        loss = criterion(output.squeeze(), batch_labels.float())
        
        total_test_loss += loss.item()
        
        predictions = torch.round(output.squeeze())
        correct_test_predictions += torch.sum(predictions == batch_labels.float()).item()

avg_test_loss = total_test_loss / len(test_loader)
test_accuracy = correct_test_predictions / len(test_dataset) * 100

print(f"Test Loss: {avg_test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")


Test Loss: 0.1043, Test Accuracy: 97.3118
