diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/operation/SyncOperation.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/operation/SyncOperation.kt
new file mode 100644
index 000000000000..bbe695a3cb80
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/operation/SyncOperation.kt
@@ -0,0 +1,18 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.integrations.destination.operation
+
+import io.airbyte.cdk.integrations.destination.StreamSyncSummary
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.protocol.models.v0.StreamDescriptor
+import java.util.stream.Stream
+
+/** Connector sync operations */
+interface SyncOperation {
+
+    /** Called once per batch to flush records for the stream identified by [descriptor]. */
+    fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>)
+    fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>)
+}
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt
new file mode 100644
index 000000000000..ac68dec36df2
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/AbstractStreamOperation.kt
@@ -0,0 +1,133 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.base.destination.operation
+
+import io.airbyte.cdk.integrations.destination.StreamSyncSummary
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
+import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
+import io.airbyte.protocol.models.v0.DestinationSyncMode
+import io.github.oshai.kotlinlogging.KotlinLogging
+import java.util.stream.Stream
+
+abstract class AbstractStreamOperation<DestinationState : MinimumDestinationState>(
+    private val storageOperations: StorageOperations,
+    destinationInitialStatus: DestinationInitialStatus<DestinationState>,
+) : StreamOperation<DestinationState> {
+    private val log = KotlinLogging.logger {}
+
+    // State maintained to make decisions between async calls
+    private val finalTmpTableSuffix: String
+    private val initialRawTableStatus: InitialRawTableStatus =
+        destinationInitialStatus.initialRawTableStatus
+    init {
+        val stream = destinationInitialStatus.streamConfig
+        storageOperations.prepareStage(stream.id, stream.destinationSyncMode)
+        storageOperations.createFinalSchema(stream.id)
+        // Prepare final tables based on sync mode.
+        finalTmpTableSuffix = prepareFinalTable(destinationInitialStatus)
+    }
+
+    companion object {
+        private const val NO_SUFFIX = ""
+        private const val TMP_OVERWRITE_TABLE_SUFFIX = "_airbyte_tmp"
+    }
+
+    private fun prepareFinalTable(
+        initialStatus: DestinationInitialStatus<DestinationState>
+    ): String {
+        val stream = initialStatus.streamConfig
+        // No special handling if the final table doesn't exist; just create it and return.
+        if (!initialStatus.isFinalTablePresent) {
+            log.info {
+                "Final table does not exist for stream ${initialStatus.streamConfig.id.finalName}, creating."
+            }
+            storageOperations.createFinalTable(stream, NO_SUFFIX, false)
+            return NO_SUFFIX
+        }
+
+        log.info { "Final table exists for stream ${stream.id.finalName}" }
+        // The table already exists. Decide whether we're writing to it directly, or
+        // using a tmp table.
+        when (stream.destinationSyncMode) {
+            DestinationSyncMode.OVERWRITE -> return prepareFinalTableForOverwrite(initialStatus)
+            DestinationSyncMode.APPEND,
+            DestinationSyncMode.APPEND_DEDUP -> {
+                if (
+                    initialStatus.isSchemaMismatch ||
+                        initialStatus.destinationState.needsSoftReset()
+                ) {
+                    // We're loading data directly into the existing table.
+                    // Make sure it has the right schema.
+                    // Also, if a raw table migration wants us to do a soft reset, do that
+                    // here.
+                    log.info { "Executing soft-reset on final table of stream $stream" }
+                    storageOperations.softResetFinalTable(stream)
+                }
+                return NO_SUFFIX
+            }
+        }
+    }
+
+    private fun prepareFinalTableForOverwrite(
+        initialStatus: DestinationInitialStatus<DestinationState>
+    ): String {
+        val stream = initialStatus.streamConfig
+        if (!initialStatus.isFinalTableEmpty || initialStatus.isSchemaMismatch) {
+            // Overwrite an existing tmp table if needed.
+            storageOperations.createFinalTable(stream, TMP_OVERWRITE_TABLE_SUFFIX, true)
+            log.info {
+                "Using temp final table for table ${stream.id.finalName}, this will be overwritten at end of sync"
+            }
+            // We want to overwrite an existing table. Write into a tmp table.
+            // We'll overwrite the table at the
+            // end of the sync.
+            return TMP_OVERWRITE_TABLE_SUFFIX
+        }
+
+        log.info {
+            "Final table for stream ${stream.id.finalName} is empty and matches the expected v2 format, writing to table directly"
+        }
+        return NO_SUFFIX
+    }
+
+    /** Writing records is destination specific: direct inserts vs. staging, depending on format. */
+    abstract override fun writeRecords(
+        streamConfig: StreamConfig,
+        stream: Stream<PartialAirbyteMessage>
+    )
+
+    override fun finalizeTable(streamConfig: StreamConfig, syncSummary: StreamSyncSummary) {
+        // Legacy logic: if recordsWritten is not tracked, assume it could be non-zero.
+        val shouldRunFinalizer =
+            syncSummary.recordsWritten.map { it > 0 }.orElse(true) ||
+                initialRawTableStatus.hasUnprocessedRecords
+        if (!shouldRunFinalizer) {
+            log.info {
+                "Skipping typing and deduping for stream ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} " +
+                    "because it had no records during this sync and no unprocessed records from a previous sync."
+            }
+            return
+        }
+
+        storageOperations.typeAndDedupe(
+            streamConfig,
+            initialRawTableStatus.maxProcessedTimestamp,
+            finalTmpTableSuffix
+        )
+
+        // Delete the staging directory; the implementation decides whether this is needed or a no-op.
+        storageOperations.cleanupStage(streamConfig.id)
+
+        // For overwrite, it's wasteful to do T+D twice, so we don't do a soft reset in prepare. Instead we
+        // type-and-dedupe
+        // on a suffixed table and do a swap here when we have to, for schema mismatches.
+        if (streamConfig.destinationSyncMode == DestinationSyncMode.OVERWRITE) {
+            storageOperations.overwriteFinalTable(streamConfig, finalTmpTableSuffix)
+        }
+    }
+}
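(Illustrative, not part of the diff: a minimal sketch of a concrete operation built on the `AbstractStreamOperation` above. The base class's init block already prepares the stage and final table, so a subclass only supplies the record-writing path. `ExampleStreamOperation` and `insertRecord` are hypothetical names; a real staging destination would instead buffer into a `SerializableBuffer` and call `storageOps.writeToStage`.)

```kotlin
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation
import io.airbyte.integrations.base.destination.operation.StorageOperations
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
import java.util.stream.Stream

class ExampleStreamOperation<S : MinimumDestinationState>(
    storageOps: StorageOperations,
    initialStatus: DestinationInitialStatus<S>,
) : AbstractStreamOperation<S>(storageOps, initialStatus) {

    // Only the per-batch write path is connector-specific; prepare/finalize
    // lifecycle hooks are inherited from the base class.
    override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
        stream.forEach { record -> insertRecord(streamConfig, record) }
    }

    private fun insertRecord(streamConfig: StreamConfig, record: PartialAirbyteMessage) {
        TODO("destination-specific insert or staging logic")
    }
}
```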
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultConnectorOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultConnectorOperation.kt
new file mode 100644
index 000000000000..7d04a2d3ba57
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/DefaultConnectorOperation.kt
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.base.destination.operation
+
+import io.airbyte.cdk.integrations.destination.StreamSyncSummary
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.cdk.integrations.destination.operation.SyncOperation
+import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil as exceptions
+import io.airbyte.commons.concurrency.CompletableFutures.allOf
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
+import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
+import io.airbyte.integrations.base.destination.typing_deduping.StreamId
+import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil as tdutils
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
+import io.airbyte.protocol.models.v0.StreamDescriptor
+import io.github.oshai.kotlinlogging.KotlinLogging
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.Executors
+import java.util.stream.Stream
+import org.apache.commons.lang3.concurrent.BasicThreadFactory
+
+class DefaultConnectorOperation<DestinationState : MinimumDestinationState>(
+    private val parsedCatalog: ParsedCatalog,
+    private val destinationHandler: DestinationHandler<DestinationState>,
+    private val defaultNamespace: String,
+    private val streamOperationsFactory: StreamOperationsFactory<DestinationState>,
+    private val migrations: List<Migration<DestinationState>>,
+    private val executorService: ExecutorService =
+        Executors.newFixedThreadPool(
+            10,
+            BasicThreadFactory.Builder().namingPattern("sync-operations-%d").build(),
+        )
+) : SyncOperation {
+    companion object {
+        // Kept in the companion object so it is accessible during instantiation from init.
+        private val log = KotlinLogging.logger {}
+    }
+
+    private val streamOpsMap: Map<StreamId, StreamOperation<DestinationState>>
+    init {
+        streamOpsMap = createPerStreamOpClients()
+    }
+
+    private fun createPerStreamOpClients(): Map<StreamId, StreamOperation<DestinationState>> {
+        log.info { "Preparing required schemas and tables for all streams" }
+        val streamsInitialStates = destinationHandler.gatherInitialState(parsedCatalog.streams)
+
+        // We commit destination states and run migrations here.
+        val postMigrationInitialStates =
+            tdutils.executeRawTableMigrations(
+                executorService,
+                destinationHandler,
+                migrations,
+                streamsInitialStates
+            )
+
+        val initializationFutures =
+            postMigrationInitialStates
+                .map {
+                    CompletableFuture.supplyAsync(
+                        { Pair(it.streamConfig.id, streamOperationsFactory.createInstance(it)) },
+                        executorService,
+                    )
+                }
+                .toList()
+        val futuresResult = allOf(initializationFutures).toCompletableFuture().get()
+        val result =
+            exceptions.getResultsOrLogAndThrowFirst(
+                "Following exceptions occurred during sync initialization",
+                futuresResult,
+            )
+        return result.toMap()
+    }
+
+    override fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
+        val streamConfig =
+            parsedCatalog.getStream(descriptor.namespace ?: defaultNamespace, descriptor.name)
+        streamOpsMap[streamConfig.id]?.writeRecords(streamConfig, stream)
+    }
+
+    override fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>) {
+        // Only call finalizeTable for streams that have a sync summary; the rest are skipped.
+        val finalizeFutures =
+            streamSyncSummaries.entries
+                .map {
+                    CompletableFuture.supplyAsync(
+                        {
+                            val streamConfig =
+                                parsedCatalog.getStream(
+                                    it.key.namespace ?: defaultNamespace,
+                                    it.key.name,
+                                )
+                            streamOpsMap[streamConfig.id]?.finalizeTable(streamConfig, it.value)
+                        },
+                        executorService,
+                    )
+                }
+                .toList()
+        val futuresResult = allOf(finalizeFutures).toCompletableFuture().join()
+        exceptions.getResultsOrLogAndThrowFirst(
+            "Following exceptions occurred while finalizing the sync",
+            futuresResult,
+        )
+        log.info { "Cleaning up sync operation thread pools" }
+        executorService.shutdown()
+    }
+}
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperations.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperations.kt
new file mode 100644
index 000000000000..898feba1ca34
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StorageOperations.kt
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.base.destination.operation
+
+import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
+import io.airbyte.integrations.base.destination.typing_deduping.StreamId
+import io.airbyte.protocol.models.v0.DestinationSyncMode
+import java.time.Instant
+import java.util.Optional
+
+interface StorageOperations {
+    /*
+     * ==================== Staging Operations ================================
+     */
+
+    /**
+     * Prepare the staging area, which could mean creating object storage, temp tables, or file
+     * storage.
+     */
+    fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode)
+
+    /** Delete previously staged data, using deterministic information from streamId. */
+    fun cleanupStage(streamId: StreamId)
+
+    /** Copy data from the provided buffer into the stage. */
+    fun writeToStage(streamId: StreamId, buffer: SerializableBuffer)
+
+    /*
+     * ==================== Final Table Operations ================================
+     */
+
+    /** Create the final schema extracted from [StreamId]. */
+    fun createFinalSchema(streamId: StreamId)
+
+    /** Create the final table extracted from [StreamId]. */
+    fun createFinalTable(streamConfig: StreamConfig, suffix: String, replace: Boolean)
+
+    /** Reset the final table using a temp table, or ALTER the existing table's columns. */
+    fun softResetFinalTable(streamConfig: StreamConfig)
+
+    /**
+     * Attempt to atomically swap the final table (name and namespace extracted from [StreamId]).
+     * This can be destination specific, e.g. INSERT INTO .. SELECT *, DROP TABLE, or CREATE OR
+     * REPLACE .. SELECT *, DROP TABLE.
+     */
+    fun overwriteFinalTable(streamConfig: StreamConfig, tmpTableSuffix: String)
+
+    /** Run typing and deduping from the raw table into the final table (with the given suffix). */
+    fun typeAndDedupe(
+        streamConfig: StreamConfig,
+        maxProcessedTimestamp: Optional<Instant>,
+        finalTableSuffix: String
+    )
+}
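(Illustrative, not part of the diff: `DefaultConnectorOperation` asks a `StreamOperationsFactory` — the interface appears just below — for one `StreamOperation` per stream during initialization, passing in that stream's initial status. A minimal factory under the `ExampleStreamOperation` sketch above might look like this; the class name is hypothetical.)

```kotlin
import io.airbyte.integrations.base.destination.operation.StorageOperations
import io.airbyte.integrations.base.destination.operation.StreamOperation
import io.airbyte.integrations.base.destination.operation.StreamOperationsFactory
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState

class ExampleStreamOperationsFactory<S : MinimumDestinationState>(
    private val storageOps: StorageOperations,
) : StreamOperationsFactory<S> {
    // Called once per stream, on the sync-operations executor's threads.
    override fun createInstance(
        destinationInitialStatus: DestinationInitialStatus<S>
    ): StreamOperation<S> = ExampleStreamOperation(storageOps, destinationInitialStatus)
}
```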
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StreamOperation.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StreamOperation.kt
new file mode 100644
index 000000000000..f1edc144ac97
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StreamOperation.kt
@@ -0,0 +1,18 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.base.destination.operation
+
+import io.airbyte.cdk.integrations.destination.StreamSyncSummary
+import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
+import java.util.stream.Stream
+
+/** Operations on individual streams. */
+interface StreamOperation<DestinationState> {
+
+    fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>)
+
+    fun finalizeTable(streamConfig: StreamConfig, syncSummary: StreamSyncSummary)
+}
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StreamOperationsFactory.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StreamOperationsFactory.kt
new file mode 100644
index 000000000000..1d24e80d3731
--- /dev/null
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/operation/StreamOperationsFactory.kt
@@ -0,0 +1,18 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.base.destination.operation
+
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
+
+interface StreamOperationsFactory<DestinationState> {
+
+    /**
+     * Create an instance with required dependencies injected using a concrete factory
+     * implementation.
+     */
+    fun createInstance(
+        destinationInitialStatus: DestinationInitialStatus<DestinationState>
+    ): StreamOperation<DestinationState>
+}
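(Illustrative, not part of the diff: end-to-end wiring of the new interfaces, assuming the `ExampleStreamOperation`/`ExampleStreamOperationsFactory` sketches above. The function name and `"airbyte_internal"` default namespace are hypothetical; the catalog, handler, and storage operations come from the concrete connector.)

```kotlin
import io.airbyte.cdk.integrations.destination.operation.SyncOperation
import io.airbyte.integrations.base.destination.operation.DefaultConnectorOperation
import io.airbyte.integrations.base.destination.operation.StorageOperations
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState

fun <S : MinimumDestinationState> createSyncOperation(
    parsedCatalog: ParsedCatalog,
    destinationHandler: DestinationHandler<S>,
    storageOps: StorageOperations,
): SyncOperation =
    DefaultConnectorOperation(
        parsedCatalog,
        destinationHandler,
        "airbyte_internal",
        ExampleStreamOperationsFactory(storageOps),
        migrations = emptyList(),
    )

// The async framework then drives the returned SyncOperation:
//   syncOperation.flushStream(descriptor, records)      // once per record batch
//   syncOperation.finalizeStreams(streamSyncSummaries)  // once, at the end of the sync
```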
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.kt
index 903ab16df20e..79585dca47c5 100644
--- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.kt
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.kt
@@ -7,9 +7,9 @@ import io.airbyte.cdk.integrations.base.IntegrationRunner
 import io.airbyte.cdk.integrations.destination.StreamSyncSummary
 import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAndThrowFirst
 import io.airbyte.commons.concurrency.CompletableFutures
-import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.Companion.executeRawTableMigrations
-import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.Companion.executeWeirdMigrations
-import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.Companion.prepareSchemas
+import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.executeRawTableMigrations
+import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.executeWeirdMigrations
+import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.prepareSchemas
 import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
 import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
 import io.airbyte.protocol.models.v0.DestinationSyncMode
@@ -186,7 +186,7 @@ class DefaultTyperDeduper(
                     // Make sure it has the right schema.
                     // Also, if a raw table migration wants us to do a soft reset, do that
                     // here.
-                    TypeAndDedupeTransaction.executeSoftReset(
+                    TyperDeduperUtil.executeSoftReset(
                         sqlGenerator,
                         destinationHandler,
                         stream
@@ -267,7 +267,7 @@ class DefaultTyperDeduper(
 
            val initialRawTableStatus =
                initialRawTableStateByStream.getValue(streamConfig.id)
-            TypeAndDedupeTransaction.executeTypeAndDedupe(
+            TyperDeduperUtil.executeTypeAndDedupe(
                 sqlGenerator,
                 destinationHandler,
                 streamConfig,
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.kt
index c37b25926467..50d318b5933c 100644
--- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.kt
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/SqlGenerator.kt
@@ -95,7 +95,7 @@ interface SqlGenerator {
      * @return
      */
     fun prepareTablesForSoftReset(stream: StreamConfig): Sql {
-        val createTempTable = createTable(stream, TypeAndDedupeTransaction.SOFT_RESET_SUFFIX, true)
+        val createTempTable = createTable(stream, TyperDeduperUtil.SOFT_RESET_SUFFIX, true)
         val clearLoadedAt = clearLoadedAt(stream.id)
         return Sql.Companion.concat(createTempTable, clearLoadedAt)
     }
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TypeAndDedupeTransaction.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TypeAndDedupeTransaction.kt
deleted file mode 100644
index ca36a51f9233..000000000000
--- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TypeAndDedupeTransaction.kt
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
- */
-package io.airbyte.integrations.base.destination.typing_deduping
-
-import java.time.Instant
-import java.util.*
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
-
-object TypeAndDedupeTransaction {
-    const val SOFT_RESET_SUFFIX: String = "_ab_soft_reset"
-    private val LOGGER: Logger = LoggerFactory.getLogger(TypeAndDedupeTransaction::class.java)
-
-    /**
-     * It can be expensive to build the errors array in the airbyte_meta column, so we first attempt
-     * an 'unsafe' transaction which assumes everything is typed correctly. If that fails, we will
-     * run a more expensive query which handles casting errors
-     *
-     * @param sqlGenerator for generating sql for the destination
-     * @param destinationHandler for executing sql created
-     * @param streamConfig which stream to operate on
-     * @param minExtractedAt to reduce the amount of data in the query
-     * @param suffix table suffix for temporary tables
-     * @throws Exception if the safe query fails
-     */
-    @JvmStatic
-    @Throws(Exception::class)
-    fun executeTypeAndDedupe(
-        sqlGenerator: SqlGenerator,
-        destinationHandler: DestinationHandler<*>,
-        streamConfig: StreamConfig?,
-        minExtractedAt: Optional<Instant>,
-        suffix: String
-    ) {
-        try {
-            LOGGER.info(
-                "Attempting typing and deduping for {}.{} with suffix {}",
-                streamConfig!!.id.originalNamespace,
-                streamConfig.id.originalName,
-                suffix
-            )
-            val unsafeSql = sqlGenerator.updateTable(streamConfig, suffix, minExtractedAt, false)
-            destinationHandler.execute(unsafeSql)
-        } catch (e: Exception) {
-            if (sqlGenerator.shouldRetry(e)) {
-                // TODO Destination specific non-retryable exceptions should be added.
-                LOGGER.error(
-                    "Encountered Exception on unsafe SQL for stream {} {} with suffix {}, attempting with error handling",
-                    streamConfig!!.id.originalNamespace,
-                    streamConfig.id.originalName,
-                    suffix,
-                    e
-                )
-                val saferSql = sqlGenerator.updateTable(streamConfig, suffix, minExtractedAt, true)
-                destinationHandler.execute(saferSql)
-            } else {
-                LOGGER.error(
-                    "Encountered Exception on unsafe SQL for stream {} {} with suffix {}, Retry is skipped",
-                    streamConfig!!.id.originalNamespace,
-                    streamConfig.id.originalName,
-                    suffix,
-                    e
-                )
-                throw e
-            }
-        }
-    }
-
-    /**
-     * Everything in [TypeAndDedupeTransaction.executeTypeAndDedupe] but with a little extra prep
-     * work for the soft reset temp tables
-     *
-     * @param sqlGenerator for generating sql for the destination
-     * @param destinationHandler for executing sql created
-     * @param streamConfig which stream to operate on
-     * @throws Exception if the safe query fails
-     */
-    @JvmStatic
-    @Throws(Exception::class)
-    fun executeSoftReset(
-        sqlGenerator: SqlGenerator,
-        destinationHandler: DestinationHandler<*>,
-        streamConfig: StreamConfig
-    ) {
-        LOGGER.info(
-            "Attempting soft reset for stream {} {}",
-            streamConfig.id.originalNamespace,
-            streamConfig.id.originalName
-        )
-        destinationHandler.execute(sqlGenerator.prepareTablesForSoftReset(streamConfig))
-        executeTypeAndDedupe(
-            sqlGenerator,
-            destinationHandler,
-            streamConfig,
-            Optional.empty(),
-            SOFT_RESET_SUFFIX
-        )
-        destinationHandler.execute(
-            sqlGenerator.overwriteFinalTable(streamConfig.id, SOFT_RESET_SUFFIX)
-        )
-    }
-}
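(Illustrative, not part of the diff: the generics restored throughout this refactor are bounded by `MinimumDestinationState`, whose contract — judging from the `needsSoftReset()` / `withSoftReset(...)` calls in `runMigrationsAsync` below — is a soft-reset flag plus a copy-with-flag method. A hypothetical implementing state might look like this; the class name and cast-based `withSoftReset` are assumptions, not code from this PR.)

```kotlin
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState

// Hypothetical connector state; only the soft-reset flag is required by the CDK,
// any other fields are destination-specific.
data class ExampleDestinationState(
    private val needsSoftReset: Boolean,
) : MinimumDestinationState {
    override fun needsSoftReset(): Boolean = needsSoftReset

    @Suppress("UNCHECKED_CAST")
    override fun <T : MinimumDestinationState> withSoftReset(needsSoftReset: Boolean): T =
        copy(needsSoftReset = needsSoftReset) as T
}
```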
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt
index 961e1ff5b9e6..cfcb1f92106f 100644
--- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt
@@ -8,6 +8,7 @@ import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil.getResultsOrLogAn
 import io.airbyte.commons.concurrency.CompletableFutures
 import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
 import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
+import java.time.Instant
 import java.util.*
 import java.util.concurrent.CompletableFuture
 import java.util.concurrent.CompletionStage
@@ -16,190 +17,274 @@ import java.util.stream.Collectors.toMap
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
-class TyperDeduperUtil {
-    companion object {
-        private val LOGGER: Logger = LoggerFactory.getLogger(TyperDeduperUtil::class.java)
-
-        @JvmStatic
-        fun <DestinationState : MinimumDestinationState> executeRawTableMigrations(
-            executorService: ExecutorService,
-            destinationHandler: DestinationHandler<DestinationState>,
-            migrations: List<Migration<DestinationState>>,
-            initialStates: List<DestinationInitialStatus<DestinationState>>
-        ): List<DestinationInitialStatus<DestinationState>> {
-            // TODO: Either the migrations run the soft reset and create v2 tables or the actual
-            // prepare tables.
-            // unify the logic
-            // with current state of raw tables & final tables. This is done first before gather
-            // initial state
-            // to avoid recreating
-            // final tables later again.
-
-            // Run migrations in lockstep. Some migrations may require us to refetch the initial
-            // state.
-            // We want to be able to batch those calls together across streams.
-            // If a migration runs on one stream, it's likely to also run on other streams.
-            // So we can bundle the gatherInitialState calls together.
-            var currentStates = initialStates
-            for (migration in migrations) {
-                // Execute the migration on all streams in parallel
-                val futures:
-                    Map<StreamId, CompletionStage<Migration.MigrationResult<DestinationState>>> =
-                    currentStates
-                        .stream()
-                        .collect(
-                            toMap(
-                                { it.streamConfig.id },
-                                { initialState ->
-                                    runMigrationsAsync(
-                                        executorService,
-                                        destinationHandler,
-                                        migration,
-                                        initialState
-                                    )
-                                }
-                            )
-                        )
-                val migrationResultFutures =
-                    CompletableFutures.allOf(futures.values.toList()).toCompletableFuture().join()
-                getResultsOrLogAndThrowFirst(
-                    "The following exceptions were thrown attempting to run migrations:\n",
-                    migrationResultFutures
-                )
-                val migrationResults: Map<StreamId, Migration.MigrationResult<DestinationState>> =
-                    futures.mapValues { it.value.toCompletableFuture().join() }
-
-                // Check if we need to refetch DestinationInitialState
-                val invalidatedStreams: Set<StreamId> =
-                    migrationResults.filter { it.value.invalidateInitialState }.keys
-                val updatedStates: List<DestinationInitialStatus<DestinationState>>
-                if (invalidatedStreams.isNotEmpty()) {
-                    LOGGER.info("Refetching initial state for streams: $invalidatedStreams")
-                    updatedStates =
-                        destinationHandler.gatherInitialState(
-                            currentStates
-                                .filter { invalidatedStreams.contains(it.streamConfig.id) }
-                                .map { it.streamConfig }
-                        )
-                    LOGGER.info("Updated states: $updatedStates")
-                } else {
-                    updatedStates = emptyList()
-                }
-
-                // Update the DestinationInitialStates with the new DestinationStates,
-                // and also update initialStates with the refetched states.
-                currentStates =
-                    currentStates.map { initialState ->
-                        // migrationResults will always contain an entry for each stream, so we can
-                        // safely use !!
-                        val updatedDestinationState =
-                            migrationResults[initialState.streamConfig.id]!!.updatedDestinationState
-                        if (invalidatedStreams.contains(initialState.streamConfig.id)) {
-                            // We invalidated this stream's DestinationInitialState.
-                            // Find the updated DestinationInitialState, and update it with our new
-                            // DestinationState
-                            return@map updatedStates
-                                .filter { updatedState ->
-                                    updatedState.streamConfig.id.equals(
-                                        initialState.streamConfig.id
-                                    )
-                                }
-                                .first()
-                                .copy(destinationState = updatedDestinationState)
-                        } else {
-                            // Just update the original DestinationInitialState with the new
-                            // DestinationState.
-                            return@map initialState.copy(destinationState = updatedDestinationState)
-                        }
-                    }
-            }
-            return currentStates
-        }
-
-        /**
-         * The legacy-style migrations (V1V2Migrator, V2TableMigrator) need to run before we gather
-         * initial state, because they're dumb and weird. (specifically: SnowflakeV2TableMigrator
-         * inspects the final tables and triggers a soft reset directly within the migration). TODO:
-         * Migrate these migrations to the new migration system. This will also reduce the number of
-         * times we need to query DB metadata, since (a) we can rely on the gatherInitialState
-         * values, and (b) we can add a DestinationState field for these migrations. It also enables
-         * us to not trigger multiple soft resets in a single sync.
-         */
-        @JvmStatic
-        fun <DestinationState : MinimumDestinationState> executeWeirdMigrations(
-            executorService: ExecutorService,
-            sqlGenerator: SqlGenerator,
-            destinationHandler: DestinationHandler<DestinationState>,
-            v1V2Migrator: DestinationV1V2Migrator,
-            v2TableMigrator: V2TableMigrator,
-            parsedCatalog: ParsedCatalog
-        ) {
-            val futures =
-                parsedCatalog.streams.map {
-                    CompletableFuture.supplyAsync(
-                        {
-                            v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, it)
-                            v2TableMigrator.migrateIfNecessary(it)
-                        },
-                        executorService
-                    )
-                }
-            getResultsOrLogAndThrowFirst(
-                "The following exceptions were thrown attempting to run migrations:\n",
-                CompletableFutures.allOf(futures.toList()).toCompletableFuture().join()
-            )
-        }
-
-        /**
-         * Extracts all the "raw" and "final" schemas identified in the [parsedCatalog] and ensures
-         * they exist in the Destination Database.
-         */
-        @JvmStatic
-        fun <DestinationState : MinimumDestinationState> prepareSchemas(
-            sqlGenerator: SqlGenerator,
-            destinationHandler: DestinationHandler<DestinationState>,
-            parsedCatalog: ParsedCatalog
-        ) {
-            val rawSchema = parsedCatalog.streams.map { it.id.rawNamespace }
-            val finalSchema = parsedCatalog.streams.map { it.id.finalNamespace }
-            val createAllSchemasSql =
-                (rawSchema + finalSchema).distinct().map { sqlGenerator.createSchema(it) }
-            destinationHandler.execute(Sql.concat(createAllSchemasSql))
-        }
-
-        private fun <DestinationState : MinimumDestinationState> runMigrationsAsync(
-            executorService: ExecutorService,
-            destinationHandler: DestinationHandler<DestinationState>,
-            migration: Migration<DestinationState>,
-            initialStatus: DestinationInitialStatus<DestinationState>
-        ): CompletionStage<Migration.MigrationResult<DestinationState>> {
-            return CompletableFuture.supplyAsync(
-                {
-                    LOGGER.info(
-                        "Maybe executing ${migration.javaClass.simpleName} migration for stream ${initialStatus.streamConfig.id.originalNamespace}.${initialStatus.streamConfig.id.originalName}."
-                    )
-
-                    // We technically don't need to track this, but might as well hedge against
-                    // migrations
-                    // accidentally setting softReset=false
-                    val softReset = initialStatus.destinationState.needsSoftReset()
-                    val migrationResult =
-                        migration.migrateIfNecessary(
-                            destinationHandler,
-                            initialStatus.streamConfig,
-                            initialStatus
-                        )
-                    val updatedNeedsSoftReset =
-                        softReset || migrationResult.updatedDestinationState.needsSoftReset()
-                    return@supplyAsync migrationResult.copy(
-                        updatedDestinationState =
-                            migrationResult.updatedDestinationState.withSoftReset(
-                                updatedNeedsSoftReset
-                            )
-                    )
-                },
-                executorService
-            )
-        }
-    }
-}
+object TyperDeduperUtil {
+    const val SOFT_RESET_SUFFIX: String = "_ab_soft_reset"
+    private val LOGGER: Logger = LoggerFactory.getLogger(TyperDeduperUtil::class.java)
+
+    @JvmStatic
+    fun <DestinationState : MinimumDestinationState> executeRawTableMigrations(
+        executorService: ExecutorService,
+        destinationHandler: DestinationHandler<DestinationState>,
+        migrations: List<Migration<DestinationState>>,
+        initialStates: List<DestinationInitialStatus<DestinationState>>
+    ): List<DestinationInitialStatus<DestinationState>> {
+        // TODO: Either the migrations run the soft reset and create v2 tables or the actual
+        // prepare tables.
+        // unify the logic
+        // with current state of raw tables & final tables. This is done first before gather
+        // initial state
+        // to avoid recreating
+        // final tables later again.
+
+        // Run migrations in lockstep. Some migrations may require us to refetch the initial
+        // state.
+        // We want to be able to batch those calls together across streams.
+        // If a migration runs on one stream, it's likely to also run on other streams.
+        // So we can bundle the gatherInitialState calls together.
+        var currentStates = initialStates
+        for (migration in migrations) {
+            // Execute the migration on all streams in parallel
+            val futures:
+                Map<StreamId, CompletionStage<Migration.MigrationResult<DestinationState>>> =
+                currentStates
+                    .stream()
+                    .collect(
+                        toMap(
+                            { it.streamConfig.id },
+                            { initialState ->
+                                runMigrationsAsync(
+                                    executorService,
+                                    destinationHandler,
+                                    migration,
+                                    initialState
+                                )
+                            }
+                        )
+                    )
+            val migrationResultFutures =
+                CompletableFutures.allOf(futures.values.toList()).toCompletableFuture().join()
+            getResultsOrLogAndThrowFirst(
+                "The following exceptions were thrown attempting to run migrations:\n",
+                migrationResultFutures
+            )
+            val migrationResults: Map<StreamId, Migration.MigrationResult<DestinationState>> =
+                futures.mapValues { it.value.toCompletableFuture().join() }
+
+            // Check if we need to refetch DestinationInitialState
+            val invalidatedStreams: Set<StreamId> =
+                migrationResults.filter { it.value.invalidateInitialState }.keys
+            val updatedStates: List<DestinationInitialStatus<DestinationState>>
+            if (invalidatedStreams.isNotEmpty()) {
+                LOGGER.info("Refetching initial state for streams: $invalidatedStreams")
+                updatedStates =
+                    destinationHandler.gatherInitialState(
+                        currentStates
+                            .filter { invalidatedStreams.contains(it.streamConfig.id) }
+                            .map { it.streamConfig }
+                    )
+                LOGGER.info("Updated states: $updatedStates")
+            } else {
+                updatedStates = emptyList()
+            }
+
+            // Update the DestinationInitialStates with the new DestinationStates,
+            // and also update initialStates with the refetched states.
+            currentStates =
+                currentStates.map { initialState ->
+                    // migrationResults will always contain an entry for each stream, so we can
+                    // safely use !!
+                    val updatedDestinationState =
+                        migrationResults[initialState.streamConfig.id]!!.updatedDestinationState
+                    if (invalidatedStreams.contains(initialState.streamConfig.id)) {
+                        // We invalidated this stream's DestinationInitialState.
+                        // Find the updated DestinationInitialState, and update it with our new
+                        // DestinationState
+                        return@map updatedStates
+                            .filter { updatedState ->
+                                updatedState.streamConfig.id.equals(initialState.streamConfig.id)
+                            }
+                            .first()
+                            .copy(destinationState = updatedDestinationState)
+                    } else {
+                        // Just update the original DestinationInitialState with the new
+                        // DestinationState.
+                        return@map initialState.copy(destinationState = updatedDestinationState)
+                    }
+                }
+        }
+        return currentStates
+    }
+
+    /**
+     * The legacy-style migrations (V1V2Migrator, V2TableMigrator) need to run before we gather
+     * initial state, because they're dumb and weird. (specifically: SnowflakeV2TableMigrator
+     * inspects the final tables and triggers a soft reset directly within the migration). TODO:
+     * Migrate these migrations to the new migration system. This will also reduce the number of
+     * times we need to query DB metadata, since (a) we can rely on the gatherInitialState values,
+     * and (b) we can add a DestinationState field for these migrations. It also enables us to not
+     * trigger multiple soft resets in a single sync.
+     */
+    @JvmStatic
+    fun <DestinationState : MinimumDestinationState> executeWeirdMigrations(
+        executorService: ExecutorService,
+        sqlGenerator: SqlGenerator,
+        destinationHandler: DestinationHandler<DestinationState>,
+        v1V2Migrator: DestinationV1V2Migrator,
+        v2TableMigrator: V2TableMigrator,
+        parsedCatalog: ParsedCatalog
+    ) {
+        val futures =
+            parsedCatalog.streams.map {
+                CompletableFuture.supplyAsync(
+                    {
+                        v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, it)
+                        v2TableMigrator.migrateIfNecessary(it)
+                    },
+                    executorService
+                )
+            }
+        getResultsOrLogAndThrowFirst(
+            "The following exceptions were thrown attempting to run migrations:\n",
+            CompletableFutures.allOf(futures.toList()).toCompletableFuture().join()
+        )
+    }
+
+    /**
+     * Extracts all the "raw" and "final" schemas identified in the [parsedCatalog] and ensures they
+     * exist in the Destination Database.
+     */
+    @JvmStatic
+    fun <DestinationState : MinimumDestinationState> prepareSchemas(
+        sqlGenerator: SqlGenerator,
+        destinationHandler: DestinationHandler<DestinationState>,
+        parsedCatalog: ParsedCatalog
+    ) {
+        val rawSchema = parsedCatalog.streams.map { it.id.rawNamespace }
+        val finalSchema = parsedCatalog.streams.map { it.id.finalNamespace }
+        val createAllSchemasSql =
+            (rawSchema + finalSchema).distinct().map { sqlGenerator.createSchema(it) }
+        destinationHandler.execute(Sql.concat(createAllSchemasSql))
+    }
+
+    private fun <DestinationState : MinimumDestinationState> runMigrationsAsync(
+        executorService: ExecutorService,
+        destinationHandler: DestinationHandler<DestinationState>,
+        migration: Migration<DestinationState>,
+        initialStatus: DestinationInitialStatus<DestinationState>
+    ): CompletionStage<Migration.MigrationResult<DestinationState>> {
+        return CompletableFuture.supplyAsync(
+            {
+                LOGGER.info(
+                    "Maybe executing ${migration.javaClass.simpleName} migration for stream ${initialStatus.streamConfig.id.originalNamespace}.${initialStatus.streamConfig.id.originalName}."
+                )
+
+                // We technically don't need to track this, but might as well hedge against
+                // migrations
+                // accidentally setting softReset=false
+                val softReset = initialStatus.destinationState.needsSoftReset()
+                val migrationResult =
+                    migration.migrateIfNecessary(
+                        destinationHandler,
+                        initialStatus.streamConfig,
+                        initialStatus
+                    )
+                val updatedNeedsSoftReset =
+                    softReset || migrationResult.updatedDestinationState.needsSoftReset()
+                return@supplyAsync migrationResult.copy(
+                    updatedDestinationState =
+                        migrationResult.updatedDestinationState.withSoftReset(updatedNeedsSoftReset)
+                )
+            },
+            executorService
+        )
+    }
+
+    /**
+     * It can be expensive to build the errors array in the airbyte_meta column, so we first attempt
+     * an 'unsafe' transaction which assumes everything is typed correctly. If that fails, we will
+     * run a more expensive query which handles casting errors
+     *
+     * @param sqlGenerator for generating sql for the destination
+     * @param destinationHandler for executing sql created
+     * @param streamConfig which stream to operate on
+     * @param minExtractedAt to reduce the amount of data in the query
+     * @param suffix table suffix for temporary tables
+     * @throws Exception if the safe query fails
+     */
+    @JvmStatic
+    @Throws(Exception::class)
+    fun executeTypeAndDedupe(
+        sqlGenerator: SqlGenerator,
+        destinationHandler: DestinationHandler<*>,
+        streamConfig: StreamConfig?,
+        minExtractedAt: Optional<Instant>,
+        suffix: String
+    ) {
+        try {
+            LOGGER.info(
+                "Attempting typing and deduping for {}.{} with suffix {}",
+                streamConfig!!.id.originalNamespace,
+                streamConfig.id.originalName,
+                suffix
+            )
+            val unsafeSql = sqlGenerator.updateTable(streamConfig, suffix, minExtractedAt, false)
+            destinationHandler.execute(unsafeSql)
+        } catch (e: Exception) {
+            if (sqlGenerator.shouldRetry(e)) {
+                // TODO Destination specific non-retryable exceptions should be added.
+                LOGGER.error(
+                    "Encountered Exception on unsafe SQL for stream {} {} with suffix {}, attempting with error handling",
+                    streamConfig!!.id.originalNamespace,
+                    streamConfig.id.originalName,
+                    suffix,
+                    e
+                )
+                val saferSql = sqlGenerator.updateTable(streamConfig, suffix, minExtractedAt, true)
+                destinationHandler.execute(saferSql)
+            } else {
+                LOGGER.error(
+                    "Encountered Exception on unsafe SQL for stream {} {} with suffix {}, Retry is skipped",
+                    streamConfig!!.id.originalNamespace,
+                    streamConfig.id.originalName,
+                    suffix,
+                    e
+                )
+                throw e
+            }
+        }
+    }
+
+    /**
+     * Everything in [TyperDeduperUtil.executeTypeAndDedupe] but with a little extra prep work for
+     * the soft reset temp tables
+     *
+     * @param sqlGenerator for generating sql for the destination
+     * @param destinationHandler for executing sql created
+     * @param streamConfig which stream to operate on
+     * @throws Exception if the safe query fails
+     */
+    @JvmStatic
+    @Throws(Exception::class)
+    fun executeSoftReset(
+        sqlGenerator: SqlGenerator,
+        destinationHandler: DestinationHandler<*>,
+        streamConfig: StreamConfig
+    ) {
+        LOGGER.info(
+            "Attempting soft reset for stream {} {}",
+            streamConfig.id.originalNamespace,
+            streamConfig.id.originalName
+        )
+        destinationHandler.execute(sqlGenerator.prepareTablesForSoftReset(streamConfig))
+        executeTypeAndDedupe(
+            sqlGenerator,
+            destinationHandler,
+            streamConfig,
+            Optional.empty(),
+            SOFT_RESET_SUFFIX
+        )
+        destinationHandler.execute(
+            sqlGenerator.overwriteFinalTable(streamConfig.id, SOFT_RESET_SUFFIX)
+        )
+    }
+}
diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt
index ac9e81c8d9be..08c2ec8ef980 100644
--- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt
+++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt
@@ -7,8 +7,8 @@ import com.fasterxml.jackson.databind.JsonNode
 import com.google.common.collect.Streams
 import io.airbyte.commons.json.Jsons
 import io.airbyte.commons.string.Strings
-import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeTransaction.executeSoftReset
-import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeTransaction.executeTypeAndDedupe
+import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.executeSoftReset
+import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.executeTypeAndDedupe
 import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
 import io.airbyte.protocol.models.v0.AirbyteStream
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
@@ -1628,11 +1628,7 @@ abstract class BaseSqlGeneratorIntegrationTest
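(Illustrative, not part of the diff: a sketch of how a destination's `StorageOperations` implementation can delegate to the relocated helpers — `TyperDeduperUtil` rather than the deleted `TypeAndDedupeTransaction` — using only the signatures shown in this PR. The class name is hypothetical; `sqlGenerator` and `destinationHandler` are assumed fields supplied by the concrete connector.)

```kotlin
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil
import java.time.Instant
import java.util.Optional

// Fragment of a hypothetical StorageOperations implementation; the remaining
// interface methods (prepareStage, writeToStage, ...) are omitted here.
class ExampleStorageOperations(
    private val sqlGenerator: SqlGenerator,
    private val destinationHandler: DestinationHandler<*>,
) {
    fun typeAndDedupe(
        streamConfig: StreamConfig,
        maxProcessedTimestamp: Optional<Instant>,
        finalTableSuffix: String
    ) {
        // Same call site that DefaultTyperDeduper now uses after the rename.
        TyperDeduperUtil.executeTypeAndDedupe(
            sqlGenerator,
            destinationHandler,
            streamConfig,
            maxProcessedTimestamp,
            finalTableSuffix
        )
    }

    fun softResetFinalTable(streamConfig: StreamConfig) {
        TyperDeduperUtil.executeSoftReset(sqlGenerator, destinationHandler, streamConfig)
    }
}
```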