
Commit

[FLINK-4035] Refactor the Kafka 0.10 connector to be based upon the 0.9 connector

Add a test case for Kafka's new timestamp functionality and update the documentation.

This closes #2369
rmetzger committed Oct 11, 2016
1 parent 63859c6 commit 6731ec1
Showing 29 changed files with 936 additions and 852 deletions.
67 changes: 55 additions & 12 deletions docs/dev/connectors/kafka.md
@@ -46,29 +46,29 @@ For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka`) is
</thead>
<tbody>
<tr>
<td>flink-connector-kafka</td>
<td>0.9.1, 0.10</td>
<td>FlinkKafkaConsumer082<br>
FlinkKafkaProducer</td>
<td>0.8.x</td>
<td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>
</tr>
<tr>
<td>flink-connector-kafka-0.8{{ site.scala_version_suffix }}</td>
<td>1.0.0</td>
<td>FlinkKafkaConsumer08<br>
FlinkKafkaProducer08</td>
<td>0.8.x</td>
<td>Uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example">SimpleConsumer</a> API of Kafka internally. Offsets are committed to ZK by Flink.</td>
</tr>
<tr>
<td>flink-connector-kafka-0.9{{ site.scala_version_suffix }}</td>
<td>1.0.0</td>
<td>FlinkKafkaConsumer09<br>
FlinkKafkaProducer09</td>
<td>0.9.x</td>
<td>Uses the new <a href="http://kafka.apache.org/documentation.html#newconsumerapi">Consumer API</a> of Kafka.</td>
</tr>
<tr>
<td>flink-connector-kafka-0.10{{ site.scala_version_suffix }}</td>
<td>1.2.0</td>
<td>FlinkKafkaConsumer010<br>
FlinkKafkaProducer010</td>
<td>0.10.x</td>
<td>This connector supports <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message">Kafka messages with timestamps</a> both for producing and consuming.</td>
</tr>
</tbody>
</table>

@@ -87,7 +87,6 @@ Note that the streaming connectors are currently not part of the binary distribu
### Installing Apache Kafka

* Follow the instructions from [Kafka's quickstart](https://kafka.apache.org/documentation.html#quickstart) to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
* On 32 bit computers [this](http://stackoverflow.com/questions/22325364/unrecognized-vm-option-usecompressedoops-when-running-kafka-from-my-ubuntu-in) problem may occur.
* If the Kafka and Zookeeper servers are running on a remote machine, then the `advertised.host.name` setting in the `config/server.properties` file must be set to the machine's IP address.

### Kafka Consumer
@@ -256,17 +255,28 @@ records to partitions.

Example:


<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
<div data-lang="java, Kafka 0.8+" markdown="1">
{% highlight java %}
stream.addSink(new FlinkKafkaProducer08<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
<div data-lang="java, Kafka 0.10+" markdown="1">
{% highlight java %}
FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties);
{% endhighlight %}
</div>
<div data-lang="scala, Kafka 0.8+" markdown="1">
{% highlight scala %}
stream.addSink(new FlinkKafkaProducer08[String]("localhost:9092", "my-topic", new SimpleStringSchema()))
{% endhighlight %}
</div>
<div data-lang="scala, Kafka 0.10+" markdown="1">
{% highlight scala %}
FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "my-topic", new SimpleStringSchema(), properties)
{% endhighlight %}
</div>
</div>
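
The `properties` object referenced by the Kafka 0.10 snippets above is not shown. A minimal sketch of how it could be populated, assuming a standard `java.util.Properties` and a placeholder broker address:

{% highlight java %}
Properties properties = new Properties();
// brokers used to bootstrap the connection to the Kafka cluster (placeholder address)
properties.setProperty("bootstrap.servers", "localhost:9092");
{% endhighlight %}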

You can also define a custom Kafka producer configuration for the KafkaSink with the constructor. Please refer to
@@ -287,3 +297,36 @@ higher value.

There is currently no transactional producer for Kafka, so Flink cannot guarantee exactly-once delivery
into a Kafka topic.

### Using Kafka timestamps and Flink event time in Kafka 0.10

Since Apache Kafka 0.10, Kafka messages can carry [timestamps](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message), indicating either
the time the event occurred (see ["event time" in Apache Flink](../event_time.html)) or the time the message
was written to the Kafka broker.

The `FlinkKafkaConsumer010` emits records with the timestamp attached if the time characteristic in Flink is
set to `TimeCharacteristic.EventTime` (via `StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)`).
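
A minimal end-to-end sketch, using the `List<String>`-based constructor that appears in the diff below; the broker address, group id, and topic name are placeholders, and imports are omitted as in the other snippets:

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
props.setProperty("group.id", "kafka-010-example");       // placeholder consumer group

// each emitted record carries the timestamp of the underlying Kafka message
DataStream<String> stream = env.addSource(
    new FlinkKafkaConsumer010<>(Collections.singletonList("my-topic"), new SimpleStringSchema(), props));
{% endhighlight %}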

The Kafka consumer itself does not emit watermarks. To emit watermarks, use the `assignTimestampsAndWatermarks` mechanism
described above in "Kafka Consumers and Timestamp Extraction/Watermark Emission".

When using the timestamps from Kafka, there is no need to extract a timestamp from the record itself: the `previousElementTimestamp`
argument of the `extractTimestamp()` method already contains the timestamp carried by the Kafka message.

A timestamp extractor for a Kafka consumer would look like this:
{% highlight java %}
public long extractTimestamp(Long element, long previousElementTimestamp) {
return previousElementTimestamp;
}
{% endhighlight %}
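
As a fuller illustration, the fragment above could be embedded in a punctuated watermark assigner roughly as follows. This is a sketch, not part of the connector: the class name and the 1 ms watermark lag are arbitrary choices.

{% highlight java %}
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class KafkaRecordTimestampAssigner implements AssignerWithPunctuatedWatermarks<String> {

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        // previousElementTimestamp already carries the timestamp of the Kafka record
        return previousElementTimestamp;
    }

    @Override
    public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
        // emit a watermark trailing the extracted timestamp by an arbitrary 1 ms
        return new Watermark(extractedTimestamp - 1);
    }
}
{% endhighlight %}

It would be attached with `stream.assignTimestampsAndWatermarks(new KafkaRecordTimestampAssigner())`.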



The `FlinkKafkaProducer010` writes the record timestamp to Kafka only if `setWriteTimestampToKafka(true)` is set.

{% highlight java %}
FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config = FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new SimpleStringSchema(), standardProps);
config.setWriteTimestampToKafka(true);
{% endhighlight %}


3 changes: 1 addition & 2 deletions docs/page/js/flink.js
@@ -42,6 +42,7 @@ function codeTabs() {
var image = $(this).data("image");
var notabs = $(this).data("notabs");
var capitalizedLang = lang.substr(0, 1).toUpperCase() + lang.substr(1);
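// tab labels such as "java, Kafka 0.10+" contain characters that are invalid in element ids; normalize them to underscores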
lang = lang.replace(/[^a-zA-Z0-9]/g, "_");
var id = "tab_" + lang + "_" + counter;
$(this).attr("id", id);
if (image != null && langImages[lang]) {
@@ -99,9 +100,7 @@ function viewSolution() {
// A script to fix internal hash links because we have an overlapping top bar.
// Based on https://github.com/twitter/bootstrap/issues/193#issuecomment-2281510
function maybeScrollToHash() {
console.log("HERE");
if (window.location.hash && $(window.location.hash).length) {
console.log("HERE2", $(window.location.hash), $(window.location.hash).offset().top);
var newTop = $(window.location.hash).offset().top - 57;
$(window).scrollTop(newTop);
}
50 changes: 31 additions & 19 deletions flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
@@ -26,7 +26,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-connectors</artifactId>
<version>1.1-SNAPSHOT</version>
<version>1.2-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

@@ -37,7 +37,7 @@ under the License.

<!-- Allow users to pass custom connector versions -->
<properties>
<kafka.version>0.10.0.0</kafka.version>
<kafka.version>0.10.0.1</kafka.version>
</properties>

<dependencies>
@@ -46,21 +46,16 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<artifactId>flink-connector-kafka-0.9_2.10</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<!-- Add Kafka 0.10.x as a dependency -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-base_2.10</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
</exclusion>
</exclusions>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
@@ -73,20 +68,29 @@ under the License.
<optional>true</optional>
</dependency>

<!-- test dependencies -->

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.10</artifactId>
<version>${project.version}</version>
<exclusions>
<!-- exclude Kafka dependencies -->
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
</exclusion>
</exclusions>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- test dependencies -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-base_2.10</artifactId>
<version>${project.version}</version>
<exclusions>
<!-- exclude 0.8 dependencies -->
<!-- exclude Kafka dependencies -->
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
@@ -127,6 +131,14 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-jmx</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>


</dependencies>

<build>
@@ -28,20 +28,10 @@
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.util.SerializedValue;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
@@ -64,30 +54,10 @@
* is constructed. That means that the client that submits the program needs to be able to
* reach the Kafka brokers or ZooKeeper.</p>
*/
public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumerBase<T> {
public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {

private static final long serialVersionUID = 2324564345203409112L;

private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer010.class);

/** Configuration key to change the polling timeout **/
public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";

/** Boolean configuration key to disable metrics tracking **/
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";

/** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
* available. If 0, returns immediately with any records that are available now. */
public static final long DEFAULT_POLL_TIMEOUT = 100L;

// ------------------------------------------------------------------------

/** User-supplied properties for Kafka **/
private final Properties properties;

/** From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
* available. If 0, returns immediately with any records that are available now */
private final long pollTimeout;

// ------------------------------------------------------------------------

Expand Down Expand Up @@ -151,51 +121,7 @@ public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deser
* The properties that are used to configure both the fetcher and the offset handler.
*/
public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
super(deserializer);

checkNotNull(topics, "topics");
this.properties = checkNotNull(props, "props");
setDeserializer(this.properties);

// configure the polling timeout
try {
if (properties.containsKey(KEY_POLL_TIMEOUT)) {
this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
} else {
this.pollTimeout = DEFAULT_POLL_TIMEOUT;
}
}
catch (Exception e) {
throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
}

// read the partitions that belong to the listed topics
final List<KafkaTopicPartition> partitions = new ArrayList<>();

try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(this.properties)) {
for (final String topic: topics) {
// get partitions for each topic
List<PartitionInfo> partitionsForTopic = consumer.partitionsFor(topic);
// for non existing topics, the list might be null.
if (partitionsForTopic != null) {
partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
}
}
}

if (partitions.isEmpty()) {
throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics);
}

// we now have a list of partitions which is the same for all parallel consumer instances.
LOG.info("Got {} partitions from these topics: {}", partitions.size(), topics);

if (LOG.isInfoEnabled()) {
logPartitionInfo(LOG, partitions);
}

// register these partitions
setSubscribedPartitions(partitions);
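// partition discovery and Kafka property handling now happen in the FlinkKafkaConsumer09 base class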
super(topics, deserializer, props);
}

@Override
@@ -212,48 +138,5 @@ public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T>
watermarksPeriodic, watermarksPunctuated,
runtimeContext, deserializer,
properties, pollTimeout, useMetrics);

}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------

/**
* Converts a list of Kafka PartitionInfo's to Flink's KafkaTopicPartition (which are serializable)
*
* @param partitions A list of Kafka PartitionInfos.
* @return A list of KafkaTopicPartitions
*/
private static List<KafkaTopicPartition> convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) {
checkNotNull(partitions);

List<KafkaTopicPartition> ret = new ArrayList<>(partitions.size());
for (PartitionInfo pi : partitions) {
ret.add(new KafkaTopicPartition(pi.topic(), pi.partition()));
}
return ret;
}

/**
* Makes sure that the ByteArrayDeserializer is registered in the Kafka properties.
*
* @param props The Kafka properties to register the serializer in.
*/
private static void setDeserializer(Properties props) {
final String deSerName = ByteArrayDeserializer.class.getCanonicalName();

Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);

if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
}
if (valDeSer != null && !valDeSer.equals(deSerName)) {
LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
}

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
}
}
