From 6a29dd2f481141d861ce2e23085e765dda3d9435 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Hermann?= Date: Mon, 23 Mar 2015 14:18:55 +0100 Subject: [PATCH 1/3] [FLINK-1753] [streaming] Added Kafka broker failure test --- .../connectors/kafka/api/KafkaSink.java | 19 +- .../kafka/api/simple/KafkaTopicUtils.java | 152 +++++++++++- .../api/simple/PersistentKafkaSource.java | 8 +- .../KafkaMultiplePartitionsIterator.java | 16 +- .../KafkaSinglePartitionIterator.java | 142 ++++++++--- .../kafka/api/simple/offset/KafkaOffset.java | 24 +- .../connectors/kafka/KafkaITCase.java | 223 ++++++++++++++---- .../connectors/kafka/KafkaTopicUtilsTest.java | 152 ++++++++++++ .../kafka/util/KafkaLocalSystemTime.java | 48 ++++ 9 files changed, 666 insertions(+), 118 deletions(-) create mode 100644 flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java create mode 100644 flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/util/KafkaLocalSystemTime.java diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java index 0bbf9a7581484..1d4cdb5eb81c9 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java @@ -27,6 +27,8 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner; import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flink.util.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; @@ -45,6 +47,8 @@ public class KafkaSink extends RichSinkFunction { private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class); + private Producer producer; private Properties props; private String topicId; @@ -64,7 +68,7 @@ public class KafkaSink extends RichSinkFunction { * @param serializationSchema * User defined serialization schema. 
*/ - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings({"rawtypes", "unchecked"}) public KafkaSink(String zookeeperAddress, String topicId, SerializationSchema serializationSchema) { this(zookeeperAddress, topicId, serializationSchema, (Class) null); @@ -114,12 +118,17 @@ public KafkaSink(String zookeeperAddress, String topicId, public void open(Configuration configuration) { KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperAddress); - String brokerAddress = kafkaTopicUtils.getLeaderBrokerAddressForTopic(topicId); + String listOfBrokers = kafkaTopicUtils.getBrokerList(topicId); + + if (LOG.isInfoEnabled()) { + LOG.info("Broker list: {}", listOfBrokers); + } props = new Properties(); - props.put("metadata.broker.list", brokerAddress); - props.put("request.required.acks", "1"); + props.put("metadata.broker.list", listOfBrokers); + props.put("request.required.acks", "-1"); + props.put("message.send.max.retries", "10"); props.put("serializer.class", DefaultEncoder.class.getCanonicalName()); @@ -140,7 +149,7 @@ public void open(Configuration configuration) { try { producer = new Producer(config); } catch (NullPointerException e) { - throw new RuntimeException("Cannot connect to Kafka broker " + brokerAddress, e); + throw new RuntimeException("Cannot connect to Kafka broker " + listOfBrokers, e); } } diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java index 9e09ea8561f6d..365961d98ec68 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java @@ -19,7 +19,10 @@ import java.io.UnsupportedEncodingException; import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; import java.util.Properties; +import java.util.Set; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkMarshallingError; @@ -31,6 +34,8 @@ import kafka.api.PartitionMetadata; import kafka.api.TopicMetadata; import kafka.cluster.Broker; +import kafka.common.LeaderNotAvailableException; +import kafka.common.UnknownTopicOrPartitionException; import scala.collection.JavaConversions; import scala.collection.Seq; @@ -42,22 +47,29 @@ public class KafkaTopicUtils { private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicUtils.class); - private final ZkClient zkClient; + private ZkClient zkClient; public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS = 10000; public static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS = 10000; + private final String zookeeperAddress; + private final int sessionTimeoutMs; + private final int connectionTimeoutMs; + + private volatile boolean isRunning = false; + public KafkaTopicUtils(String zookeeperServer) { this(zookeeperServer, DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS, DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS); } public KafkaTopicUtils(String zookeeperAddress, int sessionTimeoutMs, int connectionTimeoutMs) { - zkClient = new ZkClient(zookeeperAddress, sessionTimeoutMs, connectionTimeoutMs, - new KafkaZKStringSerializer()); - zkClient.waitUntilConnected(); + this.zookeeperAddress = zookeeperAddress; + 
this.sessionTimeoutMs = sessionTimeoutMs; + this.connectionTimeoutMs = connectionTimeoutMs; } public void createTopic(String topicName, int numOfPartitions, int replicationFactor) { + LOG.info("Creating Kafka topic '{}'", topicName); Properties topicConfig = new Properties(); if (topicExists(topicName)) { @@ -65,36 +77,150 @@ public void createTopic(String topicName, int numOfPartitions, int replicationFa LOG.warn("Kafka topic \"{}\" already exists. Returning without action.", topicName); } } else { + LOG.info("Connecting zookeeper"); + + initZkClient(); AdminUtils.createTopic(zkClient, topicName, numOfPartitions, replicationFactor, topicConfig); + closeZkClient(); } } + public String getBrokerList(String topicName) { + return getBrokerAddressList(getBrokerAddresses(topicName)); + } + + public String getBrokerList(String topicName, int partitionId) { + return getBrokerAddressList(getBrokerAddresses(topicName, partitionId)); + } + + public Set getBrokerAddresses(String topicName) { + int numOfPartitions = getNumberOfPartitions(topicName); + + HashSet brokers = new HashSet(); + for (int i = 0; i < numOfPartitions; i++) { + brokers.addAll(getBrokerAddresses(topicName, i)); + } + return brokers; + } + + public Set getBrokerAddresses(String topicName, int partitionId) { + PartitionMetadata partitionMetadata = waitAndGetPartitionMetadata(topicName, partitionId); + Collection inSyncReplicas = JavaConversions.asJavaCollection(partitionMetadata.isr()); + + HashSet addresses = new HashSet(); + for (Broker broker : inSyncReplicas) { + addresses.add(broker.connectionString()); + } + return addresses; + } + + private static String getBrokerAddressList(Set brokerAddresses) { + StringBuilder brokerAddressList = new StringBuilder(""); + for (String broker : brokerAddresses) { + brokerAddressList.append(broker); + brokerAddressList.append(','); + } + brokerAddressList.deleteCharAt(brokerAddressList.length() - 1); + + return brokerAddressList.toString(); + } + public int getNumberOfPartitions(String topicName) { - Seq partitionMetadataSeq = getTopicInfo(topicName).partitionsMetadata(); + Seq partitionMetadataSeq = getTopicMetadata(topicName).partitionsMetadata(); return JavaConversions.asJavaCollection(partitionMetadataSeq).size(); } - public String getLeaderBrokerAddressForTopic(String topicName) { - TopicMetadata topicInfo = getTopicInfo(topicName); + public PartitionMetadata waitAndGetPartitionMetadata(String topicName, int partitionId) { + isRunning = true; + PartitionMetadata partitionMetadata = null; + while (isRunning) { + try { + partitionMetadata = getPartitionMetadata(topicName, partitionId); + return partitionMetadata; + } catch (LeaderNotAvailableException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Got {} trying to fetch metadata again", e.getMessage()); + } + } + } + isRunning = false; + return partitionMetadata; + } + + public PartitionMetadata getPartitionMetadata(String topicName, int partitionId) { + PartitionMetadata partitionMetadata = getPartitionMetadataWithErrorCode(topicName, partitionId); + switch (partitionMetadata.errorCode()) { + case 0: + return partitionMetadata; + case 3: + throw new UnknownTopicOrPartitionException("While fetching metadata for " + topicName + " / " + partitionId); + case 5: + throw new LeaderNotAvailableException("While fetching metadata for " + topicName + " / " + partitionId); + default: + throw new RuntimeException("Unknown error occurred while fetching metadata for " + + topicName + " / " + partitionId + ", with error code: " + 
partitionMetadata.errorCode()); + } + } + + private PartitionMetadata getPartitionMetadataWithErrorCode(String topicName, int partitionId) { + TopicMetadata topicInfo = getTopicMetadata(topicName); Collection partitions = JavaConversions.asJavaCollection(topicInfo.partitionsMetadata()); - PartitionMetadata partitionMetadata = partitions.iterator().next(); - Broker leader = JavaConversions.asJavaCollection(partitionMetadata.isr()).iterator().next(); + Iterator iterator = partitions.iterator(); + for (PartitionMetadata partition : partitions) { + if (partition.partitionId() == partitionId) { + return partition; + } + } - return leader.connectionString(); + throw new RuntimeException("No such partition: " + topicName + " / " + partitionId); } - public TopicMetadata getTopicInfo(String topicName) { + public TopicMetadata getTopicMetadata(String topicName) { + TopicMetadata topicMetadata = getTopicMetadataWithErrorCode(topicName); + switch (topicMetadata.errorCode()) { + case 0: + return topicMetadata; + case 3: + throw new UnknownTopicOrPartitionException("While fetching metadata for topic " + topicName); + case 5: + throw new LeaderNotAvailableException("While fetching metadata for topic " + topicName); + default: + throw new RuntimeException("Unknown error occurred while fetching metadata for topic " + + topicName + ", with error code: " + topicMetadata.errorCode()); + } + } + + private TopicMetadata getTopicMetadataWithErrorCode(String topicName) { if (topicExists(topicName)) { - return AdminUtils.fetchTopicMetadataFromZk(topicName, zkClient); + initZkClient(); + TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicName, zkClient); + closeZkClient(); + + return topicMetadata; } else { throw new RuntimeException("Topic does not exist: " + topicName); } } public boolean topicExists(String topicName) { - return AdminUtils.topicExists(zkClient, topicName); + initZkClient(); + boolean topicExists = AdminUtils.topicExists(zkClient, topicName); + closeZkClient(); + + return topicExists; + } + + private void initZkClient() { + zkClient = new ZkClient(zookeeperAddress, sessionTimeoutMs, connectionTimeoutMs, + new KafkaZKStringSerializer()); + zkClient.waitUntilConnected(); + } + + private void closeZkClient() { + zkClient.close(); + zkClient = null; } private static class KafkaZKStringSerializer implements ZkSerializer { diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java index 45dc1c45a7873..7cd8a28b80778 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java @@ -167,8 +167,6 @@ public void open(Configuration parameters) throws InterruptedException { int numberOfPartitions = kafkaTopicUtils.getNumberOfPartitions(topicId); - String brokerAddress = kafkaTopicUtils.getLeaderBrokerAddressForTopic(topicId); - if (indexOfSubtask >= numberOfPartitions) { iterator = new KafkaIdleConsumerIterator(); } else { @@ -188,7 +186,7 @@ public void open(Configuration parameters) throws InterruptedException { context.registerState("kafka", kafkaOffSet); } - iterator = 
getMultiKafkaIterator(brokerAddress, topicId, partitions, waitOnEmptyFetchMillis); + iterator = new KafkaMultiplePartitionsIterator(topicId, partitions, kafkaTopicUtils, waitOnEmptyFetchMillis, connectTimeoutMs, bufferSize); if (LOG.isInfoEnabled()) { LOG.info("KafkaSource ({}/{}) listening to partitions {} of topic {}.", @@ -199,10 +197,6 @@ public void open(Configuration parameters) throws InterruptedException { iterator.initialize(); } - protected KafkaConsumerIterator getMultiKafkaIterator(String hostName, String topic, Map partitionsWithOffset, int waitOnEmptyFetch) { - return new KafkaMultiplePartitionsIterator(hostName, topic, partitionsWithOffset, waitOnEmptyFetch, this.connectTimeoutMs, this.bufferSize); - } - @Override public void run(Collector collector) throws Exception { isRunning = true; diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java index 40e1ff2d41a54..b76421e79bba3 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; +import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils; import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata; import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset; import org.slf4j.Logger; @@ -33,25 +34,22 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator { protected List partitions; protected final int waitOnEmptyFetch; - public KafkaMultiplePartitionsIterator(String hostName, String topic, + public KafkaMultiplePartitionsIterator(String topic, Map partitionsWithOffset, + KafkaTopicUtils kafkaTopicUtils, int waitOnEmptyFetch, int connectTimeoutMs, int bufferSize) { partitions = new ArrayList(partitionsWithOffset.size()); - String[] hostAndPort = hostName.split(":"); - - String host = hostAndPort[0]; - int port = Integer.parseInt(hostAndPort[1]); - this.waitOnEmptyFetch = waitOnEmptyFetch; for (Map.Entry partitionWithOffset : partitionsWithOffset.entrySet()) { partitions.add(new KafkaSinglePartitionIterator( - host, - port, topic, partitionWithOffset.getKey(), - partitionWithOffset.getValue(), connectTimeoutMs, bufferSize)); + partitionWithOffset.getValue(), + kafkaTopicUtils, + connectTimeoutMs, + bufferSize)); } } diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java index cf49e43a2ca91..6e326e590b053 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java +++ 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java @@ -19,11 +19,14 @@ import java.io.Serializable; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Set; +import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils; import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata; import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffset; import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset; @@ -32,7 +35,9 @@ import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; +import kafka.cluster.Broker; import kafka.common.ErrorMapping; +import kafka.common.NotLeaderForPartitionException; import kafka.javaapi.FetchResponse; import kafka.javaapi.PartitionMetadata; import kafka.javaapi.TopicMetadata; @@ -53,34 +58,43 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri private List hosts; private String topic; - private int port; private int partition; private long readOffset; private transient SimpleConsumer consumer; private List replicaBrokers; private String clientName; - private String leadBroker; + private Broker leadBroker; private final int connectTimeoutMs; private final int bufferSize; private KafkaOffset initialOffset; private transient Iterator iter; private transient FetchResponse fetchResponse; + private volatile boolean isRunning; /** * Constructor with configurable wait time on empty fetch. For connecting to the Kafka service * we use the so called simple or low level Kafka API thus directly connecting to one of the brokers. * - * @param hostName Hostname of a known Kafka broker - * @param port Port of the known Kafka broker - * @param topic Name of the topic to listen to - * @param partition Partition in the chosen topic + * @param topic + * Name of the topic to listen to + * @param partition + * Partition in the chosen topic + * @param initialOffset + * Offset to start consuming at + * @param kafkaTopicUtils + * Util for receiving topic metadata + * @param connectTimeoutMs + * Connection timeout in milliseconds + * @param bufferSize + * Size of buffer */ - public KafkaSinglePartitionIterator(String hostName, int port, String topic, int partition, KafkaOffset initialOffset, - int connectTimeoutMs, int bufferSize) { - this.hosts = new ArrayList(); - hosts.add(hostName); - this.port = port; + public KafkaSinglePartitionIterator(String topic, int partition, KafkaOffset initialOffset, + KafkaTopicUtils kafkaTopicUtils, int connectTimeoutMs, int bufferSize) { + + Set brokerAddresses = kafkaTopicUtils.getBrokerAddresses(topic, partition); + this.hosts = new ArrayList(brokerAddresses); + this.connectTimeoutMs = connectTimeoutMs; this.bufferSize = bufferSize; this.topic = topic; @@ -88,7 +102,7 @@ public KafkaSinglePartitionIterator(String hostName, int port, String topic, int this.initialOffset = initialOffset; - replicaBrokers = new ArrayList(); + this.replicaBrokers = new ArrayList(); } // -------------------------------------------------------------------------------------------- @@ -100,29 +114,55 @@ public KafkaSinglePartitionIterator(String hostName, int port, String topic, int * the topic and establishing a connection to it. 
*/ public void initialize() throws InterruptedException { + if (LOG.isInfoEnabled()) { + LOG.info("Initializing consumer {} / {} with hosts {}", topic, partition, hosts); + } + PartitionMetadata metadata; + isRunning = true; do { - metadata = findLeader(hosts, port, topic, partition); + metadata = findLeader(hosts, topic, partition); try { Thread.sleep(DEFAULT_WAIT_ON_EMPTY_FETCH); } catch (InterruptedException e) { throw new InterruptedException("Establishing connection to Kafka failed"); } - } while (metadata == null); + } while (isRunning && metadata == null); + isRunning = false; if (metadata.leader() == null) { - throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts.get(0) - + ":" + port); + throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts + ")"); } - leadBroker = metadata.leader().host(); + leadBroker = metadata.leader(); clientName = "Client_" + topic + "_" + partition; - consumer = new SimpleConsumer(leadBroker, port, connectTimeoutMs, bufferSize, clientName); + consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), connectTimeoutMs, bufferSize, clientName); + + try { + readOffset = initialOffset.getOffset(consumer, topic, partition, clientName); + } catch (NotLeaderForPartitionException e) { + do { + + metadata = findLeader(hosts, topic, partition); - readOffset = initialOffset.getOffset(consumer, topic, partition, clientName); + try { + Thread.sleep(DEFAULT_WAIT_ON_EMPTY_FETCH); + } catch (InterruptedException ie) { + throw new InterruptedException("Establishing connection to Kafka failed"); + } + } while (metadata == null); + readOffset = initialOffset.getOffset(consumer, topic, partition, clientName); + } - resetFetchResponse(readOffset); + try { + resetFetchResponse(readOffset); + } catch (ClosedChannelException e) { + if (LOG.isWarnEnabled()) { + LOG.warn("Got ClosedChannelException, trying to find new leader."); + } + findNewLeader(); + } } /** @@ -161,7 +201,14 @@ public byte[] next() throws InterruptedException { public boolean fetchHasNext() throws InterruptedException { synchronized (fetchResponse) { if (!iter.hasNext()) { - resetFetchResponse(readOffset); + try { + resetFetchResponse(readOffset); + } catch (ClosedChannelException e) { + if (LOG.isWarnEnabled()) { + LOG.warn("Got ClosedChannelException, trying to find new leader."); + } + findNewLeader(); + } return iter.hasNext(); } else { return true; @@ -205,7 +252,7 @@ public MessageWithMetadata nextWithOffset() throws InterruptedException { // Internal utilities // -------------------------------------------------------------------------------------------- - private void resetFetchResponse(long offset) throws InterruptedException { + private void resetFetchResponse(long offset) throws InterruptedException, ClosedChannelException { FetchRequest req = new FetchRequestBuilder().clientId(clientName) .addFetch(topic, partition, offset, 100000).build(); @@ -225,24 +272,43 @@ private void resetFetchResponse(long offset) throws InterruptedException { readOffset = new CurrentOffset().getOffset(consumer, topic, partition, clientName); } - consumer.close(); - consumer = null; - leadBroker = findNewLeader(leadBroker, topic, partition, port); + + findNewLeader(); } iter = fetchResponse.messageSet(topic, partition).iterator(); } - private PartitionMetadata findLeader(List a_hosts, int a_port, String a_topic, + private void findNewLeader() throws InterruptedException { + consumer.close(); + consumer = null; + leadBroker = findNewLeader(leadBroker, 
topic, partition); + consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), 100000, 64 * 1024, clientName); + } + + @SuppressWarnings("ConstantConditions") + private PartitionMetadata findLeader(List addresses, String a_topic, int a_partition) { + PartitionMetadata returnMetaData = null; loop: - for (String seed : a_hosts) { + for (String address : addresses) { + + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to find leader via broker: {}", address); + } + + String[] split = address.split(":"); + String host = split[0]; + int port = Integer.parseInt(split[1]); + SimpleConsumer consumer = null; try { - consumer = new SimpleConsumer(seed, a_port, connectTimeoutMs, bufferSize, "leaderLookup"); + consumer = new SimpleConsumer(host, port, connectTimeoutMs, bufferSize, "leaderLookup"); List topics = Collections.singletonList(a_topic); + TopicMetadataRequest req = new TopicMetadataRequest(topics); + kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); List metaData = resp.topicsMetadata(); @@ -255,8 +321,13 @@ private PartitionMetadata findLeader(List a_hosts, int a_port, String a_ } } } catch (Exception e) { - throw new RuntimeException("Error communicating with Broker [" + seed - + "] to find Leader for [" + a_topic + ", " + a_partition + "]", e); + if (e instanceof ClosedChannelException) { + LOG.warn("Got ClosedChannelException while trying to communicate with Broker" + + "[{}] to find Leader for [{}, {}]. Trying other replicas.", address, a_topic, a_partition); + } else { + throw new RuntimeException("Error communicating with Broker [" + address + + "] to find Leader for [" + a_topic + ", " + a_partition + "]", e); + } } finally { if (consumer != null) { consumer.close(); @@ -266,30 +337,31 @@ private PartitionMetadata findLeader(List a_hosts, int a_port, String a_ if (returnMetaData != null) { replicaBrokers.clear(); for (kafka.cluster.Broker replica : returnMetaData.replicas()) { - replicaBrokers.add(replica.host()); + replicaBrokers.add(replica.host() + ":" + replica.port()); } } return returnMetaData; } - private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws InterruptedException { + @SuppressWarnings({"ConstantConditions", "UnusedAssignment"}) + private Broker findNewLeader(Broker a_oldLeader, String a_topic, int a_partition) throws InterruptedException { for (int i = 0; i < 3; i++) { if (LOG.isInfoEnabled()) { LOG.info("Trying to find a new leader after Broker failure."); } boolean goToSleep = false; - PartitionMetadata metadata = findLeader(replicaBrokers, a_port, a_topic, a_partition); + PartitionMetadata metadata = findLeader(replicaBrokers, a_topic, a_partition); if (metadata == null) { goToSleep = true; } else if (metadata.leader() == null) { goToSleep = true; - } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) { + } else if (a_oldLeader.host().equalsIgnoreCase(metadata.leader().host()) && i == 0) { // first time through if the leader hasn't changed give ZooKeeper a second to recover // second time, assume the broker did recover before failover, or it was a non-Broker issue // goToSleep = true; } else { - return metadata.leader().host(); + return metadata.leader(); } if (goToSleep) { try { diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java 
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java index 4dfd314dc485b..ac45e329fedb1 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java @@ -21,6 +21,9 @@ import java.util.HashMap; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import kafka.api.PartitionOffsetRequestInfo; import kafka.common.TopicAndPartition; import kafka.javaapi.OffsetResponse; @@ -28,6 +31,8 @@ public abstract class KafkaOffset implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(KafkaOffset.class); + private static final long serialVersionUID = 1L; public abstract long getOffset(SimpleConsumer consumer, String topic, int partition, @@ -38,14 +43,27 @@ protected long getLastOffset(SimpleConsumer consumer, String topic, int partitio TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); Map requestInfo = new HashMap(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); + kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); OffsetResponse response = consumer.getOffsetsBefore(request); - if (response.hasError()) { - throw new RuntimeException("Error fetching data from Kafka broker. Reason: " - + response.errorCode(topic, partition)); + while (response.hasError()) { + switch (response.errorCode(topic, partition)) { + case 6: + case 3: + LOG.warn("Kafka broker trying to fetch from a non-leader broker."); + break; + default: + throw new RuntimeException("Error fetching data from Kafka broker. 
Reason: " + + response.errorCode(topic, partition)); + } + + request = new kafka.javaapi.OffsetRequest(requestInfo, + kafka.api.OffsetRequest.CurrentVersion(), clientName); + response = consumer.getOffsetsBefore(request); } + long[] offsets = response.offsets(topic, partition); return offsets[0]; } diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java index 9344722b11bbc..b416839012a8a 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java @@ -24,7 +24,9 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.ArrayList; import java.util.BitSet; +import java.util.List; import java.util.Properties; import org.apache.commons.lang.SerializationUtils; @@ -43,6 +45,7 @@ import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource; import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset; import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner; +import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime; import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema; import org.apache.flink.streaming.util.serialization.SerializationSchema; @@ -58,7 +61,6 @@ import kafka.server.KafkaConfig; import kafka.server.KafkaServer; -import kafka.utils.Time; /** * Code in this test is based on the following GitHub repository: @@ -70,9 +72,9 @@ public class KafkaITCase { private static final Logger LOG = LoggerFactory.getLogger(KafkaITCase.class); + private static final int NUMBER_OF_KAFKA_SERVERS = 3; private static int zkPort; - private static int kafkaPort; private static String kafkaHost; private static String zookeeperConnectionString; @@ -80,30 +82,39 @@ public class KafkaITCase { @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder(); public static File tmpZkDir; - public static File tmpKafkaDir; + public static List tmpKafkaDirs; private static TestingServer zookeeper; - private static KafkaServer broker1; + private static List brokers; + private static boolean shutdownKafkaBroker; @BeforeClass public static void prepare() throws IOException { LOG.info("Starting KafkaITCase.prepare()"); tmpZkDir = tempFolder.newFolder(); - tmpKafkaDir = tempFolder.newFolder(); + + tmpKafkaDirs = new ArrayList(NUMBER_OF_KAFKA_SERVERS); + for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) { + tmpKafkaDirs.add(tempFolder.newFolder()); + } + kafkaHost = InetAddress.getLocalHost().getHostName(); zkPort = NetUtils.getAvailablePort(); - kafkaPort = NetUtils.getAvailablePort(); zookeeperConnectionString = "localhost:" + zkPort; zookeeper = null; - broker1 = null; + brokers = null; try { LOG.info("Starting Zookeeper"); zookeeper = getZookeeper(); LOG.info("Starting KafkaServer"); - broker1 = getKafkaServer(0); + brokers = new ArrayList(NUMBER_OF_KAFKA_SERVERS); + for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) { + brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i))); + } + LOG.info("ZK and KafkaServer started."); } 
catch (Throwable t) { LOG.warn("Test failed with exception", t); @@ -114,8 +125,10 @@ public static void prepare() throws IOException { @AfterClass public static void shutDownServices() { LOG.info("Shutting down all services"); - if (broker1 != null) { - broker1.shutdown(); + for (KafkaServer broker : brokers) { + if (broker != null) { + broker.shutdown(); + } } if (zookeeper != null) { try { @@ -131,7 +144,7 @@ public void regularKafkaSourceTest() throws Exception { LOG.info("Starting KafkaITCase.regularKafkaSourceTest()"); String topic = "regularKafkaSourceTestTopic"; - createTestTopic(topic, 1); + createTestTopic(topic, 1, 1); final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1); @@ -145,7 +158,7 @@ public void regularKafkaSourceTest() throws Exception { @Override public void invoke(Tuple2 value) throws Exception { - LOG.info("Got " + value); + LOG.debug("Got " + value); String[] sp = value.f1.split("-"); int v = Integer.parseInt(sp[1]); @@ -217,7 +230,7 @@ public void tupleTestTopology() throws Exception { LOG.info("Starting KafkaITCase.tupleTestTopology()"); String topic = "tupleTestTopic"; - createTestTopic(topic, 1); + createTestTopic(topic, 1, 1); final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1); @@ -231,7 +244,7 @@ public void tupleTestTopology() throws Exception { @Override public void invoke(Tuple2 value) throws Exception { - LOG.info("Got " + value); + LOG.debug("Got " + value); String[] sp = value.f1.split("-"); int v = Integer.parseInt(sp[1]); @@ -305,8 +318,8 @@ public void customPartitioningTestTopology() throws Exception { LOG.info("Starting KafkaITCase.customPartitioningTestTopology()"); String topic = "customPartitioningTestTopic"; - - createTestTopic(topic, 3); + + createTestTopic(topic, 3, 1); final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1); @@ -323,7 +336,7 @@ public void customPartitioningTestTopology() throws Exception { @Override public void invoke(Tuple2 value) throws Exception { - LOG.info("Got " + value); + LOG.debug("Got " + value); String[] sp = value.f1.split("-"); int v = Integer.parseInt(sp[1]); @@ -412,6 +425,7 @@ private static class CustomPartitioner implements SerializableKafkaPartitioner { public int partition(Object key, int numPartitions) { partitionerHasBeenCalled = true; + @SuppressWarnings("unchecked") Tuple2 tuple = (Tuple2) key; if (tuple.f0 < 10) { return 0; @@ -441,14 +455,13 @@ public byte[] serialize(Tuple2 element) { public boolean isEndOfStream(Tuple2 nextElement) { return false; } - } @Test public void simpleTestTopology() throws Exception { String topic = "simpleTestTopic"; - createTestTopic(topic, 1); + createTestTopic(topic, 1, 1); final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1); @@ -462,7 +475,7 @@ public void simpleTestTopology() throws Exception { @Override public void invoke(String value) throws Exception { - LOG.info("Got " + value); + LOG.debug("Got " + value); String[] sp = value.split("-"); int v = Integer.parseInt(sp[1]); if (start == -1) { @@ -524,13 +537,149 @@ public void cancel() { } } + private static boolean leaderHasShutDown = false; + + @Test + public void brokerFailureTest() throws Exception { + String topic = "brokerFailureTestTopic"; + + createTestTopic(topic, 2, 2); - private void createTestTopic(String topic, int numberOfPartitions) { KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString); - 
kafkaTopicUtils.createTopic(topic, numberOfPartitions, 1); + final String leaderToShutDown = + kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString(); + + final Thread brokerShutdown = new Thread(new Runnable() { + @Override + public void run() { + shutdownKafkaBroker = false; + while (!shutdownKafkaBroker) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + LOG.warn("Interruption", e); + } + } + + for (KafkaServer kafkaServer : brokers) { + if (leaderToShutDown.equals( + kafkaServer.config().advertisedHostName() + + ":" + + kafkaServer.config().advertisedPort() + )) { + LOG.info("Killing Kafka Server {}", leaderToShutDown); + kafkaServer.shutdown(); + leaderHasShutDown = true; + break; + } + } + } + }); + brokerShutdown.start(); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1); + + // add consuming topology: + DataStreamSource consuming = env.addSource( + new PersistentKafkaSource(zookeeperConnectionString, topic, new JavaDefaultStringSchema(), 5000, 10, Offset.FROM_BEGINNING)); + consuming.setParallelism(1); + + consuming.addSink(new SinkFunction() { + int elCnt = 0; + int start = 0; + int numOfMessagesToReceive = 100; + + BitSet validator = new BitSet(numOfMessagesToReceive + 1); + + @Override + public void invoke(String value) throws Exception { + LOG.debug("Got " + value); + String[] sp = value.split("-"); + int v = Integer.parseInt(sp[1]); + + if (start == -1) { + start = v; + } + Assert.assertFalse("Received tuple twice", validator.get(v - start)); + if (v - start < 0 && LOG.isWarnEnabled()) { + LOG.warn("Not in order: {}", value); + } + + validator.set(v - start); + elCnt++; + if (elCnt == 20) { + // shut down a Kafka broker + shutdownKafkaBroker = true; + } + + if (elCnt == numOfMessagesToReceive && leaderHasShutDown) { + // check if everything in the bitset is set to true + int nc; + if ((nc = validator.nextClearBit(0)) != numOfMessagesToReceive) { +// throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator); + System.out.println("The bitset was not set to 1 on all elements. 
Next clear:" + nc + " Set: " + validator); + } + throw new SuccessException(); + } else if (elCnt == numOfMessagesToReceive) { + numOfMessagesToReceive += 50; + LOG.info("Waiting for more messages till {}", numOfMessagesToReceive); + } + } + }); + + // add producing topology + DataStream stream = env.addSource(new SourceFunction() { + boolean running = true; + + @Override + public void run(Collector collector) throws Exception { + LOG.info("Starting source."); + int cnt = 0; + while (running) { + collector.collect("kafka-" + cnt++); + + if ((cnt - 1) % 20 == 0) { + LOG.debug("Sending message #{}", cnt - 1); + } + + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { + } + } + } + + @Override + public void cancel() { + LOG.info("Source got chancel()"); + running = false; + } + }); + stream.addSink(new KafkaSink(zookeeperConnectionString, topic, new JavaDefaultStringSchema())) + .setParallelism(1); + + try { + env.setParallelism(1); + env.execute(); + } catch (JobExecutionException good) { + Throwable t = good.getCause(); + int limit = 0; + while (!(t instanceof SuccessException)) { + t = t.getCause(); + if (limit++ == 20) { + LOG.warn("Test failed with exception", good); + Assert.fail("Test failed with: " + good.getMessage()); + } + } + } } + private void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) { + KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString); + kafkaTopicUtils.createTopic(topic, numberOfPartitions, replicationFactor); + } + private static TestingServer getZookeeper() throws Exception { return new TestingServer(zkPort, tmpZkDir); } @@ -538,42 +687,24 @@ private static TestingServer getZookeeper() throws Exception { /** * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed) */ - private static KafkaServer getKafkaServer(int brokerId) throws UnknownHostException { + private static KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws UnknownHostException { Properties kafkaProperties = new Properties(); + + int kafkaPort = NetUtils.getAvailablePort(); + // properties have to be Strings kafkaProperties.put("advertised.host.name", kafkaHost); kafkaProperties.put("port", Integer.toString(kafkaPort)); kafkaProperties.put("broker.id", Integer.toString(brokerId)); - kafkaProperties.put("log.dir", tmpKafkaDir.toString()); + kafkaProperties.put("log.dir", tmpFolder.toString()); kafkaProperties.put("zookeeper.connect", zookeeperConnectionString); KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties); - KafkaServer server = new KafkaServer(kafkaConfig, new LocalSystemTime()); + KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime()); server.startup(); return server; } - public static class LocalSystemTime implements Time { - - @Override - public long milliseconds() { - return System.currentTimeMillis(); - } - public long nanoseconds() { - return System.nanoTime(); - } - - @Override - public void sleep(long ms) { - try { - Thread.sleep(ms); - } catch (InterruptedException e) { - LOG.warn("Interruption", e); - } - } - - } - public static class SuccessException extends Exception { private static final long serialVersionUID = 1L; } diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java new file mode 100644 
index 0000000000000..5f0e1983918f0 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.curator.test.TestingServer; +import org.apache.flink.runtime.net.NetUtils; +import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils; +import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import kafka.api.PartitionMetadata; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; + +public class KafkaTopicUtilsTest { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicUtilsTest.class); + private static final int NUMBER_OF_BROKERS = 2; + private static final String TOPIC = "myTopic"; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void test() { + int zkPort; + String kafkaHost; + String zookeeperConnectionString; + + File tmpZkDir; + List tmpKafkaDirs; + Map kafkaServers = null; + TestingServer zookeeper = null; + + try { + tmpZkDir = tempFolder.newFolder(); + + tmpKafkaDirs = new ArrayList(NUMBER_OF_BROKERS); + for (int i = 0; i < NUMBER_OF_BROKERS; i++) { + tmpKafkaDirs.add(tempFolder.newFolder()); + } + + zkPort = NetUtils.getAvailablePort(); + kafkaHost = InetAddress.getLocalHost().getHostName(); + zookeeperConnectionString = "localhost:" + zkPort; + + // init zookeeper + zookeeper = new TestingServer(zkPort, tmpZkDir); + + // init kafka kafkaServers + kafkaServers = new HashMap(); + + for (int i = 0; i < NUMBER_OF_BROKERS; i++) { + KafkaServer kafkaServer = getKafkaServer(kafkaHost, zookeeperConnectionString, i, tmpKafkaDirs.get(i)); + kafkaServers.put(kafkaServer.config().advertisedHostName() + ":" + kafkaServer.config().advertisedPort(), kafkaServer); + } + + // create Kafka topic + final KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString); + kafkaTopicUtils.createTopic(TOPIC, 1, 2); + + // check whether topic exists + assertTrue(kafkaTopicUtils.topicExists(TOPIC)); + + // check number of partitions + 
assertEquals(1, kafkaTopicUtils.getNumberOfPartitions(TOPIC)); + + // get partition metadata without error + PartitionMetadata partitionMetadata = kafkaTopicUtils.waitAndGetPartitionMetadata(TOPIC, 0); + assertEquals(0, partitionMetadata.errorCode()); + + // get broker list + assertEquals(new HashSet(kafkaServers.keySet()), kafkaTopicUtils.getBrokerAddresses(TOPIC)); + } catch (IOException e) { + fail(e.toString()); + } catch (Exception e) { + fail(e.toString()); + } finally { + LOG.info("Shutting down all services"); + for (KafkaServer broker : kafkaServers.values()) { + if (broker != null) { + broker.shutdown(); + } + } + + if (zookeeper != null) { + try { + zookeeper.stop(); + } catch (IOException e) { + LOG.warn("ZK.stop() failed", e); + } + } + } + } + + /** + * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed) + */ + private static KafkaServer getKafkaServer(String kafkaHost, String zookeeperConnectionString, int brokerId, File tmpFolder) throws UnknownHostException { + Properties kafkaProperties = new Properties(); + + int kafkaPort = NetUtils.getAvailablePort(); + + // properties have to be Strings + kafkaProperties.put("advertised.host.name", kafkaHost); + kafkaProperties.put("port", Integer.toString(kafkaPort)); + kafkaProperties.put("broker.id", Integer.toString(brokerId)); + kafkaProperties.put("log.dir", tmpFolder.toString()); + kafkaProperties.put("zookeeper.connect", zookeeperConnectionString); + KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties); + + KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime()); + server.startup(); + return server; + } + +} diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/util/KafkaLocalSystemTime.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/util/KafkaLocalSystemTime.java new file mode 100644 index 0000000000000..18fa46f16eb41 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/util/KafkaLocalSystemTime.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import kafka.utils.Time; + +public class KafkaLocalSystemTime implements Time { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class); + + @Override + public long milliseconds() { + return System.currentTimeMillis(); + } + + public long nanoseconds() { + return System.nanoTime(); + } + + @Override + public void sleep(long ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + LOG.warn("Interruption", e); + } + } + +} + From 2ec7a5a7ced7088356de28f5bdf3c37cf0214b6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Hermann?= Date: Mon, 13 Apr 2015 11:19:08 +0200 Subject: [PATCH 2/3] [streaming] Added user-definable producer config to KafkaSink --- .../connectors/kafka/api/KafkaSink.java | 75 ++++++++++++------- 1 file changed, 47 insertions(+), 28 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java index 1d4cdb5eb81c9..376d96fe33421 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.connectors.kafka.api; +import java.util.Map; import java.util.Properties; import org.apache.flink.api.java.ClosureCleaner; @@ -50,7 +51,7 @@ public class KafkaSink extends RichSinkFunction { private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class); private Producer producer; - private Properties props; + private Properties userDefinedProperties; private String topicId; private String zookeeperAddress; private SerializationSchema schema; @@ -58,8 +59,8 @@ public class KafkaSink extends RichSinkFunction { private Class partitionerClass = null; /** - * Creates a KafkaSink for a given topic. The partitioner distributes the - * messages between the partitions of the topics. + * Creates a KafkaSink for a given topic. The sink produces its input to + * the topic. * * @param zookeeperAddress * Address of the Zookeeper host (with port number). @@ -68,27 +69,27 @@ public class KafkaSink extends RichSinkFunction { * @param serializationSchema * User defined serialization schema. */ - @SuppressWarnings({"rawtypes", "unchecked"}) public KafkaSink(String zookeeperAddress, String topicId, SerializationSchema serializationSchema) { - this(zookeeperAddress, topicId, serializationSchema, (Class) null); + this(zookeeperAddress, topicId, new Properties(), serializationSchema); } /** - * Creates a KafkaSink for a given topic. The sink produces its input into - * the topic. + * Creates a KafkaSink for a given topic with custom Producer configuration. + * If you use this constructor, the broker should be set with the "metadata.broker.list" + * configuration. * * @param zookeeperAddress * Address of the Zookeeper host (with port number). * @param topicId * ID of the Kafka topic. + * @param producerConfig + * Configurations of the Kafka producer * @param serializationSchema * User defined serialization schema. - * @param partitioner - * User defined partitioner. 
*/ - public KafkaSink(String zookeeperAddress, String topicId, - SerializationSchema serializationSchema, SerializableKafkaPartitioner partitioner) { + public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig, + SerializationSchema serializationSchema) { NetUtils.ensureCorrectHostnamePort(zookeeperAddress); Preconditions.checkNotNull(topicId, "TopicID not set"); ClosureCleaner.ensureSerializable(partitioner); @@ -96,18 +97,32 @@ public KafkaSink(String zookeeperAddress, String topicId, this.zookeeperAddress = zookeeperAddress; this.topicId = topicId; this.schema = serializationSchema; + this.partitionerClass = null; + this.userDefinedProperties = producerConfig; + } + + /** + * Creates a KafkaSink for a given topic. The sink produces its input to + * the topic. + * + * @param zookeeperAddress + * Address of the Zookeeper host (with port number). + * @param topicId + * ID of the Kafka topic. + * @param serializationSchema + * User defined serialization schema. + * @param partitioner + * User defined partitioner. + */ + public KafkaSink(String zookeeperAddress, String topicId, + SerializationSchema serializationSchema, SerializableKafkaPartitioner partitioner) { + this(zookeeperAddress, topicId, serializationSchema); this.partitioner = partitioner; } public KafkaSink(String zookeeperAddress, String topicId, SerializationSchema serializationSchema, Class partitioner) { - NetUtils.ensureCorrectHostnamePort(zookeeperAddress); - Preconditions.checkNotNull(topicId, "TopicID not set"); - ClosureCleaner.ensureSerializable(partitioner); - - this.zookeeperAddress = zookeeperAddress; - this.topicId = topicId; - this.schema = serializationSchema; + this(zookeeperAddress, topicId, serializationSchema); this.partitionerClass = partitioner; } @@ -124,27 +139,31 @@ public void open(Configuration configuration) { LOG.info("Broker list: {}", listOfBrokers); } - props = new Properties(); + Properties properties = new Properties(); - props.put("metadata.broker.list", listOfBrokers); - props.put("request.required.acks", "-1"); - props.put("message.send.max.retries", "10"); + properties.put("metadata.broker.list", listOfBrokers); + properties.put("request.required.acks", "-1"); + properties.put("message.send.max.retries", "10"); - props.put("serializer.class", DefaultEncoder.class.getCanonicalName()); + properties.put("serializer.class", DefaultEncoder.class.getCanonicalName()); // this will not be used as the key will not be serialized - props.put("key.serializer.class", DefaultEncoder.class.getCanonicalName()); + properties.put("key.serializer.class", DefaultEncoder.class.getCanonicalName()); + + for (Map.Entry propertiesEntry : userDefinedProperties.entrySet()) { + properties.put(propertiesEntry.getKey(), propertiesEntry.getValue()); + } if (partitioner != null) { - props.put("partitioner.class", PartitionerWrapper.class.getCanonicalName()); + properties.put("partitioner.class", PartitionerWrapper.class.getCanonicalName()); // java serialization will do the rest. 
- props.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, partitioner); + properties.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, partitioner); } if (partitionerClass != null) { - props.put("partitioner.class", partitionerClass); + properties.put("partitioner.class", partitionerClass); } - ProducerConfig config = new ProducerConfig(props); + ProducerConfig config = new ProducerConfig(properties); try { producer = new Producer(config); From f280ca1778b5f3e3a39df59de577b85820b852e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Hermann?= Date: Mon, 13 Apr 2015 11:37:47 +0200 Subject: [PATCH 3/3] [streaming] Updated Kafka connectors documentation --- docs/streaming_guide.md | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md index 3a9a1c457d35c..47b4509be6435 100644 --- a/docs/streaming_guide.md +++ b/docs/streaming_guide.md @@ -1257,12 +1257,12 @@ Example:
 {% highlight java %}
-stream.addSink(new PersistentKafkaSource<String>("localhost:2181", "test", new SimpleStringSchema()));
+stream.addSource(new PersistentKafkaSource<String>("localhost:2181", "test", new SimpleStringSchema()));
 {% endhighlight %}
 {% highlight scala %}
-stream.addSink(new PersistentKafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
+stream.addSource(new PersistentKafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
 {% endhighlight %}
@@ -1291,6 +1291,25 @@ stream.addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringS
+The user can also supply a custom Kafka producer configuration for the KafkaSink through the following constructor:
+
+
+
+{% highlight java %}
+public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig,
+		SerializationSchema<IN, byte[]> serializationSchema)
+{% endhighlight %}
+
+
+{% highlight scala %}
+public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig,
+		SerializationSchema<IN, byte[]> serializationSchema)
+{% endhighlight %}
+
+
+
+If this constructor is used, the user must set the broker addresses through the "metadata.broker.list" property. The serializer configuration should also be left at its default; the serialization is configured through the SerializationSchema instead.
+
 More about Kafka can be found [here](https://kafka.apache.org/documentation.html).
 
 [Back to top](#top)
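For illustration, a minimal usage sketch of the Properties-based constructor documented above could look like the following. It assumes an existing DataStream<String> named `stream`; the broker address `localhost:9092` and the retry value are placeholders, not values required by the connector:

{% highlight java %}
Properties producerConfig = new Properties();
// the documentation above asks for the broker list to be set when using this constructor
producerConfig.put("metadata.broker.list", "localhost:9092");
// optional producer tuning, shown purely as an example
producerConfig.put("message.send.max.retries", "10");

stream.addSink(new KafkaSink<String>("localhost:2181", "test",
		producerConfig, new SimpleStringSchema()));
{% endhighlight %}

When the sink opens, the user-defined properties are copied into its producer configuration on top of the defaults it sets itself.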