TumblingWindowDemo.py
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import from_json, col, to_timestamp, window, expr, sum
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

from lib.logger import Log4j

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Tumbling Window Demo") \
        .master("local[3]") \
        .config("spark.streaming.stopGracefullyOnShutdown", "true") \
        .config("spark.sql.shuffle.partitions", 2) \
        .getOrCreate()

    logger = Log4j(spark)

    # Schema of the JSON trade records arriving on the Kafka topic
    stock_schema = StructType([
        StructField("CreatedTime", StringType()),
        StructField("Type", StringType()),
        StructField("Amount", IntegerType()),
        StructField("BrokerCode", StringType())
    ])
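    # Illustrative only, not part of the original demo: a record on the "trades" topic
    # is expected to look like the JSON below, matching stock_schema and the
    # "yyyy-MM-dd HH:mm:ss" timestamp format parsed further down. Field values are made up.
    # {"CreatedTime": "2019-02-05 10:05:00", "Type": "BUY", "Amount": 500, "BrokerCode": "ABX"}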
    # Read the "trades" topic as a streaming DataFrame of raw Kafka records
    kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "trades") \
        .option("startingOffsets", "earliest") \
        .load()

    # Deserialize the Kafka value from JSON into the stock_schema columns
    value_df = kafka_df.select(from_json(col("value").cast("string"), stock_schema).alias("value"))

    # Flatten the struct, parse the event time, and split Amount into Buy/Sell columns
    trade_df = value_df.select("value.*") \
        .withColumn("CreatedTime", to_timestamp(col("CreatedTime"), "yyyy-MM-dd HH:mm:ss")) \
        .withColumn("Buy", expr("case when Type == 'BUY' then Amount else 0 end")) \
        .withColumn("Sell", expr("case when Type == 'SELL' then Amount else 0 end"))

    # Aggregate buys and sells over 15-minute tumbling windows on the event time
    window_agg_df = trade_df \
        .groupBy(  # col("BrokerCode"),
                 window(col("CreatedTime"), "15 minute")) \
        .agg(sum("Buy").alias("TotalBuy"),
             sum("Sell").alias("TotalSell"))
    # Flatten the window struct into explicit start/end columns for the sink
    output_df = window_agg_df.select("window.start", "window.end", "TotalBuy", "TotalSell")

    # The running-total variant below is kept only as a reference: non-time-based
    # window functions such as Window.orderBy are not supported on streaming DataFrames.
    '''
    running_total_window = Window.orderBy("end") \
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)

    final_output_df = output_df \
        .withColumn("RTotalBuy", sum("TotalBuy").over(running_total_window)) \
        .withColumn("RTotalSell", sum("TotalSell").over(running_total_window)) \
        .withColumn("NetValue", expr("RTotalBuy - RTotalSell"))

    final_output_df.show(truncate=False)
    '''
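    # A possible workaround, sketched here as an assumption rather than part of the
    # original demo: foreachBatch hands every micro-batch to a plain (non-streaming)
    # DataFrame, where Window.orderBy is allowed. The function name show_running_totals
    # and the alternative checkpoint directory are hypothetical, and the running total
    # only spans the rows of a single micro-batch.
    def show_running_totals(batch_df, batch_id):
        running_total_window = Window.orderBy("end") \
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        batch_df \
            .withColumn("RTotalBuy", sum("TotalBuy").over(running_total_window)) \
            .withColumn("RTotalSell", sum("TotalSell").over(running_total_window)) \
            .withColumn("NetValue", expr("RTotalBuy - RTotalSell")) \
            .show(truncate=False)

    # To try it, this sink could replace the console sink below:
    # output_df.writeStream \
    #     .foreachBatch(show_running_totals) \
    #     .outputMode("update") \
    #     .option("checkpointLocation", "chk-point-dir-batch") \
    #     .trigger(processingTime="1 minute") \
    #     .start()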
    # Write the windowed aggregates to the console, emitting updated windows every minute
    window_query = output_df.writeStream \
        .format("console") \
        .outputMode("update") \
        .option("checkpointLocation", "chk-point-dir") \
        .trigger(processingTime="1 minute") \
        .start()

    logger.info("Waiting for Query")
    window_query.awaitTermination()
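    # How the demo is launched is not shown in this file; as an assumption, the Kafka
    # source needs the spark-sql-kafka package on the classpath, for example:
    #   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<spark-version> TumblingWindowDemo.py
    # The exact artifact and version must match the local Spark installation.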