In [0]:
from pyspark.sql.functions import expr

class InvoiceBronze:
    """Bronze stage: stream raw invoice JSON files into a Delta table.

    Reads JSON files from ``{base_dir}/data/bronze`` using the fixed DDL schema
    below. Processed files are archived under ``{base_dir}/data/archive``. Rows
    are appended unchanged to the Delta table ``invoice_silver_table``, which is
    stored at ``{base_dir}/data/silver``.

    NOTE(review): although this is the bronze stage, its output path and table
    are named "silver", and ``InvoiceSilver`` reads ``invoice_silver_table`` as
    its input. Any rename must be applied to both classes together.

    Assumes a SparkSession is available as the global ``spark`` (e.g. in a
    Databricks notebook); it is not defined in this cell.
    """

    def __init__(self, base_dir="/FileStore/invoices"):
        """Store the root directory and the schema of the incoming invoices.

        Args:
            base_dir: Root directory from which the input, archive, output and
                checkpoint locations are derived. Defaults to the previously
                hard-coded ``/FileStore/invoices``.
        """
        self.base_dir = base_dir
        # DDL schema string passed to DataStreamReader.schema(); streaming file
        # sources need an explicit schema rather than inference.
        self.schema = """InvoiceNumber string,
                         CreatedTime bigint,
                         StoreID string,
                         PosID string,
                         CashierID string,
                         CustomerType string,
                         CustomerCardNo string,
                         TotalAmount double,
                         NumberOfItems int,
                         PaymentMethod string,
                         TaxableAmount double,
                         CGST double,
                         SGST double,
                         CESS double,
                         DeliveryType string,
                         DeliveryAddress struct <
                                                 AddressLine string,
                                                 City string,
                                                 State string,
                                                 PinCode string,
                                                 ContactNumber string
                                                 >,
                         InvoiceLineItems array <
                                                 struct <
                                                         ItemCode string,
                                                         ItemDescription string,
                                                         ItemPrice double,
                                                         ItemQty int,
                                                         TotalValue double
                                                         >
                                                 >"""

    def ingest_data(self):
        """Return a streaming DataFrame over the raw invoice JSON files.

        With ``cleanSource='archive'``, Spark's file source moves each processed
        file into ``sourceArchiveDir``, so the landing folder is drained.
        """
        return (
            spark.readStream.format('json')
            .schema(self.schema)
            .option('cleanSource', 'archive')
            .option('sourceArchiveDir', f'{self.base_dir}/data/archive')
            .load(f'{self.base_dir}/data/bronze')
        )

    def sink_data(self, df):
        """Start an append-mode Delta stream of ``df`` and return the query.

        Args:
            df: Streaming DataFrame, as produced by ``ingest_data``.

        Returns:
            The ``StreamingQuery`` returned by ``toTable``. It writes to the
            table ``invoice_silver_table`` at ``{base_dir}/data/silver`` and
            checkpoints under ``{base_dir}/checkpoint_location/bronze``.
        """
        return (
            df.writeStream.format('delta')
            .queryName('Bronze_Squery')
            .outputMode('append')
            .option('checkpointLocation', f'{self.base_dir}/checkpoint_location/bronze')
            .option('path', f'{self.base_dir}/data/silver')
            .toTable('invoice_silver_table')
        )

    def launcher(self):
        """Wire ``ingest_data`` into ``sink_data`` and return the running query."""
        print('Bronze ingestion stream started...', end='')
        df = self.ingest_data()
        BSquery = self.sink_data(df)
        print('Done.')
        return BSquery

        


In [0]:
class InvoiceSilver:
    """Flatten streamed invoices into one row per invoice line item.

    Reads the Delta table ``invoice_silver_table`` as a stream and promotes the
    ``DeliveryAddress`` struct fields to top-level columns. It then explodes the
    ``InvoiceLineItems`` array into one row per item and promotes each item's
    fields. The result is appended to the Delta table ``invoice_gold_table``,
    stored at ``{base_dir}/data/gold``.

    Assumes a SparkSession is available as the global ``spark`` (e.g. in a
    Databricks notebook); it is not defined in this cell.
    """

    # Invoice-level columns carried through unchanged by both select stages.
    _HEADER_COLUMNS = (
        "InvoiceNumber", "CreatedTime", "StoreID", "PosID", "CashierID",
        "CustomerType", "CustomerCardNo", "TotalAmount", "NumberOfItems",
        "PaymentMethod", "TaxableAmount", "CGST", "SGST", "CESS", "DeliveryType",
    )
    # Fields of the DeliveryAddress struct, promoted to top-level columns.
    _ADDRESS_FIELDS = ("AddressLine", "City", "State", "PinCode", "ContactNumber")
    # Fields of each exploded InvoiceLineItems element, promoted to top level.
    _LINE_ITEM_FIELDS = ("ItemCode", "ItemDescription", "ItemPrice", "ItemQty", "TotalValue")

    def __init__(self, base_dir="/FileStore/invoices"):
        """Store the root directory for output and checkpoint locations.

        Args:
            base_dir: Root directory; defaults to the previously hard-coded
                ``/FileStore/invoices``.
        """
        self.base_dir = base_dir

    def ingest_data(self):
        """Return a streaming DataFrame reading ``invoice_silver_table``."""
        return spark.readStream.table('invoice_silver_table')

    @classmethod
    def _explode_select_exprs(cls):
        """Build the stage-1 expressions: header, address fields, explode."""
        return [
            *cls._HEADER_COLUMNS,
            *(f"DeliveryAddress.{name}" for name in cls._ADDRESS_FIELDS),
            "explode(InvoiceLineItems) as InvoiceLineItems",
        ]

    @classmethod
    def _final_select_exprs(cls):
        """Build the stage-2 expressions: header, address, line-item fields."""
        return [
            *cls._HEADER_COLUMNS,
            # Stage 1 already surfaced these as plain top-level column names.
            *cls._ADDRESS_FIELDS,
            *(f"InvoiceLineItems.{name}" for name in cls._LINE_ITEM_FIELDS),
        ]

    def transformation(self, df):
        """Flatten the address struct and explode line items into rows.

        Args:
            df: Streaming DataFrame with the invoice columns read from
                ``invoice_silver_table``.

        Returns:
            A DataFrame with one row per line item, containing the header
            columns, the address fields and the line-item fields.

        NOTE(review): Spark's ``explode`` emits no rows for a null or empty
        array, so invoices without line items are dropped. Switch to
        ``explode_outer`` if they should be kept.
        """
        explode_df = df.selectExpr(*self._explode_select_exprs())
        return explode_df.selectExpr(*self._final_select_exprs())

    def sink_data(self, df):
        """Start an append-mode Delta stream of ``df`` and return the query.

        Writes to the table ``invoice_gold_table`` at ``{base_dir}/data/gold``,
        with checkpoints under ``{base_dir}/checkpoint_location/silver``.
        """
        return (
            df.writeStream.format('delta')
            .queryName('Silver_Squery')
            .outputMode('append')
            .option('checkpointLocation', f'{self.base_dir}/checkpoint_location/silver')
            .option('path', f'{self.base_dir}/data/gold')
            .toTable('invoice_gold_table')
        )

    def launcher(self):
        """Run ingest -> transformation -> sink and return the running query."""
        print('Silver Transformation stream started...', end='')
        df = self.ingest_data()
        Transdf = self.transformation(df)
        SSquery = self.sink_data(Transdf)
        print('Done.')
        return SSquery

        
