Add Cloud Bigtable Change Stream integration tests #29127

Merged
merged 8 commits into from
Oct 31, 2023
2 changes: 2 additions & 0 deletions sdks/java/io/google-cloud-platform/build.gradle
@@ -187,12 +187,14 @@ task integrationTest(type: Test, dependsOn: processTestResources) {
def gcpTempRoot = project.findProperty('gcpTempRoot') ?: 'gs://temp-storage-for-end-to-end-tests'
def firestoreDb = project.findProperty('firestoreDb') ?: 'firestoredb'
def host = project.findProperty('host') ?: 'batch-firestore.googleapis.com:443'
def instanceId = project.findProperty('instanceId') ?: 'beam-test'
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=DirectRunner",
"--project=${gcpProject}",
"--tempRoot=${gcpTempRoot}",
"--firestoreDb=${firestoreDb}",
"--host=${host}",
"--instanceId=${instanceId}",
Contributor Author

@Abacn I want to bring this to your attention. I want to be able to run something like this:

./gradlew :sdks:java:io:google-cloud-platform:integrationTest --tests=org.apache.beam.sdk.io.gcp.bigtable.changestreams.it.BigtableChangeStreamIT -PgcpProject=my-test-project -PinstanceId=my-test-instance

BigtableTestOptions has an instanceId field, but the only way I found to pass a value other than the default is to add it here.

This works fine for ChangeStreamIT because it registers BigtableTestOptions, which defines instanceId, so we can pass --instanceId to beamTestPipelineOptions. However, every other integration test that does not register BigtableTestOptions will fail, because we pass --instanceId to beamTestPipelineOptions but those tests have no option that registers instanceId.

That's why I moved the BigtableTestOptions to the main directory and registered it by default.

Is there an alternative way to pass instanceId to my test that does not require explicitly passing instanceId here?
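
The failure mode described above can be sketched with a small stand-in for strict option parsing. This is a simplified, hypothetical model (the `StrictOptionParser` class and its `parse` method are illustrative assumptions, not Beam's implementation): every test receives the same argument list, and a test whose registered option names do not include `instanceId` rejects it.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Simplified, hypothetical model of strict option parsing; not Beam's actual implementation. */
final class StrictOptionParser {
  private StrictOptionParser() {}

  /** Parses "--name=value" arguments, rejecting any name that is not registered. */
  static Map<String, String> parse(Set<String> registeredNames, String... args) {
    Map<String, String> parsed = new LinkedHashMap<>();
    for (String arg : args) {
      int eq = arg.indexOf('=');
      if (!arg.startsWith("--") || eq < 0) {
        throw new IllegalArgumentException("Expected --name=value but got: " + arg);
      }
      String name = arg.substring(2, eq);
      if (!registeredNames.contains(name)) {
        throw new IllegalArgumentException("Unknown option: " + name);
      }
      parsed.put(name, arg.substring(eq + 1));
    }
    return parsed;
  }

  public static void main(String[] unused) {
    String[] shared = {"--project=my-test-project", "--instanceId=my-test-instance"};

    // A test that knows about instanceId accepts the shared argument list.
    System.out.println(parse(Set.of("project", "instanceId"), shared));

    // A test that only knows about project rejects the very same list.
    try {
      parse(Set.of("project"), shared);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

In this model the only ways out are to make every consumer aware of the name or to stop passing the flag to consumers that do not know it, which is the trade-off discussed in this thread.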

Contributor


> This works fine for ChangeStreamIT because it registers BigtableTestOptions which defines instanceId, so we can pass --instanceId to beamTestPipelineOptions. However, all other integration tests that do not register BigtableTestOptions will fail because we are passing --instanceId to beamTestPipelineOptions but they don't have an option that registers instanceId.

Thanks, understood. The unrelated tests fail because they do not recognize the --instanceId pipeline option, as you stated. This happens because all GCP integration tests are currently triggered by a single Gradle command, and restructuring the tests would be a more substantial change. For now, we can keep it in main if this is the only approach that works, but consider making the option more specific, for example:

  • Use "--bigtableInstanceId"; "instanceId" is too generic. Options in the main scope affect all users, including customers.

Contributor Author


Thanks! Done.

])

// Disable Gradle cache: these ITs interact with live service that should always be considered "out of date"
@@ -28,10 +28,4 @@ public interface BigtableTestOptions extends TestPipelineOptions {
String getInstanceId();

void setInstanceId(String value);

- @Description("Project for Bigtable")
Contributor


Interesting that this file has been moved back and forth: 4833b3f
What is the reason for moving it back to main?

Also, what is the context for removing the "setBigtableProject" option? In theory, the project where the Bigtable instance is located can differ from the project used to run Dataflow.
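
The case raised here, a Bigtable instance living in a different project from the one running the pipeline, is what a fallback lookup handles: use the dedicated Bigtable project when one is set, otherwise use the pipeline's project. A minimal sketch, assuming a hypothetical `BigtableProjectResolver` helper (the class and method names are illustrative and not part of Beam):

```java
/** Hypothetical helper; the name and signature are illustrative, not part of Beam. */
final class BigtableProjectResolver {
  private BigtableProjectResolver() {}

  /** Returns the dedicated Bigtable project if one is set, otherwise the pipeline's project. */
  static String resolve(String bigtableProject, String pipelineProject) {
    if (bigtableProject == null || bigtableProject.isEmpty()) {
      return pipelineProject;
    }
    return bigtableProject;
  }

  public static void main(String[] args) {
    // No dedicated Bigtable project configured: fall back to the pipeline's project.
    System.out.println(resolve("", "dataflow-project"));
    // A dedicated Bigtable project takes precedence when present.
    System.out.println(resolve("bigtable-project", "dataflow-project"));
  }
}
```

Dropping the dedicated option removes the second branch, so the test always reads from the pipeline's project.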

Contributor Author


A few reasons.

  1. I'm moving it so I can register it in GcpIoPipelineOptionsRegistrar.java, for reasons explained in the comment below.
  2. I'm removing setBigtableProject because, for the reason below, we can't actually set bigtableProject and pass it to the tests anyway. I don't think anything depends on it, so it should be safe to remove.
  3. Registering bigtableProject conflicts with some of our tests because they also try to register bigtableProject. So it was either remove it here or change it in the tests.
  4. I took a brief look at Firestore and Spanner; neither has an option to set a different project, so it doesn't seem to be a common use case.

Contributor


If this goes to main, it will affect all users. For example, the name "InstanceId" sounds generic. In the past we received similar reports about this: #27256 (comment)

In fact, other GCP components (e.g. Spanner) also have "instanceId" settings, so this could introduce confusion.

If it stays within the test scope, one can still register it using another Registrar class. GcpIoPipelineOptionsRegistrar is just an auto-service class that loads the options at initialization. Alternatively, one can register custom pipeline option classes within the Bigtable test classes. This reduces the scope to the necessary level.

Ack for the removal of setBigtableProject.
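
The naming concern can be illustrated with a toy model of a shared option namespace in which each name may belong to only one component. This is a hypothetical simplification (the `OptionNamespace` class is an assumption for illustration, `spannerInstanceId` is an invented name, and Beam's real conflict rules differ): a generic name collides as soon as a second component wants it, while component-prefixed names do not.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of a shared option namespace; not Beam's actual rules. */
final class OptionNamespace {
  private final Map<String, String> ownerByName = new HashMap<>();

  /** Claims optionName for component; returns false if a different component already owns it. */
  boolean claim(String component, String optionName) {
    String owner = ownerByName.putIfAbsent(optionName, component);
    return owner == null || owner.equals(component);
  }

  public static void main(String[] args) {
    OptionNamespace generic = new OptionNamespace();
    System.out.println(generic.claim("bigtable", "instanceId")); // first claim
    System.out.println(generic.claim("spanner", "instanceId"));  // second component, same name

    OptionNamespace prefixed = new OptionNamespace();
    System.out.println(prefixed.claim("bigtable", "bigtableInstanceId"));
    System.out.println(prefixed.claim("spanner", "spannerInstanceId")); // hypothetical name
  }
}
```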

- @Default.String("")
- String getBigtableProject();
-
- void setBigtableProject(String value);
}
@@ -21,6 +21,7 @@
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.TestBigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableTestOptions;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreOptions;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -38,6 +39,7 @@ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
.add(PubsubOptions.class)
.add(FirestoreOptions.class)
.add(TestBigQueryOptions.class)
.add(BigtableTestOptions.class)
Contributor


If this is the only place that refers to the BigtableTestOptions class, it's not necessary to move the options class. We can register the options in the test class.

Contributor Author


Maybe there's an alternative solution. I want to be able to pass "instanceId" as a parameter to the IT. But the GCP project defines integrationTest to explicitly take a few parameters and pass them to beamTestPipelineOptions, so I added instanceId. But unless instanceId is always defined (by registering it for all ITs), other tests fail because we pass instanceId and they don't have it defined.

So this ended up being the solution. Happy to try something else.

Contributor (@Abacn, Oct 30, 2023)


In each IT class, one can get a TestPipelineOptions instance like this:

private static PipelineOptions testOptions = TestPipeline.testingPipelineOptions();

and then use testOptions.as(BigtableTestOptions.class) to get BigtableTestOptions, e.g.

options = TestPipeline.testingPipelineOptions().as(BigQueryIOJsonOptions.class);

No Registrar is involved. The current Bigtable ITs do something similar, and it should suffice.

https://github.com/apache/beam/blob/ba0259de7f9f2d9e06108de20ef57b16ef8e3cc4/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/it/BigtableChangeStreamIT.java#L80C5-L80C12

> unless instanceId is always defined (by registering it for all ITs), other tests fail because they don't have instanceId defined but we're passing instanceId.

That is a consequence of registering it for all ITs. If we do not register it for all ITs, the unrelated tests won't fail.
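
The idea of viewing one set of option values through a typed interface on demand, as with the `as(...)` call mentioned above, can be sketched in plain Java with a dynamic proxy. This is a toy model under stated assumptions: the `Options` class and the `InstanceOptions` interface are hypothetical stand-ins, and the sketch covers only getter/setter dispatch, not argument parsing, defaults, or validation.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

/** Toy model of typed views over shared option values; not Beam's implementation. */
final class OptionsViewDemo {
  private OptionsViewDemo() {}

  /** Hypothetical stand-in for an options interface such as BigtableTestOptions. */
  interface InstanceOptions {
    String getInstanceId();

    void setInstanceId(String value);
  }

  /** Holds option values once; any getter/setter interface can be used as a view. */
  static final class Options {
    private final Map<String, Object> values = new HashMap<>();

    <T> T as(Class<T> view) {
      InvocationHandler handler =
          (proxy, method, methodArgs) -> {
            String name = method.getName();
            if (name.startsWith("get") && (methodArgs == null || methodArgs.length == 0)) {
              return values.get(name.substring(3)); // getFoo reads key "Foo"
            }
            if (name.startsWith("set") && methodArgs != null && methodArgs.length == 1) {
              values.put(name.substring(3), methodArgs[0]); // setFoo writes key "Foo"
              return null;
            }
            throw new UnsupportedOperationException(name);
          };
      return view.cast(
          Proxy.newProxyInstance(view.getClassLoader(), new Class<?>[] {view}, handler));
    }
  }

  public static void main(String[] args) {
    Options options = new Options();
    options.as(InstanceOptions.class).setInstanceId("my-test-instance");
    // A second view over the same object sees the value written through the first.
    System.out.println(options.as(InstanceOptions.class).getInstanceId());
  }
}
```

In this model the view is created where it is needed, so only the code that asks for `InstanceOptions` has to know the interface exists.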

.build();
}
}
@@ -39,10 +39,7 @@ public void testE2EBigtableRead() throws Exception {
BigtableTestOptions options =
TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class);

- String project = options.getBigtableProject();
- if (project.equals("")) {
- project = options.as(GcpOptions.class).getProject();
- }
+ String project = options.as(GcpOptions.class).getProject();

BigtableOptions.Builder bigtableOptionsBuilder =
new BigtableOptions.Builder().setProjectId(project).setInstanceId(options.getInstanceId());
@@ -31,12 +31,14 @@
import com.google.bigtable.v2.Mutation;
import com.google.protobuf.ByteString;
import java.util.List;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableClientOverride;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
import org.joda.time.Instant;

- class BigtableTestUtils {
+ public class BigtableTestUtils {

static final String BOOL_COLUMN = "boolColumn";
static final String LONG_COLUMN = "longColumn";
@@ -144,4 +146,27 @@ private static Cell createCell(ByteString value, long timestamp, String... label
}
return builder.build();
}

// We have to build the pipeline at this package level and not changestreams package because
// endTime is package private and we can only create a pipeline with endTime here. Setting endTime
// allows the tests to predictably terminate.
public static BigtableIO.ReadChangeStream buildTestPipelineInput(
Contributor (@Abacn, Oct 24, 2023)


If this is the only method that org.apache.beam.sdk.io.gcp.bigtable.changestreams.it refers to, it is not necessary to make the Util class public. Just build the pipeline in place, or create BigtableChangeStreamTestUtils in the changestreams package.

Contributor Author


The problem, which we (the Bigtable team) created for ourselves, is that endTime is a package-private field. We don't want to expose endTime as an option because it's generally not well supported, but endTime is useful for terminating tests.

So we have to create the pipeline in the same package as BigtableIO; we can't create a pipeline with endTime in the changestreams package.

Contributor


Ack. If other options are needed, then it makes sense to widen the scope of this Util class.

String projectId,
String instanceId,
String tableId,
String appProfileId,
String metadataTableName,
Instant startTime,
Instant endTime,
BigtableClientOverride clientOverride) {
return BigtableIO.readChangeStream()
.withProjectId(projectId)
.withInstanceId(instanceId)
.withTableId(tableId)
.withAppProfileId(appProfileId)
.withMetadataTableTableId(metadataTableName)
.withStartTime(startTime)
.withEndTime(endTime)
.withBigtableClientOverride(clientOverride);
}
}