implement generation id
edgao committed Jul 10, 2024
1 parent d308ff1 commit db617da
Showing 26 changed files with 130 additions and 120 deletions.
DatabricksDestinationHandler.kt
@@ -6,6 +6,9 @@ package io.airbyte.integrations.destination.databricks.jdbc
 
 import io.airbyte.cdk.db.jdbc.JdbcDatabase
 import io.airbyte.cdk.integrations.base.JavaBaseConstants
+import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT
+import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META
+import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_RAW_ID
 import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition
 import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition
 import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType.STRING
@@ -35,9 +38,6 @@ class DatabricksDestinationHandler(
 ) : DestinationHandler<MinimumDestinationState.Impl> {
 
     private val log = KotlinLogging.logger {}
-    private val abRawId = DatabricksSqlGenerator.AB_RAW_ID
-    private val abExtractedAt = DatabricksSqlGenerator.AB_EXTRACTED_AT
-    private val abMeta = DatabricksSqlGenerator.AB_META
 
     override fun execute(sql: Sql) {
         val transactions: List<List<String>> = sql.transactions
@@ -169,17 +169,17 @@
         tableDefinition: TableDefinition
     ): Boolean {
         val isAbRawIdMatch =
-            tableDefinition.columns.contains(abRawId) &&
+            tableDefinition.columns.contains(COLUMN_NAME_AB_RAW_ID) &&
                 DatabricksSqlGenerator.toDialectType(STRING) ==
-                    tableDefinition.columns[abRawId]?.type
+                    tableDefinition.columns[COLUMN_NAME_AB_RAW_ID]?.type
         val isAbExtractedAtMatch =
-            tableDefinition.columns.contains(abExtractedAt) &&
+            tableDefinition.columns.contains(COLUMN_NAME_AB_EXTRACTED_AT) &&
                 DatabricksSqlGenerator.toDialectType(TIMESTAMP_WITH_TIMEZONE) ==
-                    tableDefinition.columns[abExtractedAt]?.type
+                    tableDefinition.columns[COLUMN_NAME_AB_EXTRACTED_AT]?.type
         val isAbMetaMatch =
-            tableDefinition.columns.contains(abMeta) &&
+            tableDefinition.columns.contains(COLUMN_NAME_AB_META) &&
                 DatabricksSqlGenerator.toDialectType(STRING) ==
-                    tableDefinition.columns[abMeta]?.type
+                    tableDefinition.columns[COLUMN_NAME_AB_META]?.type
         if (!isAbRawIdMatch || !isAbExtractedAtMatch || !isAbMetaMatch) return false
 
         val expectedColumns =
@@ -188,7 +188,11 @@
             }
         val actualColumns =
             tableDefinition.columns.entries
-                .filter { (it.key != abRawId && it.key != abExtractedAt && it.key != abMeta) }
+                .filter {
+                    (it.key != COLUMN_NAME_AB_RAW_ID &&
+                        it.key != COLUMN_NAME_AB_EXTRACTED_AT &&
+                        it.key != COLUMN_NAME_AB_META)
+                }
                 .associate {
                     it.key to if (it.value.type != "DECIMAL") it.value.type else "DECIMAL(38, 10)"
                 }
@@ -249,13 +253,13 @@
 
         val minExtractedAtLoadedNotNullQuery =
             """
-            |SELECT min(`${JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT}`) as last_loaded_at
+            |SELECT min(`$COLUMN_NAME_AB_EXTRACTED_AT`) as last_loaded_at
             |FROM $databaseName.${id.rawTableId(DatabricksSqlGenerator.QUOTE)}
             |WHERE ${JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT} IS NULL
             |""".trimMargin()
         val maxExtractedAtQuery =
             """
-            |SELECT max(`${JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT}`) as last_loaded_at
+            |SELECT max(`$COLUMN_NAME_AB_EXTRACTED_AT`) as last_loaded_at
             |FROM $databaseName.${id.rawTableId(DatabricksSqlGenerator.QUOTE)}
             """.trimMargin()
 
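For reference, the two state queries above render to SQL along these lines once the template is filled in (the catalog and raw-table names below are illustrative placeholders, not values taken from this commit):

    SELECT min(`_airbyte_extracted_at`) as last_loaded_at
    FROM main.`airbyte_internal`.`users`
    WHERE _airbyte_loaded_at IS NULL;

    SELECT max(`_airbyte_extracted_at`) as last_loaded_at
    FROM main.`airbyte_internal`.`users`;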
DatabricksSqlGenerator.kt
@@ -4,7 +4,12 @@
 
 package io.airbyte.integrations.destination.databricks.jdbc
 
-import io.airbyte.cdk.integrations.base.JavaBaseConstants as constants
+import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT as AB_EXTRACTED_AT
+import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID as AB_GENERATION
+import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT as AB_LOADED_AT
+import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META as AB_META
+import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_RAW_ID as AB_RAW_ID
+import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_DATA as AB_DATA
 import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
 import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
 import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
@@ -17,34 +22,29 @@ import io.airbyte.integrations.base.destination.typing_deduping.StreamId
 import io.airbyte.integrations.base.destination.typing_deduping.Struct
 import io.airbyte.integrations.base.destination.typing_deduping.Union
 import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
-import io.airbyte.protocol.models.AirbyteRecordMessageMetaChange.*
+import io.airbyte.protocol.models.AirbyteRecordMessageMetaChange.Change
+import io.airbyte.protocol.models.AirbyteRecordMessageMetaChange.Reason
 import io.airbyte.protocol.models.v0.DestinationSyncMode
 import io.github.oshai.kotlinlogging.KotlinLogging
 import java.time.Instant
-import java.util.*
+import java.util.Optional
 
 class DatabricksSqlGenerator(
     private val namingTransformer: NamingConventionTransformer,
     private val unityCatalogName: String,
 ) : SqlGenerator {
 
     private val log = KotlinLogging.logger {}
     private val cdcDeletedColumn = buildColumnId(CDC_DELETED_COLUMN_NAME)
     private val metaColumnTypeMap =
         mapOf(
             buildColumnId(AB_RAW_ID) to AirbyteProtocolType.STRING,
             buildColumnId(AB_EXTRACTED_AT) to AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE,
-            buildColumnId(AB_META) to AirbyteProtocolType.STRING
+            buildColumnId(AB_META) to AirbyteProtocolType.STRING,
+            buildColumnId(AB_GENERATION) to AirbyteProtocolType.INTEGER,
         )
 
     companion object {
         const val QUOTE = "`"
         const val CDC_DELETED_COLUMN_NAME = "_ab_cdc_deleted_at"
-        const val AB_RAW_ID = constants.COLUMN_NAME_AB_RAW_ID
-        const val AB_EXTRACTED_AT = constants.COLUMN_NAME_AB_EXTRACTED_AT
-        const val AB_LOADED_AT = constants.COLUMN_NAME_AB_LOADED_AT
-        const val AB_DATA = constants.COLUMN_NAME_DATA
-        const val AB_META = constants.COLUMN_NAME_AB_META
 
         fun toDialectType(type: AirbyteType): String {
             return when (type) {
@@ -110,7 +110,8 @@ class DatabricksSqlGenerator(
             $AB_EXTRACTED_AT TIMESTAMP,
             $AB_LOADED_AT TIMESTAMP,
             $AB_DATA STRING,
-            $AB_META STRING
+            $AB_META STRING,
+            $AB_GENERATION BIGINT
             )
             """.trimIndent(),
         )
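With the constants substituted, the raw-table columns above render roughly as follows. The statement prefix sits outside this hunk, so its exact form, and the catalog and table names, are assumptions for illustration only:

    CREATE TABLE IF NOT EXISTS main.`airbyte_internal`.`users` (
        _airbyte_raw_id STRING,
        _airbyte_extracted_at TIMESTAMP,
        _airbyte_loaded_at TIMESTAMP,
        _airbyte_data STRING,
        _airbyte_meta STRING,
        _airbyte_generation_id BIGINT
    )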
@@ -129,8 +130,8 @@ class DatabricksSqlGenerator(
         return Sql.of(
             """
             | UPDATE $unityCatalogName.${streamId.rawTableId(QUOTE)}
-            | SET ${constants.COLUMN_NAME_AB_LOADED_AT} = CURRENT_TIMESTAMP
-            | WHERE ${constants.COLUMN_NAME_AB_LOADED_AT} IS NULL
+            | SET $AB_LOADED_AT = CURRENT_TIMESTAMP
+            | WHERE $AB_LOADED_AT IS NULL
             | $extractedAtCondition
             | """.trimMargin()
         )
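Rendered, this statement marks not-yet-loaded raw records as loaded. A sketch with illustrative names, omitting the optional $extractedAtCondition filter:

    UPDATE main.`airbyte_internal`.`users`
    SET _airbyte_loaded_at = CURRENT_TIMESTAMP
    WHERE _airbyte_loaded_at IS NULL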
@@ -360,6 +361,8 @@ class DatabricksSqlGenerator(
             """
             |to_json(
             |  named_struct(
+            |    "sync_id",
+            |    _airbyte_meta.sync_id,
             |    "changes",
             |    array_union(
             |      _airbyte_type_errors,
@@ -374,7 +377,7 @@ class DatabricksSqlGenerator(
         val selectFromRawTable =
             """SELECT
             |${projectionColumns.replaceIndent(" ")},
-            | from_json($AB_META, 'STRUCT<`changes` : ARRAY<STRUCT<`field`: STRING, `change`: STRING, `reason`: STRING>>>') as `_airbyte_meta`,
+            | from_json($AB_META, 'STRUCT<`sync_id` : BIGINT, `changes` : ARRAY<STRUCT<`field`: STRING, `change`: STRING, `reason`: STRING>>>') as `_airbyte_meta`,
             |${typeCastErrorsArray.replaceIndent(" ")} as `_airbyte_type_errors`
             |FROM
             |  $unityCatalogName.${stream.id.rawTableId(QUOTE)}
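As a sketch of the widened parse, feeding a meta value like the ones in the updated fixtures below through the new schema surfaces sync_id alongside changes (the literal input here is illustrative):

    SELECT from_json(
        '{"sync_id":42,"changes":[{"field":"address","change":"NULLED","reason":"SOURCE_RETRIEVAL_ERROR"}]}',
        'STRUCT<`sync_id` : BIGINT, `changes` : ARRAY<STRUCT<`field`: STRING, `change`: STRING, `reason`: STRING>>>'
    ) AS `_airbyte_meta`
    -- returns a struct with sync_id = 42 and a one-element changes array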
DatabricksStorageOperation.kt
@@ -42,12 +42,18 @@ class DatabricksStorageOperation(
         workspaceClient.files().upload(stagedFile, data.inputStream)
         destinationHandler.execute(
             Sql.of(
+                // schema inference sees _airbyte_generation_id as an int (int32),
+                // which can't be loaded into a bigint (int64) column.
+                // So we have to explicitly cast it to a bigint.
                 """
-                COPY INTO `$database`.`${streamId.rawNamespace}`.`${streamId.rawName}`
-                FROM '$stagedFile'
-                FILEFORMAT = CSV
-                FORMAT_OPTIONS ('header'='true', 'inferSchema'='true', 'escape'='"');
-                """.trimIndent(),
+                COPY INTO `$database`.`${streamId.rawNamespace}`.`${streamId.rawName}`
+                FROM (
+                    SELECT _airbyte_generation_id :: bigint, * except (_airbyte_generation_id)
+                    FROM '$stagedFile'
+                )
+                FILEFORMAT = CSV
+                FORMAT_OPTIONS ('header'='true', 'inferSchema'='true', 'escape'='"');
+                """.trimIndent(),
             ),
         )
         // Databricks recommends that partners delete files in the staging directory once the data
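With illustrative values substituted (the catalog, raw namespace and table, and staged-file path are placeholders), the new statement reads as below; the subquery casts _airbyte_generation_id to BIGINT before it reaches the int64 raw-table column, working around inferSchema typing it as int32:

    COPY INTO `main`.`airbyte_internal`.`users`
    FROM (
        SELECT _airbyte_generation_id :: bigint, * except (_airbyte_generation_id)
        FROM '/Volumes/main/staging/users/part-0.csv'
    )
    FILEFORMAT = CSV
    FORMAT_OPTIONS ('header'='true', 'inferSchema'='true', 'escape'='"');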
@@ -15,8 +15,12 @@ import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
 import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
 import io.airbyte.integrations.destination.databricks.staging.DatabricksFileBufferFactory
 import io.github.oshai.kotlinlogging.KotlinLogging
+import java.io.FileInputStream
+import java.nio.charset.Charset
 import java.util.stream.Stream
+import java.util.zip.GZIPInputStream
 import org.apache.commons.io.FileUtils
+import org.apache.commons.io.IOUtils
 
 /**
  * This is almost identical to
DatabricksFileBufferFactory.kt
@@ -36,7 +36,8 @@ object DatabricksFileBufferFactory {
             JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
             JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
             JavaBaseConstants.COLUMN_NAME_DATA,
-            JavaBaseConstants.COLUMN_NAME_AB_META
+            JavaBaseConstants.COLUMN_NAME_AB_META,
+            JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
         )
     }
 
@@ -56,14 +57,14 @@
         formattedString: String,
         emittedAt: Long,
         formattedAirbyteMetaString: String,
-        // TODO use this value
         generationId: Long,
     ): List<Any> {
         return listOf(
             id,
             Instant.ofEpochMilli(emittedAt),
             formattedString,
-            formattedAirbyteMetaString
+            formattedAirbyteMetaString,
+            generationId,
        )
     }
 }
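For context, the buffer writes CSV rows in the header order declared above, so a staged file would look roughly like this (the row values are invented for illustration):

    _airbyte_raw_id,_airbyte_extracted_at,_airbyte_data,_airbyte_meta,_airbyte_generation_id
    0e1d2c3b-4a5f-6789-abcd-ef0123456789,2024-07-10T00:00:00Z,"{""id"": 1}","{""sync_id"": 42, ""changes"": []}",43

The bare integer in the last column is what leads CSV schema inference to type _airbyte_generation_id as int (int32), the mismatch that the explicit BIGINT cast in DatabricksStorageOperation above works around.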
DatabricksSqlGeneratorIntegrationTest.kt
@@ -92,7 +92,8 @@ class DatabricksSqlGeneratorIntegrationTest :
                 JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
                 JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
                 JavaBaseConstants.COLUMN_NAME_DATA,
-                JavaBaseConstants.COLUMN_NAME_AB_META
+                JavaBaseConstants.COLUMN_NAME_AB_META,
+                JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
             )
         val tableIdentifier = streamId.rawTableId(DatabricksSqlGenerator.QUOTE)
         insertRecords(columnNames, tableIdentifier, records)
@@ -137,14 +138,19 @@
         streamId: StreamId,
         suffix: String?,
         records: List<JsonNode>,
-        // TODO
         generationId: Long,
     ) {
         val columnNames =
             if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES
         val tableIdentifier =
             streamId.finalTableId(DatabricksSqlGenerator.QUOTE, suffix?.lowercase() ?: "")
-        insertRecords(columnNames, tableIdentifier, records)
+        insertRecords(
+            columnNames,
+            tableIdentifier,
+            records.map {
+                (it as ObjectNode).put(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, generationId)
+            },
+        )
     }
 
     private fun insertRecords(
@@ -1,3 +1,3 @@
-{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_meta": "{\"changes\":[]}", "id1": 1, "id2": 200, "old_cursor": 1, "name": "Alice", "address": "{\"city\":\"Los Angeles\",\"state\":\"CA\"}"}
-{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_meta": "{\"changes\":[]}", "id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": "{\"city\":\"Boston\",\"state\":\"MA\"}"}
-{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_meta": "{\"changes\":[]}", "id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}
+{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_generation_id": 43, "_airbyte_meta": "{\"sync_id\":42,\"changes\":[]}", "id1": 1, "id2": 200, "old_cursor": 1, "name": "Alice", "address": "{\"city\":\"Los Angeles\",\"state\":\"CA\"}"}
+{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_generation_id": 43, "_airbyte_meta": "{\"sync_id\":42,\"changes\":[]}", "id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": "{\"city\":\"Boston\",\"state\":\"MA\"}"}
+{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_generation_id": 43, "_airbyte_meta": "{\"sync_id\":42,\"changes\":[]}", "id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}
@@ -1,4 +1,4 @@
-{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name" :"Alice", "address": {"city": "San Francisco", "state": "CA"}}, "_airbyte_meta": "null"}
-{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}, "_airbyte_meta": "null"}
-{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}, "_airbyte_meta": "null"}
-{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}, "_airbyte_meta": "null"}
+{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name" :"Alice", "address": {"city": "San Francisco", "state": "CA"}}, "_airbyte_meta": "{\"sync_id\":42,\"changes\":[]}", "_airbyte_generation_id": 43}
+{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}, "_airbyte_meta": "{\"sync_id\":42,\"changes\":[]}", "_airbyte_generation_id": 43}
+{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}, "_airbyte_meta": "{\"sync_id\":42,\"changes\":[]}", "_airbyte_generation_id": 43}
+{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}, "_airbyte_meta": "{\"sync_id\":42,\"changes\":[]}", "_airbyte_generation_id": 43}
@@ -1,5 +1,5 @@
 // Keep the Alice record with more recent updated_at
-{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_meta": "{\"changes\":[]}", "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000", "name": "Alice", "address":"{\"city\":\"Los Angeles\",\"state\":\"CA\"}"}
-{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_meta": "{\"changes\":[]}", "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000", "name": "Bob", "address":"{\"city\":\"Boston\",\"state\":\"MA\"}"}
-{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_meta": "{\"changes\":[{\"field\":\"address\",\"change\":\"NULLED\",\"reason\":\"SOURCE_RETRIEVAL_ERROR\"}]}", "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000", "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}
-{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_meta": "{\"changes\":[]}", "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}
+{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_generation_id": 43, "_airbyte_meta": "{\"sync_id\":42,\"changes\":[]}", "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000", "name": "Alice", "address":"{\"city\":\"Los Angeles\",\"state\":\"CA\"}"}
+{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_generation_id": 43, "_airbyte_meta": "{\"sync_id\":42,\"changes\":[]}", "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000", "name": "Bob", "address":"{\"city\":\"Boston\",\"state\":\"MA\"}"}
+{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_generation_id": 43, "_airbyte_meta": "{\"sync_id\":42,\"changes\":[{\"field\":\"address\",\"change\":\"NULLED\",\"reason\":\"SOURCE_RETRIEVAL_ERROR\"}]}", "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000", "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}
+{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_generation_id": 43, "_airbyte_meta": "{\"sync_id\":42,\"changes\":[]}", "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}
@@ -1 +1 @@
-{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_meta": "{\"changes\":[]}", "id1": 1, "id2": 200, "updated_at": "2001-01-01T00:00:00.000000", "name": "Someone completely different"}
+{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_generation_id": 43, "_airbyte_meta": "{\"sync_id\":42,\"changes\":[]}", "id1": 1, "id2": 200, "updated_at": "2001-01-01T00:00:00.000000", "name": "Someone completely different"}