In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split
import pandas as pd

# Load the labelled e-mail corpus: the 'email' column holds the text, 'label' the target.
# astype(str) forces every entry to a string (NOTE(review): a missing e-mail would
# presumably become a literal placeholder string rather than being dropped -- confirm).
dataset = pd.read_csv('/kaggle/input/spam-or-not-spam-dataset7/spam_or_not_spam.csv')
texts, labels = dataset['email'].astype(str), dataset['label']

# Tokenisation settings shared by the later cells.
NUM_WORDS = 20000   # handed to Tokenizer(num_words=...)
MAX_LENGTH = 200    # every sequence is padded or truncated to this many ids

# The word index is fitted on the whole corpus, i.e. before the train/test split below.
tokenizer = Tokenizer(num_words=NUM_WORDS, oov_token="<OOV>")
tokenizer.fit_on_texts(texts)

# Text -> integer ids -> fixed-width matrix; padding and truncation both act on the tail.
sequences = tokenizer.texts_to_sequences(texts)
padded_sequences = pad_sequences(sequences, maxlen=MAX_LENGTH, padding='post', truncating='post')

# 80/20 split with a fixed seed so the partition is repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    padded_sequences, labels, test_size=0.2, random_state=42
)

# Token ids become int64 tensors (what nn.Embedding indexes with); labels become float32.
X_train_tensor, X_test_tensor = (
    torch.tensor(split, dtype=torch.long) for split in (X_train, X_test)
)
y_train_tensor, y_test_tensor = (
    torch.tensor(split.values, dtype=torch.float) for split in (y_train, y_test)
)

class Encoder(nn.Module):
    """Bidirectional LSTM encoder that maps an embedded sequence to a latent Gaussian.

    Args:
        embed_dim: Size of each input embedding vector (LSTM input size).
        vocab_size: Accepted for signature compatibility; not used by the encoder.
        n_layers_E: Number of stacked LSTM layers.
        n_hidden_E: Hidden size of each LSTM direction.
        dim_z: Dimensionality of the latent vector.
    """

    def __init__(self, embed_dim, vocab_size, n_layers_E, n_hidden_E, dim_z):
        super(Encoder, self).__init__()
        self.n_layers_E = n_layers_E
        self.n_hidden_E = n_hidden_E
        self.lstm = nn.LSTM(embed_dim, n_hidden_E, n_layers_E, batch_first=True, bidirectional=True)
        # Both heads read the concatenated forward+backward summary, hence 2 * n_hidden_E.
        self.hidden_to_mu = nn.Linear(2 * n_hidden_E, dim_z)
        self.hidden_to_logvar = nn.Linear(2 * n_hidden_E, dim_z)

    def forward(self, x):
        """Encode `x` and draw a reparameterised latent sample.

        Args:
            x: Embedded input of shape (batch, seq_len, embed_dim).

        Returns:
            Tuple `(mu, logvar, z)`, each of shape (batch, dim_z), where
            `z = mu + exp(0.5 * logvar) * eps` with `eps ~ N(0, I)`.
        """
        # Use the final hidden states rather than out[:, -1, :]. For a bidirectional
        # LSTM the last time step of `out` holds the backward direction after it has
        # read only ONE token; its real summary sits at time step 0. h_n is laid out
        # as (num_layers * 2, batch, hidden), so the last two rows are the top
        # layer's forward and backward final states.
        # Weights trained against the previous last-time-step readout should be
        # retrained or fine-tuned after this change.
        # NOTE(review): with post-padded inputs the forward state is still taken after
        # the padding positions; packing the sequences would be needed to avoid that.
        _, (h_n, _) = self.lstm(x)
        e_hidden = torch.cat([h_n[-2], h_n[-1]], dim=1)
        mu = self.hidden_to_mu(e_hidden)
        logvar = self.hidden_to_logvar(e_hidden)
        # Reparameterisation trick: keeps the sample differentiable w.r.t. mu and logvar.
        epsilon = torch.randn_like(mu)
        z = mu + torch.exp(logvar * 0.5) * epsilon
        return mu, logvar, z

class Decoder(nn.Module):
    """Unidirectional LSTM decoder conditioned on a latent vector at every time step.

    Args:
        n_hidden_D: Hidden size of the LSTM.
        n_layers_D: Number of stacked LSTM layers.
        embedding_dim: Size of each input embedding vector.
        dim_z: Dimensionality of the latent vector appended to every step.
        vocab_size: Number of output classes of the final projection.
    """

    def __init__(self, n_hidden_D, n_layers_D, embedding_dim, dim_z, vocab_size):
        super().__init__()
        # Each step sees the token embedding with the latent vector appended to it.
        step_input_dim = embedding_dim + dim_z
        self.lstm = nn.LSTM(step_input_dim, n_hidden_D, n_layers_D, batch_first=True)
        self.fc = nn.Linear(n_hidden_D, vocab_size)

    def forward(self, x, z):
        """Produce per-step vocabulary logits.

        Args:
            x: Embedded input of shape (batch, seq_len, embedding_dim).
            z: Latent vectors of shape (batch, dim_z).

        Returns:
            Logits of shape (batch, seq_len, vocab_size).
        """
        # NOTE(review): `x` is consumed unshifted, so the logits at step t are computed
        # from the embedding at step t -- if they are scored against that same token,
        # the decoder can copy its input; verify against the training loop.
        _, n_steps, _ = x.shape
        # Broadcast the latent vector along the time axis (a view; cat materialises it).
        z_per_step = z.unsqueeze(1).expand(-1, n_steps, -1)
        step_inputs = torch.cat((x, z_per_step), dim=2)
        hidden_states, _ = self.lstm(step_inputs)
        return self.fc(hidden_states)

class VAE(nn.Module):
    """Sequence VAE: a shared embedding feeding an `Encoder` and a `Decoder`.

    Args:
        embedding_dim: Size of the token embedding vectors.
        vocab_size: Number of rows in the embedding table and of decoder output classes.
        n_layers_E: Number of encoder LSTM layers.
        n_hidden_E: Encoder LSTM hidden size (per direction).
        dim_z: Dimensionality of the latent vector.
        n_hidden_D: Decoder LSTM hidden size.
        n_layers_D: Number of decoder LSTM layers.
    """

    def __init__(self, embedding_dim, vocab_size, n_layers_E, n_hidden_E, dim_z, n_hidden_D, n_layers_D):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.encoder = Encoder(
            embed_dim=embedding_dim,
            vocab_size=vocab_size,
            n_layers_E=n_layers_E,
            n_hidden_E=n_hidden_E,
            dim_z=dim_z,
        )
        self.decoder = Decoder(
            n_hidden_D=n_hidden_D,
            n_layers_D=n_layers_D,
            embedding_dim=embedding_dim,
            dim_z=dim_z,
            vocab_size=vocab_size,
        )

    def forward(self, x):
        """Run a full encode/sample/decode pass.

        Args:
            x: Integer token ids; they are looked up in the embedding table.

        Returns:
            Tuple `(logits, kld)`: the decoder logits and a scalar KL term.
        """
        embedded = self.embedding(x)
        mu, logvar, z = self.encoder(embedded)
        # The same embedded sequence is given to the decoder together with the sample z.
        logits = self.decoder(embedded, z)
        # KL term summed over every element of mu/logvar (batch and latent dims alike;
        # it is not averaged over the batch here).
        kl_elements = 1 + logvar - mu.pow(2) - logvar.exp()
        kld = -0.5 * torch.sum(kl_elements)
        return logits, kld


In [None]:
import pickle
import torch
from tensorflow.keras.preprocessing.sequence import pad_sequences

# SECURITY NOTE: unpickling executes arbitrary code from the file -- only load
# checkpoints you produced yourself or otherwise trust.
# The context manager closes the file handle as soon as the model is loaded.
with open('/kaggle/input/notebook8589762367/VAE_LSTM_model', 'rb') as model_file:  # version 9
    vae = pickle.load(model_file)
vae.eval()  # Switching the VAE to evaluation mode

# Custom examples for testing
custom_examples = [
    "Congratulations! You have been selected as a winner for a free iPhone. Click the link to claim.",
    "Dear team, please find the attached report for your review and feedback.",
    "Your account has been flagged for suspicious activity. Please verify your details immediately.",
    "Meeting rescheduled to tomorrow at 10 AM. Let me know if this works.",
    "Don't miss out on this limited-time offer to save 50% on your favorite products!",
]

# Converting the custom examples to padded sequences using the tokenizer
MAX_LENGTH = 200
sequences = tokenizer.texts_to_sequences(custom_examples)
padded_sequences = pad_sequences(sequences, maxlen=MAX_LENGTH, padding='post', truncating='post')

custom_input_tensor = torch.tensor(padded_sequences, dtype=torch.long)

# Passing the custom examples through the VAE (embedding looked up once and reused)
with torch.no_grad():
    custom_embeddings = vae.embedding(custom_input_tensor)
    # Encoding step
    mu, logvar, z = vae.encoder(custom_embeddings)
    # Decoding step
    reconstructed_logits = vae.decoder(custom_embeddings, z)

# Padding is appended at the end with id 0, so the number of non-zero ids in a row is
# the length of its real text. Decoding only that prefix keeps the padded tail (which
# otherwise prints as a long run of "<OOV>") out of the displayed text.
token_counts = (custom_input_tensor != 0).sum(dim=1).tolist()

# Decoding original and reconstructed sequences
original_texts = tokenizer.sequences_to_texts(
    [row[:count] for row, count in zip(custom_input_tensor.cpu().numpy(), token_counts)]
)
reconstructed_sequences = reconstructed_logits.argmax(dim=-1).cpu().numpy()
reconstructed_texts = tokenizer.sequences_to_texts(
    [row[:count] for row, count in zip(reconstructed_sequences, token_counts)]
)

# results for each custom example
for i, (original, reconstructed) in enumerate(zip(custom_examples, reconstructed_texts)):
    print(f"Original Text {i + 1}:")
    print(original)
    print("\nReconstructed Text:")
    print(reconstructed)
    print("-" * 50)


  return torch.load(io.BytesIO(b))


Original Text 1:
Congratulations! You have been selected as a winner for a free iPhone. Click the link to claim.

Reconstructed Text:
congratulations you have been selected as a winner for a free <OOV> click the link to claim <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> <OOV> 

In [None]:
def generate_synthetic_data(vae, tokenizer, num_samples=10, max_length=200):
    """Decode random latent vectors into text.

    The decoder is driven by an all-padding (id 0) input sequence plus a latent
    vector drawn from a standard Gaussian; the arg-max token at every step is
    converted back to words with `tokenizer`.

    Args:
        vae: Model exposing `embedding`, `encoder.hidden_to_mu` and `decoder`.
            It is switched to evaluation mode as a side effect.
        tokenizer: Object providing `sequences_to_texts`.
        num_samples: Number of sequences to generate.
        max_length: Number of time steps decoded per sequence.

    Returns:
        Whatever `tokenizer.sequences_to_texts` returns for the generated ids
        (one entry per sample).
    """
    # Ensuring the model is in evaluation mode
    vae.eval()

    # Build the inputs on the device the model lives on instead of assuming CPU,
    # so a model that was moved to another device does not hit a device mismatch.
    device = vae.embedding.weight.device
    latent_dim = vae.encoder.hidden_to_mu.out_features

    # The whole pass, embedding lookup included, runs without autograd bookkeeping.
    with torch.no_grad():
        # Random latent vectors from a standard Gaussian
        z_random = torch.randn(num_samples, latent_dim, device=device)
        # Dummy input sequence filled with padding tokens (index 0)
        dummy_input = torch.zeros((num_samples, max_length), dtype=torch.long, device=device)
        synthetic_logits = vae.decoder(vae.embedding(dummy_input), z_random)
        synthetic_sequences = synthetic_logits.argmax(dim=-1).cpu().numpy()

    # Decoding synthetic sequences to text using the tokenizer
    return tokenizer.sequences_to_texts(synthetic_sequences)


In [None]:
def calculate_reconstruction_accuracy(original_sequences, reconstructed_logits, pad_token=0):
    """Token-level accuracy of the arg-max reconstruction, ignoring padding.

    Args:
        original_sequences: 2-D array-like (NumPy array, nested list or tensor) of
            reference token ids, one row per sequence.
        reconstructed_logits: Tensor whose last axis holds per-token scores; the
            arg-max over that axis is taken as the predicted id.
        pad_token: Id treated as padding in `original_sequences`; positions holding
            it are excluded from the count. Defaults to 0.

    Returns:
        Fraction (Python float) of non-padding reference tokens whose predicted id
        matches, or 0.0 when there is no non-padding token at all.
    """
    predicted = reconstructed_logits.argmax(dim=-1)
    target = torch.as_tensor(original_sequences, device=predicted.device)
    if target.numel() == 0 or predicted.numel() == 0:
        return 0.0

    # Compare only the region both inputs cover (rows and columns alike).
    n_rows = min(target.shape[0], predicted.shape[0])
    n_cols = min(target.shape[1], predicted.shape[1])
    target = target[:n_rows, :n_cols]
    predicted = predicted[:n_rows, :n_cols]

    # One vectorised comparison instead of a Python loop over every token.
    is_real_token = target != pad_token
    total_tokens = int(is_real_token.sum())
    if total_tokens == 0:
        return 0.0
    correct_tokens = int(((target == predicted) & is_real_token).sum())
    return correct_tokens / total_tokens

# Evaluating reconstruction accuracy on the held-out tensors.
# The test set is embedded once and the result is shared by the encoder and decoder.
with torch.no_grad():
    test_embeddings = vae.embedding(X_test_tensor)
    mu, logvar, z = vae.encoder(test_embeddings)
    reconstructed_logits = vae.decoder(test_embeddings, z)

reconstruction_accuracy = calculate_reconstruction_accuracy(
    X_test_tensor.cpu().numpy(),
    reconstructed_logits,
)
print(f"Reconstruction Accuracy: {reconstruction_accuracy:.2f}")


Reconstruction Accuracy: 0.94


In [None]:
def calculate_kl_divergence(mu, logvar):
    """Batch-averaged KL term computed from `mu` and `logvar`.

    For each sample the value is
        -0.5 * sum_j (1 + logvar_j - mu_j^2 - exp(logvar_j))
    with the sum taken over the last axis; the per-sample values are then averaged.

    Args:
        mu: Tensor of means; the last axis is the latent dimension.
        logvar: Tensor of log-variances with the same shape as `mu`.

    Returns:
        The mean KL value as a Python float.
    """
    per_dimension = 1 + logvar - mu.pow(2) - logvar.exp()
    per_sample = -0.5 * per_dimension.sum(dim=-1)
    return per_sample.mean().item()

# Encode the held-out set without autograd bookkeeping and report the mean KL term.
# Only mu and logvar are needed here; the sampled z is discarded.
with torch.no_grad():
    mu, logvar, _ = vae.encoder(vae.embedding(X_test_tensor))
    kl_divergence = calculate_kl_divergence(mu, logvar)

print(f"Average KL Divergence: {kl_divergence:.4f}")

Average KL Divergence: 0.0001


In [None]:
# Architecture hyperparameters for `VAE(...)`.
# NOTE(review): presumably the values the loaded checkpoint was trained with -- confirm.
embedding_dim = 64
vocab_size = NUM_WORDS
n_layers_E = 1
n_hidden_E = 128
dim_z = 32
n_hidden_D = 128
n_layers_D = 1

# Sample from the model that is already bound to `vae` (the checkpoint loaded earlier
# in the notebook). Rebinding `vae = VAE(...)` at this point would throw the trained
# weights away and generate text from a randomly initialised network instead.
# Generating synthetic spam and non-spam data
synthetic_data = generate_synthetic_data(vae, tokenizer, num_samples=7, max_length=10)
print("Generated Synthetic Data:")
for i, text in enumerate(synthetic_data, 1):
    print(f"{i}: {text}")


Generated Synthetic Data:
1: nomination nomination nomination facers facers facers facers facers facers facers
2: scenario scenario scenario consommation consommation consommation consommation consommation consommation consommation
3: trigger trigger trigger trigger trigger trigger trigger trigger trigger trigger
4: bello bombs orgel making making making making making making making
5: apparently dallin dallin dallin dallin dallin dallin dallin dallin dallin
6: graphical graphical graphical graphical graphical graphical graphical graphical graphical graphical
7: benches benches benches benches benches benches benches benches benches benches
