Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-6150: KIP-204 part III; Change repartition topic segment size and ms #4315

Closed
wants to merge 20 commits into from

Conversation

guozhangwang
Copy link
Contributor

@guozhangwang guozhangwang commented Dec 12, 2017

  1. Create default internal topic configs in StreamsConfig, especially for repartition topics change the segment size and time to smaller value.
  2. Consolidate the default internal topic settings to InternalTopicManager and simplify InternalTopicConfig correspondingly.
  3. Add an integration test for purging data.
  4. MINOR: change TopologyBuilderException to IllegalStateException in StreamPartitionAssignor (part of https://issues.apache.org/jira/browse/KAFKA-5660).

Here are a few public facing APIs that get added:

  1. AbstractConfig#originalsWithPrefix(String prefix, boolean strip): this for simplify the logic of passing admin and topic prefixed configs to consumer properties.
  2. KafkaStreams constructor with Time object for convienent mocking in tests.

Will update KIP-204 accordingly if people re-votes these changes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@guozhangwang
Copy link
Contributor Author

@dguy @mjsax @bbejeck for reviews.

Copy link
Contributor

@dguy dguy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @guozhangwang, made a quick pass and left a few comments/questions

/**
* The configuration for the new topic or null if no configs ever specified.
*/
public Map<String, String> configs() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public API change? is this in the KIP?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I'm planning to update the KIP and calls for a re-vote including this change.


topicConfig.putAll(internalTopicConfig.toProperties(windowChangeLogAdditionalRetention));

System.out.println("\nCREATE topic " + internalTopicConfig.name() + " with partitions " + internalTopicConfig.numberOfPartitions() + ": config " + topicConfig + "\n");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trace log maybe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

// internal topic config overridden rule: library overrides < global config overrides < per-topic config overrides
final Map<String, String> topicConfig = new HashMap<>();

switch (internalTopicConfig.type()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and other code, i.e, if(topicType == ..) makes me think we should maybe have an interface and three implementations of this. Then we can remove the if and switch around the type and just ask the impl to do whatever it needs to

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand this statement, could you elaborate a bit more? Currently this switch pattern is only called once so I'm not sure if it is worthwhile to refactor it as an interface, or maybe I completely got it wrong since you mention this and other code (while I cannot think of other code?).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what Damian meant is, that we need to pass in the internal topic type in the constructor -- instead of that, we can just instantiate different types directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see your point now. The reason I wanted to pass in the topicType at constructor is that, the setRetentionMs() can be guarded earlier like we did today: if we do not know the topicType we will blindly accept the retention ms.

.groupBy(MockKeyValueMapper.SelectKeyKeyValueMapper())
.count();

System.out.println(builder.build().describe());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i guess this isn't needed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did a pass. Couple of minor comments.

* @param time {@code Time} implementation; cannot be null
* @throws StreamsException if any fatal error occurs
*/
public KafkaStreams(final Topology topology,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Public API change! Can't do this. Can this be package private? Why do we need this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, see my description on the PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mentioned in the PR description that Time was added for the convenience of unit testing, should we add something similar to javadoc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We usually do not need to mention that, since people may have their own Time implementation other than SYSTEM in their applications that is not only for testing.

private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
final StreamsConfig config,
final KafkaClientSupplier clientSupplier) throws StreamsException {
this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't we remove this private constructor and add Time.SYSTEM to the internal this() calls of the others?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, though it means we need to add this Time.SYSTEM on all the callers, whereas I was trying to consolidate these in a single call. I guess this this pure subjective. Let me know if you have a strong opinion.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No strong opinion.

* @param strip strip the prefix before adding to the output if set true
* @return a Map containing the settings with the prefix
*/
public Map<String, Object> originalsWithPrefix(String prefix, boolean strip) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a public API change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, see my description on the PR.

Copy link
Member

@mjsax mjsax Dec 12, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. Reading. A skill I still need to master... :)

// add admin and topic configs required for creating topics
consumerProps.put(adminClientPrefix(RETRIES_CONFIG), getInt(RETRIES_CONFIG));
consumerProps.putAll(originalsWithPrefix(TOPIC_PREFIX, false));
consumerProps.putAll(originalsWithPrefix(ADMIN_CLIENT_PREFIX, false));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to pass in all admin client configs? Actually, we only need retries.

Also, I am not sure if we would set the correct default here. If nothing is specified, we would use StreamsConfig retry as default, but we actually should set AdminClientConfig default -- atm, both might be the same so it doesn't matter too much. Just is doesn't seems to be "correct" (in a very strong sense).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point: I will revert the changes.

builder.append(cleanupPolicy.name()).append(",");
}
builder.deleteCharAt(builder.length() - 1);
public Map<String, String> toProperties(final long additionalRetentionMs) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Return type and method name to does match. maybe rename to addToTopicConfig ?

Also update JavaDoc return tag

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the name / return tag still makes sense: it returns a map of properties from its topic config, retention ms and the passed-in additional retention ms.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sound like it uses Properties class (while it doesn't) -- but maybe I am overthinking this... Should be fine.

try {
Collection<DescribeLogDirsResponse.LogDirInfo> logDirInfo = adminClient.describeLogDirs(Collections.singleton(0)).values().get(0).get().values();

if (logDirInfo.isEmpty()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WE don't need this check. If logDirInfo.isEmpty() == true the for-loop will never be entered and we hit the return false after the loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack.

"Repartition topic " + REPARTITION_TOPIC + " not created with the expected configs after 60000 ms.");

TestUtils.waitForCondition(
new RepartitionTopicPurged(new TopicSizeVerifier() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand this correctly, we actually test if the topic is not purged yet? Thus, the name RepartitionTopicPurged is a little miss leading/confusing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. I'm renaming to RepartitionTopicVerified, which is used for both verify filled and verify purged.

}
}
return false;
} catch (Exception e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add final

}
}
}
return false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't we have single return false at the very end?

new RepartitionTopicPurged(new TopicSizeVerifier() {
@Override
public boolean verify(long currentSize) {
return currentSize <= purgeSegmentBytes;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Must it not be < ? If the latest segment is full, won't we roll it and thus it would get purged?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will roll if the segment size > segment bytes, not >=.


@Override
public final boolean conditionMet() {
time.sleep(purgeIntervalMs);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But we advance the time before we verify that the topic was written. I think, this is ok for the "purging" test but not for the first test for which we expect to see data in the repartitioning topic. As the StreamsThreads run in the back ground, in the first check directly after we advanced time, we could have a context switch and data might get purged and thus, we can never hit the condition that "repartitionsTopicSize > 0"

Or can this never happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should never happen, since starting offset based purging should never be "clean". I.e. even if you set the purge offset to be LEO it should not delete the topic partition segments to empty.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. Thx.

@guozhangwang
Copy link
Contributor Author

Jenkins failures seem not relevant. @dguy @mjsax have addressed the comments.

@@ -112,6 +112,8 @@
* These should be valid properties from {@link org.apache.kafka.common.config.TopicConfig TopicConfig}.
* It is recommended to use {@link #topicPrefix(String)}.
*/
// TODO: currently we cannot get the full topic configurations and hence cannot allow topic configs without the prefix,
// this can be lifted once kafka.log.LogConfig is completely deprecated by org.apache.kafka.common.config.TopicConfig
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't understand this comment. Can you elaborate?

Copy link
Contributor Author

@guozhangwang guozhangwang Dec 15, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For admin, producer and consumer we have getXXXConfigs which relies on getClientPropsWithPrefix(XXX_PREFIX, XXXConfig.configNames()); However for topic configs we do not have such a class (note TopicConfig is not sufficient, and the source of truth is still in broker-side KafkaConfig), so we cannot have a getTopicConfigs that can detect all the topic config names. Hence we ALWAYS have to prefix the config name to indicate it is a topic config name, unlike others which we do not have to.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thx. Got it.

final AdminClientConfig config = new AdminClientConfig(getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames()));
consumerProps.put(adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), config.getInt(AdminClientConfig.RETRIES_CONFIG));

// add admin and topic configs required for creating topics, also add batch size of producer for verification
final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
consumerProps.put(producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG) ? (Integer) producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG) : 16384);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where does 16384 come from? Sounds like the producer default config value. Shouldn't we get this out of the ProducerConfig similar to retires from AdminClientConfig ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it is indeed from ProducerConfig, unfortunately this value is not exposed from ProducerConfig object and hence cannot be leveraged.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We have the same issue with AdminClientConfig and retries -- thus, we instantiate a AdminClientConfig got get the default out of it. Can we do the same thing here and instantiate a ProducerConfig object?

I now it's not very nice code, but still better than hardcoding the value.

topicConfig.put(key, topicProperties.getProperty(key));
final Map<String, String> topicConfig = internalTopicConfig.getProperties(defaultTopicConfigs, windowChangeLogAdditionalRetention);

if (topicConfig.containsKey(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG) &&
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we put this check into StreamsConfig ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had tried that. The problem is that the default topic configs value are unknown at the beginning. But maybe I can rework something out of it.

Bare with me for another minor change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax Okay now I remembered why I cannot do that, because ProducerConfig constructor is not public in its package as AdminClientConfig constructor. That's why I cannot do sth. like

new ProducerConfig(getProducerConfigs("dummy")).getInt(ProducerConfig.BATCH_SIZE_CONFIG);

to read it out.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like an inconsistency... Would it be a problem making those constructors (ie, also for consumer) public?

// internal topic config overridden rule: library overrides < global config overrides < per-topic config overrides
final Map<String, String> topicConfig = new HashMap<>();

switch (topicType) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think i said this before, but i think it would be better/nicer to have different implementations of InternalTopicConfig for the different types of internal topics and then we don't need to have switch and if statements. Each impl will know what ti needs to do. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about it but felt it may be an overkill.. because now we have all the logics depending on type consolidated in one class already.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally support Damian idea here. Don't think it's overkill but actually more elegant OO code. If you feel strong to keep as it, also fine with me though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dguy @mjsax Just adding a few more notes here: I think the major concern is our type-dependent logic is scattered across multiple packages and classes, so we'd like to consolidate it. One way is to still pass in the type information but make all related logic within this single class (what I did), another way is to make different instantiations of the class so we do not need to have this type field at all. Personally I think they achieved similar goal while mine is less code: the downside is, as mentioned in #4315 (comment) the setRetentionMs should only be accepted with windowed store changelog type, but we still need to provide this API for either approach.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally support Damian idea here. Don't think it's overkill but actually more elegant OO code. If you feel strong to keep as it, also fine with me though.

I did not see this comment while writing mine. If my understanding about the root concern is right but we feel it is better to get rid of this type information, I can change it (i.e. I do not feel strong about it :P). LMK @mjsax @dguy

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is overkill either, it is just good OO design. Anyway, i'm not going to block this PR because of it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, I have refactored InternalTopicConfig in the latest commit as you suggested and will go ahead to merge it as is; if you can take a quick look that would be greatly appreciated.

@guozhangwang
Copy link
Contributor Author

@mjsax @dguy could you take another look? Also please let me know about #4315 (comment)

Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

consumerProps.put(producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), producerProps.containsKey(ProducerConfig.BATCH_SIZE_CONFIG) ? (Integer) producerProps.get(ProducerConfig.BATCH_SIZE_CONFIG) : 16384);
consumerProps.putAll(originalsWithPrefix(TOPIC_PREFIX, false));
// verify that producer batch config is no larger than segment size, then add topic configs required for creating topics
Map<String, Object> topicProps = originalsWithPrefix(TOPIC_PREFIX, false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: add final

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

Copy link
Contributor

@dguy dguy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@guozhangwang
Copy link
Contributor Author

Jenkins failures are irrelevant, it is a known issue: https://issues.apache.org/jira/browse/KAFKA-6377

@guozhangwang
Copy link
Contributor Author

Merged to trunk.

@asfgit asfgit closed this in 82c6d42 Dec 20, 2017
private final Map<String, String> logConfig;
private final Set<CleanupPolicy> cleanupPolicies;
// we need to distinguish windowed and un-windowed store changelog since their cleanup policy may be different
public enum InternalTopicType { REPARTITION, WINDOWED_STORE_CHANGELOG, UNWINDOWED_STORE_CHANGELOG }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this can be removed now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack, will do in the docs PR.

if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length())
result.put(entry.getKey().substring(prefix.length()), entry.getValue());
if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) {
if (strip)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this check can be lifted above the check at line 192.
Since we don't need to check prefix if strip == false.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we still need to check prefix even if strip == false, the only difference is that if the check of the prefix succeeds, whether or not to maintain the prefix in the returned map.

@guozhangwang guozhangwang deleted the K6150-segment-size branch February 14, 2018 19:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
5 participants