In [1]:
# Make the local Spark installation's `pyspark` package importable from this
# kernel. This must run before the pyspark imports in the next cell.
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [None]:
# Local-mode Spark session (local[3]). Shuffle partitions are set to 3 so
# shuffle stages of the streaming query use 3 partitions.
# Fix: the shutdown key was misspelled 'spark.streaming.stropGracefullyOnShutdown',
# so the intended setting was never applied.
# NOTE(review): this key is documented for DStream-based Spark Streaming; confirm
# whether it has any effect on the Structured Streaming query used below.
spark = (
    SparkSession.builder
    .appName('Kafka Stream')
    .master('local[3]')
    .config('spark.streaming.stopGracefullyOnShutdown', 'true')
    .config('spark.sql.shuffle.partitions', 3)
    .getOrCreate()
)

In [4]:
# Streaming source: subscribe to the 'invoices' topic on the local broker,
# starting from the earliest available offset. Rows carry the raw Kafka record
# (binary key/value plus topic/partition/offset/timestamp metadata).
kafka_df = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'localhost:9092')
    .option('subscribe', 'invoices')
    .option('startingOffsets', 'earliest')
    .load()
)

In [5]:
# Inspect the Kafka source columns; the invoice payload arrives as bytes in `value`.
kafka_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
# Expected JSON layout of one invoice message on the 'invoices' topic.
# Every field is added with the default nullable=True, matching the schema
# printed for value_df. Delivery details are a nested struct; line items are an
# array of structs.
schema = (
    T.StructType()
    .add("InvoiceNumber", T.StringType())
    .add("CreatedTime", T.LongType())
    .add("StoreID", T.StringType())
    .add("PosID", T.StringType())
    .add("CashierID", T.StringType())
    .add("CustomerType", T.StringType())
    .add("CustomerCardNo", T.StringType())
    .add("TotalAmount", T.DoubleType())
    .add("NumberOfItems", T.IntegerType())
    .add("PaymentMethod", T.StringType())
    .add("CGST", T.DoubleType())
    .add("SGST", T.DoubleType())
    .add("CESS", T.DoubleType())
    .add("DeliveryType", T.StringType())
    .add(
        "DeliveryAddress",
        T.StructType()
        .add("AddressLine", T.StringType())
        .add("City", T.StringType())
        .add("State", T.StringType())
        .add("PinCode", T.StringType())
        .add("ContactNumber", T.StringType()),
    )
    .add(
        "InvoiceLineItems",
        T.ArrayType(
            T.StructType()
            .add("ItemCode", T.StringType())
            .add("ItemDescription", T.StringType())
            .add("ItemPrice", T.DoubleType())
            .add("ItemQty", T.IntegerType())
            .add("TotalValue", T.DoubleType())
        ),
    )
)

In [10]:
# Decode the Kafka payload: cast the binary `value` to a string, parse it
# against `schema`, then expand the parsed struct into one column per field.
value_df = (
    kafka_df
    .select(F.from_json(F.col('value').cast('string'), schema).alias('value'))
    .select('value.*')
)
value_df.printSchema()

root
 |-- InvoiceNumber: string (nullable = true)
 |-- CreatedTime: long (nullable = true)
 |-- StoreID: string (nullable = true)
 |-- PosID: string (nullable = true)
 |-- CashierID: string (nullable = true)
 |-- CustomerType: string (nullable = true)
 |-- CustomerCardNo: string (nullable = true)
 |-- TotalAmount: double (nullable = true)
 |-- NumberOfItems: integer (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- CGST: double (nullable = true)
 |-- SGST: double (nullable = true)
 |-- CESS: double (nullable = true)
 |-- DeliveryType: string (nullable = true)
 |-- DeliveryAddress: struct (nullable = true)
 |    |-- AddressLine: string (nullable = true)
 |    |-- City: string (nullable = true)
 |    |-- State: string (nullable = true)
 |    |-- PinCode: string (nullable = true)
 |    |-- ContactNumber: string (nullable = true)
 |-- InvoiceLineItems: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ItemCode: string (nullable = true)

In [12]:
# One output row per invoice line item. Keep the invoice-level columns, pull
# City/State/PinCode out of the DeliveryAddress struct (their column names
# become the bare field names), and explode the line-item array into a single
# struct column.
# NOTE(review): explode() emits no rows for an invoice whose InvoiceLineItems is
# null or empty; use explode_outer if such invoices must be kept.
explode_df = value_df.select(
    'InvoiceNumber',
    'CreatedTime',
    'StoreID',
    'PosID',
    'CustomerType',
    'PaymentMethod',
    'DeliveryType',
    'DeliveryAddress.City',
    'DeliveryAddress.State',
    'DeliveryAddress.PinCode',
    F.explode('InvoiceLineItems').alias('InvoiceLineItem'),
)

In [13]:
# Check the exploded shape: invoice columns plus one nested InvoiceLineItem struct per row.
explode_df.printSchema()

root
 |-- InvoiceNumber: string (nullable = true)
 |-- CreatedTime: long (nullable = true)
 |-- StoreID: string (nullable = true)
 |-- PosID: string (nullable = true)
 |-- CustomerType: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- DeliveryType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- PinCode: string (nullable = true)
 |-- InvoiceLineItem: struct (nullable = true)
 |    |-- ItemCode: string (nullable = true)
 |    |-- ItemDescription: string (nullable = true)
 |    |-- ItemPrice: double (nullable = true)
 |    |-- ItemQty: integer (nullable = true)
 |    |-- TotalValue: double (nullable = true)



In [14]:
def struct_to_col(df, struct_field):
    """Promote the fields of a struct column to top-level columns.

    Each field ``<struct_field>.<name>`` becomes a column called ``<name>``.
    Existing columns keep their position. A column that shares a name with a
    struct field is replaced in place by that field, and the remaining fields
    are appended in struct order. The struct column itself is dropped.

    Args:
        df: Spark DataFrame (batch or streaming) that contains ``struct_field``.
        struct_field: Name of the struct-typed column to flatten.

    Returns:
        A new DataFrame without ``struct_field`` and with its fields as columns.
    """
    struct_col = F.col(struct_field)
    # Expanding `<struct>.*` yields the struct's field names in declaration order.
    field_names = df.select(f'{struct_field}.*').columns
    # Add every field in one projection. Chaining withColumn in a loop adds one
    # projection per field and bloats the plan. getField() takes the name
    # literally, so field names needing SQL quoting are handled too.
    flattened = df.withColumns(
        {name: struct_col.getField(name) for name in field_names}
    )
    return flattened.drop(struct_field)

In [15]:
# Lift the InvoiceLineItem struct's fields (ItemCode ... TotalValue) to top-level
# columns so every row is flat before it reaches the sink.
flattened_df = struct_to_col(df=explode_df, struct_field='InvoiceLineItem')
flattened_df.printSchema()

root
 |-- InvoiceNumber: string (nullable = true)
 |-- CreatedTime: long (nullable = true)
 |-- StoreID: string (nullable = true)
 |-- PosID: string (nullable = true)
 |-- CustomerType: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- DeliveryType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- PinCode: string (nullable = true)
 |-- ItemCode: string (nullable = true)
 |-- ItemDescription: string (nullable = true)
 |-- ItemPrice: double (nullable = true)
 |-- ItemQty: integer (nullable = true)
 |-- TotalValue: double (nullable = true)



In [16]:
# Continuously write the flattened line items as JSON files under 'output',
# firing a micro-batch every minute. Query progress is checkpointed in
# 'chk-point-dir'. The cell blocks until the query ends or the kernel is
# interrupted.
invoice_writer_query = None  # pre-bound so the handler works even if start() is interrupted
try:
    invoice_writer_query = (
        flattened_df.writeStream
        .format('json')
        .queryName('Flattened Invoice Writer')
        .outputMode('append')
        .option('path', 'output')
        .option('checkpointLocation', 'chk-point-dir')
        .trigger(processingTime='1 minute')
        .start()
    )
    invoice_writer_query.awaitTermination()
except KeyboardInterrupt:
    # Interrupting the kernel only aborts the Python-side wait on the py4j
    # socket; nothing else stops the query. Stop it explicitly so it does not
    # keep consuming Kafka and writing files in the background.
    if invoice_writer_query is not None:
        invoice_writer_query.stop()
    print('Stopped')

23/08/07 08:14:49 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/08/07 08:14:50 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
ERROR:root:KeyboardInterrupt while sending command.                             
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 