[FLINK-1753] [streaming] Added Kafka broker failure test
This closes #589
Gábor Hermann authored and rmetzger committed Apr 14, 2015
1 parent e5a3b95 commit cb34e97
Showing 10 changed files with 730 additions and 144 deletions.
23 changes: 21 additions & 2 deletions docs/streaming_guide.md
@@ -1257,12 +1257,12 @@ Example:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-stream.addSink(new PersistentKafkaSource<String>("localhost:2181", "test", new SimpleStringSchema()));
+stream.addSource(new PersistentKafkaSource<String>("localhost:2181", "test", new SimpleStringSchema()));
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-stream.addSink(new PersistentKafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
+stream.addSource(new PersistentKafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
 {% endhighlight %}
 </div>
 </div>
@@ -1291,6 +1291,25 @@ stream.addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringS
 </div>
 </div>
+
+The user can also supply a custom Kafka producer configuration for the KafkaSink through this constructor:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig,
+		SerializationSchema<IN, byte[]> serializationSchema)
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+new KafkaSink[IN](zookeeperAddress: String, topicId: String, producerConfig: Properties,
+		serializationSchema: SerializationSchema[IN, Array[Byte]])
+{% endhighlight %}
+</div>
+</div>
+
+If this constructor is used, the user must set the broker addresses via the "metadata.broker.list" property. The serializer configuration should be left at its default; serialization should instead be configured through the SerializationSchema.
 More about Kafka can be found [here](https://kafka.apache.org/documentation.html).


 [Back to top](#top)
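For illustration, here is a minimal usage sketch of the new Properties-based constructor documented above. It assumes an existing `DataStream<String>` named `stream`; the ZooKeeper and broker addresses are placeholders:

```java
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties producerConfig = new Properties();
// mandatory with this constructor: the producer needs an explicit broker list
producerConfig.put("metadata.broker.list", "localhost:9092");
// other producer settings may be added; they override the sink's defaults
producerConfig.put("request.required.acks", "1");

stream.addSink(new KafkaSink<String>("localhost:2181", "test", producerConfig,
		new SimpleStringSchema()));
```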
…/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -17,6 +17,7 @@


 package org.apache.flink.streaming.connectors.kafka.api;
 
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.flink.api.java.ClosureCleaner;
@@ -27,6 +28,8 @@
 import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;


@@ -45,17 +48,19 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {


 	private static final long serialVersionUID = 1L;
 
+	private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
+
 	private Producer<IN, byte[]> producer;
-	private Properties props;
+	private Properties userDefinedProperties;
 	private String topicId;
 	private String zookeeperAddress;
 	private SerializationSchema<IN, byte[]> schema;
 	private SerializableKafkaPartitioner partitioner;
 	private Class<? extends SerializableKafkaPartitioner> partitionerClass = null;


 	/**
-	 * Creates a KafkaSink for a given topic. The partitioner distributes the
-	 * messages between the partitions of the topics.
+	 * Creates a KafkaSink for a given topic. The sink produces its input to
+	 * the topic.
 	 *
 	 * @param zookeeperAddress
 	 *            Address of the Zookeeper host (with port number).
@@ -64,46 +69,60 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	 * @param serializationSchema
 	 *            User defined serialization schema.
 	 */
-	@SuppressWarnings({ "rawtypes", "unchecked" })
 	public KafkaSink(String zookeeperAddress, String topicId,
 			SerializationSchema<IN, byte[]> serializationSchema) {
-		this(zookeeperAddress, topicId, serializationSchema, (Class) null);
+		this(zookeeperAddress, topicId, new Properties(), serializationSchema);
 	}


 	/**
-	 * Creates a KafkaSink for a given topic. The sink produces its input into
-	 * the topic.
+	 * Creates a KafkaSink for a given topic with custom Producer configuration.
+	 * If you use this constructor, the broker should be set with the
+	 * "metadata.broker.list" configuration.
 	 *
 	 * @param zookeeperAddress
 	 *            Address of the Zookeeper host (with port number).
 	 * @param topicId
 	 *            ID of the Kafka topic.
+	 * @param producerConfig
+	 *            Configurations of the Kafka producer
 	 * @param serializationSchema
 	 *            User defined serialization schema.
-	 * @param partitioner
-	 *            User defined partitioner.
 	 */
-	public KafkaSink(String zookeeperAddress, String topicId,
-			SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner partitioner) {
+	public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig,
+			SerializationSchema<IN, byte[]> serializationSchema) {
 		NetUtils.ensureCorrectHostnamePort(zookeeperAddress);
 		Preconditions.checkNotNull(topicId, "TopicID not set");
 		ClosureCleaner.ensureSerializable(partitioner);
 
 		this.zookeeperAddress = zookeeperAddress;
 		this.topicId = topicId;
 		this.schema = serializationSchema;
+		this.partitionerClass = null;
+		this.userDefinedProperties = producerConfig;
+	}
+
+	/**
+	 * Creates a KafkaSink for a given topic. The sink produces its input to
+	 * the topic.
+	 *
+	 * @param zookeeperAddress
+	 *            Address of the Zookeeper host (with port number).
+	 * @param topicId
+	 *            ID of the Kafka topic.
+	 * @param serializationSchema
+	 *            User defined serialization schema.
+	 * @param partitioner
+	 *            User defined partitioner.
+	 */
+	public KafkaSink(String zookeeperAddress, String topicId,
+			SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner partitioner) {
+		this(zookeeperAddress, topicId, serializationSchema);
 		this.partitioner = partitioner;
 	}
 
 	public KafkaSink(String zookeeperAddress, String topicId,
 			SerializationSchema<IN, byte[]> serializationSchema, Class<? extends SerializableKafkaPartitioner> partitioner) {
-		NetUtils.ensureCorrectHostnamePort(zookeeperAddress);
-		Preconditions.checkNotNull(topicId, "TopicID not set");
-		ClosureCleaner.ensureSerializable(partitioner);
-
-		this.zookeeperAddress = zookeeperAddress;
-		this.topicId = topicId;
-		this.schema = serializationSchema;
+		this(zookeeperAddress, topicId, serializationSchema);
 		this.partitionerClass = partitioner;
 	}
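For context, the partitioner constructors above take a SerializableKafkaPartitioner. A minimal sketch of such a partitioner follows; the class name and hashing logic are illustrative only, assuming the Kafka 0.8 Partitioner contract `int partition(Object key, int numPartitions)`:

```java
import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;

// Hypothetical partitioner: routes each message to a partition by its key's hash.
public class HashKeyPartitioner implements SerializableKafkaPartitioner {

	private static final long serialVersionUID = 1L;

	@Override
	public int partition(Object key, int numPartitions) {
		// mask the sign bit so the result is always a valid partition index
		return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
	}
}
```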


@@ -114,33 +133,42 @@ public KafkaSink(String zookeeperAddress, String topicId,
 	public void open(Configuration configuration) {
 
 		KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperAddress);
-		String brokerAddress = kafkaTopicUtils.getLeaderBrokerAddressForTopic(topicId);
+		String listOfBrokers = kafkaTopicUtils.getBrokerList(topicId);
 
-		props = new Properties();
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Broker list: {}", listOfBrokers);
+		}
+
+		Properties properties = new Properties();
 
-		props.put("metadata.broker.list", brokerAddress);
-		props.put("request.required.acks", "1");
+		properties.put("metadata.broker.list", listOfBrokers);
+		properties.put("request.required.acks", "-1");
+		properties.put("message.send.max.retries", "10");
 
-		props.put("serializer.class", DefaultEncoder.class.getCanonicalName());
+		properties.put("serializer.class", DefaultEncoder.class.getCanonicalName());
 
 		// this will not be used as the key will not be serialized
-		props.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());
+		properties.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());
+
+		for (Map.Entry<Object, Object> propertiesEntry : userDefinedProperties.entrySet()) {
+			properties.put(propertiesEntry.getKey(), propertiesEntry.getValue());
+		}
 
 		if (partitioner != null) {
-			props.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
+			properties.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
 			// java serialization will do the rest.
-			props.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, partitioner);
+			properties.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, partitioner);
 		}
 		if (partitionerClass != null) {
-			props.put("partitioner.class", partitionerClass);
+			properties.put("partitioner.class", partitionerClass);
 		}
 
-		ProducerConfig config = new ProducerConfig(props);
+		ProducerConfig config = new ProducerConfig(properties);
 
 		try {
 			producer = new Producer<IN, byte[]>(config);
 		} catch (NullPointerException e) {
-			throw new RuntimeException("Cannot connect to Kafka broker " + brokerAddress, e);
+			throw new RuntimeException("Cannot connect to Kafka broker " + listOfBrokers, e);
 		}
 	}
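One detail worth noting in the rewritten open(): the sink writes its own defaults first and copies the user-defined entries afterwards, so on a key collision the user's value wins. A standalone sketch of that precedence, with illustrative values:

```java
import java.util.Map;
import java.util.Properties;

public class PropertyPrecedenceDemo {
	public static void main(String[] args) {
		Properties properties = new Properties();
		properties.put("request.required.acks", "-1"); // default set by the sink

		Properties userDefinedProperties = new Properties();
		userDefinedProperties.put("request.required.acks", "1"); // user override

		// same copy loop as in open(): entries written last win
		for (Map.Entry<Object, Object> entry : userDefinedProperties.entrySet()) {
			properties.put(entry.getKey(), entry.getValue());
		}

		System.out.println(properties.get("request.required.acks")); // prints 1
	}
}
```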

