In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import expr, explode, col
from pyspark.sql.utils import AnalysisException

class invoiceStream:
    """Flatten invoice JSON into one row per line item and append it to a Delta table.

    ``process_invoices`` first tries a Structured Streaming run (trigger once,
    progress tracked in ``checkpoint``). If that path raises
    ``AnalysisException``, it falls back to a one-shot batch job over the same
    input.

    NOTE: all readers/writers use a global ``spark`` session that this class
    does not create; it is expected to be provided by the notebook runtime.
    """

    # Top-level invoice columns carried onto every flattened line-item row.
    _INVOICE_COLUMNS = (
        "InvoiceNumber", "CreatedTime", "StoreID", "PosID",
        "CashierID", "CustomerType", "PaymentMethod", "DeliveryType",
    )
    # Fields extracted from each InvoiceLineItems element, in output order.
    _LINE_ITEM_FIELDS = (
        "ItemCode", "ItemDescription", "ItemPrice", "ItemQty", "TotalValue",
    )

    def __init__(
        self,
        base_path="/Volumes/spark-catalog/sandbox/sample_data/invoices",
        checkpoint="/Volumes/spark-catalog/sandbox/sample_data/invoices_1/checkpoint",
        out_table="invoice_flattened",
    ):
        """Configure input and output locations (defaults match the original notebook).

        Args:
            base_path: Directory holding the input JSON invoice files.
            checkpoint: Checkpoint location used by the streaming writer.
            out_table: Delta table that flattened rows are appended to.
        """
        self.base_path = base_path
        self.checkpoint = checkpoint
        self.out_table = out_table

    def getSchema(self):
        """Return the DDL schema string for the input invoice JSON.

        The same schema is applied to both the streaming and the batch
        reader, so both load paths parse the files identically.
        """
        return """
        InvoiceNumber string, CreatedTime bigint, StoreID string, PosID string, CashierID string,
        CustomerType string, CustomerCardNo string, TotalAmount double, NumberOfItems bigint,
        PaymentMethod string, TaxableAmount double, CGST double, SGST double, CESS double,
        DeliveryType string,
        DeliveryAddress struct<AddressLine string, City string, ContactNumber string, PinCode string,
        State string>,
        InvoiceLineItems array<struct<ItemCode string, ItemDescription string,
        ItemPrice double, ItemQty bigint, TotalValue double>>
        """

    def _json_reader(self, reader):
        """Apply the shared JSON format and explicit schema to a batch or streaming reader."""
        return reader.format("json").schema(self.getSchema())

    def read_invoices_stream(self):
        """Return a streaming DataFrame over the JSON files in ``base_path``."""
        return self._json_reader(spark.readStream).load(self.base_path)

    def read_invoices_batch(self):
        """Return a batch DataFrame over the JSON files in ``base_path`` (fallback path)."""
        return self._json_reader(spark.read).load(self.base_path)

    def explode_invoices(self, invoice_df):
        """Return one row per element of ``InvoiceLineItems``.

        Keeps the top-level invoice columns plus ``City``/``State`` from
        ``DeliveryAddress``; each line item lands in the struct column
        ``lineItem``.

        NOTE: ``explode`` produces no rows for invoices whose
        ``InvoiceLineItems`` is null or empty (``explode_outer`` would keep
        them); such invoices are therefore absent from the output.
        """
        return invoice_df.selectExpr(
            *self._INVOICE_COLUMNS,
            "DeliveryAddress.City as City",
            "DeliveryAddress.State as State",
            "explode(InvoiceLineItems) as lineItem",
        )

    def transform_invoices(self, exploded_df):
        """Flatten the ``lineItem`` struct into top-level item columns.

        Output columns are the invoice columns, ``City``, ``State``, then
        ItemCode, ItemDescription, ItemPrice, ItemQty, TotalValue.
        """
        # col() resolves the struct field directly; no SQL-expression parsing needed.
        item_columns = [
            col(f"lineItem.{field}").alias(field) for field in self._LINE_ITEM_FIELDS
        ]
        return exploded_df.select(*self._INVOICE_COLUMNS, "City", "State", *item_columns)

    def _flatten(self, invoice_df):
        """Run the explode + projection steps shared by the streaming and batch paths."""
        return self.transform_invoices(self.explode_invoices(invoice_df))

    def write_invoices_stream(self, transformed_df):
        """Start a trigger-once append of ``transformed_df`` into ``out_table``.

        Returns:
            The started ``StreamingQuery``; the caller is responsible for
            waiting on it.
        """
        # NOTE(review): Spark 3.4+ deprecates Trigger.Once in favour of
        # trigger(availableNow=True); kept as-is pending a check of what this
        # runtime supports.
        return (
            transformed_df.writeStream
                .format("delta")
                .option("checkpointLocation", self.checkpoint)
                .outputMode("append")
                .trigger(once=True)
                .toTable(self.out_table)
        )

    def write_invoices_batch(self, transformed_df):
        """Append ``transformed_df`` to ``out_table`` as a single batch write.

        NOTE: no checkpoint is involved, so every call re-appends all input
        rows; re-running the fallback duplicates data in ``out_table``.
        """
        (
            transformed_df.write
                .format("delta")
                .mode("append")
                .saveAsTable(self.out_table)
        )

    def process_invoices(self):
        """Load invoices via streaming, falling back to batch on ``AnalysisException``.

        Returns:
            The terminated ``StreamingQuery`` on the streaming path, or
            ``None`` when the batch fallback ran.
        """
        print("Invoice Processing:")
        try:
            transformed_df = self._flatten(self.read_invoices_stream())
            print("Starting streaming query (trigger once)...")
            query = self.write_invoices_stream(transformed_df)
            query.awaitTermination()
        except AnalysisException as e:
            # This also catches analysis errors unrelated to streaming support
            # (e.g. a bad column or an incompatible target table); the batch
            # path will then most likely fail with the same error.
            print("Streaming not available; falling back to batch. Reason:", str(e))
            transformed_df = self._flatten(self.read_invoices_batch())
            print("Transformed Invoices",transformed_df)
            self.write_invoices_batch(transformed_df)
            print("Batch load complete.")
            return None
        print("Streaming load complete.")
        return query

# Build the pipeline with its default paths and run it once. The call is the
# cell's last expression, so the notebook echoes its return value
# (a StreamingQuery, or None after the batch fallback).
invoice_job = invoiceStream()
invoice_job.process_invoices()


Invoice Processing:
Starting streaming query (trigger once)...
Streaming load complete.


<pyspark.sql.connect.streaming.query.StreamingQuery at 0xff74146228a0>