# Kafka Multi-Client DLT Pipeline
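This pipeline reads a Kafka stream carrying data from multiple clients and creates:
- A separate set of tables per client (for example `bronze_<client_id>`)
- Bronze, Silver, and Gold tables for each client
- Error handling that lets processing continue when an individual client's table fails
- Alerting on failures (errors are logged and appended to a `pipeline_errors` table)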


In [None]:
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json
from datetime import datetime
import logging

# Configure root logging at INFO (basicConfig is a no-op when the root logger
# already has handlers) and obtain this module's named logger.
logging.basicConfig(level="INFO")
logger = logging.getLogger(__name__)


In [None]:
# Read settings from the pipeline configuration. Every key except
# "alert.email" is fetched without a default, so it must be set in the
# pipeline settings; "alert.email" falls back to an empty string.
kafka_bootstrap_servers = spark.conf.get("kafka.bootstrap.servers")
kafka_topic = spark.conf.get("kafka.topic")
alert_email = spark.conf.get("alert.email", "")
catalog = spark.conf.get("catalog")
base_schema = spark.conf.get("base_schema")

# Echo the effective configuration to stdout (one line per setting).
print("\n".join([
    "Pipeline Configuration:",
    f"  Kafka Servers: {kafka_bootstrap_servers}",
    f"  Kafka Topic: {kafka_topic}",
    f"  Catalog: {catalog}",
    f"  Base Schema: {base_schema}",
    "  Note: Consumer group ID not used (not supported on serverless)",
]))


In [None]:
# Schema used by from_json in kafka_raw_bronze to parse each Kafka message
# value. Adjust the (name, type, nullable) entries below to match the actual
# Kafka message structure.
# NOTE(review): from_json by default treats every field as nullable, so the
# False flag on client_id is presumably not enforced at parse time — confirm.
message_schema = StructType([
    StructField(field_name, field_type, nullable)
    for field_name, field_type, nullable in (
        ("client_id", StringType(), False),
        ("client_name", StringType(), True),
        ("timestamp", TimestampType(), True),
        ("event_type", StringType(), True),
        ("data", MapType(StringType(), StringType()), True),
        ("amount", DoubleType(), True),
        ("status", StringType(), True),
        ("metadata", MapType(StringType(), StringType()), True),
    )
])


In [None]:
# Helper function to send alerts
def send_alert(client_id, table_name, error_message):
    """
    Log a pipeline error and record it in the central error table.

    The alert text is logged at ERROR level, and one row is appended to
    ``{catalog}.{base_schema}.pipeline_errors``. If writing that row fails,
    the failure is logged (with traceback) and swallowed, so alerting never
    raises into the caller.
    In production, this could integrate with external alerting systems.

    Args:
        client_id: Client the error relates to (callers pass "ALL" for
            errors not tied to a single client).
        table_name: Table or stage in which the error occurred.
        error_message: Description of the error.

    Returns:
        The formatted alert message that was logged.
    """
    # Take a single timestamp so the logged alert and the stored row agree.
    now = datetime.now()
    alert_msg = f"""Pipeline Alert:
    Client: {client_id}
    Table: {table_name}
    Error: {error_message}
    Timestamp: {now.isoformat()}
    """
    logger.error(alert_msg)

    # Log to a central error tracking table. The schema is explicit because
    # createDataFrame's type inference fails when any value is None, which
    # would silently lose the error record.
    try:
        row = (
            None if client_id is None else str(client_id),
            None if table_name is None else str(table_name),
            None if error_message is None else str(error_message),
            now,
        )
        error_df = spark.createDataFrame(
            [row],
            schema=(
                "client_id STRING, table_name STRING, "
                "error_message STRING, error_timestamp TIMESTAMP"
            ),
        )

        error_df.write.mode("append").saveAsTable(f"{catalog}.{base_schema}.pipeline_errors")
    except Exception as e:
        # Best effort: keep the traceback but never propagate from alerting.
        logger.exception("Failed to log error to table: %s", e)

    return alert_msg


## Bronze Layer - Raw Kafka Ingestion
Read raw records from the Kafka topic and parse each JSON message value into typed columns.


In [None]:
@dlt.table(
    name="kafka_raw_bronze",
    comment="Raw data from Kafka stream - all clients",
    table_properties={
        "quality": "bronze",
        "pipelines.autoOptimize.managed": "true",
    },
)
def kafka_raw_bronze():
    """
    Stream every Kafka record into bronze with its JSON payload flattened.

    Output columns are the Kafka envelope (key/value as strings, topic,
    partition, offset, Kafka timestamp), an ``ingestion_timestamp``, and the
    fields of ``message_schema`` parsed from the value (``timestamp`` is
    renamed to ``event_timestamp``). This is the entry point for all client
    data. Definition-time errors are logged, alerted, and re-raised.
    """
    try:
        source = (
            spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
            .option("subscribe", kafka_topic)
            # Read from the beginning of the topic rather than only new data.
            .option("startingOffsets", "earliest")
            # Keep running if offsets have been aged out / lost.
            .option("failOnDataLoss", "false")
            # Note: kafka.consumer.group.id is NOT supported on serverless/shared clusters
            .load()
        )

        envelope = source.select(
            col("key").cast("string").alias("kafka_key"),
            col("value").cast("string").alias("kafka_value"),
            col("topic").alias("kafka_topic"),
            col("partition").alias("kafka_partition"),
            col("offset").alias("kafka_offset"),
            col("timestamp").alias("kafka_timestamp"),
        )

        stamped = envelope.withColumn("ingestion_timestamp", current_timestamp())
        parsed = stamped.withColumn(
            "parsed_value", from_json(col("kafka_value"), message_schema)
        )

        # (field inside parsed_value, output column name)
        payload_columns = [
            col(f"parsed_value.{source_field}").alias(target_name)
            for source_field, target_name in (
                ("client_id", "client_id"),
                ("client_name", "client_name"),
                ("timestamp", "event_timestamp"),
                ("event_type", "event_type"),
                ("data", "data"),
                ("amount", "amount"),
                ("status", "status"),
                ("metadata", "metadata"),
            )
        ]

        return parsed.select(
            "kafka_key",
            "kafka_value",
            "kafka_topic",
            "kafka_partition",
            "kafka_offset",
            "kafka_timestamp",
            "ingestion_timestamp",
            *payload_columns,
        )
    except Exception as exc:
        reason = str(exc)
        logger.error(f"Error in kafka_raw_bronze: {reason}")
        send_alert("ALL", "kafka_raw_bronze", reason)
        raise


## Client List Table
Maintain a list of all unique clients seen in the stream


In [None]:
@dlt.table(
    name="client_list",
    comment="List of all unique clients in the system"
)
def client_list():
    """
    Track all unique clients that have been seen.

    Streams from ``kafka_raw_bronze`` and emits one row per non-NULL
    ``client_id`` with its ``client_name`` and a ``first_seen`` timestamp.
    Records without a ``client_id`` (e.g. messages whose JSON could not be
    parsed into ``message_schema``) are excluded so they do not show up as a
    NULL "client".
    This helps with schema management and monitoring.
    """
    return (
        dlt.read_stream("kafka_raw_bronze")
            # Malformed / client-less messages surface as client_id NULL.
            .filter(col("client_id").isNotNull())
            .select(
                "client_id",
                "client_name"
            )
            # Streaming dedup without a watermark keeps state for every
            # client_id ever seen; client_name is whatever the kept record had.
            .dropDuplicates(["client_id"])
            # Processing time of the micro-batch that first emitted the client.
            .withColumn("first_seen", current_timestamp())
    )


## Dynamic Per-Client Bronze Tables
Create bronze tables for each client with error handling


In [None]:
def create_client_bronze_table(client_id):
    """
    Create a bronze table for a specific client.

    The table (``bronze_<client_id>``) streams from ``kafka_raw_bronze``,
    keeps rows whose ``client_id`` equals *client_id*, and adds
    ``bronze_processed_timestamp``. Uses expect_or_drop to drop rows with a
    mismatched client ID or a NULL ``event_timestamp``.

    Args:
        client_id: Client identifier. It is used in the table name and
            interpolated into a SQL expectation, so it must not contain a
            single quote.

    Returns:
        The ``@dlt.table``-decorated table function.

    Raises:
        ValueError: If *client_id* is empty or None.
    """
    if not client_id:
        # Would otherwise register "bronze_" / "bronze_None" with a
        # meaningless expectation; fail fast so the caller can skip it.
        raise ValueError("client_id must be a non-empty string")

    table_name = f"bronze_{client_id}"

    # Columns of a bronze row: everything kafka_raw_bronze emits plus
    # bronze_processed_timestamp. The error fallback must carry these columns,
    # otherwise the expectations below (which reference client_id and
    # event_timestamp) cannot be resolved and the fallback itself fails.
    fallback_schema = (
        "kafka_key STRING, kafka_value STRING, kafka_topic STRING, "
        "kafka_partition INT, kafka_offset BIGINT, kafka_timestamp TIMESTAMP, "
        "ingestion_timestamp TIMESTAMP, client_id STRING, client_name STRING, "
        "event_timestamp TIMESTAMP, event_type STRING, "
        "data MAP<STRING,STRING>, amount DOUBLE, status STRING, "
        "metadata MAP<STRING,STRING>, bronze_processed_timestamp TIMESTAMP"
    )

    @dlt.table(
        name=table_name,
        comment=f"Bronze layer for client {client_id} - Raw validated data",
        table_properties={
            "quality": "bronze",
            # Table property values are strings.
            "client_id": str(client_id)
        }
    )
    @dlt.expect_or_drop("valid_client_id", f"client_id = '{client_id}'")
    @dlt.expect_or_drop("not_null_timestamp", "event_timestamp IS NOT NULL")
    def bronze_table():
        try:
            return (
                dlt.read_stream("kafka_raw_bronze")
                    .filter(col("client_id") == client_id)
                    .withColumn("bronze_processed_timestamp", current_timestamp())
            )
        except Exception as e:
            logger.exception(f"Error creating bronze table for client {client_id}: {str(e)}")
            send_alert(client_id, table_name, str(e))
            # Return an empty DataFrame with the bronze schema so the pipeline
            # can continue.
            # NOTE(review): this fallback is a batch DataFrame while the normal
            # path is a stream — confirm DLT handles the switch as intended.
            return spark.createDataFrame([], schema=fallback_schema)

    return bronze_table


## Dynamic Per-Client Silver Tables
Create silver tables with data quality checks and transformations


In [None]:
# def create_client_silver_table(client_id):
#     """
#     Create a silver table for a specific client.
#     Applies data quality rules and enrichment.
#     """
#     bronze_table_name = f"bronze_{client_id}"
#     silver_table_name = f"silver_{client_id}"
    
#     @dlt.table(
#         name=silver_table_name,
#         comment=f"Silver layer for client {client_id} - Cleaned and enriched data",
#         table_properties={
#             "quality": "silver",
#             "client_id": client_id
#         }
#     )
#     @dlt.expect_or_drop("valid_status", "status IN ('active', 'pending', 'completed', 'failed')")
#     @dlt.expect_or_drop("valid_amount", "amount >= 0")
#     @dlt.expect("has_event_type", "event_type IS NOT NULL")
#     def silver_table():
#         try:
#             return (
#                 dlt.read_stream(bronze_table_name)
#                     .select(
#                         "client_id",
#                         "client_name",
#                         "event_timestamp",
#                         "event_type",
#                         "amount",
#                         "status",
#                         "data",
#                         "metadata",
#                         "kafka_offset",
#                         "kafka_partition"
#                     )
#                     # Add enrichment columns
#                     .withColumn("silver_processed_timestamp", current_timestamp())
#                     .withColumn("year", year(col("event_timestamp")))
#                     .withColumn("month", month(col("event_timestamp")))
#                     .withColumn("day", dayofmonth(col("event_timestamp")))
#                     .withColumn(
#                         "amount_category",
#                         when(col("amount") < 100, "small")
#                         .when(col("amount") < 1000, "medium")
#                         .otherwise("large")
#                     )
#                     # Deduplicate based on kafka offset and partition
#                     .dropDuplicates(["kafka_offset", "kafka_partition"])
#             )
#         except Exception as e:
#             logger.error(f"Error creating silver table for client {client_id}: {str(e)}")
#             send_alert(client_id, silver_table_name, str(e))
#             # Return empty dataframe to allow pipeline to continue
#             return spark.createDataFrame([], schema="client_id STRING")
    
#     return silver_table


In [None]:
# def create_client_gold_table(client_id):
#     """
#     Create a gold table for a specific client.
#     Contains aggregated business metrics.
#     """
#     silver_table_name = f"silver_{client_id}"
#     gold_table_name = f"gold_{client_id}_summary"
    
#     @dlt.table(
#         name=gold_table_name,
#         comment=f"Gold layer for client {client_id} - Aggregated metrics",
#         table_properties={
#             "quality": "gold",
#             "client_id": client_id
#         }
#     )
#     def gold_table():
#         try:
#             return (
#                 dlt.read_stream(silver_table_name)
#                     .groupBy(
#                         "client_id",
#                         "client_name",
#                         window(col("event_timestamp"), "1 hour"),
#                         "event_type",
#                         "status"
#                     )
#                     .agg(
#                         count("*").alias("event_count"),
#                         sum("amount").alias("total_amount"),
#                         avg("amount").alias("avg_amount"),
#                         min("amount").alias("min_amount"),
#                         max("amount").alias("max_amount"),
#                         countDistinct("event_type").alias("unique_event_types")
#                     )
#                     .select(
#                         "client_id",
#                         "client_name",
#                         col("window.start").alias("window_start"),
#                         col("window.end").alias("window_end"),
#                         "event_type",
#                         "status",
#                         "event_count",
#                         "total_amount",
#                         "avg_amount",
#                         "min_amount",
#                         "max_amount",
#                         "unique_event_types"
#                     )
#                     .withColumn("gold_processed_timestamp", current_timestamp())
#             )
#         except Exception as e:
#             logger.error(f"Error creating gold table for client {client_id}: {str(e)}")
#             send_alert(client_id, gold_table_name, str(e))
#             # Return empty dataframe to allow pipeline to continue
#             return spark.createDataFrame([], schema="client_id STRING")
    
#     return gold_table


## Initialize Tables for Known Clients
This section dynamically creates tables for each client.
In production, you might want to:
1. Read the list of clients from a configuration table
2. Auto-discover clients from the stream
3. Use a scheduled job to add new clients as they appear


In [None]:
# # Example: Define your clients here or read from a configuration
# # For demonstration, we'll create tables for a few example clients
# # In production, you would dynamically discover these from the stream

# KNOWN_CLIENTS = [
#     "client_001",
#     "client_002",
#     "client_003",
#     "client_004",
#     "client_005"
# ]

# # Create bronze, silver, and gold tables for each known client
# for client_id in KNOWN_CLIENTS:
#     try:
#         # Create and register bronze table
#         bronze_func = create_client_bronze_table(client_id)
#         globals()[f"bronze_{client_id}"] = bronze_func()  # Call the returned function
        
#         # Create and register silver table
#         silver_func = create_client_silver_table(client_id)
#         globals()[f"silver_{client_id}"] = silver_func()  # Call the returned function
        
#         # Create and register gold table
#         gold_func = create_client_gold_table(client_id)
#         globals()[f"gold_{client_id}_summary"] = gold_func()  # Call the returned function
        
#         logger.info(f"Successfully initialized tables for client: {client_id}")
#     except Exception as e:
#         logger.error(f"Failed to initialize tables for client {client_id}: {str(e)}")
#         send_alert(client_id, "initialization", str(e))
#         # Continue with next client


## Error Tracking Table
Create a table to track all pipeline errors


In [None]:
# @dlt.table(
#     name="pipeline_errors",
#     comment="Centralized error tracking for the pipeline"
# )
# def pipeline_errors():
#     """
#     This table captures all errors that occur during pipeline execution.
#     Use this for monitoring and alerting.
#     """
#     # This table is populated by the send_alert function
#     # It's created as an empty table that will be populated via append mode
#     schema = StructType([
#         StructField("client_id", StringType(), False),
#         StructField("table_name", StringType(), False),
#         StructField("error_message", StringType(), False),
#         StructField("error_timestamp", TimestampType(), False)
#     ])
#     return spark.createDataFrame([], schema)


## Pipeline Monitoring View
Create a view to monitor pipeline health


In [None]:
# @dlt.view(
#     name="pipeline_health_monitor",
#     comment="Real-time view of pipeline health by client"
# )
# def pipeline_health_monitor():
#     """
#     Monitor the health of the pipeline by tracking:
#     - Recent error counts per client
#     - Last successful processing time
#     - Data volume metrics
#     """
#     return (
#         dlt.read("pipeline_errors")
#             .filter(col("error_timestamp") >= expr("current_timestamp() - INTERVAL 1 HOUR"))
#             .groupBy("client_id")
#             .agg(
#                 count("*").alias("error_count_last_hour"),
#                 max("error_timestamp").alias("last_error_time"),
#                 collect_list("error_message").alias("recent_errors")
#             )
#     )
