In [0]:
# Imports
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [0]:
# parameters

# Widget parameters
dbutils.widgets.text("s3_source_path", "s3://databricks-storage-4052354327981619/raw/transactions", "S3 Source Path")
dbutils.widgets.text("checkpoint_path", "s3://databricks-storage-4052354327981619/raw/checkpoints/bronze_transactions", "Checkpoint Path")
dbutils.widgets.text("catalog_name", "workspace_bank", "Catalog Name")
dbutils.widgets.text("schema_name", "bronze", "Schema Name")
dbutils.widgets.text("table_name", "credit_card_transactions", "Table Name")
dbutils.widgets.dropdown("trigger_mode", "processingTime", ["processingTime", "availableNow", "continuous"], "Trigger Mode")
dbutils.widgets.text("trigger_interval", "30 seconds", "Trigger Interval (for processingTime mode)")

# Get parameter values.
# Surrounding whitespace is stripped (a stray space typed into a widget would
# otherwise end up inside a path or table identifier), and trailing "/" is
# removed from paths so f"{path}/..." joins cleanly.
s3_source_path = dbutils.widgets.get("s3_source_path").strip().rstrip("/")
checkpoint_path = dbutils.widgets.get("checkpoint_path").strip().rstrip("/")
catalog_name = dbutils.widgets.get("catalog_name").strip()
schema_name = dbutils.widgets.get("schema_name").strip()
table_name = dbutils.widgets.get("table_name").strip()
trigger_mode = dbutils.widgets.get("trigger_mode")
trigger_interval = dbutils.widgets.get("trigger_interval").strip()

# Fail fast on blank widgets instead of starting a stream against "" paths or
# building a malformed name such as "catalog..table".
_missing_params = [
    param_name
    for param_name, param_value in (
        ("s3_source_path", s3_source_path),
        ("checkpoint_path", checkpoint_path),
        ("catalog_name", catalog_name),
        ("schema_name", schema_name),
        ("table_name", table_name),
    )
    if not param_value
]
if trigger_mode == "processingTime" and not trigger_interval:
    _missing_params.append("trigger_interval")
if _missing_params:
    raise ValueError(f"Missing required widget value(s): {', '.join(_missing_params)}")

# Full table name
full_table_name = f"{catalog_name}.{schema_name}.{table_name}"

print(f"üìÇ Source Path: {s3_source_path}")
print(f"üìç Checkpoint Path: {checkpoint_path}")
print(f"üìä Target Table: {full_table_name}")
print(f"‚è±Ô∏è  Trigger Mode: {trigger_mode}")
if trigger_mode == "processingTime":
    print(f"‚è±Ô∏è  Trigger Interval: {trigger_interval}")

In [0]:
# Define the schema for credit card transactions from a (column name, type) table.
# Every column is declared non-nullable.
# NOTE(review): in this notebook as shown, transaction_schema is not passed to the
# Auto Loader reader (that stream infers its schema); presumably it is kept as a
# reference for the expected columns — confirm whether it should be applied.
transaction_schema = StructType([
    StructField(column_name, column_type, False)
    for column_name, column_type in (
        ("transaction_id", LongType()),
        ("account_id", LongType()),
        ("merchant_id", LongType()),
        ("merchant_category", StringType()),
        ("transaction_amount", DoubleType()),
        ("transaction_timestamp", TimestampType()),
        ("transaction_status", StringType()),
        ("export_id", LongType()),
        ("export_ts", TimestampType()),
        ("export_date", DateType()),
    )
])

print("‚úÖ Transaction schema defined")

In [0]:
## Auto Loader Stream Configuration

# Auto Loader (cloudFiles) reader options, collected in one place.
# The inferred schema is tracked under "<checkpoint_path>/schema".
# NOTE(review): cloudFiles.inferColumnTypes is documented for text formats such
# as JSON/CSV; Parquet files carry their own types — confirm it is needed here.
autoloader_options = {
    "cloudFiles.format": "parquet",
    "cloudFiles.schemaLocation": f"{checkpoint_path}/schema",
    "cloudFiles.inferColumnTypes": "true",
    "cloudFiles.schemaEvolutionMode": "addNewColumns",  # Handle schema evolution
    "cloudFiles.maxFilesPerTrigger": 100,  # Process up to 100 files per micro-batch
}

# Read stream using Auto Loader
df_stream = (
    spark.readStream
    .format("cloudFiles")
    .options(**autoloader_options)
    .load(s3_source_path)
)

print("‚úÖ Auto Loader stream configured")

In [0]:
# Add bronze layer metadata:
#   _ingestion_timestamp - when the row was processed (current_timestamp)
#   _source_file         - path of the file the row was read from
#   _ingest_date         - processing date (current_date)
df_stream_with_metadata = (
    df_stream
    .withColumn("_ingestion_timestamp", F.current_timestamp())
    # Use the file metadata column instead of F.input_file_name(): the latter is
    # deprecated and is not supported under Unity Catalog shared access mode,
    # which this notebook's catalog.schema.table target implies.
    .withColumn("_source_file", F.col("_metadata.file_path"))
    .withColumn("_ingest_date", F.current_date())
)

print("‚úÖ Metadata columns added")

In [0]:
# Write Stream to Bronze Delta Table
# Configure trigger based on mode
def get_trigger(mode=None, interval=None):
    """Build the keyword arguments for ``DataStreamWriter.trigger``.

    Args:
        mode: "availableNow", "continuous" or "processingTime". Defaults to the
            ``trigger_mode`` widget value. Any other value falls back to
            processingTime, as before.
        interval: Interval string such as "30 seconds". Defaults to the
            ``trigger_interval`` widget value. Ignored for "availableNow".

    Returns:
        dict to be splatted into ``.trigger(**get_trigger())``.

    Raises:
        ValueError: for "continuous" mode. Continuous processing is not
            supported by the Auto Loader (cloudFiles) source or the Delta sink
            this notebook uses, so the query would otherwise fail at start with
            a less obvious error.
    """
    if mode is None:
        mode = trigger_mode
    if interval is None:
        interval = trigger_interval

    if mode == "availableNow":
        return {"availableNow": True}
    if mode == "continuous":
        raise ValueError(
            "trigger_mode 'continuous' is not supported for an Auto Loader -> Delta "
            "stream; use 'processingTime' or 'availableNow'."
        )
    # processingTime (default)
    return {"processingTime": interval}

# Write to Delta table: append-only Delta sink with checkpointing and mergeSchema,
# started as a managed table write.
bronze_writer = (
    df_stream_with_metadata.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .option("mergeSchema", "true")  # Allow schema evolution
)
query = bronze_writer.trigger(**get_trigger()).toTable(full_table_name)

print("‚úÖ Streaming query started")
print(f"üìä Writing to table: {full_table_name}")
print(f"üîÑ Query ID: {query.id}")
print(f"üìç Checkpoint: {checkpoint_path}")

In [0]:
# Display stream status: a snapshot of query.status at the time this cell runs
stream_status = query.status
display(stream_status)

In [0]:
# Show recent progress
import time

# Poll for the first progress report instead of sleeping a fixed 5 s. A fixed
# sleep either wastes time or returns before the first micro-batch completes.
# Stop early if the query is no longer active (e.g. availableNow finished, or it failed).
PROGRESS_TIMEOUT_SECONDS = 30
PROGRESS_POLL_SECONDS = 1

_progress_deadline = time.monotonic() + PROGRESS_TIMEOUT_SECONDS
while query.isActive and not query.recentProgress and time.monotonic() < _progress_deadline:
    time.sleep(PROGRESS_POLL_SECONDS)

recent_progress = query.recentProgress
if recent_progress:
    print(f"üìä Latest Progress:")
    latest = recent_progress[-1]
    # `sources` may be missing or an empty list; guard before indexing [0].
    first_source = (latest.get("sources") or [{}])[0]
    duration_ms = latest.get("durationMs") or {}
    print(f"  - Batch ID: {latest.get('batchId', 'N/A')}")
    print(f"  - Records Processed: {latest.get('numInputRows', 0)}")
    print(f"  - Processing Time: {duration_ms.get('triggerExecution', 0)} ms")
    print(f"  - Source Input Rows: {first_source.get('numInputRows', 0)}")
elif not query.isActive and query.exception() is not None:
    # Surface a failed query instead of claiming it is still warming up.
    print(f"Streaming query stopped with an error: {query.exception()}")
else:
    print("‚è≥ Waiting for first batch to complete...")

In [0]:
# Verify Data in Bronze Table
# Two independent reads of the target table: one for the row count, one for a
# sample of the most recently ingested rows.
print(f"üìä Checking bronze table: {full_table_name}")

record_count = spark.table(full_table_name).count()
print(f"‚úÖ Total records in bronze table: {record_count:,}")

sample_size = 10
print("\nüìã Sample records:")
newest_rows = (
    spark.table(full_table_name)
    .orderBy(F.desc("_ingestion_timestamp"))
    .limit(sample_size)
)
display(newest_rows)

In [0]:
# Helper Functions

def get_stream_metrics(streaming_query=None):
    """Return a snapshot of a streaming query's state.

    Args:
        streaming_query: The streaming query to inspect. Defaults to the
            notebook-level ``query`` started above.

    Returns:
        dict with keys ``query_id``, ``is_active``, ``status`` and
        ``recent_progress``. ``recent_progress`` is the newest progress entry,
        or None if none has been reported yet.
    """
    if streaming_query is None:
        streaming_query = query
    return {
        "query_id": streaming_query.id,
        "is_active": streaming_query.isActive,
        "status": streaming_query.status,
        # lastProgress yields the newest entry (or None) in one call, instead of
        # fetching the whole recentProgress buffer twice.
        "recent_progress": streaming_query.lastProgress,
    }

def show_checkpoint_info(path=None):
    """Print the entries at the checkpoint location (best effort).

    Args:
        path: Directory to list. Defaults to the notebook-level ``checkpoint_path``.

    Any exception raised while listing is printed as a warning rather than
    re-raised, so this diagnostic never aborts the notebook.
    """
    if path is None:
        path = checkpoint_path
    try:
        entries = dbutils.fs.ls(path)
        print(f"üìÅ Checkpoint contents ({path}):")
        for entry in entries:
            print(f"  - {entry.name} ({entry.size} bytes)")
    except Exception as e:
        print(f"‚ö†Ô∏è  Could not read checkpoint: {e}")

# List the checkpoint first, then leave the metrics dict as the cell's last
# expression so the notebook renders it. If another call follows it, the
# return value is silently discarded.
show_checkpoint_info()
get_stream_metrics()