Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-11996] spannerio splittable #14811

Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@
import com.google.cloud.spanner.TimestampBound;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -73,18 +74,18 @@ public PCollection<Struct> expand(PCollection<ReadOperation> input) {
.apply(
"Generate Partitions",
ParDo.of(new GeneratePartitionsFn(getSpannerConfig(), txView)).withSideInputs(txView))
.apply("Shuffle partitions", Reshuffle.<Partition>viaRandomKey())
.apply(
"Read from Partitions",
ParDo.of(new ReadFromPartitionFn(getSpannerConfig(), txView)).withSideInputs(txView));
}

// TODO (BEAM-12522) When @InitialRestriction and @SplitRestriction are able to access sideInputs
// this DoFn should be integrated into ReadFromPartitionFn
@VisibleForTesting
static class GeneratePartitionsFn extends DoFn<ReadOperation, Partition> {
static class GeneratePartitionsFn extends DoFn<ReadOperation, List<Partition>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove the GeneratePartitionsFn as well by using @GetInitialRestriction and @SplitRestriction from y our Splittable DoFn.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @boyuanzz I have been working in trying to get this part done but I have encountered a problem with the approach. I have changed the code to a single DoFn to remove GeneratePartitionsFn as you mentioned, now my DoFn looks like this private static class ReadFromPartitionFn extends DoFn<ReadOperation, Struct>. But I'm having problems getting @GetInitialRestriction and @SplitRestriction right, in order to know the size of the Partition List we are going to process I need to use the side input c.sideInput(txView); to process the ReadOperation, is there a way to get the SideInput in @GetInitialRestriction or @SplitRestriction to get the List Size and the amount of partitions we are going to process? or the approach you were suggesting was something different? Thanks in advance.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the late.
Splittable DoFn supports sideinput just like normal DoFn. So you can access sideinput from @GetInitialRestriction and @SplitRestriction just like from @ProcessElement.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @boyuanzz , thanks for your response, I have been trying out that option, trying to access the sideinput either from @GetInitialRestriction or @SplitRestriction, both using @SideInput and ProcessContext, but in both cases I get an error specifying that Illegal parameter type: ProcessContextParameter{} or Illegal parameter type: SideInputParameter, checking the code for DoFnSignatures I can see @SideInput is not in the ALLOWED_SPLIT_RESTRICTION_PARAMETERS or ALLOWED_GET_INITIAL_RESTRICTION_PARAMETERS. I don’t know if I maybe missing something in how to use the side parameters?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me double check and I'll back to here when I get an answer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, can you share the whole stacktrace for the error you get?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

java.lang.IllegalArgumentException: org.apache.beam.sdk.io.gcp.spanner.BatchSpannerRead$ReadFromPartitionFn, @SplitRestriction splitRestriction(OffsetRange, PCollectionView, OutputReceiver): Illegal parameter type: SideInputParameter{elementT=org.apache.beam.sdk.values.PCollectionView<org.apache.beam.sdk.io.gcp.spanner.Transaction>, sideInputId=tx}
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures$ErrorReporter.throwIllegalArgument(DoFnSignatures.java:2399)
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.checkParameterOneOf(DoFnSignatures.java:1292)
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.analyzeSplitRestrictionMethod(DoFnSignatures.java:1841)
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.parseSignature(DoFnSignatures.java:722)
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.lambda$getSignature$0(DoFnSignatures.java:282)
at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getSignature(DoFnSignatures.java:282)
at org.apache.beam.sdk.transforms.ParDo.validate(ParDo.java:613)
at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:402)
at org.apache.beam.sdk.io.gcp.spanner.BatchSpannerRead.expand(BatchSpannerRead.java:79)
at org.apache.beam.sdk.io.gcp.spanner.BatchSpannerRead.expand(BatchSpannerRead.java:43)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
at org.apache.beam.sdk.io.gcp.spanner.SpannerIO$ReadAll.expand(SpannerIO.java:518)
at org.apache.beam.sdk.io.gcp.spanner.SpannerIO$ReadAll.expand(SpannerIO.java:395)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
at org.apache.beam.sdk.io.gcp.spanner.SpannerIOReadTest.readAllPipeline(SpannerIOReadTest.java:305)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @boyuanzz
Retaking the idea in this issue, I looked for an example of using side inputs in the SplitRestriction and GetInitialRestriction methods but I haven’t find any. In case that it's not possible to use the sideinput there to estimate the amount of data I think the best way to get a good estimation of the amount of data to split would be to keep the logic in two doFn, as we get the Partitions and the size of the list of partitions that we can work with.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have created https://issues.apache.org/jira/browse/BEAM-12522 for tracking the sideinput with Splittable DoFn issue. I'm trying to work on a quick fix, though I'm not sure whether the fix would be simple. In order to unblock you, you can put a TODO(BEAM-12522) over the expansion saying that we can do so when the sideinput is supported.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added TODO message to GeneratePartitionsFn mentioning it should be integrated into ReadFromPartitionFn when the issue is solved


private final SpannerConfig config;
private final PCollectionView<? extends Transaction> txView;

private transient SpannerAccessor spannerAccessor;

public GeneratePartitionsFn(
Expand All @@ -108,9 +109,7 @@ public void processElement(ProcessContext c) throws Exception {
Transaction tx = c.sideInput(txView);
BatchReadOnlyTransaction context =
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());
for (Partition p : execute(c.element(), context)) {
c.output(p);
}
c.output(execute(c.element(), context));
}

private List<Partition> execute(ReadOperation op, BatchReadOnlyTransaction tx) {
Expand All @@ -133,7 +132,8 @@ private List<Partition> execute(ReadOperation op, BatchReadOnlyTransaction tx) {
}
}

private static class ReadFromPartitionFn extends DoFn<Partition, Struct> {
@DoFn.BoundedPerElement
private static class ReadFromPartitionFn extends DoFn<List<Partition>, Struct> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may want to mark it as @BoundedPerElement.

And are we able to provide better size information here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added @BoundedPerElement.
About size information, we have a list of partitions to process, so we have the size of the total elements to process, which I think would be the best information to have about the size.


private final SpannerConfig config;
private final PCollectionView<? extends Transaction> txView;
Expand All @@ -157,19 +157,31 @@ public void teardown() throws Exception {
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
public void processElement(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker)
throws Exception {
Transaction tx = c.sideInput(txView);

BatchReadOnlyTransaction batchTx =
spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId());

Partition p = c.element();
try (ResultSet resultSet = batchTx.execute(p)) {
while (resultSet.next()) {
Struct s = resultSet.getCurrentRowAsStruct();
c.output(s);
List<Partition> partitions = c.element();
for (int i = (int) tracker.currentRestriction().getFrom();
i < (int) tracker.currentRestriction().getTo();
i++) {
if (tracker.tryClaim(Long.valueOf(i))) {
try (ResultSet resultSet = batchTx.execute(partitions.get(i))) {
while (resultSet.next()) {
Struct s = resultSet.getCurrentRowAsStruct();
c.output(s);
}
}
}
}
}

@GetInitialRestriction
public OffsetRange getInitialRange(@Element List<Partition> partitions) {
return new OffsetRange(0L, partitions.size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The OffsetsetRange is usually a range where number >= start && number < end

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was looking for some examples, and all I could find used the size of the collection of elements as end, and changing it to a number < end gave me some incorrect results.

}
}
}