From 560b003dc48cb62d520aff94d1e3b4620310c3e3 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Wed, 14 Jun 2017 13:03:36 -0700 Subject: [PATCH 01/16] Support ValueProviders. --- .../beam/sdk/io/gcp/spanner/SpannerIO.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 8bfc247adda8..1d5cdc4435fe 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -109,9 +109,8 @@ public class SpannerIO { */ @Experimental public static Write write() { - return new AutoValue_SpannerIO_Write.Builder() - .setBatchSizeBytes(DEFAULT_BATCH_SIZE_BYTES) - .build(); + return new AutoValue_SpannerIO_Write.Builder().build() + .withBatchSizeBytes(DEFAULT_BATCH_SIZE_BYTES); } /** @@ -132,7 +131,8 @@ public abstract static class Write extends PTransform, PDo @Nullable abstract ValueProvider getDatabaseId(); - abstract long getBatchSizeBytes(); + @Nullable + abstract ValueProvider getBatchSizeBytes(); @Nullable @VisibleForTesting @@ -149,7 +149,7 @@ abstract static class Builder { abstract Builder setDatabaseId(ValueProvider databaseId); - abstract Builder setBatchSizeBytes(long batchSizeBytes); + abstract Builder setBatchSizeBytes(ValueProvider batchSizeBytes); @VisibleForTesting abstract Builder setServiceFactory(ServiceFactory serviceFactory); @@ -190,6 +190,10 @@ public Write withInstanceId(ValueProvider instanceId) { *

Does not modify this object. */ public Write withBatchSizeBytes(long batchSizeBytes) { + return withBatchSizeBytes(ValueProvider.StaticValueProvider.of(batchSizeBytes)); + } + + public Write withBatchSizeBytes(ValueProvider batchSizeBytes) { return toBuilder().setBatchSizeBytes(batchSizeBytes).build(); } @@ -314,7 +318,7 @@ public void processElement(ProcessContext c) throws Exception { MutationGroup m = c.element(); mutations.add(m); batchSizeBytes += MutationSizeEstimator.sizeOf(m); - if (batchSizeBytes >= spec.getBatchSizeBytes()) { + if (batchSizeBytes >= spec.getBatchSizeBytes().get()) { flushBatch(); } } From 78537fd5dc2efe5b5b529039c964e516cb4b6a03 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Mon, 19 Jun 2017 10:11:55 -0700 Subject: [PATCH 02/16] ValueProvider for the batch size is not a good idea. --- .../org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 1d5cdc4435fe..8cc327cee729 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -132,7 +132,7 @@ public abstract static class Write extends PTransform, PDo abstract ValueProvider getDatabaseId(); @Nullable - abstract ValueProvider getBatchSizeBytes(); + abstract Long getBatchSizeBytes(); @Nullable @VisibleForTesting @@ -149,7 +149,7 @@ abstract static class Builder { abstract Builder setDatabaseId(ValueProvider databaseId); - abstract Builder setBatchSizeBytes(ValueProvider batchSizeBytes); + abstract Builder setBatchSizeBytes(Long batchSizeBytes); @VisibleForTesting abstract Builder setServiceFactory(ServiceFactory serviceFactory); @@ -190,10 +190,6 @@ 
public Write withInstanceId(ValueProvider instanceId) { *

Does not modify this object. */ public Write withBatchSizeBytes(long batchSizeBytes) { - return withBatchSizeBytes(ValueProvider.StaticValueProvider.of(batchSizeBytes)); - } - - public Write withBatchSizeBytes(ValueProvider batchSizeBytes) { return toBuilder().setBatchSizeBytes(batchSizeBytes).build(); } @@ -318,7 +314,7 @@ public void processElement(ProcessContext c) throws Exception { MutationGroup m = c.element(); mutations.add(m); batchSizeBytes += MutationSizeEstimator.sizeOf(m); - if (batchSizeBytes >= spec.getBatchSizeBytes().get()) { + if (batchSizeBytes >= spec.getBatchSizeBytes()) { flushBatch(); } } From 9bde4503fcae3c36751480fb24c7f09340b275f1 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Mon, 19 Jun 2017 12:23:52 -0700 Subject: [PATCH 03/16] Bump spanner version --- pom.xml | 2 +- .../java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java | 2 +- .../java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java | 3 --- .../org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java | 2 +- 4 files changed, 3 insertions(+), 6 deletions(-) diff --git a/pom.xml b/pom.xml index 29bb4ebed18e..f06568b77860 100644 --- a/pom.xml +++ b/pom.xml @@ -138,7 +138,7 @@ 3.2.0 v1-rev10-1.22.0 1.7.14 - 0.16.0-beta + 0.20.0-beta 1.6.2 4.3.5.RELEASE 3.1.4 diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 8cc327cee729..252852eef0fd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -337,7 +337,7 @@ public void teardown() throws Exception { if (spanner == null) { return; } - spanner.closeAsync().get(); + spanner.close(); spanner = null; } diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java index 1e19a59c4849..0cc08bfc0318 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java @@ -27,7 +27,6 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.withSettings; -import com.google.api.core.ApiFuture; import com.google.cloud.ServiceFactory; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.DatabaseId; @@ -274,10 +273,8 @@ public FakeServiceFactory() { mockSpanners.add(mock(Spanner.class, withSettings().serializable())); mockDatabaseClients.add(mock(DatabaseClient.class, withSettings().serializable())); } - ApiFuture voidFuture = mock(ApiFuture.class, withSettings().serializable()); when(mockSpanner().getDatabaseClient(Matchers.any(DatabaseId.class))) .thenReturn(mockDatabaseClient()); - when(mockSpanner().closeAsync()).thenReturn(voidFuture); } DatabaseClient mockDatabaseClient() { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java index e1f6582749a4..33532c929bab 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java @@ -150,7 +150,7 @@ public void testWrite() throws Exception { @After public void tearDown() throws Exception { databaseAdminClient.dropDatabase(options.getInstanceId(), databaseName); - spanner.closeAsync().get(); + spanner.close(); } private static class GenerateMutations 
extends DoFn { From e9528e16a4e3a9a8b8b352fcd2183c505f32bfa9 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Mon, 19 Jun 2017 13:01:20 -0700 Subject: [PATCH 04/16] Pre read api refactoring. Extract `SpannerConfig` and `AbstractSpannerFn` --- .../sdk/io/gcp/spanner/AbstractSpannerFn.java | 41 ++++ .../sdk/io/gcp/spanner/SpannerConfig.java | 118 +++++++++ .../beam/sdk/io/gcp/spanner/SpannerIO.java | 227 ++++-------------- .../io/gcp/spanner/SpannerWriteGroupFn.java | 108 +++++++++ .../sdk/io/gcp/spanner/SpannerIOTest.java | 8 +- 5 files changed, 321 insertions(+), 181 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java new file mode 100644 index 000000000000..08f7fa9cb60f --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java @@ -0,0 +1,41 @@ +package org.apache.beam.sdk.io.gcp.spanner; + +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerOptions; +import org.apache.beam.sdk.transforms.DoFn; + +/** + * Abstract {@link DoFn} that manages {@link Spanner} lifecycle. Use {@link + * AbstractSpannerFn#databaseClient} to access the Cloud Spanner database client. 
+ */ +abstract class AbstractSpannerFn extends DoFn { + private transient Spanner spanner; + private transient DatabaseClient databaseClient; + + abstract SpannerConfig getSpannerConfig(); + + @Setup + public void setup() throws Exception { + SpannerConfig spannerConfig = getSpannerConfig(); + SpannerOptions options = spannerConfig.buildSpannerOptions(); + spanner = options.getService(); + databaseClient = spanner.getDatabaseClient(DatabaseId + .of(options.getProjectId(), spannerConfig.getInstanceId().get(), + spannerConfig.getDatabaseId().get())); + } + + @Teardown + public void teardown() throws Exception { + if (spanner == null) { + return; + } + spanner.close(); + spanner = null; + } + + protected DatabaseClient databaseClient() { + return databaseClient; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java new file mode 100644 index 000000000000..4cb8aa28bd63 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java @@ -0,0 +1,118 @@ +package org.apache.beam.sdk.io.gcp.spanner; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import com.google.cloud.ServiceFactory; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerOptions; +import com.google.common.annotations.VisibleForTesting; +import java.io.Serializable; +import javax.annotation.Nullable; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.display.DisplayData; + +/** Configuration for a Cloud Spanner client. 
*/ +@AutoValue +public abstract class SpannerConfig implements Serializable { + + private static final long serialVersionUID = -5680874609304170301L; + + @Nullable + abstract ValueProvider getProjectId(); + + @Nullable + abstract ValueProvider getInstanceId(); + + @Nullable + abstract ValueProvider getDatabaseId(); + + @Nullable + @VisibleForTesting + abstract ServiceFactory getServiceFactory(); + + abstract Builder toBuilder(); + + SpannerOptions buildSpannerOptions() { + SpannerOptions.Builder builder = SpannerOptions.newBuilder(); + if (getProjectId() != null) { + builder.setProjectId(getProjectId().get()); + } + if (getServiceFactory() != null) { + builder.setServiceFactory(getServiceFactory()); + } + return builder.build(); + } + + public static SpannerConfig create() { + return builder().build(); + } + + public static Builder builder() { + return new AutoValue_SpannerConfig.Builder(); + } + + public void validate(PipelineOptions options) { + checkNotNull( + getInstanceId(), + "SpannerIO.read() requires instance id to be set with withInstanceId method"); + checkNotNull( + getDatabaseId(), + "SpannerIO.read() requires database id to be set with withDatabaseId method"); + } + + public void populateDisplayData(DisplayData.Builder builder) { + builder + .addIfNotNull(DisplayData.item("projectId", getProjectId()).withLabel("Output Project")) + .addIfNotNull(DisplayData.item("instanceId", getInstanceId()).withLabel("Output Instance")) + .addIfNotNull(DisplayData.item("databaseId", getDatabaseId()).withLabel("Output Database")); + + if (getServiceFactory() != null) { + builder.addIfNotNull( + DisplayData.item("serviceFactory", getServiceFactory().getClass().getName()) + .withLabel("Service Factory")); + } + } + + /** Builder for {@link SpannerConfig}. 
*/ + @AutoValue.Builder + public abstract static class Builder { + + + abstract Builder setProjectId(ValueProvider projectId); + + abstract Builder setInstanceId(ValueProvider instanceId); + + abstract Builder setDatabaseId(ValueProvider databaseId); + + + abstract Builder setServiceFactory(ServiceFactory serviceFactory); + + public abstract SpannerConfig build(); + } + + public SpannerConfig withProjectId(ValueProvider projectId) { + return toBuilder().setProjectId(projectId).build(); + } + + public SpannerConfig withProjectId(String projectId) { + return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); + } + + public SpannerConfig withInstanceId(ValueProvider instanceId) { + return toBuilder().setInstanceId(instanceId).build(); + } + + public SpannerConfig withInstanceId(String instanceId) { + return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); + } + + public SpannerConfig withDatabaseId(ValueProvider databaseId) { + return toBuilder().setDatabaseId(databaseId).build(); + } + + public SpannerConfig withDatabaseId(String databaseId) { + return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId)); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 252852eef0fd..74723b0237ba 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -17,22 +17,13 @@ */ package org.apache.beam.sdk.io.gcp.spanner; -import static com.google.common.base.Preconditions.checkNotNull; - import com.google.auto.value.AutoValue; import com.google.cloud.ServiceFactory; -import com.google.cloud.ServiceOptions; -import com.google.cloud.spanner.AbortedException; -import com.google.cloud.spanner.DatabaseClient; -import 
com.google.cloud.spanner.DatabaseId; import com.google.cloud.spanner.Mutation; import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Iterables; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; + import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; @@ -42,16 +33,8 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayData.Builder; -import org.apache.beam.sdk.util.BackOff; -import org.apache.beam.sdk.util.BackOffUtils; -import org.apache.beam.sdk.util.FluentBackoff; -import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; -import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Experimental {@link PTransform Transforms} for reading from and writing to , PDone> { - @Nullable - abstract ValueProvider getProjectId(); + private static final long serialVersionUID = 1920175411827980145L; - @Nullable - abstract ValueProvider getInstanceId(); - - @Nullable - abstract ValueProvider getDatabaseId(); + abstract SpannerConfig getSpannerConfig(); @Nullable abstract Long getBatchSizeBytes(); - @Nullable - @VisibleForTesting - abstract ServiceFactory getServiceFactory(); - abstract Builder toBuilder(); @AutoValue.Builder abstract static class Builder { - abstract Builder setProjectId(ValueProvider projectId); + abstract Builder setSpannerConfig(SpannerConfig spannerConfig); - abstract Builder setInstanceId(ValueProvider instanceId); - - abstract Builder setDatabaseId(ValueProvider databaseId); + abstract SpannerConfig.Builder spannerConfigBuilder(); abstract Builder setBatchSizeBytes(Long batchSizeBytes); - @VisibleForTesting - 
abstract Builder setServiceFactory(ServiceFactory serviceFactory); - abstract Write build(); } @@ -166,8 +135,15 @@ public Write withProjectId(String projectId) { return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); } + /** + * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner project. + * + *

Does not modify this object. + */ public Write withProjectId(ValueProvider projectId) { - return toBuilder().setProjectId(projectId).build(); + Write.Builder builder = toBuilder(); + builder.spannerConfigBuilder().setProjectId(projectId); + return builder.build(); } /** @@ -180,10 +156,29 @@ public Write withInstanceId(String instanceId) { return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); } + /** + * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner + * instance. + * + *

Does not modify this object. + */ public Write withInstanceId(ValueProvider instanceId) { - return toBuilder().setInstanceId(instanceId).build(); + Write.Builder builder = toBuilder(); + builder.spannerConfigBuilder().setInstanceId(instanceId); + return builder.build(); + } + + /** + * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner + * config. + * + *

Does not modify this object. + */ + public Write withSpannerConfig(SpannerConfig spannerConfig) { + return toBuilder().setSpannerConfig(spannerConfig).build(); } + /** * Returns a new {@link SpannerIO.Write} with a new batch size limit. * @@ -203,8 +198,16 @@ public Write withDatabaseId(String databaseId) { return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId)); } + /** + * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner + * database. + * + *

Does not modify this object. + */ public Write withDatabaseId(ValueProvider databaseId) { - return toBuilder().setDatabaseId(databaseId).build(); + Write.Builder builder = toBuilder(); + builder.spannerConfigBuilder().setDatabaseId(databaseId); + return builder.build(); } /** @@ -216,17 +219,14 @@ public WriteGrouped grouped() { @VisibleForTesting Write withServiceFactory(ServiceFactory serviceFactory) { - return toBuilder().setServiceFactory(serviceFactory).build(); + Write.Builder builder = toBuilder(); + builder.spannerConfigBuilder().setServiceFactory(serviceFactory); + return builder.build(); } @Override public void validate(PipelineOptions options) { - checkNotNull( - getInstanceId(), - "SpannerIO.write() requires instance id to be set with withInstanceId method"); - checkNotNull( - getDatabaseId(), - "SpannerIO.write() requires database id to be set with withDatabaseId method"); + getSpannerConfig().validate(options); } @Override @@ -237,22 +237,13 @@ public PDone expand(PCollection input) { return PDone.in(input.getPipeline()); } + @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder - .addIfNotNull(DisplayData.item("projectId", getProjectId()).withLabel("Output Project")) - .addIfNotNull( - DisplayData.item("instanceId", getInstanceId()).withLabel("Output Instance")) - .addIfNotNull( - DisplayData.item("databaseId", getDatabaseId()).withLabel("Output Database")) - .add(DisplayData.item("batchSizeBytes", getBatchSizeBytes()) - .withLabel("Batch Size in Bytes")); - if (getServiceFactory() != null) { - builder.addIfNotNull( - DisplayData.item("serviceFactory", getServiceFactory().getClass().getName()) - .withLabel("Service Factory")); - } + getSpannerConfig().populateDisplayData(builder); + builder.add( + DisplayData.item("batchSizeBytes", getBatchSizeBytes()).withLabel("Batch Size in Bytes")); } } @@ -278,123 +269,5 @@ public void processElement(ProcessContext c) throws Exception { } } - /** 
Batches together and writes mutations to Google Cloud Spanner. */ - @VisibleForTesting - static class SpannerWriteGroupFn extends DoFn { - private static final Logger LOG = LoggerFactory.getLogger(SpannerWriteGroupFn.class); - private final Write spec; - private transient Spanner spanner; - private transient DatabaseClient dbClient; - // Current batch of mutations to be written. - private List mutations; - private long batchSizeBytes = 0; - - private static final int MAX_RETRIES = 5; - private static final FluentBackoff BUNDLE_WRITE_BACKOFF = - FluentBackoff.DEFAULT - .withMaxRetries(MAX_RETRIES) - .withInitialBackoff(Duration.standardSeconds(5)); - - @VisibleForTesting SpannerWriteGroupFn(Write spec) { - this.spec = spec; - } - - @Setup - public void setup() throws Exception { - SpannerOptions spannerOptions = getSpannerOptions(); - spanner = spannerOptions.getService(); - dbClient = spanner.getDatabaseClient( - DatabaseId.of(projectId(), spec.getInstanceId().get(), spec.getDatabaseId().get())); - mutations = new ArrayList<>(); - batchSizeBytes = 0; - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - MutationGroup m = c.element(); - mutations.add(m); - batchSizeBytes += MutationSizeEstimator.sizeOf(m); - if (batchSizeBytes >= spec.getBatchSizeBytes()) { - flushBatch(); - } - } - - private String projectId() { - return spec.getProjectId() == null - ? 
ServiceOptions.getDefaultProjectId() - : spec.getProjectId().get(); - } - - @FinishBundle - public void finishBundle() throws Exception { - if (!mutations.isEmpty()) { - flushBatch(); - } - } - - @Teardown - public void teardown() throws Exception { - if (spanner == null) { - return; - } - spanner.close(); - spanner = null; - } - - private SpannerOptions getSpannerOptions() { - SpannerOptions.Builder spannerOptionsBuider = SpannerOptions.newBuilder(); - if (spec.getServiceFactory() != null) { - spannerOptionsBuider.setServiceFactory(spec.getServiceFactory()); - } - if (spec.getProjectId() != null) { - spannerOptionsBuider.setProjectId(spec.getProjectId().get()); - } - return spannerOptionsBuider.build(); - } - - /** - * Writes a batch of mutations to Cloud Spanner. - * - *

If a commit fails, it will be retried up to {@link #MAX_RETRIES} times. If the retry limit - * is exceeded, the last exception from Cloud Spanner will be thrown. - * - * @throws AbortedException if the commit fails or IOException or InterruptedException if - * backing off between retries fails. - */ - private void flushBatch() throws AbortedException, IOException, InterruptedException { - LOG.debug("Writing batch of {} mutations", mutations.size()); - Sleeper sleeper = Sleeper.DEFAULT; - BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff(); - - while (true) { - // Batch upsert rows. - try { - dbClient.writeAtLeastOnce(Iterables.concat(mutations)); - - // Break if the commit threw no exception. - break; - } catch (AbortedException exception) { - // Only log the code and message for potentially-transient errors. The entire exception - // will be propagated upon the last retry. - LOG.error( - "Error writing to Spanner ({}): {}", exception.getCode(), exception.getMessage()); - if (!BackOffUtils.next(sleeper, backoff)) { - LOG.error("Aborting after {} retries.", MAX_RETRIES); - throw exception; - } - } - } - LOG.debug("Successfully wrote {} mutations", mutations.size()); - mutations = new ArrayList<>(); - batchSizeBytes = 0; - } - - @Override - public void populateDisplayData(Builder builder) { - super.populateDisplayData(builder); - spec.populateDisplayData(builder); - } - } - private SpannerIO() {} // Prevent construction. 
} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java new file mode 100644 index 000000000000..aed4832b7d86 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java @@ -0,0 +1,108 @@ +package org.apache.beam.sdk.io.gcp.spanner; + +import com.google.cloud.spanner.AbortedException; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterables; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Batches together and writes mutations to Google Cloud Spanner. */ +@VisibleForTesting class SpannerWriteGroupFn extends AbstractSpannerFn { + private static final Logger LOG = LoggerFactory.getLogger(SpannerWriteGroupFn.class); + private final SpannerIO.Write spec; + // Current batch of mutations to be written. 
+ private List mutations; + private long batchSizeBytes = 0; + + private static final int MAX_RETRIES = 5; + private static final FluentBackoff BUNDLE_WRITE_BACKOFF = + FluentBackoff.DEFAULT + .withMaxRetries(MAX_RETRIES) + .withInitialBackoff(Duration.standardSeconds(5)); + + @VisibleForTesting SpannerWriteGroupFn(SpannerIO.Write spec) { + this.spec = spec; + } + + @Override SpannerConfig getSpannerConfig() { + return spec.getSpannerConfig(); + } + + @Setup + public void setup() throws Exception { + super.setup(); + mutations = new ArrayList<>(); + batchSizeBytes = 0; + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + MutationGroup m = c.element(); + mutations.add(m); + batchSizeBytes += MutationSizeEstimator.sizeOf(m); + if (batchSizeBytes >= spec.getBatchSizeBytes()) { + flushBatch(); + } + } + + @FinishBundle + public void finishBundle() throws Exception { + if (!mutations.isEmpty()) { + flushBatch(); + } + } + + /** + * Writes a batch of mutations to Cloud Spanner. + * + *

If a commit fails, it will be retried up to {@link #MAX_RETRIES} times. If the retry limit + * is exceeded, the last exception from Cloud Spanner will be thrown. + * + * @throws AbortedException if the commit fails or IOException or InterruptedException if + * backing off between retries fails. + */ + private void flushBatch() throws AbortedException, IOException, InterruptedException { + LOG.debug("Writing batch of {} mutations", mutations.size()); + Sleeper sleeper = Sleeper.DEFAULT; + BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff(); + + while (true) { + // Batch upsert rows. + try { + databaseClient().writeAtLeastOnce(Iterables.concat(mutations)); + + // Break if the commit threw no exception. + break; + } catch (AbortedException exception) { + // Only log the code and message for potentially-transient errors. The entire exception + // will be propagated upon the last retry. + LOG.error( + "Error writing to Spanner ({}): {}", exception.getCode(), exception.getMessage()); + if (!BackOffUtils.next(sleeper, backoff)) { + LOG.error("Aborting after {} retries.", MAX_RETRIES); + throw exception; + } + } + } + LOG.debug("Successfully wrote {} mutations", mutations.size()); + mutations = new ArrayList<>(); + batchSizeBytes = 0; + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + spec.populateDisplayData(builder); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java index 0cc08bfc0318..abeac0a8f4ae 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java @@ -149,7 +149,7 @@ public void batching() throws Exception { .withDatabaseId("test-database") 
.withBatchSizeBytes(1000000000) .withServiceFactory(serviceFactory); - SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write); + SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write); DoFnTester fnTester = DoFnTester.of(writerFn); fnTester.processBundle(Arrays.asList(one, two)); @@ -175,7 +175,7 @@ public void batchingGroups() throws Exception { .withDatabaseId("test-database") .withBatchSizeBytes(batchSize) .withServiceFactory(serviceFactory); - SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write); + SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write); DoFnTester fnTester = DoFnTester.of(writerFn); fnTester.processBundle(Arrays.asList(one, two, three)); @@ -198,7 +198,7 @@ public void noBatching() throws Exception { .withDatabaseId("test-database") .withBatchSizeBytes(0) // turn off batching. .withServiceFactory(serviceFactory); - SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write); + SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write); DoFnTester fnTester = DoFnTester.of(writerFn); fnTester.processBundle(Arrays.asList(one, two)); @@ -224,7 +224,7 @@ public void groups() throws Exception { .withDatabaseId("test-database") .withBatchSizeBytes(batchSize) .withServiceFactory(serviceFactory); - SpannerIO.SpannerWriteGroupFn writerFn = new SpannerIO.SpannerWriteGroupFn(write); + SpannerWriteGroupFn writerFn = new SpannerWriteGroupFn(write); DoFnTester fnTester = DoFnTester.of(writerFn); fnTester.processBundle(Arrays.asList(g(one, two, three))); From f3ee44b84fc83741dc7a457fdaa76ac8c2b1b21e Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Mon, 19 Jun 2017 13:28:52 -0700 Subject: [PATCH 05/16] Read api with naive implementation --- .../io/gcp/spanner/CreateTransactionFn.java | 36 ++ .../io/gcp/spanner/NaiveSpannerReadFn.java | 50 ++ .../beam/sdk/io/gcp/spanner/SpannerIO.java | 443 +++++++++++++++++- .../beam/sdk/io/gcp/spanner/Transaction.java | 18 + 
.../beam/sdk/io/gcp/GcpApiSurfaceTest.java | 10 + 5 files changed, 541 insertions(+), 16 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/Transaction.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java new file mode 100644 index 000000000000..53f1bf4171e3 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java @@ -0,0 +1,36 @@ +package org.apache.beam.sdk.io.gcp.spanner; + +import com.google.cloud.spanner.ReadOnlyTransaction; +import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.Statement; + +/** Creates a batch transaction. */ +class CreateTransactionFn extends AbstractSpannerFn { + + private static final long serialVersionUID = -4174426331092286581L; + + private final SpannerIO.CreateTransaction config; + + CreateTransactionFn(SpannerIO.CreateTransaction config) { + this.config = config; + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + try (ReadOnlyTransaction readOnlyTransaction = + databaseClient().readOnlyTransaction(config.getTimestampBound())) { + // Run a dummy sql statement to force the RPC and obtain the timestamp from the server. 
+ ResultSet resultSet = readOnlyTransaction.executeQuery(Statement.of("SELECT 1")); + while (resultSet.next()) { + // do nothing + } + Transaction tx = Transaction.create(readOnlyTransaction.getReadTimestamp()); + c.output(tx); + } + } + + @Override + SpannerConfig getSpannerConfig() { + return config.getSpannerConfig(); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java new file mode 100644 index 000000000000..800f71fd0b90 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java @@ -0,0 +1,50 @@ +package org.apache.beam.sdk.io.gcp.spanner; + +import com.google.cloud.spanner.ReadOnlyTransaction; +import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.Struct; +import com.google.cloud.spanner.TimestampBound; +import com.google.common.annotations.VisibleForTesting; + +/** A simplest read function implementation. Parallelism support is coming. 
*/ +@VisibleForTesting +class NaiveSpannerReadFn extends AbstractSpannerFn { + private static final long serialVersionUID = 7645917508410554173L; + + private final SpannerIO.Read config; + + NaiveSpannerReadFn(SpannerIO.Read config) { + this.config = config; + } + + SpannerConfig getSpannerConfig() { + return config.getSpannerConfig(); + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + TimestampBound timestampBound = TimestampBound.strong(); + if (config.getTransaction() != null) { + Transaction transaction = c.sideInput(config.getTransaction()); + timestampBound = TimestampBound.ofReadTimestamp(transaction.timestamp()); + } + try (ReadOnlyTransaction readOnlyTransaction = + databaseClient().readOnlyTransaction(timestampBound)) { + ResultSet resultSet = execute(readOnlyTransaction); + while (resultSet.next()) { + c.output(resultSet.getCurrentRowAsStruct()); + } + } + } + + private ResultSet execute(ReadOnlyTransaction readOnlyTransaction) { + if (config.getQuery() != null) { + return readOnlyTransaction.executeQuery(config.getQuery()); + } + if (config.getIndex() != null) { + return readOnlyTransaction.readUsingIndex( + config.getTable(), config.getIndex(), config.getKeySet(), config.getColumns()); + } + return readOnlyTransaction.read(config.getTable(), config.getKeySet(), config.getColumns()); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 74723b0237ba..42faa19695d4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -17,23 +17,38 @@ */ package org.apache.beam.sdk.io.gcp.spanner; +import static com.google.common.base.Preconditions.checkArgument; +import static 
com.google.common.base.Preconditions.checkNotNull; + import com.google.auto.value.AutoValue; import com.google.cloud.ServiceFactory; +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.KeySet; import com.google.cloud.spanner.Mutation; import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.Statement; +import com.google.cloud.spanner.Struct; +import com.google.cloud.spanner.TimestampBound; import com.google.common.annotations.VisibleForTesting; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; /** @@ -42,7 +57,69 @@ * *

Reading from Cloud Spanner

* - *

This functionality is not yet implemented. + *

To read from Cloud Spanner, apply the {@link SpannerIO.Read} transformation. It will return a + * {@link PCollection} of {@link Struct Structs}, where each element represents + * an individual row returned from the read operation. Both Query and Read APIs are supported. + * See more information about reading from + * Cloud Spanner + * + *

To execute a query, specify a {@link SpannerIO.Read#withQuery(Statement)} or + * {@link SpannerIO.Read#withQuery(String)} during the construction of the transform. + * + *

{@code
+ *  PCollection rows = p.apply(
+ *      SpannerIO.read()
+ *          .withInstanceId(instanceId)
+ *          .withDatabaseId(dbId)
+ *          .withQuery("SELECT id, name, email FROM users"));
+ * }
+ * + *

To use the Read API, specify a {@link SpannerIO.Read#withTable(String) table name} and + * a {@link SpannerIO.Read#withColumns(List) list of columns}. + * + *

{@code
+ * PCollection rows = p.apply(
+ *    SpannerIO.read()
+ *        .withInstanceId(instanceId)
+ *        .withDatabaseId(dbId)
+ *        .withTable("users")
+ *        .withColumns("id", "name", "email"));
+ * }
+ * + *

To read optimally using an index, specify the index name using {@link SpannerIO.Read#withIndex}. + * + *

The transform is guaranteed to be executed on a consistent snapshot of data, utilizing the + * power of read only transactions. Staleness of data can be controlled using + * {@link SpannerIO.Read#withTimestampBound} or {@link SpannerIO.Read#withTimestamp(Timestamp)} + * methods. Read more about + * transactions in Cloud Spanner. + * + *

It is possible to read several {@link PCollection PCollections} within a single transaction. + * Apply the {@link SpannerIO#createTransaction()} transform, which lazily creates a transaction. The + * result of this transformation can be passed to a read operation using + * {@link SpannerIO.Read#withTransaction(PCollectionView)}. + * + *

{@code
+ * SpannerConfig spannerConfig = ...
+ *
+ * PCollectionView tx =
+ * p.apply(
+ *    SpannerIO.createTransaction()
+ *        .withSpannerConfig(spannerConfig)
+ *        .withTimestampBound(TimestampBound.strong()));
+ *
+ * PCollection users = p.apply(
+ *    SpannerIO.read()
+ *        .withSpannerConfig(spannerConfig)
+ *        .withQuery("SELECT name, email FROM users")
+ *        .withTransaction(tx));
+ *
+ * PCollection tweets = p.apply(
+ *    SpannerIO.read()
+ *        .withSpannerConfig(spannerConfig)
+ *        .withQuery("SELECT user, tweet, date FROM tweets")
+ *        .withTransaction(tx));
+ * }
* *

Writing to Cloud Spanner

* @@ -85,6 +162,31 @@ public class SpannerIO { private static final long DEFAULT_BATCH_SIZE_BYTES = 1024 * 1024; // 1 MB + /** + * Creates an uninitialized instance of {@link Read}. Before use, the {@link Read} must be + * configured with a {@link Read#withInstanceId} and {@link Read#withDatabaseId} that identify the + * Cloud Spanner database. + */ + @Experimental + public static Read read() { + return new AutoValue_SpannerIO_Read.Builder() + .setTimestampBound(TimestampBound.strong()) + .setKeySet(KeySet.all()) + .build(); + } + + /** + * Returns a transform that creates a batch transaction. By default, + * {@link TimestampBound#strong()} transaction is created, to override this use + * {@link CreateTransaction#withTimestampBound(TimestampBound)}. + */ + @Experimental + public static CreateTransaction createTransaction() { + return new AutoValue_SpannerIO_CreateTransaction.Builder() + .setTimestampBound(TimestampBound.strong()) + .build(); + } + /** * Creates an uninitialized instance of {@link Write}. Before use, the {@link Write} must be * configured with a {@link Write#withInstanceId} and {@link Write#withDatabaseId} that identify @@ -96,6 +198,316 @@ public static Write write() { .withBatchSizeBytes(DEFAULT_BATCH_SIZE_BYTES); } + /** + * A {@link PTransform} that reads data from Google Cloud Spanner. 
+ * + * @see SpannerIO + */ + @Experimental(Experimental.Kind.SOURCE_SINK) + @AutoValue + public abstract static class Read extends PTransform> { + + private static final long serialVersionUID = 3124449388202574415L; + + abstract SpannerConfig getSpannerConfig(); + + @Nullable + abstract TimestampBound getTimestampBound(); + + @Nullable + abstract Statement getQuery(); + + @Nullable + abstract String getTable(); + + @Nullable + abstract String getIndex(); + + @Nullable + abstract List getColumns(); + + @Nullable + abstract KeySet getKeySet(); + + @Nullable + abstract PCollectionView getTransaction(); + + abstract Builder toBuilder(); + + public Read withTimestamp(Timestamp timestamp) { + return withTimestampBound(TimestampBound.ofReadTimestamp(timestamp)); + } + + public Read withTimestampBound(TimestampBound timestampBound) { + return toBuilder().setTimestampBound(timestampBound).build(); + } + + public Read withTable(String table) { + return toBuilder().setTable(table).build(); + } + + public Read withColumns(String... 
columns) { + return withColumns(Arrays.asList(columns)); + } + + public Read withColumns(List columns) { + return toBuilder().setColumns(columns).build(); + } + + public Read withQuery(Statement statement) { + return toBuilder().setQuery(statement).build(); + } + + public Read withQuery(String sql) { + return withQuery(Statement.of(sql)); + } + + public Read withKeySet(KeySet keySet) { + return toBuilder().setKeySet(keySet).build(); + } + + public Read withIndex(String index) { + return toBuilder().setIndex(index).build(); + } + + public Read withSpannerConfig(SpannerConfig spannerConfig) { + return toBuilder().setSpannerConfig(spannerConfig).build(); + } + + @AutoValue.Builder + abstract static class Builder { + abstract SpannerConfig.Builder spannerConfigBuilder(); + + abstract Builder setSpannerConfig(SpannerConfig spannerConfig); + + abstract Builder setTimestampBound(TimestampBound timestampBound); + + abstract Builder setQuery(Statement statement); + + abstract Builder setTable(String table); + + abstract Builder setIndex(String index); + + abstract Builder setColumns(List columns); + + abstract Builder setKeySet(KeySet keySet); + + abstract Builder setTransaction(PCollectionView transaction); + + abstract Read build(); + } + + /** + * Returns a new {@link SpannerIO.Read} that will read from the specified Cloud Spanner project. + * + *

Does not modify this object. + */ + public Read withProjectId(String projectId) { + return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); + } + + /** + * Returns a new {@link SpannerIO.Read} that will read from the specified Cloud Spanner project. + * + *

Does not modify this object. + */ + public Read withProjectId(ValueProvider projectId) { + Builder builder = toBuilder(); + builder.spannerConfigBuilder().setProjectId(projectId); + return builder.build(); + } + + /** + * Returns a new {@link SpannerIO.Read} that will read from the specified Cloud Spanner + * instance. + * + *

Does not modify this object. + */ + public Read withInstanceId(String instanceId) { + return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); + } + + /** + * Returns a new {@link SpannerIO.Read} that will read from the specified Cloud Spanner + * instance. + * + *

Does not modify this object. + */ + public Read withInstanceId(ValueProvider instanceId) { + Builder builder = toBuilder(); + builder.spannerConfigBuilder().setInstanceId(instanceId); + return builder.build(); + } + + /** + * Returns a new {@link SpannerIO.Read} that will read from the specified Cloud Spanner + * database. + * + *

Does not modify this object. + */ + public Read withDatabaseId(String databaseId) { + return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId)); + } + + /** + * Returns a new {@link SpannerIO.Read} that will read from the specified Cloud Spanner + * database. + * + *

Does not modify this object. + */ + public Read withDatabaseId(ValueProvider databaseId) { + Builder builder = toBuilder(); + builder.spannerConfigBuilder().setDatabaseId(databaseId); + return builder.build(); + } + + @VisibleForTesting + Read withServiceFactory(ServiceFactory serviceFactory) { + Builder builder = toBuilder(); + builder.spannerConfigBuilder().setServiceFactory(serviceFactory); + return builder.build(); + } + + /** + * Returns a new {@link SpannerIO.Read} that will read using the specified + * transaction. + * + *

Does not modify this object. + */ + public Read withTransaction(PCollectionView transaction) { + return toBuilder().setTransaction(transaction).build(); + } + + @Override + public void validate(PipelineOptions options) { + getSpannerConfig().validate(options); + checkNotNull( + getTimestampBound(), + "SpannerIO.read() runs in a read only transaction and requires timestamp to be set " + + "with withTimestampBound or withTimestamp method"); + + if (getQuery() != null) { + // TODO: validate query? + } else if (getTable() != null) { + // Assume read + checkNotNull( + getColumns(), + "For a read operation SpannerIO.read() requires a list of " + + "columns to set with withColumns method"); + checkArgument( + !getColumns().isEmpty(), + "For a read operation SpannerIO.read() requires a" + + " list of columns to set with withColumns method"); + } else { + throw new IllegalArgumentException( + "SpannerIO.read() requires configuring query or read operation."); + } + } + + @Override + public PCollection expand(PBegin input) { + Read config = this; + List> sideInputs = Collections.emptyList(); + if (getTimestampBound() != null) { + PCollectionView transaction = + input.apply(createTransaction().withSpannerConfig(getSpannerConfig())); + config = config.withTransaction(transaction); + sideInputs = Collections.singletonList(transaction); + } + return input + .apply(Create.of(1)) + .apply( + "Execute query", ParDo.of(new NaiveSpannerReadFn(config)).withSideInputs(sideInputs)); + } + } + + /** + * A {@link PTransform} that create a transaction. 
+ * + * @see SpannerIO + */ + @Experimental(Experimental.Kind.SOURCE_SINK) + @AutoValue + public abstract static class CreateTransaction + extends PTransform> { + + private static final long serialVersionUID = 9201734106453817417L; + + abstract SpannerConfig getSpannerConfig(); + + @Nullable abstract TimestampBound getTimestampBound(); + + abstract Builder toBuilder(); + + @Override public PCollectionView expand(PBegin input) { + return input.apply(Create.of(1)) + .apply("Create transaction", ParDo.of(new CreateTransactionFn(this))) + .apply("As PCollectionView", View.asSingleton()); + } + + public CreateTransaction withProjectId(String projectId) { + return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); + } + + public CreateTransaction withProjectId(ValueProvider projectId) { + Builder builder = toBuilder(); + builder.spannerConfigBuilder().setProjectId(projectId); + return builder.build(); + } + + public CreateTransaction withInstanceId(String instanceId) { + return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); + } + + public CreateTransaction withInstanceId(ValueProvider instanceId) { + Builder builder = toBuilder(); + builder.spannerConfigBuilder().setInstanceId(instanceId); + return builder.build(); + } + + public CreateTransaction withSpannerConfig(SpannerConfig spannerConfig) { + return toBuilder().setSpannerConfig(spannerConfig).build(); + } + + public CreateTransaction withDatabaseId(String databaseId) { + return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId)); + } + + public CreateTransaction withDatabaseId(ValueProvider databaseId) { + Builder builder = toBuilder(); + builder.spannerConfigBuilder().setDatabaseId(databaseId); + return builder.build(); + } + + @VisibleForTesting CreateTransaction withServiceFactory( + ServiceFactory serviceFactory) { + Builder builder = toBuilder(); + builder.spannerConfigBuilder().setServiceFactory(serviceFactory); + return builder.build(); + } + + public 
CreateTransaction withTimestampBound(TimestampBound timestampBound) { + return toBuilder().setTimestampBound(timestampBound).build(); + } + + @Override public void validate(PipelineOptions options) { + getSpannerConfig().validate(options); + } + + /** A builder for {@link CreateTransaction}. */ + @AutoValue.Builder public abstract static class Builder { + + public abstract SpannerConfig.Builder spannerConfigBuilder(); + + public abstract Builder setSpannerConfig(SpannerConfig spannerConfig); + + public abstract Builder setTimestampBound(TimestampBound newTimestampBound); + + public abstract CreateTransaction build(); + } + } + + /** * A {@link PTransform} that writes {@link Mutation} objects to Google Cloud Spanner. * @@ -178,16 +590,6 @@ public Write withSpannerConfig(SpannerConfig spannerConfig) { return toBuilder().setSpannerConfig(spannerConfig).build(); } - - /** - * Returns a new {@link SpannerIO.Write} with a new batch size limit. - * - *

Does not modify this object. - */ - public Write withBatchSizeBytes(long batchSizeBytes) { - return toBuilder().setBatchSizeBytes(batchSizeBytes).build(); - } - /** * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner * database. @@ -210,6 +612,13 @@ public Write withDatabaseId(ValueProvider databaseId) { return builder.build(); } + @VisibleForTesting + Write withServiceFactory(ServiceFactory serviceFactory) { + Write.Builder builder = toBuilder(); + builder.spannerConfigBuilder().setServiceFactory(serviceFactory); + return builder.build(); + } + /** * Same transform but can be applied to {@link PCollection} of {@link MutationGroup}. */ @@ -217,11 +626,13 @@ public WriteGrouped grouped() { return new WriteGrouped(this); } - @VisibleForTesting - Write withServiceFactory(ServiceFactory serviceFactory) { - Write.Builder builder = toBuilder(); - builder.spannerConfigBuilder().setServiceFactory(serviceFactory); - return builder.build(); + /** + * Returns a new {@link SpannerIO.Write} with a new batch size limit. + * + *

Does not modify this object. + */ + public Write withBatchSizeBytes(long batchSizeBytes) { + return toBuilder().setBatchSizeBytes(batchSizeBytes).build(); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/Transaction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/Transaction.java new file mode 100644 index 000000000000..024507f3dbde --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/Transaction.java @@ -0,0 +1,18 @@ +package org.apache.beam.sdk.io.gcp.spanner; + +import com.google.auto.value.AutoValue; +import com.google.cloud.Timestamp; +import java.io.Serializable; + +/** A transaction object. */ +@AutoValue +public abstract class Transaction implements Serializable { + + private static final long serialVersionUID = -6879867902917208538L; + + abstract Timestamp timestamp(); + + public static Transaction create(Timestamp timestamp) { + return new AutoValue_Transaction(timestamp); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java index 91caded1ad35..8aac417f5816 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java @@ -52,6 +52,7 @@ public void testGcpApiSurface() throws Exception { @SuppressWarnings("unchecked") final Set>> allowedClasses = ImmutableSet.of( + classesInPackage("com.google.api.core"), classesInPackage("com.google.api.client.googleapis"), classesInPackage("com.google.api.client.http"), classesInPackage("com.google.api.client.json"), @@ -60,9 +61,18 @@ public void testGcpApiSurface() throws Exception { classesInPackage("com.google.auth"), 
classesInPackage("com.google.bigtable.v2"), classesInPackage("com.google.cloud.bigtable.config"), + classesInPackage("com.google.spanner.v1"), + Matchers.>equalTo(com.google.api.gax.grpc.ApiException.class), Matchers.>equalTo(com.google.cloud.bigtable.grpc.BigtableClusterName.class), Matchers.>equalTo(com.google.cloud.bigtable.grpc.BigtableInstanceName.class), Matchers.>equalTo(com.google.cloud.bigtable.grpc.BigtableTableName.class), + Matchers.>equalTo(com.google.cloud.BaseServiceException.class), + Matchers.>equalTo(com.google.cloud.BaseServiceException.Error.class), + Matchers.>equalTo(com.google.cloud.BaseServiceException.ExceptionData.class), + Matchers.>equalTo(com.google.cloud.BaseServiceException.ExceptionData.Builder + .class), + Matchers.>equalTo(com.google.cloud.RetryHelper.RetryHelperException.class), + Matchers.>equalTo(com.google.cloud.grpc.BaseGrpcServiceException.class), Matchers.>equalTo(com.google.cloud.ByteArray.class), Matchers.>equalTo(com.google.cloud.Date.class), Matchers.>equalTo(com.google.cloud.Timestamp.class), From 3f273ac12c07fb1e8890d1992d533171b4525785 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Mon, 19 Jun 2017 13:32:11 -0700 Subject: [PATCH 06/16] Tests --- .../sdk/io/gcp/spanner/SpannerIOReadTest.java | 108 +++++++++++ ...nerIOTest.java => SpannerIOWriteTest.java} | 2 +- .../sdk/io/gcp/spanner/SpannerReadIT.java | 169 ++++++++++++++++++ 3 files changed, 278 insertions(+), 1 deletion(-) create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java rename sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/{SpannerIOTest.java => SpannerIOWriteTest.java} (99%) create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java new file mode 100644 index 000000000000..08a86b30a834 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +import com.google.cloud.Timestamp; +import java.io.Serializable; +import org.apache.beam.sdk.testing.TestPipeline; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link SpannerIO}. 
*/ +@RunWith(JUnit4.class) +public class SpannerIOReadTest implements Serializable { + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + @Rule public transient ExpectedException thrown = ExpectedException.none(); + + @Before + @SuppressWarnings("unchecked") + public void setUp() throws Exception {} + + @Test + public void emptyTransform() throws Exception { + SpannerIO.Read read = SpannerIO.read(); + thrown.expect(NullPointerException.class); + thrown.expectMessage("requires instance id to be set with"); + read.validate(null); + } + + @Test + public void emptyInstanceId() throws Exception { + SpannerIO.Read read = SpannerIO.read().withDatabaseId("123"); + thrown.expect(NullPointerException.class); + thrown.expectMessage("requires instance id to be set with"); + read.validate(null); + } + + @Test + public void emptyDatabaseId() throws Exception { + SpannerIO.Read read = SpannerIO.read().withInstanceId("123"); + thrown.expect(NullPointerException.class); + thrown.expectMessage("requires database id to be set with"); + read.validate(null); + } + + @Test + public void emptyQuery() throws Exception { + SpannerIO.Read read = + SpannerIO.read().withInstanceId("123").withDatabaseId("aaa").withTimestamp(Timestamp.now()); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("requires configuring query or read operation"); + read.validate(null); + } + + @Test + public void emptyColumns() throws Exception { + SpannerIO.Read read = + SpannerIO.read() + .withInstanceId("123") + .withDatabaseId("aaa") + .withTimestamp(Timestamp.now()) + .withTable("users"); + thrown.expect(NullPointerException.class); + thrown.expectMessage("requires a list of columns"); + read.validate(null); + } + + @Test + public void validRead() throws Exception { + SpannerIO.Read read = + SpannerIO.read() + .withInstanceId("123") + .withDatabaseId("aaa") + .withTimestamp(Timestamp.now()) + .withTable("users") + .withColumns("id", "name", "email"); + 
read.validate(null); + } + + @Test + public void validQuery() throws Exception { + SpannerIO.Read read = + SpannerIO.read() + .withInstanceId("123") + .withDatabaseId("aaa") + .withTimestamp(Timestamp.now()) + .withQuery("SELECT * FROM users"); + read.validate(null); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java similarity index 99% rename from sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java index abeac0a8f4ae..530c4f460d99 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java @@ -61,7 +61,7 @@ * Unit tests for {@link SpannerIO}. */ @RunWith(JUnit4.class) -public class SpannerIOTest implements Serializable { +public class SpannerIOWriteTest implements Serializable { @Rule public final transient TestPipeline pipeline = TestPipeline.create(); @Rule public transient ExpectedException thrown = ExpectedException.none(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java new file mode 100644 index 000000000000..f5d7cbd6c31d --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +import com.google.cloud.spanner.Database; +import com.google.cloud.spanner.DatabaseAdminClient; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Operation; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.Struct; +import com.google.cloud.spanner.TimestampBound; +import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import 
org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** End-to-end test of Cloud Spanner Source. */ +@RunWith(JUnit4.class) +public class SpannerReadIT { + + private static final int MAX_DB_NAME_LENGTH = 30; + + @Rule public final transient TestPipeline p = TestPipeline.create(); + + /** Pipeline options for this test. */ + public interface SpannerTestPipelineOptions extends TestPipelineOptions { + @Description("Project ID for Spanner") + @Default.String("apache-beam-testing") + String getProjectId(); + void setProjectId(String value); + + @Description("Instance ID to write to in Spanner") + @Default.String("beam-test") + String getInstanceId(); + void setInstanceId(String value); + + @Description("Database ID prefix to write to in Spanner") + @Default.String("beam-testdb") + String getDatabaseIdPrefix(); + void setDatabaseIdPrefix(String value); + + @Description("Table name") + @Default.String("users") + String getTable(); + void setTable(String value); + } + + private Spanner spanner; + private DatabaseAdminClient databaseAdminClient; + private SpannerTestPipelineOptions options; + private String databaseName; + + @Before + public void setUp() throws Exception { + PipelineOptionsFactory.register(SpannerTestPipelineOptions.class); + options = TestPipeline.testingPipelineOptions().as(SpannerTestPipelineOptions.class); + + spanner = SpannerOptions.newBuilder().setProjectId(options.getProjectId()).build().getService(); + + databaseName = generateDatabaseName(); + + databaseAdminClient = spanner.getDatabaseAdminClient(); + + // Delete database if exists. 
+ databaseAdminClient.dropDatabase(options.getInstanceId(), databaseName); + + Operation op = + databaseAdminClient.createDatabase( + options.getInstanceId(), + databaseName, + Collections.singleton( + "CREATE TABLE " + + options.getTable() + + " (" + + " Key INT64," + + " Value STRING(MAX)," + + ") PRIMARY KEY (Key)")); + op.waitFor(); + } + + @Test + public void testRead() throws Exception { + DatabaseClient databaseClient = + spanner.getDatabaseClient( + DatabaseId.of( + options.getProjectId(), options.getInstanceId(), databaseName)); + + List mutations = new ArrayList<>(); + for (int i = 0; i < 5L; i++) { + mutations.add( + Mutation.newInsertOrUpdateBuilder(options.getTable()) + .set("key") + .to((long) i) + .set("value") + .to(RandomStringUtils.random(100, true, true)) + .build()); + } + + databaseClient.writeAtLeastOnce(mutations); + + SpannerConfig spannerConfig = SpannerConfig.create() + .withProjectId(options.getProjectId()) + .withInstanceId(options.getInstanceId()) + .withDatabaseId(databaseName); + + PCollectionView tx = + p.apply( + SpannerIO.createTransaction() + .withSpannerConfig(spannerConfig) + .withTimestampBound(TimestampBound.strong())); + + PCollection output = + p.apply( + SpannerIO.read() + .withSpannerConfig(spannerConfig) + .withQuery("SELECT * FROM " + options.getTable()) + .withTransaction(tx)); + PAssert.thatSingleton(output.apply("Count rows", Count.globally())).isEqualTo(5L); + p.run(); + } + + @After + public void tearDown() throws Exception { + databaseAdminClient.dropDatabase(options.getInstanceId(), databaseName); + spanner.close(); + } + + private String generateDatabaseName() { + String random = + RandomStringUtils.randomAlphanumeric( + MAX_DB_NAME_LENGTH - 1 - options.getDatabaseIdPrefix().length()) + .toLowerCase(); + return options.getDatabaseIdPrefix() + "-" + random; + } +} From 29c35e52fd51bcd912a97bee9b2fc873a28d165a Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Mon, 19 Jun 2017 16:15:03 -0700 Subject: [PATCH 
07/16] License headers --- .../sdk/io/gcp/spanner/AbstractSpannerFn.java | 17 +++++++++++++++++ .../sdk/io/gcp/spanner/CreateTransactionFn.java | 17 +++++++++++++++++ .../sdk/io/gcp/spanner/NaiveSpannerReadFn.java | 17 +++++++++++++++++ .../beam/sdk/io/gcp/spanner/SpannerConfig.java | 17 +++++++++++++++++ .../sdk/io/gcp/spanner/SpannerWriteGroupFn.java | 17 +++++++++++++++++ .../beam/sdk/io/gcp/spanner/Transaction.java | 17 +++++++++++++++++ 6 files changed, 102 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java index 08f7fa9cb60f..00008f1ebdcc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/AbstractSpannerFn.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.beam.sdk.io.gcp.spanner; import com.google.cloud.spanner.DatabaseClient; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java index 53f1bf4171e3..10e1dd2502ef 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.beam.sdk.io.gcp.spanner; import com.google.cloud.spanner.ReadOnlyTransaction; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java index 800f71fd0b90..971012eeb394 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.beam.sdk.io.gcp.spanner; import com.google.cloud.spanner.ReadOnlyTransaction; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java index 4cb8aa28bd63..24dcb2791dc8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.beam.sdk.io.gcp.spanner; import static com.google.common.base.Preconditions.checkNotNull; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java index aed4832b7d86..34a11da8754f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteGroupFn.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.beam.sdk.io.gcp.spanner; import com.google.cloud.spanner.AbortedException; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/Transaction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/Transaction.java index 024507f3dbde..65b4966460db 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/Transaction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/Transaction.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.beam.sdk.io.gcp.spanner; import com.google.auto.value.AutoValue; From 13a280f4edae672afd714fb9e4312d4a290edcc1 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Mon, 19 Jun 2017 16:26:09 -0700 Subject: [PATCH 08/16] Deps hell --- pom.xml | 12 ++++++++++++ sdks/java/io/google-cloud-platform/pom.xml | 7 ++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index f06568b77860..069191ccbb95 100644 --- a/pom.xml +++ b/pom.xml @@ -161,6 +161,7 @@ -Werror -Xpkginfo:always nothing + 0.20.0 pom @@ -637,6 +638,12 @@ ${google-api-common.version} + + com.google.api + gax-grpc + ${gax-grpc.version} + + com.google.api-client google-api-client @@ -851,6 +858,11 @@ + + com.google.cloud + google-cloud-core-grpc + ${grpc.version} + com.google.cloud.bigtable bigtable-protos diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml index 6737eea5b256..828220410f7e 100644 --- a/sdks/java/io/google-cloud-platform/pom.xml +++ b/sdks/java/io/google-cloud-platform/pom.xml @@ -93,7 +93,12 @@ com.google.api - api-common + gax-grpc + + + + com.google.cloud + google-cloud-core-grpc From f28af5b111914e470a13f54f5711fed9ad0e5720 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 20 Jun 2017 17:51:06 -0700 Subject: [PATCH 09/16] Removed serialVersionUID --- .../apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java | 2 -- .../apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java | 2 -- .../org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java | 2 -- .../java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java | 6 ------ .../org/apache/beam/sdk/io/gcp/spanner/Transaction.java | 2 -- 5 files changed, 14 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java index 10e1dd2502ef..da8e8b15e1ad 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/CreateTransactionFn.java @@ -24,8 +24,6 @@ /** Creates a batch transaction. */ class CreateTransactionFn extends AbstractSpannerFn { - private static final long serialVersionUID = -4174426331092286581L; - private final SpannerIO.CreateTransaction config; CreateTransactionFn(SpannerIO.CreateTransaction config) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java index 971012eeb394..d193b95768ad 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java @@ -26,8 +26,6 @@ /** A simplest read function implementation. Parallelism support is coming. 
*/ @VisibleForTesting class NaiveSpannerReadFn extends AbstractSpannerFn { - private static final long serialVersionUID = 7645917508410554173L; - private final SpannerIO.Read config; NaiveSpannerReadFn(SpannerIO.Read config) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java index 24dcb2791dc8..915f8d5bebfb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java @@ -34,8 +34,6 @@ @AutoValue public abstract class SpannerConfig implements Serializable { - private static final long serialVersionUID = -5680874609304170301L; - @Nullable abstract ValueProvider getProjectId(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 42faa19695d4..a1fe84f39e04 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -207,8 +207,6 @@ public static Write write() { @AutoValue public abstract static class Read extends PTransform> { - private static final long serialVersionUID = 3124449388202574415L; - abstract SpannerConfig getSpannerConfig(); @Nullable @@ -431,8 +429,6 @@ public PCollection expand(PBegin input) { public abstract static class CreateTransaction extends PTransform> { - private static final long serialVersionUID = 9201734106453817417L; - abstract SpannerConfig getSpannerConfig(); @Nullable abstract TimestampBound getTimestampBound(); @@ -517,8 +513,6 @@ public CreateTransaction 
withTimestampBound(TimestampBound timestampBound) { @AutoValue public abstract static class Write extends PTransform, PDone> { - private static final long serialVersionUID = 1920175411827980145L; - abstract SpannerConfig getSpannerConfig(); @Nullable diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/Transaction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/Transaction.java index 65b4966460db..22af3b8fe663 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/Transaction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/Transaction.java @@ -25,8 +25,6 @@ @AutoValue public abstract class Transaction implements Serializable { - private static final long serialVersionUID = -6879867902917208538L; - abstract Timestamp timestamp(); public static Transaction create(Timestamp timestamp) { From 23d8910ec566739e85216bece7090343f1e6acff Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 20 Jun 2017 18:02:36 -0700 Subject: [PATCH 10/16] make SpannerConfig.Builder package private. 
--- .../sdk/io/gcp/spanner/SpannerConfig.java | 8 +- .../beam/sdk/io/gcp/spanner/SpannerIO.java | 81 ++++++++----------- 2 files changed, 42 insertions(+), 47 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java index 915f8d5bebfb..fcf7714a8c83 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java @@ -64,7 +64,7 @@ public static SpannerConfig create() { return builder().build(); } - public static Builder builder() { + static Builder builder() { return new AutoValue_SpannerConfig.Builder(); } @@ -130,4 +130,10 @@ public SpannerConfig withDatabaseId(ValueProvider databaseId) { public SpannerConfig withDatabaseId(String databaseId) { return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId)); } + + @VisibleForTesting + SpannerConfig withServiceFactory(ServiceFactory serviceFactory) { + return toBuilder().setServiceFactory(serviceFactory).build(); + } + } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index a1fe84f39e04..397b277e769e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -170,6 +170,7 @@ public class SpannerIO { @Experimental public static Read read() { return new AutoValue_SpannerIO_Read.Builder() + .setSpannerConfig(SpannerConfig.create()) .setTimestampBound(TimestampBound.strong()) .setKeySet(KeySet.all()) .build(); @@ -183,6 +184,7 @@ public static Read read() { 
@Experimental public static CreateTransaction createTransaction() { return new AutoValue_SpannerIO_CreateTransaction.Builder() + .setSpannerConfig(SpannerConfig.create()) .setTimestampBound(TimestampBound.strong()) .build(); } @@ -194,7 +196,7 @@ public static CreateTransaction createTransaction() { */ @Experimental public static Write write() { - return new AutoValue_SpannerIO_Write.Builder().build() + return new AutoValue_SpannerIO_Write.Builder().setSpannerConfig(SpannerConfig.create()).build() .withBatchSizeBytes(DEFAULT_BATCH_SIZE_BYTES); } @@ -274,7 +276,6 @@ public Read withSpannerConfig(SpannerConfig spannerConfig) { @AutoValue.Builder abstract static class Builder { - abstract SpannerConfig.Builder spannerConfigBuilder(); abstract Builder setSpannerConfig(SpannerConfig spannerConfig); @@ -310,9 +311,8 @@ public Read withProjectId(String projectId) { *

Does not modify this object. */ public Read withProjectId(ValueProvider projectId) { - Builder builder = toBuilder(); - builder.spannerConfigBuilder().setProjectId(projectId); - return builder.build(); + SpannerConfig config = getSpannerConfig(); + return withSpannerConfig(config.withProjectId(projectId)); } /** @@ -332,9 +332,8 @@ public Read withInstanceId(String instanceId) { *

Does not modify this object. */ public Read withInstanceId(ValueProvider instanceId) { - Builder builder = toBuilder(); - builder.spannerConfigBuilder().setInstanceId(instanceId); - return builder.build(); + SpannerConfig config = getSpannerConfig(); + return withSpannerConfig(config.withInstanceId(instanceId)); } /** @@ -354,16 +353,14 @@ public Read withDatabaseId(String databaseId) { *

Does not modify this object. */ public Read withDatabaseId(ValueProvider databaseId) { - Builder builder = toBuilder(); - builder.spannerConfigBuilder().setDatabaseId(databaseId); - return builder.build(); + SpannerConfig config = getSpannerConfig(); + return withSpannerConfig(config.withDatabaseId(databaseId)); } @VisibleForTesting Read withServiceFactory(ServiceFactory serviceFactory) { - Builder builder = toBuilder(); - builder.spannerConfigBuilder().setServiceFactory(serviceFactory); - return builder.build(); + SpannerConfig config = getSpannerConfig(); + return withSpannerConfig(config.withServiceFactory(serviceFactory)); } /** @@ -431,11 +428,13 @@ public abstract static class CreateTransaction abstract SpannerConfig getSpannerConfig(); - @Nullable abstract TimestampBound getTimestampBound(); + @Nullable + abstract TimestampBound getTimestampBound(); abstract Builder toBuilder(); - @Override public PCollectionView expand(PBegin input) { + @Override + public PCollectionView expand(PBegin input) { return input.apply(Create.of(1)) .apply("Create transaction", ParDo.of(new CreateTransactionFn(this))) .apply("As PCollectionView", View.asSingleton()); @@ -446,9 +445,8 @@ public CreateTransaction withProjectId(String projectId) { } public CreateTransaction withProjectId(ValueProvider projectId) { - Builder builder = toBuilder(); - builder.spannerConfigBuilder().setProjectId(projectId); - return builder.build(); + SpannerConfig config = getSpannerConfig(); + return withSpannerConfig(config.withProjectId(projectId)); } public CreateTransaction withInstanceId(String instanceId) { @@ -456,9 +454,8 @@ public CreateTransaction withInstanceId(String instanceId) { } public CreateTransaction withInstanceId(ValueProvider instanceId) { - Builder builder = toBuilder(); - builder.spannerConfigBuilder().setInstanceId(instanceId); - return builder.build(); + SpannerConfig config = getSpannerConfig(); + return withSpannerConfig(config.withInstanceId(instanceId)); } public 
CreateTransaction withSpannerConfig(SpannerConfig spannerConfig) { @@ -470,31 +467,29 @@ public CreateTransaction withDatabaseId(String databaseId) { } public CreateTransaction withDatabaseId(ValueProvider databaseId) { - Builder builder = toBuilder(); - builder.spannerConfigBuilder().setDatabaseId(databaseId); - return builder.build(); + SpannerConfig config = getSpannerConfig(); + return withSpannerConfig(config.withDatabaseId(databaseId)); } - @VisibleForTesting CreateTransaction withServiceFactory( + @VisibleForTesting + CreateTransaction withServiceFactory( ServiceFactory serviceFactory) { - Builder builder = toBuilder(); - builder.spannerConfigBuilder().setServiceFactory(serviceFactory); - return builder.build(); + SpannerConfig config = getSpannerConfig(); + return withSpannerConfig(config.withServiceFactory(serviceFactory)); } public CreateTransaction withTimestampBound(TimestampBound timestampBound) { return toBuilder().setTimestampBound(timestampBound).build(); } - @Override public void validate(PipelineOptions options) { + @Override + public void validate(PipelineOptions options) { getSpannerConfig().validate(options); } /** A builder for {@link CreateTransaction}. */ @AutoValue.Builder public abstract static class Builder { - public abstract SpannerConfig.Builder spannerConfigBuilder(); - public abstract Builder setSpannerConfig(SpannerConfig spannerConfig); public abstract Builder setTimestampBound(TimestampBound newTimestampBound); @@ -525,8 +520,6 @@ abstract static class Builder { abstract Builder setSpannerConfig(SpannerConfig spannerConfig); - abstract SpannerConfig.Builder spannerConfigBuilder(); - abstract Builder setBatchSizeBytes(Long batchSizeBytes); abstract Write build(); @@ -547,9 +540,8 @@ public Write withProjectId(String projectId) { *

Does not modify this object. */ public Write withProjectId(ValueProvider projectId) { - Write.Builder builder = toBuilder(); - builder.spannerConfigBuilder().setProjectId(projectId); - return builder.build(); + SpannerConfig config = getSpannerConfig(); + return withSpannerConfig(config.withProjectId(projectId)); } /** @@ -569,9 +561,8 @@ public Write withInstanceId(String instanceId) { *

Does not modify this object. */ public Write withInstanceId(ValueProvider instanceId) { - Write.Builder builder = toBuilder(); - builder.spannerConfigBuilder().setInstanceId(instanceId); - return builder.build(); + SpannerConfig config = getSpannerConfig(); + return withSpannerConfig(config.withInstanceId(instanceId)); } /** @@ -601,16 +592,14 @@ public Write withDatabaseId(String databaseId) { *

Does not modify this object. */ public Write withDatabaseId(ValueProvider databaseId) { - Write.Builder builder = toBuilder(); - builder.spannerConfigBuilder().setDatabaseId(databaseId); - return builder.build(); + SpannerConfig config = getSpannerConfig(); + return withSpannerConfig(config.withDatabaseId(databaseId)); } @VisibleForTesting Write withServiceFactory(ServiceFactory serviceFactory) { - Write.Builder builder = toBuilder(); - builder.spannerConfigBuilder().setServiceFactory(serviceFactory); - return builder.build(); + SpannerConfig config = getSpannerConfig(); + return withSpannerConfig(config.withServiceFactory(serviceFactory)); } /** From d3cc7e6a69f298ded2a14010d4fd7af3c6c8ec43 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 20 Jun 2017 18:03:09 -0700 Subject: [PATCH 11/16] Vertical space --- .../java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java index fcf7714a8c83..02716fbaf480 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java @@ -94,14 +94,12 @@ public void populateDisplayData(DisplayData.Builder builder) { @AutoValue.Builder public abstract static class Builder { - abstract Builder setProjectId(ValueProvider projectId); abstract Builder setInstanceId(ValueProvider instanceId); abstract Builder setDatabaseId(ValueProvider databaseId); - abstract Builder setServiceFactory(ServiceFactory serviceFactory); public abstract SpannerConfig build(); From 1286935e58fab837a5ec199c5163067f76ef7d22 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 20 Jun 2017 18:53:24 -0700 Subject: [PATCH 12/16] Reviewed 
documentation --- .../beam/sdk/io/gcp/spanner/SpannerIO.java | 204 ++++++------------ 1 file changed, 71 insertions(+), 133 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 397b277e769e..60433158806c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -234,46 +234,6 @@ public abstract static class Read extends PTransform abstract Builder toBuilder(); - public Read withTimestamp(Timestamp timestamp) { - return withTimestampBound(TimestampBound.ofReadTimestamp(timestamp)); - } - - public Read withTimestampBound(TimestampBound timestampBound) { - return toBuilder().setTimestampBound(timestampBound).build(); - } - - public Read withTable(String table) { - return toBuilder().setTable(table).build(); - } - - public Read withColumns(String... columns) { - return withColumns(Arrays.asList(columns)); - } - - public Read withColumns(List columns) { - return toBuilder().setColumns(columns).build(); - } - - public Read withQuery(Statement statement) { - return toBuilder().setQuery(statement).build(); - } - - public Read withQuery(String sql) { - return withQuery(Statement.of(sql)); - } - - public Read withKeySet(KeySet keySet) { - return toBuilder().setKeySet(keySet).build(); - } - - public Read withIndex(String index) { - return toBuilder().setIndex(index).build(); - } - - public Read withSpannerConfig(SpannerConfig spannerConfig) { - return toBuilder().setSpannerConfig(spannerConfig).build(); - } - @AutoValue.Builder abstract static class Builder { @@ -296,62 +256,39 @@ abstract static class Builder { abstract Read build(); } - /** - * Returns a new {@link SpannerIO.Read} that will write to the specified Cloud Spanner project. 
- * - *

Does not modify this object. - */ + /** Specifies the Cloud Spanner configuration. */ + public Read withSpannerConfig(SpannerConfig spannerConfig) { + return toBuilder().setSpannerConfig(spannerConfig).build(); + } + + /** Specifies the Cloud Spanner project. */ public Read withProjectId(String projectId) { return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); } - /** - * Returns a new {@link SpannerIO.Read} that will write to the specified Cloud Spanner project. - * - *

Does not modify this object. - */ + /** Specifies the Cloud Spanner project. */ public Read withProjectId(ValueProvider projectId) { SpannerConfig config = getSpannerConfig(); return withSpannerConfig(config.withProjectId(projectId)); } - /** - * Returns a new {@link SpannerIO.Read} that will write to the specified Cloud Spanner - * instance. - * - *

Does not modify this object. - */ + /** Specifies the Cloud Spanner instance. */ public Read withInstanceId(String instanceId) { return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); } - /** - * Returns a new {@link SpannerIO.Read} that will write to the specified Cloud Spanner - * instance. - * - *

Does not modify this object. - */ + /** Specifies the Cloud Spanner instance. */ public Read withInstanceId(ValueProvider instanceId) { SpannerConfig config = getSpannerConfig(); return withSpannerConfig(config.withInstanceId(instanceId)); } - /** - * Returns a new {@link SpannerIO.Read} that will write to the specified Cloud Spanner - * database. - * - *

Does not modify this object. - */ + /** Specifies the Cloud Spanner database. */ public Read withDatabaseId(String databaseId) { return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId)); } - /** - * Returns a new {@link SpannerIO.Read} that will write to the specified Cloud Spanner - * database. - * - *

Does not modify this object. - */ + /** Specifies the Cloud Spanner database. */ public Read withDatabaseId(ValueProvider databaseId) { SpannerConfig config = getSpannerConfig(); return withSpannerConfig(config.withDatabaseId(databaseId)); @@ -363,16 +300,47 @@ Read withServiceFactory(ServiceFactory serviceFactory) return withSpannerConfig(config.withServiceFactory(serviceFactory)); } - /** - * Returns a new {@link SpannerIO.Read} that will read from the specified Cloud Spanner - * config. - * - *

Does not modify this object. - */ public Read withTransaction(PCollectionView transaction) { return toBuilder().setTransaction(transaction).build(); } + public Read withTimestamp(Timestamp timestamp) { + return withTimestampBound(TimestampBound.ofReadTimestamp(timestamp)); + } + + public Read withTimestampBound(TimestampBound timestampBound) { + return toBuilder().setTimestampBound(timestampBound).build(); + } + + public Read withTable(String table) { + return toBuilder().setTable(table).build(); + } + + public Read withColumns(String... columns) { + return withColumns(Arrays.asList(columns)); + } + + public Read withColumns(List columns) { + return toBuilder().setColumns(columns).build(); + } + + public Read withQuery(Statement statement) { + return toBuilder().setQuery(statement).build(); + } + + public Read withQuery(String sql) { + return withQuery(Statement.of(sql)); + } + + public Read withKeySet(KeySet keySet) { + return toBuilder().setKeySet(keySet).build(); + } + + public Read withIndex(String index) { + return toBuilder().setIndex(index).build(); + } + + @Override public void validate(PipelineOptions options) { getSpannerConfig().validate(options); @@ -440,32 +408,39 @@ public PCollectionView expand(PBegin input) { .apply("As PCollectionView", View.asSingleton()); } + /** Specifies the Cloud Spanner configuration. */ + public CreateTransaction withSpannerConfig(SpannerConfig spannerConfig) { + return toBuilder().setSpannerConfig(spannerConfig).build(); + } + + /** Specifies the Cloud Spanner project. */ public CreateTransaction withProjectId(String projectId) { return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); } + /** Specifies the Cloud Spanner project. */ public CreateTransaction withProjectId(ValueProvider projectId) { SpannerConfig config = getSpannerConfig(); return withSpannerConfig(config.withProjectId(projectId)); } + /** Specifies the Cloud Spanner instance. 
*/ public CreateTransaction withInstanceId(String instanceId) { return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); } + /** Specifies the Cloud Spanner instance. */ public CreateTransaction withInstanceId(ValueProvider instanceId) { SpannerConfig config = getSpannerConfig(); return withSpannerConfig(config.withInstanceId(instanceId)); } - public CreateTransaction withSpannerConfig(SpannerConfig spannerConfig) { - return toBuilder().setSpannerConfig(spannerConfig).build(); - } - + /** Specifies the Cloud Spanner database. */ public CreateTransaction withDatabaseId(String databaseId) { return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId)); } + /** Specifies the Cloud Spanner database. */ public CreateTransaction withDatabaseId(ValueProvider databaseId) { SpannerConfig config = getSpannerConfig(); return withSpannerConfig(config.withDatabaseId(databaseId)); @@ -525,72 +500,39 @@ abstract static class Builder { abstract Write build(); } - /** - * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner project. - * - *

Does not modify this object. - */ + /** Specifies the Cloud Spanner configuration. */ + public Write withSpannerConfig(SpannerConfig spannerConfig) { + return toBuilder().setSpannerConfig(spannerConfig).build(); + } + + /** Specifies the Cloud Spanner project. */ public Write withProjectId(String projectId) { return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); } - /** - * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner project. - * - *

Does not modify this object. - */ + /** Specifies the Cloud Spanner project. */ public Write withProjectId(ValueProvider projectId) { SpannerConfig config = getSpannerConfig(); return withSpannerConfig(config.withProjectId(projectId)); } - /** - * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner - * instance. - * - *

Does not modify this object. - */ + /** Specifies the Cloud Spanner instance. */ public Write withInstanceId(String instanceId) { return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); } - /** - * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner - * instance. - * - *

Does not modify this object. - */ + /** Specifies the Cloud Spanner instance. */ public Write withInstanceId(ValueProvider instanceId) { SpannerConfig config = getSpannerConfig(); return withSpannerConfig(config.withInstanceId(instanceId)); } - /** - * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner - * config. - * - *

Does not modify this object. - */ - public Write withSpannerConfig(SpannerConfig spannerConfig) { - return toBuilder().setSpannerConfig(spannerConfig).build(); - } - - /** - * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner - * database. - * - *

Does not modify this object. - */ + /** Specifies the Cloud Spanner database. */ public Write withDatabaseId(String databaseId) { return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId)); } - /** - * Returns a new {@link SpannerIO.Write} that will write to the specified Cloud Spanner - * database. - * - *

Does not modify this object. - */ + /** Specifies the Cloud Spanner database. */ public Write withDatabaseId(ValueProvider databaseId) { SpannerConfig config = getSpannerConfig(); return withSpannerConfig(config.withDatabaseId(databaseId)); @@ -609,11 +551,7 @@ public WriteGrouped grouped() { return new WriteGrouped(this); } - /** - * Returns a new {@link SpannerIO.Write} with a new batch size limit. - * - *

Does not modify this object. - */ + /** Specifies the batch size limit. */ public Write withBatchSizeBytes(long batchSizeBytes) { return toBuilder().setBatchSizeBytes(batchSizeBytes).build(); } From 2a8e0e5ee77461407579275da16f029dffa43f93 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Wed, 21 Jun 2017 17:57:48 -0700 Subject: [PATCH 13/16] Better test coverage. --- .../io/gcp/spanner/FakeServiceFactory.java | 83 +++++++++ .../sdk/io/gcp/spanner/SpannerIOReadTest.java | 160 +++++++++++++++++- .../io/gcp/spanner/SpannerIOWriteTest.java | 56 ------ 3 files changed, 242 insertions(+), 57 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java new file mode 100644 index 000000000000..bda8a8d70d1a --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.spanner; + +import com.google.cloud.ServiceFactory; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.DatabaseId; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerOptions; +import org.mockito.Matchers; + +import javax.annotation.concurrent.GuardedBy; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; + +/** + * A serialization friendly type service factory that maintains a mock {@link Spanner} and + * {@link DatabaseClient}. + * */ +class FakeServiceFactory + implements ServiceFactory, Serializable { + + // Marked as static so they could be returned by serviceFactory, which is serializable. + private static final Object lock = new Object(); + + @GuardedBy("lock") + private static final List mockSpanners = new ArrayList<>(); + + @GuardedBy("lock") + private static final List mockDatabaseClients = new ArrayList<>(); + + @GuardedBy("lock") + private static int count = 0; + + private final int index; + + public FakeServiceFactory() { + synchronized (lock) { + index = count++; + mockSpanners.add(mock(Spanner.class, withSettings().serializable())); + mockDatabaseClients.add(mock(DatabaseClient.class, withSettings().serializable())); + } + when(mockSpanner().getDatabaseClient(Matchers.any(DatabaseId.class))) + .thenReturn(mockDatabaseClient()); + } + + DatabaseClient mockDatabaseClient() { + synchronized (lock) { + return mockDatabaseClients.get(index); + } + } + + Spanner mockSpanner() { + synchronized (lock) { + return mockSpanners.get(index); + } + } + + @Override + public Spanner create(SpannerOptions serviceOptions) { + return mockSpanner(); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java index 08a86b30a834..3db135db5ac4 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java @@ -19,13 +19,31 @@ import com.google.cloud.Timestamp; import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import com.google.cloud.spanner.*; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.DoFnTester; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.hamcrest.Matchers; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mockito; + +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** Unit tests for {@link SpannerIO}. 
*/ @RunWith(JUnit4.class) @@ -33,9 +51,22 @@ public class SpannerIOReadTest implements Serializable { @Rule public final transient TestPipeline pipeline = TestPipeline.create(); @Rule public transient ExpectedException thrown = ExpectedException.none(); + private FakeServiceFactory serviceFactory; + private ReadOnlyTransaction mockTx; + + private Type fakeType = Type.struct(Type.StructField.of("id", Type.int64()), + Type.StructField.of("name", Type.string())); + + List fakeRows = Arrays.asList( + Struct.newBuilder().add("id", Value.int64(1)).add("name", Value.string("Alice")).build(), + Struct.newBuilder().add("id", Value.int64(2)).add("name", Value.string("Bob")).build()); + @Before @SuppressWarnings("unchecked") - public void setUp() throws Exception {} + public void setUp() throws Exception { + serviceFactory = new FakeServiceFactory(); + mockTx = Mockito.mock(ReadOnlyTransaction.class); + } @Test public void emptyTransform() throws Exception { @@ -105,4 +136,131 @@ public void validQuery() throws Exception { .withQuery("SELECT * FROM users"); read.validate(null); } + + @Test + public void runQuery() throws Exception { + SpannerIO.Read read = + SpannerIO.read() + .withInstanceId("123") + .withDatabaseId("aaa") + .withTimestamp(Timestamp.now()) + .withQuery("SELECT * FROM users") + .withServiceFactory(serviceFactory); + + NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read); + DoFnTester fnTester = DoFnTester.of(readFn); + + when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class))) + .thenReturn(mockTx); + when(mockTx.executeQuery(any(Statement.class))) + .thenReturn(ResultSets.forRows(fakeType, fakeRows)); + + List result = fnTester.processBundle(1); + assertThat(result, Matchers.iterableWithSize(2)); + + verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound + .strong()); + verify(mockTx).executeQuery(Statement.of("SELECT * FROM users")); + } + + @Test + public void runRead() throws Exception { + 
SpannerIO.Read read = + SpannerIO.read() + .withInstanceId("123") + .withDatabaseId("aaa") + .withTimestamp(Timestamp.now()) + .withTable("users") + .withColumns("id", "name") + .withServiceFactory(serviceFactory); + + NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read); + DoFnTester fnTester = DoFnTester.of(readFn); + + when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class))) + .thenReturn(mockTx); + when(mockTx.read("users", KeySet.all(), Arrays.asList("id", "name"))) + .thenReturn(ResultSets.forRows(fakeType, fakeRows)); + + List result = fnTester.processBundle(1); + assertThat(result, Matchers.iterableWithSize(2)); + + verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound.strong()); + verify(mockTx).read("users", KeySet.all(), Arrays.asList("id", "name")); + } + + @Test + public void runReadUsingIndex() throws Exception { + SpannerIO.Read read = + SpannerIO.read() + .withInstanceId("123") + .withDatabaseId("aaa") + .withTimestamp(Timestamp.now()) + .withTable("users") + .withColumns("id", "name") + .withIndex("theindex") + .withServiceFactory(serviceFactory); + + NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read); + DoFnTester fnTester = DoFnTester.of(readFn); + + when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class))) + .thenReturn(mockTx); + when(mockTx.readUsingIndex("users", "theindex", KeySet.all(), Arrays.asList("id", "name"))) + .thenReturn(ResultSets.forRows(fakeType, fakeRows)); + + List result = fnTester.processBundle(1); + assertThat(result, Matchers.iterableWithSize(2)); + + verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound.strong()); + verify(mockTx).readUsingIndex("users", "theindex", KeySet.all(), Arrays.asList("id", "name")); + } + + @Test + @Category(NeedsRunner.class) + public void readPipeline() throws Exception { + Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345); + + PCollectionView tx = pipeline + 
.apply("tx", SpannerIO.createTransaction() + .withInstanceId("123") + .withDatabaseId("aaa") + .withServiceFactory(serviceFactory)); + + PCollection one = pipeline.apply("read q", SpannerIO.read() + .withInstanceId("123") + .withDatabaseId("aaa") + .withTimestamp(Timestamp.now()) + .withQuery("SELECT * FROM users") + .withServiceFactory(serviceFactory) + .withTransaction(tx)); + PCollection two = pipeline.apply("read r", SpannerIO.read() + .withInstanceId("123") + .withDatabaseId("aaa") + .withTimestamp(Timestamp.now()) + .withTable("users") + .withColumns("id", "name") + .withServiceFactory(serviceFactory) + .withTransaction(tx)); + + when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class))) + .thenReturn(mockTx); + + when(mockTx.executeQuery(Statement.of("SELECT 1"))).thenReturn(ResultSets.forRows(Type.struct(), + Collections.emptyList())); + + when(mockTx.executeQuery(Statement.of("SELECT * FROM users"))) + .thenReturn(ResultSets.forRows(fakeType, fakeRows)); + when(mockTx.read("users", KeySet.all(), Arrays.asList("id", "name"))) + .thenReturn(ResultSets.forRows(fakeType, fakeRows)); + when(mockTx.getReadTimestamp()).thenReturn(timestamp); + + PAssert.that(one).containsInAnyOrder(fakeRows); + PAssert.that(two).containsInAnyOrder(fakeRows); + + pipeline.run(); + + verify(serviceFactory.mockDatabaseClient(), times(2)) + .readOnlyTransaction(TimestampBound.ofReadTimestamp(timestamp)); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java index 530c4f460d99..09cdb8e995d7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java @@ -21,24 +21,14 @@ import static 
org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.argThat; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.withSettings; -import com.google.cloud.ServiceFactory; -import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.DatabaseId; import com.google.cloud.spanner.Mutation; -import com.google.cloud.spanner.Spanner; -import com.google.cloud.spanner.SpannerOptions; import com.google.common.collect.Iterables; import java.io.Serializable; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; -import javax.annotation.concurrent.GuardedBy; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; @@ -54,8 +44,6 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentMatcher; -import org.mockito.Matchers; - /** * Unit tests for {@link SpannerIO}. @@ -251,50 +239,6 @@ public void displayData() throws Exception { assertThat(data, hasDisplayItem("batchSizeBytes", 123)); } - private static class FakeServiceFactory - implements ServiceFactory, Serializable { - // Marked as static so they could be returned by serviceFactory, which is serializable. 
- private static final Object lock = new Object(); - - @GuardedBy("lock") - private static final List mockSpanners = new ArrayList<>(); - - @GuardedBy("lock") - private static final List mockDatabaseClients = new ArrayList<>(); - - @GuardedBy("lock") - private static int count = 0; - - private final int index; - - public FakeServiceFactory() { - synchronized (lock) { - index = count++; - mockSpanners.add(mock(Spanner.class, withSettings().serializable())); - mockDatabaseClients.add(mock(DatabaseClient.class, withSettings().serializable())); - } - when(mockSpanner().getDatabaseClient(Matchers.any(DatabaseId.class))) - .thenReturn(mockDatabaseClient()); - } - - DatabaseClient mockDatabaseClient() { - synchronized (lock) { - return mockDatabaseClients.get(index); - } - } - - Spanner mockSpanner() { - synchronized (lock) { - return mockSpanners.get(index); - } - } - - @Override - public Spanner create(SpannerOptions serviceOptions) { - return mockSpanner(); - } - } - private static class IterableOfSize extends ArgumentMatcher> { private final int size; From 47e7120086cabb821e93aef2918e073fa79ddff3 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Wed, 21 Jun 2017 18:10:59 -0700 Subject: [PATCH 14/16] Making checkstyle happy --- .../io/gcp/spanner/FakeServiceFactory.java | 13 ++++----- .../sdk/io/gcp/spanner/SpannerIOReadTest.java | 29 ++++++++++++------- 2 files changed, 25 insertions(+), 17 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java index bda8a8d70d1a..753d807eb7ee 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/FakeServiceFactory.java @@ -17,21 +17,20 @@ */ package org.apache.beam.sdk.io.gcp.spanner; 
+import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; + import com.google.cloud.ServiceFactory; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.DatabaseId; import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; -import org.mockito.Matchers; - -import javax.annotation.concurrent.GuardedBy; import java.io.Serializable; import java.util.ArrayList; import java.util.List; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.withSettings; +import javax.annotation.concurrent.GuardedBy; +import org.mockito.Matchers; /** * A serialization friendly type service factory that maintains a mock {@link Spanner} and diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java index 3db135db5ac4..e5d4e72f5198 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java @@ -17,12 +17,25 @@ */ package org.apache.beam.sdk.io.gcp.spanner; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import com.google.cloud.Timestamp; +import com.google.cloud.spanner.KeySet; +import com.google.cloud.spanner.ReadOnlyTransaction; +import com.google.cloud.spanner.ResultSets; +import com.google.cloud.spanner.Statement; +import com.google.cloud.spanner.Struct; +import com.google.cloud.spanner.TimestampBound; +import com.google.cloud.spanner.Type; +import com.google.cloud.spanner.Value; import 
java.io.Serializable; import java.util.Arrays; import java.util.Collections; import java.util.List; -import com.google.cloud.spanner.*; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -39,17 +52,13 @@ import org.junit.runners.JUnit4; import org.mockito.Mockito; -import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - /** Unit tests for {@link SpannerIO}. */ @RunWith(JUnit4.class) public class SpannerIOReadTest implements Serializable { - @Rule public final transient TestPipeline pipeline = TestPipeline.create(); - @Rule public transient ExpectedException thrown = ExpectedException.none(); + @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + @Rule + public final transient ExpectedException thrown = ExpectedException.none(); private FakeServiceFactory serviceFactory; private ReadOnlyTransaction mockTx; @@ -57,7 +66,7 @@ public class SpannerIOReadTest implements Serializable { private Type fakeType = Type.struct(Type.StructField.of("id", Type.int64()), Type.StructField.of("name", Type.string())); - List fakeRows = Arrays.asList( + private List fakeRows = Arrays.asList( Struct.newBuilder().add("id", Value.int64(1)).add("name", Value.string("Alice")).build(), Struct.newBuilder().add("id", Value.int64(2)).add("name", Value.string("Bob")).build()); From 237f020cbc275345a192b5ede204aa20b3195682 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 27 Jun 2017 15:39:51 -0700 Subject: [PATCH 15/16] Long -> long --- .../java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 60433158806c..1cd6a166c8d8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -485,8 +485,7 @@ public abstract static class Write extends PTransform, PDo abstract SpannerConfig getSpannerConfig(); - @Nullable - abstract Long getBatchSizeBytes(); + abstract long getBatchSizeBytes(); abstract Builder toBuilder(); @@ -495,7 +494,7 @@ abstract static class Builder { abstract Builder setSpannerConfig(SpannerConfig spannerConfig); - abstract Builder setBatchSizeBytes(Long batchSizeBytes); + abstract Builder setBatchSizeBytes(long batchSizeBytes); abstract Write build(); } From 45dda688b850e6d8d41d3a8b38e40c4816975aca Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 27 Jun 2017 17:07:16 -0700 Subject: [PATCH 16/16] happy `mvn clean install` --- sdks/java/io/google-cloud-platform/pom.xml | 9 +++++++-- .../org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java | 4 ++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml index 828220410f7e..94066c7b038a 100644 --- a/sdks/java/io/google-cloud-platform/pom.xml +++ b/sdks/java/io/google-cloud-platform/pom.xml @@ -260,11 +260,16 @@ org.apache.commons - commons-text - test + commons-lang3 + provided + + org.apache.commons + commons-text + test + org.apache.beam beam-sdks-java-core diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 1cd6a166c8d8..bf5d020bec8d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -196,8 +196,8 @@ public static CreateTransaction createTransaction() { */ @Experimental public static Write write() { - return new AutoValue_SpannerIO_Write.Builder().setSpannerConfig(SpannerConfig.create()).build() - .withBatchSizeBytes(DEFAULT_BATCH_SIZE_BYTES); + return new AutoValue_SpannerIO_Write.Builder().setSpannerConfig(SpannerConfig.create()) + .setBatchSizeBytes(DEFAULT_BATCH_SIZE_BYTES).build(); } /**