# üõ°Ô∏è Citadel PII Processing - Testing Center

## Test PII Anonymization, Deanonymization, and Blocking capabilities!

Use this Jupyter notebook to verify PII processing in Citadel Access Contracts, including:
- **Use Case 1**: PII Anonymization/Deanonymization with state saving
- **Use Case 2**: PII Blocking with detection reporting
- Testing with various PII types (names, emails, phone numbers, credit cards, IBANs, etc.)
- Validating custom regex pattern detection
- Verifying Event Hub logging for PII state saving

> **Note:** This notebook assumes you have already deployed your Citadel Governance Hub with PII processing capabilities enabled. 
> Ensure Azure AI Language Service is configured and the PII policy fragments are deployed.

## Azure Prerequisites

- An Azure AI Language Service instance with PII detection enabled
- Event Hub configured for PII state logging (if testing state saving)
- APIM policy fragments deployed: `pii-anonymization`, `pii-deanonymization`, `pii-state-saving`
- Managed Identity configured with access to Language Service

<a id='0'></a>
### 0Ô∏è‚É£ Initialize notebook variables

Before running the tests, ensure you have set the following variables according to your environment:

In [None]:
import os
import sys, json, requests, time
sys.path.insert(1, '../shared')  # add the shared directory to the Python path
import utils
from apimtools import APIMClientTool

inference_api_version = "2024-05-01-preview"

targetInferenceApi = "models"  # use 'models' for universal LLM API, or 'openai' for Azure OpenAI

governance_hub_resource_group = "REPLACE"  ## specify the resource group name where the Governance Hub is located
location = "REPLACE"  ## specify the location of the Governance Hub

# Key Vault configuration (for storing endpoint and API key secrets)
use_keyvault_integration = False
keyvault_subscription_id = "00000000-0000-0000-0000-000000000000"  # Replace with your Key Vault subscription ID
keyvault_resource_group = "REPLACE"  # Replace with your Key Vault resource group
keyvault_name = "REPLACE"  # Replace with your Key Vault name

<a id='1'></a>
### 1Ô∏è‚É£ Verify the Azure CLI and the connected Azure subscription

The following commands ensure that you have the latest version of the Azure CLI and that the Azure CLI is connected to your Azure subscription.

In [None]:
output = utils.run("az account show", "Retrieved az account", "Failed to get the current az account")

if output.success and output.json_data:
    current_user = output.json_data['user']['name']
    tenant_id = output.json_data['tenantId']
    subscription_id = output.json_data['id']

    utils.print_info(f"Current user: {current_user}")
    utils.print_info(f"Tenant ID: {tenant_id}")
    utils.print_info(f"Subscription ID: {subscription_id}")

<a id='init'></a>
### ‚öôÔ∏è Initialize client tool for your APIM service

üëâ An existing Citadel's Governance Hub deployment is expected with PII policy fragments deployed

In [None]:
try:
    apimClientTool = APIMClientTool(
        governance_hub_resource_group
    )
    apimClientTool.initialize()
    apimClientTool.discover_api(targetInferenceApi)

    apim_resource_gateway_url = str(apimClientTool.apim_resource_gateway_url)
    azure_endpoint = str(apimClientTool.azure_endpoint)
    
    # Get supported models from the policy fragment
    supported_models = apimClientTool.get_policy_fragment_supported_models("set-backend-pools")
    utils.print_info(f"Supported models in APIM policy fragment 'set-backend-pools': {supported_models}")

    if targetInferenceApi == "openai":
        chat_completions_url = f"{azure_endpoint}openai/deployments/{{model_name}}/chat/completions?api-version={inference_api_version}"
    else:  # models
        chat_completions_url = f"{azure_endpoint}models/chat/completions?api-version={inference_api_version}"
    utils.print_info(f"Chat Completion Endpoint Template: {chat_completions_url}")

    utils.print_info(f"Using the following API: {apimClientTool.api_id}")

    utils.print_ok(f"Testing tool initialized successfully!")
except Exception as e:
    utils.print_error(f"Error initializing APIM Client Tool: {e}")

---
## üîê Use Case 1: PII Anonymization & Deanonymization

This use case tests PII masking where:
- Inbound requests have PII replaced with placeholders (e.g., `<Person_0>`, `<Email_0>`)
- Outbound responses have placeholders restored with original PII values
- State saving logs all PII processing to Event Hub for auditing
---

<a id='2.1'></a>
### 2Ô∏è‚É£.1 Define PII Masking Access Contract

Create an access contract with PII anonymization/deanonymization enabled and state saving turned on.

In [None]:
timestamp = time.strftime('%Y%m%d%H%M%S')

# PII Masking Use Case Configuration
pii_masking_contract = {
    "name": f"pii-masking-contract-{timestamp}",
    "business_unit": "HR",
    "use_case_name": "PIIMasking",
    "environment": "DEV",
    "use_keyvault": use_keyvault_integration,
    "endpoint_secret": "HR-PII-LLM-ENDPOINT",
    "apikey_secret": "HR-PII-LLM-KEY",
    "description": "HR PII Masking - Anonymization/Deanonymization with State Saving",
    "pii_config": {
        "mode": "anonymization",
        "confidence_threshold": "0.8",
        "entity_exclusions": "PersonType",
        "detection_language": "en",
        "state_saving_enabled": True
    }
}

utils.print_info(f"PII Masking Contract Configuration:")
utils.print_info(f"  Business Unit: {pii_masking_contract['business_unit']}")
utils.print_info(f"  Use Case: {pii_masking_contract['use_case_name']}")
utils.print_info(f"  PII Mode: {pii_masking_contract['pii_config']['mode']}")
utils.print_info(f"  State Saving: {pii_masking_contract['pii_config']['state_saving_enabled']}")
utils.print_info(f"  Product ID: LLM-{pii_masking_contract['business_unit']}-{pii_masking_contract['use_case_name']}-{pii_masking_contract['environment']}")

<a id='2.2'></a>
### 2Ô∏è‚É£.2 Create PII Masking Product Policy

Generate a custom product policy XML that enables PII anonymization/deanonymization with custom regex patterns and state saving.

In [None]:
import shutil

bicep_dir = "../bicep/infra/citadel-access-contracts"
template_file = os.path.join(bicep_dir, "main.bicep")

# Create folder structure for PII Masking contract
contract = pii_masking_contract
folder_name = f"{contract['business_unit'].lower()}-{contract['use_case_name'].lower()}"
environment_folder = contract['environment'].lower()
masking_contract_folder = os.path.join(bicep_dir, "contracts", folder_name, environment_folder)
os.makedirs(masking_contract_folder, exist_ok=True)
utils.print_info(f"üìÅ Created folder: {masking_contract_folder}")

# Create PII Masking Policy XML
pii_masking_policy = '''<policies>
    <inbound>
        <base />
        <!-- Enable PII Anonymization -->
        <set-variable name="piiAnonymizationEnabled" value="true" />
        
        <choose>
            <when condition="@(context.Variables.GetValueOrDefault<string>(\"piiAnonymizationEnabled\") == \"true\")">
                
                <!-- Configure PII detection settings -->
                <set-variable name="piiConfidenceThreshold" value="0.8" />
                <set-variable name="piiEntityCategoryExclusions" value="PersonType" />
                <set-variable name="piiDetectionLanguage" value="en" />

                <!-- Configure custom regex patterns for additional PII detection -->
                <set-variable name="piiRegexPatterns" value="@{
                    var patterns = new JArray {
                        new JObject {
                            [\"pattern\"] = @\"\\b\\d{4}[- ]?\\d{4}[- ]?\\d{4}[- ]?\\d{4}\\b\",
                            [\"category\"] = \"CREDIT_CARD\"
                        },
                        new JObject {
                            [\"pattern\"] = @\"\\b[A-Z]{2}\\d{6}[A-Z]\\b\",
                            [\"category\"] = \"PASSPORT_NUMBER\"
                        },
                        new JObject {
                            [\"pattern\"] = @\"\\b784-\\d{4}-\\d{7}-\\d{1}\\b\",
                            [\"category\"] = \"EMIRATES_ID\"
                        }
                    };
                    return patterns.ToString();
                }" />
                
                <!-- Capture request body for PII processing -->
                <set-variable name="piiInputContent" value="@(context.Request.Body.As<string>(preserveContent: true))" />
                
                <!-- Apply PII anonymization -->
                <include-fragment fragment-id="pii-anonymization" />
                
                <!-- Replace request body with anonymized content -->
                <set-body>@(context.Variables.GetValueOrDefault<string>("piiAnonymizedContent"))</set-body>
            </when>
        </choose>
    </inbound>
    <backend>
        <base />
    </backend>
    <outbound>
        <base />
        <!-- Store response body before processing -->
        <set-variable name="responseBodyContent" value="@(context.Response.Body.As<string>(preserveContent: true))" />
        
        <choose>
            <when condition="@(context.Variables.GetValueOrDefault<string>(\"piiAnonymizationEnabled\") == \"true\" && 
                            context.Variables.ContainsKey(\"piiMappings\"))">
                
                <!-- Set input for deanonymization -->
                <set-variable name="piiDeanonymizeContentInput" value="@(context.Variables.GetValueOrDefault<string>(\"responseBodyContent\"))" />
                
                <!-- Apply PII deanonymization -->
                <include-fragment fragment-id="pii-deanonymization" />
                
                <!-- Enable PII processing audit logging to Event Hub -->
                <set-variable name="piiStateSavingEnabled" value="true" />
                <set-variable name="originalRequest" value="@(context.Variables.GetValueOrDefault<string>(\"piiInputContent\"))" />
                <set-variable name="originalResponse" value="@(context.Variables.GetValueOrDefault<string>(\"responseBodyContent\"))" />
                <include-fragment fragment-id="pii-state-saving" />
                
                <!-- Replace response with deanonymized content -->
                <set-body>@(context.Variables.GetValueOrDefault<string>("piiDeanonymizedContentOutput"))</set-body>
            </when>
            <otherwise>
                <!-- Pass through original response -->
                <set-body>@(context.Variables.GetValueOrDefault<string>("responseBodyContent"))</set-body>
            </otherwise>
        </choose>
    </outbound>
    <on-error>
        <base />
    </on-error>
</policies>'''

# Write the policy file
masking_policy_file = os.path.join(masking_contract_folder, "ai-product-policy.xml")
with open(masking_policy_file, 'w') as f:
    f.write(pii_masking_policy)
utils.print_ok(f"‚úÖ PII Masking policy file created: {masking_policy_file}")

<a id='2.3'></a>
### 2Ô∏è‚É£.3 Create Parameter File and Deploy PII Masking Contract

In [None]:
# Generate parameter file for PII Masking contract
masking_params_file = os.path.join(masking_contract_folder, "main.bicepparam")
policy_relative_path = "ai-product-policy.xml"

masking_params_content = f'''using '../../../main.bicep'

// ============================================================================
// {contract['description']} - Generated from PII Testing Notebook
// ============================================================================

param apim = {{
  subscriptionId: '{subscription_id}'
  resourceGroupName: '{governance_hub_resource_group}'
  name: '{apimClientTool.apim_resource_name}'
}}

param keyVault = {{
  subscriptionId: '{keyvault_subscription_id}'
  resourceGroupName: '{keyvault_resource_group}'
  name: '{keyvault_name}'
}}

param useTargetAzureKeyVault = {str(contract['use_keyvault']).lower()}

param useCase = {{
  businessUnit: '{contract['business_unit']}'
  useCaseName: '{contract['use_case_name']}'
  environment: '{contract['environment']}'
}}

param apiNameMapping = {{
  LLM: ['universal-llm-api', 'azure-openai-api']
}}

param services = [
  {{
    code: 'LLM'
    endpointSecretName: '{contract['endpoint_secret']}'
    apiKeySecretName: '{contract['apikey_secret']}'
    policyXml: loadTextContent('{policy_relative_path}')
  }}
]

param productTerms = 'PII Masking Access Contract - {contract["description"]}'

// Azure AI Foundry Integration (disabled)
param useTargetFoundry = false

param foundry = {{
  subscriptionId: '00000000-0000-0000-0000-000000000000'
  resourceGroupName: 'placeholder'
  accountName: 'placeholder'
  projectName: 'placeholder'
}}
'''

with open(masking_params_file, 'w') as f:
    f.write(masking_params_content)
utils.print_ok(f"‚úÖ Parameter file created: {masking_params_file}")

# Deploy the PII Masking access contract
utils.print_info(f"\n{'='*60}")
utils.print_info(f"Deploying PII Masking Access Contract...")
utils.print_info(f"{'='*60}")

deployment_cmd = f"az deployment sub create --name {contract['name']} --location {location} --template-file {template_file} --parameters {masking_params_file}"

masking_deployment_output = utils.run(
    deployment_cmd,
    f"Deployment '{contract['name']}' succeeded",
    f"Deployment '{contract['name']}' failed"
)

if masking_deployment_output.success:
    utils.print_ok(f"‚úÖ PII Masking Access Contract deployed successfully!")
else:
    utils.print_error(f"‚ùå PII Masking Access Contract deployment failed!")

<a id='2.4'></a>
### 2Ô∏è‚É£.4 Retrieve API Key for PII Masking Contract

In [None]:
# Re-initialize APIM client to pick up new subscriptions
apimClientTool.initialize()

masking_product_id = f"LLM-{pii_masking_contract['business_unit']}-{pii_masking_contract['use_case_name']}-{pii_masking_contract['environment']}"
masking_subscription_name = f"{masking_product_id}-SUB-01"
masking_api_key = None

for sub in apimClientTool.apim_subscriptions:
    if masking_subscription_name.lower() in sub.get('name', '').lower():
        masking_api_key = sub.get('key')
        utils.print_ok(f"‚úÖ Found API key for {masking_product_id}")
        break

if not masking_api_key:
    utils.print_error(f"‚ùå Could not find API key for {masking_product_id}")

<a id='2.5'></a>
### 2Ô∏è‚É£.5 Test PII Masking with Various PII Types

Send test requests containing different types of PII data to verify anonymization and deanonymization.

In [None]:
model_name = "gpt-4o"
utils.print_info(f"Using model: {model_name}")

# Define test payloads with various PII types
pii_test_payloads = [
    {
        "name": "Personal Names and Email",
        "description": "Tests detection of person names and email addresses",
        "payload": {
            "model": model_name,
            "messages": [
                {"role": "system", "content": "You are a helpful HR assistant. Keep responses brief."},
                {"role": "user", "content": "Please help me draft an email to John Smith at john.smith@company.com about his performance review scheduled for next week."}
            ]
        },
        "expected_pii": ["Person", "Email"]
    },
    {
        "name": "Phone Numbers and Addresses",
        "description": "Tests detection of phone numbers and physical addresses",
        "payload": {
            "model": model_name,
            "messages": [
                {"role": "system", "content": "You are a helpful assistant. Keep responses brief."},
                {"role": "user", "content": "Contact Sarah Johnson at +1-555-123-4567 or visit her at 123 Main Street, New York, NY 10001 for the meeting."}
            ]
        },
        "expected_pii": ["Person", "PhoneNumber", "Address"]
    },
    {
        "name": "Credit Card (Regex Pattern)",
        "description": "Tests custom regex pattern for credit card numbers",
        "payload": {
            "model": model_name,
            "messages": [
                {"role": "system", "content": "You are a helpful assistant. Keep responses brief."},
                {"role": "user", "content": "My credit card number is 4532-1234-5678-9012 and it expires next month. Can you help me understand card security?"}
            ]
        },
        "expected_pii": ["CREDIT_CARD"]
    },
    {
        "name": "IBAN and Banking Info",
        "description": "Tests detection of international banking account numbers",
        "payload": {
            "model": model_name,
            "messages": [
                {"role": "system", "content": "You are a helpful banking assistant. Keep responses brief."},
                {"role": "user", "content": "Please transfer the funds to my account. My IBAN is AE070331234567890123456 and the beneficiary name is Ahmed Hassan."}
            ]
        },
        "expected_pii": ["Person", "InternationalBankingAccountNumber"]
    },
    {
        "name": "Emirates ID (Regex Pattern)",
        "description": "Tests custom regex pattern for UAE Emirates ID",
        "payload": {
            "model": model_name,
            "messages": [
                {"role": "system", "content": "You are a helpful government services assistant. Keep responses brief."},
                {"role": "user", "content": "I need to update my records. My Emirates ID is 784-1990-1234567-1 and my name is Fatima Al Maktoum."}
            ]
        },
        "expected_pii": ["Person", "EMIRATES_ID"]
    },
    {
        "name": "Multiple PII Types Combined",
        "description": "Tests detection of multiple PII types in a single request",
        "payload": {
            "model": model_name,
            "messages": [
                {"role": "system", "content": "You are a helpful customer service assistant. Keep responses brief."},
                {"role": "user", "content": "Hi, my name is Michael Chen, email: m.chen@email.com, phone: +971-50-123-4567. I live at 456 Palm Jumeirah, Dubai. My card ending in 4532-8765-4321-0987 was charged incorrectly."}
            ]
        },
        "expected_pii": ["Person", "Email", "PhoneNumber", "Address", "CREDIT_CARD"]
    }
]

utils.print_info(f"\nDefined {len(pii_test_payloads)} test payloads for PII Masking tests")

In [None]:
# Execute PII Masking tests
masking_test_results = []

utils.print_info(f"\n{'='*80}")
utils.print_info(f"üõ°Ô∏è PII MASKING TESTS - Anonymization/Deanonymization")
utils.print_info(f"{'='*80}")

for i, test in enumerate(pii_test_payloads, 1):
    utils.print_info(f"\n--- Test {i}/{len(pii_test_payloads)}: {test['name']} ---")
    utils.print_info(f"Description: {test['description']}")
    utils.print_info(f"Expected PII types: {', '.join(test['expected_pii'])}")
    
    # Extract user message for display
    user_message = next((m['content'] for m in test['payload']['messages'] if m['role'] == 'user'), '')
    utils.print_info(f"User message: {user_message[:100]}..." if len(user_message) > 100 else f"User message: {user_message}")
    
    try:
        start_time = time.time()
        response = requests.post(
            chat_completions_url,
            headers={'api-key': masking_api_key},
            json=test['payload'],
            timeout=60
        )
        elapsed_time = time.time() - start_time
        
        utils.print_response_code(response)
        
        result = {
            "test_name": test['name'],
            "status_code": response.status_code,
            "elapsed_time": elapsed_time,
            "success": response.status_code == 200
        }
        
        if response.status_code == 200:
            data = response.json()
            content = data.get("choices", [{}])[0].get("message", {}).get("content", "")
            result["response_content"] = content
            utils.print_ok(f"‚úÖ Response received ({elapsed_time:.2f}s)")
            utils.print_info(f"üí¨ Response: {content[:150]}..." if len(content) > 150 else f"üí¨ Response: {content}")
            
            # Check if response contains any of the original PII (it shouldn't after deanonymization)
            # The LLM should respond with the original PII restored
            utils.print_ok(f"üîÑ PII should be deanonymized in response")
        else:
            result["error"] = response.text[:200]
            utils.print_error(f"‚ùå Error: {response.text[:200]}")
        
        masking_test_results.append(result)
        
    except Exception as e:
        utils.print_error(f"‚ùå Request failed: {e}")
        masking_test_results.append({
            "test_name": test['name'],
            "status_code": 0,
            "elapsed_time": 0,
            "success": False,
            "error": str(e)
        })
    
    time.sleep(1)  # Small delay between tests

# Summary
utils.print_info(f"\n{'='*80}")
utils.print_info(f"üìä PII MASKING TEST SUMMARY")
utils.print_info(f"{'='*80}")
successful = sum(1 for r in masking_test_results if r['success'])
utils.print_info(f"Total tests: {len(masking_test_results)}")
utils.print_ok(f"Successful: {successful}")
utils.print_error(f"Failed: {len(masking_test_results) - successful}")

---
## üö´ Use Case 2: PII Blocking

This use case tests PII blocking where:
- Requests containing PII are rejected with HTTP 400 Bad Request
- Detected PII categories are reported in the error response
- No PII data reaches the backend LLM service
---

<a id='3.1'></a>
### 3Ô∏è‚É£.1 Define PII Blocking Access Contract

Create an access contract that blocks any requests containing PII data.

In [None]:
# PII Blocking Use Case Configuration
pii_blocking_contract = {
    "name": f"pii-blocking-contract-{timestamp}",
    "business_unit": "Compliance",
    "use_case_name": "PIIBlocking",
    "environment": "DEV",
    "use_keyvault": use_keyvault_integration,
    "endpoint_secret": "COMPLIANCE-LLM-ENDPOINT",
    "apikey_secret": "COMPLIANCE-LLM-KEY",
    "description": "Compliance PII Blocking - Reject requests containing PII",
    "pii_config": {
        "mode": "blocking",
        "confidence_threshold": "0.75",
        "entity_exclusions": "PersonType",
        "detection_language": "en"
    }
}

utils.print_info(f"PII Blocking Contract Configuration:")
utils.print_info(f"  Business Unit: {pii_blocking_contract['business_unit']}")
utils.print_info(f"  Use Case: {pii_blocking_contract['use_case_name']}")
utils.print_info(f"  PII Mode: {pii_blocking_contract['pii_config']['mode']}")
utils.print_info(f"  Product ID: LLM-{pii_blocking_contract['business_unit']}-{pii_blocking_contract['use_case_name']}-{pii_blocking_contract['environment']}")

<a id='3.2'></a>
### 3Ô∏è‚É£.2 Create PII Blocking Product Policy

Generate a custom product policy XML that detects PII and blocks requests containing sensitive data.

In [None]:
# Create folder structure for PII Blocking contract
contract = pii_blocking_contract
folder_name = f"{contract['business_unit'].lower()}-{contract['use_case_name'].lower()}"
environment_folder = contract['environment'].lower()
blocking_contract_folder = os.path.join(bicep_dir, "contracts", folder_name, environment_folder)
os.makedirs(blocking_contract_folder, exist_ok=True)
utils.print_info(f"üìÅ Created folder: {blocking_contract_folder}")

# Create PII Blocking Policy XML
# Note: This policy uses pii-anonymization for detection but blocks instead of anonymizing
pii_blocking_policy = '''<policies>
    <inbound>
        <base />
        <!-- Enable PII Blocking -->
        <set-variable name="piiBlockingEnabled" value="true" />
        
        <choose>
            <when condition="@(context.Variables.GetValueOrDefault<string>(\"piiBlockingEnabled\") == \"true\")">
                
                <!-- Configure PII detection settings -->
                <set-variable name="piiAnonymizationEnabled" value="true" />
                <set-variable name="piiConfidenceThreshold" value="0.75" />
                <set-variable name="piiEntityCategoryExclusions" value="PersonType" />
                <set-variable name="piiDetectionLanguage" value="en" />

                <!-- Configure custom regex patterns for additional PII detection -->
                <set-variable name="piiRegexPatterns" value="@{
                    var patterns = new JArray {
                        new JObject {
                            [\"pattern\"] = @\"\\b\\d{4}[- ]?\\d{4}[- ]?\\d{4}[- ]?\\d{4}\\b\",
                            [\"category\"] = \"CREDIT_CARD\"
                        },
                        new JObject {
                            [\"pattern\"] = @\"\\b[A-Z]{2}\\d{6}[A-Z]\\b\",
                            [\"category\"] = \"PASSPORT_NUMBER\"
                        },
                        new JObject {
                            [\"pattern\"] = @\"\\b784-\\d{4}-\\d{7}-\\d{1}\\b\",
                            [\"category\"] = \"EMIRATES_ID\"
                        }
                    };
                    return patterns.ToString();
                }" />
                
                <!-- Capture request body for PII processing -->
                <set-variable name="piiInputContent" value="@(context.Request.Body.As<string>(preserveContent: true))" />
                
                <!-- Apply PII anonymization to detect PII -->
                <include-fragment fragment-id="pii-anonymization" />
                
                <!-- Check if any PII was detected (piiMappings will have entries if PII found) -->
                <choose>
                    <when condition="@{
                        var mappings = context.Variables.GetValueOrDefault<string>(\"piiMappings\", \"[]\");
                        var mappingsArray = JArray.Parse(mappings);
                        return mappingsArray.Count > 0;
                    }">
                        <return-response>
                            <set-status code="400" reason="Bad Request" />
                            <set-header name="Content-Type" exists-action="override">
                                <value>application/json</value>
                            </set-header>
                            <set-body>@{
                                var mappings = JArray.Parse(context.Variables.GetValueOrDefault<string>("piiMappings", "[]"));
                                var categories = new HashSet<string>();
                                foreach (var mapping in mappings) {
                                    var placeholder = mapping["placeholder"].ToString();
                                    var category = placeholder.TrimStart('<').Split('_')[0];
                                    categories.Add(category);
                                }
                                return new JObject(
                                    new JProperty("error", new JObject(
                                        new JProperty("code", "PII_DETECTED"),
                                        new JProperty("message", "Request blocked: Personal Identifiable Information (PII) detected in the request."),
                                        new JProperty("detectedCategories", string.Join(", ", categories)),
                                        new JProperty("entityCount", mappings.Count)
                                    ))
                                ).ToString();
                            }</set-body>
                        </return-response>
                    </when>
                </choose>
            </when>
        </choose>
    </inbound>
    <backend>
        <base />
    </backend>
    <outbound>
        <base />
    </outbound>
    <on-error>
        <base />
    </on-error>
</policies>'''

# Write the policy file
blocking_policy_file = os.path.join(blocking_contract_folder, "ai-product-policy.xml")
with open(blocking_policy_file, 'w') as f:
    f.write(pii_blocking_policy)
utils.print_ok(f"‚úÖ PII Blocking policy file created: {blocking_policy_file}")

<a id='3.3'></a>
### 3Ô∏è‚É£.3 Create Parameter File and Deploy PII Blocking Contract

In [None]:
# Generate parameter file for PII Blocking contract
blocking_params_file = os.path.join(blocking_contract_folder, "main.bicepparam")
policy_relative_path = "ai-product-policy.xml"

blocking_params_content = f'''using '../../../main.bicep'

// ============================================================================
// {contract['description']} - Generated from PII Testing Notebook
// ============================================================================

param apim = {{
  subscriptionId: '{subscription_id}'
  resourceGroupName: '{governance_hub_resource_group}'
  name: '{apimClientTool.apim_resource_name}'
}}

param keyVault = {{
  subscriptionId: '{keyvault_subscription_id}'
  resourceGroupName: '{keyvault_resource_group}'
  name: '{keyvault_name}'
}}

param useTargetAzureKeyVault = {str(contract['use_keyvault']).lower()}

param useCase = {{
  businessUnit: '{contract['business_unit']}'
  useCaseName: '{contract['use_case_name']}'
  environment: '{contract['environment']}'
}}

param apiNameMapping = {{
  LLM: ['universal-llm-api', 'azure-openai-api']
}}

param services = [
  {{
    code: 'LLM'
    endpointSecretName: '{contract['endpoint_secret']}'
    apiKeySecretName: '{contract['apikey_secret']}'
    policyXml: loadTextContent('{policy_relative_path}')
  }}
]

param productTerms = 'PII Blocking Access Contract - {contract["description"]}'

// Azure AI Foundry Integration (disabled)
param useTargetFoundry = false

param foundry = {{
  subscriptionId: '00000000-0000-0000-0000-000000000000'
  resourceGroupName: 'placeholder'
  accountName: 'placeholder'
  projectName: 'placeholder'
}}
'''

with open(blocking_params_file, 'w') as f:
    f.write(blocking_params_content)
utils.print_ok(f"‚úÖ Parameter file created: {blocking_params_file}")

# Deploy the PII Blocking access contract
utils.print_info(f"\n{'='*60}")
utils.print_info(f"Deploying PII Blocking Access Contract...")
utils.print_info(f"{'='*60}")

deployment_cmd = f"az deployment sub create --name {contract['name']} --location {location} --template-file {template_file} --parameters {blocking_params_file}"

blocking_deployment_output = utils.run(
    deployment_cmd,
    f"Deployment '{contract['name']}' succeeded",
    f"Deployment '{contract['name']}' failed"
)

if blocking_deployment_output.success:
    utils.print_ok(f"‚úÖ PII Blocking Access Contract deployed successfully!")
else:
    utils.print_error(f"‚ùå PII Blocking Access Contract deployment failed!")

<a id='3.4'></a>
### 3Ô∏è‚É£.4 Retrieve API Key for PII Blocking Contract

In [None]:
# Re-initialize APIM client to pick up new subscriptions
apimClientTool.initialize()

blocking_product_id = f"LLM-{pii_blocking_contract['business_unit']}-{pii_blocking_contract['use_case_name']}-{pii_blocking_contract['environment']}"
blocking_subscription_name = f"{blocking_product_id}-SUB-01"
blocking_api_key = None

for sub in apimClientTool.apim_subscriptions:
    if blocking_subscription_name.lower() in sub.get('name', '').lower():
        blocking_api_key = sub.get('key')
        utils.print_ok(f"‚úÖ Found API key for {blocking_product_id}")
        break

if not blocking_api_key:
    utils.print_error(f"‚ùå Could not find API key for {blocking_product_id}")

<a id='3.5'></a>
### 3Ô∏è‚É£.5 Test PII Blocking with Various PII Types

Send test requests containing PII data to verify they are blocked, and requests without PII are allowed.

In [None]:
# Define test payloads for PII Blocking - mix of requests with and without PII
pii_blocking_test_payloads = [
    {
        "name": "Request WITH PII - Should be BLOCKED",
        "description": "Request containing email and name - should return 400",
        "should_block": True,
        "payload": {
            "model": model_name,
            "messages": [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": "Send an email to John Smith at john.smith@company.com about the project update."}
            ]
        },
        "expected_pii": ["Person", "Email"]
    },
    {
        "name": "Request WITHOUT PII - Should be ALLOWED",
        "description": "Generic request without any PII - should return 200",
        "should_block": False,
        "payload": {
            "model": model_name,
            "messages": [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": "What is the capital of France?"}
            ]
        },
        "expected_pii": []
    },
    {
        "name": "Request WITH Credit Card - Should be BLOCKED",
        "description": "Request containing credit card number - should return 400",
        "should_block": True,
        "payload": {
            "model": model_name,
            "messages": [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": "My card number is 4532-1234-5678-9012. Why was I charged twice?"}
            ]
        },
        "expected_pii": ["CREDIT_CARD"]
    },
    {
        "name": "Request WITHOUT PII - Technical Question",
        "description": "Technical question without PII - should return 200",
        "should_block": False,
        "payload": {
            "model": model_name,
            "messages": [
                {"role": "system", "content": "You are a helpful coding assistant."},
                {"role": "user", "content": "How do I create a REST API in Python using Flask?"}
            ]
        },
        "expected_pii": []
    },
    {
        "name": "Request WITH Phone Number - Should be BLOCKED",
        "description": "Request containing phone number - should return 400",
        "should_block": True,
        "payload": {
            "model": model_name,
            "messages": [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": "Please call me back at +1-555-123-4567 to discuss the issue."}
            ]
        },
        "expected_pii": ["PhoneNumber"]
    },
    {
        "name": "Request WITH Emirates ID - Should be BLOCKED",
        "description": "Request containing Emirates ID (custom regex) - should return 400",
        "should_block": True,
        "payload": {
            "model": model_name,
            "messages": [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": "My Emirates ID is 784-1990-1234567-1. Can you help me with my visa application?"}
            ]
        },
        "expected_pii": ["EMIRATES_ID"]
    },
    {
        "name": "Request WITHOUT PII - Math Question",
        "description": "Simple math question without PII - should return 200",
        "should_block": False,
        "payload": {
            "model": model_name,
            "messages": [
                {"role": "system", "content": "You are a helpful math tutor."},
                {"role": "user", "content": "What is 25 multiplied by 4?"}
            ]
        },
        "expected_pii": []
    },
    {
        "name": "Request WITH Multiple PII Types - Should be BLOCKED",
        "description": "Request with multiple PII types - should return 400 with all categories",
        "should_block": True,
        "payload": {
            "model": model_name,
            "messages": [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": "Hi, I'm Sarah Johnson (sarah.j@email.com, +971-50-987-6543). My IBAN is AE070331234567890123456. Please help me."}
            ]
        },
        "expected_pii": ["Person", "Email", "PhoneNumber", "InternationalBankingAccountNumber"]
    }
]

utils.print_info(f"\nDefined {len(pii_blocking_test_payloads)} test payloads for PII Blocking tests")
utils.print_info(f"  - Requests that should be BLOCKED: {sum(1 for t in pii_blocking_test_payloads if t['should_block'])}")
utils.print_info(f"  - Requests that should be ALLOWED: {sum(1 for t in pii_blocking_test_payloads if not t['should_block'])}")

In [None]:
# Execute PII Blocking tests
blocking_test_results = []

utils.print_info(f"\n{'='*80}")
utils.print_info(f"üö´ PII BLOCKING TESTS - Request Rejection")
utils.print_info(f"{'='*80}")

for i, test in enumerate(pii_blocking_test_payloads, 1):
    utils.print_info(f"\n--- Test {i}/{len(pii_blocking_test_payloads)}: {test['name']} ---")
    utils.print_info(f"Description: {test['description']}")
    utils.print_info(f"Expected outcome: {'BLOCKED (400)' if test['should_block'] else 'ALLOWED (200)'}")
    if test['expected_pii']:
        utils.print_info(f"Expected PII types: {', '.join(test['expected_pii'])}")
    
    # Extract user message for display
    user_message = next((m['content'] for m in test['payload']['messages'] if m['role'] == 'user'), '')
    utils.print_info(f"User message: {user_message[:80]}..." if len(user_message) > 80 else f"User message: {user_message}")
    
    try:
        start_time = time.time()
        response = requests.post(
            chat_completions_url,
            headers={'api-key': blocking_api_key},
            json=test['payload'],
            timeout=60
        )
        elapsed_time = time.time() - start_time
        
        # Determine if test passed based on expected outcome
        if test['should_block']:
            test_passed = response.status_code == 400
        else:
            test_passed = response.status_code == 200
        
        result = {
            "test_name": test['name'],
            "status_code": response.status_code,
            "elapsed_time": elapsed_time,
            "should_block": test['should_block'],
            "test_passed": test_passed
        }
        
        if test_passed:
            utils.print_ok(f"‚úÖ TEST PASSED - Status: {response.status_code} ({elapsed_time:.2f}s)")
        else:
            utils.print_error(f"‚ùå TEST FAILED - Expected: {'400' if test['should_block'] else '200'}, Got: {response.status_code}")
        
        if response.status_code == 400:
            try:
                error_data = response.json()
                error_info = error_data.get('error', {})
                result["detected_categories"] = error_info.get('detectedCategories', '')
                result["entity_count"] = error_info.get('entityCount', 0)
                utils.print_info(f"  üîç Detected PII categories: {error_info.get('detectedCategories', 'N/A')}")
                utils.print_info(f"  üìä Entity count: {error_info.get('entityCount', 'N/A')}")
            except:
                utils.print_info(f"  Response: {response.text[:200]}")
        elif response.status_code == 200:
            data = response.json()
            content = data.get("choices", [{}])[0].get("message", {}).get("content", "")
            result["response_content"] = content
            utils.print_info(f"  üí¨ Response: {content[:100]}..." if len(content) > 100 else f"  üí¨ Response: {content}")
        else:
            result["error"] = response.text[:200]
            utils.print_error(f"  Unexpected response: {response.text[:200]}")
        
        blocking_test_results.append(result)
        
    except Exception as e:
        utils.print_error(f"‚ùå Request failed: {e}")
        blocking_test_results.append({
            "test_name": test['name'],
            "status_code": 0,
            "elapsed_time": 0,
            "should_block": test['should_block'],
            "test_passed": False,
            "error": str(e)
        })
    
    time.sleep(1)  # Small delay between tests

# Summary
utils.print_info(f"\n{'='*80}")
utils.print_info(f"üìä PII BLOCKING TEST SUMMARY")
utils.print_info(f"{'='*80}")
total_tests = len(blocking_test_results)
passed_tests = sum(1 for r in blocking_test_results if r['test_passed'])
blocked_correctly = sum(1 for r in blocking_test_results if r['should_block'] and r['status_code'] == 400)
allowed_correctly = sum(1 for r in blocking_test_results if not r['should_block'] and r['status_code'] == 200)

utils.print_info(f"Total tests: {total_tests}")
utils.print_ok(f"Tests passed: {passed_tests}/{total_tests}")
utils.print_info(f"  - Correctly blocked (with PII): {blocked_correctly}")
utils.print_info(f"  - Correctly allowed (no PII): {allowed_correctly}")
if passed_tests < total_tests:
    utils.print_error(f"Tests failed: {total_tests - passed_tests}")

---
## üìä Overall Test Summary
---

In [None]:
utils.print_info(f"\n{'='*80}")
utils.print_info(f"üìä OVERALL PII PROCESSING TEST SUMMARY")
utils.print_info(f"{'='*80}")

# PII Masking Summary
utils.print_info(f"\nüõ°Ô∏è PII Masking (Anonymization/Deanonymization):")
masking_success = sum(1 for r in masking_test_results if r['success'])
utils.print_info(f"   Product: {masking_product_id}")
utils.print_info(f"   Tests: {len(masking_test_results)} | Successful: {masking_success} | Failed: {len(masking_test_results) - masking_success}")

# PII Blocking Summary
utils.print_info(f"\nüö´ PII Blocking:")
blocking_passed = sum(1 for r in blocking_test_results if r['test_passed'])
utils.print_info(f"   Product: {blocking_product_id}")
utils.print_info(f"   Tests: {len(blocking_test_results)} | Passed: {blocking_passed} | Failed: {len(blocking_test_results) - blocking_passed}")

# Overall
total_all = len(masking_test_results) + len(blocking_test_results)
passed_all = masking_success + blocking_passed
utils.print_info(f"\nüìà Overall Results:")
utils.print_info(f"   Total tests executed: {total_all}")
if passed_all == total_all:
    utils.print_ok(f"   ‚úÖ All tests passed! ({passed_all}/{total_all})")
else:
    utils.print_info(f"   Passed: {passed_all}/{total_all}")
    utils.print_error(f"   Failed: {total_all - passed_all}/{total_all}")

<a id='cleanup'></a>
### üßπ Cleanup (Optional)

Remove the test PII access contracts from APIM created during this notebook session.

> **Note:** This will not delete any created secrets in Azure Key Vault.

In [None]:
# Set to True to delete the PII access contracts created in this session
cleanup_enabled = True

if cleanup_enabled:
    contracts_to_cleanup = [
        (masking_product_id, "PII Masking"),
        (blocking_product_id, "PII Blocking")
    ]
    
    for product_id, description in contracts_to_cleanup:
        utils.print_info(f"Deleting {description} product: {product_id}...")
        
        # Delete product and its associated subscriptions
        prod_cmd = f"az apim product delete --resource-group {governance_hub_resource_group} --service-name {apimClientTool.apim_resource_name} --product-id {product_id} --delete-subscriptions true --yes"
        utils.run(prod_cmd, f"Deleted product {product_id}", f"Failed to delete product {product_id}")
    
    # Optionally clean up the generated contract folders
    import shutil
    for folder in [masking_contract_folder, blocking_contract_folder]:
        if os.path.exists(folder):
            shutil.rmtree(folder)
            utils.print_info(f"Removed folder: {folder}")
    
    utils.print_ok("üßπ Cleanup completed!")
else:
    utils.print_info("Cleanup is disabled. Set cleanup_enabled = True to remove test resources.")