In [0]:
# 1. Setup Widgets for Parameterization
dbutils.widgets.text("source_path", "/Volumes/workspace/e-commerce_data/csv_files", "1. Source Data Path")
dbutils.widgets.text("target_database", "streaming_db", "2. Target Streaming Database")
# The checkpoint directory holds the stream's progress (it is used as the
# checkpointLocation) and, under schema/events, Auto Loader's recorded schema.
# That state belongs to one particular source/target pairing, so it is exposed as
# a widget: a run against a different source_path or target_database can then use
# its own checkpoint instead of silently inheriting another stream's state.
# The default is the previously hard-coded path, so existing runs are unaffected.
dbutils.widgets.text(
    "checkpoint_base_path",
    "/Volumes/workspace/e-commerce_data/csv_files/_checkpoints/ecommerce_pipeline",
    "3. Checkpoint Base Path",
)

# 2. Get Parameters
# Strip stray whitespace: these values are spliced directly into paths and table names.
source_path = dbutils.widgets.get("source_path").strip()
target_database = dbutils.widgets.get("target_database").strip()
checkpoint_base_path = dbutils.widgets.get("checkpoint_base_path").strip()

# Fail fast on empty parameters rather than building names like ".properties_bronze".
_empty_params = [
    name
    for name, value in (
        ("source_path", source_path),
        ("target_database", target_database),
        ("checkpoint_base_path", checkpoint_base_path),
    )
    if not value
]
if _empty_params:
    raise ValueError(f"Required notebook parameter(s) are empty: {', '.join(_empty_params)}")

# 3. Prepare Environment
dbutils.fs.mkdirs(checkpoint_base_path)

print("--- Configuration ---")
print(f"Source Path:          {source_path}")
print(f"Target Database:      {target_database}")
print(f"Checkpoint Base Path: {checkpoint_base_path}")
print("-----------------------")

# 4. Import Pipeline Functions
import sys

# Make the repo's `etl_pipeline` package importable from this notebook.
# Guarded so that re-running the cell does not keep appending the same entry.
repo_path = "/Workspace/Repos/3hmedgomaa2001@gmail.com/Ecommerce-Databricks-Solution"
if repo_path not in sys.path:
    sys.path.append(repo_path)

from etl_pipeline.transformers import create_silver_layer, create_gold_layers
from etl_pipeline.writer import write_delta_table

# 5. Define the Micro-Batch Processing Function
def process_micro_batch(micro_batch_df, batch_id):
    """Run the silver and gold transformations for one streaming micro-batch.

    Used as the ``foreachBatch`` callback of the events stream. The batch's
    event rows are combined with the properties and categories bronze tables.
    The result is appended to ``<target_database>.events_enriched_silver_stream``.
    Each DataFrame returned by ``create_gold_layers`` is then appended to
    ``<target_database>.<name>_stream``.

    Args:
        micro_batch_df: DataFrame with the new event rows of this micro-batch.
        batch_id: Micro-batch identifier supplied by Structured Streaming.

    Raises:
        Exception: any failure is printed and then re-raised. Swallowing it
            would make Structured Streaming treat the batch as done and commit
            its offsets, so rows that were never written would be lost. With
            the re-raise, the query stops and the batch is replayed from the
            checkpoint on the next run.
    """
    print(f"\n--- Processing Micro-Batch ID: {batch_id} ---")

    try:
        # The bronze lookup tables are resolved on every call, not once at startup.
        properties_df = spark.table(f"{target_database}.properties_bronze")
        category_df = spark.table(f"{target_database}.categories_bronze")

        # Combine micro-batch with other bronze tables
        bronze_dfs_for_batch = {
            "events": micro_batch_df,
            "properties": properties_df,
            "categories": category_df
        }

        # --- Silver Transformation ---
        silver_micro_batch_df = create_silver_layer(bronze_dfs_for_batch, skip_quality_check=True)

        # NOTE(review): these appends are not idempotent. If a batch is replayed
        # after a partial write, rows can be duplicated. Delta's txnAppId/txnVersion
        # writer options can make foreachBatch writes idempotent; they need a
        # txnAppId that changes whenever the checkpoint is reset.
        silver_table_name = f"{target_database}.events_enriched_silver_stream"
        silver_micro_batch_df.write.format("delta").mode("append").saveAsTable(silver_table_name)
        print(f"Appended micro-batch {batch_id} to {silver_table_name}")

        # --- Gold Transformation ---
        # silver_micro_batch_df is not cached, so it is evaluated again for the gold
        # layer. Caching could avoid that where the compute supports it.
        gold_dfs = create_gold_layers(silver_micro_batch_df)

        # NOTE(review): gold outputs are appended per micro-batch. If
        # create_gold_layers aggregates, one key may end up with a separate row
        # per batch. Confirm that is intended, or merge instead of appending.
        for name, df in gold_dfs.items():
            table_name = f"{target_database}.{name}_stream"
            df.write.format("delta").mode("append").saveAsTable(table_name)
            print(f"Updated Gold table: {table_name}")

    except Exception as e:
        print(f"ERROR processing micro-batch {batch_id}: {e}")
        # Propagate so the streaming query fails instead of committing this batch.
        raise

# 6. Define the Streaming Source
# Auto Loader (format "cloudFiles") infers CSV columns as strings unless
# `cloudFiles.inferColumnTypes` is enabled. The plain CSV `inferSchema` option
# is the batch-reader setting and is not what Auto Loader uses for typing.
# The inferred schema is recorded under schemaLocation. An already-populated
# schema location presumably keeps the schema it recorded; a fresh one is
# needed for the change to take effect.
# NOTE(review): every CSV under source_path is read as an event file. Confirm
# that the directory holds only event CSVs.
events_stream_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaLocation", f"{checkpoint_base_path}/schema/events")
    .load(source_path)
)

# 7. Start the Stream
print(f"\nDEBUG: Initializing stream with checkpointLocation: '{checkpoint_base_path}'")

# availableNow processes the data available when the query starts and then
# stops, so awaitTermination() returns once that backlog is drained. If the
# query terminates with an exception (for example one raised by the
# foreachBatch function), awaitTermination() raises it.
stream_query = (
    events_stream_df.writeStream.foreachBatch(process_micro_batch)
    .option("checkpointLocation", checkpoint_base_path)
    .trigger(availableNow=True)
    .start()
)

print("\n--- Streaming query started. Waiting for completion... ---")
stream_query.awaitTermination()
print("--- Streaming query has completed. ---")
