KAFKA-3454: add Kafka Streams web docs
Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Gwen Shapira

Closes #1127 from guozhangwang/KStreamsDocs

(cherry picked from commit 23b5009)
Signed-off-by: Gwen Shapira <cshapi@gmail.com>
guozhangwang authored and gwenshap committed Mar 25, 2016
1 parent 1b1b949 commit 496bd3f
Showing 11 changed files with 546 additions and 74 deletions.
123 changes: 65 additions & 58 deletions build.gradle
@@ -413,7 +413,7 @@ project(':core') {

task siteDocsTar(dependsOn: ['genProtocolErrorDocs', 'genProtocolApiKeyDocs', 'genProtocolMessageDocs',
'genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs',
':connect:runtime:genConnectConfigDocs'], type: Tar) {
':connect:runtime:genConnectConfigDocs', ':streams:genStreamsConfigDocs'], type: Tar) {
classifier = 'site-docs'
compression = Compression.GZIP
from project.file("../docs")
@@ -552,77 +552,84 @@ project(':clients') {
}

project(':tools') {
archivesBaseName = "kafka-tools"
archivesBaseName = "kafka-tools"

dependencies {
compile project(':clients')
compile project(':log4j-appender')
compile libs.argparse4j
compile libs.jacksonDatabind
compile libs.slf4jlog4j
dependencies {
compile project(':clients')
compile project(':log4j-appender')
compile libs.argparse4j
compile libs.jacksonDatabind
compile libs.slf4jlog4j

testCompile project(':clients')
testCompile libs.junit
}
testCompile project(':clients')
testCompile libs.junit
}

javadoc {
include "**/org/apache/kafka/tools/*"
}
javadoc {
include "**/org/apache/kafka/tools/*"
}

tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntime) {
include('slf4j-log4j12*')
}
from (configurations.runtime) {
exclude('kafka-clients*')
}
into "$buildDir/dependant-libs-${versions.scala}"
duplicatesStrategy 'exclude'
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntime) {
include('slf4j-log4j12*')
}

jar {
dependsOn 'copyDependantLibs'
from (configurations.runtime) {
exclude('kafka-clients*')
}
into "$buildDir/dependant-libs-${versions.scala}"
duplicatesStrategy 'exclude'
}

jar {
dependsOn 'copyDependantLibs'
}
}

project(':streams') {
archivesBaseName = "kafka-streams"

dependencies {
compile project(':clients')
compile project(':connect:json') // this dependency should be removed after we unify data API
compile libs.slf4jlog4j
compile libs.rocksDBJni
compile libs.zkclient // this dependency should be removed after KIP-4
compile libs.jacksonDatabind // this dependency should be removed after KIP-4

testCompile project(':clients').sourceSets.test.output
testCompile libs.junit
}
archivesBaseName = "kafka-streams"

javadoc {
include "**/org/apache/kafka/streams/**"
exclude "**/internals/**"
}
dependencies {
compile project(':clients')
compile project(':connect:json') // this dependency should be removed after we unify data API
compile libs.slf4jlog4j
compile libs.rocksDBJni
compile libs.zkclient // this dependency should be removed after KIP-4
compile libs.jacksonDatabind // this dependency should be removed after KIP-4

tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntime) {
include('slf4j-log4j12*')
}
from (configurations.runtime) {
exclude('kafka-clients*')
}
into "$buildDir/dependant-libs-${versions.scala}"
duplicatesStrategy 'exclude'
}
testCompile project(':clients').sourceSets.test.output
testCompile libs.junit
}

jar {
dependsOn 'copyDependantLibs'
}
javadoc {
include "**/org/apache/kafka/streams/**"
exclude "**/internals/**"
}

systemTestLibs {
dependsOn testJar
tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.testRuntime) {
include('slf4j-log4j12*')
}
from (configurations.runtime) {
exclude('kafka-clients*')
}
into "$buildDir/dependant-libs-${versions.scala}"
duplicatesStrategy 'exclude'
}

jar {
dependsOn 'copyDependantLibs'
}

systemTestLibs {
dependsOn testJar
}

task genStreamsConfigDocs(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
main = 'org.apache.kafka.streams.StreamsConfig'
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
standardOutput = new File(generatedDocsDir, "streams_config.html").newOutputStream()
}
}

project(':streams:examples') {
5 changes: 5 additions & 0 deletions docs/configuration.html
@@ -333,4 +333,9 @@ <h4><a id="newconsumerconfigs" href="#newconsumerconfigs">3.3.2 New Consumer Con
<!--#include virtual="generated/consumer_config.html" -->

<h3><a id="connectconfigs" href="#connectconfigs">3.4 Kafka Connect Configs</a></h3>
Below is the configuration of the Kafka Connect framework.
<!--#include virtual="generated/connect_config.html" -->

<h3><a id="streamsconfigs" href="#streamsconfigs">3.5 Kafka Streams Configs</a></h3>
Below is the configuration of the Kafka Streams client library.
<!--#include virtual="generated/streams_config.html" -->
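<p>
As a minimal sketch of how these settings are supplied in code (assuming the <code>StreamsConfig</code> constant names of the 0.10.0 Java client, which may differ slightly in this snapshot), a Streams application passes them to <code>KafkaStreams</code> through a <code>java.util.Properties</code> object:
</p>

<pre>
Properties props = new Properties();
// Identifies this Streams application (assumed constant for the "application.id" setting).
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
// Kafka cluster used by the embedded producer and consumer clients.
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Still needed for internal topic management until KIP-4, matching the zkclient dependency noted in build.gradle above.
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
</pre>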
15 changes: 15 additions & 0 deletions docs/documentation.html
@@ -52,6 +52,7 @@ <h1>Kafka 0.10.0 Documentation</h1>
<li><a href="#newconsumerconfigs">3.3.2 New Consumer Configs</a>
</ul>
<li><a href="#connectconfigs">3.4 Kafka Connect Configs</a>
<li><a href="#streamsconfigs">3.5 Kafka Streams Configs</a>
</ul>
</li>
<li><a href="#design">4. Design</a>
@@ -136,6 +137,17 @@ <h1>Kafka 0.10.0 Documentation</h1>
<li><a href="#connect_development">8.3 Connector Development Guide</a></li>
</ul>
</li>
<li><a href="#streams">9. Kafka Streams</a>
<ul>
<li><a href="#streams_overview">9.1 Overview</a></li>
<li><a href="#streams_developer">9.2 Developer Guide</a></li>
<ul>
<li><a href="#streams_concepts">Core Concepts</a></li>
<li><a href="#streams_processor">Low-Level Processor API</a></li>
<li><a href="#streams_dsl">High-Level Streams DSL</a></li>
</ul>
</ul>
</li>
</ul>

<h2><a id="gettingStarted" href="#gettingStarted">1. Getting Started</a></h2>
@@ -171,4 +183,7 @@ <h2><a id="security" href="#security">7. Security</a></h2>
<h2><a id="connect" href="#connect">8. Kafka Connect</a></h2>
<!--#include virtual="connect.html" -->

<h2><a id="streams" href="#streams">9. Kafka Streams</a></h2>
<!--#include virtual="streams.html" -->

<!--#include virtual="../includes/footer.html" -->
109 changes: 109 additions & 0 deletions docs/quickstart.html
@@ -249,3 +249,112 @@ <h4><a id="quickstart_kafkaconnect" href="#quickstart_kafkaconnect">Step 7: Use
</pre>

You should see the line appear in the console consumer output and in the sink file.

<h4><a id="quickstart_kafkastreams" href="#quickstart_kafkastreams">Step 8: Use Kafka Streams to process data</a></h4>

<p>
Kafka Streams is a client library of Kafka for real-time stream processing and analysis of data stored in Kafka brokers.
This quickstart example demonstrates how to run a streaming application built with this library. Here is the gist
of the <code>WordCountDemo</code> example code (converted to use Java 8 lambda expressions for easier reading).
</p>
<pre>
KStream<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // Ensure the words are available as message keys for the next aggregate operation.
    .map((key, value) -> new KeyValue<>(value, value))
    // Count the occurrences of each word (message key).
    .countByKey(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "Counts")
    // Convert the resulting aggregate table into another stream.
    .toStream();
</pre>

<p>
It implements the WordCount
algorithm, which computes a word occurrence histogram from the input text. However, unlike other WordCount examples
you might have seen before, which operate on bounded data, the WordCount demo application behaves slightly differently because it is
designed to operate on an <b>infinite, unbounded stream</b> of data. Similar to the bounded variant, it is a stateful algorithm that
tracks and updates the counts of words. However, since it must assume potentially
unbounded input data, it will periodically output its current state and results while continuing to process more data
because it cannot know when it has processed "all" the input data.
</p>
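<p>
To show how this gist fits into a complete program, here is a minimal sketch of the surrounding setup. It assumes the <code>KStreamBuilder</code>, <code>KafkaStreams</code> and <code>StreamsConfig</code> names of the 0.10.0 Java API (which may differ slightly in this snapshot) and omits serde details:
</p>

<pre>
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");   // assumed config constant names
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

KStreamBuilder builder = new KStreamBuilder();

// Read the input topic as a stream of text lines (keys are not used here).
KStream<String, String> textLines = builder.stream("streams-file-input");

// ... build wordCounts from textLines as shown in the gist above ...

// Write the word counts to the output topic read by the console consumer below.
wordCounts.to("streams-wordcount-output");

// Wire up the topology and start processing.
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
</pre>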
<p>
We will now prepare input data for a Kafka topic, which will subsequently be processed by a Kafka Streams application.
</p>

<!--
<pre>
&gt; <b>./bin/kafka-topics --create \</b>
<b>--zookeeper localhost:2181 \</b>
<b>--replication-factor 1 \</b>
<b>--partitions 1 \</b>
<b>--topic streams-file-input</b>
</pre>
-->

<pre>
&gt; <b>echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt</b>
</pre>

<p>
Next, we send this input data to the input topic named <b>streams-file-input</b> using the console producer (in practice,
stream data would more likely be flowing continuously into Kafka while the application is up and running):
</p>

<pre>
&gt; <b>cat file-input.txt | ./bin/kafka-console-producer --broker-list localhost:9092 --topic streams-file-input</b>
</pre>

<p>
We can now run the WordCount demo application to process the input data:
</p>

<pre>
&gt; <b>./bin/kafka-run-class org.apache.kafka.streams.examples.wordcount.WordCountDemo</b>
</pre>

<p>
There won't be any STDOUT output except log entries as the results are continuously written back into another topic named <b>streams-wordcount-output</b> in Kafka.
The demo will run for a few seconds and then, unlike typical stream processing applications, terminate automatically.
</p>
<p>
We can now inspect the output of the WordCount demo application by reading from its output topic:
</p>

<pre>
&gt; <b>./bin/kafka-console-consumer --zookeeper localhost:2181 \</b>
            <b>--topic streams-wordcount-output \</b>
            <b>--from-beginning \</b>
            <b>--formatter kafka.tools.DefaultMessageFormatter \</b>
            <b>--property print.key=true \</b>
            <b>--property print.value=true \</b>
            <b>--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \</b>
            <b>--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer</b>
</pre>

<p>
with the following output data being printed to the console (you can stop the console consumer via <b>Ctrl-C</b>):
</p>

<pre>
all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
join 1
kafka 3
summit 1
<b>^C</b>
</pre>

<p>
Here, the first column is the Kafka message key in <code>java.lang.String</code> format, and the second column is the message value in <code>java.lang.Long</code> format.
Note that the output is actually a continuous stream of updates, where each data record (i.e. each line in the original output above) is
an updated count of a single word, i.e. a record key such as "kafka". For multiple records with the same key, each later record is an update of the previous one.
</p>