In [None]:
%run /Workspace/stream_process/setup.py

In [11]:
import logging
from pyspark.sql import SparkSession


In [None]:

# Configure logging once (top of script)
logging.basicConfig(filename='running_logs.log',
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


In [None]:

spark = SparkSession.builder.appName("InvoiceStreaming").getOrCreate()


In [None]:


class invoiceStream():
    def __init__(self):
        self.base_data_dir_invoice = base_data_dir_invoice
        logger.info("InvoiceStream initialized")

    def getSchema(self):
        logger.debug("Returning invoice schema")
        return """InvoiceNumber string, CreatedTime bigint, StoreID string, PosID string, CashierID string,
                CustomerType string, CustomerCardNo string, TotalAmount double, NumberOfItems bigint, 
                PaymentMethod string, TaxableAmount double, CGST double, SGST double, CESS double, 
                DeliveryType string,
                DeliveryAddress struct<AddressLine string, City string, ContactNumber string, PinCode string, 
                State string>,
                InvoiceLineItems array<struct<ItemCode string, ItemDescription string, 
                    ItemPrice double, ItemQty bigint, TotalValue double>>
            """
    #this is a jason file and is a nested file so we need to use the schema and DeliveryAddress use struct
    def readInvoices(self):
        logger.info("Reading streaming invoice data...")

        return (spark.readStream
                    .format("json")
                    .schema(self.getSchema())
                    .load(f"{self.base_data_dir_invoice}/invoices-*.json")
                )  

    def explodeInvoices(self, invoiceDF):
        logger.info("Exploding the invoice array...")
        return ( invoiceDF.selectExpr("InvoiceNumber", "CreatedTime", "StoreID", "PosID",
                                      "CustomerType", "PaymentMethod", "DeliveryType", "DeliveryAddress.City",
                                      "DeliveryAddress.State","DeliveryAddress.PinCode", 
                                      "explode(InvoiceLineItems) as LineItem")
                                    )   
        
    def flattenInvoices(self, explodedDF): 
        
        from pyspark.sql.functions import expr

        logger.info("flatten the nested data and create a new column")
        return( explodedDF.withColumn("ItemCode", expr("LineItem.ItemCode"))
                        .withColumn("ItemDescription", expr("LineItem.ItemDescription"))
                        .withColumn("ItemPrice", expr("LineItem.ItemPrice"))
                        .withColumn("ItemQty", expr("LineItem.ItemQty"))
                        .withColumn("TotalValue", expr("LineItem.TotalValue"))
                        .drop("LineItem") ## Finaly we will drop LineItem column as we have created new columns for each field in the nested data

                )
        
    def appendInvoices(self, flattenedDF):
        logger.info("Append invoice data...")
        return (flattenedDF.writeStream
                    .format("delta")
                    .option("checkpointLocation", f"{self.base_data_dir}/chekpoint/invoices")
                    .outputMode("append")
                    .toTable("invoice_line_items")
        )

    def process(self):
           print(f"Starting Invoice Processing Stream...", end='')
           invoicesDF = self.readInvoices()
           explodedDF = self.explodeInvoices(invoicesDF)
           resultDF = self.flattenInvoices(explodedDF)
           sQuery = self.appendInvoices(resultDF)
           print("Done\n")
           return sQuery    

In [15]:
test = invoiceStream()

test.process

<bound method invoiceStream.process of <__main__.invoiceStream object at 0x000001FE3FF72E10>>