Skip to content

Commit

Permalink
[BEAM-9900] Remove shutdownSourcesOnFinalWatermark flag
Browse files Browse the repository at this point in the history
The shutdownSourcesOnFinalWatermark has caused some confusion in the past. It is
generally used for testing pipelines to ensure that the pipeline and the testing
cluster shuts down at the end of the job. Without it, the pipeline will run
forever in streaming mode, regardless of whether the input is finite or not.

We didn't want to enable the flag by default because shutting down any operators
including sources in Flink will prevent checkpointing from working. If we have
side input, for example, that may be the case even for long-running
pipelines. However, we can detect whether checkpointing is enabled and set the
flag automatically.

The only situation where we may want the flag to be disabled is when users do
not have checkpointing enabled but want to take a savepoint. This should be rare
and users can mitigate by setting the flag to false to prevent operators from
shutting down.
  • Loading branch information
mxm committed May 7, 2020
1 parent bef40cc commit d106f26
Show file tree
Hide file tree
Showing 28 changed files with 139 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ def streamingScenarios = { datasetName, sdkHarnessImageTag -> [
// Turn on streaming mode (flags are indicated with null values)
streaming : null,
stateful : null,
shutdown_sources_on_final_watermark : null,
job_endpoint : 'localhost:8099',
environment_config : sdkHarnessImageTag,
environment_type : 'DOCKER',
Expand Down
5 changes: 0 additions & 5 deletions .test-infra/jenkins/job_PostCommit_Java_Nexmark_Flink.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ NoPhraseTriggeringPostCommitBuilder.postCommitJob('beam_PostCommit_Java_Nexmark_
' -Pnexmark.args="' +
[NexmarkBigqueryProperties.nexmarkBigQueryArgs,
'--runner=FlinkRunner',
'--shutdownSourcesOnFinalWatermark=true',
'--streaming=false',
'--suite=SMOKE',
'--streamTimeout=60' ,
Expand All @@ -61,7 +60,6 @@ NoPhraseTriggeringPostCommitBuilder.postCommitJob('beam_PostCommit_Java_Nexmark_
' -Pnexmark.args="' +
[NexmarkBigqueryProperties.nexmarkBigQueryArgs,
'--runner=FlinkRunner',
'--shutdownSourcesOnFinalWatermark=true',
'--streaming=true',
'--suite=SMOKE',
'--streamTimeout=60' ,
Expand All @@ -78,7 +76,6 @@ NoPhraseTriggeringPostCommitBuilder.postCommitJob('beam_PostCommit_Java_Nexmark_
' -Pnexmark.args="' +
[NexmarkBigqueryProperties.nexmarkBigQueryArgs,
'--runner=FlinkRunner',
'--shutdownSourcesOnFinalWatermark=true',
'--queryLanguage=sql',
'--streaming=false',
'--suite=SMOKE',
Expand All @@ -95,7 +92,6 @@ NoPhraseTriggeringPostCommitBuilder.postCommitJob('beam_PostCommit_Java_Nexmark_
' -Pnexmark.args="' +
[NexmarkBigqueryProperties.nexmarkBigQueryArgs,
'--runner=FlinkRunner',
'--shutdownSourcesOnFinalWatermark=true',
'--queryLanguage=sql',
'--streaming=true',
'--suite=SMOKE',
Expand All @@ -117,7 +113,6 @@ PhraseTriggeringPostCommitBuilder.postCommitJob('beam_PostCommit_Java_Nexmark_Fl
def final JOB_SPECIFIC_OPTIONS = [
'suite' : 'SMOKE',
'streamTimeout' : 60,
'shutdownSourcesOnFinalWatermark' : true,
]

Nexmark.standardJob(delegate, Runner.FLINK, SDK.JAVA, JOB_SPECIFIC_OPTIONS, TriggeringContext.PR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2006,7 +2006,6 @@ class BeamModulePlugin implements Plugin<Project> {
"--output=${outputDir}/out.txt",
"--runner=${runner}",
"--parallelism=2",
"--shutdown_sources_on_final_watermark",
"--sdk_worker_parallelism=1",
"--flink_job_server_jar=${project.project(':runners:flink:1.10:job-server').shadowJar.archivePath}",
"--spark_job_server_jar=${project.project(':runners:spark:job-server').shadowJar.archivePath}",
Expand Down
2 changes: 0 additions & 2 deletions runners/flink/job-server/flink_job_server.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ def portableValidatesRunnerTask(String name, Boolean streaming) {
def pipelineOptions = [
// Limit resource consumption via parallelism
"--parallelism=2",
"--shutdownSourcesOnFinalWatermark",
]
if (streaming) {
pipelineOptions += "--streaming"
Expand Down Expand Up @@ -201,7 +200,6 @@ createCrossLanguageValidatesRunnerTask(
numParallelTests: 1,
pipelineOpts: [
"--parallelism=2",
"--shutdownSourcesOnFinalWatermark",
]
)

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,18 @@ static StreamExecutionEnvironment createStreamExecutionEnvironment(
}
flinkStreamEnv.enableCheckpointing(
checkpointInterval, CheckpointingMode.valueOf(options.getCheckpointingMode()));

if (options.getShutdownSourcesAfterIdleMs() == -1) {
// If not explicitly configured, we never shutdown sources when checkpointing is enabled.
options.setShutdownSourcesAfterIdleMs(Long.MAX_VALUE);
}

if (options.getCheckpointTimeoutMillis() != -1) {
flinkStreamEnv
.getCheckpointConfig()
.setCheckpointTimeout(options.getCheckpointTimeoutMillis());
}

boolean externalizedCheckpoint = options.isExternalizedCheckpointsEnabled();
boolean retainOnCancellation = options.getRetainExternalizedCheckpointsOnCancellation();
if (externalizedCheckpoint) {
Expand All @@ -246,6 +253,11 @@ static StreamExecutionEnvironment createStreamExecutionEnvironment(
flinkStreamEnv
.getCheckpointConfig()
.setMaxConcurrentCheckpoints(options.getNumConcurrentCheckpoints());
} else {
if (options.getShutdownSourcesAfterIdleMs() == -1) {
// If not explicitly configured, we never shutdown sources when checkpointing is enabled.
options.setShutdownSourcesAfterIdleMs(0L);
}
}

applyLatencyTrackingInterval(flinkStreamEnv.getConfig(), options);
Expand All @@ -267,6 +279,8 @@ static StreamExecutionEnvironment createStreamExecutionEnvironment(
return flinkStreamEnv;
}

private void configureCheckpointingOptions() {}

/**
* Removes the http:// or https:// schema from a url string. This is commonly used with the
* flink_master address which is expected to be of form host:port but users may specify a URL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* requiring flink on the classpath (e.g. to use with the direct runner).
*/
public interface FlinkPipelineOptions
extends FlinkDebugPipelineOptions, PipelineOptions, ApplicationNameOptions, StreamingOptions {
extends PipelineOptions, ApplicationNameOptions, StreamingOptions {

String AUTO = "[auto]";
String PIPELINED = "PIPELINED";
Expand Down Expand Up @@ -126,6 +126,17 @@ public interface FlinkPipelineOptions

void setFailOnCheckpointingErrors(Boolean failOnCheckpointingErrors);

@Description(
"Shuts down sources which have been idle for the configured time of milliseconds. Once a source has been "
+ "shut down, checkpointing is not possible anymore. Shutting down the sources eventually leads to pipeline "
+ "shutdown (=Flink job finishes) once all input has been processed. Unless explicitly set, this will "
+ "default to Long.MAX_VALUE when checkpointing is enabled and to 0 when checkpointing is disabled. "
+ "See https://issues.apache.org/jira/browse/FLINK-2491 for progress on this issue.")
@Default.Long(-1L)
Long getShutdownSourcesAfterIdleMs();

void setShutdownSourcesAfterIdleMs(Long timeoutMs);

@Description(
"Sets the number of times that failed tasks are re-executed. "
+ "A value of zero effectively disables fault tolerance. A value of -1 indicates "
Expand Down Expand Up @@ -193,20 +204,6 @@ public interface FlinkPipelineOptions

void setMaxBundleTimeMills(Long time);

/**
* Whether to shutdown sources when their watermark reaches {@code +Inf}. For production use cases
* you want this to be disabled because Flink will currently (versions {@literal <=} 1.5) stop
* doing checkpoints when any operator (which includes sources) is finished.
*
* <p>Please see <a href="https://issues.apache.org/jira/browse/FLINK-2491">FLINK-2491</a> for
* progress on this issue.
*/
@Description("If set, shutdown sources when their watermark reaches +Inf.")
@Default.Boolean(false)
Boolean isShutdownSourcesOnFinalWatermark();

void setShutdownSourcesOnFinalWatermark(Boolean shutdownOnFinalWatermark);

@Description(
"Interval in milliseconds for sending latency tracking marks from the sources to the sinks. "
+ "Interval value <= 0 disables the feature.")
Expand Down Expand Up @@ -255,4 +252,10 @@ public interface FlinkPipelineOptions
Boolean isAutoBalanceWriteFilesShardingEnabled();

void setAutoBalanceWriteFilesShardingEnabled(Boolean autoBalanceWriteFilesShardingEnabled);

@Description(
"If not null, reports the checkpoint duration of each ParDo stage in the provided metric namespace.")
String getReportCheckpointDuration();

void setReportCheckpointDuration(String metricNamespace);
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,16 +279,15 @@ private <T> void translateFlatten(

if (allInputs.isEmpty()) {

FlinkPipelineOptions pipelineOptions = context.getPipelineOptions();
// create an empty dummy source to satisfy downstream operations
// we cannot create an empty source in Flink, therefore we have to
// add the flatMap that simply never forwards the single element
boolean keepSourceAlive = !pipelineOptions.isShutdownSourcesOnFinalWatermark();
long shutdownAfterIdleSourcesMs = pipelineOptions.getShutdownSourcesAfterIdleMs();
long shutdownAfterIdleSourcesMs =
context.getPipelineOptions().getShutdownSourcesAfterIdleMs();
DataStreamSource<WindowedValue<byte[]>> dummySource =
context
.getExecutionEnvironment()
.addSource(new ImpulseSourceFunction(keepSourceAlive, shutdownAfterIdleSourcesMs));
.addSource(new ImpulseSourceFunction(shutdownAfterIdleSourcesMs));

DataStream<WindowedValue<T>> result =
dummySource
Expand Down Expand Up @@ -549,14 +548,11 @@ private void translateImpulse(
new CoderTypeInformation<>(
WindowedValue.getFullCoder(ByteArrayCoder.of(), GlobalWindow.Coder.INSTANCE));

FlinkPipelineOptions pipelineOptions = context.getPipelineOptions();
boolean keepSourceAlive = !pipelineOptions.isShutdownSourcesOnFinalWatermark();
long shutdownAfterIdleSourcesMs = pipelineOptions.getShutdownSourcesAfterIdleMs();
long shutdownAfterIdleSourcesMs = context.getPipelineOptions().getShutdownSourcesAfterIdleMs();
SingleOutputStreamOperator<WindowedValue<byte[]>> source =
context
.getExecutionEnvironment()
.addSource(
new ImpulseSourceFunction(keepSourceAlive, shutdownAfterIdleSourcesMs), "Impulse")
.addSource(new ImpulseSourceFunction(shutdownAfterIdleSourcesMs), "Impulse")
.returns(typeInfo);

context.addDataStream(Iterables.getOnlyElement(pTransform.getOutputsMap().values()), source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class TestFlinkRunner extends PipelineRunner<PipelineResult> {

private TestFlinkRunner(FlinkPipelineOptions options) {
options.setRunner(TestFlinkRunner.class);
options.setShutdownSourcesOnFinalWatermark(true);
if (options.getParallelism() == -1) {
// Limit parallelism to avoid too much memory consumption during local execution
options.setParallelism(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@
public class ImpulseSourceFunction
implements SourceFunction<WindowedValue<byte[]>>, CheckpointedFunction {

/** Keep source running even after it has done all the work. */
private final boolean keepSourceAlive;

/** The idle time before we the source shuts down. */
/** The idle time before the source shuts down. */
private final long idleTimeoutMs;

/** Indicates the streaming job is running and the source can produce elements. */
Expand All @@ -48,12 +45,7 @@ public class ImpulseSourceFunction
/** Checkpointed state which indicates whether the impulse has finished. */
private transient ListState<Boolean> impulseEmitted;

public ImpulseSourceFunction(boolean keepSourceAlive) {
this(keepSourceAlive, 0);
}

public ImpulseSourceFunction(boolean keepSourceAlive, long idleTimeoutMs) {
this.keepSourceAlive = keepSourceAlive;
public ImpulseSourceFunction(long idleTimeoutMs) {
this.idleTimeoutMs = idleTimeoutMs;
this.running = true;
}
Expand Down Expand Up @@ -83,24 +75,21 @@ private void waitToEnsureCheckpointingWorksCorrectly() {
// otherwise checkpointing would not work correctly anymore
//
// See https://issues.apache.org/jira/browse/FLINK-2491 for progress on this issue
if (keepSourceAlive) {
long idleStart = System.currentTimeMillis();
// wait until this is canceled
final Object waitLock = new Object();
while (running
&& (idleTimeoutMs <= 0 || System.currentTimeMillis() - idleStart < idleTimeoutMs)) {
try {
// Flink will interrupt us at some point
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (waitLock) {
// don't wait indefinitely, in case something goes horribly wrong
waitLock.wait(1000);
}
} catch (InterruptedException e) {
if (!running) {
// restore the interrupted state, and fall through the loop
Thread.currentThread().interrupt();
}
long idleStart = System.currentTimeMillis();
// wait until this is canceled
final Object waitLock = new Object();
while (running && (System.currentTimeMillis() - idleStart < idleTimeoutMs)) {
try {
// Flink will interrupt us at some point
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (waitLock) {
// don't wait indefinitely, in case something goes horribly wrong
waitLock.wait(1000);
}
} catch (InterruptedException e) {
if (!running) {
// restore the interrupted state, and fall through the loop
Thread.currentThread().interrupt();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.FlinkDebugPipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
Expand Down Expand Up @@ -461,8 +460,7 @@ public void open() throws Exception {
if (!options.getDisableMetrics()) {
flinkMetricContainer = new FlinkMetricContainer(getRuntimeContext());
doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, flinkMetricContainer);
String checkpointMetricNamespace =
options.as(FlinkDebugPipelineOptions.class).getReportCheckpointDuration();
String checkpointMetricNamespace = options.getReportCheckpointDuration();
if (checkpointMetricNamespace != null) {
MetricName checkpointMetric =
MetricName.named(checkpointMetricNamespace, "checkpoint_duration");
Expand Down

0 comments on commit d106f26

Please sign in to comment.