Skip to content

Commit

Permalink
Merge pull request #31376 from tonytanger/validate_cbt_change_stream
Browse files Browse the repository at this point in the history
Add option to disable validation of cloud bigtable change stream IO
  • Loading branch information
svetakvsundhar authored May 28, 2024
2 parents 37b8c8a + aa831b4 commit 944273f
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableChangeStreamAccessor;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.BigtableClientOverride;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao;
Expand Down Expand Up @@ -2057,6 +2056,7 @@ static ReadChangeStream create() {
return new AutoValue_BigtableIO_ReadChangeStream.Builder()
.setBigtableConfig(config)
.setMetadataTableBigtableConfig(metadataTableconfig)
.setValidateConfig(true)
.build();
}

Expand All @@ -2080,6 +2080,8 @@ static ReadChangeStream create() {

abstract @Nullable Duration getBacklogReplicationAdjustment();

/**
 * Whether {@code expand} should validate the app profile and create or update the metadata
 * table. A {@code null} value is treated as {@code true} by {@code expand}.
 */
abstract @Nullable Boolean getValidateConfig();

abstract ReadChangeStream.Builder toBuilder();

/**
Expand Down Expand Up @@ -2284,25 +2286,80 @@ public ReadChangeStream withBacklogReplicationAdjustment(Duration adjustment) {
return toBuilder().setBacklogReplicationAdjustment(adjustment).build();
}

/**
 * Turns off configuration validation for this change stream read.
 *
 * <p>With validation disabled, the transform no longer checks that the table being read and the
 * metadata table exist, nor that the app profile used is single cluster and single row
 * transaction enabled. Set this option if the caller does not have the additional Bigtable
 * permissions those checks require.
 *
 * <p><b>NOTE</b>: this also disables creating or updating the metadata table, because that
 * requires additional permissions as well; it essentially sets {@link
 * #withCreateOrUpdateMetadataTable} to false.
 *
 * @return a {@code ReadChangeStream} built from this one with validation switched off on the
 *     Bigtable config, the metadata table Bigtable config, and the transform itself
 */
public ReadChangeStream withoutValidation() {
  return toBuilder()
      .setBigtableConfig(getBigtableConfig().withValidate(false))
      .setMetadataTableBigtableConfig(getMetadataTableBigtableConfig().withValidate(false))
      .setValidateConfig(false)
      .build();
}

/**
 * Checks that the change stream table exists, unless validation has been disabled on the
 * Bigtable config.
 *
 * @param options pipeline options passed through to the table-existence lookup
 * @throws IllegalArgumentException if the lookup reports that the table does not exist
 * @throws RuntimeException wrapping any {@link IOException} raised by the lookup
 */
@Override
public void validate(PipelineOptions options) {
  BigtableServiceFactory serviceFactory = new BigtableServiceFactory();
  if (!getBigtableConfig().getValidate()) {
    // Validation was explicitly disabled; skip the lookup.
    return;
  }

  boolean tableExists;
  try {
    tableExists = serviceFactory.checkTableExists(getBigtableConfig(), options, getTableId());
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
  checkArgument(tableExists, "Change Stream table %s does not exist", getTableId());
}

// Checks that the given app profile uses single-cluster routing and allows single-row
// transactions. Both the null check on the DAO and the profile check go through checkArgument.
private void validateAppProfile(
    MetadataTableAdminDao metadataTableAdminDao, String appProfileId) {
  checkArgument(metadataTableAdminDao != null);

  boolean singleClusterAndTransactional =
      metadataTableAdminDao.isAppProfileSingleClusterAndTransactional(appProfileId);
  String failureMessage =
      "App profile id '"
          + appProfileId
          + "' provided to access metadata table needs to use single-cluster routing policy"
          + " and allow single-row transactions.";
  checkArgument(singleClusterAndTransactional, failureMessage);
}

// Creates or updates the metadata table when that option is enabled. The option counts as
// enabled when it was never set. This helper does not itself check that the table exists.
private void createOrUpdateMetadataTable(
    MetadataTableAdminDao metadataTableAdminDao, String metadataTableId) {
  Boolean configuredOption = getCreateOrUpdateMetadataTable();
  boolean createOrUpdateAllowed = configuredOption == null || configuredOption;
  if (!createOrUpdateAllowed) {
    // The option was explicitly set to false, so the table is left untouched.
    return;
  }

  // Log only when createMetadataTable() returns true.
  if (metadataTableAdminDao.createMetadataTable()) {
    LOG.info("Created metadata table: " + metadataTableId);
  }
}

@Override
public PCollection<KV<ByteString, ChangeStreamMutation>> expand(PBegin input) {
BigtableConfig bigtableConfig = getBigtableConfig();
checkArgument(
getBigtableConfig() != null,
bigtableConfig != null,
"BigtableIO ReadChangeStream is missing required configurations fields.");
checkArgument(
getBigtableConfig().getProjectId() != null, "Missing required projectId field.");
checkArgument(
getBigtableConfig().getInstanceId() != null, "Missing required instanceId field.");
bigtableConfig.validate();
checkArgument(getTableId() != null, "Missing required tableId field.");

BigtableConfig bigtableConfig = getBigtableConfig();
if (getBigtableConfig().getAppProfileId() == null
|| getBigtableConfig().getAppProfileId().get().isEmpty()) {
if (bigtableConfig.getAppProfileId() == null
|| bigtableConfig.getAppProfileId().get().isEmpty()) {
bigtableConfig = bigtableConfig.withAppProfileId(StaticValueProvider.of("default"));
}

BigtableConfig metadataTableConfig = getMetadataTableBigtableConfig();
String metadataTableId = getMetadataTableId();
if (metadataTableConfig.getProjectId() == null
|| metadataTableConfig.getProjectId().get().isEmpty()) {
metadataTableConfig = metadataTableConfig.withProjectId(bigtableConfig.getProjectId());
Expand All @@ -2311,6 +2368,7 @@ public PCollection<KV<ByteString, ChangeStreamMutation>> expand(PBegin input) {
|| metadataTableConfig.getInstanceId().get().isEmpty()) {
metadataTableConfig = metadataTableConfig.withInstanceId(bigtableConfig.getInstanceId());
}
String metadataTableId = getMetadataTableId();
if (metadataTableId == null || metadataTableId.isEmpty()) {
metadataTableId = MetadataTableAdminDao.DEFAULT_METADATA_TABLE_NAME;
}
Expand All @@ -2333,10 +2391,6 @@ public PCollection<KV<ByteString, ChangeStreamMutation>> expand(PBegin input) {
existingPipelineOptions = ExistingPipelineOptions.FAIL_IF_EXISTS;
}

boolean shouldCreateOrUpdateMetadataTable = true;
if (getCreateOrUpdateMetadataTable() != null) {
shouldCreateOrUpdateMetadataTable = getCreateOrUpdateMetadataTable();
}
Duration backlogReplicationAdjustment = getBacklogReplicationAdjustment();
if (backlogReplicationAdjustment == null) {
backlogReplicationAdjustment = DEFAULT_BACKLOG_REPLICATION_ADJUSTMENT;
Expand All @@ -2348,31 +2402,25 @@ public PCollection<KV<ByteString, ChangeStreamMutation>> expand(PBegin input) {
new DaoFactory(
bigtableConfig, metadataTableConfig, getTableId(), metadataTableId, changeStreamName);

// Validate the configuration is correct before creating the pipeline, if required.
try {
MetadataTableAdminDao metadataTableAdminDao = daoFactory.getMetadataTableAdminDao();
checkArgument(metadataTableAdminDao != null);
checkArgument(
metadataTableAdminDao.isAppProfileSingleClusterAndTransactional(
metadataTableConfig.getAppProfileId().get()),
"App profile id '"
+ metadataTableConfig.getAppProfileId().get()
+ "' provided to access metadata table needs to use single-cluster routing policy"
+ " and allow single-row transactions.");

// Only try to create or update metadata table if option is set to true. Otherwise, just
// check if the table exists.
if (shouldCreateOrUpdateMetadataTable && metadataTableAdminDao.createMetadataTable()) {
LOG.info("Created metadata table: " + metadataTableAdminDao.getTableId());
boolean validateConfig = true;
if (getValidateConfig() != null) {
validateConfig = getValidateConfig();
}
checkArgument(
metadataTableAdminDao.doesMetadataTableExist(),
"Metadata table does not exist: " + metadataTableAdminDao.getTableId());

try (BigtableChangeStreamAccessor bigtableChangeStreamAccessor =
BigtableChangeStreamAccessor.getOrCreate(bigtableConfig)) {
// Validate app profile and create metadata table if validate is required.
if (validateConfig) {
createOrUpdateMetadataTable(metadataTableAdminDao, metadataTableId);
validateAppProfile(metadataTableAdminDao, metadataTableConfig.getAppProfileId().get());
}
// Validate metadata table if validate is required. We validate the metadata table after
// createOrUpdateMetadataTable because, if the metadata table doesn't exist, we have to run
// createOrUpdateMetadataTable to create it first.
if (metadataTableConfig.getValidate()) {
checkArgument(
bigtableChangeStreamAccessor.getTableAdminClient().exists(getTableId()),
"Change Stream table does not exist");
metadataTableAdminDao.doesMetadataTableExist(),
"Metadata table does not exist: " + metadataTableAdminDao.getTableId());
}
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -2429,6 +2477,8 @@ abstract ReadChangeStream.Builder setExistingPipelineOptions(

abstract ReadChangeStream.Builder setBacklogReplicationAdjustment(Duration adjustment);

abstract ReadChangeStream.Builder setValidateConfig(boolean validateConfig);

abstract ReadChangeStream build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2031,4 +2031,97 @@ synchronized ConfigId newId() {
return ConfigId.create();
}
}

/////////////////////////// ReadChangeStream ///////////////////////////

/** Every value set through the fluent API must be readable back from the built transform. */
@Test
public void testReadChangeStreamBuildsCorrectly() {
  Instant start = Instant.now();
  Duration adjustment = Duration.standardMinutes(1);
  BigtableIO.ReadChangeStream transform =
      BigtableIO.readChangeStream()
          .withProjectId("project")
          .withInstanceId("instance")
          .withTableId("table")
          .withAppProfileId("app-profile")
          .withChangeStreamName("change-stream-name")
          .withMetadataTableProjectId("metadata-project")
          .withMetadataTableInstanceId("metadata-instance")
          .withMetadataTableTableId("metadata-table")
          .withMetadataTableAppProfileId("metadata-app-profile")
          .withStartTime(start)
          .withBacklogReplicationAdjustment(adjustment)
          .withCreateOrUpdateMetadataTable(false)
          .withExistingPipelineOptions(BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS);

  // Settings of the table whose change stream is read.
  assertEquals("project", transform.getBigtableConfig().getProjectId().get());
  assertEquals("instance", transform.getBigtableConfig().getInstanceId().get());
  assertEquals("app-profile", transform.getBigtableConfig().getAppProfileId().get());
  assertEquals("table", transform.getTableId());

  // Settings of the metadata table.
  assertEquals(
      "metadata-project", transform.getMetadataTableBigtableConfig().getProjectId().get());
  assertEquals(
      "metadata-instance", transform.getMetadataTableBigtableConfig().getInstanceId().get());
  assertEquals(
      "metadata-app-profile",
      transform.getMetadataTableBigtableConfig().getAppProfileId().get());
  assertEquals("metadata-table", transform.getMetadataTableId());

  // Remaining change stream options.
  assertEquals("change-stream-name", transform.getChangeStreamName());
  assertEquals(start, transform.getStartTime());
  assertEquals(adjustment, transform.getBacklogReplicationAdjustment());
  assertEquals(false, transform.getCreateOrUpdateMetadataTable());
  assertEquals(
      BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS, transform.getExistingPipelineOptions());
}

/** With validation left enabled, {@code validate} must reject a table that does not exist. */
@Test
public void testReadChangeStreamFailsValidation() {
  BigtableIO.ReadChangeStream validatingRead =
      BigtableIO.readChangeStream()
          .withProjectId("project")
          .withInstanceId("instance")
          .withTableId("table");

  // The table does not exist, so the existence check is expected to fail.
  thrown.expect(IllegalArgumentException.class);
  validatingRead.validate(TestPipeline.testingPipelineOptions());
}

/** With validation disabled, {@code validate} must complete without throwing. */
@Test
public void testReadChangeStreamPassWithoutValidation() {
  // Validation is skipped, so no error is expected from validate().
  BigtableIO.readChangeStream()
      .withProjectId("project")
      .withInstanceId("instance")
      .withTableId("table")
      .withoutValidation()
      .validate(TestPipeline.testingPipelineOptions());
}

/** With validation left enabled, applying the transform must fail. */
@Test
public void testReadChangeStreamValidationFailsDuringApply() {
  BigtableIO.ReadChangeStream validatingRead =
      BigtableIO.readChangeStream()
          .withProjectId("project")
          .withInstanceId("instance")
          .withTableId("table");

  // Validation fails while applying because the resources cannot be found.
  thrown.expect(RuntimeException.class);
  p.apply(validatingRead);
}

/** With validation disabled, applying the transform must not hit the validation failure. */
@Test
public void testReadChangeStreamPassWithoutValidationDuringApply() {
  BigtableIO.ReadChangeStream nonValidatingRead =
      BigtableIO.readChangeStream()
          .withProjectId("project")
          .withInstanceId("instance")
          .withTableId("table")
          .withoutValidation();

  // Unlike the validating case, no RuntimeException is raised during apply. The only expected
  // error is that the pipeline is never run.
  thrown.expect(PipelineRunMissingException.class);
  p.apply(nonValidatingRead);
}
}

0 comments on commit 944273f

Please sign in to comment.