In [0]:
# Folder under source_path that holds the raw Kafka feed files.
kafka_raw = source_path + 'kafka-raw/'

# Schemas imposed on the source data feed.
# Bronze tables, unlike silver tables, do not need to be created in advance:
# they are not the target of a MERGE operation and only capture the streamed
# data of their respective topic, so a schema definition to impose on the
# source is all that is required here.

# Layout of one raw record as it arrives from the feed.
SchemaImposedOn_record = ', '.join([
    'key BINARY',
    'value BINARY',
    'topic STRING',
    'partition LONG',
    'offset LONG',
    'timestamp long',
])

# Payload schema for the books topic.
SchemaImposedOn_books = ', '.join([
    'book_id STRING',
    'title STRING',
    'author STRING',
    'price DOUBLE',
    'updated TIMESTAMP',
])

# Payload schema for the customers topic.
SchemaImposedOn_customers = ', '.join([
    'customer_id STRING',
    'email STRING',
    'first_name STRING',
    'last_name STRING',
    'gender STRING',
    'city STRING',
    'country_code STRING',
    'row_status STRING',
    'row_time timestamp',
])

# Payload schema for the orders topic; each order carries an array of book lines.
SchemaImposedOn_orders = ', '.join([
    'order_id STRING',
    'order_timestamp timestamp',
    'customer_id STRING',
    'quantity BIGINT',
    'total BIGINT',
    'books ARRAY<STRUCT<book_id STRING, quantity BIGINT, subtotal BIGINT>>',
])

def generateReadStream():
    """Build the streaming read over the raw Kafka JSON files.

    Reads ``kafka_raw`` with the ``cloudFiles`` source in JSON format, imposing
    the raw record schema (``SchemaImposedOn_record``) instead of relying on
    schema inference.

    Returns:
        A streaming DataFrame with the columns ``topic``, ``key`` (cast to
        string), ``create_ts``, ``Source_file``, ``insert_ts`` and ``value``
        (cast to string).
    """
    raw_records = (
        spark.readStream
        .format('cloudFiles')
        .option('cloudFiles.format', 'json')
        # The record-level schema defined alongside the per-topic schemas;
        # it provides every column referenced in the select below.
        .schema(SchemaImposedOn_record)
        .load(kafka_raw)
    )

    readQuery = raw_records.select(
        F.col('topic'),
        F.col('key').cast('string'),
        # timestamp is a long that is scaled down by 1000 before the cast --
        # presumably epoch milliseconds; confirm against the producer.
        (F.col('timestamp') / 1000).cast('timestamp').alias('create_ts'),
        F.input_file_name().alias('Source_file'),
        F.current_timestamp().alias('insert_ts'),
        F.col('value').cast('string'),
    )
    return readQuery

def writeData_bronze(readQuery,topic_name,bronze_tbl_name):
    """Stream the records of one topic from the raw feed into a bronze table.

    Keeps only the rows of ``readQuery`` whose ``topic`` equals ``topic_name``,
    parses the JSON ``value`` column with that topic's schema, flattens the
    parsed struct next to the metadata columns and writes the result to
    ``bronze_tbl_name``. The call blocks until the available-now run has
    finished and then prints a short summary.

    Args:
        readQuery: Streaming DataFrame exposing the columns ``topic``, ``key``,
            ``create_ts``, ``Source_file``, ``insert_ts`` and ``value``.
        topic_name: Topic to keep; also the suffix used to look up the
            topic's record schema.
        bronze_tbl_name: Name of the table to write to; also the suffix used
            to look up the checkpoint location.
    """
    # Both lookups are keyed by this function's own arguments so that every
    # topic gets its own schema and its own checkpoint.
    # NOTE(review): getValue_ForGlobalVar is defined outside this section. The
    # schema strings visible in this file are named SchemaImposedOn_<topic>;
    # confirm that the 'RecordSchema_' and 'tblCheckpoint_' prefixes resolve.
    record_schema = getValue_ForGlobalVar('RecordSchema_', topic_name)
    checkpoint_location = getValue_ForGlobalVar('tblCheckpoint_', bronze_tbl_name)

    write_bronze_Topic = (
        readQuery
        .filter(F.col('topic') == topic_name)
        .withColumn('v', F.from_json(F.col('value'), record_schema))
        # Promote the parsed payload fields to top-level columns.
        .select('key', 'create_ts', 'Source_file', 'insert_ts', 'v.*')
        .writeStream
        .option('checkpointLocation', checkpoint_location)
        .option('mergeSchema', True)
        .trigger(availableNow=True)
        .table(bronze_tbl_name)
    )
    # Wait for the available-now run to complete before reporting.
    write_bronze_Topic.awaitTermination()

    print(
        'Source\n', kafka_raw,
        '\nSchema\n', record_schema,
        '\nDest\n', bronze_tbl_name, '\n'
    )

def process_bronze_books():
    """Run the bronze load for topic 'books' into table 'bronze_books'."""
    writeData_bronze(generateReadStream(), 'books', 'bronze_books')
    
def process_bronze_customers():
    """Run the bronze load for topic 'customers' into table 'bronze_customers'."""
    writeData_bronze(generateReadStream(), 'customers', 'bronze_customers')

def process_bronze_orders():
    """Run the bronze load for topic 'orders' into table 'bronze_orders'."""
    writeData_bronze(generateReadStream(), 'orders', 'bronze_orders')
    