# Guardrails Service Demo

This notebook demonstrates how to use the Guardrails Service for anomaly detection in text requests. The service uses vector embeddings to compare incoming requests against a baseline dataset and detect anomalies.

We'll use a realistic dataset of 100 pharmacy customer questions to demonstrate how the service works in a real-world scenario.
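
To make the idea concrete, here is a minimal sketch of embedding-based anomaly detection. It is not the service's implementation: the bag-of-words `embed` function, the cosine distance metric, and the `is_anomaly` decision rule are all assumptions chosen for illustration, and the real service presumably uses a learned embedding model.

```python
import math
from collections import Counter


def embed(text: str) -> Counter:
    """Toy stand-in for an embedding model: a bag-of-words count vector."""
    return Counter(text.lower().split())


def cosine_distance(a: Counter, b: Counter) -> float:
    """1 - cosine similarity; 1.0 means no shared tokens."""
    dot = sum(a[token] * b[token] for token in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return 1.0 - dot / norm if norm else 1.0


def is_anomaly(text, baseline, threshold=0.7, compare_to=3):
    """Assumed rule: flag when the mean distance to the nearest baseline items exceeds threshold."""
    query = embed(text)
    distances = sorted(cosine_distance(query, embed(item)) for item in baseline)
    nearest = distances[:compare_to]
    mean_distance = sum(nearest) / len(nearest) if nearest else 1.0
    return mean_distance > threshold, mean_distance


toy_baseline = [
    "I need to refill my prescription",
    "What are your pharmacy hours today?",
    "Can I get a flu shot here?",
]
print(is_anomaly("I need to refill my medication", toy_baseline, compare_to=1))
print(is_anomaly("'; DROP TABLE prescriptions; --", toy_baseline, compare_to=1))
```

The first query shares most of its words with a baseline question and stays under the threshold; the second shares none and is flagged. The `threshold` and `compare_to` names mirror the request parameters used later in this notebook.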

## Prerequisites

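Before running this notebook, make sure you have:
1. Installed required dependencies: `uv sync --examples`
2. Optionally started the Guardrails Service yourself: `uv run uvicorn guardrails_service.main:app --reload`. If the service is not already running, the notebook starts it in the "Start the Guardrails Service" section.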

## Setup and Imports

In [None]:
import httpx
import json
import pandas as pd
from datetime import datetime
import time
import subprocess
import os
from typing import Optional

# Configuration
BASE_URL = "http://localhost:8000"
directory = os.path.abspath(".") + "/examples"
service_process: Optional[subprocess.Popen] = None

## Start the Guardrails Service

This will start the service in the background so we can interact with it through this notebook.

In [None]:
def start_service():
    """Start the guardrails service in the background"""
    global service_process
    try:
        # Check if service is already running
        response = httpx.get(f"{BASE_URL}/health", timeout=2)
        if response.status_code == 200:
            print("✅ Service is already running")
            return
    except httpx.HTTPError:
        # Service is not reachable yet; fall through and start it
        pass
    
    print("🚀 Starting Guardrails Service...")
    
    # Change to the project directory
    os.chdir("..")
    
    # Start the service
    service_process = subprocess.Popen(
        ["uv", "run", "uvicorn", "guardrails_service.server:app", "--port", "8000"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    
    # Wait for service to start
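    for i in range(30):  # Up to 30 attempts, about one second apart
        try:
            response = httpx.get(f"{BASE_URL}/health", timeout=2)
            if response.status_code == 200:
                print("✅ Service started successfully!")
                return
        except httpx.HTTPError:
            # Not accepting connections yet
            pass
        # Sleep after every failed attempt, including non-200 responses
        time.sleep(1)
    
    print("❌ Failed to start service")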

def stop_service():
    """Stop the guardrails service"""
    global service_process
    if service_process:
        print("🛑 Stopping Guardrails Service...")
        service_process.terminate()
        service_process.wait()
        service_process = None
        print("✅ Service stopped")

# Start the service
start_service()
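
The startup loop above is one instance of a general poll-until-ready pattern. Here is a minimal sketch of that pattern, with the probe injected as a callable so it can be exercised without a running service. `wait_until` and its parameters are hypothetical names, not part of the Guardrails Service.

```python
import time
from typing import Callable


def wait_until(probe: Callable[[], bool], attempts: int = 30, delay: float = 1.0) -> bool:
    """Call probe() until it returns True or the attempts run out."""
    for attempt in range(attempts):
        try:
            if probe():
                return True
        except Exception:
            # Treat a failing probe (e.g. connection refused) as "not ready yet".
            pass
        if attempt < attempts - 1:
            time.sleep(delay)
    return False


# Demo with a fake probe that only becomes ready on its third call.
calls = {"n": 0}


def fake_probe() -> bool:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("not up yet")
    return True


print(wait_until(fake_probe, attempts=5, delay=0.01))
```

In this notebook the probe could be something like `lambda: httpx.get(f"{BASE_URL}/health", timeout=2).status_code == 200`.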

## Helper Functions

Let's create some helper functions to interact with the API more easily.

In [None]:
def health_check():
    """Check if the service is healthy"""
    response = httpx.get(f"{BASE_URL}/health")
    return response.json()

def upload_baseline(requests_data):
    """Upload a baseline dataset"""
    payload = {
        "requests": [
            {
                "text": req["text"],
                "timestamp": req["timestamp"]
            }
            for req in requests_data
        ]
    }
    response = httpx.post(f"{BASE_URL}/anomaly/baseline/upload", json=payload)
    return response.json()

def add_to_baseline(text, timestamp=None):
    """Add a single request to the baseline"""
    if timestamp is None:
        timestamp = datetime.now().isoformat()
    
    payload = {
        "text": text,
        "timestamp": timestamp
    }
    response = httpx.post(f"{BASE_URL}/anomaly/baseline/add", json=payload)
    return response.json()

def detect_anomaly(text, threshold=None, compare_to=None, timestamp=None):
    """Run anomaly detection on a text request"""
    payload = {
        "text": text
    }
    
    if timestamp:
        payload["timestamp"] = timestamp
    if threshold is not None:
        payload["threshold"] = threshold
    if compare_to is not None:
        payload["compare_to"] = compare_to
    
    response = httpx.post(f"{BASE_URL}/anomaly/detect", json=payload)
    return response.json()

def get_baseline_stats():
    """Get baseline dataset statistics"""
    response = httpx.get(f"{BASE_URL}/anomaly/baseline/stats")
    return response.json()

def clear_baseline():
    """Clear all baseline data"""
    payload = {}
    response = httpx.post(f"{BASE_URL}/anomaly/baseline/clear", json=payload)
    return response.json()

# Malicious Detection Helper Functions

def upload_malicious_baseline(requests_data):
    """Upload a malicious baseline dataset"""
    payload = {
        "requests": [
            {
                "text": req["text"],
                "timestamp": req["timestamp"]
            }
            for req in requests_data
        ]
    }
    response = httpx.post(f"{BASE_URL}/malicious/baseline/upload", json=payload)
    return response.json()

def add_to_malicious_baseline(text, timestamp=None):
    """Add a single request to the malicious baseline"""
    if timestamp is None:
        timestamp = datetime.now().isoformat()
    
    payload = {
        "text": text,
        "timestamp": timestamp
    }
    response = httpx.post(f"{BASE_URL}/malicious/baseline/add", json=payload)
    return response.json()

def detect_malicious(text, threshold=None, compare_to=None, timestamp=None):
    """Run malicious detection on a text request"""
    payload = {
        "text": text
    }
    
    if timestamp:
        payload["timestamp"] = timestamp
    if threshold is not None:
        payload["threshold"] = threshold
    if compare_to is not None:
        payload["compare_to"] = compare_to
    
    response = httpx.post(f"{BASE_URL}/malicious/detect", json=payload)
    return response.json()

def get_malicious_baseline_stats():
    """Get malicious baseline dataset statistics"""
    response = httpx.get(f"{BASE_URL}/malicious/baseline/stats")
    return response.json()

def clear_malicious_baseline():
    """Clear all malicious baseline data"""
    payload = {}
    response = httpx.post(f"{BASE_URL}/malicious/baseline/clear", json=payload)
    return response.json()

def pretty_print_result(result, title=""):
    """Pretty print anomaly detection results"""
    if title:
        print(f"\n=== {title} ===")
    
    if "result" in result:
        r = result["result"]
        
        # Handle both anomaly and malicious results
        if "is_anomaly" in r:
            status = "🚨 ANOMALY" if r["is_anomaly"] else "✅ NORMAL"
            reasons_key = "anomaly_reasons"
        elif "is_malicious" in r:
            status = "🚨 MALICIOUS" if r["is_malicious"] else "✅ BENIGN"
            reasons_key = "malicious_reasons"
        else:
            status = "❓ UNKNOWN"
            reasons_key = "reasons"
            
        print(f"{status} - Confidence: {r['confidence_score']:.3f} - Risk: {r['risk_level']}")
        print(f"Similar records: {r['similar_records_count']}")
        
        if reasons_key in r and r[reasons_key]:
            print("Reasons:")
            for reason in r[reasons_key]:
                print(f"  - {reason}")
        
        if "baseline_stats" in result and result["baseline_stats"]:
            stats = result["baseline_stats"]
            if "threshold" in stats:
                print(f"Threshold used: {stats['threshold']}")
            if "detection_distance" in stats:
                print(f"Detection distance ({stats.get('detection_metric', 'unknown')}): {stats['detection_distance']:.3f}")
    else:
        print(json.dumps(result, indent=2))

# Test the connection
print("Testing connection...")
print(health_check())
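
`detect_anomaly` and `detect_malicious` build their payloads the same way, so the shared logic could be factored out. A minimal sketch follows; `build_detect_payload` is a hypothetical helper, and the field names are taken from the functions above.

```python
from typing import Any, Dict, Optional


def build_detect_payload(
    text: str,
    threshold: Optional[float] = None,
    compare_to: Optional[int] = None,
    timestamp: Optional[str] = None,
) -> Dict[str, Any]:
    """Build a detection payload, leaving out optional fields that were not set."""
    payload: Dict[str, Any] = {"text": text}
    optional = {
        # An empty timestamp is dropped, matching the `if timestamp:` check above.
        "timestamp": timestamp or None,
        "threshold": threshold,
        "compare_to": compare_to,
    }
    payload.update({key: value for key, value in optional.items() if value is not None})
    return payload


print(build_detect_payload("Can I get a flu shot here?", threshold=0.5))
```

Checking `is not None` rather than truthiness keeps a deliberate `threshold=0` in the payload.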

## Step 1: Clear Any Existing Baseline Data

Let's start fresh by clearing any existing baseline data.

In [None]:
print("Clearing existing baseline data...")
result = clear_baseline()
print(f"✅ {result['message']}")
print(f"Records removed: {result['records_removed']}")

# Check baseline stats
stats = get_baseline_stats()
print(f"\nBaseline stats: {stats}")

## Step 2: Load Pharmacy Baseline Dataset

We'll load a realistic dataset of 100 pharmacy customer questions from a JSON file. This represents normal pharmacy interactions.

In [None]:
# Load pharmacy baseline data from JSON file
print("Loading pharmacy baseline dataset from JSON file...")

# Load the data from our JSON file
with open(os.path.join(directory, "data/baseline.json"), "r") as f:
    baseline_data = json.load(f)

baseline_requests = baseline_data["requests"]

print(f"Loaded {len(baseline_requests)} pharmacy questions")
print("\nSample questions:")
for i, req in enumerate(baseline_requests[:5]):
    print(f"  {i+1}. {req['text']}")

print("\nUploading baseline dataset to service...")
result = upload_baseline(baseline_requests)
print(f"✅ {result['message']}")
print(f"Records added: {result['records_added']}")

# Check baseline stats
stats = get_baseline_stats()
print(f"\nUpdated baseline stats: {stats}")
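
`upload_baseline` reads `text` and `timestamp` from every record, so a malformed entry in the JSON file would surface as a `KeyError` partway through building the payload. Here is a minimal sketch of a pre-upload check, assuming timestamps are ISO 8601 strings like those produced by `datetime.now().isoformat()` elsewhere in this notebook. `validate_requests` is a hypothetical helper, and the sample records are made up.

```python
from datetime import datetime
from typing import Any, Dict, List


def validate_requests(requests_data: List[Dict[str, Any]]) -> List[str]:
    """Return human-readable problems; an empty list means the data looks uploadable."""
    problems = []
    for index, req in enumerate(requests_data):
        text = req.get("text")
        if not isinstance(text, str) or not text.strip():
            problems.append(f"record {index}: missing or empty 'text'")
        try:
            # Raises TypeError for non-strings and ValueError for unparseable strings.
            datetime.fromisoformat(req.get("timestamp"))
        except (TypeError, ValueError):
            problems.append(f"record {index}: 'timestamp' is not an ISO 8601 string")
    return problems


sample = [
    {"text": "What are your pharmacy hours today?", "timestamp": "2024-01-15T09:30:00"},
    {"text": "", "timestamp": "yesterday"},
]
for problem in validate_requests(sample):
    print(problem)
```

Note that `datetime.fromisoformat` only accepts a trailing `Z` from Python 3.11 onward, so whether this check suits a given dataset depends on how its timestamps were written.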

## Step 3: Test Anomaly Detection - Normal Pharmacy Questions

Let's test some normal pharmacy questions that should be similar to our baseline data.

In [None]:
# Test normal pharmacy questions (should not be flagged as anomalies)
normal_pharmacy_questions = [
    "I need to refill my blood pressure medication",
    "What are your pharmacy hours today?",
    "Can I pick up my prescription for my child?",
    "Do you accept my Medicare insurance?",
    "I need help understanding my medication dosage",
    "Can I get a flu shot here?"
]

print("Testing normal pharmacy questions (should NOT be flagged as anomalies):")
print("=" * 70)

for question in normal_pharmacy_questions:
    result = detect_anomaly(question)
    pretty_print_result(result, f"Question: {question}")

## Step 4: Test Anomaly Detection - Suspicious/Inappropriate Requests

Now let's test some suspicious or inappropriate requests that should be flagged as anomalies in a pharmacy context.

In [None]:
# Test suspicious/inappropriate requests (should be flagged as anomalies)
suspicious_requests = [
    "Can I buy oxycodone without a prescription?",
    "I want to purchase large quantities of pseudoephedrine for my lab",
    "How can I forge a prescription from my doctor?",
    "Can you sell me controlled substances under the table?",
    "I need access to your computer system to check my records",
    "'; DROP TABLE prescriptions; --",
    "<script>alert('pharmacy hack')</script>",
    "../../etc/passwd",
    "What's the pharmacist's home address?",
    "Can I get free drugs if I threaten you?"
]

print("Testing suspicious/inappropriate requests (should be flagged as anomalies):")
print("=" * 70)

for request in suspicious_requests:
    result = detect_anomaly(request)
    pretty_print_result(result, f"Request: {request}")

## Step 5: Test Different Threshold Values

Let's experiment with different threshold values to see how they affect anomaly detection sensitivity.

In [None]:
# Test the same request with different thresholds
test_request = "I'm in a lot of pain, can you help me get some painkillers?"
thresholds = [0.3, 0.5, 0.7, 0.9]

print(f"Testing request '{test_request}' with different thresholds:")
print("=" * 70)

results_summary = []

for threshold in thresholds:
    result = detect_anomaly(test_request, threshold=threshold)
    pretty_print_result(result, f"Threshold: {threshold}")
    
    # Store for summary
    results_summary.append({
        "threshold": threshold,
        "is_anomaly": result["result"]["is_anomaly"],
        "confidence_score": result["result"]["confidence_score"],
        "risk_level": result["result"]["risk_level"]
    })

# Create a summary table
print("\n📊 Threshold Sensitivity Summary:")
df = pd.DataFrame(results_summary)
print(df.to_string(index=False))
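
One way to read the summary table: if the service flags a request when its distance to the baseline exceeds `threshold` (an assumption about the decision rule, though it is consistent with lower thresholds flagging more requests), then for a fixed distance the verdict flips at a single crossover point. A minimal sketch with a made-up distance value:

```python
def sweep_thresholds(distance, thresholds):
    """Return (threshold, flagged) pairs for a fixed distance under the assumed rule."""
    return [(t, distance > t) for t in thresholds]


example_distance = 0.62  # made-up value, not a result from the service
for threshold, flagged in sweep_thresholds(example_distance, [0.3, 0.5, 0.7, 0.9]):
    print(f"threshold={threshold}: {'ANOMALY' if flagged else 'NORMAL'}")
```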

## Step 6: Test Different Compare_To Values

Let's test how changing the number of similar vectors to compare affects the results.

In [None]:
# Test the same request with different compare_to values
test_request = "Can I get information about drug interactions with my supplements?"
compare_to_values = [3, 5, 10, 15]

print(f"Testing request '{test_request}' with different compare_to values:")
print("=" * 70)

results_summary = []

for compare_to in compare_to_values:
    result = detect_anomaly(test_request, compare_to=compare_to)
    pretty_print_result(result, f"Compare to: {compare_to} vectors")
    
    # Store for summary
    results_summary.append({
        "compare_to": compare_to,
        "is_anomaly": result["result"]["is_anomaly"],
        "confidence_score": result["result"]["confidence_score"],
        "similar_records_count": result["result"]["similar_records_count"]
    })

# Create a summary table
print("\n📊 Compare_To Sensitivity Summary:")
df = pd.DataFrame(results_summary)
print(df.to_string(index=False))
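
Here is a sketch of why `compare_to` can change the score, assuming (this is not confirmed by anything in this notebook) that the score is the mean distance to the `compare_to` nearest baseline vectors. The distances below are made up.

```python
def mean_nearest_distance(distances, compare_to):
    """Mean of the compare_to smallest distances."""
    nearest = sorted(distances)[:compare_to]
    if not nearest:
        raise ValueError("no baseline distances to compare against")
    return sum(nearest) / len(nearest)


distances = [0.12, 0.18, 0.35, 0.41, 0.77, 0.80, 0.93]  # made-up values
for k in (1, 3, 5):
    print(k, round(mean_nearest_distance(distances, k), 3))
```

Under this assumption, each additional neighbor is at least as far away as the ones already included, so the mean can only stay the same or grow as `compare_to` increases.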

## Step 7: Update Baseline with New Data

Let's add some new entries to our baseline dataset and see how it affects detection.

In [None]:
# Get current baseline stats
print("Current baseline stats:")
stats = get_baseline_stats()
print(stats)

# Add some new entries to the baseline - additional pharmacy-related questions
new_entries = [
    "I need to schedule a vaccination appointment",
    "Can you check my medication history?",
    "What are the side effects of this new prescription?",
    "Do you offer compound medications?",
    "I need help with my medication adherence packaging"
]

print("\nAdding new pharmacy-related entries to baseline:")
for entry in new_entries:
    result = add_to_baseline(entry)
    print(f"✅ Added: {entry}")

# Check updated stats
print("\nUpdated baseline stats:")
stats = get_baseline_stats()
print(stats)

## Step 8: Re-test Anomaly Detection After Baseline Update

Now let's test the same requests again to see how the updated baseline affects detection.

In [None]:
# Test a request that should now be less anomalous
test_request = "I need information about my medication side effects"

print(f"Testing '{test_request}' after baseline update:")
print("=" * 70)

result = detect_anomaly(test_request)
pretty_print_result(result, "After baseline update")

# Test with the same suspicious requests as before
print("\nRe-testing suspicious requests after baseline update:")
print("=" * 70)

suspicious_sample = [
    "Can I buy oxycodone without a prescription?",
    "'; DROP TABLE prescriptions; --"
]

for request in suspicious_sample:
    result = detect_anomaly(request)
    pretty_print_result(result, f"Request: {request}")

## Step 9: Test Custom Threshold and Compare_To Combined

Let's test using both custom threshold and compare_to values together.

In [None]:
# Test combinations of threshold and compare_to
test_request = "I need help choosing between brand name and generic medications"

test_combinations = [
    {"threshold": 0.5, "compare_to": 5},
    {"threshold": 0.7, "compare_to": 10},
    {"threshold": 0.9, "compare_to": 3},
    {"threshold": 0.3, "compare_to": 15}
]

print(f"Testing '{test_request}' with different parameter combinations:")
print("=" * 70)

results_summary = []

for params in test_combinations:
    result = detect_anomaly(
        test_request,
        threshold=params["threshold"],
        compare_to=params["compare_to"]
    )
    
    title = f"Threshold: {params['threshold']}, Compare_to: {params['compare_to']}"
    pretty_print_result(result, title)
    
    # Store for summary
    results_summary.append({
        "threshold": params["threshold"],
        "compare_to": params["compare_to"],
        "is_anomaly": result["result"]["is_anomaly"],
        "confidence_score": result["result"]["confidence_score"],
        "risk_level": result["result"]["risk_level"]
    })

# Create a summary table
print("\n📊 Parameter Combination Summary:")
df = pd.DataFrame(results_summary)
print(df.to_string(index=False))

## Step 10: Final Baseline Clearing

Finally, let's clear the baseline to demonstrate the clearing functionality.

In [None]:
# Check current baseline stats before clearing
print("Baseline stats before clearing:")
stats = get_baseline_stats()
print(stats)

# Clear the baseline
print("\nClearing baseline dataset...")
result = clear_baseline()
print(f"✅ {result['message']}")
print(f"Records removed: {result['records_removed']}")

# Check stats after clearing
print("\nBaseline stats after clearing:")
stats = get_baseline_stats()
print(stats)

# Test anomaly detection with no baseline (should flag everything as anomaly)
print("\nTesting anomaly detection with no baseline data:")
result = detect_anomaly("I need to refill my prescription")
pretty_print_result(result, "No baseline data")

## Part 2: Malicious Content Detection

Now let's explore the malicious content detection system, which identifies requests that are **similar** to known attack patterns.

### Step 11: Load and Upload Malicious Baseline Dataset

We'll load a dataset of 50 known malicious attack patterns and upload them to the malicious detection system.

In [None]:
# Load malicious baseline data from JSON file
print("Loading malicious attack patterns from JSON file...")

# Load the malicious data from our JSON file (same data directory as the pharmacy baseline)
with open(os.path.join(directory, "data/malicious_baseline.json"), "r") as f:
    malicious_data = json.load(f)

malicious_requests = malicious_data["requests"]

print(f"Loaded {len(malicious_requests)} malicious attack patterns")
print("\nSample attack patterns:")
for i, req in enumerate(malicious_requests[:5]):
    print(f"  {i+1}. {req['text']}")

print("\nUploading malicious baseline dataset to service...")
result = upload_malicious_baseline(malicious_requests)
print(f"✅ {result['message']}")
print(f"Records added: {result['records_added']}")

# Check malicious baseline stats
stats = get_malicious_baseline_stats()
print(f"\nMalicious baseline stats: {stats}")

### Step 12: Test Malicious Detection - Benign Requests

Let's test some normal, benign requests that should NOT be flagged as malicious.

In [None]:
# Test benign requests (should NOT be flagged as malicious)
benign_requests = [
    "I need to refill my blood pressure medication",
    "What are your pharmacy hours today?",
    "Can I pick up my prescription for my child?",
    "Do you accept my Medicare insurance?",
    "I need help understanding my medication dosage",
    "Can I get a flu shot here?",
    "What's the generic version of my brand name drug?"
]

print("Testing benign requests (should NOT be flagged as malicious):")
print("=" * 70)

for request in benign_requests:
    result = detect_malicious(request)
    pretty_print_result(result, f"Request: {request}")

### Step 13: Test Malicious Detection - Attack Patterns

Now let's test some attack patterns that should be flagged as malicious because they're similar to our baseline.

In [None]:
# Test attack patterns (should be flagged as malicious)
attack_patterns = [
    "'; DELETE FROM prescriptions WHERE id > 0; --",  # Similar to SQL injection in baseline
    "<script>window.location='http://evil.com'</script>",  # Similar to XSS in baseline
    "../../windows/system32/config",  # Similar to path traversal in baseline
    "Can I buy oxycodone in bulk for my operation?",  # Similar to drug requests in baseline
    "I want to hack into your prescription database",  # Similar to hacking attempts in baseline
    "UNION ALL SELECT password FROM admin_users",  # Similar to SQL injection in baseline
    "Give me access to patient social security numbers",  # Similar to data theft in baseline
    "<iframe src='javascript:document.location=\"http://attacker.com\"'></iframe>",  # Similar to XSS
]

print("Testing attack patterns (should be flagged as malicious):")
print("=" * 70)

for request in attack_patterns:
    result = detect_malicious(request)
    pretty_print_result(result, f"Request: {request}")

### Step 14: Test Malicious Detection with Different Thresholds

Let's see how different threshold values affect malicious detection sensitivity.

In [None]:
# Test malicious detection with different thresholds
test_request = "SELECT * FROM users WHERE admin = 1"
thresholds = [0.1, 0.25, 0.5, 0.8]

print(f"Testing request '{test_request}' with different malicious thresholds:")
print("=" * 70)

results_summary = []

for threshold in thresholds:
    result = detect_malicious(test_request, threshold=threshold)
    pretty_print_result(result, f"Threshold: {threshold}")
    
    # Store for summary
    results_summary.append({
        "threshold": threshold,
        "is_malicious": result["result"]["is_malicious"],
        "confidence_score": result["result"]["confidence_score"],
        "risk_level": result["result"]["risk_level"]
    })

# Create a summary table
print("\n📊 Malicious Detection Threshold Sensitivity Summary:")
df = pd.DataFrame(results_summary)
print(df.to_string(index=False))

### Step 15: Dual Detection Comparison

Let's test the same requests against both detection systems to see how they complement each other.

In [None]:
# Test various requests against both detection systems
test_cases = [
    {
        "text": "I need to refill my blood pressure medication",
        "category": "Normal Pharmacy Request"
    },
    {
        "text": "Can I buy oxycodone without a prescription?",
        "category": "Suspicious Pharmacy Request"
    },
    {
        "text": "'; DROP TABLE users; --",
        "category": "SQL Injection Attack"
    },
    {
        "text": "<script>alert('XSS')</script>",
        "category": "XSS Attack"
    },
    {
        "text": "I want to hack into your system and steal data",
        "category": "Explicit Threat"
    },
    {
        "text": "What's the protocol for missed doses of medication?",
        "category": "Novel Pharmacy Question"
    }
]

print("Dual Detection System Comparison:")
print("=" * 80)

comparison_results = []

for test_case in test_cases:
    text = test_case["text"]
    category = test_case["category"]
    
    print(f"\n🔍 Testing: {category}")
    print(f"Text: '{text}'")
    print("-" * 80)
    
    # Run both detections
    anomaly_result = detect_anomaly(text)
    malicious_result = detect_malicious(text)
    
    # Extract key info
    is_anomaly = anomaly_result["result"]["is_anomaly"]
    anomaly_confidence = anomaly_result["result"]["confidence_score"]
    
    is_malicious = malicious_result["result"]["is_malicious"]
    malicious_confidence = malicious_result["result"]["confidence_score"]
    
    print(f"📊 ANOMALY DETECTION:  {'🚨 ANOMALY' if is_anomaly else '✅ NORMAL'} (confidence: {anomaly_confidence:.3f})")
    print(f"🛡️  MALICIOUS DETECTION: {'🚨 MALICIOUS' if is_malicious else '✅ BENIGN'} (confidence: {malicious_confidence:.3f})")
    
    # Determine overall status
    if is_anomaly and is_malicious:
        overall = "🔴 BLOCKED (Both systems flagged)"
    elif is_anomaly or is_malicious:
        flagged_by = "Anomaly" if is_anomaly else "Malicious"
        overall = f"🟡 FLAGGED (Flagged by {flagged_by} detection)"
    else:
        overall = "🟢 ALLOWED (Both systems passed)"
    
    print(f"🎯 OVERALL DECISION: {overall}")
    
    # Store results for summary
    comparison_results.append({
        "category": category,
        "anomaly_flagged": is_anomaly,
        "malicious_flagged": is_malicious,
        "anomaly_confidence": round(anomaly_confidence, 3),
        "malicious_confidence": round(malicious_confidence, 3),
        "overall_status": "BLOCKED" if (is_anomaly and is_malicious) else "FLAGGED" if (is_anomaly or is_malicious) else "ALLOWED"
    })

# Create summary table
print("\n" + "=" * 80)
print("📋 DUAL DETECTION SUMMARY TABLE:")
print("=" * 80)
df = pd.DataFrame(comparison_results)
print(df.to_string(index=False))

### Step 16: Clear Malicious Baseline

Finally, let's clear the malicious baseline to demonstrate the clearing functionality.

In [None]:
# Check current malicious baseline stats before clearing
print("Malicious baseline stats before clearing:")
stats = get_malicious_baseline_stats()
print(stats)

# Clear the malicious baseline
print("\nClearing malicious baseline dataset...")
result = clear_malicious_baseline()
print(f"✅ {result['message']}")
print(f"Records removed: {result['records_removed']}")

# Check stats after clearing
print("\nMalicious baseline stats after clearing:")
stats = get_malicious_baseline_stats()
print(stats)

# Test malicious detection with no baseline (should not flag anything as malicious)
print("\nTesting malicious detection with no baseline data:")
result = detect_malicious("'; DROP TABLE users; --")
pretty_print_result(result, "No malicious baseline data")
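
The dual-detection cell in Step 15 computes the overall status twice, once for printing and once for the summary table. Here is a minimal sketch that factors the rule into one function. `overall_status` is a hypothetical name, and the BLOCKED/FLAGGED/ALLOWED policy is simply the one that cell uses.

```python
def overall_status(is_anomaly: bool, is_malicious: bool) -> str:
    """Combine the two detector verdicts into a single decision."""
    if is_anomaly and is_malicious:
        return "BLOCKED"  # both detectors flagged the request
    if is_anomaly or is_malicious:
        return "FLAGGED"  # exactly one detector flagged it
    return "ALLOWED"  # neither detector flagged it


for flags in [(False, False), (True, False), (False, True), (True, True)]:
    print(flags, "->", overall_status(*flags))
```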

## Cleanup: Stop the Service

Finally, let's stop the service we started.

In [None]:
# Stop the service
stop_service()
print("✅ Demo completed successfully!")

## Summary

This notebook demonstrated the complete workflow of the Guardrails Service using a realistic pharmacy dataset:

1. **Service Setup**: Started the service and verified it's running
2. **Baseline Management**: Loaded 100 pharmacy questions from JSON and added new entries
3. **Anomaly Detection**: Tested normal pharmacy questions vs suspicious/inappropriate requests
4. **Parameter Tuning**: Experimented with different `threshold` and `compare_to` values
5. **Baseline Updates**: Added new data and observed changes in detection
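6. **Data Clearing**: Demonstrated baseline clearing functionality
7. **Malicious Detection**: Loaded 50 known attack patterns, tested benign requests and attack patterns against them, and compared both detectors on the same inputs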

### Key Takeaways:

- **Threshold**: Lower values (0.3-0.5) are more sensitive and flag more requests as anomalies
- **Compare_to**: Higher values use more baseline data for comparison, potentially improving accuracy
- **Baseline Quality**: A good baseline dataset with domain-specific data (pharmacy questions) improves detection accuracy
- **Dynamic Updates**: The baseline can be updated continuously as new normal patterns emerge