From 0a6cd08dafe7be3c4a06afcca94ff71386150d07 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Wed, 7 Jun 2017 16:27:01 -0700 Subject: [PATCH 1/5] SpannerIO: Introduced a MutationGroup. Allows to group together mutation in a logical bundle that is submitted in the same transaction. --- .../sdk/io/gcp/spanner/MutationGroup.java | 84 +++++++++++++++++++ .../io/gcp/spanner/MutationSizeEstimator.java | 9 ++ .../beam/sdk/io/gcp/spanner/SpannerIO.java | 53 ++++++++++-- .../spanner/MutationSizeEstimatorTest.java | 12 +++ .../sdk/io/gcp/spanner/SpannerIOTest.java | 56 ++++++++++--- 5 files changed, 193 insertions(+), 21 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java new file mode 100644 index 000000000000..da8e60b38cef --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +import com.google.cloud.spanner.Mutation; +import com.google.common.collect.ImmutableList; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +/** + * A bundle of mutations that must be submitted atomically. + * + *

One of the mutations is chosen to be "primary", and can be used to determine partitions. + */ +public final class MutationGroup implements Serializable, Iterable { + private final ImmutableList mutations; + + public static Builder withPrimary(Mutation primary) { + return new Builder(primary); + } + + @Override + public Iterator iterator() { + return mutations.iterator(); + } + + /** Builder for {@link org.apache.beam.sdk.io.gcp.spanner.MutationGroup}. */ + public static class Builder { + private final ImmutableList.Builder builder; + + private Builder(Mutation primary) { + this.builder = ImmutableList.builder().add(primary); + } + + public Builder attach(Mutation m) { + this.builder.add(m); + return this; + } + + public Builder attach(Iterable mutations) { + this.builder.addAll(mutations); + return this; + } + + public Builder attach(Mutation... mutations) { + this.builder.addAll(Arrays.asList(mutations)); + return this; + } + + public MutationGroup build() { + return new MutationGroup(builder.build()); + } + } + + private MutationGroup(ImmutableList mutations) { + this.mutations = mutations; + } + + public Mutation primary() { + return mutations.get(0); + } + + public List attached() { + return mutations.subList(1, mutations.size()); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java index 61652e736e90..241881693f8d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java @@ -44,6 +44,15 @@ static long sizeOf(Mutation m) { return result; } + /** Estimates a size of the mutation group in bytes. */ + public static long sizeOf(MutationGroup group) { + long result = 0; + for (Mutation m : group) { + result += sizeOf(m); + } + return result; + } + private static long estimatePrimitiveValue(Value v) { switch (v.getType().getCode()) { case BOOL: diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 5058d13f77bb..4cce51f83748 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -29,10 +29,12 @@ import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; import java.io.IOException; import java.util.ArrayList; import java.util.List; import javax.annotation.Nullable; + import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; @@ -88,6 +90,11 @@ *

  • If the pipeline was unexpectedly stopped, mutations that were already applied will not get * rolled back. * + * + *

    Use {@link MutationGroup} to ensure that a small set mutations is bundled together. It is + * guaranteed that mutations in a group are submitted in the same transaction. Build + * {@link SpannerIO.Write} transform, and call {@link Write#grouped()} method. It will return a + * transformation that can be applied to a PCollection of MutationGroup. */ @Experimental(Experimental.Kind.SOURCE_SINK) public class SpannerIO { @@ -187,6 +194,13 @@ public Write withDatabaseId(String databaseId) { return toBuilder().setDatabaseId(databaseId).build(); } + /** + * Same transform but can be applied to {@link PCollection} of {@link MutationGroup}. + */ + public WriteGroup grouped() { + return new WriteGroup(this); + } + @VisibleForTesting Write withServiceFactory(ServiceFactory serviceFactory) { return toBuilder().setServiceFactory(serviceFactory).build(); @@ -204,7 +218,9 @@ public void validate(PipelineOptions options) { @Override public PDone expand(PCollection input) { - input.apply("Write mutations to Cloud Spanner", ParDo.of(new SpannerWriteFn(this))); + input + .apply("To mutation group", ParDo.of(new ToMutationGroupFn())) + .apply("Write mutations to Cloud Spanner", ParDo.of(new SpannerWriteGroupFn(this))); return PDone.in(input.getPipeline()); } @@ -227,15 +243,37 @@ public void populateDisplayData(DisplayData.Builder builder) { } } + /** Same as {@link Write} but supports grouped mutations. */ + public static class WriteGroup extends PTransform, PDone> { + private final Write spec; + + public WriteGroup(Write spec) { + this.spec = spec; + } + + @Override public PDone expand(PCollection input) { + input.apply("Write mutations to Cloud Spanner", ParDo.of(new SpannerWriteGroupFn(spec))); + return PDone.in(input.getPipeline()); + } + } + + private static class ToMutationGroupFn extends DoFn { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + Mutation value = c.element(); + c.output(MutationGroup.withPrimary(value).build()); + } + } + /** Batches together and writes mutations to Google Cloud Spanner. */ @VisibleForTesting - static class SpannerWriteFn extends DoFn { - private static final Logger LOG = LoggerFactory.getLogger(SpannerWriteFn.class); + static class SpannerWriteGroupFn extends DoFn { + private static final Logger LOG = LoggerFactory.getLogger(SpannerWriteGroupFn.class); private final Write spec; private transient Spanner spanner; private transient DatabaseClient dbClient; // Current batch of mutations to be written. - private List mutations; + private List mutations; private long batchSizeBytes = 0; private static final int MAX_RETRIES = 5; @@ -244,8 +282,7 @@ static class SpannerWriteFn extends DoFn { .withMaxRetries(MAX_RETRIES) .withInitialBackoff(Duration.standardSeconds(5)); - @VisibleForTesting - SpannerWriteFn(Write spec) { + @VisibleForTesting SpannerWriteGroupFn(Write spec) { this.spec = spec; } @@ -261,7 +298,7 @@ public void setup() throws Exception { @ProcessElement public void processElement(ProcessContext c) throws Exception { - Mutation m = c.element(); + MutationGroup m = c.element(); mutations.add(m); batchSizeBytes += MutationSizeEstimator.sizeOf(m); if (batchSizeBytes >= spec.getBatchSizeBytes()) { @@ -319,7 +356,7 @@ private void flushBatch() throws AbortedException, IOException, InterruptedExcep while (true) { // Batch upsert rows. try { - dbClient.writeAtLeastOnce(mutations); + dbClient.writeAtLeastOnce(Iterables.concat(mutations)); // Break if the commit threw no exception. break; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java index 03eb28ed943d..94af2b98b4cf 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java @@ -135,4 +135,16 @@ public void dates() throws Exception { assertThat(MutationSizeEstimator.sizeOf(timestampArray), is(24L)); assertThat(MutationSizeEstimator.sizeOf(dateArray), is(48L)); } + + @Test + public void group() throws Exception { + Mutation int64 = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build(); + Mutation float64 = Mutation.newInsertOrUpdateBuilder("test").set("one").to(2.9).build(); + Mutation bool = Mutation.newInsertOrUpdateBuilder("test").set("one").to(false).build(); + + MutationGroup group = MutationGroup.withPrimary(int64).attach(float64).attach(bool).build(); + + assertThat(MutationSizeEstimator.sizeOf(group), is(17L)); + } + } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java index 5bdfea5522b2..2a03df396757 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java @@ -115,8 +115,8 @@ public void singleMutationPipeline() throws Exception { @Test public void batching() throws Exception { - Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build(); - Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build(); + MutationGroup one = g(Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build()); + MutationGroup two = g(Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build()); SpannerIO.Write write = SpannerIO.write() .withProjectId("test-project") @@ -124,8 +124,8 @@ public void batching() throws Exception { .withDatabaseId("test-database") .withBatchSizeBytes(1000000000) .withServiceFactory(serviceFactory); - SpannerIO.SpannerWriteFn writerFn = new SpannerIO.SpannerWriteFn(write); - DoFnTester fnTester = DoFnTester.of(writerFn); + SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write); + DoFnTester fnTester = DoFnTester.of(writerFn); fnTester.processBundle(Arrays.asList(one, two)); verify(serviceFactory.mockSpanner()) @@ -136,9 +136,9 @@ public void batching() throws Exception { @Test public void batchingGroups() throws Exception { - Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build(); - Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build(); - Mutation three = Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build(); + MutationGroup one = g(Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build()); + MutationGroup two = g(Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build()); + MutationGroup three = g(Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build()); // Have a room to accumulate one more item. long batchSize = MutationSizeEstimator.sizeOf(one) + 1; @@ -150,8 +150,8 @@ public void batchingGroups() throws Exception { .withDatabaseId("test-database") .withBatchSizeBytes(batchSize) .withServiceFactory(serviceFactory); - SpannerIO.SpannerWriteFn writerFn = new SpannerIO.SpannerWriteFn(write); - DoFnTester fnTester = DoFnTester.of(writerFn); + SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write); + DoFnTester fnTester = DoFnTester.of(writerFn); fnTester.processBundle(Arrays.asList(one, two, three)); verify(serviceFactory.mockSpanner()) @@ -164,8 +164,8 @@ public void batchingGroups() throws Exception { @Test public void noBatching() throws Exception { - Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build(); - Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build(); + MutationGroup one = g(Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build()); + MutationGroup two = g(Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build()); SpannerIO.Write write = SpannerIO.write() .withProjectId("test-project") @@ -173,8 +173,8 @@ public void noBatching() throws Exception { .withDatabaseId("test-database") .withBatchSizeBytes(0) // turn off batching. .withServiceFactory(serviceFactory); - SpannerIO.SpannerWriteFn writerFn = new SpannerIO.SpannerWriteFn(write); - DoFnTester fnTester = DoFnTester.of(writerFn); + SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write); + DoFnTester fnTester = DoFnTester.of(writerFn); fnTester.processBundle(Arrays.asList(one, two)); verify(serviceFactory.mockSpanner()) @@ -183,6 +183,32 @@ public void noBatching() throws Exception { .writeAtLeastOnce(argThat(new IterableOfSize(1))); } + @Test + public void groups() throws Exception { + Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build(); + Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build(); + Mutation three = Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build(); + + // Smallest batch size + long batchSize = 1; + + SpannerIO.Write write = + SpannerIO.write() + .withProjectId("test-project") + .withInstanceId("test-instance") + .withDatabaseId("test-database") + .withBatchSizeBytes(batchSize) + .withServiceFactory(serviceFactory); + SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write); + DoFnTester fnTester = DoFnTester.of(writerFn); + fnTester.processBundle(Arrays.asList(g(one, two, three))); + + verify(serviceFactory.mockSpanner()) + .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database")); + verify(serviceFactory.mockDatabaseClient(), times(1)) + .writeAtLeastOnce(argThat(new IterableOfSize(3))); + } + private static class FakeServiceFactory implements ServiceFactory, Serializable { // Marked as static so they could be returned by serviceFactory, which is serializable. @@ -241,4 +267,8 @@ public boolean matches(Object argument) { return argument instanceof Iterable && Iterables.size((Iterable) argument) == size; } } + + private static MutationGroup g(Mutation m, Mutation... other) { + return MutationGroup.withPrimary(m).attach(other).build(); + } } From b3d81e348b04fcf24705997508849bd821f85d99 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Mon, 12 Jun 2017 14:49:16 -0700 Subject: [PATCH 2/5] Removed one of the `attach` methods in favor of varargs alternative --- .../org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java index da8e60b38cef..f5cfd0ed29cd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java @@ -50,11 +50,6 @@ private Builder(Mutation primary) { this.builder = ImmutableList.builder().add(primary); } - public Builder attach(Mutation m) { - this.builder.add(m); - return this; - } - public Builder attach(Iterable mutations) { this.builder.addAll(mutations); return this; From a79c0c0f6966b168785acd034e0037e1149e7329 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Mon, 12 Jun 2017 17:02:16 -0700 Subject: [PATCH 3/5] WriteGroup -> WriteGrouped --- .../org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 4cce51f83748..88cfedca544d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -197,8 +197,8 @@ public Write withDatabaseId(String databaseId) { /** * Same transform but can be applied to {@link PCollection} of {@link MutationGroup}. */ - public WriteGroup grouped() { - return new WriteGroup(this); + public WriteGrouped grouped() { + return new WriteGrouped(this); } @VisibleForTesting @@ -244,10 +244,10 @@ public void populateDisplayData(DisplayData.Builder builder) { } /** Same as {@link Write} but supports grouped mutations. */ - public static class WriteGroup extends PTransform, PDone> { + public static class WriteGrouped extends PTransform, PDone> { private final Write spec; - public WriteGroup(Write spec) { + public WriteGrouped(Write spec) { this.spec = spec; } From a16040e742f2fcd09f75ff016233d0e663408e21 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Mon, 12 Jun 2017 17:12:47 -0700 Subject: [PATCH 4/5] Grouped pipeline test --- .../sdk/io/gcp/spanner/SpannerIOTest.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java index 2a03df396757..4120e7dbc9a6 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java @@ -113,6 +113,28 @@ public void singleMutationPipeline() throws Exception { .writeAtLeastOnce(argThat(new IterableOfSize(1))); } + @Test + @Category(NeedsRunner.class) + public void singleMutationGroupPipeline() throws Exception { + Mutation one = Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build(); + Mutation two = Mutation.newInsertOrUpdateBuilder("test").set("two").to(2).build(); + Mutation three = Mutation.newInsertOrUpdateBuilder("test").set("three").to(3).build(); + PCollection mutations = pipeline + .apply(Create.of(g(one, two, three))); + mutations.apply( + SpannerIO.write() + .withProjectId("test-project") + .withInstanceId("test-instance") + .withDatabaseId("test-database") + .withServiceFactory(serviceFactory) + .grouped()); + pipeline.run(); + verify(serviceFactory.mockSpanner()) + .getDatabaseClient(DatabaseId.of("test-project", "test-instance", "test-database")); + verify(serviceFactory.mockDatabaseClient(), times(1)) + .writeAtLeastOnce(argThat(new IterableOfSize(3))); + } + @Test public void batching() throws Exception { MutationGroup one = g(Mutation.newInsertOrUpdateBuilder("test").set("one").to(1).build()); From 3e5be31c236eddb554d3c87cec6c6db354aefe0d Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 13 Jun 2017 13:20:03 -0700 Subject: [PATCH 5/5] Factory methods instead of MutationGroup.Builder. --- .../sdk/io/gcp/spanner/MutationGroup.java | 38 +++++++------------ .../beam/sdk/io/gcp/spanner/SpannerIO.java | 2 +- .../spanner/MutationSizeEstimatorTest.java | 2 +- .../sdk/io/gcp/spanner/SpannerIOTest.java | 2 +- 4 files changed, 16 insertions(+), 28 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java index f5cfd0ed29cd..5b08da2f2536 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationGroup.java @@ -33,8 +33,19 @@ public final class MutationGroup implements Serializable, Iterable { private final ImmutableList mutations; - public static Builder withPrimary(Mutation primary) { - return new Builder(primary); + /** + * Creates a new group. + * + * @param primary a primary mutation. + * @param other other mutations, usually interleaved in parent. + * @return new mutation group. + */ + public static MutationGroup create(Mutation primary, Mutation... other) { + return create(primary, Arrays.asList(other)); + } + + public static MutationGroup create(Mutation primary, Iterable other) { + return new MutationGroup(ImmutableList.builder().add(primary).addAll(other).build()); } @Override @@ -42,29 +53,6 @@ public Iterator iterator() { return mutations.iterator(); } - /** Builder for {@link org.apache.beam.sdk.io.gcp.spanner.MutationGroup}. */ - public static class Builder { - private final ImmutableList.Builder builder; - - private Builder(Mutation primary) { - this.builder = ImmutableList.builder().add(primary); - } - - public Builder attach(Iterable mutations) { - this.builder.addAll(mutations); - return this; - } - - public Builder attach(Mutation... mutations) { - this.builder.addAll(Arrays.asList(mutations)); - return this; - } - - public MutationGroup build() { - return new MutationGroup(builder.build()); - } - } - private MutationGroup(ImmutableList mutations) { this.mutations = mutations; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 88cfedca544d..af5253ba1f3b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -261,7 +261,7 @@ private static class ToMutationGroupFn extends DoFn { @ProcessElement public void processElement(ProcessContext c) throws Exception { Mutation value = c.element(); - c.output(MutationGroup.withPrimary(value).build()); + c.output(MutationGroup.create(value)); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java index 94af2b98b4cf..013b83d45866 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimatorTest.java @@ -142,7 +142,7 @@ public void group() throws Exception { Mutation float64 = Mutation.newInsertOrUpdateBuilder("test").set("one").to(2.9).build(); Mutation bool = Mutation.newInsertOrUpdateBuilder("test").set("one").to(false).build(); - MutationGroup group = MutationGroup.withPrimary(int64).attach(float64).attach(bool).build(); + MutationGroup group = MutationGroup.create(int64, float64, bool); assertThat(MutationSizeEstimator.sizeOf(group), is(17L)); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java index 4120e7dbc9a6..4a759fb11917 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java @@ -291,6 +291,6 @@ public boolean matches(Object argument) { } private static MutationGroup g(Mutation m, Mutation... other) { - return MutationGroup.withPrimary(m).attach(other).build(); + return MutationGroup.create(m, other); } }