@@ -99,11 +99,8 @@ public Bounded(
   @Override
   public Partition[] getPartitions() {
     try {
-      List<? extends Source<T>> partitionedSources;
-      if (bundleSize > 0) {
-        partitionedSources = source.split(bundleSize, options.get());
-      } else {
-        long desiredSizeBytes = DEFAULT_BUNDLE_SIZE;
+      long desiredSizeBytes = (bundleSize > 0) ? bundleSize : DEFAULT_BUNDLE_SIZE;
+      if (bundleSize == 0) {
         try {
           desiredSizeBytes = source.getEstimatedSizeBytes(options.get()) / numPartitions;
         } catch (Exception e) {
@@ -113,8 +110,10 @@ public Partition[] getPartitions() {
             source,
             DEFAULT_BUNDLE_SIZE);
         }
-        partitionedSources = source.split(desiredSizeBytes, options.get());
       }
+
+      List<? extends Source<T>> partitionedSources =
+          source.split(desiredSizeBytes, options.get());
       Partition[] partitions = new SourcePartition[partitionedSources.size()];
       for (int i = 0; i < partitionedSources.size(); i++) {
         partitions[i] = new SourcePartition<>(id(), i, partitionedSources.get(i));
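For orientation (editorial, not part of the PR): `source.split(desiredSizeBytes, options)` asks the Beam source to carve itself into bundles of roughly the requested byte size, and `getPartitions()` then backs one Spark partition with each bundle. A minimal sketch of that call against Beam's stock `CountingSource`; the class name, the 64 KB target, and the element count are arbitrary:

    import java.util.List;

    import org.apache.beam.sdk.io.BoundedSource;
    import org.apache.beam.sdk.io.CountingSource;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class SplitSketch {
      public static void main(String[] args) throws Exception {
        PipelineOptions options = PipelineOptionsFactory.create();
        BoundedSource<Long> source = CountingSource.upTo(1_000_000L);

        // Ask the source for bundles of roughly 64 KB each; the source
        // decides how many splits that actually yields (it may return
        // more or fewer than estimatedSize / desiredSize).
        List<? extends BoundedSource<Long>> bundles = source.split(64 * 1024, options);

        // In SourceRDD.Bounded#getPartitions, each element of this list
        // would become one SourcePartition, i.e. one Spark partition.
        System.out.println(source.getEstimatedSizeBytes(options) + " bytes -> "
            + bundles.size() + " bundles");
      }
    }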
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.spark.translation;

 import com.google.common.base.Optional;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.util.ByteArray;
 import org.apache.beam.sdk.coders.Coder;
@@ -28,7 +29,7 @@
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.spark.HashPartitioner;
+import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.Function;
@@ -45,21 +46,18 @@ public static <K, V> JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> g
       JavaRDD<WindowedValue<KV<K, V>>> rdd,
       Coder<K> keyCoder,
       WindowedValueCoder<V> wvCoder,
-      boolean defaultParallelism) {
+      @Nullable Partitioner partitioner) {
     // we use coders to convert objects in the PCollection to byte arrays, so they
     // can be transferred over the network for the shuffle.
     JavaPairRDD<ByteArray, byte[]> pairRDD =
         rdd.map(new ReifyTimestampsAndWindowsFunction<>())
             .map(WindowingHelpers.unwindowFunction())
             .mapToPair(TranslationUtils.toPairFunction())
             .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder));
-    JavaPairRDD<ByteArray, Iterable<byte[]>> groupedRDD;
-    if (defaultParallelism) {
-      groupedRDD =
-          pairRDD.groupByKey(new HashPartitioner(rdd.rdd().sparkContext().defaultParallelism()));
-    } else {
-      groupedRDD = pairRDD.groupByKey();
-    }
+
+    // If no partitioner is passed, the default group by key operation is called
+    JavaPairRDD<ByteArray, Iterable<byte[]>> groupedRDD =
+        (partitioner != null) ? pairRDD.groupByKey(partitioner) : pairRDD.groupByKey();

     // using mapPartitions allows to preserve the partitioner
     // and avoid unnecessary shuffle downstream.
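The "coders to byte arrays" comment above is the central trick in `groupByKeyOnly`: elements of a `PCollection` are not assumed to be Java-serializable, so they cross the shuffle as `byte[]` and are decoded on the receiving side. A minimal round-trip sketch with a stock Beam coder (editorial, not PR code; names are illustrative):

    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.util.CoderUtils;

    public class CoderRoundTrip {
      public static void main(String[] args) throws Exception {
        StringUtf8Coder coder = StringUtf8Coder.of();

        // Encode before the shuffle: the bytes, not the object, cross the wire.
        byte[] onTheWire = CoderUtils.encodeToByteArray(coder, "hello");

        // Decode after the shuffle, on the other side.
        String decoded = CoderUtils.decodeFromByteArray(coder, onTheWire);
        System.out.println(decoded); // hello
      }
    }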
@@ -93,10 +91,10 @@ public static <InputT, AccumT> Optional<Iterable<WindowedValue<AccumT>>> combine
     // can be transferred over the network for the shuffle.
     // for readability, we add comments with actual type next to byte[].
     // to shorten line length, we use:
-    //---- WV: WindowedValue
-    //---- Iterable: Itr
-    //---- AccumT: A
-    //---- InputT: I
+    // ---- WV: WindowedValue
+    // ---- Iterable: Itr
+    // ---- AccumT: A
+    // ---- InputT: I
     JavaRDD<byte[]> inputRDDBytes = rdd.map(CoderHelpers.toByteFunction(wviCoder));

     if (inputRDDBytes.isEmpty()) {
@@ -173,10 +171,10 @@ JavaPairRDD<K, Iterable<WindowedValue<KV<K, AccumT>>>> combinePerKey(
     // can be transferred over the network for the shuffle.
     // for readability, we add comments with actual type next to byte[].
     // to shorten line length, we use:
-    //---- WV: WindowedValue
-    //---- Iterable: Itr
-    //---- AccumT: A
-    //---- InputT: I
+    // ---- WV: WindowedValue
+    // ---- Iterable: Itr
+    // ---- AccumT: A
+    // ---- InputT: I
     JavaPairRDD<ByteArray, byte[]> inRddDuplicatedKeyPairBytes =
         inRddDuplicatedKeyPair.mapToPair(CoderHelpers.toByteFunction(keyCoder, wkviCoder));

@@ -65,6 +65,8 @@
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.spark.Accumulator;
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -130,17 +132,14 @@ public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
           WindowedValue.FullWindowedValueCoder.of(coder.getValueCoder(), windowFn.windowCoder());

       // --- group by key only.
+      Long bundleSize =
+          context.getSerializableOptions().get().as(SparkPipelineOptions.class).getBundleSize();
+      Partitioner partitioner =

Contributor
It should be flipped to maintain old functionality:

        Partitioner partitioner =
            (bundleSize > 0)
                ? null
                : new HashPartitioner(context.getSparkContext().defaultParallelism());

Contributor
I also think it will have basically the same effect if we always use null in the batch context. If we split a source on bundleSize and it has n partitions, or split it on defaultParallelism and it has defaultParallelism partitions, either way going forward we want to maintain that many partitions, which is what omitting the HashPartitioner does. The only case where this is weird is if we try to split on defaultParallelism but the source doesn't support splitting that much, yielding fewer than defaultParallelism partitions. (See the sketch after this hunk.)

+          (bundleSize > 0)
+              ? new HashPartitioner(context.getSparkContext().defaultParallelism())
+              : null;
       JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupedByKey =
-          GroupCombineFunctions.groupByKeyOnly(
-              inRDD,
-              keyCoder,
-              wvCoder,
-              context
-                  .getSerializableOptions()
-                  .get()
-                  .as(SparkPipelineOptions.class)
-                  .getBundleSize()
-                  > 0);

Contributor
For anyone observing: this is the real issue. It should have been == 0, so that defaultParallelism == true was passed when no explicit bundle size was set.

The refactor removes the likelihood of error by explicitly taking a Partitioner (which could potentially be of a type other than HashPartitioner). Nice.

+          GroupCombineFunctions.groupByKeyOnly(inRDD, keyCoder, wvCoder, partitioner);

       // --- now group also by window.
       // for batch, GroupAlsoByWindow uses an in-memory StateInternals.
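A self-contained sketch (editorial, not PR code) of the partitioning semantics debated in the comments above, using plain Spark. It assumes local mode with `spark.default.parallelism` left unset; the data, class name, and partition counts are arbitrary:

    import java.util.Arrays;

    import org.apache.spark.HashPartitioner;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import scala.Tuple2;

    public class GroupByKeyPartitioning {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("gbk-sketch");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
          // 8 input partitions, deliberately more than defaultParallelism (4).
          JavaPairRDD<String, Integer> pairs =
              sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 8)
                  .mapToPair(i -> new Tuple2<>("k" + (i % 3), i));

          // Explicit HashPartitioner: the shuffle lands in exactly
          // defaultParallelism partitions, whatever the input had.
          JavaPairRDD<String, Iterable<Integer>> fixed =
              pairs.groupByKey(new HashPartitioner(sc.defaultParallelism()));
          System.out.println(fixed.getNumPartitions()); // 4

          // No partitioner: with spark.default.parallelism unset, Spark keeps
          // the upstream partition count, i.e. the "maintain that many
          // partitions" behavior described in the review comment.
          System.out.println(pairs.groupByKey().getNumPartitions()); // 8

          // And the point of the mapPartitions note in GroupCombineFunctions:
          // value-only transforms keep the partitioner, while a generic
          // re-keying map drops it and invites a shuffle downstream.
          System.out.println(fixed.mapValues(v -> v).partitioner().isPresent());  // true
          System.out.println(fixed.mapToPair(t -> t).partitioner().isPresent());  // false
        }
      }
    }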
@@ -433,7 +432,7 @@ private static <K, V, OutputT> JavaPairRDD<TupleTag<?>, WindowedValue<?>> statef
           WindowedValue.FullWindowedValueCoder.of(kvCoder.getValueCoder(), windowCoder);

       JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupRDD =
-          GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, true);
+          GroupCombineFunctions.groupByKeyOnly(kvInRDD, keyCoder, wvCoder, null);

Contributor
I believe this is actually correct and I did it wrong originally.

       return groupRDD
           .map(
@@ -301,7 +301,7 @@ public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
       JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupedByKeyStream =
           dStream.transform(
               rdd ->
-                  GroupCombineFunctions.groupByKeyOnly(rdd, coder.getKeyCoder(), wvCoder, true));
+                  GroupCombineFunctions.groupByKeyOnly(rdd, coder.getKeyCoder(), wvCoder, null));

Contributor
To maintain old functionality:

    GroupCombineFunctions.groupByKeyOnly(rdd, coder.getKeyCoder(), wvCoder, new HashPartitioner(context.getSparkContext().defaultParallelism()));

       // --- now group also by window.
       JavaDStream<WindowedValue<KV<K, Iterable<V>>>> outStream =