# Deduplication Experiment

This notebook implements a non-destructive deduplication strategy using "blocking" and "clustering".

## Strategy
1. **Fetch**: Retrieve all records from the `namen_pydantic` index.
2. **Block**: Group records by normalized `geslachtsnaam` to reduce comparisons.
3. **Cluster**: 
    - Within each block, compare full names using Levenshtein distance (RapidFuzz).
    - Apply strict constraints (e.g., birth/death year compatibility).
    - Group matches into `MergedIndexItem` objects.
4. **Result**: A list of merged entities containing references to source IDs.

In [None]:
uv pip install rapidfuzz elasticsearch pydantic tqdm

Collecting rapidfuzz
  Downloading rapidfuzz-3.14.3-cp310-cp310-macosx_11_0_arm64.whl.metadata (12 kB)
Downloading rapidfuzz-3.14.3-cp310-cp310-macosx_11_0_arm64.whl (1.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m11.5 MB/s[0m  [33m0:00:00[0m
[?25hInstalling collected packages: rapidfuzz
Successfully installed rapidfuzz-3.14.3


In [2]:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
from pydantic import BaseModel, Field, model_validator
from typing import List, Optional, Dict, Any
import rapidfuzz
from rapidfuzz import fuzz
from tqdm.notebook import tqdm
import collections
import re

In [3]:
# Initialize Elasticsearch
es = Elasticsearch(['http://localhost:9200'], verify_certs=False, request_timeout=30)
if es.ping():
    print("Connected to Elasticsearch")
else:
    print("Could not connect to Elasticsearch")

Connected to Elasticsearch


In [4]:
# --- MergedIndexItem Definition ---
class MergedIndexItem(BaseModel):
    canonical_fullname: str
    geslachtsnaam: str
    birth: Optional[str] = None
    death: Optional[str] = None
    ids: List[str]
    source_records: List[Dict] = Field(default_factory=list)
    
    @model_validator(mode='after')
    def set_canonical_name(self):
        if not self.canonical_fullname and self.source_records:
            names = [r.get('fullname', '') for r in self.source_records if r.get('fullname')]
            if names:
                self.canonical_fullname = max(names, key=len)
        return self

In [5]:
def fetch_all_records(es_client, index_name="namen_pydantic"):
    records = []
    print(f"Fetching records from index '{index_name}'...")
    try:
        # Use scan for efficient retrieval of all documents
        for hit in tqdm(scan(es_client, index=index_name, query={"query": {"match_all": {}}}), desc="Fetching", unit="rec"):
            source = hit['_source']
            source['_id'] = hit['_id']
            records.append(source)
    except Exception as e:
        print(f"Error fetching records: {e}")
    return records

In [6]:
# Fetch Records
all_records = fetch_all_records(es)
print(f"Total records fetched: {len(all_records)}")

Fetching records from index 'namen_pydantic'...


Fetching: 0rec [00:00, ?rec/s]

Total records fetched: 49803


In [23]:
# --- Collection Metadata Analysis ---
# Calculate date ranges (min/max year) for each collectionID
# This helps us infer possible date ranges for records that lack explicit birth/death years.

collection_stats = {}

print("Analyzing collection date ranges...")
for rec in all_records:
    coll = rec.get('collection')
    # Filter out 0 or None years
    years = []
    if rec.get('by') and rec.get('by') != 0:
        years.append(int(rec['by']))
    if rec.get('dy') and rec.get('dy') != 0:
        years.append(int(rec['dy']))
    
    if not coll:
        continue
        
    if coll not in collection_stats:
        collection_stats[coll] = {'years': []}
    
    collection_stats[coll]['years'].extend(years)

# Compute min/max
for coll, stats in collection_stats.items():
    years = stats['years']
    if years:
        stats['min'] = min(years)
        stats['max'] = max(years)
        stats['mean'] = sum(years) / len(years)
    else:
        # Fallback if a collection has NO valid dates
        stats['min'] = 1000
        stats['max'] = 2000 # broad default
        stats['mean'] = 1500
        
print(f"Computed stats for {len(collection_stats)} collections.")
# Example output
list(collection_stats.items())[:3]

Analyzing collection date ranges...
Computed stats for 20 collections.


[('anhalt_dessau_henriette_amalia_von',
  {'years': [], 'min': 1000, 'max': 2000, 'mean': 1500}),
 ('archives', {'years': [], 'min': 1000, 'max': 2000, 'mean': 1500}),
 ('bourbon_charlotte_de',
  {'years': [], 'min': 1000, 'max': 2000, 'mean': 1500})]

In [7]:
# --- Blocking and Clustering ---
def normalize_name(name):
    if not name: return ""
    return name.lower().strip()

# 1. Blocking
blocks = collections.defaultdict(list)
for rec in tqdm(all_records, desc="Blocking"):
    gn = rec.get('geslachtsnaam', '')
    if not gn:
        # Fallback if geslachtsnaam is empty, try to extract from fullname
        fullname = rec.get('fullname', '')
        if fullname:
            gn = fullname.split()[-1]
    
    key = normalize_name(gn)
    if key:
        blocks[key].append(rec)

print(f"Created {len(blocks)} blocks.")

Blocking:   0%|          | 0/49803 [00:00<?, ?it/s]

Created 21027 blocks.


In [31]:
# 2. Clustering with Weighted Confidence Score (Refined for Name Formatting)
# We use a scoring system to combine name similarity, birth year, and death year compatibility.
# Score range: 0-100. Threshold for merge: 85 (adjustable).

merged_results = []
SCORE_THRESHOLD = 85

def get_year(item, field):
    try:
        val = item.get(field)
        if val is not None and int(val) != 0:
            return int(val)
    except (ValueError, TypeError):
        pass
    return None

def normalize_name_part(part):
    return part.lower().strip().replace('.', '').replace(',', '')

def extract_first_name(fullname):
    """
    Extracts the 'First Name' part using heuristics:
    - If comma present ('Doe, John'), take part AFTER comma.
    - If no comma ('John Doe'), take FIRST part.
    - Handle titles/prefixes.
    """
    if not fullname: return ""
    
    parts = fullname.split(',')
    if len(parts) > 1:
        # Format: "Lastname, Firstname ..."
        # Take the part after the first comma
        segment = parts[1].strip()
        tokens = segment.split()
    else:
        # Format: "Firstname Lastname" or just "Name"
        tokens = fullname.split()
        
    if not tokens: return ""
    
    first = normalize_name_part(tokens[0])
    
    # Skip common titles if the name has more parts
    titles = {'mr', 'dr', 'jhr', 'ds', 'prof', 'baron', 'graaf', 'ir', 'jonkheer', 'mevrouw', 'juffrouw'}
    if first in titles and len(tokens) > 1:
        first = normalize_name_part(tokens[1])
        
    return first

def calculate_match_score(item1, item2, stats):
    """
    Calculates a match confidence score (0-100) between two items.
    Components:
    - Name Similarity (50% weight) - Standard Levenshtein
    - First Name Compatibility (30% weight) - Strict check on extracted first names
    - Year Compatibility (20% weight)
    """
    
    name1 = item1.get('fullname', '')
    name2 = item2.get('fullname', '')
    if not name1 or not name2: return 0
    
    # --- 1. First Name Check (Critical for brothers/fathers) ---
    first1 = extract_first_name(name1)
    first2 = extract_first_name(name2)
    
    first_name_score = 0
    
    if not first1 or not first2:
        # One name is empty/missing first name -> Neutral/Soft Match
        # e.g., "Dussen, van der" (No first name) vs "Jan van der Dussen"
        first_name_score = 50 
    elif first1 == first2:
        first_name_score = 100
    elif len(first1) == 1 or len(first2) == 1:
        # Initial match (e.g., "J" vs "jan")
        if first1[0] == first2[0]:
            first_name_score = 85 # High likelihood
        else:
            return 0 # Mismatching initial (J vs P) = Veto
    else:
        # Fuzzy match first name (e.g. "Joannes" vs "Johannes")
        if fuzz.ratio(first1, first2) > 85:
            first_name_score = 90
        else:
            return 0 # VETO: Distinct first names (likely brothers/relatives)
            
    # --- 2. Full Name Similarity ---
    name_sim = fuzz.token_sort_ratio(name1, name2)
    
    # --- 3. Year Compatibility ---
    by1, by2 = get_year(item1, 'by'), get_year(item2, 'by')
    dy1, dy2 = get_year(item1, 'dy'), get_year(item2, 'dy')
    
    year_score = 50 # Neutral default
    score_contributions = []

    # Hard Vetoes
    if by1 and by2 and abs(by1 - by2) > 3: return 0
    if dy1 and dy2 and abs(dy1 - dy2) > 3: return 0
    if by1 and dy2 and by1 > dy2: return 0
    if by2 and dy1 and by2 > dy1: return 0

    # Birth Year Score
    if by1 and by2:
        score_contributions.append(100 if abs(by1 - by2) <= 1 else 70)
    elif (by1 and not by2) or (not by1 and by2):
        val = by1 if by1 else by2
        target_coll = item2.get('collection') if by1 else item1.get('collection')
        if target_coll in stats:
            s = stats[target_coll]
            score_contributions.append(65 if s['min'] - 20 <= val <= s['max'] + 20 else 10)
    
    # Death Year Score
    if dy1 and dy2:
        score_contributions.append(100 if abs(dy1 - dy2) <= 1 else 70)
    elif (dy1 and not dy2) or (not dy1 and dy2):
        val = dy1 if dy1 else dy2
        target_coll = item2.get('collection') if dy1 else item1.get('collection')
        if target_coll in stats:
            s = stats[target_coll]
            score_contributions.append(65 if s['min'] - 20 <= val <= s['max'] + 20 else 10)

    if score_contributions:
        year_score = sum(score_contributions) / len(score_contributions)
    
    # Final Weighting
    # First Name (30%) + Full Name Context (50%) + Years (20%)
    final_score = (first_name_score * 0.3) + (name_sim * 0.5) + (year_score * 0.2)
    
    return final_score

# --- Clustering Loop ---
for key, block in tqdm(blocks.items(), desc="Clustering"):
    if len(block) == 1:
        item = block[0]
        merged_results.append(MergedIndexItem(
            canonical_fullname=item.get('fullname', ''),
            geslachtsnaam=item.get('geslachtsnaam', ''),
            birth=item.get('birth'),
            death=item.get('death'),
            ids=[item['_id']],
            source_records=[item]
        ))
        continue
    
    processed = set()
    for i, item1 in enumerate(block):
        if i in processed: continue
        
        cluster = [item1]
        processed.add(i)
        
        for j, item2 in enumerate(block):
            if j <= i or j in processed: continue
            
            # Calculate score
            score = calculate_match_score(item1, item2, collection_stats)
            
            if score >= SCORE_THRESHOLD:
                cluster.append(item2)
                processed.add(j)
        
        merged_results.append(MergedIndexItem(
            canonical_fullname=cluster[0].get('fullname', ''),
            geslachtsnaam=cluster[0].get('geslachtsnaam', ''),
            birth=cluster[0].get('birth'),
            death=cluster[0].get('death'),
            ids=[it['_id'] for it in cluster],
            source_records=cluster
        ))

print(f"Total Merged Records: {len(merged_results)}")

Clustering:   0%|          | 0/21027 [00:00<?, ?it/s]

Total Merged Records: 44390


In [32]:
# Summary
print("--- Deduplication Summary ---")
print(f"Original Records: {len(all_records)}")
print(f"Merged Entities:  {len(merged_results)}")

# Show an example of a cluster
found_cluster = False
for m in merged_results:
    if len(m.ids) > 1:
        print("\nExample Cluster:")
        print(m.model_dump_json(indent=2))
        found_cluster = True
        break

if not found_cluster:
    print("No duplicates found with current threshold.")

--- Deduplication Summary ---
Original Records: 49803
Merged Entities:  44390

Example Cluster:
{
  "canonical_fullname": "Dussen, Nicolaas van der",
  "geslachtsnaam": "Dussen",
  "birth": null,
  "death": null,
  "ids": [
    "2401",
    "14841"
  ],
  "source_records": [
    {
      "fullname": "Dussen, Nicolaas van der",
      "geslachtsnaam": "Dussen",
      "birth": null,
      "death": null,
      "biography": "",
      "by": 0,
      "dy": 0,
      "reference": null,
      "collection": "de_witt",
      "_id": "2401"
    },
    {
      "fullname": "mr. Nicolaas van der Dussen",
      "geslachtsnaam": "Dussen",
      "birth": "1718",
      "death": "1770-01-13",
      "biography": "",
      "by": 1718,
      "dy": 1770,
      "reference": null,
      "collection": "raa",
      "_id": "14841"
    }
  ]
}


In [None]:
all_records[0]

[MergedIndexItem(canonical_fullname='Dussen, van der', geslachtsnaam='Dussen', birth=None, death=None, ids=['0', '1930'], source_records=[{'fullname': 'Dussen, van der', 'geslachtsnaam': 'Dussen', 'birth': None, 'death': None, 'biography': '', 'by': 0, 'dy': 0, 'reference': None, 'collection': 'anhalt_dessau_henriette_amalia_von', '_id': '0'}, {'fullname': 'Dussen, P. van der', 'geslachtsnaam': 'Dussen', 'birth': None, 'death': None, 'biography': '', 'by': 0, 'dy': 0, 'reference': None, 'collection': 'de_witt', '_id': '1930'}]),
 MergedIndexItem(canonical_fullname='Dussen, Arende van der', geslachtsnaam='Dussen', birth=None, death=None, ids=['2379'], source_records=[{'fullname': 'Dussen, Arende van der', 'geslachtsnaam': 'Dussen', 'birth': None, 'death': None, 'biography': '', 'by': 0, 'dy': 0, 'reference': None, 'collection': 'de_witt', '_id': '2379'}]),
 MergedIndexItem(canonical_fullname='Dussen, Nicolaas van der', geslachtsnaam='Dussen', birth=None, death=None, ids=['2401', '14841'