From 9f030102c2e765b9c72f5c4279b23432a32b94bf Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Fri, 10 May 2024 12:57:49 -0700 Subject: [PATCH] set default namespace in CatalogParser --- .../destination/async/AsyncStreamConsumer.kt | 51 ++++--------------- .../async/StreamDescriptorUtils.kt | 7 +++ .../async/buffers/BufferEnqueue.kt | 23 +++++++-- .../async/buffers/BufferManager.kt | 7 ++- .../async/state/GlobalAsyncStateManager.kt | 31 +---------- .../util/ConfiguredCatalogUtil.kt | 24 --------- .../async/AsyncStreamConsumerTest.kt | 9 ++-- .../async/buffers/BufferDequeueTest.kt | 35 +++++-------- .../async/buffers/BufferEnqueueTest.kt | 9 ++-- .../state/GlobalAsyncStateManagerTest.kt | 40 +++++++-------- .../jdbc/AbstractJdbcDestination.kt | 20 +------- .../jdbc/JdbcBufferedConsumerFactory.kt | 5 +- .../staging/StagingConsumerFactory.kt | 9 ++-- .../typing_deduping/CatalogParser.kt | 16 +++++- .../typing_deduping/CatalogParserTest.kt | 17 ++++++- .../BaseSqlGeneratorIntegrationTest.kt | 4 +- 16 files changed, 122 insertions(+), 185 deletions(-) delete mode 100644 airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ConfiguredCatalogUtil.kt diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt index 58c195346043f..a8852e7b93e97 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt @@ -5,7 +5,6 @@ package io.airbyte.cdk.integrations.destination.async import com.google.common.base.Preconditions -import com.google.common.base.Strings import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer import io.airbyte.cdk.integrations.destination.StreamSyncSummary import io.airbyte.cdk.integrations.destination.async.buffers.BufferEnqueue @@ -28,8 +27,6 @@ import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.atomic.AtomicLong import java.util.function.Consumer -import java.util.stream.Collectors -import kotlin.jvm.optionals.getOrNull import org.jetbrains.annotations.VisibleForTesting private val logger = KotlinLogging.logger {} @@ -52,7 +49,6 @@ constructor( onFlush: DestinationFlushFunction, private val catalog: ConfiguredAirbyteCatalog, private val bufferManager: BufferManager, - private val defaultNamespace: Optional, private val flushFailure: FlushFailure = FlushFailure(), workerPool: ExecutorService = Executors.newFixedThreadPool(5), private val airbyteMessageDeserializer: AirbyteMessageDeserializer = @@ -80,28 +76,6 @@ constructor( private var hasClosed = false private var hasFailed = false - internal constructor( - outputRecordCollector: Consumer, - onStart: OnStartFunction, - onClose: OnCloseFunction, - flusher: DestinationFlushFunction, - catalog: ConfiguredAirbyteCatalog, - bufferManager: BufferManager, - flushFailure: FlushFailure, - defaultNamespace: Optional, - ) : this( - outputRecordCollector, - onStart, - onClose, - flusher, - catalog, - bufferManager, - defaultNamespace, - flushFailure, - Executors.newFixedThreadPool(5), - AirbyteMessageDeserializer(), - ) - @Throws(Exception::class) override fun start() { Preconditions.checkState(!hasStarted, "Consumer has already been started.") @@ -130,9 +104,6 @@ constructor( message, ) if (AirbyteMessage.Type.RECORD == partialAirbyteMessage.type) { - if (Strings.isNullOrEmpty(partialAirbyteMessage.record?.namespace)) { - partialAirbyteMessage.record?.namespace = defaultNamespace.getOrNull() - } validateRecord(partialAirbyteMessage) partialAirbyteMessage.record?.streamDescriptor?.let { @@ -142,7 +113,6 @@ constructor( bufferEnqueue.addRecord( partialAirbyteMessage, sizeInBytes + PARTIAL_DESERIALIZE_REF_BYTES, - defaultNamespace, ) } @@ -160,18 +130,15 @@ constructor( bufferManager.close() val streamSyncSummaries = - streamNames - .stream() - .collect( - Collectors.toMap( - { streamDescriptor: StreamDescriptor -> streamDescriptor }, - { streamDescriptor: StreamDescriptor -> - StreamSyncSummary( - Optional.of(getRecordCounter(streamDescriptor).get()), - ) - }, - ), - ) + streamNames.associate { streamDescriptor -> + StreamDescriptorUtils.withDefaultNamespace( + streamDescriptor, + bufferManager.defaultNamespace, + ) to + StreamSyncSummary( + Optional.of(getRecordCounter(streamDescriptor).get()), + ) + } onClose.accept(hasFailed, streamSyncSummaries) // as this throws an exception, we need to be after all other close functions. diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/StreamDescriptorUtils.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/StreamDescriptorUtils.kt index c1861b5bc21ed..cd77fbcfb01a9 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/StreamDescriptorUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/StreamDescriptorUtils.kt @@ -34,4 +34,11 @@ object StreamDescriptorUtils { return pairs } + + fun withDefaultNamespace(sd: StreamDescriptor, defaultNamespace: String) = + if (sd.namespace.isNullOrEmpty()) { + StreamDescriptor().withName(sd.name).withNamespace(defaultNamespace) + } else { + sd + } } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueue.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueue.kt index 79b70e9da3fe4..0e292df46331c 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueue.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueue.kt @@ -7,9 +7,9 @@ package io.airbyte.cdk.integrations.destination.async.buffers import io.airbyte.cdk.integrations.destination.async.GlobalMemoryManager import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage import io.airbyte.cdk.integrations.destination.async.state.GlobalAsyncStateManager +import io.airbyte.commons.json.Jsons import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.StreamDescriptor -import java.util.Optional import java.util.concurrent.ConcurrentMap /** @@ -20,6 +20,7 @@ class BufferEnqueue( private val memoryManager: GlobalMemoryManager, private val buffers: ConcurrentMap, private val stateManager: GlobalAsyncStateManager, + private val defaultNamespace: String, ) { /** * Buffer a record. Contains memory management logic to dynamically adjust queue size based via @@ -31,12 +32,11 @@ class BufferEnqueue( fun addRecord( message: PartialAirbyteMessage, sizeInBytes: Int, - defaultNamespace: Optional, ) { if (message.type == AirbyteMessage.Type.RECORD) { handleRecord(message, sizeInBytes) } else if (message.type == AirbyteMessage.Type.STATE) { - stateManager.trackState(message, sizeInBytes.toLong(), defaultNamespace.orElse("")) + stateManager.trackState(message, sizeInBytes.toLong()) } } @@ -53,7 +53,20 @@ class BufferEnqueue( } val stateId = stateManager.getStateIdAndIncrementCounter(streamDescriptor) - var addedToQueue = queue.offer(message, sizeInBytes.toLong(), stateId) + // We don't set the default namespace until after putting this message into the state + // manager/etc. + // All our internal handling is on the true (null) namespace, + // we just set the default namespace when handing off to destination-specific code. + val mangledMessage = + if (message.record!!.namespace.isNullOrEmpty()) { + val clone = Jsons.clone(message) + clone.record!!.namespace = defaultNamespace + clone + } else { + message + } + + var addedToQueue = queue.offer(mangledMessage, sizeInBytes.toLong(), stateId) var i = 0 while (!addedToQueue) { @@ -61,7 +74,7 @@ class BufferEnqueue( if (newlyAllocatedMemory > 0) { queue.addMaxMemory(newlyAllocatedMemory) } - addedToQueue = queue.offer(message, sizeInBytes.toLong(), stateId) + addedToQueue = queue.offer(mangledMessage, sizeInBytes.toLong(), stateId) i++ if (i > 5) { try { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferManager.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferManager.kt index f762d90b4a2a8..96cdedd0bf9d0 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferManager.kt @@ -22,6 +22,11 @@ private val logger = KotlinLogging.logger {} class BufferManager @JvmOverloads constructor( + /** + * This probably doesn't belong here, but it's the easiest place where both [BufferEnqueue] and + * [io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer] can both get to it. + */ + public val defaultNamespace: String, maxMemory: Long = (Runtime.getRuntime().maxMemory() * MEMORY_LIMIT_RATIO).toLong(), ) { @get:VisibleForTesting val buffers: ConcurrentMap @@ -46,7 +51,7 @@ constructor( memoryManager = GlobalMemoryManager(maxMemory) this.stateManager = GlobalAsyncStateManager(memoryManager) buffers = ConcurrentHashMap() - bufferEnqueue = BufferEnqueue(memoryManager, buffers, stateManager) + bufferEnqueue = BufferEnqueue(memoryManager, buffers, stateManager, defaultNamespace) bufferDequeue = BufferDequeue(memoryManager, buffers, stateManager) debugLoop = Executors.newSingleThreadScheduledExecutor() debugLoop.scheduleAtFixedRate( diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt index e494e7ac9a0f4..b100d9a007d93 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt @@ -5,7 +5,6 @@ package io.airbyte.cdk.integrations.destination.async.state import com.google.common.base.Preconditions -import com.google.common.base.Strings import io.airbyte.cdk.integrations.destination.async.GlobalMemoryManager import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage import io.airbyte.commons.json.Jsons @@ -104,7 +103,6 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) { fun trackState( message: PartialAirbyteMessage, sizeInBytes: Long, - defaultNamespace: String, ) { if (preState) { convertToGlobalIfNeeded(message) @@ -113,7 +111,7 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) { // stateType should not change after a conversion. Preconditions.checkArgument(stateType == extractStateType(message)) - closeState(message, sizeInBytes, defaultNamespace) + closeState(message, sizeInBytes) } /** @@ -333,10 +331,9 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) { private fun closeState( message: PartialAirbyteMessage, sizeInBytes: Long, - defaultNamespace: String, ) { val resolvedDescriptor: StreamDescriptor = - extractStream(message, defaultNamespace) + extractStream(message) .orElse( SENTINEL_GLOBAL_DESC, ) @@ -434,38 +431,14 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) { UUID.randomUUID().toString(), ) - /** - * If the user has selected the Destination Namespace as the Destination default while - * setting up the connector, the platform sets the namespace as null in the StreamDescriptor - * in the AirbyteMessages (both record and state messages). The destination checks that if - * the namespace is empty or null, if yes then re-populates it with the defaultNamespace. - * See [io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer.accept] But - * destination only does this for the record messages. So when state messages arrive without - * a namespace and since the destination doesn't repopulate it with the default namespace, - * there is a mismatch between the StreamDescriptor from record messages and state messages. - * That breaks the logic of the state management class as [descToStateIdQ] needs to have - * consistent StreamDescriptor. This is why while trying to extract the StreamDescriptor - * from state messages, we check if the namespace is null, if yes then replace it with - * defaultNamespace to keep it consistent with the record messages. - */ private fun extractStream( message: PartialAirbyteMessage, - defaultNamespace: String, ): Optional { if ( message.state?.type != null && message.state?.type == AirbyteStateMessage.AirbyteStateType.STREAM ) { val streamDescriptor: StreamDescriptor? = message.state?.stream?.streamDescriptor - if (Strings.isNullOrEmpty(streamDescriptor?.namespace)) { - return Optional.of( - StreamDescriptor() - .withName( - streamDescriptor?.name, - ) - .withNamespace(defaultNamespace), - ) - } return streamDescriptor?.let { Optional.of(it) } ?: Optional.empty() } return Optional.empty() diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ConfiguredCatalogUtil.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ConfiguredCatalogUtil.kt deleted file mode 100644 index d33bbabb62809..0000000000000 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ConfiguredCatalogUtil.kt +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright (c) 2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.cdk.integrations.util - -import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog - -/** - * For streams in [catalog] which do not have a namespace specified, explicitly set their namespace - * to the [defaultNamespace] - */ -fun addDefaultNamespaceToStreams(catalog: ConfiguredAirbyteCatalog, defaultNamespace: String?) { - if (defaultNamespace == null) { - return - } - // TODO: This logic exists in all V2 destinations. - // This is sad that if we forget to add this, there will be a null pointer during parseCatalog - for (catalogStream in catalog.streams) { - if (catalogStream.stream.namespace.isNullOrEmpty()) { - catalogStream.stream.namespace = defaultNamespace - } - } -} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt index a2128488bd897..fe2bd49bb9982 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt @@ -31,7 +31,6 @@ import io.airbyte.protocol.models.v0.StreamDescriptor import java.io.IOException import java.math.BigDecimal import java.time.Instant -import java.util.Optional import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException @@ -61,7 +60,7 @@ class AsyncStreamConsumerTest { private val CATALOG: ConfiguredAirbyteCatalog = ConfiguredAirbyteCatalog() .withStreams( - java.util.List.of( + listOf( CatalogHelpers.createConfiguredAirbyteStream( STREAM_NAME, SCHEMA_NAME, @@ -146,9 +145,8 @@ class AsyncStreamConsumerTest { onClose = onClose, onFlush = flushFunction, catalog = CATALOG, - bufferManager = BufferManager(), + bufferManager = BufferManager("default_ns"), flushFailure = flushFailure, - defaultNamespace = Optional.of("default_ns"), airbyteMessageDeserializer = airbyteMessageDeserializer, workerPool = Executors.newFixedThreadPool(5), ) @@ -265,9 +263,8 @@ class AsyncStreamConsumerTest { Mockito.mock(OnCloseFunction::class.java), flushFunction, CATALOG, - BufferManager((1024 * 10).toLong()), + BufferManager("default_ns", (1024 * 10).toLong()), flushFailure, - Optional.of("default_ns"), ) Mockito.`when`(flushFunction.optimalBatchSizeBytes).thenReturn(0L) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeueTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeueTest.kt index 209676f5c102b..2e5bdc85ebec6 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeueTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeueTest.kt @@ -12,7 +12,6 @@ import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.StreamDescriptor import java.time.Instant import java.time.temporal.ChronoUnit -import java.util.Optional import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Nested import org.junit.jupiter.api.Test @@ -33,29 +32,25 @@ class BufferDequeueTest { internal inner class Take { @Test internal fun testTakeShouldBestEffortRead() { - val bufferManager = BufferManager() + val bufferManager = BufferManager(DEFAULT_NAMESPACE) val enqueue = bufferManager.bufferEnqueue val dequeue = bufferManager.bufferDequeue enqueue.addRecord( RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, - Optional.of(DEFAULT_NAMESPACE) ) enqueue.addRecord( RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, - Optional.of(DEFAULT_NAMESPACE) ) enqueue.addRecord( RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, - Optional.of(DEFAULT_NAMESPACE) ) enqueue.addRecord( RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, - Optional.of(DEFAULT_NAMESPACE) ) // total size of records is 80, so we expect 50 to get us 2 records (prefer to @@ -77,24 +72,21 @@ class BufferDequeueTest { @Test internal fun testTakeShouldReturnAllIfPossible() { - val bufferManager = BufferManager() + val bufferManager = BufferManager(DEFAULT_NAMESPACE) val enqueue = bufferManager.bufferEnqueue val dequeue = bufferManager.bufferDequeue enqueue.addRecord( RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, - Optional.of(DEFAULT_NAMESPACE) ) enqueue.addRecord( RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, - Optional.of(DEFAULT_NAMESPACE) ) enqueue.addRecord( RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, - Optional.of(DEFAULT_NAMESPACE) ) try { @@ -108,19 +100,17 @@ class BufferDequeueTest { @Test internal fun testTakeFewerRecordsThanSizeLimitShouldNotError() { - val bufferManager = BufferManager() + val bufferManager = BufferManager(DEFAULT_NAMESPACE) val enqueue = bufferManager.bufferEnqueue val dequeue = bufferManager.bufferDequeue enqueue.addRecord( RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, - Optional.of(DEFAULT_NAMESPACE) ) enqueue.addRecord( RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, - Optional.of(DEFAULT_NAMESPACE) ) try { @@ -135,12 +125,12 @@ class BufferDequeueTest { @Test internal fun testMetadataOperationsCorrect() { - val bufferManager = BufferManager() + val bufferManager = BufferManager(DEFAULT_NAMESPACE) val enqueue = bufferManager.bufferEnqueue val dequeue = bufferManager.bufferDequeue - enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, Optional.of(DEFAULT_NAMESPACE)) - enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, Optional.of(DEFAULT_NAMESPACE)) + enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES) + enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES) val secondStream = StreamDescriptor().withName("stream_2") val recordFromSecondStream = Jsons.clone(RECORD_MSG_20_BYTES) @@ -148,7 +138,6 @@ class BufferDequeueTest { enqueue.addRecord( recordFromSecondStream, RECORD_SIZE_20_BYTES, - Optional.of(DEFAULT_NAMESPACE) ) Assertions.assertEquals(60, dequeue.totalGlobalQueueSizeBytes) @@ -169,7 +158,7 @@ class BufferDequeueTest { @Test internal fun testMetadataOperationsError() { - val bufferManager = BufferManager() + val bufferManager = BufferManager(DEFAULT_NAMESPACE) val dequeue = bufferManager.bufferDequeue val ghostStream = StreamDescriptor().withName("ghost stream") @@ -186,7 +175,7 @@ class BufferDequeueTest { @Test @Throws(Exception::class) internal fun cleansUpMemoryForEmptyQueues() { - val bufferManager = BufferManager() + val bufferManager = BufferManager(DEFAULT_NAMESPACE) val enqueue = bufferManager.bufferEnqueue val dequeue = bufferManager.bufferDequeue val memoryManager = bufferManager.memoryManager @@ -198,15 +187,15 @@ class BufferDequeueTest { ) // allocate a block for new stream - enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, Optional.of(DEFAULT_NAMESPACE)) + enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES) Assertions.assertEquals( 2 * GlobalMemoryManager.BLOCK_SIZE_BYTES, memoryManager.getCurrentMemoryBytes(), ) - enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, Optional.of(DEFAULT_NAMESPACE)) - enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, Optional.of(DEFAULT_NAMESPACE)) - enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES, Optional.of(DEFAULT_NAMESPACE)) + enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES) + enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES) + enqueue.addRecord(RECORD_MSG_20_BYTES, RECORD_SIZE_20_BYTES) // no re-allocates as we haven't breached block size Assertions.assertEquals( diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueueTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueueTest.kt index 01c859d31b3f9..caf1f9fa64454 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueueTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueueTest.kt @@ -10,7 +10,6 @@ import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordM import io.airbyte.cdk.integrations.destination.async.state.GlobalAsyncStateManager import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.StreamDescriptor -import java.util.Optional import java.util.concurrent.ConcurrentHashMap import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test @@ -31,6 +30,7 @@ class BufferEnqueueTest { Mockito.mock( GlobalAsyncStateManager::class.java, ), + DEFAULT_NAMESPACE, ) val streamName = "stream" @@ -42,7 +42,7 @@ class BufferEnqueueTest { PartialAirbyteRecordMessage().withStream(streamName), ) - enqueue.addRecord(record, RECORD_SIZE_20_BYTES, Optional.of(DEFAULT_NAMESPACE)) + enqueue.addRecord(record, RECORD_SIZE_20_BYTES) Assertions.assertEquals(1, streamToBuffer[stream]!!.size()) Assertions.assertEquals(20L, streamToBuffer[stream]!!.currentMemoryUsage) } @@ -58,6 +58,7 @@ class BufferEnqueueTest { Mockito.mock( GlobalAsyncStateManager::class.java, ), + DEFAULT_NAMESPACE, ) val streamName = "stream" @@ -69,8 +70,8 @@ class BufferEnqueueTest { PartialAirbyteRecordMessage().withStream(streamName), ) - enqueue.addRecord(record, RECORD_SIZE_20_BYTES, Optional.of(DEFAULT_NAMESPACE)) - enqueue.addRecord(record, RECORD_SIZE_20_BYTES, Optional.of(DEFAULT_NAMESPACE)) + enqueue.addRecord(record, RECORD_SIZE_20_BYTES) + enqueue.addRecord(record, RECORD_SIZE_20_BYTES) Assertions.assertEquals(2, streamToBuffer[stream]!!.size()) Assertions.assertEquals(40, streamToBuffer[stream]!!.currentMemoryUsage) } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManagerTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManagerTest.kt index a46535bf82ea0..a78f713119372 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManagerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManagerTest.kt @@ -222,7 +222,7 @@ class GlobalAsyncStateManagerTest { ) assertEquals(0, stateWithStats.size) - stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE) stateManager.flushStates { e: AirbyteMessage? -> emittedStatesFromDestination.add( e, @@ -271,7 +271,7 @@ class GlobalAsyncStateManagerTest { GlobalAsyncStateManager(GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)) // GLOBAL - stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE) stateManager.flushStates { e: AirbyteMessage? -> emittedStatesFromDestination.add( e, @@ -308,7 +308,6 @@ class GlobalAsyncStateManagerTest { stateManager.trackState( STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, - DEFAULT_NAMESPACE, ) } } @@ -324,7 +323,7 @@ class GlobalAsyncStateManagerTest { val preConvertId2: Long = simulateIncomingRecords(STREAM3_DESC, 10, stateManager) assertEquals(3, setOf(preConvertId0, preConvertId1, preConvertId2).size) - stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE) // Since this is actually a global state, we can only flush after all streams are done. stateManager.decrement(preConvertId0, 10) @@ -379,7 +378,7 @@ class GlobalAsyncStateManagerTest { GlobalAsyncStateManager(GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)) val preConvertId0: Long = simulateIncomingRecords(STREAM1_DESC, 10, stateManager) - stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE) stateManager.decrement(preConvertId0, 10) stateManager.flushStates { e: AirbyteMessage? -> emittedStatesFromDestination.add( @@ -413,7 +412,7 @@ class GlobalAsyncStateManagerTest { emittedStatesFromDestination.clear() val afterConvertId1: Long = simulateIncomingRecords(STREAM1_DESC, 10, stateManager) - stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE) stateManager.decrement(afterConvertId1, 10) stateManager.flushStates { e: AirbyteMessage? -> emittedStatesFromDestination.add( @@ -451,7 +450,7 @@ class GlobalAsyncStateManagerTest { GlobalAsyncStateManager(GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)) val preConvertId0: Long = simulateIncomingRecords(STREAM1_DESC, 10, stateManager) - stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE) stateManager.decrement(preConvertId0, 10) stateManager.flushStates { e: AirbyteMessage? -> emittedStatesFromDestination.add( @@ -483,7 +482,7 @@ class GlobalAsyncStateManagerTest { assertEquals(listOf(expectedDestinationStats), stateWithStats.values.stream().toList()) emittedStatesFromDestination.clear() - stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE) stateManager.flushStates { e: AirbyteMessage? -> emittedStatesFromDestination.add( e, @@ -518,7 +517,7 @@ class GlobalAsyncStateManagerTest { emittedStatesFromDestination.clear() val afterConvertId2: Long = simulateIncomingRecords(STREAM1_DESC, 10, stateManager) - stateManager.trackState(GLOBAL_STATE_MESSAGE3, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(GLOBAL_STATE_MESSAGE3, STATE_MSG_SIZE) stateManager.decrement(afterConvertId2, 10) stateManager.flushStates { e: AirbyteMessage? -> emittedStatesFromDestination.add( @@ -558,7 +557,7 @@ class GlobalAsyncStateManagerTest { val preConvertId0: Long = simulateIncomingRecords(STREAM1_DESC, 10, stateManager) val preConvertId1: Long = simulateIncomingRecords(STREAM2_DESC, 10, stateManager) assertNotEquals(preConvertId0, preConvertId1) - stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE) stateManager.decrement(preConvertId0, 10) stateManager.decrement(preConvertId1, 10) stateManager.flushStates { e: AirbyteMessage? -> @@ -594,7 +593,7 @@ class GlobalAsyncStateManagerTest { val afterConvertId0: Long = simulateIncomingRecords(STREAM1_DESC, 10, stateManager) val afterConvertId1: Long = simulateIncomingRecords(STREAM2_DESC, 10, stateManager) assertEquals(afterConvertId0, afterConvertId1) - stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE) stateManager.decrement(afterConvertId0, 20) stateManager.flushStates { e: AirbyteMessage? -> emittedStatesFromDestination.add( @@ -635,7 +634,7 @@ class GlobalAsyncStateManagerTest { GlobalAsyncStateManager(GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)) // GLOBAL - stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE) stateManager.flushStates { e: AirbyteMessage? -> emittedStatesFromDestination.add( e, @@ -671,7 +670,6 @@ class GlobalAsyncStateManagerTest { stateManager.trackState( GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, - DEFAULT_NAMESPACE, ) } } @@ -683,7 +681,7 @@ class GlobalAsyncStateManagerTest { GlobalAsyncStateManager(GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)) var stateId: Long = simulateIncomingRecords(STREAM1_DESC, 3, stateManager) - stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE) stateManager.decrement(stateId, 3) stateManager.flushStates { e: AirbyteMessage? -> emittedStatesFromDestination.add( @@ -717,7 +715,7 @@ class GlobalAsyncStateManagerTest { emittedStatesFromDestination.clear() stateId = simulateIncomingRecords(STREAM1_DESC, 10, stateManager) - stateManager.trackState(STREAM1_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(STREAM1_STATE_MESSAGE2, STATE_MSG_SIZE) stateManager.decrement(stateId, 10) stateManager.flushStates { e: AirbyteMessage? -> emittedStatesFromDestination.add( @@ -759,7 +757,7 @@ class GlobalAsyncStateManagerTest { GlobalAsyncStateManager(GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)) var stateId: Long = simulateIncomingRecords(STREAM1_DESC, 3, stateManager) - stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE) stateManager.decrement(stateId, 3) stateManager.flushStates { e: AirbyteMessage? -> emittedStatesFromDestination.add( @@ -791,7 +789,7 @@ class GlobalAsyncStateManagerTest { assertEquals(listOf(expectedDestinationStats), stateWithStats.values.stream().toList()) emittedStatesFromDestination.clear() - stateManager.trackState(STREAM1_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(STREAM1_STATE_MESSAGE2, STATE_MSG_SIZE) stateManager.flushStates { e: AirbyteMessage? -> emittedStatesFromDestination.add( e, @@ -826,7 +824,7 @@ class GlobalAsyncStateManagerTest { emittedStatesFromDestination.clear() stateId = simulateIncomingRecords(STREAM1_DESC, 10, stateManager) - stateManager.trackState(STREAM1_STATE_MESSAGE3, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(STREAM1_STATE_MESSAGE3, STATE_MSG_SIZE) stateManager.decrement(stateId, 10) stateManager.flushStates { e: AirbyteMessage? -> emittedStatesFromDestination.add( @@ -870,7 +868,7 @@ class GlobalAsyncStateManagerTest { val stream1StateId: Long = simulateIncomingRecords(STREAM1_DESC, 3, stateManager) val stream2StateId: Long = simulateIncomingRecords(STREAM2_DESC, 7, stateManager) - stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE) stateManager.decrement(stream1StateId, 3) stateManager.flushStates { e: AirbyteMessage? -> emittedStatesFromDestination.add( @@ -909,7 +907,7 @@ class GlobalAsyncStateManagerTest { ) } assertEquals(listOf(), emittedStatesFromDestination) - stateManager.trackState(STREAM2_STATE_MESSAGE, STATE_MSG_SIZE, DEFAULT_NAMESPACE) + stateManager.trackState(STREAM2_STATE_MESSAGE, STATE_MSG_SIZE) stateManager.decrement(stream2StateId, 3) // only flush state if counter is 0. stateManager.flushStates { e: AirbyteMessage? -> @@ -965,7 +963,7 @@ class GlobalAsyncStateManagerTest { GlobalAsyncStateManager(GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES)) val stateId = simulateIncomingRecords(STREAM1_DESC, 6, stateManager) stateManager.decrement(stateId, 4) - stateManager.trackState(GLOBAL_STATE_MESSAGE1, 1, STREAM1_DESC.namespace) + stateManager.trackState(GLOBAL_STATE_MESSAGE1, 1) stateManager.flushStates { e: AirbyteMessage? -> emittedStatesFromDestination.add( e, diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt index 501c7a9b42a6d..235ae31c0749c 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt @@ -28,7 +28,6 @@ import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordM import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator -import io.airbyte.cdk.integrations.util.addDefaultNamespaceToStreams import io.airbyte.commons.exceptions.ConnectionErrorException import io.airbyte.commons.json.Jsons import io.airbyte.commons.map.MoreMaps @@ -245,22 +244,7 @@ abstract class AbstractJdbcDestination ): SerializedAirbyteMessageConsumer? { val database = getDatabase(getDataSource(config)) - // Short circuit for non-v2 destinations. - if (!isDestinationV2) { - return JdbcBufferedConsumerFactory.createAsync( - outputRecordCollector, - database, - sqlOperations, - namingResolver, - config, - catalog, - null, - NoopTyperDeduper(), - ) - } - val defaultNamespace = config[configSchemaKey].asText() - addDefaultNamespaceToStreams(catalog, defaultNamespace) return getV2MessageConsumer( config, catalog, @@ -289,8 +273,8 @@ abstract class AbstractJdbcDestination CatalogParser(sqlGenerator, override) } - .orElse(CatalogParser(sqlGenerator)) + .map { override: String -> CatalogParser(sqlGenerator, defaultNamespace, override) } + .orElse(CatalogParser(sqlGenerator, defaultNamespace)) .parseCatalog(catalog!!) val typerDeduper: TyperDeduper = buildTyperDeduper( diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.kt index 489f017c04358..c8457d7801851 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/JdbcBufferedConsumerFactory.kt @@ -64,7 +64,7 @@ object JdbcBufferedConsumerFactory { namingResolver: NamingConventionTransformer, config: JsonNode, catalog: ConfiguredAirbyteCatalog, - defaultNamespace: String?, + defaultNamespace: String, typerDeduper: TyperDeduper, dataTransformer: StreamAwareDataTransformer = IdentityDataTransformer(), optimalBatchSizeBytes: Long = DEFAULT_OPTIMAL_BATCH_SIZE_FOR_FLUSH, @@ -87,8 +87,7 @@ object JdbcBufferedConsumerFactory { optimalBatchSizeBytes ), catalog, - BufferManager((Runtime.getRuntime().maxMemory() * 0.2).toLong()), - Optional.ofNullable(defaultNamespace), + BufferManager(defaultNamespace, (Runtime.getRuntime().maxMemory() * 0.2).toLong()), FlushFailure(), Executors.newFixedThreadPool(2), AirbyteMessageDeserializer(dataTransformer) diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt index 773c11d8bbd28..be14bb6fcd781 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt @@ -47,7 +47,7 @@ private constructor( private val purgeStagingData: Boolean, private val typerDeduper: TyperDeduper?, private val parsedCatalog: ParsedCatalog?, - private val defaultNamespace: String?, + private val defaultNamespace: String, private val destinationColumns: JavaBaseConstants.DestinationColumns, // Optional fields private val bufferMemoryLimit: Optional, @@ -104,7 +104,9 @@ private constructor( purgeStagingData, typerDeduper, parsedCatalog, - defaultNamespace, + // If we don't set a default namespace, throw. This is required for staging + // destinations. + defaultNamespace!!, destinationColumns, bufferMemoryLimit, optimalBatchSizeBytes, @@ -147,8 +149,7 @@ private constructor( ), flusher, catalog!!, - BufferManager(getMemoryLimit(bufferMemoryLimit)), - Optional.ofNullable(defaultNamespace), + BufferManager(defaultNamespace, getMemoryLimit(bufferMemoryLimit)), FlushFailure(), Executors.newFixedThreadPool(5), AirbyteMessageDeserializer(dataTransformer), diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt index ab3ed847f5ae9..d412de7dc0ac6 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt @@ -6,6 +6,7 @@ package io.airbyte.integrations.base.destination.typing_deduping import com.google.common.annotations.VisibleForTesting import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler.Companion.addStringForDeinterpolation import io.airbyte.cdk.integrations.base.JavaBaseConstants +import io.airbyte.commons.json.Jsons import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream import java.util.Optional @@ -18,9 +19,20 @@ class CatalogParser @JvmOverloads constructor( private val sqlGenerator: SqlGenerator, - private val rawNamespace: String = JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE + private val defaultNamespace: String, + private val rawNamespace: String = JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE, ) { - fun parseCatalog(catalog: ConfiguredAirbyteCatalog): ParsedCatalog { + fun parseCatalog(orginalCatalog: ConfiguredAirbyteCatalog): ParsedCatalog { + // Don't mutate the original catalog, just operate on a copy of it + // This is... probably the easiest way we have to deep clone a protocol model object? + val catalog = Jsons.clone(orginalCatalog) + catalog.streams.onEach { + // Overwrite null namespaces + if (it.stream.namespace.isNullOrEmpty()) { + it.stream.namespace = defaultNamespace + } + } + // this code is bad and I feel bad // it's mostly a port of the old normalization logic to prevent tablename collisions. // tbh I have no idea if it works correctly. diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt index 9032fc5e42fb4..f6ad003dac099 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParserTest.kt @@ -45,7 +45,7 @@ internal class CatalogParserTest { StreamId(namespace, name, rawNamespace, namespace + "_abab_" + name, namespace, name) } - parser = CatalogParser(sqlGenerator) + parser = CatalogParser(sqlGenerator, "default_namespace") } /** @@ -176,9 +176,22 @@ internal class CatalogParserTest { ) } + @Test + fun testDefaultNamespace() { + val catalog = + parser.parseCatalog( + ConfiguredAirbyteCatalog() + .withStreams( + listOf(stream(null, "a", Jsons.deserialize("""{"type": "object"}"""))) + ) + ) + + Assertions.assertEquals("default_namespace", catalog.streams[0].id.originalNamespace) + } + companion object { private fun stream( - namespace: String, + namespace: String?, name: String, schema: JsonNode = Jsons.deserialize( diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt index 9535f91ca3db8..579373ebb5af8 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt @@ -1786,7 +1786,9 @@ abstract class BaseSqlGeneratorIntegrationTest