# Ingest Data with Zerobus Ingest

To run this notebook you need the following:
- Serverless Compute or Classic Compute Cluster with DBR >= 16.4 LTS
- Zerobus Ingest Public Preview enabled

#### Zerobus Ingest
Zerobus Ingest lets you push data into tables efficiently. It supports record-by-record ingestion at any scale and operates in a serverless environment.



<br>

<img src="./images/zerobus-architecture.png" width="600"/>

<br>

- When data is transmitted to the Row Ingestion API, it goes through a buffering process before being added to a Delta table. This creates an efficient and durable ingestion mechanism to support a high volume of clients with variable throughput.
<br>
- Once the data has been materialized into Delta format, it becomes fully compatible with the comprehensive Databricks Platform, allowing users to leverage familiar tools and functionalities for further data analysis and processing.
- By default, the Python SDK performs automatic recovery. When a stream fails (e.g., due to a transient network issue), the SDK will attempt to reconnect and re-ingest any unacknowledged records in the background, preserving their order.
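
The recovery behaviour described in the last bullet can be pictured with a small toy model. This is only an illustration of the idea (keep unacknowledged records in send order and replay them after a reconnect); the `ReplayBuffer` class is hypothetical, and it is not how the SDK is actually implemented.

```python
from collections import deque


class ReplayBuffer:
    """Toy model (hypothetical): tracks sent-but-unacknowledged records in send order."""

    def __init__(self):
        self._pending = deque()
        self._next_id = 0

    def send(self, record):
        # Remember the record until it is acknowledged
        record_id = self._next_id
        self._next_id += 1
        self._pending.append((record_id, record))
        return record_id

    def ack(self, record_id):
        # Assumption: acknowledgements are cumulative, so everything
        # up to and including record_id is considered durable
        while self._pending and self._pending[0][0] <= record_id:
            self._pending.popleft()

    def records_to_replay(self):
        # After a reconnect, these would be re-sent in their original order
        return [record for _, record in self._pending]


buffer = ReplayBuffer()
for value in ["a", "b", "c", "d"]:
    buffer.send(value)
buffer.ack(1)  # "a" and "b" were acknowledged before the connection dropped
print(buffer.records_to_replay())  # ['c', 'd']
```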


##### Install required libraries

In [None]:
# Install Zerobus Ingest SDK
%pip install databricks-zerobus-ingest-sdk grpcio-tools
%restart_python

In [None]:
# Install custom data generator library
%pip install -r ./line_data_generator/requirements.txt ./line_data_generator
%restart_python

##### Provide required information to connect to Zerobus host

To use the Zerobus Ingest SDK you will need the following information:

- Databricks Workspace URL 
  - Get your workspace URL. When viewing your Databricks workspace after logging in, take a look at the URL in your browser with the following format: https://_your_databricks-instance_.com/o=XXXXX. The URL that you require is everything before the “/o=XXXXX”

- Your Bronze table definition
  - Identify the target table you want to ingest data to. This is the table created in the notebook "1. Create-Sensor-Bronze-Table"

- Zerobus Host 
  - Zerobus URI = _workspace_id_.ingest.cloud.databricks.com. Databricks will provide you with this URL

- Service Principal ID and secret. _The following steps require workspace admin rights. If you are not an admin, ask the appropriate person in your organization._
  - Go to Settings > Identity and Access.
  - Generate and save a client ID and secret for the Service Principal you will use.
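
If you prefer not to trim the workspace URL by hand, the following sketch keeps only the scheme and host of a URL copied from the browser. The function name `workspace_base_url` and the example address are hypothetical and used for illustration only.

```python
from urllib.parse import urlsplit


def workspace_base_url(browser_url: str) -> str:
    """Return the scheme and host of a workspace URL, dropping path and query."""
    parts = urlsplit(browser_url)
    if not parts.scheme or not parts.netloc:
        raise ValueError(f"Not an absolute URL: {browser_url!r}")
    return f"{parts.scheme}://{parts.netloc}"


# Hypothetical address, for illustration only
print(workspace_base_url("https://example-workspace.cloud.databricks.com/?o=1234567890"))
# https://example-workspace.cloud.databricks.com
```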


**Once you have this information, update the 0-Parameters.ipynb notebook with your own values.**

In [None]:
%run ./0-Parameters

### Step 1: Generate Proto file
Do not edit the following cells

In [None]:
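import subprocess
import sys

proto_msg = "DigitalTwin"       # Do not edit
proto_name = "dt-solacc.proto"  # Do not edit

# Generate the proto definition from the bronze table schema.
# sys.executable ensures the notebook's own Python environment is used.
command = [
    sys.executable, "-m", "zerobus.tools.generate_proto",
    "--uc-endpoint", WORKSPACE_URL,
    "--client-id", CLIENT_ID,
    "--client-secret", CLIENT_SECRET,
    "--table", BRONZE_TABLE,
    "--proto-msg", proto_msg,
    "--output", proto_name,
]
subprocess.run(command, check=True)

# Compile the proto file into a Python module
command = [
    sys.executable, "-m", "grpc_tools.protoc", "-I.", "--python_out=.",
    proto_name
]
subprocess.run(command, check=True)

# protoc replaces hyphens with underscores in the generated module name
compiled_name = proto_name.replace("-", "_").replace(".proto", "_pb2.py")
print(f"Proto compiled: {compiled_name}")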

In [None]:
# Restart Python to avoid issues when loading the proto definition
%restart_python

# Ignore warnings
import warnings
warnings.filterwarnings("ignore")

In [None]:
%run ./0-Parameters

### Step 2: Set up the connection to the output Delta Table via Zerobus Ingest
Do not edit the following cells

First, we need to grant access to the bronze table created in Notebook 1 to the Service Principal you will use to connect to Zerobus Ingest Host.

A Service Principal is a specialized identity providing more security than personalized accounts. More information regarding Service Principal and how to use them for authentication can be found in [these instructions](https://docs.databricks.com/aws/en/dev-tools/auth/oauth-m2m).

The Service Principal needs the following permissions on the destination table and its parent objects:
  - For the catalog: USE CATALOG.
  - For the schema: USE SCHEMA.
  - For the table: MODIFY, SELECT.
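
The permissions above translate into three GRANT statements. The sketch below only builds the statement strings from a three-part table name so that you can inspect them before running anything; `build_grants` and the example names are hypothetical.

```python
def build_grants(table_name: str, principal: str) -> list[str]:
    """Build GRANT statements for a catalog.schema.table name (illustrative helper)."""
    parts = table_name.split(".")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"Expected catalog.schema.table, got {table_name!r}")
    catalog, schema, _ = parts
    return [
        f"GRANT USE CATALOG ON CATALOG {catalog} TO `{principal}`",
        f"GRANT USE SCHEMA ON SCHEMA {catalog}.{schema} TO `{principal}`",
        f"GRANT MODIFY, SELECT ON TABLE {table_name} TO `{principal}`",
    ]


# Hypothetical table name and principal, for illustration only
for statement in build_grants("main.demo.sensor_bronze", "my-client-id"):
    print(statement)
```

Validating the name first gives a clearer error than a failed tuple unpacking when the table name is not fully qualified.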

In [None]:
# Grant MODIFY and SELECT on the table to the Service Principal
spark.sql(f"GRANT MODIFY, SELECT ON TABLE {BRONZE_TABLE} TO `{CLIENT_ID}`")

# Extract catalog and schema from the table name
catalog, schema, _ = BRONZE_TABLE.split(".")

# Grant USE CATALOG on the catalog to the Service Principal
spark.sql(f"GRANT USE CATALOG ON CATALOG {catalog} TO `{CLIENT_ID}`")

# Grant USE SCHEMA on the schema to the Service Principal
spark.sql(f"GRANT USE SCHEMA ON SCHEMA {catalog}.{schema} TO `{CLIENT_ID}`")

Now we can start using the Zerobus SDK to connect to the Zerobus Ingest host.

In [None]:
import dt_solacc_pb2 as row_pb2  # This is the compiled proto definition
from zerobus.sdk.aio import ZerobusSdk  # asynchronous SDK
from zerobus.sdk.shared import TableProperties  

In [None]:
# Define connection to Zerobus Ingest Host 

table_properties = TableProperties(BRONZE_TABLE, row_pb2.DigitalTwin.DESCRIPTOR)
sdk = ZerobusSdk(ZEROBUS_URL, WORKSPACE_URL)

In [None]:
# Test connection 
 
stream = await sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties)

if stream:
    print("Connection successful")
    await stream.close()  # close() is a coroutine in the async SDK and must be awaited

### Step 3: Generate sensor data 

We will use a data generator to simulate data coming from a complex ball bearing production system organized as follows.

The core elements of this production system are: 
- Production Line
  - Machine
    - Component
      - Sensor

<br>
<img src="./images/ball-bearing-diagram.png" width="600"/>



**How does the data generator work**
<br>
The data generator will produce data matching the production line setup you defined in the Digital Twin frontend. The following variables will be used:

- Number of lines
- Number of machines per line: each line can have a different number of machines.
- Number of components per machine: each machine has the same number of components.
- Each component has 6 different sensors (fixed) generating data:
  - Temperature
  - Pressure
  - Vibration
  - Speed
  - Rotation
  - Flow
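
To make the hierarchy concrete, here is a minimal sketch that builds a nested line → machine → component → sensors structure from the same variables. The function `build_hierarchy` and its naming scheme are assumptions for illustration; the structure actually returned by `generate_equipment_mapping` may differ.

```python
SENSORS = ["temperature", "pressure", "vibration", "speed", "rotation", "flow"]


def build_hierarchy(machines_per_line, num_components):
    """Build a nested dict: line -> machine -> component -> list of sensors."""
    hierarchy = {}
    for line_idx, num_machines in enumerate(machines_per_line, start=1):
        line = {}
        for machine_idx in range(1, num_machines + 1):
            # Every machine gets the same number of components,
            # and every component gets the same six sensors
            line[f"machine_{machine_idx}"] = {
                f"component_{c}": list(SENSORS) for c in range(1, num_components + 1)
            }
        hierarchy[f"line_{line_idx}"] = line
    return hierarchy


hierarchy = build_hierarchy([3, 3, 4, 2], 3)
total_components = sum(
    len(machine) for line in hierarchy.values() for machine in line.values()
)
print(total_components)  # 36
```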

In [None]:
import pandas as pd
import time
from line_data_generator import generate_all_lines, generate_equipment_mapping, table_size_estimator

In [None]:
# Define Production Line configuration -- TODO: Read from config file
num_lines = 4
machines_per_line = [3, 3, 4, 2]  # Number of machines per line
num_components = 3  

<br>You can adjust the size of the generated dataset by setting the sample_size parameter.

For instance, if you choose a sample size of 1000, the data generator will produce a dataset with these characteristics:

- Each row represents a specific component at a specific timestamp, with component_id and timestamp serving as unique identifiers.
- Each component will have 1000 rows, corresponding to the sample size.
- Consecutive rows for a given component are spaced 1 millisecond apart. Thus, for a sample size of 1000, the total duration covered per component is 1 second (1000 * 0.001).
- The total number of rows in the dataset depends on the number of components in your production line. For example, with 36 components (4 lines, 12 machines, 3 components per machine), the dataset will have 36,000 rows (36 * 1000).
- The overall time span for the dataset remains 1 second (1000 * 0.001), meaning that sensor data for different components is generated in parallel at the same timestamps.
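
The arithmetic in the list above can be checked with a few lines of Python. This is a worked example only, using the configuration values from this notebook; it does not replicate `table_size_estimator`.

```python
machines_per_line = [3, 3, 4, 2]  # 12 machines across 4 lines
num_components = 3                # components per machine
sample_size = 1000                # rows per component
interval_s = 0.001                # 1 millisecond between consecutive rows

total_components = sum(machines_per_line) * num_components
total_rows = total_components * sample_size
time_span_s = sample_size * interval_s

print(total_components, total_rows)  # 36 36000
print(f"{time_span_s:.3f} s")
```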

In [None]:
# Define sample size
sample_size = 1000

With this configuration you will generate a dataset with the following size

In [None]:
# Generate equipment mapping: lines -> machine -> components -> sensors 
equipment_mapping = generate_equipment_mapping(num_lines, machines_per_line, num_components)

# Estimate table size
tot_num_rows, est_table_size, line_num_rows, est_line_table_size = table_size_estimator(machines_per_line, num_components, sample_size)

print(f"Number of rows: {tot_num_rows}")
print(f"Estimated table size: {est_table_size:.2f} MB")

In [None]:
# Run data generator and display the first 10 rows
batch_df_lines = generate_all_lines(equipment_mapping, sample_size, time.time())
display(batch_df_lines.head(10))

### Step 4: Ingest data via Zerobus API

The Zerobus Python SDK is available in two variations:

- **Synchronous (sync)**: A traditional, blocking client that uses standard threading for concurrency. This is the default client.
- **Asynchronous (async)**: A non-blocking client based on Python's asyncio library, suitable for high-throughput I/O-bound applications.

In this notebook we will use the asynchronous, non-blocking client. With the asynchronous client, _ingest_record_ is an async method that returns an asyncio.Future. You can await this _future_ to block until the record is durable.
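
The two-stage pattern (await the call to queue a record, then await the returned future to wait for durability) can be illustrated without the SDK. `FakeStream` below is a hypothetical stand-in that acknowledges records after a short delay; it only mimics the shape of the behaviour described above.

```python
import asyncio


class FakeStream:
    """Stand-in for a stream (hypothetical); NOT the Zerobus SDK."""

    def __init__(self):
        self.durable = []

    async def ingest_record(self, record):
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        # Simulate the server acknowledging the record a little later
        loop.call_later(0.01, self._acknowledge, future, record)
        return future

    def _acknowledge(self, future, record):
        self.durable.append(record)
        future.set_result(record)


async def main():
    stream = FakeStream()
    # First await: queue each record and get a future back without waiting for the ack
    futures = [await stream.ingest_record(i) for i in range(5)]
    acknowledged_so_far = len(stream.durable)
    # Second await: wait until every record has been acknowledged
    await asyncio.gather(*futures)
    return acknowledged_so_far, stream.durable


print(asyncio.run(main()))  # (0, [0, 1, 2, 3, 4])
```

Inside a notebook, where an event loop is already running, you would call `await main()` instead of `asyncio.run(main())`.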

Do not edit the following cells

In [None]:
import asyncio
import time
from ipywidgets import IntProgress, HTML, VBox
from IPython.display import display

**How does the client work** <br>
A data producer first opens a “stream” to a Delta table, constructs a message matching the table schema, and then pushes the message to the Zerobus API. Zerobus makes the data durable, acknowledges the client's message, and materializes it in the Delta table.
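
Constructing a message means converting each row to the types the schema expects. One possible way to keep that conversion in a single place is a small helper such as the sketch below. The helper name `row_to_fields` is hypothetical, and the field lists are an assumption based on the columns used in this notebook.

```python
FLOAT_FIELDS = [
    "sensor_rotation", "sensor_flow", "sensor_temperature", "sensor_speed",
    "sensor_vibration", "sensor_pressure", "component_yield_output",
]
STRING_FIELDS = ["timestamp", "component_id", "machine_id", "line_id"]


def row_to_fields(row):
    """Convert a row (any mapping of column name to value) into typed message fields."""
    fields = {name: float(row[name]) for name in FLOAT_FIELDS}
    fields.update({name: str(row[name]) for name in STRING_FIELDS})
    fields["damaged_component"] = bool(row["damaged_component"])
    # A missing abnormal sensor is sent as an empty string
    abnormal = row["abnormal_sensor"]
    fields["abnormal_sensor"] = "" if abnormal is None else str(abnormal)
    return fields


# Hypothetical sample row, for illustration only
sample_row = {name: 1 for name in FLOAT_FIELDS}
sample_row.update({
    "timestamp": "2025-01-01 00:00:00.000", "component_id": "C1",
    "machine_id": "M1", "line_id": "L1",
    "damaged_component": 0, "abnormal_sensor": None,
})
print(row_to_fields(sample_row)["abnormal_sensor"] == "")  # True
```

With a helper like this, a message could be built as `row_pb2.DigitalTwin(**row_to_fields(row))`, assuming the compiled proto exposes exactly these field names.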

In [None]:
async def asynchronous_non_blocking_call():

    # Create stream to table
    stream = await sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties)

    # Config progress bar
    total = len(batch_df_lines)
    pbar = IntProgress(min=0, max=total)
    label = HTML(value="Ingesting sensor data via Zerobus Ingest: 0%")
    box = VBox([label, pbar])
    display(box)
    start = time.time()

    # non-blocking (streaming) call to Zerobus 
    for i in range(total):
        
        row = batch_df_lines.iloc[i]

        await stream.ingest_record(row_pb2.DigitalTwin(
            sensor_rotation=float(row['sensor_rotation']),
            sensor_flow=float(row['sensor_flow']),
            sensor_temperature=float(row['sensor_temperature']),
            sensor_speed=float(row['sensor_speed']),
            sensor_vibration=float(row['sensor_vibration']),
            sensor_pressure=float(row['sensor_pressure']),
            component_yield_output=float(row['component_yield_output']),
            timestamp=str(row['timestamp']),
            component_id=str(row['component_id']),
            damaged_component=bool(row['damaged_component']),
            abnormal_sensor=str(row['abnormal_sensor']) if row['abnormal_sensor'] is not None else "",
            machine_id=str(row['machine_id']),
            line_id=str(row['line_id'])
        ))

        # Update progress bar
        pbar.value = i + 1
        percent = int((i + 1) / total * 100)
        label.value = f"Ingesting sensor data via Zerobus Ingest: {percent}%"

    await stream.flush()

    await stream.close()

    # Add ingest info
    elapsed = time.time() - start
    print(f"Rows ingested: {total}")
    print(f"Time elapsed: {elapsed:.2f} seconds")

In [None]:
# Run the client
import nest_asyncio

nest_asyncio.apply()
await asynchronous_non_blocking_call()

In [None]:
# Display the row count of the destination table
display(spark.table(BRONZE_TABLE).count())

### Step 5: Simulate data ingestion from multiple sources in parallel


In this section we will simulate data ingestion from multiple production lines in parallel. Zerobus Ingest will process each production line separately and push the data into the same Delta Table.

In [None]:
from line_data_generator import generate_line

Define sample size for each production line batch. 

In [None]:
# sample size for a component in each line
line_sample_size = 1000

With this configuration you will generate a dataset with the following size

In [None]:
tot_num_rows, est_table_size, line_num_rows, est_line_table_size = table_size_estimator(machines_per_line, num_components, line_sample_size)

for line in range(len(line_num_rows)): 
  print(f"Line {line+1}")               
  print(f" - Number of rows: {line_num_rows[line]}")
  print(f" - Estimated table size: {est_line_table_size[line]:.2f} MB")

**How does the client work** <br>
The strategy to ingest data from multiple sources in parallel depends on whether you are using the sync or async client. <br> The asynchronous SDK runs an event loop on a single thread. To ingest from several sources concurrently, you create multiple asyncio tasks and run them together using asyncio.gather. This is highly efficient for I/O-bound operations.
<br><br>
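
The effect of `asyncio.gather` can be seen in isolation with a small sketch in which `asyncio.sleep` stands in for network I/O. The worker here is hypothetical and does not touch Zerobus; it only shows that the waits overlap instead of adding up.

```python
import asyncio
import time


async def fake_worker(worker_id: int, delay_s: float) -> int:
    await asyncio.sleep(delay_s)  # stands in for waiting on network I/O
    return worker_id


async def run_all():
    start = time.perf_counter()
    # All four workers wait at the same time on a single thread
    results = await asyncio.gather(*(fake_worker(i, 0.1) for i in range(4)))
    elapsed = time.perf_counter() - start
    return results, elapsed


results, elapsed = asyncio.run(run_all())
print(results)  # [0, 1, 2, 3]
print(f"Elapsed: {elapsed:.2f} s (about 0.1 s rather than 0.4 s)")
```

`asyncio.gather` returns results in the order the awaitables were passed in, regardless of which one finishes first.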

In [None]:
async def worker(worker_id: int):
    """An async worker that creates a stream and ingests records."""
    print(f"Worker {worker_id}: Starting...")

    stream = await sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties)

    line_number = worker_id
    current_time = time.time()
    batch_df_line = generate_line(line_number, equipment_mapping, line_sample_size, current_time)

    total = len(batch_df_line)
    pbar = IntProgress(min=0, max=total)
    label = HTML(value=f"Ingesting sensor data for Line {line_number+1} via Zerobus Ingest: 0%")
    box = VBox([label, pbar])
    display(box)
    start = time.time()

    for i in range(total):

        row = batch_df_line.iloc[i]

        await stream.ingest_record(row_pb2.DigitalTwin(
            sensor_rotation=float(row['sensor_rotation']),
            sensor_flow=float(row['sensor_flow']),
            sensor_temperature=float(row['sensor_temperature']),
            sensor_speed=float(row['sensor_speed']),
            sensor_vibration=float(row['sensor_vibration']),
            sensor_pressure=float(row['sensor_pressure']),
            component_yield_output=float(row['component_yield_output']),
            timestamp=str(row['timestamp']),
            component_id=str(row['component_id']),
            damaged_component=bool(row['damaged_component']),
            abnormal_sensor=str(row['abnormal_sensor']) if row['abnormal_sensor'] is not None else "",
            machine_id=str(row['machine_id']),
            line_id=str(row['line_id'])
        ))

        # Update progress bar
        pbar.value = i + 1
        percent = int((i + 1) / total * 100)
        label.value = f"Ingesting sensor data for Line {line_number+1} via Zerobus Ingest: {percent}%"

    await stream.flush()
    await stream.close()
    print(f"Worker {worker_id}: Finished.")


async def run_parallel():
    num_workers = num_lines
    tasks = []
    for i in range(num_workers):
        task = asyncio.create_task(worker(i))
        tasks.append(task)
       
    await asyncio.gather(*tasks)

In [None]:
# Run the client
import nest_asyncio

nest_asyncio.apply()
await run_parallel()

### Step 6: Simulate Continuous Data Ingestion from one source

In this section, we will simulate continuous data ingestion. To avoid creating a large dataset in a single data generation job, we will run multiple data ingestion batches. Each batch will be generated separately and sent to the Delta Table one at a time using Zerobus Ingest. <br>
<br>
You can configure the batch sample size and batch count to simulate the desired continuous data ingestion duration.
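
As a worked example of how these two settings determine the simulated duration, the sketch below assumes the same 1 millisecond row spacing and the 36-component setup used earlier in this notebook. The values are illustrative.

```python
sample_size = 10_000      # rows per component in each batch
batch_count = 10          # number of batches
interval_ms = 1           # spacing between consecutive rows of a component
total_components = 36     # 12 machines x 3 components

batch_span_s = sample_size * interval_ms / 1000   # simulated seconds covered by one batch
total_span_s = batch_span_s * batch_count         # simulated seconds covered overall
rows_per_batch = total_components * sample_size
total_rows = rows_per_batch * batch_count

print(batch_span_s, total_span_s)   # 10.0 100.0
print(rows_per_batch, total_rows)   # 360000 3600000
```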




In [None]:
### Define batch configuration
sample_size = 10000   # for each batch
batch_count = 10   # number of batches to be run

With this configuration you will generate a dataset with the following size

In [None]:
# Calculate total number of rows to be generated 
tot_num_rows, est_table_size, line_num_rows, est_line_table_size = table_size_estimator(machines_per_line, num_components, sample_size)

print(f"Number of rows in each batch: {tot_num_rows}")
print(f"Estimated table size in each batch: {est_table_size:.2f} MB")
print(f"Number of rows for the total dataset: {tot_num_rows * batch_count}")

**How does the client work** <br>
We will use the same client configuration as in Step 4.
For each batch, a data producer first opens a “stream” to a Delta table, constructs a message matching the table schema, and then pushes the message to the Zerobus API. Zerobus makes the data durable, acknowledges the client's message, and materializes it in the Delta table.
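
Because each batch covers a fixed span of simulated time, consecutive batches must start far enough apart that their timestamps do not overlap. A minimal sketch of that pacing logic, using a non-blocking `asyncio.sleep`, is shown below. The function names and the optional `margin_s` parameter are hypothetical.

```python
import asyncio


def seconds_until_next_batch(prev_batch_start, now, batch_span_s, margin_s=0.0):
    """Return how long to wait so the next batch starts after the previous one ends."""
    earliest_start = prev_batch_start + batch_span_s + margin_s
    return max(0.0, earliest_start - now)


async def pause_between_batches(prev_batch_start, now, batch_span_s):
    wait = seconds_until_next_batch(prev_batch_start, now, batch_span_s)
    # asyncio.sleep yields to the event loop, so other tasks keep running
    await asyncio.sleep(wait)
    return wait


# Previous batch started at t=100 s and covers 10 s; it is now t=104 s
print(seconds_until_next_batch(100.0, 104.0, 10.0))  # 6.0
# If the previous batch already ended, no wait is needed
print(seconds_until_next_batch(100.0, 115.0, 10.0))  # 0.0
```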

In [None]:
async def batch_asynchronous_blocking_call():

  # Create stream to table
  stream = await sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties)

  # Minimum wait (seconds) between batches so that batch timestamps do not overlap
  min_batch_wait = sample_size * 0.001

  for batch_idx in range(int(batch_count)):
    
    current_time = time.time()

    if batch_idx > 0:
      if current_time <= batch_time + min_batch_wait:
        wait = int(batch_time + min_batch_wait - current_time + 10)  # add 10 seconds as a safety margin
        print(f"Pausing {wait} seconds to avoid overlapping timestamps across batches")
        await asyncio.sleep(wait)  # non-blocking sleep keeps the event loop responsive

    print(f"--- Ingesting batch {batch_idx + 1} / {int(batch_count)} ---")

    # Generate new data
    batch_time = time.time()
    batch_df_lines = generate_all_lines(equipment_mapping, sample_size, batch_time)

    # Config progress bar
    total = len(batch_df_lines)
    pbar = IntProgress(min=0, max=total)
    label = HTML(value="Ingesting sensor data via Zerobus Ingest: 0%")
    box = VBox([label, pbar])
    display(box)
    start = time.time()

    # non-blocking (streaming) call. 
    for i in range(total):
        
        row = batch_df_lines.iloc[i]

        await stream.ingest_record(row_pb2.DigitalTwin(
            sensor_rotation=float(row['sensor_rotation']),
            sensor_flow=float(row['sensor_flow']),
            sensor_temperature=float(row['sensor_temperature']),
            sensor_speed=float(row['sensor_speed']),
            sensor_vibration=float(row['sensor_vibration']),
            sensor_pressure=float(row['sensor_pressure']),
            component_yield_output=float(row['component_yield_output']),
            timestamp=str(row['timestamp']),
            component_id=str(row['component_id']),
            damaged_component=bool(row['damaged_component']),
            abnormal_sensor=str(row['abnormal_sensor']) if row['abnormal_sensor'] is not None else "",
            machine_id=str(row['machine_id']),
            line_id=str(row['line_id'])
        ))

        # Update progress bar
        pbar.value = i + 1
        percent = int((i + 1) / total * 100)
        label.value = f"Ingesting sensor data via Zerobus Ingest: {percent}%"

    await stream.flush()

    # Add ingest info
    elapsed = time.time() - start
    print(f"Rows ingested: {total}")
    print(f"Time elapsed: {elapsed:.2f} seconds")
    print("\n" * 1)

  await stream.close()

Run the data generator simulation job

In [None]:
run_simulation = False  # change to True if you want to run the simulation job

In [None]:
if run_simulation:

    # Run the asynchronous function
    import nest_asyncio

    nest_asyncio.apply()
    await batch_asynchronous_blocking_call()