In [1]:
import json
import os
import time
from pathlib import Path
import grpc
from senzing import SzEngine, SzError
from senzing_grpc import SzAbstractFactoryGrpc

# Get connection info from environment
SENZING_HOST = os.getenv('SENZING_GRPC_HOST', 'senzing')
SENZING_PORT = os.getenv('SENZING_GRPC_PORT', '8261')

print(f"Connecting to Senzing at {SENZING_HOST}:{SENZING_PORT}")

Connecting to Senzing at senzing:8261


## Initialize the Senzing Engine

Opens a gRPC channel and creates the Senzing engine instance that will be used to load all records in this notebook.  A successful message here means Senzing is up and ready to accept data.

In [2]:
grpc_url = f"{SENZING_HOST}:{SENZING_PORT}"
grpc_channel = grpc.insecure_channel(grpc_url)

# Initialize Senzing factory and engine
sz_abstract_factory = SzAbstractFactoryGrpc(grpc_channel)
sz_engine = sz_abstract_factory.create_engine()

print("Senzing engine initialized successfully")

Senzing engine initialized successfully


## Verify Data Files Are Present

Checks that both `open-ownership.json` and `open-sanctions.json` exist in the `/workspace/data` directory before attempting to load anything.  If a file is missing, the error message tells you exactly where to put it on your host machine.

In [3]:
DATA_DIR = Path('/workspace/data')

OPEN_OWNERSHIP_FILE = DATA_DIR / 'open-ownership.json'
OPEN_SANCTIONS_FILE = DATA_DIR / 'open-sanctions.json'

# Verify files exist
if not OPEN_OWNERSHIP_FILE.exists():
    print(f"ERROR: {OPEN_OWNERSHIP_FILE} not found")
    print(f"Make sure open-ownership.json is in the data/ directory on your host")
else:
    print(f"Found: {OPEN_OWNERSHIP_FILE}")

if not OPEN_SANCTIONS_FILE.exists():
    print(f"ERROR: {OPEN_SANCTIONS_FILE} not found")
    print(f"Make sure open-sanctions.json is in the data/ directory on your host")
else:
    print(f"Found: {OPEN_SANCTIONS_FILE}")

if OPEN_OWNERSHIP_FILE.exists() and OPEN_SANCTIONS_FILE.exists():
    print("\nAll data files found and ready to load")

Found: /workspace/data/open-ownership.json
Found: /workspace/data/open-sanctions.json

All data files found and ready to load


## Define the Record Loader Function

Defines `load_jsonl_file()`, a reusable helper that reads a JSONL file line by line and calls `sz_engine.add_record()` for each one.  It validates that the data source code matches what is expected, tracks progress every 100 records, and collects errors without stopping the full load.

In [4]:
def load_jsonl_file(file_path, data_source_name):
    """
    Load a JSONL file and add each record to Senzing.
    
    Args:
        file_path: Path to the JSONL file
        data_source_name: Expected data source name for validation
    
    Returns:
        Tuple of (records_loaded, errors)
    """
    records_loaded = 0
    errors = []
    
    print(f"\nLoading {file_path}...")
    print(f"Expected data source: {data_source_name}")
    
    with open(file_path, 'r') as f:
        for line_num, line in enumerate(f, 1):
            try:
                # Parse JSON record
                record = json.loads(line.strip())
                
                # Validate data source
                if record.get('DATA_SOURCE') != data_source_name:
                    print(f"Warning: Line {line_num} has unexpected data source: {record.get('DATA_SOURCE')}")
                
                # Add record to Senzing
                sz_engine.add_record(
                    data_source_code=record['DATA_SOURCE'],
                    record_id=record['RECORD_ID'],
                    record_definition=json.dumps(record)
                )
                
                records_loaded += 1
                
                # Progress indicator every 100 records
                if records_loaded % 100 == 0:
                    print(f"  Loaded {records_loaded} records...", end='\r')
                    
            except SzError as e:
                errors.append(f"Line {line_num}: Senzing error - {e}")
            except json.JSONDecodeError as e:
                errors.append(f"Line {line_num}: JSON parse error - {e}")
            except KeyError as e:
                errors.append(f"Line {line_num}: Missing required field - {e}")
            except Exception as e:
                errors.append(f"Line {line_num}: Unexpected error - {e}")
    
    print(f"\n  Completed: {records_loaded} records loaded")
    
    if errors:
        print(f"\n  Errors encountered: {len(errors)}")
        print("  First 5 errors:")
        for error in errors[:5]:
            print(f"    {error}")
    
    return records_loaded, errors

## Register New Data Sources in Senzing

Checks whether `OPEN-OWNERSHIP` and `OPEN-SANCTIONS` are already registered in the active Senzing configuration, and adds any that are missing.  If the config was updated, you will need to restart the `erkg_senzing` container before loading records so the engine picks up the new data sources.

In [5]:
sz_configmanager = sz_abstract_factory.create_configmanager()
default_config_id = sz_configmanager.get_default_config_id()

# Check what the running engine knows about
active_config_id = sz_engine.get_active_config_id()

print(f"Default config ID: {default_config_id}")
print(f"Active config ID:  {active_config_id}")

if default_config_id != active_config_id:
    print("\n" + "="*70)
    print("WARNING: Engine is using an old configuration!")
    print("="*70)
    print("Run this command in your terminal:")
    print("  docker restart erkg_senzing")
    print("\nWait about 10 seconds, then add reconnection cell and continue.")
    print("="*70)
else:
    # Create a new config from the current default
    sz_config = sz_configmanager.create_config_from_config_id(default_config_id)
    
    # Get current data sources
    data_sources_json = sz_config.get_data_source_registry()
    data_sources = json.loads(data_sources_json)
    existing_sources = [ds['DSRC_CODE'] for ds in data_sources.get('DATA_SOURCES', [])]
    
    print(f"Existing data sources: {existing_sources}")
    
    # Register our data sources if they don't exist
    sources_to_add = ['OPEN-OWNERSHIP', 'OPEN-SANCTIONS']
    sources_added = []
    
    for source in sources_to_add:
        if source not in existing_sources:
            print(f"  Registering: {source}")
            sz_config.register_data_source(source)
            sources_added.append(source)
        else:
            print(f"  Already registered: {source}")
    
    # If we added any sources, save the config
    if sources_added:
        print("\nSaving configuration changes...")
        
        # Export the modified config
        config_definition = sz_config.export()
        
        # Register the new config
        new_config_id = sz_configmanager.register_config(
            config_definition=config_definition,
            config_comment=f"Added data sources: {', '.join(sources_added)}"
        )
        
        # Set as default
        sz_configmanager.set_default_config_id(new_config_id)
        
        print(f"Configuration saved with ID: {new_config_id}")
        print("\n" + "="*70)
        print("IMPORTANT: Restart the Senzing container to load the new config")
        print("="*70)
        print("Run this command in your terminal:")
        print("  docker restart erkg_senzing")
        print("\nWait about 10 seconds, then continue with the notebook.")
        print("="*70)
    else:
        print("\nAll data sources already registered.  No restart needed.")

Registering data sources...
Default config ID: 948463713
Active config ID:  948463713
Existing data sources: ['TEST', 'SEARCH']
  Registering: OPEN-OWNERSHIP
  Registering: OPEN-SANCTIONS

Saving configuration changes...
Configuration saved with ID: 3285108390

IMPORTANT: Restart the Senzing container to load the new config
Run this command in your terminal:
  docker restart erkg_senzing

Wait about 10 seconds, then continue with the notebook.


## Load Open Sanctions Records

Calls `load_jsonl_file()` to ingest the Open Sanctions dataset into Senzing and reports how long it took and the throughput rate.  This dataset is small (24 records) so it should complete in under a second.

In [6]:
start_time = time.time()

sanctions_loaded, sanctions_errors = load_jsonl_file(
    OPEN_SANCTIONS_FILE,
    'OPEN-SANCTIONS'
)

sanctions_duration = time.time() - start_time
print(f"\nOpen Sanctions loading completed in {sanctions_duration:.2f} seconds")
print(f"Rate: {sanctions_loaded / sanctions_duration:.2f} records/second")


Loading /workspace/data/open-sanctions.json...
Expected data source: OPEN-SANCTIONS

  Completed: 24 records loaded

Open Sanctions loading completed in 0.90 seconds
Rate: 26.67 records/second


## Load Open Ownership Records

Calls `load_jsonl_file()` to ingest the Open Ownership dataset into Senzing and reports timing and throughput.  This dataset is larger (316 records) so Senzing will be doing a bit more work on the entity resolution work as it loads.

In [7]:
start_time = time.time()

ownership_loaded, ownership_errors = load_jsonl_file(
    OPEN_OWNERSHIP_FILE, 
    'OPEN-OWNERSHIP'
)

ownership_duration = time.time() - start_time
print(f"\nOpen Ownership loading completed in {ownership_duration:.2f} seconds")
print(f"Rate: {ownership_loaded / ownership_duration:.2f} records/second")


Loading /workspace/data/open-ownership.json...
Expected data source: OPEN-OWNERSHIP
  Loaded 300 records...
  Completed: 316 records loaded

Open Ownership loading completed in 7.44 seconds
Rate: 42.47 records/second


## Loading Summary

Prints a final count of records loaded and errors encountered across both datasets.  Use this to confirm everything went in cleanly before moving on to the exploration notebooks.

In [8]:
total_records = ownership_loaded + sanctions_loaded
total_errors = len(ownership_errors) + len(sanctions_errors)

print("\n" + "="*60)
print("LOADING SUMMARY")
print("="*60)
print(f"Open Ownership records: {ownership_loaded:,}")
print(f"Open Sanctions records: {sanctions_loaded:,}")
print(f"Total records loaded:   {total_records:,}")
print(f"Total errors:           {total_errors}")
print("="*60)


LOADING SUMMARY
Open Ownership records: 316
Open Sanctions records: 24
Total records loaded:   340
Total errors:           0


## Check Senzing Entity Resolution Statistics

Calls `sz_engine.get_stats()` to see how many records Senzing processed and how many entities it created.  The difference between records loaded and entities created tells you how much merging actually happened.

In [9]:
try:
    stats = sz_engine.get_stats()
    stats_dict = json.loads(stats)
    
    print("\nSenzing Entity Resolution Statistics:")
    print(f"  Total records processed: {total_records:,}")
    
    # The stats show how Senzing resolved entities
    if 'workload' in stats_dict:
        print(f"  Entities created: {stats_dict['workload'].get('loadedRecords', 'N/A')}")
    
except Exception as e:
    print(f"Could not retrieve stats: {e}")


Senzing Entity Resolution Statistics:
  Total records processed: 340
  Entities created: 282


## Spot Check a Resolved Entity

Looks up a specific Open Ownership record by ID to verify it loaded correctly and shows which other records Senzing merged into the same entity.  This is a quick gut check to confirm entity resolution is working on the new data.

In [10]:
sample_data_source = "OPEN-OWNERSHIP"
sample_record_id = "10094521532396971848"

try:
    entity_result = sz_engine.get_entity_by_record_id(
        data_source_code=sample_data_source,
        record_id=sample_record_id
    )
    
    entity = json.loads(entity_result)
    
    print(f"\nSample Entity Lookup:")
    print(f"  Data Source: {sample_data_source}")
    print(f"  Record ID: {sample_record_id}")
    print(f"  Entity ID: {entity.get('RESOLVED_ENTITY', {}).get('ENTITY_ID')}")
    print(f"  Entity Name: {entity.get('RESOLVED_ENTITY', {}).get('ENTITY_NAME')}")
    
    # Show how many records resolved to this entity
    records = entity.get('RESOLVED_ENTITY', {}).get('RECORDS', [])
    print(f"  Total records in entity: {len(records)}")
    
    if len(records) > 1:
        print(f"  This entity was created by resolving {len(records)} records together")
        for rec in records[:3]:  # Show first 3
            print(f"    - {rec.get('DATA_SOURCE')}: {rec.get('RECORD_ID')}")
    
except SzError as e:
    print(f"Error looking up entity: {e}")


Sample Entity Lookup:
  Data Source: OPEN-OWNERSHIP
  Record ID: 10094521532396971848
  Entity ID: 25
  Entity Name: GOLD WYNN UK HOLDINGS LIMITED
  Total records in entity: 2
  This entity was created by resolving 2 records together
    - OPEN-OWNERSHIP: 10094521532396971848
    - OPEN-OWNERSHIP: 1281397035527615147
