# Senzing + Neo4j: Build a knowledge graph

## Set up the Python environment

First, we need to import the Python library dependencies which are required for the code we'll be running.

In [1]:
from dataclasses import dataclass, field
import json
import os
import pathlib
import sys
import typing

from graphdatascience import GraphDataScience
from icecream import ic
from tqdm import tqdm
import dotenv
import neo4j
import pandas as pd
import watermark

%load_ext watermark

Show a "watermark" of which versions are being used for system components and library dependencies. This may help in case you need to troubleshoot the dependencies on your system, e.g., if there's some conflict during installation.

In [2]:
# Report the timestamp, Python/IPython versions, and host details.
%watermark
# Report the versions of the modules imported into this notebook.
%watermark --iversions

Last updated: 2024-04-05T11:56:10.529028-07:00

Python implementation: CPython
Python version       : 3.11.0
IPython version      : 8.22.2

Compiler    : Clang 13.0.0 (clang-1300.0.29.30)
OS          : Darwin
Release     : 21.6.0
Machine     : x86_64
Processor   : i386
CPU cores   : 8
Architecture: 64bit

sys      : 3.11.0 (v3.11.0:deaf509e8f, Oct 24 2022, 14:43:23) [Clang 13.0.0 (clang-1300.0.29.30)]
watermark: 2.4.3
pandas   : 2.2.1
json     : 2.0.9
neo4j    : 5.18.0



## Parse the results from Senzing

Let's define a `dataclass` to represent the parsed results from Senzing entity resolution.

In [3]:
@dataclass(order=False, frozen=False)
class Entity:  # pylint: disable=R0902
    """
    A data class representing a resolved entity.

    Instances are mutable (`frozen=False`) because `has_ref` gets updated
    after construction, once the links among entities have been traversed.
    """
    # unique identifier for the resolved entity (`ENTITY_ID` in the export)
    entity_id: int
    # count of the source records resolved into this entity
    num_recs: int
    # identifiers of the resolved records, formatted "DATA_SOURCE.RECORD_ID"
    records: typing.Set[ str ] = field(default_factory = set)
    # related entities, keyed by the `ENTITY_ID` of the related entity
    related: typing.Dict[ int, dict ] = field(default_factory = dict)
    # flag: this entity takes part in at least one relation in the graph
    has_ref: bool = False

Parse the JSON data from the export, to build a dictionary of entities indexed by their unique identifiers. Also keep track of both the "resolved" and "related" records for each entity, to use for constructing the knowledge graph from these results.

In [4]:
# Senzing export file, read below as one JSON document per line.
export_path: pathlib.Path = pathlib.Path("../export.json")
entities: dict = {}

# Open with an explicit encoding so that parsing does not depend on the
# platform's default locale -- assumes the export is UTF-8; TODO confirm.
with export_path.open(encoding = "utf-8") as fp:
    for line in tqdm(fp.readlines(), desc = "read JSON"):
        # tolerate blank lines (e.g., a stray newline at the end of the
        # file) which would otherwise make `json.loads()` raise
        if not line.strip():
            continue

        # NB: `entity_dat` is left at module level on purpose, since a
        # later cell displays the last document parsed
        entity_dat: dict = json.loads(line)
        entity_id: int = entity_dat["RESOLVED_ENTITY"]["ENTITY_ID"]

        # identify each resolved record as "DATA_SOURCE.RECORD_ID"
        records: set = {
            ".".join([ r["DATA_SOURCE"].upper(), str(r["RECORD_ID"]) ])
            for r in entity_dat["RESOLVED_ENTITY"]["RECORDS"]
        }

        entities[entity_id] = Entity(
            entity_id = entity_id,
            records = records,
            num_recs = len(records),
            # index the related entities by their own entity IDs
            related = {
                r["ENTITY_ID"]: r
                for r in entity_dat["RELATED_ENTITIES"]
            },
        )

read JSON: 100%|████████████████████████████████████████████████████████████████████████████████████| 99156/99156 [00:07<00:00, 13931.59it/s]


To finish preparing the input data for resolved entities, let's make a quick traversal of the record linkage and set a flag for "interesting" entities which will have relations in the graph to visualize.

In [5]:
# Flag each entity which takes part in a relation: either it has resolved
# records of its own, or some other entity lists it as related.
for entity in entities.values():
    entity.has_ref = entity.has_ref or entity.num_recs > 0

    for related_id in entity.related.keys():
        entities[related_id].has_ref = True

Let's examine one of the resolved entity objects, to see which fields are available.

In [6]:
# `entity_dat` still holds the last JSON document parsed by the loading
# loop, so this cell depends on that loop having been run first.
entity_dat

{'RESOLVED_ENTITY': {'ENTITY_ID': 438737,
  'RECORDS': [{'DATA_SOURCE': 'SAFEGRAPH',
    'RECORD_ID': 'zzw-222@5yv-c8t-t7q',
    'ENTITY_TYPE': 'GENERIC',
    'INTERNAL_ID': 438737,
    'ENTITY_KEY': '7A2952039A2EDAE86C89FF025284618BB47F5B0E',
    'ENTITY_DESC': 'Royalty Renee Salez',
    'MATCH_KEY': '',
    'MATCH_LEVEL': 0,
    'MATCH_LEVEL_CODE': '',
    'ERRULE_CODE': '',
    'LAST_SEEN_DT': '2024-03-12 18:54:15.638'}]},
 'RELATED_ENTITIES': []}

In [7]:
# Show the most recently added `Entity` object; the trailing `;` keeps the
# notebook from also echoing the value of the call as cell output.
ic(list(entities.values())[-1]);

ic| list(entities.values())[-1]: Entity(entity_id=438737,
                                        num_recs=1,
                                        records={'SAFEGRAPH.zzw-222@5yv-c8t-t7q'},
                                        related={},
                                        has_ref=True)


## Connect the GDS library to Neo4j Desktop

Set up a GDS connection using our credentials for Neo4j Desktop.

In [8]:
# Load settings from whichever `.env` file `find_dotenv()` locates, making
# them available through the process environment.
dotenv.load_dotenv(dotenv.find_dotenv())

# `os.environ.get()` returns `None` for any variable which is not set
bolt_uri: typing.Optional[str] = os.environ.get("NEO4J_BOLT")
database: typing.Optional[str] = os.environ.get("NEO4J_DBMS")
username: typing.Optional[str] = os.environ.get("NEO4J_USER")
password: typing.Optional[str] = os.environ.get("NEO4J_PASS")

# Fail early with a readable message, rather than handing `None` values
# to the driver and getting an opaque connection or authentication error.
# NOTE(review): `NEO4J_DBMS` is not checked, since a `None` database
# presumably selects the server's default database -- confirm.
missing_vars: list = [
    name
    for name, value in (
        ("NEO4J_BOLT", bolt_uri),
        ("NEO4J_USER", username),
        ("NEO4J_PASS", password),
    )
    if value is None
]

if missing_vars:
    raise RuntimeError(
        "missing Neo4j settings, define these in the environment or a `.env` file: "
        + ", ".join(missing_vars)
    )

gds:GraphDataScience = GraphDataScience(
    bolt_uri,
    auth = ( username, password, ),
    database = database,
    aura_ds = False,
)

## Build the KG in Neo4j

### Populate nodes from the Senzing entities

In [9]:
# Cypher to remove any earlier definition of the constraint, so that this
# cell can be re-run from a clean state.
drop_constraint_query: str = """
DROP CONSTRAINT `entity_node_key` IF EXISTS
"""

# Cypher to declare `uid` as the node key for `Entity` nodes.
create_constraint_query: str = """
CREATE CONSTRAINT `entity_node_key` IF NOT EXISTS
  FOR (ent:Entity)
  REQUIRE ent.uid IS NODE KEY
"""

gds.run_cypher(drop_constraint_query)

gds.run_cypher(create_constraint_query)

In [10]:
# One row per resolved entity: its unique identifier plus the flag which
# marks whether it takes part in any relation.
df_ent: pd.DataFrame = pd.DataFrame([
    {
        "uid": entity.entity_id,
        "has_ref": entity.has_ref,
    }
    for entity in entities.values()
])

# MERGE on the node key alone, then SET the flag. Matching on both `uid`
# and `has_ref` would attempt to create a second node for the same `uid`
# whenever the flag differs from a previous run, which violates the node
# key constraint on `Entity.uid`.
unwind_query: str = """
UNWIND $rows AS row
CALL {
  WITH row
  MERGE (ent:Entity {uid: row.uid})
  SET ent.has_ref = row.has_ref
} IN TRANSACTIONS OF 10000 ROWS
    """

# the rows get passed as a query parameter, committed in batches
gds.run_cypher(
    unwind_query,
    {"rows": df_ent.to_dict(orient = "records")},
)

### Connect the resolved records and related entities

In [11]:
# One row per (entity, resolved record) pair.
rec_rows: list = [
    { "entity_uid": ent.entity_id, "record_uid": rec_uid }
    for ent in entities.values()
    for rec_uid in ent.records
]

df_rec: pd.DataFrame = pd.DataFrame(rec_rows)

# Link each entity to the records it resolves; both end points must
# already exist in the graph, since they are found through MATCH.
unwind_query: str = """
UNWIND $rows AS row
CALL {
  WITH row
  MATCH
    (ent:Entity {uid: row.entity_uid}),
    (rec:Record {uid: row.record_uid})       
  MERGE (ent)-[:RESOLVES]->(rec)
} IN TRANSACTIONS OF 10000 ROWS
    """

rec_params: dict = {
    "rows": df_rec.to_dict(orient = "records"),
}

gds.run_cypher(unwind_query, rec_params)

In [12]:
# One row per (entity, related entity) pair, carrying the match details
# reported for that relation.
df_rel: pd.DataFrame = pd.DataFrame([
    {
        "entity_uid": entity.entity_id,
        "rel_ent": rel_ent["ENTITY_ID"],
        # The flags must be true when the source value is non-zero; the
        # previous `== 0` comparison stored the inverse of each flag.
        # NOTE(review): assumes `IS_AMBIGUOUS` and `IS_DISCLOSED` are 0/1
        # integers where 1 means "yes" -- confirm against the export spec.
        "ambiguous": (rel_ent["IS_AMBIGUOUS"] != 0),
        "disclosed": (rel_ent["IS_DISCLOSED"] != 0),
        "match_key": rel_ent["MATCH_KEY"],
        "match_level": rel_ent["MATCH_LEVEL"],
        "match_level_code": rel_ent["MATCH_LEVEL_CODE"],
    }
    for entity in entities.values()
    for rel_ent in entity.related.values()
])

# Link each entity to its related entities; both end points must already
# exist in the graph, since they are found through MATCH.
# NB: relations loaded with the earlier, inverted flags should be removed
# before re-running, otherwise MERGE adds a second edge beside each one.
unwind_query: str = """
UNWIND $rows AS row
CALL {
  WITH row
  MATCH
    (ent:Entity {uid: row.entity_uid}),
    (rel_ent:Entity {uid: row.rel_ent})       
  MERGE (ent)-[:RELATED {ambiguous: row.ambiguous, disclosed: row.disclosed, match_key: row.match_key, match_level: row.match_level, match_level_code: row.match_level_code}]->(rel_ent)
} IN TRANSACTIONS OF 10000 ROWS
    """

gds.run_cypher(
    unwind_query,
    {"rows": df_rel.to_dict(orient = "records")},
)

In [13]:
# Sanity check: count the `Entity` nodes now in the graph, which should
# agree with the number of entities parsed from the export.
count_query: str = """
MATCH (ent:Entity)
RETURN COUNT(ent.uid)
"""

gds.run_cypher(count_query)

Unnamed: 0,COUNT(ent.uid)
0,99156
