# Spark Streaming and Senzing

This notebook shows you how to process streaming data through Apache Spark and send it to Senzing for entity resolution, simulating a real-time data processing pipeline. If you haven't already gone through the `senzing_quickstart.ipynb` tutorial in this repository, we recommend starting with that one because it contains more detailed explanations for each of the steps.

### Steps in this tutorial

1. Set up the Senzing gRPC server, download the `reference.json` data file, and split it into one JSON file per record (22 in total) to simulate streaming data.
2. Configure the Senzing engine so it's ready to receive data.
3. Create a Spark session with streaming capabilities, define a schema and set up a streaming dataframe.
4. Implement a batch processing function that takes each streaming batch from Spark, sends individual records to Senzing for entity resolution, and tracks which entities are affected by each record addition.
5. Run a cleanup process to ensure the entities are as accurate as possible.

## Set up requirements

In this tutorial, we'll use the [`senzing`](https://garage.senzing.com/sz-sdk-python/index.html) and [`senzing_grpc`](https://garage.senzing.com/sz-sdk-python-grpc/) packages, in addition to PySpark. Install these using the `requirements.txt` file in the repo.

In [1]:
import json
import os
import requests
import shutil

from icecream import ic
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.streaming import StreamingQuery
from pyspark.sql.types import StructType, StructField, StringType
from senzing import szengineflags, szerror
from senzing_grpc import SzAbstractFactoryGrpc, \
    SzConfigManagerGrpc, SzDiagnosticGrpc, SzEngineGrpc, SzConfigGrpc, SzProductGrpc
import grpc
import watermark

In [2]:
%load_ext watermark
%watermark
%watermark --iversions

Last updated: 2025-09-16T10:12:18.534810-07:00

Python implementation: CPython
Python version       : 3.13.3
IPython version      : 9.4.0

Compiler    : Clang 16.0.0 (clang-1600.0.26.6)
OS          : Darwin
Release     : 24.6.0
Machine     : arm64
Processor   : arm
CPU cores   : 14
Architecture: 64bit

json        : 2.0.9
requests    : 2.32.5
senzing_grpc: 0.5.11
pyspark     : 4.0.0
watermark   : 2.5.0
grpc        : 1.74.0
senzing     : 0.2.20



We'll start our [Senzing gRPC server](https://github.com/senzing-garage/serve-grpc/tree/main) using Docker.

Run the following command in a terminal window:

```bash
docker run -it --publish 8261:8261 --rm senzing/serve-grpc
```
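
Before moving on, it can help to confirm that the container is actually listening. The following is a minimal sketch using only the standard library; `is_port_open` is a hypothetical helper (not part of the Senzing SDK), and it assumes the server is published on `localhost:8261` as in the command above. It only checks that a TCP connection can be opened, not that the gRPC service itself is healthy.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False


if not is_port_open("localhost", 8261):
    print("Nothing is listening on localhost:8261; is the Docker container running?")
```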

Then download the example data:

In [3]:
data_path: str = "./data/"

data_url_prefix: str = "https://raw.githubusercontent.com/Senzing/truth-sets/refs/heads/main/truthsets/demo/"

filename: str = "reference.json"

In [4]:
os.makedirs(data_path, exist_ok = True)

url: str = data_url_prefix + filename
filepath: str = data_path + filename

if not os.path.exists(filepath):
    response: requests.Response = requests.get(url, stream = True, timeout = 10)
    response.raise_for_status()  # fail early rather than saving an error page
    response.raw.decode_content = True

    with open(filepath, "wb") as file:
        shutil.copyfileobj(response.raw, file)

## Create separate JSONL files to simulate streaming

We'll use the `reference.json` file from the Senzing "Truthsets" <https://github.com/Senzing/truth-sets/> demo data. This dataset contains customer and organization information, with incomplete contact data.

We'll save each record from the `reference.json` file into a separate JSONL file to simulate streaming data.

In [5]:
def create_streaming_files (
        input_file: str,
        output_dir: str,
    ) -> None:
    """simulate streaming data"""
    os.makedirs(output_dir, exist_ok = True)
    
    with open(input_file, "r") as fp:
        for i, line in enumerate(fp):
            try:
                record: dict = json.loads(line)
                filename: str = f"{output_dir}/record_{record['RECORD_ID']}.json"
                
                with open(filename, "w") as out_file:
                    json.dump(record, out_file)
                    
                print(f"Created {filename}")
                
            except json.JSONDecodeError as ex:
                print(f"Error parsing line {i}: {ex}")
                continue

In [6]:
create_streaming_files(
    "data/reference.json",
    "data/streaming",
)

Created data/streaming/record_2012.json
Created data/streaming/record_2013.json
Created data/streaming/record_2014.json
Created data/streaming/record_2041.json
Created data/streaming/record_2051.json
Created data/streaming/record_2061.json
Created data/streaming/record_2071.json
Created data/streaming/record_2074.json
Created data/streaming/record_2081.json
Created data/streaming/record_2091.json
Created data/streaming/record_2101.json
Created data/streaming/record_2102.json
Created data/streaming/record_2111.json
Created data/streaming/record_2112.json
Created data/streaming/record_2121.json
Created data/streaming/record_2122.json
Created data/streaming/record_2131.json
Created data/streaming/record_2132.json
Created data/streaming/record_2141.json
Created data/streaming/record_2151.json
Created data/streaming/record_2161.json
Created data/streaming/record_2162.json
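
As a quick sanity check on the split, the sketch below counts the files that parse as JSON and carry the two keys the later cells rely on (`DATA_SOURCE` and `RECORD_ID`). `count_valid_records` is a hypothetical helper written for this illustration; it assumes the `record_*.json` naming used by `create_streaming_files` above.

```python
import glob
import json
import os


def count_valid_records(output_dir: str) -> int:
    """Count files in output_dir that parse as JSON and have the required keys."""
    valid: int = 0

    for path in sorted(glob.glob(os.path.join(output_dir, "record_*.json"))):
        try:
            with open(path, "r", encoding="utf-8") as fp:
                record = json.load(fp)
        except (OSError, json.JSONDecodeError) as ex:
            print(f"Skipping {path}: {ex}")
            continue

        if isinstance(record, dict) and {"DATA_SOURCE", "RECORD_ID"} <= record.keys():
            valid += 1
        else:
            print(f"{path} is missing DATA_SOURCE or RECORD_ID")

    return valid


# Returns 0 if the directory does not exist yet.
print(count_valid_records("data/streaming"))
```

The printed count should match the number of `Created ...` lines in the output above.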


## Configure Senzing

Next, configure the Senzing engine to accept the `reference` dataset, in the same way as the `spark_quickstart.ipynb` tutorial.

In [7]:
grpc_channel: grpc.Channel = grpc.insecure_channel("localhost:8261")
sz_abstract_factory: SzAbstractFactoryGrpc = SzAbstractFactoryGrpc(grpc_channel)

Check connectivity by getting the Senzing version:

In [8]:
sz_product: SzProductGrpc = sz_abstract_factory.create_product()
version_json: dict = json.loads(sz_product.get_version())

print(
    json.dumps(
        version_json,
        indent = 2,
    )
)

{
  "PRODUCT_NAME": "Senzing SDK",
  "VERSION": "4.0.0",
  "BUILD_VERSION": "4.0.0.25184",
  "BUILD_DATE": "2025-07-03",
  "BUILD_NUMBER": "2025_07_03__16_38",
  "COMPATIBILITY_VERSION": {
    "CONFIG_VERSION": "11"
  },
  "SCHEMA_VERSION": {
    "ENGINE_SCHEMA_VERSION": "4.0",
    "MINIMUM_REQUIRED_SCHEMA_VERSION": "4.0",
    "MAXIMUM_REQUIRED_SCHEMA_VERSION": "4.99"
  }
}


In [9]:
sz_configmanager: SzConfigManagerGrpc = sz_abstract_factory.create_configmanager()
sz_diagnostic: SzDiagnosticGrpc = sz_abstract_factory.create_diagnostic()
sz_engine: SzEngineGrpc = sz_abstract_factory.create_engine()

In [10]:
config_id: int = sz_configmanager.get_default_config_id()
sz_config: SzConfigGrpc = sz_configmanager.create_config_from_config_id(config_id)

This time, we'll only use a single data source:

In [11]:
try:
    sz_config.register_data_source("REFERENCE")

except (grpc.RpcError, szerror.SzError) as err:
    print(err, "\n")
    print("You only need to register the data source once")

In [12]:
new_json_config: str = sz_config.export()

new_config_id: int = sz_configmanager.register_config(
    new_json_config,
    "Spark Streaming",
)

sz_configmanager.replace_default_config_id(
    config_id,
    new_config_id,
)

In [13]:
sz_abstract_factory.reinitialize(new_config_id)

## Set up the Spark Streaming functions

Start a new Spark session, create a schema, and then set up a stream reader from Spark's [Structured Streaming](https://spark.apache.org/docs/latest/streaming/index.html) engine. 

In the next section, we'll use a stream writer to send the data from the Spark Streaming dataframe to Senzing.

First, create a Spark session. Ignore any warnings about `NativeCodeLoader` or `jdk.incubator.vector`.

In [14]:
spark = SparkSession.builder \
    .appName("Senzing Streaming") \
    .master("local[*]") \
    .config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoint") \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/16 10:12:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


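Providing a schema up front tells Spark which fields to expect in every file. Structured Streaming file sources require a schema unless schema inference is explicitly enabled.
It also saves Spark from scanning the files to infer one, which speeds up the stream reader.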

In [15]:
customers_schema = StructType([
    StructField("DATA_SOURCE", StringType(), True),
    StructField("RECORD_ID", StringType(), True),
    StructField("RECORD_TYPE", StringType(), True),
    StructField("PRIMARY_NAME_ORG", StringType(), True),
    StructField("SECONDARY_NAME_ORG", StringType(), True),
    StructField("PRIMARY_NAME_FULL", StringType(), True),
    StructField("NATIVE_NAME_FULL", StringType(), True),
    StructField("PRIMARY_NAME_LAST", StringType(), True),
    StructField("PRIMARY_NAME_FIRST", StringType(), True),
    StructField("PRIMARY_NAME_MIDDLE", StringType(), True),
    StructField("GENDER", StringType(), True),
    StructField("DATE_OF_BIRTH", StringType(), True),
    StructField("PASSPORT_NUMBER", StringType(), True),
    StructField("PASSPORT_COUNTRY", StringType(), True),
    StructField("DRIVERS_LICENSE_NUMBER", StringType(), True),
    StructField("DRIVERS_LICENSE_STATE", StringType(), True),
    StructField("SSN_NUMBER", StringType(), True),
    StructField("NATIONAL_ID_NUMBER", StringType(), True),
    StructField("NATIONAL_ID_COUNTRY", StringType(), True),
    StructField("ADDR_TYPE", StringType(), True),
    StructField("ADDR_FULL", StringType(), True),
    StructField("ADDR_LINE1", StringType(), True),
    StructField("ADDR_CITY", StringType(), True),
    StructField("ADDR_STATE", StringType(), True),
    StructField("ADDR_POSTAL_CODE", StringType(), True),
    StructField("ADDR_COUNTRY", StringType(), True),
    StructField("PHONE_TYPE", StringType(), True),
    StructField("PHONE_NUMBER", StringType(), True),
    StructField("EMAIL_ADDRESS", StringType(), True),
    StructField("DATE", StringType(), True),
    StructField("STATUS", StringType(), True),
    StructField("CATEGORY", StringType(), True),
    StructField("AMOUNT", StringType(), True)
])
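
Because Spark's JSON reader drops any field that isn't in the supplied schema, a key missing from `customers_schema` would never reach Senzing. The sketch below is one way to check for that using only the standard library. `find_unmapped_keys` is a hypothetical helper, and the sample records and field list are made up to keep the example self-contained.

```python
import json
from typing import Iterable


def find_unmapped_keys(json_lines: Iterable[str], schema_fields: Iterable[str]) -> set:
    """Return record keys that do not appear in the schema field names."""
    known: set = set(schema_fields)
    unmapped: set = set()

    for line in json_lines:
        line = line.strip()
        if not line:
            continue
        # dict.keys() supports set difference directly.
        unmapped.update(json.loads(line).keys() - known)

    return unmapped


# Hypothetical sample: two records and a deliberately incomplete field list.
sample_lines = [
    '{"DATA_SOURCE": "REFERENCE", "RECORD_ID": "2012", "PRIMARY_NAME_ORG": "Acme"}',
    '{"DATA_SOURCE": "REFERENCE", "RECORD_ID": "2013", "WEBSITE_ADDRESS": "example.com"}',
]
sample_fields = ["DATA_SOURCE", "RECORD_ID", "PRIMARY_NAME_ORG"]

print(find_unmapped_keys(sample_lines, sample_fields))  # {'WEBSITE_ADDRESS'}
```

In the notebook, you could pass an open handle on `data/reference.json` as `json_lines` and `customers_schema.fieldNames()` as `schema_fields`; an empty result means every key is covered.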

The stream reader uses this schema to parse the files into a streaming dataframe. For this example, `maxFilesPerTrigger` is set to 1 so that each micro-batch reads a single file to simulate streaming, but you can point the reader at your own input stream instead.

In [16]:
streaming_df: DataFrame = spark \
    .readStream \
    .schema(customers_schema) \
    .option("maxFilesPerTrigger", 1)  \
    .json('data/streaming')

## Add records to Senzing and to the Spark Streaming dataframe

Use the `get_affected_entities` function from the `spark_quickstart` tutorial to track which entities have been changed or created in the Senzing repository:

In [17]:
def get_affected_entities (
    rec_info: str,
    ) -> list:
    """helper function to extract the `ENTITY_ID`"""
    info: dict = json.loads(rec_info)

    return [ entity["ENTITY_ID"] for entity in info["AFFECTED_ENTITIES"] ]
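
To make the shape of the input concrete, here is a small worked example. The sample payload is hypothetical: it only includes the keys the helper reads (`AFFECTED_ENTITIES` and `ENTITY_ID`) plus two illustrative ones, and the entity IDs are made up. `affected_entity_ids` is a slightly more defensive variant written for this sketch, not a replacement for the function above.

```python
import json


def affected_entity_ids(rec_info: str) -> list:
    """Extract ENTITY_ID values, tolerating an empty or partial response."""
    if not rec_info:
        return []

    info: dict = json.loads(rec_info)
    return [entity["ENTITY_ID"] for entity in info.get("AFFECTED_ENTITIES", [])]


# Hypothetical "with info" response; the IDs are made up for illustration.
sample_info = json.dumps({
    "DATA_SOURCE": "REFERENCE",
    "RECORD_ID": "2012",
    "AFFECTED_ENTITIES": [{"ENTITY_ID": 1}, {"ENTITY_ID": 7}],
})

print(affected_entity_ids(sample_info))  # [1, 7]
print(affected_entity_ids("{}"))         # []
```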

In [18]:
affected_entities: set = set()

And we'll use the code from the `spark_quickstart.ipynb` tutorial to create a function that will send a streaming batch to the Senzing engine:

In [19]:
def process_streaming_batch (
    batch_df: DataFrame,
    batch_id: int,
    ) -> None:
    """send a streaming batch to Senzing"""
    record_count: int = batch_df.count()

    if record_count == 0:
        return

    print(f"Processing batch {batch_id} with {record_count} records")
    
    for row in batch_df.rdd.toLocalIterator():
        record: dict = {
            k: v
            for k, v in row.asDict().items()
            if v is not None
        }
        
        rec_info: str = sz_engine.add_record(
            record["DATA_SOURCE"],
            record["RECORD_ID"], 
            json.dumps(record),  # serialize the record to a JSON string
            szengineflags.SzEngineFlags.SZ_WITH_INFO,
        )
        
        affected_entities.update(get_affected_entities(rec_info))
        print(f"Added record {record['RECORD_ID']}")
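
In a long-running stream, one malformed record shouldn't stop the whole batch. The sketch below shows one way to isolate failures per record. It is an illustration under stated assumptions rather than the tutorial's method: `add_records` and `fake_add_record` are hypothetical names, and the engine call is injected as a plain callable so the example runs without a Senzing server.

```python
import json
from typing import Callable, Iterable


def add_records(
    rows: Iterable[dict],
    add_record: Callable[[str, str, str], str],
) -> tuple:
    """Send each row through add_record; return (responses, failed_record_ids)."""
    responses: list = []
    failed: list = []

    for row in rows:
        # Drop null columns so only populated attributes are sent.
        record: dict = {k: v for k, v in row.items() if v is not None}

        try:
            responses.append(
                add_record(record["DATA_SOURCE"], record["RECORD_ID"], json.dumps(record))
            )
        except Exception as ex:  # broad on purpose: keep the batch going
            print(f"Failed to add record {record.get('RECORD_ID')}: {ex}")
            failed.append(record.get("RECORD_ID"))

    return responses, failed


def fake_add_record(data_source: str, record_id: str, definition: str) -> str:
    """Stand-in for the engine call so the sketch runs without a server."""
    if record_id == "bad":
        raise ValueError("simulated engine error")
    return json.dumps({"AFFECTED_ENTITIES": [{"ENTITY_ID": 1}]})


rows = [
    {"DATA_SOURCE": "REFERENCE", "RECORD_ID": "2012", "GENDER": None},
    {"DATA_SOURCE": "REFERENCE", "RECORD_ID": "bad"},
]
responses, failed = add_records(rows, fake_add_record)
print(len(responses), failed)  # 1 ['bad']
```

In the notebook, the injected callable could be a small lambda that forwards to `sz_engine.add_record` with the `SZ_WITH_INFO` flag, and the failed IDs could be logged or retried later.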

Then, we'll stream the data from the Spark dataframe to Senzing using a Spark stream writer. Ignore any warnings about `ResolveWriteToStream`.

In [20]:
streaming_query: StreamingQuery = streaming_df \
    .writeStream \
    .foreachBatch(process_streaming_batch) \
    .outputMode("append") \
    .trigger(processingTime='10 seconds') \
    .start()

25/09/16 10:12:21 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


While this is running, we can get a count of the number of records that have been added to the Senzing repository:

In [21]:
result: dict = json.loads(sz_engine.get_stats())
print(result["workload"]["loadedRecords"])

-1


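We can view the `affected_entities` set to confirm that entities have been created. An empty set, as in the output below, usually means the first micro-batch hasn't finished yet; re-run the cell after a few seconds: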

In [22]:
affected_entities

set()

And, for each of the entities that have been updated, we can pull the details of that entity:

In [23]:
for entity_id in affected_entities:
    result: dict = json.loads(sz_engine.get_entity_by_entity_id(entity_id))
    entity: dict = result["RESOLVED_ENTITY"]
    record_type: str = entity["FEATURES"]["RECORD_TYPE"][0]["FEAT_DESC"]
    print(f"Entity ID: {entity_id}, Name: {entity['ENTITY_NAME']}, Record Type: {record_type}")

If you want to scale this up, you can use the `ThreadPoolExecutor` from the Python [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html) module, as shown in this example: <https://github.com/brianmacy/sz_incremental_withinfo-v3/blob/main/sz_incremental_withinfo.py>
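
As a rough illustration of that pattern (a simplified sketch, not the code from the linked example), the block below submits records to a thread pool and gathers the affected entity IDs as each call completes. `add_records_concurrently` and `fake_add_record` are hypothetical names, and the engine call is replaced by a stand-in so the example runs on its own. Whether your engine client can safely be shared across threads is an assumption to verify before using this against a real server.

```python
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Iterable


def add_records_concurrently(
    records: Iterable[dict],
    add_record: Callable[[dict], str],
    max_workers: int = 4,
) -> set:
    """Submit every record to a thread pool and collect affected entity IDs."""
    affected: set = set()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its record ID for error reporting.
        futures = {executor.submit(add_record, rec): rec["RECORD_ID"] for rec in records}

        for future in as_completed(futures):
            try:
                info: dict = json.loads(future.result())
            except Exception as ex:
                print(f"Record {futures[future]} failed: {ex}")
                continue
            affected.update(e["ENTITY_ID"] for e in info.get("AFFECTED_ENTITIES", []))

    return affected


def fake_add_record(record: dict) -> str:
    """Stand-in for the engine call; maps each record to a made-up entity ID."""
    return json.dumps({"AFFECTED_ENTITIES": [{"ENTITY_ID": int(record["RECORD_ID"]) % 10}]})


records = [{"RECORD_ID": str(i)} for i in (2012, 2013, 2022)]
print(sorted(add_records_concurrently(records, fake_add_record)))  # [2, 3]
```

The results are merged into the set on the calling thread only, so the sketch needs no lock around `affected`.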

## Process REDO records

As in the `spark_quickstart.ipynb` tutorial, we'll run the Senzing [redo process](https://senzing.zendesk.com/hc/en-us/articles/360007475133-Processing-REDO) to clean up the entities in the Senzing repository, updating the `affected_entities` set as we go.

In [24]:
while True:
    redo_record: str = sz_engine.get_redo_record()
    
    if not redo_record:
        break

    rec_info: str = sz_engine.process_redo_record(
        redo_record,
        flags = szengineflags.SzEngineFlags.SZ_WITH_INFO,
    )
    
    affected_entities.update(get_affected_entities(rec_info))
    print(rec_info)

## Look up the resolved entities

Now that the entities have been resolved, we can search the Senzing repository for anyone we are interested in:

In [25]:
search_query: dict = {
    "name_full": "Wang Jie",
}

search_result: str = sz_engine.search_by_attributes(json.dumps(search_query))

print(
    json.dumps(
        json.loads(search_result),
        indent = 2,
    )
)

{
  "RESOLVED_ENTITIES": [
    {
      "MATCH_INFO": {
        "MATCH_LEVEL_CODE": "POSSIBLY_SAME",
        "MATCH_KEY": "+NAME",
        "ERRULE_CODE": "SNAME",
        "CANDIDATE_KEYS": {
          "NAME_KEY": [
            {
              "FEAT_ID": 486,
              "FEAT_DESC": "ANK|JIE"
            }
          ]
        },
        "FEATURE_SCORES": {
          "NAME": [
            {
              "INBOUND_FEAT_ID": -2,
              "INBOUND_FEAT_DESC": "Wang Jie",
              "CANDIDATE_FEAT_ID": 482,
              "CANDIDATE_FEAT_DESC": "\u738b\u6770",
              "CANDIDATE_FEAT_USAGE_TYPE": "NATIVE",
              "SCORE": 100,
              "ADDITIONAL_SCORES": {
                "GNR_FN": 100,
                "GNR_SN": -1,
                "GNR_GN": -1,
                "GENERATION_MATCH": -1,
                "GNR_ON": -1
              },
              "SCORE_BUCKET": "SAME",
              "SCORE_BEHAVIOR": "NAME"
            }
          ]
        }
      },
      "ENTIT

25/09/16 10:12:21 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
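
The `MATCH_INFO` block in the search output is verbose, so it can be handy to reduce each hit to a few fields. The sketch below does that using only keys visible in the output above (`MATCH_LEVEL_CODE`, `MATCH_KEY`, and the `NAME` entries under `FEATURE_SCORES`). `summarize_matches` is a hypothetical helper, and the sample is a trimmed copy of that output rather than a full response.

```python
import json


def summarize_matches(search_result: str) -> list:
    """Return (match level, match key, best name score) for each search hit."""
    summary: list = []

    for hit in json.loads(search_result).get("RESOLVED_ENTITIES", []):
        match_info: dict = hit.get("MATCH_INFO", {})
        name_scores = [
            score["SCORE"]
            for score in match_info.get("FEATURE_SCORES", {}).get("NAME", [])
        ]
        summary.append((
            match_info.get("MATCH_LEVEL_CODE"),
            match_info.get("MATCH_KEY"),
            max(name_scores, default=None),  # None when no name was scored
        ))

    return summary


# Sample trimmed from the search output shown above.
sample_result = json.dumps({
    "RESOLVED_ENTITIES": [{
        "MATCH_INFO": {
            "MATCH_LEVEL_CODE": "POSSIBLY_SAME",
            "MATCH_KEY": "+NAME",
            "FEATURE_SCORES": {"NAME": [{"INBOUND_FEAT_DESC": "Wang Jie", "SCORE": 100}]},
        }
    }]
})

print(summarize_matches(sample_result))  # [('POSSIBLY_SAME', '+NAME', 100)]
```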


You can take a look at the `spark_quickstart` tutorial for details on how to extract data from the Senzing repository.

In [26]:
try:
    spark.stop()
except Exception as ex:
    pass

You can ignore any `MicroBatchExecution` errors. To avoid them, see
<https://www.waitingforcode.com/apache-spark-structured-streaming/stopping-structured-streaming-query/read>
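
One common approach, sketched here as an assumption rather than as the linked article's recommendation, is to stop the streaming query before stopping the session. `stop_streaming` is a hypothetical helper; it relies only on the `isActive`, `stop()`, and `awaitTermination()` members of a PySpark `StreamingQuery` and on `SparkSession.stop()`, and the 30-second timeout is an arbitrary choice.

```python
def stop_streaming(query, spark, timeout_seconds: int = 30) -> None:
    """Stop the streaming query first, then the Spark session."""
    if query.isActive:
        query.stop()
        # Give the query a bounded amount of time to finish shutting down.
        query.awaitTermination(timeout_seconds)

    spark.stop()
```

In this notebook, the call would be `stop_streaming(streaming_query, spark)` in place of the bare `spark.stop()`. This may reduce the shutdown errors, though it isn't guaranteed to eliminate them.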

## Next steps

If you’re interested in exploring Senzing further, check out the following links:

- [Senzing + Docker quickstart](https://senzing.com/docs/quickstart/quickstart_docker/)

- [Senzing Learning Portal](https://senzing.com/senzing-learning-portal-signup)

- [Senzing SDK Documentation](https://senzing.com/docs/)

- [Entity Centric Learning](https://senzing.com/entity-centric-learning-explained/)

- [CORD: Collections Of Relatable Data](https://senzing.com/senzing-ready-data-collections-cord)

- [Senzing GitHub public repos](https://github.com/senzing-garage)

- ["Graph Power Hour!" podcast](https://senzing.com/graph-power-hour)