Skip to content

Commit

Permalink
[FLINK-2407] [streaming] Add an API switch to choose between "exactly…
Browse files Browse the repository at this point in the history
… once" and "at least once".
  • Loading branch information
StephanEwen committed Jul 29, 2015
1 parent 833862a commit b211a62
Show file tree
Hide file tree
Showing 12 changed files with 375 additions and 71 deletions.
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api;

/**
* The checkpointing mode defines what consistency guarantees the system gives in the presence of
* failures.
*
* <p>When checkpointing is activated, the data streams are replayed such that lost parts of the
* processing are repeated. For stateful operations and functions, the checkpointing mode defines
* whether the system draws checkpoints such that a recovery behaves as if the operators/functions
* see each record "exactly once" ({@link #EXACTLY_ONCE}), or whether the checkpoints are drawn
* in a simpler fashion that typically encounteres some duplicates upon recovery
* ({@link #AT_LEAST_ONCE})</p>
*/
public enum CheckpointingMode {

/**
* Sets the checkpointing mode to "exactly once". This mode means that the system will
* checkpoint the operator and user function state in such a way that, upon recovery,
* every record will be reflected exactly once in the operator state.
*
* <p>For example, if a user function counts the number of elements in a stream,
* this number will consistently be equal to the number of actual elements in the stream,
* regardless of failures and recovery.</p>
*
* <p>Note that this does not mean that each record flows through the streaming data flow
* only once. It means that upon recovery, the state of operators/functions is restored such
* that the resumed data streams pick up exactly at after the last modification to the state.</p>
*
* <p>Note that this mode does not guarantee exactly-once behavior in the interaction with
* external systems (only state in Flink's operators and user functions). The reason for that
* is that a certain level of "collaboration" is required between two systems to achieve
* exactly-once guarantees. However, for certain systems, connectors can be written that facilitate
* this collaboration.</p>
*
* <p>This mode sustains high throughput. Depending on the data flow graph and operations,
* this mode may increase the record latency, because operators need to align their input
* streams, in order to create a consistent snapshot point. The latency increase for simple
* dataflows (no repartitioning) is negligible. For simple dataflows with repartitioning, the average
* latency remains small, but the slowest records typically have an increased latency.</p>
*/
EXACTLY_ONCE,

/**
* Sets the checkpointing mode to "at least once". This mode means that the system will
* checkpoint the operator and user function state in a simpler way. Upon failure and recovery,
* some records may be reflected multiple times in the operator state.
*
* <p>For example, if a user function counts the number of elements in a stream,
* this number will equal to, or larger, than the actual number of elements in the stream,
* in the presence of failure and recovery.</p>
*
* <p>This mode has minimal impact on latency and may be preferable in very-low latency
* scenarios, where a sustained very-low latency (such as few milliseconds) is needed,
* and where occasional duplicate messages (on recovery) do not matter.</p>
*/
AT_LEAST_ONCE
}
Expand Up @@ -45,6 +45,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.FileStateHandle;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
Expand Down Expand Up @@ -224,70 +225,117 @@ public StreamExecutionEnvironment disableOperatorChaining() {
return this;
}

// ------------------------------------------------------------------------
// Checkpointing Settings
// ------------------------------------------------------------------------

/**
* Method for enabling fault-tolerance. Activates monitoring and backup of
* streaming operator states.
* <p/>
* <p/>
* Setting this option assumes that the job is used in production and thus
* if not stated explicitly otherwise with calling with the
* {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method
* in case of failure the job will be resubmitted to the cluster
* indefinitely.
* Enables checkpointing for the streaming job. The distributed state of the streaming
* dataflow will be periodically snapshotted. In case of a failure, the streaming
* dataflow will be restarted from the latest completed checkpoint. This method selects
* {@link CheckpointingMode#EXACTLY_ONCE} guarantees.
*
* <p>The job draws checkpoints periodically, in the given interval. The state will be
* stored in the configured state backend.</p>
*
* <p>NOTE: Checkpointing iterative streaming dataflows in not properly supported at
* the moment. For that reason, iterative jobs will not be started if used
* with enabled checkpointing. To override this mechanism, use the
* {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.</p>
*
* @param interval
* Time interval between state checkpoints in millis
* @param interval Time interval between state checkpoints in milliseconds.
*/
public StreamExecutionEnvironment enableCheckpointing(long interval) {
return enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE);
}

/**
* Enables checkpointing for the streaming job. The distributed state of the streaming
* dataflow will be periodically snapshotted. In case of a failure, the streaming
* dataflow will be restarted from the latest completed checkpoint.
*
* <p>The job draws checkpoints periodically, in the given interval. The system uses the
* given {@link CheckpointingMode} for the checkpointing ("exactly once" vs "at least once").
* The state will be stored in the configured state backend.</p>
*
* <p>NOTE: Checkpointing iterative streaming dataflows in not properly supported at
* the moment. For that reason, iterative jobs will not be started if used
* with enabled checkpointing. To override this mechanism, use the
* {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.</p>
*
* @param interval
* Time interval between state checkpoints in milliseconds.
* @param mode
* The checkpointing mode, selecting between "exactly once" and "at least once" guaranteed.
*/
public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode) {
if (mode == null) {
throw new NullPointerException("checkpoint mode must not be null");
}
if (interval <= 0) {
throw new IllegalArgumentException("the checkpoint interval must be positive");
}

streamGraph.setCheckpointingEnabled(true);
streamGraph.setCheckpointingInterval(interval);
streamGraph.setCheckpointingMode(mode);
return this;
}

/**
* Method for force-enabling fault-tolerance. Activates monitoring and
* backup of streaming operator states even for jobs containing iterations.
*
* Please note that the checkpoint/restore guarantees for iterative jobs are
* only best-effort at the moment. Records inside the loops may be lost
* during failure.
* <p/>
* <p/>
* Setting this option assumes that the job is used in production and thus
* if not stated explicitly otherwise with calling with the
* {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method
* in case of failure the job will be resubmitted to the cluster
* indefinitely.
* Enables checkpointing for the streaming job. The distributed state of the streaming
* dataflow will be periodically snapshotted. In case of a failure, the streaming
* dataflow will be restarted from the latest completed checkpoint.
*
* <p>The job draws checkpoints periodically, in the given interval. The state will be
* stored in the configured state backend.</p>
*
* <p>NOTE: Checkpointing iterative streaming dataflows in not properly supported at
* the moment. If the "force" parameter is set to true, the system will execute the
* job nonetheless.</p>
*
* @param interval
* Time interval between state checkpoints in millis
* Time interval between state checkpoints in millis.
* @param mode
* The checkpointing mode, selecting between "exactly once" and "at least once" guaranteed.
* @param force
* If true checkpointing will be enabled for iterative jobs as
* well
* If true checkpointing will be enabled for iterative jobs as well.
*/
@Deprecated
public StreamExecutionEnvironment enableCheckpointing(long interval, boolean force) {
public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode, boolean force) {
if (mode == null) {
throw new NullPointerException("checkpoint mode must not be null");
}
if (interval <= 0) {
throw new IllegalArgumentException("the checkpoint interval must be positive");
}

streamGraph.setCheckpointingEnabled(true);
streamGraph.setCheckpointingInterval(interval);
streamGraph.setCheckpointingMode(mode);
if (force) {
streamGraph.forceCheckpoint();
}
return this;
}

/**
* Method for enabling fault-tolerance. Activates monitoring and backup of
* streaming operator states.
* <p/>
* <p/>
* Setting this option assumes that the job is used in production and thus
* if not stated explicitly otherwise with calling with the
* {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method
* in case of failure the job will be resubmitted to the cluster
* indefinitely.
* Enables checkpointing for the streaming job. The distributed state of the streaming
* dataflow will be periodically snapshotted. In case of a failure, the streaming
* dataflow will be restarted from the latest completed checkpoint. This method selects
* {@link CheckpointingMode#EXACTLY_ONCE} guarantees.
*
* <p>The job draws checkpoints periodically, in the default interval. The state will be
* stored in the configured state backend.</p>
*
* <p>NOTE: Checkpointing iterative streaming dataflows in not properly supported at
* the moment. For that reason, iterative jobs will not be started if used
* with enabled checkpointing. To override this mechanism, use the
* {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.</p>
*/
public StreamExecutionEnvironment enableCheckpointing() {
streamGraph.setCheckpointingEnabled(true);
streamGraph.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
return this;
}

Expand Down Expand Up @@ -323,8 +371,7 @@ public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
* A value of {@code -1} indicates that the system default value (as defined
* in the configuration) should be used.
*
* @return The number of times the system will try to re-execute failed
* tasks.
* @return The number of times the system will try to re-execute failed tasks.
*/
public int getNumberOfExecutionRetries() {
return config.getNumberOfExecutionRetries();
Expand Down
Expand Up @@ -68,13 +68,17 @@ public class StreamConfig implements Serializable {
private static final String CHECKPOINTING_ENABLED = "checkpointing";
private static final String STATEHANDLE_PROVIDER = "stateHandleProvider";
private static final String STATE_PARTITIONER = "statePartitioner";

private static final String CHECKPOINT_MODE = "checkpointMode";


// ------------------------------------------------------------------------
// Default Values
// ------------------------------------------------------------------------

private static final long DEFAULT_TIMEOUT = 100;

private static final CheckpointingMode DEFAULT_CHECKPOINTING_MODE = CheckpointingMode.EXACTLY_ONCE;


// ------------------------------------------------------------------------
// Config
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -351,13 +355,29 @@ public List<StreamEdge> getInPhysicalEdges(ClassLoader cl) {
}
}

// --------------------- checkpointing -----------------------

public void setCheckpointingEnabled(boolean enabled) {
config.setBoolean(CHECKPOINTING_ENABLED, enabled);
}

public boolean isCheckpointingEnabled() {
return config.getBoolean(CHECKPOINTING_ENABLED, false);
}

public void setCheckpointMode(CheckpointingMode mode) {
config.setInteger(CHECKPOINT_MODE, mode.ordinal());
}

public CheckpointingMode getCheckpointMode() {
int ordinal = config.getInteger(CHECKPOINT_MODE, -1);
if (ordinal >= 0) {
return CheckpointingMode.values()[ordinal];
} else {
return DEFAULT_CHECKPOINTING_MODE;
}
}


public void setOutEdgesInOrder(List<StreamEdge> outEdgeList) {
try {
Expand Down
Expand Up @@ -147,6 +147,8 @@ public long getCheckpointingInterval() {
return checkpointingInterval;
}

// Checkpointing

public boolean isChainingEnabled() {
return chaining;
}
Expand All @@ -155,6 +157,15 @@ public boolean isCheckpointingEnabled() {
return checkpointingEnabled;
}

public CheckpointingMode getCheckpointingMode() {
return checkpointingMode;
}

public void setCheckpointingMode(CheckpointingMode checkpointingMode) {
this.checkpointingMode = checkpointingMode;
}


public boolean isIterative() {
return !streamLoops.isEmpty();
}
Expand Down
Expand Up @@ -41,6 +41,7 @@
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
Expand Down Expand Up @@ -269,10 +270,19 @@ private void setVertexConfig(Integer vertexID, StreamConfig config,
config.setNumberOfOutputs(nonChainableOutputs.size());
config.setNonChainedOutputs(nonChainableOutputs);
config.setChainedOutputs(chainableOutputs);

config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
config.setStateHandleProvider(streamGraph.getStateHandleProvider());
if (streamGraph.isCheckpointingEnabled()) {
config.setCheckpointMode(streamGraph.getCheckpointingMode());
config.setStateHandleProvider(streamGraph.getStateHandleProvider());
} else {
// the at least once input handler is slightly cheaper (in the absence of checkpoints),
// so we use that one if checkpointing is not enabled
config.setCheckpointMode(CheckpointingMode.AT_LEAST_ONCE);
}
config.setStatePartitioner((KeySelector<?, Serializable>) vertex.getStatePartitioner());


Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();

if (vertexClass.equals(StreamIterationHead.class)
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
Expand Down Expand Up @@ -79,12 +80,22 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
@SuppressWarnings("unchecked")
public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
EventListener<CheckpointBarrier> checkpointListener,
CheckpointingMode checkpointMode,
IOManager ioManager,
boolean enableWatermarkMultiplexing) throws IOException {

super(InputGateUtil.createInputGate(inputGates));

this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
}
else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
this.barrierHandler = new BarrierTracker(inputGate);
}
else {
throw new IllegalArgumentException("Unrecognized CheckpointingMode: " + checkpointMode);
}

if (checkpointListener != null) {
this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
}
Expand Down

0 comments on commit b211a62

Please sign in to comment.