In [0]:
from pyspark.sql.functions import *

In [0]:
# Kafka / Confluent Cloud connection settings used by the reader cell below.
# Note: the reader uses spark.readStream, so this is a *streaming* source, not a
# batch read — records that arrive on the topic later are picked up by later
# micro-batches.
import os

confluentBootstrapServers = 'pkc-l7pr2.ap-south-1.aws.confluent.cloud:9092'
# Never hardcode credentials in a notebook (they leak via commits and exports).
# Read them from the environment instead; on Databricks a secret scope
# (dbutils.secrets.get) is the usual alternative. The empty-string fallback
# keeps the original placeholder behaviour when the variables are unset.
confluentApiKey = os.environ.get('CONFLUENT_API_KEY', '')
confluentSecret = os.environ.get('CONFLUENT_API_SECRET', '')
confluentTopicName = 'retail-data-new'


In [0]:
# Streaming source: subscribe to the Confluent Cloud topic over SASL_SSL / PLAIN.
# The chain is wrapped in parentheses instead of backslash continuations: a
# comment line (or trailing whitespace) after a "\" is a SyntaxError.
orders_df = (
    spark.readStream.format("kafka")
    # Pass the variable, not the literal string "confluentBootstrapServers".
    .option("kafka.bootstrap.servers", confluentBootstrapServers)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option(
        "kafka.sasl.jaas.config",
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
        "required username='{}' password='{}';".format(confluentApiKey, confluentSecret),
    )
    .option("kafka.ssl.endpoint.identification.algorithm", "https")
    .option("subscribe", confluentTopicName)
    # Start from records with timestamp >= 1 ms after the epoch, i.e. effectively
    # from the very start of the topic (only applies when no checkpoint exists).
    .option("startingTimestamp", 1)
    # Cap each micro-batch at 50 offsets so batches do a similar amount of work.
    .option("maxOffsetsPerTrigger", 50)
    .load()
)


In [0]:
# Preview the raw Kafka records before casting (key/value are cast to strings in the next cell).
display(orders_df)

In [0]:
# Cast the Kafka key/value to strings and carry the Kafka metadata columns along.
# (The original had a stray ")" that closed selectExpr after the second
# expression, which was a SyntaxError.)
converted_orders_df = orders_df.selectExpr(
    "CAST(key as string) AS key",
    "CAST(value as string) AS value",
    "topic",
    "partition",
    "offset",
    "timestamp",
    "timestampType",
)

In [0]:
# Preview the string-cast key/value together with the Kafka metadata columns.
display(converted_orders_df)

In [0]:
# DDL schema of the JSON order payload carried in the Kafka "value" column:
# order/customer/address fields plus an array of line-item structs.
# Built from field lists so each column is easy to read and edit.
orders_schema = ", ".join([
    "order_id long",
    "customer_id long",
    "customer_fname string",
    "customer_lname string",
    "city string",
    "state string",
    "pincode long",
    "line_items array<struct<" + ", ".join([
        "order_item_id: long",
        "order_item_product_id: long",
        "order_item_quantity: long",
        "order_item_product_price: float",
        "order_item_subtotal: float",
    ]) + ">>",
])


In [0]:
# Parse the JSON string in "value" into a struct using orders_schema, keeping the
# column name "value" and passing the Kafka metadata columns through.
# (The original was missing the "." before alias, which was a SyntaxError.)
parsed_orders_df = converted_orders_df.select(
    "key",
    from_json("value", orders_schema).alias("value"),
    "topic",
    "partition",
    "offset",
    "timestamp",
    "timestampType",
)

In [0]:
# Preview the parsed stream; "value" now holds the struct described by orders_schema.
display(parsed_orders_df)

In [0]:
# Register the parsed stream as temp view "orders" so the next cell can query it with spark.sql.
parsed_orders_df.createOrReplaceTempView("orders")

In [0]:
# One row per line item: flatten the order/customer fields and explode the
# line_items array. The generated column is aliased as `lines` — without an
# alias Spark names it `col`, and the flattening query refers to `lines.*`.
exploded_orders = spark.sql("""select key,value.order_id as order_id,
          value.customer_id as customer_id,
          value.customer_fname as customer_fname,
          value.customer_lname as customer_lname,
          value.city as city,
          value.state as state,
          value.pincode as pincode,
          explode(value.line_items) as lines from orders
""")

In [0]:
# Preview the exploded stream: one row per element of value.line_items.
display(exploded_orders)

In [0]:
# Register the exploded stream under the view name used by the next query.
# The view name must be a string; the original passed the DataFrame itself.
exploded_orders.createOrReplaceTempView("exploded_orders")

In [0]:
# Project each exploded line-item struct (`lines`) into top-level columns.
# Field names must match orders_schema: the product field is
# order_item_product_id (the original referenced a non-existent
# order_item_product field).
flattened_orders = spark.sql(""" select key, order_id, customer_id,
          customer_fname,
          customer_lname,
          city,
          state,
          pincode,
          lines.order_item_id as item_id,
          lines.order_item_product_id as product_id,
          lines.order_item_quantity as quantity,
          lines.order_item_product_price as product_price,
          lines.order_item_subtotal as subtotal
          from exploded_orders
          """)

In [0]:
# Preview the flattened stream (one row per line item with customer and address columns).
display(flattened_orders)

In [0]:
# Continuously append the flattened stream into the Delta table ordersnewtable302.
# The chain is parenthesised: the original was missing a "\" after .writeStream,
# which was a SyntaxError. The StreamingQuery handle is kept so the query can be
# inspected or stopped later (e.g. ingestion_query.status, ingestion_query.stop()).
ingestion_query = (
    flattened_orders.writeStream
    .queryName("ingestionquery")
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "checkpointdir301")
    .toTable("ordersnewtable302")
)

In [0]:
# Batch-read the Delta table written by the streaming query and print a sample.
# NOTE(review): the stream runs asynchronously, so this may fail or return no
# rows if the first micro-batch has not committed yet — confirm before relying on it.
ordersnewtable_snapshot = spark.sql("select * from ordersnewtable302")
ordersnewtable_snapshot.show()