In [21]:
import os
import time
from typing import Optional

from pyspark.sql import SparkSession, DataFrame

In [3]:
# Single Spark session for the notebook; the Kafka connector (and Hadoop client
# runtime) are pulled in via spark.jars.packages when the session is first created.
# NOTE(review): coordinates pin spark-sql-kafka 3.5.1 / Scala 2.12 — presumably
# matching the installed Spark build; confirm when upgrading Spark.
spark = (
    SparkSession.builder
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,org.apache.hadoop:hadoop-client-runtime:3.3.4",
    )
    .getOrCreate()
)

In [44]:
class KafkaSink():
    """Stream invoice JSON files from ``<base_dir>/source`` into a Kafka topic.

    Pipeline: ``read_invoice`` (streaming JSON read with a fixed schema)
    -> ``get_kafka_msg`` (``key`` / JSON ``value`` columns)
    -> ``send_to_kafka`` (SASL_SSL + PLAIN authenticated Kafka sink).

    Credentials are never hard-coded: pass ``api_key`` / ``api_secret``
    explicitly or export ``KAFKA_CLUSTER_API_KEY`` / ``KAFKA_CLUSTER_API_SECRET``.
    """

    DEFAULT_BASE_DIR = "/root/data_engineer2/Apache-Spark-and-Databricks-Stream-Processing-in-Lakehouse-main/Kafka_Practice/data"
    DEFAULT_BOOTSTRAP_SERVER = "pkc-619z3.us-east1.gcp.confluent.cloud:9092"
    DEFAULT_TOPIC = "spark_sink"

    def __init__(
        self,
        base_dir: str = DEFAULT_BASE_DIR,
        bootstrap_server: str = DEFAULT_BOOTSTRAP_SERVER,
        api_key: Optional[str] = None,
        api_secret: Optional[str] = None,
        topic: str = DEFAULT_TOPIC,
        spark_session: Optional[SparkSession] = None,
    ) -> None:
        """Configure paths, cluster connection and the invoice schema.

        Args:
            base_dir: root directory holding ``source/`` (input JSON files)
                and ``checkpoint/`` (streaming checkpoints).
            bootstrap_server: Kafka ``host:port`` to connect to.
            api_key: SASL username; defaults to ``$KAFKA_CLUSTER_API_KEY``.
            api_secret: SASL password; defaults to ``$KAFKA_CLUSTER_API_SECRET``.
            topic: Kafka topic the stream writes to.
            spark_session: session to use; defaults to
                ``SparkSession.builder.getOrCreate()`` (the already-running
                session, if one exists).
        """
        self.base_dir = base_dir
        self.BOOTSTRAP_SERVER = bootstrap_server
        self.JAAS_MODULE = "org.apache.kafka.common.security.plain.PlainLoginModule"
        # Secrets come from the caller or the environment, never from notebook
        # source: a key committed to a notebook is leaked and must be rotated.
        self.CLUSTER_API_KEY = (
            api_key if api_key is not None else os.environ.get("KAFKA_CLUSTER_API_KEY", "")
        )
        self.CLUSTER_API_SECRET = (
            api_secret if api_secret is not None else os.environ.get("KAFKA_CLUSTER_API_SECRET", "")
        )
        self.TOPIC = topic
        # Resolve the session explicitly instead of relying on a notebook-global `spark`.
        self.spark = spark_session if spark_session is not None else SparkSession.builder.getOrCreate()
        self.invoice_schema = """InvoiceNumber string, CreatedTime bigint, StoreID string, PosID string, CashierID string,
                        CustomerType string, CustomerCardNo string, TotalAmount double, NumberOfItems bigint, 
                        PaymentMethod string, TaxableAmount double, CGST double, SGST double, CESS double, 
                        DeliveryType string,
                        DeliveryAddress struct<AddressLine string, City string, ContactNumber string, PinCode string, 
                        State string>,
                        InvoiceLineItems array<struct<ItemCode string, ItemDescription string, 
                            ItemPrice double, ItemQty bigint, TotalValue double>>
                        """

    def read_invoice(self) -> DataFrame:
        """Return a streaming DataFrame over the JSON files in ``<base_dir>/source``.

        The explicit ``invoice_schema`` is applied (streaming file sources
        do not infer schemas by default).
        """
        return (self.spark.readStream
                .format("json")
                .schema(self.invoice_schema)
                .load(f"{self.base_dir}/source")
                )

    def get_kafka_msg(self, df: DataFrame, key: str) -> DataFrame:
        """Shape ``df`` into the ``key`` / ``value`` columns the Kafka sink expects.

        Args:
            df: input rows; every column is serialized into the JSON ``value``.
            key: SQL expression (typically a column name) used as the message key.

        Returns:
            DataFrame with exactly two columns, ``key`` and ``value``.
        """
        return df.selectExpr(f"{key} AS key", "to_json(struct(*)) AS value")

    def send_to_kafka(self, df: DataFrame):
        """Start a streaming write of ``df`` to ``self.TOPIC`` and return the query.

        Args:
            df: DataFrame with ``key`` and ``value`` columns (see ``get_kafka_msg``).

        Returns:
            The running ``StreamingQuery`` so callers can wait on, monitor or
            stop it (it used to be discarded).

        Raises:
            ValueError: if no API key or secret has been configured.
        """
        if not self.CLUSTER_API_KEY or not self.CLUSTER_API_SECRET:
            raise ValueError(
                "Kafka credentials missing: pass api_key/api_secret to KafkaSink() "
                "or set KAFKA_CLUSTER_API_KEY and KAFKA_CLUSTER_API_SECRET."
            )
        jaas_config = (
            f"{self.JAAS_MODULE} required "
            f"username='{self.CLUSTER_API_KEY}' password='{self.CLUSTER_API_SECRET}';"
        )
        return (df.writeStream
                .format("kafka")
                .option("kafka.bootstrap.servers", self.BOOTSTRAP_SERVER)
                .option("kafka.security.protocol", "SASL_SSL")
                .option("kafka.sasl.mechanism", "PLAIN")
                .option("kafka.sasl.jaas.config", jaas_config)
                .option("topic", self.TOPIC)
                .option("checkpointLocation", f"{self.base_dir}/checkpoint/kafka_sink")
                .start()
                )

    def process(self, run_seconds: int = 30, stop_when_done: bool = True):
        """Run the invoice -> Kafka stream for ``run_seconds``.

        Args:
            run_seconds: how long to wait on the running query.
            stop_when_done: stop the query afterwards (default). Leaving it
                running means re-running this cell would start a second query
                on the same checkpoint location while the first is still active.

        Returns:
            The ``StreamingQuery`` (stopped unless ``stop_when_done`` is False).

        Raises:
            StreamingQueryException: re-raised by ``awaitTermination`` if the
                query fails while running (``time.sleep`` used to hide this).
        """
        print("Starting kafka sink process...", end='')
        invoice_df = self.read_invoice()
        kafka_df = self.get_kafka_msg(df=invoice_df, key="StoreID")
        query = self.send_to_kafka(kafka_df)
        try:
            # Blocks up to run_seconds; raises if the query terminated with an error.
            query.awaitTermination(run_seconds)
        finally:
            if stop_when_done:
                query.stop()
        print("Done")
        return query

In [45]:
# Build the sink (its configuration is set up in __init__) and run it: process()
# starts the streaming write to Kafka and blocks for a fixed window before returning.
kafka_sink = KafkaSink()
kafka_sink.process()

Starting kafka sink process...+-------+--------------------+
|    key|               value|
+-------+--------------------+
|STR7188|{"InvoiceNumber":...|
|STR8513|{"InvoiceNumber":...|
|STR8513|{"InvoiceNumber":...|
|STR7188|{"InvoiceNumber":...|
|STR2629|{"InvoiceNumber":...|
|STR6347|{"InvoiceNumber":...|
|STR6347|{"InvoiceNumber":...|
|STR7188|{"InvoiceNumber":...|
|STR6162|{"InvoiceNumber":...|
|STR8513|{"InvoiceNumber":...|
|STR6347|{"InvoiceNumber":...|
|STR6347|{"InvoiceNumber":...|
|STR8513|{"InvoiceNumber":...|
|STR2629|{"InvoiceNumber":...|
|STR6347|{"InvoiceNumber":...|
|STR2629|{"InvoiceNumber":...|
|STR1955|{"InvoiceNumber":...|
|STR2629|{"InvoiceNumber":...|
|STR1955|{"InvoiceNumber":...|
|STR2629|{"InvoiceNumber":...|
+-------+--------------------+
only showing top 20 rows

