From 8eade93dab98534a1ef6d22cfae5cb819bd37125 Mon Sep 17 00:00:00 2001 From: George Date: Wed, 5 Oct 2016 11:48:02 +0200 Subject: [PATCH] [FLINK-4439] Validate 'bootstrap.servers' config in flink kafka consumer 0.8 --- .../kafka/FlinkKafkaConsumer08.java | 37 +++++++++++- .../connectors/kafka/KafkaConsumer08Test.java | 59 +++++++++++++++++-- 2 files changed, 89 insertions(+), 7 deletions(-) diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java index d7a6364a0ba52..0aacccdaac359 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java @@ -42,7 +42,10 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.Node; +import java.net.InetAddress; import java.net.URL; +import java.net.UnknownHostException; +import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -237,7 +240,7 @@ protected List getKafkaPartitions(List topics) { public static List getPartitionsForTopic(List topics, Properties properties) { String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); final int numRetries = getInt(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES); - + checkNotNull(seedBrokersConfString, "Configuration property %s not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); String[] seedBrokers = seedBrokersConfString.split(","); List partitions = new ArrayList<>(); @@ -290,6 +293,8 @@ public static List getPartitionsForTopic(List } break retryLoop; // leave the loop through the brokers } catch (Exception e) { + //validates seed brokers in case of a ClosedChannelException + validateSeedBrokers(seedBrokers, e); LOG.warn("Error communicating with broker " + seedBroker + " to find partitions for " + topics.toString() + "." + "" + e.getClass() + ". Message: " + e.getMessage()); LOG.debug("Detailed trace", e); @@ -348,6 +353,36 @@ protected static void validateZooKeeperConfig(Properties props) { } } + /** + * Validate that at least one seed broker is valid in case of a + * ClosedChannelException. + * + * @param seedBrokers + * array containing the seed brokers e.g. ["host1:port1", + * "host2:port2"] + * @param exception + * instance + */ + private static void validateSeedBrokers(String[] seedBrokers, Exception exception) { + if (!(exception instanceof ClosedChannelException)) { + return; + } + int unknownHosts = 0; + for (String broker : seedBrokers) { + URL brokerUrl = NetUtils.getCorrectHostnamePort(broker.trim()); + try { + InetAddress.getByName(brokerUrl.getHost()); + } catch (UnknownHostException e) { + unknownHosts++; + } + } + // throw meaningful exception if all the provided hosts are invalid + if (unknownHosts == seedBrokers.length) { + throw new IllegalArgumentException("All the servers provided in: '" + + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + "' config are invalid. (unknown hosts)"); + } + } + private static long getInvalidOffsetBehavior(Properties config) { final String val = config.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest"); if (val.equals("none")) { diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java index f0b58cff72ee1..9520f551c572c 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java @@ -18,17 +18,17 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; -import org.apache.kafka.clients.consumer.ConsumerConfig; - -import org.junit.Test; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.Collections; import java.util.Properties; -import static org.junit.Assert.*; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.junit.Test; public class KafkaConsumer08Test { @@ -89,4 +89,51 @@ public void testCreateSourceWithoutCluster() { assertTrue(e.getMessage().contains("Unable to retrieve any partitions")); } } + + @Test + public void testAllBoostrapServerHostsAreInvalid() { + try { + String zookeeperConnect = "localhost:56794"; + String bootstrapServers = "indexistentHost:11111"; + String groupId = "non-existent-group"; + Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId); + FlinkKafkaConsumer08 consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), + new SimpleStringSchema(), props); + consumer.open(new Configuration()); + fail(); + } catch (Exception e) { + assertTrue("Exception should be thrown containing 'all bootstrap servers invalid' message!", + e.getMessage().contains("All the servers provided in: '" + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + + "' config are invalid")); + } + } + + @Test + public void testAtLeastOneBootstrapServerHostIsValid() { + try { + String zookeeperConnect = "localhost:56794"; + // we declare one valid boostrap server, namely the one with + // 'localhost' + String bootstrapServers = "indexistentHost:11111, localhost:22222"; + String groupId = "non-existent-group"; + Properties props = createKafkaProps(zookeeperConnect, bootstrapServers, groupId); + FlinkKafkaConsumer08 consumer = new FlinkKafkaConsumer08<>(Collections.singletonList("no op topic"), + new SimpleStringSchema(), props); + consumer.open(new Configuration()); + fail(); + } catch (Exception e) { + // test is not failing because we have one valid boostrap server + assertTrue("The cause of the exception should not be 'all boostrap server are invalid'!", + !e.getMessage().contains("All the hosts provided in: " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + + " config are invalid")); + } + } + + private Properties createKafkaProps(String zookeeperConnect, String bootstrapServers, String groupId) { + Properties props = new Properties(); + props.setProperty("zookeeper.connect", zookeeperConnect); + props.setProperty("bootstrap.servers", bootstrapServers); + props.setProperty("group.id", groupId); + return props; + } }