From dbebc4ba4fc718d2f2fcfeea17787dcf2bae9662 Mon Sep 17 00:00:00 2001 From: dpcollins-google <40498610+dpcollins-google@users.noreply.github.com> Date: Wed, 21 Oct 2020 10:59:25 -0400 Subject: [PATCH] fix: Fix PubsubLiteUnboundedSource to create n partitions not partitions of n size (#313) --- .../pubsublite/beam/PubsubLiteUnboundedSource.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedSource.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedSource.java index 813e8de7f..d300e6fc4 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedSource.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedSource.java @@ -30,9 +30,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; import io.grpc.StatusException; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import javax.annotation.Nullable; @@ -50,9 +50,17 @@ class PubsubLiteUnboundedSource extends UnboundedSource> split( int desiredNumSplits, PipelineOptions options) { + ArrayList> partitionPartitions = new ArrayList<>(desiredNumSplits); + for (int i = 0; i < desiredNumSplits; i++) { + partitionPartitions.add(new ArrayList<>()); + } + int counter = 0; + for (Partition partition : subscriberOptions.partitions()) { + partitionPartitions.get(counter % desiredNumSplits).add(partition); + ++counter; + } ImmutableList.Builder builder = ImmutableList.builder(); - for (List partitionSubset : - Iterables.partition(subscriberOptions.partitions(), desiredNumSplits)) { + for (List partitionSubset : partitionPartitions) { if (partitionSubset.isEmpty()) continue; try { builder.add(