KAFKA-13709 (backport): Add docs on exactly-once support for Connect #478

Merged · 1 commit · Jan 17, 2023
33/connect.html: 211 additions, 0 deletions
@@ -369,6 +369,114 @@ <h4><a id="connect_errorreporting" href="#connect_errorreporting">Error Reporting in Connect</a></h4>
# Tolerate all errors.
errors.tolerance=all</pre>

<h4><a id="connect_exactlyonce" href="#connect_exactlyonce">Exactly-once support</a></h4>

<p>Kafka Connect is capable of providing exactly-once semantics for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that <b>support for exactly-once semantics is highly dependent on the type of connector you run.</b> Even if you set all the correct worker properties in the configuration for each node in a cluster, if a connector is not designed to, or cannot take advantage of, the capabilities of the Kafka Connect framework, exactly-once may not be possible.</p>

<h5><a id="connect_exactlyoncesink" href="#connect_exactlyoncesink">Sink connectors</a></h5>

<p>If a sink connector supports exactly-once semantics, to enable exactly-once at the Connect worker level, you must ensure its consumer group is configured to ignore records in aborted transactions. You can do this by setting the worker property <code>consumer.isolation.level</code> to <code>read_committed</code> or, if running a version of Kafka Connect that supports it, using a <a href="#connectconfigs_connector.client.config.override.policy">connector client config override policy</a> that allows the <code>consumer.override.isolation.level</code> property to be set to <code>read_committed</code> in individual connector configs. There are no additional ACL requirements.</p>
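
<p>For example, a minimal sketch of the relevant worker properties (the override policy value <code>All</code> is one common choice; adjust it to your cluster's policy):</p>

<pre>
# Worker-level: consumers used by sink connectors ignore records in aborted transactions
consumer.isolation.level=read_committed

# Alternatively, permit per-connector overrides of consumer properties, so that
# individual connector configs can set consumer.override.isolation.level=read_committed
connector.client.config.override.policy=All</pre>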

<h5><a id="connect_exactlyoncesource" href="connect_exactlyoncesource">Source connectors</a></h5>

<p>If a source connector supports exactly-once semantics, you must configure your Connect cluster to enable framework-level support for exactly-once source connectors. Additional ACLs may be necessary if running against a secured Kafka cluster. Note that exactly-once support for source connectors is currently only available in distributed mode; standalone Connect workers cannot provide exactly-once semantics.</p>

<h6>Worker configuration</h6>

<p>For new Connect clusters, set the <code>exactly.once.source.support</code> property to <code>enabled</code> in the worker config for each node in the cluster. For existing clusters, two rolling upgrades are necessary. During the first upgrade, the <code>exactly.once.source.support</code> property should be set to <code>preparing</code>, and during the second, it should be set to <code>enabled</code>.</p>
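
<p>For example (a sketch showing only this property; all other worker properties are assumed to already be configured for your environment):</p>

<pre>
# New clusters, or the second rolling upgrade of an existing cluster:
exactly.once.source.support=enabled

# During the first rolling upgrade of an existing cluster, use instead:
# exactly.once.source.support=preparing</pre>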

<h6>ACL requirements</h6>

<p>With exactly-once source support enabled, the principal for each Connect worker will require the following ACLs:</p>

<table class="data-table">
<thead>
<tr>
<th>Operation</th>
<th>Resource Type</th>
<th>Resource Name</th>
<th>Note</th>
</tr>
</thead>
<tbody>
<tr>
<td>Write</td>
<td>TransactionalId</td>
<td><code>connect-cluster-${groupId}</code>, where <code>${groupId}</code> is the <code>group.id</code> of the cluster</td>
<td></td>
</tr>
<tr>
<td>Describe</td>
<td>TransactionalId</td>
<td><code>connect-cluster-${groupId}</code>, where <code>${groupId}</code> is the <code>group.id</code> of the cluster</td>
<td></td>
</tr>
<tr>
<td>IdempotentWrite</td>
<td>Cluster</td>
<td>ID of the Kafka cluster that hosts the worker's config topic</td>
<td>The IdempotentWrite ACL has been deprecated as of 2.8 and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters</td>
</tr>
</tbody>
</table>
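
<p>As an illustrative sketch, these ACLs might be granted with the <code>kafka-acls.sh</code> tool as follows, assuming a worker principal of <code>User:connect</code>, a <code>group.id</code> of <code>my-connect-cluster</code>, and a broker at <code>localhost:9092</code> (all placeholders):</p>

<pre class="brush: bash;">
# Transactional ID used by each worker (connect-cluster-${groupId})
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add \
    --allow-principal User:connect \
    --operation Write --operation Describe \
    --transactional-id connect-cluster-my-connect-cluster

# Only necessary if the Kafka cluster is running a pre-2.8 version
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add \
    --allow-principal User:connect \
    --operation IdempotentWrite --cluster</pre>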

<p>And the principal for each individual connector will require the following ACLs:</p>

<table class="data-table">
<thead>
<tr>
<th>Operation</th>
<th>Resource Type</th>
<th>Resource Name</th>
<th>Note</th>
</tr>
</thead>
<tbody>
<tr>
<td>Write</td>
<td>TransactionalId</td>
<td><code>${groupId}-${connector}-${taskId}</code>, for each task that the connector will create, where <code>${groupId}</code> is the <code>group.id</code> of the Connect cluster, <code>${connector}</code> is the name of the connector, and <code>${taskId}</code> is the ID of the task (starting from zero)</td>
<td>A wildcard prefix of <code>${groupId}-${connector}*</code> can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.</td>
</tr>
<tr>
<td>Describe</td>
<td>TransactionalId</td>
<td><code>${groupId}-${connector}-${taskId}</code>, for each task that the connector will create, where <code>${groupId}</code> is the <code>group.id</code> of the Connect cluster, <code>${connector}</code> is the name of the connector, and <code>${taskId}</code> is the ID of the task (starting from zero)</td>
<td>A wildcard prefix of <code>${groupId}-${connector}*</code> can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.</td>
</tr>
<tr>
<td>Write</td>
<td>Topic</td>
<td>Offsets topic used by the connector, which is either the value of the <code>offsets.storage.topic</code> property in the connector’s configuration if provided, or the value of the <code>offsets.storage.topic</code> property in the worker’s configuration if not.</td>
<td></td>
</tr>
<tr>
<td>Read</td>
<td>Topic</td>
<td>Offsets topic used by the connector, which is either the value of the <code>offsets.storage.topic</code> property in the connector’s configuration if provided, or the value of the <code>offsets.storage.topic</code> property in the worker’s configuration if not.</td>
<td></td>
</tr>
<tr>
<td>Describe</td>
<td>Topic</td>
<td>Offsets topic used by the connector, which is either the value of the <code>offsets.storage.topic</code> property in the connector’s configuration if provided, or the value of the <code>offsets.storage.topic</code> property in the worker’s configuration if not.</td>
<td></td>
</tr>
<tr>
<td>Create</td>
<td>Topic</td>
<td>Offsets topic used by the connector, which is either the value of the <code>offsets.storage.topic</code> property in the connector’s configuration if provided, or the value of the <code>offsets.storage.topic</code> property in the worker’s configuration if not.</td>
<td>Only necessary if the offsets topic for the connector does not exist yet</td>
</tr>
<tr>
<td>IdempotentWrite</td>
<td>Cluster</td>
<td>ID of the Kafka cluster that the source connector writes to</td>
<td>The IdempotentWrite ACL has been deprecated as of 2.8 and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters</td>
</tr>
</tbody>
</table>
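
<p>Similarly, a sketch for a connector principal, assuming a connector named <code>my-source</code> running as <code>User:my-source</code> in the <code>my-connect-cluster</code> Connect cluster with an offsets topic of <code>my-source-offsets</code> (all placeholders):</p>

<pre class="brush: bash;">
# Prefixed grant covering the transactional IDs of all of the connector's tasks
# (acceptable only if the prefix cannot collide with other transactional IDs)
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add \
    --allow-principal User:my-source \
    --operation Write --operation Describe \
    --transactional-id my-connect-cluster-my-source \
    --resource-pattern-type prefixed

# Offsets topic used by the connector (Create is only needed if it does not exist yet)
bin/kafka-acls.sh --bootstrap-server localhost:9092 --add \
    --allow-principal User:my-source \
    --operation Write --operation Read --operation Describe --operation Create \
    --topic my-source-offsets</pre>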

<h3><a id="connect_development" href="#connect_development">8.3 Connector Development Guide</a></h3>

<p>This guide describes how developers can write new connectors for Kafka Connect to move data between Kafka and other systems. It briefly reviews a few key concepts and then describes how to create a simple connector.</p>
@@ -586,6 +694,109 @@ <h5><a id="connect_resuming" href="#connect_resuming">Resuming from Previous Offsets</a></h5>

<p>Of course, you might need to read many keys for each of the input streams. The <code>OffsetStorageReader</code> interface also allows you to issue bulk reads to efficiently load all offsets, then apply them by seeking each input stream to the appropriate position.</p>

<h5><a id="connect_exactlyoncesourceconnectors" href="#connect_exactlyoncesourceconnectors>">Exactly-once source connectors</a></h5>

<h6>Supporting exactly-once</h6>

<p>With the passing of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>, Kafka Connect supports exactly-once source connectors as of version 3.3.0. In order for a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.</p>
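
<p>As an illustration, a task reading from a file-like system might attach the exact position of each record so that it can later resume from precisely that point. This is only a sketch; the partition and offset keys, topic name, and <code>recordFor</code> helper are hypothetical:</p>

<pre class="brush: java;">
// Hypothetical helper inside a SourceTask: every record carries a source partition
// (which stream it came from) and a source offset (the exact position to resume from),
// so the framework can resume consumption without dropping or duplicating messages.
private SourceRecord recordFor(String fileName, long position, String line) {
    Map&lt;String, ?&gt; sourcePartition = Collections.singletonMap("filename", fileName);
    Map&lt;String, ?&gt; sourceOffset = Collections.singletonMap("position", position);
    return new SourceRecord(sourcePartition, sourceOffset,
            "output-topic", Schema.STRING_SCHEMA, line);
}
</pre>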

<h6>Defining transaction boundaries</h6>

<p>By default, the Kafka Connect framework will create and commit a new Kafka transaction for each batch of records that a source task returns from its <code>poll</code> method. However, connectors can also define their own transaction boundaries, which can be enabled by users by setting the <code>transaction.boundary</code> property to <code>connector</code> in the config for the connector.</p>
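
<p>For example, in the configuration for a connector (shown here as a lone property; the rest of the connector config is omitted):</p>

<pre>
transaction.boundary=connector</pre>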

<p>If enabled, the connector's tasks will have access to a <code>TransactionContext</code> from their <code>SourceTaskContext</code>, which they can use to control when transactions are aborted and committed.</p>

<p>For example, to commit a transaction at least every ten records:</p>

<pre class="brush: java;">
private int recordsSent;

@Override
public void start(Map&lt;String, String&gt; props) {
this.recordsSent = 0;
}

@Override
public List&lt;SourceRecord&gt; poll() {
List&lt;SourceRecord&gt; records = fetchRecords();
boolean shouldCommit = false;
for (SourceRecord record : records) {
if (++this.recordsSent >= 10) {
shouldCommit = true;
}
}
if (shouldCommit) {
this.recordsSent = 0;
this.context.transactionContext().commitTransaction();
}
return records;
}
</pre>

<p>Or to commit a transaction for exactly every tenth record:</p>

<pre class="brush: java;">
private int recordsSent;

@Override
public void start(Map&lt;String, String&gt; props) {
this.recordsSent = 0;
}

@Override
public List&lt;SourceRecord&gt; poll() {
List&lt;SourceRecord&gt; records = fetchRecords();
for (SourceRecord record : records) {
if (++this.recordsSent % 10 == 0) {
this.context.transactionContext().commitTransaction(record);
}
}
return records;
}
</pre>

<p>Most connectors do not need to define their own transaction boundaries. However, it may be useful if files or objects in the source system are broken up into multiple source records, but should be delivered atomically. Additionally, it may be useful if it is impossible to give each source record a unique source offset, as long as every record with a given offset is delivered within a single transaction.</p>

<p>Note that if the user has not enabled connector-defined transaction boundaries in the connector configuration, the <code>TransactionContext</code> returned by <code>context.transactionContext()</code> will be <code>null</code>.</p>
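
<p>A defensive pattern for tasks that optionally take advantage of connector-defined boundaries might look like the following sketch:</p>

<pre class="brush: java;">
TransactionContext transactionContext = this.context.transactionContext();
if (transactionContext != null) {
    // Connector-defined transaction boundaries are enabled; commit explicitly
    transactionContext.commitTransaction();
}
// Otherwise, the framework defines transaction boundaries (or exactly-once
// support is disabled entirely), and no explicit action is needed
</pre>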

<h6>Validation APIs</h6>

<p>A few additional preflight validation APIs can be implemented by source connector developers.</p>

<p>Some users may require exactly-once semantics from a connector. In this case, they may set the <code>exactly.once.support</code> property to <code>required</code> in the configuration for the connector. When this happens, the Kafka Connect framework will ask the connector whether it can provide exactly-once semantics with the specified configuration. This is done by invoking the <code>exactlyOnceSupport</code> method on the connector.</p>

<p>If a connector doesn't support exactly-once semantics, it should still implement this method to let users know for certain that it cannot provide exactly-once semantics:</p>

<pre class="brush: java;">
@Override
public ExactlyOnceSupport exactlyOnceSupport(Map&lt;String, String&gt; props) {
// This connector cannot provide exactly-once semantics under any conditions
return ExactlyOnceSupport.UNSUPPORTED;
}
</pre>

<p>Otherwise, a connector should examine the configuration, and return <code>ExactlyOnceSupport.SUPPORTED</code> if it can provide exactly-once semantics:</p>

<pre class="brush: java;">
@Override
public ExactlyOnceSupport exactlyOnceSupport(Map&lt;String, String&gt; props) {
// This connector can always provide exactly-once semantics
return ExactlyOnceSupport.SUPPORTED;
}
</pre>
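
<p>A connector whose guarantees depend on how it is configured can examine the relevant properties before answering. In this sketch, the <code>snapshot.mode</code> property and its values are hypothetical:</p>

<pre class="brush: java;">
@Override
public ExactlyOnceSupport exactlyOnceSupport(Map&lt;String, String&gt; props) {
    // Hypothetical: offsets are only meaningful when reading from a change log,
    // not when performing one-shot bulk snapshots
    if ("changelog".equals(props.get("snapshot.mode"))) {
        return ExactlyOnceSupport.SUPPORTED;
    }
    return ExactlyOnceSupport.UNSUPPORTED;
}
</pre>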

<p>Additionally, if the user has configured the connector to define its own transaction boundaries, the Kafka Connect framework will ask the connector whether it can define its own transaction boundaries with the specified configuration, using the <code>canDefineTransactionBoundaries</code> method:</p>

<pre class="brush: java;">
@Override
public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map&lt;String, String&gt; props) {
// This connector can always define its own transaction boundaries
return ConnectorTransactionBoundaries.SUPPORTED;
}
</pre>

<p>This method should only be implemented for connectors that can define their own transaction boundaries in some cases. If a connector is never able to define its own transaction boundaries, it does not need to implement this method.</p>
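
<p>For example, a connector that can only define its own boundaries in certain modes might answer conditionally (the <code>mode</code> property and its values are hypothetical):</p>

<pre class="brush: java;">
@Override
public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map&lt;String, String&gt; props) {
    // Hypothetical: connector-defined boundaries only make sense when whole files
    // must be delivered atomically as multiple records
    if ("whole-files".equals(props.get("mode"))) {
        return ConnectorTransactionBoundaries.SUPPORTED;
    }
    return ConnectorTransactionBoundaries.UNSUPPORTED;
}
</pre>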

<h4><a id="connect_dynamicio" href="#connect_dynamicio">Dynamic Input/Output Streams</a></h4>

<p>Kafka Connect is intended to define bulk data copying jobs, such as copying an entire database rather than creating many jobs to copy each table individually. One consequence of this design is that the set of input or output streams for a connector can vary over time.</p>