## Read Azure Storage logs from Event Hubs and persist them to a Delta table

In [0]:
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    LongType,
    TimestampType,
    MapType,
    ArrayType,
)
from pyspark.sql.functions import col, from_json, unbase64, explode, variant_get

In [0]:
# Notebook parameters, declared in order as (widget name -> default value).
# The "eh-namespace" and "eh-conn-string" defaults are key names inside the
# secret scope named by "secret-scope", not the secret values themselves.
# The "checkpoint" default points at one specific storage account; override
# it for other environments.
_widget_defaults = {
    "catalog": "slog",
    "schema": "default",
    "raw_table": "azure_raw_storage_logs",
    "secret-scope": "slog-scope",
    "eh-namespace": "eh-namespace",
    "eh-name": "slog",
    "eh-conn-string": "eh-slog",
    "checkpoint": "abfss://slog@stsezsandbox07.dfs.core.windows.net/checkpoints/azure_raw_storage_log",
}
for _widget_name, _widget_default in _widget_defaults.items():
    dbutils.widgets.text(_widget_name, _widget_default)

In [0]:
# Resolve run-time settings from the notebook widgets.

# Three-part name (catalog.schema.table) of the Delta table the stream writes to.
full_name_space = f"{dbutils.widgets.get('catalog')}.{dbutils.widgets.get('schema')}.{dbutils.widgets.get('raw_table')}"
# Checkpoint location for the streaming write; widgets.get already returns a str.
checkpoint = dbutils.widgets.get("checkpoint")

# Event Hubs configuration. The "eh-namespace" and "eh-conn-string" widgets hold
# secret key names; the actual values are fetched from the Databricks secret scope
# so they are never stored in the notebook.
secret_scope = dbutils.widgets.get("secret-scope")
eh_namespace = dbutils.secrets.get(secret_scope, dbutils.widgets.get("eh-namespace"))
# The Event Hub name (used as the Kafka topic) is not secret and is read directly.
eh_name = dbutils.widgets.get("eh-name")
eh_conn_string = dbutils.secrets.get(
    secret_scope, dbutils.widgets.get("eh-conn-string")
)

In [0]:
# Create the target catalog and schema if they do not exist yet.
# This can be removed or changed based on deployment targets.
spark.sql(f"CREATE CATALOG IF NOT EXISTS {dbutils.widgets.get('catalog')}")
# Qualify the schema with the catalog: an unqualified name would be created in
# the session's current catalog, which is not necessarily the widget's catalog
# that the target table name is built from.
spark.sql(
    f"CREATE SCHEMA IF NOT EXISTS "
    f"{dbutils.widgets.get('catalog')}.{dbutils.widgets.get('schema')}"
)

In [0]:
# Schema of the JSON body of each Event Hubs event: a "records" array whose
# elements are individual storage log entries. Every field is nullable so that
# keys missing from a record parse as null instead of failing.
# Built bottom-up: innermost nested types first, then the record, then the envelope.

# Element of identity.authorization[].principals
_principal_type = StructType(
    [
        StructField("id", StringType(), True),
        StructField("type", StringType(), True),
    ]
)

# Element of identity.authorization
_authorization_type = StructType(
    [
        StructField("action", StringType(), True),
        StructField("roleAssignmentId", StringType(), True),
        StructField("roleDefinitionId", StringType(), True),
        StructField("principals", ArrayType(_principal_type), True),
        StructField("denyAssignmentId", StringType(), True),
        StructField("type", StringType(), True),
        StructField("result", StringType(), True),
        StructField("reason", StringType(), True),
    ]
)

# identity.requester
_requester_type = StructType(
    [
        StructField("objectId", StringType(), True),
        StructField("tenantId", StringType(), True),
    ]
)

# identity
_identity_type = StructType(
    [
        StructField("type", StringType(), True),
        StructField("tokenHash", StringType(), True),
        StructField("authorization", ArrayType(_authorization_type), True),
        StructField("requester", _requester_type, True),
    ]
)

# One storage log record; its top-level fields become the table's columns.
_record_type = StructType(
    [
        StructField("time", StringType(), True),  # kept as the raw string
        StructField("resourceId", StringType(), True),
        StructField("category", StringType(), True),
        StructField("operationName", StringType(), True),
        StructField("operationVersion", StringType(), True),
        StructField("schemaVersion", StringType(), True),
        StructField("statusCode", LongType(), True),
        StructField("statusText", StringType(), True),
        StructField("durationMs", LongType(), True),
        StructField("callerIpAddress", StringType(), True),
        StructField("correlationId", StringType(), True),
        StructField("identity", _identity_type, True),
        StructField("location", StringType(), True),
        StructField("properties", MapType(StringType(), StringType()), True),
        StructField("uri", StringType(), True),
        StructField("protocol", StringType(), True),
        StructField("resourceType", StringType(), True),
    ]
)

# Envelope: {"records": [record, ...]}
eventhub_logs_schema = StructType(
    [StructField("records", ArrayType(_record_type), True)]
)

In [0]:
# Kafka-protocol options for reading from Azure Event Hubs.
# Event Hubs authenticates Kafka clients with SASL PLAIN: the username is the
# literal "$ConnectionString" and the password is the connection string.
# The Kafka endpoint is "<namespace>.servicebus.windows.net:9093"; accept either
# a bare namespace name or an already fully-qualified host from the secret.
eh_bootstrap_host = (
    eh_namespace
    if "." in eh_namespace
    else f"{eh_namespace}.servicebus.windows.net"
)
kafka_options = {
    "kafka.bootstrap.servers": f"{eh_bootstrap_host}:9093",
    "subscribe": eh_name,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{eh_conn_string}";',
    # Only used when there is no checkpoint yet; afterwards the query resumes
    # from the checkpointed offsets.
    "startingOffsets": "earliest",
    # Keep running (instead of failing) if some offsets are no longer available,
    # e.g. aged out of Event Hub retention; those events are skipped.
    "failOnDataLoss": "false",
}

# Read the raw Kafka records as a stream; "value" holds the event body as bytes.
df = spark.readStream.format("kafka").options(**kafka_options).load()

# Decode each event body to a string and parse it with the storage-log schema.
# A body that is not valid JSON yields a null "records" value.
parsed_df = df.select(
    from_json(col("value").cast("string"), eventhub_logs_schema).alias("data")
).select("data.records")

# One output row per log record, with the record's fields as top-level columns.
# explode() emits no rows for a null array, so unparseable events are dropped
# silently rather than failing the query.
exploded_df = parsed_df.select(explode(col("records")).alias("record")).select(
    "record.*"
)

# toTable() starts the query. The availableNow trigger processes everything
# currently available since the last checkpoint and then stops, so this cell
# can be re-run on a schedule like a batch job.
# Note: the Delta sink supports only the "append" and "complete" output modes.
query = (
    exploded_df.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint)
    .trigger(availableNow=True)
    .toTable(full_name_space)
)

# Block until the availableNow run finishes; re-raises if the query failed.
query.awaitTermination()