# Entity Resolution Demo Notebook

This notebook was created to showcase how Neo4j can help to indentfy and resolve duplication cause by near-similarities in your database. 

There are a few pre-requisites to take care of before we get started. 


## 0. Pre-requisites


### Python Packages
Be sure you have installed the following python packages

`pip3 install faker neo4j`

or 

`conda install faker conda-forge::neo4j-python-driver`

If conda isntall is not working, please refer to [anaconda.org](https://anaconda.org/conda-forge/neo4j-python-driver)

In [1]:

import random
import time
import re
from faker import Faker
from neo4j import GraphDatabase

### Neo4j DataBase

Before starting this demo, make sure to create a new database and update the URI AND PASSWORD DB_NAME below. You may also need to update the USER or DB_NAM if those are different for your DB instance. 

We recommend using [Neo4j Desktop](https://neo4j.com/product/#neo4j-desktop) as it allows you to use Graph Data Science Library at no cost.  If you are using Neo4J Desktop, you can find your URI by clicking Details and copying the "Bolt port".


You will also need to ensure APOC and Graph Data Science are enabled on your instance. If you do not do this, you will receive errors later in the notebook.
- [Installing APOC](https://neo4j.com/docs/apoc/current/installation/)
- [Installing GDC](https://neo4j.com/docs/graph-data-science/current/installation/)


In [2]:
# Neo4j connection details
URI = "bolt://localhost:7687"
USER = "neo4j"
PASSWORD = "password"   
DB_NAME = "neo4j"   
BATCH_ID = "batch1"

In [3]:
# Number of total candidate nodes 
TOTAL_NODES = 10000

# Percentage of near-duplicate s to inject (e.g., 10% means 1000 duplicates)
DUPLICATE_PERCENT = 0.1

# for keeping track of the program execution time
program_start_time = time.time()

## 1. Create mock data candidates

  - Inserts 10,000 Candidate nodes using Faker data.
  - Intentionally seeds near-duplicates to test entity resolution.

In [4]:
def create_mock_data(cleanup=True):
    """Generates random candidate data and inserts into Neo4j. Optionally cleans up old data."""
    fake = Faker()
    Faker.seed(42)  # Ensure some deterministic behavior
    
    driver = GraphDatabase.driver(URI, auth=(USER, PASSWORD))
    print("Connected to Neo4j.")
    
    # Cleanup old Candidates from previous batch (optional)
    if cleanup:
        cleanup_previous_batch(driver)
    
    # Generate random data 
    candidates = []
    
    for i in range(TOTAL_NODES):
        full_name = fake.name()
        email = fake.email()
        phone = fake.phone_number()
        address = fake.address().replace('\n', ', ')
        
        candidate = {
            "candidateId": f"cand_{i}",
            "batchId": BATCH_ID,
            "fullName": full_name,
            "email": email,
            "phoneNumber": phone,
            "address": address
        }
        candidates.append(candidate)
    
    # Create near duplicates by introducing small variations 
    num_duplicates = int(TOTAL_NODES * DUPLICATE_PERCENT)
    for _ in range(num_duplicates):
        original = random.choice(candidates)
        # Make a slight variation of the original 
        alt_full_name = introduce_small_typo(original['fullName'])
        alt_email = introduce_small_typo(original['email'])
        alt_phone = introduce_phone_variation(original['phoneNumber'])
        alt_address = introduce_small_typo(original['address'])
        
        cand_id = f"dup_{original['candidateId']}_{random.randint(1,100000)}"
        duplicate = {
            "candidateId": cand_id,
            "batchId": BATCH_ID,
            "fullName": alt_full_name,
            "email": alt_email,
            "phoneNumber": alt_phone,
            "address": alt_address
        }
        candidates.append(duplicate)
    
    # Shuffle final list to ensure random distribution 
    random.shuffle(candidates)
    
    # Insert into Neo4j in batches
    batch_size = 1000
    with driver.session(database=DB_NAME) as session:
        for i in range(0, len(candidates), batch_size):
            batch = candidates[i:i+batch_size]
            cypher = """
            UNWIND $rows AS row
            CREATE (c:Candidate {
              candidateId: row.candidateId,
              batchId: row.batchId,
              fullName: row.fullName,
              email: row.email,
              phoneNumber: row.phoneNumber,
              address: row.address
            })
            """
            session.run(cypher, parameters={"rows": batch})
            print(f"Inserted batch up to index {i}")
    
    driver.close()
    print("Mock data generation complete.")

def cleanup_previous_batch(driver):
    """Removes all Candidate nodes from the previous batch."""
    with driver.session(database=DB_NAME) as session:
        session.run(f"MATCH (c:Candidate {{batchId:'{BATCH_ID}'}}) DETACH DELETE c")
    print(f"Old Candidate nodes for batch '{BATCH_ID}' removed.")

def introduce_small_typo(original_str):
    """Randomly introduces a small typo or alteration in the given string."""
    if not original_str:
        return original_str
    # If string is short, just shuffle or skip
    if len(original_str) < 5:
        return original_str
    
    # 50% chance: skip to avoid too many changes
    if random.random() < 0.5:
        return original_str
    
    # Insert or remove a character at random position
    s_list = list(original_str)
    pos = random.randint(0, len(s_list)-1)
    
    # 50% chance remove a char, 50% chance replace with random letter
    if random.random() < 0.5:
        del s_list[pos]
    else:
        s_list[pos] = chr(random.randint(ord('a'), ord('z')))
    
    return "".join(s_list)

def introduce_phone_variation(phone_str):
    """Randomly modifies phone numbers slightly."""
    # e.g., remove a digit, add a digit, or do nothing
    if not phone_str:
        return phone_str
    if random.random() < 0.5:
        return phone_str  # skip messing with half for variety
    
    only_digits = ''.join(filter(str.isdigit, phone_str))
    if len(only_digits) > 6:
        # remove last digit or replace it
        if random.random() < 0.5:
            only_digits = only_digits[:-1]  # remove last
        else:
            only_digits = only_digits[:-1] + str(random.randint(0,9))
    
    # randomly add dash or parentheses to create variety
    formatted = only_digits
    if len(only_digits) > 3:
        formatted = f"({only_digits[:3]}) {only_digits[3:]}"
    return formatted



create_mock_data(cleanup=True)

Connected to Neo4j.
Old Candidate nodes for batch 'batch1' removed.
Inserted batch up to index 0
Inserted batch up to index 1000
Inserted batch up to index 2000
Inserted batch up to index 3000
Inserted batch up to index 4000
Inserted batch up to index 5000
Inserted batch up to index 6000
Inserted batch up to index 7000
Inserted batch up to index 8000
Inserted batch up to index 9000
Inserted batch up to index 10000
Mock data generation complete.


## 2. Create "fraud family" clusters

Purpose:
  - Programmatically creates two small demo clusters in Neo4j:
    1) A "fraud family" of 5 nodes with partial property overlaps
    2) 6 variations of one person with minor differences

Note: apoc library must be installed in Neo4j https://neo4j.com/docs/apoc/current/installation/


In [5]:

CLUSTER_1_DATA = [
    {
        "candidateId": "FRAUD_101",
        "batchId": "batch1",
        "fullName": "Theodore Chadwick",
        "email": "theo.chadwick@gmail.com",
        "phoneNumber": "555-1234",
        "address": "123 Fraud Rd, Chicago"
    },
    {
        "candidateId": "FRAUD_102",
        "batchId": "batch1",
        "fullName": "Theo Chadwick",
        "email": "theo_chad@gmail.com",
        "phoneNumber": "555-1234",
        "address": "123 Fraud Road, CHI"
    },
    {
        "candidateId": "FRAUD_103",
        "batchId": "batch1",
        "fullName": "T. Chadwick",
        "email": "t_chad@gmail.com",
        "phoneNumber": "555-1234",
        "address": "123 Fraud Rd, Chicago"
    },
    {
        "candidateId": "FRAUD_104",
        "batchId": "batch1",
        "fullName": "Ted Chad",
        "email": "tedChad@gmail.com",
        "phoneNumber": "555-9999",
        "address": "123 Fraudd Rd, Chicago"
    },
    {
        "candidateId": "FRAUD_105",
        "batchId": "batch1",
        "fullName": "Theo C.",
        "email": "theoc12@gmail.com",
        "phoneNumber": "423-502-1234",
        "address": "123 Fraud Rd, Chicago"
    }
]

CLUSTER_2_DATA = [
    {
        "candidateId": "PERSON_201",
        "batchId": "batch1",
        "fullName": "Jessica Parsons",
        "email": "jessparsons@gmail.com",
        "phoneNumber": "423-502-1235",
        "address": "99 Demo Ln, Springfield"
    },
    {
        "candidateId": "PERSON_202",
        "batchId": "batch1",
        "fullName": "Jess Parsons",
        "email": "jessparsons@gmail.com",
        "phoneNumber": "333-4444",
        "address": "99 Demo Lane, Springfield"
    },
    {
        "candidateId": "PERSON_203",
        "batchId": "batch1",
        "fullName": "Jessica P.",
        "email": "jparsons@gmail.com",
        "phoneNumber": "3334444",
        "address": "99 Demo Ln, Spring Field"
    },
    {
        "candidateId": "PERSON_204",
        "batchId": "batch1",
        "fullName": "J. Parsons",
        "email": "j.parsons@gmail.com",
        "phoneNumber": "333-4444",
        "address": "99 Demo LN, Springfield"
    },
    {
        "candidateId": "PERSON_205",
        "batchId": "batch1",
        "fullName": "Jessy Parson",
        "email": "jessy.parson@gmail.com",
        "phoneNumber": "(333)4444",
        "address": "99 Demo Ln, Springfield"
    },
    {
        "candidateId": "PERSON_206",
        "batchId": "batch1",
        "fullName": "Parsons, Jessica",
        "email": "parsons_jessica@gmail.com",
        "phoneNumber": "333 4444",
        "address": "99 Demo Lane, Springfield"
    }
]

def create_clusters():
    """Inserts the two demo clusters into Neo4j."""
    driver = GraphDatabase.driver(URI, auth=(USER, PASSWORD))
    print("Connected to Neo4j.")

    with driver.session(database=DB_NAME) as session:
        # Insert cluster 1 data using apoc.periodic.iterate
        # We'll build a list of records to pass in 
        session.run("""
        CALL apoc.periodic.iterate(
          'UNWIND $rows AS row RETURN row',
          'CREATE (c:Candidate {
             candidateId: row.candidateId,
             batchId: row.batchId,
             fullName: row.fullName,
             email: row.email,
             phoneNumber: row.phoneNumber,
             address: row.address
           })',
          {batchSize: 5, parallel: false, params: {rows: $clusterData}}
        )
        """, parameters={"clusterData": CLUSTER_1_DATA})
        print("Created Cluster 1 (Fraud Family).")

        # Insert cluster 2 data the same way
        session.run("""
        CALL apoc.periodic.iterate(
          'UNWIND $rows AS row RETURN row',
          'CREATE (c:Candidate {
             candidateId: row.candidateId,
             batchId: row.batchId,
             fullName: row.fullName,
             email: row.email,
             phoneNumber: row.phoneNumber,
             address: row.address
           })',
          {batchSize: 6, parallel: false, params: {rows: $clusterData}}
        )
        """, parameters={"clusterData": CLUSTER_2_DATA})
        print("Created Cluster 2 (One Person, 6 variations).")

    driver.close()
    print("Demo clusters inserted successfully.")

create_clusters()


Connected to Neo4j.
Created Cluster 1 (Fraud Family).
Created Cluster 2 (One Person, 6 variations).
Demo clusters inserted successfully.


# 3. Normalize the data

#Normaization is the process of converting data into a common format that allows for easier comparison and analysis. This is especially important when dealing with text data, where variations in spelling, punctuation, and formatting can make it difficult to match records accurately. In this exercise, we'll implement a few simple normalization functions for phone numbers, email addresses, and street addresses. These functions will remove non-essential characters, convert text to lowercase, and handle common abbreviations to make the data more consistent and easier to work with.

This section of code:
  - Applies normalization (stripping punctuation, lowercasing, etc.).
  - Writes back normalized fields (normalizedPhone, normalizedEmail, etc.).


### First, let's create some helper functions 

In [6]:
def normalize_phone(phone_str):
    """Remove all non-digit characters. Return None if invalid/empty."""
    if not phone_str:
        return None
    digits = re.sub(r'[^0-9]', '', phone_str)
    return digits if digits else None

def normalize_email(email_str):
    """Lowercase, trim spaces. Return None if invalid/empty."""
    if not email_str:
        return None
    email_str = email_str.strip().lower()
    # Optionally handle special domain-based rules, but we'll skip here
    return email_str

def normalize_address(addr_str):
    """Lowercase, remove punctuation, naive abbreviation handling."""
    if not addr_str:
        return None
    addr_str = addr_str.lower()
    addr_str = re.sub(r'[.,#]', '', addr_str)
    addr_str = addr_str.replace(' street', ' st')
    addr_str = addr_str.replace(' avenue', ' ave')
    # Additional domain-specific expansions can be added
    return addr_str.strip()

## Now we can normalize the data for each candidate

In [7]:


def normalize_properties():
    """ Normalizes phone, email, and address properties for all candidates in the batch."""
    driver = GraphDatabase.driver(URI, auth=(USER, PASSWORD))
    print("Connected to Neo4j for normalization.")
    
    with driver.session(database=DB_NAME) as session:
        # Retrieve all candidate data
        fetch_query = f"""
        MATCH (c:Candidate {{batchId:'{BATCH_ID}'}})
        RETURN c.candidateId AS candidateId,
               c.phoneNumber AS phone,
               c.email AS email,
               c.address AS address
        """
        result = session.run(fetch_query)
        updates = []
        for record in result:
            cand_id = record["candidateId"]
            phone = normalize_phone(record["phone"])
            email = normalize_email(record["email"])
            address = normalize_address(record["address"])
            
            updates.append({
                "candidateId": cand_id,
                "normalizedPhone": phone,
                "normalizedEmail": email,
                "normalizedAddress": address
            })
        
        # Bulk update in batches to avoid large transactions
        batch_size = 1000
        for i in range(0, len(updates), batch_size):
            batch = updates[i:i+batch_size]
            update_cypher = """
            UNWIND $rows AS row
            MATCH (c:Candidate {candidateId: row.candidateId})
            SET c.normalizedPhone = row.normalizedPhone,
                c.normalizedEmail = row.normalizedEmail,
                c.normalizedAddress = row.normalizedAddress
            """
            session.run(update_cypher, parameters={"rows": batch})
            print(f"Normalized batch up to index {i}")
    
    driver.close()
    print("Property normalization complete.")

## 4. Run Entity Recognition

The code blocks below:
  - Creates indexes on normalized fields
  - (Optionally) sets up blocking keys
  - Performs fuzzy matching (Jaro-Winkler, Levenshtein, etc.) on normalized fields
  - Aggregates multiple SIMILAR relationships into AGGREGATED_SIMILAR
  - Projects the subgraph into GDS for clustering (Leiden)
  - Optionally merges or links duplicates


We will first definte the helper functions, and then run the entire pipeline. 

In [8]:
def create_candidate_indexes(driver):
    """ Create indexes on normalized fields for faster lookup. """
    index_queries = [
        "CREATE INDEX candidate_candidateId_index IF NOT EXISTS FOR (c:Candidate) ON (c.candidateId)",
        "CREATE INDEX candidate_phone_index IF NOT EXISTS FOR (c:Candidate) ON (c.normalizedPhone)",
        "CREATE INDEX candidate_email_index IF NOT EXISTS FOR (c:Candidate) ON (c.normalizedEmail)",
        "CREATE INDEX candidate_address_index IF NOT EXISTS FOR (c:Candidate) ON (c.normalizedAddress)"
    ]
    with driver.session(database=DB_NAME) as session:
        for q in index_queries:
            session.run(q)
    print("Candidate indexes on normalized fields created.")

In [9]:
def create_soundex_blocking(driver):
    """ Creates a :BlockKey node for soundex(normalizedFullName) and links Candidates for partial blocking."""
    with driver.session(database=DB_NAME) as session:
        # Remove old BlockKeys if desired
        session.run("MATCH (b:BlockKey) DETACH DELETE b")
        
        query = f"""
        CALL apoc.periodic.iterate(
          'MATCH (c:Candidate {{batchId:"{BATCH_ID}"}}) RETURN c',
          'WITH c, apoc.text.soundex(c.normalizedFullName) AS sdx
           MERGE (bk:BlockKey {{value: sdx}})
           MERGE (c)-[:HAS_BLOCK]->(bk)',
          {{batchSize:1000, parallel:false}}
        )
        """
        session.run(query)
    print("Soundex blocking applied.")

In [10]:
def create_similarity_by_name(driver, jaro_threshold=0.05):
    """
    Jaro-Winkler distance is near 0.0 for identical strings and near 1.0 for different strings.
    We'll store similarity as (1 - distance), so we want distance < threshold => similarity > (1-threshold).
    Using blocking: only compare candidates that share the same blockKey.
    """
    query = f"""
    CALL apoc.periodic.iterate(
      "MATCH (bk:BlockKey)<-[:HAS_BLOCK]-(c:Candidate {{batchId:'{BATCH_ID}'}})
       RETURN bk, c",
      "MATCH (bk)<-[:HAS_BLOCK]-(c2:Candidate {{batchId:'{BATCH_ID}'}})
       WHERE id(c) < id(c2)
       WITH c, c2, apoc.text.jaroWinklerDistance(c.normalizedFullName, c2.normalizedFullName) AS dist
       WHERE dist < {jaro_threshold}
       CREATE (c)-[:SIMILAR {{
         comparedProperty: 'fullName',
         similarity: (1.0 - dist)
       }}]->(c2)",
      {{batchSize:200, parallel:false}}
    )
    """
    with driver.session(database=DB_NAME) as session:
        session.run(query)
    print("Created SIMILAR relationships by fullName (Jaro-Winkler).")

In [11]:
def create_similarity_by_email(driver, similarity_threshold=0.9):
    """
    Using Levenshtein-based similarity = 1 - (distance / maxLen).
    We'll do a simpler approach (no blocking for email) for illustration, or you can also combine with blockKey.
    """
    query = f"""
    CALL apoc.periodic.iterate(
      "MATCH (c:Candidate {{batchId:'{BATCH_ID}'}}) WHERE c.normalizedEmail IS NOT NULL RETURN c",
      "MATCH (c2:Candidate {{batchId:'{BATCH_ID}'}}) 
       WHERE c2.normalizedEmail IS NOT NULL AND id(c) < id(c2)
       WITH c, c2,
         apoc.text.levenshteinDistance(c.normalizedEmail, c2.normalizedEmail) AS dist,
         CASE WHEN size(c.normalizedEmail) >= size(c2.normalizedEmail)
              THEN size(c.normalizedEmail)
              ELSE size(c2.normalizedEmail)
         END AS maxLen
       WITH c, c2, dist, maxLen, 1.0 - (toFloat(dist)/toFloat(maxLen)) AS sim
       WHERE sim >= {similarity_threshold}
       CREATE (c)-[:SIMILAR {{
         comparedProperty: 'email',
         similarity: sim
       }}]->(c2)",
      {{batchSize:200, parallel:false}}
    )
    """
    with driver.session(database=DB_NAME) as session:
        session.run(query)
    print("Created SIMILAR relationships by email (Levenshtein).")

In [12]:
def create_similarity_by_phone(driver, similarity_threshold=0.9):
    """ Same approach for phone, using normalizedPhone. """
    query = f"""
    CALL apoc.periodic.iterate(
      "MATCH (c:Candidate {{batchId:'{BATCH_ID}'}}) WHERE c.normalizedPhone IS NOT NULL RETURN c",
      "MATCH (c2:Candidate {{batchId:'{BATCH_ID}'}}) 
       WHERE c2.normalizedPhone IS NOT NULL AND id(c) < id(c2)
       WITH c, c2,
         apoc.text.levenshteinDistance(c.normalizedPhone, c2.normalizedPhone) AS dist,
         CASE WHEN size(c.normalizedPhone) >= size(c2.normalizedPhone)
              THEN size(c.normalizedPhone)
              ELSE size(c2.normalizedPhone)
         END AS maxLen
       WITH c, c2, dist, maxLen, 1.0 - (toFloat(dist)/toFloat(maxLen)) AS sim
       WHERE sim >= {similarity_threshold}
       CREATE (c)-[:SIMILAR {{
         comparedProperty: 'phoneNumber',
         similarity: sim
       }}]->(c2)",
      {{batchSize:200, parallel:false}}
    )
    """
    with driver.session(database=DB_NAME) as session:
        session.run(query)
    print("Created SIMILAR relationships by phoneNumber (Levenshtein).")

In [13]:
def create_similarity_by_address(driver, jaro_threshold=0.1):
    """ Compare addresses using Jaro-Winkler distance < threshold => similarity > (1-threshold). """
    query = f"""
    CALL apoc.periodic.iterate(
      "MATCH (c:Candidate {{batchId:'{BATCH_ID}'}}) WHERE c.normalizedAddress IS NOT NULL RETURN c",
      "MATCH (c2:Candidate {{batchId:'{BATCH_ID}'}}) 
       WHERE c2.normalizedAddress IS NOT NULL AND id(c) < id(c2)
       WITH c, c2, apoc.text.jaroWinklerDistance(c.normalizedAddress, c2.normalizedAddress) AS dist
       WHERE dist < {jaro_threshold}
       CREATE (c)-[:SIMILAR {{
         comparedProperty: 'address',
         similarity: (1.0 - dist)
       }}]->(c2)",
      {{batchSize:200, parallel:false}}
    )
    """
    with driver.session(database=DB_NAME) as session:
        session.run(query)
    print("Created SIMILAR relationships by address (Jaro-Winkler).")

In [14]:
def store_weights_in_neo4j(driver, weights_map):
    """
    Stash normalized weights in a single :WeightConfig node {name:'default'} for easy reference.
    Example usage:
      raw_weights = {'fullName': 1.0, 'email': 1.5, 'phoneNumber': 1.2, 'address': 0.8}
      store_weights_in_neo4j(driver, raw_weights)
    """
    total = sum(weights_map.values())
    normalized = {k: (v / total) for k, v in weights_map.items()}

    with driver.session(database=DB_NAME) as session:
        # Ensure the WeightConfig node exists
        session.run("MERGE (w:WeightConfig {name:'default'})")
        
        # Use APOC to set dynamic property names
        for prop, val in normalized.items():
            session.run("""
                MATCH (w:WeightConfig {name:'default'})
                CALL apoc.create.setProperty(w, $prop, $val) YIELD node
                RETURN node
            """, parameters={'prop': prop, 'val': val})
    
    print("Stored normalized weights in Neo4j WeightConfig node.")
    return normalized


def aggregate_similarity_relationships(driver):
    """
    Combine multiple :SIMILAR edges between the same pair into an :AGGREGATED_SIMILAR with a weightedSum, avgSimilarity, etc.
    """
    # Remove old aggregated relationships
    cleanup_query = "MATCH ()-[r:AGGREGATED_SIMILAR]->() DELETE r"
    
    # Weighted Summation logic with correct weighted average calculation
    aggregation_query = """
    MATCH (wc:WeightConfig {name:'default'})
    WITH wc
    MATCH (c1:Candidate)-[r:SIMILAR]->(c2:Candidate)
    WHERE id(c1) < id(c2)
    WITH wc, c1, c2, collect(r) AS edges
    UNWIND edges AS e
    WITH wc, c1, c2, e,
         CASE e.comparedProperty
           WHEN 'fullName' THEN wc.fullName
           WHEN 'email' THEN wc.email
           WHEN 'phoneNumber' THEN wc.phoneNumber
           WHEN 'address' THEN wc.address
           ELSE 0.0
         END AS propWeight,
         e.similarity AS simVal
    WITH c1, c2,
         collect(simVal * propWeight) AS weightedSims,
         collect(propWeight) AS weights,
         collect(e.comparedProperty) AS propsUsed
    WITH c1, c2,
         reduce(total=0.0, val IN weightedSims | total + val) AS weightedSum,
         reduce(total=0.0, w IN weights | total + w) AS totalWeight,
         size(weightedSims) AS edgeCount,
         propsUsed
    CREATE (c1)-[:AGGREGATED_SIMILAR {
      weightedSum: weightedSum,
      avgSimilarity: CASE WHEN totalWeight > 0 THEN (weightedSum / totalWeight) ELSE 0 END,
      propertyCount: edgeCount,
      propertiesMatched: propsUsed
    }]->(c2)
    """
    
    with driver.session(database=DB_NAME) as session:
        session.run(cleanup_query)
        session.run(aggregation_query)
    print("AGGREGATED_SIMILAR relationships created.")


In [19]:
def project_and_cluster(driver, graph_name="candidateSimilarityGraph"):
    """
    Project the graph into GDS using :AGGREGATED_SIMILAR edges, then run Leiden for community detection.
    Write cluster info to each node as 'entityId'.
    """
    with driver.session(database=DB_NAME) as session:
        # Drop existing projection if any
        try:
            session.run(f"CALL gds.graph.drop('{graph_name}', false)")
        except:
            pass
        
        create_graph_query = f"""
        CALL gds.graph.project(
          '{graph_name}',
          'Candidate',
          {{
            AGGREGATED_SIMILAR: {{
              type: 'AGGREGATED_SIMILAR',
              orientation: 'UNDIRECTED',
              properties: {{
                weightedSum: {{
                  defaultValue: 0.0
                }}
              }}
            }}
          }}
        )
        """
        session.run(create_graph_query)
        print(f"Projected graph '{graph_name}' into GDS.")
        
        leiden_query = f"""
        CALL gds.leiden.write('{graph_name}', {{
          writeProperty: 'entityId',
          gamma: 100,
          maxLevels: 5,
          randomSeed: 42
        }})
        YIELD communityCount, didConverge
        """
        result = session.run(leiden_query).data()
        print("Leiden clustering result:", result)
        
        # Print distribution
        dist_query = """
        MATCH (c:Candidate)
        WHERE c.entityId IS NOT NULL
        RETURN c.entityId AS entityId, count(*) AS clusterSize
        ORDER BY clusterSize DESC
        """
        dist_result = session.run(dist_query).data()
        print("Cluster distribution:", dist_result)
        
        # Optionally drop the in-memory graph
        session.run(f"CALL gds.graph.drop('{graph_name}')")
        print(f"Clustering done, 'entityId' assigned to each candidate node.")

In [16]:

def merge_high_confidence(driver, threshold=2.5):
    """
    Merge nodes if weightedSum >= threshold. 
    This is destructive, so use caution. 
    """
    query = f"""
    CALL apoc.periodic.iterate(
      "MATCH (c1:Candidate)-[r:AGGREGATED_SIMILAR]->(c2:Candidate)
       WHERE r.weightedSum >= {threshold}
       RETURN c1, c2",
      "CALL apoc.refactor.mergeNodes([c1, c2], {{
         properties:'combine',
         mergeRels:true
      }}) YIELD node RETURN node",
      {{batchSize:100, parallel:false}}
    )
    """
    with driver.session(database=DB_NAME) as session:
        session.run(query)
    print(f"Nodes merged where weightedSum >= {threshold}.")

In [17]:
def link_high_confidence(driver, threshold=2.5):
    """
    Non-destructive approach: create a :SAME_AS relationship between duplicates.
    """
    query = f"""
    MATCH (c1:Candidate)-[r:AGGREGATED_SIMILAR]->(c2:Candidate)
    WHERE r.weightedSum >= {threshold}
    MERGE (c1)-[:SAME_AS {{confidence:r.weightedSum}}]->(c2)
    """
    with driver.session(database=DB_NAME) as session:
        session.run(query)
    print(f"Linked nodes with :SAME_AS where weightedSum >= {threshold}.")

In [22]:

def run_entity_resolution():
    driver = GraphDatabase.driver(URI, auth=(USER, PASSWORD))
    print("Connected to Neo4j for Entity Resolution pipeline.")
    
    # 1. Create indexes
    create_candidate_indexes(driver)
    
    # 2. (Optional) Create blocking keys
    create_soundex_blocking(driver)
    
    # 3. Generate :SIMILAR relationships
    create_similarity_by_name(driver, jaro_threshold=0.05)
    create_similarity_by_email(driver, similarity_threshold=0.9)
    create_similarity_by_phone(driver, similarity_threshold=0.9)
    create_similarity_by_address(driver, jaro_threshold=0.1)
    
    # Wait for relationship creation to finish
    time.sleep(2)
    
    # 4. Store & use weights to create :AGGREGATED_SIMILAR
    raw_weights = {
      "fullName": 1.0,
      "email": 1.5,
      "phoneNumber": 1.2,
      "address": 0.8
    }
    store_weights_in_neo4j(driver, raw_weights)
    aggregate_similarity_relationships(driver)
    
    # 5. Project into GDS & cluster
    project_and_cluster(driver, "candidateSimilarityGraph")
    
    # 6. Merge or link duplicates with high confidence
    # Adjust threshold if your total possible weightedSum is 4.5, for instance
    # merge_high_confidence(driver, threshold=2.5)
    # link_high_confidence(driver, threshold=2.5)  # or do merges if desired
    
    driver.close()
    print("Entity Resolution pipeline complete.")


start_time = time.time()
run_entity_resolution()
end_time = time.time()
elapsed_time = end_time - start_time  # seconds
elapsed_minutes = elapsed_time / 60   # convert seconds to minutes

print(f"Time elapsed: {elapsed_minutes:.2f} minutes")

Connected to Neo4j for Entity Resolution pipeline.
Candidate indexes on normalized fields created.
Soundex blocking applied.
Created SIMILAR relationships by fullName (Jaro-Winkler).
Created SIMILAR relationships by email (Levenshtein).
Created SIMILAR relationships by phoneNumber (Levenshtein).
Created SIMILAR relationships by address (Jaro-Winkler).




Stored normalized weights in Neo4j WeightConfig node.
AGGREGATED_SIMILAR relationships created.




Projected graph 'candidateSimilarityGraph' into GDS.


ClientError: {code: Neo.ClientError.Procedure.ProcedureCallFailed} {message: Failed to invoke procedure `gds.leiden.write`: Caused by: java.lang.NullPointerException: Cannot invoke "org.neo4j.gds.leiden.LeidenDendrogramManager.getCurrent()" because the return value of "org.neo4j.gds.leiden.LeidenResult.dendrogramManager()" is null}

## 5. Resolution

The bloks of code below: 
- Creates MasterEntity nodes and link each Candidate to its corresponding MasterEntity node using the 'entityId' property.
- Links each Candidate node to the corresponding MasterEntity node in batches.
- Sets cononical values for each MasterEntity node based on the most common values in the cluster.

In [None]:

def create_master_nodes_and_links():
    """
    1) Creates one distinct MasterEntity node per unique entityId value from the Leiden output
       (or whichever community detection algorithm you used to assign 'entityId').
    2) Links each Candidate node to the corresponding MasterEntity node in batches.

    """
    driver = GraphDatabase.driver(URI, auth=(USER, PASSWORD))


    with driver.session(database=DB_NAME) as session:
        # STEP 1: Create distinct MasterEntity nodes
        # For each unique entityId, we make a single MasterEntity node
        create_master_nodes_query = """
        CALL apoc.periodic.iterate(
          "MATCH (c:Candidate) 
           WHERE c.entityId IS NOT NULL
           RETURN DISTINCT c.entityId AS communityId",
          "MERGE (m:MasterEntity {communityId: communityId})",
          {batchSize: 1000, parallel: false}
        )
        """
        session.run(create_master_nodes_query)
        print("Distinct MasterEntity nodes created for each communityId.")

        # STEP 2: Link each Candidate to its MasterEntity node
        # For each candidate, MERGE a BELONGS_TO relationship to the matching MasterEntity node
        link_candidates_query = """
        CALL apoc.periodic.iterate(
          "MATCH (c:Candidate) 
           WHERE c.entityId IS NOT NULL
           RETURN c",
          "MATCH (m:MasterEntity {communityId: c.entityId})
           MERGE (c)-[:BELONGS_TO]->(m)",
          {batchSize: 1000, parallel: false}
        )
        """
        session.run(link_candidates_query)
        print("Linked each Candidate node to its corresponding MasterEntity node.")

create_master_nodes_and_links()


In [None]:
def set_canonical():
    """
    For each MasterEntity node, aggregates the properties of related Candidate nodes (where Candidate.entityId = MasterEntity.communityId),
    computes frequency maps for each property using apoc.coll.frequenciesAsMap, and updates the MasterEntity node with canonical values.
    The canonical value is taken as the first key from each frequency map.
    """
    driver = GraphDatabase.driver(URI, auth=(USER, PASSWORD))

    with driver.session(database=DB_NAME) as session:
        query = """
        CALL apoc.periodic.iterate(
          'MATCH (m:MasterEntity) RETURN m',
          'MATCH (c:Candidate {entityId: m.communityId})
           WITH m, 
                collect(c.fullName) AS allNames, 
                collect(c.email) AS allEmails, 
                collect(c.phoneNumber) AS allPhones, 
                collect(c.address) AS allAddresses
           WITH m,
                apoc.coll.frequenciesAsMap(allNames) AS nameFreq,
                apoc.coll.frequenciesAsMap(allEmails) AS emailFreq,
                apoc.coll.frequenciesAsMap(allPhones) AS phoneFreq,
                apoc.coll.frequenciesAsMap(allAddresses) AS addrFreq
           WITH m,
                keys(nameFreq)[0] AS bestName,
                keys(emailFreq)[0] AS bestEmail,
                keys(phoneFreq)[0] AS bestPhone,
                keys(addrFreq)[0] AS bestAddress
           SET m.fullNameCanonical = bestName,
               m.emailCanonical = bestEmail,
               m.phoneNumberCanonical = bestPhone,
               m.addressCanonical = bestAddress',
          {batchSize:50, parallel:false}
        )
        """
        session.run(query)
        print("Canonical values updated for MasterEntity nodes.")

set_canonical()

If you do run all, the block of code below will print out the run time for the entire process!!

In [None]:

program_end_time = time.time()
program_elapsed_time = program_end_time - program_start_time  # seconds
program_elapsed_minutes = program_elapsed_time / 60   # convert seconds to minutes

print(f"Time elapsed: {program_elapsed_minutes:.2f} minutes")