KAFKA-7941: Catch TimeoutException in KafkaBasedLog worker thread (#6283)

When calling readToLogEnd(), the KafkaBasedLog worker thread should catch the TimeoutException that can occur when brokers are unavailable and log a warning; otherwise the uncaught exception terminates the worker thread.
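
In code, the fix is a single additional catch clause in the worker thread's read loop. Below is a minimal sketch of the pattern, assuming stand-ins for the class's real members (log, topic, readToLogEnd()); the actual change is in the KafkaBasedLog hunk further down:

    while (true) {
        try {
            readToLogEnd();   // offset lookups in here can throw TimeoutException
        } catch (TimeoutException e) {
            // Brokers unavailable or unreachable: warn and retry on the next
            // iteration instead of letting the exception kill the thread.
            log.warn("Timeout while reading log to end for topic '{}'. Retrying. Reason: {}",
                    topic, e.getMessage());
            continue;
        } catch (WakeupException e) {
            // get() or stop() woke the consumer; restart the loop as before.
            continue;
        }
        // ... poll() and deliver any newly read records ...
    }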

Includes an enhancement to MockConsumer that allows simulating exceptions not just when polling but also when querying for offsets, which is necessary for testing the fix.
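
With the split hooks, a test can simulate a transient failure during an offset lookup as well as during a poll. A hedged sketch of the new API (the real usage is in testGetOffsetsConsumerErrorOnReadToEnd below):

    MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);

    // The next beginningOffsets()/endOffsets() call throws, then the exception is cleared.
    consumer.setOffsetsException(new TimeoutException("Failed to get offsets by times"));

    // The next poll() call throws; setException(...) is deprecated in favor of this name.
    consumer.setPollException(Errors.COORDINATOR_NOT_AVAILABLE.exception());

Each exception is thrown exactly once and then reset to null, so a retry succeeds: precisely the recovery path the worker thread now exercises.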

Author: Paul Whalen <pgwhalen@gmail.com>
Reviewers: Randall Hauch <rhauch@gmail.com>, Arjun Satish <arjun@confluent.io>, Ryanne Dolan <ryannedolan@gmail.com>
pgwhalen authored and rhauch committed Aug 13, 2019
1 parent b85707c commit 2426926
Showing 7 changed files with 112 additions and 12 deletions.
@@ -61,7 +61,8 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     private final Set<TopicPartition> paused;
 
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
-    private KafkaException exception;
+    private KafkaException pollException;
+    private KafkaException offsetsException;
     private AtomicBoolean wakeup;
     private boolean closed;
@@ -74,7 +75,7 @@ public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
         this.beginningOffsets = new HashMap<>();
         this.endOffsets = new HashMap<>();
         this.pollTasks = new LinkedList<>();
-        this.exception = null;
+        this.pollException = null;
         this.wakeup = new AtomicBoolean(false);
         this.committed = new HashMap<>();
     }
@@ -173,9 +174,9 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
             throw new WakeupException();
         }
 
-        if (exception != null) {
-            RuntimeException exception = this.exception;
-            this.exception = null;
+        if (pollException != null) {
+            RuntimeException exception = this.pollException;
+            this.pollException = null;
             throw exception;
         }
 
Expand Down Expand Up @@ -220,8 +221,20 @@ public synchronized void addRecord(ConsumerRecord<K, V> record) {
recs.add(record);
}

/**
* @deprecated Use {@link #setPollException(KafkaException)} instead
*/
@Deprecated
public synchronized void setException(KafkaException exception) {
this.exception = exception;
setPollException(exception);
}

public synchronized void setPollException(KafkaException exception) {
this.pollException = exception;
}

public synchronized void setOffsetsException(KafkaException exception) {
this.offsetsException = exception;
}

@Override
@@ -393,6 +406,11 @@ public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<
 
     @Override
     public synchronized Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
+        if (offsetsException != null) {
+            RuntimeException exception = this.offsetsException;
+            this.offsetsException = null;
+            throw exception;
+        }
         Map<TopicPartition, Long> result = new HashMap<>();
         for (TopicPartition tp : partitions) {
             Long beginningOffset = beginningOffsets.get(tp);
Expand All @@ -405,6 +423,11 @@ public synchronized Map<TopicPartition, Long> beginningOffsets(Collection<TopicP

@Override
public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
if (offsetsException != null) {
RuntimeException exception = this.offsetsException;
this.offsetsException = null;
throw exception;
}
Map<TopicPartition, Long> result = new HashMap<>();
for (TopicPartition tp : partitions) {
Long endOffset = getEndOffset(endOffsets.get(tp));
Expand Down
@@ -28,6 +28,7 @@
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -312,6 +313,10 @@ public void run() {
                 try {
                     readToLogEnd();
                     log.trace("Finished read to end log for topic {}", topic);
+                } catch (TimeoutException e) {
+                    log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. " +
+                            "This may occur when brokers are unavailable or unreachable. Reason: {}", topic, e.getMessage());
+                    continue;
                 } catch (WakeupException e) {
                     // Either received another get() call and need to retry reading to end of log or stop() was
                     // called. Both are handled by restarting this loop.
@@ -30,6 +30,7 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.TimestampType;
@@ -370,7 +371,7 @@ public void run() {
     }
 
     @Test
-    public void testConsumerError() throws Exception {
+    public void testPollConsumerError() throws Exception {
         expectStart();
         expectStop();
 
@@ -388,7 +389,7 @@ public void run() {
         consumer.schedulePollTask(new Runnable() {
             @Override
             public void run() {
-                consumer.setException(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+                consumer.setPollException(Errors.COORDINATOR_NOT_AVAILABLE.exception());
             }
         });
 
@@ -423,6 +424,77 @@ public void run() {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testGetOffsetsConsumerErrorOnReadToEnd() throws Exception {
+        expectStart();
+
+        // Producer flushes when read to log end is called
+        producer.flush();
+        PowerMock.expectLastCall();
+
+        expectStop();
+
+        PowerMock.replayAll();
+        final CountDownLatch finishedLatch = new CountDownLatch(1);
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        consumer.updateEndOffsets(endOffsets);
+        store.start();
+        final AtomicBoolean getInvoked = new AtomicBoolean(false);
+        final FutureCallback<Void> readEndFutureCallback = new FutureCallback<>(new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                getInvoked.set(true);
+            }
+        });
+        consumer.schedulePollTask(new Runnable() {
+            @Override
+            public void run() {
+                // Once we're synchronized in a poll, start the read to end and schedule the exact set of poll events
+                // that should follow. This readToEnd call will immediately wakeup this consumer.poll() call without
+                // returning any data.
+                Map<TopicPartition, Long> newEndOffsets = new HashMap<>();
+                newEndOffsets.put(TP0, 1L);
+                newEndOffsets.put(TP1, 1L);
+                consumer.updateEndOffsets(newEndOffsets);
+                // Set exception to occur when getting offsets to read log to end. It'll be caught in the work thread,
+                // which will retry and eventually get the correct offsets and read log to end.
+                consumer.setOffsetsException(new TimeoutException("Failed to get offsets by times"));
+                store.readToEnd(readEndFutureCallback);
+
+                // Should keep polling until it reaches current log end offset for all partitions
+                consumer.scheduleNopPollTask();
+                consumer.scheduleNopPollTask();
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TP0_KEY, TP0_VALUE_NEW));
+                    }
+                });
+
+                consumer.schedulePollTask(new Runnable() {
+                    @Override
+                    public void run() {
+                        finishedLatch.countDown();
+                    }
+                });
+            }
+        });
+        readEndFutureCallback.get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(getInvoked.get());
+        assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS));
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+        assertEquals(1L, consumer.position(TP0));
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
     @Test
     public void testProducerError() throws Exception {
         expectStart();
@@ -302,7 +302,7 @@ public void shouldRestoreRecordsUpToHighwatermark() {
     @Test
     public void shouldRecoverFromInvalidOffsetExceptionAndRestoreRecords() {
         initializeConsumer(2, 0, t1);
-        consumer.setException(new InvalidOffsetException("Try Again!") {
+        consumer.setPollException(new InvalidOffsetException("Try Again!") {
             public Set<TopicPartition> partitions() {
                 return Collections.singleton(t1);
             }
@@ -236,7 +236,7 @@ public void shouldDieOnInvalidOffsetException() throws Exception {
                 10 * 1000,
                 "Input record never consumed");
 
-        mockConsumer.setException(new InvalidOffsetException("Try Again!") {
+        mockConsumer.setPollException(new InvalidOffsetException("Try Again!") {
             @Override
             public Set<TopicPartition> partitions() {
                 return Collections.singleton(topicPartition);
@@ -158,7 +158,7 @@ public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() {
     public void shouldRecoverFromInvalidOffsetExceptionAndFinishRestore() {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
-        consumer.setException(new InvalidOffsetException("Try Again!") {
+        consumer.setPollException(new InvalidOffsetException("Try Again!") {
             @Override
             public Set<TopicPartition> partitions() {
                 return Collections.singleton(topicPartition);
@@ -1380,7 +1380,7 @@ public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() t
             () -> mockRestoreConsumer.position(changelogPartition) == 1L,
             "Never restore first record");
 
-        mockRestoreConsumer.setException(new InvalidOffsetException("Try Again!") {
+        mockRestoreConsumer.setPollException(new InvalidOffsetException("Try Again!") {
             @Override
             public Set<TopicPartition> partitions() {
                 return changelogPartitionSet;
