From 54c59a43548090cd7c27cdd7615cfd20868f5de6 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Mon, 5 Nov 2018 17:57:10 +0100 Subject: [PATCH 1/2] [BEAM-5971] Prevent unwanted shutdown of UnboundedSourceWrapper When `shutdownSourcesOnFinalWatermark` is set to `false`, sources must not be shut down in order to enable the checkpointing to work correctly. --- .../streaming/io/UnboundedSourceWrapper.java | 15 +++-- .../streaming/UnboundedSourceWrapperTest.java | 60 ++++++++++++++++++- 2 files changed, 70 insertions(+), 5 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 7be626d2c768..83be3910a1d7 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -207,10 +207,7 @@ public void run(SourceContext>> ctx) th // parallelism is 2 and number of Kafka topic partitions is 1). In this case, we just fall // through to idle this executor. LOG.info("Number of readers is 0 for this task executor, idle"); - - // set this, so that the later logic will emit a final watermark and then decide whether - // to idle or not - isRunning = false; + // Do nothing here but still execute the rest of the source logic } else if (localReaders.size() == 1) { // the easy case, we just read from one reader UnboundedSource.UnboundedReader reader = localReaders.get(0); @@ -281,6 +278,10 @@ public void run(SourceContext>> ctx) th ctx.emitWatermark(new Watermark(Long.MAX_VALUE)); + finalizeSource(); + } + + private void finalizeSource() { FlinkPipelineOptions options = serializedOptions.get().as(FlinkPipelineOptions.class); if (!options.isShutdownSourcesOnFinalWatermark()) { // do nothing, but still look busy ... @@ -466,6 +467,12 @@ public List> getLocalSplitSo return localSplitSources; } + /** Visible so that we can check this in tests. Must not be used for anything else. */ + @VisibleForTesting + public List> getLocalReaders() { + return localReaders; + } + @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java index 053bc302d209..486994445057 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java @@ -17,7 +17,9 @@ */ package org.apache.beam.runners.flink.streaming; +import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -26,21 +28,27 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.ValueWithRecordId; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -54,6 +62,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.junit.runners.Parameterized; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,7 +104,8 @@ public static Collection data() { @Test(timeout = 30_000) public void testValueEmission() throws Exception { final int numElementsPerShard = 20; - PipelineOptions options = PipelineOptionsFactory.create(); + FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); + options.setShutdownSourcesOnFinalWatermark(true); final long[] numElementsReceived = {0L}; final int[] numWatermarksReceived = {0}; @@ -555,6 +565,54 @@ public void testSerialization() throws Exception { InstantiationUtil.serializeObject(flinkWrapper); } + + @Test(timeout = 10_000) + public void testSourceWithNoReaderDoesNotShutdown() throws Exception { + final int parallelism = 2; + FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); + options.setShutdownSourcesOnFinalWatermark(true); + + TestCountingSource source = new TestCountingSource(20).withoutSplitting(); + + UnboundedSourceWrapper, TestCountingSource.CounterMark> sourceWrapper = + new UnboundedSourceWrapper<>("noReader", options, source, parallelism); + + StreamingRuntimeContext mock = Mockito.mock(StreamingRuntimeContext.class); + // Set up the RuntimeContext such that this instance won't receive any readers + Mockito.when(mock.getIndexOfThisSubtask()).thenReturn(parallelism - 1); + Mockito.when(mock.getNumberOfParallelSubtasks()).thenReturn(parallelism); + sourceWrapper.setRuntimeContext(mock); + sourceWrapper.open(new Configuration()); + + SourceFunction.SourceContext sourceContext = Mockito.mock(SourceFunction.SourceContext.class); + Object checkpointLock = new Object(); + Mockito.when(sourceContext.getCheckpointLock()).thenReturn(checkpointLock); + + Thread thread = + new Thread( + () -> { + try { + sourceWrapper.run(sourceContext); + } catch (Exception e) { + LOG.error("Error while running UnboundedSourceWrapper", e); + } + }); + + try { + thread.start(); + List>> localReaders = + sourceWrapper.getLocalReaders(); + while (localReaders != null && !localReaders.isEmpty()) { + Thread.sleep(200); + // should stay alive + assertThat(thread.isAlive(), is(true)); + } + sourceWrapper.cancel(); + } finally { + thread.interrupt(); + thread.join(); + } + } } private static final class TestStreamStatusMaintainer implements StreamStatusMaintainer { From a64fda62ff7953601620a29661e315f1812ef957 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Tue, 6 Nov 2018 20:21:01 +0100 Subject: [PATCH 2/2] [BEAM-5971] Use Thread.sleep instead of Object#wait We're not holding a shared lock, so Thread.sleep is the better approach for waiting. --- .../wrappers/streaming/io/UnboundedSourceWrapper.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 83be3910a1d7..1595a95d0242 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -291,15 +291,10 @@ private void finalizeSource() { // See https://issues.apache.org/jira/browse/FLINK-2491 for progress on this issue // wait until this is canceled - final Object waitLock = new Object(); while (isRunning) { try { // Flink will interrupt us at some point - //noinspection SynchronizationOnLocalVariableOrMethodParameter - synchronized (waitLock) { - // don't wait indefinitely, in case something goes horribly wrong - waitLock.wait(1000); - } + Thread.sleep(1000); } catch (InterruptedException e) { if (!isRunning) { // restore the interrupted state, and fall through the loop