Skip to content

Commit

Permalink
add documentation for Kafka qos 0/1
Browse files Browse the repository at this point in the history
* add overwritable configuration for Kafka committer settings for both documentation purpose and environment variable definition for overwrites

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Aug 31, 2021
1 parent dddf50b commit f0735f8
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 10 deletions.
9 changes: 9 additions & 0 deletions connectivity/service/src/main/resources/connectivity.conf
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,15 @@ ditto {

committer {
alpakka = ${akka.kafka.committer} # resolve defaults from reference.conf
alpakka = {
# Maximum number of messages in a single commit batch
max-batch = 1000
max-batch = ${?KAFKA_COMMITTER_MAX_BATCH}

# Maximum interval between commits
max-interval = 10s
max-interval = ${?KAFKA_COMMITTER_MAX_INTERVAL}
}
}

producer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,29 @@ tags: [protocol, connectivity, rql]
permalink: connectivity-protocol-bindings-kafka2.html
---

Send messages to Apache Kafka via [targets](#target-format).
Consume messages from Apache Kafka brokers via [sources](#source-format) and send messages to Apache Kafka brokers via
[targets](#target-format).

## Content-type

When Kafka messages are sent in [Ditto Protocol](protocol-overview.html), the payload should be `UTF-8` encoded strings.
When messages are sent in [Ditto Protocol](protocol-overview.html) (as `UTF-8` encoded String payload),
the `content-type` of Apache Kafka messages must be set to:

```
application/vnd.eclipse.ditto+json
```

If messages, which are not in Ditto Protocol, should be processed, a [payload mapping](connectivity-mapping.html) must
be configured for the connection in order to transform the messages.

## Global Kafka client configuration

The behavior of the used Kafka client can be configured in the [connectivity.conf](https://github.com/eclipse/ditto/blob/master/connectivity/service/src/main/resources/connectivity.conf)
under key `ditto.connectivity.connection.kafka`:
* `consumer`: The Kafka consumer configuration applied when configuring [sources](#source-format) in order to consume messages from Kafka
* `committer`: The Kafka committer configuration to apply when consuming messages, e.g. the `max-batch` size and `max-interval` duration
* `producer`: The Kafka producer configuration applied when configuring [targets](#target-format) in order to publish messages to Kafka

## Specific connection configuration

The common configuration for connections in [Connections > Targets](basic-connections.html#targets) applies here
Expand All @@ -22,13 +36,19 @@ as well. Following are some specifics for Apache Kafka 2.x connections:
### Source format
For a Kafka connection source "addresses" are Kafka topics to subscribe to. Legal characters are `[a-z]`, `[A-Z]`, `[0-9]`, `.`, `_` and `-`.

All messages are consumed in an "At-Most-Once" manner. This means that the offset will be committed after ditto consumed the message from kafka, no matter if the message can be processed correctly or not. Ditto's acknowledgement feature is right now not supported for Kafka consumers.
Messages are either consumed in an "at-most-once" or "at-least-once" manner depending on the
configured `"qos"` (Quality of Service) value of the source:
* `"qos": 0` (at-most-once): This means that the offset will be committed after Ditto consumed the message from Kafka,
no matter if the message could be processed or not.
* `"qos": 1` (at-least-once): This means that the offset will only be committed after
[requested acknowledgements](basic-acknowledgements.html#requesting-acks) were successfully issued.

The following example shows a valid kafka source:
The following example shows a valid Kafka source:
```json
{
"addresses": ["theAddress"],
"addresses": ["theTopic"],
"consumerCount": 1,
"qos": 1,
"authorizationContext": ["ditto:inbound-auth-subject"],
"enforcement": {
"input": "{%raw%}{{ header:device_id }}{%endraw%}",
Expand All @@ -38,18 +58,32 @@ The following example shows a valid kafka source:
"payloadMapping": ["Ditto"],
"replyTarget": {
"enabled": true,
"address": "theReplyAddress",
"address": "theReplyTopic",
"headerMapping": {},
"expectedResponseTypes": ["response", "error", "nack"]
},
"acknowledgementRequests": {
"includes": [],
"filter": "fn:filter(header:qos,\"ne\",\"0\")"
"includes": []
},
"declaredAcks": []
}
```

The shown example with the configured `"qos": 1` has the following behavior:
* Kafka messages from the topic `"theAddress"` are consumed in an "at-least-once" fashion, e.g.
[twin modify commands](basic-signals-command.html#modify-commands) will implicitly request the
[built-in acknowledgement label](basic-acknowledgements.html#built-in-acknowledgement-labels) `"twin-persisted"` meaning
that the consumed message will only be committed to Kafka after it was successfully persisted by Ditto
* When a consumed Kafka message could not be acknowledged by Ditto (e.g. because persisting a consumed command failed),
consuming from the Kafka source will be restarted which means that message consumption will restart from the last
committed offset of the Kafka topic, already successfully processed messages could be processed again as a result
(which is the "at-least-once" semantic).

For Kafka sources, it is not possible to have different Quality of Service on a per message basis.
Either all messages from a source are consumed in an "at-most-once" or in an "at-least-once" semantic, depending on the
configured `"qos"` value.


#### Source header mapping

The Kafka protocol binding supports to map arbitrary headers from a consumed record to the message that is further
Expand Down Expand Up @@ -161,15 +195,38 @@ Example connection configuration to create a new Kafka 2.x connection in order t
"bootstrapServers": "localhost:9092,other.host:9092",
"saslMechanism": "plain"
},
"sources": [],
"sources": [
{
"addresses": ["theTopic"],
"consumerCount": 1,
"qos": 1,
"authorizationContext": ["ditto:inbound-auth-subject"],
"enforcement": {
"input": "{%raw%}{{ header:device_id }}{%endraw%}",
"filters": ["{%raw%}{{ entity:id }}{%endraw%}"]
},
"headerMapping": {},
"payloadMapping": ["Ditto"],
"replyTarget": {
"enabled": true,
"address": "theReplyTopic",
"headerMapping": {},
"expectedResponseTypes": ["response", "error", "nack"]
},
"acknowledgementRequests": {
"includes": []
},
"declaredAcks": []
}
],
"targets": [
{
"address": "topic/key",
"topics": [
"_/_/things/twin/events",
"_/_/things/live/messages"
],
"authorizationContext": ["ditto:outbound-auth-subject", "..."]
"authorizationContext": ["ditto:outbound-auth-subject"]
}
]
}
Expand Down

0 comments on commit f0735f8

Please sign in to comment.