# Senzing and Neo4j Integration Example

## Set up the Python environment

First, we need to import the Python library dependencies which are required for the code we'll be running.

In [None]:
from dataclasses import dataclass, field
import csv
import json
import os
import pathlib
import sys
import typing

from icecream import ic
from tqdm import tqdm
import dotenv
import matplotlib.pyplot as plt
import neo4j
import pandas as pd
import seaborn as sns
import watermark

%load_ext watermark

Show a "watermark" of which versions are being used for system componenents and library dependencies. This may help in case you need to troubleshoot the dependencies on your system, e.g., if there's some conflict during installation.

In [None]:
%watermark
%watermark --iversions

## Examine the input datasets

We will use three datasets which describe businesses (names, addresses, etc.) within the Las Vegas metropolitan area:

  - SafeGraph `Places of Interest` (POI)
  - US Dept of Labor `Wage and Hour Compliance Action Data` (WHISARD)
  - US Small Business Administration `PPP Loans over $150K` (PPP)

Two of these datasets are public, and one is available commercially.
You can obtain each dataset from the links given below.

Since we only need a subset of each dataset, first let's define a utility function to remove unneeded columns from a Pandas `DataFrame` object.

In [None]:
def sample_df (
    df: pd.DataFrame,
    cols_keep: typing.Set[ typing.Any ],
    ) -> pd.DataFrame:
    """
Remove all but the specified columns from the given Pandas dataframe.
https://stackoverflow.com/a/51285940/1698443
    """
    diff: typing.Set[ typing.Any ] = set(df.columns) - cols_keep
    
    return df.drop(
        diff,
        axis = 1,
        inplace = False,
    )

### Load the SafeGraph Places dataset

Load the `Places of Interest` (POI) dataset for Las Vegas, from SafeGraph: <https://www.safegraph.com/products/places>

In [None]:
poi_path: pathlib.Path = pathlib.Path("lv_data") / "poi.json"

df_poi: pd.DataFrame = pd.DataFrame.from_dict(
    [ json.loads(line) for line in poi_path.open(encoding = "utf-8") ],
)

In [None]:
df = sample_df(
    df_poi,
    set([
        "RECORD_TYPE",
        "DATA_SOURCE",
        "RECORD_ID",
        "LOCATION_NAME_ORG",
        "TOP_CATEGORY",
        "SUB_CATEGORY",
        "NAICS_CODE",
        "BUSINESS_GEO_LATITUDE",
        "BUSINESS_GEO_LONGITUDE",
        "PHONE_NUMBER",
        "BUSINESS_ADDR_FULL",
    ]),
)

In [None]:
df.head()

In [None]:
df.describe().loc[[ "count", "freq", "unique", ]]

### Load the DoL WHISARD dataset

Load the `Wage and Hour Compliance Action Data` (WHISARD) dataset for Las Vegas, from the US Department of Labor: <https://enforcedata.dol.gov/views/data_summary.php>

In [None]:
dol_path: pathlib.Path = pathlib.Path("lv_data") / "dol.csv"

df_dol: pd.DataFrame = pd.read_csv(
    dol_path,
    dtype = str,
    encoding = "utf-8",
)

In [None]:
df = sample_df(
    df_dol,
    set([
        "RECORD_TYPE",
        "DATA_SOURCE",
        "RECORD_ID",
        "BUSINESS_NAME_ORG",
        "LEGAL_NAME_ORG",
        "BUSINESS_ADDR_LINE1",
        "BUSINESS_ADDR_CITY",
        "BUSINESS_ADDR_STATE",
        "BUSINESS_ADDR_POSTAL_CODE",
    ]),
)

In [None]:
df.head()

In [None]:
df.describe().loc[[ "count", "freq", "unique", ]]

### Load the PPP Loans dataset

Load the `PPP Loans over $150K` (PPP) dataset for Las Vegas, from the US Small Business Administration: <https://data.sba.gov/dataset/ppp-foia>

In [None]:
ppp_path: pathlib.Path = pathlib.Path("lv_data") / "ppp.csv"

df_ppp: pd.DataFrame = pd.read_csv(
    ppp_path,
    dtype = str,
    encoding = "utf-8",
)

In [None]:
df_ppp.head()

In [None]:
df_ppp.describe().loc[[ "count", "freq", "unique", ]]

## Run the Entity Resolution in Senzing

Launch a Linux server running Ubuntu 20.04 LTS server with 4 vCPU, 16 GB memory.

See: <https://senzing.zendesk.com/hc/en-us/articles/115002408867-Quickstart-Guide>

```bash
wget https://senzing-production-apt.s3.amazonaws.com/senzingrepo_1.0.1-1_amd64.deb
sudo apt install ./senzingrepo_1.0.1-1_amd64.deb
sudo apt update
sudo apt upgrade
```

Depending on the Linux distribution, this may require installing `libssl1.1` as well, which is described in
<https://stackoverflow.com/questions/73251468/e-package-libssl1-1-has-no-installation-candidate>:

```bash
wget http://archive.ubuntu.com/ubuntu/pool/main/o/openssl/libssl1.1_1.1.1f-1ubuntu2_amd64.deb
sudo dpkg -i libssl1.1_1.1.1f-1ubuntu2_amd64.deb
```

Then install Senzing, which will be located in the `/opt/senzing/data/current` directory:

```bash
sudo apt install senzingapi
```

Now create a project `~/senzing` in the current user's home directory and set up its configuration:

```bash
python3 /opt/senzing/g2/python/G2CreateProject.py ~/senzing
cd ~/senzing
source setupEnv
./python/G2SetupConfig.py
```

Prepare to load our three datasets into Senzing as data sources:

```bash
./python/G2ConfigTool.py
	+ addDataSource SAFEGRAPH
	+ addDataSource DOL_WHISARD
	+ addDataSource PPP_LOANS
	+ save
	+ y
	+ quit
```

We'll specify using up to 16 threads, to parallelize the input process:

```bash
./python/G2Loader.py -f lv_data/poi.json -nt 16
./python/G2Loader.py -f lv_data/dol.csv -nt 16
./python/G2Loader.py -f lv_data/ppp.csv -nt 16
```

Finally, export the resolved entities as the `export.json` local file:

```bash
./python/G2Export.py -F JSON -o export.json
```

## Parse the results from Senzing

Let's define a `dataclass` to represent the parsed results from Senzing entity resolution.

In [None]:
@dataclass(order=False, frozen=False)
class Entity:  # pylint: disable=R0902
    """
A data class representing a resolved entity.
    """
    entity_id: id
    num_recs: int
    records: typing.Set[ str ] = field(default_factory = lambda: set([]))
    related: typing.Dict[ int, dict ] = field(default_factory = lambda: {})
    has_ref: bool = False

Parse the JSON data from the export, to build a dictionary of entities indexed by their unique identifiers. Also keep track of both the "resolved" and "related" records for each entity, to use for constructing the knowledge graph from these results.

In [None]:
export_path: pathlib.Path = pathlib.Path("export.json")
entities: dict = {}

with export_path.open() as fp:
    for line in tqdm(fp.readlines(), desc = "read JSON"):
        entity_dat: dict = json.loads(line)
        entity_id: int = entity_dat["RESOLVED_ENTITY"]["ENTITY_ID"]

        records: set = set([
            ".".join([ r["DATA_SOURCE"], r["RECORD_ID"] ]).upper()
            for r in entity_dat["RESOLVED_ENTITY"]["RECORDS"]
        ])

        entities[entity_id] = Entity(
            entity_id = entity_id,
            records = records,
            num_recs = len(records),
            related = {
                r["ENTITY_ID"]: r
                for r in entity_dat["RELATED_ENTITIES"]
            },
        )

Let's examine one of the resolved entity objects, to see which fields are available

In [None]:
entity_dat

Now let's analyze the Senzing results, measuring how much the process of _entity resolution_ has consolidated records among the input datasets.

In [None]:
for entity in entities.values():
    if entity.num_recs > 1:
        entity.has_ref = True

        for inf_ent in entity.related.keys():
            entities[inf_ent].has_ref = True

edge_counts: typing.List[ int ] = [
    e.num_recs
    for e in entities.values()
    #if e.num_recs > 1
]

has_ref_ents: int = len([
    e for e in entities.values()
    if e.has_ref
])

In particular, it's helpful for planning about our eventual knowledge graph to understand the:

  - total number of entities
  - number of entities which have references (i.e., these will be linked within the knowledge graph)

In [None]:
ic(len(entities))
ic(has_ref_ents);

Now visualize this as a histogram of the resolved entities versus their related records in the input datasets.

In [None]:
fig, ax = plt.subplots()
plt.rcParams["font.family"] = "sans-serif"

y = sns.countplot(x = edge_counts, color = "lightblue")

y.tick_params(axis = "y", size = 9, colors = "gray")
y.bar_label(y.containers[0], padding = 3, color = "black", fontsize = 11)

plt.xlabel("records per entity", size = 10, fontstyle = "italic")
plt.ylabel("num entities", size = 10, fontstyle = "italic")

sns.despine(bottom = True, left = True)
plt.yscale("log")

From this analysis, more than 14K entities were linked to records through _entity resolution_.
These can be used to construct _nodes_, _properties_, and _relations_ in a knowledge graph.

In [None]:
num_rel_2: int = 3437
has_ref_ents - num_rel_2

Of the linked entities, more than 10K have three or more records linked.
This is interesting since we're trying to link records across three datasets.
We'll get more specific stats later through Cypher graph queries in Neo4j.

## Build a Knowledge Graph in Neo4j

Set up a Bolt driver using our credentials for Neo4j Desktop

In [None]:
dotenv.load_dotenv(dotenv.find_dotenv())

bolt_uri: str = os.environ.get("NEO4J_BOLT")
username: str = os.environ.get("NEO4J_USER")
password: str = os.environ.get("NEO4J_PASS")

driver: neo4j.BoltDriver = neo4j.GraphDatabase.driver(
    bolt_uri,
    auth = ( username, password, ),
)

Delete the previous graph data...

In [None]:
with driver.session() as session:
    #session.run("MATCH (x) DETACH DELETE x")
    #session.run("DROP CONSTRAINT unique_record")
    #session.run("DROP CONSTRAINT unique_entity")
    pass

In [None]:
with driver.session() as session:
    query: str = """
CREATE CONSTRAINT unique_record 
    IF NOT EXISTS FOR (rec:Record) 
    REQUIRE rec.uid IS UNIQUE
    """
    
    session.run(query)

    query = """
CREATE CONSTRAINT unique_entity 
    IF NOT EXISTS FOR (ent:Entity) 
    REQUIRE ent.uid IS UNIQUE
    """
    
    session.run(query)

### Populate nodes from the dataset records

Define utility functions used for loading the graph data.

In [None]:
def get_property_keys (
    df: pd.DataFrame,
    ) -> typing.List[ str ]:
    """
Convert the column names from the given Pandas dataframe into Cypher property names.
    """
    return [
        name.lower().replace(" ", "_")
        for name in df.columns.values.tolist()
    ]


def safe_value (
    obj: typing.Any,
    ) -> typing.Any:
    """
Escape double quotes within string values.
    """
    if isinstance(obj, str):
        return obj.replace('"', "'")

    return obj

In [None]:
def format_merge_record (
    keys: typing.List[ str ],
    vals: list,
    ) -> str:
    """
Format one MERGE statement in Cypher for the values of a given Record.
    """
    safe_vals = [ safe_value(v) for v in vals ]
    row_dict: dict = dict(zip(keys, safe_vals))

    uid: str = row_dict["data_source"].upper() + "." + row_dict["record_id"]
    
    props: str = ", ".join([
        f"rec.{key} = \"{val}\""
        for key, val in row_dict.items()
    ])

    return f"""
MERGE (rec:Record {{ uid: \"{uid}\" }})
  ON CREATE
    SET {props}      
RETURN rec.data_source, rec.record_id
    """


def load_records (
    session: neo4j.Session,
    df: pd.DataFrame,
    ) -> None:
    """
Iterate over each Record from one dataset to load using Cypher.
    """
    keys: typing.List[ str ] = get_property_keys(df)

    for _, row in tqdm(df.iterrows(), desc = "merge nodes for records"):
        query: str = format_merge_record(keys, row.tolist())
        session.run(query)

In [None]:
#df = df_poi.head(5)

In [None]:
with driver.session() as session:
    load_records(session, df_poi)

In [None]:
with driver.session() as session:
    load_records(session, df_dol)

In [None]:
with driver.session() as session:
    load_records(session, df_ppp)

In [None]:
with driver.session() as session:
    query: str = """
MATCH (rec:Record)
RETURN rec
LIMIT 10
    """
    
    for record in session.run(query):
        ic(record)

### Populate nodes from the Senzing entities

In [None]:
query = """
MERGE (ent:Entity {uid: $params.uid, has_ref: $params.has_ref})
"""

with driver.session() as session:
    for entity in tqdm(entities.values(), desc = "merge nodes for entities"):
        params = {
            "uid": entity.entity_id,
            "has_ref": entity.has_ref,
        }

        session.run(
            query,
            params = params,
        )

In [None]:
entity = list(entities.values())[0]
ic(entity);

### Connect the resolved records and related entities

In [None]:
query = """
MATCH
    (ent:Entity {uid: $params.entity_uid}),
    (rec:Record {uid: $params.record_uid})       
MERGE (ent)-[:RESOLVES]->(rec)
"""

with driver.session() as session:
    for entity in tqdm(entities.values(), desc = "merge entity->record"):
        for record_uid in entity.records:
            params = {
                "entity_uid": entity.entity_id,
                "record_uid": record_uid,
            }

            session.run(
                query,
                params = params,
            )

In [None]:
query = """
MATCH
    (ent:Entity {uid: $params.entity_uid}),
    (rel_ent:Entity {uid: $params.rel_ent})       
MERGE (ent)-[:RELATED {ambiguous: $params.ambiguous, disclosed: $params.disclosed, match_key: $params.match_key, match_level: $params.match_level, match_level_code: $params.match_level_code}]->(rel_ent)
"""

with driver.session() as session:
    for entity in tqdm(entities.values(), desc = "merge entity->related"):
        for rel_key, rel_ent in entity.related.items():
            params = {
                "entity_uid": entity.entity_id,
                "rel_ent": rel_ent["ENTITY_ID"],
                "ambiguous": (rel_ent["IS_AMBIGUOUS"] == 0),
                "disclosed": (rel_ent["IS_DISCLOSED"] == 0),
                "match_key": rel_ent["MATCH_KEY"],
                "match_level": rel_ent["MATCH_LEVEL"],
                "match_level_code": rel_ent["MATCH_LEVEL_CODE"],
            }

            session.run(
                query,
                params = params,
            )

In [None]:
with driver.session() as session:
    query = """
MATCH (ent:Entity)
RETURN
    ent.uid, COUNT { (ent)-[:RESOLVES]->(:Record) } AS num_recs
ORDER BY num_recs DESC
LIMIT 20
    """
    
    for ent in session.run(query):
        ic(ent)