### Spark Expectations - Streaming Write Guide

<div style="color:red; font-weight:bold; border:2px solid red; padding:8px;">
⚠️ ALERT: This notebook is meant to be run in Databricks!
</div>

* Please read the [Spark Expectations documentation](https://engineering.nike.com/spark-expectations) and [STREAMWRITE_GUIDE.md](../../STREAMWRITE_GUIDE.md) before proceeding with this demo.

This notebook demonstrates how to use Spark Expectations with **streaming DataFrames**:
- Automatic detection of streaming DataFrames
- Proper checkpoint location configuration
- Streaming query management
- Best practices for production streaming workloads

#### Widgets
* `catalog`, `schema` - define where the output tables are created
  * Table names are prefixed with `se_<user>_streaming`, where `<user>` is derived from the logged-in Databricks user
* `checkpoint_path` - dedicated checkpoint location for streaming queries (REQUIRED for production; falls back to `/tmp/checkpoints/se_<user>_streaming` when left empty)
* `library_source` - combo box that selects where the library is installed from
  * `pypi` (installs the latest version published on PyPI)
  * `git` (installs the library from a Git branch or commit)
    * Set `git_org` to the GitHub organization or user that owns the repository, and `git_branch_or_commit` to the branch or commit to install (example: `main`)
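
The notebook reads these widgets but does not create them. A minimal sketch of how they could be registered is shown below. The default values (`main`, `default`, `Nike-Inc`) and the helper name `create_demo_widgets` are assumptions for illustration only; `dbutils` is passed in explicitly so the function can be exercised outside Databricks.

```python
def create_demo_widgets(dbutils, default_catalog="main", default_schema="default"):
    """Register the widgets this notebook reads.

    All default values here are placeholders (assumptions), not values
    prescribed by Spark Expectations.
    """
    dbutils.widgets.text("catalog", default_catalog, "Catalog")
    dbutils.widgets.text("schema", default_schema, "Schema")
    # Empty means "fall back to the /tmp default" in the config cell
    dbutils.widgets.text("checkpoint_path", "", "Checkpoint path")
    dbutils.widgets.combobox("library_source", "pypi", ["pypi", "git"], "Library source")
    # Only used when library_source == "git"; "Nike-Inc" is an assumed default
    dbutils.widgets.text("git_org", "Nike-Inc", "Git org/user")
    dbutils.widgets.text("git_branch_or_commit", "main", "Git branch or commit")


# In a Databricks notebook: create_demo_widgets(dbutils)
```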


### Initialize notebook config
This step reads the widget values needed to configure the Spark Expectations library installation and defines the catalog, schema, and table names.
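
As a rough illustration of the naming scheme, the sketch below reproduces the user sanitization and table naming in plain Python. The helper names `sanitize_user` and `streaming_table_names`, and the sample email, catalog, and schema, are hypothetical.

```python
import re


def sanitize_user(user_email: str) -> str:
    """Keep only the letters of the part before '@', lower-cased."""
    local_part = user_email.split("@")[0]
    return re.sub(r"[^a-zA-Z]", "", local_part).lower()


def streaming_table_names(user_email: str, catalog: str, schema: str) -> dict:
    """Build the fully qualified table names used by this demo."""
    user = sanitize_user(user_email)
    prefix = f"{catalog}.{schema}.se_{user}_streaming"
    return {
        "rules_table": f"{prefix}_rules",
        "stats_table": f"{prefix}_stats",
        "target_table": f"{prefix}_target",
    }


names = streaming_table_names("Jane.Doe42@example.com", "main", "demo")
print(names["target_table"])  # main.demo.se_janedoe_streaming_target
```

Digits and punctuation are stripped from the user name, so two users whose names differ only in those characters would share the same tables.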


In [None]:
%python

import re
import pandas as pd

logged_in_user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()
logged_in_user = logged_in_user.split('@')[0]

user = re.sub(r'[^a-zA-Z]', '', logged_in_user).lower()

catalog = dbutils.widgets.get("catalog")
schema = dbutils.widgets.get("schema")
library = dbutils.widgets.get("library_source")
org = dbutils.widgets.get("git_org")
branch_or_commit = dbutils.widgets.get("git_branch_or_commit")
checkpoint_base_path = dbutils.widgets.get("checkpoint_path")

CONFIG = {
    "owner": user,
    "catalog": catalog,
    "schema": schema,
    "user": user,
    "product_id": f"se_{user}_streaming_product",
    "in_memory_source": f"se_{user}_streaming_source",
    "rules_table": f"{catalog}.{schema}.se_{user}_streaming_rules",
    "stats_table": f"{catalog}.{schema}.se_{user}_streaming_stats",
    "target_table": f"{catalog}.{schema}.se_{user}_streaming_target",
    "checkpoint_path": checkpoint_base_path if checkpoint_base_path else f"/tmp/checkpoints/se_{user}_streaming",
    "library": library,
    "org": org,
    "branch_or_commit": branch_or_commit
}

config_df = pd.DataFrame(list(CONFIG.items()), columns=['Key', 'Value'])
display(config_df)


### Install Required Libraries
* Spark Expectations
* Jinja2 (required for using custom templates)


In [None]:
if CONFIG["library"] == "pypi":
    print("-----INSTALLING SPARK-EXPECTATIONS from PyPI")
    %pip install spark-expectations
elif CONFIG["library"] == "git":
    print(f"-----INSTALLING SPARK-EXPECTATIONS from Git Org/User {CONFIG['org']}, Branch/Commit {CONFIG['branch_or_commit']}")
    giturl = f"git+https://github.com/{CONFIG['org']}/spark-expectations.git@{CONFIG['branch_or_commit']}"
    %pip install --force-reinstall {giturl}

print("-----INSTALLING Jinja2 template library")
%pip install jinja2

%restart_python


In [None]:
# Display Spark Expectations installed version
from importlib.metadata import version
print(f"---- Current SparkExpectation Version: {version('spark-expectations')}")


### Cleanup
Remove previously created user-prefixed tables, views, and checkpoints.
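
The cleanup relies on the `LIKE` pattern of `SHOW TABLES`, where `*` matches any sequence of characters and `|` separates alternatives. The sketch below approximates that matching in plain Python so you can preview which names a pattern would select. The function `matches_show_pattern` is a hypothetical helper and only an approximation of Spark's behavior.

```python
import re


def matches_show_pattern(name: str, pattern: str) -> bool:
    """Approximate SHOW TABLES LIKE matching: '*' is a wildcard, '|' means OR."""
    for alternative in pattern.split("|"):
        # Escape literal parts and join them with '.*' where '*' appeared
        parts = (re.escape(part) for part in alternative.strip().split("*"))
        regex = "(?i)" + ".*".join(parts)
        if re.fullmatch(regex, name):
            return True
    return False


pattern = "se_jane_streaming*"
for table in ("se_jane_streaming_target", "se_jane_streaming_target_error", "se_john_streaming_target"):
    print(table, matches_show_pattern(table, pattern))
```

With this pattern the error table (`..._target_error`) is selected as well, so the cleanup removes it together with the target table.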


In [None]:
db_name = f"{CONFIG['catalog']}.{CONFIG['schema']}"
pattern = f"se_{CONFIG['user']}_streaming*"

# Set the current catalog
spark.sql(f"USE CATALOG {CONFIG['catalog']}")

# Drop tables matching pattern
tables_df = spark.sql(f"SHOW TABLES IN {db_name} LIKE '{pattern}'")
tables_to_drop = [row for row in tables_df.collect() if not row["isTemporary"]]

if tables_to_drop:
    print(f"Found {len(tables_to_drop)} tables to drop.")
    for row in tables_to_drop:
        table_name = row["tableName"]
        spark.sql(f"DROP TABLE IF EXISTS {db_name}.{table_name}")
        print(f"Dropped table: {db_name}.{table_name}")
else:
    print("----- No tables to drop")

# Drop views matching pattern
views_df = spark.sql(f"SHOW VIEWS in {db_name} LIKE '{pattern}'")
views_to_drop = views_df.collect()

if views_to_drop:
    print(f"Found {len(views_to_drop)} views to drop.")
    for row in views_to_drop:
        view_name = row["viewName"]
        # Qualify the view name so the drop does not depend on the current schema
        spark.sql(f"DROP VIEW IF EXISTS {db_name}.{view_name}")
        print(f"Dropped view: {db_name}.{view_name}")
else:
    print("----- No views to drop")

# Clean up checkpoint directory
print(f"\nCleaning up checkpoint directory: {CONFIG['checkpoint_path']}")
dbutils.fs.rm(CONFIG['checkpoint_path'], True)
print("Checkpoint directory cleaned")


## Streaming Data Quality with Spark Expectations

### Steps:
1. Create a streaming source (using `rate` source for demo)
2. Define data quality rules
3. Configure Spark Expectations with streaming options
4. Apply DQ checks on streaming DataFrame
5. Monitor and manage streaming queries


### 1. Create Streaming Source

For this demo, we'll use Spark's built-in `rate` source which generates streaming data.
In production, you would typically read from Kafka, Kinesis, or other streaming sources.
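
To make the shape of the generated data concrete, the sketch below mirrors the column transformations of the next cell in plain Python for a single `value`. The function `synth_row` is a hypothetical stand-in and omits the `timestamp` column.

```python
def synth_row(value: int) -> dict:
    """Mirror the demo's derived columns for one value of the rate source."""
    # Every 10th record gets a null email
    email = None if value % 10 == 0 else f"user{value}@example.com"
    return {
        "id": value,
        "age": (value % 50) + 10,  # always between 10 and 59
        "email": email,
        "name": f"User_{value}",
    }


for value in (0, 7, 49, 50):
    print(synth_row(value))
```

Because `age` never leaves the 10-59 range, only the email rules are expected to flag records in this demo, at a rate of roughly one record in ten.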


In [None]:
from pyspark.sql.functions import col, expr, when, lit

# Create a streaming DataFrame using rate source
# This generates rows with columns: timestamp, value
streaming_source = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", "5")  # Generate 5 rows per second
    .option("numPartitions", "2")
    .load()
)

# Transform the streaming data to add meaningful columns for DQ checks
streaming_df = (
    streaming_source
    .withColumn("id", col("value"))
    .withColumn("age", (col("value") % 50) + 10)  # Age between 10-59
    .withColumn(
        "email",
        when(col("value") % 10 == 0, lit(None))  # Every 10th record has null email
        .otherwise(expr("concat('user', value, '@example.com')"))
    )
    .withColumn("name", expr("concat('User_', value)"))
    .select("id", "age", "email", "name", "timestamp")
)

print("✓ Streaming source created")
print(f"  Is streaming: {streaming_df.isStreaming}")
print("  Schema:")
streaming_df.printSchema()


### 2. Define Data Quality Rules

Create rules to validate the streaming data
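
Since each rule is a plain dictionary, it can help to sanity-check the rules before writing them to the rules table. The sketch below is one possible check. The key set is copied from the rule dictionaries in this notebook and the allowed actions from the comparison table at the end, so treat both as assumptions rather than the library's authoritative schema.

```python
# Keys used by the rule dictionaries in this notebook (assumed, not authoritative)
RULE_KEYS = {
    "product_id", "table_name", "rule_type", "rule", "column_name",
    "expectation", "action_if_failed", "tag", "description",
    "enable_for_source_dq_validation", "enable_for_target_dq_validation",
    "is_active", "enable_error_drop_alert", "error_drop_threshold", "priority",
}
ALLOWED_ACTIONS = {"ignore", "drop", "fail"}


def check_rule(rule: dict) -> list:
    """Return a list of human-readable problems; empty means the rule looks fine."""
    problems = [f"missing key: {key}" for key in sorted(RULE_KEYS - set(rule))]
    action = rule.get("action_if_failed")
    if action is not None and action not in ALLOWED_ACTIONS:
        problems.append(f"unknown action_if_failed: {action}")
    return problems


incomplete_rule = {"rule": "age_not_null", "action_if_failed": "skip"}
print(check_rule(incomplete_rule)[-1])  # unknown action_if_failed: skip
```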


In [None]:
import pandas as pd

rules_data = [
    {
        "product_id": CONFIG["product_id"],
        "table_name": CONFIG["target_table"],
        "rule_type": "row_dq",
        "rule": "age_not_null",
        "column_name": "age",
        "expectation": "age IS NOT NULL",
        "action_if_failed": "ignore",  # For streaming, use 'ignore' to keep processing
        "tag": "completeness",
        "description": "Age must not be null",
        "enable_for_source_dq_validation": True,
        "enable_for_target_dq_validation": False,
        "is_active": True,
        "enable_error_drop_alert": False,
        "error_drop_threshold": 0,
        "priority": "medium",
    },
    {
        "product_id": CONFIG["product_id"],
        "table_name": CONFIG["target_table"],
        "rule_type": "row_dq",
        "rule": "age_range_valid",
        "column_name": "age",
        "expectation": "age BETWEEN 10 AND 100",
        "action_if_failed": "ignore",
        "tag": "validity",
        "description": "Age must be between 10 and 100",
        "enable_for_source_dq_validation": True,
        "enable_for_target_dq_validation": False,
        "is_active": True,
        "enable_error_drop_alert": False,
        "error_drop_threshold": 0,
        "priority": "high",
    },
    {
        "product_id": CONFIG["product_id"],
        "table_name": CONFIG["target_table"],
        "rule_type": "row_dq",
        "rule": "email_not_null",
        "column_name": "email",
        "expectation": "email IS NOT NULL",
        "action_if_failed": "ignore",
        "tag": "completeness",
        "description": "Email must not be null",
        "enable_for_source_dq_validation": True,
        "enable_for_target_dq_validation": False,
        "is_active": True,
        "enable_error_drop_alert": False,
        "error_drop_threshold": 0,
        "priority": "medium",
    },
    {
        "product_id": CONFIG["product_id"],
        "table_name": CONFIG["target_table"],
        "rule_type": "row_dq",
        "rule": "email_format_valid",
        "column_name": "email",
        "expectation": "email LIKE '%@%.%'",
        "action_if_failed": "ignore",
        "tag": "validity",
        "description": "Email must be in valid format",
        "enable_for_source_dq_validation": True,
        "enable_for_target_dq_validation": False,
        "is_active": True,
        "enable_error_drop_alert": False,
        "error_drop_threshold": 0,
        "priority": "low",
    },
]

rules_df = spark.createDataFrame(pd.DataFrame(rules_data))
rules_df.write.mode("overwrite").saveAsTable(CONFIG['rules_table'])

print("✓ Data quality rules created")
display(rules_df)


### 3. Configure Spark Expectations for Streaming

⚠️ **IMPORTANT**: For streaming workloads, you MUST configure:
1. `se_enable_streaming` = False (this is for stats streaming to Kafka, not data streaming)
2. Checkpoint location in the target table writer configuration
3. Output mode (append/update/complete)
4. Processing trigger (how often to process micro-batches)
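
The four settings above can be assembled with a small helper that fails early when the checkpoint location or output mode is missing or invalid. This is a sketch only: `build_streaming_writer_config` is a hypothetical function, and the dictionary keys simply follow the configuration used in the next cell.

```python
VALID_OUTPUT_MODES = {"append", "update", "complete"}


def build_streaming_writer_config(
    checkpoint_base: str,
    query_name: str,
    output_mode: str = "append",
    processing_time: str = "10 seconds",
) -> dict:
    """Assemble a target-table writer config in the shape used by this notebook."""
    if not checkpoint_base:
        raise ValueError("checkpoint_base must be a non-empty path")
    if output_mode not in VALID_OUTPUT_MODES:
        raise ValueError(f"output_mode must be one of {sorted(VALID_OUTPUT_MODES)}")
    return {
        "outputMode": output_mode,
        "format": "delta",
        "queryName": query_name,
        "trigger": {"processingTime": processing_time},
        "options": {
            # One sub-directory per query keeps checkpoints from being shared
            "checkpointLocation": f"{checkpoint_base.rstrip('/')}/target",
        },
    }


config = build_streaming_writer_config("/tmp/checkpoints/se_demo_streaming/", "se_demo_query")
print(config["options"]["checkpointLocation"])  # /tmp/checkpoints/se_demo_streaming/target
```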


In [None]:
from spark_expectations.core import load_configurations
from spark_expectations.config.user_config import Constants as user_config
from spark_expectations.core.expectations import (
    SparkExpectations,
    WrappedDataFrameWriter,
)

# Initialize default Spark Expectations configuration
load_configurations(spark)

# Configure writer for TARGET and ERROR tables (STREAMING)
# This writer will be used for the target table which will be streaming
target_writer_streaming_config = {
    "outputMode": "append",  # Use append for streaming writes
    "format": "delta",
    "queryName": f"se_{CONFIG['user']}_streaming_target_query",
    "trigger": {"processingTime": "10 seconds"},  # Process every 10 seconds
    "options": {
        # 🔥 CRITICAL: Always set checkpointLocation for production streaming
        "checkpointLocation": f"{CONFIG['checkpoint_path']}/target",
        "maxFilesPerTrigger": "100"
    }
}

# Base writer for the target and error tables; the streaming settings above are passed separately via target_table_writer_config
target_writer = WrappedDataFrameWriter().mode("append").format("delta")

# Configure writer for STATS table (BATCH)
# Stats are written in batch mode, even when processing streaming data
stats_writer = WrappedDataFrameWriter().mode("append").format("delta")

# Streaming stats options (for Kafka - disabled in this example)
stats_streaming_config_dict = {
    user_config.se_enable_streaming: False  # Disable stats streaming to Kafka
}

# Initialize Spark Expectations
se = SparkExpectations(
    product_id=CONFIG["product_id"],
    rules_df=rules_df,
    stats_table=CONFIG["stats_table"],
    stats_table_writer=stats_writer,
    target_and_error_table_writer=target_writer,
    stats_streaming_options=stats_streaming_config_dict,
)

print("✓ Spark Expectations configured for streaming")
print(f"  Checkpoint location: {CONFIG['checkpoint_path']}")
print(f"  Output mode: {target_writer_streaming_config['outputMode']}")
print(f"  Processing trigger: {target_writer_streaming_config['trigger']}")


### 4. Apply Data Quality Checks to Streaming Data

The `@se.with_expectations` decorator automatically detects streaming DataFrames and uses the appropriate write method.

**Key Points:**
- The decorator will use `writeStream` instead of `write` when it detects a streaming DataFrame
- The target table writer configuration (with checkpoint location) will be used
- The streaming query will be started and managed automatically
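
Conceptually, the detection described above comes down to branching on the DataFrame's `isStreaming` flag. The sketch below illustrates that idea with a stand-in class. It is not the library's implementation: `FakeDataFrame` and `choose_write_path` are hypothetical, and raising on a missing checkpoint is a choice made for this example.

```python
from dataclasses import dataclass


@dataclass
class FakeDataFrame:
    """Stand-in for a Spark DataFrame; only the isStreaming flag is modeled."""
    isStreaming: bool


def choose_write_path(df: FakeDataFrame, writer_config: dict) -> str:
    """Return which writer API a streaming-aware wrapper would pick."""
    if df.isStreaming:
        checkpoint = writer_config.get("options", {}).get("checkpointLocation")
        if not checkpoint:
            # This sketch is strict; a real library may only warn instead
            raise ValueError("streaming writes need options.checkpointLocation")
        return "writeStream"
    return "write"


streaming_config = {"options": {"checkpointLocation": "/tmp/checkpoints/demo/target"}}
print(choose_write_path(FakeDataFrame(isStreaming=True), streaming_config))  # writeStream
print(choose_write_path(FakeDataFrame(isStreaming=False), {}))  # write
```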


In [None]:
from pyspark.sql import DataFrame

# User configuration for notifications (can be extended with Slack, email, etc.)
notification_conf = {}

@se.with_expectations(
    target_table=CONFIG["target_table"],
    write_to_table=True,
    write_to_temp_table=False,  # Not needed for streaming
    user_conf=notification_conf,
    target_and_error_table_writer=target_writer,
    target_table_writer_config=target_writer_streaming_config,  # 🔥 Pass streaming config
)
def process_streaming_data():
    """
    This function returns the streaming DataFrame.
    Spark Expectations will:
    1. Apply all data quality rules
    2. Write validated data to target table using streaming write
    3. Write error records to error table
    4. Write stats to stats table
    """
    return streaming_df

# Start the streaming query with data quality checks
print("🚀 Starting streaming query with Spark Expectations...")
streaming_query = process_streaming_data()

print("\n✓ Streaming query started successfully!")
print(f"  Query ID: {streaming_query.id if streaming_query else 'N/A'}")
print(f"  Query Name: {streaming_query.name if streaming_query else 'N/A'}")
print(f"  Is Active: {streaming_query.isActive if streaming_query else False}")


### 5. Monitor Streaming Query

Check the status and progress of the streaming query
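
For logging or alerting it is often enough to condense a progress report into one line. The sketch below does that for a progress dictionary. `summarize_progress` is a hypothetical helper, and the sample values are made up; the field names follow the ones read in the next cell.

```python
def summarize_progress(progress) -> str:
    """Condense a streaming progress dictionary into a single log line."""
    if not progress:
        return "no progress reported yet"
    sources = progress.get("sources", [])
    total_rows = sum(source.get("numInputRows", 0) for source in sources)
    rate = progress.get("processedRowsPerSecond", 0.0)
    batch_id = progress.get("batchId", "N/A")
    return f"batch {batch_id}: {total_rows} input rows, {rate:.1f} rows/s processed"


sample_progress = {
    "batchId": 3,
    "processedRowsPerSecond": 4.5,
    "sources": [{"description": "RateStreamV2", "numInputRows": 50}],
}
print(summarize_progress(sample_progress))  # batch 3: 50 input rows, 4.5 rows/s processed
print(summarize_progress(None))  # no progress reported yet
```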


In [None]:
import time

# Let the stream run for a bit to process some data
print("⏳ Letting stream run for 30 seconds to process data...")
time.sleep(30)

if streaming_query and streaming_query.isActive:
    # Get streaming query progress
    progress = streaming_query.lastProgress
    
    if progress:
        print("\n📊 Streaming Query Progress:")
        print(f"  Batch ID: {progress.get('batchId', 'N/A')}")
        print(f"  Input Rows/Second: {progress.get('inputRowsPerSecond', 0)}")
        print(f"  Processed Rows/Second: {progress.get('processedRowsPerSecond', 0)}")
        print(f"  Batch Duration: {progress.get('batchDuration', 0)} ms")
        print(f"  Timestamp: {progress.get('timestamp', 'N/A')}")
        
        # Show sources
        sources = progress.get('sources', [])
        if sources:
            print(f"\n  Sources:")
            for source in sources:
                print(f"    - Description: {source.get('description', 'N/A')}")
                print(f"      Input Rows: {source.get('numInputRows', 0)}")
                print(f"      Processing Rate: {source.get('processedRowsPerSecond', 0):.2f} rows/sec")
    else:
        print("⚠️ No progress information available yet. Stream may be initializing.")
else:
    print("❌ Streaming query is not active")


### 6. View Results

Check the target, error, and stats tables to see the results of data quality checks


In [None]:
# View records in target table
print("📋 Target Table (validated records):")
target_df = spark.sql(f"SELECT * FROM {CONFIG['target_table']} ORDER BY id DESC LIMIT 20")
display(target_df)

# Count records
total_count = spark.sql(f"SELECT COUNT(*) as count FROM {CONFIG['target_table']}").collect()[0]['count']
print(f"\nTotal records in target table: {total_count}")


In [None]:
# View error records
error_table = f"{CONFIG['target_table']}_error"
print("🚨 Error Table (failed validation records):")

try:
    error_df = spark.sql(f"SELECT * FROM {error_table} ORDER BY id DESC LIMIT 20")
    error_count = spark.sql(f"SELECT COUNT(*) as count FROM {error_table}").collect()[0]['count']
    
    print(f"Total error records: {error_count}")
    if error_count > 0:
        display(error_df)
    else:
        print("No error records found (all records passed validation)")
except Exception as e:
    print(f"Error table may not exist yet or has no records: {e}")


### 7. View Data Quality Results in Detail

Analyze which rules passed/failed


In [None]:
# Get latest stats
latest_stats = spark.sql(f"""
    SELECT 
        product_id,
        table_name,
        input_count,
        error_count,
        output_count,
        ROUND(success_percentage, 2) as success_pct,
        ROUND(error_percentage, 2) as error_pct,
        row_dq_res_summary,
        dq_status.row_dq as row_dq_status,
        meta_dq_run_date_time
    FROM {CONFIG['stats_table']}
    ORDER BY meta_dq_run_date_time DESC
    LIMIT 5
""")

print("📈 Latest Data Quality Summary:")
display(latest_stats)

# Show row DQ summary
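print("\n🔍 Row-level DQ Results Summary:")
row_dq_summary = spark.sql(f"""
    SELECT 
        explode(row_dq_res_summary) as summary_item,
        meta_dq_run_date_time
    FROM (
        -- Pick the most recent run first, then explode all of its rule summaries
        SELECT row_dq_res_summary, meta_dq_run_date_time
        FROM {CONFIG['stats_table']}
        ORDER BY meta_dq_run_date_time DESC
        LIMIT 1
    ) latest_run
""")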

if row_dq_summary.count() > 0:
    expanded_summary = row_dq_summary.select(
        col("summary_item.rule").alias("rule"),
        col("summary_item.column_name").alias("column"),
        col("summary_item.description").alias("description"),
        col("summary_item.failed_row_count").alias("failed_count"),
        col("summary_item.action_if_failed").alias("action"),
        col("summary_item.tag").alias("tag")
    )
    display(expanded_summary)


### 8. Monitor Active Streaming Queries

View all active streaming queries in the session


In [None]:
# List all active streaming queries
print("🔄 Active Streaming Queries:")
active_streams = spark.streams.active

if active_streams:
    for stream in active_streams:
        print(f"\n  Query ID: {stream.id}")
        print(f"  Query Name: {stream.name}")
        print(f"  Status: {'Active' if stream.isActive else 'Inactive'}")
        print(f"  Recent Progress: {stream.recentProgress[-1] if stream.recentProgress else 'N/A'}")
else:
    print("  No active streaming queries")


### 9. Stop Streaming Query

Stop the streaming query gracefully when you are done. This demo calls `stop()` directly on the `StreamingQuery` object returned in step 4.
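
If you want to bound how long a shutdown may take, one option is to follow `stop()` with `awaitTermination(timeout)`, which in PySpark returns whether the query terminated within the given number of seconds. The helper `stop_and_wait` below is a hypothetical sketch of that pattern and works with any object exposing `isActive`, `stop()`, and `awaitTermination()`.

```python
def stop_and_wait(query, timeout_s: int = 30) -> bool:
    """Stop a streaming query and report whether it terminated within timeout_s."""
    if query is None or not query.isActive:
        return True  # nothing to stop
    query.stop()
    # awaitTermination(timeout) returns True once the query has terminated
    return bool(query.awaitTermination(timeout_s))


# In the notebook: stopped = stop_and_wait(streaming_query, timeout_s=30)
```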


In [None]:
from spark_expectations.sinks.utils.writer import SparkExpectationsWriter

# Get the writer instance from Spark Expectations context
# Note: In production, you would typically have access to the writer instance
# For this demo, we'll use the streaming query directly

# Stop the specific streaming query gracefully using the main streaming_query variable
if streaming_query and streaming_query.isActive:
    print(f"🛑 Stopping streaming query: {streaming_query.name}")
    print(f"  Query ID: {streaming_query.id}")
    
    # stop() takes no timeout argument; it returns once the query has been stopped
    streaming_query.stop()
    print("✓ Streaming query stopped successfully")
else:
    print("⚠️ Streaming query is not active or already stopped")

# Verify the query is stopped
if streaming_query:
    print(f"\nQuery Status: {'Active' if streaming_query.isActive else 'Stopped'}")


### Stop All Active Streaming Queries

Stop all active streaming queries in the current session


In [None]:
# Stop all active streaming queries
print("🛑 Stopping all active streaming queries...")
active_streams = spark.streams.active

if active_streams:
    for stream in active_streams:
        if stream.isActive:
            print(f"\n  Stopping: {stream.name} (ID: {stream.id})")
            try:
                stream.stop()
                print("  ✓ Stopped successfully")
            except Exception as e:
                print(f"  ❌ Error stopping stream: {e}")
    
    # Wait a moment for cleanup
    import time
    time.sleep(2)
    
    # Verify all streams are stopped; only report success when none remain
    remaining_streams = spark.streams.active
    if not remaining_streams:
        print("\n✓ All streaming queries stopped")
    print(f"  Active streaming queries remaining: {len(remaining_streams)}")
else:
    print("  No active streaming queries to stop")


## Key Takeaways for Streaming with Spark Expectations

### ✅ Best Practices:

1. **Always Configure Checkpoint Locations**
   - Required for fault tolerance and exactly-once processing
   - Use dedicated, persistent storage (HDFS, S3, ADLS)
   - Never reuse checkpoint paths between different streaming jobs

2. **Choose Appropriate Actions**
   - Use `action_if_failed: ignore` for most streaming rules
   - Avoid `drop` action which can cause data loss
   - Failed records are logged in error table for analysis

3. **Monitor Your Streams**
   - Regularly check streaming query status
   - Monitor stats table for data quality trends
   - Set up alerts for error rate thresholds

4. **Configure Triggers Appropriately**
   - `processingTime`: For micro-batch processing (e.g., "10 seconds")
   - `once`: For one-time processing of available data (deprecated in recent Spark releases in favor of `availableNow`)
   - `continuous`: For low-latency streaming (experimental)

5. **Output Modes**
   - `append`: Most common for streaming (only new rows)
   - `complete`: Entire result table (requires aggregations)
   - `update`: Only updated rows (for aggregations)

6. **Graceful Shutdown**
   - Always stop streaming queries gracefully using the `stop()` method
   - PySpark's `StreamingQuery.stop()` takes no timeout argument; to bound the wait, follow it with `awaitTermination(timeout)`
   - Check for active queries before stopping
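
For the trigger options in item 4, a small validation step can catch misconfigured dictionaries before a query is started. The sketch below assumes the trigger dictionary is forwarded to PySpark's `DataStreamWriter.trigger`, which accepts exactly one of `processingTime`, `once`, `continuous`, or `availableNow`. Whether Spark Expectations forwards every one of these is an assumption, and `validate_trigger` is a hypothetical helper.

```python
SUPPORTED_TRIGGERS = ("processingTime", "once", "continuous", "availableNow")


def validate_trigger(trigger: dict) -> str:
    """Return the single trigger type in the dict, or raise if it is ambiguous."""
    unknown = sorted(set(trigger) - set(SUPPORTED_TRIGGERS))
    if unknown:
        raise ValueError(f"unsupported trigger keys: {unknown}")
    chosen = [key for key in SUPPORTED_TRIGGERS if trigger.get(key) is not None]
    if len(chosen) != 1:
        raise ValueError("exactly one trigger type must be set")
    return chosen[0]


print(validate_trigger({"processingTime": "10 seconds"}))  # processingTime
```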

### 📚 Additional Resources:
- [STREAMWRITE_GUIDE.md](../../STREAMWRITE_GUIDE.md) - Comprehensive streaming guide
- [Spark Expectations Documentation](https://engineering.nike.com/spark-expectations)
- Spark Structured Streaming Programming Guide

### 🔑 Key Differences from Batch Processing:

| Aspect | Batch | Streaming |
|--------|-------|-----------|
| Write Method | `df.write` | `df.writeStream` |
| Checkpoint | Not required | **Required** for production |
| Action on Failure | `drop`, `fail`, `ignore` | Prefer `ignore` |
| Query Management | N/A | Must monitor and stop queries |
| Stats Writing | Batch | Batch (even for streaming data) |
| Target Writing | Batch | Streaming with checkpoint |


In [None]:
# View stats table
print("📊 Stats Table (data quality metrics):")
stats_df = spark.sql(f"SELECT * FROM {CONFIG['stats_table']} ORDER BY meta_dq_run_date_time DESC LIMIT 10")
display(stats_df.select(
    "product_id",
    "table_name",
    "input_count",
    "error_count",
    "output_count",
    "success_percentage",
    "error_percentage",
    "dq_status",
    "meta_dq_run_id",
    "meta_dq_run_date_time"
))
