# Load data records into KùzuDB

## set up environment

Load the library dependencies:

In [1]:
import json
import pathlib
import sys
import traceback
import typing

from icecream import ic
import kuzu
import pandas as pd
import watermark

Show a watermark of the OS, hardware, language environment, and dependent library versions.

In [2]:
# load the watermark IPython extension, then report the runtime environment
# (first magic) and the versions of the imported packages (--iversions)
%load_ext watermark
%watermark
%watermark --iversions

Last updated: 2025-03-20T14:14:23.929043-07:00

Python implementation: CPython
Python version       : 3.13.2
IPython version      : 8.32.0

Compiler    : Clang 16.0.0 (clang-1600.0.26.6)
OS          : Darwin
Release     : 23.6.0
Machine     : arm64
Processor   : arm
CPU cores   : 14
Architecture: 64bit

json     : 2.0.9
kuzu     : 0.8.2
pandas   : 2.2.3
watermark: 2.5.0
sys      : 3.13.2 (main, Feb  4 2025, 14:51:09) [Clang 16.0.0 (clang-1600.0.26.6)]



## helper function definitions

Helper function to extract the primary name from a record in the Senzing-formatted datasets.

In [3]:
def extract_name (
    dat: dict,
    ) -> typing.Optional[ str ]:
    """
Extract the primary name from a Senzing-formatted data record.

A top-level `PRIMARY_NAME_FULL` attribute wins. Otherwise the `NAMES` list
is scanned for the first entry that provides a primary name:
  * an entry with `NAME_TYPE == "PRIMARY"`, using `NAME_FULL` if present,
    else `NAME_ORG`;
  * otherwise (for entries whose `NAME_TYPE` is not `"PRIMARY"`) an entry
    carrying `PRIMARY_NAME_ORG`.

    dat: one parsed input data record

    returns: the name, or `None` when the source uses the `"-"` placeholder or an empty string

    raises: `ValueError` when no primary name can be found (a data-quality problem) or the name attributes are malformed; the original error is chained as the cause
    """
    name: typing.Optional[ str ] = None

    try:
        if "PRIMARY_NAME_FULL" in dat:
            name = dat["PRIMARY_NAME_FULL"]
        else:
            for rec in dat["NAMES"]:
                if "NAME_TYPE" in rec and rec["NAME_TYPE"] == "PRIMARY":
                    if "NAME_FULL" in rec:
                        name = rec["NAME_FULL"]
                        break
                    if "NAME_ORG" in rec:
                        name = rec["NAME_ORG"]
                        break
                elif "PRIMARY_NAME_ORG" in rec:
                    name = rec["PRIMARY_NAME_ORG"]
                    break

        # treat the "-" placeholder and empty strings as missing values
        if name is not None and (name == "-" or len(name) < 1):
            return None

    except Exception as ex:
        # previously this exited the process with status 0 (i.e. "success");
        # raise instead so the failure is visible and carries the record
        raise ValueError(f"extract_name: malformed record: {dat!r}") from ex

    if name is None:
        # data-quality problem: the record has no recognizable primary name
        raise ValueError(f"extract_name DQ: {dat!r}")

    return name

Helper function to extract the first full address from a record in the Senzing-formatted datasets.

In [4]:
def extract_addr (
    dat: dict,
    ) -> typing.Optional[ str ]:
    """
Extract the first full address from a Senzing-formatted data record.

Scans the optional `ADDRESSES` list and returns the `ADDR_FULL` value of
the first entry that has one.

    dat: one parsed input data record

    returns: the address, or `None` when there is no `ADDR_FULL` entry or the value is the `"-"` placeholder or an empty string

    raises: `ValueError` when the address attributes are malformed (e.g. `ADDRESSES` is not iterable); the original error is chained as the cause
    """
    try:
        addr: typing.Optional[ str ] = None

        if "ADDRESSES" in dat:
            for rec in dat["ADDRESSES"]:
                if "ADDR_FULL" in rec:
                    addr = rec["ADDR_FULL"]
                    break

        # treat the "-" placeholder and empty strings as missing values
        if addr is not None and (addr == "-" or len(addr) < 1):
            addr = None

        return addr

    except Exception as ex:
        # previously this exited the process with status 0 (i.e. "success");
        # raise instead so the failure is visible and carries the record
        raise ValueError(f"extract_addr: malformed record: {dat!r}") from ex

## load datasets

Load the slice of the OpenSanctions dataset <https://www.opensanctions.org/>

In [5]:
# Read the OpenSanctions slice (one JSON record per line) into two row lists:
# one row per record, and one row per (record, risk topic) pair.
dat_file: pathlib.Path = pathlib.Path("open-sanctions.json")
data_rows: typing.List[ dict ] = []
risk_rows: typing.List[ dict ] = []

with open(dat_file, "rb") as fp:
    for line in fp:
        # tolerate blank lines, which json.loads would reject
        if not line.strip():
            continue

        dat: dict = json.loads(line)

        for risk in dat.get("RISKS", []):
            risk_rows.append({
                "id": dat["RECORD_ID"],
                "topic": risk["TOPIC"],
            })

        data_rows.append({
            "id": dat["RECORD_ID"],
            "kind": dat["RECORD_TYPE"],
            "name": extract_name(dat),
            "addr": extract_addr(dat),
            "url": dat["URL"],
        })

# keep the first occurrence of each record id
df_os: pd.DataFrame = pd.DataFrame.from_dict(
    data_rows,
    orient = "columns"
).drop_duplicates(subset = ["id"])

df_os.head()

Unnamed: 0,id,kind,name,addr,url
0,NK-25vyVFzt8vdJGgAXMRTwTJ,PERSON,Abassin BADSHAH,"31 Quernmore Close, Bromley, Kent, United King...",https://www.opensanctions.org/entities/NK-25vy...
1,NK-3p3mmVWmjwVtTfKchz4kNE,ORGANIZATION,LMAR (GB) LTD,"31 Quernmore Close, Bromley, Kent, United King...",https://www.opensanctions.org/entities/NK-3p3m...
2,NK-auyPsLrBzRoxjCRWgjBvas,ORGANIZATION,WANDLE HOLDINGS LIMITED,"DEANA BEACH APTS, BLOCK A, Flat 212, Προμαχών...",https://opensanctions.org/entities/NK-auyPsLrB...
3,NK-cf4Q3KcmUnQbt8Cy7iTtwK,ORGANIZATION,POLYUS GOLD INTERNATIONAL LIMITED,"3RD FLOOR CHARTER PLACE 23-27 SEATON PALCE, ST...",https://opensanctions.org/entities/NK-cf4Q3Kcm...
4,NK-dNNN56A4ApVfUFvfzniLCF,PERSON,Firuza Nazimovna Kerimova,"MOSCOW, RUS, 123430",https://www.opensanctions.org/entities/NK-dNNN...


In [6]:
# per-column summary: non-null count, frequency of the top value, distinct values
(
    df_os
    .describe(include = "all")
    .loc[[ "count", "freq", "unique", ]]
)

Unnamed: 0,id,kind,name,addr,url
count,24,24,24,19,24
freq,1,14,2,4,1
unique,24,2,23,12,24


OpenSanctions provides the "risk" category of data, so handle this specially:

In [7]:
# One row per (record id, risk topic). Drop exact duplicate rows, consistent
# with the `id` de-duplication of the record frames: a repeated input record
# would otherwise be loaded as duplicate Risk nodes (the table's key is SERIAL).
df_risk: pd.DataFrame = pd.DataFrame.from_dict(
    risk_rows,
    orient = "columns"
).drop_duplicates()

df_risk.head()

Unnamed: 0,id,topic
0,NK-25vyVFzt8vdJGgAXMRTwTJ,corp.disqual
1,NK-auyPsLrBzRoxjCRWgjBvas,sanction.linked
2,NK-cf4Q3KcmUnQbt8Cy7iTtwK,sanction.linked
3,NK-dNNN56A4ApVfUFvfzniLCF,role.rca
4,NK-dNNN56A4ApVfUFvfzniLCF,sanction


In [8]:
# per-column summary: non-null count, frequency of the top value, distinct values
(
    df_risk
    .describe(include = "all")
    .loc[[ "count", "freq", "unique", ]]
)

Unnamed: 0,id,topic
count,29,29
freq,4,10
unique,19,7


Load the slice of the Open Ownership datasets <https://www.openownership.org/>

In [9]:
# Read the Open Ownership slice (one JSON record per line): one row per record,
# plus one row per relationship that points from this record to another one.
dat_file: pathlib.Path = pathlib.Path("open-ownership.json")
data_rows: typing.List[ dict ] = []
role_rows: typing.List[ dict ] = []

with open(dat_file, "rb") as fp:
    for line in fp:
        # tolerate blank lines, which json.loads would reject
        if not line.strip():
            continue

        dat: dict = json.loads(line)
        kv: dict = {
            "id": dat["RECORD_ID"],
            "kind": dat["RECORD_TYPE"],
            "name": extract_name(dat),
            "addr": extract_addr(dat),
        }

        if dat["RECORD_TYPE"] == "PERSON":
            # use the first attribute entry that carries a nationality, if any;
            # previously a missing ATTRIBUTES list, or a first entry without
            # NATIONALITY, raised KeyError
            for attr in dat.get("ATTRIBUTES", []):
                if "NATIONALITY" in attr:
                    kv["country"] = attr["NATIONALITY"]
                    break
        elif dat["RECORD_TYPE"] == "ORGANIZATION" and "REGISTRATION_COUNTRY" in dat:
            kv["country"] = dat["REGISTRATION_COUNTRY"]

        data_rows.append(kv)

        # entries carrying REL_ANCHOR_DOMAIN are skipped; the others are read
        # as pointers from this record to another record
        for rel in dat.get("RELATIONSHIPS", []):
            if "REL_ANCHOR_DOMAIN" not in rel:
                role_rows.append({
                    "src_id": dat["RECORD_ID"],
                    "dst_id": rel["REL_POINTER_KEY"],
                    "role": rel["REL_POINTER_ROLE"],
                    "date": rel["REL_POINTER_FROM_DATE"],
                })

# keep the first occurrence of each record id
df_oa: pd.DataFrame = pd.DataFrame.from_dict(
    data_rows,
    orient = "columns"
).drop_duplicates(subset = ["id"])

df_oa.head()

Unnamed: 0,id,kind,name,addr,country
0,10094521532396971848,ORGANIZATION,GOLD WYNN UK HOLDINGS LIMITED,"C/O Fladgate Llp, 16 Great Queen Street, Londo...",GB
1,10165632722354515453,ORGANIZATION,UPSIDE TECHNOLOGY LIMITED,"Apt 52, 3 Whitehall Court, London, SW1A 2EL",GB
2,10264459789712927869,PERSON,Kenneth Kurt Hansen,"Finderupvej 61, Kastrup, 2770",DK
3,10369029484097831758,PERSON,Daniel Simmons,"17 St Andrews Crescent, Cardiff, South Glamorg...",GB
4,10390699576067371333,PERSON,Wyndham James Alexander Plumptre,"Apt 52, 3, Whitehall Court, London, SW1A 2EL",GB


In [10]:
# per-column summary: non-null count, frequency of the top value, distinct values
(
    df_oa
    .describe(include = "all")
    .loc[[ "count", "freq", "unique", ]]
)

Unnamed: 0,id,kind,name,addr,country
count,258,258,258,239,247
freq,1,160,4,22,165
unique,258,2,197,116,7


Open Ownership describes _ultimate beneficial ownership_ (UBO) details, which provide the "link" category of data. Keep only the relationships whose source and target records are both present in the loaded slice:

In [11]:
ids: set = set(df_oa["id"].tolist())

# a relationship is kept only when both of its endpoints are loaded records,
# so every edge can be attached to existing OpenOwnership nodes
closed_role_rows: list = [
    role_row
    for role_row in role_rows
    if role_row["src_id"] in ids and role_row["dst_id"] in ids
]

df_role: pd.DataFrame = pd.DataFrame(closed_role_rows)

df_role.head()

Unnamed: 0,src_id,dst_id,role,date
0,10094521532396971848,7584591804488095167,shareholding 75% 100%,2020-03-18
1,10094521532396971848,7584591804488095167,voting_rights 75% 100%,2020-03-18
2,10094521532396971848,7584591804488095167,appointment_of_board,2020-03-18
3,10165632722354515453,598161773989218568,shareholding 75% 100%,2019-08-20
4,10165632722354515453,598161773989218568,voting_rights 75% 100%,2019-08-20


In [12]:
# per-column summary: non-null count, frequency of the top value, distinct values
(
    df_role
    .describe(include = "all")
    .loc[[ "count", "freq", "unique", ]]
)

Unnamed: 0,src_id,dst_id,role,date
count,536,536,536,536
freq,22,27,145,114
unique,113,99,29,77


## load Senzing export

Load the entity resolution results exported from Senzing:

In [20]:
# Read the Senzing export (one JSON document per line). Each document yields
# one entity row, one row per data record resolved into that entity, and one
# row per entry in its RELATED_ENTITIES list.
er_export_file: pathlib.Path = pathlib.Path("export.json")
ER_ENTITY_PREFIX: str = "sz_"
ent_rows: typing.List[ dict ] = []
rec_rows: typing.List[ dict ] = []
rel_rows: typing.List[ dict ] = []

with open(er_export_file, "rb") as fp:
    for line in fp:
        export_item: dict = json.loads(line)
        resolved: dict = export_item["RESOLVED_ENTITY"]
        ent_id: str = ER_ENTITY_PREFIX + str(resolved["ENTITY_ID"]).strip()

        # link the entity to its resolved data records, collecting each
        # record's description along the way
        member_descs: typing.List[ str ] = []

        for member in resolved["RECORDS"]:
            rec_rows.append({
                "ent_id": ent_id,
                "rec_id": member["RECORD_ID"],
                "why": member["MATCH_KEY"],
                "level": member["MATCH_LEVEL"],
            })
            member_descs.append(member["ENTITY_DESC"].strip())

        # the entity description is the LAST non-empty record description
        nonblank_descs: typing.List[ str ] = [ d for d in member_descs if len(d) > 0 ]

        ent_rows.append({
            "id": ent_id,
            "desc": nonblank_descs[-1] if nonblank_descs else None,
        })

        # link the entity to its related entities
        rel_rows.extend(
            {
                "ent_id": ent_id,
                "rel_id": ER_ENTITY_PREFIX + str(related["ENTITY_ID"]).strip(),
                "why": related["MATCH_KEY"],
                "level": related["MATCH_LEVEL"],
            }
            for related in export_item["RELATED_ENTITIES"]
        )

df_ent: pd.DataFrame = pd.DataFrame(ent_rows)

df_ent.head()

Unnamed: 0,id,desc
0,sz_1,Abassin Badshah
1,sz_2,LMAR (GB) LTD
2,sz_3,WANDLE HOLDINGS LIMITED
3,sz_4,POLYUS GOLD INTERNATIONAL LIMITED
4,sz_5,フィルザ・ケリモヴァ（ハンバラエヴァ）


In [21]:
# per-column summary: non-null count, frequency of the top value, distinct values
(
    df_ent
    .describe(include = "all")
    .loc[[ "count", "freq", "unique", ]]
)

Unnamed: 0,id,desc
count,193,193
freq,1,2
unique,193,189


In [22]:
# one row per (resolved entity, data record) link, with Senzing's match key and level
df_rec: pd.DataFrame = pd.DataFrame(rec_rows)

df_rec.head()

Unnamed: 0,ent_id,rec_id,why,level
0,sz_1,NK-25vyVFzt8vdJGgAXMRTwTJ,,0
1,sz_1,17207853441353212969,+NAME+ADDRESS+NATIONALITY,1
2,sz_1,6747548100436839873,+NAME+DOB+NATIONALITY,1
3,sz_2,NK-3p3mmVWmjwVtTfKchz4kNE,,0
4,sz_3,NK-auyPsLrBzRoxjCRWgjBvas,,0


In [23]:
# per-column summary: non-null count, frequency of the top value, distinct values
(
    df_rec
    .describe(include = "all")
    .loc[[ "count", "freq", "unique", ]]
)

Unnamed: 0,ent_id,rec_id,why,level
count,282,282,282,282.0
freq,4,1,193,
unique,193,282,18,


In [24]:
# one row per (entity, related entity) link, with Senzing's match key and level
df_rel: pd.DataFrame = pd.DataFrame(rel_rows)

df_rel.head()

Unnamed: 0,ent_id,rel_id,why,level
0,sz_1,sz_2,+OPEN-SANCTIONS(DIRECTORSHIP:),11
1,sz_1,sz_9,"+ADDRESS+OOR(:SHAREHOLDING 50% 75%,VOTING_RIGH...",11
2,sz_1,sz_100075,"+ADDRESS+OOR(:APPOINTMENT_OF_BOARD,SHAREHOLDIN...",11
3,sz_1,sz_100132,+ADDRESS+NATIONALITY,3
4,sz_2,sz_1,+OPEN-SANCTIONS(:DIRECTORSHIP),11


In [25]:
# per-column summary: non-null count, frequency of the top value, distinct values
(
    df_rel
    .describe(include = "all")
    .loc[[ "count", "freq", "unique", ]]
)

Unnamed: 0,ent_id,rel_id,why,level
count,732,732,732,732.0
freq,20,20,95,
unique,188,188,140,


## create graph tables

Create (or reopen, if it already exists) an on-disk graph database in the `./db` directory and connect to it:

In [35]:
# location of the on-disk Kùzu database
KUZU_DB_PATH: str = "./db"

db: kuzu.Database = kuzu.Database(KUZU_DB_PATH)
conn: kuzu.Connection = kuzu.Connection(db)

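Define the schema: drop any existing tables first, then create the node tables for the data records and resolved entities, plus the relationship tables that connect them: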

In [38]:
# Each relationship table is dropped before the node tables it connects.
drop_statements: typing.List[ str ] = [
    "DROP TABLE IF EXISTS Related",
    "DROP TABLE IF EXISTS Matched",
    "DROP TABLE IF EXISTS Entity",
    "DROP TABLE IF EXISTS Role",
    "DROP TABLE IF EXISTS Risk",
    "DROP TABLE IF EXISTS OpenOwnership",
    "DROP TABLE IF EXISTS OpenSanctions",
]

# Node tables are created before the relationship tables that reference them.
# NOTE(review): Matched only targets OpenSanctions, but df_rec.rec_id appears to
# also hold OpenOwnership record ids (282 unique rec_ids = 24 + 258 records),
# so df_rec cannot be loaded into Matched as declared — TODO confirm/extend.
create_statements: typing.List[ str ] = [
    "CREATE NODE TABLE OpenSanctions(id STRING, kind STRING, name STRING, addr STRING, url STRING, PRIMARY KEY (id))",
    "CREATE NODE TABLE OpenOwnership(id STRING, kind STRING, name STRING, addr STRING, country STRING, PRIMARY KEY (id))",
    "CREATE NODE TABLE Risk(uid SERIAL, id STRING, topic STRING, PRIMARY KEY(uid))",
    "CREATE REL TABLE Role(FROM OpenOwnership TO OpenOwnership, role STRING, date DATE)",
    "CREATE NODE TABLE Entity(id STRING, descript STRING, PRIMARY KEY(id))",
    "CREATE REL TABLE Matched(FROM Entity TO OpenSanctions, why STRING, level INT8)",
    "CREATE REL TABLE Related(FROM Entity TO Entity, why STRING, level INT8)",
]

for ddl in drop_statements + create_statements:
    conn.execute(ddl)

<kuzu.query_result.QueryResult at 0x12d50acf0>

In [39]:
# Bulk-load each pandas DataFrame into its table; the COPY statements name the
# in-scope DataFrames directly by their Python variable names.
for copy_stmt in [
    "COPY OpenSanctions FROM df_os",
    "COPY OpenOwnership FROM df_oa",
    "COPY Risk FROM df_risk",
    "COPY Role FROM df_role",
    "COPY Entity FROM df_ent",
]:
    conn.execute(copy_stmt)

# TODO: load the entity-resolution links. Their target tables are Matched
# (for df_rec) and Related (for df_rel), not Entity. df_rec also appears to
# contain OpenOwnership record ids, which Matched (Entity -> OpenSanctions)
# cannot hold as currently declared.
#conn.execute("COPY Matched FROM df_rec")
#conn.execute("COPY Related FROM df_rel")

<kuzu.query_result.QueryResult at 0x12d50adb0>