implement generation id
edgao committed Jul 11, 2024
1 parent d308ff1 commit f7c724a
Showing 30 changed files with 159 additions and 143 deletions.
@@ -1,3 +1,3 @@
 testExecutionConcurrency=-1
-# minimum 3 minutes timeout required during parallel workload on our small warehouse
-JunitMethodExecutionTimeout=3 m
+# increased timeout required during parallel workload on our small warehouse
+JunitMethodExecutionTimeout=10 m
@@ -148,7 +148,9 @@ class DatabricksDestination : BaseConnector(), Destination {

 val sqlGenerator =
     DatabricksSqlGenerator(DatabricksNamingTransformer(), connectorConfig.database)
-val catalogParser = CatalogParser(sqlGenerator, connectorConfig.rawSchemaOverride)
+val defaultNamespace = connectorConfig.schema
+val catalogParser =
+    CatalogParser(sqlGenerator, defaultNamespace, connectorConfig.rawSchemaOverride)
 val parsedCatalog = catalogParser.parseCatalog(catalog)
 val workspaceClient =
     DatabricksConnectorClientsFactory.createWorkspaceClient(
@@ -177,7 +179,7 @@
 DefaultSyncOperation(
     parsedCatalog,
     destinationHandler,
-    connectorConfig.schema,
+    defaultNamespace,
     DatabricksStreamOperationFactory(storageOperations),
     listOf()
 )
@@ -192,7 +194,7 @@ class DatabricksDestination : BaseConnector(), Destination {
 catalog = catalog,
 bufferManager =
     BufferManager(
-        defaultNamespace = connectorConfig.schema,
+        defaultNamespace = defaultNamespace,
         (Runtime.getRuntime().maxMemory() * BufferManager.MEMORY_LIMIT_RATIO).toLong(),
     ),
 airbyteMessageDeserializer = AirbyteMessageDeserializer(),
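Condensed, the change in DatabricksDestination is a single hoist: the default namespace is computed once and threaded to every consumer instead of re-reading connectorConfig.schema at each call site. A non-compiling sketch, with unrelated arguments elided:

    val defaultNamespace = connectorConfig.schema
    val catalogParser =
        CatalogParser(sqlGenerator, defaultNamespace, connectorConfig.rawSchemaOverride)
    // The same value then feeds the sync operation and the buffer manager:
    DefaultSyncOperation(parsedCatalog, destinationHandler, defaultNamespace, /* ... */)
    BufferManager(defaultNamespace = defaultNamespace, /* memory limit elided */)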
@@ -6,8 +6,13 @@ package io.airbyte.integrations.destination.databricks.jdbc

 import io.airbyte.cdk.db.jdbc.JdbcDatabase
 import io.airbyte.cdk.integrations.base.JavaBaseConstants
+import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT
+import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID
+import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META
+import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_RAW_ID
 import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition
 import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition
+import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
 import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType.STRING
 import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE
 import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
@@ -35,9 +40,6 @@ class DatabricksDestinationHandler(
 ) : DestinationHandler<MinimumDestinationState.Impl> {

     private val log = KotlinLogging.logger {}
-    private val abRawId = DatabricksSqlGenerator.AB_RAW_ID
-    private val abExtractedAt = DatabricksSqlGenerator.AB_EXTRACTED_AT
-    private val abMeta = DatabricksSqlGenerator.AB_META

     override fun execute(sql: Sql) {
         val transactions: List<List<String>> = sql.transactions
@@ -169,26 +171,36 @@
     tableDefinition: TableDefinition
 ): Boolean {
     val isAbRawIdMatch =
-        tableDefinition.columns.contains(abRawId) &&
+        tableDefinition.columns.contains(COLUMN_NAME_AB_RAW_ID) &&
             DatabricksSqlGenerator.toDialectType(STRING) ==
-                tableDefinition.columns[abRawId]?.type
+                tableDefinition.columns[COLUMN_NAME_AB_RAW_ID]?.type
     val isAbExtractedAtMatch =
-        tableDefinition.columns.contains(abExtractedAt) &&
+        tableDefinition.columns.contains(COLUMN_NAME_AB_EXTRACTED_AT) &&
             DatabricksSqlGenerator.toDialectType(TIMESTAMP_WITH_TIMEZONE) ==
-                tableDefinition.columns[abExtractedAt]?.type
+                tableDefinition.columns[COLUMN_NAME_AB_EXTRACTED_AT]?.type
     val isAbMetaMatch =
-        tableDefinition.columns.contains(abMeta) &&
+        tableDefinition.columns.contains(COLUMN_NAME_AB_META) &&
             DatabricksSqlGenerator.toDialectType(STRING) ==
-                tableDefinition.columns[abMeta]?.type
-    if (!isAbRawIdMatch || !isAbExtractedAtMatch || !isAbMetaMatch) return false
+                tableDefinition.columns[COLUMN_NAME_AB_META]?.type
+    val isAbGenerationMatch =
+        tableDefinition.columns.contains(COLUMN_NAME_AB_GENERATION_ID) &&
+            DatabricksSqlGenerator.toDialectType(AirbyteProtocolType.INTEGER) ==
+                tableDefinition.columns[COLUMN_NAME_AB_GENERATION_ID]?.type
+    if (!isAbRawIdMatch || !isAbExtractedAtMatch || !isAbMetaMatch || !isAbGenerationMatch)
+        return false

     val expectedColumns =
         streamConfig.columns.entries.associate {
             it.key.name to DatabricksSqlGenerator.toDialectType(it.value)
         }
     val actualColumns =
         tableDefinition.columns.entries
-            .filter { (it.key != abRawId && it.key != abExtractedAt && it.key != abMeta) }
+            .filter {
+                (it.key != COLUMN_NAME_AB_RAW_ID &&
+                    it.key != COLUMN_NAME_AB_EXTRACTED_AT &&
+                    it.key != COLUMN_NAME_AB_META &&
+                    it.key != COLUMN_NAME_AB_GENERATION_ID)
+            }
             .associate {
                 it.key to if (it.value.type != "DECIMAL") it.value.type else "DECIMAL(38, 10)"
             }
@@ -249,13 +261,13 @@ class DatabricksDestinationHandler(

 val minExtractedAtLoadedNotNullQuery =
     """
-    |SELECT min(`${JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT}`) as last_loaded_at
+    |SELECT min(`$COLUMN_NAME_AB_EXTRACTED_AT`) as last_loaded_at
     |FROM $databaseName.${id.rawTableId(DatabricksSqlGenerator.QUOTE)}
     |WHERE ${JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT} IS NULL
     |""".trimMargin()
 val maxExtractedAtQuery =
     """
-    |SELECT max(`${JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT}`) as last_loaded_at
+    |SELECT max(`$COLUMN_NAME_AB_EXTRACTED_AT`) as last_loaded_at
     |FROM $databaseName.${id.rawTableId(DatabricksSqlGenerator.QUOTE)}
     """.trimMargin()
@@ -4,7 +4,12 @@

 package io.airbyte.integrations.destination.databricks.jdbc

-import io.airbyte.cdk.integrations.base.JavaBaseConstants as constants
+import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT as AB_EXTRACTED_AT
+import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID as AB_GENERATION
+import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT as AB_LOADED_AT
+import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META as AB_META
+import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_RAW_ID as AB_RAW_ID
+import io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_DATA as AB_DATA
 import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
 import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType
 import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
@@ -17,34 +22,29 @@ import io.airbyte.integrations.base.destination.typing_deduping.StreamId
 import io.airbyte.integrations.base.destination.typing_deduping.Struct
 import io.airbyte.integrations.base.destination.typing_deduping.Union
 import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf
-import io.airbyte.protocol.models.AirbyteRecordMessageMetaChange.*
+import io.airbyte.protocol.models.AirbyteRecordMessageMetaChange.Change
+import io.airbyte.protocol.models.AirbyteRecordMessageMetaChange.Reason
 import io.airbyte.protocol.models.v0.DestinationSyncMode
 import io.github.oshai.kotlinlogging.KotlinLogging
 import java.time.Instant
-import java.util.*
+import java.util.Optional

 class DatabricksSqlGenerator(
     private val namingTransformer: NamingConventionTransformer,
     private val unityCatalogName: String,
 ) : SqlGenerator {

     private val log = KotlinLogging.logger {}
     private val cdcDeletedColumn = buildColumnId(CDC_DELETED_COLUMN_NAME)
     private val metaColumnTypeMap =
         mapOf(
             buildColumnId(AB_RAW_ID) to AirbyteProtocolType.STRING,
             buildColumnId(AB_EXTRACTED_AT) to AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE,
-            buildColumnId(AB_META) to AirbyteProtocolType.STRING
+            buildColumnId(AB_META) to AirbyteProtocolType.STRING,
+            buildColumnId(AB_GENERATION) to AirbyteProtocolType.INTEGER,
         )

     companion object {
         const val QUOTE = "`"
         const val CDC_DELETED_COLUMN_NAME = "_ab_cdc_deleted_at"
-        const val AB_RAW_ID = constants.COLUMN_NAME_AB_RAW_ID
-        const val AB_EXTRACTED_AT = constants.COLUMN_NAME_AB_EXTRACTED_AT
-        const val AB_LOADED_AT = constants.COLUMN_NAME_AB_LOADED_AT
-        const val AB_DATA = constants.COLUMN_NAME_DATA
-        const val AB_META = constants.COLUMN_NAME_AB_META

         fun toDialectType(type: AirbyteType): String {
             return when (type) {
@@ -110,7 +110,8 @@ class DatabricksSqlGenerator(
     $AB_EXTRACTED_AT TIMESTAMP,
     $AB_LOADED_AT TIMESTAMP,
     $AB_DATA STRING,
-    $AB_META STRING
+    $AB_META STRING,
+    $AB_GENERATION BIGINT
 )
 """.trimIndent(),
 )
@@ -129,8 +130,8 @@
 return Sql.of(
     """
     | UPDATE $unityCatalogName.${streamId.rawTableId(QUOTE)}
-    | SET ${constants.COLUMN_NAME_AB_LOADED_AT} = CURRENT_TIMESTAMP
-    | WHERE ${constants.COLUMN_NAME_AB_LOADED_AT} IS NULL
+    | SET $AB_LOADED_AT = CURRENT_TIMESTAMP
+    | WHERE $AB_LOADED_AT IS NULL
     | $extractedAtCondition
     | """.trimMargin()
 )
@@ -360,6 +361,8 @@
"""
|to_json(
| named_struct(
| "sync_id",
| _airbyte_meta.sync_id,
| "changes",
| array_union(
| _airbyte_type_errors,
@@ -374,7 +377,7 @@
 val selectFromRawTable =
     """SELECT
     |${projectionColumns.replaceIndent("  ")},
-    | from_json($AB_META, 'STRUCT<`changes` : ARRAY<STRUCT<`field`: STRING, `change`: STRING, `reason`: STRING>>>') as `_airbyte_meta`,
+    | from_json($AB_META, 'STRUCT<`sync_id` : BIGINT, `changes` : ARRAY<STRUCT<`field`: STRING, `change`: STRING, `reason`: STRING>>>') as `_airbyte_meta`,
     |${typeCastErrorsArray.replaceIndent("  ")} as `_airbyte_type_errors`
     |FROM
     | $unityCatalogName.${stream.id.rawTableId(QUOTE)}
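For reference, a sketch of the `_airbyte_meta` payload this generator now emits and re-parses, matching the STRUCT schema in the from_json call above and the fixture files further down (illustrative values, not captured output):

    // sync_id travels inside _airbyte_meta; the generation id is a separate column.
    val exampleMeta = """{"sync_id":42,"changes":[]}"""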
@@ -42,12 +42,18 @@ class DatabricksStorageOperation(
 workspaceClient.files().upload(stagedFile, data.inputStream)
 destinationHandler.execute(
     Sql.of(
+        // schema inference sees _airbyte_generation_id as an int (int32),
+        // which can't be loaded into a bigint (int64) column.
+        // So we have to explicitly cast it to a bigint.
         """
-        COPY INTO `$database`.`${streamId.rawNamespace}`.`${streamId.rawName}`
-        FROM '$stagedFile'
-        FILEFORMAT = CSV
-        FORMAT_OPTIONS ('header'='true', 'inferSchema'='true', 'escape'='"');
-        """.trimIndent(),
+        COPY INTO `$database`.`${streamId.rawNamespace}`.`${streamId.rawName}`
+        FROM (
+            SELECT _airbyte_generation_id :: bigint, * except (_airbyte_generation_id)
+            FROM '$stagedFile'
+        )
+        FILEFORMAT = CSV
+        FORMAT_OPTIONS ('header'='true', 'inferSchema'='true', 'escape'='"');
+        """.trimIndent(),
     ),
 )
 // Databricks recommends that partners delete files in the staging directory once the data
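To make the schema-inference workaround concrete, here is a sketch of the rendered statement with hypothetical names substituted in (catalog main, raw table airbyte_internal.users_raw, and a made-up staging path); only the SELECT-with-cast wrapper distinguishes it from the previous plain file copy:

    // Without the cast, COPY INTO fails because inferSchema types the
    // _airbyte_generation_id CSV column as INT, not the table's BIGINT.
    val renderedCopy = """
        COPY INTO `main`.`airbyte_internal`.`users_raw`
        FROM (
            SELECT _airbyte_generation_id :: bigint, * except (_airbyte_generation_id)
            FROM '/Volumes/main/airbyte_internal/staging/part0.csv'
        )
        FILEFORMAT = CSV
        FORMAT_OPTIONS ('header'='true', 'inferSchema'='true', 'escape'='"');
    """.trimIndent()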
@@ -36,7 +36,8 @@ object DatabricksFileBufferFactory {
 JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
 JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
 JavaBaseConstants.COLUMN_NAME_DATA,
-JavaBaseConstants.COLUMN_NAME_AB_META
+JavaBaseConstants.COLUMN_NAME_AB_META,
+JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
 )
 }

@@ -56,14 +57,14 @@
 formattedString: String,
 emittedAt: Long,
 formattedAirbyteMetaString: String,
-// TODO use this value
 generationId: Long,
 ): List<Any> {
     return listOf(
         id,
         Instant.ofEpochMilli(emittedAt),
         formattedString,
-        formattedAirbyteMetaString
+        formattedAirbyteMetaString,
+        generationId,
     )
 }
 }
@@ -92,7 +92,8 @@ class DatabricksSqlGeneratorIntegrationTest :
 JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
 JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
 JavaBaseConstants.COLUMN_NAME_DATA,
-JavaBaseConstants.COLUMN_NAME_AB_META
+JavaBaseConstants.COLUMN_NAME_AB_META,
+JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID,
 )
 val tableIdentifier = streamId.rawTableId(DatabricksSqlGenerator.QUOTE)
 insertRecords(columnNames, tableIdentifier, records)
@@ -137,14 +138,19 @@
 streamId: StreamId,
 suffix: String?,
 records: List<JsonNode>,
-// TODO
 generationId: Long,
 ) {
     val columnNames =
         if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES
     val tableIdentifier =
         streamId.finalTableId(DatabricksSqlGenerator.QUOTE, suffix?.lowercase() ?: "")
-    insertRecords(columnNames, tableIdentifier, records)
+    insertRecords(
+        columnNames,
+        tableIdentifier,
+        records.map {
+            (it as ObjectNode).put(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, generationId)
+        },
+    )
 }

 private fun insertRecords(
@@ -1,3 +1,3 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_meta": "{\"changes\":[]}", "id1": 1, "id2": 200, "old_cursor": 1, "name": "Alice", "address": "{\"city\":\"Los Angeles\",\"state\":\"CA\"}"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_meta": "{\"changes\":[]}", "id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": "{\"city\":\"Boston\",\"state\":\"MA\"}"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_meta": "{\"changes\":[]}", "id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_generation_id": 43, "_airbyte_meta": "{\"sync_id\":42,\"changes\":[]}", "id1": 1, "id2": 200, "old_cursor": 1, "name": "Alice", "address": "{\"city\":\"Los Angeles\",\"state\":\"CA\"}"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_generation_id": 43, "_airbyte_meta": "{\"sync_id\":42,\"changes\":[]}", "id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": "{\"city\":\"Boston\",\"state\":\"MA\"}"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_generation_id": 43, "_airbyte_meta": "{\"sync_id\":42,\"changes\":[]}", "id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}
@@ -1,4 +1,4 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name" :"Alice", "address": {"city": "San Francisco", "state": "CA"}}, "_airbyte_meta": "null"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}, "_airbyte_meta": "null"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}, "_airbyte_meta": "null"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}, "_airbyte_meta": "null"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name" :"Alice", "address": {"city": "San Francisco", "state": "CA"}}, "_airbyte_meta": "{\"sync_id\":42,\"changes\":[]}", "_airbyte_generation_id": 43}
{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}, "_airbyte_meta": "{\"sync_id\":42,\"changes\":[]}", "_airbyte_generation_id": 43}
{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}, "_airbyte_meta": "{\"sync_id\":42,\"changes\":[]}", "_airbyte_generation_id": 43}
{"_airbyte_extracted_at": "1970-01-01T00:00:01", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}, "_airbyte_meta": "{\"sync_id\":42,\"changes\":[]}", "_airbyte_generation_id": 43}