 from pyspark.sql import SparkSession
-from pyspark.sql.functions import from_json, col, to_timestamp, window
+from pyspark.sql.functions import from_json, col, to_timestamp, window, max
 from pyspark.sql.types import StructType, StructField, StringType, DoubleType
 
 from lib.logger import Log4j
 
 if __name__ == "__main__":
     spark = SparkSession \
         .builder \
-        .appName("Sliding Window Demo") \
         .master("local[3]") \
+        .appName("Sliding Window Demo") \
         .config("spark.streaming.stopGracefullyOnShutdown", "true") \
+        .config("spark.sql.shuffle.partitions", 1) \
         .getOrCreate()
 
     logger = Log4j(spark)
 
     invoice_schema = StructType([
-        StructField("InvoiceNumber", StringType()),
         StructField("CreatedTime", StringType()),
-        StructField("StoreID", StringType()),
-        StructField("TotalAmount", DoubleType())
+        StructField("Reading", DoubleType())
     ])
 
-    kafka_df = spark.readStream \
+    kafka_source_df = spark \
+        .readStream \
         .format("kafka") \
         .option("kafka.bootstrap.servers", "localhost:9092") \
-        .option("subscribe", "invoices") \
+        .option("subscribe", "sensor") \
         .option("startingOffsets", "earliest") \
         .load()
 
-    value_df = kafka_df.select(from_json(col("value").cast("string"), invoice_schema).alias("value"))
-
-    # value_df.printSchema()
-    # value_df.show(truncate=False)
-
-    invoice_df = value_df.select("value.*") \
-        .withColumn("CreatedTime", to_timestamp("CreatedTime", "yyyy-MM-dd HH:mm:ss"))
+    value_df = kafka_source_df.select(col("key").cast("string").alias("SensorID"),
+                                      from_json(col("value").cast("string"), invoice_schema).alias("value"))
 
-    count_df = invoice_df.groupBy("StoreID",
-                                  window("CreatedTime", "5 minute", "1 minute")).count()
+    sensor_df = value_df.select("SensorID", "value.*") \
+        .withColumn("CreatedTime", to_timestamp(col("CreatedTime"), "yyyy-MM-dd HH:mm:ss"))
 
-    # count_df.printSchema()
-    # count_df.show(truncate=False)
+    agg_df = sensor_df \
+        .withWatermark("CreatedTime", "30 minute") \
+        .groupBy(col("SensorID"),
+                 window(col("CreatedTime"), "15 minute", "5 minute")) \
+        .agg(max("Reading").alias("MaxReading"))
 
-    output_df = count_df.select("StoreID", "window.start", "window.end", "count")
+    output_df = agg_df.select("SensorID", "window.start", "window.end", "MaxReading")
 
-    windowQuery = output_df.writeStream \
+    window_query = output_df.writeStream \
         .format("console") \
         .outputMode("update") \
         .option("checkpointLocation", "chk-point-dir") \
         .trigger(processingTime="1 minute") \
         .start()
 
-    logger.info("Counting Invoices")
-    windowQuery.awaitTermination()
+    logger.info("Waiting for Query")
+    window_query.awaitTermination()
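To try the query locally, the stream expects the sensor ID as the Kafka message key and a JSON document matching invoice_schema as the value. Below is a minimal sketch (not part of the commit) for pushing a few test records to the "sensor" topic with Spark's batch Kafka sink; it assumes a broker on localhost:9092, a pre-created "sensor" topic, and the same spark-sql-kafka package on the classpath that the streaming job already needs. The sensor IDs, timestamps, and readings are made-up sample values.

# Hypothetical helper script for manual testing; not the author's code.
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[1]") \
    .appName("Sensor Test Producer") \
    .getOrCreate()

# Kafka sink expects "key" and "value" columns; the value is JSON matching invoice_schema.
rows = [
    ("SNR101", '{"CreatedTime": "2024-01-15 10:02:00", "Reading": 35.5}'),
    ("SNR101", '{"CreatedTime": "2024-01-15 10:12:00", "Reading": 37.8}'),
    ("SNR102", '{"CreatedTime": "2024-01-15 10:07:00", "Reading": 29.1}'),
]

spark.createDataFrame(rows, ["key", "value"]) \
    .write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "sensor") \
    .save()

With a 15-minute window sliding every 5 minutes, the 10:12:00 reading above contributes to the 10:00-10:15, 10:05-10:20, and 10:10-10:25 windows, and the 30-minute watermark means records arriving more than 30 minutes behind the latest observed CreatedTime may be dropped from the aggregation.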