Destination Snowflake: Sync Id, generation_id and Meta (#39107)
gisripa committed Jun 10, 2024
1 parent 7e281dd commit 9f0ce4f
Showing 46 changed files with 533 additions and 222 deletions.

build.gradle

@@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
-    cdkVersionRequired = '0.35.15'
+    cdkVersionRequired = '0.37.1'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}

metadata.yaml

@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
-  dockerImageTag: 3.9.1
+  dockerImageTag: 3.10.0
dockerRepository: airbyte/destination-snowflake
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
githubIssueLabel: destination-snowflake

SnowflakeDestination.kt

@@ -34,6 +34,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
import io.airbyte.integrations.base.destination.typing_deduping.Sql
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
+import io.airbyte.integrations.destination.snowflake.migrations.SnowflakeAbMetaAndGenIdMigration
import io.airbyte.integrations.destination.snowflake.migrations.SnowflakeDV2Migration
import io.airbyte.integrations.destination.snowflake.migrations.SnowflakeState
import io.airbyte.integrations.destination.snowflake.operation.SnowflakeStagingClient
@@ -43,6 +44,7 @@ import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
+import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.DestinationSyncMode
import java.util.*
@@ -53,7 +55,6 @@ import javax.sql.DataSource
import net.snowflake.client.core.SFSession
import net.snowflake.client.core.SFStatement
import net.snowflake.client.jdbc.SnowflakeSQLException
-import org.apache.commons.lang3.StringUtils
import org.slf4j.Logger
import org.slf4j.LoggerFactory

@@ -63,7 +64,7 @@ constructor(
private val airbyteEnvironment: String,
private val nameTransformer: NamingConventionTransformer = SnowflakeSQLNameTransformer(),
) : BaseConnector(), Destination {
-    private val destinationColumns = JavaBaseConstants.DestinationColumns.V2_WITHOUT_META
+    private val destinationColumns = JavaBaseConstants.DestinationColumns.V2_WITH_GENERATION

override fun check(config: JsonNode): AirbyteConnectionStatus? {
val dataSource = getDataSource(config)
@@ -123,7 +124,8 @@ constructor(
),
isSchemaMismatch = true,
isFinalTableEmpty = true,
-                destinationState = SnowflakeState(false)
+                destinationState =
+                    SnowflakeState(needsSoftReset = false, isAirbyteMetaPresentInRaw = false)
)
        // We simulate a mini-sync to verify that the raw table code path is exercised, and disable T+D.
snowflakeDestinationHandler.createNamespaces(setOf(rawTableSchemaName, outputSchema))
@@ -151,7 +153,10 @@ constructor(
),
)
streamOperation.writeRecords(streamConfig, listOf(message).stream())
-        streamOperation.finalizeTable(streamConfig, StreamSyncSummary.DEFAULT)
+        streamOperation.finalizeTable(
+            streamConfig,
+            StreamSyncSummary(1, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE),
+        )
        // Clean up the raw table. This is intentionally not part of the actual sync code,
        // because we avoid dropping original tables directly.
snowflakeDestinationHandler.execute(
@@ -190,41 +195,34 @@ constructor(
AirbyteExceptionHandler.addAllStringsInConfigForDeinterpolation(config)

val defaultNamespace = config["schema"].asText()
-        for (stream in catalog.streams) {
-            if (StringUtils.isEmpty(stream.stream.namespace)) {
-                stream.stream.namespace = defaultNamespace
-            }
-        }

val retentionPeriodDays =
getRetentionPeriodDays(
config[RETENTION_PERIOD_DAYS],
)
val sqlGenerator = SnowflakeSqlGenerator(retentionPeriodDays)
val database = getDatabase(getDataSource(config))
val databaseName = config[JdbcUtils.DATABASE_KEY].asText()
-        val rawTableSchemaName: String
-        val catalogParser: CatalogParser
-        if (getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).isPresent) {
-            rawTableSchemaName = getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get()
-            catalogParser = CatalogParser(sqlGenerator, rawTableSchemaName)
-        } else {
-            rawTableSchemaName = JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE
-            catalogParser = CatalogParser(sqlGenerator)
-        }
+        val rawTableSchemaName: String =
+            if (getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).isPresent) {
+                getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get()
+            } else {
+                JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE
+            }
+        val catalogParser = CatalogParser(sqlGenerator, defaultNamespace, rawTableSchemaName)
val snowflakeDestinationHandler =
SnowflakeDestinationHandler(databaseName, database, rawTableSchemaName)
val parsedCatalog: ParsedCatalog = catalogParser.parseCatalog(catalog)
val disableTypeDedupe =
config.has(DISABLE_TYPE_DEDUPE) && config[DISABLE_TYPE_DEDUPE].asBoolean(false)
-        val migrations =
-            listOf<Migration<SnowflakeState>>(
+        val migrations: List<Migration<SnowflakeState>> =
+            listOf(
                SnowflakeDV2Migration(
                    nameTransformer,
                    database,
                    databaseName,
                    sqlGenerator,
                ),
+                SnowflakeAbMetaAndGenIdMigration(database),
            )

val snowflakeStagingClient = SnowflakeStagingClient(database)
@@ -264,8 +262,7 @@
},
onFlush = DefaultFlush(optimalFlushBatchSize, syncOperation),
catalog = catalog,
-            bufferManager = BufferManager(snowflakeBufferMemoryLimit),
-            defaultNamespace = Optional.of(defaultNamespace),
+            bufferManager = BufferManager(defaultNamespace, snowflakeBufferMemoryLimit)
)
}

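The namespace-defaulting loop removed above didn't simply disappear: the default namespace is now passed to CatalogParser (and BufferManager) instead of being stamped onto each stream here. A minimal sketch of the defaulting the parser presumably performs — the helper name is ours, not a real CDK API:

import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog

// Sketch: default the namespace for streams that don't set one, as the removed
// loop did. applyDefaultNamespace is an illustrative name, not a CDK function.
fun applyDefaultNamespace(catalog: ConfiguredAirbyteCatalog, defaultNamespace: String) {
    catalog.streams
        .filter { it.stream.namespace.isNullOrEmpty() }
        .forEach { it.stream.namespace = defaultNamespace }
}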

SnowflakeAbMetaAndGenIdMigration.kt (new file)

@@ -0,0 +1,140 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.snowflake.migrations

import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition
import io.airbyte.commons.json.Jsons
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.LinkedHashMap

private val log = KotlinLogging.logger {}

class SnowflakeAbMetaAndGenIdMigration(private val database: JdbcDatabase) :
Migration<SnowflakeState> {
override fun migrateIfNecessary(
destinationHandler: DestinationHandler<SnowflakeState>,
stream: StreamConfig,
state: DestinationInitialStatus<SnowflakeState>
): Migration.MigrationResult<SnowflakeState> {
        if (state.destinationState.isAirbyteMetaPresentInRaw) {
            log.info {
                "Skipping airbyte_meta/generation_id migration for ${stream.id.originalNamespace}.${stream.id.originalName} " +
                    "because the previous destination state has isAirbyteMetaPresentInRaw set"
            }
            return Migration.MigrationResult(state.destinationState, false)
        }

if (!state.initialRawTableStatus.rawTableExists) {
// The raw table doesn't exist. No migration necessary. Update the state.
log.info {
"Skipping airbyte_meta/generation_id migration for ${stream.id.originalNamespace}.${stream.id.originalName} because the raw table doesn't exist"
}
return Migration.MigrationResult(
state.destinationState.copy(isAirbyteMetaPresentInRaw = true),
false
)
}

        // Snowflake will match the lowercase raw table even with QUOTED_IDENTIFIERS_IGNORE_CASE = TRUE
val results =
database.queryJsons(
"SHOW COLUMNS IN TABLE \"${stream.id.rawNamespace}\".\"${stream.id.rawName}\""
)
val rawTableDefinition =
results
.groupBy { it.get("schema_name").asText()!! }
.mapValues { (_, v) ->
v.groupBy { it.get("table_name").asText()!! }
.mapValuesTo(LinkedHashMap()) { (_, v) ->
TableDefinition(
v.associateTo(LinkedHashMap()) {
                            // The data_type value returned by SHOW COLUMNS is a JSON string.
val dataType = Jsons.deserialize(it.get("data_type").asText())
it.get("column_name").asText()!! to
ColumnDefinition(
it.get("column_name").asText(),
dataType.get("type").asText(),
0,
dataType.get("nullable").asBoolean(),
)
},
)
}
}
        // The default is lowercase raw tables; for accounts with QUOTED_IDENTIFIERS_IGNORE_CASE = TRUE
        // we have to match uppercase.
        val isUpperCaseIdentifier =
!rawTableDefinition.containsKey(stream.id.rawNamespace) &&
rawTableDefinition.containsKey(stream.id.rawNamespace.uppercase())
val rawNamespace: String
val rawName: String
val abMetaColumn: String
        if (isUpperCaseIdentifier) {
rawNamespace = stream.id.rawNamespace.uppercase()
rawName = stream.id.rawName.uppercase()
abMetaColumn = JavaBaseConstants.COLUMN_NAME_AB_META.uppercase()
} else {
rawNamespace = stream.id.rawNamespace
rawName = stream.id.rawName
abMetaColumn = JavaBaseConstants.COLUMN_NAME_AB_META
}
rawTableDefinition[rawNamespace]?.get(rawName)?.let { tableDefinition ->
if (tableDefinition.columns.containsKey(abMetaColumn)) {
log.info {
"Skipping airbyte_meta/generation_id migration for ${stream.id.originalNamespace}.${stream.id.originalName} " +
"because the raw table already has the airbyte_meta column"
}
} else {
log.info {
"Migrating airbyte_meta/generation_id for table ${stream.id.rawNamespace}.${stream.id.rawName}"
}
// Quote for raw table columns
val alterRawTableSql =
"""
ALTER TABLE "${stream.id.rawNamespace}"."${stream.id.rawName}"
ADD COLUMN "${JavaBaseConstants.COLUMN_NAME_AB_META}" VARIANT,
COLUMN "${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID}" INTEGER;
""".trimIndent()
database.execute(alterRawTableSql)
}
}

        // To avoid another metadata query in Snowflake, we rely on the initial status gathering,
        // which already checks the final table's columns and flags a schema mismatch. Because the
        // mismatch could come from the meta columns or from a customer's column, we execute an
        // ADD COLUMN with an IF NOT EXISTS check.
if (state.isFinalTablePresent && state.isSchemaMismatch) {
log.info {
"Migrating generation_id for table ${stream.id.finalNamespace}.${stream.id.finalName}"
}
// explicitly uppercase and quote the final table column.
val alterFinalTableSql =
"""
ALTER TABLE "${stream.id.finalNamespace}"."${stream.id.finalName}"
ADD COLUMN IF NOT EXISTS "${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID.uppercase()}" INTEGER;
""".trimIndent()
database.execute(alterFinalTableSql)
// Final table schema changed, fetch the initial status again
return Migration.MigrationResult(
state.destinationState.copy(isAirbyteMetaPresentInRaw = true),
true
)
}

// Final table is untouched, so we don't need to fetch the initial status
return Migration.MigrationResult(
state.destinationState.copy(isAirbyteMetaPresentInRaw = true),
false
)
}
}
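The trickiest detail in the migration above is that SHOW COLUMNS reports each column's type as a JSON string in the data_type field, which is why the code runs Jsons.deserialize on it. A self-contained sketch of that inner parse, with a sample payload abbreviated from what Snowflake typically returns:

import io.airbyte.commons.json.Jsons

// Standalone illustration of the data_type parse used in the migration above.
// The payload is a typical, abbreviated SHOW COLUMNS data_type value.
fun main() {
    val dataType = Jsons.deserialize("""{"type":"VARIANT","nullable":true,"fixed":false}""")
    println(dataType.get("type").asText()) // VARIANT
    println(dataType.get("nullable").asBoolean()) // true
}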

SnowflakeDV2Migration.kt

@@ -31,6 +31,9 @@ class SnowflakeDV2Migration(
): Migration.MigrationResult<SnowflakeState> {
log.info { "Initializing DV2 Migration check" }
legacyV1V2migrator.migrateIfNecessary(sqlGenerator, destinationHandler, stream)
-        return Migration.MigrationResult(SnowflakeState(false), true)
+        return Migration.MigrationResult(
+            SnowflakeState(needsSoftReset = false, isAirbyteMetaPresentInRaw = false),
+            true
+        )
}
}

SnowflakeState.kt

@@ -8,7 +8,8 @@ import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState

// Note the nonnullable fields. Even though the underlying storage medium (a JSON blob) supports
// nullability, we don't want to deal with that in our codebase.
-data class SnowflakeState(val needsSoftReset: Boolean) : MinimumDestinationState {
+data class SnowflakeState(val needsSoftReset: Boolean, val isAirbyteMetaPresentInRaw: Boolean) :
+    MinimumDestinationState {
override fun needsSoftReset(): Boolean {
return needsSoftReset
}

SnowflakeStorageOperation.kt

@@ -51,7 +51,9 @@ class SnowflakeStorageOperation(
| "${JavaBaseConstants.COLUMN_NAME_AB_RAW_ID}" VARCHAR PRIMARY KEY,
| "${JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT}" TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp(),
| "${JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT}" TIMESTAMP WITH TIME ZONE DEFAULT NULL,
-        | "${JavaBaseConstants.COLUMN_NAME_DATA}" VARIANT
+        | "${JavaBaseConstants.COLUMN_NAME_DATA}" VARIANT,
+        | "${JavaBaseConstants.COLUMN_NAME_AB_META}" VARIANT DEFAULT NULL,
+        | "${JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID}" INTEGER DEFAULT NULL
|) data_retention_time_in_days = $retentionPeriodDays;
""".trimMargin()
}
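With the two added columns, the rendered raw-table DDL for an illustrative stream would look roughly like the following. The CREATE TABLE prefix is hidden context above, and the names and retention value here are made up:

// Approximate SQL rendered by the updated createTableQuery, as a Kotlin raw
// string; namespace, table name, and retention period are illustrative only.
val sampleDdl = """
    CREATE TABLE IF NOT EXISTS "airbyte_internal"."public_raw__users" (
      "_airbyte_raw_id" VARCHAR PRIMARY KEY,
      "_airbyte_extracted_at" TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp(),
      "_airbyte_loaded_at" TIMESTAMP WITH TIME ZONE DEFAULT NULL,
      "_airbyte_data" VARIANT,
      "_airbyte_meta" VARIANT DEFAULT NULL,
      "_airbyte_generation_id" INTEGER DEFAULT NULL
    ) data_retention_time_in_days = 1;
""".trimIndent()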
@@ -60,11 +62,16 @@
return "TRUNCATE TABLE \"${streamId.rawNamespace}\".\"${streamId.rawName}\";\n"
}

-    override fun writeToStage(streamId: StreamId, data: SerializableBuffer) {
-        val stageName = getStageName(streamId)
+    override fun writeToStage(streamConfig: StreamConfig, data: SerializableBuffer) {
+        val stageName = getStageName(streamConfig.id)
        val stagingPath = getStagingPath()
        val stagedFileName = staging.uploadRecordsToStage(data, stageName, stagingPath)
-        staging.copyIntoTableFromStage(stageName, stagingPath, listOf(stagedFileName), streamId)
+        staging.copyIntoTableFromStage(
+            stageName,
+            stagingPath,
+            listOf(stagedFileName),
+            streamConfig.id
+        )
}
override fun cleanupStage(streamId: StreamId) {
val stageName = getStageName(streamId)

SnowflakeDestinationHandler.kt

@@ -100,11 +100,19 @@ class SnowflakeDestinationHandler(
database.executeMetadataQuery { databaseMetaData: DatabaseMetaData ->
LOGGER.info("Retrieving table from Db metadata: {} {}", id.rawNamespace, id.rawName)
try {
-                databaseMetaData
-                    .getTables(databaseName, id.rawNamespace, id.rawName, null)
-                    .use { tables ->
-                        return@executeMetadataQuery tables.next()
-                    }
+                val rs =
+                    databaseMetaData.getTables(databaseName, id.rawNamespace, id.rawName, null)
+                // When QUOTED_IDENTIFIERS_IGNORE_CASE is set to true, the raw table is
+                // interpreted as uppercase in db metadata calls, so check for both.
+                val rsUppercase =
+                    databaseMetaData.getTables(
+                        databaseName,
+                        id.rawNamespace.uppercase(),
+                        id.rawName.uppercase(),
+                        null
+                    )
+                rs.next() || rsUppercase.next()
} catch (e: SQLException) {
LOGGER.error("Failed to retrieve table metadata", e)
throw RuntimeException(e)
@@ -287,6 +295,14 @@
"VARIANT" == existingTable.columns[abMetaColumnName]!!.type
}

+    fun isAirbyteGenerationIdColumnMatch(existingTable: TableDefinition): Boolean {
+        val abGenerationIdColumnName: String =
+            JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID.uppercase(Locale.getDefault())
+        return existingTable.columns.containsKey(abGenerationIdColumnName) &&
+            toJdbcTypeName(AirbyteProtocolType.INTEGER) ==
+                existingTable.columns[abGenerationIdColumnName]!!.type
+    }

@SuppressFBWarnings("NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE")
override fun existingSchemaMatchesStreamConfig(
stream: StreamConfig?,
@@ -299,7 +315,8 @@
if (
!isAirbyteRawIdColumnMatch(existingTable) ||
!isAirbyteExtractedAtColumnMatch(existingTable) ||
-            !isAirbyteMetaColumnMatch(existingTable)
+            !isAirbyteMetaColumnMatch(existingTable) ||
+            !isAirbyteGenerationIdColumnMatch(existingTable)
) {
            // The AB meta columns are missing from the final table; we need them for proper T+D,
            // so trigger a soft reset.
@@ -417,8 +434,13 @@
}

override fun toDestinationState(json: JsonNode): SnowflakeState {
+        // Note: the field name is isAirbyteMetaPresentInRaw, but Jackson serializes it as
+        // airbyteMetaPresentInRaw, so we map that name back to the correct field when
+        // deserializing.
        return SnowflakeState(
-            json.hasNonNull("needsSoftReset") && json["needsSoftReset"].asBoolean()
+            json.hasNonNull("needsSoftReset") && json["needsSoftReset"].asBoolean(),
+            json.hasNonNull("airbyteMetaPresentInRaw") &&
+                json["airbyteMetaPresentInRaw"].asBoolean()
        )
)
}

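The Jackson quirk called out in toDestinationState is easy to reproduce in isolation: a Kotlin val with an is-prefix yields a getter whose derived JSON property drops the prefix, so the persisted state blob carries airbyteMetaPresentInRaw. A round-trip sketch under that assumption:

import io.airbyte.commons.json.Jsons

// Round-trip sketch for the naming note in toDestinationState: serialization
// drops the "is" prefix, so deserialization must read "airbyteMetaPresentInRaw".
fun main() {
    val state = SnowflakeState(needsSoftReset = false, isAirbyteMetaPresentInRaw = true)
    val json = Jsons.jsonNode(state)
    println(json) // e.g. {"needsSoftReset":false,"airbyteMetaPresentInRaw":true}

    val restored =
        SnowflakeState(
            json.hasNonNull("needsSoftReset") && json["needsSoftReset"].asBoolean(),
            json.hasNonNull("airbyteMetaPresentInRaw") &&
                json["airbyteMetaPresentInRaw"].asBoolean(),
        )
    println(restored) // SnowflakeState(needsSoftReset=false, isAirbyteMetaPresentInRaw=true)
}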