In [0]:
class invoiceStream:
    """Streaming pipeline that turns invoice JSON files into one row per line item.

    Reads JSON invoices from ``<base_dir>/data/invoices``, explodes the
    ``InvoiceLineItems`` array, promotes each line-item field to a top-level
    column and appends the result to a Delta table.

    NOTE(review): ``readInvoices`` references a global ``spark`` session (as
    provided in Databricks notebooks) — confirm one exists wherever this class
    is used.
    """

    def __init__(self, base_dir: str = "/FileStore/project1",
                 table_name: str = "invoice_lineItem_table"):
        """Configure input/checkpoint locations and the output table.

        Args:
            base_dir: Root directory containing ``data/invoices`` (input files)
                and ``checkpoint/invoices`` (streaming checkpoint).
            table_name: Table the flattened line items are appended to.
        """
        self.base_dir = base_dir
        self.table_name = table_name

    def getSchema(self) -> str:
        """Return the DDL schema string used to parse the invoice JSON files."""
        return """InvoiceNumber string, CreatedTime bigint, StoreID string, PosID string, CashierID string,
                CustomerType string, CustomerCardNo string, TotalAmount double, NumberOfItems bigint, 
                PaymentMethod string, TaxableAmount double, CGST double, SGST double, CESS double, 
                DeliveryType string,
                InvoiceLineItems array<struct<ItemCode string, ItemDescription string, 
                    ItemPrice double, ItemQty bigint, TotalValue double>>
            """

    def readInvoices(self):
        """Return a streaming DataFrame of invoices parsed with ``getSchema()``."""
        return (spark.readStream
                .format("json")
                .schema(self.getSchema())
                .load(f"{self.base_dir}/data/invoices"))

    def explodeInvoices(self, invoiceDf):
        """Emit one row per element of ``InvoiceLineItems``.

        Only the header columns listed below are kept (e.g. ``CashierID`` and
        the amount/tax columns are dropped); the exploded struct is exposed as
        ``LineItems``.
        """
        return invoiceDf.selectExpr(
            "InvoiceNumber", "CreatedTime", "StoreID", "PosID",
            "CustomerType", "PaymentMethod", "DeliveryType",
            "explode(InvoiceLineItems) as LineItems",
        )

    def flattenInvoice(self, explodeDf):
        """Replace the ``LineItems`` struct column with its individual fields.

        The output keeps every other column in its original order, followed by
        ItemCode, ItemDescription, ItemPrice, ItemQty and TotalValue.
        """
        from pyspark.sql.functions import col

        line_item_fields = ("ItemCode", "ItemDescription", "ItemPrice",
                            "ItemQty", "TotalValue")
        kept_columns = [name for name in explodeDf.columns if name != "LineItems"]
        # A single select builds one projection instead of one per withColumn call.
        return explodeDf.select(
            *kept_columns,
            *(col(f"LineItems.{field}").alias(field) for field in line_item_fields),
        )

    def appendInvoices(self, flattenDf):
        """Start a streaming append of ``flattenDf`` into ``self.table_name``.

        Returns the ``StreamingQuery`` handle; progress is checkpointed under
        ``<base_dir>/checkpoint/invoices``.
        """
        return (flattenDf.writeStream
                .format("Delta")
                .option("checkpointLocation", f"{self.base_dir}/checkpoint/invoices")
                .outputMode("append")
                .toTable(self.table_name))

    def process(self):
        """Wire read -> explode -> flatten -> append and start the stream.

        Returns the running ``StreamingQuery``. The query runs asynchronously,
        so "Done" means the query has started, not that it has finished.
        """
        print("\t starting the processing of invoices .....", end='')
        invoiceDf = self.readInvoices()
        explodeDf = self.explodeInvoices(invoiceDf)
        flattenDf = self.flattenInvoice(explodeDf)
        sQuery = self.appendInvoices(flattenDf)
        print("\nDone..\n")
        return sQuery
    


    




    



