Skip to content

Commit

Permalink
Remove LocalSpannerIO
Browse files Browse the repository at this point in the history
  • Loading branch information
Abacn committed Apr 11, 2024
1 parent 1244027 commit 253a4e9
Show file tree
Hide file tree
Showing 20 changed files with 38 additions and 3,813 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
Expand Down Expand Up @@ -66,11 +66,11 @@ public PCollection<Ddl> expand(PCollection<Ddl> input) {
ParDo.of(
new DoFn<Ddl, Ddl>() {

private transient LocalSpannerAccessor spannerAccessor;
private transient SpannerAccessor spannerAccessor;

@Setup
public void setup() {
spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);

Check warning on line 73 in v1/src/main/java/com/google/cloud/teleport/spanner/ApplyDDLTransform.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/ApplyDDLTransform.java#L73

Added line #L73 was not covered by tests
}

@Teardown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Combine;
Expand Down Expand Up @@ -182,7 +182,7 @@ public WriteFilesResult<String> expand(PBegin begin) {

/*
* Allow users to specify read timestamp.
* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
* CreateTransaction and CreateTransactionFn classes in SpannerIO
* only take a timestamp object for exact staleness which works when
* parameters are provided during template compile time. They do not work with
* a Timestamp valueProvider which can take parameters at runtime. Hence a new
Expand Down Expand Up @@ -403,7 +403,7 @@ public void processElement(ProcessContext c) {
PCollection<Struct> rows =
tables.apply(
"Read all rows from Spanner",
LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig));
SpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig));

Check warning on line 406 in v1/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java#L406

Added line #L406 was not covered by tests

ValueProvider<ResourceId> resource =
ValueProvider.NestedValueProvider.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerWriteResult;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.ValueProvider;
Expand Down Expand Up @@ -166,7 +166,7 @@ public void processElement(ProcessContext c) {
schemas.apply("Build avro DDL", Combine.globally(AsList.fn()));

PCollectionView<Transaction> tx =
begin.apply(LocalSpannerIO.createTransaction().withSpannerConfig(spannerConfig));
begin.apply(SpannerIO.createTransaction().withSpannerConfig(spannerConfig));

Check warning on line 169 in v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java#L169

Added line #L169 was not covered by tests

PCollection<Ddl> informationSchemaDdl =
begin.apply(
Expand Down Expand Up @@ -266,7 +266,7 @@ public void processElement(ProcessContext c) {
SpannerWriteResult result =
mutations.apply(
"Write mutations " + depth,
LocalSpannerIO.write()
SpannerIO.write()

Check warning on line 269 in v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java#L269

Added line #L269 was not covered by tests
.withSchemaReadySignal(ddl)
.withSpannerConfig(spannerConfig)
.withCommitDeadline(Duration.standardMinutes(1))
Expand Down Expand Up @@ -401,7 +401,7 @@ private static class CreateTables extends PTransform<PBegin, PCollectionTuple> {
private final ValueProvider<Boolean> earlyIndexCreateFlag;
private final ValueProvider<Integer> ddlCreationTimeoutInMinutes;

private transient LocalSpannerAccessor spannerAccessor;
private transient SpannerAccessor spannerAccessor;
private static final Logger LOG = LoggerFactory.getLogger(CreateTables.class);

/* If the schema has a lot of DDL changes after data load, it's preferable to create
Expand Down Expand Up @@ -457,7 +457,7 @@ public PCollectionTuple expand(PBegin begin) {

@Setup
public void setup() {
spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);

Check warning on line 460 in v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java#L460

Added line #L460 was not covered by tests
}

@Teardown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Dialect;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
Expand All @@ -43,15 +43,15 @@ public PCollection<Dialect> expand(PBegin p) {

private static class ReadDialectFn extends DoFn<Void, Dialect> {
private final SpannerConfig spannerConfig;
private transient LocalSpannerAccessor spannerAccessor;
private transient SpannerAccessor spannerAccessor;

public ReadDialectFn(SpannerConfig spannerConfig) {
this.spannerConfig = spannerConfig;
}

@Setup
public void setup() throws Exception {
spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);

Check warning on line 54 in v1/src/main/java/com/google/cloud/teleport/spanner/ReadDialect.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/ReadDialect.java#L54

Added line #L54 was not covered by tests
}

@Teardown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.google.cloud.spanner.Dialect;
import com.google.cloud.teleport.spanner.ddl.Ddl;
import com.google.cloud.teleport.spanner.ddl.InformationSchemaScanner;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.transforms.Create;
Expand Down Expand Up @@ -59,7 +59,7 @@ public PCollection<Ddl> expand(PBegin p) {

private static class ReadInformationSchemaFn extends DoFn<Void, Ddl> {
private final SpannerConfig spannerConfig;
private transient LocalSpannerAccessor spannerAccessor;
private transient SpannerAccessor spannerAccessor;
private final PCollectionView<Transaction> tx;
private final PCollectionView<Dialect> dialectView;

Expand All @@ -74,7 +74,7 @@ public ReadInformationSchemaFn(

@Setup
public void setup() throws Exception {
spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);

Check warning on line 77 in v1/src/main/java/com/google/cloud/teleport/spanner/ReadInformationSchema.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/ReadInformationSchema.java#L77

Added line #L77 was not covered by tests
}

@Teardown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerWriteResult;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.ValueProvider;
Expand Down Expand Up @@ -105,7 +105,7 @@ public TextImportTransform(
@Override
public PDone expand(PBegin begin) {
PCollectionView<Transaction> tx =
begin.apply(LocalSpannerIO.createTransaction().withSpannerConfig(spannerConfig));
begin.apply(SpannerIO.createTransaction().withSpannerConfig(spannerConfig));

Check warning on line 108 in v1/src/main/java/com/google/cloud/teleport/spanner/TextImportTransform.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/TextImportTransform.java#L108

Added line #L108 was not covered by tests

PCollectionView<Dialect> dialectView =
begin
Expand Down Expand Up @@ -203,7 +203,7 @@ public void processElement(ProcessContext c) {
.apply("Wait for previous depth " + depth, Wait.on(previousComputation))
.apply(
"Write mutations " + depth,
LocalSpannerIO.write()
SpannerIO.write()

Check warning on line 206 in v1/src/main/java/com/google/cloud/teleport/spanner/TextImportTransform.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/TextImportTransform.java#L206

Added line #L206 was not covered by tests
.withSpannerConfig(spannerConfig)
.withCommitDeadline(Duration.standardMinutes(1))
.withMaxCumulativeBackoff(Duration.standardHours(2))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
Expand Down Expand Up @@ -142,7 +142,7 @@ public static void main(String[] args) {
options.getTextWritePrefix(),
options.getSpannerSnapshotTime());

/* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
/* CreateTransaction and CreateTransactionFn classes in SpannerIO
* only take a timestamp object for exact staleness which works when
* parameters are provided during template compile time. They do not work with
* a Timestamp valueProvider which can take parameters at runtime. Hence a new
Expand All @@ -162,14 +162,14 @@ public static void main(String[] args) {
PCollection<String> csv =
pipeline
.apply("Create export", spannerExport)
// We need to use LocalSpannerIO.readAll() instead of LocalSpannerIO.read()
// We need to use SpannerIO.readAll() instead of SpannerIO.read()
// because ValueProvider parameters such as table name required for
// LocalSpannerIO.read() can be read only inside DoFn but LocalSpannerIO.read() is of
// SpannerIO.read() can be read only inside DoFn but SpannerIO.read() is of
// type PTransform<PBegin, Struct>, which prevents prepending it with DoFn that reads
// these parameters at the pipeline execution time.
.apply(
"Read all records",
LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
SpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))

Check warning on line 172 in v1/src/main/java/com/google/cloud/teleport/templates/SpannerToText.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/templates/SpannerToText.java#L172

Added line #L172 was not covered by tests
.apply(
"Struct To Csv",
MapElements.into(TypeDescriptors.strings())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
Expand Down Expand Up @@ -269,7 +269,7 @@ public static void main(String[] args) {
options.getSpannerColumnsToExport(),
ValueProvider.StaticValueProvider.of(/* disable_schema_export= */ false));

/* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
/* CreateTransaction and CreateTransactionFn classes in SpannerIO
* only take a timestamp object for exact staleness which works when
* parameters are provided during template compile time. They do not work with
* a Timestamp valueProvider which can take parameters at runtime. Hence a new
Expand All @@ -289,14 +289,14 @@ public static void main(String[] args) {
PCollection<String> json =
pipeline
.apply("Create export", spannerExport)
// We need to use LocalSpannerIO.readAll() instead of LocalSpannerIO.read()
// We need to use SpannerIO.readAll() instead of SpannerIO.read()
// because ValueProvider parameters such as table name required for
// LocalSpannerIO.read() can be read only inside DoFn but LocalSpannerIO.read() is of
// SpannerIO.read() can be read only inside DoFn but SpannerIO.read() is of
// type PTransform<PBegin, Struct>, which prevents prepending it with DoFn that reads
// these parameters at the pipeline execution time.
.apply(
"Read all records",
LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
SpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))

Check warning on line 299 in v1/src/main/java/com/google/cloud/teleport/templates/SpannerVectorEmbeddingExport.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/templates/SpannerVectorEmbeddingExport.java#L299

Added line #L299 was not covered by tests
.apply(
"Struct To JSON",
MapElements.into(TypeDescriptors.strings())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.Default;
Expand Down Expand Up @@ -249,16 +249,16 @@ public static Builder builder() {
return new AutoValue_SpannerConverters_ExportTransform.Builder();
}

private LocalSpannerAccessor spannerAccessor;
private SpannerAccessor spannerAccessor;
private DatabaseClient databaseClient;

// LocalSpannerAccessor is not serializable, thus can't be passed as a mock so we need to pass
// SpannerAccessor is not serializable, thus can't be passed as a mock so we need to pass
// mocked database client directly instead. We can't generate stub of ExportTransform because
// AutoValue generates a final class.
// TODO make LocalSpannerAccessor serializable
// TODO make SpannerAccessor serializable
DatabaseClient getDatabaseClient(SpannerConfig spannerConfig) {
if (databaseClient == null) {
this.spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
this.spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);

Check warning on line 261 in v1/src/main/java/com/google/cloud/teleport/templates/common/SpannerConverters.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/templates/common/SpannerConverters.java#L261

Added line #L261 was not covered by tests
return this.spannerAccessor.getDatabaseClient();
} else {
return this.databaseClient;
Expand Down Expand Up @@ -766,11 +766,11 @@ public CreateTransactionFnWithTimestamp(
this.spannerSnapshotTime = spannerSnapshotTime;
}

private transient LocalSpannerAccessor spannerAccessor;
private transient SpannerAccessor spannerAccessor;

@DoFn.Setup
public void setup() throws Exception {
spannerAccessor = LocalSpannerAccessor.getOrCreate(config);
spannerAccessor = SpannerAccessor.getOrCreate(config);

Check warning on line 773 in v1/src/main/java/com/google/cloud/teleport/templates/common/SpannerConverters.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/templates/common/SpannerConverters.java#L773

Added line #L773 was not covered by tests
}

@Teardown
Expand Down
Loading

0 comments on commit 253a4e9

Please sign in to comment.