In [None]:
from pyspark.sql.functions import col, from_json, explode
from pyspark.sql.types import StructType, StringType, StructField
from pyspark.dbutils import DBUtils
from pyspark.sql import SparkSession

# Obtain the active Spark session for this notebook, starting one if none exists yet
spark = (
    SparkSession.builder
    .appName("YourAppName")
    .getOrCreate()
)

# Utilities handle (secrets, notebook context) bound to the session above
dbutils = DBUtils(spark)

def setup_streaming_pipeline(spark, catalog_name, bronze_db, silver_db, bronze_table, silver_table, eventHubName, key_vault, connector, checkpoint_base_path, json_schema, explode_field=None):
    """
    Start an Event Hub -> bronze Delta -> silver Delta streaming pipeline.

    The raw Event Hub records are appended unchanged to the bronze table. A second
    stream reads the bronze table, parses the ``body`` column as JSON using
    ``json_schema``, flattens the top-level fields, adds the record's
    ``enqueuedTime`` as ``timestamp`` and appends the result to the silver table.

    Parameters:
        spark (SparkSession): The Spark session.
        catalog_name (str): Name of the catalog.
        bronze_db (str): Bronze database (schema) name.
        silver_db (str): Silver database (schema) name.
        bronze_table (str): Bronze table name to store raw data.
        silver_table (str): Silver table name to store processed data.
        eventHubName (str): Name of the Azure Event Hub.
        key_vault (str): Secret scope passed to ``dbutils.secrets.get``.
        connector (str): Secret key holding the Event Hub connection string.
        checkpoint_base_path (str): Base path for checkpointing.
        json_schema (StructType): JSON schema to apply to the incoming ``body``.
        explode_field (str, optional): Name of a top-level array field in
            ``json_schema`` to explode into one row per element. When the array
            elements are structs, their fields are flattened into columns;
            otherwise the exploded value is kept in a column named
            ``Input_array``. Default is None (no explode).

    Raises:
        ValueError: If ``explode_field`` is given but is not a top-level field of
            ``json_schema`` or is not an array type.
    """
    # One utilities handle built from the session that was passed in, so the
    # function does not depend on a notebook-global `dbutils` being defined.
    dbutils = DBUtils(spark)

    # Validate explode_field before any catalog object or stream is created, so a
    # bad argument fails fast instead of after the bronze stream has started.
    exploded_struct_fields = None
    if explode_field:
        if explode_field not in json_schema.fieldNames():
            raise ValueError(
                f"explode_field '{explode_field}' is not a top-level field of json_schema"
            )
        # Only array types expose `elementType`.
        element_type = getattr(json_schema[explode_field].dataType, "elementType", None)
        if element_type is None:
            raise ValueError(
                f"explode_field '{explode_field}' must be an array field in json_schema"
            )
        if isinstance(element_type, StructType):
            exploded_struct_fields = element_type.fields

    # Derive checkpoint paths from the notebook name (last path segment, extension stripped).
    # NOTE(review): the paths depend only on the notebook name, not on the table
    # names, so two calls from the same notebook would share checkpoints. Left
    # unchanged because moving the paths would orphan existing checkpoints.
    notebook_path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
    notebook_name = notebook_path.split("/")[-1].split(".")[0]
    bronze_checkpoint_path = f"{checkpoint_base_path}/{catalog_name}/{bronze_db}/checkpoints/{notebook_name}/"
    silver_checkpoint_path = f"{checkpoint_base_path}/{catalog_name}/{silver_db}/checkpoints/{notebook_name}/"

    # Catalog, schema and volume setup. The statements already use IF NOT EXISTS,
    # so an exception here is a real failure; setup stays best-effort, but the
    # actual error is reported instead of being masked.
    try:
        spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog_name} MANAGED LOCATION 'abfss://streamingdata-demo@dataengineerdemoweather.dfs.core.windows.net/';")
    except Exception as exc:
        print(f"Could not create catalog {catalog_name} (continuing): {exc}")

    for db in (bronze_db, silver_db):
        try:
            spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{db};")
        except Exception as exc:
            print(f"Could not create schema {catalog_name}.{db} (continuing): {exc}")

    for db in (bronze_db, silver_db):
        try:
            spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog_name}.{db}.checkpoints;")
        except Exception as exc:
            print(f"Could not create volume {catalog_name}.{db}.checkpoints (continuing): {exc}")

    # Event Hub configuration: the connection string comes from the secret store
    # and is passed through the connector's encrypt helper.
    connectionString = dbutils.secrets.get(key_vault, connector)
    ehConf = {
        'eventhubs.connectionString': spark._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString),
        'eventhubs.eventHubName': eventHubName
    }

    # Read the stream from Azure Event Hub and append it to the bronze Delta table.
    df_bronze = (
        spark.readStream
        .format("eventhubs")
        .options(**ehConf)
        .load()
    )

    df_bronze.display()  # For debugging and visualization

    (
        df_bronze.writeStream
        .option("checkpointLocation", bronze_checkpoint_path)
        .outputMode("append")
        .format("delta")
        .toTable(f"{catalog_name}.{bronze_db}.{bronze_table}")
    )

    # Stream the bronze table back, decode `body` to a string and parse it as JSON.
    df_silver = (
        spark.readStream
        .format("delta")
        .table(f"{catalog_name}.{bronze_db}.{bronze_table}")
        .withColumn("body", col("body").cast("string"))
        .withColumn("body", from_json(col("body"), json_schema))
    )

    top_level_names = [field.name for field in json_schema.fields]
    selected_columns = [col(f"body.{name}") for name in top_level_names]

    if explode_field:
        # First projection: flatten the top-level fields and emit one row per array element.
        df_silver = df_silver.select(
            *selected_columns,
            col("enqueuedTime").alias('timestamp'),
            explode(col(f"body.{explode_field}")).alias("Input_array"),
        )
        if exploded_struct_fields is not None:
            # `body` no longer exists after the projection above, so refer to the
            # flattened columns by name and lift the struct element's fields.
            df_silver = df_silver.select(
                *top_level_names,
                'timestamp',
                *[col(f'Input_array.{field.name}').alias(field.name) for field in exploded_struct_fields],
            )
    else:
        df_silver = df_silver.select(*selected_columns, col("enqueuedTime").alias('timestamp'))

    df_silver.display()  # For debugging and visualization

    (
        df_silver.writeStream
        .option("checkpointLocation", silver_checkpoint_path)
        .outputMode("append")
        .format("delta")
        .toTable(f"{catalog_name}.{silver_db}.{silver_table}")
    )





In [None]:

# Schema applied to the JSON body of each weather event: three nullable string fields.
weather_json_schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in ("temperature", "time", "skycondition")
    ]
)



In [None]:


# Example: run the pipeline for the weather feed (no explode_field, so rows are not exploded)
setup_streaming_pipeline(
    spark=spark,
    # Source: Event Hub name plus the secret scope/key holding its connection string
    eventHubName="streamingeventhubs",
    key_vault="testScope1",
    connector="testsecrettyler",
    # Payload schema for the event body
    json_schema=weather_json_schema,
    # Destination catalog, schemas and tables
    catalog_name="streaming1",
    bronze_db="bronze",
    bronze_table="weather_table",
    silver_db="silver",
    silver_table="weather_table",
    # Root under which per-schema checkpoint directories are placed
    checkpoint_base_path="/Volumes",
)


temperature,time,skycondition,timestamp
23°C,Wednesday 4:32 p.m.,Sunny,2024-10-23T20:33:38.632Z
