In [0]:
import dlt
import json
from pyspark.sql.types import *
from pyspark.sql import functions as f

In [0]:
def return_eventhub_config(
    eh_namespace: str,
    eh_name: str,
    akv_secret_scope: str
):
    """Build Spark Kafka-source options for reading an Azure Event Hub.

    Two secrets are fetched from the Databricks secret scope ``akv_secret_scope``:
    ``eventhub-<eh_namespace>-access-key-name`` (the shared access key name) and
    ``eventhub-<eh_namespace>-access-key`` (its value). They are combined into a
    Service Bus connection string, which is used as the SASL PLAIN password.

    Args:
        eh_namespace: Event Hubs namespace; used as the host prefix in
            ``<eh_namespace>.servicebus.windows.net``.
        eh_name: Event hub to read; passed as the Kafka ``subscribe`` topic.
        akv_secret_scope: Secret scope holding the access key name and value.

    Returns:
        dict mapping Kafka option names to string values, meant to be splatted
        into ``.options(**...)`` on a ``format("kafka")`` reader.
    """
    access_key_name = dbutils.secrets.get(scope=akv_secret_scope, key=f'eventhub-{eh_namespace}-access-key-name')
    access_key = dbutils.secrets.get(scope=akv_secret_scope, key=f'eventhub-{eh_namespace}-access-key')
    connection_string = f"Endpoint=sb://{eh_namespace}.servicebus.windows.net/;SharedAccessKeyName={access_key_name};SharedAccessKey={access_key}"

    # SASL PLAIN login: literal user "$ConnectionString", with the whole
    # connection string as the password. The login module is referenced by its
    # shaded ("kafkashaded.") class name.
    jaas_config = f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{connection_string}\";"

    return {
        "kafka.bootstrap.servers": f"{eh_namespace}.servicebus.windows.net:9093",
        "subscribe": eh_name,
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.jaas.config": jaas_config,
    }

In [0]:
def create_bronze_table_from_eventhub(
    eh_namespace: str,
    eh_name: str,
    akv_secret_scope: str,
    output_table_name: str
):
    """Register a DLT bronze table that ingests raw records from an Azure Event Hub.

    The table is named ``<ENV_BRONZE_LH>.<output_table_name>``. The prefix is the
    Spark conf value ``ENV_BRONZE_LH``, or ``'bronze'`` when that conf is unset.
    Rows are the Kafka-source records for the hub plus a ``records`` column
    holding ``value`` cast to string.

    The hub is read with ``spark.readStream`` so each pipeline update consumes
    only new events. A batch ``spark.read`` of a Kafka source defaults to
    earliest..latest offsets, so it would re-ingest the whole retained topic on
    every update. It would also lose anything already expired from the hub.

    Args:
        eh_namespace: Event Hubs namespace to connect to.
        eh_name: Event hub (Kafka topic) to subscribe to.
        akv_secret_scope: Databricks secret scope holding the hub's access key.
        output_table_name: Unqualified name of the bronze table to create.

    Returns:
        The decorated table function. Registration with DLT happens as a side
        effect of ``@dlt.table``; the return value is only a convenience.
    """
    # Resolve options (and fetch secrets) once per source, at registration time.
    # The nested function closes over this call's own dict, so tables registered
    # in a loop do not share options.
    kafka_options = return_eventhub_config(eh_namespace, eh_name, akv_secret_scope)
    bronze_layer_table = f"{spark.conf.get('ENV_BRONZE_LH', 'bronze')}.{output_table_name}"

    @dlt.table(name = bronze_layer_table)
    def table():
        # Streaming (incremental) read rather than a full batch re-read of the topic.
        return (
            spark.readStream
            .format("kafka")
            .options(**kafka_options)
            .load()
            .withColumn("records", f.col("value").cast("string"))
        )

    return table

In [0]:
import json

# Source-to-bronze pipeline definitions.
# NOTE(review): a relative path is resolved against the process working
# directory, not this notebook's location — confirm it resolves when the
# notebook runs inside the DLT pipeline.
source_to_bronze_config_path = "../../Config/SourceToBronze.json"  # Path to your JSON file
# JSON text is UTF-8 (RFC 8259); decode explicitly instead of relying on the
# platform's locale-dependent default encoding.
with open(source_to_bronze_config_path, 'r', encoding='utf-8') as file:
    config = json.load(file)

In [0]:
# Keep only the config entries that describe an Event Hub source; each one
# gets its own bronze DLT table below.
eventhub_sources = [
    entry
    for entry in config["sources"]
    if entry["source_format"] == "eventhub"
]

for source in eventhub_sources:
    # Arguments in signature order: namespace, hub, secret scope, target table.
    # The secret scope is environment-specific: it is read from the pipeline's
    # Spark conf, falling back to "akv-dlp".
    create_bronze_table_from_eventhub(
        source["eventhub_namespace"],
        source["eventhub_name"],
        spark.conf.get("ENV_AKV_SECRET_SCOPE", "akv-dlp"),
        source["table_name"],
    )


In [0]:
# df = spark.read \
#     .format("kafka") \
#     .options(**KAFKA_OPTIONS) \
#     .load() \
#     .withColumn("records", f.col("value").cast("string"))

In [0]:
# @dlt.table(
#   comment="CDC events from SQL Server",
#   partition_cols=["event_time"]
# )
# def sqlservercdc():

In [0]:
# import json
# source_to_bronze_config_path = "../../Config/SourceToBronze.json"  # Path to your JSON file
# with open(source_to_bronze_config_path, 'r') as file:
#     config = json.load(file)

In [0]:
# eventhub_sources = [item for item in config["sources"] if item["source_format"] == "eventhub"]

In [0]:
# %sql
# USE CATALOG dlp_dev

In [0]:
# %sql
# CREATE SCHEMA brz MANAGED LOCATION 'abfss://dataplatform@dataplatformpocaytc1028.dfs.core.windows.net/'

In [0]:
# %sql
# CREATE SCHEMA bronze MANAGED LOCATION 'abfss://dataplatform@dataplatformpocaytc1028.dfs.core.windows.net/'

In [0]:
# %sql
# CREATE SCHEMA silver MANAGED LOCATION 'abfss://dataplatform@dataplatformpocaytc1028.dfs.core.windows.net/'

In [0]:
# %sql
# DROP SCHEMA IF EXISTS silver CASCADE

In [0]:
# df = spark.read \
#     .format("kafka") \
#     .options(**KAFKA_OPTIONS) \
#     .load() \
#     .withColumn("records", f.col("value").cast("string"))

In [0]:
# def parse(df):
#   return (df
#     .withColumn("records", col("value").cast("string"))
#     .withColumn("parsed_records", from_json(col("records"), payload_schema))
#     .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
#     .withColumn("eh_enqueued_timestamp", expr("timestamp"))
#     .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
#     .withColumn("etl_processed_timestamp", col("current_timestamp"))
#     .withColumn("etl_rec_uuid", expr("uuid()"))
#     .drop("records", "value", "key")
#   )

In [0]:
# df.write.mode("overwrite").saveAsTable("silver.dummy")