Skip to content

Commit

Permalink
🐛 Destination Bigquery - fix migration logic (#29461)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbfbell committed Aug 17, 2023
1 parent 066a3f4 commit 085b121
Show file tree
Hide file tree
Showing 14 changed files with 38 additions and 24 deletions.
Expand Up @@ -41,6 +41,6 @@ private JavaBaseConstants() {}
COLUMN_NAME_AB_EXTRACTED_AT,
COLUMN_NAME_AB_META);

public static final String AIRBYTE_NAMESPACE_SCHEMA = "airbyte";
public static final String DEFAULT_AIRBYTE_INTERNAL_NAMESPACE = "airbyte_internal";

}
Expand Up @@ -15,18 +15,22 @@

public abstract class BaseDestinationV1V2Migrator<DialectTableDefinition> implements DestinationV1V2Migrator {

Logger LOGGER = LoggerFactory.getLogger(BaseDestinationV1V2Migrator.class);
protected static final Logger LOGGER = LoggerFactory.getLogger(BaseDestinationV1V2Migrator.class);

/**
 * Checks whether the given stream's v1 raw table needs migrating to the v2
 * layout and, if so, performs the migration; otherwise logs that no action
 * is required. The decision itself is delegated to {@code shouldMigrate}.
 *
 * @param sqlGenerator dialect-specific SQL generator used by the migration
 * @param destinationHandler handler that executes statements against the destination
 * @param streamConfig configuration of the stream being assessed
 * @throws TableNotMigratedException if the migration fails (declared; raised by {@code migrate})
 * @throws UnexpectedSchemaException if an existing table has an unexpected schema (declared)
 */
@Override
public void migrateIfNecessary(
final SqlGenerator sqlGenerator,
final DestinationHandler destinationHandler,
final StreamConfig streamConfig)
throws TableNotMigratedException, UnexpectedSchemaException {
// Log before the check so operators can see the assessment happening per stream.
LOGGER.info("Assessing whether migration is necessary for stream {}", streamConfig.id().finalName());
if (shouldMigrate(streamConfig)) {
LOGGER.info("Starting v2 Migration for stream {}", streamConfig.id().finalName());
migrate(sqlGenerator, destinationHandler, streamConfig);
} else {
// Explicitly record the no-op case so absence of migration is visible in logs.
LOGGER.info("No Migration Required for stream: {}", streamConfig.id().finalName());
}

}

/**
Expand All @@ -37,9 +41,13 @@ public void migrateIfNecessary(
*/
protected boolean shouldMigrate(final StreamConfig streamConfig) {
final var v1RawTable = convertToV1RawName(streamConfig);
return isMigrationRequiredForSyncMode(streamConfig.destinationSyncMode())
&& !doesValidV2RawTableAlreadyExist(streamConfig)
&& doesValidV1RawTableExist(v1RawTable.namespace(), v1RawTable.tableName());
LOGGER.info("Checking whether v1 raw table {} in dataset {} exists", v1RawTable.tableName(), v1RawTable.namespace());
final var syncModeNeedsMigration = isMigrationRequiredForSyncMode(streamConfig.destinationSyncMode());
final var noValidV2RawTableExists = !doesValidV2RawTableAlreadyExist(streamConfig);
final var aValidV1RawTableExists = doesValidV1RawTableExist(v1RawTable.namespace(), v1RawTable.tableName());
LOGGER.info("Migration Info: Required for Sync mode: {}, No existing v2 raw tables: {}, A v1 raw table exists: {}",
syncModeNeedsMigration, noValidV2RawTableExists, aValidV1RawTableExists);
return syncModeNeedsMigration && noValidV2RawTableExists && aValidV1RawTableExists;
}

/**
Expand All @@ -55,7 +63,7 @@ public void migrate(final SqlGenerator<DialectTableDefinition> sqlGenerator,
final StreamConfig streamConfig)
throws TableNotMigratedException {
final var namespacedTableName = convertToV1RawName(streamConfig);
final var migrateAndReset = String.join("\n",
final var migrateAndReset = String.join("\n\n",
sqlGenerator.migrateFromV1toV2(streamConfig.id(), namespacedTableName.namespace(),
namespacedTableName.tableName()),
sqlGenerator.softReset(streamConfig));
Expand Down
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.base.destination.typing_deduping;

import static io.airbyte.integrations.base.JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;

import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.util.ArrayList;
Expand All @@ -15,12 +17,11 @@

public class CatalogParser {

public static final String DEFAULT_RAW_TABLE_NAMESPACE = "airbyte_internal";
private final SqlGenerator<?> sqlGenerator;
private final String rawNamespace;

public CatalogParser(final SqlGenerator<?> sqlGenerator) {
this(sqlGenerator, DEFAULT_RAW_TABLE_NAMESPACE);
this(sqlGenerator, DEFAULT_AIRBYTE_INTERNAL_NAMESPACE);
}

public CatalogParser(final SqlGenerator<?> sqlGenerator, final String rawNamespace) {
Expand Down
Expand Up @@ -73,7 +73,8 @@ public void testMigrate() {
final var sqlGenerator = new MockSqlGenerator();
final StreamConfig stream = new StreamConfig(STREAM_ID, null, DestinationSyncMode.APPEND_DEDUP, null, null, null);
final DestinationHandler<String> handler = Mockito.mock(DestinationHandler.class);
final var sql = String.join("\n", sqlGenerator.migrateFromV1toV2(STREAM_ID, "v1_raw_namespace", "v1_raw_table"), sqlGenerator.softReset(stream));
final var sql = String.join("\n\n", sqlGenerator.migrateFromV1toV2(STREAM_ID, "v1_raw_namespace", "v1_raw_table"),
sqlGenerator.softReset(stream));
// All is well
final var migrator = noIssuesMigrator();
migrator.migrate(sqlGenerator, handler, stream);
Expand Down
Expand Up @@ -47,7 +47,7 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.7.7
LABEL io.airbyte.version=1.7.8
LABEL io.airbyte.name=airbyte/destination-bigquery

ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"
Expand Down
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 1.7.7
dockerImageTag: 1.7.8
dockerRepository: airbyte/destination-bigquery
githubIssueLabel: destination-bigquery
icon: bigquery.svg
Expand Down
Expand Up @@ -4,7 +4,7 @@

package io.airbyte.integrations.destination.bigquery;

import static io.airbyte.integrations.base.JavaBaseConstants.AIRBYTE_NAMESPACE_SCHEMA;
import static io.airbyte.integrations.base.JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Functions;
Expand Down Expand Up @@ -154,11 +154,15 @@ private OnStartFunction onStartFunction(final BigQueryStagingOperations bigQuery
final TyperDeduper typerDeduper) {
return () -> {
LOGGER.info("Preparing airbyte_raw tables in destination started for {} streams", writeConfigs.size());
if (TypingAndDedupingFlag.isDestinationV2()) {
typerDeduper.prepareTables();
}
for (final BigQueryWriteConfig writeConfig : writeConfigs.values()) {
LOGGER.info("Preparing staging are in destination for schema: {}, stream: {}, target table: {}, stage: {}",
writeConfig.tableSchema(), writeConfig.streamName(), writeConfig.targetTableId(), writeConfig.streamName());
writeConfig.tableSchema(), writeConfig.streamName(), writeConfig.targetTableId(), writeConfig.streamName()
);
// In Destinations V2, we will always use the 'airbyte' schema/namespace for raw tables
final String rawDatasetId = TypingAndDedupingFlag.isDestinationV2() ? AIRBYTE_NAMESPACE_SCHEMA : writeConfig.datasetId();
final String rawDatasetId = TypingAndDedupingFlag.isDestinationV2() ? DEFAULT_AIRBYTE_INTERNAL_NAMESPACE : writeConfig.datasetId();
// Regardless, ensure the schema the customer wants to write to exists
bigQueryGcsOperations.createSchemaIfNotExists(writeConfig.datasetId(), writeConfig.datasetLocation());
// Schema used for raw and airbyte internal tables
Expand All @@ -174,7 +178,6 @@ private OnStartFunction onStartFunction(final BigQueryStagingOperations bigQuery
bigQueryGcsOperations.truncateTableIfExists(rawDatasetId, writeConfig.targetTableId(), writeConfig.tableSchema());
}
}
typerDeduper.prepareTables();
LOGGER.info("Preparing tables in destination completed.");
};
}
Expand Down
Expand Up @@ -54,8 +54,8 @@ protected boolean schemaMatchesExpectation(TableDefinition existingTable, Collec
@Override
protected NamespacedTableName convertToV1RawName(StreamConfig streamConfig) {
return new NamespacedTableName(
this.nameTransformer.getRawTableName(streamConfig.id().originalName()),
this.nameTransformer.getNamespace(streamConfig.id().originalNamespace())
this.nameTransformer.getNamespace(streamConfig.id().originalNamespace()),
this.nameTransformer.getRawTableName(streamConfig.id().originalName())
);
}
}
Expand Up @@ -7,9 +7,8 @@
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import io.airbyte.integrations.base.TypingAndDedupingFlag;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.destination.typing_deduping.BaseTypingDedupingTest;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.destination.bigquery.BigQueryDestination;
import io.airbyte.integrations.destination.bigquery.BigQueryDestinationTestUtils;
Expand Down Expand Up @@ -71,6 +70,6 @@ protected void teardownStreamAndNamespace(String streamNamespace, String streamN
* Subclasses using a config with a nonstandard raw table dataset should override this method.
*/
protected String getRawDataset() {
return CatalogParser.DEFAULT_RAW_TABLE_NAMESPACE;
return JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
}
}
Expand Up @@ -49,7 +49,7 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1
ENV ENABLE_SENTRY true


LABEL io.airbyte.version=1.3.2
LABEL io.airbyte.version=1.3.3
LABEL io.airbyte.name=airbyte/destination-snowflake

ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"
Expand Down
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 1.3.2
dockerImageTag: 1.3.3
dockerRepository: airbyte/destination-snowflake
githubIssueLabel: destination-snowflake
icon: snowflake.svg
Expand Down
Expand Up @@ -7,8 +7,8 @@
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.destination.typing_deduping.BaseTypingDedupingTest;
import io.airbyte.integrations.base.destination.typing_deduping.CatalogParser;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.destination.snowflake.OssCloudEnvVarConsts;
import io.airbyte.integrations.destination.snowflake.SnowflakeDatabase;
Expand Down Expand Up @@ -86,7 +86,7 @@ protected void globalTeardown() throws Exception {
* Subclasses using a config with a nonstandard raw table schema should override this method.
*/
protected String getRawSchema() {
return CatalogParser.DEFAULT_RAW_TABLE_NAMESPACE;
return JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
}

private String getDefaultSchema() {
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/bigquery.md
Expand Up @@ -135,6 +135,7 @@ Now that you have set up the BigQuery destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------|
| 1.7.8 | 2023-08-15 | [\#29461](https://github.com/airbytehq/airbyte/pull/29461) | Migration BugFix - ensure migration happens before table creation for GCS staging. |
| 1.7.7 | 2023-08-11 | [\#29381](https://github.com/airbytehq/airbyte/pull/29381) | Destinations v2: Add support for streams with no columns |
| 1.7.6 | 2023-08-04 | [\#28894](https://github.com/airbytehq/airbyte/pull/28894) | Destinations v2: Add v1 -> v2 migration Logic |
| 1.7.5 | 2023-08-04 | [\#29106](https://github.com/airbytehq/airbyte/pull/29106) | Destinations v2: handle unusual CDC deletion edge case |
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/snowflake.md
Expand Up @@ -271,6 +271,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n

| Version | Date | Pull Request | Subject |
|:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.3.3 | 2023-08-15 | [\#29461](https://github.com/airbytehq/airbyte/pull/29461) | Changing a static constant reference |
| 1.3.2 | 2023-08-11 | [\#29381](https://github.com/airbytehq/airbyte/pull/29381) | Destinations v2: Add support for streams with no columns |
| 1.3.1 | 2023-08-04 | [\#28894](https://github.com/airbytehq/airbyte/pull/28894) | Destinations v2: Update SqlGenerator |
| 1.3.0 | 2023-08-07 | [\#29174](https://github.com/airbytehq/airbyte/pull/29174) | Destinations v2: early access release |
Expand Down

0 comments on commit 085b121

Please sign in to comment.