diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/build.gradle b/airbyte-cdk/java/airbyte-cdk/dependencies/build.gradle index 18664af67acf12..3b977ba2d268da 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/build.gradle +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/build.gradle @@ -17,7 +17,7 @@ dependencies { api 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310' api 'com.google.guava:guava:33.0.0-jre' api 'commons-io:commons-io:2.15.1' - api ('io.airbyte.airbyte-protocol:protocol-models:0.7.0') { exclude group: 'com.google.api-client', module: 'google-api-client' } + api ('io.airbyte.airbyte-protocol:protocol-models:0.9.0') { exclude group: 'com.google.api-client', module: 'google-api-client' } api 'javax.annotation:javax.annotation-api:1.3.2' api 'org.apache.commons:commons-compress:1.25.0' api 'org.apache.commons:commons-lang3:3.14.0' diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt index 4ac3e5e4d6f0b3..e1b6c78710e5fa 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt @@ -6,8 +6,10 @@ package io.airbyte.integrations.base.destination.typing_deduping import com.google.common.annotations.VisibleForTesting import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler.Companion.addStringForDeinterpolation import io.airbyte.cdk.integrations.base.JavaBaseConstants +import io.airbyte.commons.exceptions.ConfigErrorException import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream +import io.airbyte.protocol.models.v0.DestinationSyncMode import java.util.Optional import java.util.function.Consumer import org.apache.commons.codec.digest.DigestUtils @@ -54,15 +56,13 @@ constructor( ) .substring(0, 3) val newName = "${originalName}_$hash" - actualStreamConfig = - StreamConfig( - sqlGenerator.buildStreamId(originalNamespace, newName, rawNamespace), - originalStreamConfig.syncMode, - originalStreamConfig.destinationSyncMode, - originalStreamConfig.primaryKey, - originalStreamConfig.cursor, - originalStreamConfig.columns, - ) + actualStreamConfig = originalStreamConfig.copy( + id = sqlGenerator.buildStreamId( + originalNamespace, + newName, + rawNamespace, + ), + ) } else { actualStreamConfig = originalStreamConfig } @@ -112,6 +112,20 @@ constructor( @VisibleForTesting fun toStreamConfig(stream: ConfiguredAirbyteStream): StreamConfig { + if (stream.generationId == null) { + // TODO set platform version + throw ConfigErrorException("You must upgrade your platform version to use this connector version. Either downgrade your connector or upgrade platform to X.Y.Z") + } + if (stream.minimumGenerationId != 0.toLong() && stream.minimumGenerationId != stream.generationId) { + TODO("Emit system_error trace message and crash") + } + + // The refreshes project is the beginning of the end for OVERWRITE syncs. + // The sync mode still exists, but we are fully dependent on min_generation to trigger overwrite logic. + if (stream.destinationSyncMode == DestinationSyncMode.OVERWRITE) { + stream.destinationSyncMode = DestinationSyncMode.APPEND + } + val airbyteColumns = when ( val schema: AirbyteType = @@ -147,7 +161,10 @@ constructor( stream.destinationSyncMode, primaryKey, cursor, - columns + columns, + stream.generationId, + stream.minimumGenerationId, + stream.syncId, ) } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamConfig.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamConfig.kt index c0fc5f7ce4a77d..be4227cb0fdcbb 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamConfig.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/StreamConfig.kt @@ -15,4 +15,7 @@ data class StreamConfig( val primaryKey: List, val cursor: Optional, val columns: LinkedHashMap, + val generationId: Long, + val minimumGenerationId: Long, + val syncId: Long, ) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt index 537044ac53cc41..f86478d922cc0f 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt @@ -200,6 +200,9 @@ internal class CatalogParserTest { ) .withSyncMode(SyncMode.INCREMENTAL) .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withGenerationId(0) + .withMinimumGenerationId(0) + .withSyncId(0) } } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt index 5209367698ecbd..7deb00f45c33cb 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.kt @@ -917,7 +917,10 @@ class DefaultTyperDeduperTest { DestinationSyncMode.OVERWRITE, mock(), mock(), - mock() + mock(), + 0, + 0, + 0, ) private val APPEND_STREAM_CONFIG = StreamConfig( @@ -933,7 +936,10 @@ class DefaultTyperDeduperTest { DestinationSyncMode.APPEND, mock(), mock(), - mock() + mock(), + 0, + 0, + 0, ) private val DEDUPE_STREAM_CONFIG = StreamConfig( @@ -949,7 +955,10 @@ class DefaultTyperDeduperTest { DestinationSyncMode.APPEND_DEDUP, mock(), mock(), - mock() + mock(), + 0, + 0, + 0, ) } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.kt index 646e32363baa5f..9f40bacddef559 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.kt @@ -77,7 +77,7 @@ class DestinationV1V2MigratorTest { migrator: BaseDestinationV1V2Migrator<*>, expected: Boolean ) { - val config = StreamConfig(STREAM_ID, mock(), destinationSyncMode, mock(), mock(), mock()) + val config = StreamConfig(STREAM_ID, mock(), destinationSyncMode, mock(), mock(), mock(), 0, 0, 0) val actual = migrator.shouldMigrate(config) Assertions.assertEquals(expected, actual) } @@ -92,7 +92,10 @@ class DestinationV1V2MigratorTest { DestinationSyncMode.APPEND_DEDUP, mock(), mock(), - mock() + mock(), + 0, + 0, + 0, ) val migrator = makeMockMigrator(true, true, false, false, false) val exception = @@ -116,7 +119,10 @@ class DestinationV1V2MigratorTest { DestinationSyncMode.APPEND_DEDUP, mock(), mock(), - mock() + mock(), + 0, + 0, + 0, ) val handler = Mockito.mock(DestinationHandler::class.java) val sql = sqlGenerator.migrateFromV1toV2(STREAM_ID, "v1_raw_namespace", "v1_raw_table") diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt index 6f839fd6c9d74a..69b550e7ead44c 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt @@ -226,7 +226,10 @@ abstract class BaseSqlGeneratorIntegrationTest(), Optional.empty(), - LinkedHashMap() + LinkedHashMap(), + 0, + 0, + 0, ) val createTable = generator.createTable(stream, "", false)