In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

class InvoiceBronzeLayer:
    """Bronze-layer ingestion of the Kafka ``invoices`` topic into a Delta table.

    The pipeline reads the topic as a stream, parses each message value as an
    invoice JSON document, and merges every micro-batch into
    ``dev.retail_store.invoices_raw``.

    Relies on the notebook-provided global ``spark`` session.
    """

    # Environment variables consulted when credentials are not passed to the
    # constructor (e.g. cluster environment variables backed by a secret scope).
    API_KEY_ENV_VAR = "KAFKA_API_KEY"
    API_SECRET_ENV_VAR = "KAFKA_API_SECRET"

    def __init__(self, api_key=None, api_secret=None):
        """Configure the Kafka connection and the invoice schemas.

        Args:
            api_key: Kafka SASL username. When omitted, read from the
                environment variable named by ``API_KEY_ENV_VAR``.
            api_secret: Kafka SASL password. When omitted, read from the
                environment variable named by ``API_SECRET_ENV_VAR``.

        Raises:
            ValueError: if a credential is neither passed nor set in the
                environment.
        """
        self.bootstrap_server = 'pkc-619z3.us-east1.gcp.confluent.cloud:9092'
        self.jaas_module = 'org.apache.kafka.common.security.plain.PlainLoginModule'
        # Credentials must never live in the notebook source. Any key pair that
        # was previously committed here should be treated as leaked and rotated.
        self.api_key = self._resolve_credential(api_key, self.API_KEY_ENV_VAR)
        self.api_secret = self._resolve_credential(api_secret, self.API_SECRET_ENV_VAR)
        self.topic = 'invoices'
        self.delivery_add_schema = (StructType([
            StructField("AddressLine", StringType(), True),
            StructField("City", StringType(), True),
            StructField("State", StringType(), True),
            StructField("PinCode", StringType(), True),
            StructField("ContactNumber", StringType(), True)
        ]))
        self.line_item_schema = (StructType([
            StructField("ItemCode", StringType(), True),
            StructField("ItemDescription", StringType(), True),
            StructField("ItemPrice", DoubleType(), True),
            StructField("ItemQty", IntegerType(), True),
            StructField("TotalValue", DoubleType(), True)
        ]))
        self.invoice_schema = (StructType([
            StructField("InvoiceNumber", StringType(), True),
            StructField("CreatedTime", LongType(), True),
            StructField("StoreID", StringType(), True),
            StructField("PosID", StringType(), True),
            StructField("CashierID", StringType(), True),
            StructField("CustomerType", StringType(), True),
            StructField("CustomerCardNo", IntegerType(), True),
            StructField("TotalAmount", DoubleType(), True),
            StructField("NumberOfItems", IntegerType(), True),
            StructField("PaymentMethod", StringType(), True),
            StructField("TaxableAmount", DoubleType(), True),
            StructField("CGST", DoubleType(), True),
            StructField("SGST", DoubleType(), True),
            StructField("CESS", DoubleType(), True),
            StructField("DeliveryType", StringType(), True),
            StructField("DeliveryAddress", self.delivery_add_schema),
            StructField("InvoiceLineItems", ArrayType(self.line_item_schema))]))

    @staticmethod
    def _resolve_credential(explicit_value, env_var):
        """Return ``explicit_value`` if given, else the value of ``env_var``.

        Raises:
            ValueError: if neither source yields a non-empty value.
        """
        import os  # local import: the notebook's import cell does not provide it

        if explicit_value:
            return explicit_value
        value = os.environ.get(env_var)
        if not value:
            raise ValueError(
                f"Kafka credential not provided: pass it to InvoiceBronzeLayer(...) "
                f"or set the {env_var} environment variable."
            )
        return value

    def load_raw_data(self):
        """Open the Kafka stream and parse each message into an invoice struct.

        Returns:
            A streaming DataFrame with columns ``key`` (string), ``value``
            (struct parsed with ``self.invoice_schema``), ``topic`` and
            ``timestamp``.
        """
        raw_df = (spark
                  .readStream
                  .format("kafka")
                  .option("kafka.bootstrap.servers", self.bootstrap_server)
                  .option("kafka.security.protocol", "SASL_SSL")
                  .option("kafka.sasl.mechanism", "PLAIN")
                  .option("kafka.sasl.jaas.config", f"{self.jaas_module} required username='{self.api_key}' password='{self.api_secret}';")
                  .option("maxOffsetsPerTrigger",100)
                  .option("subscribe", self.topic)
                  .load()
                  )

        # Kafka delivers key and value as binary; cast to string before parsing.
        processed_df = raw_df.select(raw_df.key.cast("string").alias("key"),
                      from_json(raw_df.value.cast("string"),self.invoice_schema).alias("value"),
                      "topic","timestamp")

        return processed_df

    # The streaming query passes each micro-batch to this function.
    # Delta tables support ACID transactions, so a MERGE statement can upsert the data.
    #
    # dev.retail_store.invoices_raw must be created upfront: because the writer
    # does not use toTable(), the table is not created automatically.
    def upsert(self, processed_df, batchId):
        """Merge one micro-batch into ``dev.retail_store.invoices_raw``.

        Args:
            processed_df: the micro-batch DataFrame produced by the stream.
            batchId: micro-batch id supplied by ``foreachBatch`` (unused).
        """
        # A MERGE with a WHEN MATCHED UPDATE clause fails when several source
        # rows match the same target row, so collapse repeated invoices that
        # arrive within the same micro-batch before merging.
        batch_df = processed_df.dropDuplicates(["value"])
        batch_df.createOrReplaceTempView("processed_data")
        merge_statement = """
         Merge into dev.retail_store.invoices_raw t
         using processed_data s
         on s.value == t.value
         when matched then update set *
         when not matched then insert *
        """
        # Run the statement on the session that owns the micro-batch (where the
        # temp view was registered), through the public DataFrame.sparkSession
        # property instead of the private _jdf handle.
        batch_df.sparkSession.sql(merge_statement)

    def persist_to_bronze(self, processed_df):
        """Start the streaming query that upserts every micro-batch.

        Args:
            processed_df: streaming DataFrame returned by ``load_raw_data``.

        Returns:
            The started streaming query handle.
        """
        bronze_layer_streaming_query = (processed_df
                                        .writeStream
                                        .queryName("bronze_layer_streaming_query")
                                        .format("delta")
                                        .outputMode("append")
                                        .option("checkPointLocation","/Volumes/dev/retail_store/checkpoint_invoices_raw")
                                        .foreachBatch(self.upsert)
                                        .start()
                                        )
        return bronze_layer_streaming_query

    def start_streaming(self):
        """Build the source stream and start persisting it; returns the query handle."""
        processed_df = self.load_raw_data()
        return self.persist_to_bronze(processed_df)



In [0]:
# Build the bronze-layer pipeline and start the streaming query. The returned
# query handle is kept in a notebook variable so a later cell can stop it.
bronze_layer = InvoiceBronzeLayer()
bronze_layer_streaming_query = bronze_layer.start_streaming()

In [0]:
# Stop the streaming query started in the previous cell.
bronze_layer_streaming_query.stop()

In [0]:
%sql
-- List the commit history of the bronze table to inspect the operations
-- (e.g. MERGE, OPTIMIZE) recorded against it and their metrics.
describe history dev.retail_store.invoices_raw;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
15,2024-12-08T23:16:33Z,2533331465148935,silkykansal10@gmail.com,MERGE,"Map(predicate -> [""(value#107089 = value#123818)""], matchedPredicates -> [{""actionType"":""update""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(171749421940014),1129-144324-d526nqyd,13.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, numTargetBytesAdded -> 17865, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 1, numTargetRowsMatchedUpdated -> 7, executionTimeMs -> 5090, materializeSourceTimeMs -> 871, numTargetRowsInserted -> 0, conflictDetectionTimeMs -> 86, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 1, scanTimeMs -> 1380, numTargetRowsUpdated -> 7, numOutputRows -> 7, numTargetDeletionVectorsRemoved -> 1, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 7, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 2797)",,Databricks-Runtime/15.4.x-aarch64-scala2.12
14,2024-12-08T23:16:30Z,2533331465148935,silkykansal10@gmail.com,OPTIMIZE,"Map(predicate -> [], auto -> true, clusterBy -> [], zOrderBy -> [], batchId -> 0)",,List(171749421940014),1129-144324-d526nqyd,13.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 3, numRemovedBytes -> 36250, p25FileSize -> 16136, numDeletionVectorsRemoved -> 1, minFileSize -> 16136, numAddedFiles -> 1, maxFileSize -> 16136, p75FileSize -> 16136, p50FileSize -> 16136, numAddedBytes -> 16136)",,Databricks-Runtime/15.4.x-aarch64-scala2.12
13,2024-12-08T23:16:25Z,2533331465148935,silkykansal10@gmail.com,MERGE,"Map(predicate -> [""(value#107089 = value#120993)""], matchedPredicates -> [{""actionType"":""update""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(171749421940014),1129-144324-d526nqyd,11.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, numTargetBytesAdded -> 20097, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 1, numTargetRowsMatchedUpdated -> 15, executionTimeMs -> 4974, materializeSourceTimeMs -> 797, numTargetRowsInserted -> 0, conflictDetectionTimeMs -> 90, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 1, scanTimeMs -> 1372, numTargetRowsUpdated -> 15, numOutputRows -> 15, numTargetDeletionVectorsRemoved -> 1, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 15, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 2782)",,Databricks-Runtime/15.4.x-aarch64-scala2.12
12,2024-12-08T23:16:22Z,2533331465148935,silkykansal10@gmail.com,OPTIMIZE,"Map(predicate -> [], auto -> true, clusterBy -> [], zOrderBy -> [], batchId -> 0)",,List(171749421940014),1129-144324-d526nqyd,11.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 3, numRemovedBytes -> 36134, p25FileSize -> 16153, numDeletionVectorsRemoved -> 1, minFileSize -> 16153, numAddedFiles -> 1, maxFileSize -> 16153, p75FileSize -> 16153, p50FileSize -> 16153, numAddedBytes -> 16153)",,Databricks-Runtime/15.4.x-aarch64-scala2.12
11,2024-12-08T23:16:18Z,2533331465148935,silkykansal10@gmail.com,MERGE,"Map(predicate -> [""(value#107089 = value#118339)""], matchedPredicates -> [{""actionType"":""update""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(171749421940014),1129-144324-d526nqyd,9.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, numTargetBytesAdded -> 19954, numTargetBytesRemoved -> 37115, numTargetDeletionVectorsAdded -> 2, numTargetRowsMatchedUpdated -> 14, executionTimeMs -> 4910, materializeSourceTimeMs -> 796, numTargetRowsInserted -> 0, conflictDetectionTimeMs -> 328, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 1312, numTargetRowsUpdated -> 14, numOutputRows -> 14, numTargetDeletionVectorsRemoved -> 2, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 14, numTargetFilesRemoved -> 4, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 2776)",,Databricks-Runtime/15.4.x-aarch64-scala2.12
10,2024-12-08T23:16:15Z,2533331465148935,silkykansal10@gmail.com,OPTIMIZE,"Map(predicate -> [], auto -> true, clusterBy -> [], zOrderBy -> [], batchId -> 0)",,List(171749421940014),1129-144324-d526nqyd,9.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 13, numRemovedBytes -> 121122, p25FileSize -> 16180, numDeletionVectorsRemoved -> 2, minFileSize -> 16180, numAddedFiles -> 1, maxFileSize -> 16180, p75FileSize -> 16180, p50FileSize -> 16180, numAddedBytes -> 16180)",,Databricks-Runtime/15.4.x-aarch64-scala2.12
9,2024-12-08T23:16:10Z,2533331465148935,silkykansal10@gmail.com,MERGE,"Map(predicate -> [""(value#107089 = value#116634)""], matchedPredicates -> [{""actionType"":""update""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(171749421940014),1129-144324-d526nqyd,8.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, numTargetBytesAdded -> 19725, numTargetBytesRemoved -> 18658, numTargetDeletionVectorsAdded -> 2, numTargetRowsMatchedUpdated -> 13, executionTimeMs -> 4694, materializeSourceTimeMs -> 711, numTargetRowsInserted -> 0, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 1211, numTargetRowsUpdated -> 13, numOutputRows -> 13, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 13, numTargetFilesRemoved -> 2, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 2748)",,Databricks-Runtime/15.4.x-aarch64-scala2.12
8,2024-12-08T23:16:03Z,2533331465148935,silkykansal10@gmail.com,MERGE,"Map(predicate -> [""(value#107089 = value#115547)""], matchedPredicates -> [{""actionType"":""update""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(171749421940014),1129-144324-d526nqyd,7.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 1, numTargetBytesAdded -> 8270, numTargetBytesRemoved -> 8270, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 1, executionTimeMs -> 3211, materializeSourceTimeMs -> 748, numTargetRowsInserted -> 0, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 928, numTargetRowsUpdated -> 1, numOutputRows -> 1, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 1, numTargetFilesRemoved -> 1, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 1482)",,Databricks-Runtime/15.4.x-aarch64-scala2.12
7,2024-12-08T23:15:05Z,2533331465148935,silkykansal10@gmail.com,MERGE,"Map(predicate -> [""(value#107089 = value#113875)""], matchedPredicates -> [{""actionType"":""update""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(171749421940014),1129-144324-d526nqyd,6.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, numTargetBytesAdded -> 18733, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 0, executionTimeMs -> 2910, materializeSourceTimeMs -> 731, numTargetRowsInserted -> 8, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 937, numTargetRowsUpdated -> 0, numOutputRows -> 8, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 8, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 706)",,Databricks-Runtime/15.4.x-aarch64-scala2.12
6,2024-12-08T23:14:59Z,2533331465148935,silkykansal10@gmail.com,MERGE,"Map(predicate -> [""(value#107089 = value#112751)""], matchedPredicates -> [{""actionType"":""update""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(171749421940014),1129-144324-d526nqyd,5.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, numTargetBytesAdded -> 18495, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 0, executionTimeMs -> 2190, materializeSourceTimeMs -> 629, numTargetRowsInserted -> 7, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 802, numTargetRowsUpdated -> 0, numOutputRows -> 7, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 7, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 738)",,Databricks-Runtime/15.4.x-aarch64-scala2.12
