Skip to content

Commit

Permalink
Update CDK for Raw Only Dv2 destinations (#36047)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbfbell authored Apr 10, 2024
1 parent bc27a84 commit ba3bdb1
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ object JdbcUtils {
)
// Shared JdbcSourceOperations instance, created once with the no-arg constructor.
@JvmStatic val defaultSourceOperations: JdbcSourceOperations = JdbcSourceOperations()

// Shared JSONFormat configured with RecordFormat.OBJECT.
// NOTE(review): presumably this makes each record serialize as a JSON object rather than an
// array — confirm against the JSONFormat documentation.
@JvmStatic
val defaultJSONFormat: JSONFormat = JSONFormat().recordFormat(JSONFormat.RecordFormat.OBJECT)

@JvmStatic
Expand All @@ -85,6 +86,7 @@ object JdbcUtils {
}
}

@JvmStatic
@JvmOverloads
fun parseJdbcParameters(
jdbcPropertiesString: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
*/
package io.airbyte.cdk.integrations.base

import java.util.*
import org.apache.commons.lang3.StringUtils

/**
 * Upper-cases [column] and wraps it in double quotes.
 *
 * Upper-casing uses [Locale.ROOT] so the result does not depend on the JVM default locale (with a
 * Turkish default locale, for example, "id" would otherwise become "İD").
 *
 * An empty [column] is returned unchanged, without quotes, matching the behavior of
 * `StringUtils.wrap` that this function previously delegated to.
 *
 * @param column the name to convert
 * @return [column] in upper case surrounded by `"` characters, or [column] itself when it is empty
 */
fun upperQuoted(column: String): String {
    // Preserve the historical passthrough for empty input instead of returning a bare pair of quotes.
    if (column.isEmpty()) return column
    return "\"${column.uppercase(Locale.ROOT)}\""
}

object JavaBaseConstants {
const val ARGS_CONFIG_KEY: String = "config"
const val ARGS_CATALOG_KEY: String = "catalog"
Expand Down Expand Up @@ -33,7 +40,7 @@ object JavaBaseConstants {
COLUMN_NAME_AB_RAW_ID,
COLUMN_NAME_AB_EXTRACTED_AT,
COLUMN_NAME_AB_LOADED_AT,
COLUMN_NAME_DATA
COLUMN_NAME_DATA,
)
@JvmField
val V2_RAW_TABLE_COLUMN_NAMES: List<String> =
Expand All @@ -42,7 +49,7 @@ object JavaBaseConstants {
COLUMN_NAME_AB_EXTRACTED_AT,
COLUMN_NAME_AB_LOADED_AT,
COLUMN_NAME_DATA,
COLUMN_NAME_AB_META
COLUMN_NAME_AB_META,
)
@JvmField
val V2_FINAL_TABLE_METADATA_COLUMNS: List<String> =
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.29.8
version=0.29.9
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
// Always yields the literal "schema".
// NOTE(review): presumably the config key under which the target schema is stored — confirm
// against callers.
protected val configSchemaKey: String
get() = "schema"

/**
 * Hook for destinations that must never run typing and deduping. The default is `false`;
 * subclasses override this to return `true` to force it off. Typing and deduping only happens
 * when final tables are created.
 *
 * @return `true` if typing and deduping must always be disabled for this destination
 */
protected open fun shouldAlwaysDisableTypeDedupe(): Boolean = false

override fun check(config: JsonNode): AirbyteConnectionStatus? {
val dataSource = getDataSource(config)

Expand All @@ -67,7 +77,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
val v2RawSchema =
namingResolver.getIdentifier(
getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE)
.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE),
)
attemptTableOperations(v2RawSchema, database, namingResolver, sqlOperations, false)
destinationSpecificTableOperations(database)
Expand All @@ -87,7 +97,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
"""
Could not connect with provided configuration.
${e.message}
""".trimIndent()
""".trimIndent(),
)
} finally {
try {
Expand Down Expand Up @@ -123,7 +133,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
jdbcConfig[JdbcUtils.PASSWORD_KEY].asText()
else null,
driverClassName,
jdbcConfig[JdbcUtils.JDBC_URL_KEY].asText()
jdbcConfig[JdbcUtils.JDBC_URL_KEY].asText(),
)
.withConnectionProperties(connectionProperties)
.withConnectionTimeout(getConnectionTimeout(connectionProperties))
Expand Down Expand Up @@ -155,8 +165,10 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
for (key in defaultParameters.keys) {
require(
!(customParameters.containsKey(key) &&
customParameters[key] != defaultParameters[key])
) { "Cannot overwrite default JDBC parameter $key" }
customParameters[key] != defaultParameters[key]),
) {
"Cannot overwrite default JDBC parameter $key"
}
}
}

Expand Down Expand Up @@ -191,7 +203,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
* @param config
* @return
*/
protected fun getDatabaseName(config: JsonNode): String {
protected open fun getDatabaseName(config: JsonNode): String {
return config[JdbcUtils.DATABASE_KEY].asText()
}

Expand Down Expand Up @@ -227,7 +239,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
config,
catalog,
null,
NoopTyperDeduper()
NoopTyperDeduper(),
)
}

Expand All @@ -238,10 +250,18 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
catalog,
outputRecordCollector,
database,
defaultNamespace
defaultNamespace,
)
}

/**
 * Decides whether typing and deduping is switched off for this run.
 *
 * It is off when [shouldAlwaysDisableTypeDedupe] returns true, or when [config] contains the
 * DISABLE_TYPE_DEDUPE field and that field reads as boolean true.
 *
 * @param config the destination configuration
 * @return true if typing and deduping is disabled
 */
private fun isTypeDedupeDisabled(config: JsonNode): Boolean {
    // The destination-level switch wins; the config is not consulted in that case.
    if (shouldAlwaysDisableTypeDedupe()) {
        return true
    }
    // A missing field means "not disabled".
    if (!config.has(DISABLE_TYPE_DEDUPE)) {
        return false
    }
    return config[DISABLE_TYPE_DEDUPE].asBoolean(false)
}

private fun getV2MessageConsumer(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog?,
Expand All @@ -256,51 +276,71 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
.map { override: String -> CatalogParser(sqlGenerator, override) }
.orElse(CatalogParser(sqlGenerator))
.parseCatalog(catalog!!)
val typerDeduper: TyperDeduper =
buildTyperDeduper(
config,
database,
parsedCatalog,
)

return JdbcBufferedConsumerFactory.createAsync(
outputRecordCollector,
database,
sqlOperations,
namingResolver,
config,
catalog,
defaultNamespace,
typerDeduper,
getDataTransformer(parsedCatalog, defaultNamespace),
)
}

private fun buildTyperDeduper(
config: JsonNode,
database: JdbcDatabase,
parsedCatalog: ParsedCatalog,
): TyperDeduper {
val databaseName = getDatabaseName(config)
val migrator = JdbcV1V2Migrator(namingResolver, database, databaseName)
val v2TableMigrator = NoopV2TableMigrator()
val migrator = JdbcV1V2Migrator(namingResolver, database, databaseName)
val destinationHandler: DestinationHandler<DestinationState> =
getDestinationHandler(
databaseName,
database,
rawNamespaceOverride.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE)
getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE),
)
val disableTypeDedupe =
config.has(DISABLE_TYPE_DEDUPE) && config[DISABLE_TYPE_DEDUPE].asBoolean(false)
val typerDeduper: TyperDeduper
val disableTypeDedupe = isTypeDedupeDisabled(config)
val migrations = getMigrations(database, databaseName, sqlGenerator, destinationHandler)
typerDeduper =
if (disableTypeDedupe) {
NoOpTyperDeduperWithV1V2Migrations(
sqlGenerator,
destinationHandler,
parsedCatalog,
migrator,
v2TableMigrator,
migrations
)
} else {

val typerDeduper: TyperDeduper
if (disableTypeDedupe) {
typerDeduper =
if (migrations.isEmpty()) {
NoopTyperDeduper()
} else {
NoOpTyperDeduperWithV1V2Migrations(
sqlGenerator,
destinationHandler,
parsedCatalog,
migrator,
v2TableMigrator,
migrations,
)
}
} else {
typerDeduper =
DefaultTyperDeduper(
sqlGenerator,
destinationHandler,
parsedCatalog,
migrator,
v2TableMigrator,
migrations
migrations,
)
}

return JdbcBufferedConsumerFactory.createAsync(
outputRecordCollector,
database,
sqlOperations,
namingResolver,
config,
catalog,
defaultNamespace,
typerDeduper,
getDataTransformer(parsedCatalog, defaultNamespace)
)
}
return typerDeduper
}

companion object {
Expand Down Expand Up @@ -361,7 +401,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
{ conn: Connection -> conn.metaData.catalogs },
{ queryContext: ResultSet? ->
JdbcUtils.defaultSourceOperations.rowToJson(queryContext!!)
}
},
)

// verify we have write permissions on the target schema by creating a table with a
Expand All @@ -370,7 +410,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
val outputTableName =
namingResolver.getIdentifier(
"_airbyte_connection_test_" +
UUID.randomUUID().toString().replace("-".toRegex(), "")
UUID.randomUUID().toString().replace("-".toRegex(), ""),
)
sqlOps.createSchemaIfNotExists(database, outputSchema)
sqlOps.createTableIfNotExists(database, outputSchema, outputTableName)
Expand All @@ -381,7 +421,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
database,
java.util.List.of(dummyRecord),
outputSchema,
outputTableName
outputTableName,
)
}
} finally {
Expand Down Expand Up @@ -412,7 +452,7 @@ abstract class AbstractJdbcDestination<DestinationState : MinimumDestinationStat
.withRecord(
PartialAirbyteRecordMessage()
.withStream("stream1")
.withEmittedAt(1602637589000L)
.withEmittedAt(1602637589000L),
)
.withSerialized(dummyDataToInsert.toString())
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.destination.jdbc.typing_deduping

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
import io.airbyte.integrations.base.destination.typing_deduping.Sql
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
import org.jooq.SQLDialect

/**
 * A [JdbcDestinationHandler] that does not support typing and deduping: every operation
 * overridden here throws [NotImplementedError].
 */
class NoOpJdbcDestinationHandler<DestinationState>(
    databaseName: String,
    jdbcDatabase: JdbcDatabase,
    rawTableSchemaName: String,
    sqlDialect: SQLDialect
) :
    JdbcDestinationHandler<DestinationState>(
        databaseName,
        jdbcDatabase,
        rawTableSchemaName,
        sqlDialect
    ) {

    /** Single place that raises the "not supported" error shared by all overrides. */
    private fun typingDedupingUnsupported(): Nothing =
        throw NotImplementedError("This JDBC Destination Handler does not support typing deduping")

    /** Always throws [NotImplementedError]. */
    override fun execute(sql: Sql) {
        typingDedupingUnsupported()
    }

    /** Always throws [NotImplementedError]. */
    override fun gatherInitialState(
        streamConfigs: List<StreamConfig>
    ): List<DestinationInitialStatus<DestinationState>> = typingDedupingUnsupported()

    /** Always throws [NotImplementedError]. */
    override fun commitDestinationStates(destinationStates: Map<StreamId, DestinationState>) {
        typingDedupingUnsupported()
    }

    /** Always throws [NotImplementedError]. */
    override fun toDestinationState(json: JsonNode?): DestinationState =
        typingDedupingUnsupported()

    /** Always throws [NotImplementedError]. */
    override fun toJdbcTypeName(airbyteType: AirbyteType?): String = typingDedupingUnsupported()
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import io.airbyte.configoss.JobGetSpecConfig
import io.airbyte.configoss.OperatorDbt
import io.airbyte.configoss.StandardCheckConnectionInput
import io.airbyte.configoss.StandardCheckConnectionOutput
import io.airbyte.configoss.StandardCheckConnectionOutput.Status
import io.airbyte.configoss.WorkerDestinationConfig
import io.airbyte.protocol.models.Field
import io.airbyte.protocol.models.JsonSchemaType
Expand Down Expand Up @@ -64,9 +63,6 @@ import java.util.concurrent.atomic.AtomicInteger
import java.util.function.Consumer
import java.util.stream.Collectors
import java.util.stream.Stream
import kotlin.Comparator
import kotlin.collections.ArrayList
import kotlin.collections.HashSet
import kotlin.test.assertNotNull
import org.junit.jupiter.api.*
import org.junit.jupiter.api.extension.ExtensionContext
Expand Down Expand Up @@ -345,7 +341,7 @@ abstract class DestinationAcceptanceTest {
"""This method is moved to the AdvancedTestDataComparator. Please move your destination
implementation of the method to your comparator implementation."""
)
protected fun resolveIdentifier(identifier: String?): List<String?> {
protected open fun resolveIdentifier(identifier: String?): List<String?> {
return java.util.List.of(identifier)
}

Expand Down

0 comments on commit ba3bdb1

Please sign in to comment.