In [None]:
# Google Colab: Environment Setup
# Installs the notebook's dependencies quietly (-q). Only transformers is pinned
# (4.48.0); the other packages are unpinned, so installed versions can differ
# between runs. NOTE(review): consider pinning them for reproducibility.
!pip install -q transformers==4.48.0 scikit-learn pandas numpy matplotlib mlflow beautifulsoup4 shap lime
# torch is installed separately from the PyTorch wheel index for CUDA 12.6 (cu126).
!pip install -q torch --index-url https://download.pytorch.org/whl/cu126

In [None]:
from google.colab import drive
import os
import random
import numpy as np
import torch
import pandas as pd
from sklearn.model_selection import train_test_split
from utils.preprocessor import preprocess_text, load_glove_embeddings
from models.cnn import SpamCNN
from models.bilstm import BiLSTMSpam
from models.bert import SpamBERT
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn
import torch.optim as optim

In [None]:
# Mount Google Drive for saving models
# The Drive contents become reachable under /content/drive after this call.
drive.mount('/content/drive')
# Project root inside Drive; later cells also use it to locate the GloVe file.
ROOT_PATH = '/content/drive/MyDrive/Projects/spam_detection2/'
# Trained models are written under <ROOT_PATH>/models.
MODEL_SAVE_PATH = os.path.join(ROOT_PATH, 'models')
# exist_ok=True keeps this cell re-runnable when the directory already exists.
os.makedirs(MODEL_SAVE_PATH, exist_ok=True)

In [None]:
def set_seed(seed=42):
    """Seed every random number generator the notebook relies on.

    Applies the same seed to Python's ``random`` module, NumPy's legacy global
    generator, and PyTorch (CPU generator and all CUDA devices), then switches
    cuDNN to deterministic mode with autotuning (benchmark) disabled.

    Args:
        seed: Integer seed shared by all generators. Defaults to 42.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    cudnn = torch.backends.cudnn
    cudnn.benchmark = False
    cudnn.deterministic = True


set_seed(42)

In [None]:
# Load preprocessed data (assumes CSVs are available in data/processed/)
# NOTE(review): these paths are relative to the current working directory, while
# the GloVe file and the saved models are addressed through ROOT_PATH on Drive.
# Confirm the notebook runs from a directory that contains data/processed/.
# Later cells read the 'text' and 'label' columns from both frames.
train_df = pd.read_csv('data/processed/train.csv')
test_df = pd.read_csv('data/processed/test.csv')

In [None]:
# Build vocabulary from training data
def build_vocab(texts, min_freq=2):
    """Map whitespace-separated tokens to contiguous integer indices.

    Index 0 is reserved for '<PAD>' and index 1 for '<UNK>'. Every other token
    that occurs at least ``min_freq`` times receives an index starting at 2,
    assigned in sorted token order so the mapping is deterministic.

    Args:
        texts: Iterable of documents. Entries that are not strings (for example
            the NaN values pandas produces for empty CSV cells, or None) are
            skipped instead of raising.
        min_freq: Minimum corpus frequency a token needs to be kept.

    Returns:
        dict mapping token -> index. The indices are exactly
        ``range(len(result))``, so ``len(result)`` is a valid embedding size.
    """
    from collections import Counter

    special_tokens = ('<PAD>', '<UNK>')
    counter = Counter()
    for text in texts:
        # Non-string entries have no .split(); treat them as empty documents.
        if isinstance(text, str):
            counter.update(text.split())

    # Exclude the reserved tokens before numbering. If a literal '<PAD>' or
    # '<UNK>' token were numbered and then overwritten below, the index range
    # would contain a hole and the largest index would exceed len(word2idx) - 1.
    kept_words = sorted(
        word for word, freq in counter.items()
        if freq >= min_freq and word not in special_tokens
    )
    word2idx = {word: idx for idx, word in enumerate(kept_words, start=len(special_tokens))}
    word2idx['<PAD>'] = 0
    word2idx['<UNK>'] = 1
    return word2idx

# The vocabulary is derived from the training split only; the test split is
# encoded against it later, so unseen test tokens fall back to '<UNK>'.
word2idx = build_vocab(train_df['text'])

In [None]:
# Tokenize and numericalize
max_len = 200  # sequence length passed to encode() and bert_encode() in later cells
def encode(text, word2idx, max_len=200):
    """Convert one document into a fixed-length list of vocabulary indices.

    The text is split on whitespace, each token is looked up in ``word2idx``
    (unknown tokens map to the '<UNK>' index), and the result is truncated or
    right-padded with the '<PAD>' index to exactly ``max_len`` entries.

    Args:
        text: The document to encode. A non-string value (for example the NaN
            pandas yields for an empty CSV cell, or None) is treated as an
            empty document and becomes an all-padding sequence.
        word2idx: Mapping token -> index containing '<PAD>' and '<UNK>'.
        max_len: Length of the returned list.

    Returns:
        list[int] of length ``max_len``.
    """
    # Truncate before the lookup so tokens beyond max_len are never numericalized.
    tokens = text.split()[:max_len] if isinstance(text, str) else []
    if tokens:
        unk_idx = word2idx['<UNK>']  # resolved once instead of once per token
        idxs = [word2idx.get(token, unk_idx) for token in tokens]
    else:
        idxs = []
    missing = max_len - len(idxs)
    if missing > 0:
        idxs.extend([word2idx['<PAD>']] * missing)
    return idxs

In [None]:
# Numericalize the 'text' column of each split into an index tensor
# (one row of max_len vocabulary indices per document).
X_train, X_test = (
    torch.tensor([encode(text, word2idx, max_len) for text in frame['text']])
    for frame in (train_df, test_df)
)
# Labels are stored as float32, the dtype used with the BCE loss during training.
y_train, y_test = (
    torch.tensor(frame['label'].values, dtype=torch.float32)
    for frame in (train_df, test_df)
)

In [None]:
# Load GloVe embeddings
# Location of the GloVe vectors under the Drive project root; the file name
# indicates the 6B-token release with 300-dimensional vectors.
GLOVE_PATH = os.path.join(ROOT_PATH, 'data/raw/glove.6B/glove.6B.300d.txt')
# Passed both to the loader below and to the CNN/BiLSTM constructors later;
# presumably it must match the vector width of the file above - TODO confirm.
embedding_dim = 300
# load_glove_embeddings is a project helper from utils.preprocessor.
# NOTE(review): presumably returns an embedding matrix indexed by word2idx
# values - verify against the helper's implementation.
pretrained_embeddings = load_glove_embeddings(GLOVE_PATH, word2idx, embedding_dim)

In [None]:
# Choose model: 'cnn', 'bilstm', or 'bert'
model_type = 'cnn'  # Change to 'bilstm' or 'bert' as needed

if model_type == 'bert':
    # BERT uses its own tokenizer, so the raw text columns are re-encoded here
    # instead of reusing the index tensors built from word2idx.
    from transformers import BertTokenizer
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

    def bert_encode(texts, tokenizer, max_len=200):
        """Tokenize texts (anything exposing .tolist()) into padded/truncated 'pt' tensors."""
        return tokenizer(
            texts.tolist(),
            padding='max_length',
            truncation=True,
            max_length=max_len,
            return_tensors='pt',
        )

    train_encodings = bert_encode(train_df['text'], tokenizer, max_len)
    test_encodings = bert_encode(test_df['text'], tokenizer, max_len)
    model = SpamBERT()
    train_inputs, train_labels = train_encodings, y_train
    test_inputs, test_labels = test_encodings, y_test
elif model_type in ('cnn', 'bilstm'):
    # Both embedding-based models take the same constructor arguments and the
    # same index tensors; only the class differs.
    model = (SpamCNN if model_type == 'cnn' else BiLSTMSpam)(
        vocab_size=len(word2idx),
        embedding_dim=embedding_dim,
        pretrained_embeddings=pretrained_embeddings,
    )
    train_inputs, train_labels = X_train, y_train
    test_inputs, test_labels = X_test, y_test
else:
    raise ValueError('Invalid model_type')

In [None]:
# Move model to GPU if available
# The device is resolved once here and reused for every batch, instead of
# querying torch.cuda.is_available() for each tensor inside the loop.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

# Training Loop
batch_size = 32
epochs = 5
# NOTE(review): BCELoss expects probabilities in [0, 1]; this assumes the project
# models apply a sigmoid to their output - confirm in models/.
criterion = nn.BCELoss()
optimizer = optim.AdamW(model.parameters(), lr=2e-4)

# Only the dataset layout depends on the model family: the embedding models take
# one index tensor, BERT takes input_ids plus attention_mask.
if model_type in ['cnn', 'bilstm']:
    train_dataset = TensorDataset(train_inputs, train_labels)
    test_dataset = TensorDataset(test_inputs, test_labels)
else:  # BERT
    train_dataset = TensorDataset(train_inputs['input_ids'], train_inputs['attention_mask'], train_labels)
    test_dataset = TensorDataset(test_inputs['input_ids'], test_inputs['attention_mask'], test_labels)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# test_loader is created here but not consumed in this cell.
test_loader = DataLoader(test_dataset, batch_size=batch_size)

for epoch in range(epochs):
    model.train()
    total_loss = 0
    for batch in train_loader:
        batch = [tensor.to(device) for tensor in batch]
        optimizer.zero_grad()
        if model_type == 'bert':
            input_ids, attention_mask, labels = batch
            # The BERT model is unpacked as a pair; the second element is unused here.
            outputs, _ = model(input_ids=input_ids, attention_mask=attention_mask)
        else:
            inputs, labels = batch
            outputs = model(inputs)
            # Some models may return (predictions, extras); keep the predictions.
            if isinstance(outputs, tuple):
                outputs = outputs[0]
        # Align predictions with the label shape. This is a no-op when the model
        # already returns (batch,), and avoids BCELoss's size-mismatch error if
        # it returns (batch, 1) or a squeezed 0-dim value for a final batch of
        # one sample - the models' output shape is not visible here, TODO confirm.
        loss = criterion(outputs.reshape(labels.shape), labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # max(..., 1) guards the average against an empty training loader.
    print(f"Epoch {epoch+1}/{epochs} - Loss: {total_loss/max(len(train_loader), 1):.4f}")

# Save model to Google Drive
model_save_file = os.path.join(MODEL_SAVE_PATH, f'spam_{model_type}.pt')
if hasattr(model, 'save'):
    # Project models may define their own save(); use it when present.
    model.save(model_save_file)
else:
    # torch.nn.Module itself has no save(); fall back to persisting the weights.
    torch.save(model.state_dict(), model_save_file)
print(f"Model saved to {model_save_file}")