# Auditing RAG Pipelines with LangChain

This notebook demonstrates how to use Conformal-Drift to audit RAG pipelines built with LangChain.

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/debu-sinha/conformaldrift/blob/main/examples/02_rag_langchain.ipynb)

In [None]:
# Install dependencies (langchain-community provides the Chroma vector store integration)
!pip install conformal-drift langchain langchain-community langchain-openai chromadb -q

In [None]:
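import os
from getpass import getpass

import numpy as np

# Read the API key from the environment, or prompt for it, rather than hard-coding a secret
if not os.environ.get('OPENAI_API_KEY'):
    os.environ['OPENAI_API_KEY'] = getpass('OpenAI API key: ')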

## 1. Set Up RAG Pipeline

In [None]:
# OpenAI and Chroma integrations live in their own packages in current LangChain releases
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA

# Sample documents
documents = [
    "Python is a high-level programming language known for its readability.",
    "Machine learning is a subset of artificial intelligence.",
    "Neural networks are inspired by biological neural networks.",
    "Deep learning uses multiple layers of neural networks.",
    "Natural language processing deals with text and speech.",
]

# Set up embeddings and vector store
embeddings = OpenAIEmbeddings()
vectorstore = Chroma.from_texts(documents, embeddings)

# Create RAG chain
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 2})
)

print("RAG pipeline set up successfully!")

## 2. Define Nonconformity Score Function

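We score each response by its cosine similarity to the retrieved documents in embedding space. The nonconformity score is one minus the highest similarity, so a response that closely matches at least one retrieved document scores near 0.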

In [None]:
def compute_nonconformity_score(query, response, retrieved_docs):
    """
    Compute nonconformity score based on response-document alignment.
    Lower scores = more conforming (response aligns with documents)
    Higher scores = less conforming (potential hallucination)
    """
    if not retrieved_docs:
        return 1.0  # No supporting documents: treat as highly nonconforming
    
    # Embed the response, then embed all retrieved texts in one batched call
    response_emb = embeddings.embed_query(response)
    doc_embs = embeddings.embed_documents(retrieved_docs)
    
    # Compute similarities
    similarities = []
    for doc_emb in doc_embs:
        similarity = np.dot(response_emb, doc_emb) / (
            np.linalg.norm(response_emb) * np.linalg.norm(doc_emb)
        )
        similarities.append(similarity)
    
    # Nonconformity = 1 - max similarity
    return 1 - max(similarities)

print("Nonconformity function defined.")
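To see how the score behaves without calling the embeddings API, the same formula can be sketched on toy vectors. The helper name `cosine_nonconformity` and the 2-D vectors are illustrative assumptions, not part of the notebook or of Conformal-Drift.

```python
import numpy as np

def cosine_nonconformity(response_vec, doc_vecs):
    """Return 1 - max cosine similarity between a response vector and document vectors."""
    response_vec = np.asarray(response_vec, dtype=float)
    doc_vecs = np.asarray(doc_vecs, dtype=float)
    # Cosine similarity of the response against every document row at once
    sims = doc_vecs @ response_vec / (
        np.linalg.norm(doc_vecs, axis=1) * np.linalg.norm(response_vec)
    )
    return 1.0 - float(sims.max())

# Toy "document embeddings" (hypothetical, chosen for easy arithmetic)
toy_docs = [[1.0, 0.0], [0.0, 1.0]]

aligned = cosine_nonconformity([2.0, 0.0], toy_docs)      # same direction as the first doc
unrelated = cosine_nonconformity([-1.0, -1.0], toy_docs)  # points away from both docs

print(f"aligned:   {aligned:.3f}")
print(f"unrelated: {unrelated:.3f}")
```

Because cosine similarity ranges over [-1, 1], this score ranges over [0, 2]: the aligned vector scores 0, and a vector pointing away from every document scores above 1. The fallback value of 1.0 for an empty retrieval is therefore a convention rather than a strict maximum.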

## 3. Collect Calibration Scores

In [None]:
# Calibration queries
calibration_queries = [
    "What is Python?",
    "What is machine learning?",
    "How do neural networks work?",
    "What is deep learning?",
    "What is NLP?",
    # Add more queries for better calibration...
]

# Run calibration
calibration_scores = []

for query in calibration_queries:
    # Get the chain's response
    result = qa_chain.invoke({"query": query})
    response = result['result']
    
    # Re-run retrieval with the same k as the chain's retriever to get the supporting docs
    docs = vectorstore.similarity_search(query, k=2)
    doc_texts = [doc.page_content for doc in docs]
    
    # Compute score
    score = compute_nonconformity_score(query, response, doc_texts)
    calibration_scores.append(score)
    print(f"Query: '{query[:30]}...' -> Score: {score:.3f}")

calibration_scores = np.array(calibration_scores)
print(f"\nCalibration complete: {len(calibration_scores)} samples")
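The number of calibration samples matters more than it may appear. The sketch below applies the textbook split-conformal quantile rule to made-up scores; it is not necessarily what `ConformalDriftAuditor` computes internally, and `conformal_threshold` and `demo_scores` are hypothetical names and values used only for illustration.

```python
import math
import numpy as np

def conformal_threshold(scores, alpha):
    """Split-conformal quantile: the ceil((n+1)(1-alpha))-th smallest score.

    Returns infinity when n is too small for a finite threshold at this alpha.
    """
    scores = np.sort(np.asarray(scores, dtype=float))
    n = len(scores)
    rank = math.ceil((n + 1) * (1 - alpha))
    if rank > n:
        return math.inf
    return float(scores[rank - 1])

# Made-up nonconformity scores, for illustration only
demo_scores = np.array([0.10, 0.12, 0.15, 0.18, 0.20, 0.22, 0.25, 0.30, 0.35, 0.40])

small = conformal_threshold(demo_scores[:5], alpha=0.1)  # only 5 calibration samples
larger = conformal_threshold(demo_scores, alpha=0.1)     # 10 calibration samples

print(f"n=5:  threshold = {small}")
print(f"n=10: threshold = {larger}")
```

Under this rule, five samples at `alpha=0.1` give an infinite threshold, since the required rank `ceil(6 * 0.9) = 6` exceeds `n`. At least 9 calibration scores are needed before the threshold becomes finite at this alpha, which is one concrete reason to extend `calibration_queries`.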

## 4. Initialize Auditor and Run Audit

In [None]:
from conformal_drift import ConformalDriftAuditor

# Initialize auditor
auditor = ConformalDriftAuditor(
    calibration_scores=calibration_scores,
    alpha=0.1  # 90% coverage target
)

# Test queries (potentially shifted domain)
test_queries = [
    "What is reinforcement learning?",  # Related but not in corpus
    "How does GPT work?",  # Different domain
    "What is Python used for?",  # In domain
    # Add more test queries...
]

# Collect test scores
test_scores = []
test_labels = []  # 1 if response is grounded, 0 if hallucination

for query in test_queries:
    result = qa_chain.invoke({"query": query})
    response = result['result']
    
    docs = vectorstore.similarity_search(query, k=2)
    doc_texts = [doc.page_content for doc in docs]
    
    score = compute_nonconformity_score(query, response, doc_texts)
    test_scores.append(score)
    
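    # Placeholder label from a score threshold, for demo only. This is circular
    # (derived from the score being audited); in practice use human ground-truth labels.
    test_labels.append(1 if score < 0.5 else 0)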

print(f"Test samples: {len(test_scores)}")

In [None]:
# Run audit
test_data = {
    'scores': np.array(test_scores),
    'labels': np.array(test_labels)
}

results = auditor.audit(
    test_data=test_data,
    shift_type="semantic",
    shift_intensity=np.linspace(0, 1, 6)
)

print("\nAudit Results:")
print("-" * 40)
for i, cov in zip(results.shift_intensities, results.coverage):
    print(f"Shift {i:.0%}: Coverage = {cov:.3f}")
print(f"\nMax coverage gap: {results.max_coverage_gap:.3f}")
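As a sanity check on what a coverage number means, empirical coverage can be computed by hand as the fraction of scores at or below a calibrated threshold. This is a minimal sketch under assumed definitions: the threshold and both score arrays are invented, `empirical_coverage` is a hypothetical helper, and the library's own coverage and gap calculations may differ.

```python
import numpy as np

def empirical_coverage(scores, threshold):
    """Fraction of nonconformity scores that fall at or below the threshold."""
    return float(np.mean(np.asarray(scores, dtype=float) <= threshold))

alpha = 0.1
target = 1 - alpha   # nominal coverage
threshold = 0.40     # hypothetical calibrated threshold

# Invented scores: one in-domain batch and one batch under distribution shift
in_domain = np.array([0.12, 0.20, 0.31, 0.38, 0.45])
shifted = np.array([0.35, 0.42, 0.55, 0.61, 0.70])

cov_in = empirical_coverage(in_domain, threshold)
cov_shift = empirical_coverage(shifted, threshold)

print(f"In-domain coverage: {cov_in:.2f} (gap {target - cov_in:.2f})")
print(f"Shifted coverage:   {cov_shift:.2f} (gap {target - cov_shift:.2f})")
```

With batches this small the estimates are very noisy, so a gap should only be read as meaningful once the test set is much larger than the handful of queries used in this demo.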

## 5. Recommendations

Based on the audit results, you can:

1. **Expand knowledge base** if coverage drops due to out-of-domain queries
2. **Recalibrate** if coverage degrades significantly under shift
3. **Add guardrails** to abstain on high-nonconformity predictions
4. **Monitor in production** using continuous auditing
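
Recommendations 3 and 4 can be combined in a small wrapper that abstains when a score exceeds the threshold and tracks the recent acceptance rate. This is one possible sketch, not a Conformal-Drift feature: the `AbstainingGuard` class, its threshold, and the window size are all hypothetical.

```python
from collections import deque

class AbstainingGuard:
    """Accept a response only if its nonconformity score is within the threshold."""

    def __init__(self, threshold, window=100):
        self.threshold = threshold
        # Rolling record of the most recent accept/abstain decisions
        self.recent = deque(maxlen=window)

    def check(self, score):
        """Return True to answer, False to abstain; record the decision."""
        accepted = score <= self.threshold
        self.recent.append(accepted)
        return accepted

    @property
    def acceptance_rate(self):
        """Share of accepted responses in the rolling window, or None if empty."""
        if not self.recent:
            return None
        return sum(self.recent) / len(self.recent)

# Hypothetical threshold and scores, for illustration only
guard = AbstainingGuard(threshold=0.40, window=3)
decisions = [guard.check(s) for s in [0.10, 0.55, 0.30, 0.90]]

print("Decisions:", decisions)
print(f"Acceptance rate over last 3: {guard.acceptance_rate:.2f}")
```

A sustained drop in the acceptance rate well below the `1 - alpha` target is a cheap signal that incoming queries have drifted and that a full audit or recalibration may be worth running.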