[FLINK-2974] Add periodic offset committer for Kafka when checkpointing is disabled

This closes #1341
rmetzger committed Nov 20, 2015
1 parent 8dc70f2 commit 5864e4f
Showing 10 changed files with 234 additions and 41 deletions.
1 change: 1 addition & 0 deletions docs/apis/streaming_guide.md
@@ -3499,6 +3499,7 @@ Also note that Flink can only restart the topology if enough processing slots ar
So if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
Flink on YARN supports automatic restart of lost YARN containers.

If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
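The commit period is taken from Kafka's own `auto.commit.interval.ms` property (60000 ms by default). Below is a minimal sketch of a job that relies on this periodic committer, assuming the Kafka 0.8.2 consumer (`FlinkKafkaConsumer082`); the broker/ZooKeeper addresses, topic name, and group id are placeholders.

```java
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaPeriodicCommitExample {
	public static void main(String[] args) throws Exception {
		// Checkpointing is intentionally NOT enabled here, so offsets are
		// committed to ZooKeeper periodically instead of on checkpoints.
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("zookeeper.connect", "localhost:2181"); // assumed ZooKeeper address
		props.setProperty("bootstrap.servers", "localhost:9092"); // assumed Kafka broker address
		props.setProperty("group.id", "my-flink-group");          // hypothetical consumer group
		// Kafka's auto-commit interval is reused as the periodic commit interval
		props.setProperty("auto.commit.interval.ms", "30000");

		DataStream<String> stream = env.addSource(
				new FlinkKafkaConsumer082<>("my-topic", new SimpleStringSchema(), props));

		stream.print();
		env.execute("Kafka consumer with periodic ZooKeeper offset commits");
	}
}
```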

#### Kafka Producer

2 changes: 1 addition & 1 deletion docs/setup/yarn_setup.md
@@ -60,7 +60,7 @@ Apache [Hadoop YARN](http://hadoop.apache.org/) is a cluster resource management
- at least Apache Hadoop 2.2
- HDFS (Hadoop Distributed File System) (or another distributed file system supported by Hadoop)

If you have troubles using the Flink YARN client, have a look in the [FAQ section]({{ site.baseurl }}/faq.html).
If you have troubles using the Flink YARN client, have a look in the [FAQ section](http://flink.apache.org/faq.html#yarn-deployment).

### Start Flink Session

@@ -175,4 +175,5 @@ public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> sta
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
}

}
@@ -31,6 +31,7 @@
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.internals.Fetcher;
import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler;
@@ -233,7 +234,7 @@ public enum FetcherType {
private transient long[] restoreToOffset;

private volatile boolean running = true;

// ------------------------------------------------------------------------

/**
@@ -258,8 +259,8 @@ public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializ
this.offsetStore = checkNotNull(offsetStore);
this.fetcherType = checkNotNull(fetcherType);

if(fetcherType == FetcherType.NEW_HIGH_LEVEL) {
throw new UnsupportedOperationException("The fetcher for Kafka 0.8.3 is not yet " +
if (fetcherType == FetcherType.NEW_HIGH_LEVEL) {
throw new UnsupportedOperationException("The fetcher for Kafka 0.8.3 / 0.9.0 is not yet " +
"supported in Flink");
}
if (offsetStore == OffsetStore.KAFKA && fetcherType == FetcherType.LEGACY_LOW_LEVEL) {
@@ -290,9 +291,6 @@ public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializ
}
}
LOG.info("Topic {} has {} partitions", topic, partitions.length);

// make sure that we take care of the committing
props.setProperty("enable.auto.commit", "false");
}

// ------------------------------------------------------------------------
@@ -302,7 +300,7 @@ public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializ
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);

final int numConsumers = getRuntimeContext().getNumberOfParallelSubtasks();
final int thisComsumerIndex = getRuntimeContext().getIndexOfThisSubtask();

@@ -374,12 +372,33 @@ public void open(Configuration parameters) throws Exception {
// no restore request. Let the offset handler take care of the initial offset seeking
offsetHandler.seekFetcherToInitialOffsets(subscribedPartitions, fetcher);
}


}

@Override
public void run(SourceContext<T> sourceContext) throws Exception {
if (fetcher != null) {
// For non-checkpointed sources, a thread which periodically commits the current offset into ZK.
PeriodicOffsetCommitter offsetCommitter = null;

// check whether we need to start the periodic checkpoint committer
StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext) getRuntimeContext();
if (!streamingRuntimeContext.isCheckpointingEnabled()) {
// we use Kafka's own configuration parameter key for this.
// Note that the default configuration value in Kafka is 60 * 1000, so we use the
// same here.
long commitInterval = Long.valueOf(props.getProperty("auto.commit.interval.ms", "60000"));
offsetCommitter = new PeriodicOffsetCommitter(commitInterval, this);
offsetCommitter.start();
LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitInterval);
}

fetcher.run(sourceContext, valueDeserializer, lastOffsets);

if (offsetCommitter != null) {
offsetCommitter.close();
}
}
else {
// this source never completes
@@ -419,7 +438,7 @@ public void cancel() {
LOG.warn("Error while closing Kafka connector data fetcher", e);
}
}

OffsetHandler offsetHandler = this.offsetHandler;
this.offsetHandler = null;
if (offsetHandler != null) {
@@ -430,6 +449,8 @@ public void cancel() {
LOG.warn("Error while closing Kafka connector offset handler", e);
}
}


}

@Override
@@ -567,6 +588,69 @@ protected static List<TopicPartition> assignPartitions(int[] partitions, String
}
return partitionsToSub;
}

/**
* Thread to periodically commit the current read offset into Zookeeper.
*/
private static class PeriodicOffsetCommitter extends Thread {
private final long commitInterval;
private final FlinkKafkaConsumer consumer;
private volatile boolean running = true;

public PeriodicOffsetCommitter(long commitInterval, FlinkKafkaConsumer consumer) {
this.commitInterval = commitInterval;
this.consumer = consumer;
}

@Override
public void run() {
try {
while (running) {
try {
Thread.sleep(commitInterval);
// ------------ commit current offsets ----------------

// create copy of current offsets
long[] currentOffsets = Arrays.copyOf(consumer.lastOffsets, consumer.lastOffsets.length);

Map<TopicPartition, Long> offsetsToCommit = new HashMap<>();
for (TopicPartition tp : (List<TopicPartition>)consumer.subscribedPartitions) {
int partition = tp.partition();
long offset = currentOffsets[partition];
long lastCommitted = consumer.commitedOffsets[partition];

if (offset != OFFSET_NOT_SET) {
if (offset > lastCommitted) {
offsetsToCommit.put(tp, offset);
LOG.debug("Committing offset {} for partition {}", offset, partition);
} else {
LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
}
}
}

consumer.offsetHandler.commit(offsetsToCommit);
} catch (InterruptedException e) {
if (running) {
// throw unexpected interruption
throw e;
}
// looks like the thread is being closed. Leave loop
break;
}
}
} catch (Throwable t) {
LOG.warn("Periodic checkpoint committer is stopping the fetcher because of an error", t);
consumer.fetcher.stopWithError(t);
}
}

public void close() {
this.running = false;
this.interrupt();
}

}

// ------------------------------------------------------------------------
// Kafka / ZooKeeper communication utilities
@@ -657,10 +741,19 @@ public static List<PartitionInfo> getPartitionsForTopic(final String topic, fina
return partitions;
}

/**
* Turn a broker instance into a node instance
* @param broker broker instance
* @return Node representing the given broker
*/
private static Node brokerToNode(Broker broker) {
return new Node(broker.id(), broker.host(), broker.port());
}


/**
* Validate the ZK configuration, checking for required parameters
* @param props Properties to check
*/
protected static void validateZooKeeperConfig(Properties props) {
if (props.getProperty("zookeeper.connect") == null) {
throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set in the properties");
@@ -64,12 +64,11 @@ public interface Fetcher {
*
* @param sourceContext The source context to emit elements to.
* @param valueDeserializer The deserializer to decode the raw values with.
* @param lastOffsets The array into which to store the offsets foe which elements are emitted.
* @param lastOffsets The array into which to store the offsets for which elements are emitted (operator state)
*
* @param <T> The type of elements produced by the fetcher and emitted to the source context.
*/
<T> void run(SourceFunction.SourceContext<T> sourceContext, DeserializationSchema<T> valueDeserializer,
long[] lastOffsets) throws Exception;
<T> void run(SourceFunction.SourceContext<T> sourceContext, DeserializationSchema<T> valueDeserializer, long[] lastOffsets) throws Exception;

/**
* Set the next offset to read from for the given partition.
@@ -80,4 +79,11 @@ <T> void run(SourceFunction.SourceContext<T> sourceContext, DeserializationSchem
* @param offsetToRead The offset to seek to.
*/
void seek(TopicPartition topicPartition, long offsetToRead);

/**
* Exit run loop with given error and release all resources.
*
* @param t Error cause
*/
void stopWithError(Throwable t);
}
@@ -122,7 +122,7 @@ public void close() {
}

@Override
public <T> void run(SourceFunction.SourceContext<T> sourceContext,
public <T> void run(SourceFunction.SourceContext<T> sourceContext,
DeserializationSchema<T> valueDeserializer,
long[] lastOffsets) throws Exception {

@@ -258,7 +258,8 @@ public <T> void run(SourceFunction.SourceContext<T> sourceContext,
*
* @param error The error to report.
*/
void onErrorInFetchThread(Throwable error) {
@Override
public void stopWithError(Throwable error) {
if (this.error.compareAndSet(null, error)) {
// we are the first to report an error
if (mainThread != null) {
@@ -445,7 +446,7 @@ public void run() {
final T value = valueDeserializer.deserialize(valueByte);
final long offset = msg.offset();

synchronized (sourceContext.getCheckpointLock()) {
synchronized (this.sourceContext.getCheckpointLock()) {
sourceContext.collect(value);
offsetsState[partition] = offset;
}
@@ -464,7 +465,7 @@ }
}
catch (Throwable t) {
// report to the main thread
owner.onErrorInFetchThread(t);
owner.stopWithError(t);
}
finally {
// end of run loop. close connection to consumer
