Skip to content

Commit dbebc4b

Browse files
fix: Fix PubsubLiteUnboundedSource to create n partitions not partitions of n size (#313)
1 parent ed9d961 commit dbebc4b

File tree

1 file changed

+11
-3
lines changed

1 file changed

+11
-3
lines changed

pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PubsubLiteUnboundedSource.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
import com.google.common.collect.ImmutableList;
3131
import com.google.common.collect.ImmutableMap;
3232
import com.google.common.collect.ImmutableSet;
33-
import com.google.common.collect.Iterables;
3433
import io.grpc.StatusException;
3534
import java.io.IOException;
35+
import java.util.ArrayList;
3636
import java.util.List;
3737
import java.util.Optional;
3838
import javax.annotation.Nullable;
@@ -50,9 +50,17 @@ class PubsubLiteUnboundedSource extends UnboundedSource<SequencedMessage, Offset
5050
@Override
5151
public List<? extends UnboundedSource<SequencedMessage, OffsetCheckpointMark>> split(
5252
int desiredNumSplits, PipelineOptions options) {
53+
ArrayList<ArrayList<Partition>> partitionPartitions = new ArrayList<>(desiredNumSplits);
54+
for (int i = 0; i < desiredNumSplits; i++) {
55+
partitionPartitions.add(new ArrayList<>());
56+
}
57+
int counter = 0;
58+
for (Partition partition : subscriberOptions.partitions()) {
59+
partitionPartitions.get(counter % desiredNumSplits).add(partition);
60+
++counter;
61+
}
5362
ImmutableList.Builder<PubsubLiteUnboundedSource> builder = ImmutableList.builder();
54-
for (List<Partition> partitionSubset :
55-
Iterables.partition(subscriberOptions.partitions(), desiredNumSplits)) {
63+
for (List<Partition> partitionSubset : partitionPartitions) {
5664
if (partitionSubset.isEmpty()) continue;
5765
try {
5866
builder.add(

0 commit comments

Comments
 (0)