In [0]:
%sql
-- Make melpark_azure the session's default catalog and ensure its bronze schema exists
-- (IF NOT EXISTS keeps this cell safe to re-run).
USE CATALOG melpark_azure;
CREATE SCHEMA IF NOT EXISTS bronze;

In [0]:
# Mount-free access to the ADLS Gen2 account: Spark authenticates abfss:// reads
# with the storage-account key, which is pulled from the Databricks secret scope
# rather than being written into the notebook.
storage_account = "melpark"
storage_key = dbutils.secrets.get(
    scope="kv-scope-v1",
    key="storage-account-key",
)

spark.conf.set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net",
    storage_key,
)

# Root URI of the `bronze` container.
# Layout: abfss://<container>@<account>.dfs.core.windows.net/<folder>
base_uri = f"abfss://bronze@{storage_account}.dfs.core.windows.net"

print(f"Direct connection configured for: {base_uri}")

Direct connection configured for: abfss://bronze@melpark.dfs.core.windows.net


In [0]:
from pyspark.sql.functions import current_timestamp

def ingest_static_table(
    table_name: str,
    folder_name: str,
    target_schema: str = "melpark_azure.bronze",
    source_root: "str | None" = None,
) -> None:
    """Full-refresh a managed Delta table from a Parquet folder in ADLS.

    Reads the Parquet data at ``<source_root>/<folder_name>``, adds an
    ``ingested_at`` column holding the time of this run, and overwrites the
    managed table ``<target_schema>.<table_name>``.

    Args:
        table_name: Name of the target table inside ``target_schema``.
        folder_name: Folder, relative to ``source_root``, holding the Parquet data.
        target_schema: ``catalog.schema`` namespace for the target table.
            Defaults to the bronze schema created at the top of this notebook.
        source_root: ABFSS root URI to read from. When omitted, the notebook-level
            ``base_uri`` from the connection cell is used.
    """
    # Fall back to the notebook-global root so existing two-argument calls keep working.
    root = base_uri if source_root is None else source_root
    source_path = f"{root.rstrip('/')}/{folder_name}"
    print(f"Ingesting {table_name} from {source_path}...")

    # 1. READ the Parquet snapshot directly over ABFSS (no /mnt mount).
    df_raw = spark.read.format("parquet").load(source_path)

    # 2. Stamp every row with the time this ingestion ran.
    df_final = df_raw.withColumn("ingested_at", current_timestamp())

    # 3. WRITE to Unity Catalog; "overwrite" replaces the table contents on each run.
    target_table = f"{target_schema}.{table_name}"
    (
        df_final.write.format("delta")
        .mode("overwrite")
        .saveAsTable(target_table)
    )

    print(f"Success! Managed table '{target_table}' updated.")



Ingesting parking_bays from abfss://bronze@melpark.dfs.core.windows.net/parking_bays...
Success! Managed table 'melpark_azure.bronze.parking_bays' updated.
Ingesting parking_meters from abfss://bronze@melpark.dfs.core.windows.net/parking_meters...
Success! Managed table 'melpark_azure.bronze.parking_meters' updated.


In [0]:
# Run ingestion for each static reference snapshot, in a fixed order.
# Every table is written under the same name as the ADLS folder it is read from.
for static_table in (
    "parking_bays",
    "parking_meters",
    "parking_restrictions",
    "parking_zones_plates",
):
    ingest_static_table(static_table, static_table)

Ingesting parking_restrictions from abfss://bronze@melpark.dfs.core.windows.net/parking_restrictions...
Success! Managed table 'melpark_azure.bronze.parking_restrictions' updated.
Ingesting parking_zones_plates from abfss://bronze@melpark.dfs.core.windows.net/parking_zones_plates...
Success! Managed table 'melpark_azure.bronze.parking_zones_plates' updated.


In [0]:
from pyspark.sql.functions import current_timestamp

# Configuration
# Incoming raw sensor JSON files. NOTE(review): the topic-style folder name suggests
# a Kafka/CDC sink writes here — confirm.
source_path = f"{base_uri}/topics/parking.public.raw_parking_sensors"
# Streaming state (progress / processed files) for this query.
checkpoint_path = f"{base_uri}/_checkpoints/parking_sensors_uc"
# Auto Loader persists the inferred JSON schema here so it survives restarts;
# it is required because no explicit schema is supplied to the reader.
schema_path = f"{base_uri}/_schemas/parking_sensors"

table_name = "melpark_azure.bronze.parking_sensors"
# Stable query name so re-running this cell can find the previous run.
query_name = "bronze_parking_sensors"

print(f"Starting Auto Loader stream to {table_name}...")

# Stop any still-running instance of this query before starting a new one, so a
# re-run never has two runs sharing one checkpoint (independent of the
# spark.sql.streaming.stopActiveRunOnRestart setting).
for active_query in spark.streams.active:
    if active_query.name == query_name:
        active_query.stop()

# 1. Read Stream: Auto Loader picks up new JSON files incrementally.
df_stream = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaLocation", schema_path)
    .load(source_path)
)

# 2. Add Metadata: time at which each micro-batch was processed.
df_stream_final = df_stream.withColumn("ingested_at", current_timestamp())

# 3. Write Stream: append into the bronze Delta table; mergeSchema allows new
# source columns to be added to the table schema.
query = (df_stream_final.writeStream
    .format("delta")
    .outputMode("append")
    .queryName(query_name)
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", "true")
    .table(table_name)
)

print("Stream is active!")

Starting Auto Loader stream to melpark_azure.bronze.parking_sensors...
Stream is active!
