# Sanctions Screening Evaluation

- **Purpose:** Evaluate sanctions screening accuracy and validate precision/recall targets
- **Author:** Devbrew LLC  
- **Last Updated:** November 18, 2025  
- **Status:** In progress  
- **License:** Apache 2.0

## Overview

This notebook implements the evaluation protocol for the sanctions screening module. The evaluation measures matching accuracy through a labeled test set and validates that the system meets production accuracy targets.

**Evaluation Metrics:**
- Precision@1: Percentage of queries where the top candidate is the correct match (target: ≥95%)
- Recall@top3: Percentage of queries where the ground-truth match appears in the top 3 candidates (target: ≥98%)
- False Positive Rate: Percentage of non-matches incorrectly flagged as matches
- Decision Accuracy: Alignment between predicted and expected decision categories

The goal is to confirm that the screening system correctly identifies sanctioned entities while keeping false positives low, and therefore meets production readiness requirements.
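
The metric definitions above can be made concrete with a small sketch. It is not the notebook's evaluation code: the result layout (a `candidates` list of `(uid, score)` pairs sorted by descending score), the function name `compute_metrics`, and the `flag_threshold` of 0.85 are assumptions chosen for illustration, and the sample results are invented.

```python
from typing import Optional, Sequence


def compute_metrics(results: Sequence[dict], flag_threshold: float = 0.85) -> dict:
    """Compute Precision@1, Recall@top3 and false positive rate.

    Each result is a dict with (hypothetical layout):
      - 'ground_truth_uid': expected uid, or None for a non-match query
      - 'candidates': list of (uid, score) pairs, best score first
    """
    positives = [r for r in results if r["ground_truth_uid"] is not None]
    negatives = [r for r in results if r["ground_truth_uid"] is None]

    def top_uids(result: dict, k: int) -> list:
        return [uid for uid, _ in result["candidates"][:k]]

    hits_at_1 = sum(r["ground_truth_uid"] in top_uids(r, 1) for r in positives)
    hits_at_3 = sum(r["ground_truth_uid"] in top_uids(r, 3) for r in positives)

    # A non-match query counts as a false positive when its best candidate
    # scores at or above the (assumed) flagging threshold.
    false_positives = sum(
        bool(r["candidates"]) and r["candidates"][0][1] >= flag_threshold
        for r in negatives
    )

    def ratio(numerator: int, denominator: int) -> Optional[float]:
        return numerator / denominator if denominator else None

    return {
        "precision_at_1": ratio(hits_at_1, len(positives)),
        "recall_at_3": ratio(hits_at_3, len(positives)),
        "false_positive_rate": ratio(false_positives, len(negatives)),
    }


# Invented example results, for illustration only
example_results = [
    {"ground_truth_uid": "A", "candidates": [("A", 0.99), ("B", 0.60)]},
    {"ground_truth_uid": "B", "candidates": [("C", 0.91), ("B", 0.90), ("D", 0.50)]},
    {"ground_truth_uid": "C", "candidates": []},
    {"ground_truth_uid": "D", "candidates": [("D", 0.97)]},
    {"ground_truth_uid": None, "candidates": [("A", 0.92)]},
    {"ground_truth_uid": None, "candidates": [("B", 0.40)]},
]

metrics = compute_metrics(example_results)
print(metrics)
```

In this toy data two of four positive queries are correct at rank 1, three of four have the ground truth in the top 3, and one of two non-match queries is flagged.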

## Setup: Artifacts and Functions

The evaluation loads artifacts generated by the implementation pipeline:

- **Sanctions Index**: Canonicalized names and metadata (`sanctions_index.parquet`)
- **Blocking Indices**: Inverted indices for candidate retrieval (`blocking_indices.json`)
- **Metadata**: Version tracking and dataset statistics

Helper functions for text normalization, tokenization, and screening are loaded to enable independent evaluation runs without re-executing the full implementation pipeline.

### Environment Configuration

We configure the Python environment with standardized settings, import required libraries, and set a fixed random seed for reproducibility. This ensures consistent evaluation results across runs.

In [None]:
import warnings
from pathlib import Path
import json
import random

import pandas as pd
import numpy as np

import rapidfuzz as rf

# Configuration
warnings.filterwarnings("ignore")
pd.set_option("display.max_columns", 100)
pd.set_option("display.max_rows", 100)
pd.set_option("display.float_format", '{:.2f}'.format)

# Reproducibility
RANDOM_STATE = 42
random.seed(RANDOM_STATE)
np.random.seed(RANDOM_STATE)

print("Environment configured successfully")
print(f" pandas: {pd.__version__}")
print(f" numpy: {np.__version__}")
print(f" rapidfuzz: {rf.__version__}")

Environment configured successfully
 pandas: 2.3.3
 numpy: 2.3.3
 rapidfuzz: 3.14.1
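
One subtlety of fixed seeds is worth keeping in mind when sampling inside a loop: re-seeding on every draw yields the same draw each time, whereas a single seeded generator yields a sequence that varies within a run but repeats across runs. A minimal sketch using only the standard library (the names and helper functions below are made up for illustration):

```python
import random

names = ["alpha", "bravo", "charlie", "delta", "echo"]


def draw_with_fresh_seed(seed: int, n_draws: int) -> list:
    """Re-seed before every draw: each draw is identical."""
    picks = []
    for _ in range(n_draws):
        rng = random.Random(seed)  # new generator, same seed, same first value
        picks.append(rng.choice(names))
    return picks


def draw_with_shared_rng(seed: int, n_draws: int) -> list:
    """Seed once, then draw repeatedly: reproducible across runs."""
    rng = random.Random(seed)
    return [rng.choice(names) for _ in range(n_draws)]


fresh = draw_with_fresh_seed(42, 5)
shared_a = draw_with_shared_rng(42, 5)
shared_b = draw_with_shared_rng(42, 5)

print("fresh seed each draw:", fresh)
print("shared generator:    ", shared_a)
print("repeatable:", shared_a == shared_b)
```

The same reasoning applies to any API that accepts a seed per call: passing an identical seed on each call inside a loop repeats the first draw instead of advancing the sequence.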


### Load Artifacts

The evaluation loads pre-computed artifacts from the implementation pipeline. The sanctions index contains 39,350 canonicalized name records with metadata. The blocking indices are inverted indices (first token, bucket, and initials), so retrieving the candidates for a key is a dictionary lookup, O(1) on average, rather than a scan over all records.

In [2]:
# Path configuration
PROJECT_ROOT = Path.cwd().parent if Path.cwd().name == "notebooks" else Path.cwd()
MODELS_DIR = PROJECT_ROOT / "packages" / "models"
DATA_DIR = PROJECT_ROOT / "data_catalog" / "processed"


print("Loading artifacts...\n")

# Load sanctions index
sanctions_index_path = MODELS_DIR / "sanctions_index.parquet"
if not sanctions_index_path.exists():
    raise FileNotFoundError(f"Sanctions index not found: {sanctions_index_path}\n"
                          f"Please run notebooks/04_sanctions_screening.ipynb first to generate artifacts.")

sanctions_index = pd.read_parquet(sanctions_index_path)
print(f"Loaded sanctions index: {len(sanctions_index):,} records")

# Load blocking indices
blocking_indices_path = MODELS_DIR / "blocking_indices.json"
if not blocking_indices_path.exists():
    raise FileNotFoundError(f"Blocking indices not found: {blocking_indices_path}\n"
                          f"Please run notebooks/04_sanctions_screening.ipynb first to generate artifacts.")

with open(blocking_indices_path, 'r') as f:
    blocking_indices = json.load(f)

first_token_index = dict(blocking_indices['first_token'])
bucket_index = dict(blocking_indices['bucket'])
initials_index = dict(blocking_indices['initials'])

print("Loaded blocking indices:")
print(f" - First token index: {len(first_token_index):,} keys")
print(f" - Bucket index: {len(bucket_index):,} keys")
print(f" - Initials index: {len(initials_index):,} keys")

# Load metadata (optional, for version tracking)
metadata_path = MODELS_DIR / "sanctions_index_metadata.json"
if metadata_path.exists():
    with open(metadata_path, 'r') as f:
        sanctions_index_metadata = json.load(f)
    print(f"\nLoaded metadata: version {sanctions_index_metadata.get('created_at', 'unknown')}")
else:
    sanctions_index_metadata = {}
    print("[Warning] Metadata not found (optional)")

print(f"\nAll artifacts loaded successfully")

Loading artifacts...

Loaded sanctions index: 39,350 records
Loaded blocking indices:
 - First token index: 15,597 keys
 - Bucket index: 4 keys
 - Initials index: 15,986 keys

Loaded metadata: version 2025-11-17T06:00:56.218723

All artifacts loaded successfully
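
To illustrate how inverted indices like these support candidate retrieval, here is a toy sketch. The key schemes (lowercased first token, concatenated token initials) and the use of row positions as values are assumptions made for the example, not a description of the contents of `blocking_indices.json`. The bucket index is left out because its key scheme is not shown here.

```python
from collections import defaultdict


def build_toy_indices(names: list) -> tuple:
    """Build first-token and initials inverted indices over a list of names."""
    first_token = defaultdict(list)
    initials = defaultdict(list)
    for row_id, name in enumerate(names):
        tokens = name.lower().split()
        if not tokens:
            continue
        first_token[tokens[0]].append(row_id)
        initials["".join(t[0] for t in tokens)].append(row_id)
    return dict(first_token), dict(initials)


def retrieve_candidates(query: str, first_token: dict, initials: dict) -> set:
    """Union the row ids found under the query's blocking keys."""
    tokens = query.lower().split()
    if not tokens:
        return set()
    candidates = set(first_token.get(tokens[0], []))
    candidates |= set(initials.get("".join(t[0] for t in tokens), []))
    return candidates


# Invented names, for illustration only
toy_names = ["ivan petrov", "ivan sidorov", "igor pavlov", "maria lopez"]
toy_first_token, toy_initials = build_toy_indices(toy_names)

# "ivan" matches rows 0 and 1; initials "ip" match rows 0 and 2
candidates = retrieve_candidates("Ivan Petrof", toy_first_token, toy_initials)
print(sorted(candidates))  # [0, 1, 2]
```

Only the retrieved candidates need to be scored with a fuzzy matcher, which is what keeps screening fast relative to comparing the query against every record.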


### Helper Functions

Text normalization and tokenization functions are imported from the shared `packages.compliance.sanctions` module. This module provides standardized functions used by both `04_sanctions_screening.ipynb` and this evaluation notebook, ensuring consistency across the screening pipeline.

The shared functions include:
- `normalize_text()`: Text normalization for robust fuzzy matching
- `tokenize()`: Tokenization with stopword filtering

In [None]:
from packages.compliance.sanctions import (
    normalize_text,
    tokenize
)

# Verify imports work
print("Helper functions imported successfully")
print(f"  - normalize_text: {normalize_text.__name__}")
print(f"  - tokenize: {tokenize.__name__}")

Helper functions imported successfully
  - normalize_text: normalize_text
  - tokenize: tokenize
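
The implementations of `normalize_text()` and `tokenize()` live in the shared module and are not reproduced in this notebook. For readers without access to it, the sketch below shows what functions of this kind commonly do. The names `sketch_normalize_text` and `sketch_tokenize`, the stopword list, and every normalization step are assumptions, not the module's actual behavior.

```python
import re
import unicodedata

# Illustrative stopword list; the real module's list is not shown in this notebook
SKETCH_STOPWORDS = frozenset({"the", "of", "and", "ltd", "llc", "inc"})


def sketch_normalize_text(text: str) -> str:
    """Strip accents, lowercase, drop punctuation, collapse whitespace."""
    decomposed = unicodedata.normalize("NFKD", text)
    without_accents = "".join(
        ch for ch in decomposed if not unicodedata.combining(ch)
    )
    lowered = without_accents.casefold()
    # Simplification: anything outside a-z, 0-9 and whitespace becomes a space,
    # which also discards non-Latin scripts.
    cleaned = re.sub(r"[^a-z0-9\s]", " ", lowered)
    return re.sub(r"\s+", " ", cleaned).strip()


def sketch_tokenize(text: str) -> list:
    """Split normalized text on whitespace and drop stopwords."""
    return [
        token
        for token in sketch_normalize_text(text).split()
        if token not in SKETCH_STOPWORDS
    ]


normalized = sketch_normalize_text("  José  O'Neil-Smith, LLC ")
tokens = sketch_tokenize("  José  O'Neil-Smith, LLC ")
print(normalized)  # jose o neil smith llc
print(tokens)      # ['jose', 'o', 'neil', 'smith']
```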


## Create Labeled Test Set

To evaluate the screening system's accuracy, we need a labeled test set with known ground truth matches. This test set will include:

- **Positive examples**: Query variations of names in the sanctions index (exact matches, normalized versions, case variations, typos)
- **Negative examples**: Names that should NOT match any sanctions record (to test false positive rate)

We'll sample diverse names from the sanctions index and create query variations to test different matching scenarios. This approach allows us to measure:
- **Precision@1**: How often the top candidate is the correct match
- **Recall@top3**: How often the ground truth appears in the top 3 results
- **False Positive Rate**: How often non-matches are incorrectly flagged

In [8]:
# Function to introduce typos
def _introduce_typo(name: str, n_typos: int = 1) -> str:
    """
    Introduce minor typos for testing robustness.
    
    Randomly replaces or removes characters to simulate real-world
    data entry errors.
    """
    chars = list(name)
    for _ in range(n_typos):
        if len(chars) > 0:
            idx = random.randint(0, len(chars) - 1)
            # Replace with random character or remove
            if random.random() < 0.5:
                chars[idx] = random.choice('abcdefghijklmnopqrstuvwxyz')
            else:
                chars.pop(idx)
    return ''.join(chars)

# Function to create labeled test set
def create_labeled_test_set(
    sanctions_index: pd.DataFrame,
    n_samples: int = 80,
    random_state: int = 42
) -> pd.DataFrame:
    """
    Create a labeled test set with ground truth matches.
    
    Samples names from sanctions index and creates query variations
    with known ground truth matches.
    
    Args:
        sanctions_index: DataFrame with sanctions records
        n_samples: Number of base names to sample
        random_state: Random seed for reproducibility
        
    Returns:
        DataFrame with test queries and ground truth labels
    """
    np.random.seed(random_state)
    random.seed(random_state)
    
    # Sample diverse names from sanctions index
    # Note: For a production system, you might want stratified sampling
    # by country/program/entity_type, but for this case study we keep it simple
    sampled = sanctions_index.sample(
        n=min(n_samples, len(sanctions_index)),
        random_state=random_state
    )
    
    # Validate UIDs exist
    valid_uids = set(sanctions_index['uid'].values)
    
    test_queries = []
    
    for _, row in sampled.iterrows():
        original_name = row['name']
        uid = row['uid']
        
        # Skip if UID is invalid
        if uid not in valid_uids:
            continue
        
        # Create query variations to test different matching scenarios
        variations = [
            # Exact match (should score very high)
            {
                'query': original_name,
                'ground_truth_uid': uid,
                'expected_score_min': 0.95,
                'variation_type': 'exact'
            },
            # Normalized version (tests normalization pipeline)
            {
                'query': row['name_norm'],
                'ground_truth_uid': uid,
                'expected_score_min': 0.90,
                'variation_type': 'normalized'
            },
            # Case variation (tests case-insensitive matching)
            {
                'query': original_name.upper(),
                'ground_truth_uid': uid,
                'expected_score_min': 0.90,
                'variation_type': 'case'
            },
            # Minor typo (tests robustness to errors)
            {
                'query': _introduce_typo(original_name, n_typos=1),
                'ground_truth_uid': uid,
                'expected_score_min': 0.85,
                'variation_type': 'typo'
            }
        ]
        
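        # Add a negative example (ground_truth_uid=None) to test false positive rate.
        # Caveat: the name is drawn from another sanctions record, so it is a
        # non-match only relative to `uid`; it still exists in the index.
        non_match_candidates = sanctions_index[sanctions_index['uid'] != uid]
        if len(non_match_candidates) > 0:
            # Do not pass random_state here: reusing the same seed on every
            # iteration selects the same row position each time. The global
            # NumPy seed set above keeps the draws reproducible.
            non_match_name = non_match_candidates.sample(1).iloc[0]['name']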
            
            variations.append({
                'query': non_match_name,
                'ground_truth_uid': None,  # No match expected
                'expected_score_min': None,
                'variation_type': 'non_match'
            })
        
        test_queries.extend(variations)
    
    # Create DataFrame
    test_df = pd.DataFrame(test_queries)
    
    # Validate all ground truth UIDs
    if len(test_df) > 0:
        invalid_uids = test_df[
            test_df['ground_truth_uid'].notna() & 
            ~test_df['ground_truth_uid'].isin(valid_uids)
        ]
        if len(invalid_uids) > 0:
            raise ValueError(f"Found {len(invalid_uids)} invalid ground truth UIDs")
    
    # Add metadata
    test_df['query_id'] = range(len(test_df))
    test_df['created_at'] = pd.Timestamp.now()
    
    return test_df

# Create test set
print("Creating labeled test set...")
labeled_test_set = create_labeled_test_set(
    sanctions_index,
    n_samples=50,  # 50 names × 5 variations (4 positive + 1 non-match) = 250 queries
    random_state=RANDOM_STATE
)

print(f"\nCreated {len(labeled_test_set):,} test queries")
print(f" - With ground truth: {labeled_test_set['ground_truth_uid'].notna().sum():,}")
print(f" - Non-matches: {labeled_test_set['ground_truth_uid'].isna().sum():,}")

# Show distribution of variation types
if 'variation_type' in labeled_test_set.columns:
    print(f"\nVariation type distribution:")
    for var_type, count in labeled_test_set['variation_type'].value_counts().items():
        print(f" - {var_type}: {count:,}")

# Save test set for reproducibility
test_set_path = DATA_DIR / "sanctions_eval_labels.csv"
test_set_path.parent.mkdir(parents=True, exist_ok=True)
labeled_test_set.to_csv(test_set_path, index=False)
print(f"\nSaved test set to: {test_set_path}")

Creating labeled test set...

Created 250 test queries
 - With ground truth: 200
 - Non-matches: 50

Variation type distribution:
 - exact: 50
 - normalized: 50
 - case: 50
 - typo: 50
 - non_match: 50

Saved test set to: /Users/joekariuki/Documents/Devbrew/research/devbrew-payments-fraud-sanctions/data_catalog/processed/sanctions_eval_labels.csv
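
Since negative examples are meant to be names that match no sanctions record, it can be worth checking the saved labels before computing metrics. The sketch below flags negatives whose text also appears among the index names and counts how many distinct negative queries there are. The function names, the case-insensitive exact comparison, and the sample rows are assumptions for illustration. A real check would more likely compare normalized names.

```python
def find_leaky_negatives(test_rows: list, index_names: list) -> list:
    """Return non-match rows whose query text also appears in the index."""
    known = {name.casefold().strip() for name in index_names}
    return [
        row
        for row in test_rows
        if row["ground_truth_uid"] is None
        and row["query"].casefold().strip() in known
    ]


def count_distinct_negatives(test_rows: list) -> int:
    """Count unique query strings among the non-match rows."""
    return len(
        {
            row["query"].casefold().strip()
            for row in test_rows
            if row["ground_truth_uid"] is None
        }
    )


# Invented rows and names, for illustration only
toy_index_names = ["Ivan Petrov", "Maria Lopez"]
toy_test_rows = [
    {"query": "Ivan Petrov", "ground_truth_uid": "u1"},
    {"query": "maria lopez", "ground_truth_uid": None},   # leaks: in the index
    {"query": "Jane Example", "ground_truth_uid": None},  # clean negative
    {"query": "Jane Example", "ground_truth_uid": None},  # repeated negative
]

leaky = find_leaky_negatives(toy_test_rows, toy_index_names)
distinct = count_distinct_negatives(toy_test_rows)
print(f"Leaky negatives: {len(leaky)}, distinct negatives: {distinct}")
```

A non-empty `leaky` list, or a distinct count much smaller than the number of negative rows, would suggest that the false positive rate measured on this test set needs careful interpretation.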
