-
Notifications
You must be signed in to change notification settings - Fork 210
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
extract checking topic availability from BrokerMessageProducer #1832
Conversation
9d9af13
to
5914731
Compare
List<MetadataLoadingResult> allResults = loadMetadataForTopics(topics, datacenters); | ||
logger.info("Finished loading topic metadata in {}ms", System.currentTimeMillis() - start); | ||
logResultInfo(allResults); | ||
return allResults.stream().anyMatch(MetadataLoadingResult::isFailure); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
allResults.stream().anyMatch(MetadataLoadingResult::isFailure)
means that we return true
if any job fails. Shouldn't we negate the result?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KafkaReadinessCheckTest
is probably failing because of this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct. Fixed.
@@ -13,8 +11,6 @@ | |||
|
|||
public class TopicMetadataLoadingJob implements Runnable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not really related to this PR but I think it would be helpful to at least add a comment what is this the purpose of this class - i.e. to keep producers warm. Without that it is hard to tell, it refreshes topics in the background and discards the result - what for?
|
||
when: | ||
hook.refreshMetadata() | ||
hook.refreshMetadataForLocalDatacenter() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about checking the result of this method in tests? Right now we are never checking it so we were not able to catch a bug in TopicMetadataLoadingRunner
.
|
||
import java.util.List; | ||
|
||
public class KafkaBrokerTopicMetadataFetcher implements BrokerTopicMetadataFetcher { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering whether it makes sense to separate KafkaBrokerTopicMetadataFetcher
from KafkaTopicMetadataFetcher
:
- they names are really similar which makes it confusing
- KafkaTopicMetadataFetcher is only used by KafkaBrokerTopicMetadataFetcher
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Merged.
|
||
@Override | ||
public boolean tryFetchFromDatacenter(CachedTopic topic, String datacenter) { | ||
// TODO: To fetch metadata for a selected datacenter https://github.com/allegro/hermes/pull/1823 is required. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the final implementation we have to be extra careful: this class uses Producers
and KafkaTopicMetadataFetcher
. KafkaTopicMetadataFetcher
in turn uses AdminClient
. So we are essentially using two different Kafka clients (KafkaProducer and AdminClient) here and we need to make sure in the future that both of them refer to the same datacenter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps, an option worth considering is to introduce datacenter awareness in this PR since its scope is relatively narrow and it is easier to spot any bugs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me play around with this a bit. Perhaps remodeling this class slightly will address this comment and #1832 (comment).
53765dc
to
bced977
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great PR!
Addresses
hermes/hermes-frontend/src/main/java/pl/allegro/tech/hermes/frontend/producer/kafka/MultiDCKafkaBrokerMessageProducer.java
Line 156 in e27aef8