Skip to content

Commit

Permalink
extract generation ID + sync ID from catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed May 8, 2024
1 parent 16ed6bf commit 9f8ae69
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 27 deletions.
2 changes: 1 addition & 1 deletion airbyte-cdk/java/airbyte-cdk/dependencies/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ dependencies {
api 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
api 'com.google.guava:guava:33.0.0-jre'
api 'commons-io:commons-io:2.15.1'
api ('io.airbyte.airbyte-protocol:protocol-models:0.7.0') { exclude group: 'com.google.api-client', module: 'google-api-client' }
api ('io.airbyte.airbyte-protocol:protocol-models:0.9.0') { exclude group: 'com.google.api-client', module: 'google-api-client' }
api 'javax.annotation:javax.annotation-api:1.3.2'
api 'org.apache.commons:commons-compress:1.25.0'
api 'org.apache.commons:commons-lang3:3.14.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ package io.airbyte.integrations.base.destination.typing_deduping
import com.google.common.annotations.VisibleForTesting
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler.Companion.addStringForDeinterpolation
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.commons.exceptions.ConfigErrorException
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.DestinationSyncMode
import java.util.Optional
import java.util.function.Consumer
import org.apache.commons.codec.digest.DigestUtils
Expand Down Expand Up @@ -54,15 +56,13 @@ constructor(
)
.substring(0, 3)
val newName = "${originalName}_$hash"
actualStreamConfig =
StreamConfig(
sqlGenerator.buildStreamId(originalNamespace, newName, rawNamespace),
originalStreamConfig.syncMode,
originalStreamConfig.destinationSyncMode,
originalStreamConfig.primaryKey,
originalStreamConfig.cursor,
originalStreamConfig.columns,
)
actualStreamConfig = originalStreamConfig.copy(
id = sqlGenerator.buildStreamId(
originalNamespace,
newName,
rawNamespace,
),
)
} else {
actualStreamConfig = originalStreamConfig
}
Expand Down Expand Up @@ -112,6 +112,20 @@ constructor(

@VisibleForTesting
fun toStreamConfig(stream: ConfiguredAirbyteStream): StreamConfig {
if (stream.generationId == null) {
// TODO set platform version
throw ConfigErrorException("You must upgrade your platform version to use this connector version. Either downgrade your connector or upgrade platform to X.Y.Z")
}
if (stream.minimumGenerationId != 0.toLong() && stream.minimumGenerationId != stream.generationId) {
TODO("Emit system_error trace message and crash")
}

// The refreshes project is the beginning of the end for OVERWRITE syncs.
// The sync mode still exists, but we are fully dependent on min_generation to trigger overwrite logic.
if (stream.destinationSyncMode == DestinationSyncMode.OVERWRITE) {
stream.destinationSyncMode = DestinationSyncMode.APPEND
}

val airbyteColumns =
when (
val schema: AirbyteType =
Expand Down Expand Up @@ -147,7 +161,10 @@ constructor(
stream.destinationSyncMode,
primaryKey,
cursor,
columns
columns,
stream.generationId,
stream.minimumGenerationId,
stream.syncId,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,7 @@ data class StreamConfig(
val primaryKey: List<ColumnId>,
val cursor: Optional<ColumnId>,
val columns: LinkedHashMap<ColumnId, AirbyteType>,
val generationId: Long,
val minimumGenerationId: Long,
val syncId: Long,
)
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ internal class CatalogParserTest {
)
.withSyncMode(SyncMode.INCREMENTAL)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withGenerationId(0)
.withMinimumGenerationId(0)
.withSyncId(0)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,10 @@ class DefaultTyperDeduperTest {
DestinationSyncMode.OVERWRITE,
mock(),
mock(),
mock()
mock(),
0,
0,
0,
)
private val APPEND_STREAM_CONFIG =
StreamConfig(
Expand All @@ -933,7 +936,10 @@ class DefaultTyperDeduperTest {
DestinationSyncMode.APPEND,
mock(),
mock(),
mock()
mock(),
0,
0,
0,
)
private val DEDUPE_STREAM_CONFIG =
StreamConfig(
Expand All @@ -949,7 +955,10 @@ class DefaultTyperDeduperTest {
DestinationSyncMode.APPEND_DEDUP,
mock(),
mock(),
mock()
mock(),
0,
0,
0,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class DestinationV1V2MigratorTest {
migrator: BaseDestinationV1V2Migrator<*>,
expected: Boolean
) {
val config = StreamConfig(STREAM_ID, mock(), destinationSyncMode, mock(), mock(), mock())
val config = StreamConfig(STREAM_ID, mock(), destinationSyncMode, mock(), mock(), mock(), 0, 0, 0)
val actual = migrator.shouldMigrate(config)
Assertions.assertEquals(expected, actual)
}
Expand All @@ -92,7 +92,10 @@ class DestinationV1V2MigratorTest {
DestinationSyncMode.APPEND_DEDUP,
mock(),
mock(),
mock()
mock(),
0,
0,
0,
)
val migrator = makeMockMigrator(true, true, false, false, false)
val exception =
Expand All @@ -116,7 +119,10 @@ class DestinationV1V2MigratorTest {
DestinationSyncMode.APPEND_DEDUP,
mock(),
mock(),
mock()
mock(),
0,
0,
0,
)
val handler = Mockito.mock(DestinationHandler::class.java)
val sql = sqlGenerator.migrateFromV1toV2(STREAM_ID, "v1_raw_namespace", "v1_raw_table")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,10 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
DestinationSyncMode.APPEND_DEDUP,
primaryKey,
Optional.of(cursor),
COLUMNS
COLUMNS,
0,
0,
0,
)
incrementalAppendStream =
StreamConfig(
Expand All @@ -235,7 +238,10 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
DestinationSyncMode.APPEND,
primaryKey,
Optional.of(cursor),
COLUMNS
COLUMNS,
0,
0,
0,
)

cdcIncrementalDedupStream =
Expand All @@ -245,7 +251,10 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
DestinationSyncMode.APPEND_DEDUP,
primaryKey,
Optional.of(cursor),
cdcColumns
cdcColumns,
0,
0,
0,
)
cdcIncrementalAppendStream =
StreamConfig(
Expand All @@ -254,7 +263,10 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
DestinationSyncMode.APPEND,
primaryKey,
Optional.of(cursor),
cdcColumns
cdcColumns,
0,
0,
0,
)

LOGGER.info("Running with namespace {}", namespace)
Expand Down Expand Up @@ -357,7 +369,10 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
DestinationSyncMode.APPEND_DEDUP,
incrementalDedupStream.primaryKey,
incrementalDedupStream.cursor,
incrementalDedupStream.columns
incrementalDedupStream.columns,
0,
0,
0,
)

createRawTable(streamId)
Expand Down Expand Up @@ -966,7 +981,10 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
DestinationSyncMode.APPEND_DEDUP,
primaryKey,
Optional.empty(),
COLUMNS
COLUMNS,
0,
0,
0,
)
createRawTable(streamId)
createFinalTable(streamConfig, "")
Expand Down Expand Up @@ -1387,7 +1405,10 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
generator.buildColumnId("includes$\$doubledollar") to
AirbyteProtocolType.STRING,
generator.buildColumnId("endswithbackslash\\") to AirbyteProtocolType.STRING
)
),
0,
0,
0,
)

val createTable = generator.createTable(stream, "", false)
Expand Down Expand Up @@ -1446,7 +1467,10 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
DestinationSyncMode.APPEND_DEDUP,
java.util.List.of(columnId),
Optional.of(columnId),
linkedMapOf(columnId to AirbyteProtocolType.STRING)
linkedMapOf(columnId to AirbyteProtocolType.STRING),
0,
0,
0,
)

val createTable = generator.createTable(stream, "", false)
Expand Down Expand Up @@ -1482,7 +1506,10 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
linkedMapOf(
generator.buildColumnId("current_date") to AirbyteProtocolType.STRING,
generator.buildColumnId("join") to AirbyteProtocolType.STRING
)
),
0,
0,
0,
)

val createTable = generator.createTable(stream, "", false)
Expand Down Expand Up @@ -1527,7 +1554,10 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
DestinationSyncMode.APPEND,
emptyList<ColumnId>(),
Optional.empty(),
LinkedHashMap()
LinkedHashMap(),
0,
0,
0,
)

val createTable = generator.createTable(stream, "", false)
Expand Down

0 comments on commit 9f8ae69

Please sign in to comment.