Skip to content

Commit

Permalink
[BEAM-13665] Make SpannerIO projectID optional again
Browse files Browse the repository at this point in the history
Fixes regression introduced by PR #15493 which inadvertently caused
an NPE when the projectID was not specified for a SpannerIO read or
write.

Adds unit test for reading/writing both with and without projectID
  • Loading branch information
nielm committed Jan 18, 2022
1 parent b3e9c7d commit a502151
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.cloud.spanner.Partition;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TimestampBound;
import java.util.HashMap;
Expand Down Expand Up @@ -151,6 +152,7 @@ private static class ReadFromPartitionFn extends DoFn<Partition, Struct> {
private final PCollectionView<? extends Transaction> txView;

private transient SpannerAccessor spannerAccessor;
private transient ServiceCallMetric serviceCallMetric;

public ReadFromPartitionFn(
SpannerConfig config, PCollectionView<? extends Transaction> txView) {
Expand All @@ -161,6 +163,11 @@ public ReadFromPartitionFn(
@Setup
public void setup() throws Exception {
spannerAccessor = SpannerAccessor.getOrCreate(config);
serviceCallMetric =
createServiceCallMetric(
this.config.getProjectId() == null ? SpannerOptions.getDefaultProjectId() : this.config.getProjectId().get(),
this.config.getDatabaseId().get(),
this.config.getInstanceId().get());
}

@Teardown
Expand All @@ -170,11 +177,6 @@ public void teardown() throws Exception {

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
ServiceCallMetric serviceCallMetric =
createServiceCallMetric(
this.config.getProjectId().toString(),
this.config.getDatabaseId().toString(),
this.config.getInstanceId().toString());
Transaction tx = c.sideInput(txView);

BatchReadOnlyTransaction batchTx =
Expand All @@ -201,7 +203,8 @@ private ServiceCallMetric createServiceCallMetric(
baseLabels.put(
MonitoringInfoConstants.Labels.RESOURCE,
GcpResourceIdentifiers.spannerTable(projectId, databaseId, tableId));
baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID, projectId);
baseLabels.put(
MonitoringInfoConstants.Labels.SPANNER_PROJECT_ID, projectId);
baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_DATABASE_ID, databaseId);
baseLabels.put(MonitoringInfoConstants.Labels.SPANNER_INSTANCE_ID, tableId);
ServiceCallMetric serviceCallMetric =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1610,6 +1610,7 @@ static class WriteToSpannerFn extends DoFn<Iterable<MutationGroup>, Void> {

// Fluent Backoff is not serializable so create at runtime in setup().
private transient FluentBackoff bundleWriteBackoff;
private transient ServiceCallMetric serviceCallMetric;

WriteToSpannerFn(
SpannerConfig spannerConfig, FailureMode failureMode, TupleTag<MutationGroup> failedTag) {
Expand All @@ -1625,6 +1626,15 @@ public void setup() {
FluentBackoff.DEFAULT
.withMaxCumulativeBackoff(spannerConfig.getMaxCumulativeBackoff().get())
.withInitialBackoff(spannerConfig.getMaxCumulativeBackoff().get().dividedBy(60));
serviceCallMetric =
createServiceCallMetric(
this.spannerConfig.getProjectId() == null
? SpannerOptions.getDefaultProjectId()
: this.spannerConfig.getProjectId().get(),
this.spannerConfig.getDatabaseId().get(),
this.spannerConfig.getInstanceId().get(),
"Write");

}

@Teardown
Expand Down Expand Up @@ -1678,12 +1688,6 @@ public void processElement(ProcessContext c) throws Exception {
private void spannerWriteWithRetryIfSchemaChange(Iterable<Mutation> batch)
throws SpannerException {
for (int retry = 1; ; retry++) {
ServiceCallMetric serviceCallMetric =
createServiceCallMetric(
this.spannerConfig.getProjectId().toString(),
this.spannerConfig.getDatabaseId().toString(),
this.spannerConfig.getInstanceId().toString(),
"Write");
try {
spannerAccessor
.getDatabaseClient()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,23 @@ private SpannerConfig getSpannerConfig() {
}

@Test
public void runRead() throws Exception {
public void runReadTestWithProjectId() throws Exception {
runReadTest(getSpannerConfig());
}

@Test
public void runReadTestWithDefaultProject() throws Exception {
runReadTest(
SpannerConfig.create()
.withInstanceId("123")
.withDatabaseId("aaa")
.withServiceFactory(serviceFactory));
}

private void runReadTest(SpannerConfig spannerConfig) throws Exception {
Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345);
TimestampBound timestampBound = TimestampBound.ofReadTimestamp(timestamp);

SpannerConfig spannerConfig = getSpannerConfig();

PCollection<Struct> one =
pipeline.apply(
"read q",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,21 @@ public void singleMutationPipeline() throws Exception {
verifyBatches(batch(m(2L)));
}

@Test
public void singleMutationPipelineNoProjectId() throws Exception {
Mutation mutation = m(2L);
PCollection<Mutation> mutations = pipeline.apply(Create.of(mutation));

mutations.apply(
SpannerIO.write()
.withInstanceId("test-instance")
.withDatabaseId("test-database")
.withServiceFactory(serviceFactory));
pipeline.run();

verifyBatches(batch(m(2L)));
}

@Test
public void singleMutationGroupPipeline() throws Exception {
PCollection<MutationGroup> mutations =
Expand Down

0 comments on commit a502151

Please sign in to comment.