[FLINK-1752][streaming] Rework and improve Kafka connectors
Gábor Hermann authored and rmetzger committed Mar 19, 2015
1 parent 5c6603e commit b18c66b
Showing 19 changed files with 727 additions and 272 deletions.
47 changes: 35 additions & 12 deletions docs/streaming_guide.md
@@ -1166,52 +1166,75 @@ A class providing an interface for receiving data from Kafka.


The following arguments have to be provided for the `KafkaSource(…)` constructor, in order:


1. Zookeeper hostname
2. The topic name
3. Deserialisation schema



Example:


<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
DataStream<String> stream = env
	.addSource(new KafkaSource<String>("localhost:2181", "test", new SimpleStringSchema()))
	.print();
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
stream = env
	.addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
	.print
{% endhighlight %}
</div>
</div>


#### Persistent Kafka Source
As Kafka persists all its data, a fault tolerant Kafka source can be provided.

The PersistentKafkaSource can read a topic, and if the job fails for some reason, the source continues reading from where it left off after the restart. For example, if there are 3 partitions in the topic, with offsets 31, 122 and 110 read at the time of the job failure, then on restart the source continues reading from those offsets, no matter whether these partitions have received new messages in the meantime.

The following arguments have to be provided for the `PersistentKafkaSource(…)` constructor, in order:

1. The hostname of a Kafka broker (with the port number)
2. The topic name
3. Deserialisation schema

Example:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
stream.addSink(new PersistentKafkaSource<String>("test", "localhost:9092", new SimpleStringSchema()));
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
stream.addSink(new PersistentKafkaSource[String]("test", "localhost:9092", new SimpleStringSchema))
{% endhighlight %}
</div>
</div>
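
Putting the pieces together, a minimal end-to-end consumer job could look like the following sketch. The broker address, topic name and local execution environment are placeholders for illustration only; on restart after a failure the source resumes from the last recorded offsets.

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// Read the "test" topic from the broker at localhost:9092 (placeholder address).
// After a failure and restart, reading continues from the last read offsets.
DataStream<String> kafkaStream = env
		.addSource(new PersistentKafkaSource<String>("localhost:9092", "test", new SimpleStringSchema()));

kafkaStream.print();

env.execute();
{% endhighlight %}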

#### Kafka Sink
A class providing an interface for sending data to Kafka.


The following arguments have to be provided for the `KafkaSink()` constructor, in order:


1. The hostname of a Kafka broker (with the port number)
2. The topic name
3. Serialisation schema


Example:


<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
stream.addSink(new KafkaSink<String>("localhost:9092", "test", new SimpleStringSchema()));
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
stream.addSink(new KafkaSink[String]("localhost:9092", "test", new SimpleStringSchema))
{% endhighlight %}
</div>
</div>
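
For context, the sink is attached to the end of an existing pipeline. A minimal self-contained sketch, with a placeholder broker address and topic and assuming string records, might look like:

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// Produce a few string records into the "test" topic via the broker at
// localhost:9092 (placeholder address), serialised with SimpleStringSchema.
env.fromElements("a", "b", "c")
		.addSink(new KafkaSink<String>("localhost:9092", "test", new SimpleStringSchema()));

env.execute();
{% endhighlight %}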
@@ -46,7 +46,7 @@ under the License.
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka_2.10</artifactId>
	<version>0.8.1</version>
	<exclusions>
		<exclusion>
			<groupId>com.sun.jmx</groupId>
@@ -56,7 +56,7 @@ public void cancel() {




}).addSink(
		new KafkaSink<String>(host + ":" + port, topic, new JavaDefaultStringSchema())
)
		.setParallelism(3);


@@ -27,8 +27,6 @@ public class KafkaSimpleConsumerExample {
	private static String host;
	private static int port;
	private static String topic;


	public static void main(String[] args) throws Exception {


@@ -37,25 +35,22 @@ public static void main(String[] args) throws Exception {
		}


		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setDegreeOfParallelism(4);

		DataStream<String> kafkaStream = env
				.addSource(new PersistentKafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()));

		kafkaStream.print();

		env.execute();
	}

	private static boolean parseParameters(String[] args) {
		if (args.length == 3) {
			host = args[0];
			port = Integer.parseInt(args[1]);
			topic = args[2];
			return true;
		} else {
			System.err.println("Usage: KafkaConsumerExample <host> <port> <topic>");
			return false;
		}
	}
@@ -19,23 +19,23 @@


import java.util.Properties;


import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.config.EncoderWrapper;
import org.apache.flink.streaming.connectors.kafka.config.PartitionerWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaDistributePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.connectors.util.SerializationSchema;


import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.DefaultEncoder;

/**
 * Sink that emits its inputs to a Kafka topic.
 *
 * @param <IN>
 *            Type of the sink input
 */
public class KafkaSink<IN> extends RichSinkFunction<IN> {
	private static final long serialVersionUID = 1L;
@@ -51,33 +51,33 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
	/**
	 * Creates a KafkaSink for a given topic. The partitioner distributes the
	 * messages between the partitions of the topics.
	 *
	 * @param brokerAddr
	 *            Address of the Kafka broker (with port number).
	 * @param topicId
	 *            ID of the Kafka topic.
	 * @param serializationSchema
	 *            User defined serialization schema.
	 */
	public KafkaSink(String brokerAddr, String topicId,
			SerializationSchema<IN, byte[]> serializationSchema) {
		this(brokerAddr, topicId, serializationSchema, new KafkaDistributePartitioner<IN>());
	}


	/**
	 * Creates a KafkaSink for a given topic. The sink produces its input into
	 * the topic.
	 *
	 * @param brokerAddr
	 *            Address of the Kafka broker (with port number).
	 * @param topicId
	 *            ID of the Kafka topic.
	 * @param serializationSchema
	 *            User defined serialization schema.
	 * @param partitioner
	 *            User defined partitioner.
	 */
	public KafkaSink(String brokerAddr, String topicId,
			SerializationSchema<IN, byte[]> serializationSchema, KafkaPartitioner<IN> partitioner) {
		this.topicId = topicId;
		this.brokerAddr = brokerAddr;
@@ -107,15 +107,19 @@ public void initialize() {


		ProducerConfig config = new ProducerConfig(props);


		try {
			producer = new Producer<IN, byte[]>(config);
		} catch (NullPointerException e) {
			throw new RuntimeException("Cannot connect to Kafka broker " + brokerAddr);
		}
		initDone = true;
	}


	/**
	 * Called when new data arrives to the sink, and forwards it to Kafka.
	 *
	 * @param next
	 *            The incoming data
	 */
	@Override
	public void invoke(IN next) {
@@ -106,6 +106,8 @@ private void initializeConnection() {
		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topicId);
		KafkaStream<byte[], byte[]> stream = streams.get(0);


consumer.commitOffsets();

		consumerIterator = stream.iterator();
	}



