From 50c1e4bf78f6b4790ab94b6a282e4e21f248eb72 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Wed, 8 May 2024 11:23:27 -0700 Subject: [PATCH] extract generation ID + sync ID from catalog --- .../airbyte-cdk/dependencies/build.gradle | 2 +- .../typing_deduping/CatalogParser.kt | 38 ++++++++---- .../typing_deduping/StreamConfig.kt | 5 +- .../typing_deduping/CatalogParserTest.kt | 3 + .../DefaultTyperDeduperTest.kt | 18 ++++-- .../DestinationV1V2MigratorTest.kt | 14 +++-- .../BaseSqlGeneratorIntegrationTest.kt | 60 ++++++++++++------- 7 files changed, 95 insertions(+), 45 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/build.gradle b/airbyte-cdk/java/airbyte-cdk/dependencies/build.gradle index 18664af67acf12..3b977ba2d268da 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/build.gradle +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/build.gradle @@ -17,7 +17,7 @@ dependencies { api 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' api 'com.google.guava:guava:33.0.0-jre' api 'commons-io:commons-io:2.15.1' - api ('io.airbyte.airbyte-protocol:protocol-models:0.7.0') { exclude group: 'com.google.api-client', module: 'google-api-client' } + api ('io.airbyte.airbyte-protocol:protocol-models:0.9.0') { exclude group: 'com.google.api-client', module: 'google-api-client' } api 'javax.annotation:javax.annotation-api:1.3.2' api 'org.apache.commons:commons-compress:1.25.0' api 'org.apache.commons:commons-lang3:3.14.0' diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt index 4ac3e5e4d6f0b3..f8016603303986 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt @@ -6,8 +6,10 @@ package io.airbyte.integrations.base.destination.typing_deduping import com.google.common.annotations.VisibleForTesting import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler.Companion.addStringForDeinterpolation import io.airbyte.cdk.integrations.base.JavaBaseConstants +import io.airbyte.commons.exceptions.ConfigErrorException import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream +import io.airbyte.protocol.models.v0.DestinationSyncMode import java.util.Optional import java.util.function.Consumer import org.apache.commons.codec.digest.DigestUtils @@ -54,15 +56,13 @@ constructor( ) .substring(0, 3) val newName = "${originalName}_$hash" - actualStreamConfig = - StreamConfig( - sqlGenerator.buildStreamId(originalNamespace, newName, rawNamespace), - originalStreamConfig.syncMode, - originalStreamConfig.destinationSyncMode, - originalStreamConfig.primaryKey, - originalStreamConfig.cursor, - originalStreamConfig.columns, - ) + actualStreamConfig = originalStreamConfig.copy( + id = sqlGenerator.buildStreamId( + originalNamespace, + newName, + rawNamespace, + ), + ) } else { actualStreamConfig = originalStreamConfig } @@ -112,6 +112,20 @@ constructor( @VisibleForTesting fun toStreamConfig(stream: ConfiguredAirbyteStream): StreamConfig { + if (stream.generationId == null) { + // TODO set platform version + throw ConfigErrorException("You must upgrade your platform version to use this connector version. Either downgrade your connector or upgrade platform to X.Y.Z") + } + if (stream.minimumGenerationId != 0.toLong() && stream.minimumGenerationId != stream.generationId) { + TODO("Emit system_error trace message and crash") + } + + // The refreshes project is the beginning of the end for OVERWRITE syncs. + // The sync mode still exists, but we are fully dependent on min_generation to trigger overwrite logic. + if (stream.destinationSyncMode == DestinationSyncMode.OVERWRITE) { + stream.destinationSyncMode = DestinationSyncMode.APPEND + } + val airbyteColumns = when ( val schema: AirbyteType = @@ -143,11 +157,13 @@ constructor( return StreamConfig( sqlGenerator.buildStreamId(stream.stream.namespace, stream.stream.name, rawNamespace), - stream.syncMode, stream.destinationSyncMode, primaryKey, cursor, - columns + columns, + stream.generationId, + stream.minimumGenerationId, + stream.syncId, ) } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamConfig.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamConfig.kt index c0fc5f7ce4a77d..80fa468436469a 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamConfig.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamConfig.kt @@ -4,15 +4,16 @@ package io.airbyte.integrations.base.destination.typing_deduping import io.airbyte.protocol.models.v0.DestinationSyncMode -import io.airbyte.protocol.models.v0.SyncMode import java.util.* import kotlin.collections.LinkedHashMap data class StreamConfig( val id: StreamId, - val syncMode: SyncMode, val destinationSyncMode: DestinationSyncMode, val primaryKey: List, val cursor: Optional, val columns: LinkedHashMap, + val generationId: Long, + val minimumGenerationId: Long, + val syncId: Long, ) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt index 537044ac53cc41..f86478d922cc0f 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt @@ -200,6 +200,9 @@ internal class CatalogParserTest { ) .withSyncMode(SyncMode.INCREMENTAL) .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withGenerationId(0) + .withMinimumGenerationId(0) + .withSyncId(0) } } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt index 5209367698ecbd..f681d9be3be3ad 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt @@ -913,11 +913,13 @@ class DefaultTyperDeduperTest { "overwrite_ns", "overwrite_stream" ), - mock(), DestinationSyncMode.OVERWRITE, mock(), mock(), - mock() + mock(), + 0, + 0, + 0, ) private val APPEND_STREAM_CONFIG = StreamConfig( @@ -929,11 +931,13 @@ class DefaultTyperDeduperTest { "append_ns", "append_stream" ), - mock(), DestinationSyncMode.APPEND, mock(), mock(), - mock() + mock(), + 0, + 0, + 0, ) private val DEDUPE_STREAM_CONFIG = StreamConfig( @@ -945,11 +949,13 @@ class DefaultTyperDeduperTest { "dedup_ns", "dedup_stream" ), - mock(), DestinationSyncMode.APPEND_DEDUP, mock(), mock(), - mock() + mock(), + 0, + 0, + 0, ) } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.kt index 646e32363baa5f..c1739fea4ad036 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.kt @@ -77,7 +77,7 @@ class DestinationV1V2MigratorTest { migrator: BaseDestinationV1V2Migrator<*>, expected: Boolean ) { - val config = StreamConfig(STREAM_ID, mock(), destinationSyncMode, mock(), mock(), mock()) + val config = StreamConfig(STREAM_ID, destinationSyncMode, mock(), mock(), mock(), 0, 0, 0) val actual = migrator.shouldMigrate(config) Assertions.assertEquals(expected, actual) } @@ -88,11 +88,13 @@ class DestinationV1V2MigratorTest { val config = StreamConfig( STREAM_ID, - mock(), DestinationSyncMode.APPEND_DEDUP, mock(), mock(), - mock() + mock(), + 0, + 0, + 0, ) val migrator = makeMockMigrator(true, true, false, false, false) val exception = @@ -112,11 +114,13 @@ class DestinationV1V2MigratorTest { val stream = StreamConfig( STREAM_ID, - mock(), DestinationSyncMode.APPEND_DEDUP, mock(), mock(), - mock() + mock(), + 0, + 0, + 0, ) val handler = Mockito.mock(DestinationHandler::class.java) val sql = sqlGenerator.migrateFromV1toV2(STREAM_ID, "v1_raw_namespace", "v1_raw_table") diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt index 6f839fd6c9d74a..281a260475957d 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt @@ -222,39 +222,47 @@ abstract class BaseSqlGeneratorIntegrationTest(), Optional.empty(), - LinkedHashMap() + LinkedHashMap(), + 0, + 0, + 0, ) val createTable = generator.createTable(stream, "", false)