62 changes: 34 additions & 28 deletions docs/dev/connectors/kafka.md
we recommend setting the number of retries to a higher value.
**Note**: There is currently no transactional producer for Kafka, so Flink cannot guarantee exactly-once delivery
into a Kafka topic.


#### Kafka 0.11 and newer

With Flink's checkpointing enabled, the `FlinkKafkaProducer011` (`FlinkKafkaProducer` for Kafka >= 1.0.0 versions) can provide
exactly-once delivery guarantees. The delivery guarantee can be chosen by passing the appropriate `semantic` parameter to the `FlinkKafkaProducer011`.
Whenever you write to Kafka using transactions, remember to set the desired `isolation.level`
(`read_committed` or `read_uncommitted` - the latter one is the default value) for any application consuming records
from Kafka.
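The consumer side of this recommendation can be sketched as plain Kafka consumer properties. This is a minimal illustration, not Flink API usage: the bootstrap address and group id are placeholder values, and these properties would be passed to whatever Kafka consumer (Flink's or otherwise) reads the output topic.

```java
import java.util.Properties;

public class ConsumerIsolationExample {
    // Build consumer properties that expose only committed records.
    public static Properties committedReadProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "my-consumer-group");       // placeholder group id
        // Hide records from aborted transactions of an EXACTLY_ONCE producer.
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(committedReadProps().getProperty("isolation.level"));
    }
}
```

With the default `read_uncommitted`, a downstream application would also see records from transactions that are later aborted.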


##### Caveats

`Semantic.EXACTLY_ONCE` mode relies on the ability to commit transactions
that were started before taking a checkpoint, after recovering from the said checkpoint.

A mismatch in service name between client and server configuration will cause the authentication to fail.
For more information on Flink configuration for Kerberos security, please see [here]({{ site.baseurl}}/ops/config.html).
You can also find [here]({{ site.baseurl}}/ops/security-kerberos.html) further details on how Flink internally sets up Kerberos-based security.
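As an illustrative `flink-conf.yaml` fragment (the keytab path and principal are placeholder values; consult the linked security documentation for the settings that apply to your deployment):

```yaml
# Keytab-based Kerberos credentials (placeholder values).
security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink-user@EXAMPLE.COM
# Make the credentials available to the Kafka client JAAS context.
security.kerberos.login.contexts: Client,KafkaClient
```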

## Troubleshooting

<div class="alert alert-warning">
If you have a problem with Kafka when using Flink, keep in mind that Flink only wraps
<a href="https://kafka.apache.org/documentation/#consumerapi">KafkaConsumer</a> or
<a href="https://kafka.apache.org/documentation/#producerapi">KafkaProducer</a>,
so your problem might be independent of Flink and can sometimes be solved by upgrading the Kafka brokers,
reconfiguring the Kafka brokers, or reconfiguring the <tt>KafkaConsumer</tt> or <tt>KafkaProducer</tt> in Flink.
Some common problems are listed below.
</div>

### Data loss

Depending on your Kafka configuration, even after Kafka acknowledges
writes you can still experience data loss. In particular, keep in mind the following
properties in the Kafka config:

- `acks`
- `log.flush.interval.messages`
- `log.flush.interval.ms`
- `log.flush.*`

Default values for the above options can easily lead to data loss.
Please refer to the Kafka documentation for more explanation.
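As a sketch of producer-side settings that reduce the risk of acknowledged-but-lost writes (the values are illustrative, not universal recommendations, and the broker-side `log.flush.*` settings must be tuned separately in the broker configuration):

```java
import java.util.Properties;

public class DurableProducerProps {
    // Producer-side settings that reduce the risk of data loss
    // after Kafka has acknowledged a write.
    public static Properties durableProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        // Wait until all in-sync replicas have acknowledged the write,
        // instead of only the partition leader (the default "1" in older Kafka versions).
        props.setProperty("acks", "all");
        // Retry transient failures instead of dropping the record.
        props.setProperty("retries", "3"); // illustrative value
        return props;
    }
}
```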

### UnknownTopicOrPartitionException

One possible cause of this error is an ongoing leader election,
for example after or during a Kafka broker restart.
This is a retriable exception, so the Flink job should be able to restart and resume normal operation.

It can also be circumvented by increasing the `retries` property in the producer settings.
However, this might cause reordering of messages,
which in turn, if undesired, can be avoided by setting `max.in.flight.requests.per.connection` to 1.
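The combination just described can be sketched as producer properties (the retry count and bootstrap address are illustrative placeholders; note that limiting in-flight requests to 1 reduces throughput):

```java
import java.util.Properties;

public class OrderedRetryingProducerProps {
    // Retry transient errors (such as UnknownTopicOrPartitionException
    // during a leader election) without reordering records.
    public static Properties orderedRetryProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("retries", "5"); // illustrative retry count
        // Allow only one unacknowledged request at a time, so a retried
        // record can never overtake an earlier one and change the order.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }
}
```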

{% top %}