# In [3]:  (Jupyter notebook cell prompt)
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import pandas as pd
from torchtext.vocab import build_vocab_from_iterator
from torchtext.data.utils import get_tokenizer
import numpy as np

# ----------------------------
# 1. Data Loading & Preprocessing
# ----------------------------
# Read the SMS Spam CSV and keep only the label column (v1) and the message
# column (v2), giving them descriptive names.
df = pd.read_csv('spam.csv', encoding='latin-1')[['v1', 'v2']].rename(
    columns={'v1': 'label', 'v2': 'text'}
)

# Encode the string labels as integers: ham -> 0, spam -> 1
label_ids = {'ham': 0, 'spam': 1}
df['label'] = df['label'].map(label_ids)

# Hold out 20% of the rows for testing (fixed seed for reproducibility)
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

# Build the vocabulary from the training messages only, so no test-set
# tokens leak into it. Tokens not in the vocabulary resolve to '<unk>'.
tokenizer = get_tokenizer('basic_english')
train_tokens = map(tokenizer, train_df['text'])
vocab = build_vocab_from_iterator(train_tokens, specials=['<unk>', '<pad>'])
vocab.set_default_index(vocab['<unk>'])

# Numericalize texts
def text_to_tensor(text):
    """Tokenize *text* and return its vocabulary indices as a 1-D long tensor.

    Uses the module-level ``tokenizer`` and ``vocab``.
    """
    tokens = tokenizer(text)
    indices = [vocab[tok] for tok in tokens]
    return torch.tensor(indices, dtype=torch.long)

# ----------------------------
# 2. Dataset Class (Parallel to ImageFolder)
# ----------------------------
class SpamDataset(Dataset):
    """Map-style dataset yielding fixed-length token-index tensors and labels.

    Each item is a ``(text_tensor, label_tensor)`` pair where ``text_tensor``
    is a long tensor of exactly ``max_len`` vocabulary indices (truncated or
    right-padded) and ``label_tensor`` is a scalar long tensor.

    Args:
        dataframe: DataFrame with a ``'text'`` column and a ``'label'`` column.
        max_len: Fixed sequence length every sample is trimmed/padded to.
        pad_idx: Index used to fill padding positions. When ``None`` it is
            looked up as ``vocab['<pad>']`` from the module-level vocabulary.
    """

    def __init__(self, dataframe, max_len=50, pad_idx=None):
        self.data = dataframe
        self.max_len = max_len  # Fixed sequence length
        # Padding must use the '<pad>' index rather than 0: index 0 is '<unk>'
        # when the vocabulary is built with specials=['<unk>', '<pad>'], so
        # zero-padding would feed '<unk>' tokens to the model instead of the
        # entries the embedding treats as padding.
        self.pad_idx = vocab['<pad>'] if pad_idx is None else pad_idx

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Single positional row lookup instead of one per column.
        row = self.data.iloc[idx]

        # Convert text to indices and trim to at most max_len tokens.
        text_tensor = text_to_tensor(row['text'])[:self.max_len]

        # Right-pad short sequences up to the fixed length.
        pad_len = self.max_len - len(text_tensor)
        if pad_len > 0:
            padding = torch.full((pad_len,), self.pad_idx, dtype=torch.long)
            text_tensor = torch.cat([text_tensor, padding])

        return text_tensor, torch.tensor(row['label'], dtype=torch.long)

# Wrap each split in a SpamDataset (default fixed sequence length)
train_dataset, test_dataset = (
    SpamDataset(split) for split in (train_df, test_df)
)

# Batches of 32; only the training data is reshuffled each epoch
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=False)

# ----------------------------
# 3. RNN Model (Equivalent to InsectCNN)
# ----------------------------
class SpamRNN(nn.Module):
    """Two-layer unidirectional LSTM classifier over token-index sequences.

    Pipeline: embedding -> LSTM -> output at the last non-padding time step
    -> Linear(hidden_dim, 32) + ReLU + Dropout(0.5) -> Linear(32, num_classes).

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dim: Embedding dimension.
        hidden_dim: LSTM hidden size.
        num_classes: Number of output logits.
        pad_idx: Index of the padding token. When ``None`` it is looked up as
            ``vocab['<pad>']`` from the module-level vocabulary.
    """

    def __init__(self, vocab_size, embed_dim=100, hidden_dim=64, num_classes=2,
                 pad_idx=None):
        super().__init__()

        # Resolve the padding index once; forward() needs it to locate the
        # last real token of every sequence.
        self.pad_idx = vocab['<pad>'] if pad_idx is None else pad_idx

        # Embedding layer; the padding row is excluded from gradient updates.
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=self.pad_idx)

        # LSTM layer
        self.rnn = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_dim,
            num_layers=2,
            batch_first=True,
            bidirectional=False
        )

        # Fully connected classification head
        self.fc1 = nn.Linear(hidden_dim, 32)
        self.fc2 = nn.Linear(32, num_classes)

        # Dropout and activation
        self.dropout = nn.Dropout(0.5)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return class logits of shape [batch, num_classes].

        Args:
            x: Long tensor of token indices, shape [batch, seq_len].
        """
        # [batch, seq_len] -> [batch, seq_len, embed_dim]
        embedded = self.embedding(x)

        # out shape: [batch, seq_len, hidden_dim]
        out, _ = self.rnn(embedded)

        # Pick the output at the last non-padding position of each row.
        # Taking out[:, -1, :] would read the state after the LSTM has run
        # over trailing padding, which dilutes the signal for short messages.
        # Rows that are entirely padding fall back to position 0.
        positions = torch.arange(x.size(1), device=x.device)
        last_idx = ((x != self.pad_idx).long() * positions).max(dim=1).values
        batch_idx = torch.arange(x.size(0), device=x.device)
        out = out[batch_idx, last_idx]

        # Fully connected head
        out = self.relu(self.fc1(out))
        out = self.dropout(out)
        out = self.fc2(out)

        return out

# Pick the compute device: GPU when available, otherwise CPU. The model and
# every batch are moved to this device.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Initialize model
vocab_size = len(vocab)
model = SpamRNN(vocab_size=vocab_size).to(device)

# Loss and optimizer (same as CNN)
criterion = nn.CrossEntropyLoss()
# Referenced via torch.optim because no `optim` alias is imported here.
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# ----------------------------
# 4. Training Loop (Identical to CNN)
# ----------------------------
num_epochs = 10
for epoch in range(num_epochs):
    # Enable training-mode behaviour (dropout active)
    model.train()
    running_loss = 0.0

    for batch in train_loader:
        # Move both the token indices and the labels to the compute device
        texts, labels = (item.to(device) for item in batch)

        # Standard step: clear old gradients, forward, backward, update
        optimizer.zero_grad()
        outputs = model(texts)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss = running_loss + loss.item()

    # Report the mean per-batch loss for this epoch
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')

# ----------------------------
# 5. Evaluation (Same as CNN)
# ----------------------------
# The metric helpers used below are not imported anywhere else in this cell.
from sklearn.metrics import classification_report, confusion_matrix

# Switch to inference behaviour (dropout disabled)
model.eval()
all_preds = []
all_labels = []

# No gradients are needed for evaluation
with torch.no_grad():
    for texts, labels in test_loader:
        outputs = model(texts.to(device))
        # Predicted class = index of the largest logit
        preds = outputs.argmax(dim=1)
        all_preds.extend(preds.cpu().numpy())
        # Labels are only collected for the report, so they stay on the CPU
        all_labels.extend(labels.cpu().numpy())

# Confusion matrix and classification report
print(confusion_matrix(all_labels, all_preds))
print(classification_report(all_labels, all_preds, target_names=['ham', 'spam']))

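# Cell output when this was run:
#   OSError: [WinError 127] The specified procedure could not be found
# NOTE(review): presumably a native-library load failure during one of the
# imports on Windows (e.g. mismatched torch/torchtext builds) — confirm.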