# RNNs for Text Classification

We will use a RNN based model to perform classification of SMS messages into Spam or not Spam. The notebook has split the entire process into several parts for your convienience. To appreciate preprocessing in music, it is important to understand preprocessing in other domains. You may need to read up upon tokenization, embeddings (Glove) and Dataloaders, as well as how to pipeline an end - to - end AI model

**Resources:** \
https://www.geeksforgeeks.org/pre-trained-word-embedding-using-glove-in-nlp-models/

We do not expect you to finish the code entirely, take help whenever required, but understand the code you have written, do not blindly copy code. In case of help required at any time, feel free to contact the project leads.



In [None]:
# Imports
from IPython.display import clear_output
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import spacy
import re
import string
from collections import Counter
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
import tqdm

import warnings
warnings.filterwarnings('ignore')

In [None]:
# Downloading the Spam SMS Dataset
!wget https://archive.ics.uci.edu/ml/machine-learning-databases/00228/smsspamcollection.zip

!unzip /content/smsspamcollection.zip
!rm /content/readme
!rm !rm /content/smsspamcollection.zip

clear_output()

In [None]:
# Downloading the GloVe embeddings database

!wget https://nlp.stanford.edu/data/glove.6B.zip

!unzip /content/glove.6B.zip

!rm -rf /content/glove.6B.zip
!rm /content/glove.6B.100d.txt
!rm /content/glove.6B.200d.txt
!rm /content/glove.6B.300d.txt

clear_output()

In [None]:
text = []
label = []

""" read each line of the text file and create a Pandas Data Frame
        label spam messages as 1 and legit messages as 0
"""

with open("/content/SMSSpamCollection") as f:
  for line in f:
    parts = line.split('\t')
    text.append(parts[1].strip())

    if parts[0]=='spam':
      label.append(1)
    else:
      label.append(0)


In [None]:
# Creating a Pandas Dataframe
sms = pd.DataFrame(zip(text, label), columns = ["Text", "Label"])

sms['Text_Length'] = sms['Text'].apply(len)
print(sms.head())

                                                Text  Label  Text_Length
0  Go until jurong point, crazy.. Available only ...      0          111
1                      Ok lar... Joking wif u oni...      0           29
2  Free entry in 2 a wkly comp to win FA Cup fina...      1          155
3  U dun say so early hor... U c already then say...      0           49
4  Nah I don't think he goes to usf, he lives aro...      0           61


In [39]:
spacy_tokenizer = spacy.load('en_core_web_sm')

def tokenize (text):

    """remove any non-ascii characters
       remove punctuations
       tokenize the text
       return the tokenized text
    """

    #remove any non-ascii characters and tokenize
    text = ''.join([char for char in text if ord(char) < 128])
    doc = spacy_tokenizer(text)
    #remove punctuations after iterating over the tokens
    tokens = [token.text for token in doc if not token.is_punct]
    return tokens



In [None]:
# Tokenize the text sms in the Pandas Dataframe
sms["Tokenized_Text"] = sms['Text'].apply(tokenize)

In [None]:
def load_GloVe_embeddings(glove_file):

    """
        load the GloVe embeddings from the files downloaded
        create a dictionary of the form {word : word embedding}
    """

    embeddings_dict = {}

    with open(glove_file, 'r', encoding='utf-8') as f:
      for line in f:
        parts = line.split()
        word = parts[0]
        vector = np.array(parts[1:], dtype= np.float32)
        embeddings_dict[word] = vector

    return embeddings_dict

glove_file = '/content/glove.6B.50d.txt'  # Adjust the path as needed

glove_embeddings = load_GloVe_embeddings(glove_file)


In [None]:
def embed_text(tokenized_text, word_embeddings, max_text_length=20, embedding_size = 50):
    """
        given a sequence of tokens convert them to their word embeddings
    """
    embedding_matrix = np.zeros((max_text_length, embedding_size), dtype = np.float32)

    for i, token in enumerate(tokenized_text[:max_text_length]):
      if token in word_embeddings:
        embedding_matrix[i] = word_embeddings[token]
      else:
        embedding_matrix[i] = np.zeros(embedding_size, dtype = np.float32)

    return embedding_matrix

In [None]:
sms["Embedded_Text"] = sms["Tokenized_Text"].apply(lambda tokens: embed_text(tokens, glove_embeddings, max_text_length=20, embedding_size=50))


In [None]:
"""Complete the below code for the Dataloader class"""
class load_dataset(Dataset):
    def __init__(self, X, Y):
        """
            X: the embeddings of the sentence
            Y: ground truth of the sentence (0- positive, 1- negative)
        """
        self.X = X
        self.y = Y

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

In [None]:
class RNN(nn.Module):
    def __init__(self, embedding_size, hidden_size, num_layers, num_classes):
        """Define your layers, activation functions here"""
        super(RNN, self).__init__()
        self.rnn = nn.RNN(input_size = embedding_size, hidden_size=hidden_size, num_layers=num_layers, batch_first = True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Perform a forward pass"""
        out, _ = self.rnn(x)
        out = self.fc(out[:,-1,:])
        return out

In [None]:
def train_model(num_epochs, train_loader, model, criterion, optimizer, device):

    """
    Write a trainer loop for the model. It must follow the below pattern
    1. Pass the input to the model and perform forward propagation
    2. Obtain losses
    3. Backpropagate to find the gradients

    Make sure to check the accuracy of the model at regular intervals
    """

    model.to(device)
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for inputs, labels in train_loader:

            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Accuracy: {100 * correct / total:.2f}%')


In [None]:
"""
1. Write code to split your available data into training and testing splits
2. Define the model
3. Set up hyper-parameters such as learning rate, number of epochs, batch size
4. Train the model by using the function you defined above
5. Check the model accuracy by running the model on the testing split
6. Save the model as a .pth file
"""

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

X_train, X_test, y_train, y_test = train_test_split(
    np.array(sms["Embedded_Text"].tolist()),
    sms["Label"].values,
    test_size=0.2,
    random_state=42
)

train_dataset = load_dataset(torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.long))
test_dataset = load_dataset(torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.long))
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=64, shuffle=False)

embedding_size = 50
hidden_size = 100
num_layers = 2
num_classes = 2

model = RNN(embedding_size, hidden_size, num_layers, num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

num_epochs = 10
train_model(num_epochs, train_loader, model, criterion, optimizer,device)

torch.save(model.state_dict(), 'rnn_model.pth')


Epoch [1/10], Loss: 0.2806, Accuracy: 89.03%
Epoch [2/10], Loss: 0.1814, Accuracy: 93.07%
Epoch [3/10], Loss: 0.1691, Accuracy: 93.94%
Epoch [4/10], Loss: 0.1559, Accuracy: 94.39%
Epoch [5/10], Loss: 0.1600, Accuracy: 94.28%
Epoch [6/10], Loss: 0.1557, Accuracy: 94.37%
Epoch [7/10], Loss: 0.1468, Accuracy: 95.07%
Epoch [8/10], Loss: 0.1292, Accuracy: 95.34%
Epoch [9/10], Loss: 0.1234, Accuracy: 95.58%
Epoch [10/10], Loss: 0.1056, Accuracy: 96.59%


In [40]:
def preprocess_sentence(sentence, spacy_tokenizer, word_embeddings, max_text_length=20):
    # Tokenize the text
    tokens = tokenize(sentence)

    # Embed the tokens
    embedded_text = embed_text(tokens, word_embeddings, max_text_length=max_text_length)

    return torch.tensor(embedded_text, dtype=torch.float32).unsqueeze(0)  # Add batch dimension

def predict(sentence, model, spacy_tokenizer, word_embeddings, device):
    model.eval()
    with torch.no_grad():
        # Preprocess and embed the input sentence
        inputs = preprocess_sentence(sentence, spacy_tokenizer, word_embeddings)
        inputs = inputs.to(device)

        # Perform the prediction
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)

        return predicted.item()

# Example usage
random_sentence = "Congratulations! Your credit score entitles you to a no-interest Visa credit card. Click here to claim"
prediction = predict(random_sentence, model, spacy_tokenizer, glove_embeddings, device)

print(f"Prediction: {'Spam' if prediction == 1 else 'Ham'}")


Prediction: Spam
