In [0]:
%sql
create schema if not exists data.gold

In [0]:
# from pyspark.sql.functions import col, explode, expr, to_timestamp, regexp_extract, from_xml, schema_of_xml, array
# from pyspark.sql import DataFrame

# def parse_xml_row(df_input: DataFrame) -> DataFrame:
#     """
#     Parse XML data using Spark's native XML parser to handle multiple Periods.
    
#     Args:
#         df_input: DataFrame with columns: filename, current_timestamp, xml_string
        
#     Returns:
#         DataFrame with one row per Point, with correct position_timestamp per Period
#     """
#     # Extract metadata
#     df_base = df_input.select(
#         col("filename"),
#         col("current_timestamp").alias("ingestion_timestamp"),
#         to_timestamp(regexp_extract(col("filename"), r"(\d{12})\.xml$", 1), "yyyyMMddHHmm").alias("file_timestamp"),
#         col("xml_string")
#     )
    
#     # Infer schema from a sample XML
#     sample_xml = df_base.select("xml_string").first()[0]
#     xml_schema = schema_of_xml(sample_xml)
    
#     # Parse XML into struct
#     df_parsed_xml = df_base.withColumn("parsed", from_xml(col("xml_string"), xml_schema))
    
#     # Check if Period is an array or struct by inspecting the schema
#     period_field = df_parsed_xml.select("parsed.TimeSeries.Period").schema.fields[0]
#     is_period_array = str(period_field.dataType).startswith("ArrayType")
    
#     # Extract the TimeSeries.Period and explode it
#     if is_period_array:
#         # Period is already an array
#         df_period_data = df_parsed_xml.select(
#             col("filename"),
#             col("ingestion_timestamp"),
#             col("file_timestamp"),
#             explode(col("parsed.TimeSeries.Period")).alias("period")
#         )
#     else:
#         # Period is a struct - wrap in array first
#         df_period_data = df_parsed_xml.select(
#             col("filename"),
#             col("ingestion_timestamp"),
#             col("file_timestamp"),
#             explode(array(col("parsed.TimeSeries.Period"))).alias("period")
#         )
    
#     # Extract interval_start and interval_end from each Period
#     df_period_with_intervals = df_period_data.select(
#         col("filename"),
#         col("ingestion_timestamp"),
#         col("file_timestamp"),
#         to_timestamp(col("period.timeInterval.start"), "yyyy-MM-dd'T'HH:mm'Z'").alias("interval_start"),
#         to_timestamp(col("period.timeInterval.end"), "yyyy-MM-dd'T'HH:mm'Z'").alias("interval_end"),
#         col("period.Point").alias("points_array")
#     )
    
#     # Explode the Point array to get one row per Point
#     df_points_exploded = df_period_with_intervals.select(
#         col("filename"),
#         col("ingestion_timestamp"),
#         col("file_timestamp"),
#         col("interval_start"),
#         col("interval_end"),
#         explode(col("points_array")).alias("point")
#     )
    
#     # Extract position and quantity from each Point
#     df_result = df_points_exploded.select(
#         col("filename"),
#         col("ingestion_timestamp"),
#         col("file_timestamp"),
#         col("interval_start"),
#         col("interval_end"),
#         col("point.position").cast("int").alias("position"),
#         col("point.quantity").cast("double").alias("quantity")
#     )
    
#     # Calculate position_timestamp: interval_start + (position - 1) * 15 minutes
#     df_final = df_result.withColumn(
#         "position_timestamp",
#         expr("interval_start + make_interval(0, 0, 0, 0, 0, (position - 1) * 15)")
#     )
    
#     return df_final

# # ===== CHANGE THIS VARIABLE TO TEST DIFFERENT ROWS =====
# row_number = 5  # Change this to 1, 2, 3, 4, etc.
# # ========================================================

# # Get the specified row from bronze table (sorted by filename for consistency)
# df_all_bronze = spark.table("data.bronze.bronze_A16").orderBy("filename")
# df_bronze_row = df_all_bronze.limit(row_number).tail(1)
# df_bronze_single = spark.createDataFrame(df_bronze_row, df_all_bronze.schema)

# # Parse the XML
# df_parsed = parse_xml_row(df_bronze_single)

# print(f"Testing row {row_number} from bronze_A16 (sorted by filename)")
# print(f"Filename: {df_bronze_single.first()['filename']}")
# print(f"Total Points extracted: {df_parsed.count()}")
# print("\nParsed data with position_timestamp:")
# df_parsed.display()

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS data.silver;

CREATE table if not EXISTS  data.silver.silver_A16 (
  filename STRING,
  ingestion_timestamp TIMESTAMP,
  file_timestamp TIMESTAMP,
  interval_start TIMESTAMP,
  interval_end TIMESTAMP,
  position INT,
  quantity DOUBLE,
  position_timestamp TIMESTAMP
)
USING DELTA;

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.functions import col, explode, expr, to_timestamp, regexp_extract, from_xml, schema_of_xml, array
from pyspark.sql import DataFrame

# Define parse_xml_row function (needed in same cell for foreachBatch)
def parse_xml_row(df_input: DataFrame) -> DataFrame:
    """
    Parse XML data using Spark's native XML parser to handle multiple Periods.
    """
    df_base = df_input.select(
        col("filename"),
        col("current_timestamp").alias("ingestion_timestamp"),
        to_timestamp(regexp_extract(col("filename"), r"(\d{12})\.xml$", 1), "yyyyMMddHHmm").alias("file_timestamp"),
        col("xml_string")
    )
    
    sample_xml = df_base.select("xml_string").first()[0]
    xml_schema = schema_of_xml(sample_xml)
    df_parsed_xml = df_base.withColumn("parsed", from_xml(col("xml_string"), xml_schema))
    
    period_field = df_parsed_xml.select("parsed.TimeSeries.Period").schema.fields[0]
    is_period_array = str(period_field.dataType).startswith("ArrayType")
    
    if is_period_array:
        df_period_data = df_parsed_xml.select(
            col("filename"), col("ingestion_timestamp"), col("file_timestamp"),
            explode(col("parsed.TimeSeries.Period")).alias("period")
        )
    else:
        df_period_data = df_parsed_xml.select(
            col("filename"), col("ingestion_timestamp"), col("file_timestamp"),
            explode(array(col("parsed.TimeSeries.Period"))).alias("period")
        )
    
    df_period_with_intervals = df_period_data.select(
        col("filename"), col("ingestion_timestamp"), col("file_timestamp"),
        to_timestamp(col("period.timeInterval.start"), "yyyy-MM-dd'T'HH:mm'Z'").alias("interval_start"),
        to_timestamp(col("period.timeInterval.end"), "yyyy-MM-dd'T'HH:mm'Z'").alias("interval_end"),
        col("period.Point").alias("points_array")
    )
    
    df_points_exploded = df_period_with_intervals.select(
        col("filename"), col("ingestion_timestamp"), col("file_timestamp"),
        col("interval_start"), col("interval_end"),
        explode(col("points_array")).alias("point")
    )
    
    df_result = df_points_exploded.select(
        col("filename"), col("ingestion_timestamp"), col("file_timestamp"),
        col("interval_start"), col("interval_end"),
        col("point.position").cast("int").alias("position"),
        col("point.quantity").cast("double").alias("quantity")
    )
    
    df_final = df_result.withColumn(
        "position_timestamp",
        expr("interval_start + make_interval(0, 0, 0, 0, 0, (position - 1) * 15)")
    )
    
    return df_final

# Define the process_batch function for foreachBatch
def process_batch(batch_df, batch_id):
    """
    Process a micro-batch of bronze data and MERGE into silver table.
    """
    print(f"\n{'='*60}")
    print(f"Processing batch {batch_id}")
    print(f"{'='*60}")
    
    row_count = batch_df.count()
    print(f"Batch contains {row_count} bronze rows")
    
    if row_count == 0:
        print("Empty batch, skipping...")
        return
    
    silver_table = DeltaTable.forName(spark, "data.silver.silver_A16")
    
    processed_count = 0
    for row in batch_df.toLocalIterator():
        processed_count += 1
        filename = row['filename']
        print(f"\n  [{processed_count}/{row_count}] Processing: {filename}")
        
        df_single_row = spark.createDataFrame([row], batch_df.schema)
        df_parsed = parse_xml_row(df_single_row)
        
        silver_table.alias("target").merge(
            df_parsed.alias("source"),
            "target.position_timestamp = source.position_timestamp"
        ).whenMatchedUpdate(
            condition="source.file_timestamp > target.file_timestamp",
            set={
                "filename": "source.filename",
                "ingestion_timestamp": "source.ingestion_timestamp",
                "file_timestamp": "source.file_timestamp",
                "interval_start": "source.interval_start",
                "interval_end": "source.interval_end",
                "position": "source.position",
                "quantity": "source.quantity",
                "position_timestamp": "source.position_timestamp"
            }
        ).whenNotMatchedInsertAll(
        ).execute()
        
        print(f"    ✓ MERGE completed")
    
    print(f"\n✓ Batch {batch_id} completed: {processed_count} files processed")

print("✓ parse_xml_row and process_batch functions defined")
print("Ready to use with foreachBatch in streaming")

In [0]:
# Step 2: Setup streaming from bronze to silver
print("Setting up structured streaming...")

# Define checkpoint location
checkpoint_location = "/Volumes/source/source_schema/source_volume/checkpoints/bronze_to_silver_A16"

# Read bronze table as a stream
df_bronze_stream = spark.readStream.table("data.bronze.bronze_A16")

print(f"✓ Streaming source configured")
print(f"Checkpoint location: {checkpoint_location}")

# Start the streaming query with foreachBatch
print("\nStarting streaming query...")
print("Mode: Batch (trigger availableNow - processes all available data once)\n")

query = df_bronze_stream.writeStream \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", checkpoint_location) \
    .trigger(availableNow=True) \
    .start()

# Wait for the stream to finish processing
query.awaitTermination()

print("\n" + "="*60)
print("✓ Streaming query completed successfully!")
print("="*60)
print("\nAll unprocessed bronze rows have been processed to silver.")
print("Next run will only process new rows added after this checkpoint.")

In [0]:
# %sql
# select * from data.silver.silver_A16 order by position_timestamp