# Kafka Streams

Kafka Streams is a client library for building mission-critical real-time applications and microservices, where the input and/or output data is stored in Kafka clusters. 

Kafka Streams combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's server-side cluster technology to make these applications highly scalable, elastic, fault-tolerant, distributed, and much more.

## Features

- Elastic, highly scalable, fault-tolerant

- Deploy to containers, VMs, bare metal, cloud

- Equally viable for small, medium, & large use cases

- Fully integrated with Kafka security

- Write standard Java and Scala applications

- Exactly-once processing semantics

- No separate processing cluster required

- Develop on Mac, Linux, Windows

# Use Cases

## The New York Times

![](https://kafka.apache.org/images/powered-by/NYT.jpg)

The New York Times uses Apache Kafka and Kafka Streams to store and distribute published content, in real time, to the various applications and systems that make it available to readers.

https://open.nytimes.com/publishing-with-apache-kafka-at-the-new-york-times-7f0e3b7d2077

## Real Time Analytics

![](https://dzone.com/storage/temp/12275703-kafka-use-case.png)

Story: https://dzone.com/articles/real-time-stream-processing-with-apache-kafka-part-1

Code: https://github.com/hellosatish/microservice-patterns/tree/master/vehicle-tracker

# Wordcount

```java
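// Imports needed by this fragment
import java.util.Arrays;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

// Builder used to assemble the processing topology
final StreamsBuilder builder = new StreamsBuilder();

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream(
      "streams-plaintext-input",
      Consumed.with(stringSerde, stringSerde)
    );

KTable<String, Long> wordCounts = textLines
    // Lowercase each text line and split it into words on runs of non-word characters.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

    // Group the text words as message keys
    .groupBy((key, value) -> value)

    // Count the occurrences of each word (message key).
    .count();

// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(stringSerde, longSerde));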
```

# Run Demo App
```bash
kafkaStartZk.sh
kafkaStartServer.sh
kafkaCreateTopic.sh streams-plaintext-input
kafkaCreateTopic.sh streams-wordcount-output
kafkaWordCountStream
kafkaWordCountProducer
kafkaWordCountConsumer
```

Messages:
- all streams lead to kafka
- hello kafka streams

# Behind the scenes

![](images/streams-table-updates-01.png)

# Streams and Tables: A Primer
https://www.confluent.io/blog/kafka-streams-tables-part-1-event-streaming/

## Event Record

**An event records the fact that “something happened” in the world**

- Event key: “Alice”
- Event value: “Has arrived in Rome”
- Event timestamp: “Dec. 3, 2019 at 9:06 a.m.”
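
As a rough illustration, the three parts of an event can be modelled as a small immutable value class. This is a sketch only: `EventRecordSketch` is a hypothetical name, and the use of `LocalDateTime` is an assumption made for readability (Kafka itself stores record timestamps as epoch milliseconds).

```java
import java.time.LocalDateTime;

// Hypothetical value class for illustration; Kafka's own record classes differ.
final class EventRecordSketch {
    final String key;              // who or what the event is about
    final String value;            // what happened
    final LocalDateTime timestamp; // when it happened

    EventRecordSketch(String key, String value, LocalDateTime timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return key + ": " + value + " @ " + timestamp;
    }

    public static void main(String[] args) {
        EventRecordSketch event = new EventRecordSketch(
                "Alice", "Has arrived in Rome", LocalDateTime.of(2019, 12, 3, 9, 6));
        System.out.println(event);
    }
}
```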

## Event Stream

**An event stream records the history of what has happened in the world as a sequence of events**

This history is an ordered sequence (a chain) of events, so we know which event happened before another and can reason about causality.

A stream thus represents both the past and the present: as we go from today to tomorrow—or from one millisecond to the next—new events are constantly being appended to the history.

### Example

_The sequence of moves in a chess match_

White moved the e2 pawn to e4, then Black moved the e7 pawn to e5

![](https://66.media.tumblr.com/tumblr_m8ok25dsch1r8gmlso1_500.gifv)

## Event Table

**A table represents the state of the world** at a particular point in time, typically “now.”

![](https://cdn.confluent.io/wp-content/uploads/streams-vs-tables-1.png)

| Stream | Table |
| ------ | ----- |
|A stream provides immutable data. It supports only inserting (appending) new events, whereas existing events cannot be changed. Streams are persistent, durable, and fault tolerant. Events in a stream can be keyed, and you can have many events for one key, like “all of Bob’s payments.” If you squint a bit, you could consider a stream to be like a table in a relational database (RDBMS) that has no unique key constraint and that is append only.| A table provides mutable data. New events—rows—can be inserted, and existing rows can be updated and deleted. Here, an event’s key aka row key identifies which row is being mutated. Like streams, tables are persistent, durable, and fault tolerant. Today, a table behaves much like an RDBMS materialized view because it is being changed automatically as soon as any of its input streams or tables change, rather than letting you directly run insert, update, or delete operations against it.|

|                                           | Stream |  Table    |
|-------------------------------------------|--------|-----------|
| First event with key bob arrives          | Insert | Insert    |
| Another event with key bob arrives        | Insert | Update    |
| Event with key bob and value null arrives | Insert | Delete    |
| Event with key null arrives               | Insert | _ignored_ |
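
The rules in the table above can be sketched in plain Java, with a list standing in for the stream and a map standing in for the table. This is an illustration of the semantics, not Kafka Streams code. `StreamTableSketch` and the sample values are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Applies the same events to a "stream" (append-only list) and a "table" (latest value per key).
final class StreamTableSketch {

    // The stream keeps every event; entries are only ever appended.
    final List<String[]> stream = new ArrayList<>();

    // The table keeps only the latest value for each key.
    final Map<String, String> table = new LinkedHashMap<>();

    void apply(String key, String value) {
        stream.add(new String[] {key, value}); // stream: always an insert

        if (key == null) {
            return;                            // table: events without a key are ignored
        }
        if (value == null) {
            table.remove(key);                 // table: a null value deletes the row
        } else {
            table.put(key, value);             // table: insert on first sight, update afterwards
        }
    }

    public static void main(String[] args) {
        StreamTableSketch sketch = new StreamTableSketch();
        sketch.apply("bob", "payment 1");
        sketch.apply("bob", "payment 2");
        sketch.apply(null, "no key");
        System.out.println("stream size: " + sketch.stream.size() + ", table: " + sketch.table);
        sketch.apply("bob", null);
        System.out.println("stream size: " + sketch.stream.size() + ", table: " + sketch.table);
    }
}
```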

![](https://cdn.confluent.io/wp-content/uploads/event-stream-1.gif)

# Writing App

https://kafka.apache.org/24/documentation/streams/tutorial

https://kafka-tutorials.confluent.io/

```bash
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.kafka \
    -DarchetypeArtifactId=streams-quickstart-java \
    -DarchetypeVersion=2.4.1 \
    -DgroupId=streams.examples \
    -DartifactId=streams.examples \
    -Dversion=0.1 \
    -Dpackage=tap
```

https://stackoverflow.com/questions/51630260/connect-to-kafka-running-in-docker

# Kafka Connect

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems.

It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. 

Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency. 

An export job can deliver data from Kafka topics into secondary storage and query systems or into batch systems for offline analysis.


https://data-flair.training/blogs/kafka-connect/

![](https://data-flair.training/blogs/wp-content/uploads/sites/2/2018/05/Kafka-Connect.png)


![](https://d2h0cx97tjks2p.cloudfront.net/blogs/wp-content/uploads/sites/2/2018/05/Kafka-Connect-Features-01.jpg)

![](https://d2h0cx97tjks2p.cloudfront.net/blogs/wp-content/uploads/sites/2/2018/05/image.png)

## Connect Standalone Demo

## Scenario
Writing data from the console and writing it back to the console is a convenient place to start, but you'll probably want to use data from other sources or export data from Kafka to other systems. 

For many systems, instead of writing custom integration code you can use Kafka Connect to import or export data.

## Kafka Connect
Kafka Connect is a tool included with Kafka that imports data into Kafka and exports data from it.

It is an extensible tool that runs connectors, which implement the custom logic for interacting with an external system. In this quickstart we'll see how to run Kafka Connect with simple connectors that import data from a file to a Kafka topic and export data from a Kafka topic to a file.

# Input
First, we'll start by creating some seed data to test with:

```bash
> echo foo> test.txt
> echo bar>> test.txt
```

# Run Kafka Connect
Next, we'll start two connectors running in standalone mode, which means they run in a single, local, dedicated process. 

We provide three configuration files as parameters. 

The first is always the configuration for the Kafka Connect process, containing common configuration such as the Kafka brokers to connect to and the serialization format for data. 

The remaining configuration files each specify a connector to create. 

These files include a unique connector name, the connector class to instantiate, and any other configuration required by the connector.

```bash
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
```

These sample configuration files, included with Kafka, use the default local cluster configuration you started earlier and create two connectors.

The first is a source connector that reads lines from an input file and produces each line to a Kafka topic:

```properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
```

The second is a sink connector that reads messages from a Kafka topic and writes each one as a line in an output file:

```properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
```

During startup you'll see a number of log messages, including some indicating that the connectors are being instantiated. 

Once the Kafka Connect process has started, the source connector should start reading lines from test.txt and producing them to the topic connect-test, and the sink connector should start reading messages from the topic connect-test and writing them to the file test.sink.txt.

We can verify the data has been delivered through the entire pipeline by examining the contents of the output file:

```bash
> more test.sink.txt
foo
bar
```

Note that the data is being stored in the Kafka topic connect-test, so we can also run a console consumer to see the data in the topic (or use custom consumer code to process it):

```bash
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
```

The connectors continue to process data, so we can add data to the file and see it move through the pipeline:

```bash
> echo Another line>> test.txt
```
You should see the line appear in the console consumer output and in the sink file.

# REST API


https://kafka.apache.org/documentation/#connect_rest
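
A running Connect worker can be queried over HTTP. The following is a minimal sketch, using the JDK's `java.net.http` client (Java 11+), that lists the active connectors via `GET /connectors`. The base URL `http://localhost:8083` assumes the default REST port on a local worker, so adjust it for your setup. `ConnectRestSketch` and `listConnectors` are hypothetical names.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Lists the connectors of a Kafka Connect worker through its REST API.
final class ConnectRestSketch {

    // Builds (but does not send) the request for GET <baseUrl>/connectors.
    static HttpRequest listConnectors(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/connectors"))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: a Connect worker is reachable at this address.
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(
                listConnectors("http://localhost:8083"),
                HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

If no worker is listening at that address, `main` fails with a connection error.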

```bash
# Stop the running container
docker stop kafkaConnectorTwitterFile

# Remove the previous container
docker container rm kafkaConnectorTwitterFile

docker build ../kafka/ --tag tap:kafka
docker run -e KAFKA_ACTION=connect-standalone -e KAFKA_WORKER_PROPERTIES=connectStandaloneStringTwitter.properties -e KAFKA_CONNECTOR_PROPERTIES=mysqlSinkTwitter.conf --network tap --ip 10.0.100.25 --name kafkaConnectorTwitterFile -it tap:kafka
```

## Config

```properties
# my-standalone.properties worker config file

#bootstrap kafka servers
bootstrap.servers=10.0.100.23:9092

# specify input data format
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# The internal converter used for offsets, most will always want to use the built-in default
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# local file storing offsets and config data
offset.storage.file.filename=/tmp/connect.offsets
```

## Connector Properties

```properties
name=test-sink
connector.class=FileStreamSink
tasks.max=1
topics=tap
file=/tmp/my-test.txt
```

# Confluent

https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html

https://www.confluent.io/blog/hello-world-kafka-connect-kafka-streams/

# Biblio
- https://blog.softwaremill.com/do-not-reinvent-the-wheel-use-kafka-connect-4bcabb143292
- https://dev.to/thegroo/kafka-connect-crash-course-1chd
- https://data-flair.training/blogs/kafka-connect/
- https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-1/