# Delta Live Tables - Ingest Event Hub Messages from Google Analytics

This sample DLT notebook code is designed to be used in conjunction with the Python producer code of the example repo (https://github.com/dublindata/databricks-google-analytics-example). Note that you will have to provide secret values for your event hub name and event hub connection string for this example to work. All schemas are predefined for you, but if you change the fields that you query from the Analytics API, you will need to modify this schema to get the values to show up.

This notebook creates one live table that holds the "raw" messages from the event hub; each additional table builds on it to expose the actual data in the payload.

In [0]:
import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs credentials.
# Both values are read from a Databricks secret scope; adjust the scope/key names as appropriate.
EH_CONN_STR = dbutils.secrets.get(scope="scope", key="eh-conn-str")
EH_NAME = dbutils.secrets.get(scope="scope", key="eh-name")

# JAAS login line for the Kafka client: the username is the literal
# "$ConnectionString" and the password is the connection string itself.
_EH_SASL_JAAS_CONFIG = (
  "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
  f'username="$ConnectionString" password="{EH_CONN_STR}";'
)

# Options handed to the Spark "kafka" source.
# NOTE(review): the bootstrap server host is hard-coded; presumably it must be
# changed to your own Event Hubs namespace -- confirm.
KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : "oneenv-eventhub.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : _EH_SASL_JAAS_CONFIG,
  "failOnDataLoss"           : "false",
}

# Payload schemas, written as DDL strings.
# The outer envelope carries a message type plus a JSON string in `data`;
# the type0/type1/type2 DDLs describe the `data` payload for messageType 0, 1 and 2.
payload_ddl = "messageType INT, data STRING"
payload_ddl_type0 = "pageTitle STRING, screenPageViews STRING, asOf STRING"
payload_ddl_type1 = "eventName STRING, eventCount STRING, asOf STRING"
payload_ddl_type2 = "unifiedScreenName STRING, screenPageViews STRING, asOf STRING"

# Parse every DDL string into a Spark data type, in the same order as declared above.
payload_schema, payload_schema_type0, payload_schema_type1, payload_schema_type2 = map(
  T._parse_datatype_string,
  (payload_ddl, payload_ddl_type0, payload_ddl_type1, payload_ddl_type2),
)


# Basic record parsing and adding ETL audit columns
def parse(df, schema):
  """Decode the message body as JSON and attach ETL audit columns.

  Args:
    df: DataFrame exposing `value`, `key` and `timestamp` columns
      (the caller in this file passes the output of the "kafka" source).
    schema: Schema given to `from_json` to parse the string-cast `value`.

  Returns:
    The input DataFrame with these columns added:
      parsed_records          -- `value` cast to string and parsed with `schema`
      eh_enqueued_timestamp   -- copy of the source `timestamp` column
      eh_enqueued_date        -- `to_date(timestamp)`
      etl_processed_timestamp -- Spark's `current_timestamp()`
      etl_rec_uuid            -- a generated `uuid()` per row
    and with the `records`, `value` and `key` columns dropped.
  """
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), schema))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    # Call the function explicitly: a bare col("current_timestamp") is a column
    # reference and would bind to any input column that happens to have that name.
    .withColumn("etl_processed_timestamp", expr("current_timestamp()"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    # `records` was only an intermediate; `value`/`key` are superseded by parsed_records.
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw Messages From Event Hub",
  table_properties={
    "quality": "bronze",
    # Keep the data already in the Delta table when a full refresh is run.
    "pipelines.reset.allowed": "false"
  }
)
def messages_eventhub_raw():
  """Stream messages from the Kafka source and parse the outer envelope.

  Opens a streaming read with KAFKA_OPTIONS and passes it through `parse`
  using `payload_schema`.
  """
  kafka_stream = spark.readStream.format("kafka").options(**KAFKA_OPTIONS).load()
  return parse(kafka_stream, payload_schema)

@dlt.create_table(
  comment="Page Views by Page Title",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  }
)
def bronze_eh_page_count_by_page_name():
  """Page-view rows extracted from raw messages with messageType 0.

  Reads `messages_eventhub_raw`, keeps rows whose `parsed_records.messageType`
  is 0, parses `parsed_records.data` with `payload_schema_type0`, and returns
  the `pageTitle`, `screenPageViews` and `asOf` fields.
  """
  # NOTE(review): dlt.read is used rather than dlt.read_stream; confirm a
  # non-streaming read of the raw table is what is wanted here.
  return (
    dlt.read("messages_eventhub_raw")
      .filter(col("parsed_records.messageType") == 0)
      .withColumn("page_view_data", from_json(col("parsed_records.data"), payload_schema_type0))
      .select("page_view_data.pageTitle", "page_view_data.screenPageViews", "page_view_data.asOf")
  )


@dlt.create_table(
  comment="Event Counts",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  }
)
def bronze_event_count():
  """Event-count rows extracted from raw messages with messageType 1.

  Reads `messages_eventhub_raw`, keeps rows whose `parsed_records.messageType`
  is 1, parses `parsed_records.data` with `payload_schema_type1`, and returns
  the `eventName`, `eventCount` and `asOf` fields.
  """
  # NOTE(review): dlt.read is used rather than dlt.read_stream; confirm a
  # non-streaming read of the raw table is what is wanted here.
  return (
    dlt.read("messages_eventhub_raw")
      .filter(col("parsed_records.messageType") == 1)
      .withColumn("event_data", from_json(col("parsed_records.data"), payload_schema_type1))
      .select("event_data.eventName", "event_data.eventCount", "event_data.asOf")
  )


@dlt.create_table(
  comment="Views By Page",
  table_properties={
    # NOTE(review): "silver" disagrees with the bronze_ function name and the
    # sibling tables' "bronze" quality -- confirm which is intended.
    "quality": "silver",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  }
)
def bronze_views_by_page():
  """Per-screen view rows extracted from raw messages with messageType 2.

  Reads `messages_eventhub_raw`, keeps rows whose `parsed_records.messageType`
  is 2, parses `parsed_records.data` with `payload_schema_type2`, and returns
  the `unifiedScreenName`, `screenPageViews` and `asOf` fields.
  """
  # NOTE(review): dlt.read is used rather than dlt.read_stream; confirm a
  # non-streaming read of the raw table is what is wanted here.
  return (
    dlt.read("messages_eventhub_raw")
      .filter(col("parsed_records.messageType") == 2)
      .withColumn("page_views", from_json(col("parsed_records.data"), payload_schema_type2))
      .select("page_views.unifiedScreenName", "page_views.screenPageViews", "page_views.asOf")
  )