KAFKA-4161: prototype for exploring API change #2040

Closed · wants to merge 1 commit
@@ -20,6 +20,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;

import java.util.Collection;
import java.util.Map;
@@ -94,14 +95,30 @@ public void initialize(SinkTaskContext context) {
*/
public abstract void put(Collection<SinkRecord> records);

/**
* @return {@code null} if not implemented, an empty map if Connect-managed offset commits should be disabled,
* and otherwise a map of committable offsets by topic-partition.
*/
public Map<TopicPartition, Long> commitableOffsets() {
return null;
}

/**
* Flush all records that have been {@link #put} for the specified topic-partitions. The
* offsets are provided for convenience, but could also be determined by tracking all offsets
* included in the SinkRecords passed to {@link #put}.
*
* @deprecated You should implement connector-specific flush policies either synchronously with {@link #put(Collection)} (typically size-based)
* or using an additional thread (more flexible, typically time-based). It is also a good idea to flush on {@link #close(Collection)}.
* <strong>This method will only be called if {@link #commitableOffsets()} returns {@code null}.</strong>
*
* @param offsets mapping of TopicPartition to committed offset
*/
public abstract void flush(Map<TopicPartition, OffsetAndMetadata> offsets);
@Deprecated
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
if (commitableOffsets() == null)
throw new ConnectException("flush() must be implemented if commitableOffsets() returns null");
}

/**
* The SinkTask uses this method to create writers for newly assigned partitions in case of partition
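The new SinkTask surface above could be adopted roughly as follows. This is a minimal sketch, not part of this patch: `BufferingSinkTask`, `FLUSH_THRESHOLD`, `buffer()` and `flushBuffered()` are hypothetical names, and the value stored per partition assumes the consumer convention of committing the next offset to consume (which matches how the worker wraps these values into `OffsetAndMetadata` below).

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

/**
 * Illustrative sink task: it flushes on a size threshold inside put() and reports
 * durably written offsets via commitableOffsets(), so it never relies on the
 * deprecated flush() callback.
 */
public abstract class BufferingSinkTask extends SinkTask {
    private static final int FLUSH_THRESHOLD = 1000; // illustrative size-based policy

    private final Map<TopicPartition, Long> pendingOffsets = new HashMap<>();
    private final Map<TopicPartition, Long> flushedOffsets = new HashMap<>();
    private int buffered = 0;

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            buffer(record);
            // Track the next offset to consume for each partition seen so far.
            pendingOffsets.put(new TopicPartition(record.topic(), record.kafkaPartition()),
                    record.kafkaOffset() + 1);
            buffered++;
        }
        if (buffered >= FLUSH_THRESHOLD) {
            flushBuffered();                       // write buffered records to the destination system
            flushedOffsets.putAll(pendingOffsets); // these offsets are now safe to commit
            pendingOffsets.clear();
            buffered = 0;
        }
    }

    @Override
    public Map<TopicPartition, Long> commitableOffsets() {
        // Only offsets that are known to be durable in the destination system.
        return new HashMap<>(flushedOffsets);
    }

    /** Connector-specific: buffer a single record. */
    protected abstract void buffer(SinkRecord record);

    /** Connector-specific: write the buffered records to the destination system. */
    protected abstract void flushBuffered();
}
```

Returning an empty map instead would disable Connect-managed offset commits entirely, while returning `null` (the default) keeps the old `flush()`-based path.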
@@ -79,4 +79,11 @@ public interface SinkTaskContext {
* @param partitions the partitions to resume
*/
void resume(TopicPartition... partitions);

/**
* Request an offset commit. Sink tasks can use this to minimize the potential for duplicate data upon recovery
* by requesting an offset commit as soon as they flush data to the destination system.
*/
void requestCommit();

}
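A sketch of how a task might use `requestCommit()`, continuing the hypothetical `BufferingSinkTask` above and relying on the protected `context` field that `SinkTask` provides (thread-safety around the shared offset maps is glossed over):

```java
// Hypothetical periodic flusher run from a separate thread inside the sink task.
// Once data is durably written, it asks the framework to commit offsets promptly
// instead of waiting for the next offset.flush.interval.ms tick.
private void periodicFlush() {
    flushBuffered();                       // push buffered records to the destination system
    flushedOffsets.putAll(pendingOffsets); // expose them through commitableOffsets()
    pendingOffsets.clear();
    context.requestCommit();               // ask Connect to commit offsets soon
}
```

On the worker side, the request is picked up on the next pass through the poll loop (see the `iteration()` change below), so the commit is prompt but still runs on the task's own thread.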
@@ -142,9 +142,8 @@ public void execute() {
while (!isStopping())
iteration();
} finally {
// Make sure any uncommitted data has been committed and the task has
// a chance to clean up its state
closePartitions();
// Make sure the task has a chance to clean up and any uncommitted offset state has been committed
commitOffsets(time.milliseconds(), true);
}
}

@@ -153,9 +152,10 @@ protected void iteration() {
long now = time.milliseconds();

// Maybe commit
if (!committing && now >= nextCommit) {
if (!committing && (now >= nextCommit || context.isCommitRequested())) {
commitOffsets(now, false);
nextCommit += workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
context.clearCommitRequest();
}

// Check for timed out commits
@@ -255,10 +255,13 @@ private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> offsets, int se
}

/**
* Starts an offset commit by flushing outstanding messages from the task and then starting
* the write commit.
* Starts an offset commit by flushing outstanding messages from the task and then starting the write commit.
**/
private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean closing, final int seqno) {
if (offsets.equals(lastCommittedOffsets)) {
log.info("{} Skipping offset commits, no change since last commit");
return;
}
log.info("{} Committing offsets", this);
if (closing) {
doCommitSync(offsets, seqno);
@@ -282,30 +285,48 @@ private void commitOffsets(long now, boolean closing) {
commitSeqno += 1;
commitStarted = now;

Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(currentOffsets);
try {
task.flush(offsets);
} catch (Throwable t) {
log.error("Commit of {} offsets failed due to exception while flushing:", this, t);
log.error("Rewinding offsets to last committed offsets");
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) {
log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset());
consumer.seek(entry.getKey(), entry.getValue().offset());
}
currentOffsets = new HashMap<>(lastCommittedOffsets);
onCommitCompleted(t, commitSeqno);
return;
} finally {
// Close the task if needed before committing the offsets. This is basically the last chance for
// the connector to actually flush data that has been written to it.
if (closing)
Map<TopicPartition, Long> commitableOffsets = task.commitableOffsets();
final Map<TopicPartition, OffsetAndMetadata> offsets;
if (commitableOffsets != null) {
if (closing) {
// Close currently assigned partitions before committing the offsets; this gives tasks an opportunity to flush data.
task.close(currentOffsets.keySet());
commitableOffsets = task.commitableOffsets();
}
offsets = new HashMap<>(commitableOffsets.size());
for (Map.Entry<TopicPartition, Long> flushedOffsetEntry : commitableOffsets.entrySet()) {
// Exclude any topic-partitions not owned by this task.
if (currentOffsets.containsKey(flushedOffsetEntry.getKey())) {
offsets.put(flushedOffsetEntry.getKey(), new OffsetAndMetadata(flushedOffsetEntry.getValue()));
}
}
} else {
offsets = new HashMap<>(currentOffsets);
try {
task.flush(offsets);
} catch (Throwable t) {
log.error("Commit of {} offsets failed due to exception while flushing: ", this, t);
log.error("Rewinding offsets to last committed offsets");
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) {
log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset());
consumer.seek(entry.getKey(), entry.getValue().offset());
}
currentOffsets = new HashMap<>(lastCommittedOffsets);
onCommitCompleted(t, commitSeqno);
offsets.clear();
} finally {
// Close the task if needed before committing the offsets. This is basically the last chance for
// the connector to actually flush data that has been written to it.
if (closing)
task.close(currentOffsets.keySet());
}
}

doCommit(offsets, closing, commitSeqno);
if (!offsets.isEmpty()) {
doCommit(offsets, closing, commitSeqno);
}
}


@Override
public String toString() {
return "WorkerSinkTask{" +
@@ -427,14 +448,6 @@ private void rewind() {
context.clearOffsets();
}

private void openPartitions(Collection<TopicPartition> partitions) {
task.open(partitions);
}

private void closePartitions() {
commitOffsets(time.milliseconds(), true);
}

private class HandleRebalance implements ConsumerRebalanceListener {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
@@ -464,7 +477,7 @@ else if (!context.pausedPartitions().isEmpty())
// need to guard against invoking the user's callback method during that period.
if (rebalanceException == null || rebalanceException instanceof WakeupException) {
try {
openPartitions(partitions);
task.open(partitions);
// Rewind should be applied only if task.open() succeeds.
rewind();
} catch (RuntimeException e) {
@@ -477,8 +490,10 @@ else if (!context.pausedPartitions().isEmpty())

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
assert currentOffsets.keySet().equals(partitions);

try {
closePartitions();
commitOffsets(time.milliseconds(), true);
} catch (RuntimeException e) {
// The consumer swallows exceptions raised in the rebalance listener, so we need to store
// exceptions and rethrow when poll() returns.
@@ -27,6 +27,7 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
private long timeoutMs;
private KafkaConsumer<byte[], byte[]> consumer;
private final Set<TopicPartition> pausedPartitions;
private boolean commitRequested;

public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) {
this.offsets = new HashMap<>();
@@ -106,6 +107,19 @@ public void resume(TopicPartition... partitions) {
}
}

@Override
public void requestCommit() {
commitRequested = true;
}

public boolean isCommitRequested() {
return commitRequested;
}

public void clearCommitRequest() {
commitRequested = false;
}

public Set<TopicPartition> pausedPartitions() {
return pausedPartitions;
}