New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-4916: test streams with brokers failing #2719
Changes from 22 commits
187f9b6
87f06a8
978f925
57856b0
5086dd7
40394f3
1ee01c1
0d30ba4
7dae290
7252e62
4eb3670
9df1010
829f57a
f702616
7f68745
10f40a3
fc1cabc
5e045e7
b34d6d3
05324ce
87b4ade
6a42955
cd274dc
247c7ea
6defb84
213b892
19d502f
4ac627a
90d077a
878d3b6
090877e
595566b
17029e7
86edfe2
a67575e
0b5743c
ad1848f
f66bcd3
186d7d0
bc68fe8
bca3a69
d690183
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
package org.apache.kafka.streams.processor.internals; | ||
|
||
import org.apache.kafka.common.requests.MetadataResponse; | ||
import org.apache.kafka.common.utils.Time; | ||
import org.apache.kafka.streams.errors.StreamsException; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
@@ -35,16 +36,18 @@ public class InternalTopicManager { | |
public static final String RETENTION_MS = "retention.ms"; | ||
public static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS); | ||
private static final int MAX_TOPIC_READY_TRY = 5; | ||
|
||
private final Time time; | ||
private final long windowChangeLogAdditionalRetention; | ||
|
||
private final int replicationFactor; | ||
private final StreamsKafkaClient streamsKafkaClient; | ||
|
||
public InternalTopicManager(final StreamsKafkaClient streamsKafkaClient, final int replicationFactor, final long windowChangeLogAdditionalRetention) { | ||
public InternalTopicManager(final StreamsKafkaClient streamsKafkaClient, final int replicationFactor, | ||
final long windowChangeLogAdditionalRetention, final Time time) { | ||
this.streamsKafkaClient = streamsKafkaClient; | ||
this.replicationFactor = replicationFactor; | ||
this.windowChangeLogAdditionalRetention = windowChangeLogAdditionalRetention; | ||
this.time = time; | ||
} | ||
|
||
/** | ||
|
@@ -60,11 +63,17 @@ public void makeReady(final Map<InternalTopicConfig, Integer> topics) { | |
final MetadataResponse metadata = streamsKafkaClient.fetchMetadata(); | ||
final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata); | ||
final Map<InternalTopicConfig, Integer> topicsToBeCreated = validateTopicPartitions(topics, existingTopicPartitions); | ||
if (metadata.brokers().size() < replicationFactor) { | ||
throw new StreamsException("Not enough brokers " + metadata.brokers().size() + | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: I think the error message does not read well: Better: |
||
" for replication factor " + replicationFactor); | ||
} | ||
streamsKafkaClient.createTopics(topicsToBeCreated, replicationFactor, windowChangeLogAdditionalRetention, metadata); | ||
return; | ||
} catch (StreamsException ex) { | ||
log.warn("Could not create internal topics: " + ex.getMessage() + " Retry #" + i); | ||
} | ||
// backoff | ||
time.sleep(100L); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should we increase the backoff time if we keep retrying? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Traditionally we don't do anything clever with backoff times throughout the code. It can get complicated, e.g., by how much to increase in each step. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Depends on the use case: either add the same value each time, or double up. We do this when waiting for locks to get released on rebalance already: There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm not ready for this yet. We can revisit backoffs in all the streams code and see how the networking client code has done them. Also we should tie these numbers to some sort of config users can set. |
||
} | ||
throw new StreamsException("Could not create internal topics."); | ||
} | ||
|
@@ -73,11 +82,20 @@ public void makeReady(final Map<InternalTopicConfig, Integer> topics) { | |
* Get the number of partitions for the given topics | ||
*/ | ||
public Map<String, Integer> getNumPartitions(final Set<String> topics) { | ||
final MetadataResponse metadata = streamsKafkaClient.fetchMetadata(); | ||
final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata); | ||
existingTopicPartitions.keySet().retainAll(topics); | ||
for (int i = 0; i < MAX_TOPIC_READY_TRY; i++) { | ||
try { | ||
final MetadataResponse metadata = streamsKafkaClient.fetchMetadata(); | ||
final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata); | ||
existingTopicPartitions.keySet().retainAll(topics); | ||
|
||
return existingTopicPartitions; | ||
return existingTopicPartitions; | ||
} catch (StreamsException ex) { | ||
log.warn("Could not get number of partitions: " + ex.getMessage() + " Retry #" + i); | ||
} | ||
// backoff | ||
time.sleep(100L); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As above? |
||
} | ||
throw new StreamsException("Could not get number of partitions."); | ||
} | ||
|
||
public void close() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -207,7 +207,7 @@ public void configure(Map<String, ?> configs) { | |
configs.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) : 1, | ||
configs.containsKey(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG) ? | ||
(Long) configs.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG) | ||
: WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT); | ||
: WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, streamThread.time); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm thinking it might be better to have a field in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But couldn't that lead to cases when There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In tests that could happen. I guess there are other There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd agree with Damian to have this Time object pass long the hierarchy than passing it from the thread directly to the internal topic manager. I would not worry too much about passing in different objects since both of them are internal topics so the only place we may directly pass the object is in unit tests. |
||
|
||
this.copartitionedTopicsValidator = new CopartitionedTopicsValidator(streamThread.getName()); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -198,7 +198,7 @@ private synchronized void setStateWhenNotInPendingShutdown(final State newState) | |
private final Set<TaskId> prevActiveTasks; | ||
private final Map<TaskId, StreamTask> suspendedTasks; | ||
private final Map<TaskId, StandbyTask> suspendedStandbyTasks; | ||
private final Time time; | ||
public final Time time; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See comment above |
||
private final int rebalanceTimeoutMs; | ||
private final long pollTimeMs; | ||
private final long cleanTimeMs; | ||
|
@@ -401,17 +401,21 @@ private RuntimeException unAssignChangeLogPartitions() { | |
|
||
@SuppressWarnings("ThrowableNotThrown") | ||
private void shutdownTasksAndState() { | ||
log.debug("{} shutdownTasksAndState: shutting down all active tasks {} and standby tasks {}", logPrefix, | ||
activeTasks.keySet(), standbyTasks.keySet()); | ||
log.debug("{} shutdownTasksAndState: shutting down" + | ||
"active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", | ||
logPrefix, activeTasks.keySet(), standbyTasks.keySet(), | ||
suspendedTasks.keySet(), suspendedStandbyTasks.keySet()); | ||
|
||
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null); | ||
// Close all processors in topology order | ||
firstException.compareAndSet(null, closeAllTasks()); | ||
firstException.compareAndSet(null, closeTasks(activeAndStandbytasks())); | ||
firstException.compareAndSet(null, closeTasks(suspendedAndSuspendedStandbytasks())); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For suspended tasks, could the closure process be simpler? For example we have already closed the topology as well as committing the states, etc. Ditto below. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I could add an extra method in StreamTask.java to close the rest, not the topology. However, this only happens at shutdown, not sure it's worth it. So it could be simpler, but with more lines of code. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense. |
||
// flush state | ||
firstException.compareAndSet(null, flushAllState()); | ||
// Close all task state managers. Don't need to set exception as all | ||
// state would have been flushed above | ||
closeAllStateManagers(firstException.get() == null); | ||
closeStateManagers(activeAndStandbytasks(), firstException.get() == null); | ||
closeStateManagers(suspendedAndSuspendedStandbytasks(), firstException.get() == null); | ||
// only commit under clean exit | ||
if (cleanRun && firstException.get() == null) { | ||
firstException.set(commitOffsets()); | ||
|
@@ -430,7 +434,7 @@ private void suspendTasksAndState() { | |
activeTasks.keySet(), standbyTasks.keySet()); | ||
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null); | ||
// Close all topology nodes | ||
firstException.compareAndSet(null, closeAllTasksTopologies()); | ||
firstException.compareAndSet(null, closeActiveAndStandbyTasksTopologies()); | ||
// flush state | ||
firstException.compareAndSet(null, flushAllState()); | ||
// only commit after all state has been flushed and there hasn't been an exception | ||
|
@@ -453,21 +457,20 @@ interface AbstractTaskAction { | |
void apply(final AbstractTask task); | ||
} | ||
|
||
private RuntimeException performOnAllTasks(final AbstractTaskAction action, | ||
final String exceptionMessage) { | ||
private RuntimeException performOnTasks(final List<AbstractTask> tasks, | ||
final AbstractTaskAction action, | ||
final String exceptionMessage) { | ||
RuntimeException firstException = null; | ||
final List<AbstractTask> allTasks = new ArrayList<AbstractTask>(activeTasks.values()); | ||
allTasks.addAll(standbyTasks.values()); | ||
for (final AbstractTask task : allTasks) { | ||
for (final AbstractTask task : tasks) { | ||
try { | ||
action.apply(task); | ||
} catch (RuntimeException t) { | ||
log.error("{} Failed while executing {} {} due to {}: ", | ||
StreamThread.this.logPrefix, | ||
task.getClass().getSimpleName(), | ||
task.id(), | ||
exceptionMessage, | ||
t); | ||
StreamThread.this.logPrefix, | ||
task.getClass().getSimpleName(), | ||
task.id(), | ||
exceptionMessage, | ||
t); | ||
if (firstException == null) { | ||
firstException = t; | ||
} | ||
|
@@ -476,8 +479,20 @@ private RuntimeException performOnAllTasks(final AbstractTaskAction action, | |
return firstException; | ||
} | ||
|
||
private Throwable closeAllStateManagers(final boolean writeCheckpoint) { | ||
return performOnAllTasks(new AbstractTaskAction() { | ||
private List<AbstractTask> activeAndStandbytasks() { | ||
final List<AbstractTask> tasks = new ArrayList<AbstractTask>(activeTasks.values()); | ||
tasks.addAll(standbyTasks.values()); | ||
return tasks; | ||
} | ||
|
||
private List<AbstractTask> suspendedAndSuspendedStandbytasks() { | ||
final List<AbstractTask> tasks = new ArrayList<AbstractTask>(suspendedTasks.values()); | ||
tasks.addAll(suspendedStandbyTasks.values()); | ||
return tasks; | ||
} | ||
|
||
private Throwable closeStateManagers(final List<AbstractTask> tasks, final boolean writeCheckpoint) { | ||
return performOnTasks(tasks, new AbstractTaskAction() { | ||
@Override | ||
public void apply(final AbstractTask task) { | ||
log.info("{} Closing the state manager of task {}", StreamThread.this.logPrefix, task.id()); | ||
|
@@ -488,7 +503,7 @@ public void apply(final AbstractTask task) { | |
|
||
private RuntimeException commitOffsets() { | ||
// Exceptions should not prevent this call from going through all shutdown steps | ||
return performOnAllTasks(new AbstractTaskAction() { | ||
return performOnTasks(activeAndStandbytasks(), new AbstractTaskAction() { | ||
@Override | ||
public void apply(final AbstractTask task) { | ||
log.info("{} Committing consumer offsets of task {}", StreamThread.this.logPrefix, task.id()); | ||
|
@@ -498,7 +513,7 @@ public void apply(final AbstractTask task) { | |
} | ||
|
||
private RuntimeException flushAllState() { | ||
return performOnAllTasks(new AbstractTaskAction() { | ||
return performOnTasks(activeAndStandbytasks(), new AbstractTaskAction() { | ||
@Override | ||
public void apply(final AbstractTask task) { | ||
log.info("{} Flushing state stores of task {}", StreamThread.this.logPrefix, task.id()); | ||
|
@@ -579,6 +594,7 @@ private void addRecordsToTasks(ConsumerRecords<byte[], byte[]> records) { | |
} | ||
} | ||
|
||
|
||
/** | ||
* Schedule the records processing by selecting which record is processed next. Commits may | ||
* happen as records are processed. | ||
|
@@ -1072,8 +1088,8 @@ private void removeStandbyTasks() { | |
standbyRecords.clear(); | ||
} | ||
|
||
private RuntimeException closeAllTasks() { | ||
return performOnAllTasks(new AbstractTaskAction() { | ||
private RuntimeException closeTasks(final List<AbstractTask> tasks) { | ||
return performOnTasks(tasks, new AbstractTaskAction() { | ||
@Override | ||
public void apply(final AbstractTask task) { | ||
log.info("{} Closing task {}", StreamThread.this.logPrefix, task.id()); | ||
|
@@ -1083,8 +1099,9 @@ public void apply(final AbstractTask task) { | |
}, "close"); | ||
} | ||
|
||
private RuntimeException closeAllTasksTopologies() { | ||
return performOnAllTasks(new AbstractTaskAction() { | ||
|
||
private RuntimeException closeActiveAndStandbyTasksTopologies() { | ||
return performOnTasks(activeAndStandbytasks(), new AbstractTaskAction() { | ||
@Override | ||
public void apply(final AbstractTask task) { | ||
log.info("{} Closing task's topology {}", StreamThread.this.logPrefix, task.id()); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,8 @@ | |
import org.apache.kafka.common.Node; | ||
import org.apache.kafka.common.protocol.Errors; | ||
import org.apache.kafka.common.requests.MetadataResponse; | ||
import org.apache.kafka.common.utils.MockTime; | ||
import org.apache.kafka.common.utils.Time; | ||
import org.apache.kafka.streams.StreamsConfig; | ||
import org.apache.kafka.streams.errors.StreamsException; | ||
import org.apache.kafka.test.MockTimestampExtractor; | ||
|
@@ -40,7 +42,7 @@ public class InternalTopicManagerTest { | |
private final String topic = "test_topic"; | ||
private final String userEndPoint = "localhost:2171"; | ||
private MockStreamKafkaClient streamsKafkaClient; | ||
|
||
private final Time time = new MockTime(); | ||
@Before | ||
public void init() { | ||
final StreamsConfig config = new StreamsConfig(configProps()); | ||
|
@@ -54,19 +56,22 @@ public void shutdown() throws IOException { | |
|
||
@Test | ||
public void shouldReturnCorrectPartitionCounts() throws Exception { | ||
InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT); | ||
InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, | ||
WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time); | ||
Assert.assertEquals(Collections.singletonMap(topic, 1), internalTopicManager.getNumPartitions(Collections.singleton(topic))); | ||
} | ||
|
||
@Test | ||
public void shouldCreateRequiredTopics() throws Exception { | ||
InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT); | ||
InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, | ||
WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time); | ||
internalTopicManager.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 1)); | ||
} | ||
|
||
@Test | ||
public void shouldNotCreateTopicIfExistsWithDifferentPartitions() throws Exception { | ||
InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT); | ||
InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1, | ||
WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time); | ||
boolean exceptionWasThrown = false; | ||
try { | ||
internalTopicManager.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 2)); | ||
|
@@ -104,7 +109,7 @@ public MetadataResponse fetchMetadata() { | |
Node node = new Node(1, "host1", 1001); | ||
MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, new ArrayList<Node>(), new ArrayList<Node>()); | ||
MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE, topic, true, Collections.singletonList(partitionMetadata)); | ||
MetadataResponse response = new MetadataResponse(Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, | ||
MetadataResponse response = new MetadataResponse(Collections.<Node>singletonList(node), null, MetadataResponse.NO_CONTROLLER_ID, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because now in |
||
Collections.singletonList(topicMetadata)); | ||
return response; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think `validateTopicPartitions` should also validate `replicationFactor`.