In [0]:
import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration and authentication.
# Every setting is read from the Spark configuration, so nothing
# environment-specific is hard-coded in the notebook.
EH_NAMESPACE = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME_1    = spark.conf.get("iot.ingestion.eh.name")   # subscribed to by KAFKA_OPTIONS_TAXI
EH_NAME_2    = spark.conf.get("iot.ingestion.eh.name2")  # subscribed to by KAFKA_OPTIONS_WEATHER

# The access key name is used twice: as the SharedAccessKeyName in the
# connection string and as the key of the secret that holds the key value.
EH_CONN_SHARED_ACCESS_KEY_NAME = spark.conf.get("iot.ingestion.eh.accessKeyName")
# NOTE(review): this key starts with "io." while every other key starts with
# "iot." - confirm it matches the key defined in the pipeline configuration.
SECRET_SCOPE = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(
  scope=SECRET_SCOPE,
  key=EH_CONN_SHARED_ACCESS_KEY_NAME,
)

# Connection string assembled from the values above; it embeds the secret
# key value, so it must not be printed or logged.
EH_CONN_STR = (
  f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;"
  f"SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};"
  f"SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
)


# Kafka Consumer configuration

def _kafka_options(topic):
  """Build the Kafka source options for a single Event Hub.

  Both streams use the same namespace, credentials and tuning settings and
  differ only in the topic they subscribe to, so the options are built in
  one place to keep them from drifting apart.

  Args:
    topic: Name of the Event Hub to subscribe to.

  Returns:
    A dict of options suitable for ``spark.readStream.format("kafka").options(**...)``.
  """
  return {
    "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
    "subscribe"                : topic,
    "kafka.sasl.mechanism"     : "PLAIN",
    "kafka.security.protocol"  : "SASL_SSL",
    # The literal user name "$ConnectionString" is paired with the full
    # connection string as the password.
    "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
    "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
    "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
    "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
    "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
    "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
  }


KAFKA_OPTIONS_TAXI    = _kafka_options(EH_NAME_1)
KAFKA_OPTIONS_WEATHER = _kafka_options(EH_NAME_2)




# Basic record parsing: decode the Kafka payload and parse it as JSON.
# (No ETL audit columns are added here.)
def _parse_json_payload(df, schema_location_key):
  """Add the decoded payload and its parsed JSON form to a DataFrame.

  Args:
    df: DataFrame with a ``value`` column (the callers pass the output of
      the Kafka source).
    schema_location_key: Value passed to ``from_json`` as the
      ``schemaLocationKey`` option. No explicit schema is supplied
      (``None``); presumably this relies on Databricks schema inference
      for ``from_json`` - confirm against the platform documentation.

  Returns:
    ``df`` with two additional columns: ``records`` (``value`` cast to
    string) and ``parsed_data`` (``records`` parsed by ``from_json``).
  """
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_data", from_json(col("records"), None, {"schemaLocationKey": schema_location_key}))
  )


def parse_taxi(df):
  """Parse taxi events using the ``taxi_schema`` schema location key."""
  return _parse_json_payload(df, "taxi_schema")


def parse_weather(df):
  """Parse weather events using the ``weather_schema`` schema location key."""
  return _parse_json_payload(df, "weather_schema")

# Declared with @dlt.table, the same decorator silver_trip_data uses.
@dlt.table(
  comment="Raw Taxi Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_data IS NOT NULL")
def taxi_raw():
  """Bronze streaming table of raw taxi events.

  Reads the stream described by ``KAFKA_OPTIONS_TAXI`` through the Kafka
  source and applies ``parse_taxi`` to it. Two expectations are declared on
  the result: ``topic`` is not null and ``parsed_data`` is not null.

  Returns:
    The streaming DataFrame produced by ``parse_taxi``.
  """
  kafka_stream = (
    spark.readStream
      .format("kafka")
      .options(**KAFKA_OPTIONS_TAXI)
      .load()
  )
  return kafka_stream.transform(parse_taxi)

# Declared with @dlt.table, the same decorator silver_trip_data uses.
# NOTE(review): taxi_raw sets "pipelines.reset.allowed" to "false"; this table
# does not - confirm the difference is intentional.
@dlt.table(
    comment="Raw Weather Events",
    table_properties={"quality": "bronze"}
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_data IS NOT NULL")
def weather_raw():
    """Bronze streaming table of raw weather events.

    Reads the stream described by ``KAFKA_OPTIONS_WEATHER`` through the Kafka
    source and applies ``parse_weather`` to it. Two expectations are declared
    on the result: ``topic`` is not null and ``parsed_data`` is not null.

    Returns:
        The streaming DataFrame produced by ``parse_weather``.
    """
    kafka_stream = (
        spark.readStream
        .format("kafka")
        .options(**KAFKA_OPTIONS_WEATHER)
        .load()
    )
    return kafka_stream.transform(parse_weather)

In [0]:
from pyspark.sql.functions import col

@dlt.table(
    comment="Silver table with flattened taxi_parsed struct"
)
def silver_trip_data():
    """Flatten the ``parsed_data`` struct of ``taxi_raw`` into top-level columns.

    Each listed field of ``parsed_data`` becomes a column of the same name,
    except ``_rescued_data``, which is exposed as ``rescued_data``. Columns
    are emitted in the order of ``field_names``.

    Returns:
        A streaming DataFrame containing only the flattened columns.
    """
    # Fields whose output column name differs from the struct field name.
    output_name_overrides = {"_rescued_data": "rescued_data"}
    field_names = [
        "_rescued_data",
        "doLocationId",
        "extra",
        "fareAmount",
        "improvementSurcharge",
        "mtaTax",
        "passengerCount",
        "paymentType",
        "puLocationId",
        "rateCodeId",
        "storeAndFwdFlag",
        "tipAmount",
        "tollsAmount",
        "totalAmount",
        "tpepDropoffDateTime",
        "tpepPickupDateTime",
        "tripDistance",
        "vendorID",
    ]
    flattened_columns = [
        col(f"parsed_data.{field}").alias(output_name_overrides.get(field, field))
        for field in field_names
    ]
    taxi_stream = dlt.readStream("taxi_raw")
    return taxi_stream.select(*flattened_columns)