KAFKA-6150: Purge repartition topics #4270
Conversation
…into K6170-admin-client
…into K6150-purge-repartition-topics
@@ -890,17 +888,17 @@ public synchronized ProcessorTopology buildGlobalStateTopology() {
    }

    private ProcessorTopology build(final Set<String> nodeGroup) {
        final List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
Removed the ArrayList and used a LinkedHashMap for processorMap to achieve the same thing: preserving insertion order.
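For reference, a LinkedHashMap does iterate in insertion order, which is the property the ArrayList was providing. A minimal, self-contained sketch (the names here are illustrative, not the actual builder code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InsertionOrderDemo {
    // LinkedHashMap iterates entries in insertion order;
    // a plain HashMap makes no ordering guarantee.
    static List<String> insertionOrder(final String... keys) {
        final Map<String, Integer> map = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            map.put(keys[i], i);
        }
        return new ArrayList<>(map.keySet());
    }

    public static void main(final String[] args) {
        // prints [source, process, sink]
        System.out.println(insertionOrder("source", "process", "sink"));
    }
}
```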
…into K6150-purge-repartition-topics
@guozhangwang should this PR have all of those commits in it?
@dguy yeah, a lot of those are rebases from trunk and other branches, and many others are minor commits.
A few more notes on the PR itself:
Thanks @guozhangwang, left some comments.
@@ -33,7 +33,7 @@

    private final Map<TopicPartition, KafkaFuture<DeletedRecords>> futures;

-   DeleteRecordsResult(Map<TopicPartition, KafkaFuture<DeletedRecords>> futures) {
+   public DeleteRecordsResult(Map<TopicPartition, KafkaFuture<DeletedRecords>> futures) {
Making this public means it becomes part of the public API, right?
Good point. I will just keep the KafkaFuture from the result in TaskManager then.
                Collections.<String>emptySet());
    }

    public static ProcessorTopology withSources(final List<ProcessorNode> processorNodes,
package private?
                Collections.<String>emptySet());
    }

    public static ProcessorTopology withLocalStores(final List<StateStore> stateStores,
package private?
                Collections.<String>emptySet());
    }

    public static ProcessorTopology withGlobalStores(final List<StateStore> stateStores,
ditto
                Collections.<String>emptySet());
    }

    public static ProcessorTopology withRepartitionTopics(final List<ProcessorNode> processorNodes,
ditto
    }

    public Set<SinkNode> sinks() {
-       return new HashSet<>(sinkByTopics.values());
+       return new HashSet<>(sinksByTopic.values());
as above
    public Map<String, String> storeToChangelogTopic() {
        return storeToChangelogTopic;
    }

    public List<StateStore> globalStateStores() {
        return globalStateStores;
    }

    public boolean isTopicInternalTransient(String topic) {
package private?
maybe name it isInternalTransientTopic?
isInternalTopicTransient?
        final Map<TopicPartition, Long> purgableConsumedOffsets = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
            if (topology.isTopicInternalTransient(entry.getKey().topic()))
                purgableConsumedOffsets.put(entry.getKey(), entry.getValue() + 1);
Is it right to purge the consumed offset + 1?
nit: extract .getKey() and call only once.
> is that right to purge the consumed offset + 1?

The delete-records call deletes records before the given offset, similar to committing an offset, so this is correct.
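The "+ 1" semantics can be sketched on a toy log. This is a simplified model, not the actual AdminClient code: the assumption is that deleting "before offset X" removes all records with offset < X, so passing consumedOffset + 1 also removes the record that was consumed last:

```java
import java.util.ArrayList;
import java.util.List;

public class PurgeOffsetDemo {
    // Toy log of record offsets. Purging "before offset X"
    // removes every record with offset < X, mirroring the
    // exclusive-upper-bound semantics of a delete-records call.
    static List<Long> purge(final List<Long> log, final long beforeOffset) {
        final List<Long> remaining = new ArrayList<>();
        for (final long offset : log) {
            if (offset >= beforeOffset) {
                remaining.add(offset);
            }
        }
        return remaining;
    }

    public static void main(final String[] args) {
        final List<Long> log = new ArrayList<>();
        for (long i = 0; i < 10; i++) {
            log.add(i);
        }

        final long consumedOffset = 4L; // record at offset 4 was consumed
        // consumedOffset + 1 removes offsets 0..4 inclusive, leaving the
        // first unconsumed record (offset 5) as the new head of the log.
        final List<Long> remaining = purge(log, consumedOffset + 1);
        System.out.println(remaining.get(0)); // prints 5
    }
}
```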
        if (deleteRecordsResult == null || deleteRecordsResult.all().isDone()) {

            if (deleteRecordsResult != null && deleteRecordsResult.all().isCompletedExceptionally()) {
                log.info("Previous delete-records request has failed: {}. Try sending the new request now", deleteRecordsResult.lowWatermarks());
Does this need to be info level? It seems more like debug to me, i.e., is a user actually going to care about this?
Do we want to print the whole map as-is? It might be quite large. Maybe filter for failed requests?
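The reviewer's suggestion — log only the failed entries rather than the whole low-watermark map — could look like the following sketch. It uses CompletableFuture in place of KafkaFuture, and the names are hypothetical:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class FailedFuturesFilter {
    // Keep only the keys whose futures completed exceptionally,
    // so a log line reports just the failed partitions.
    static <K> Set<K> failedKeys(final Map<K, CompletableFuture<Long>> futures) {
        final Set<K> failed = new HashSet<>();
        for (final Map.Entry<K, CompletableFuture<Long>> entry : futures.entrySet()) {
            if (entry.getValue().isCompletedExceptionally()) {
                failed.add(entry.getKey());
            }
        }
        return failed;
    }

    public static void main(final String[] args) {
        final Map<String, CompletableFuture<Long>> futures = new HashMap<>();
        futures.put("topic-0", CompletableFuture.completedFuture(42L));
        final CompletableFuture<Long> bad = new CompletableFuture<>();
        bad.completeExceptionally(new RuntimeException("delete-records failed"));
        futures.put("topic-1", bad);
        System.out.println(failedKeys(futures)); // prints [topic-1]
    }
}
```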
@@ -542,6 +552,39 @@ public void shouldPropagateExceptionFromStandbyCommit() {
        verify(standby);
    }

    @Test
    public void shouldSendPurgeData() {
should we add a test that also checks that a purge request is sent after a previous one has completed?
Thanks for the patch! A couple of comments.
        if (!factory.isWindowStore()) {
            return new InternalTopicConfig(name,
                                           Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
                                           factory.logConfig());
        } else {
            final InternalTopicConfig config = new InternalTopicConfig(name,
nit: move name to the next line
        if (!factory.isWindowStore()) {
            return new InternalTopicConfig(name,
                                           Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
                                           factory.logConfig());
        } else {
Nit: We don't need an else here, since the if branch ends with return. If you prefer the if-else pattern, maybe move the return after the if-else block (that seems cleaner to me, but it's quite subjective).
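The two equivalent shapes being contrasted can be sketched on a toy method (the method and names are hypothetical, not from the PR):

```java
public class ReturnStyleDemo {
    // Early-return style: no else needed, because the if branch returns.
    static String describeEarlyReturn(final int n) {
        if (n % 2 == 0) {
            return "even";
        }
        return "odd";
    }

    // If-else style with a single return after the block.
    static String describeIfElse(final int n) {
        final String result;
        if (n % 2 == 0) {
            result = "even";
        } else {
            result = "odd";
        }
        return result;
    }

    public static void main(final String[] args) {
        System.out.println(describeEarlyReturn(4)); // prints even
        System.out.println(describeIfElse(7));      // prints odd
    }
}
```

Both compile to the same behavior; the choice is purely stylistic, as the reviewer notes.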
    public Map<String, String> storeToChangelogTopic() {
        return storeToChangelogTopic;
    }

    public List<StateStore> globalStateStores() {
        return globalStateStores;
    }

    public boolean isTopicInternalTransient(String topic) {
isInternalTopicTransient?
@@ -364,6 +365,16 @@ private void commitOffsets(final boolean startNewTransaction) {
        }
    }

    Map<TopicPartition, Long> purgableOffsets() {
        final Map<TopicPartition, Long> purgableConsumedOffsets = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
nit: add final
        final Map<TopicPartition, Long> purgableConsumedOffsets = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
            if (topology.isTopicInternalTransient(entry.getKey().topic()))
                purgableConsumedOffsets.put(entry.getKey(), entry.getValue() + 1);
nit: extract .getKey() and call only once.
        assertThat(map, equalTo(Collections.singletonMap(repartition, 11L)));
    }

    private StreamTask createStatefulTask(boolean eosEnabled, boolean logged) {
nit: add final (same below)
@@ -328,7 +328,7 @@ public void shouldCommitAfterTheCommitInterval() {

    @SuppressWarnings({"ThrowableNotThrown", "unchecked"})
    private TaskManager mockTaskManagerCommit(final Consumer<byte[], byte[]> consumer, final int numberOfCommits, final int commits) {
-       final TaskManager taskManager = EasyMock.createMock(TaskManager.class);
+       final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
Why?
It now has one more function that is not the focus of this test.
        EasyMock.expect(active.recordsToDelete()).andReturn(recordsToDelete).once();
        EasyMock.expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).once();
        replay();
Do we not need to pass in active and adminClient here?
replay() will replay all the mock clients.
    public void shouldIgnorePurgeDataErrors() {
        final KafkaFutureImpl<DeletedRecords> futureDeletedRecords = new KafkaFutureImpl<>();
        final Map<TopicPartition, RecordsToDelete> recordsToDelete = Collections.singletonMap(t1p1, RecordsToDelete.beforeOffset(5L));
        final DeleteRecordsResult deleteRecordsResult = new DeleteRecordsResult(Collections.singletonMap(t1p1, (KafkaFuture<DeletedRecords>) futureDeletedRecords));
Cast unnecessary?
Unfortunately it is not, since we are using KafkaFutureImpl for this test (but not the other) in order to call completeExceptionally.
Can't we specify the type via Collections.<KafkaFuture>singletonMap(..) to make it work? A Map of type KafkaFuture should accept KafkaFutureImpl -- or am I missing something? Generics can be confusing sometimes :)
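The explicit-type-witness trick the reviewer is describing does work, with the caveat that singletonMap takes two type arguments (key and value). A self-contained sketch using java.util.concurrent.Future and CompletableFuture in place of KafkaFuture and KafkaFutureImpl:

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class TypeWitnessDemo {
    public static void main(final String[] args) {
        final CompletableFuture<Long> impl = CompletableFuture.completedFuture(7L);

        // Without a witness, singletonMap infers Map<String, CompletableFuture<Long>>,
        // which is NOT a subtype of Map<String, Future<Long>> (generics are invariant),
        // hence the cast in the test. An explicit type witness fixes the
        // inference at the call site without any cast:
        final Map<String, Future<Long>> futures =
                Collections.<String, Future<Long>>singletonMap("t1p1", impl);

        System.out.println(futures.get("t1p1").isDone()); // prints true
    }
}
```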
        EasyMock.expect(active.recordsToDelete()).andReturn(recordsToDelete).times(2);
        EasyMock.expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2);
        replay();
as above. pass in mocks?
Addressed comments.
retest this please
Thanks @guozhangwang, LGTM
Thanks for the reviews! Merged to trunk.
NOTE that this code depends on the assumption that purge is only called right after the commit has succeeded, hence we presume all consumed offsets are committed.