Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-30056][connectors][kafka] Replace deprecated kafka-clients methods #21150

Merged
merged 2 commits into from
Nov 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ static Map<String, TopicDescription> getAllTopicMetadata(AdminClient adminClient
static Map<String, TopicDescription> getTopicMetadata(
AdminClient adminClient, Set<String> topicNames) {
try {
return adminClient.describeTopics(topicNames).all().get();
return adminClient.describeTopics(topicNames).allTopicNames().get();
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to get metadata for topics %s.", topicNames), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import javax.annotation.Nonnull;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -258,7 +259,7 @@ public void run() {
// over
if (records == null) {
try {
records = consumer.poll(pollTimeout);
records = consumer.poll(Duration.ofMillis(pollTimeout));
Comment on lines -261 to +262
Copy link
Contributor

@zentol zentol Nov 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: This changes the blocking behavior of this call.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior

I'd like to see a proper ticket for this particular change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@snuyanzin snuyanzin Nov 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please let me know if i need to change commit message or rebase

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please split the commits such that 1 is making the poll change, and the other (hotfix) commit with the remaining changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

} catch (WakeupException we) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public List<String> generateTestData(TestingSinkSettings sinkSettings, long seed

protected Map<String, TopicDescription> getTopicMetadata(List<String> topics) {
try {
return kafkaAdminClient.describeTopics(topics).all().get();
return kafkaAdminClient.describeTopics(topics).allTopicNames().get();
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to get metadata for topics %s.", topics), e);
Expand All @@ -202,7 +202,7 @@ protected Map<String, TopicDescription> getTopicMetadata(List<String> topics) {

private boolean topicExists(String topic) {
try {
kafkaAdminClient.describeTopics(Arrays.asList(topic)).all().get();
kafkaAdminClient.describeTopics(Arrays.asList(topic)).allTopicNames().get();
return true;
} catch (Exception e) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,10 @@ private KafkaPartitionDataWriter scaleOutTopic(String topicName) throws Exceptio
final Set<String> topics = adminClient.listTopics().names().get();
if (topics.contains(topicName)) {
final Map<String, TopicDescription> topicDescriptions =
adminClient.describeTopics(Collections.singletonList(topicName)).all().get();
adminClient
.describeTopics(Collections.singletonList(topicName))
.allTopicNames()
.get();
final int numPartitions = topicDescriptions.get(topicName).partitions().size();
LOG.info("Creating partition {} for topic '{}'", numPartitions + 1, topicName);
adminClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ private void assertRecord(String topicName, String expectedKey, String expectedV
kafkaConsumer.subscribe(Collections.singletonList(topicName));
ConsumerRecords<String, String> records = ConsumerRecords.empty();
while (records.isEmpty()) {
records = kafkaConsumer.poll(10000);
records = kafkaConsumer.poll(Duration.ofMillis(10000));
}

ConsumerRecord<String, String> record = Iterables.getOnlyElement(records);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public void createTestTopic(
topicDescriptions =
adminClient
.describeTopics(Collections.singleton(topic))
.all()
.allTopicNames()
.get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (Exception e) {
LOG.warn("Exception caught when describing Kafka topics", e);
Expand Down Expand Up @@ -331,7 +331,10 @@ public void stopBroker(int brokerId) throws Exception {
public int getLeaderToShutDown(String topic) throws Exception {
try (final AdminClient client = AdminClient.create(getStandardProperties())) {
TopicDescription result =
client.describeTopics(Collections.singleton(topic)).all().get().get(topic);
client.describeTopics(Collections.singleton(topic))
.allTopicNames()
.get()
.get(topic);
return result.partitions().get(0).leader().id();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private Map<String, TopicDescription> describeExternalTopics() {
.map(TopicListing::name)
.collect(Collectors.toList());

return adminClient.describeTopics(topics).all().get();
return adminClient.describeTopics(topics).allTopicNames().get();
} catch (Exception e) {
throw new RuntimeException("Failed to list Kafka topics", e);
}
Expand Down