
Commit b5fc79d

Initial Commit
1 parent 671b3c2 commit b5fc79d

9 files changed: +110 additions, −0 deletions
Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp, window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

from lib.logger import Log4j

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Sliding Window Demo") \
        .master("local[3]") \
        .config("spark.streaming.stopGracefullyOnShutdown", "true") \
        .getOrCreate()

    logger = Log4j(spark)

    invoice_schema = StructType([
        StructField("InvoiceNumber", StringType()),
        StructField("CreatedTime", StringType()),
        StructField("StoreID", StringType()),
        StructField("TotalAmount", DoubleType())
    ])

    kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "invoices") \
        .option("startingOffsets", "earliest") \
        .load()

    value_df = kafka_df.select(from_json(col("value").cast("string"), invoice_schema).alias("value"))

    # value_df.printSchema()
    # value_df.show(truncate=False)

    invoice_df = value_df.select("value.*") \
        .withColumn("CreatedTime", to_timestamp("CreatedTime", "yyyy-MM-dd HH:mm:ss"))

    count_df = invoice_df.groupBy("StoreID",
                                  window("CreatedTime", "5 minute", "1 minute")).count()

    # count_df.printSchema()
    # count_df.show(truncate=False)

    output_df = count_df.select("StoreID", "window.start", "window.end", "count")

    windowQuery = output_df.writeStream \
        .format("console") \
        .outputMode("update") \
        .option("checkpointLocation", "chk-point-dir") \
        .trigger(processingTime="1 minute") \
        .start()

    logger.info("Counting Invoices")
    windowQuery.awaitTermination()
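The heart of the job is window("CreatedTime", "5 minute", "1 minute"): each invoice is assigned to every 5-minute window whose start times step by 1 minute, so a single event lands in five overlapping windows. A minimal batch-mode sketch (not part of this commit; the session name and sample row are illustrative) makes the overlap visible without a Kafka source:

# Sketch only: the same window() call in batch mode.
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, window

spark = SparkSession.builder.appName("WindowSketch").master("local[1]").getOrCreate()

df = spark.createDataFrame(
    [("STR1534", "2019-02-05 10:01:00")],
    ["StoreID", "CreatedTime"]
).withColumn("CreatedTime", to_timestamp("CreatedTime", "yyyy-MM-dd HH:mm:ss"))

# One event at 10:01 falls into five overlapping windows:
# [09:57,10:02) [09:58,10:03) [09:59,10:04) [10:00,10:05) [10:01,10:06)
df.groupBy("StoreID", window("CreatedTime", "5 minute", "1 minute")) \
    .count() \
    .orderBy("window") \
    .show(truncate=False)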
Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
STR1534:{"InvoiceNumber": 101,"CreatedTime": "2019-02-05 10:01:00","StoreID": "STR1534", "TotalAmount": 1920}
STR1534:{"InvoiceNumber": 103,"CreatedTime": "2019-02-05 10:03:19","StoreID": "STR1534", "TotalAmount": 2400}
STR1534:{"InvoiceNumber": 105,"CreatedTime": "2019-02-05 10:07:50","StoreID": "STR1534", "TotalAmount": 6375}
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
%KAFKA_HOME%\bin\windows\zookeeper-server-start.bat %KAFKA_HOME%\config\zookeeper.properties
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
%KAFKA_HOME%\bin\windows\kafka-server-start.bat %KAFKA_HOME%\config\server-0.properties
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
%KAFKA_HOME%\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic invoices
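Note: the --zookeeper flag matches older Kafka releases. From Kafka 2.2 onward the same command accepts --bootstrap-server localhost:9092 instead, and the ZooKeeper option was removed entirely in Kafka 3.0.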
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
%KAFKA_HOME%\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic invoices --property "parse.key=true" --property "key.separator=:"
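With parse.key=true and key.separator set to ':', the console producer splits each input line at the first ':' it finds, so the STR1534 prefix in the sample file becomes the Kafka message key and the JSON document after it becomes the value.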

09-SlidingWindowDemo/lib/__init__.py

Whitespace-only changes.

09-SlidingWindowDemo/lib/logger.py

Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
class Log4j:
    def __init__(self, spark):
        log4j = spark._jvm.org.apache.log4j

        root_class = "guru.learningjournal.spark.examples"
        conf = spark.sparkContext.getConf()
        app_name = conf.get("spark.app.name")

        self.logger = log4j.LogManager.getLogger(root_class + "." + app_name)

    def warn(self, message):
        self.logger.warn(message)

    def info(self, message):
        self.logger.info(message)

    def error(self, message):
        self.logger.error(message)

    def debug(self, message):
        self.logger.debug(message)
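A minimal usage sketch (not part of the commit; only the log messages are invented): the wrapper derives its logger name from spark.app.name, so this app logs under guru.learningjournal.spark.examples.Sliding Window Demo, which the log4j.properties below routes to the console at INFO level.

from pyspark.sql import SparkSession
from lib.logger import Log4j

spark = SparkSession.builder.appName("Sliding Window Demo").master("local[1]").getOrCreate()
logger = Log4j(spark)
logger.info("visible")        # INFO and above reach the console appender
logger.debug("suppressed")    # below the configured INFO threshold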
Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
# Set everything to be logged to the console
log4j.rootCategory=WARN, console

# define console appender
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# application log
log4j.logger.guru.learningjournal.spark.examples=INFO, console
log4j.additivity.guru.learningjournal.spark.examples=false

# define the following as a Java system property:
# -Dlog4j.configuration=file:log4j.properties

# Recommendations from Spark template
log4j.logger.org.apache.spark.repl.Main=WARN
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
