# 🔐 Building a Production-Grade PII-Safe RAG Pipeline with Presidio, Qdrant & Custom Recognizers

Modern AI systems routinely process sensitive user data, including names, emails, phone numbers, government IDs, and financial identifiers. Ensuring that this data is **never leaked**, **never embedded**, and **never exposed** in model outputs is fundamental to responsible AI engineering.

This tutorial walks through a complete, practical pipeline for handling sensitive information using:

* **Microsoft Presidio** for PII detection and redaction
* **Custom-built recognizers** for high-reliability detection of critical identifiers (demonstrated here with US SSNs; the same pattern extends to credit cards, bank numbers, Aadhaar/PAN, etc.)
* **SentenceTransformer embeddings** for vectorization of *sanitized* text
* **Qdrant** as the vector database for secure retrieval
* **Fernet encryption** for storing original (unredacted) documents safely
* **Audit logging** for traceability and compliance support

By the end of this walkthrough you will have a fully functioning, end-to-end PII-safe RAG system that:

### ✔ Prevents unwanted PII from entering embeddings

### ✔ Supports deterministic, high-accuracy entity detection via custom recognizers

### ✔ Stores originals in encrypted form for DSARs & audits

### ✔ Retrieves only sanitized text into LLM prompts

### ✔ Escalates and logs sensitive access operations

### ✔ Is fully reproducible and version-stable

This mirrors architecture patterns used in production systems across finance, healthcare, and enterprise AI deployments, but built in a lightweight, reproducible form right inside your Colab environment.

---

# 🧭 What You’ll Learn

* How to configure and extend **Presidio** beyond its defaults
* Why production systems must **override certain built-in recognizers**
* How to safely build a **two-store model** (redacted vectors + encrypted originals)
* How to implement **PII-aware retrieval**
* How to create a **trusted audit trail** for all access
* How to assemble a **safe prompt** for downstream LLMs

---

# 🎯 Why Custom Recognizers Matter

While Presidio provides many built-in recognizers, real-world deployments require:

* deterministic detection
* strong regex patterns
* multi-country ID support
* strict consistency across versions
* reproducibility

This tutorial shows you how to **extend Presidio with your own high-assurance recognizers**, ensuring your system doesn’t silently break when built-ins change or fail.
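
As a rough illustration of what "deterministic detection" can mean in practice, the sketch below pairs a candidate regex with a Luhn checksum so that only structurally valid card numbers are reported. It uses only the standard library; the names `CARD_CANDIDATE`, `luhn_valid`, and `find_card_numbers` are hypothetical and not part of Presidio. In a Presidio setup, a check like this would typically sit inside a custom recognizer rather than run standalone.

```python
import re

# Candidate card numbers: 13-19 digits, optionally separated by spaces or hyphens.
CARD_CANDIDATE = re.compile(r"\b\d(?:[ -]?\d){12,18}\b")


def luhn_valid(number: str) -> bool:
    """Return True if the digits in `number` pass the Luhn checksum."""
    digits = [int(ch) for ch in number if ch.isdigit()]
    total = 0
    # Walk from the rightmost digit, doubling every second one.
    for i, d in enumerate(reversed(digits)):
        if i % 2 == 1:
            d *= 2
            if d > 9:
                d -= 9
        total += d
    return len(digits) >= 13 and total % 10 == 0


def find_card_numbers(text: str) -> list:
    """Return regex candidates that also pass the checksum."""
    return [m.group() for m in CARD_CANDIDATE.finditer(text) if luhn_valid(m.group())]


sample = "Card 4111 1111 1111 1111, order 1234 5678 9012 3456"
print(find_card_numbers(sample))  # -> ['4111 1111 1111 1111']
```

The second number has the right shape but fails the checksum, so it is not reported. A regex alone would have flagged both.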

---

# 🌐 Who This is For

* ML Engineers building **secure RAG pipelines**
* Backend engineers integrating LLMs into compliance-sensitive applications
* AI security and privacy researchers
* Developers working with financial, identity, or healthcare data
* Anyone deploying LLMs in production and wanting to “do it right”

---

# 🚀 Ready? Let’s Start Building a Real PII-Safe RAG System.

---

## 1) Install dependencies

In [None]:
# ================================================================
# 1. Install and load spaCy EN model BEFORE Presidio (critical)
# ================================================================
!python -m spacy download en_core_web_lg --quiet

In [2]:
!pip install presidio-analyzer presidio-anonymizer qdrant-client sentence-transformers cryptography --quiet

## 2) Imports and helper utilities

In [1]:
# ----------------------------
# 1. Standard imports
# ----------------------------
import os, json, time, uuid
from pathlib import Path

# Presidio
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig

# Embeddings
from sentence_transformers import SentenceTransformer

# Qdrant (local mode)
from qdrant_client import QdrantClient
from qdrant_client.models import VectorParams, Distance

# Encryption
from cryptography.fernet import Fernet


# ----------------------------
# 2. Colab-friendly directories
# ----------------------------
DATA_DIR = Path("/content/pii_rag_demo")
DATA_DIR.mkdir(parents=True, exist_ok=True)

ORIGINALS_DIR = DATA_DIR / "originals"
ORIGINALS_DIR.mkdir(exist_ok=True)

ENCRYPTED_DIR = DATA_DIR / "encrypted_originals"
ENCRYPTED_DIR.mkdir(exist_ok=True)

AUDIT_LOG_PATH = DATA_DIR / "audit_log.ndjson"


# ----------------------------
# 3. Initialize Presidio engines
# ----------------------------
analyzer = AnalyzerEngine()      # uses spaCy pipeline you installed
anonymizer = AnonymizerEngine()


# ----------------------------
# 4. Embedding model
# ----------------------------
embed_model = SentenceTransformer("all-MiniLM-L6-v2")


# ----------------------------
# 5. Qdrant local DB (disk persistence in Colab)
# ----------------------------
qdrant_path = DATA_DIR / "qdrant_db"
qdrant = QdrantClient(path=str(qdrant_path))

COLLECTION_NAME = "docs_redacted"
VECTOR_SIZE = embed_model.get_sentence_embedding_dimension()

# Create collection if not exists
collections = qdrant.get_collections().collections
existing = [c.name for c in collections]

if COLLECTION_NAME not in existing:
    # create_collection only creates; recreate_collection is deprecated and drops existing data
    qdrant.create_collection(
        collection_name=COLLECTION_NAME,
        vectors_config=VectorParams(size=VECTOR_SIZE, distance=Distance.COSINE),
    )


# ----------------------------
# 6. Encryption key handling
# ----------------------------
KEY_PATH = DATA_DIR / "fernet.key"

if not KEY_PATH.exists():
    key = Fernet.generate_key()
    KEY_PATH.write_bytes(key)
else:
    key = KEY_PATH.read_bytes()

fernet = Fernet(key)


# ----------------------------
# 7. Audit log helper
# ----------------------------
def audit_log(event: dict):
    entry = {
        "id": str(uuid.uuid4()),
        "ts": time.time(),
        **event
    }
    with open(AUDIT_LOG_PATH, "a") as f:
        f.write(json.dumps(entry) + "\n")


print("Setup complete. Qdrant collection:", COLLECTION_NAME)
print("Working directory:", DATA_DIR)




Setup complete. Qdrant collection: docs_redacted
Working directory: /content/pii_rag_demo
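
The `audit_log` helper above appends plain NDJSON lines, which anyone with file access could edit afterwards. One possible way to make tampering detectable is to chain entries by hash, so each record commits to the one before it. The sketch below is a standard-library illustration only; `chained_audit_log` and `verify_chain` are hypothetical names, and a real deployment would still want the log in storage the application cannot rewrite.

```python
import hashlib
import json
import tempfile
import time
import uuid
from pathlib import Path

GENESIS = "0" * 64  # prev_hash used for the first entry in an empty log


def chained_audit_log(path: Path, event: dict) -> dict:
    """Append an event whose hash covers the previous entry's hash."""
    prev_hash = GENESIS
    if path.exists():
        lines = path.read_text(encoding="utf-8").splitlines()
        if lines:
            prev_hash = json.loads(lines[-1])["hash"]
    entry = {"id": str(uuid.uuid4()), "ts": time.time(), "prev_hash": prev_hash, **event}
    payload = json.dumps(entry, sort_keys=True).encode("utf-8")
    entry["hash"] = hashlib.sha256(payload).hexdigest()
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(entry) + "\n")
    return entry


def verify_chain(path: Path) -> bool:
    """Recompute every hash and check each link to the previous entry."""
    prev_hash = GENESIS
    for line in path.read_text(encoding="utf-8").splitlines():
        entry = json.loads(line)
        claimed = entry.pop("hash")
        payload = json.dumps(entry, sort_keys=True).encode("utf-8")
        if entry["prev_hash"] != prev_hash or hashlib.sha256(payload).hexdigest() != claimed:
            return False
        prev_hash = claimed
    return True


# Demo in a throwaway directory
log_path = Path(tempfile.mkdtemp()) / "audit_chain.ndjson"
chained_audit_log(log_path, {"event": "ingest_indexed", "doc_id": "doc1"})
chained_audit_log(log_path, {"event": "retrieval_ok", "doc_ids": ["doc1"]})
intact = verify_chain(log_path)

# Edit the first entry after the fact, then verify again
lines = log_path.read_text(encoding="utf-8").splitlines()
lines[0] = lines[0].replace("doc1", "doc9")
log_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
tampered_ok = verify_chain(log_path)

print("intact log verifies:", intact)
print("edited log verifies:", tampered_ok)
```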


## 3) Sample documents (3 docs)

In [2]:
docs = {
    "doc1": {
        "title": "Meeting notes",
        "text": "We met with Acme Corp. Agenda: roadmap, Q4 planning. Attendees: Alice, Bob."
    },
    "doc2": {
        "title": "Patient record",
        "text": "Patient John Doe, born 1990-01-01, email john.doe@example.com, SSN 123-45-6789, phone +1 555-123-4567. Notes: allergic to penicillin."
    },
    "doc3": {
        "title": "Engineer bio",
        "text": "Dev: Amar Singh. Email: amar@example.org. Likes football and ML."
    }
}


# Save originals (plaintext) and audit
for doc_id, doc in docs.items():
  p = ORIGINALS_DIR / f"{doc_id}.txt"
  p.write_text(doc["text"], encoding="utf-8")
  audit_log({"event":"ingest_plain_save","doc_id":doc_id, "path":str(p)})


print("Saved originals to", ORIGINALS_DIR)

Saved originals to /content/pii_rag_demo/originals


In [14]:
from presidio_analyzer import RecognizerRegistry

registry = RecognizerRegistry()
registry.load_predefined_recognizers()
all_recognizers = registry.recognizers   # <-- internal list, safe to inspect

print("Total recognizers:", len(all_recognizers))
print("--------------------------------------")
for r in all_recognizers:
    print(f"{r.__class__.__name__}: {r.supported_entities}")




Total recognizers: 16
--------------------------------------
CreditCardRecognizer: ['CREDIT_CARD']
UsBankRecognizer: ['US_BANK_NUMBER']
UsLicenseRecognizer: ['US_DRIVER_LICENSE']
UsItinRecognizer: ['US_ITIN']
UsPassportRecognizer: ['US_PASSPORT']
UsSsnRecognizer: ['US_SSN']
NhsRecognizer: ['UK_NHS']
CryptoRecognizer: ['CRYPTO']
DateRecognizer: ['DATE_TIME']
EmailRecognizer: ['EMAIL_ADDRESS']
IbanRecognizer: ['IBAN_CODE']
IpRecognizer: ['IP_ADDRESS']
MedicalLicenseRecognizer: ['MEDICAL_LICENSE']
PhoneRecognizer: ['PHONE_NUMBER']
UrlRecognizer: ['URL']
SpacyRecognizer: ['DATE_TIME', 'NRP', 'LOCATION', 'PERSON', 'ORGANIZATION']


In [10]:
from presidio_analyzer import Pattern, PatternRecognizer

ssn_pattern = Pattern(
    name="ssn_pattern",
    regex=r"\b(?!(000|666|9))\d{3}[- ]?(?!00)\d{2}[- ]?(?!0000)\d{4}\b",
    score=0.8,
)

custom_ssn_recognizer = PatternRecognizer(
    supported_entity="SSN",
    patterns=[ssn_pattern],
    supported_language="en"
)

analyzer.registry.add_recognizer(custom_ssn_recognizer)

print("Added custom SSN recognizer.")

Added custom SSN recognizer.


In [12]:
text = docs["doc2"]["text"]   # sample document that contains an SSN

res = analyzer.analyze(
    text=text,
    entities=["SSN"],   # <-- custom recognizer
    language="en"
)

redacted = anonymizer.anonymize(
    text=text,
    analyzer_results=res,
    operators={"DEFAULT": OperatorConfig("replace", {"new_value":"[REDACTED]"})}
)


In [13]:
test = "SSN 123-45-6789 belongs to John Doe"
print(analyzer.analyze(test, entities=["SSN"], language="en"))


[type: SSN, start: 4, end: 15, score: 0.8]
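
To see what the SSN pattern above accepts and rejects without loading Presidio, the same regex can be exercised with Python's standard `re` module. This is only a sketch of the pattern's behavior; Presidio applies its own flags and scoring on top, and the `samples` table is made up for illustration.

```python
import re

# Same pattern as ssn_pattern above
SSN_REGEX = re.compile(r"\b(?!(000|666|9))\d{3}[- ]?(?!00)\d{2}[- ]?(?!0000)\d{4}\b")

samples = {
    "123-45-6789": True,    # well-formed, hyphenated
    "123 45 6789": True,    # space-separated
    "123456789": True,      # no separators: any plausible bare 9-digit number matches
    "000-45-6789": False,   # area 000 is never issued
    "666-45-6789": False,   # area 666 is never issued
    "900-45-6789": False,   # areas starting with 9 are excluded
    "123-00-6789": False,   # group 00 is invalid
    "123-45-0000": False,   # serial 0000 is invalid
}

results = {value: SSN_REGEX.search(value) is not None for value in samples}
for value, matched in results.items():
    print(f"{value!r:>15} matched={matched}")
```

Because the separators are optional, the pattern also fires on unrelated 9-digit numbers such as order IDs. If that causes false positives, one option is to require separators or add context words to the recognizer.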


## 4) PII detection, redaction, sensitivity scoring, encryption of original

In [15]:
ENTITIES = [
    "PERSON",
    "PHONE_NUMBER",
    "EMAIL_ADDRESS",
    "SSN",
    "DATE_TIME",
    "LOCATION",
]

from qdrant_client.models import PointStruct

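# Simple sensitivity scoring
# The custom recognizer emits "SSN"; "US_SSN" is kept for Presidio's built-in recognizer.
WEIGHTS = {
    "SSN": 10,
    "US_SSN": 10,
    "EMAIL_ADDRESS": 3,
    "PHONE_NUMBER": 3,
    "PERSON": 2,
    "LOCATION": 2,
    "DATE_TIME": 1,
}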

def sensitivity_score(results):
    score = 0
    for r in results:
        score += WEIGHTS.get(r.entity_type, 1)
    return score


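# Redaction operators: replace each entity with [REDACTED_<TYPE>]


def make_ops(results):
    # One "replace" operator per detected entity type; DEFAULT covers anything else
    ops = {"DEFAULT": OperatorConfig("replace", {"new_value": "[REDACTED]"})}
    for r in results:
        ops[r.entity_type] = OperatorConfig("replace", {"new_value": f"[REDACTED_{r.entity_type}]"})
    return ops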


# Process and index each doc
for doc_id, doc in docs.items():
    text = doc["text"]
    results = analyzer.analyze(text=text, entities=ENTITIES, language="en")
    score = sensitivity_score(results)
    ops = make_ops(results)
    anonymized = anonymizer.anonymize(text=text, analyzer_results=results, operators=ops)
    redacted_text = anonymized.text

    # Save redacted text to a file (for auditing and to index)
    redacted_path = DATA_DIR / f"{doc_id}_redacted.txt"
    redacted_path.write_text(redacted_text, encoding="utf-8")

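    # Encrypt the original, save the encrypted blob, then remove the plaintext
    # copy so the unredacted text survives only in encrypted form
    orig_path = ORIGINALS_DIR / f"{doc_id}.txt"
    token = fernet.encrypt(text.encode("utf-8"))
    enc_path = ENCRYPTED_DIR / f"{doc_id}.enc"
    enc_path.write_bytes(token)
    orig_path.unlink(missing_ok=True)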

    # Create embedding for redacted text
    embedding = embed_model.encode(redacted_text).tolist()

    # Upsert into Qdrant
    metadata = {
        "doc_id": doc_id,
        "title": doc["title"],
        "redacted_path": str(redacted_path),
        "encrypted_path": str(enc_path),
        "sensitivity_score": score,
        "detected_entities": [
            {"type": r.entity_type, "start": r.start, "end": r.end, "score": r.score}
            for r in results
        ],
    }
    qdrant.upsert(
        collection_name=COLLECTION_NAME,
        points=[
            PointStruct(id=str(uuid.uuid5(uuid.NAMESPACE_DNS, doc_id)), vector=embedding, payload=metadata)
        ],
    )

    audit_log(
        {
            "event": "ingest_indexed",
            "doc_id": doc_id,
            "redacted_path": str(redacted_path),
            "encrypted_path": str(enc_path),
            "sensitivity_score": score,
        }
    )

print("Ingest & index complete. Qdrant collection has docs.")

Ingest & index complete. Qdrant collection has docs.
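
The ingest loop trusts the analyzer completely: whatever it misses gets embedded. A cheap second line of defense is to re-scan the redacted text with a few coarse regexes and refuse to embed if anything still matches. The sketch below assumes nothing beyond the standard library; `LEAK_PATTERNS`, `find_leaks`, and `assert_safe_to_embed` are hypothetical names, and the two patterns are deliberately broad examples rather than a complete list.

```python
import re

# Hypothetical last-line-of-defense patterns; extend to mirror your ENTITIES list.
LEAK_PATTERNS = {
    "EMAIL_ADDRESS": re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"),
    "SSN": re.compile(r"\b\d{3}[- ]?\d{2}[- ]?\d{4}\b"),
}


def find_leaks(redacted_text: str) -> list:
    """Return the names of patterns that still match the redacted text."""
    return [name for name, pat in LEAK_PATTERNS.items() if pat.search(redacted_text)]


def assert_safe_to_embed(redacted_text: str) -> str:
    """Raise if any pattern still matches; otherwise return the text unchanged."""
    leaks = find_leaks(redacted_text)
    if leaks:
        raise ValueError(f"Refusing to embed: possible unredacted {leaks}")
    return redacted_text


clean = "Patient [REDACTED], email [REDACTED], SSN [REDACTED]."
leaky = "Patient [REDACTED], email john.doe@example.com, SSN [REDACTED]."

print(find_leaks(clean))
print(find_leaks(leaky))
```

In the loop above, a call such as `assert_safe_to_embed(redacted_text)` just before `embed_model.encode(...)` would stop a document with a missed entity from reaching the vector store.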


## 5) Retrieval demo + guardrail escalation

In [16]:
def retrieve(query, top_k=3, score_threshold=8):
    q_vec = embed_model.encode(query).tolist()
    # query_points replaces the deprecated search() in recent qdrant-client releases
    resp = qdrant.query_points(collection_name=COLLECTION_NAME, query=q_vec, limit=top_k, with_payload=True)
    hits = [{"id": p.id, "score": p.score, "payload": p.payload} for p in resp.points]

    # Check sensitivity: escalate on the first hit at or above the threshold
    for h in hits:
        if h["payload"].get("sensitivity_score", 0) >= score_threshold:
            audit_log({"event": "retrieval_escalate", "query": query, "doc_id": h["payload"]["doc_id"], "sensitivity_score": h["payload"]["sensitivity_score"]})
            return {"action": "escalate", "doc": h}

    # Otherwise assemble context from redacted text only
    context = "\n\n".join(Path(h["payload"]["redacted_path"]).read_text(encoding="utf-8") for h in hits)
    audit_log({"event": "retrieval_ok", "query": query, "doc_ids": [h["payload"]["doc_id"] for h in hits]})
    return {"action": "ok", "context": context, "hits": hits}


# Run two queries: one innocuous, one that triggers escalation.
# top_k=1 keeps this three-document demo from pulling doc2 into every result set.
print("Query 1: 'roadmap' -> expects doc1")
print(retrieve("roadmap", top_k=1))


print("\nQuery 2: 'patient John' -> should trigger escalation due to SSN/email")
print(retrieve("patient John", top_k=1))
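
Once `retrieve()` returns, the remaining step is turning its result into a prompt. A minimal sketch follows, assuming the return shape used above (`"action"` plus either `"context"` or `"doc"`); `build_prompt` is a hypothetical helper, the instruction wording is only an example, and the two result dicts are hand-written stand-ins rather than real retrieval output.

```python
from typing import Optional


def build_prompt(result: dict, question: str) -> Optional[str]:
    """Build an LLM prompt from a retrieve() result, or None on escalation."""
    if result.get("action") != "ok":
        # Escalated results never reach the model; a reviewer handles them.
        return None
    return (
        "Answer using only the context below. "
        "Placeholders such as [REDACTED] mark removed personal data; "
        "do not guess what they stood for.\n\n"
        f"Context:\n{result['context']}\n\n"
        f"Question: {question}\nAnswer:"
    )


# Hand-written stand-ins shaped like retrieve() results
ok_result = {"action": "ok", "context": "We met with [REDACTED]. Agenda: roadmap.", "hits": []}
escalated = {"action": "escalate", "doc": {"id": "example-id", "payload": {}}}

print(build_prompt(ok_result, "What was on the agenda?"))
print(build_prompt(escalated, "What is the patient's SSN?"))
```

Returning `None` for anything other than `"ok"` makes the safe path the default: a caller has to handle the missing prompt explicitly instead of accidentally forwarding sensitive context.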


## 6) How to satisfy a DSAR / delete request

In [None]:
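# Example: delete doc2 from Qdrant and from every on-disk copy
from qdrant_client.models import PointIdsList

def delete_doc(doc_id):
    # Points were indexed under a UUID derived from doc_id, so recompute it here
    point_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, doc_id))
    qdrant.delete(collection_name=COLLECTION_NAME, points_selector=PointIdsList(points=[point_id]))
    # Delete the encrypted original, the redacted copy, and any plaintext original
    for path in (ENCRYPTED_DIR / f"{doc_id}.enc", DATA_DIR / f"{doc_id}_redacted.txt", ORIGINALS_DIR / f"{doc_id}.txt"):
        if path.exists():
            path.unlink()
    # Append audit entry
    audit_log({"event": "dsar_delete", "doc_id": doc_id})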


# Demonstrate
print("Before deletion, search for 'patient'")
print(retrieve("patient"))


print("Deleting doc2...")
delete_doc("doc2")


print("After deletion, search for 'patient' -> should not return doc2")
print(retrieve("patient"))
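
One detail worth spelling out for deletion: the ingest step stores each point under `uuid.uuid5(uuid.NAMESPACE_DNS, doc_id)`, not under the raw `doc_id` string, so a delete has to target that derived ID. Because UUIDv5 is deterministic, the ID can be recomputed at any time without a lookup table. The helper name `point_id_for` below is hypothetical.

```python
import uuid


def point_id_for(doc_id: str) -> str:
    """Recompute the Qdrant point ID that the ingest step derived from doc_id."""
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, doc_id))


first = point_id_for("doc2")
second = point_id_for("doc2")
other = point_id_for("doc1")

print("doc2 ->", first)
print("stable across calls:", first == second)
print("distinct per document:", first != other)
```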

## 7) Notes, security & production considerations

- Key Management: In production, do not store Fernet keys locally. Use a KMS (AWS KMS, GCP KMS, Azure Key Vault) and rotate keys. Use MultiFernet for rotation.

- RBAC: Protect Qdrant endpoints and encrypted storage with access controls.

- PII Detection Coverage: Presidio covers many entities; extend with custom recognizers for domain-specific PII.

- Audit Durability: Use append-only logging to an immutable store (Cloud Storage with WORM settings, or an append-only DB).

- Human-in-the-loop UI: For escalations, integrate a simple approve/reject workflow (e.g., Streamlit) to inspect encrypted originals (after appropriate auth) and approve redacted excerpts.
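
As a small illustration of the key-rotation note above, the sketch below uses `MultiFernet` from the `cryptography` package to re-encrypt a token under a new key. The keys are generated inline purely for demonstration; in production they would come from a KMS as described, and the sample plaintext is made up.

```python
from cryptography.fernet import Fernet, MultiFernet

old_key = Fernet.generate_key()
new_key = Fernet.generate_key()

# A token produced under the old key, standing in for an existing .enc file
token_old = Fernet(old_key).encrypt(b"original document bytes")

# MultiFernet encrypts with the first key and can decrypt with any listed key
rotator = MultiFernet([Fernet(new_key), Fernet(old_key)])
token_new = rotator.rotate(token_old)

# After rotation the new key alone is enough, so the old key can be retired
plaintext = Fernet(new_key).decrypt(token_new)
print(plaintext)
```

Rotation in this pipeline would mean running `rotate` over every file in the encrypted store and writing the new tokens back before dropping the old key from the list.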