-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6311: Expose Kafka cluster ID in Connect REST API (KIP-238) #4314
KAFKA-6311: Expose Kafka cluster ID in Connect REST API (KIP-238) #4314
Conversation
b657438
to
d11af89
Compare
@ewencp: this is very useful. do you think we can include some additional metadata besides just the cluster id (for instance, |
@wicknicks agreed that cluster ID isn't necessarily the most useful human-readable property. my motivation in this case is actually pretty much not at all about humans, so i'm not too worried about that :) i'm more interested in just being able to get the id from the kafka cluster and correlate it w/ other info i have (for monitoring, config, mgmt, etc). the comment about i think exposing the worker config properties could be a pretty interesting feature, especially for tools designed to manage both connect & kafka. i'm not sure i'd be in favor of exposing them all on the I'd prefer to keep those extensions to a separate KIP, but if there's something concrete you really want to see included here we could discuss further. And if you have ideas about how those other worker configs could be cleanly exposed (and securely, and in a way where we don't need to worry about compatibility), I would love to see a proposal. We're kind of stingy in what configuration information we expose programmatically across core Kafka, Connect, and Streams; exposing a bit more, but in a conservative way, might make building tooling around these components much simpler. |
@kkonstantine @rhauch @wicknicks KIP vote passed, any comments before I drag in another reviewer to commit? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two minor comments, but looks good to me! Thanks, lgtm
public void testLookupKafkaClusterId() { | ||
final Node broker1 = new Node(0, "dummyHost-1", 1234); | ||
final Node broker2 = new Node(1, "dummyHost-2", 1234); | ||
List<Node> cluster = new ArrayList<Node>(2) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have to admit, I'm not a big fan of the trick with the anonymous class on collections.
MockAdminClient
doesn't seem to mutate this argument, plus it should probably create a copy if it did anyways.
Given that, List<Node> cluster = Arrays.asList(broker1, broker1)
would be my suggestion, but I also like to leave plenty of room for different styles, so not a strong suggestion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that Arrays.asList(broker1, broker2)
is more readable and more concise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
was just copy paste from another test, cleaned up.
public void testLookupKafkaClusterIdTimeout() { | ||
final Node broker1 = new Node(0, "dummyHost-1", 1234); | ||
final Node broker2 = new Node(1, "dummyHost-2", 1234); | ||
List<Node> cluster = new ArrayList<Node>(2) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you change above you might change here too.
@@ -22,10 +22,12 @@ | |||
public class ServerInfo { | |||
private String version; | |||
private String commit; | |||
private String kafkaClusterId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like all the member fields could be final
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with one of @kkonstantine's preferences, but nothing big so +1 from me.
} | ||
|
||
static String lookupKafkaClusterId(AdminClient adminClient) { | ||
log.debug("Looking up Kafka cluster ID"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any benefit to logging (in debug) the cluster ID once we get it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mostly i logged this statement so it would be obvious what was going wrong if something hangs / you see connection errors logged since this is all blocking full startup. but no harm in also logging the result. i also realized i missed a case where the future can be null if the broker version is < 0.10.1.0, i've handled that now too.
public void testLookupKafkaClusterId() { | ||
final Node broker1 = new Node(0, "dummyHost-1", 1234); | ||
final Node broker2 = new Node(1, "dummyHost-2", 1234); | ||
List<Node> cluster = new ArrayList<Node>(2) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that Arrays.asList(broker1, broker2)
is more readable and more concise.
@hachikuji @junrao Just waiting on tests to come back after addressing a few minor comments, either one of you care to review? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, left a few minor comments/questions.
this.kafkaClusterId = kafkaClusterId; | ||
} else { | ||
try (AdminClient adminClient = AdminClient.create(config.originals())) { | ||
this.kafkaClusterId = lookupKafkaClusterId(adminClient); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be reasonable to move this lookup to the caller (i.e. ConnectDistributed
and ConnectStandalone
). That makes the intent to kill the process if the cluster cannot be found a little clearer. Also, why not move the try
into lookupKafkaClusterId
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i've moved the calls up as suggested and just pass the info into the herders. the actual contents are now in ConnectUtils because I couldn't find them a better home.
the try
is separated so the method is properly unit testable. i would just mock out AdminClient.create
but people seem to prefer doing this to useing powermock.
version = AppInfoParser.getVersion(); | ||
commit = AppInfoParser.getCommitId(); | ||
this.kafkaClusterId = kafkaClusterId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: it looks nicer if we use the this.
prefix on all of the fields initialized
return kafkaClusterId; | ||
} catch (InterruptedException e) { | ||
final String msg = "Unexpectedly interrupted when looking up Kafka cluster info"; | ||
log.error(msg, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The log and rethrow pattern tends to lead to duplicate logging of errors. I would have expected that something higher up would catch this exception and log it.
@@ -37,4 +39,9 @@ public String version() { | |||
public String commit() { | |||
return commit; | |||
} | |||
|
|||
@JsonProperty("kafka_cluster_id") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the clusterId is null, will the field still show up? I am wondering if there should be a descriptive sentinel instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you have to change ObjectMapper settings to omit null values, so it'll be there and null. not sure a sentinel is much better than a null, and actually seems more likely to cause problems since it could incorrectly be interpreted as a valid id.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense.
private int timeoutNextRequests = 0; | ||
|
||
/** | ||
* Creates MockAdminClient for a cluster with the given brokers. By default the first broker in the list is the controller |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: It would be a little clearer to use a second argument for the controller.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hah, you caught me. i was just being lazy because i thought i'd have to update a bunch of places where this is used. turns out it's only used in one other place in streams... changed the signature and updated the comment.
this.controller = brokers.get(0); | ||
} | ||
|
||
public void controller(Node controller) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems we don't use this anywhere, but I guess there's no harm having it. Not sure it matters, but maybe we should have a sanity check that the controller is included in the broker list?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, i mainly added it because i was adding enough to make describeCluster work and it seemed like it could potentially be useful.
import static org.junit.Assert.assertEquals; | ||
|
||
@RunWith(EasyMockRunner.class) | ||
public class RootResourceTest extends EasyMockSupport { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps not too interesting, but maybe we should cover the case when the clusterId is null as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doesn't really add much to this test since we're already verifying it is passing the value through and this code isn't expected to do anything with the value. however, I did add a test for the supporting code when adminclient return a null value.
@hachikuji Mind taking another look? I've addressed the last round of comments, hopefully this is ready to go now. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates, LGTM.
More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.
Simple unit tests sufficiently exercise the behavior. In fact, this addition increases coverage since
RootResource
was not previously unit tested.Committer Checklist (excluded from commit message)