Skip to content

Commit

Permalink
convert #36396 to kotlin
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Mar 26, 2024
1 parent 861863f commit 325cbef
Showing 1 changed file with 38 additions and 24 deletions.
Expand Up @@ -4,6 +4,7 @@
package io.airbyte.cdk.integrations.destination.jdbc.typing_deduping

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition
Expand All @@ -23,16 +24,16 @@ import java.time.temporal.ChronoUnit
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.function.Function
import java.util.function.Predicate
import java.util.stream.Collectors
import kotlin.collections.LinkedHashMap
import java.util.stream.Collectors.toMap
import lombok.extern.slf4j.Slf4j
import org.jooq.Condition
import org.jooq.DSLContext
import org.jooq.SQLDialect
import org.jooq.conf.ParamType
import org.jooq.impl.DSL
import org.jooq.impl.DSL.field
import org.jooq.impl.DSL.quotedName
import org.jooq.impl.SQLDataType
import org.slf4j.Logger
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -207,60 +208,73 @@ abstract class JdbcDestinationHandler<DestinationState>(
@get:Throws(SQLException::class)
protected val allDestinationStates: Map<AirbyteStreamNameNamespacePair, DestinationState>
get() {

// Guarantee the table exists.
jdbcDatabase.execute(
dslContext
.createTableIfNotExists(
DSL.quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME)
)
.column(
DSL.quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME),
SQLDataType.VARCHAR
quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME)
)
.column(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME), SQLDataType.VARCHAR)
.column(
DSL.quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE),
quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE),
SQLDataType.VARCHAR
) // Just use a string type, even if the destination has a json type.
// We're never going to query this column in a fancy way - all our processing
// can happen
// client-side.
.column(
DSL.quotedName(DESTINATION_STATE_TABLE_COLUMN_STATE),
quotedName(DESTINATION_STATE_TABLE_COLUMN_STATE),
SQLDataType.VARCHAR
) // Add an updated_at field. We don't actually need it yet, but it can't hurt!
.column(
DSL.quotedName(DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT),
quotedName(DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT),
SQLDataType.TIMESTAMPWITHTIMEZONE
)
.getSQL(ParamType.INLINED)
)

// Fetch all records from it. We _could_ filter down to just our streams... but meh.
// This is small
// data.
return jdbcDatabase
.queryJsons(
dslContext
.select(
DSL.field(DSL.quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME)),
DSL.field(DSL.quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)),
DSL.field(DSL.quotedName(DESTINATION_STATE_TABLE_COLUMN_STATE))
field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME)),
field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)),
field(quotedName(DESTINATION_STATE_TABLE_COLUMN_STATE))
)
.from(DSL.quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME))
.sql
.from(quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME))
.getSQL()
)
.stream()
.peek { recordJson: JsonNode ->
// Forcibly downcase all key names.
// This is to handle any destinations that upcase the column names.
// For example - Snowflake with QUOTED_IDENTIFIERS_IGNORE_CASE=TRUE.
val record = recordJson as ObjectNode
record.fieldNames().forEachRemaining { fieldName: String ->
record.set<JsonNode>(
fieldName.lowercase(Locale.getDefault()),
record[fieldName]
)
}
}
.collect(
Collectors.toMap(
Function { record: JsonNode ->
val nameNode = record[DESTINATION_STATE_TABLE_COLUMN_NAME]
val namespaceNode = record[DESTINATION_STATE_TABLE_COLUMN_NAMESPACE]
toMap(
{ record ->
val nameNode: JsonNode = record.get(DESTINATION_STATE_TABLE_COLUMN_NAME)
val namespaceNode: JsonNode =
record.get(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)
AirbyteStreamNameNamespacePair(
nameNode?.asText(),
namespaceNode?.asText()
if (nameNode != null) nameNode.asText() else null,
if (namespaceNode != null) namespaceNode.asText() else null
)
},
Function { record: JsonNode ->
val stateNode = record[DESTINATION_STATE_TABLE_COLUMN_STATE]
{ record ->
val stateNode: JsonNode =
record.get(DESTINATION_STATE_TABLE_COLUMN_STATE)
val state =
if (stateNode != null) Jsons.deserialize(stateNode.asText())
else Jsons.emptyObject()
Expand Down

0 comments on commit 325cbef

Please sign in to comment.