Window final result dev narrative (#99)

DivLoic committed Sep 21, 2019
1 parent f13c4a9 commit 679057c

Showing 28 changed files with 481 additions and 0 deletions.
137 changes: 137 additions & 0 deletions _data/harnesses/window-final-result/kstreams.yml
@@ -0,0 +1,137 @@
dev:
steps:
- title: Get Confluent Platform
content:
- action: skip
render:
file: tutorials/window-final-result/kstreams/markup/dev/1-get-cp.adoc

- title: Initialize the project
content:
- action: execute
file: tutorial-steps/dev/workdir.sh
render:
file: tutorials/window-final-result/kstreams/markup/dev/2_0-workdir.adoc

- change_directory: window-final-result
action: make_file
file: build.gradle
render:
file: tutorials/window-final-result/kstreams/markup/dev/2_1-build-file.adoc

- action: make_file
file: settings.gradle
render:
file: tutorials/window-final-result/kstreams/markup/dev/2_2-settings-file.adoc

- action: execute
file: tutorial-steps/dev/gradle-wrapper.sh
render:
file: tutorials/window-final-result/kstreams/markup/dev/2_3-gradle-wrapper.adoc

- action: execute
file: tutorial-steps/dev/make-resources-dir.sh
render:
file: tutorials/window-final-result/kstreams/markup/dev/2_4-resources-dir.adoc

- action: make_file
file: src/main/resources/application.conf
render:
file: tutorials/window-final-result/kstreams/markup/dev/2_5-conf-file.adoc

- action: make_file
file: src/main/resources/logback.xml
render:
file: tutorials/window-final-result/kstreams/markup/dev/2_6-log-file.adoc

- title: Create a schema for the events
content:
- action: execute
file: tutorial-steps/dev/make-avro-dir.sh
render:
file: tutorials/window-final-result/kstreams/markup/dev/3_0-avro-dir.adoc

- action: make_file
file: src/main/avro/pressure-alert.avsc
render:
file: tutorials/window-final-result/kstreams/markup/dev/3_1-pressure-schema.adoc

- action: execute
file: tutorial-steps/dev/build-project.sh
render:
file: tutorials/window-final-result/kstreams/markup/dev/3_2-build-project.adoc

- title: Add the helper gradle tasks
content:
- action: execute
file: tutorial-steps/dev/make-helper-package.sh
render:
file: tutorials/window-final-result/kstreams/markup/dev/4_0-helper-package.adoc

- action: make_file
file: src/main/java/io/confluent/developer/helper/TopicCreation.java
render:
file: tutorials/window-final-result/kstreams/markup/dev/4_1-topic-task.adoc

- action: make_file
file: src/main/java/io/confluent/developer/helper/SchemaPublication.java
render:
file: tutorials/window-final-result/kstreams/markup/dev/4_2-schema-task.adoc

- action: skip
render:
file: tutorials/window-final-result/kstreams/markup/dev/4_3-gradle-tasks.adoc

- title: Create the timestamp extractor
content:
- action: make_file
file: src/main/java/io/confluent/developer/PressureDatetimeExtractor.java
render:
file: tutorials/window-final-result/kstreams/markup/dev/5_0-timestamp-extractor.adoc

- title: Create the Kafka Streams topology
content:
- action: make_file
file: src/main/java/io/confluent/developer/WindowFinalResult.java
render:
file: tutorials/window-final-result/kstreams/markup/dev/6_0-stream-topology.adoc

- title: Compile and run the Kafka Streams program
content:
- action: execute_async
file: tutorials/window-final-result/kstreams/code/tutorial-steps/dev/run-project.sh
render:
file: tutorials/window-final-result/kstreams/markup/dev/7_0-gradle-run.adoc

- action: skip
render:
file: tutorials/window-final-result/kstreams/markup/dev/7_1-java-cp.adoc

- title: Produce events to the input topic
content:
- action: execute
file: tutorial-steps/dev/run-producer.sh
render:
file: tutorials/window-final-result/kstreams/markup/dev/8_0-produce-input.adoc

- action: execute
file: tutorial-steps/dev/run-producer2.sh
render:
file: tutorials/window-final-result/kstreams/markup/dev/8_1-produce-input.adoc

- title: Consume events from the output topic
content:
- action: make_file
file: src/main/java/io/confluent/developer/helper/ResultConsumer.java
render:
file: tutorials/window-final-result/kstreams/markup/dev/9_0-consume-output.adoc

- action: execute_async
file: tutorials/window-final-result/kstreams/code/tutorial-steps/dev/run-consumer.sh
render:
file: tutorials/window-final-result/kstreams/markup/dev/9_1-consume-output.adoc

- action: skip
file: tutorials/window-final-result/kstreams/code/tutorial-steps/dev/run-consumer.sh
render:
file: tutorials/window-final-result/kstreams/markup/dev/9_2-consume-output.adoc
8 changes: 8 additions & 0 deletions _data/tutorials.yaml
Expand Up @@ -140,3 +140,11 @@ serialization:
status:
ksql: enabled
kstreams: enabled

window-final-result:
title: "Emit a final result from a time window"
slug: "/window-final-result"
problem: "you have a Kafka topic and you want to count the number of messages per key over a time window, producing a final result that takes late-arriving data into account."
introduction: "Consider a topic with events that represent sensor warnings (pressure on robotic arms). One warning per time slot is fine, but you don't want too many warnings at the same time. In this tutorial, we'll write a program that counts the messages from each sensor and emits a single result at the end of the window."
status:
kstreams: enabled
9 changes: 9 additions & 0 deletions tutorials/window-final-result/kstreams/markup/dev/1-get-cp.adoc
@@ -0,0 +1,9 @@
Install the Confluent distribution of Apache Kafka by downloading both
the https://www.confluent.io/download/[Confluent Platform]
and the https://docs.confluent.io/current/cli/installing.html[Confluent CLI].

You can also start all the needed services via Docker:

+++++
<pre class="snippet"><code class="bash">{% include shared-content/docker-install.txt %}</code></pre>
+++++
7 changes: 7 additions & 0 deletions tutorials/window-final-result/kstreams/markup/dev/2_0-workdir.adoc
@@ -0,0 +1,7 @@
To get started, make a new directory anywhere you'd like for this project:

+++++
<pre class="snippet"><code class="bash">{%
include_raw tutorials/window-final-result/kstreams/code/tutorial-steps/dev/workdir.sh
%}</code></pre>
+++++
10 changes: 10 additions & 0 deletions tutorials/window-final-result/kstreams/markup/dev/2_1-build-file.adoc
@@ -0,0 +1,10 @@
Create the following Gradle build file, named `build.gradle` for the project:

+++++
<pre class="snippet"><code class="groovy">{%
include_raw tutorials/window-final-result/kstreams/code/build.gradle
%}</code></pre>
+++++

*Note*: In addition to our main class, this tutorial brings in two Java executions responsible for creating the topics
and schemas. In a real-life application, these steps may live outside your project.
7 changes: 7 additions & 0 deletions tutorials/window-final-result/kstreams/markup/dev/2_2-settings-file.adoc
@@ -0,0 +1,7 @@
Create the following Gradle settings file, named `settings.gradle` for the project:

+++++
<pre class="snippet"><code class="groovy">{%
include_raw tutorials/window-final-result/kstreams/code/settings.gradle
%}</code></pre>
+++++
7 changes: 7 additions & 0 deletions tutorials/window-final-result/kstreams/markup/dev/2_3-gradle-wrapper.adoc
@@ -0,0 +1,7 @@
Run the following command to obtain the Gradle wrapper:

+++++
<pre class="snippet"><code class="bash">{%
include_raw tutorials/window-final-result/kstreams/code/tutorial-steps/dev/gradle-wrapper.sh
%}</code></pre>
+++++
7 changes: 7 additions & 0 deletions tutorials/window-final-result/kstreams/markup/dev/2_4-resources-dir.adoc
@@ -0,0 +1,7 @@
Create a directory for the project resources:

+++++
<pre class="snippet"><code class="bash">{%
include_raw tutorials/window-final-result/kstreams/code/tutorial-steps/dev/make-resources-dir.sh
%}</code></pre>
+++++
48 changes: 48 additions & 0 deletions tutorials/window-final-result/kstreams/markup/dev/2_5-conf-file.adoc
@@ -0,0 +1,48 @@
Add the config file `src/main/resources/application.conf` to set up your application:

+++++
<pre class="snippet"><code class="hocon">{%
include_raw tutorials/window-final-result/kstreams/code/src/main/resources/application.conf
%}</code></pre>
+++++

Adapt the `local.date.lang` and `local.date.pattern` blocks to your locale if needed.

This file contains the default configs. In production, they will be overridden by the following environment variables (a loading sketch follows the table):

|===
|Variable |Description

|`APP_ID`
| ID of the application, used as the Kafka Streams application ID

|`BOOTSTRAP_SERVERS`
| host:port of your Kafka broker

|`SCHEMA_REGISTRY_URL`
| host:port of your schema registry

|`INPUT_TOPIC`
| name of the input topic

|`INPUT_TOPIC_PARTITIONS`
| number of partitions of the input topic

|`INPUT_TOPIC_REPLICATION`
| replication factor of the input topic

|`OUTPUT_TOPIC`
| name of the output topic

|`OUTPUT_TOPIC_PARTITIONS`
| number of partitions of the output topic

|`OUTPUT_TOPIC_REPLICATION`
| replication factor of the output topic

|`WINDOW_SIZE`
| size of the aggregation window

|`GRACE_PERIOD`
| how long to wait for late data points
|===
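
For illustration, here is a minimal sketch of how such a configuration is typically loaded with the
https://github.com/lightbend/config[Typesafe Config] library, which resolves `${?ENV_VAR}` references against the
environment. The keys below are assumptions for the example, not necessarily the tutorial's exact ones:

+++++
<pre class="snippet"><code class="java">import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class ConfigExample {

    public static void main(String[] args) {
        // Loads src/main/resources/application.conf and resolves substitutions
        Config config = ConfigFactory.load();

        // With a HOCON entry such as:
        //   bootstrap.servers = "localhost:9092"
        //   bootstrap.servers = ${?BOOTSTRAP_SERVERS}
        // the default survives unless the environment variable is set
        String bootstrapServers = config.getString("bootstrap.servers");

        System.out.println("Connecting to: " + bootstrapServers);
    }
}</code></pre>
+++++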
7 changes: 7 additions & 0 deletions tutorials/window-final-result/kstreams/markup/dev/2_6-log-file.adoc
@@ -0,0 +1,7 @@
Add the logging configuration in the file `src/main/resources/logback.xml`:

+++++
<pre class="snippet"><code class="xml">{%
include_raw tutorials/window-final-result/kstreams/code/src/main/resources/logback.xml
%}</code></pre>
+++++
7 changes: 7 additions & 0 deletions tutorials/window-final-result/kstreams/markup/dev/3_0-avro-dir.adoc
@@ -0,0 +1,7 @@
Create a directory for the pressure event schemas:

+++++
<pre class="snippet"><code class="bash">{%
include_raw tutorials/window-final-result/kstreams/code/tutorial-steps/dev/make-avro-dir.sh
%}</code></pre>
+++++
7 changes: 7 additions & 0 deletions tutorials/window-final-result/kstreams/markup/dev/3_1-pressure-schema.adoc
@@ -0,0 +1,7 @@
Then create the following Avro schema file at `src/main/avro/pressure-alert.avsc` for the pressure alert events:

+++++
<pre class="snippet"><code class="avro">{%
include_raw tutorials/window-final-result/kstreams/code/src/main/avro/pressure-alert.avsc
%}</code></pre>
+++++
7 changes: 7 additions & 0 deletions tutorials/window-final-result/kstreams/markup/dev/3_2-build-project.adoc
@@ -0,0 +1,7 @@
Because this Avro schema is used in the Java code, it needs to be compiled. Run the following:

+++++
<pre class="snippet"><code class="bash">{%
include_raw tutorials/window-final-result/kstreams/code/tutorial-steps/dev/build-project.sh
%}</code></pre>
+++++
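
The build typically generates a `PressureAlert` class from the schema via Avro code generation. As a rough sketch of
the resulting builder API, assuming the schema's namespace is `io.confluent.developer` and that it declares
hypothetical `id` and `pressure` fields alongside the `datetime` field used later by the timestamp extractor:

+++++
<pre class="snippet"><code class="java">import io.confluent.developer.PressureAlert;

public class GeneratedClassExample {

    public static void main(String[] args) {
        // Field names other than 'datetime' are assumptions for this sketch
        PressureAlert alert = PressureAlert.newBuilder()
                .setId("101")
                .setDatetime("2019-09-21T05:30:10.143+0200")
                .setPressure(687)
                .build();

        System.out.println(alert.getDatetime());
    }
}</code></pre>
+++++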
10 changes: 10 additions & 0 deletions tutorials/window-final-result/kstreams/markup/dev/4_0-helper-package.adoc
@@ -0,0 +1,10 @@
Topic creation and Avro schema publication are often part of an external process. For the sake of clarity in this
tutorial, we won't include these steps as part of the main application, but isolate them in a dedicated package.

Create a directory for the helper package:

+++++
<pre class="snippet"><code class="bash">{%
include_raw tutorials/window-final-result/kstreams/code/tutorial-steps/dev/make-helper-package.sh
%}</code></pre>
+++++
7 changes: 7 additions & 0 deletions tutorials/window-final-result/kstreams/markup/dev/4_1-topic-task.adoc
@@ -0,0 +1,7 @@
Add the following class in the `io.confluent.developer.helper` package:

+++++
<pre class="snippet"><code class="java">{%
include_raw tutorials/window-final-result/kstreams/code/src/main/java/io/confluent/developer/helper/TopicCreation.java
%}</code></pre>
+++++
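
The full class is included above; as a reference point, topic creation with Kafka's `AdminClient` typically boils down
to a sketch like the one below, with a hypothetical topic name and sizing (the tutorial reads these values from
`application.conf`):

+++++
<pre class="snippet"><code class="java">import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicCreationExample {

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient client = AdminClient.create(props)) {
            // Hypothetical name, partition count and replication factor
            NewTopic inputTopic = new NewTopic("input-topic", 1, (short) 1);
            try {
                client.createTopics(Collections.singletonList(inputTopic)).all().get();
            } catch (ExecutionException e) {
                // e.g. a TopicExistsException if the topic is already there
                System.err.println("Topic creation failed: " + e.getCause());
            }
        }
    }
}</code></pre>
+++++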
7 changes: 7 additions & 0 deletions tutorials/window-final-result/kstreams/markup/dev/4_2-schema-task.adoc
@@ -0,0 +1,7 @@
Add the following class in the `io.confluent.developer.helper` package:

+++++
<pre class="snippet"><code class="java">{%
include_raw tutorials/window-final-result/kstreams/code/src/main/java/io/confluent/developer/helper/SchemaPublication.java
%}</code></pre>
+++++
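
Publishing the schemas ahead of time usually goes through the Schema Registry client. A minimal sketch under that
assumption, with a placeholder registry URL and an "input-topic-value" subject name following the usual
topic-plus-"-value" convention:

+++++
<pre class="snippet"><code class="java">import io.confluent.developer.PressureAlert;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import java.io.IOException;

public class SchemaPublicationExample {

    public static void main(String[] args) throws IOException, RestClientException {
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 10);

        // Register the generated PressureAlert schema for the input topic's values
        int id = client.register("input-topic-value", PressureAlert.getClassSchema());

        System.out.println("Registered schema with id: " + id);
    }
}</code></pre>
+++++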
14 changes: 14 additions & 0 deletions tutorials/window-final-result/kstreams/markup/dev/4_3-gradle-tasks.adoc
@@ -0,0 +1,14 @@
Now the topics can be created separately with the following command:

+++++
<pre class="snippet"><code class="bash">./gradlew createTopics</code></pre>
+++++

The same goes for the schemas:

+++++
<pre class="snippet"><code class="bash">./gradlew publishSchemas</code></pre>
+++++

Check the `build.gradle` again. You will find the tasks declared as `JavaExec`, each with a main class corresponding
to one of the last two files.
31 changes: 31 additions & 0 deletions tutorials/window-final-result/kstreams/markup/dev/5_0-timestamp-extractor.adoc
@@ -0,0 +1,31 @@
Time can be a tricky concept. Besides there being different notions of time, you need to keep in mind that each of
them may arrive from a different time zone.

1. *Event time*: the time of the sensor, which differs depending on whether it comes from Paris (UTC+02:00) or Tokyo (UTC+09:00)
2. *Processing time*: the time of the Kafka Streams instance. Here the zone depends on your deployment (e.g., your fancy managed Kubernetes cluster deployed in us-west-b :p)
3. *Ingestion time*: less relevant here, this is the time when the Kafka message was published

Since our operations will be time based, you need to ensure the right time is considered. In this example, our data
producer is not aware of message timestamps and places the time of the alert in the message value. We need to extract
it from there. This can be done by implementing a
`https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html[TimestampExtractor]`.
Add the next class in the `io.confluent.developer` package.

+++++
<pre class="snippet"><code class="java">{%
include_raw tutorials/window-final-result/kstreams/code/src/main/java/io/confluent/developer/PressureDatetimeExtractor.java
%}</code></pre>
+++++

OK, let's translate this `extract` method from Java to English. First of all, we attempt the following operations,
which may raise an exception:

1. we cast the value `Object` to a `PressureAlert` and call its `.getDatetime` method
2. then we parse the datetime string based on the defined pattern
3. then we convert it to an `Instant`, in case the Kafka message suffers from jet lag
4. and we get the epoch in milliseconds

If one of these steps fails, we log the error and set the timestamp to a negative number, so the record will be
silently ignored. A condensed sketch of this logic follows.
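
Put together, the body of such an extractor might look roughly like the sketch below; the datetime pattern is a
placeholder, since the tutorial builds its formatter from the `local.date.pattern` and `local.date.lang`
configuration entries:

+++++
<pre class="snippet"><code class="java">import io.confluent.developer.PressureAlert; // generated in part 3 (namespace assumed)
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class DatetimeExtractorSketch implements TimestampExtractor {

    // Placeholder pattern; the real one comes from application.conf
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXX");

    @Override
    public long extract(ConsumerRecord&lt;Object, Object&gt; record, long previousTimestamp) {
        try {
            String datetime = ((PressureAlert) record.value()).getDatetime(); // 1. cast and read
            return FORMATTER.parse(datetime, OffsetDateTime::from)            // 2. parse the string
                    .toInstant()                                              // 3. normalize the zone
                    .toEpochMilli();                                          // 4. epoch in millis
        } catch (Exception e) {
            // Log the error (omitted here) and return a negative timestamp:
            // Kafka Streams will silently drop the record
            return -1L;
        }
    }
}</code></pre>
+++++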
27 changes: 27 additions & 0 deletions tutorials/window-final-result/kstreams/markup/dev/6_0-stream-topology.adoc
@@ -0,0 +1,27 @@
In the main function we create a `https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#windowing[TimeWindows]`
with a given size and the same step size. This results in non-overlapping windows, called a
https://docs.confluent.io/current/ksql/docs/concepts/time-and-windows-in-ksql-queries.html#tumbling-window[Tumbling Window].
We also add an extra grace period: even if messages arrive late, they may still join a window as long as their datetime
corresponds to it. Finally, we pass this window to a function that also takes a `StreamsBuilder` and returns a `Topology`.
Add the next class in the `io.confluent.developer` package.

+++++
<pre class="snippet"><code class="java">{%
include_raw tutorials/window-final-result/kstreams/code/src/main/java/io/confluent/developer/WindowFinalResult.java
%}</code></pre>
+++++

Here are several notes about the `WindowFinalResult#buildTopology` function:

- To consume events, we create a `SpecificAvroSerde` based on the source code generated in part 3.
- The serde used to produce the aggregated results is a windowed serde: it stores not only the key but also the window start time.
- Our custom timestamp extractor is plugged in thanks to the `Consumed#withTimestampExtractor` method.

We then stream, `selectKey`, and `groupByKey`, and finally apply the *suppress* operator. The suppress operator
deletes every intermediate change once the grace period is over; by doing so, it also emits the final result, as the
sketch below illustrates.
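
A minimal sketch of such a topology, assuming placeholder topic names and durations, a no-arg constructor for the
extractor, and a hypothetical `id` field on the alert (the real values come from the configuration):

+++++
<pre class="snippet"><code class="java">package io.confluent.developer; // next to PressureAlert and the extractor (namespace assumed)

import java.time.Duration;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.WindowedSerdes;

public class TopologySketch {

    public static Topology buildTopology(StreamsBuilder builder,
                                         Serde&lt;PressureAlert&gt; pressureSerde) {
        // Tumbling window: the grace period lets late events join their window
        TimeWindows windows = TimeWindows
                .of(Duration.ofSeconds(10))
                .grace(Duration.ofSeconds(20));

        builder
                .stream("input-topic", Consumed.with(Serdes.String(), pressureSerde)
                        .withTimestampExtractor(new PressureDatetimeExtractor()))
                .selectKey((key, value) -> value.getId()) // 'id' field assumed
                .groupByKey(Grouped.with(Serdes.String(), pressureSerde))
                .windowedBy(windows)
                .count()
                // Drop intermediate counts; emit one final result per window
                .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
                .toStream()
                .to("output-topic", Produced.with(
                        WindowedSerdes.timeWindowedSerdeFrom(String.class),
                        Serdes.Long()));

        return builder.build();
    }
}</code></pre>
+++++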

*Note*: even after the suppress operator is applied, you will need the next event to advance the
https://docs.confluent.io/current/streams/concepts.html#time[stream time]
and get your result.
