From 325cbefd2bfbd042bc42744a908fe67b3f52731d Mon Sep 17 00:00:00 2001
From: Stephane Geneix
Date: Mon, 25 Mar 2024 16:37:17 -0700
Subject: [PATCH] convert #36396 to kotlin

---
 .../typing_deduping/JdbcDestinationHandler.kt | 62 ++++++++++++-------
 1 file changed, 38 insertions(+), 24 deletions(-)

diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt
index 3ed79a50e1c1c0..d26cd311d97001 100644
--- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt
+++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt
@@ -4,6 +4,7 @@ package io.airbyte.cdk.integrations.destination.jdbc.typing_deduping
 
 import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.ObjectNode
 import io.airbyte.cdk.db.jdbc.JdbcDatabase
 import io.airbyte.cdk.integrations.base.JavaBaseConstants
 import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition
@@ -23,16 +24,16 @@ import java.time.temporal.ChronoUnit
 import java.util.*
 import java.util.concurrent.CompletableFuture
 import java.util.concurrent.CompletionStage
-import java.util.function.Function
 import java.util.function.Predicate
-import java.util.stream.Collectors
-import kotlin.collections.LinkedHashMap
+import java.util.stream.Collectors.toMap
 import lombok.extern.slf4j.Slf4j
 import org.jooq.Condition
 import org.jooq.DSLContext
 import org.jooq.SQLDialect
 import org.jooq.conf.ParamType
 import org.jooq.impl.DSL
+import org.jooq.impl.DSL.field
+import org.jooq.impl.DSL.quotedName
 import org.jooq.impl.SQLDataType
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
@@ -207,33 +208,32 @@ abstract class JdbcDestinationHandler(
     @get:Throws(SQLException::class)
     protected val allDestinationStates: Map<AirbyteStreamNameNamespacePair, DestinationState>
         get() {
+            // Guarantee the table exists.
             jdbcDatabase.execute(
                 dslContext
                     .createTableIfNotExists(
-                        DSL.quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME)
-                    )
-                    .column(
-                        DSL.quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME),
-                        SQLDataType.VARCHAR
+                        quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME)
                     )
+                    .column(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME), SQLDataType.VARCHAR)
                     .column(
-                        DSL.quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE),
+                        quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE),
                         SQLDataType.VARCHAR
                     ) // Just use a string type, even if the destination has a json type.
                     // We're never going to query this column in a fancy way - all our processing
                     // can happen
                     // client-side.
                     .column(
-                        DSL.quotedName(DESTINATION_STATE_TABLE_COLUMN_STATE),
+                        quotedName(DESTINATION_STATE_TABLE_COLUMN_STATE),
                         SQLDataType.VARCHAR
                     ) // Add an updated_at field. We don't actually need it yet, but it can't hurt!
                     .column(
-                        DSL.quotedName(DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT),
+                        quotedName(DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT),
                         SQLDataType.TIMESTAMPWITHTIMEZONE
                     )
                     .getSQL(ParamType.INLINED)
             )
 
+            // Fetch all records from it. We _could_ filter down to just our streams... but meh.
             // This is small
             // data.
@@ -241,26 +241,40 @@ abstract class JdbcDestinationHandler(
                 .queryJsons(
                     dslContext
                         .select(
-                            DSL.field(DSL.quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME)),
-                            DSL.field(DSL.quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)),
-                            DSL.field(DSL.quotedName(DESTINATION_STATE_TABLE_COLUMN_STATE))
+                            field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME)),
+                            field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)),
+                            field(quotedName(DESTINATION_STATE_TABLE_COLUMN_STATE))
                         )
-                        .from(DSL.quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME))
-                        .sql
+                        .from(quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME))
+                        .getSQL()
                 )
                 .stream()
+                .peek { recordJson: JsonNode ->
+                    // Forcibly downcase all key names.
+                    // This is to handle any destinations that upcase the column names.
+                    // For example - Snowflake with QUOTED_IDENTIFIERS_IGNORE_CASE=TRUE.
+                    val record = recordJson as ObjectNode
+                    record.fieldNames().forEachRemaining { fieldName: String ->
+                        record.set<JsonNode>(
+                            fieldName.lowercase(Locale.getDefault()),
+                            record[fieldName]
+                        )
+                    }
+                }
                 .collect(
-                    Collectors.toMap(
-                        Function { record: JsonNode ->
-                            val nameNode = record[DESTINATION_STATE_TABLE_COLUMN_NAME]
-                            val namespaceNode = record[DESTINATION_STATE_TABLE_COLUMN_NAMESPACE]
+                    toMap(
+                        { record ->
+                            val nameNode: JsonNode = record.get(DESTINATION_STATE_TABLE_COLUMN_NAME)
+                            val namespaceNode: JsonNode =
+                                record.get(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)
                             AirbyteStreamNameNamespacePair(
-                                nameNode?.asText(),
-                                namespaceNode?.asText()
+                                if (nameNode != null) nameNode.asText() else null,
+                                if (namespaceNode != null) namespaceNode.asText() else null
                             )
                         },
-                        Function { record: JsonNode ->
-                            val stateNode = record[DESTINATION_STATE_TABLE_COLUMN_STATE]
+                        { record ->
+                            val stateNode: JsonNode =
+                                record.get(DESTINATION_STATE_TABLE_COLUMN_STATE)
                             val state =
                                 if (stateNode != null) Jsons.deserialize(stateNode.asText())
                                 else Jsons.emptyObject()