Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,7 @@ public void run(SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> ctx) th
// parallelism is 2 and number of Kafka topic partitions is 1). In this case, we just fall
// through to idle this executor.
LOG.info("Number of readers is 0 for this task executor, idle");

// set this, so that the later logic will emit a final watermark and then decide whether
// to idle or not
isRunning = false;
// Do nothing here but still execute the rest of the source logic
} else if (localReaders.size() == 1) {
// the easy case, we just read from one reader
UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(0);
Expand Down Expand Up @@ -281,6 +278,10 @@ public void run(SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> ctx) th

ctx.emitWatermark(new Watermark(Long.MAX_VALUE));

finalizeSource();
}

private void finalizeSource() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'While' inside this method was never executed earlier as isRunning was always false when we reached here.
Shall we call this method only from the 1st if statement L205 as even not, in the other else blocks will exit only after isRunning is false.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also wait is only used as sleep here. Shall we change it to sleep to make it more evident?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the bug was that the finalize code was never executed for empty readers.

Shall we call this method only from the 1st if statement L205 as even not, in the other else blocks will exit only after isRunning is false.

All if blocks will eventually execute finalizeSource, so I wouldn't call it directly in line 205 because that would additionally require a return; statement.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also wait is only used as sleep here. Shall we change it to sleep to make it more evident?

Yes, good idea. The object we're waiting for, can't be notified, so sleep is better here.

FlinkPipelineOptions options = serializedOptions.get().as(FlinkPipelineOptions.class);
if (!options.isShutdownSourcesOnFinalWatermark()) {
// do nothing, but still look busy ...
Expand All @@ -290,15 +291,10 @@ public void run(SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> ctx) th
// See https://issues.apache.org/jira/browse/FLINK-2491 for progress on this issue

// wait until this is canceled
final Object waitLock = new Object();
while (isRunning) {
try {
// Flink will interrupt us at some point
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (waitLock) {
// don't wait indefinitely, in case something goes horribly wrong
waitLock.wait(1000);
}
Thread.sleep(1000);
} catch (InterruptedException e) {
if (!isRunning) {
// restore the interrupted state, and fall through the loop
Expand Down Expand Up @@ -466,6 +462,12 @@ public List<? extends UnboundedSource<OutputT, CheckpointMarkT>> getLocalSplitSo
return localSplitSources;
}

/** Visible so that we can check this in tests. Must not be used for anything else. */
@VisibleForTesting
public List<UnboundedSource.UnboundedReader<OutputT>> getLocalReaders() {
return localReaders;
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
*/
package org.apache.beam.runners.flink.streaming;

import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand All @@ -26,21 +28,27 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
Expand All @@ -54,6 +62,7 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -95,7 +104,8 @@ public static Collection<Object[]> data() {
@Test(timeout = 30_000)
public void testValueEmission() throws Exception {
final int numElementsPerShard = 20;
PipelineOptions options = PipelineOptionsFactory.create();
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setShutdownSourcesOnFinalWatermark(true);

final long[] numElementsReceived = {0L};
final int[] numWatermarksReceived = {0};
Expand Down Expand Up @@ -555,6 +565,54 @@ public void testSerialization() throws Exception {

InstantiationUtil.serializeObject(flinkWrapper);
}

@Test(timeout = 10_000)
public void testSourceWithNoReaderDoesNotShutdown() throws Exception {
final int parallelism = 2;
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setShutdownSourcesOnFinalWatermark(true);

TestCountingSource source = new TestCountingSource(20).withoutSplitting();

UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> sourceWrapper =
new UnboundedSourceWrapper<>("noReader", options, source, parallelism);

StreamingRuntimeContext mock = Mockito.mock(StreamingRuntimeContext.class);
// Set up the RuntimeContext such that this instance won't receive any readers
Mockito.when(mock.getIndexOfThisSubtask()).thenReturn(parallelism - 1);
Mockito.when(mock.getNumberOfParallelSubtasks()).thenReturn(parallelism);
sourceWrapper.setRuntimeContext(mock);
sourceWrapper.open(new Configuration());

SourceFunction.SourceContext sourceContext = Mockito.mock(SourceFunction.SourceContext.class);
Object checkpointLock = new Object();
Mockito.when(sourceContext.getCheckpointLock()).thenReturn(checkpointLock);

Thread thread =
new Thread(
() -> {
try {
sourceWrapper.run(sourceContext);
} catch (Exception e) {
LOG.error("Error while running UnboundedSourceWrapper", e);
}
});

try {
thread.start();
List<UnboundedSource.UnboundedReader<KV<Integer, Integer>>> localReaders =
sourceWrapper.getLocalReaders();
while (localReaders != null && !localReaders.isEmpty()) {
Thread.sleep(200);
// should stay alive
assertThat(thread.isAlive(), is(true));
}
sourceWrapper.cancel();
} finally {
thread.interrupt();
thread.join();
}
}
}

private static final class TestStreamStatusMaintainer implements StreamStatusMaintainer {
Expand Down