Skip to content

Commit

Permalink
This closes #2356
Browse files Browse the repository at this point in the history
  • Loading branch information
staslev committed Mar 30, 2017
2 parents 769398e + 3876f83 commit 2a40534
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,7 @@ public static <T, CheckpointMarkT extends CheckpointMark> UnboundedDataset<T> re

// report the number of input elements for this InputDStream to the InputInfoTracker.
int id = inputDStream.inputDStream().id();
JavaDStream<Metadata> metadataDStream = mapWithStateDStream.map(
new Function<Tuple2<Iterable<byte[]>, Metadata>, Metadata>() {
@Override
public Metadata call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
return t2._2();
}
});
JavaDStream<Metadata> metadataDStream = mapWithStateDStream.map(new Tuple2MetadataFunction());

// register ReadReportDStream to report information related to this read.
new ReadReportDStream(metadataDStream.dstream(), id, getSourceName(source, id)).register();
Expand All @@ -118,13 +112,10 @@ public Metadata call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
WindowedValue.FullWindowedValueCoder.of(
source.getDefaultOutputCoder(),
GlobalWindow.Coder.INSTANCE);
JavaDStream<WindowedValue<T>> readUnboundedStream = mapWithStateDStream.flatMap(
new FlatMapFunction<Tuple2<Iterable<byte[]>, Metadata>, byte[]>() {
@Override
public Iterable<byte[]> call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
return t2._1();
}
}).map(CoderHelpers.fromByteFunction(coder));
JavaDStream<WindowedValue<T>> readUnboundedStream =
mapWithStateDStream
.flatMap(new Tuple2byteFlatMapFunction())
.map(CoderHelpers.fromByteFunction(coder));
return new UnboundedDataset<>(readUnboundedStream, Collections.singletonList(id));
}

Expand Down Expand Up @@ -274,4 +265,22 @@ SparkMetricsContainer getMetricsContainer() {
return metricsContainer;
}
}

private static class Tuple2MetadataFunction
implements Function<Tuple2<Iterable<byte[]>, Metadata>, Metadata> {

@Override
public Metadata call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
return t2._2();
}
}

private static class Tuple2byteFlatMapFunction
implements FlatMapFunction<Tuple2<Iterable<byte[]>, Metadata>, byte[]> {

@Override
public Iterable<byte[]> call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
return t2._1();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@

import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.spark.coders.CoderHelpers;
Expand Down Expand Up @@ -197,14 +197,13 @@ public Tuple2<Iterable<byte[]>, Metadata> apply(
throw new RuntimeException("Failed to read from reader.", e);
}

Iterable <byte[]> iterable = new Iterable<byte[]>() {
@Override
public Iterator<byte[]> iterator() {
return Iterators.unmodifiableIterator(readValues.iterator());
}
};
return new Tuple2<>(iterable,
new Metadata(readValues.size(), lowWatermark, highWatermark, sparkMetricsContainer));
final ArrayList<byte[]> payload =
Lists.newArrayList(Iterators.unmodifiableIterator(readValues.iterator()));

return new Tuple2<>(
(Iterable<byte[]>) payload,
new Metadata(readValues.size(), lowWatermark, highWatermark, sparkMetricsContainer));

} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down

0 comments on commit 2a40534

Please sign in to comment.