Skip to content

Commit

Permalink
Destination Postgres: improve handling for column name truncation (#3…
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Apr 10, 2024
1 parent ed8cebe commit 7ecb1d3
Show file tree
Hide file tree
Showing 12 changed files with 308 additions and 63 deletions.
2 changes: 2 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ Maven and Gradle will automatically reference the correct (pinned) version of th

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.29.10 | 2024-04-10 | [\#36805](https://github.com/airbytehq/airbyte/pull/36805) | Destinations: Enhance CatalogParser name collision handling; add DV2 tests for long identifiers |
| 0.29.9 | 2024-04-09 | [\#36047](https://github.com/airbytehq/airbyte/pull/36047) | Destinations: CDK updates for raw-only destinations |
| 0.29.8 | 2024-04-08 | [\#36868](https://github.com/airbytehq/airbyte/pull/36868) | Destinations: s3-destinations Compilation fixes for connector |
| 0.29.7 | 2024-04-08 | [\#36768](https://github.com/airbytehq/airbyte/pull/36768) | Destinations: Make destination state fetch/commit logic more resilient to errors |
| 0.29.6 | 2024-04-05 | [\#36577](https://github.com/airbytehq/airbyte/pull/36577) | Do not send system_error trace message for config exceptions. |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.29.9
version=0.29.10
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler.Companion.addStr
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import java.util.*
import java.util.Optional
import java.util.function.Consumer
import org.apache.commons.codec.digest.DigestUtils
import org.slf4j.Logger
Expand Down Expand Up @@ -50,31 +50,31 @@ constructor(
// We're taking a hash of the quoted namespace and the unquoted stream name
val hash =
DigestUtils.sha1Hex(
originalStreamConfig.id!!.finalNamespace + "&airbyte&" + originalName
"${originalStreamConfig.id.finalNamespace}&airbyte&$originalName"
)
.substring(0, 3)
val newName = originalName + "_" + hash
val newName = "${originalName}_$hash"
actualStreamConfig =
StreamConfig(
sqlGenerator.buildStreamId(originalNamespace, newName, rawNamespace),
originalStreamConfig.syncMode,
originalStreamConfig.destinationSyncMode,
originalStreamConfig.primaryKey,
originalStreamConfig.cursor,
originalStreamConfig.columns
originalStreamConfig.columns,
)
} else {
actualStreamConfig = originalStreamConfig
}
streamConfigs.add(actualStreamConfig)

// Populate some interesting strings into the exception handler string deinterpolator
addStringForDeinterpolation(actualStreamConfig.id!!.rawNamespace)
addStringForDeinterpolation(actualStreamConfig.id!!.rawName)
addStringForDeinterpolation(actualStreamConfig.id!!.finalNamespace)
addStringForDeinterpolation(actualStreamConfig.id!!.finalName)
addStringForDeinterpolation(actualStreamConfig.id!!.originalNamespace)
addStringForDeinterpolation(actualStreamConfig.id!!.originalName)
addStringForDeinterpolation(actualStreamConfig.id.rawNamespace)
addStringForDeinterpolation(actualStreamConfig.id.rawName)
addStringForDeinterpolation(actualStreamConfig.id.finalNamespace)
addStringForDeinterpolation(actualStreamConfig.id.finalName)
addStringForDeinterpolation(actualStreamConfig.id.originalNamespace)
addStringForDeinterpolation(actualStreamConfig.id.originalName)
actualStreamConfig.columns!!
.keys
.forEach(
Expand All @@ -101,19 +101,14 @@ constructor(
return ParsedCatalog(streamConfigs)
}

// TODO maybe we should extract the column collision stuff to a separate method, since that's
// the
// interesting bit
@VisibleForTesting
fun toStreamConfig(stream: ConfiguredAirbyteStream): StreamConfig {
val schema: AirbyteType = AirbyteType.Companion.fromJsonSchema(stream.stream.jsonSchema)
val airbyteColumns =
if (schema is Struct) {
schema.properties
} else if (schema is Union) {
schema.asColumns()
} else {
throw IllegalArgumentException("Top-level schema must be an object")
when (schema) {
is Struct -> schema.properties
is Union -> schema.asColumns()
else -> throw IllegalArgumentException("Top-level schema must be an object")
}

require(!stream.primaryKey.stream().anyMatch { key: List<String?> -> key.size > 1 }) {
Expand All @@ -126,21 +121,38 @@ constructor(
.toList()

require(stream.cursorField.size <= 1) { "Only top-level cursors are supported" }
val cursor: Optional<ColumnId>
if (stream.cursorField.size > 0) {
cursor = Optional.of(sqlGenerator.buildColumnId(stream.cursorField[0])!!)
} else {
cursor = Optional.empty()
}
val cursor: Optional<ColumnId> =
if (stream.cursorField.isNotEmpty()) {
Optional.of(sqlGenerator.buildColumnId(stream.cursorField[0]))
} else {
Optional.empty()
}

val columns = resolveColumnCollisions(airbyteColumns, stream)

return StreamConfig(
sqlGenerator.buildStreamId(stream.stream.namespace, stream.stream.name, rawNamespace),
stream.syncMode,
stream.destinationSyncMode,
primaryKey,
cursor,
columns
)
}

// this code is really bad and I'm not convinced we need to preserve this behavior.
// as with the tablename collisions thing above - we're trying to preserve legacy
// normalization's
// naming conventions here.
/**
* This code is really bad and I'm not convinced we need to preserve this behavior. As with the
* tablename collisions thing above - we're trying to preserve legacy normalization's naming
* conventions here.
*/
private fun resolveColumnCollisions(
airbyteColumns: LinkedHashMap<String, AirbyteType>,
stream: ConfiguredAirbyteStream
): LinkedHashMap<ColumnId, AirbyteType> {
val columns = LinkedHashMap<ColumnId, AirbyteType>()
for ((key, value) in airbyteColumns) {
val originalColumnId = sqlGenerator.buildColumnId(key)
var columnId: ColumnId?
var columnId: ColumnId
if (
columns.keys.stream().noneMatch { c: ColumnId ->
c.canonicalName == originalColumnId.canonicalName
Expand All @@ -154,14 +166,31 @@ constructor(
"Detected column name collision for {}.{}.{}",
stream.stream.namespace,
stream.stream.name,
key
key,
)
// One of the existing columns has the same name. We need to handle this collision.
// Append _1, _2, _3, ... to the column name until we find one that doesn't collide.
var i = 1
while (true) {
columnId = sqlGenerator.buildColumnId(key, "_$i")
val canonicalName = columnId!!.canonicalName

// Verify that we're making progress, e.g. we haven't immediately truncated away
// the suffix.
if (columnId.canonicalName == originalColumnId.canonicalName) {
// If we're not making progress, do a more powerful mutation instead of
// appending numbers.
// Assume that we're being truncated, and that the column ID's name is the
// maximum length.
columnId =
superResolveColumnCollisions(
originalColumnId,
columns,
originalColumnId.name.length
)
break
}

val canonicalName = columnId.canonicalName
if (
columns.keys.stream().noneMatch { c: ColumnId ->
c.canonicalName == canonicalName
Expand All @@ -176,23 +205,64 @@ constructor(
// JSON records.
columnId =
ColumnId(
columnId!!.name,
originalColumnId!!.originalName,
columnId.canonicalName
columnId.name,
originalColumnId.originalName,
columnId.canonicalName,
)
}

columns[columnId] = value
}
return columns
}

return StreamConfig(
sqlGenerator.buildStreamId(stream.stream.namespace, stream.stream.name, rawNamespace),
stream.syncMode,
stream.destinationSyncMode,
primaryKey,
cursor,
columns
)
/**
* Generate a name of the format `<prefix><length><suffix>`. E.g. for affixLength=3:
* "veryLongName" -> "ver6ame" This is based on the "i18n"-ish naming convention.
*
* @param columnId The column that we're trying to add
* @param columns The columns that we've already added
*/
private fun superResolveColumnCollisions(
columnId: ColumnId,
columns: LinkedHashMap<ColumnId, AirbyteType>,
maximumColumnNameLength: Int
): ColumnId {
val originalColumnName = columnId.originalName

var newColumnId = columnId
// Assume that the <length> portion can be expressed in at most 5 characters.
// If someone is giving us a column name that's longer than 99999 characters,
// that's just being silly.
val affixLength = (maximumColumnNameLength - 5) / 2
// If, after reserving 5 characters for the length, we can't fit the affixes,
// just give up. That means the destination is trying to restrict us to a
// 6-character column name, which is just silly.
if (affixLength <= 0) {
throw IllegalArgumentException(
"Cannot solve column name collision: ${newColumnId.originalName}. We recommend removing this column to continue syncing."
)
}
val prefix = originalColumnName.substring(0, affixLength)
val suffix =
originalColumnName.substring(
originalColumnName.length - affixLength,
originalColumnName.length
)
val length = originalColumnName.length - 2 * affixLength
newColumnId = sqlGenerator.buildColumnId("$prefix$length$suffix")
// if there's _still_ a collision after this, just give up.
// we could try to be more clever, but this is already a pretty rare case.
if (
columns.keys.stream().anyMatch { c: ColumnId ->
c.canonicalName == newColumnId.canonicalName
}
) {
throw IllegalArgumentException(
"Cannot solve column name collision: ${newColumnId.originalName}. We recommend removing this column to continue syncing."
)
}
return newColumnId
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,31 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import java.util.List
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.assertAll
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.mockito.Mockito
import org.mockito.invocation.InvocationOnMock
import org.mockito.kotlin.any
import org.mockito.kotlin.whenever

internal class CatalogParserTest {
private lateinit var sqlGenerator: SqlGenerator
private var parser: CatalogParser? = null
private lateinit var parser: CatalogParser

@BeforeEach
fun setup() {
sqlGenerator = Mockito.mock(SqlGenerator::class.java)
// noop quoting logic
Mockito.`when`(sqlGenerator.buildColumnId(any(), any())).thenAnswer {
invocation: InvocationOnMock ->
val fieldName = invocation.getArgument<String>(0)
val suffix = invocation.getArgument<String>(1)
ColumnId(fieldName + suffix, fieldName + suffix, fieldName + suffix)
}
Mockito.`when`(sqlGenerator.buildColumnId(any())).thenAnswer { invocation: InvocationOnMock
->
val fieldName = invocation.getArgument<String>(0)
ColumnId(fieldName, fieldName, fieldName)
sqlGenerator.buildColumnId(invocation.getArgument<String>(0), "")
}
Mockito.`when`(sqlGenerator.buildStreamId(any(), any(), any())).thenAnswer {
invocation: InvocationOnMock ->
Expand All @@ -46,7 +53,7 @@ internal class CatalogParserTest {
*/
@Test
fun finalNameCollision() {
Mockito.`when`(sqlGenerator!!.buildStreamId(any(), any(), any())).thenAnswer {
Mockito.`when`(sqlGenerator.buildStreamId(any(), any(), any())).thenAnswer {
invocation: InvocationOnMock ->
val originalNamespace = invocation.getArgument<String>(0)
val originalName = (invocation.getArgument<String>(1))
Expand All @@ -67,11 +74,18 @@ internal class CatalogParserTest {
ConfiguredAirbyteCatalog()
.withStreams(List.of(stream("a", "foobarfoo"), stream("a", "foofoo")))

val parsedCatalog = parser!!.parseCatalog(catalog)
val parsedCatalog = parser.parseCatalog(catalog)

Assertions.assertNotEquals(
parsedCatalog.streams.get(0).id.finalName,
parsedCatalog.streams.get(1).id.finalName
assertAll(
{ Assertions.assertEquals("a_abab_foofoo", parsedCatalog.streams.get(0).id.rawName) },
{ Assertions.assertEquals("foofoo", parsedCatalog.streams.get(0).id.finalName) },
{
Assertions.assertEquals(
"a_abab_foofoo_3fd",
parsedCatalog.streams.get(1).id.rawName
)
},
{ Assertions.assertEquals("foofoo_3fd", parsedCatalog.streams.get(1).id.finalName) }
)
}

Expand All @@ -81,9 +95,9 @@ internal class CatalogParserTest {
*/
@Test
fun columnNameCollision() {
Mockito.`when`(sqlGenerator!!.buildColumnId(any(), any())).thenAnswer {
Mockito.`when`(sqlGenerator.buildColumnId(any(), any())).thenAnswer {
invocation: InvocationOnMock ->
val originalName = invocation.getArgument<String>(0)
val originalName = invocation.getArgument<String>(0) + invocation.getArgument<String>(1)
// emulate quoting logic that causes a name collision
val quotedName = originalName.replace("bar".toRegex(), "")
ColumnId(quotedName, originalName, quotedName)
Expand All @@ -103,9 +117,52 @@ internal class CatalogParserTest {
)
val catalog = ConfiguredAirbyteCatalog().withStreams(List.of(stream("a", "a", schema)))

val parsedCatalog = parser!!.parseCatalog(catalog)
val parsedCatalog = parser.parseCatalog(catalog)
val columnsList = parsedCatalog.streams[0].columns!!.keys.toList()

assertAll(
{ Assertions.assertEquals(2, parsedCatalog.streams[0].columns!!.size) },
{ Assertions.assertEquals("foofoo", columnsList[0].name) },
{ Assertions.assertEquals("foofoo_1", columnsList[1].name) }
)
}

/**
* Test behavior when the sqlgenerator truncates column names. We should end generate new names
* that still avoid collision.
*/
@Test
fun truncatingColumnNameCollision() {
whenever(sqlGenerator.buildColumnId(any(), any())).thenAnswer { invocation: InvocationOnMock
->
val originalName = invocation.getArgument<String>(0) + invocation.getArgument<String>(1)
// truncate to 10 characters
val truncatedName = originalName.substring(0, 10.coerceAtMost(originalName.length))
ColumnId(truncatedName, originalName, truncatedName)
}
val schema =
Jsons.deserialize(
"""
{
"type": "object",
"properties": {
"aVeryLongColumnName": {"type": "string"},
"aVeryLongColumnNameWithMoreTextAfterward": {"type": "string"}
}
}
""".trimIndent()
)
val catalog = ConfiguredAirbyteCatalog().withStreams(listOf(stream("a", "a", schema)))

val parsedCatalog = parser.parseCatalog(catalog)
val columnsList = parsedCatalog.streams[0].columns!!.keys.toList()

Assertions.assertEquals(2, parsedCatalog.streams.get(0).columns!!.size)
assertAll(
{ Assertions.assertEquals(2, parsedCatalog.streams[0].columns!!.size) },
{ Assertions.assertEquals("aVeryLongC", columnsList[0].name) },
{ Assertions.assertEquals("aV36rd", columnsList[1].name) }
)
}

companion object {
Expand Down
Loading

0 comments on commit 7ecb1d3

Please sign in to comment.