# Deception Detection

In [8]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import tqdm
from transformers import BartTokenizer, BartModel, BartForConditionalGeneration
from torch_geometric.nn import GATConv
import torch_geometric
from torch_geometric.data import Data as GeoData
from torch.utils.data import Dataset, DataLoader
import networkx as nx
import numpy as np
import random as random
import json

# Seed every random number generator in use with the value taken from the
# original code base, so runs are repeatable.
random.seed(1994)
np.random.seed(1994)
torch.manual_seed(1994)

# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
DEVICE = "cpu"
if torch.cuda.is_available():
    DEVICE = "cuda"
print("Using device:", DEVICE)

Using device: cpu


In [None]:
# Data paths (JSON Lines files, relative to the working directory).
# Forward slashes are used because Python's file APIs accept them on Windows
# as well as on POSIX systems, whereas backslash separators only resolve on
# Windows.
TRAIN_PATH = "./Data/train.jsonl"
VAL_PATH = "./Data/validation.jsonl"
TEST_PATH = "./Data/test.jsonl"

## Custom Dataset Class

In [None]:
# this is the diplomacy dataset class 
class DiplomacyDataset(torch.utils.data.Dataset):
    """Conversation-level dataset read from a JSON Lines file.

    Each non-empty line of the file is parsed as one JSON record. The keys
    read from a record are ``"messages"``, ``"sender_labels"`` and, when
    ``use_game_scores`` is true, ``"game_score_delta"``. Messages whose label
    is not a boolean (or the string form of one) are dropped; records left
    with no message are skipped entirely.

    Attributes:
        data: list of ``(messages, labels, scores)`` tuples holding the raw
            message strings, 0/1 labels and raw scores of one conversation.
        ix2tok: vocabulary as a list, most frequent token first after the
            two special tokens.
        tok2ix: inverse mapping from token to vocabulary index.
    """

    # Special vocabulary entries. PAD_TOKEN must stay at index 0 because
    # collate_fn pads with 0. Both are upper-case, so they cannot collide
    # with the lower-cased tokens produced by _tokenize.
    PAD_TOKEN = "<PAD>"
    UNK_TOKEN = "<UNK>"

    def __init__(self, path, max_tokens_per_msg=50, max_messages=50, use_game_scores=False):
        """Load and filter the conversations stored at ``path``.

        Args:
            path: location of the JSONL file (opened as UTF-8).
            max_tokens_per_msg: stored on the instance; this class does not
                truncate messages with it.
            max_messages: stored on the instance; this class does not
                truncate conversations with it.
            use_game_scores: when true, read ``"game_score_delta"`` from each
                record; otherwise every message gets a score of 0.
        """
        super().__init__()
        self.data = []
        self.max_tokens_per_msg = max_tokens_per_msg
        self.max_messages = max_messages
        self.use_game_scores = use_game_scores

        # Labels accepted as annotated; anything else is treated as missing.
        valid_labels = (True, False, "true", "false", "True", "False")

        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                record = json.loads(line)
                # `or []` also covers a key that is present but null.
                messages = record.get("messages") or []
                labels = record.get("sender_labels") or []
                game_scores = record.get("game_score_delta") if use_game_scores else None
                # Missing scores default to zero for every message.
                if game_scores is None:
                    game_scores = [0] * len(messages)

                filtered_msgs, filtered_lbls, filtered_scores = [], [], []
                # zip stops at the shortest of the three sequences.
                for msg, lbl, score in zip(messages, labels, game_scores):
                    if lbl not in valid_labels:
                        continue
                    filtered_msgs.append(msg)
                    # Normalise to 1 (true) / 0 (false).
                    if isinstance(lbl, str):
                        filtered_lbls.append(1 if lbl.lower() == "true" else 0)
                    else:
                        filtered_lbls.append(1 if lbl else 0)
                    filtered_scores.append(score)

                # Skip records with no valid messages.
                if not filtered_msgs:
                    continue

                self.data.append((filtered_msgs, filtered_lbls, filtered_scores))

        # Build vocabulary from the dataset.
        self._build_vocab()

    def _tokenize(self, text):
        """Lower-case ``text``, split on whitespace and mask numeric tokens.

        Any token containing at least one digit is replaced by ``"<NUM>"``.
        """
        tokens = text.lower().replace("\n", " ").split()
        return ["<NUM>" if any(ch.isdigit() for ch in t) else t for t in tokens]

    def _build_vocab(self):
        """Create ``ix2tok`` / ``tok2ix`` from every message in ``self.data``."""
        # Imported here because the module-level imports do not provide it.
        from collections import Counter

        token_freq = Counter()
        for conv, _, _ in self.data:
            for msg in conv:
                token_freq.update(self._tokenize(msg))

        # Special tokens first, then tokens in order of decreasing frequency.
        self.ix2tok = [self.PAD_TOKEN, self.UNK_TOKEN]
        self.ix2tok.extend(tok for tok, _ in token_freq.most_common())
        self.tok2ix = {t: i for i, t in enumerate(self.ix2tok)}

    def __len__(self):
        """Return the number of conversations kept after filtering."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(token_ids, labels, scores)`` for conversation ``idx``.

        ``token_ids`` is a list with one list of vocabulary indices per
        message; ``labels`` is the list of 0/1 labels; ``scores`` is the list
        of scores converted to ``float``.
        """
        conv, lbls, scores = self.data[idx]
        unk_ix = self.tok2ix[self.UNK_TOKEN]
        tokenized_conv = [
            [self.tok2ix.get(t, unk_ix) for t in self._tokenize(msg)]
            for msg in conv
        ]
        scores = [float(s) for s in scores]
        return tokenized_conv, lbls, scores


# Custom collate function to pad sequences for a batch of conversations.

def collate_fn(batch):
    """Pad a batch of conversations into rectangular tensors.

    Args:
        batch: sequence of ``(conversation, labels, scores)`` items, where
            ``conversation`` is a list of token-index lists.

    Returns:
        A tuple ``(tokens, labels, mask, scores)``: ``tokens`` (long) is
        padded with 0 to the longest message and the longest conversation in
        the batch; ``labels`` (long) and ``scores`` (float) are padded with
        0; ``mask`` (long) is 1 for real messages and 0 for padding.
    """
    # Both padding targets come from the batch itself.
    widest_conv = max(len(sample[0]) for sample in batch)
    widest_msg = max(
        (len(message) for sample in batch for message in sample[0]),
        default=0,
    )

    token_rows, label_rows, mask_rows, score_rows = [], [], [], []
    for conv, lbls, scores in batch:
        real = len(conv)
        missing = widest_conv - real

        # Real messages are right-padded; absent messages are all zeros.
        token_rows.append(
            [conv[i] + [0] * (widest_msg - len(conv[i])) for i in range(real)]
            + [[0] * widest_msg for _ in range(missing)]
        )
        label_rows.append([lbls[i] for i in range(real)] + [0] * missing)
        mask_rows.append([1] * real + [0] * missing)
        score_rows.append([scores[i] for i in range(real)] + [0] * missing)

    return (
        torch.tensor(token_rows, dtype=torch.long),
        torch.tensor(label_rows, dtype=torch.long),
        torch.tensor(mask_rows, dtype=torch.long),
        torch.tensor(score_rows, dtype=torch.float),
    )

In [None]:
def convert_to_graph_data(diplomacy_dataset, world_knowledge_func):
    """
    Convert DiplomacyDataset (conversation-wise) to a format usable by ConversationDataset.

    Args:
        diplomacy_dataset: a DiplomacyDataset, or any iterable of
            ``(messages, labels, scores)`` triples with raw message strings.
        world_knowledge_func: forwarded unchanged to every ConversationDataset.

    Returns a list of ConversationDataset instances — one per conversation.
    """
    # Iterating a DiplomacyDataset goes through __getitem__, which yields
    # vocabulary indices. ConversationDataset is documented to take the raw
    # message strings, so read the untokenized tuples stored on `.data` when
    # the attribute exists and fall back to plain iteration otherwise.
    records = getattr(diplomacy_dataset, "data", diplomacy_dataset)

    conversation_graphs = []
    for conv_msgs, labels, scores in records:
        num_msgs = len(conv_msgs)

        # Create edges: simple chain (msg[i] -> msg[i+1])
        edges = [(i, i + 1) for i in range(num_msgs - 1)]

        # Build ConversationDataset instance for this one conversation
        conv_data = ConversationDataset(
            conversations=list(conv_msgs),
            # Raw scores are stored as read from JSON; make them numeric
            # before they are turned into a float tensor.
            power_deltas=[float(s) for s in scores],
            edges=edges,
            truth_labels=labels,
            world_knowledge_func=world_knowledge_func,
        )
        conversation_graphs.append(conv_data)

    return conversation_graphs

In [None]:
class ConversationDataset(Dataset):
    """Single-item dataset wrapping one conversation as a message graph."""

    def __init__(self, conversations, power_deltas, edges, truth_labels, world_knowledge_func, max_length=128):
        """
        Args:
            conversations (List[str]): Messages
            power_deltas (List[float]): Delta power values for each message
            edges (List[Tuple[int, int]]): List of (src, dst) edges between messages
            truth_labels (List[int]): 1 for truth, 0 for lie
            world_knowledge_func (Callable[[str], np.ndarray]): Function that maps message to external knowledge embedding
            max_length (int): accepted for interface compatibility; not used by this class
        """
        self.conversations = conversations
        self.power_deltas = torch.tensor(power_deltas, dtype=torch.float)
        self.truth_labels = torch.tensor(truth_labels, dtype=torch.long)
        self.edges = self._build_edge_index(edges)  # shape: (2, num_edges)
        # NOTE(review): no method of this class uses the tokenizer, and it is
        # loaded once per instance — consider sharing a single tokenizer.
        self.tokenizer = BartTokenizer.from_pretrained("facebook/bart-base")
        self.world_knowledge_func = world_knowledge_func

        # World knowledge features are computed on first access and cached.
        self._world_feats = None

    @staticmethod
    def _build_edge_index(edges):
        """Return ``edges`` as a long tensor of shape (2, num_edges).

        An empty edge list (a one-message conversation) yields shape (2, 0)
        rather than the 1-D tensor a bare ``torch.tensor([])`` would give.
        """
        edge_tensor = torch.tensor(edges, dtype=torch.long)
        if edge_tensor.numel() == 0:
            return torch.empty((2, 0), dtype=torch.long)
        return edge_tensor.T

    def _get_world_feats(self):
        """Stack ``world_knowledge_func(msg)`` for every message into one float tensor."""
        feats = []
        for msg in self.conversations:
            feat = self.world_knowledge_func(msg)  # Expected shape: (world_dim,)
            feats.append(torch.tensor(feat, dtype=torch.float))
        return torch.stack(feats)

    def __len__(self):
        return 1  # Since this dataset returns a single graph (whole conversation)

    def __getitem__(self, idx):
        """Return the whole conversation as a dict, whatever ``idx`` is.

        The ``'world_feats'`` entry is present only when a
        ``world_knowledge_func`` was supplied.
        """
        item = {
            'messages': self.conversations,
            'edge_index': self.edges,
            'power_deltas': self.power_deltas,
            'truth_labels': self.truth_labels
        }
        if self.world_knowledge_func is not None:
            # getattr: tolerate instances created without running __init__.
            if getattr(self, "_world_feats", None) is None:
                self._world_feats = self._get_world_feats()
            item['world_feats'] = self._world_feats
        return item


## Message Encoder

In [None]:
class BartMessageEncoder(nn.Module):
    """Encode a list of message strings into one fixed-size vector each."""

    def __init__(self, model_name='facebook/bart-base'):
        super().__init__()
        self.tokenizer = BartTokenizer.from_pretrained(model_name)
        self.bart = BartModel.from_pretrained(model_name)
        self.hidden_dim = self.bart.config.d_model  # 768 for bart-base
        # Kept so existing code that touches `.pool` keeps working; forward()
        # now uses a mask-aware mean instead.
        self.pool = nn.AdaptiveAvgPool1d(1)

    def forward(self, texts):
        """Return ``(pooled, attention_mask)`` for ``texts``.

        Args:
            texts: a string or list of strings to encode (truncated to 128
                tokens by the tokenizer).

        Returns:
            pooled: (B, D) mean of the token-level hidden states over the
                positions marked 1 in the attention mask.
            attention_mask: the tokenizer's (B, L) attention mask.
        """
        toks = self.tokenizer(texts, return_tensors='pt', padding=True, truncation=True, max_length=128)
        # The tokenizer builds CPU tensors; move them to wherever the model lives.
        toks = toks.to(self.bart.device)
        outputs = self.bart(**toks)
        last_hidden = outputs.last_hidden_state  # (B, L, D)

        # Average only over unmasked positions, so a message's embedding does
        # not change with the amount of padding its batch happens to need.
        weights = toks['attention_mask'].unsqueeze(-1).to(last_hidden.dtype)  # (B, L, 1)
        pooled = (last_hidden * weights).sum(dim=1) / weights.sum(dim=1).clamp(min=1.0)  # (B, D)
        return pooled, toks['attention_mask']

## Conversation Graph with World Knowledge

In [None]:
class ConversationGAT(nn.Module):
    """Two-layer graph attention network over the messages of a conversation.

    Every node is the concatenation of its message embedding, its scalar
    power delta and its world-knowledge vector.
    """

    def __init__(self, in_dim, gat_dim=128, heads=4, world_dim=128):
        super().__init__()
        self.world_dim = world_dim
        # Node width: message embedding + 1 power value + world vector.
        self.gat1 = GATConv(in_dim + 1 + world_dim, gat_dim, heads=heads, dropout=0.1)
        # The first layer's head outputs feed a single-head second layer.
        self.gat2 = GATConv(gat_dim * heads, gat_dim, heads=1, concat=False, dropout=0.1)

    def forward(self, node_feats, edge_index, power_deltas, world_feats, num_random_edges=None):
        """Run both GAT layers on the given edges plus randomly drawn ones.

        ``num_random_edges`` random (src, dst) pairs are appended to
        ``edge_index`` on every call; when it is None, one pair per node is
        drawn. Returns the ELU-activated output of the second layer.
        """
        features = torch.cat(
            [node_feats, power_deltas.unsqueeze(-1), world_feats],
            dim=-1,
        )  # (N, D + 1 + W)

        num_nodes = node_feats.size(0)
        extra = num_nodes if num_random_edges is None else num_random_edges
        # Sources are drawn before targets to keep the RNG sequence fixed.
        sources = torch.randint(0, num_nodes, (extra,), device=node_feats.device)
        targets = torch.randint(0, num_nodes, (extra,), device=node_feats.device)
        augmented_edges = torch.cat(
            [edge_index, torch.stack([sources, targets], dim=0)],
            dim=1,
        )

        hidden = F.elu(self.gat1(features, augmented_edges))
        return F.elu(self.gat2(hidden, augmented_edges))

RuntimeError: Failed to import transformers.integrations.integration_utils because of the following error (look up to see its traceback):
Failed to import transformers.modeling_tf_utils because of the following error (look up to see its traceback):
Your currently installed version of Keras is Keras 3, but this is not yet supported in Transformers. Please install the backwards-compatible tf-keras package with `pip install tf-keras`.

## Deception Classifier

In [None]:
class PolicyNetwork(nn.Module):
    """Two-layer perceptron that maps each input row to 2 logits."""

    def __init__(self, in_dim, hidden_dim=64):
        super().__init__()
        # Created in this order so parameter initialisation is reproducible.
        self.fc1 = nn.Linear(in_features=in_dim, out_features=hidden_dim)
        self.fc2 = nn.Linear(in_features=hidden_dim, out_features=2)

    def forward(self, x):
        """Return unnormalised scores; the last dimension has size 2."""
        hidden = self.fc1(x)
        activated = F.relu(hidden)
        logits = self.fc2(activated)
        return logits

## Generator using BART Decoder based on word context, world knowledge, deception label and power score

In [None]:
class ResponseGenerator(nn.Module):
    """Generate one text response per message with a BART decoder."""

    def __init__(self, bart_model_name='facebook/bart-base'):
        super().__init__()
        self.tokenizer = BartTokenizer.from_pretrained(bart_model_name)
        self.decoder = BartForConditionalGeneration.from_pretrained(bart_model_name)

    def forward(self, encoder_outputs, truth_labels, power_deltas, concept_feats=None, max_length=50):
        """Return a list of decoded strings, one per generated sequence.

        Args:
            encoder_outputs: tensor handed to ``generate`` as the encoder
                states after a leading batch axis is added.
            truth_labels: per-message labels; 1 renders as ``Truth`` in the
                prompt, anything else as ``Lie``.
            power_deltas: per-message tensor values, formatted to 2 decimals.
            concept_feats: accepted but not used.
            max_length: maximum length forwarded to ``generate``.
        """
        prompts = []
        for t, pd in zip(truth_labels, power_deltas):
            label_str = 'Truth' if t == 1 else 'Lie'
            prompts.append(f"[{label_str}|Delta:{pd.item():.2f}]")

        # The tokenizer builds CPU tensors; put them on the decoder's device
        # so generation also works after the model has been moved to a GPU.
        toks = self.tokenizer(prompts, return_tensors='pt', padding=True).to(self.decoder.device)

        # NOTE(review): BART's cross-attention presumably expects encoder
        # states whose last dimension equals the model's d_model and whose
        # batch size matches the prompts — confirm that callers pass states
        # of that shape.
        out = self.decoder.generate(
            input_ids=toks['input_ids'],
            attention_mask=toks['attention_mask'],
            encoder_outputs=(encoder_outputs.unsqueeze(0),),
            max_length=max_length
        )
        return [self.tokenizer.decode(ids, skip_special_tokens=True) for ids in out]

## Full Deception Detection Model

In [None]:
class DeceptionModel(nn.Module):
    """Message encoder -> conversation GAT -> policy head, plus a response generator."""

    def __init__(self, world_dim=128, hidden_dim=128, gat_heads=4):
        """
        Args:
            world_dim: size of the world-knowledge vector attached to each message.
            hidden_dim: output width of the GAT, and therefore the input
                width of the policy head.
            gat_heads: number of attention heads in the first GAT layer.
        """
        super().__init__()
        self.encoder = BartMessageEncoder()
        self.gat = ConversationGAT(
            in_dim=self.encoder.hidden_dim,
            gat_dim=hidden_dim,
            heads=gat_heads,
            world_dim=world_dim,
        )
        # The policy consumes the GAT output, so the two widths must agree.
        self.policy = PolicyNetwork(in_dim=hidden_dim)
        self.generator = ResponseGenerator()

    def forward(self, messages, edge_index, power_deltas, world_feats, truth_labels=None, rl_mode=False):
        """Classify every message and optionally generate responses.

        Returns:
            ``(probs, responses)``, or ``(probs, actions, log_probs,
            responses)`` when ``rl_mode`` is true. ``probs`` is the softmax
            over the two policy logits; ``responses`` is None unless
            ``truth_labels`` is given.
        """
        # 1) Encode messages with BART encoder
        msg_feats, attn_mask = self.encoder(messages)
        # 2) GAT with added world knowledge
        node_feats = self.gat(msg_feats, edge_index, power_deltas, world_feats)
        # 3) Policy head
        logits = self.policy(node_feats)
        probs = F.softmax(logits, dim=-1)

        actions, log_probs = None, None
        if rl_mode:
            # Sample one action per message and keep its log-probability.
            dist = torch.distributions.Categorical(probs)
            actions = dist.sample()
            log_probs = dist.log_prob(actions)

        responses = None
        if truth_labels is not None:
            responses = self.generator(node_feats, truth_labels, power_deltas)

        if rl_mode:
            return probs, actions, log_probs, responses
        return probs, responses

## Training and Evaluation

In [None]:
# Assuming everything is already defined:
# - ConversationDataset
# - DeceptionModel
# - sbert_world_knowledge (or other function)
# - Data: messages, power_deltas, edges, truth_labels

# Hyperparameters
hidden_dim = 128
gat_heads = 4
world_dim = 384  # SBERT
learning_rate = 1e-4
epochs = 10



# Dataset and model
dataset = ConversationDataset(
    conversations=messages,
    power_deltas=power_deltas,
    edges=edges,
    truth_labels=truth_labels,
    world_knowledge_func=sbert_world_knowledge
)
# Each item is already a whole conversation graph, so hand it to the loop
# unchanged: default collation would transpose the list of message strings
# and add a leading batch axis to every tensor.
loader = DataLoader(dataset, batch_size=1, shuffle=True, collate_fn=lambda samples: samples[0])

# Only world_dim is passed; hidden_dim and gat_heads above equal the values
# ConversationGAT uses by default (gat_dim=128, heads=4).
model = DeceptionModel(world_dim=world_dim)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.BCELoss()

# World-knowledge features computed from the dataset, reused across epochs
# when the items themselves do not carry a 'world_feats' entry.
cached_world_feats = None

# Training loop
for epoch in range(epochs):
    model.train()
    total_loss = 0.0

    for batch in loader:
        # Distinct names keep the module-level inputs (messages,
        # power_deltas, truth_labels) from being overwritten.
        batch_messages = list(batch['messages'])
        edge_index = batch['edge_index'].to(device)
        batch_power_deltas = batch['power_deltas'].to(device)
        batch_labels = batch['truth_labels'].float().to(device)

        world_feats = batch.get('world_feats')
        if world_feats is None:
            if cached_world_feats is None:
                cached_world_feats = dataset._get_world_feats()
            world_feats = cached_world_feats
        world_feats = world_feats.to(device)

        optimizer.zero_grad()
        # truth_labels is not forwarded: it only triggers response
        # generation, which the classification loss does not use.
        probs, _ = model(batch_messages, edge_index, batch_power_deltas, world_feats)
        # Column 1 is the probability of label 1, which is what BCELoss
        # compares against the 0/1 targets.
        loss = criterion(probs[:, 1], batch_labels)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    print(f"Epoch {epoch+1}/{epochs} | Loss: {total_loss:.4f}")