From 253a4e9741651157dbb1c9637052a812ca3d3c53 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 11 Apr 2024 15:30:09 -0400 Subject: [PATCH] Remove LocalSpannerIO --- .../teleport/spanner/ApplyDDLTransform.java | 6 +- .../teleport/spanner/ExportTransform.java | 6 +- .../teleport/spanner/ImportTransform.java | 12 +- .../cloud/teleport/spanner/ReadDialect.java | 6 +- .../spanner/ReadInformationSchema.java | 6 +- .../teleport/spanner/TextImportTransform.java | 6 +- .../teleport/templates/SpannerToText.java | 10 +- .../SpannerVectorEmbeddingExport.java | 10 +- .../templates/common/SpannerConverters.java | 14 +- .../io/gcp/spanner/LocalBatchSpannerRead.java | 239 -- .../gcp/spanner/LocalCreateTransactionFn.java | 55 - .../gcp/spanner/LocalReadSpannerSchema.java | 166 -- .../io/gcp/spanner/LocalSpannerAccessor.java | 76 - .../sdk/io/gcp/spanner/LocalSpannerIO.java | 2175 ----------------- .../io/gcp/spanner/MutationSizeEstimator.java | 210 -- .../sdk/io/gcp/spanner/ReadOperation.java | 114 - .../sdk/io/gcp/spanner/SpannerConfig.java | 323 --- .../sdk/io/gcp/spanner/SpannerSchema.java | 278 --- .../beam/sdk/io/gcp/spanner/package-info.java | 22 - .../sdk/io/gcp/spanner/SpannerSchemaTest.java | 117 - 20 files changed, 38 insertions(+), 3813 deletions(-) delete mode 100644 v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalBatchSpannerRead.java delete mode 100644 v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalCreateTransactionFn.java delete mode 100644 v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalReadSpannerSchema.java delete mode 100644 v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalSpannerAccessor.java delete mode 100644 v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalSpannerIO.java delete mode 100644 v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java delete mode 100644 v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadOperation.java delete mode 100644 v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java delete mode 100644 v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java delete mode 100644 v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/package-info.java delete mode 100644 v1/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ApplyDDLTransform.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ApplyDDLTransform.java index b11a639778..8484de2d11 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ApplyDDLTransform.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ApplyDDLTransform.java @@ -21,7 +21,7 @@ import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata; import java.util.List; import java.util.concurrent.ExecutionException; -import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor; +import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.DoFn; @@ -66,11 +66,11 @@ public PCollection expand(PCollection input) { ParDo.of( new DoFn() { - private transient LocalSpannerAccessor spannerAccessor; + private transient SpannerAccessor spannerAccessor; @Setup public void setup() { - spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig); + spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig); } @Teardown diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java 
b/v1/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java index c65ea85b52..df939a4028 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java @@ -77,9 +77,9 @@ import org.apache.beam.sdk.io.WriteFilesResult; import org.apache.beam.sdk.io.fs.ResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO; import org.apache.beam.sdk.io.gcp.spanner.ReadOperation; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; import org.apache.beam.sdk.io.gcp.spanner.Transaction; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.Combine; @@ -182,7 +182,7 @@ public WriteFilesResult expand(PBegin begin) { /* * Allow users to specify read timestamp. - * CreateTransaction and CreateTransactionFn classes in LocalSpannerIO + * CreateTransaction and CreateTransactionFn classes in SpannerIO * only take a timestamp object for exact staleness which works when * parameters are provided during template compile time. They do not work with * a Timestamp valueProvider which can take parameters at runtime. Hence a new @@ -403,7 +403,7 @@ public void processElement(ProcessContext c) { PCollection rows = tables.apply( "Read all rows from Spanner", - LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig)); + SpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig)); ValueProvider resource = ValueProvider.NestedValueProvider.of( diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java index 3841fcad9d..f834582259 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java @@ -59,9 +59,9 @@ import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor; -import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO; +import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; import org.apache.beam.sdk.io.gcp.spanner.SpannerWriteResult; import org.apache.beam.sdk.io.gcp.spanner.Transaction; import org.apache.beam.sdk.options.ValueProvider; @@ -166,7 +166,7 @@ public void processElement(ProcessContext c) { schemas.apply("Build avro DDL", Combine.globally(AsList.fn())); PCollectionView tx = - begin.apply(LocalSpannerIO.createTransaction().withSpannerConfig(spannerConfig)); + begin.apply(SpannerIO.createTransaction().withSpannerConfig(spannerConfig)); PCollection informationSchemaDdl = begin.apply( @@ -266,7 +266,7 @@ public void processElement(ProcessContext c) { SpannerWriteResult result = mutations.apply( "Write mutations " + depth, - LocalSpannerIO.write() + SpannerIO.write() .withSchemaReadySignal(ddl) .withSpannerConfig(spannerConfig) .withCommitDeadline(Duration.standardMinutes(1)) @@ -401,7 +401,7 @@ private static class CreateTables extends PTransform { private final ValueProvider earlyIndexCreateFlag; private final ValueProvider ddlCreationTimeoutInMinutes; - private transient LocalSpannerAccessor spannerAccessor; + private transient SpannerAccessor 
spannerAccessor; private static final Logger LOG = LoggerFactory.getLogger(CreateTables.class); /* If the schema has a lot of DDL changes after data load, it's preferable to create @@ -457,7 +457,7 @@ public PCollectionTuple expand(PBegin begin) { @Setup public void setup() { - spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig); + spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig); } @Teardown diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ReadDialect.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ReadDialect.java index 3699d45df5..7050b7ef46 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ReadDialect.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ReadDialect.java @@ -17,7 +17,7 @@ import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.Dialect; -import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor; +import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -43,7 +43,7 @@ public PCollection expand(PBegin p) { private static class ReadDialectFn extends DoFn { private final SpannerConfig spannerConfig; - private transient LocalSpannerAccessor spannerAccessor; + private transient SpannerAccessor spannerAccessor; public ReadDialectFn(SpannerConfig spannerConfig) { this.spannerConfig = spannerConfig; @@ -51,7 +51,7 @@ public ReadDialectFn(SpannerConfig spannerConfig) { @Setup public void setup() throws Exception { - spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig); + spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig); } @Teardown diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ReadInformationSchema.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ReadInformationSchema.java index 231aa8319d..2b3bc5fe19 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ReadInformationSchema.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ReadInformationSchema.java @@ -21,7 +21,7 @@ import com.google.cloud.spanner.Dialect; import com.google.cloud.teleport.spanner.ddl.Ddl; import com.google.cloud.teleport.spanner.ddl.InformationSchemaScanner; -import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor; +import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; import org.apache.beam.sdk.io.gcp.spanner.Transaction; import org.apache.beam.sdk.transforms.Create; @@ -59,7 +59,7 @@ public PCollection expand(PBegin p) { private static class ReadInformationSchemaFn extends DoFn { private final SpannerConfig spannerConfig; - private transient LocalSpannerAccessor spannerAccessor; + private transient SpannerAccessor spannerAccessor; private final PCollectionView tx; private final PCollectionView dialectView; @@ -74,7 +74,7 @@ public ReadInformationSchemaFn( @Setup public void setup() throws Exception { - spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig); + spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig); } @Teardown diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/TextImportTransform.java b/v1/src/main/java/com/google/cloud/teleport/spanner/TextImportTransform.java index a3ff48a606..0f060d71fe 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/TextImportTransform.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/TextImportTransform.java @@ -53,8 +53,8 
@@ import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; import org.apache.beam.sdk.io.gcp.spanner.SpannerWriteResult; import org.apache.beam.sdk.io.gcp.spanner.Transaction; import org.apache.beam.sdk.options.ValueProvider; @@ -105,7 +105,7 @@ public TextImportTransform( @Override public PDone expand(PBegin begin) { PCollectionView tx = - begin.apply(LocalSpannerIO.createTransaction().withSpannerConfig(spannerConfig)); + begin.apply(SpannerIO.createTransaction().withSpannerConfig(spannerConfig)); PCollectionView dialectView = begin @@ -203,7 +203,7 @@ public void processElement(ProcessContext c) { .apply("Wait for previous depth " + depth, Wait.on(previousComputation)) .apply( "Write mutations " + depth, - LocalSpannerIO.write() + SpannerIO.write() .withSpannerConfig(spannerConfig) .withCommitDeadline(Duration.standardMinutes(1)) .withMaxCumulativeBackoff(Duration.standardHours(2)) diff --git a/v1/src/main/java/com/google/cloud/teleport/templates/SpannerToText.java b/v1/src/main/java/com/google/cloud/teleport/templates/SpannerToText.java index 475ec9f88e..d46ef33fba 100644 --- a/v1/src/main/java/com/google/cloud/teleport/templates/SpannerToText.java +++ b/v1/src/main/java/com/google/cloud/teleport/templates/SpannerToText.java @@ -31,9 +31,9 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO; import org.apache.beam.sdk.io.gcp.spanner.ReadOperation; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; import org.apache.beam.sdk.io.gcp.spanner.Transaction; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -142,7 +142,7 @@ public static void main(String[] args) { options.getTextWritePrefix(), options.getSpannerSnapshotTime()); - /* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO + /* CreateTransaction and CreateTransactionFn classes in SpannerIO * only take a timestamp object for exact staleness which works when * parameters are provided during template compile time. They do not work with * a Timestamp valueProvider which can take parameters at runtime. Hence a new @@ -162,14 +162,14 @@ public static void main(String[] args) { PCollection csv = pipeline .apply("Create export", spannerExport) - // We need to use LocalSpannerIO.readAll() instead of LocalSpannerIO.read() + // We need to use SpannerIO.readAll() instead of SpannerIO.read() // because ValueProvider parameters such as table name required for - // LocalSpannerIO.read() can be read only inside DoFn but LocalSpannerIO.read() is of + // SpannerIO.read() can be read only inside DoFn but SpannerIO.read() is of // type PTransform, which prevents prepending it with DoFn that reads // these parameters at the pipeline execution time. 
.apply( "Read all records", - LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig)) + SpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig)) .apply( "Struct To Csv", MapElements.into(TypeDescriptors.strings()) diff --git a/v1/src/main/java/com/google/cloud/teleport/templates/SpannerVectorEmbeddingExport.java b/v1/src/main/java/com/google/cloud/teleport/templates/SpannerVectorEmbeddingExport.java index a1de487e6a..12df716f91 100644 --- a/v1/src/main/java/com/google/cloud/teleport/templates/SpannerVectorEmbeddingExport.java +++ b/v1/src/main/java/com/google/cloud/teleport/templates/SpannerVectorEmbeddingExport.java @@ -27,9 +27,9 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO; import org.apache.beam.sdk.io.gcp.spanner.ReadOperation; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; +import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; import org.apache.beam.sdk.io.gcp.spanner.Transaction; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.PipelineOptions; @@ -269,7 +269,7 @@ public static void main(String[] args) { options.getSpannerColumnsToExport(), ValueProvider.StaticValueProvider.of(/* disable_schema_export= */ false)); - /* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO + /* CreateTransaction and CreateTransactionFn classes in SpannerIO * only take a timestamp object for exact staleness which works when * parameters are provided during template compile time. They do not work with * a Timestamp valueProvider which can take parameters at runtime. Hence a new @@ -289,14 +289,14 @@ public static void main(String[] args) { PCollection json = pipeline .apply("Create export", spannerExport) - // We need to use LocalSpannerIO.readAll() instead of LocalSpannerIO.read() + // We need to use SpannerIO.readAll() instead of SpannerIO.read() // because ValueProvider parameters such as table name required for - // LocalSpannerIO.read() can be read only inside DoFn but LocalSpannerIO.read() is of + // SpannerIO.read() can be read only inside DoFn but SpannerIO.read() is of // type PTransform, which prevents prepending it with DoFn that reads // these parameters at the pipeline execution time. 
.apply( "Read all records", - LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig)) + SpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig)) .apply( "Struct To JSON", MapElements.into(TypeDescriptors.strings()) diff --git a/v1/src/main/java/com/google/cloud/teleport/templates/common/SpannerConverters.java b/v1/src/main/java/com/google/cloud/teleport/templates/common/SpannerConverters.java index 9336bd972b..fd11a9d926 100644 --- a/v1/src/main/java/com/google/cloud/teleport/templates/common/SpannerConverters.java +++ b/v1/src/main/java/com/google/cloud/teleport/templates/common/SpannerConverters.java @@ -52,8 +52,8 @@ import java.util.function.BiFunction; import java.util.stream.Collectors; import org.apache.beam.sdk.io.FileSystems; -import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor; import org.apache.beam.sdk.io.gcp.spanner.ReadOperation; +import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; import org.apache.beam.sdk.io.gcp.spanner.Transaction; import org.apache.beam.sdk.options.Default; @@ -249,16 +249,16 @@ public static Builder builder() { return new AutoValue_SpannerConverters_ExportTransform.Builder(); } - private LocalSpannerAccessor spannerAccessor; + private SpannerAccessor spannerAccessor; private DatabaseClient databaseClient; - // LocalSpannerAccessor is not serializable, thus can't be passed as a mock so we need to pass + // SpannerAccessor is not serializable, thus can't be passed as a mock so we need to pass // mocked database client directly instead. We can't generate stub of ExportTransform because // AutoValue generates a final class. - // TODO make LocalSpannerAccessor serializable + // TODO make SpannerAccessor serializable DatabaseClient getDatabaseClient(SpannerConfig spannerConfig) { if (databaseClient == null) { - this.spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig); + this.spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig); return this.spannerAccessor.getDatabaseClient(); } else { return this.databaseClient; @@ -766,11 +766,11 @@ public CreateTransactionFnWithTimestamp( this.spannerSnapshotTime = spannerSnapshotTime; } - private transient LocalSpannerAccessor spannerAccessor; + private transient SpannerAccessor spannerAccessor; @DoFn.Setup public void setup() throws Exception { - spannerAccessor = LocalSpannerAccessor.getOrCreate(config); + spannerAccessor = SpannerAccessor.getOrCreate(config); } @Teardown diff --git a/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalBatchSpannerRead.java b/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalBatchSpannerRead.java deleted file mode 100644 index 05561df729..0000000000 --- a/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalBatchSpannerRead.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.spanner; - -import com.google.auto.value.AutoValue; -import com.google.cloud.spanner.BatchReadOnlyTransaction; -import com.google.cloud.spanner.Options; -import com.google.cloud.spanner.Partition; -import com.google.cloud.spanner.ResultSet; -import com.google.cloud.spanner.SpannerException; -import com.google.cloud.spanner.SpannerOptions; -import com.google.cloud.spanner.Struct; -import com.google.cloud.spanner.TimestampBound; -import java.util.HashMap; -import java.util.List; -import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers; -import org.apache.beam.runners.core.metrics.MonitoringInfoConstants; -import org.apache.beam.runners.core.metrics.ServiceCallMetric; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Reshuffle; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.checkerframework.checker.nullness.qual.Nullable; - -/** - * This transform reads from Cloud Spanner using the {@link com.google.cloud.spanner.BatchClient}. - * Reads from multiple partitions are executed concurrently yet in the same read-only transaction. 
- */ -@AutoValue -@SuppressWarnings({ - "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) -}) -abstract class LocalBatchSpannerRead - extends PTransform, PCollection> { - - public static LocalBatchSpannerRead create( - SpannerConfig spannerConfig, - PCollectionView txView, - TimestampBound timestampBound) { - return new AutoValue_LocalBatchSpannerRead(spannerConfig, txView, timestampBound); - } - - abstract SpannerConfig getSpannerConfig(); - - abstract @Nullable PCollectionView getTxView(); - - abstract TimestampBound getTimestampBound(); - - @Override - public PCollection expand(PCollection input) { - PCollectionView txView = getTxView(); - if (txView == null) { - Pipeline begin = input.getPipeline(); - SpannerIO.CreateTransaction createTx = - SpannerIO.createTransaction() - .withSpannerConfig(getSpannerConfig()) - .withTimestampBound(getTimestampBound()); - txView = begin.apply(createTx); - } - return input - .apply( - "Generate Partitions", - ParDo.of(new GeneratePartitionsFn(getSpannerConfig(), txView)).withSideInputs(txView)) - .apply("Shuffle partitions", Reshuffle.viaRandomKey()) - .apply( - "Read from Partitions", - ParDo.of(new ReadFromPartitionFn(getSpannerConfig(), txView)).withSideInputs(txView)); - } - - @VisibleForTesting - static class GeneratePartitionsFn extends DoFn { - - private final SpannerConfig config; - private final PCollectionView txView; - - private transient LocalSpannerAccessor spannerAccessor; - - public GeneratePartitionsFn( - SpannerConfig config, PCollectionView txView) { - this.config = config; - this.txView = txView; - } - - @Setup - public void setup() throws Exception { - spannerAccessor = LocalSpannerAccessor.getOrCreate(config); - } - - @Teardown - public void teardown() throws Exception { - spannerAccessor.close(); - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - Transaction tx = c.sideInput(txView); - BatchReadOnlyTransaction batchTx = - spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId()); - ReadOperation op = c.element(); - boolean dataBoostEnabled = - config.getDataBoostEnabled() != null - && Boolean.TRUE.equals(config.getDataBoostEnabled().get()); - - List partitions; - if (op.getQuery() != null) { - // Query was selected. - partitions = - batchTx.partitionQuery( - op.getPartitionOptions(), - op.getQuery(), - Options.priority(config.getRpcPriority().get()), - Options.dataBoostEnabled(dataBoostEnabled)); - } else if (op.getIndex() != null) { - // Read with index was selected. - partitions = - batchTx.partitionReadUsingIndex( - op.getPartitionOptions(), - op.getTable(), - op.getIndex(), - op.getKeySet(), - op.getColumns(), - Options.priority(config.getRpcPriority().get()), - Options.dataBoostEnabled(dataBoostEnabled)); - } else { - // Read from table was selected. 
- partitions = - batchTx.partitionRead( - op.getPartitionOptions(), - op.getTable(), - op.getKeySet(), - op.getColumns(), - Options.priority(config.getRpcPriority().get()), - Options.dataBoostEnabled(dataBoostEnabled)); - } - - for (Partition p : partitions) { - c.output(p); - } - } - } - - private static class ReadFromPartitionFn extends DoFn { - - private final SpannerConfig config; - private final PCollectionView txView; - - private transient LocalSpannerAccessor spannerAccessor; - private transient String projectId; - private transient ServiceCallMetric serviceCallMetric; - - public ReadFromPartitionFn( - SpannerConfig config, PCollectionView txView) { - this.config = config; - this.txView = txView; - } - - @Setup - public void setup() throws Exception { - spannerAccessor = LocalSpannerAccessor.getOrCreate(config); - projectId = - this.config.getProjectId() == null - || this.config.getProjectId().get() == null - || this.config.getProjectId().get().isEmpty() - ? SpannerOptions.getDefaultProjectId() - : this.config.getProjectId().get(); - } - - @Teardown - public void teardown() throws Exception { - spannerAccessor.close(); - } - - @StartBundle - public void startBundle() throws Exception { - serviceCallMetric = - createServiceCallMetric( - projectId, - this.config.getInstanceId().get(), - this.config.getDatabaseId().get(), - this.config.getInstanceId().get()); - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - Transaction tx = c.sideInput(txView); - - BatchReadOnlyTransaction batchTx = - spannerAccessor.getBatchClient().batchReadOnlyTransaction(tx.transactionId()); - - Partition p = c.element(); - try (ResultSet resultSet = batchTx.execute(p)) { - while (resultSet.next()) { - Struct s = resultSet.getCurrentRowAsStruct(); - c.output(s); - } - } catch (SpannerException e) { - serviceCallMetric.call(e.getErrorCode().getGrpcStatusCode().toString()); - throw (e); - } - serviceCallMetric.call("ok"); - } - - private ServiceCallMetric createServiceCallMetric( - String projectId, String instanceId, String databaseId, String tableId) { - HashMap baseLabels = new HashMap<>(); - baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, ""); - baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Spanner"); - baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "Read"); - baseLabels.put( - MonitoringInfoConstants.Labels.RESOURCE, - GcpResourceIdentifiers.spannerTable(projectId, instanceId, databaseId, tableId)); - baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID, projectId); - baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_DATABASE_ID, databaseId); - baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_INSTANCE_ID, tableId); - ServiceCallMetric serviceCallMetric = - new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels); - return serviceCallMetric; - } - } -} diff --git a/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalCreateTransactionFn.java b/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalCreateTransactionFn.java deleted file mode 100644 index acd6df5744..0000000000 --- a/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalCreateTransactionFn.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.spanner; - -import com.google.cloud.spanner.BatchReadOnlyTransaction; -import com.google.cloud.spanner.TimestampBound; -import org.apache.beam.sdk.transforms.DoFn; - -/** Creates a batch transaction. */ -@SuppressWarnings({ - "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) -}) -class LocalCreateTransactionFn extends DoFn { - private final SpannerConfig config; - private final TimestampBound timestampBound; - - LocalCreateTransactionFn(SpannerConfig config, TimestampBound timestampBound) { - this.config = config; - this.timestampBound = timestampBound; - } - - private transient LocalSpannerAccessor spannerAccessor; - - @DoFn.Setup - public void setup() throws Exception { - spannerAccessor = LocalSpannerAccessor.getOrCreate(config); - } - - @Teardown - public void teardown() throws Exception { - spannerAccessor.close(); - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - BatchReadOnlyTransaction tx = - spannerAccessor.getBatchClient().batchReadOnlyTransaction(timestampBound); - c.output(Transaction.create(tx.getBatchTransactionId())); - } -} diff --git a/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalReadSpannerSchema.java b/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalReadSpannerSchema.java deleted file mode 100644 index 41cae3f959..0000000000 --- a/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalReadSpannerSchema.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.spanner; - -import com.google.cloud.spanner.DatabaseClient; -import com.google.cloud.spanner.Dialect; -import com.google.cloud.spanner.ReadOnlyTransaction; -import com.google.cloud.spanner.ResultSet; -import com.google.cloud.spanner.Statement; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.values.PCollectionView; - -/** - * This {@link DoFn} reads Cloud Spanner 'information_schema.*' tables to build the {@link - * SpannerSchema}. 
- */ -@SuppressWarnings({ - "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) -}) -class LocalReadSpannerSchema extends DoFn { - - private final SpannerConfig config; - - private final PCollectionView dialectView; - - private transient LocalSpannerAccessor spannerAccessor; - - public LocalReadSpannerSchema(SpannerConfig config, PCollectionView dialectView) { - this.config = config; - this.dialectView = dialectView; - } - - @Setup - public void setup() throws Exception { - spannerAccessor = LocalSpannerAccessor.getOrCreate(config); - } - - @Teardown - public void teardown() throws Exception { - spannerAccessor.close(); - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - Dialect dialect = c.sideInput(dialectView); - SpannerSchema.Builder builder = SpannerSchema.builder(dialect); - DatabaseClient databaseClient = spannerAccessor.getDatabaseClient(); - try (ReadOnlyTransaction tx = databaseClient.readOnlyTransaction()) { - ResultSet resultSet = readTableInfo(tx, dialect); - - while (resultSet.next()) { - String tableName = resultSet.getString(0); - String columnName = resultSet.getString(1); - String type = resultSet.getString(2); - long cellsMutated = resultSet.getLong(3); - - builder.addColumn(tableName, columnName, type, cellsMutated); - } - - resultSet = readPrimaryKeyInfo(tx, dialect); - while (resultSet.next()) { - String tableName = resultSet.getString(0); - String columnName = resultSet.getString(1); - String ordering = resultSet.getString(2); - - builder.addKeyPart(tableName, columnName, "DESC".equalsIgnoreCase(ordering)); - } - } - c.output(builder.build()); - } - - private ResultSet readTableInfo(ReadOnlyTransaction tx, Dialect dialect) { - // retrieve schema information for all tables, as well as aggregating the - // number of indexes that cover each column. 
this will be used to estimate - // the number of cells (table column plus indexes) mutated in an upsert operation - // in order to stay below the 20k threshold - String statement = ""; - switch (dialect) { - case GOOGLE_STANDARD_SQL: - statement = - "SELECT" - + " c.table_name" - + " , c.column_name" - + " , c.spanner_type" - + " , (1 + COALESCE(t.indices, 0)) AS cells_mutated" - + " FROM (" - + " SELECT c.table_name, c.column_name, c.spanner_type, c.ordinal_position" - + " FROM information_schema.columns as c" - + " WHERE c.table_catalog = '' AND c.table_schema = '') AS c" - + " LEFT OUTER JOIN (" - + " SELECT t.table_name, t.column_name, COUNT(*) AS indices" - + " FROM information_schema.index_columns AS t " - + " WHERE t.index_name != 'PRIMARY_KEY' AND t.table_catalog = ''" - + " AND t.table_schema = ''" - + " GROUP BY t.table_name, t.column_name) AS t" - + " USING (table_name, column_name)" - + " ORDER BY c.table_name, c.ordinal_position"; - break; - case POSTGRESQL: - statement = - "SELECT" - + " c.table_name" - + " , c.column_name" - + " , c.spanner_type" - + " , (1 + COALESCE(t.indices, 0)) AS cells_mutated" - + " FROM (" - + " SELECT c.table_name, c.column_name, c.spanner_type, c.ordinal_position" - + " FROM information_schema.columns as c" - + " WHERE c.table_schema NOT IN" - + " ('information_schema', 'spanner_sys', 'pg_catalog')) AS c" - + " LEFT OUTER JOIN (" - + " SELECT t.table_name, t.column_name, COUNT(*) AS indices" - + " FROM information_schema.index_columns AS t " - + " WHERE t.index_name != 'PRIMARY_KEY'" - + " AND t.table_schema NOT IN" - + " ('information_schema', 'spanner_sys', 'pg_catalog')" - + " GROUP BY t.table_name, t.column_name) AS t" - + " USING (table_name, column_name)" - + " ORDER BY c.table_name, c.ordinal_position"; - break; - default: - throw new IllegalArgumentException("Unrecognized dialect: " + dialect.name()); - } - return tx.executeQuery(Statement.of(statement)); - } - - private ResultSet readPrimaryKeyInfo(ReadOnlyTransaction tx, Dialect dialect) { - String statement = ""; - switch (dialect) { - case GOOGLE_STANDARD_SQL: - statement = - "SELECT t.table_name, t.column_name, t.column_ordering" - + " FROM information_schema.index_columns AS t " - + " WHERE t.index_name = 'PRIMARY_KEY' AND t.table_catalog = ''" - + " AND t.table_schema = ''" - + " ORDER BY t.table_name, t.ordinal_position"; - break; - case POSTGRESQL: - statement = - "SELECT t.table_name, t.column_name, t.column_ordering" - + " FROM information_schema.index_columns AS t " - + " WHERE t.index_name = 'PRIMARY_KEY'" - + " AND t.table_schema NOT IN ('information_schema', 'spanner_sys', 'pg_catalog')" - + " ORDER BY t.table_name, t.ordinal_position"; - break; - default: - throw new IllegalArgumentException("Unrecognized dialect: " + dialect.name()); - } - return tx.executeQuery(Statement.of(statement)); - } -} diff --git a/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalSpannerAccessor.java b/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalSpannerAccessor.java deleted file mode 100644 index b0f93e64c7..0000000000 --- a/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalSpannerAccessor.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.spanner; - -import com.google.api.gax.retrying.RetrySettings; -import com.google.cloud.spanner.BatchClient; -import com.google.cloud.spanner.DatabaseAdminClient; -import com.google.cloud.spanner.DatabaseClient; -import com.google.cloud.spanner.Spanner; -import org.threeten.bp.Duration; - -/** Manages lifecycle of {@link DatabaseClient} and {@link Spanner} instances. */ -@SuppressWarnings({ - "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) -}) -public class LocalSpannerAccessor implements AutoCloseable { - /* A common user agent token that indicates that this request was originated from - * Apache Beam. Setting the user-agent allows Cloud Spanner to detect that the - * workload is coming from Dataflow and to potentially apply performance optimizations - */ - private final SpannerAccessor originalAccessor; - - private LocalSpannerAccessor(SpannerAccessor originalAccessor) { - this.originalAccessor = originalAccessor; - } - - public static LocalSpannerAccessor getOrCreate(SpannerConfig spannerConfig) { - RetrySettings retrySettings = - RetrySettings.newBuilder() - .setInitialRpcTimeout(Duration.ofHours(2)) - .setMaxRpcTimeout(Duration.ofHours(2)) - .setTotalTimeout(Duration.ofHours(2)) - .setRpcTimeoutMultiplier(1.0) - .setInitialRetryDelay(Duration.ofSeconds(2)) - .setMaxRetryDelay(Duration.ofSeconds(60)) - .setRetryDelayMultiplier(1.5) - .setMaxAttempts(100) - .build(); - spannerConfig = spannerConfig.withExecuteStreamingSqlRetrySettings(retrySettings); - // This property sets the default timeout between 2 response packets in the client library. - System.setProperty("com.google.cloud.spanner.watchdogTimeoutSeconds", "7200"); - return new LocalSpannerAccessor(SpannerAccessor.getOrCreate(spannerConfig)); - } - - public DatabaseClient getDatabaseClient() { - return originalAccessor.getDatabaseClient(); - } - - public BatchClient getBatchClient() { - return originalAccessor.getBatchClient(); - } - - public DatabaseAdminClient getDatabaseAdminClient() { - return originalAccessor.getDatabaseAdminClient(); - } - - @Override - public void close() { - originalAccessor.close(); - } -} diff --git a/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalSpannerIO.java b/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalSpannerIO.java deleted file mode 100644 index c6130ba289..0000000000 --- a/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalSpannerIO.java +++ /dev/null @@ -1,2175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.spanner; - -import static java.util.stream.Collectors.toList; -import static org.apache.beam.sdk.io.gcp.spanner.MutationUtils.isPointDelete; -import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_CHANGE_STREAM_NAME; -import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_END_AT; -import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_START_AT; -import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY; -import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT; -import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.THROUGHPUT_WINDOW_SECONDS; -import static org.apache.beam.sdk.io.gcp.spanner.changestreams.NameGenerator.generatePartitionMetadataTableName; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; - -import com.google.api.gax.retrying.RetrySettings; -import com.google.api.gax.rpc.StatusCode.Code; -import com.google.auto.value.AutoValue; -import com.google.cloud.ServiceFactory; -import com.google.cloud.Timestamp; -import com.google.cloud.spanner.AbortedException; -import com.google.cloud.spanner.DatabaseClient; -import com.google.cloud.spanner.DatabaseId; -import com.google.cloud.spanner.Dialect; -import com.google.cloud.spanner.ErrorCode; -import com.google.cloud.spanner.KeySet; -import com.google.cloud.spanner.Mutation; -import com.google.cloud.spanner.Mutation.Op; -import com.google.cloud.spanner.Options; -import com.google.cloud.spanner.Options.RpcPriority; -import com.google.cloud.spanner.PartitionOptions; -import com.google.cloud.spanner.Spanner; -import com.google.cloud.spanner.SpannerException; -import com.google.cloud.spanner.SpannerOptions; -import com.google.cloud.spanner.Statement; -import com.google.cloud.spanner.Struct; -import com.google.cloud.spanner.TimestampBound; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.OptionalInt; -import java.util.concurrent.TimeUnit; -import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers; -import org.apache.beam.runners.core.metrics.MonitoringInfoConstants; -import org.apache.beam.runners.core.metrics.ServiceCallMetric; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.CleanUpReadChangeStreamDoFn; -import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitionsDoFn; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.InitializeDoFn; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.PostProcessingMetricsDoFn; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.BytesThroughputEstimator; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.SizeEstimator; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.MapperFactory; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Distribution; -import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.options.StreamingOptions; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.Impulse; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Reshuffle; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.Wait; -import org.apache.beam.sdk.transforms.WithTimestamps; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.BackOff; -import org.apache.beam.sdk.util.FluentBackoff; -import org.apache.beam.sdk.util.Sleeper; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; -import org.apache.beam.sdk.values.PCollectionList; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.Row; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TupleTagList; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.UnsignedBytes; -import org.checkerframework.checker.nullness.qual.Nullable; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Experimental {@link PTransform Transforms} for reading from and writing to Google Cloud 
Spanner.
- *
- * <h3>Reading from Cloud Spanner</h3>
- *
- * <p>To read from Cloud Spanner, apply {@link Read} transformation. It will return a {@link
- * PCollection} of {@link Struct Structs}, where each element represents an individual row returned
- * from the read operation. Both Query and Read APIs are supported. See more information about
- * reading from Cloud Spanner.
- *
- * <p>To execute a query, specify a {@link Read#withQuery(Statement)} or {@link
- * Read#withQuery(String)} during the construction of the transform.
- *
- * <pre>{@code
- * PCollection<Struct> rows = p.apply(
- *     LocalSpannerIO.read()
- *         .withInstanceId(instanceId)
- *         .withDatabaseId(dbId)
- *         .withQuery("SELECT id, name, email FROM users"));
- * }</pre>
- *
- * <p>To use the Read API, specify a {@link Read#withTable(String) table name} and a {@link
- * Read#withColumns(List) list of columns}.
- *
- * <pre>{@code
- * PCollection<Struct> rows = p.apply(
- *    LocalSpannerIO.read()
- *        .withInstanceId(instanceId)
- *        .withDatabaseId(dbId)
- *        .withTable("users")
- *        .withColumns("id", "name", "email"));
- * }</pre>
- *
- * <p>To optimally read using index, specify the index name using {@link Read#withIndex}.
- *
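A minimal sketch of the index read described above, using the upstream SpannerIO that these templates migrate to; the index name "users_by_email", the pipeline `p`, and the config values are assumptions, and the requested columns must be covered by that index.

    SpannerConfig spannerConfig =
        SpannerConfig.create().withInstanceId("instance").withDatabaseId("database");
    PCollection<Struct> rows =
        p.apply(
            "Read users via index",
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withTable("users")
                .withIndex("users_by_email")   // hypothetical secondary index
                .withColumns("id", "email"));  // columns read this way must be covered by the index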
- * <p>The transform is guaranteed to be executed on a consistent snapshot of data, utilizing the
- * power of read only transactions. Staleness of data can be controlled using {@link
- * Read#withTimestampBound} or {@link Read#withTimestamp(Timestamp)} methods. Read more about
- * transactions in Cloud Spanner.
- *
- * <p>It is possible to read several {@link PCollection PCollections} within a single transaction.
- * Apply {@link LocalSpannerIO#createTransaction()} transform, that lazily creates a transaction. The
- * result of this transformation can be passed to read operation using {@link
- * Read#withTransaction(PCollectionView)}.
- *
- * <pre>{@code
- * SpannerConfig spannerConfig = ...
- *
- * PCollectionView<Transaction> tx = p.apply(
- *    LocalSpannerIO.createTransaction()
- *        .withSpannerConfig(spannerConfig)
- *        .withTimestampBound(TimestampBound.strong()));
- *
- * PCollection<Struct> users = p.apply(
- *    LocalSpannerIO.read()
- *        .withSpannerConfig(spannerConfig)
- *        .withQuery("SELECT name, email FROM users")
- *        .withTransaction(tx));
- *
- * PCollection<Struct> tweets = p.apply(
- *    LocalSpannerIO.read()
- *        .withSpannerConfig(spannerConfig)
- *        .withQuery("SELECT user, tweet, date FROM tweets")
- *        .withTransaction(tx));
- * }</pre>
- *
- * <h3>Writing to Cloud Spanner</h3>
- *
- * <p>The Cloud Spanner {@link Write} transform writes to Cloud Spanner by executing a collection of
- * input row {@link Mutation Mutations}. The mutations are grouped into batches for efficiency.
- *
- * <p>To configure the write transform, create an instance using {@link #write()} and then specify
- * the destination Cloud Spanner instance ({@link Write#withInstanceId(String)} and destination
- * database ({@link Write#withDatabaseId(String)}). For example:
- *
- * <pre>{@code
- * // Earlier in the pipeline, create a PCollection of Mutations to be written to Cloud Spanner.
- * PCollection<Mutation> mutations = ...;
- * // Write mutations.
- * SpannerWriteResult result = mutations.apply(
- *     "Write", LocalSpannerIO.write().withInstanceId("instance").withDatabaseId("database"));
- * }</pre>
- *
- * <h3>SpannerWriteResult</h3>
- *
- * <p>The {@link SpannerWriteResult SpannerWriteResult} object contains the results of the
- * transform, including a {@link PCollection} of MutationGroups that failed to write, and a {@link
- * PCollection} that can be used in batch pipelines as a completion signal to {@link Wait
- * Wait.OnSignal} to indicate when all input has been written. Note that in streaming pipelines,
- * this signal will never be triggered as the input is unbounded and this {@link PCollection} is
- * using the {@link GlobalWindow}.
- *
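A minimal sketch of using that completion signal in a bounded pipeline with the upstream SpannerIO; `mutations` and `postWriteInput` are placeholder PCollections and PostWriteFn is a hypothetical DoFn.

    SpannerWriteResult result =
        mutations.apply(
            "Write", SpannerIO.write().withInstanceId("instance").withDatabaseId("database"));
    // Downstream work begins only after every mutation has been committed.
    postWriteInput
        .apply("Wait for Spanner writes", Wait.on(result.getOutput()))
        .apply("Continue processing", ParDo.of(new PostWriteFn()));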
- * <h3>Batching and Grouping</h3>
- *
- * <p>To reduce the number of transactions sent to Spanner, the {@link Mutation Mutations} are
- * grouped into batches. The default maximum size of the batch is set to 1MB or 5000 mutated cells,
- * or 500 rows (whichever is reached first). To override this use {@link
- * Write#withBatchSizeBytes(long) withBatchSizeBytes()}, {@link Write#withMaxNumMutations(long)
- * withMaxNumMutations()} or {@link Write#withMaxNumMutations(long) withMaxNumRows()}. Setting
- * either to a small value or zero disables batching.
- *
- * <p>Note that the maximum size of a single transaction is 20,000 mutated cells - including cells
- * in indexes. If you have a large number of indexes and are getting exceptions with message:
- * INVALID_ARGUMENT: The transaction contains too many mutations you will need to specify a smaller
- * number of {@code MaxNumMutations}.
- *
- * <p>The batches written are obtained from by grouping enough {@link Mutation Mutations} from the
- * Bundle provided by Beam to form several batches. This group of {@link Mutation Mutations} is then
- * sorted by table and primary key, and the batches are created from the sorted group. Each batch
- * will then have rows for the same table, with keys that are 'close' to each other, thus optimising
- * write efficiency by each batch affecting as few table splits as possible performance.
- *
- * <p>This grouping factor (number of batches) is controlled by the parameter {@link
- * Write#withGroupingFactor(int) withGroupingFactor()}.
- *
- * <p>Note that each worker will need enough memory to hold {@code GroupingFactor x
- * MaxBatchSizeBytes} Mutations, so if you have a large {@code MaxBatchSize} you may need to reduce
- * {@code GroupingFactor}
- *
- * <p>While Grouping and Batching increases write efficiency, it dramatically increases the latency
- * between when a Mutation is received by the transform, and when it is actually written to the
- * database. This is because enough Mutations need to be received to fill the grouped batches. In
- * Batch pipelines (bounded sources), this is not normally an issue, but in Streaming (unbounded)
- * pipelines, this latency is often seen as unacceptable.
- *
- * <p>There are therefore 3 different ways that this transform can be configured:
- *
- * <ul>
- *   <li>With Grouping and Batching.
- *       This is the default for Batch pipelines, where sorted batches of Mutations are created and
- *       written. This is the most efficient way to ingest large amounts of data, but the highest
- *       latency before writing
- *   <li>With Batching but no Grouping
- *       If {@link Write#withGroupingFactor(int) .withGroupingFactor(1)}, is set, grouping is
- *       disabled. This is the default for Streaming pipelines. Unsorted batches are created and
- *       written as soon as enough mutations to fill a batch are received. This reflects a
- *       compromise where a small amount of additional latency enables more efficient writes
- *   <li>Without any Batching
- *       If {@link Write#withBatchSizeBytes(long) .withBatchSizeBytes(0)} is set, no batching is
- *       performed and the Mutations are written to the database as soon as they are received.
- *       ensuring the lowest latency before Mutations are written.
- * </ul>
- *
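A sketch of tuning the batching and grouping knobs listed above on the upstream SpannerIO; the numbers are illustrative only, not recommendations.

    SpannerWriteResult result =
        mutations.apply(
            "Write with tuned batching",
            SpannerIO.write()
                .withInstanceId("instance")
                .withDatabaseId("database")
                .withBatchSizeBytes(512 * 1024)   // smaller batches than the 1 MB default
                .withMaxNumMutations(2000)        // cap on mutated cells per batch
                .withMaxNumRows(200)              // cap on rows per batch
                .withGroupingFactor(100));        // fewer batches held in memory for sorting
    // withBatchSizeBytes(0) disables batching; withGroupingFactor(1) disables grouping.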
- * <h3>Monitoring</h3>
- *
- * <p>Several counters are provided for monitoring purposes:
- *
- * <ul>
- *   <li>batchable_mutation_groups
- *       Counts the mutations that are batched for writing to Spanner.
- *   <li>unbatchable_mutation_groups
- *       Counts the mutations that can not be batched and are applied individually - either because
- *       they are too large to fit into a batch, or they are ranged deletes.
- *   <li>mutation_group_batches_received, mutation_group_batches_write_success,
- *       mutation_group_batches_write_failed
- *       Count the number of batches that are processed. If Failure Mode is set to {@link
- *       FailureMode#REPORT_FAILURES REPORT_FAILURES}, then failed batches will be split up and the
- *       individual mutation groups retried separately.
- *   <li>mutation_groups_received, mutation_groups_write_success, mutation_groups_write_fail
- *       Count the number of individual MutationGroups that are processed.
- *   <li>spanner_write_success, spanner_write_fail
- *       The number of writes to Spanner that have occurred.
- *   <li>spanner_write_retries
- *       The number of times a write is retried after a failure - either due to a timeout, or when
- *       batches fail and {@link FailureMode#REPORT_FAILURES REPORT_FAILURES} is set so that
- *       individual Mutation Groups are retried.
- *   <li>spanner_write_timeouts
- *       The number of timeouts that occur when writing to Spanner. Writes that timed out are
- *       retried after a backoff. Large numbers of timeouts suggest an overloaded Spanner instance.
- *   <li>spanner_write_total_latency_ms
- *       The total amount of time spent writing to Spanner, in milliseconds.
- * </ul>
- *
- * <h3>Database Schema Preparation</h3>
- *
- * <p>The Write transform reads the database schema on pipeline start to know which columns are used
- * as primary keys of the tables and indexes. This is so that the transform knows how to sort the
- * grouped Mutations by table name and primary key as described above.
- *
- * <p>If the database schema, any additional tables or indexes are created in the same pipeline then
- * there will be a race condition, leading to a situation where the schema is read before the table
- * is created its primary key will not be known. This will mean that the sorting/batching will not
- * be optimal and performance will be reduced (warnings will be logged for rows using unknown
- * tables)
- *
- * <p>To prevent this race condition, use {@link Write#withSchemaReadySignal(PCollection)} to pass a
- * signal {@link PCollection} (for example the output of the transform that creates the table(s))
- * which will be used with {@link Wait.OnSignal} to prevent the schema from being read until it is
- * ready. The Write transform will be paused until this signal {@link PCollection} is closed.
- *
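A sketch of the schema-ready signal described above with the upstream SpannerIO; `ddlApplied` stands in for the output of whatever transform issues the DDL.

    PCollection<Void> ddlApplied = ...;   // completion signal from the table/index creation step
    SpannerWriteResult result =
        mutations.apply(
            "Write after schema is ready",
            SpannerIO.write()
                .withSpannerConfig(spannerConfig)
                // The schema is only read once the signal PCollection has been produced.
                .withSchemaReadySignal(ddlApplied));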
Transactions

- * - *

The transform does not provide same transactional guarantees as Cloud Spanner. In particular, - * - *

    - *
  • Individual Mutations are submitted atomically, but all Mutations are not submitted in the - * same transaction. - *
  • A Mutation is applied at least once; - *
  • If the pipeline was unexpectedly stopped, mutations that were already applied will not get - * rolled back. - *
- * - *

Use {@link MutationGroup MutationGroups} with the {@link WriteGrouped} transform to ensure that a small set of mutations is bundled together. It is guaranteed that mutations in a {@link MutationGroup} are submitted in the same transaction. Note that a MutationGroup must not exceed the Spanner transaction limits.

{@code
- * // Earlier in the pipeline, create a PCollection of MutationGroups to be written to Cloud Spanner.
- * PCollection<MutationGroup> mutationGroups = ...;
- * // Write mutation groups.
- * SpannerWriteResult result = mutationGroups.apply(
- *     "Write",
- *     LocalSpannerIO.write().withInstanceId("instance").withDatabaseId("database").grouped());
- * }
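The mutation groups elided above could be built with MutationGroup.create; a sketch with illustrative table and column names only:

{@code
Mutation singer = Mutation.newInsertOrUpdateBuilder("Singers")
    .set("SingerId").to(1L)
    .set("Name").to("Joan Jett")
    .build();
Mutation album = Mutation.newInsertOrUpdateBuilder("Albums")
    .set("SingerId").to(1L)
    .set("AlbumId").to(10L)
    .build();
// Both mutations are guaranteed to be committed in the same Spanner transaction.
MutationGroup group = MutationGroup.create(singer, album);
}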
- * - *

Streaming Support

- * - *

{@link Write} can be used as a streaming sink, however as with batch mode note that the write - * order of individual {@link Mutation}/{@link MutationGroup} objects is not guaranteed. - */ -@SuppressWarnings({ - "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) -}) -public class LocalSpannerIO { - - private static final Logger LOG = LoggerFactory.getLogger(LocalSpannerIO.class); - - private static final long DEFAULT_BATCH_SIZE_BYTES = 1024L * 1024L; // 1 MB - // Max number of mutations to batch together. - private static final int DEFAULT_MAX_NUM_MUTATIONS = 5000; - // Max number of mutations to batch together. - private static final int DEFAULT_MAX_NUM_ROWS = 500; - // Multiple of mutation size to use to gather and sort mutations - private static final int DEFAULT_GROUPING_FACTOR = 1000; - - /** - * Creates an uninitialized instance of {@link Read}. Before use, the {@link Read} must be - * configured with a {@link Read#withInstanceId} and {@link Read#withDatabaseId} that identify the - * Cloud Spanner database. - */ - public static Read read() { - return new AutoValue_LocalSpannerIO_Read.Builder() - .setSpannerConfig(SpannerConfig.create()) - .setTimestampBound(TimestampBound.strong()) - .setReadOperation(ReadOperation.create()) - .setBatching(true) - .build(); - } - - /** - * A {@link PTransform} that works like {@link #read}, but executes read operations coming from a - * {@link PCollection}. - */ - public static ReadAll readAll() { - return new AutoValue_LocalSpannerIO_ReadAll.Builder() - .setSpannerConfig(SpannerConfig.create()) - .setTimestampBound(TimestampBound.strong()) - .setBatching(true) - .build(); - } - - /** - * Returns a transform that creates a batch transaction. By default, {@link - * TimestampBound#strong()} transaction is created, to override this use {@link - * CreateTransaction#withTimestampBound(TimestampBound)}. - */ - public static CreateTransaction createTransaction() { - return new AutoValue_LocalSpannerIO_CreateTransaction.Builder() - .setSpannerConfig(SpannerConfig.create()) - .setTimestampBound(TimestampBound.strong()) - .build(); - } - - /** - * Creates an uninitialized instance of {@link Write}. Before use, the {@link Write} must be - * configured with a {@link Write#withInstanceId} and {@link Write#withDatabaseId} that identify - * the Cloud Spanner database being written. - */ - public static Write write() { - return new AutoValue_LocalSpannerIO_Write.Builder() - .setSpannerConfig(SpannerConfig.create()) - .setBatchSizeBytes(DEFAULT_BATCH_SIZE_BYTES) - .setMaxNumMutations(DEFAULT_MAX_NUM_MUTATIONS) - .setMaxNumRows(DEFAULT_MAX_NUM_ROWS) - .setFailureMode(FailureMode.FAIL_FAST) - .build(); - } - - /** - * Creates an uninitialized instance of {@link ReadChangeStream}. Before use, the {@link - * ReadChangeStream} must be configured with a {@link ReadChangeStream#withProjectId}, {@link - * ReadChangeStream#withInstanceId}, and {@link ReadChangeStream#withDatabaseId} that identify the - * Cloud Spanner database being written. It must also be configured with the start time and the - * change stream name. - */ - public static ReadChangeStream readChangeStream() { - return new AutoValue_LocalSpannerIO_ReadChangeStream.Builder() - .setSpannerConfig(SpannerConfig.create()) - .setChangeStreamName(DEFAULT_CHANGE_STREAM_NAME) - .setRpcPriority(DEFAULT_RPC_PRIORITY) - .setInclusiveStartAt(DEFAULT_INCLUSIVE_START_AT) - .setInclusiveEndAt(DEFAULT_INCLUSIVE_END_AT) - .build(); - } - - /** Implementation of {@link #readAll}. 
*/ - @AutoValue - public abstract static class ReadAll - extends PTransform, PCollection> { - - abstract SpannerConfig getSpannerConfig(); - - abstract @Nullable PCollectionView getTransaction(); - - abstract @Nullable TimestampBound getTimestampBound(); - - abstract Builder toBuilder(); - - @AutoValue.Builder - abstract static class Builder { - - abstract Builder setSpannerConfig(SpannerConfig spannerConfig); - - abstract Builder setTransaction(PCollectionView transaction); - - abstract Builder setTimestampBound(TimestampBound timestampBound); - - abstract Builder setBatching(Boolean batching); - - abstract ReadAll build(); - } - - /** Specifies the Cloud Spanner configuration. */ - public ReadAll withSpannerConfig(SpannerConfig spannerConfig) { - return toBuilder().setSpannerConfig(spannerConfig).build(); - } - - /** Specifies the Cloud Spanner project. */ - public ReadAll withProjectId(String projectId) { - return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); - } - - /** Specifies the Cloud Spanner project. */ - public ReadAll withProjectId(ValueProvider projectId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withProjectId(projectId)); - } - - /** Specifies the Cloud Spanner instance. */ - public ReadAll withInstanceId(String instanceId) { - return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); - } - - /** Specifies the Cloud Spanner instance. */ - public ReadAll withInstanceId(ValueProvider instanceId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withInstanceId(instanceId)); - } - - /** Specifies the Cloud Spanner database. */ - public ReadAll withDatabaseId(String databaseId) { - return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId)); - } - - /** Specifies the Cloud Spanner host. */ - public ReadAll withHost(ValueProvider host) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withHost(host)); - } - - public ReadAll withHost(String host) { - return withHost(ValueProvider.StaticValueProvider.of(host)); - } - - /** Specifies the Cloud Spanner emulator host. */ - public ReadAll withEmulatorHost(ValueProvider emulatorHost) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withEmulatorHost(emulatorHost)); - } - - public ReadAll withEmulatorHost(String emulatorHost) { - return withEmulatorHost(ValueProvider.StaticValueProvider.of(emulatorHost)); - } - - /** Specifies the Cloud Spanner database. */ - public ReadAll withDatabaseId(ValueProvider databaseId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withDatabaseId(databaseId)); - } - - @VisibleForTesting - ReadAll withServiceFactory(ServiceFactory serviceFactory) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withServiceFactory(serviceFactory)); - } - - public ReadAll withTransaction(PCollectionView transaction) { - return toBuilder().setTransaction(transaction).build(); - } - - public ReadAll withTimestamp(Timestamp timestamp) { - return withTimestampBound(TimestampBound.ofReadTimestamp(timestamp)); - } - - public ReadAll withTimestampBound(TimestampBound timestampBound) { - return toBuilder().setTimestampBound(timestampBound).build(); - } - - /** - * By default Batch API is used to read data from Cloud Spanner. It is useful to disable - * batching when the underlying query is not root-partitionable. 
- */ - public ReadAll withBatching(boolean batching) { - return toBuilder().setBatching(batching).build(); - } - - public ReadAll withLowPriority() { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withRpcPriority(RpcPriority.LOW)); - } - - public ReadAll withHighPriority() { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withRpcPriority(RpcPriority.HIGH)); - } - - abstract Boolean getBatching(); - - @Override - public PCollection expand(PCollection input) { - PTransform, PCollection> readTransform; - if (getBatching()) { - readTransform = - LocalBatchSpannerRead.create(getSpannerConfig(), getTransaction(), getTimestampBound()); - } else { - readTransform = - NaiveSpannerRead.create(getSpannerConfig(), getTransaction(), getTimestampBound()); - } - return input - .apply("Reshuffle", Reshuffle.viaRandomKey()) - .apply("Read from Cloud Spanner", readTransform); - } - } - - /** Implementation of {@link #read}. */ - @AutoValue - public abstract static class Read extends PTransform> { - - abstract SpannerConfig getSpannerConfig(); - - abstract ReadOperation getReadOperation(); - - abstract @Nullable TimestampBound getTimestampBound(); - - abstract @Nullable PCollectionView getTransaction(); - - abstract @Nullable PartitionOptions getPartitionOptions(); - - abstract Boolean getBatching(); - - abstract Builder toBuilder(); - - @AutoValue.Builder - abstract static class Builder { - - abstract Builder setSpannerConfig(SpannerConfig spannerConfig); - - abstract Builder setReadOperation(ReadOperation readOperation); - - abstract Builder setTimestampBound(TimestampBound timestampBound); - - abstract Builder setTransaction(PCollectionView transaction); - - abstract Builder setPartitionOptions(PartitionOptions partitionOptions); - - abstract Builder setBatching(Boolean batching); - - abstract Read build(); - } - - /** Specifies the Cloud Spanner configuration. */ - public Read withSpannerConfig(SpannerConfig spannerConfig) { - return toBuilder().setSpannerConfig(spannerConfig).build(); - } - - /** Specifies the Cloud Spanner project. */ - public Read withProjectId(String projectId) { - return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); - } - - /** Specifies the Cloud Spanner project. */ - public Read withProjectId(ValueProvider projectId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withProjectId(projectId)); - } - - /** Specifies the Cloud Spanner instance. */ - public Read withInstanceId(String instanceId) { - return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); - } - - /** Specifies the Cloud Spanner instance. */ - public Read withInstanceId(ValueProvider instanceId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withInstanceId(instanceId)); - } - - /** Specifies the Cloud Spanner database. */ - public Read withDatabaseId(String databaseId) { - return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId)); - } - - /** Specifies the Cloud Spanner database. */ - public Read withDatabaseId(ValueProvider databaseId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withDatabaseId(databaseId)); - } - - /** Specifies the Cloud Spanner host. 
*/ - public Read withHost(ValueProvider host) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withHost(host)); - } - - public Read withHost(String host) { - return withHost(ValueProvider.StaticValueProvider.of(host)); - } - - /** Specifies the Cloud Spanner emulator host. */ - public Read withEmulatorHost(ValueProvider emulatorHost) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withEmulatorHost(emulatorHost)); - } - - public Read withEmulatorHost(String emulatorHost) { - return withEmulatorHost(ValueProvider.StaticValueProvider.of(emulatorHost)); - } - - /** If true the uses Cloud Spanner batch API. */ - public Read withBatching(boolean batching) { - return toBuilder().setBatching(batching).build(); - } - - @VisibleForTesting - Read withServiceFactory(ServiceFactory serviceFactory) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withServiceFactory(serviceFactory)); - } - - public Read withTransaction(PCollectionView transaction) { - return toBuilder().setTransaction(transaction).build(); - } - - public Read withTimestamp(Timestamp timestamp) { - return withTimestampBound(TimestampBound.ofReadTimestamp(timestamp)); - } - - public Read withTimestampBound(TimestampBound timestampBound) { - return toBuilder().setTimestampBound(timestampBound).build(); - } - - public Read withTable(String table) { - return withReadOperation(getReadOperation().withTable(table)); - } - - public Read withReadOperation(ReadOperation operation) { - return toBuilder().setReadOperation(operation).build(); - } - - public Read withColumns(String... columns) { - return withColumns(Arrays.asList(columns)); - } - - public Read withColumns(List columns) { - return withReadOperation(getReadOperation().withColumns(columns)); - } - - public Read withQuery(Statement statement) { - return withReadOperation(getReadOperation().withQuery(statement)); - } - - public Read withQuery(String sql) { - return withQuery(Statement.of(sql)); - } - - public Read withQueryName(String queryName) { - return withReadOperation(getReadOperation().withQueryName(queryName)); - } - - public Read withKeySet(KeySet keySet) { - return withReadOperation(getReadOperation().withKeySet(keySet)); - } - - public Read withIndex(String index) { - return withReadOperation(getReadOperation().withIndex(index)); - } - - public Read withPartitionOptions(PartitionOptions partitionOptions) { - return withReadOperation(getReadOperation().withPartitionOptions(partitionOptions)); - } - - public Read withLowPriority() { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withRpcPriority(RpcPriority.LOW)); - } - - public Read withHighPriority() { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withRpcPriority(RpcPriority.HIGH)); - } - - @Override - public PCollection expand(PBegin input) { - getSpannerConfig().validate(); - checkArgument( - getTimestampBound() != null, - "LocalSpannerIO.read() runs in a read only transaction and requires timestamp to be set " - + "with withTimestampBound or withTimestamp method"); - - if (getReadOperation().getQuery() != null) { - // TODO: validate query? 
- } else if (getReadOperation().getTable() != null) { - // Assume read - checkNotNull( - getReadOperation().getColumns(), - "For a read operation LocalSpannerIO.read() requires a list of " - + "columns to set with withColumns method"); - checkArgument( - !getReadOperation().getColumns().isEmpty(), - "For a read operation LocalSpannerIO.read() requires a non-empty" - + " list of columns to set with withColumns method"); - } else { - throw new IllegalArgumentException( - "LocalSpannerIO.read() requires configuring query or read operation."); - } - - ReadAll readAll = - readAll() - .withSpannerConfig(getSpannerConfig()) - .withTimestampBound(getTimestampBound()) - .withBatching(getBatching()) - .withTransaction(getTransaction()); - return input.apply(Create.of(getReadOperation())).apply("Execute query", readAll); - } - - SerializableFunction getFormatFn() { - return input -> - Row.withSchema(Schema.builder().addInt64Field("Key").build()) - .withFieldValue("Key", 3L) - .build(); - } - } - - /** Implementation of Read Rows Transform. */ - static class ReadRows extends PTransform> { - - Read read; - Schema schema; - - public ReadRows(Read read, Schema schema) { - super("Read rows"); - this.read = read; - this.schema = schema; - } - - @Override - public PCollection expand(PBegin input) { - return input - .apply(read) - .apply( - MapElements.into(TypeDescriptor.of(Row.class)) - .via( - (SerializableFunction) - struct -> StructUtils.structToBeamRow(struct, schema))) - .setRowSchema(schema); - } - } - - /** - * A {@link PTransform} that create a transaction. If applied to a {@link PCollection}, it will - * create a transaction after the {@link PCollection} is closed. - * - * @see LocalSpannerIO - * @see Wait - */ - @AutoValue - public abstract static class CreateTransaction - extends PTransform> { - - abstract SpannerConfig getSpannerConfig(); - - abstract @Nullable TimestampBound getTimestampBound(); - - abstract Builder toBuilder(); - - @Override - public PCollectionView expand(PInput input) { - getSpannerConfig().validate(); - - PCollection collection = input.getPipeline().apply(Create.of(1)); - - if (input instanceof PCollection) { - collection = collection.apply(Wait.on((PCollection) input)); - } else if (!(input instanceof PBegin)) { - throw new RuntimeException("input must be PBegin or PCollection"); - } - - return collection - .apply( - "Create transaction", - ParDo.of( - new LocalCreateTransactionFn(this.getSpannerConfig(), this.getTimestampBound()))) - .apply("As PCollectionView", View.asSingleton()); - } - - /** Specifies the Cloud Spanner configuration. */ - public CreateTransaction withSpannerConfig(SpannerConfig spannerConfig) { - return toBuilder().setSpannerConfig(spannerConfig).build(); - } - - /** Specifies the Cloud Spanner project. */ - public CreateTransaction withProjectId(String projectId) { - return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); - } - - /** Specifies the Cloud Spanner project. */ - public CreateTransaction withProjectId(ValueProvider projectId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withProjectId(projectId)); - } - - /** Specifies the Cloud Spanner instance. */ - public CreateTransaction withInstanceId(String instanceId) { - return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); - } - - /** Specifies the Cloud Spanner instance. 
*/ - public CreateTransaction withInstanceId(ValueProvider instanceId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withInstanceId(instanceId)); - } - - /** Specifies the Cloud Spanner database. */ - public CreateTransaction withDatabaseId(String databaseId) { - return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId)); - } - - /** Specifies the Cloud Spanner database. */ - public CreateTransaction withDatabaseId(ValueProvider databaseId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withDatabaseId(databaseId)); - } - - /** Specifies the Cloud Spanner host. */ - public CreateTransaction withHost(ValueProvider host) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withHost(host)); - } - - public CreateTransaction withHost(String host) { - return withHost(ValueProvider.StaticValueProvider.of(host)); - } - - /** Specifies the Cloud Spanner emulator host. */ - public CreateTransaction withEmulatorHost(ValueProvider emulatorHost) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withEmulatorHost(emulatorHost)); - } - - public CreateTransaction withEmulatorHost(String emulatorHost) { - return withEmulatorHost(ValueProvider.StaticValueProvider.of(emulatorHost)); - } - - @VisibleForTesting - CreateTransaction withServiceFactory(ServiceFactory serviceFactory) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withServiceFactory(serviceFactory)); - } - - public CreateTransaction withTimestampBound(TimestampBound timestampBound) { - return toBuilder().setTimestampBound(timestampBound).build(); - } - - /** A builder for {@link CreateTransaction}. */ - @AutoValue.Builder - public abstract static class Builder { - - public abstract Builder setSpannerConfig(SpannerConfig spannerConfig); - - public abstract Builder setTimestampBound(TimestampBound newTimestampBound); - - public abstract CreateTransaction build(); - } - } - - /** A failure handling strategy. */ - public enum FailureMode { - /** Invalid write to Spanner will cause the pipeline to fail. A default strategy. */ - FAIL_FAST, - /** Invalid mutations will be returned as part of the result of the write transform. */ - REPORT_FAILURES - } - - /** - * A {@link PTransform} that writes {@link Mutation} objects to Google Cloud Spanner. 
- * - * @see LocalSpannerIO - */ - @AutoValue - public abstract static class Write extends PTransform, SpannerWriteResult> { - - abstract SpannerConfig getSpannerConfig(); - - abstract long getBatchSizeBytes(); - - abstract long getMaxNumMutations(); - - abstract long getMaxNumRows(); - - abstract FailureMode getFailureMode(); - - abstract @Nullable PCollection getSchemaReadySignal(); - - abstract OptionalInt getGroupingFactor(); - - abstract @Nullable PCollectionView getDialectView(); - - abstract Builder toBuilder(); - - @AutoValue.Builder - abstract static class Builder { - - abstract Builder setSpannerConfig(SpannerConfig spannerConfig); - - abstract Builder setBatchSizeBytes(long batchSizeBytes); - - abstract Builder setMaxNumMutations(long maxNumMutations); - - abstract Builder setMaxNumRows(long maxNumRows); - - abstract Builder setFailureMode(FailureMode failureMode); - - abstract Builder setSchemaReadySignal(PCollection schemaReadySignal); - - abstract Builder setGroupingFactor(int groupingFactor); - - abstract Builder setDialectView(PCollectionView dialect); - - abstract Write build(); - } - - /** Specifies the Cloud Spanner configuration. */ - public Write withSpannerConfig(SpannerConfig spannerConfig) { - return toBuilder().setSpannerConfig(spannerConfig).build(); - } - - /** Specifies the Cloud Spanner project. */ - public Write withProjectId(String projectId) { - return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); - } - - /** Specifies the Cloud Spanner project. */ - public Write withProjectId(ValueProvider projectId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withProjectId(projectId)); - } - - /** Specifies the Cloud Spanner instance. */ - public Write withInstanceId(String instanceId) { - return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); - } - - /** Specifies the Cloud Spanner instance. */ - public Write withInstanceId(ValueProvider instanceId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withInstanceId(instanceId)); - } - - /** Specifies the Cloud Spanner database. */ - public Write withDatabaseId(String databaseId) { - return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId)); - } - - /** Specifies the Cloud Spanner database. */ - public Write withDatabaseId(ValueProvider databaseId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withDatabaseId(databaseId)); - } - - /** Specifies the Cloud Spanner host. */ - public Write withHost(ValueProvider host) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withHost(host)); - } - - /** Specifies the Cloud Spanner host. */ - public Write withHost(String host) { - return withHost(ValueProvider.StaticValueProvider.of(host)); - } - - /** Specifies the Cloud Spanner emulator host. */ - public Write withEmulatorHost(ValueProvider emulatorHost) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withEmulatorHost(emulatorHost)); - } - - public Write withEmulatorHost(String emulatorHost) { - return withEmulatorHost(ValueProvider.StaticValueProvider.of(emulatorHost)); - } - - public Write withDialectView(PCollectionView dialect) { - return toBuilder().setDialectView(dialect).build(); - } - - /** - * Specifies the deadline for the Commit API call. Default is 15 secs. DEADLINE_EXCEEDED errors - * will prompt a backoff/retry until the value of {@link #withMaxCumulativeBackoff(Duration)} is - * reached. 
DEADLINE_EXCEEDED errors are reported with logging and counters. - */ - public Write withCommitDeadline(Duration commitDeadline) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withCommitDeadline(commitDeadline)); - } - - /** - * Specifies the maximum cumulative backoff time when retrying after DEADLINE_EXCEEDED errors. - * Default is 15 mins. - * - *

If the mutations still have not been written after this time, they are treated as a - * failure, and handled according to the setting of {@link #withFailureMode(FailureMode)}. - */ - public Write withMaxCumulativeBackoff(Duration maxCumulativeBackoff) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withMaxCumulativeBackoff(maxCumulativeBackoff)); - } - - @VisibleForTesting - Write withServiceFactory(ServiceFactory serviceFactory) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withServiceFactory(serviceFactory)); - } - - /** Same transform but can be applied to {@link PCollection} of {@link MutationGroup}. */ - public WriteGrouped grouped() { - return new WriteGrouped(this); - } - - /** - * Specifies the batch size limit (max number of bytes mutated per batch). Default value is 1MB - */ - public Write withBatchSizeBytes(long batchSizeBytes) { - return toBuilder().setBatchSizeBytes(batchSizeBytes).build(); - } - - /** Specifies failure mode. {@link FailureMode#FAIL_FAST} mode is selected by default. */ - public Write withFailureMode(FailureMode failureMode) { - return toBuilder().setFailureMode(failureMode).build(); - } - - /** - * Specifies the cell mutation limit (maximum number of mutated cells per batch). Default value - * is 5000 - */ - public Write withMaxNumMutations(long maxNumMutations) { - return toBuilder().setMaxNumMutations(maxNumMutations).build(); - } - - /** - * Specifies the row mutation limit (maximum number of mutated rows per batch). Default value is - * 1000 - */ - public Write withMaxNumRows(long maxNumRows) { - return toBuilder().setMaxNumRows(maxNumRows).build(); - } - - /** - * Specifies an optional input PCollection that can be used as the signal for {@link - * Wait.OnSignal} to indicate when the database schema is ready to be read. - * - *

To be used when the database schema is created by another section of the pipeline, this - * causes this transform to wait until the {@code signal PCollection} has been closed before - * reading the schema from the database. - * - * @see Wait.OnSignal - */ - public Write withSchemaReadySignal(PCollection signal) { - return toBuilder().setSchemaReadySignal(signal).build(); - } - - /** - * Specifies the multiple of max mutation (in terms of both bytes per batch and cells per batch) - * that is used to select a set of mutations to sort by key for batching. This sort uses local - * memory on the workers, so using large values can cause out of memory errors. Default value is - * 1000. - */ - public Write withGroupingFactor(int groupingFactor) { - return toBuilder().setGroupingFactor(groupingFactor).build(); - } - - public Write withLowPriority() { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withRpcPriority(RpcPriority.LOW)); - } - - public Write withHighPriority() { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withRpcPriority(RpcPriority.HIGH)); - } - - @Override - public SpannerWriteResult expand(PCollection input) { - getSpannerConfig().validate(); - - return input - .apply("To mutation group", ParDo.of(new ToMutationGroupFn())) - .apply("Write mutations to Cloud Spanner", new WriteGrouped(this)); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - populateDisplayDataWithParameters(builder); - } - - private void populateDisplayDataWithParameters(DisplayData.Builder builder) { - getSpannerConfig().populateDisplayData(builder); - builder.add( - DisplayData.item("batchSizeBytes", getBatchSizeBytes()) - .withLabel("Max batch size in bytes")); - builder.add( - DisplayData.item("maxNumMutations", getMaxNumMutations()) - .withLabel("Max number of mutated cells in each batch")); - builder.add( - DisplayData.item("maxNumRows", getMaxNumRows()) - .withLabel("Max number of rows in each batch")); - // Grouping factor default value depends on whether it is a batch or streaming pipeline. - // This function is not aware of that state, so use 'DEFAULT' if unset. - builder.add( - DisplayData.item( - "groupingFactor", - (getGroupingFactor().isPresent() - ? Integer.toString(getGroupingFactor().getAsInt()) - : "DEFAULT")) - .withLabel("Number of batches to sort over")); - } - } - - /** Implementation of Write Rows Transform. */ - static class WriteRows extends PTransform, PDone> { - - private final Write write; - private final Op operation; - private final String table; - - private WriteRows(Write write, Op operation, String table) { - this.write = write; - this.operation = operation; - this.table = table; - } - - public static WriteRows of(Write write, Op operation, String table) { - return new WriteRows(write, operation, table); - } - - @Override - public PDone expand(PCollection input) { - input - .apply( - MapElements.into(TypeDescriptor.of(Mutation.class)) - .via(MutationUtils.beamRowToMutationFn(operation, table))) - .apply(write); - return PDone.in(input.getPipeline()); - } - } - - /** Same as {@link Write} but supports grouped mutations. 
*/ - public static class WriteGrouped - extends PTransform, SpannerWriteResult> { - - private final Write spec; - private static final TupleTag BATCHABLE_MUTATIONS_TAG = - new TupleTag("batchableMutations") {}; - private static final TupleTag> UNBATCHABLE_MUTATIONS_TAG = - new TupleTag>("unbatchableMutations") {}; - - private static final TupleTag MAIN_OUT_TAG = new TupleTag("mainOut") {}; - private static final TupleTag FAILED_MUTATIONS_TAG = - new TupleTag("failedMutations") {}; - private static final SerializableCoder CODER = - SerializableCoder.of(MutationGroup.class); - - public WriteGrouped(Write spec) { - this.spec = spec; - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - spec.populateDisplayDataWithParameters(builder); - } - - @Override - public SpannerWriteResult expand(PCollection input) { - PCollection> batches; - PCollectionView dialectView = spec.getDialectView(); - - if (dialectView == null) { - dialectView = - input - .getPipeline() - .apply("CreateSingleton", Create.of(Dialect.GOOGLE_STANDARD_SQL)) - .apply("As PCollectionView", View.asSingleton()); - } - - if (spec.getBatchSizeBytes() <= 1 - || spec.getMaxNumMutations() <= 1 - || spec.getMaxNumRows() <= 1) { - LOG.info("Batching of mutationGroups is disabled"); - TypeDescriptor> descriptor = - new TypeDescriptor>() {}; - batches = input.apply(MapElements.into(descriptor).via(ImmutableList::of)); - } else { - - // First, read the Cloud Spanner schema. - PCollection schemaSeed = - input.getPipeline().apply("Create Seed", Create.of((Void) null)); - if (spec.getSchemaReadySignal() != null) { - // Wait for external signal before reading schema. - schemaSeed = schemaSeed.apply("Wait for schema", Wait.on(spec.getSchemaReadySignal())); - } - final PCollectionView schemaView = - schemaSeed - .apply( - "Read information schema", - ParDo.of(new LocalReadSpannerSchema(spec.getSpannerConfig(), dialectView)) - .withSideInputs(dialectView)) - .apply("Schema View", View.asSingleton()); - - // Split the mutations into batchable and unbatchable mutations. - // Filter out mutation groups too big to be batched. - PCollectionTuple filteredMutations = - input - .apply( - "RewindowIntoGlobal", - Window.into(new GlobalWindows()) - .triggering(DefaultTrigger.of()) - .discardingFiredPanes()) - .apply( - "Filter Unbatchable Mutations", - ParDo.of( - new BatchableMutationFilterFn( - schemaView, - UNBATCHABLE_MUTATIONS_TAG, - spec.getBatchSizeBytes(), - spec.getMaxNumMutations(), - spec.getMaxNumRows())) - .withSideInputs(schemaView) - .withOutputTags( - BATCHABLE_MUTATIONS_TAG, TupleTagList.of(UNBATCHABLE_MUTATIONS_TAG))); - - // Build a set of Mutation groups from the current bundle, - // sort them by table/key then split into batches. - PCollection> batchedMutations = - filteredMutations - .get(BATCHABLE_MUTATIONS_TAG) - .apply( - "Gather Sort And Create Batches", - ParDo.of( - new GatherSortCreateBatchesFn( - spec.getBatchSizeBytes(), - spec.getMaxNumMutations(), - spec.getMaxNumRows(), - // Do not group on streaming unless explicitly set. - spec.getGroupingFactor() - .orElse( - input.isBounded() == IsBounded.BOUNDED - ? DEFAULT_GROUPING_FACTOR - : 1), - schemaView)) - .withSideInputs(schemaView)); - - // Merge the batched and unbatchable mutation PCollections and write to Spanner. 
- batches = - PCollectionList.of(filteredMutations.get(UNBATCHABLE_MUTATIONS_TAG)) - .and(batchedMutations) - .apply("Merge", Flatten.pCollections()); - } - - PCollectionTuple result = - batches.apply( - "Write batches to Spanner", - ParDo.of( - new WriteToSpannerFn( - spec.getSpannerConfig(), spec.getFailureMode(), FAILED_MUTATIONS_TAG)) - .withOutputTags(MAIN_OUT_TAG, TupleTagList.of(FAILED_MUTATIONS_TAG))); - - return new SpannerWriteResult( - input.getPipeline(), - result.get(MAIN_OUT_TAG), - result.get(FAILED_MUTATIONS_TAG), - FAILED_MUTATIONS_TAG); - } - - @VisibleForTesting - static MutationGroup decode(byte[] bytes) { - ByteArrayInputStream bis = new ByteArrayInputStream(bytes); - try { - return CODER.decode(bis); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @VisibleForTesting - static byte[] encode(MutationGroup g) { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - try { - CODER.encode(g, bos); - } catch (IOException e) { - throw new RuntimeException(e); - } - return bos.toByteArray(); - } - } - - /** Implementation of Read Change Stream Transform. */ - @AutoValue - public abstract static class ReadChangeStream - extends PTransform> { - - abstract SpannerConfig getSpannerConfig(); - - abstract String getChangeStreamName(); - - abstract @Nullable String getMetadataInstance(); - - abstract @Nullable String getMetadataDatabase(); - - abstract @Nullable String getMetadataTable(); - - abstract Timestamp getInclusiveStartAt(); - - abstract @Nullable Timestamp getInclusiveEndAt(); - - abstract @Nullable RpcPriority getRpcPriority(); - - /** @deprecated This configuration has no effect, as tracing is not available */ - @Deprecated - abstract @Nullable Double getTraceSampleProbability(); - - abstract Builder toBuilder(); - - @AutoValue.Builder - abstract static class Builder { - - abstract Builder setSpannerConfig(SpannerConfig spannerConfig); - - abstract Builder setChangeStreamName(String changeStreamName); - - abstract Builder setMetadataInstance(String metadataInstance); - - abstract Builder setMetadataDatabase(String metadataDatabase); - - abstract Builder setMetadataTable(String metadataTable); - - abstract Builder setInclusiveStartAt(Timestamp inclusiveStartAt); - - abstract Builder setInclusiveEndAt(Timestamp inclusiveEndAt); - - abstract Builder setRpcPriority(RpcPriority rpcPriority); - - abstract Builder setTraceSampleProbability(Double probability); - - abstract ReadChangeStream build(); - } - - /** Specifies the Cloud Spanner configuration. */ - public ReadChangeStream withSpannerConfig(SpannerConfig spannerConfig) { - return toBuilder().setSpannerConfig(spannerConfig).build(); - } - - /** Specifies the Cloud Spanner project. */ - public ReadChangeStream withProjectId(String projectId) { - return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); - } - - /** Specifies the Cloud Spanner project. */ - public ReadChangeStream withProjectId(ValueProvider projectId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withProjectId(projectId)); - } - - /** Specifies the Cloud Spanner instance. */ - public ReadChangeStream withInstanceId(String instanceId) { - return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); - } - - /** Specifies the Cloud Spanner instance. 
*/ - public ReadChangeStream withInstanceId(ValueProvider instanceId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withInstanceId(instanceId)); - } - - /** Specifies the Cloud Spanner database. */ - public ReadChangeStream withDatabaseId(String databaseId) { - return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId)); - } - - /** Specifies the Cloud Spanner database. */ - public ReadChangeStream withDatabaseId(ValueProvider databaseId) { - SpannerConfig config = getSpannerConfig(); - return withSpannerConfig(config.withDatabaseId(databaseId)); - } - - /** Specifies the change stream name. */ - public ReadChangeStream withChangeStreamName(String changeStreamName) { - return toBuilder().setChangeStreamName(changeStreamName).build(); - } - - /** Specifies the metadata database. */ - public ReadChangeStream withMetadataInstance(String metadataInstance) { - return toBuilder().setMetadataInstance(metadataInstance).build(); - } - - /** Specifies the metadata database. */ - public ReadChangeStream withMetadataDatabase(String metadataDatabase) { - return toBuilder().setMetadataDatabase(metadataDatabase).build(); - } - - /** Specifies the metadata table name. */ - public ReadChangeStream withMetadataTable(String metadataTable) { - return toBuilder().setMetadataTable(metadataTable).build(); - } - - /** Specifies the time that the change stream should be read from. */ - public ReadChangeStream withInclusiveStartAt(Timestamp timestamp) { - return toBuilder().setInclusiveStartAt(timestamp).build(); - } - - /** Specifies the end time of the change stream. */ - public ReadChangeStream withInclusiveEndAt(Timestamp timestamp) { - return toBuilder().setInclusiveEndAt(timestamp).build(); - } - - /** Specifies the priority of the change stream queries. */ - public ReadChangeStream withRpcPriority(RpcPriority rpcPriority) { - return toBuilder().setRpcPriority(rpcPriority).build(); - } - - /** - * Specifies the sample probability of tracing requests. - * - * @deprecated This configuration has no effect, as tracing is not available. - */ - @Deprecated - public ReadChangeStream withTraceSampleProbability(Double probability) { - return toBuilder().setTraceSampleProbability(probability).build(); - } - - @Override - public PCollection expand(PBegin input) { - checkArgument( - getSpannerConfig() != null, - "LocalSpannerIO.readChangeStream() requires the spanner config to be set."); - checkArgument( - getSpannerConfig().getProjectId() != null, - "LocalSpannerIO.readChangeStream() requires the project ID to be set."); - checkArgument( - getSpannerConfig().getInstanceId() != null, - "LocalSpannerIO.readChangeStream() requires the instance ID to be set."); - checkArgument( - getSpannerConfig().getDatabaseId() != null, - "LocalSpannerIO.readChangeStream() requires the database ID to be set."); - checkArgument( - getChangeStreamName() != null, - "LocalSpannerIO.readChangeStream() requires the name of the change stream to be set."); - checkArgument( - getInclusiveStartAt() != null, - "LocalSpannerIO.readChangeStream() requires the start time to be set."); - // Inclusive end at is defaulted to ChangeStreamsConstants.MAX_INCLUSIVE_END_AT - checkArgument( - getInclusiveEndAt() != null, - "LocalSpannerIO.readChangeStream() requires the end time to be set. 
If you'd like to" - + " process the stream without an end time, you can omit this parameter."); - if (getMetadataInstance() != null) { - checkArgument( - getMetadataDatabase() != null, - "LocalSpannerIO.readChangeStream() requires the metadata database to be set if metadata" - + " instance is set."); - } - - // Start time must be before end time - if (getInclusiveEndAt() != null - && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimestamp())) { - throw new IllegalArgumentException("Start time cannot be after end time."); - } - - final DatabaseId changeStreamDatabaseId = - DatabaseId.of( - getSpannerConfig().getProjectId().get(), - getSpannerConfig().getInstanceId().get(), - getSpannerConfig().getDatabaseId().get()); - final String partitionMetadataInstanceId = - MoreObjects.firstNonNull( - getMetadataInstance(), changeStreamDatabaseId.getInstanceId().getInstance()); - final String partitionMetadataDatabaseId = - MoreObjects.firstNonNull(getMetadataDatabase(), changeStreamDatabaseId.getDatabase()); - final DatabaseId fullPartitionMetadataDatabaseId = - DatabaseId.of( - getSpannerConfig().getProjectId().get(), - partitionMetadataInstanceId, - partitionMetadataDatabaseId); - final String partitionMetadataTableName = - MoreObjects.firstNonNull( - getMetadataTable(), generatePartitionMetadataTableName(partitionMetadataDatabaseId)); - - SpannerConfig changeStreamSpannerConfig = getSpannerConfig(); - // Set default retryable errors for ReadChangeStream - if (changeStreamSpannerConfig.getRetryableCodes() == null) { - ImmutableSet defaultRetryableCodes = ImmutableSet.of(Code.UNAVAILABLE, Code.ABORTED); - changeStreamSpannerConfig = - changeStreamSpannerConfig.toBuilder().setRetryableCodes(defaultRetryableCodes).build(); - } - // Set default retry timeouts for ReadChangeStream - if (changeStreamSpannerConfig.getExecuteStreamingSqlRetrySettings() == null) { - changeStreamSpannerConfig = - changeStreamSpannerConfig - .toBuilder() - .setExecuteStreamingSqlRetrySettings( - RetrySettings.newBuilder() - .setTotalTimeout(org.threeten.bp.Duration.ofMinutes(5)) - .setInitialRpcTimeout(org.threeten.bp.Duration.ofMinutes(1)) - .setMaxRpcTimeout(org.threeten.bp.Duration.ofMinutes(1)) - .build()) - .build(); - } - final SpannerConfig partitionMetadataSpannerConfig = - changeStreamSpannerConfig - .toBuilder() - .setInstanceId(StaticValueProvider.of(partitionMetadataInstanceId)) - .setDatabaseId(StaticValueProvider.of(partitionMetadataDatabaseId)) - .build(); - Dialect changeStreamDatabaseDialect = getDialect(changeStreamSpannerConfig); - Dialect metadataDatabaseDialect = getDialect(partitionMetadataSpannerConfig); - LOG.info( - "The Spanner database " - + changeStreamDatabaseId - + " has dialect " - + changeStreamDatabaseDialect); - LOG.info( - "The Spanner database " - + fullPartitionMetadataDatabaseId - + " has dialect " - + metadataDatabaseDialect); - final String changeStreamName = getChangeStreamName(); - final Timestamp startTimestamp = getInclusiveStartAt(); - // Uses (Timestamp.MAX - 1ns) at max for end timestamp, because we add 1ns to transform the - // interval into a closed-open in the read change stream restriction (prevents overflow) - final Timestamp endTimestamp = - getInclusiveEndAt().compareTo(MAX_INCLUSIVE_END_AT) > 0 - ? 
MAX_INCLUSIVE_END_AT - : getInclusiveEndAt(); - final MapperFactory mapperFactory = new MapperFactory(changeStreamDatabaseDialect); - final ChangeStreamMetrics metrics = new ChangeStreamMetrics(); - final RpcPriority rpcPriority = MoreObjects.firstNonNull(getRpcPriority(), RpcPriority.HIGH); - final DaoFactory daoFactory = - new DaoFactory( - changeStreamSpannerConfig, - changeStreamName, - partitionMetadataSpannerConfig, - partitionMetadataTableName, - rpcPriority, - input.getPipeline().getOptions().getJobName(), - changeStreamDatabaseDialect, - metadataDatabaseDialect); - final ActionFactory actionFactory = new ActionFactory(); - - final InitializeDoFn initializeDoFn = - new InitializeDoFn(daoFactory, mapperFactory, startTimestamp, endTimestamp); - final DetectNewPartitionsDoFn detectNewPartitionsDoFn = - new DetectNewPartitionsDoFn(daoFactory, mapperFactory, actionFactory, metrics); - final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn = - new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics); - final PostProcessingMetricsDoFn postProcessingMetricsDoFn = - new PostProcessingMetricsDoFn(metrics); - - LOG.info("Partition metadata table that will be used is " + partitionMetadataTableName); - input - .getPipeline() - .getOptions() - .as(SpannerChangeStreamOptions.class) - .setMetadataTable(partitionMetadataTableName); - - PCollection impulseOut = input.apply(Impulse.create()); - PCollection results = - impulseOut - .apply("Initialize the connector", ParDo.of(initializeDoFn)) - .apply("Detect new partitions", ParDo.of(detectNewPartitionsDoFn)) - .apply("Read change stream partition", ParDo.of(readChangeStreamPartitionDoFn)) - .apply("Gather metrics", ParDo.of(postProcessingMetricsDoFn)); - final Coder dataChangeRecordCoder = results.getCoder(); - final SizeEstimator dataChangeRecordSizeEstimator = - new SizeEstimator<>(dataChangeRecordCoder); - final BytesThroughputEstimator throughputEstimator = - new BytesThroughputEstimator<>(THROUGHPUT_WINDOW_SECONDS, dataChangeRecordSizeEstimator); - readChangeStreamPartitionDoFn.setThroughputEstimator(throughputEstimator); - - impulseOut - .apply(WithTimestamps.of(e -> GlobalWindow.INSTANCE.maxTimestamp())) - .apply(Wait.on(results)) - .apply(ParDo.of(new CleanUpReadChangeStreamDoFn(daoFactory))); - return results; - } - - private static Dialect getDialect(SpannerConfig spannerConfig) { - DatabaseClient databaseClient = - SpannerAccessor.getOrCreate(spannerConfig).getDatabaseClient(); - return databaseClient.getDialect(); - } - } - - /** - * Interface to display the name of the metadata table on Dataflow UI. This is only used for - * internal purpose. This should not be used to pass the name of the metadata table. - */ - public interface SpannerChangeStreamOptions extends StreamingOptions { - - /** Returns the name of the metadata table. */ - String getMetadataTable(); - - /** Specifies the name of the metadata table. */ - void setMetadataTable(String table); - } - - private static class ToMutationGroupFn extends DoFn { - - @ProcessElement - public void processElement(ProcessContext c) { - Mutation value = c.element(); - c.output(MutationGroup.create(value)); - } - } - - /** - * Gathers a set of mutations together, gets the keys, encodes them to byte[], sorts them and then - * outputs the encoded sorted list. - * - *

Testing notes: With very small amounts of data, each mutation group is in a separate bundle, - * and as batching and sorting is over the bundle, this effectively means that no batching will - * occur, Therefore this DoFn has to be tested in isolation. - */ - @VisibleForTesting - static class GatherSortCreateBatchesFn extends DoFn> { - - private final long maxBatchSizeBytes; - private final long maxBatchNumMutations; - private final long maxBatchNumRows; - private final long maxSortableSizeBytes; - private final long maxSortableNumMutations; - private final long maxSortableNumRows; - private final PCollectionView schemaView; - private final ArrayList mutationsToSort = new ArrayList<>(); - - // total size of MutationGroups in mutationsToSort. - private long sortableSizeBytes = 0; - // total number of mutated cells in mutationsToSort - private long sortableNumCells = 0; - // total number of rows mutated in mutationsToSort - private long sortableNumRows = 0; - - GatherSortCreateBatchesFn( - long maxBatchSizeBytes, - long maxNumMutations, - long maxNumRows, - long groupingFactor, - PCollectionView schemaView) { - this.maxBatchSizeBytes = maxBatchSizeBytes; - this.maxBatchNumMutations = maxNumMutations; - this.maxBatchNumRows = maxNumRows; - - if (groupingFactor <= 0) { - groupingFactor = 1; - } - - this.maxSortableSizeBytes = maxBatchSizeBytes * groupingFactor; - this.maxSortableNumMutations = maxNumMutations * groupingFactor; - this.maxSortableNumRows = maxNumRows * groupingFactor; - this.schemaView = schemaView; - - initSorter(); - } - - private synchronized void initSorter() { - mutationsToSort.clear(); - sortableSizeBytes = 0; - sortableNumCells = 0; - sortableNumRows = 0; - } - - @FinishBundle - public synchronized void finishBundle(FinishBundleContext c) throws Exception { - sortAndOutputBatches(new OutputReceiverForFinishBundle(c)); - } - - private synchronized void sortAndOutputBatches(OutputReceiver> out) - throws IOException { - try { - if (mutationsToSort.isEmpty()) { - // nothing to output. - return; - } - - if (maxSortableNumMutations == maxBatchNumMutations) { - // no grouping is occurring, no need to sort and make batches, just output what we have. - outputBatch(out, 0, mutationsToSort.size()); - return; - } - - // Sort then split the sorted mutations into batches. - mutationsToSort.sort(Comparator.naturalOrder()); - int batchStart = 0; - int batchEnd = 0; - - // total size of the current batch. - long batchSizeBytes = 0; - // total number of mutated cells. - long batchCells = 0; - // total number of rows mutated. - long batchRows = 0; - - // collect and output batches. - while (batchEnd < mutationsToSort.size()) { - MutationGroupContainer mg = mutationsToSort.get(batchEnd); - - if (((batchCells + mg.numCells) > maxBatchNumMutations) - || ((batchSizeBytes + mg.sizeBytes) > maxBatchSizeBytes - || (batchRows + mg.numRows > maxBatchNumRows))) { - // Cannot add new element, current batch is full; output. 
- outputBatch(out, batchStart, batchEnd); - batchStart = batchEnd; - batchSizeBytes = 0; - batchCells = 0; - batchRows = 0; - } - - batchEnd++; - batchSizeBytes += mg.sizeBytes; - batchCells += mg.numCells; - batchRows += mg.numRows; - } - - if (batchStart < batchEnd) { - // output remaining elements - outputBatch(out, batchStart, mutationsToSort.size()); - } - } finally { - initSorter(); - } - } - - private void outputBatch( - OutputReceiver> out, int batchStart, int batchEnd) { - out.output( - mutationsToSort.subList(batchStart, batchEnd).stream() - .map(o -> o.mutationGroup) - .collect(toList())); - } - - @ProcessElement - public synchronized void processElement( - ProcessContext c, OutputReceiver> out) throws Exception { - SpannerSchema spannerSchema = c.sideInput(schemaView); - MutationKeyEncoder encoder = new MutationKeyEncoder(spannerSchema); - MutationGroup mg = c.element(); - long groupSize = MutationSizeEstimator.sizeOf(mg); - long groupCells = MutationCellCounter.countOf(spannerSchema, mg); - long groupRows = mg.size(); - - synchronized (this) { - if (((sortableNumCells + groupCells) > maxSortableNumMutations) - || (sortableSizeBytes + groupSize) > maxSortableSizeBytes - || (sortableNumRows + groupRows) > maxSortableNumRows) { - sortAndOutputBatches(out); - } - - mutationsToSort.add( - new MutationGroupContainer( - mg, groupSize, groupCells, groupRows, encoder.encodeTableNameAndKey(mg.primary()))); - sortableSizeBytes += groupSize; - sortableNumCells += groupCells; - sortableNumRows += groupRows; - } - } - - // Container class to store a MutationGroup, its sortable encoded key and its statistics. - private static final class MutationGroupContainer - implements Comparable { - - final MutationGroup mutationGroup; - final long sizeBytes; - final long numCells; - final long numRows; - final byte[] encodedKey; - - MutationGroupContainer( - MutationGroup mutationGroup, - long sizeBytes, - long numCells, - long numRows, - byte[] encodedKey) { - this.mutationGroup = mutationGroup; - this.sizeBytes = sizeBytes; - this.numCells = numCells; - this.numRows = numRows; - this.encodedKey = encodedKey; - } - - @Override - public int compareTo(MutationGroupContainer o) { - return UnsignedBytes.lexicographicalComparator().compare(this.encodedKey, o.encodedKey); - } - } - - // TODO(BEAM-1287): Remove this when FinishBundle has added support for an {@link - // OutputReceiver} - private static class OutputReceiverForFinishBundle - implements OutputReceiver> { - - private final FinishBundleContext c; - - OutputReceiverForFinishBundle(FinishBundleContext c) { - this.c = c; - } - - @Override - public void output(Iterable output) { - outputWithTimestamp(output, Instant.now()); - } - - @Override - public void outputWithTimestamp(Iterable output, Instant timestamp) { - c.output(output, timestamp, GlobalWindow.INSTANCE); - } - } - } - - /** - * Filters MutationGroups larger than the batch size to the output tagged with {@code - * UNBATCHABLE_MUTATIONS_TAG}. - * - *

Testing notes: As batching does not occur during full pipline testing, this DoFn must be - * tested in isolation. - */ - @VisibleForTesting - static class BatchableMutationFilterFn extends DoFn { - - private final PCollectionView schemaView; - private final TupleTag> unbatchableMutationsTag; - private final long batchSizeBytes; - private final long maxNumMutations; - private final long maxNumRows; - private final Counter batchableMutationGroupsCounter = - Metrics.counter(WriteGrouped.class, "batchable_mutation_groups"); - private final Counter unBatchableMutationGroupsCounter = - Metrics.counter(WriteGrouped.class, "unbatchable_mutation_groups"); - - BatchableMutationFilterFn( - PCollectionView schemaView, - TupleTag> unbatchableMutationsTag, - long batchSizeBytes, - long maxNumMutations, - long maxNumRows) { - this.schemaView = schemaView; - this.unbatchableMutationsTag = unbatchableMutationsTag; - this.batchSizeBytes = batchSizeBytes; - this.maxNumMutations = maxNumMutations; - this.maxNumRows = maxNumRows; - } - - @ProcessElement - public void processElement(ProcessContext c) { - MutationGroup mg = c.element(); - if (mg.primary().getOperation() == Op.DELETE && !isPointDelete(mg.primary())) { - // Ranged deletes are not batchable. - c.output(unbatchableMutationsTag, Arrays.asList(mg)); - unBatchableMutationGroupsCounter.inc(); - return; - } - - SpannerSchema spannerSchema = c.sideInput(schemaView); - long groupSize = MutationSizeEstimator.sizeOf(mg); - long groupCells = MutationCellCounter.countOf(spannerSchema, mg); - long groupRows = Iterables.size(mg); - - if (groupSize >= batchSizeBytes || groupCells >= maxNumMutations || groupRows >= maxNumRows) { - c.output(unbatchableMutationsTag, Arrays.asList(mg)); - unBatchableMutationGroupsCounter.inc(); - } else { - c.output(mg); - batchableMutationGroupsCounter.inc(); - } - } - } - - @VisibleForTesting - static class WriteToSpannerFn extends DoFn, Void> { - - private final SpannerConfig spannerConfig; - private final FailureMode failureMode; - - // LocalSpannerAccessor can not be serialized so must be initialized at runtime in setup(). - private transient LocalSpannerAccessor spannerAccessor; - - /* Number of times an aborted write to spanner could be retried */ - private static final int ABORTED_RETRY_ATTEMPTS = 5; - /* Error string in Aborted exception during schema change */ - private final String errString = - "Transaction aborted. 
" - + "Database schema probably changed during transaction, retry may succeed."; - - @VisibleForTesting static Sleeper sleeper = Sleeper.DEFAULT; - - private final Counter mutationGroupBatchesReceived = - Metrics.counter(WriteGrouped.class, "mutation_group_batches_received"); - private final Counter mutationGroupBatchesWriteSuccess = - Metrics.counter(WriteGrouped.class, "mutation_group_batches_write_success"); - private final Counter mutationGroupBatchesWriteFail = - Metrics.counter(WriteGrouped.class, "mutation_group_batches_write_fail"); - - private final Counter mutationGroupsReceived = - Metrics.counter(WriteGrouped.class, "mutation_groups_received"); - private final Counter mutationGroupsWriteSuccess = - Metrics.counter(WriteGrouped.class, "mutation_groups_write_success"); - private final Counter mutationGroupsWriteFail = - Metrics.counter(WriteGrouped.class, "mutation_groups_write_fail"); - - private final Counter spannerWriteSuccess = - Metrics.counter(WriteGrouped.class, "spanner_write_success"); - private final Counter spannerWriteFail = - Metrics.counter(WriteGrouped.class, "spanner_write_fail"); - private final Distribution spannerWriteLatency = - Metrics.distribution(WriteGrouped.class, "spanner_write_latency_ms"); - private final Counter spannerWriteTimeouts = - Metrics.counter(WriteGrouped.class, "spanner_write_timeouts"); - private final Counter spannerWriteRetries = - Metrics.counter(WriteGrouped.class, "spanner_write_retries"); - - private final TupleTag failedTag; - - // Fluent Backoff is not serializable so create at runtime in setup(). - private transient FluentBackoff bundleWriteBackoff; - private transient String projectId; - private transient ServiceCallMetric serviceCallMetric; - - WriteToSpannerFn( - SpannerConfig spannerConfig, FailureMode failureMode, TupleTag failedTag) { - this.spannerConfig = spannerConfig; - this.failureMode = failureMode; - this.failedTag = failedTag; - } - - @Setup - public void setup() { - spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig); - bundleWriteBackoff = - FluentBackoff.DEFAULT - .withMaxCumulativeBackoff(spannerConfig.getMaxCumulativeBackoff().get()) - .withInitialBackoff(spannerConfig.getMaxCumulativeBackoff().get().dividedBy(60)); - - projectId = - this.spannerConfig.getProjectId() == null - || this.spannerConfig.getProjectId().get() == null - || this.spannerConfig.getProjectId().get().isEmpty() - ? SpannerOptions.getDefaultProjectId() - : this.spannerConfig.getProjectId().get(); - } - - @Teardown - public void teardown() { - spannerAccessor.close(); - } - - @StartBundle - public void startBundle() { - serviceCallMetric = - createServiceCallMetric( - projectId, - this.spannerConfig.getInstanceId().get(), - this.spannerConfig.getDatabaseId().get(), - this.spannerConfig.getInstanceId().get(), - "Write"); - } - - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - Iterable mutations = c.element(); - - // Batch upsert rows. - try { - mutationGroupBatchesReceived.inc(); - mutationGroupsReceived.inc(Iterables.size(mutations)); - Iterable batch = Iterables.concat(mutations); - writeMutations(batch); - mutationGroupBatchesWriteSuccess.inc(); - mutationGroupsWriteSuccess.inc(Iterables.size(mutations)); - return; - } catch (SpannerException e) { - mutationGroupBatchesWriteFail.inc(); - if (failureMode == FailureMode.REPORT_FAILURES) { - // fall through and retry individual mutationGroups. 
- } else if (failureMode == FailureMode.FAIL_FAST) { - mutationGroupsWriteFail.inc(Iterables.size(mutations)); - throw e; - } else { - throw new IllegalArgumentException("Unknown failure mode " + failureMode); - } - } - - // If we are here, writing a batch has failed, retry individual mutations. - for (MutationGroup mg : mutations) { - try { - spannerWriteRetries.inc(); - writeMutations(mg); - mutationGroupsWriteSuccess.inc(); - } catch (SpannerException e) { - mutationGroupsWriteFail.inc(); - LOG.warn("Failed to write the mutation group: " + mg, e); - c.output(failedTag, mg); - } - } - } - - /* - Spanner aborts all inflight transactions during a schema change. Client is expected - to retry silently. These must not be counted against retry backoff. - */ - private void spannerWriteWithRetryIfSchemaChange(Iterable batch) - throws SpannerException { - for (int retry = 1; ; retry++) { - try { - if (spannerConfig.getRpcPriority() != null - && spannerConfig.getRpcPriority().get() != null) { - spannerAccessor - .getDatabaseClient() - .writeAtLeastOnceWithOptions( - batch, Options.priority(spannerConfig.getRpcPriority().get())); - } else { - spannerAccessor.getDatabaseClient().writeAtLeastOnce(batch); - } - serviceCallMetric.call("ok"); - return; - } catch (AbortedException e) { - serviceCallMetric.call(e.getErrorCode().getGrpcStatusCode().toString()); - if (retry >= ABORTED_RETRY_ATTEMPTS) { - throw e; - } - if (e.isRetryable() || e.getMessage().contains(errString)) { - continue; - } - throw e; - } catch (SpannerException e) { - serviceCallMetric.call(e.getErrorCode().getGrpcStatusCode().toString()); - throw e; - } - } - } - - private ServiceCallMetric createServiceCallMetric( - String projectId, String instanceId, String databaseId, String tableId, String method) { - HashMap baseLabels = new HashMap<>(); - baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, ""); - baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "Spanner"); - baseLabels.put(MonitoringInfoConstants.Labels.METHOD, method); - baseLabels.put( - MonitoringInfoConstants.Labels.RESOURCE, - GcpResourceIdentifiers.spannerTable(projectId, instanceId, databaseId, tableId)); - baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID, projectId); - baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_DATABASE_ID, databaseId); - baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_INSTANCE_ID, tableId); - ServiceCallMetric serviceCallMetric = - new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels); - return serviceCallMetric; - } - - /** Write the Mutations to Spanner, handling DEADLINE_EXCEEDED with backoff/retries. */ - private void writeMutations(Iterable mutations) throws SpannerException, IOException { - BackOff backoff = bundleWriteBackoff.backoff(); - long mutationsSize = Iterables.size(mutations); - - while (true) { - Stopwatch timer = Stopwatch.createStarted(); - // loop is broken on success, timeout backoff/retry attempts exceeded, or other failure. - try { - spannerWriteWithRetryIfSchemaChange(mutations); - spannerWriteSuccess.inc(); - return; - } catch (SpannerException exception) { - if (exception.getErrorCode() == ErrorCode.DEADLINE_EXCEEDED) { - spannerWriteTimeouts.inc(); - - // Potentially backoff/retry after DEADLINE_EXCEEDED. - long sleepTimeMsecs = backoff.nextBackOffMillis(); - if (sleepTimeMsecs == BackOff.STOP) { - LOG.error( - "DEADLINE_EXCEEDED writing batch of {} mutations to Cloud Spanner. 
" - + "Aborting after too many retries.", - mutationsSize); - spannerWriteFail.inc(); - throw exception; - } - LOG.info( - "DEADLINE_EXCEEDED writing batch of {} mutations to Cloud Spanner, " - + "retrying after backoff of {}ms\n" - + "({})", - mutationsSize, - sleepTimeMsecs, - exception.getMessage()); - spannerWriteRetries.inc(); - try { - sleeper.sleep(sleepTimeMsecs); - } catch (InterruptedException e) { - // ignore. - } - } else { - // Some other failure: pass up the stack. - spannerWriteFail.inc(); - throw exception; - } - } finally { - spannerWriteLatency.update(timer.elapsed(TimeUnit.MILLISECONDS)); - } - } - } - } - - private LocalSpannerIO() {} // Prevent construction. -} diff --git a/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java b/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java deleted file mode 100644 index 7ccd4b0ae7..0000000000 --- a/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationSizeEstimator.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.spanner; - -import com.google.cloud.ByteArray; -import com.google.cloud.Date; -import com.google.cloud.Timestamp; -import com.google.cloud.spanner.Key; -import com.google.cloud.spanner.KeyRange; -import com.google.cloud.spanner.KeySet; -import com.google.cloud.spanner.Mutation; -import com.google.cloud.spanner.Value; -import java.math.BigDecimal; - -/** Estimates the logical size of {@link Mutation}. */ -class MutationSizeEstimator { - - // Prevent construction. - private MutationSizeEstimator() {} - - /** Estimates a size of mutation in bytes. 
*/ - static long sizeOf(Mutation m) { - if (m.getOperation() == Mutation.Op.DELETE) { - return sizeOf(m.getKeySet()); - } - long result = 0; - for (Value v : m.getValues()) { - switch (v.getType().getCode()) { - case ARRAY: - result += estimateArrayValue(v); - break; - case STRUCT: - throw new IllegalArgumentException("Structs are not supported in mutation."); - default: - result += estimatePrimitiveValue(v); - } - } - return result; - } - - private static long sizeOf(KeySet keySet) { - long result = 0; - for (Key k : keySet.getKeys()) { - result += sizeOf(k); - } - for (KeyRange kr : keySet.getRanges()) { - result += sizeOf(kr); - } - return result; - } - - private static long sizeOf(KeyRange kr) { - return sizeOf(kr.getStart()) + sizeOf(kr.getEnd()); - } - - private static long sizeOf(Key k) { - long result = 0; - for (Object part : k.getParts()) { - if (part == null) { - continue; - } - if (part instanceof Boolean) { - result += 1; - } else if (part instanceof Long) { - result += 8; - } else if (part instanceof Double) { - result += 8; - } else if (part instanceof String) { - result += ((String) part).length(); - } else if (part instanceof ByteArray) { - result += ((ByteArray) part).length(); - } else if (part instanceof Timestamp) { - result += 12; - } else if (part instanceof Date) { - result += 12; - } - } - return result; - } - - /** Estimates a size of the mutation group in bytes. */ - public static long sizeOf(MutationGroup group) { - long result = 0; - for (Mutation m : group) { - result += sizeOf(m); - } - return result; - } - - private static long estimatePrimitiveValue(Value v) { - switch (v.getType().getCode()) { - case BOOL: - return 1; - case FLOAT32: - return 4; - case INT64: - case FLOAT64: - return 8; - case DATE: - case TIMESTAMP: - return 12; - case STRING: - case PG_NUMERIC: - return v.isNull() ? 0 : v.getString().length(); - case BYTES: - return v.isNull() ? 0 : v.getBytes().length(); - case NUMERIC: - // see - // https://cloud.google.com/spanner/docs/working-with-numerics#handling_numeric_when_creating_a_client_library_or_driver - // Numeric/BigDecimal are stored in protos as String. It is likely that they - // are also stored in the Spanner database as String, so this gives an approximation for - // mutation value size. - return v.isNull() ? 0 : v.getNumeric().toString().length(); - case JSON: - return v.isNull() ? 0 : v.getJson().length(); - case PG_JSONB: - return v.isNull() ? 
0 : v.getPgJsonb().length(); - default: - throw new IllegalArgumentException("Unsupported type " + v.getType()); - } - } - - private static long estimateArrayValue(Value v) { - if (v.isNull()) { - return 0; - } - switch (v.getType().getArrayElementType().getCode()) { - case BOOL: - return v.getBoolArray().size(); - case FLOAT32: - return 4L * v.getFloat32Array().size(); - case INT64: - return 8L * v.getInt64Array().size(); - case FLOAT64: - return 8L * v.getFloat64Array().size(); - case STRING: - case PG_NUMERIC: - long totalLength = 0; - for (String s : v.getStringArray()) { - if (s == null) { - continue; - } - totalLength += s.length(); - } - return totalLength; - case BYTES: - totalLength = 0; - for (ByteArray bytes : v.getBytesArray()) { - if (bytes == null) { - continue; - } - totalLength += bytes.length(); - } - return totalLength; - case DATE: - return 12L * v.getDateArray().size(); - case TIMESTAMP: - return 12L * v.getTimestampArray().size(); - case NUMERIC: - totalLength = 0; - for (BigDecimal n : v.getNumericArray()) { - if (n == null) { - continue; - } - // see - // https://cloud.google.com/spanner/docs/working-with-numerics#handling_numeric_when_creating_a_client_library_or_driver - // Numeric/BigDecimal are stored in protos as String. It is likely that they - // are also stored in the Spanner database as String, so this gives an approximation for - // mutation value size. - totalLength += n.toString().length(); - } - return totalLength; - case JSON: - totalLength = 0; - for (String s : v.getJsonArray()) { - if (s == null) { - continue; - } - totalLength += s.length(); - } - return totalLength; - case PG_JSONB: - totalLength = 0; - for (String s : v.getPgJsonbArray()) { - if (s == null) { - continue; - } - totalLength += s.length(); - } - return totalLength; - default: - throw new IllegalArgumentException("Unsupported type " + v.getType()); - } - } -} diff --git a/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadOperation.java b/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadOperation.java deleted file mode 100644 index 0c9c42d8c2..0000000000 --- a/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadOperation.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.spanner; - -import com.google.auto.value.AutoValue; -import com.google.cloud.spanner.KeySet; -import com.google.cloud.spanner.PartitionOptions; -import com.google.cloud.spanner.Statement; -import java.io.Serializable; -import java.util.Arrays; -import java.util.List; -import org.checkerframework.checker.nullness.qual.Nullable; - -/** Encapsulates a spanner read operation. 
*/ -@AutoValue -@SuppressWarnings({ - "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) -}) -public abstract class ReadOperation implements Serializable { - - public static ReadOperation create() { - return new AutoValue_ReadOperation.Builder() - .setPartitionOptions(PartitionOptions.getDefaultInstance()) - .setKeySet(KeySet.all()) - .build(); - } - - public abstract @Nullable Statement getQuery(); - - public abstract @Nullable String getQueryName(); - - public abstract @Nullable String getTable(); - - public abstract @Nullable String getIndex(); - - public abstract @Nullable List getColumns(); - - public abstract @Nullable KeySet getKeySet(); - - abstract @Nullable PartitionOptions getPartitionOptions(); - - @AutoValue.Builder - abstract static class Builder { - - abstract Builder setQuery(Statement statement); - - abstract Builder setQueryName(String queryName); - - abstract Builder setTable(String table); - - abstract Builder setIndex(String index); - - abstract Builder setColumns(List columns); - - abstract Builder setKeySet(KeySet keySet); - - abstract Builder setPartitionOptions(PartitionOptions partitionOptions); - - abstract ReadOperation build(); - } - - abstract Builder toBuilder(); - - public ReadOperation withTable(String table) { - return toBuilder().setTable(table).build(); - } - - public ReadOperation withColumns(String... columns) { - return withColumns(Arrays.asList(columns)); - } - - public ReadOperation withColumns(List columns) { - return toBuilder().setColumns(columns).build(); - } - - public ReadOperation withQuery(Statement statement) { - return toBuilder().setQuery(statement).build(); - } - - public ReadOperation withQuery(String sql) { - return withQuery(Statement.of(sql)); - } - - public ReadOperation withQueryName(String queryName) { - return toBuilder().setQueryName(queryName).build(); - } - - public ReadOperation withKeySet(KeySet keySet) { - return toBuilder().setKeySet(keySet).build(); - } - - public ReadOperation withIndex(String index) { - return toBuilder().setIndex(index).build(); - } - - public ReadOperation withPartitionOptions(PartitionOptions partitionOptions) { - return toBuilder().setPartitionOptions(partitionOptions).build(); - } -} diff --git a/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java b/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java deleted file mode 100644 index 5b20bb97f8..0000000000 --- a/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java +++ /dev/null @@ -1,323 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io.gcp.spanner; - -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; - -import com.google.api.gax.retrying.RetrySettings; -import com.google.api.gax.rpc.StatusCode.Code; -import com.google.auth.Credentials; -import com.google.auto.value.AutoValue; -import com.google.cloud.ServiceFactory; -import com.google.cloud.spanner.Options.RpcPriority; -import com.google.cloud.spanner.Spanner; -import com.google.cloud.spanner.SpannerOptions; -import java.io.Serializable; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; -import org.checkerframework.checker.nullness.qual.Nullable; -import org.joda.time.Duration; - -/** Configuration for a Cloud Spanner client. */ -@AutoValue -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) -public abstract class SpannerConfig implements Serializable { - // Default Project ID. - private static final String DEFAULT_PROJECT_ID = "DATAFLOW_PROJECT"; - // A default host name for batch traffic. - private static final String DEFAULT_HOST = "https://batch-spanner.googleapis.com/"; - // Deadline for Commit API call. - private static final Duration DEFAULT_COMMIT_DEADLINE = Duration.standardSeconds(15); - // Total allowable backoff time. - private static final Duration DEFAULT_MAX_CUMULATIVE_BACKOFF = Duration.standardMinutes(15); - // A default priority for batch traffic. - static final RpcPriority DEFAULT_RPC_PRIORITY = RpcPriority.MEDIUM; - - public abstract @Nullable ValueProvider getProjectId(); - - public abstract @Nullable ValueProvider getInstanceId(); - - public abstract @Nullable ValueProvider getDatabaseId(); - - public abstract @Nullable ValueProvider getHost(); - - public abstract @Nullable ValueProvider getEmulatorHost(); - - public abstract @Nullable ValueProvider getIsLocalChannelProvider(); - - public abstract @Nullable ValueProvider getCommitDeadline(); - - public abstract @Nullable ValueProvider getMaxCumulativeBackoff(); - - public abstract @Nullable RetrySettings getExecuteStreamingSqlRetrySettings(); - - public abstract @Nullable RetrySettings getCommitRetrySettings(); - - public abstract @Nullable ImmutableSet getRetryableCodes(); - - public abstract @Nullable ValueProvider getRpcPriority(); - - public abstract @Nullable ValueProvider getDatabaseRole(); - - public abstract @Nullable ValueProvider getPartitionQueryTimeout(); - - public abstract @Nullable ValueProvider getPartitionReadTimeout(); - - @VisibleForTesting - abstract @Nullable ServiceFactory getServiceFactory(); - - public abstract @Nullable ValueProvider getDataBoostEnabled(); - - public abstract @Nullable ValueProvider getCredentials(); - - abstract Builder toBuilder(); - - public static SpannerConfig create() { - return builder() - .setProjectId(ValueProvider.StaticValueProvider.of(DEFAULT_PROJECT_ID)) - .setHost(ValueProvider.StaticValueProvider.of(DEFAULT_HOST)) - .setCommitDeadline(ValueProvider.StaticValueProvider.of(DEFAULT_COMMIT_DEADLINE)) - .setMaxCumulativeBackoff( - ValueProvider.StaticValueProvider.of(DEFAULT_MAX_CUMULATIVE_BACKOFF)) - .setRpcPriority(ValueProvider.StaticValueProvider.of(DEFAULT_RPC_PRIORITY)) - .build(); - } - - static Builder builder() { - return new AutoValue_SpannerConfig.Builder(); - } - 
- public void validate() { - checkNotNull( - getInstanceId(), - "SpannerIO.read() requires instance id to be set with withInstanceId method"); - checkNotNull( - getDatabaseId(), - "SpannerIO.read() requires database id to be set with withDatabaseId method"); - } - - public void populateDisplayData(DisplayData.Builder builder) { - builder - .addIfNotNull(DisplayData.item("projectId", getProjectId()).withLabel("Output Project")) - .addIfNotNull(DisplayData.item("instanceId", getInstanceId()).withLabel("Output Instance")) - .addIfNotNull(DisplayData.item("databaseId", getDatabaseId()).withLabel("Output Database")); - - if (getServiceFactory() != null) { - builder.addIfNotNull( - DisplayData.item("serviceFactory", getServiceFactory().getClass().getName()) - .withLabel("Service Factory")); - } - } - - /** Builder for {@link SpannerConfig}. */ - @AutoValue.Builder - public abstract static class Builder { - - abstract Builder setProjectId(ValueProvider projectId); - - abstract Builder setInstanceId(ValueProvider instanceId); - - abstract Builder setDatabaseId(ValueProvider databaseId); - - abstract Builder setHost(ValueProvider host); - - abstract Builder setEmulatorHost(ValueProvider emulatorHost); - - abstract Builder setIsLocalChannelProvider(ValueProvider isLocalChannelProvider); - - abstract Builder setCommitDeadline(ValueProvider commitDeadline); - - abstract Builder setMaxCumulativeBackoff(ValueProvider maxCumulativeBackoff); - - abstract Builder setExecuteStreamingSqlRetrySettings( - RetrySettings executeStreamingSqlRetrySettings); - - abstract Builder setCommitRetrySettings(RetrySettings commitRetrySettings); - - abstract Builder setRetryableCodes(ImmutableSet retryableCodes); - - abstract Builder setServiceFactory(ServiceFactory serviceFactory); - - abstract Builder setRpcPriority(ValueProvider rpcPriority); - - abstract Builder setDatabaseRole(ValueProvider databaseRole); - - abstract Builder setDataBoostEnabled(ValueProvider dataBoostEnabled); - - abstract Builder setPartitionQueryTimeout(ValueProvider partitionQueryTimeout); - - abstract Builder setPartitionReadTimeout(ValueProvider partitionReadTimeout); - - abstract Builder setCredentials(ValueProvider credentials); - - public abstract SpannerConfig build(); - } - - /** Specifies the Cloud Spanner project ID. */ - public SpannerConfig withProjectId(ValueProvider projectId) { - return toBuilder().setProjectId(projectId).build(); - } - - /** Specifies the Cloud Spanner project ID. */ - public SpannerConfig withProjectId(String projectId) { - return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); - } - - /** Specifies the Cloud Spanner instance ID. */ - public SpannerConfig withInstanceId(ValueProvider instanceId) { - checkNotNull(instanceId, "withInstanceId(instanceId) called with null input."); - return toBuilder().setInstanceId(instanceId).build(); - } - - /** Specifies the Cloud Spanner instance ID. */ - public SpannerConfig withInstanceId(String instanceId) { - return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); - } - - /** Specifies the Cloud Spanner database ID. */ - public SpannerConfig withDatabaseId(ValueProvider databaseId) { - checkNotNull(databaseId, "withDatabaseId(databaseId) called with null input."); - return toBuilder().setDatabaseId(databaseId).build(); - } - - /** Specifies the Cloud Spanner database ID. 
*/ - public SpannerConfig withDatabaseId(String databaseId) { - return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId)); - } - - /** Specifies the Cloud Spanner host. */ - public SpannerConfig withHost(ValueProvider host) { - checkNotNull(host, "withHost(host) called with null input."); - return toBuilder().setHost(host).build(); - } - - /** Specifies the Cloud Spanner host, when an emulator is used. */ - public SpannerConfig withEmulatorHost(ValueProvider emulatorHost) { - return toBuilder().setEmulatorHost(emulatorHost).build(); - } - - /** - * Specifies whether a local channel provider should be used. This should be set to True when an - * emulator is used. - */ - public SpannerConfig withIsLocalChannelProvider(ValueProvider isLocalChannelProvider) { - return toBuilder().setIsLocalChannelProvider(isLocalChannelProvider).build(); - } - - /** Specifies the commit deadline. This is overridden if the CommitRetrySettings is specified. */ - public SpannerConfig withCommitDeadline(Duration commitDeadline) { - return withCommitDeadline(ValueProvider.StaticValueProvider.of(commitDeadline)); - } - - /** Specifies the commit deadline. This is overridden if the CommitRetrySettings is specified. */ - public SpannerConfig withCommitDeadline(ValueProvider commitDeadline) { - return toBuilder().setCommitDeadline(commitDeadline).build(); - } - - /** Specifies the maximum cumulative backoff. */ - public SpannerConfig withMaxCumulativeBackoff(Duration maxCumulativeBackoff) { - return withMaxCumulativeBackoff(ValueProvider.StaticValueProvider.of(maxCumulativeBackoff)); - } - - /** Specifies the maximum cumulative backoff. */ - public SpannerConfig withMaxCumulativeBackoff(ValueProvider maxCumulativeBackoff) { - return toBuilder().setMaxCumulativeBackoff(maxCumulativeBackoff).build(); - } - - /** - * Specifies the ExecuteStreamingSql retry settings. If not set, the default timeout is set to 2 - * hours. - */ - public SpannerConfig withExecuteStreamingSqlRetrySettings( - RetrySettings executeStreamingSqlRetrySettings) { - return toBuilder() - .setExecuteStreamingSqlRetrySettings(executeStreamingSqlRetrySettings) - .build(); - } - - /** Specifies the commit retry settings. Setting this overrides the commit deadline. */ - public SpannerConfig withCommitRetrySettings(RetrySettings commitRetrySettings) { - return toBuilder().setCommitRetrySettings(commitRetrySettings).build(); - } - - /** Specifies the errors that will be retried by the client library for all operations. */ - public SpannerConfig withRetryableCodes(ImmutableSet retryableCodes) { - return toBuilder().setRetryableCodes(retryableCodes).build(); - } - - /** Specifies the service factory to create instance of Spanner. */ - @VisibleForTesting - SpannerConfig withServiceFactory(ServiceFactory serviceFactory) { - return toBuilder().setServiceFactory(serviceFactory).build(); - } - - /** Specifies the RPC priority. */ - public SpannerConfig withRpcPriority(RpcPriority rpcPriority) { - return withRpcPriority(ValueProvider.StaticValueProvider.of(rpcPriority)); - } - - /** Specifies the RPC priority. */ - public SpannerConfig withRpcPriority(ValueProvider rpcPriority) { - checkNotNull(rpcPriority, "withRpcPriority(rpcPriority) called with null input."); - return toBuilder().setRpcPriority(rpcPriority).build(); - } - - /** Specifies the Cloud Spanner database role. 
*/ - public SpannerConfig withDatabaseRole(ValueProvider databaseRole) { - return toBuilder().setDatabaseRole(databaseRole).build(); - } - - /** Specifies if the pipeline has to be run on the independent compute resource. */ - public SpannerConfig withDataBoostEnabled(ValueProvider dataBoostEnabled) { - return toBuilder().setDataBoostEnabled(dataBoostEnabled).build(); - } - - /** Specifies the PartitionQuery timeout. */ - public SpannerConfig withPartitionQueryTimeout(Duration partitionQueryTimeout) { - return withPartitionQueryTimeout(ValueProvider.StaticValueProvider.of(partitionQueryTimeout)); - } - - /** Specifies the PartitionQuery timeout. */ - public SpannerConfig withPartitionQueryTimeout(ValueProvider partitionQueryTimeout) { - return toBuilder().setPartitionQueryTimeout(partitionQueryTimeout).build(); - } - - /** Specifies the PartitionRead timeout. */ - public SpannerConfig withPartitionReadTimeout(Duration partitionReadTimeout) { - return withPartitionReadTimeout(ValueProvider.StaticValueProvider.of(partitionReadTimeout)); - } - - /** Specifies the PartitionRead timeout. */ - public SpannerConfig withPartitionReadTimeout(ValueProvider partitionReadTimeout) { - return toBuilder().setPartitionReadTimeout(partitionReadTimeout).build(); - } - - /** Specifies the credentials. */ - public SpannerConfig withCredentials(Credentials credentials) { - return withCredentials(ValueProvider.StaticValueProvider.of(credentials)); - } - - /** Specifies the credentials. */ - public SpannerConfig withCredentials(ValueProvider credentials) { - return toBuilder().setCredentials(credentials).build(); - } -} diff --git a/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java b/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java deleted file mode 100644 index 346605a3fe..0000000000 --- a/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java +++ /dev/null @@ -1,278 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.io.gcp.spanner; - -import com.google.auto.value.AutoValue; -import com.google.cloud.spanner.Dialect; -import com.google.cloud.spanner.Type; -import java.io.Serializable; -import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableListMultimap; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableTable; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; - -/** Encapsulates Cloud Spanner Schema. */ -@AutoValue -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) -abstract class SpannerSchema implements Serializable { - abstract ImmutableList tables(); - - abstract Dialect dialect(); - - abstract ImmutableListMultimap columns(); - - abstract ImmutableListMultimap keyParts(); - - abstract ImmutableTable cellsMutatedPerColumn(); - - abstract ImmutableMap cellsMutatedPerRow(); - - public static Builder builder() { - return builder(Dialect.GOOGLE_STANDARD_SQL); - } - - public static Builder builder(Dialect dialect) { - return new AutoValue_SpannerSchema.Builder().setDialect(dialect); - } - - /** Builder for {@link SpannerSchema}. */ - @AutoValue.Builder - abstract static class Builder { - abstract Builder setTables(ImmutableList tablesBuilder); - - abstract Builder setDialect(Dialect dialect); - - abstract Dialect dialect(); - - abstract ImmutableListMultimap.Builder columnsBuilder(); - - abstract ImmutableListMultimap.Builder keyPartsBuilder(); - - abstract ImmutableTable.Builder cellsMutatedPerColumnBuilder(); - - abstract ImmutableMap.Builder cellsMutatedPerRowBuilder(); - - abstract ImmutableListMultimap columns(); - - abstract ImmutableTable cellsMutatedPerColumn(); - - @VisibleForTesting - public Builder addColumn(String table, String name, String type) { - return addColumn(table, name, type, 1L); - } - - public Builder addColumn(String table, String name, String type, long cellsMutated) { - String tableLower = table.toLowerCase(); - String nameLower = name.toLowerCase(); - - columnsBuilder().put(tableLower, Column.create(nameLower, type, dialect())); - cellsMutatedPerColumnBuilder().put(tableLower, nameLower, cellsMutated); - return this; - } - - public Builder addKeyPart(String table, String column, boolean desc) { - keyPartsBuilder().put(table.toLowerCase(), KeyPart.create(column.toLowerCase(), desc)); - return this; - } - - abstract SpannerSchema autoBuild(); - - public final SpannerSchema build() { - // precompute the number of cells that are mutated for operations affecting - // an entire row such as a single key delete. 
- cellsMutatedPerRowBuilder() - .putAll( - Maps.transformValues( - cellsMutatedPerColumn().rowMap(), - entry -> entry.values().stream().mapToLong(Long::longValue).sum())); - - setTables(ImmutableList.copyOf(columns().keySet())); - - return autoBuild(); - } - } - - public List getTables() { - return tables(); - } - - public List getColumns(String table) { - return columns().get(table.toLowerCase()); - } - - public List getKeyParts(String table) { - return keyParts().get(table.toLowerCase()); - } - - /** Return the total number of cells affected when the specified column is mutated. */ - public long getCellsMutatedPerColumn(String table, String column) { - return cellsMutatedPerColumn().row(table.toLowerCase()).getOrDefault(column.toLowerCase(), 1L); - } - - /** Return the total number of cells affected with the given row is deleted. */ - public long getCellsMutatedPerRow(String table) { - return cellsMutatedPerRow().getOrDefault(table.toLowerCase(), 1L); - } - - @AutoValue - abstract static class KeyPart implements Serializable { - static KeyPart create(String field, boolean desc) { - return new AutoValue_SpannerSchema_KeyPart(field, desc); - } - - abstract String getField(); - - abstract boolean isDesc(); - } - - @AutoValue - abstract static class Column implements Serializable { - - private static final Pattern EMBEDDING_VECTOR_PATTERN = - Pattern.compile( - "^ARRAY<([a-zA-Z0-9]+)>\\(vector_length=>\\d+\\)$", Pattern.CASE_INSENSITIVE); - - private static final Pattern PG_EMBEDDING_VECTOR_PATTERN = - Pattern.compile("^(\\D+)\\[\\]\\svector\\slength\\s\\d+$", Pattern.CASE_INSENSITIVE); - - - static Column create(String name, Type type) { - return new AutoValue_SpannerSchema_Column(name, type); - } - - static Column create(String name, String spannerType, Dialect dialect) { - return create(name, parseSpannerType(spannerType, dialect)); - } - - public abstract String getName(); - - public abstract Type getType(); - - private static Type parseSpannerType(String spannerType, Dialect dialect) { - spannerType = spannerType.toUpperCase(); - switch (dialect) { - case GOOGLE_STANDARD_SQL: - if ("BOOL".equals(spannerType)) { - return Type.bool(); - } - if ("INT64".equals(spannerType)) { - return Type.int64(); - } - if ("FLOAT32".equals(spannerType)) { - return Type.float32(); - } - if ("FLOAT64".equals(spannerType)) { - return Type.float64(); - } - if (spannerType.startsWith("STRING")) { - return Type.string(); - } - if (spannerType.startsWith("BYTES")) { - return Type.bytes(); - } - if ("TIMESTAMP".equals(spannerType)) { - return Type.timestamp(); - } - if ("DATE".equals(spannerType)) { - return Type.date(); - } - if ("NUMERIC".equals(spannerType)) { - return Type.numeric(); - } - if ("JSON".equals(spannerType)) { - return Type.json(); - } - if (spannerType.startsWith("ARRAY")) { - // Substring "ARRAY" or substring "ARRAY(vector_length=>yyy)" - - String arrayElementType; - // Handle vector_length annotation - Matcher m = EMBEDDING_VECTOR_PATTERN.matcher(spannerType); - if (m.find()) { - arrayElementType = m.group(1); - } - else { - arrayElementType = spannerType.substring(6, spannerType.length() - 1); - } - Type itemType = parseSpannerType(arrayElementType, dialect); - return Type.array(itemType); - } - throw new IllegalArgumentException("Unknown spanner type " + spannerType); - case POSTGRESQL: - // Handle vector_length annotation - Matcher m = PG_EMBEDDING_VECTOR_PATTERN.matcher(spannerType); - if (m.find()) { - // Substring "xxx[] vector length yyy" - String arrayElementType = m.group(1); - Type 
itemType = parseSpannerType(arrayElementType, dialect); - return Type.array(itemType); - } - if (spannerType.endsWith("[]")) { - // Substring "xxx[]" - // Must check array type first - String spannerArrayType = spannerType.substring(0, spannerType.length() - 2); - Type itemType = parseSpannerType(spannerArrayType, dialect); - return Type.array(itemType); - } - if ("BOOLEAN".equals(spannerType)) { - return Type.bool(); - } - if ("BIGINT".equals(spannerType)) { - return Type.int64(); - } - if ("REAL".equals(spannerType)) { - return Type.float32(); - } - if ("DOUBLE PRECISION".equals(spannerType)) { - return Type.float64(); - } - if (spannerType.startsWith("CHARACTER VARYING") || "TEXT".equals(spannerType)) { - return Type.string(); - } - if ("BYTEA".equals(spannerType)) { - return Type.bytes(); - } - if ("TIMESTAMP WITH TIME ZONE".equals(spannerType)) { - return Type.timestamp(); - } - if ("DATE".equals(spannerType)) { - return Type.date(); - } - if (spannerType.startsWith("NUMERIC")) { - return Type.pgNumeric(); - } - if (spannerType.startsWith("JSONB")) { - return Type.pgJsonb(); - } - if ("SPANNER.COMMIT_TIMESTAMP".equals(spannerType)) { - return Type.timestamp(); - } - throw new IllegalArgumentException("Unknown spanner type " + spannerType); - default: - throw new IllegalArgumentException("Unrecognized dialect: " + dialect.name()); - } - } - } -} diff --git a/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/package-info.java b/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/package-info.java deleted file mode 100644 index 576d32fa3a..0000000000 --- a/v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Package info for the copy of SpannerIO. - */ -package org.apache.beam.sdk.io.gcp.spanner; diff --git a/v1/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java b/v1/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java deleted file mode 100644 index c3d6fa2fa6..0000000000 --- a/v1/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchemaTest.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner;
-
-import static org.junit.Assert.assertEquals;
-
-import com.google.cloud.spanner.Dialect;
-import com.google.cloud.spanner.Type;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** A test of {@link SpannerSchema}. */
-@RunWith(JUnit4.class)
-public class SpannerSchemaTest {
-
-  @Test
-  public void testSingleTable() throws Exception {
-    SpannerSchema schema =
-        SpannerSchema.builder()
-            .addColumn("test", "pk", "STRING(48)")
-            .addKeyPart("test", "pk", false)
-            .addColumn("test", "maxKey", "STRING(MAX)")
-            .addColumn("test", "numericVal", "NUMERIC")
-            .addColumn("test", "jsonVal", "JSON")
-            .addColumn("test", "arrayVal", "ARRAY<FLOAT64>")
-            .addColumn("test", "embeddingVectorVal", "ARRAY<FLOAT64>(VECTOR_LENGTH=>128)")
-            .build();
-
-    assertEquals(1, schema.getTables().size());
-    assertEquals(6, schema.getColumns("test").size());
-    assertEquals(1, schema.getKeyParts("test").size());
-    assertEquals(Type.json(), schema.getColumns("test").get(3).getType());
-    assertEquals(Type.array(Type.float64()), schema.getColumns("test").get(4).getType());
-    assertEquals(Type.array(Type.float64()), schema.getColumns("test").get(5).getType());
-  }
-
-  @Test
-  public void testTwoTables() throws Exception {
-    SpannerSchema schema =
-        SpannerSchema.builder()
-            .addColumn("test", "pk", "STRING(48)")
-            .addKeyPart("test", "pk", false)
-            .addColumn("test", "maxKey", "STRING(MAX)")
-            .addColumn("other", "pk", "INT64")
-            .addKeyPart("other", "pk", true)
-            .addColumn("other", "maxKey", "STRING(MAX)")
-            .build();
-
-    assertEquals(2, schema.getTables().size());
-    assertEquals(2, schema.getColumns("test").size());
-    assertEquals(1, schema.getKeyParts("test").size());
-
-    assertEquals(2, schema.getColumns("other").size());
-    assertEquals(1, schema.getKeyParts("other").size());
-  }
-
-  @Test
-  public void testSinglePgTable() throws Exception {
-    SpannerSchema schema =
-        SpannerSchema.builder(Dialect.POSTGRESQL)
-            .addColumn("test", "pk", "character varying(48)")
-            .addKeyPart("test", "pk", false)
-            .addColumn("test", "maxKey", "character varying")
-            .addColumn("test", "numericVal", "numeric")
-            .addColumn("test", "commitTime", "spanner.commit_timestamp")
-            .addColumn("test", "jsonbCol", "jsonb")
-            .addColumn("test", "arrayCol", "DOUBLE PRECISION[]")
-            .addColumn("test", "embeddingVectorCol", "DOUBLE PRECISION[] VECTOR LENGTH 16")
-            .build();
-
-    assertEquals(1, schema.getTables().size());
-    assertEquals(7, schema.getColumns("test").size());
-    assertEquals(1, schema.getKeyParts("test").size());
-    assertEquals(Type.timestamp(), schema.getColumns("test").get(3).getType());
-    assertEquals(Type.array(Type.float64()), schema.getColumns("test").get(5).getType());
-    assertEquals(Type.array(Type.float64()), schema.getColumns("test").get(6).getType());
-  }
-
-  @Test
-  public void testTwoPgTables() throws Exception {
-    SpannerSchema schema =
-        SpannerSchema.builder(Dialect.POSTGRESQL)
-            .addColumn("test", "pk", "character varying(48)")
-            .addKeyPart("test", "pk", false)
-            .addColumn("test", "maxKey", "character varying")
-            .addColumn("test", "jsonbCol", "jsonb")
-            .addColumn("other", "pk", "bigint")
-            .addKeyPart("other", "pk", true)
-            .addColumn("other", "maxKey", "character varying")
-            .addColumn("other", "commitTime", "spanner.commit_timestamp")
-            .build();
-
-    assertEquals(2, schema.getTables().size());
-    assertEquals(3, schema.getColumns("test").size());
-    assertEquals(1, schema.getKeyParts("test").size());
-
-    assertEquals(3, schema.getColumns("other").size());
-    assertEquals(1, schema.getKeyParts("other").size());
-    assertEquals(Type.timestamp(), schema.getColumns("other").get(2).getType());
-  }
-}
\ No newline at end of file