-
Notifications
You must be signed in to change notification settings - Fork 160
/
Copy path: KafkaAvroSinkDemo.py
86 lines (75 loc) · 3.61 KB
/
KafkaAvroSinkDemo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import to_avro
from pyspark.sql.functions import from_json, col, expr, struct
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, LongType, DoubleType, IntegerType
from lib.logger import Log4j
if __name__ == "__main__":
    # Structured Streaming job: read JSON invoices from the "invoices" Kafka
    # topic, explode each invoice's line items into one flattened row per
    # item, and write the result back to Kafka topic "invoice-items" with the
    # record value serialized as Avro.
    spark = SparkSession \
        .builder \
        .appName("Kafka Avro Sink Demo") \
        .master("local[3]") \
        .config("spark.streaming.stopGracefullyOnShutdown", "true") \
        .getOrCreate()
    # NOTE(review): "spark.streaming.stopGracefullyOnShutdown" is a DStream-era
    # setting; confirm it has any effect on a Structured Streaming query.

    logger = Log4j(spark)

    # Schema of the JSON document carried in the Kafka record value.
    # CreatedTime is epoch millis (LongType); DeliveryAddress is a nested
    # struct and InvoiceLineItems is an array of line-item structs.
    schema = StructType([
        StructField("InvoiceNumber", StringType()),
        StructField("CreatedTime", LongType()),
        StructField("StoreID", StringType()),
        StructField("PosID", StringType()),
        StructField("CashierID", StringType()),
        StructField("CustomerType", StringType()),
        StructField("CustomerCardNo", StringType()),
        StructField("TotalAmount", DoubleType()),
        StructField("NumberOfItems", IntegerType()),
        StructField("PaymentMethod", StringType()),
        StructField("CGST", DoubleType()),
        StructField("SGST", DoubleType()),
        StructField("CESS", DoubleType()),
        StructField("DeliveryType", StringType()),
        StructField("DeliveryAddress", StructType([
            StructField("AddressLine", StringType()),
            StructField("City", StringType()),
            StructField("State", StringType()),
            StructField("PinCode", StringType()),
            StructField("ContactNumber", StringType())
        ])),
        StructField("InvoiceLineItems", ArrayType(StructType([
            StructField("ItemCode", StringType()),
            StructField("ItemDescription", StringType()),
            StructField("ItemPrice", DoubleType()),
            StructField("ItemQty", IntegerType()),
            StructField("TotalValue", DoubleType())
        ]))),
    ])

    # Source: subscribe to the "invoices" topic, replaying from the earliest
    # available offsets so a fresh checkpoint processes the full history.
    kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "invoices") \
        .option("startingOffsets", "earliest") \
        .load()

    # Kafka delivers the value as binary; cast to string and parse with the
    # schema above. Malformed records yield a null "value" struct (from_json
    # default PERMISSIVE behavior).
    value_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("value"))

    # Project the invoice-level fields we keep and explode the line-item
    # array: one output row per (invoice, line item) pair.
    explode_df = value_df.selectExpr("value.InvoiceNumber", "value.CreatedTime", "value.StoreID",
                                     "value.PosID", "value.CustomerType", "value.CustomerCardNo", "value.DeliveryType",
                                     "value.DeliveryAddress.City",
                                     "value.DeliveryAddress.State", "value.DeliveryAddress.PinCode",
                                     "explode(value.InvoiceLineItems) as LineItem")

    # Hoist the line-item struct fields to top-level columns, then drop the
    # intermediate struct so the output schema is fully flat.
    flattened_df = explode_df \
        .withColumn("ItemCode", expr("LineItem.ItemCode")) \
        .withColumn("ItemDescription", expr("LineItem.ItemDescription")) \
        .withColumn("ItemPrice", expr("LineItem.ItemPrice")) \
        .withColumn("ItemQty", expr("LineItem.ItemQty")) \
        .withColumn("TotalValue", expr("LineItem.TotalValue")) \
        .drop("LineItem")

    # Kafka sink contract: a "key" column (invoice number, string) and a
    # "value" column holding the whole flattened row serialized as Avro.
    kafka_target_df = flattened_df.select(expr("InvoiceNumber as key"),
                                          to_avro(struct("*")).alias("value"))

    # Sink: append-mode write to "invoice-items"; the checkpoint directory
    # provides exactly-once progress tracking across restarts.
    invoice_writer_query = kafka_target_df \
        .writeStream \
        .queryName("Flattened Invoice Writer") \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("topic", "invoice-items") \
        .outputMode("append") \
        .option("checkpointLocation", "chk-point-dir") \
        .start()

    logger.info("Start Writer Query")
    invoice_writer_query.awaitTermination()