In [1]:
import os
import re
import json
import yaml

from tqdm import tqdm
from langchain_groq import ChatGroq

import pandas as pd
from dotenv import load_dotenv
load_dotenv()

from typing import List, Dict, Set, Any


In [2]:
# API key for the Groq-hosted LLM; read from the environment (populated by
# load_dotenv() above). Resolves to None when the variable is not set.
groq_api_key = os.environ.get("GROQ_API_KEY3")

In [3]:
# Regex patterns for identity-verification phrases spoken by the agent.
# They are applied through contains_pattern(), i.e. re.search with
# re.IGNORECASE, so each pattern matches anywhere inside an utterance.
# NOTE(review): there are no word boundaries, so e.g. "address" also matches
# "let me address your concern" — confirm this is acceptable for the heuristic.
verification_patterns = [
    r"(date of birth|dob)",
    r"(address)",
    r"(social security number|ssn|social security)",
    r"(verify your identity)",
    r"(confirm your details)",
]

# Regex patterns for sensitive information sharing by agent
# (balance, account number, amount owed, payment due). Matched the same way
# as the verification patterns: case-insensitive substring search.
sensitive_info_patterns = [
    r"(your balance is|balance of \$?\d+[\d,\.]*)",
    r"(account number is \d+)",
    r"(you owe \$?\d+[\d,\.]*)",
    r"(payment due is \$?\d+[\d,\.]*)",
]

In [4]:
def load_conversations(directory_path):
    """
    Load all conversation files from the specified directory.
    Supported formats are JSON and YAML.

    Files are visited in sorted filename order so that the returned list (and
    therefore any batching built on top of it) is reproducible across runs and
    platforms; os.listdir() on its own returns entries in arbitrary order.
    Files that cannot be read or parsed are reported on stdout and skipped.

    Args:
        directory_path (str): Path to the directory containing conversation files.
    Returns:
        list of dict: List of conversations with 'call_id' and 'conversation' keys.
            'call_id' is the filename without its extension and 'conversation'
            is the parsed file content.
    """
    all_convos = []
    for filename in sorted(os.listdir(directory_path)):
        if not filename.endswith(('.json', '.yml', '.yaml')):
            continue
        try:
            with open(os.path.join(directory_path, filename), 'r', encoding='utf-8') as f:
                if filename.endswith('.json'):
                    convo = json.load(f)
                else:
                    # safe_load: never construct arbitrary Python objects from data files.
                    convo = yaml.safe_load(f)
            all_convos.append({'call_id': os.path.splitext(filename)[0], 'conversation': convo})
        except Exception as e:
            # Best-effort loading: one unreadable file must not abort the whole run.
            print(f"Failed loading {filename}: {e}")
    return all_convos

def contains_pattern(text, patterns):
    """
    Report whether any of the given regex patterns occurs in the text.

    Args:
        text (str): Text to search.
        patterns (iterable of str): Regular expressions, searched case-insensitively.
    Returns:
        bool: True as soon as one pattern matches anywhere in `text`, else False.
    """
    return any(re.search(regex, text, re.IGNORECASE) for regex in patterns)

def get_privacy_related_utterances(conversation):
    """
    Extract privacy/compliance related utterances from agent and customer.

    Agent utterances are tagged with "[Verification] " when they match
    `verification_patterns`; otherwise with "[Sensitive Info] " when they match
    `sensitive_info_patterns` (an utterance matching both is reported once, as
    verification). Customer utterances matching either pattern family are
    collected untagged. Utterances from any other speaker are ignored.

    Args:
        conversation (iterable of dict): Utterances with 'speaker' and 'text'
            keys; a missing key is treated as an empty string.
    Returns:
        dict: {'agent': [...], 'customer': [...]} with the matched utterances
            in conversation order.
    """
    agent_utterances = []
    customer_utterances = []
    # Loop-invariant: customers are checked against both pattern families.
    customer_patterns = verification_patterns + sensitive_info_patterns

    for utterance in conversation:
        speaker = utterance.get('speaker', '').lower()
        text = utterance.get('text', '')

        if speaker == 'agent':
            # Check for verification and sensitive info related utterances
            if contains_pattern(text, verification_patterns):
                agent_utterances.append(f"[Verification] {text}")
            elif contains_pattern(text, sensitive_info_patterns):
                agent_utterances.append(f"[Sensitive Info] {text}")
        elif speaker == 'customer':
            if contains_pattern(text, customer_patterns):
                customer_utterances.append(text)

    return {'agent': agent_utterances, 'customer': customer_utterances}

def detect_privacy_violation(conversation):
    """
    Decide whether the agent disclosed sensitive information before verifying identity.

    Only agent utterances are inspected, in conversation order. Verification
    counts as done from the first agent utterance that matches
    `verification_patterns`, including for a disclosure made in that same utterance.

    Args:
        conversation (iterable of dict): Utterances with 'speaker' and 'text'
            keys; a missing key is treated as an empty string.
    Returns:
        bool: True if an agent utterance matches `sensitive_info_patterns`
            while no verification has been seen yet, otherwise False.
    """
    identity_verified = False
    for turn in conversation:
        role = turn.get('speaker', '').lower()
        spoken = turn.get('text', '')
        if role != 'agent':
            continue
        if contains_pattern(spoken, verification_patterns):
            identity_verified = True
        discloses = contains_pattern(spoken, sensitive_info_patterns)
        if discloses and not identity_verified:
            return True
    return False




In [5]:
# Rule-based pass: flag every call in which the agent disclosed sensitive
# information before any verification phrase, and print the matched utterances.
directory_path = "../data/All_Conversations"
conversations = load_conversations(directory_path)

violation_call_ids = []

# Section heading -> key in the dict returned by get_privacy_related_utterances().
report_sections = (
    ("  Agent privacy-related utterances:", 'agent'),
    ("  Customer privacy-related utterances:", 'customer'),
)

for convo_dict in tqdm(conversations, desc="Processing conversations"):
    call_id = convo_dict['call_id']
    conversation = convo_dict['conversation']

    # Compliant calls need no report.
    if not detect_privacy_violation(conversation):
        continue

    violation_call_ids.append(call_id)
    privacy_data = get_privacy_related_utterances(conversation)

    print(f"Call ID: {call_id}")
    for heading, role in report_sections:
        print(heading)
        for utt in privacy_data[role]:
            print(f"    {utt}")
    print()

Processing conversations: 100%|██████████| 250/250 [00:00<00:00, 22447.68it/s]

Call ID: 40b40ec2-8ce2-47a8-b9d4-621285d7b484
  Agent privacy-related utterances:
    [Sensitive Info] Thank you, Lisa. You currently have a balance of $450. How would you like to proceed with the payment?
  Customer privacy-related utterances:

Call ID: dbd3e2fd-4a8f-4eb3-aa6f-befcc150bfd8
  Agent privacy-related utterances:
    [Sensitive Info] Sure! You have an overdue balance of $450. How do you wish to handle this?
  Customer privacy-related utterances:

Call ID: 1190bab7-d82f-4259-bb55-9617dff7da07
  Agent privacy-related utterances:
    [Sensitive Info] Thank you, Alex. I see here you have a balance of $500 due. Would you like to discuss payment options?
    [Sensitive Info] Sure! The account number is 123456. So how would you like to settle the balance?
    [Verification] I understand your concern. I can provide more details about the debt once I verify your identity further.
    [Verification] Could you please confirm your date of birth?
  Customer privacy-related utterances:





In [None]:


# System prompt for the LLM compliance review. The model is asked to reply
# with a bare JSON array (one object per conversation) so that the reply can
# be parsed with json.loads.
SYSTEM_PROMPT = '''
# Role  
You are a compliance analyst AI specialized in privacy and security in debt collection conversations.

## Task  
Your task is to analyze conversations between debt collection agents and borrowers. Identify any instances where the agent shared sensitive information such as account balance or account details without first verifying the borrower's identity using methods like date of birth, address, or Social Security Number.

## Context  
Each conversation consists of sequences of utterances labeled with the speaker (agent or borrower) and the speech content. The verification must happen before any sensitive information is disclosed by the agent.

## Reasoning  
To determine compliance, track if identity verification has been established before sharing any sensitive information by the agent. If sensitive information is shared prior, mark the conversation as violating privacy and compliance rules.

# Rules  
- Return only a valid JSON array of result objects, one per conversation (no markdown, no comments). 
- Sensitive information includes but is not limited to account balance, payment due, account number, or related financial info.  
- Identity verification phrases include asking for or confirming date of birth, address, Social Security Number, or any explicit identity confirmation request.  
- Analyze the sequence of utterances carefully to check the order of verification and disclosure.  
- Each object must state whether a privacy violation occurred (the "violation" field, "True" or "False") together with a brief explanation citing examples from the conversation.

## Output Format  
Provide a JSON array of objects, each with the following keys:  
- "call_id": string, the identifier of the conversation  
- "violation": string, "True" or "False"  
- "explanation": string, brief summary describing findings or "No violation detected."
- "terms": string, key terms identified in the conversation related to verification and sensitive information sharing.

## Tone  
Your output should be clear, precise, professional, and focused on compliance analysis without ambiguity.

## Stop Condition  
Stop after processing all conversations given in the input prompt and produce the summary for each.
'''


def format_conversation(conversation: List[Dict[str, Any]]) -> str:
    """
    Render a conversation as plain text, one "Speaker: text" line per utterance.

    The speaker label is capitalized ('unknown' when the key is missing) and
    the utterance text is stripped of surrounding whitespace.

    Args:
        conversation: Utterance dicts with optional 'speaker' and 'text' keys.
    Returns:
        The lines joined with newlines; an empty string for an empty conversation.
    """
    return "\n".join(
        f"{turn.get('speaker', 'unknown').capitalize()}: {turn.get('text', '').strip()}"
        for turn in conversation
    )

def build_user_prompt(batch: List[Dict[str, Any]]) -> str:
    """
    Assemble the user message for one batch of conversations.

    The message starts with a fixed instruction line, followed by one section
    per conversation: a "Call ID: <id>" line, the formatted transcript, and a
    blank line.

    Args:
        batch: Dicts with 'call_id' and 'conversation' keys.
    Returns:
        The complete prompt text.
    """
    header = "Analyze the following conversations for privacy and compliance violations as per the system instructions.\n\n"
    sections = [
        f"Call ID: {conv['call_id']}\n" + format_conversation(conv['conversation']) + "\n\n"
        for conv in batch
    ]
    return header + "".join(sections)

def analyze_batch(llm, batch: List[Dict[str, Any]]) -> List[Dict[str, str]]:
    """
    Send one batch of conversations to the LLM and parse its JSON verdicts.

    The reply is expected to be a JSON array of objects. A reply wrapped in a
    markdown code fence (```json ... ```) is unwrapped before parsing. If the
    reply is not text, is not valid JSON, or is not an array of objects, one
    placeholder row per conversation is returned with "violation" set to
    "error", so the caller always receives a list of row dicts with the same
    keys as a normal result.

    Args:
        llm: Chat model object exposing invoke(messages) and returning an
            object with a `content` attribute.
        batch: Dicts with 'call_id' and 'conversation' keys.
    Returns:
        List of result dicts (keys: call_id, violation, explanation, terms).
    """
    def _error_rows(reason: str) -> List[Dict[str, str]]:
        # One placeholder per conversation; includes "terms" to match normal rows.
        return [
            {
                "call_id": conv['call_id'],
                "violation": "error",
                "explanation": reason,
                "terms": "",
            }
            for conv in batch
        ]

    user_prompt = build_user_prompt(batch)
    messages = [
        ("system", SYSTEM_PROMPT),
        ("human", user_prompt),
    ]

    response = llm.invoke(messages)
    raw = response.content
    if not isinstance(raw, str):
        return _error_rows("LLM output JSON parsing failed.")

    raw = raw.strip()
    # Models frequently wrap JSON in a markdown fence despite instructions.
    fenced = re.match(r"^```[\w-]*[ \t]*\r?\n(.*?)\s*```$", raw, re.DOTALL)
    if fenced:
        raw = fenced.group(1)

    try:
        analysis = json.loads(raw)
    except json.JSONDecodeError:
        return _error_rows("LLM output JSON parsing failed.")

    # A bare object/string/number would corrupt the caller's list.extend().
    if not isinstance(analysis, list) or not all(isinstance(row, dict) for row in analysis):
        return _error_rows("LLM output was not a JSON array of objects.")
    return analysis

def process_all(directory_path: str, batch_size: int = 25, model: str = "openai/gpt-oss-20b") -> List[Dict[str, str]]:
    """
    Run the LLM compliance review over every conversation in a directory.

    Conversations are loaded with load_conversations() and sent to the model
    in consecutive batches. If a batch raises, the error is printed, each
    conversation of that batch is recorded with "violation" set to "error",
    and processing continues with the next batch, so the returned list covers
    every batch that was attempted.

    Args:
        directory_path: Directory containing the conversation files.
        batch_size: Number of conversations per LLM request; must be >= 1.
        model: Groq model identifier passed to ChatGroq.
    Returns:
        Result dicts collected from all batches, in processing order.
    Raises:
        ValueError: If batch_size is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    # temperature=0 keeps the verdicts as repeatable as the model allows.
    llm = ChatGroq(model=model, temperature=0, api_key=groq_api_key)
    conversations = load_conversations(directory_path)
    all_results = []

    for i in tqdm(range(0, len(conversations), batch_size)):
        batch = conversations[i:i + batch_size]
        try:
            all_results.extend(analyze_batch(llm, batch))
        except Exception as e:
            # One failing request must not discard the remaining batches.
            print(f"Error processing batch starting at index {i}: {e}")
            all_results.extend(
                {
                    "call_id": conv['call_id'],
                    "violation": "error",
                    "explanation": f"Batch processing failed: {e}",
                    "terms": "",
                }
                for conv in batch
            )

    return all_results

def save_results(results: List[Dict[str, str]], csv_path: str):
    """
    Write the analysis results to a CSV file and print a confirmation.

    Missing parent directories of `csv_path` are created first, because
    DataFrame.to_csv does not create them and would otherwise fail after the
    analysis has already been computed.

    Args:
        results: Row dicts; the union of their keys becomes the CSV columns.
        csv_path: Destination file path.
    """
    parent_dir = os.path.dirname(csv_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    df = pd.DataFrame(results)
    df.to_csv(csv_path, index=False)
    print(f"Saved report to {csv_path}")

In [7]:
# LLM-based pass: review every conversation in batches and persist the
# verdicts as a CSV report.
data_dir = "../data/All_Conversations"  # Change as needed
batch_size = 10
results = process_all(directory_path=data_dir, batch_size=batch_size)
save_results(results=results, csv_path="../output/privacy_compliance_report.csv")


100%|██████████| 25/25 [13:28<00:00, 32.33s/it]

Saved report to ../output/privacy_compliance_report.csv



