CCMSG: Add a synchronous flushing mode to support topic-mutating SMTs. (#589)
* [maven-release-plugin] prepare for next development iteration

* Flush not waiting for async batches to finish (#579)

* Fix index name

* CCMSG-1376: offsets are marked as processed for individual record failures (#587)

* Add strict mapping test case

* Fix offsets for individual record failure

* test updates

* Add more tests

* poc of synchronous mode

* Add config property and fix test

* Fix checkstyle

* Fail fast if misconfigured

* Fix test

* CCMSG-1447: Added integration tests for routing SMTs.

* CCMSG-1447: Added unit test for verifyChangingTopic.

* CCMSG-1447: Added an IT for reconfiguring a connector to use routing SMT.

* CCMSG-1447: Assert that connector commits offsets.

* CCMSG-1447: Do not use offsetTracker and partitionPauser in synchronous mode.

* CCMSG-1447: Notify waitForInFlightRequests instead of busy cycle.

* CCMSG-1447: Fix testOffsetsBackpressure().

* CCMSG-1447: Address code review comments.

* CCMSG-1447: Use an appropriate offset tracker in ElasticsearchClient.

* CCMSG-1447: Parametrize ElasticsearchSinkTaskTest.

* CCMSG-1447: Use assertThrows.

* Update src/test/java/io/confluent/connect/elasticsearch/integration/ElasticsearchSinkTaskIT.java

Co-authored-by: Nigel Liang <nigel@nigelliang.com>

* CCMSG-1447: Addressed code review comments.

* CCMSG-1447: Deleted verifyChangingTopic.

* CCMSG-1447: Address code review comments.

* CCMSG-1447: Decouple ElasticsearchClient and OffsetTracker.

* CCMSG-1447: Added Javadocs to OffsetState.

* CCMSG-1447: Fix failed ElasticsearchSinkTaskIT.

* CCMSG-1447: fix testPausePartitionsAndFail.

* CCMSG-1447: Fixed version in pom.

Co-authored-by: amuamusshu <35519361+amuamushu@users.noreply.github.com>
Co-authored-by: Alex Diachenko <sansanichfb@gmail.com>
Co-authored-by: Alex Diachenko <alex.diachenko@confluent.io>
Co-authored-by: Confluent Jenkins Bot <jenkins@confluent.io>
Co-authored-by: Ilanji Rajamanickam <ilanji@confluent.io>
Co-authored-by: Konstantine Karantasis <konstantine@confluent.io>
Co-authored-by: Daniel Osvath <dosvath@confluent.io>
Co-authored-by: Andrew Egelhofer <aegelhofer@confluent.io>
Co-authored-by: Nigel Liang <nigel@nigelliang.com>
10 people committed Nov 4, 2021
1 parent 158a9ac commit ae8c23c
Showing 15 changed files with 806 additions and 311 deletions.
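For context, flush.synchronously is the option this commit introduces. The sketch below shows a hypothetical sink configuration pairing it with a topic-mutating RegexRouter SMT; the topic name, connection URL, and regex values are illustrative, not taken from this commit:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical connector configuration: RegexRouter rewrites topic names, which the
// asynchronous offset tracker cannot map back to the original partitions, so the
// synchronous flush mode added in this commit is enabled instead.
public class SyncFlushConfigExample {
  public static Map<String, String> connectorProps() {
    Map<String, String> props = new HashMap<>();
    props.put("connector.class",
        "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector");
    props.put("topics", "orders");                        // assumed source topic
    props.put("connection.url", "http://localhost:9200"); // assumed Elasticsearch endpoint
    props.put("transforms", "router");
    props.put("transforms.router.type",
        "org.apache.kafka.connect.transforms.RegexRouter");
    props.put("transforms.router.regex", "(.*)");
    props.put("transforms.router.replacement", "$1-routed");
    props.put("flush.synchronously", "true");             // option defined in this diff
    return props;
  }
}
```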
src/main/java/io/confluent/connect/elasticsearch/AsyncOffsetTracker.java
@@ -0,0 +1,181 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.connect.elasticsearch;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.stream.Collectors.toMap;

/**
* An asynchronous implementation of <code>OffsetTracker</code>.
*
* <p>Since ElasticsearchClient can potentially process multiple batches asynchronously for the
* same partition, if we don't want to wait for all in-flight batches at the end of the put call
* (or flush/preCommit), we need to track the highest offset that is safe to commit.
* For now, we do that at the individual record level, because batching is handled by
* BulkProcessor and we don't have control over grouping/ordering.
*/
class AsyncOffsetTracker implements OffsetTracker {

private static final Logger log = LoggerFactory.getLogger(AsyncOffsetTracker.class);

private final Map<TopicPartition, Map<Long, OffsetState>> offsetsByPartition = new HashMap<>();
private final Map<TopicPartition, Long> maxOffsetByPartition = new HashMap<>();

private final AtomicLong numEntries = new AtomicLong();
private final SinkTaskContext context;

public AsyncOffsetTracker(SinkTaskContext context) {
this.context = context;
}

static class AsyncOffsetState implements OffsetState {

private final long offset;
private volatile boolean processed;

AsyncOffsetState(long offset) {
this.offset = offset;
}

@Override
public void markProcessed() {
processed = true;
}

@Override
public boolean isProcessed() {
return processed;
}

@Override
public long offset() {
return offset;
}
}

/**
* Releases all resources associated with partitions that are no longer owned by this task.
* @param topicPartitions partitions to close
*/
@Override
public synchronized void closePartitions(Collection<TopicPartition> topicPartitions) {
topicPartitions.forEach(tp -> {
Map<Long, OffsetState> offsets = offsetsByPartition.remove(tp);
if (offsets != null) {
numEntries.getAndAdd(-offsets.size());
}
maxOffsetByPartition.remove(tp);
});
}

/**
* This method assumes that new records are added in offset order.
* Older records can be re-added, and the same OffsetState object will be returned if its
* offset hasn't been reported as safe to commit yet.
* @param sinkRecord record to add
* @return offset state record that can be used to mark the record as processed
*/
@Override
public synchronized OffsetState addPendingRecord(
SinkRecord sinkRecord
) {
log.trace("Adding pending record");
TopicPartition tp = new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition());
if (!context.assignment().contains(tp)) {
String msg = String.format("Found a topic name '%s' that doesn't match assigned partitions."
+ " Connector doesn't support topic mutating SMTs", sinkRecord.topic());
throw new ConnectException(msg);
}
Long partitionMax = maxOffsetByPartition.get(tp);
if (partitionMax == null || sinkRecord.kafkaOffset() > partitionMax) {
numEntries.incrementAndGet();
return offsetsByPartition
// Insertion order needs to be maintained
.computeIfAbsent(tp, key -> new LinkedHashMap<>())
.computeIfAbsent(sinkRecord.kafkaOffset(), AsyncOffsetState::new);
} else {
return new AsyncOffsetState(sinkRecord.kafkaOffset());
}
}

/**
* @return overall number of entries currently in memory.
*/
@Override
public long numOffsetStateEntries() {
return numEntries.get();
}

/**
* Advances the committable offset for each partition as far as the contiguous
* prefix of processed records allows.
*/
@Override
public synchronized void updateOffsets() {
log.trace("Updating offsets");
offsetsByPartition.forEach(((topicPartition, offsets) -> {
Long max = maxOffsetByPartition.get(topicPartition);
boolean newMaxFound = false;
Iterator<OffsetState> iterator = offsets.values().iterator();
while (iterator.hasNext()) {
OffsetState offsetState = iterator.next();
if (offsetState.isProcessed()) {
iterator.remove();
numEntries.decrementAndGet();
if (max == null || offsetState.offset() > max) {
max = offsetState.offset();
newMaxFound = true;
}
} else {
break;
}
}
if (newMaxFound) {
maxOffsetByPartition.put(topicPartition, max);
}
}));
log.trace("Updated offsets, num entries: {}", numEntries);
}

/**
* @param currentOffsets current offsets from a task
* @return offsets to commit
*/
@Override
public synchronized Map<TopicPartition, OffsetAndMetadata> offsets(
Map<TopicPartition, OffsetAndMetadata> currentOffsets
) {
return maxOffsetByPartition.entrySet().stream()
.collect(toMap(
Map.Entry::getKey,
// The offsets you commit are the offsets of the messages you want to read next
// (not the offsets of the messages you did read last)
e -> new OffsetAndMetadata(e.getValue() + 1)));
}
}
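To make the commit-safety rule above concrete: only a contiguous prefix of processed offsets can advance the committable position. Below is a minimal standalone sketch of the same scan-from-the-head logic that updateOffsets() applies, written independently of the Kafka Connect classes (all names are illustrative):

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Standalone illustration of the AsyncOffsetTracker invariant: entries are kept in
// insertion (i.e. offset) order, and the committable position only advances across
// a contiguous prefix of processed entries.
public class HighestSafeOffsetDemo {
  public static void main(String[] args) {
    Map<Long, Boolean> pending = new LinkedHashMap<>(); // offset -> processed?
    for (long o = 5; o <= 8; o++) {
      pending.put(o, false);
    }
    pending.put(6L, true); // offsets 6 and 7 finish out of order
    pending.put(7L, true);
    System.out.println(highestSafe(pending)); // -1: offset 5 is still in flight
    pending.put(5L, true);
    System.out.println(highestSafe(pending)); // 7: 5..7 processed, so commit 7 + 1 = 8
  }

  static long highestSafe(Map<Long, Boolean> pending) {
    long max = -1;
    Iterator<Map.Entry<Long, Boolean>> it = pending.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Long, Boolean> e = it.next();
      if (!e.getValue()) {
        break; // stop at the first unprocessed offset
      }
      max = e.getKey();
      it.remove(); // processed prefix can be discarded
    }
    return max;
  }
}
```

With offsets 5..8 in flight, finishing 6 and 7 first commits nothing; once 5 finishes, 5..7 are drained and offset 8 (highest processed + 1) becomes committable, matching the OffsetAndMetadata convention in offsets().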
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchClient.java
@@ -15,7 +15,25 @@

package io.confluent.connect.elasticsearch;

import io.confluent.connect.elasticsearch.OffsetTracker.OffsetState;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;

import org.apache.http.HttpHost;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
@@ -45,22 +63,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnMalformedDoc;

import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG;
@@ -108,19 +110,13 @@ public class ElasticsearchClient {
private final RestHighLevelClient client;
private final ExecutorService bulkExecutorService;
private final Time clock;

// Visible for testing
public ElasticsearchClient(
ElasticsearchSinkConnectorConfig config,
ErrantRecordReporter reporter
) {
this(config, reporter, new OffsetTracker());
}
private final Lock inFlightRequestLock = new ReentrantLock();
private final Condition inFlightRequestsUpdated = inFlightRequestLock.newCondition();

public ElasticsearchClient(
ElasticsearchSinkConnectorConfig config,
ErrantRecordReporter reporter,
OffsetTracker offsetTracker
Runnable afterBulkCallback
) {
this.bulkExecutorService = Executors.newFixedThreadPool(config.maxInFlightRequests());
this.numBufferedRecords = new AtomicInteger(0);
@@ -144,7 +140,7 @@ public ElasticsearchClient(
.setHttpClientConfigCallback(configCallbackHandler)
);
this.bulkProcessor = BulkProcessor
.builder(buildConsumer(), buildListener(offsetTracker))
.builder(buildConsumer(), buildListener(afterBulkCallback))
.setBulkActions(config.batchSize())
.setBulkSize(config.bulkSize())
.setConcurrentRequests(config.maxInFlightRequests() - 1) // 0 = no concurrent requests
@@ -248,6 +244,20 @@ public void flush() {
bulkProcessor.flush();
}

public void waitForInFlightRequests() {
inFlightRequestLock.lock();
try {
while (numBufferedRecords.get() > 0) {
inFlightRequestsUpdated.await();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ConnectException(e);
} finally {
inFlightRequestLock.unlock();
}
}

/**
* Checks whether the index already has a mapping or not.
* @param index the index to check
@@ -273,6 +283,7 @@ public boolean hasMapping(String index) {
*
* @param record the record to index
* @param request the associated request to send
* @param offsetState record's offset state
* @throws ConnectException if one of the requests failed
*/
public void index(SinkRecord record, DocWriteRequest<?> request, OffsetState offsetState) {
@@ -347,7 +358,7 @@ public boolean indexExists(String index) {
*
* @return the listener
*/
private BulkProcessor.Listener buildListener(OffsetTracker offsetTracker) {
private BulkProcessor.Listener buildListener(Runnable afterBulkCallback) {
return new Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
@@ -373,7 +384,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
idx++;
}

offsetTracker.updateOffsets();
afterBulkCallback.run();

bulkFinished(executionId, request);
}
@@ -388,7 +399,13 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
private void bulkFinished(long executionId, BulkRequest request) {
request.requests().forEach(requestToSinkRecord::remove);
removeFromInFlightRequests(executionId);
numBufferedRecords.addAndGet(-request.requests().size());
inFlightRequestLock.lock();
try {
numBufferedRecords.addAndGet(-request.requests().size());
inFlightRequestsUpdated.signalAll();
} finally {
inFlightRequestLock.unlock();
}
}
};
}
@@ -566,7 +583,7 @@ private boolean handleMalformedDocResponse(BulkItemResponse response) {
*
* @return true if a response has failed, false if none have failed
*/
private boolean isFailed() {
public boolean isFailed() {
return error.get() != null;
}

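The synchronous mode hinges on the await/signal pairing above: waitForInFlightRequests() blocks while numBufferedRecords is positive, and bulkFinished() decrements and signals under the same lock. A self-contained sketch of that pattern follows; the class and method names are illustrative, not from the connector:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative reduction of the waitForInFlightRequests()/bulkFinished() handshake:
// a counter of buffered records, a lock, and a condition signalled on every completion.
public class InFlightCounter {
  private final AtomicInteger buffered = new AtomicInteger(0);
  private final Lock lock = new ReentrantLock();
  private final Condition updated = lock.newCondition();

  public void add(int records) {
    buffered.addAndGet(records);
  }

  // Mirrors bulkFinished(): decrement, then signal waiters under the lock.
  public void complete(int records) {
    lock.lock();
    try {
      buffered.addAndGet(-records);
      updated.signalAll();
    } finally {
      lock.unlock();
    }
  }

  // Mirrors waitForInFlightRequests(): block until nothing is in flight.
  public void awaitDrained() throws InterruptedException {
    lock.lock();
    try {
      while (buffered.get() > 0) {
        updated.await();
      }
    } finally {
      lock.unlock();
    }
  }
}
```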
src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnectorConfig.java
@@ -115,6 +115,14 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
private static final String FLUSH_TIMEOUT_MS_DISPLAY = "Flush Timeout (ms)";
private static final int FLUSH_TIMEOUT_MS_DEFAULT = (int) TimeUnit.MINUTES.toMillis(3);

public static final String FLUSH_SYNCHRONOUSLY_CONFIG = "flush.synchronously";
private static final String FLUSH_SYNCHRONOUSLY_DOC =
"True if flushes should wait for background processing to finish. This has a throughput"
+ " penalty and makes the connector less responsive but allows for topic-mutating SMTs"
+ " (e.g. RegexRouter or TimestampRouter)";
private static final String FLUSH_SYNCHRONOUSLY_DISPLAY = "Flush synchronously";
private static final boolean FLUSH_SYNCHRONOUSLY_DEFAULT = true;

public static final String MAX_RETRIES_CONFIG = "max.retries";
private static final String MAX_RETRIES_DOC =
"The maximum number of retries that are allowed for failed indexing requests. If the retry "
@@ -493,6 +501,16 @@ private static void addConnectorConfigs(ConfigDef configDef) {
++order,
Width.SHORT,
FLUSH_TIMEOUT_MS_DISPLAY
).define(
FLUSH_SYNCHRONOUSLY_CONFIG,
Type.BOOLEAN,
FLUSH_SYNCHRONOUSLY_DEFAULT,
Importance.LOW,
FLUSH_SYNCHRONOUSLY_DOC,
CONNECTOR_GROUP,
++order,
Width.SHORT,
FLUSH_SYNCHRONOUSLY_DISPLAY
).define(
MAX_RETRIES_CONFIG,
Type.INT,
@@ -895,6 +913,10 @@ public long flushTimeoutMs() {
return getLong(FLUSH_TIMEOUT_MS_CONFIG);
}

public boolean flushSynchronously() {
return getBoolean(FLUSH_SYNCHRONOUSLY_CONFIG);
}

public boolean ignoreKey() {
return getBoolean(IGNORE_KEY_CONFIG);
}
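For illustration, a hypothetical flush path showing how the new accessor would be consumed; the methods on the client match this diff, but the surrounding task wiring is assumed rather than taken from the files shown here:

```java
// Hypothetical glue code (assumed to live in the io.confluent.connect.elasticsearch
// package); ElasticsearchClient and ElasticsearchSinkConnectorConfig are the classes
// from this diff, while this wrapper method itself is illustrative.
class SyncFlushSketch {
  void flushAndMaybeWait(ElasticsearchSinkConnectorConfig config, ElasticsearchClient client) {
    client.flush();
    if (config.flushSynchronously()) {
      // Block until every in-flight bulk request completes, so the offsets Connect
      // commits correspond exactly to records already indexed.
      client.waitForInFlightRequests();
    }
    // In asynchronous mode, AsyncOffsetTracker's committable maximum is used instead.
  }
}
```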