In [1]:
import math
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import random

In [3]:

###############################################################
# 1. Creating a Synthetic Dataset
#
# For illustration, let's say we have some labeled conversations.
# Each conversation has a series of utterances, and at the end
# we have a label: 0 = legitimate, 1 = vishing attack.
#
# We also want the model to predict the next part of the conversation
# based on what has been said so far.
#
# In a real scenario, you'd have real conversations and ground truth.
###############################################################

In [2]:
# Toy corpus. Each entry is (context tokens so far, next-utterance tokens, label)
# with label 1 = vishing attack and 0 = legitimate call. The utterances are written
# as space-separated strings and split into token lists here.
conversations = [
    (context.split(), next_utterance.split(), label)
    for context, next_utterance, label in (
        ("[CLS] hello this is bank security team",        # input so far
         "we need your account details",                  # next to predict
         1),  # vishing (fake scenario)
        ("[CLS] hi i am calling about your internet",
         "please confirm your password now",
         1),  # vishing
        ("[CLS] hello how can i help you today",
         "i am just checking my balance",
         0),  # legitimate
        ("[CLS] hi this is abc bank",
         "we just need to confirm your last transaction",
         0),  # legitimate
    )
]

In [4]:
# Flatten every context token and next-utterance token of the corpus into one
# list (duplicates kept, corpus order preserved); the vocabulary is built from it.
all_tokens = [
    token
    for context, next_utterance, _ in conversations
    for part in (context, next_utterance)
    for token in part
]

In [5]:
# Special tokens; they receive the first ids (0-4) when the vocabulary is built.
PAD_TOKEN = "<pad>"  # fills shorter sequences when a batch is padded to equal length
UNK_TOKEN = "<unk>"  # stand-in id for any word that is missing from the vocabulary
BOS_TOKEN = "<bos>"  # prepended to every encoded target sequence
EOS_TOKEN = "<eos>"  # appended to every encoded target sequence
CLS_TOKEN = "[CLS]"  # already in the data: it is the first token of every input

In [6]:

# Build vocab: the five special tokens take the fixed ids 0-4, then every corpus
# token receives the next free id in order of first appearance.
vocab = {
    special: idx
    for idx, special in enumerate((PAD_TOKEN, UNK_TOKEN, BOS_TOKEN, EOS_TOKEN, CLS_TOKEN))
}
for tok in all_tokens:
    # setdefault leaves already-known tokens untouched; len(vocab) is the next free id.
    vocab.setdefault(tok, len(vocab))

# Reverse lookup (id -> token), used to turn predicted ids back into words.
inv_vocab = dict(zip(vocab.values(), vocab.keys()))
vocab_size = len(vocab)

In [7]:

def encode_sentence(sentence, vocab, bos=True, eos=True):
    """Map a tokenized sentence to a 1-D ``torch.long`` tensor of vocabulary ids.

    Words missing from ``vocab`` are mapped to the id of ``UNK_TOKEN``.

    Args:
        sentence: Iterable of string tokens.
        vocab: Mapping from token string to integer id.
        bos: Accepted for call-site compatibility but ignored; this function
            never adds a BOS id.
        eos: Accepted for call-site compatibility but ignored; this function
            never adds an EOS id.

    Returns:
        Tensor of dtype ``torch.long`` holding one id per token of ``sentence``.
    """
    # Inputs already begin with [CLS], which serves as their start marker, so no
    # BOS/EOS is added here; only targets get those (see encode_target).
    encoded = [vocab[w] if w in vocab else vocab[UNK_TOKEN] for w in sentence]
    return torch.tensor(encoded, dtype=torch.long)

In [8]:
def encode_target(sentence, vocab):
    """Encode a target utterance as ``[BOS, ids..., EOS]``.

    Args:
        sentence: Iterable of string tokens forming the utterance to predict.
        vocab: Mapping from token string to integer id; must contain the BOS,
            EOS and UNK special tokens.

    Returns:
        1-D ``torch.long`` tensor with ``len(sentence) + 2`` entries. Words that
        are not in ``vocab`` are replaced by the UNK id.
    """
    # BOS and EOS mark the boundaries of the sequence the model has to predict.
    framed = [
        vocab[BOS_TOKEN],
        *(vocab[word] if word in vocab else vocab[UNK_TOKEN] for word in sentence),
        vocab[EOS_TOKEN],
    ]
    return torch.tensor(framed, dtype=torch.long)

In [9]:
###############################################################
# 2. Dataset & Dataloader
#
# Each item: (input_ids, target_ids, label)
# We'll have the model predict the target tokens and classify.
###############################################################

In [10]:
class VishingDataset(Dataset):
    """Dataset over ``(context tokens, next-utterance tokens, label)`` triples.

    Indexing encodes one conversation on the fly and returns
    ``(input_ids, target_ids, label)``.
    """

    def __init__(self, conversations, vocab):
        """Keep references to the raw conversations and the token->id mapping."""
        self.data, self.vocab = conversations, vocab

    def __len__(self):
        """Number of conversations."""
        return len(self.data)

    def __getitem__(self, idx):
        """Encode conversation ``idx``.

        Returns:
            Tuple of the context ids (no BOS/EOS added), the target ids
            (wrapped in BOS/EOS by ``encode_target``), and the label as a
            float scalar tensor.
        """
        context_tokens, next_tokens, is_vishing = self.data[idx]
        return (
            encode_sentence(context_tokens, self.vocab, bos=False, eos=False),
            encode_target(next_tokens, self.vocab),  # includes BOS/EOS
            torch.tensor(is_vishing, dtype=torch.float),
        )

In [11]:
def collate_fn(batch):
    """Pad a list of dataset samples into batch tensors.

    Args:
        batch: List of ``(inp_ids, tgt_ids, label)`` tuples.

    Returns:
        Tuple ``(inp_padded, tgt_padded, labels, src_mask)``. Inputs and targets
        are right-padded with the PAD id to the longest sequence in the batch
        (batch dimension first). ``src_mask`` has shape ``(B, 1, 1, S)`` and is
        True at non-PAD input positions.
    """
    pad_id = vocab[PAD_TOKEN]

    labels = torch.stack([sample[2] for sample in batch], dim=0)
    inp_padded, tgt_padded = (
        pad_sequence([sample[field] for sample in batch], batch_first=True, padding_value=pad_id)
        for field in (0, 1)
    )

    # Two singleton axes are inserted so the mask broadcasts over attention
    # heads and query positions inside the encoder.
    src_mask = inp_padded.ne(pad_id)[:, None, None, :]
    return inp_padded, tgt_padded, labels, src_mask

In [12]:
# Wrap the toy corpus and serve it in shuffled mini-batches of two conversations;
# collate_fn pads each batch and builds the encoder padding mask.
dataset = VishingDataset(conversations=conversations, vocab=vocab)
dataloader = DataLoader(
    dataset=dataset,
    batch_size=2,
    shuffle=True,
    collate_fn=collate_fn,
)

In [28]:
# Bare expression: the notebook echoes the DataLoader's repr as a quick sanity check.
dataloader

<torch.utils.data.dataloader.DataLoader at 0x1ab1b7189d0>

In [13]:
###############################################################
# 3. Model Implementation
#
# We will add a classification head on top of the encoder output.
# The classification will be done based on the hidden state of [CLS].
#
# We also predict the next sequence tokens.
# So the loss is a combination of:
# - Language Modeling loss (predicting tgt_ids from encoder output)
# - Classification loss (binary classification from [CLS] state)
###############################################################

In [14]:
class Embedding(nn.Module):
    """Token-embedding lookup whose output is scaled by ``sqrt(d_model)``."""

    def __init__(self, vocab_size, d_model):
        """Create a ``vocab_size x d_model`` embedding table."""
        super().__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        """Look up the ids in ``x`` and multiply the vectors by ``sqrt(d_model)``."""
        scale = math.sqrt(self.d_model)
        return scale * self.embedding(x)

In [15]:

class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to a batch of embeddings.

    The table is stored as a non-persistent buffer named ``encoding`` with shape
    ``(1, max_len, d_model)``. As a buffer it follows the module through
    ``.to(device)`` / ``.cuda()``; being non-persistent it is not written to
    ``state_dict``, so checkpoint keys are the same as before.

    Args:
        d_model: Size of the embedding dimension (even or odd).
        max_len: Largest sequence length the table covers.
    """

    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        encoding = torch.zeros(max_len, d_model)
        encoding[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one cosine column fewer than sine columns,
        # so trim the angles to the number of odd-indexed columns.
        encoding[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # shape (1, max_len, d_model) so it broadcasts over the batch dimension
        self.register_buffer("encoding", encoding.unsqueeze(0), persistent=False)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        ``x`` is indexed as ``(batch, seq_len, d_model)``.
        """
        # The .to() is a no-op once the module lives on the same device as x;
        # it is kept so inputs on a different device still work as before.
        return x + self.encoding[:, :x.size(1), :].to(x.device)

In [16]:
def scaled_dot_product_attention(query, key, value, mask=None):
    """Compute ``softmax(Q K^T / sqrt(d_k)) V``.

    Args:
        query: Tensor whose last two dimensions are ``(L_q, d_k)``.
        key: Tensor whose last two dimensions are ``(L_k, d_k)``.
        value: Tensor whose last two dimensions are ``(L_k, d_v)``.
        mask: Optional tensor broadcastable to the score shape
            ``(..., L_q, L_k)``. Positions where ``mask == 0`` receive no
            attention.

    Returns:
        Tuple ``(output, attention_weights)``. If every key of a query row is
        masked, that row's weights and output are all zeros rather than NaN.
    """
    d_k = query.size(-1)
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)

    fully_masked = None
    if mask is not None:
        scores = scores.masked_fill(mask == 0, float('-inf'))
        # Rows made up solely of -inf would make softmax return NaN.
        fully_masked = (scores == float('-inf')).all(dim=-1, keepdim=True)

    attention_weights = torch.softmax(scores, dim=-1)
    if fully_masked is not None:
        # Nothing may be attended to in such rows: use zero weights, not NaN.
        attention_weights = attention_weights.masked_fill(fully_masked, 0.0)
    return torch.matmul(attention_weights, value), attention_weights

In [17]:
class MultiHeadAttention(nn.Module):
    """Multi-head attention with learned Q/K/V projections and an output projection.

    Args:
        num_heads: Number of attention heads; must divide ``d_model``.
        d_model: Model width; each head works on ``d_model // num_heads`` features.
    """

    def __init__(self, num_heads, d_model):
        super().__init__()
        assert d_model % num_heads == 0
        self.d_k = d_model // num_heads
        self.num_heads = num_heads
        self.linear_q = nn.Linear(d_model, d_model)
        self.linear_k = nn.Linear(d_model, d_model)
        self.linear_v = nn.Linear(d_model, d_model)
        self.fc_out = nn.Linear(d_model, d_model)

    def _split_heads(self, projected, batch_size):
        """Reshape ``(B, L, d_model)`` into ``(B, num_heads, L, d_k)``."""
        return projected.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

    def forward(self, query, key, value, mask=None):
        """Project the inputs, attend per head, merge the heads and project out.

        ``mask`` is passed unchanged to ``scaled_dot_product_attention``.
        Returns a tensor of shape ``(B, L_q, d_model)``; the attention weights
        are discarded.
        """
        batch_size = query.size(0)

        q_heads = self._split_heads(self.linear_q(query), batch_size)
        k_heads = self._split_heads(self.linear_k(key), batch_size)
        v_heads = self._split_heads(self.linear_v(value), batch_size)

        context, _ = scaled_dot_product_attention(q_heads, k_heads, v_heads, mask)

        # (B, heads, L, d_k) -> (B, L, heads * d_k): concatenate the heads again.
        merged = context.transpose(1, 2).contiguous().view(
            batch_size, -1, self.num_heads * self.d_k
        )
        return self.fc_out(merged)

In [18]:
class PositionwiseFeedforward(nn.Module):
    """Two-layer feed-forward block: Linear -> ReLU -> Dropout -> Linear.

    Args:
        d_model: Input and output width.
        d_ff: Width of the hidden layer.
        dropout: Dropout probability applied to the hidden activations.
    """

    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        """Apply the block along the last dimension of ``x``."""
        hidden = F.relu(self.linear1(x))
        hidden = self.dropout(hidden)
        return self.linear2(hidden)

In [19]:
class EncoderLayer(nn.Module):
    """One post-norm Transformer encoder layer.

    Self-attention and a position-wise feed-forward block, each followed by
    dropout, a residual connection and layer normalisation.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.attention = MultiHeadAttention(num_heads, d_model)
        self.feed_forward = PositionwiseFeedforward(d_model, d_ff, dropout)
        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, src_mask):
        """Run the layer on ``src``; ``src_mask`` is forwarded to the attention."""
        # Sub-layer 1: self-attention (query, key and value are all `src`).
        attended = self.attention(src, src, src, src_mask)
        src = self.layer_norm1(src + self.dropout(attended))

        # Sub-layer 2: position-wise feed-forward.
        transformed = self.feed_forward(src)
        return self.layer_norm2(src + self.dropout(transformed))

In [20]:
class Encoder(nn.Module):
    """Embedding + positional encoding followed by a stack of encoder layers."""

    def __init__(self, vocab_size, d_model, num_layers, num_heads, d_ff, dropout):
        super().__init__()
        self.embedding = Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)
        self.layers = nn.ModuleList(
            EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)
        )

    def forward(self, src, src_mask):
        """Encode token ids ``src``; ``src_mask`` is passed to every layer."""
        hidden = self.positional_encoding(self.embedding(src))
        for layer in self.layers:
            hidden = layer(hidden, src_mask)
        return hidden

In [21]:
class TransformerVishingModel(nn.Module):
    """Transformer encoder with two heads: next-token logits and a vishing logit.

    There is no decoder. The language-modelling head is a single linear layer
    applied to every encoder position, and the classifier reads the encoder
    state at position 0, where the input carries ``[CLS]``.
    """

    def __init__(self, vocab_size, d_model, num_layers, num_heads, d_ff, dropout):
        super().__init__()
        self.encoder = Encoder(vocab_size, d_model, num_layers, num_heads, d_ff, dropout)
        # Language-modelling head (next conversation tokens).
        self.fc_out = nn.Linear(d_model, vocab_size)
        # Classification head (vishing or not): one raw logit per sequence.
        self.cls_head = nn.Linear(d_model, 1)

    def forward(self, src, src_mask):
        """Encode ``src`` and apply both heads.

        Args:
            src: Token ids, ``(B, S)``.
            src_mask: Padding mask forwarded to the encoder.

        Returns:
            ``(lm_output, cls_output)`` with shapes ``(B, S, vocab_size)`` and
            ``(B, 1)``. Both are raw logits; no softmax or sigmoid is applied.
        """
        enc_output = self.encoder(src, src_mask)  # (B, S, d_model)

        # [CLS] sits at position 0 of every input, so its state is the summary
        # used for classification.
        cls_output = self.cls_head(enc_output[:, 0, :])

        # A real system would likely use a separate decoder; for simplicity the
        # same linear head is applied to every encoder position.
        lm_output = self.fc_out(enc_output)

        return lm_output, cls_output

In [22]:

###############################################################
# 4. Training Loop
#
# We will train using a joint loss:
# - LM Loss: CrossEntropy on predicted tokens vs. target tokens
# - Classification Loss: Binary cross entropy on cls_output vs. label
###############################################################

In [23]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Model hyperparameters.
d_model = 128      # embedding / hidden width
num_layers = 2     # number of stacked encoder layers
num_heads = 4      # attention heads per layer (must divide d_model)
d_ff = 512         # hidden width of the feed-forward sub-layer
dropout = 0.1      # dropout probability

model = TransformerVishingModel(
    vocab_size=vocab_size,
    d_model=d_model,
    num_layers=num_layers,
    num_heads=num_heads,
    d_ff=d_ff,
    dropout=dropout,
)
model = model.to(device)

In [24]:
# Loss functions
# Token-level loss for the language-modelling head; target positions holding the
# PAD id are ignored.
lm_criterion = nn.CrossEntropyLoss(ignore_index=vocab[PAD_TOKEN])
# Binary vishing/legitimate loss; it takes raw logits (the sigmoid is applied
# inside the loss).
cls_criterion = nn.BCEWithLogitsLoss()

In [25]:
# One Adam optimizer over all model parameters (encoder and both output heads).
optimizer = optim.Adam(model.parameters(), lr=0.001)


  from .autonotebook import tqdm as notebook_tqdm


In [30]:
epochs = 10

model.train()  # training mode: dropout active
for epoch in range(epochs):
    total_loss = 0
    for batch in dataloader:
        # collate_fn yields (inputs, targets, labels, src_mask); move all four over.
        inp_padded, tgt_padded, labels, src_mask = (t.to(device) for t in batch)

        optimizer.zero_grad()
        # lm_output: (B, S_in, vocab_size), cls_output: (B, 1)
        lm_output, cls_output = model(inp_padded, src_mask)

        # Drop the leading BOS so the targets read [w1, w2, ..., EOS]; encoder
        # position i is then trained to emit target token i.
        tgt_padded = tgt_padded[:, 1:]

        # Inputs and targets generally differ in length: crop both to the
        # shorter one so they line up position by position.
        min_length = min(lm_output.size(1), tgt_padded.size(1))
        lm_output = lm_output[:, :min_length]
        tgt_padded = tgt_padded[:, :min_length]

        # Language-modelling loss over the flattened (batch * position) axis.
        lm_loss = lm_criterion(lm_output.flatten(0, 1), tgt_padded.flatten())

        # Classification loss on the single logit per conversation.
        cls_loss = cls_criterion(cls_output.view(-1), labels)

        # Joint objective: unweighted sum of both losses.
        loss = lm_loss + cls_loss
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / len(dataloader)
    print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}")

print("Training completed.")


Epoch 1, Loss: 4.2834
Epoch 2, Loss: 3.3252
Epoch 3, Loss: 2.7470
Epoch 4, Loss: 2.0891
Epoch 5, Loss: 1.5366
Epoch 6, Loss: 1.2184
Epoch 7, Loss: 0.9861
Epoch 8, Loss: 0.7602
Epoch 9, Loss: 0.5577
Epoch 10, Loss: 0.4800
Training completed.


In [31]:
###############################################################
# 5. Inference
#
# Example: given a partial conversation, we predict next tokens and classify.
###############################################################


In [32]:
model.eval()  # inference mode: dropout disabled
with torch.no_grad():
    # First example of the dataset:
    #   inp = [CLS, hello, this, is, bank, security, team], tgt = next utterance
    inp, tgt, lbl = dataset[0]
    input_words = [inv_vocab[token_id] for token_id in inp.tolist()]

    # Add a batch dimension of 1 and build the same (B, 1, 1, S) padding mask
    # that collate_fn produces.
    inp = inp[None, :].to(device)
    src_mask = inp.ne(vocab[PAD_TOKEN])[:, None, None, :]
    lm_output, cls_output = model(inp, src_mask)

    # Greedy choice at every input position, and the vishing probability.
    pred_tokens = lm_output.argmax(dim=-1)[0].cpu().tolist()
    pred_label = cls_output.sigmoid().item()
    predicted_words = [inv_vocab[t] for t in pred_tokens]

    print("Input:", input_words)
    print("True Label (vishing=1):", lbl.item())
    print("Predicted Label:", pred_label, "(>0.5 means vishing)")
    print("Predicted Next Tokens:", predicted_words)

Input: ['[CLS]', 'hello', 'this', 'is', 'bank', 'security', 'team']
True Label (vishing=1): 1.0
Predicted Label: 0.9770664572715759 (>0.5 means vishing)
Predicted Next Tokens: ['we', 'need', 'your', 'account', 'details', '<eos>', '<eos>']
