[BEAM-158] add support for bounded sources in streaming #104

Closed · wants to merge 4 commits · changes from 3 commits
FlinkStreamingTransformTranslators.java
@@ -18,8 +18,10 @@

package org.apache.beam.runners.flink.translation;

import com.google.cloud.dataflow.sdk.io.BoundedSource;
import org.apache.beam.runners.flink.translation.functions.UnionCoder;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
import org.apache.beam.runners.flink.translation.wrappers.streaming.*;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.FlinkStreamingCreateFunction;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
@@ -72,6 +74,7 @@ public class FlinkStreamingTransformTranslators {
// here you can find all the available translators.
static {
TRANSLATORS.put(Create.Values.class, new CreateStreamingTranslator());
TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator());
TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
@@ -126,7 +129,7 @@ public void translateNode(Create.Values<OUT> transform, FlinkStreamingTranslationContext context) {
DataStream<WindowedValue<OUT>> outputDataStream = initDataSet.flatMap(createFunction)
.returns(outputType);

context.setOutputDataStream(context.getOutput(transform), outputDataStream);
context.setOutputDataStream(output, outputDataStream);
Contributor: could also swap the call to getOutput at L109. [GitHub won't let me comment there.]

Contributor Author: Yes, done.

}
}

@@ -164,6 +167,36 @@ public void flatMap(WindowedValue<T> value, Collector<String> out) throws Exception {
}
}

private static class BoundedReadSourceTranslator<T>
implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> {

@Override
public void translateNode(Read.Bounded<T> transform, FlinkStreamingTranslationContext context) {

BoundedSource<T> boundedSource = transform.getSource();
PCollection<T> output = context.getOutput(transform);

Coder<T> defaultOutputCoder = boundedSource.getDefaultOutputCoder();
CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(defaultOutputCoder);

DataStream<T> source = context.getExecutionEnvironment().createInput(
new SourceInputFormat<>(
boundedSource,
context.getPipelineOptions()),
typeInfo);

DataStream<WindowedValue<T>> windowedStream = source.flatMap(
new FlatMapFunction<T, WindowedValue<T>>() {
@Override
public void flatMap(T value, Collector<WindowedValue<T>> out) throws Exception {
out.collect(WindowedValue.of(value, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
Contributor: We normally treat untimestamped items as being from time -infinity. Is this switch to processing time explicitly desired and/or required for Flink's current semantics?

Contributor: Actually, BoundedSource has a getCurrentTimestamp function -- I'd expect you would call it and that it would, in turn, default to -infinity.

Contributor: Noting that BoundedWindow.TIMESTAMP_MIN_VALUE is the value of -infinity ;)

Contributor Author: Flink's InputFormat (evolved from batch) doesn't support emission of watermarks or timestamps (although the SourceFunction in streaming does). However, you can always assign timestamps as the next step. I believe that used to be the same in Dataflow.

The SourceInputFormat is shared between batch and streaming mode. I propose to use BoundedWindow.TIMESTAMP_MIN_VALUE as the timestamp for now. We should do a follow-up to address the proper use of the getCurrentTimestamp function. Internally, Flink wraps the InputFormat, and we could probably also create a dedicated wrapper for Beam.

}
}).assignTimestampsAndWatermarks(new IngestionTimeExtractor<WindowedValue<T>>());
Contributor: I know nothing here, just want to confirm that it's okay to use an "IngestionTimeExtractor" for a collection where all elements have timestamps of MIN_VALUE.


context.setOutputDataStream(output, windowedStream);
}
}
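
A minimal sketch of the variant proposed in the thread above: stamping bounded elements with BoundedWindow.TIMESTAMP_MIN_VALUE instead of processing time. This assumes an additional import of com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow, leaves the watermark assignment question open, and illustrates the proposal only; it is not the code in this commit.

    // Sketch only: same flatMap as above, but elements are stamped with "-infinity"
    // (BoundedWindow.TIMESTAMP_MIN_VALUE) instead of Instant.now().
    DataStream<WindowedValue<T>> windowedStream = source.flatMap(
        new FlatMapFunction<T, WindowedValue<T>>() {
          @Override
          public void flatMap(T value, Collector<WindowedValue<T>> out) throws Exception {
            out.collect(WindowedValue.of(
                value, BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
          }
        });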

private static class UnboundedReadSourceTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {

@Override
@@ -172,19 +205,21 @@ public void translateNode(Read.Unbounded<T> transform, FlinkStreamingTranslationContext context) {

DataStream<WindowedValue<T>> source;
if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) {
UnboundedFlinkSource flinkSource = (UnboundedFlinkSource) transform.getSource();
@SuppressWarnings("unchecked")
UnboundedFlinkSource<T> flinkSource = (UnboundedFlinkSource<T>) transform.getSource();
source = context.getExecutionEnvironment()
.addSource(flinkSource.getFlinkSource())
.flatMap(new FlatMapFunction<String, WindowedValue<String>>() {
.flatMap(new FlatMapFunction<T, WindowedValue<T>>() {
@Override
public void flatMap(String s, Collector<WindowedValue<String>> collector) throws Exception {
collector.collect(WindowedValue.<String>of(s, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
public void flatMap(T s, Collector<WindowedValue<T>> collector) throws Exception {
collector.collect(WindowedValue.of(s, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
}
}).assignTimestampsAndWatermarks(new IngestionTimeExtractor());
}).assignTimestampsAndWatermarks(new IngestionTimeExtractor<WindowedValue<T>>());
Contributor: Same questions as above w.r.t. timestamps.

Contributor Author: SourceFunctions don't have to assign timestamps or emit watermarks when we are using Event Time. In Flink, you can always assign them later. In the Beam runner we default to Event Time, which makes it tricky for us because the user can't actually extract watermarks/timestamps via the Flink API. Thus, we defaulted to Ingestion Time here to give users a good out-of-the-box experience, e.g. when they read data from Kafka. We are currently reworking some of the provided sources to emit timestamps/watermarks directly in the source.

Note that this limitation applies only to Flink wrapped sources. The general wrapper for unbounded sources emits watermarks and timestamped values.

} else {
source = context.getExecutionEnvironment()
.addSource(new UnboundedSourceWrapper<>(context.getPipelineOptions(), transform));
}

context.setOutputDataStream(output, source);
}
}
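
As a rough illustration of "assigning them later", here is a hypothetical sketch of moving a Flink-wrapped source from ingestion time to event time downstream of the source, using Flink's AssignerWithPeriodicWatermarks (available in recent Flink versions). How the event time is derived from the element is purely an assumption.

    // Hypothetical sketch only: assign event-time timestamps/watermarks after the source.
    source.assignTimestampsAndWatermarks(
        new AssignerWithPeriodicWatermarks<WindowedValue<T>>() {
          private long maxSeenTimestamp = Long.MIN_VALUE;

          @Override
          public long extractTimestamp(WindowedValue<T> element, long previousElementTimestamp) {
            // Assumption: the element (or its payload) carries a usable event time.
            long timestamp = element.getTimestamp().getMillis();
            maxSeenTimestamp = Math.max(maxSeenTimestamp, timestamp);
            return timestamp;
          }

          @Override
          public Watermark getCurrentWatermark() {
            // Emit a watermark trailing the largest timestamp seen so far.
            return new Watermark(maxSeenTimestamp);
          }
        });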
SourceInputFormat.java
@@ -18,24 +18,23 @@
package org.apache.beam.runners.flink.translation.wrappers;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.Source;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;


/**
* A Flink {@link org.apache.flink.api.common.io.InputFormat} that wraps a
* Dataflow {@link com.google.cloud.dataflow.sdk.io.Source}.
@@ -44,37 +43,35 @@ public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>>
private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class);

private final BoundedSource<T> initialSource;

private transient PipelineOptions options;
private final byte[] serializedOptions;

private BoundedSource.BoundedReader<T> reader = null;
private boolean reachedEnd = true;
private transient BoundedSource.BoundedReader<T> reader = null;
private boolean inputAvailable = true;

public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) {
this.initialSource = initialSource;
this.options = options;
}

private void writeObject(ObjectOutputStream out)
throws IOException, ClassNotFoundException {
out.defaultWriteObject();
ObjectMapper mapper = new ObjectMapper();
mapper.writeValue(out, options);
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
new ObjectMapper().writeValue(baos, options);
serializedOptions = baos.toByteArray();
} catch (Exception e) {
throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
}

private void readObject(ObjectInputStream in)
throws IOException, ClassNotFoundException {
in.defaultReadObject();
ObjectMapper mapper = new ObjectMapper();
options = mapper.readValue(in, PipelineOptions.class);
}

@Override
public void configure(Configuration configuration) {}

@Override
public void open(SourceInputSplit<T> sourceInputSplit) throws IOException {
options = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
Member: Seems inefficient to decode pipeline options several times. Is this to protect the user from mutating it?

Contributor Author: Good point. This will be called on every input split. We can move the deserialization code to the configure method.

reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options);
reachedEnd = false;
inputAvailable = reader.start();
}
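
A small sketch of the follow-up suggested in the thread above: deserializing the options once in configure() rather than on every open() call. This is the proposed change only, not what this commit contains.

    // Sketch of the suggested follow-up: configure() runs once per task,
    // so the options are deserialized once rather than once per input split.
    @Override
    public void configure(Configuration configuration) {
      try {
        options = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
      } catch (IOException e) {
        throw new RuntimeException("Couldn't deserialize PipelineOptions.", e);
      }
    }

    @Override
    public void open(SourceInputSplit<T> sourceInputSplit) throws IOException {
      reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options);
      inputAvailable = reader.start();
    }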

@Override
@@ -86,7 +83,6 @@ public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
@Override
public long getTotalInputSize() {
return estimatedSize;

}

@Override
@@ -109,14 +105,12 @@ public float getAverageRecordWidth() {
@Override
@SuppressWarnings("unchecked")
public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException {
long desiredSizeBytes;
try {
desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits;
List<? extends Source<T>> shards = initialSource.splitIntoBundles(desiredSizeBytes,
options);
long desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits;
List<? extends Source<T>> shards = initialSource.splitIntoBundles(desiredSizeBytes, options);
List<SourceInputSplit<T>> splits = new ArrayList<>();
int splitCount = 0;
for (Source<T> shard: shards) {
for (Source<T> shard : shards) {
splits.add(new SourceInputSplit<>(shard, splitCount++));
}
return splits.toArray(new SourceInputSplit[splits.size()]);
Contributor: Not especially important, but it looks like a minor efficiency gain from preallocating an array instead of the list. You already have the size at L110.

Contributor Author: Good suggestion. Thanks.
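
A sketch of the suggested preallocation, assuming the rest of createInputSplits stays as above:

    // Sketch of the reviewer's suggestion: preallocate the result array from the
    // known number of shards instead of filling an intermediate ArrayList.
    List<? extends Source<T>> shards = initialSource.splitIntoBundles(desiredSizeBytes, options);
    @SuppressWarnings("unchecked")
    SourceInputSplit<T>[] splits = new SourceInputSplit[shards.size()];
    int splitCount = 0;
    for (Source<T> shard : shards) {
      splits[splitCount] = new SourceInputSplit<>(shard, splitCount);
      splitCount++;
    }
    return splits;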

@@ -127,33 +121,24 @@

@Override
public InputSplitAssigner getInputSplitAssigner(final SourceInputSplit[] sourceInputSplits) {
return new InputSplitAssigner() {
private int index = 0;
private final SourceInputSplit[] splits = sourceInputSplits;
@Override
public InputSplit getNextInputSplit(String host, int taskId) {
if (index < splits.length) {
return splits[index++];
} else {
return null;
}
}
};
return new DefaultInputSplitAssigner(sourceInputSplits);
}


@Override
public boolean reachedEnd() throws IOException {
return reachedEnd;
return !inputAvailable;
}

@Override
public T nextRecord(T t) throws IOException {

reachedEnd = !reader.advance();
if (!reachedEnd) {
return reader.getCurrent();
if (inputAvailable) {
final T current = reader.getCurrent();
// advance reader to have a record ready next time
inputAvailable = reader.advance();
return current;
}

return null;
Contributor: Are you using null to mean "no element"? What about collections or sources that might produce null elements as a valid value? (This is why we use the NoSuchElementException.)

Contributor Author: I agree that this doesn't look good, but it is the behavior of the InputFormat in Flink. A null signals that the current input split has been fully read. Flink doesn't programmatically support null values, which was a little tricky when implementing the runner.

}
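
For context, a simplified sketch (not actual Flink code) of how a data source task consumes an InputFormat split; a null return together with reachedEnd() marks the end of the split, which is why no NoSuchElementException is involved here. The helper name and parameters are hypothetical.

    // Simplified, hypothetical driver loop for one input split.
    static <T> void consumeSplit(InputFormat<T, SourceInputSplit<T>> format,
                                 SourceInputSplit<T> split,
                                 Collector<T> out) throws IOException {
      format.open(split);
      try {
        while (!format.reachedEnd()) {
          T record = format.nextRecord(null);  // the reuse object is not used by this wrapper
          if (record != null) {
            out.collect(record);
          }
        }
      } finally {
        format.close();
      }
    }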

ReadSourceITCase.java
@@ -18,37 +18,28 @@
package org.apache.beam.runners.flink;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.CountingInput;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import org.apache.flink.test.util.JavaProgramTestBase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;


public class ReadSourceITCase extends JavaProgramTestBase {

protected String resultPath;

public ReadSourceITCase(){
}

static final String[] EXPECTED_RESULT = new String[] {
"1", "2", "3", "4", "5", "6", "7", "8", "9"};
private static final String[] EXPECTED_RESULT = new String[] {
"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};

@Override
protected void preSubmit() throws Exception {
resultPath = getTempDirPath("result");
System.out.println(resultPath);
Contributor: Would generally prefer log statements over stdout.

Contributor Author: Yep, me too.
}
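
A sketch of the logging variant the reviewer prefers, assuming slf4j Logger/LoggerFactory imports are added to the test class:

    // Sketch only: an slf4j logger field plus a log call replacing the println above.
    private static final Logger LOG = LoggerFactory.getLogger(ReadSourceITCase.class);

    // in preSubmit():
    LOG.info("Result path: {}", resultPath);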

@Override
@@ -66,99 +57,17 @@ private static void runProgram(String resultPath) {
Pipeline p = FlinkTestPipeline.createForBatch();

PCollection<String> result = p
.apply(Read.from(new ReadSource(1, 10)))
.apply(ParDo.of(new DoFn<Integer, String>() {
.apply(CountingInput.upTo(10))
.apply(ParDo.of(new DoFn<Long, String>() {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output(c.element().toString());
}
}));

result.apply(TextIO.Write.to(resultPath));
p.run();
}


private static class ReadSource extends BoundedSource<Integer> {
final int from;
final int to;

ReadSource(int from, int to) {
this.from = from;
this.to = to;
}

@Override
public List<ReadSource> splitIntoBundles(long desiredShardSizeBytes, PipelineOptions options)
throws Exception {
List<ReadSource> res = new ArrayList<>();
FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class);
int numWorkers = flinkOptions.getParallelism();
Preconditions.checkArgument(numWorkers > 0, "Number of workers should be larger than 0.");

float step = 1.0f * (to - from) / numWorkers;
for (int i = 0; i < numWorkers; ++i) {
res.add(new ReadSource(Math.round(from + i * step), Math.round(from + (i + 1) * step)));
}
return res;
}

@Override
public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
return 8 * (to - from);
}

@Override
public boolean producesSortedKeys(PipelineOptions options) throws Exception {
return true;
}

@Override
public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException {
return new RangeReader(this);
}

@Override
public void validate() {}

@Override
public Coder<Integer> getDefaultOutputCoder() {
return BigEndianIntegerCoder.of();
}

private class RangeReader extends BoundedReader<Integer> {
private int current;

public RangeReader(ReadSource source) {
this.current = source.from - 1;
}

@Override
public boolean start() throws IOException {
return true;
}

@Override
public boolean advance() throws IOException {
current++;
return (current < to);
}

@Override
public Integer getCurrent() {
return current;
}

@Override
public void close() throws IOException {
// Nothing
}

@Override
public BoundedSource<Integer> getCurrentSource() {
return ReadSource.this;
}
}
p.run();
}
}
