Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove LocalSpannerIO #1429

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
Expand Down Expand Up @@ -66,11 +66,11 @@
ParDo.of(
new DoFn<Ddl, Ddl>() {

private transient LocalSpannerAccessor spannerAccessor;
private transient SpannerAccessor spannerAccessor;

@Setup
public void setup() {
spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);

Check warning on line 73 in v1/src/main/java/com/google/cloud/teleport/spanner/ApplyDDLTransform.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/ApplyDDLTransform.java#L73

Added line #L73 was not covered by tests
}

@Teardown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Combine;
Expand Down Expand Up @@ -182,7 +182,7 @@

/*
* Allow users to specify read timestamp.
* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
* CreateTransaction and CreateTransactionFn classes in SpannerIO
* only take a timestamp object for exact staleness which works when
* parameters are provided during template compile time. They do not work with
* a Timestamp valueProvider which can take parameters at runtime. Hence a new
Expand Down Expand Up @@ -403,7 +403,7 @@
PCollection<Struct> rows =
tables.apply(
"Read all rows from Spanner",
LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig));
SpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig));

Check warning on line 406 in v1/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/ExportTransform.java#L406

Added line #L406 was not covered by tests

ValueProvider<ResourceId> resource =
ValueProvider.NestedValueProvider.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerWriteResult;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.ValueProvider;
Expand Down Expand Up @@ -166,7 +166,7 @@
schemas.apply("Build avro DDL", Combine.globally(AsList.fn()));

PCollectionView<Transaction> tx =
begin.apply(LocalSpannerIO.createTransaction().withSpannerConfig(spannerConfig));
begin.apply(SpannerIO.createTransaction().withSpannerConfig(spannerConfig));

Check warning on line 169 in v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java#L169

Added line #L169 was not covered by tests

PCollection<Ddl> informationSchemaDdl =
begin.apply(
Expand Down Expand Up @@ -266,7 +266,7 @@
SpannerWriteResult result =
mutations.apply(
"Write mutations " + depth,
LocalSpannerIO.write()
SpannerIO.write()

Check warning on line 269 in v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java#L269

Added line #L269 was not covered by tests
.withSchemaReadySignal(ddl)
.withSpannerConfig(spannerConfig)
.withCommitDeadline(Duration.standardMinutes(1))
Expand Down Expand Up @@ -401,7 +401,7 @@
private final ValueProvider<Boolean> earlyIndexCreateFlag;
private final ValueProvider<Integer> ddlCreationTimeoutInMinutes;

private transient LocalSpannerAccessor spannerAccessor;
private transient SpannerAccessor spannerAccessor;
private static final Logger LOG = LoggerFactory.getLogger(CreateTables.class);

/* If the schema has a lot of DDL changes after data load, it's preferable to create
Expand Down Expand Up @@ -457,7 +457,7 @@

@Setup
public void setup() {
spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);

Check warning on line 460 in v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/ImportTransform.java#L460

Added line #L460 was not covered by tests
}

@Teardown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Dialect;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
Expand All @@ -43,15 +43,15 @@

private static class ReadDialectFn extends DoFn<Void, Dialect> {
private final SpannerConfig spannerConfig;
private transient LocalSpannerAccessor spannerAccessor;
private transient SpannerAccessor spannerAccessor;

public ReadDialectFn(SpannerConfig spannerConfig) {
this.spannerConfig = spannerConfig;
}

@Setup
public void setup() throws Exception {
spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);

Check warning on line 54 in v1/src/main/java/com/google/cloud/teleport/spanner/ReadDialect.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/ReadDialect.java#L54

Added line #L54 was not covered by tests
}

@Teardown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.google.cloud.spanner.Dialect;
import com.google.cloud.teleport.spanner.ddl.Ddl;
import com.google.cloud.teleport.spanner.ddl.InformationSchemaScanner;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.transforms.Create;
Expand Down Expand Up @@ -59,7 +59,7 @@

private static class ReadInformationSchemaFn extends DoFn<Void, Ddl> {
private final SpannerConfig spannerConfig;
private transient LocalSpannerAccessor spannerAccessor;
private transient SpannerAccessor spannerAccessor;
private final PCollectionView<Transaction> tx;
private final PCollectionView<Dialect> dialectView;

Expand All @@ -74,7 +74,7 @@

@Setup
public void setup() throws Exception {
spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);

Check warning on line 77 in v1/src/main/java/com/google/cloud/teleport/spanner/ReadInformationSchema.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/ReadInformationSchema.java#L77

Added line #L77 was not covered by tests
}

@Teardown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.SpannerWriteResult;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.ValueProvider;
Expand Down Expand Up @@ -105,7 +105,7 @@
@Override
public PDone expand(PBegin begin) {
PCollectionView<Transaction> tx =
begin.apply(LocalSpannerIO.createTransaction().withSpannerConfig(spannerConfig));
begin.apply(SpannerIO.createTransaction().withSpannerConfig(spannerConfig));

Check warning on line 108 in v1/src/main/java/com/google/cloud/teleport/spanner/TextImportTransform.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/TextImportTransform.java#L108

Added line #L108 was not covered by tests

PCollectionView<Dialect> dialectView =
begin
Expand Down Expand Up @@ -203,7 +203,7 @@
.apply("Wait for previous depth " + depth, Wait.on(previousComputation))
.apply(
"Write mutations " + depth,
LocalSpannerIO.write()
SpannerIO.write()

Check warning on line 206 in v1/src/main/java/com/google/cloud/teleport/spanner/TextImportTransform.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/spanner/TextImportTransform.java#L206

Added line #L206 was not covered by tests
.withSpannerConfig(spannerConfig)
.withCommitDeadline(Duration.standardMinutes(1))
.withMaxCumulativeBackoff(Duration.standardHours(2))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
Expand Down Expand Up @@ -142,7 +142,7 @@
options.getTextWritePrefix(),
options.getSpannerSnapshotTime());

/* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
/* CreateTransaction and CreateTransactionFn classes in SpannerIO
* only take a timestamp object for exact staleness which works when
* parameters are provided during template compile time. They do not work with
* a Timestamp valueProvider which can take parameters at runtime. Hence a new
Expand All @@ -162,14 +162,14 @@
PCollection<String> csv =
pipeline
.apply("Create export", spannerExport)
// We need to use LocalSpannerIO.readAll() instead of LocalSpannerIO.read()
// We need to use SpannerIO.readAll() instead of SpannerIO.read()
// because ValueProvider parameters such as table name required for
// LocalSpannerIO.read() can be read only inside DoFn but LocalSpannerIO.read() is of
// SpannerIO.read() can be read only inside DoFn but SpannerIO.read() is of
// type PTransform<PBegin, Struct>, which prevents prepending it with DoFn that reads
// these parameters at the pipeline execution time.
.apply(
"Read all records",
LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
SpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))

Check warning on line 172 in v1/src/main/java/com/google/cloud/teleport/templates/SpannerToText.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/templates/SpannerToText.java#L172

Added line #L172 was not covered by tests
.apply(
"Struct To Csv",
MapElements.into(TypeDescriptors.strings())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
Expand Down Expand Up @@ -253,7 +253,7 @@
options.getSpannerColumnsToExport(),
ValueProvider.StaticValueProvider.of(/* disable_schema_export= */ false));

/* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
/* CreateTransaction and CreateTransactionFn classes in SpannerIO
* only take a timestamp object for exact staleness which works when
* parameters are provided during template compile time. They do not work with
* a Timestamp valueProvider which can take parameters at runtime. Hence a new
Expand All @@ -273,14 +273,14 @@
PCollection<String> json =
pipeline
.apply("Create export", spannerExport)
// We need to use LocalSpannerIO.readAll() instead of LocalSpannerIO.read()
// We need to use SpannerIO.readAll() instead of SpannerIO.read()
// because ValueProvider parameters such as table name required for
// LocalSpannerIO.read() can be read only inside DoFn but LocalSpannerIO.read() is of
// SpannerIO.read() can be read only inside DoFn but SpannerIO.read() is of
// type PTransform<PBegin, Struct>, which prevents prepending it with DoFn that reads
// these parameters at the pipeline execution time.
.apply(
"Read all records",
LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
SpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))

Check warning on line 283 in v1/src/main/java/com/google/cloud/teleport/templates/SpannerVectorEmbeddingExport.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/templates/SpannerVectorEmbeddingExport.java#L283

Added line #L283 was not covered by tests
.apply(
"Struct To JSON",
MapElements.into(TypeDescriptors.strings())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.Default;
Expand Down Expand Up @@ -249,16 +249,16 @@
return new AutoValue_SpannerConverters_ExportTransform.Builder();
}

private LocalSpannerAccessor spannerAccessor;
private SpannerAccessor spannerAccessor;
private DatabaseClient databaseClient;

// LocalSpannerAccessor is not serializable, thus can't be passed as a mock so we need to pass
// SpannerAccessor is not serializable, thus can't be passed as a mock so we need to pass
// mocked database client directly instead. We can't generate stub of ExportTransform because
// AutoValue generates a final class.
// TODO make LocalSpannerAccessor serializable
// TODO make SpannerAccessor serializable
DatabaseClient getDatabaseClient(SpannerConfig spannerConfig) {
if (databaseClient == null) {
this.spannerAccessor = LocalSpannerAccessor.getOrCreate(spannerConfig);
this.spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);

Check warning on line 261 in v1/src/main/java/com/google/cloud/teleport/templates/common/SpannerConverters.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/templates/common/SpannerConverters.java#L261

Added line #L261 was not covered by tests
return this.spannerAccessor.getDatabaseClient();
} else {
return this.databaseClient;
Expand Down Expand Up @@ -766,11 +766,11 @@
this.spannerSnapshotTime = spannerSnapshotTime;
}

private transient LocalSpannerAccessor spannerAccessor;
private transient SpannerAccessor spannerAccessor;

@DoFn.Setup
public void setup() throws Exception {
spannerAccessor = LocalSpannerAccessor.getOrCreate(config);
spannerAccessor = SpannerAccessor.getOrCreate(config);

Check warning on line 773 in v1/src/main/java/com/google/cloud/teleport/templates/common/SpannerConverters.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/templates/common/SpannerConverters.java#L773

Added line #L773 was not covered by tests
}

@Teardown
Expand Down
Loading
Loading