# Gravix Layer Cookbook: Autonomous AML Investigation Agent
This notebook demonstrates how to build an **AI-powered Anti-Money Laundering (AML) Investigator** using the **Gravix Layer** SDK. It simulates a banking compliance workflow in which an AI agent analyzes customer cases, cross-references regulations, and autonomously drafts client correspondence.

### What is an Autonomous AML Agent?
Traditional compliance systems rely on rigid rules. An **Autonomous AML Agent** combines **RAG (Retrieval-Augmented Generation)** with **Agentic Reasoning**. It doesn't just search for keywords; it "thinks" like a human analyst to assess risk, map specific regulations to customer behaviors, and determine necessary actions.

### In this notebook, you will learn to:
1.  **Ingest** regulatory documents (PDFs) into a local reference store for grounding.
2.  **Connect** to a Gravix Vector Index for high-precision semantic search.
3.  **Build** an `InvestigativeAgent` that performs **Chain-of-Thought** reasoning to produce decision memos.
4.  **Create** a `CommunicationAgent` that autonomously drafts and sends professional emails.
5.  **Simulate** end-to-end investigation scenarios with complex corporate structures.

### Key Features:
-   **Chain-of-Thought Reasoning**: The agent explains *why* a risk level is chosen, not just *what* it is.
-   **Regulatory Citation**: The agent is prompted to back every claim with a specific regulation ID, and the source text for each cited ID is appended to the memo.
-   **Multi-Agent Architecture**: Separation of concerns between the *Investigator* (Analysis) and the *Communicator* (Outreach).
-   **Automated Outreach**: Generates context-aware HTML emails to clients based on investigation findings.

### Use Cases:
-   Automated Level 1 AML Triage
-   Compliance Quality Assurance (QA)
-   Regulatory Gap Analysis
-   Training Simulator for Junior Analysts

### Disclaimer
**This notebook is for educational and demonstration purposes. While it simulates a compliance workflow, real-world AML systems require strict validation, human-in-the-loop review, and adherence to local laws.**

In [83]:
# Install dependencies
# smtplib and email ship with the Python standard library, so only these two packages are needed
!pip install -q gravixlayer PyPDF2




In [None]:
import os
import json
import re
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import List, Dict, Any, Tuple
import PyPDF2
from gravixlayer import GravixLayer

# --- CONFIGURATION ---
# Set GRAVIX_API_KEY as an environment variable, or replace the placeholder below
GRAVIX_API_KEY = os.environ.get("GRAVIX_API_KEY", "YOUR_GRAVIX_API_KEY")

CHAT_MODEL = "qwen/qwen-2.5-vl-7b-instruct"
EMBEDDING_MODEL = "baai/bge-large-en-v1.5"
INDEX_NAME = "uk-fca-compliance-guide"
PDF_PATH = "fg18-05.pdf"

# --- EMAIL CONFIGURATION ---
# You'll need to update these with your actual SMTP credentials
SMTP_SERVER = "smtp.gmail.com"  # Change based on your email provider
SMTP_PORT = 587
SENDER_EMAIL = os.environ.get("SENDER_EMAIL", "YOUR_EMAIL@gmail.com")
SENDER_PASSWORD = os.environ.get("SENDER_PASSWORD", "YOUR_APP_PASSWORD")  # Use an app password, not the account password

# Initialize Client
client = GravixLayer(api_key=GRAVIX_API_KEY)
print(f"GravixLayer Client Initialized (Agent Model: {CHAT_MODEL})")

GravixLayer Client Initialized (Agent Model: qwen/qwen-2.5-vl-7b-instruct)
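
The configuration cell falls back to placeholder strings such as `"YOUR_GRAVIX_API_KEY"` when an environment variable is unset, so a misconfigured run only fails later, at the first API or SMTP call. A minimal sketch of a fail-fast check follows; `find_unset_settings` and `PLACEHOLDERS` are hypothetical names introduced here for illustration, not part of the Gravix Layer SDK.

```python
import os
from typing import Dict, List, Mapping

# Placeholder defaults used by the configuration cell above.
PLACEHOLDERS: Dict[str, str] = {
    "GRAVIX_API_KEY": "YOUR_GRAVIX_API_KEY",
    "SENDER_EMAIL": "YOUR_EMAIL@gmail.com",
    "SENDER_PASSWORD": "YOUR_APP_PASSWORD",
}

def find_unset_settings(env: Mapping[str, str]) -> List[str]:
    """Return the names of settings that are missing, empty, or still placeholders."""
    unset = []
    for name, placeholder in PLACEHOLDERS.items():
        value = env.get(name, "").strip()
        if not value or value == placeholder:
            unset.append(name)
    return unset

missing = find_unset_settings(os.environ)
if missing:
    print(f"Warning: set these environment variables before running: {', '.join(missing)}")
```

Passing the environment in as a mapping keeps the check easy to exercise with a plain dict.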


## Reference Data Ingestion
We must load the PDF text into local memory to create the `DOC_REFERENCE_STORE`. This allows the agent to retrieve the actual text content when the Vector DB returns an ID.
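
To make the ID-to-text lookup concrete, here is a minimal sketch that resolves search hits against a reference store. It assumes hits expose `id` and `score` attributes (as the `InvestigativeAgent` later in this notebook does) and uses `SimpleNamespace` objects as stand-ins for real search results; `resolve_hits` and the sample data are hypothetical.

```python
from types import SimpleNamespace
from typing import Dict, Iterable, List, Tuple

def resolve_hits(hits: Iterable, store: Dict[str, str],
                 min_score: float = 0.60) -> List[Tuple[str, float, str]]:
    """Map hit IDs back to chunk text, dropping weak or unknown hits."""
    resolved = []
    for hit in hits:
        doc_id = getattr(hit, "id", None)
        score = getattr(hit, "score", 0.0)
        text = store.get(doc_id, "")
        # Keep only hits above the relevance cut-off that exist in the store.
        if score > min_score and text:
            resolved.append((doc_id, score, text))
    return resolved

# Stand-in data: these are not real index contents.
sample_store = {"fca_doc_0": "Example regulatory text.", "fca_doc_1": "More text."}
sample_hits = [
    SimpleNamespace(id="fca_doc_0", score=0.82),
    SimpleNamespace(id="fca_doc_1", score=0.41),    # below the cut-off
    SimpleNamespace(id="fca_doc_999", score=0.90),  # ID not in the store
]

for doc_id, score, text in resolve_hits(sample_hits, sample_store):
    print(f"[{doc_id}] ({score:.2f}) {text}")
```

A hit whose ID is absent from the store is silently dropped, which is why the local chunk IDs have to match the IDs used when the index was populated.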

In [85]:
def load_and_chunk_pdf(pdf_path: str) -> List[Dict]:
    """Extracts text and chunks it for the reference store"""
    print(f"Loading Reference Data: {pdf_path}")
    text = ""
    try:
        with open(pdf_path, 'rb') as f:
            reader = PyPDF2.PdfReader(f)
            for page in reader.pages:
                t = page.extract_text()
                if t: text += t + "\n"
    except Exception as e:
        print(f"Error reading PDF: {e}")
        return []

    # Chunking Strategy - optimized for regulatory context
    text = re.sub(r'\s+', ' ', text).strip()
    chunk_size = 1200 
    overlap = 300
    chunks = []
    start = 0
    
    while start < len(text):
        end = min(start + chunk_size, len(text))
        chunk = text[start:end].strip()
        if chunk: chunks.append(chunk)
        start = max(end - overlap, start + 1)
        if start >= len(text): break
    
    # Format documents with IDs matching your Vector Index
    docs = []
    for i, c in enumerate(chunks):
        docs.append({
            "id": f"fca_doc_{i}",
            "text": c
        })
    
    print(f"Processed {len(docs)} reference chunks.")
    return docs

# Load Data
documents = load_and_chunk_pdf(PDF_PATH)

if documents:
    # Create the ID -> Text Map
    DOC_REFERENCE_STORE = {doc["id"]: doc["text"] for doc in documents}
    print(f"Reference Store Ready: {len(DOC_REFERENCE_STORE)} entries mapped.")
else:
    DOC_REFERENCE_STORE = {}
    print("Warning: Reference store empty. Agent will not be able to see text context.")

Loading Reference Data: fg18-05.pdf
Processed 796 reference chunks.
Reference Store Ready: 796 entries mapped.
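
One behaviour of the chunking loop above is worth knowing about. Once `end` reaches `len(text)`, the update `start = max(end - overlap, start + 1)` advances by a single character per iteration, so the loop emits roughly `overlap` extra, near-identical tail chunks. The sketch below reproduces the loop on synthetic text and compares it with a variant that stops when the final chunk is emitted. `chunk_text_original` and `chunk_text` are hypothetical names. Switching chunkers changes the chunk count and the `fca_doc_{i}` IDs near the end of the document, so an index populated with the original IDs would need to be rebuilt to stay in sync.

```python
from typing import List

def chunk_text_original(text: str, chunk_size: int = 1200, overlap: int = 300) -> List[str]:
    """The loop from load_and_chunk_pdf, copied for comparison."""
    chunks, start = [], 0
    while start < len(text):
        end = min(start + chunk_size, len(text))
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        start = max(end - overlap, start + 1)
        if start >= len(text):
            break
    return chunks

def chunk_text(text: str, chunk_size: int = 1200, overlap: int = 300) -> List[str]:
    """Fixed-stride variant that stops once the last chunk reaches the end."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    chunks, start = [], 0
    step = chunk_size - overlap
    while start < len(text):
        end = min(start + chunk_size, len(text))
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        if end == len(text):
            break  # final chunk emitted; do not creep through the tail
        start += step
    return chunks

sample = "x" * 3000  # synthetic text, no whitespace
print(len(chunk_text_original(sample)), len(chunk_text(sample)))  # prints: 303 3
```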


## Vector Database Connection
We connect to an existing index by name. The data is already indexed, so the index-creation and **upsert logic is commented out.**

In [86]:
# --- OPTIONAL: CREATE INDEX (Run once if index doesn't exist) ---
# try:
#     index = client.vectors.indexes.create(
#         name=INDEX_NAME,
#         dimension=1024,
#         metric="cosine",
#         cloud_provider="AWS",
#         region="us-east-1",
#         index_type="serverless"
#     )
#     print(f"Created index: {index.id}")
# except Exception as e:
#     print(f"Index creation skipped/failed: {e}")

class ComplianceVectorStore:
    def __init__(self, client, index_name):
        self.client = client
        self.index_name = index_name
        self.index_id = None
        
    def connect(self):
        try:
            print("Connecting to Vector Index...")
            # Get the list response object
            response = self.client.vectors.indexes.list()
            
            # Locate the list of indexes inside the response, whatever its shape
            if isinstance(response, list):
                items = response
            else:
                items = None
                for attr in ('data', 'indexes', 'items'):
                    candidate = getattr(response, attr, None)
                    # Skip bound methods such as dict.items, which are not iterable
                    if candidate is not None and not callable(candidate):
                        items = candidate
                        break
                if items is None:
                    print(f"Unknown response structure: {type(response)}")
                    return

            # Iterate through the extracted items
            for idx in items:
                # Handle if idx is an object or a dictionary
                name = getattr(idx, 'name', idx.get('name') if isinstance(idx, dict) else None)
                idx_id = getattr(idx, 'id', idx.get('id') if isinstance(idx, dict) else None)
                
                if name == self.index_name:
                    self.index_id = idx_id
                    print(f"Connected to Index ID: {self.index_id}")
                    return
            
            print(f"Index '{self.index_name}' not found in account.")
            
        except Exception as e:
            print(f"Connection Error: {e}")

    def search(self, query: str, top_k: int = 5):
        if not self.index_id: raise ValueError("Index not connected.")
        vectors = self.client.vectors.index(self.index_id)
        return vectors.search_text(
            query=query,
            model=EMBEDDING_MODEL,
            top_k=top_k,
            filter={"country": "UK"}
        ).hits

# Initialize and Connect
vector_store = ComplianceVectorStore(client, INDEX_NAME)
vector_store.connect()

# --- DATA ALREADY INDEXED: UPSERT LOGIC DISABLED ---
# def upsert_data():
#    vectors = client.vectors.index(vector_store.index_id)
#    for doc in documents:
#        vectors.upsert_text(text=doc['text'], model=EMBEDDING_MODEL, id=doc['id'], ...)
# upsert_data()

Connecting to Vector Index...
Connected to Index ID: c0ebaa9c-46bb-47ea-ab16-a4664c0669e2


## Investigative Agent Configuration
This class performs the **Chain-of-Thought** reasoning. It takes a customer case, finds evidence, and outputs a structured decision memo.

In [87]:
class InvestigativeAgent:
    def __init__(self, client, vector_store, doc_mapping):
        self.client = client
        self.vector_store = vector_store
        self.doc_mapping = doc_mapping
        
    def investigate_case(self, case_data: Dict[str, Any]) -> str:
        """
        Takes a structured case object (JSON) and performs a full AML investigation.
        """
        # 1. Formulate Search Query from Case Data
        # We create a targeted query based on the high-risk attributes found in the case
        search_query = f"risk associated with {case_data.get('customer_type', '')} {case_data.get('industry', '')} {case_data.get('jurisdiction', '')} {case_data.get('risk_flags', '')}"
        print(f"\nAgent Strategy: Searching regulations for '{search_query}'...")
        
        # 2. Retrieve Regulatory Context
        try:
            hits = self.vector_store.search(search_query, top_k=5)
        except Exception as e:
            return f"System Error: {e}"

        # 3. Build The Regulatory Knowledge Block (Using Memory Lookup)
        context_text = ""
        
        for i, hit in enumerate(hits):
            doc_id = getattr(hit, 'id', None)
            score = getattr(hit, 'score', 0.0)
            
            # Filter for quality
            if score > 0.60:
                # RETRIEVE TEXT FROM MEMORY
                content = self.doc_mapping.get(doc_id, "")
                if content:
                    context_text += f"\n[REGULATION ID: {doc_id}] (Relevance: {score:.2f})\nTEXT: {content[:2000]}\n"

        if not context_text:
            return "Low Confidence: No specific regulations found matching this scenario. Manual Review Required."

        # 4. The Investigator Prompt (Chain of Thought)
        print("Agent is reasoning on Risk Levels and Next Steps...")
        
        system_prompt = """You are a Senior AML Investigator for a UK Bank.
        Your job is to review a Customer Case and produce an 'Investigation Decision Memo'.
        
        You must follow this strict format:
        
        ### 1. Risk Assessment
        - **Risk Level**: (Low / Medium / High / Critical)
        - **Rationale**: Explain WHY based on the supplied Regulation Blocks.
        
        ### 2. Regulatory Citations
        - List specific risks identified in the FCA Guide. Cite [REGULATION ID] for every claim.
        
        ### 3. Required Action Items
        - Create a bulleted list of documents/evidence we must ask the client for.
        - **IMPORTANT**: This list must be client-facing. Do NOT include Regulation IDs or internal codes here.
        - Example: "Request Trust Deed to verify Beneficial Owners."
        """
        
        case_str = json.dumps(case_data, indent=2)
        
        try:
            response = self.client.chat.completions.create(
                model=CHAT_MODEL,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": f"REGULATIONS:\n{context_text}\n\nCUSTOMER CASE:\n{case_str}"}
                ]
            )
            report = response.choices[0].message.content
            
            # --- CITATION EXTRACTION LOGIC ---
            # Find all occurrences of IDs like fca_doc_XXX
            cited_ids = set(re.findall(r'fca_doc_\d+', report))
            
            if cited_ids:
                report += "\n\n### 4. Sources\n"
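                # Sort by numeric suffix so the Sources section is ordered deterministically
                for doc_id in sorted(cited_ids, key=lambda d: int(d.rsplit('_', 1)[1])):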
                    source_text = self.doc_mapping.get(doc_id, "Source text not found.")
                    # Format as a blockquote
                    report += f"\n**[{doc_id}]**\n> {source_text.strip()}\n"
            
            return report
            
        except Exception as e:
            return f"Analysis Failed: {e}"

# Initialize Agent
agent = InvestigativeAgent(client, vector_store, DOC_REFERENCE_STORE)
print("Investigative Agent Ready")

Investigative Agent Ready
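
The search query in `investigate_case` reads only `customer_type`, `industry`, `jurisdiction`, and `risk_flags`. The scenario payloads used later in this notebook also carry fields such as `type`, `structure`, and `transaction_pattern`, which never reach the query, and any missing field leaves a run of spaces in it. A minimal sketch of a builder that joins whichever fields are present; `build_search_query` and `QUERY_FIELDS` are hypothetical, and the field list is an assumption drawn from those payloads.

```python
from typing import Any, Dict, Sequence

# Assumed field list: the keys read by investigate_case plus the extra
# keys that appear in this notebook's scenario payloads.
QUERY_FIELDS: Sequence[str] = (
    "customer_type", "type", "industry", "jurisdiction",
    "structure", "transaction_pattern", "risk_flags",
)

def build_search_query(case_data: Dict[str, Any],
                       fields: Sequence[str] = QUERY_FIELDS) -> str:
    """Join the non-empty case fields into a single search string."""
    parts = []
    for field in fields:
        value = str(case_data.get(field) or "").strip()
        if value:
            parts.append(value)
    if not parts:
        return ""  # caller should route the case to manual review
    return " ".join(["risk associated with", *parts])

pep_case = {
    "customer_name": "Mr. John Doe",
    "type": "Individual",
    "risk_flags": "Close associate of a Minister of Energy in Country X",
    "transaction_pattern": "Large cash deposits into personal account",
}
print(build_search_query(pep_case))
```

Swapping this into the agent would change the retrieved passages, so memos produced with it would differ from the recorded outputs in this notebook.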


In [88]:
class CommunicationAgent:
    def __init__(self, client, sender_email, sender_password, smtp_server, smtp_port):
        self.client = client
        self.sender_email = sender_email
        self.sender_password = sender_password
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port

    def extract_rfi_items(self, report_text: str) -> str:
        """
        Extracts the Required Action Items section from the investigation report.
        """
        patterns = [
            r"### 3\. Required Action Items.*?(?=###|\Z)",
            r"##\s*3\.\s*Required Action Items.*?(?=##|\Z)",
            r"Required Action Items.*?(?=###|##|\Z)"
        ]
        
        rfi_section = ""
        for pattern in patterns:
            match = re.search(pattern, report_text, re.DOTALL | re.IGNORECASE)
            if match:
                rfi_section = match.group(0).strip()
                break
        
        if not rfi_section:
            return "Please review the full report for required actions."

        # Clean up header
        lines = rfi_section.split('\n')
        cleaned_lines = []
        for line in lines:
            if not re.match(r'^#+\s*\d*\.?\s*Required Action Items', line, re.IGNORECASE) and \
               not re.match(r'^Required Action Items', line, re.IGNORECASE):
                cleaned_lines.append(line)
                
        return "\n".join(cleaned_lines).strip()

    def draft_email(self, customer_name: str, rfi_content: str) -> str:
        """
        Uses the LLM to draft a professional email requesting the documents.
        """
        print("Communication Agent: Drafting RFI email...")
        
        system_prompt = """You are a professional Compliance Officer at a UK Bank.
        Your goal is to write a polite, clear, and formal email to a corporate client requesting specific documents for AML verification.
        
        - Tone: Professional, firm but polite.
        - Structure: Salutation, Context (AML review), List of requested items, Deadline/Call to action, Sign-off.
        - Output: Return ONLY the email body (HTML format). Do not include the subject line in the body, I will handle that separately.
        - Use <ul> and <li> tags for the list of items.
        - IMPORTANT: Do NOT use placeholders like [Date], [Name], or [Contact Info]. Use the specific details provided below.
        - CRITICAL: Do NOT mention internal procedures, Regulation IDs (e.g., [REGULATION ID: ...]), or Suspicious Activity Reports (SARs). The email must be strictly client-facing and clean.
        """
        
        user_prompt = f"""
        Customer Name: {customer_name}
        Required Items:
        {rfi_content}
        
        Context Details:
        - Deadline: within 5 business days
        - Contact Information: compliance@gravix.com
        - Sign-off Name: Gravix Compliance Team
        
        Draft the email body in HTML.
        """
        
        try:
            response = self.client.chat.completions.create(
                model=CHAT_MODEL,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ]
            )
            return response.choices[0].message.content
        except Exception as e:
            print(f"Failed to draft email with LLM: {e}")
            # Fallback template
            return f"""
            <p>Dear Client,</p>
            <p>Regarding {customer_name}, we require the following documents:</p>
            <pre>{rfi_content}</pre>
            <p>Regards,<br>Compliance Team</p>
            """

    def send_email(self, recipient_email: str, customer_name: str, report_text: str):
        """
        Orchestrates the extraction, drafting, and sending of the email.
        """
        # 1. Extract RFI
        rfi_content = self.extract_rfi_items(report_text)
        
        # 2. Draft Email Body (Agentic Step)
        email_html_body = self.draft_email(customer_name, rfi_content)
        
        # 3. Send Email
        try:
            msg = MIMEMultipart('alternative')
            msg['From'] = self.sender_email
            msg['To'] = recipient_email
            msg['Subject'] = f"Action Required: Compliance Documentation for {customer_name}"
            
            # Attach HTML body
            msg.attach(MIMEText(email_html_body, 'html'))
            
            with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
                server.starttls()
                server.login(self.sender_email, self.sender_password)
                server.send_message(msg)
            
            print(f"Communication Agent: Email sent to {recipient_email}")
            return True
            
        except Exception as e:
            print(f"Communication Agent: Failed to send email. {e}")
            return False

# Initialize Communication Agent
comm_agent = CommunicationAgent(client, SENDER_EMAIL, SENDER_PASSWORD, SMTP_SERVER, SMTP_PORT)
print("Communication Agent Ready")

Communication Agent Ready
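
The prompts ask the model to keep Regulation IDs and SAR mentions out of client-facing text, but a prompt instruction is not a guarantee. A minimal sketch of a deterministic check that could run on the drafted HTML before sending; `find_internal_references` and `INTERNAL_PATTERNS` are hypothetical, and the patterns assume the `fca_doc_<n>` ID scheme used by the reference store.

```python
import re
from typing import List

# Patterns for material that should never reach a client email.
INTERNAL_PATTERNS = [
    re.compile(r"fca_doc_\d+"),
    re.compile(r"REGULATION ID", re.IGNORECASE),
    re.compile(r"\bSARs?\b"),
    re.compile(r"Suspicious Activity Report", re.IGNORECASE),
]

def find_internal_references(text: str) -> List[str]:
    """Return each distinct internal marker found in the text, in pattern order."""
    found: List[str] = []
    for pattern in INTERNAL_PATTERNS:
        for match in pattern.finditer(text):
            if match.group(0) not in found:
                found.append(match.group(0))
    return found

draft_ok = "<p>Please provide the Trust Deed within 5 business days.</p>"
draft_bad = "<li>Trust Deed [REGULATION ID: fca_doc_82]</li>"

print(find_internal_references(draft_ok))
print(find_internal_references(draft_bad))
```

A non-empty result would be a reason to hold the email for manual review instead of sending it.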


## Live Investigation Scenarios
We simulate banking scenarios as case payloads (Python dicts, serialized to JSON for the model) to test both agents.

In [None]:
# --- SCENARIO A: The Complex Corporate Structure ---
case_payload = {
    "customer_name": "Alpha Omega Holdings Ltd",
    "jurisdiction": "British Virgin Islands (BVI)",
    "industry": "Precious Metals Trading",
    "structure": "Complex shell company with nominee directors",
    "risk_flags": "High value cross-border wires, opaque ownership",
    "contact_email": "client_contact@example.com"
}

print(f"Incoming Case: {case_payload['customer_name']}")

# 1. Investigative Agent runs the analysis
report = agent.investigate_case(case_payload)

print("\n" + "="*60)
print("INVESTIGATION MEMO")
print("="*60)
print(report)

# 2. Communication Agent handles the outreach
if case_payload.get('contact_email'):
    print("\n" + "="*60)
    print("HANDOFF TO COMMUNICATION AGENT")
    print("="*60)
    comm_agent.send_email(
        recipient_email=case_payload['contact_email'],
        customer_name=case_payload['customer_name'],
        report_text=report
    )
else:
    print("No email address provided.")

Incoming Case: Alpha Omega Holdings Ltd

Agent Strategy: Searching regulations for 'risk associated with  Precious Metals Trading British Virgin Islands (BVI) High value cross-border wires, opaque ownership'...
Agent is reasoning on Risk Levels and Next Steps...

INVESTIGATION MEMO
### 1. Risk Assessment

- **Risk Level**: Critical
- **Rationale**: Based on the given customer case, Alpha Omega Holdings Ltd exhibits several red flags indicative of heightened risks related to financial crime. The high value cross-border wires and opaque ownership structure raise significant concerns. According to [fca_doc_558] (Financial Crime: a guide for firms), managing the risks associated with complex corporate structures and ensuring adequate due diligence on the clients are paramount. Further, [fca_doc_80] indicates the importance of effective enhanced ongoing monitoring for high-risk transactions. As evident in [fca_doc_457], risk assessments sh

In [None]:
# --- SCENARIO B: The PEP (Politically Exposed Person) ---
case_payload_2 = {
    "customer_name": "Mr. John Doe",
    "type": "Individual",
    "risk_flags": "Close associate of a Minister of Energy in Country X",
    "transaction_pattern": "Large cash deposits into personal account",
    "contact_email": "pep_contact@example.com"
}

print(f"\nIncoming Case: {case_payload_2['customer_name']}")

# 1. Investigative Agent runs the analysis
report_2 = agent.investigate_case(case_payload_2)

print("\n" + "="*60)
print("INVESTIGATION MEMO")
print("="*60)
print(report_2)

# 2. Communication Agent handles the outreach
if case_payload_2.get('contact_email'):
    print("\n" + "="*60)
    print("HANDOFF TO COMMUNICATION AGENT")
    print("="*60)
    comm_agent.send_email(
        recipient_email=case_payload_2['contact_email'],
        customer_name=case_payload_2['customer_name'],
        report_text=report_2
    )
else:
    print("No email address provided.")


Incoming Case: Mr. John Doe

Agent Strategy: Searching regulations for 'risk associated with    Close associate of a Minister of Energy in Country X'...
Agent is reasoning on Risk Levels and Next Steps...

INVESTIGATION MEMO
### Risk Assessment

- **Risk Level**: Medium
- **Rationale**: The customer is a close associate of a Minister in Country X, which is a political high-risk area as per FCTR 1. Large cash deposits may indicate potential for money laundering as seen in Reg Doc 82, section 3.2.10. The need for further investigation arises to ensure compliance with regulations as described in FCG 1, particularly regarding reporting to NCA and maintaining clear processes outlined in FC.

### Regulatory Citations

- [REGULATION ID: fca_doc_82]
- [REGULATION ID: fca_doc_83]
- [REGULATION ID: fca_doc_557]
- [REGULATION ID: fca_doc_558]

### Required Action Items

- Request detailed financial statements and account activity report to veri

## Conclusion

### What We've Accomplished
In this cookbook, we built a prototype of an end-to-end **Autonomous Compliance System**:

1.  **Built a Reference Store**: Created a system to ground AI responses in actual regulatory text, which reduces the risk of hallucinated citations.
2.  **Deployed an Investigative Agent**: Implemented an AI that assesses risk, cites sources, and produces structured decision memos.
3.  **Implemented a Communication Agent**: Automated the "last mile" of compliance by drafting and sending client emails.
4.  **Exercised with Scenarios**: Ran live simulations against complex cases (Shell Companies, PEPs) to test the workflow end to end.

### The Power of Agentic Compliance
This approach represents a shift from "search-based" to "reasoning-based" compliance:

-   **Consistency**: Every case is reviewed against the exact same regulatory standard.
-   **Speed**: A first-pass memo is drafted from one retrieval step and one model call, far faster than a manual review, though it still needs analyst sign-off.
-   **Auditability**: The "Decision Memo" provides a clear, written trail of logic for auditors.
-   **Scalability**: The system can handle spikes in alert volume without adding headcount.

### Next Steps
To take this prototype to production:

1.  **Connect Real Data**: Hook the `InvestigativeAgent` into your Transaction Monitoring System (TMS) or CRM.
2.  **Human-in-the-Loop**: Insert a review step where a human analyst approves the "Decision Memo" before the email is sent.
3.  **Expand Knowledge Base**: Index more regulations (e.g., EU 5AMLD, US BSA/PATRIOT Act) to handle multi-jurisdictional cases.
4.  **Feedback Loop**: Use human edits to the generated emails to fine-tune the `CommunicationAgent` over time.
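
The human-in-the-loop step above can be sketched as a small gate between the two agents. This is an assumption about how such a step might look, not part of the notebook's code: `send_with_approval` is a hypothetical helper, `approve` stands for whatever review mechanism you use, and `send` would wrap a call such as `comm_agent.send_email(...)`.

```python
from typing import Callable

def send_with_approval(report_text: str,
                       approve: Callable[[str], bool],
                       send: Callable[[str], bool]) -> bool:
    """Call send(report_text) only if approve(report_text) returns True."""
    if not approve(report_text):
        print("Memo not approved; no email sent.")
        return False
    return send(report_text)

# Stand-in callables for illustration: the reviewer rejects the memo.
sent = send_with_approval(
    "### 1. Risk Assessment\n- **Risk Level**: High",
    approve=lambda memo: False,
    send=lambda memo: True,
)
print(sent)
```

In a notebook session, `approve` could be as simple as printing the memo and reading a yes/no answer with `input()`.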

### Key Takeaways
-   **RAG + Agents = Expert Systems**: Simple vector search isn't enough; you need agentic reasoning to apply knowledge to facts.
-   **Structured Output is Critical**: Forcing the AI to output JSON or structured Markdown makes the results actionable by other systems.
-   **Gravix Layer SDK**: Simplifies the complex plumbing of vector databases, LLM inference, and agent orchestration.
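
As a small illustration of the structured-output point, the sketch below pulls the risk level out of a memo that follows the `**Risk Level**:` convention from the investigator prompt. `parse_risk_level` is a hypothetical helper; it returns `None` when the model deviates from the format, so callers can fall back to manual review.

```python
import re
from typing import Optional

# Matches lines such as "- **Risk Level**: Critical"
_RISK_RE = re.compile(
    r"\*\*Risk Level\*\*\s*:\s*\**\s*(Low|Medium|High|Critical)\b",
    re.IGNORECASE,
)

def parse_risk_level(memo: str) -> Optional[str]:
    """Return the risk level named in the memo, or None if it is absent."""
    match = _RISK_RE.search(memo)
    return match.group(1).capitalize() if match else None

memo = "### 1. Risk Assessment\n\n- **Risk Level**: Critical\n- **Rationale**: ..."
print(parse_risk_level(memo))
```

A downstream system could use the parsed value to route cases, for example sending anything rated High or Critical to a senior analyst queue.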