Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,16 @@ public void run(SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> ctx) th
ReaderInvocationUtil<OutputT, UnboundedSource.UnboundedReader<OutputT>> readerInvoker =
new ReaderInvocationUtil<>(stepName, serializedOptions.get(), metricContainer);

if (localReaders.size() == 1) {
if (localReaders.size() == 0) {
// This can happen when the parallelism is greater than the number of IO readers (for example,
// parallelism is 2 and the number of Kafka topic partitions is 1). In this case, we just fall
// through and let this executor idle.
LOG.info("Number of readers is 0 for this task executor, idle");

// set this, so that the later logic will emit a final watermark and then decide whether
// to idle or not
isRunning = false;
} else if (localReaders.size() == 1) {
// the easy case, we just read from one reader
UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(0);

Expand Down Expand Up @@ -437,15 +446,14 @@ public void onProcessingTime(long timestamp) {
private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
if (this.isRunning) {
long watermarkInterval = runtime.getExecutionConfig().getAutoWatermarkInterval();
long timeToNextWatermark = getTimeToNextWatermark(watermarkInterval);
runtime.getProcessingTimeService().registerTimer(timeToNextWatermark, this);
synchronized (context.getCheckpointLock()) {
long timeToNextWatermark =
runtime.getProcessingTimeService().getCurrentProcessingTime() + watermarkInterval;
runtime.getProcessingTimeService().registerTimer(timeToNextWatermark, this);
}
}
}

/**
 * Computes the absolute timestamp at which the next watermark timer should fire.
 *
 * @param watermarkInterval offset that is added to the current system time
 * @return the value of {@link System#currentTimeMillis()} plus {@code watermarkInterval}
 */
private long getTimeToNextWatermark(long watermarkInterval) {
  final long now = System.currentTimeMillis();
  return now + watermarkInterval;
}

/** Visible so that we can check this in tests. Must not be used for anything else. */
@VisibleForTesting
public List<? extends UnboundedSource<OutputT, CheckpointMarkT>> getSplitSources() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand All @@ -52,7 +53,7 @@ public class TestCountingSource
private final int shardNumber;
private final boolean dedup;
private final boolean throwOnFirstSnapshot;
private final boolean allowSplitting;
private final int fixedNumSplits;

/**
* We only allow an exception to be thrown from getCheckpointMark at most once. This must be
Expand All @@ -66,40 +67,43 @@ public static void setFinalizeTracker(List<Integer> finalizeTracker) {
}

public TestCountingSource(int numMessagesPerShard) {
this(numMessagesPerShard, 0, false, false, true);
this(numMessagesPerShard, 0, false, false, -1);
}

public TestCountingSource withDedup() {
return new TestCountingSource(
numMessagesPerShard, shardNumber, true, throwOnFirstSnapshot, true);
return new TestCountingSource(numMessagesPerShard, shardNumber, true, throwOnFirstSnapshot, -1);
}

private TestCountingSource withShardNumber(int shardNumber) {
return new TestCountingSource(
numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, true);
numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, -1);
}

public TestCountingSource withThrowOnFirstSnapshot(boolean throwOnFirstSnapshot) {
return new TestCountingSource(
numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, true);
numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, -1);
}

/**
 * Returns a copy of this source whose fixed number of splits is set to one; all other settings
 * are carried over unchanged.
 */
public TestCountingSource withoutSplitting() {
  final int singleSplit = 1;
  return new TestCountingSource(
      numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, singleSplit);
}

public TestCountingSource withFixedNumSplits(int maxNumSplits) {
return new TestCountingSource(
numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, false);
numMessagesPerShard, shardNumber, dedup, throwOnFirstSnapshot, maxNumSplits);
}

private TestCountingSource(
int numMessagesPerShard,
int shardNumber,
boolean dedup,
boolean throwOnFirstSnapshot,
boolean allowSplitting) {
int fixedNumSplits) {
this.numMessagesPerShard = numMessagesPerShard;
this.shardNumber = shardNumber;
this.dedup = dedup;
this.throwOnFirstSnapshot = throwOnFirstSnapshot;
this.allowSplitting = allowSplitting;
this.fixedNumSplits = fixedNumSplits;
}

public int getShardNumber() {
Expand All @@ -109,8 +113,8 @@ public int getShardNumber() {
@Override
public List<TestCountingSource> split(int desiredNumSplits, PipelineOptions options) {
List<TestCountingSource> splits = new ArrayList<>();
int numSplits = allowSplitting ? desiredNumSplits : 1;
for (int i = 0; i < numSplits; i++) {
int actualNumSplits = (fixedNumSplits == -1) ? desiredNumSplits : fixedNumSplits;
for (int i = 0; i < actualNumSplits; i++) {
splits.add(withShardNumber(i));
}
return splits;
Expand Down Expand Up @@ -199,6 +203,11 @@ public TestCountingSource getCurrentSource() {

@Override
public Instant getWatermark() {
if (current >= numMessagesPerShard - 1) {
// we won't emit further data, signal this with the final watermark
return new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE);
}

// The watermark is a promise about future elements, and the timestamps of elements are
// strictly increasing for this source.
return new Instant(current + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.ValueWithRecordId;
Expand All @@ -59,16 +60,16 @@ public class UnboundedSourceWrapperTest {

/** Parameterized tests. */
@RunWith(Parameterized.class)
public static class UnboundedSourceWrapperTestWithParams {
public static class ParameterizedUnboundedSourceWrapperTest {
private final int numTasks;
private final int numSplits;

public UnboundedSourceWrapperTestWithParams(int numTasks, int numSplits) {
public ParameterizedUnboundedSourceWrapperTest(int numTasks, int numSplits) {
this.numTasks = numTasks;
this.numSplits = numSplits;
}

@Parameterized.Parameters
@Parameterized.Parameters(name = "numTasks = {0}; numSplits={1}")
public static Collection<Object[]> data() {
/*
* Parameters for initializing the tests:
Expand All @@ -89,75 +90,113 @@ public static Collection<Object[]> data() {
*/
@Test
public void testValueEmission() throws Exception {
final int numElements = 20;
final int numElementsPerShard = 20;
final Object checkpointLock = new Object();
PipelineOptions options = PipelineOptionsFactory.create();

// this source will emit exactly NUM_ELEMENTS across all parallel readers,
final long[] numElementsReceived = {0L};
final int[] numWatermarksReceived = {0};

// This source will emit exactly numElementsPerShard elements for each parallel reader and
// will stall afterwards. We check later that we received numElementsPerShard elements
// per split.
TestCountingSource source = new TestCountingSource(numElements);
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
new UnboundedSourceWrapper<>("stepName", options, source, numSplits);

assertEquals(numSplits, flinkWrapper.getSplitSources().size());

StreamSource<
WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark>>
sourceOperator = new StreamSource<>(flinkWrapper);

AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
testHarness =
new AbstractStreamOperatorTestHarness<>(
sourceOperator,
numTasks /* max parallelism */,
numTasks /* parallelism */,
0 /* subtask index */);

testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);

try {
testHarness.open();
sourceOperator.run(
checkpointLock,
new TestStreamStatusMaintainer(),
new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
private int count = 0;

@Override
public void emitWatermark(Watermark watermark) {}

TestCountingSource source =
new TestCountingSource(numElementsPerShard).withFixedNumSplits(numSplits);

for (int subtaskIndex = 0; subtaskIndex < numTasks; subtaskIndex++) {
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
new UnboundedSourceWrapper<>("stepName", options, source, numTasks);

// the source wrapper will only request as many splits as there are tasks and the source
// will create at most numSplits splits
assertEquals(numSplits, flinkWrapper.getSplitSources().size());

StreamSource<
WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark>>
sourceOperator = new StreamSource<>(flinkWrapper);

AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
testHarness =
new AbstractStreamOperatorTestHarness<>(
sourceOperator,
numTasks /* max parallelism */,
numTasks /* parallelism */,
subtaskIndex /* subtask index */);

testHarness.setProcessingTime(System.currentTimeMillis());

// start a thread that advances processing time, so that we eventually get the final
// watermark which is only updated via a processing-time trigger
Thread processingTimeUpdateThread =
new Thread() {
@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
collect((StreamRecord) streamRecord);
public void run() {
while (true) {
try {
synchronized (testHarness.getCheckpointLock()) {
testHarness.setProcessingTime(System.currentTimeMillis());
}
Thread.sleep(1000);
} catch (Exception e) {
break;
}
}
}
};
processingTimeUpdateThread.start();

testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);

try {
testHarness.open();
sourceOperator.run(
checkpointLock,
new TestStreamStatusMaintainer(),
new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
private boolean hasSeenMaxWatermark = false;

@Override
public void emitWatermark(Watermark watermark) {
// we get this when there is no more data
// it can happen that we get the max watermark several times, so guard against
// this
if (!hasSeenMaxWatermark
&& watermark.getTimestamp()
>= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
numWatermarksReceived[0]++;
hasSeenMaxWatermark = true;
}
}

@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {}

@Override
public void collect(
StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
windowedValueStreamRecord) {

count++;
if (count >= numElements) {
throw new SuccessException();
@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
collect((StreamRecord) streamRecord);
}
}

@Override
public void close() {}
});
} catch (SuccessException e) {
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) {}

assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
@Override
public void collect(
StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
windowedValueStreamRecord) {
numElementsReceived[0]++;
}

// success
return;
@Override
public void close() {}
});
} catch (SuccessException e) {
processingTimeUpdateThread.interrupt();
processingTimeUpdateThread.join();
// success, continue for the other subtask indices
}
}
fail("Read terminated without producing expected number of outputs");
// verify that we get the expected count across all subtasks
assertEquals(numElementsPerShard * numSplits, numElementsReceived[0]);
// and that we get as many final watermarks as there are subtasks
assertEquals(numTasks, numWatermarksReceived[0]);
}

/**
Expand Down