52 changes: 52 additions & 0 deletions assets/scss/_variables_project_after_bs.scss
@@ -26,3 +26,55 @@ ul.dropdown-menu {
border: 1px solid #bdbdbd;
}

/* Pager button explicit neutral styling to avoid theme primary/orange overrides */
.pager-btn {
border: 3px solid #dee2e6 !important; /* gray-300 */
color: #343a40 !important; /* text-dark */
background-color: #ffffff !important; /* white background */
box-shadow: 0 .25rem .5rem rgba(0,0,0,.05) !important; /* subtle */
}
.pager-btn:hover {
background-color: #f8f9fa !important; /* gray-100 */
border-color: #ced4da !important; /* gray-400 */
text-decoration: none !important;
}
.pager-btn:focus {
outline: 0 !important;
box-shadow: 0 0 0 .2rem rgba(206,212,218,.5) !important; /* gray focus ring */
}

.carousel-item {
min-height: 400px;

.youtube-video {
max-width: 800px;
margin: 0 auto;
}

h4 {
color: var(--td-body-text-color);
}
}

.carousel-control-prev,
.carousel-control-next {
width: 5%;
opacity: 0.8;

&:hover {
opacity: 1;
}
}

/* Make carousel nav buttons more obvious */
.carousel-control-prev-icon,
.carousel-control-next-icon {
background-color: rgba(0, 0, 0, 0.65);
border-radius: 50%;
padding: 1.1rem;
box-shadow: 0 .5rem 1rem rgba(0, 0, 0, 0.15);
}
.carousel-control-prev:hover .carousel-control-prev-icon,
.carousel-control-next:hover .carousel-control-next-icon {
background-color: rgba(0, 0, 0, 0.85);
}
249 changes: 128 additions & 121 deletions content/en/0110/streams/introduction.md
@@ -31,131 +31,138 @@ type: docs

The code example below implements a WordCount application that is elastic, highly scalable, fault-tolerant, stateful, and ready to run in production at large scale.

{{< tabpane >}}
{{% tab header="Java 8+ Java 7" %}}
```java
// Java 8+ example, using lambda expressions
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApplication {

    public static void main(final String[] args) throws Exception {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count("Counts");
        wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");

        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
    }

}


// Java 7 example, using anonymous classes
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApplication {

    public static void main(final String[] args) throws Exception {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                @Override
                public Iterable<String> apply(String textLine) {
                    return Arrays.asList(textLine.toLowerCase().split("\\W+"));
                }
            })
            .groupBy(new KeyValueMapper<String, String, String>() {
                @Override
                public String apply(String key, String word) {
                    return word;
                }
            })
            .count("Counts");
        wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");

        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
    }

}


```
{{% /tab %}}
{{% tab header="Scala" %}}
```scala
import java.lang.Long
import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}

import scala.collection.JavaConverters.asJavaIterableConverter

object WordCountApplication {

  def main(args: Array[String]) {
    val config: Properties = {
      val p = new Properties()
      p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
      p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
      p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      p
    }

    val builder: KStreamBuilder = new KStreamBuilder()
    val textLines: KStream[String, String] = builder.stream("TextLinesTopic")
    val wordCounts: KTable[String, Long] = textLines
      .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
      .groupBy((_, word) => word)
      .count("Counts")
    wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic")

    val streams: KafkaStreams = new KafkaStreams(builder, config)
    streams.start()

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      streams.close(10, TimeUnit.SECONDS)
    }))
  }

}
```
{{% /tab %}}
{{< /tabpane >}}
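
The Java listings above start the application but never stop it, while the Scala listing also closes the `KafkaStreams` instance from a JVM shutdown hook. A minimal Java equivalent is sketched below; it assumes the `streams` variable from either Java listing and reuses the `close(long, TimeUnit)` overload already shown in the Scala code.

```java
// Sketch only: mirrors the shutdown hook from the Scala listing above.
// Add at the end of main(); requires `import java.util.concurrent.TimeUnit;`.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    // Give stream tasks up to 10 seconds to shut down cleanly.
    streams.close(10, TimeUnit.SECONDS);
}));
```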


## See how Kafka Streams is being used
