Skip to content

Commit

Permalink
Revert "T&D: Use non-null columns for PKs rather than scanning raw ta… (
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Oct 4, 2023
1 parent 53735b6 commit ad5ce75
Show file tree
Hide file tree
Showing 23 changed files with 245 additions and 210 deletions.
Expand Up @@ -20,7 +20,6 @@
import io.airbyte.commons.string.Strings;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -579,9 +578,7 @@ public void overwriteFinalTable() throws Exception {
{
"_airbyte_raw_id": "4fa4efe2-3097-4464-bd22-11211cc3e15b",
"_airbyte_extracted_at": "2023-01-01T00:00:00Z",
"_airbyte_meta": {},
"id1": 1,
"id2": 2
"_airbyte_meta": {}
}
"""));
insertFinalTableRecords(
Expand Down Expand Up @@ -929,46 +926,6 @@ public void testReservedKeywords() throws Exception {
dumpFinalTableRecords(streamId, ""));
}

/**
 * Verify that the final table inherently rejects null values for primary key columns. Subclasses
 * MAY override this if they don't support `NOT NULL` constraints.
 */
@Test
public void ensurePKsAreIndexedUnique() throws Exception {
// Seed the raw table with one record whose PK fields (id1, id2) are both present,
// so the final-table schema check below isn't confounded by bad data.
createRawTable(streamId);
insertRawTableRecords(
streamId,
List.of(Jsons.deserialize(
"""
{
"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc",
"_airbyte_extracted_at": "2023-01-01T00:00:00Z",
"_airbyte_data": {
"id1": 1,
"id2": 2
}
}
""")));

// Build the dialect-specific CREATE TABLE statement for the dedup stream.
final String createTable = generator.createTable(incrementalDedupStream, "", false);

// should be OK with new tables
// Phase 1: a freshly created table (with NOT NULL on PKs) must be recognized as
// matching the stream config.
destinationHandler.execute(createTable);
final Optional<DialectTableDefinition> existingTableA = destinationHandler.findExistingTable(streamId);
assertTrue(generator.existingSchemaMatchesStreamConfig(incrementalDedupStream, existingTableA.get()));
// Drop it so phase 2 can recreate the table under the same identifier.
destinationHandler.execute("DROP TABLE " + streamId.finalTableId(""));

// Hack the create query to remove NOT NULLs to emulate the old behavior
// (line-by-line string surgery: strip NOT NULL only from lines that mention a PK
// column, in either case — assumes the generator emits one column per line and
// splits on \r\n; TODO confirm that holds for every dialect's SqlGenerator).
final String createTableModified = Arrays.stream(createTable.split("\r\n"))
.map(line -> line.contains("id1") || line.contains("id2") || line.contains("ID1") || line.contains("ID2")
? line.replace("NOT NULL", "")
: line)
.collect(Collectors.joining("\r\n"));
destinationHandler.execute(createTableModified);
// Phase 2: the nullable-PK table must be flagged as NOT matching the stream config,
// which is what forces a soft reset / table rebuild in production.
final Optional<DialectTableDefinition> existingTableB = destinationHandler.findExistingTable(streamId);
assertFalse(generator.existingSchemaMatchesStreamConfig(incrementalDedupStream, existingTableB.get()));
}

/**
* A stream with no columns is weird, but we shouldn't treat it specially in any way. It should
* create a final table as usual, and populate it with the relevant metadata columns.
Expand Down
@@ -1,23 +1,23 @@
// types with timezones:
// Z offset
{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 2, "timestamp_with_timezone": "2023-01-23T12:34:56Z", "time_with_timezone": "12:34:56Z"}}
{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"timestamp_with_timezone": "2023-01-23T12:34:56Z", "time_with_timezone": "12:34:56Z"}}
// -08:00 offset
{"_airbyte_raw_id": "05028c5f-7813-4e9c-bd4b-387d1f8ba435", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 2, "timestamp_with_timezone": "2023-01-23T12:34:56-08:00", "time_with_timezone": "12:34:56-08:00"}}
{"_airbyte_raw_id": "05028c5f-7813-4e9c-bd4b-387d1f8ba435", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"timestamp_with_timezone": "2023-01-23T12:34:56-08:00", "time_with_timezone": "12:34:56-08:00"}}
// -0800 offset
{"_airbyte_raw_id": "95dfb0c6-6a67-4ba0-9935-643bebc90437", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 2, "timestamp_with_timezone": "2023-01-23T12:34:56-0800", "time_with_timezone": "12:34:56-0800"}}
{"_airbyte_raw_id": "95dfb0c6-6a67-4ba0-9935-643bebc90437", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"timestamp_with_timezone": "2023-01-23T12:34:56-0800", "time_with_timezone": "12:34:56-0800"}}
// -08 offset
{"_airbyte_raw_id": "f3d8abe2-bb0f-4caf-8ddc-0641df02f3a9", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 2, "timestamp_with_timezone": "2023-01-23T12:34:56-08", "time_with_timezone": "12:34:56-08"}}
{"_airbyte_raw_id": "f3d8abe2-bb0f-4caf-8ddc-0641df02f3a9", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"timestamp_with_timezone": "2023-01-23T12:34:56-08", "time_with_timezone": "12:34:56-08"}}
// +08:00 offset
{"_airbyte_raw_id": "a81ed40a-2a49-488d-9714-d53e8b052968", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 2, "timestamp_with_timezone": "2023-01-23T12:34:56+08:00", "time_with_timezone": "12:34:56+08:00"}}
{"_airbyte_raw_id": "a81ed40a-2a49-488d-9714-d53e8b052968", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"timestamp_with_timezone": "2023-01-23T12:34:56+08:00", "time_with_timezone": "12:34:56+08:00"}}
// +0800 offset
{"_airbyte_raw_id": "c07763a0-89e6-4cb7-b7d0-7a34a7c9918a", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 2, "timestamp_with_timezone": "2023-01-23T12:34:56+0800", "time_with_timezone": "12:34:56+0800"}}
{"_airbyte_raw_id": "c07763a0-89e6-4cb7-b7d0-7a34a7c9918a", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"timestamp_with_timezone": "2023-01-23T12:34:56+0800", "time_with_timezone": "12:34:56+0800"}}
// +08 offset
{"_airbyte_raw_id": "358d3b52-50ab-4e06-9094-039386f9bf0d", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 2, "timestamp_with_timezone": "2023-01-23T12:34:56+08", "time_with_timezone": "12:34:56+08"}}
{"_airbyte_raw_id": "358d3b52-50ab-4e06-9094-039386f9bf0d", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"timestamp_with_timezone": "2023-01-23T12:34:56+08", "time_with_timezone": "12:34:56+08"}}
// decimal precision
{"_airbyte_raw_id": "db8200ac-b2b9-4b95-a053-8a0343042751", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 2, "timestamp_with_timezone": "2023-01-23T12:34:56.123Z", "time_with_timezone": "12:34:56.123Z"}}
{"_airbyte_raw_id": "db8200ac-b2b9-4b95-a053-8a0343042751", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"timestamp_with_timezone": "2023-01-23T12:34:56.123Z", "time_with_timezone": "12:34:56.123Z"}}

// types without timezones:
// basic
{"_airbyte_raw_id": "10ce5d93-6923-4217-a46f-103833837038", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 2, "timestamp_without_timezone": "2023-01-23T12:34:56", "time_without_timezone": "12:34:56", "date": "2023-01-23"}}
{"_airbyte_raw_id": "10ce5d93-6923-4217-a46f-103833837038", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"timestamp_without_timezone": "2023-01-23T12:34:56", "time_without_timezone": "12:34:56", "date": "2023-01-23"}}
// decimal precision
{"_airbyte_raw_id": "a7a6e176-7464-4a0b-b55c-b4f936e8d5a1", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 2, "timestamp_without_timezone": "2023-01-23T12:34:56.123", "time_without_timezone": "12:34:56.123"}}
{"_airbyte_raw_id": "a7a6e176-7464-4a0b-b55c-b4f936e8d5a1", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"timestamp_without_timezone": "2023-01-23T12:34:56.123", "time_without_timezone": "12:34:56.123"}}
Expand Up @@ -9,7 +9,7 @@

public record AlterTableReport(Set<String> columnsToAdd,
Set<String> columnsToRemove,
Set<String> columnsToChange,
Set<String> columnsToChangeType,
boolean isDestinationV2Format) {

/**
Expand All @@ -18,7 +18,7 @@ public record AlterTableReport(Set<String> columnsToAdd,
* @return whether the schema matches
*/
public boolean isNoOp() {
return isDestinationV2Format && Stream.of(this.columnsToAdd, this.columnsToRemove, this.columnsToChange)
return isDestinationV2Format && Stream.of(this.columnsToAdd, this.columnsToRemove, this.columnsToChangeType)
.allMatch(Set::isEmpty);
}

Expand Down
Expand Up @@ -22,7 +22,7 @@ public void migrateIfNecessary(
final SqlGenerator sqlGenerator,
final DestinationHandler destinationHandler,
final StreamConfig streamConfig)
throws TableNotMigratedException, UnexpectedSchemaException, Exception {
throws TableNotMigratedException, UnexpectedSchemaException {
LOGGER.info("Assessing whether migration is necessary for stream {}", streamConfig.id().finalName());
if (shouldMigrate(streamConfig)) {
LOGGER.info("Starting v2 Migration for stream {}", streamConfig.id().finalName());
Expand All @@ -40,7 +40,7 @@ public void migrateIfNecessary(
* @param streamConfig the stream in question
* @return whether to migrate the stream
*/
protected boolean shouldMigrate(final StreamConfig streamConfig) throws Exception {
protected boolean shouldMigrate(final StreamConfig streamConfig) {
final var v1RawTable = convertToV1RawName(streamConfig);
LOGGER.info("Checking whether v1 raw table {} in dataset {} exists", v1RawTable.tableName(), v1RawTable.namespace());
final var syncModeNeedsMigration = isMigrationRequiredForSyncMode(streamConfig.destinationSyncMode());
Expand All @@ -66,7 +66,7 @@ public void migrate(final SqlGenerator<DialectTableDefinition> sqlGenerator,
final var namespacedTableName = convertToV1RawName(streamConfig);
try {
destinationHandler.execute(sqlGenerator.migrateFromV1toV2(streamConfig.id(), namespacedTableName.namespace(), namespacedTableName.tableName()));
} catch (final Exception e) {
} catch (Exception e) {
final var message = "Attempted and failed to migrate stream %s".formatted(streamConfig.id().finalName());
throw new TableNotMigratedException(message, e);
}
Expand All @@ -78,7 +78,7 @@ public void migrate(final SqlGenerator<DialectTableDefinition> sqlGenerator,
* @param existingV2AirbyteRawTable the v1 raw table
* @return whether the schema is as expected
*/
private boolean doesV1RawTableMatchExpectedSchema(final DialectTableDefinition existingV2AirbyteRawTable) {
private boolean doesV1RawTableMatchExpectedSchema(DialectTableDefinition existingV2AirbyteRawTable) {

return schemaMatchesExpectation(existingV2AirbyteRawTable, LEGACY_RAW_TABLE_COLUMNS);
}
Expand All @@ -88,7 +88,7 @@ private boolean doesV1RawTableMatchExpectedSchema(final DialectTableDefinition e
*
* @param existingV2AirbyteRawTable the v2 raw table
*/
private void validateAirbyteInternalNamespaceRawTableMatchExpectedV2Schema(final DialectTableDefinition existingV2AirbyteRawTable) {
private void validateAirbyteInternalNamespaceRawTableMatchExpectedV2Schema(DialectTableDefinition existingV2AirbyteRawTable) {
if (!schemaMatchesExpectation(existingV2AirbyteRawTable, V2_RAW_TABLE_COLUMN_NAMES)) {
throw new UnexpectedSchemaException("Destination V2 Raw Table does not match expected Schema");
}
Expand All @@ -110,7 +110,7 @@ private boolean isMigrationRequiredForSyncMode(final DestinationSyncMode destina
* @param streamConfig the raw table to check
* @return whether it exists and is in the correct format
*/
private boolean doesValidV2RawTableAlreadyExist(final StreamConfig streamConfig) throws Exception {
private boolean doesValidV2RawTableAlreadyExist(final StreamConfig streamConfig) {
if (doesAirbyteInternalNamespaceExist(streamConfig)) {
final var existingV2Table = getTableIfExists(streamConfig.id().rawNamespace(), streamConfig.id().rawName());
existingV2Table.ifPresent(this::validateAirbyteInternalNamespaceRawTableMatchExpectedV2Schema);
Expand All @@ -126,7 +126,7 @@ private boolean doesValidV2RawTableAlreadyExist(final StreamConfig streamConfig)
* @param tableName
* @return whether it exists and is in the correct format
*/
protected boolean doesValidV1RawTableExist(final String namespace, final String tableName) throws Exception {
protected boolean doesValidV1RawTableExist(final String namespace, final String tableName) {
final var existingV1RawTable = getTableIfExists(namespace, tableName);
return existingV1RawTable.isPresent() && doesV1RawTableMatchExpectedSchema(existingV1RawTable.get());
}
Expand All @@ -137,7 +137,7 @@ protected boolean doesValidV1RawTableExist(final String namespace, final String
* @param streamConfig the stream to check
* @return whether the schema exists
*/
abstract protected boolean doesAirbyteInternalNamespaceExist(StreamConfig streamConfig) throws Exception;
abstract protected boolean doesAirbyteInternalNamespaceExist(StreamConfig streamConfig);

/**
* Checks a Table's schema and compares it to an expected schema to make sure it matches
Expand All @@ -155,7 +155,7 @@ protected boolean doesValidV1RawTableExist(final String namespace, final String
* @param tableName
* @return an optional potentially containing a reference to the table
*/
abstract protected Optional<DialectTableDefinition> getTableIfExists(String namespace, String tableName) throws Exception;
abstract protected Optional<DialectTableDefinition> getTableIfExists(String namespace, String tableName);

/**
* We use different naming conventions for raw table names in destinations v2, we need a way to map
Expand Down
Expand Up @@ -20,6 +20,6 @@ void migrateIfNecessary(
final SqlGenerator<DialectTableDefinition> sqlGenerator,
final DestinationHandler<DialectTableDefinition> destinationHandler,
final StreamConfig streamConfig)
throws TableNotMigratedException, UnexpectedSchemaException, Exception;
throws TableNotMigratedException, UnexpectedSchemaException;

}
Expand Up @@ -27,7 +27,7 @@ public class DestinationV1V2MigratorTest {
public static class ShouldMigrateTestArgumentProvider implements ArgumentsProvider {

@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) throws Exception {
public Stream<? extends Arguments> provideArguments(ExtensionContext context) throws Exception {

// Don't throw an exception
final boolean v2SchemaMatches = true;
Expand All @@ -52,25 +52,24 @@ public Stream<? extends Arguments> provideArguments(final ExtensionContext conte

@ParameterizedTest
@ArgumentsSource(ShouldMigrateTestArgumentProvider.class)
public void testShouldMigrate(final DestinationSyncMode destinationSyncMode, final BaseDestinationV1V2Migrator migrator, final boolean expected)
throws Exception {
public void testShouldMigrate(final DestinationSyncMode destinationSyncMode, final BaseDestinationV1V2Migrator migrator, boolean expected) {
final StreamConfig config = new StreamConfig(STREAM_ID, null, destinationSyncMode, null, null, null);
final var actual = migrator.shouldMigrate(config);
Assertions.assertEquals(expected, actual);
}

@Test
public void testMismatchedSchemaThrowsException() throws Exception {
public void testMismatchedSchemaThrowsException() {
final StreamConfig config = new StreamConfig(STREAM_ID, null, DestinationSyncMode.APPEND_DEDUP, null, null, null);
final var migrator = makeMockMigrator(true, true, false, false, false);
final UnexpectedSchemaException exception = Assertions.assertThrows(UnexpectedSchemaException.class,
UnexpectedSchemaException exception = Assertions.assertThrows(UnexpectedSchemaException.class,
() -> migrator.shouldMigrate(config));
Assertions.assertEquals("Destination V2 Raw Table does not match expected Schema", exception.getMessage());
}

@SneakyThrows
@Test
public void testMigrate() throws Exception {
public void testMigrate() {
final var sqlGenerator = new MockSqlGenerator();
final StreamConfig stream = new StreamConfig(STREAM_ID, null, DestinationSyncMode.APPEND_DEDUP, null, null, null);
final DestinationHandler<String> handler = Mockito.mock(DestinationHandler.class);
Expand All @@ -81,17 +80,16 @@ public void testMigrate() throws Exception {
Mockito.verify(handler).execute(sql);
// Exception thrown when executing sql, TableNotMigratedException thrown
Mockito.doThrow(Exception.class).when(handler).execute(Mockito.anyString());
final TableNotMigratedException exception = Assertions.assertThrows(TableNotMigratedException.class,
TableNotMigratedException exception = Assertions.assertThrows(TableNotMigratedException.class,
() -> migrator.migrate(sqlGenerator, handler, stream));
Assertions.assertEquals("Attempted and failed to migrate stream final_table", exception.getMessage());
}

public static BaseDestinationV1V2Migrator makeMockMigrator(final boolean v2NamespaceExists,
final boolean v2TableExists,
final boolean v2RawSchemaMatches,
final boolean v1RawTableExists,
final boolean v1RawTableSchemaMatches)
throws Exception {
boolean v1RawTableExists,
boolean v1RawTableSchemaMatches) {
final BaseDestinationV1V2Migrator migrator = Mockito.spy(BaseDestinationV1V2Migrator.class);
Mockito.when(migrator.doesAirbyteInternalNamespaceExist(Mockito.any())).thenReturn(v2NamespaceExists);
final var existingTable = v2TableExists ? Optional.of("v2_raw") : Optional.empty();
Expand All @@ -105,7 +103,7 @@ public static BaseDestinationV1V2Migrator makeMockMigrator(final boolean v2Names
return migrator;
}

public static BaseDestinationV1V2Migrator noIssuesMigrator() throws Exception {
public static BaseDestinationV1V2Migrator noIssuesMigrator() {
return makeMockMigrator(true, false, true, true, true);
}

Expand Down
Expand Up @@ -25,5 +25,5 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.0.21
LABEL io.airbyte.version=2.0.22
LABEL io.airbyte.name=airbyte/destination-bigquery
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.0.21
dockerImageTag: 2.0.22
dockerRepository: airbyte/destination-bigquery
githubIssueLabel: destination-bigquery
icon: bigquery.svg
Expand Down

0 comments on commit ad5ce75

Please sign in to comment.