Connector operations by responsibility
gisripa committed May 10, 2024
1 parent 7ac6df3 commit 3cd7744
Showing 6 changed files with 355 additions and 0 deletions.
@@ -0,0 +1,18 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.destination.operation

import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.protocol.models.v0.StreamDescriptor
import java.util.stream.Stream

/** Connector-level sync operations. */
interface SyncOperation {

    /**
     * Called from the destination's flush function; writes the given stream of records for the
     * stream identified by [descriptor].
     */
    fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>)

    fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>)
}
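For orientation, the following sketch shows roughly how a destination's async flush path might drive this interface. It is not part of the commit: the function and parameter names (flushExample, syncOperation, recordStream, summaries) are hypothetical, and the surrounding async-consumer wiring is omitted.

// Sketch only: how a destination might hand records to a SyncOperation.
// All names below are placeholders; imports are the same as in the file above.
fun flushExample(
    syncOperation: SyncOperation,
    descriptor: StreamDescriptor,
    recordStream: Stream<PartialAirbyteMessage>,
    summaries: Map<StreamDescriptor, StreamSyncSummary>
) {
    // Per-stream flush, typically invoked from the async consumer's flush function.
    syncOperation.flushStream(descriptor, recordStream)

    // Once every stream has been flushed, finalize the whole sync exactly once.
    syncOperation.finalizeStreams(summaries)
}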
@@ -0,0 +1,133 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.operation

import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.stream.Stream

abstract class AbstractStreamOperation<DestinationState : MinimumDestinationState>(
    private val storageOperations: StorageOperations,
    destinationInitialStatus: DestinationInitialStatus<DestinationState>,
) : StreamOperation<DestinationState> {
    private val log = KotlinLogging.logger {}

    // State maintained here to make decisions across async calls.
    private val finalTmpTableSuffix: String
    private val initialRawTableStatus: InitialRawTableStatus =
        destinationInitialStatus.initialRawTableStatus

    init {
        val stream = destinationInitialStatus.streamConfig
        storageOperations.prepareStage(stream.id, stream.destinationSyncMode)
        storageOperations.createFinalSchema(stream.id)
        // Prepare final tables based on sync mode.
        finalTmpTableSuffix = prepareFinalTable(destinationInitialStatus)
    }

    companion object {
        private const val NO_SUFFIX = ""
        private const val TMP_OVERWRITE_TABLE_SUFFIX = "_airbyte_tmp"
    }

    private fun prepareFinalTable(
        initialStatus: DestinationInitialStatus<DestinationState>
    ): String {
        val stream = initialStatus.streamConfig
        // No special handling if the final table doesn't exist: just create it and return.
        if (!initialStatus.isFinalTablePresent) {
            log.info {
                "Final table does not exist for stream ${initialStatus.streamConfig.id.finalName}, creating."
            }
            storageOperations.createFinalTable(stream, NO_SUFFIX, false)
            return NO_SUFFIX
        }

        log.info { "Final table exists for stream ${stream.id.finalName}" }
        // The table already exists. Decide whether we're writing to it directly, or
        // using a tmp table.
        when (stream.destinationSyncMode) {
            DestinationSyncMode.OVERWRITE -> return prepareFinalTableForOverwrite(initialStatus)
            DestinationSyncMode.APPEND,
            DestinationSyncMode.APPEND_DEDUP -> {
                if (
                    initialStatus.isSchemaMismatch ||
                        initialStatus.destinationState.needsSoftReset()
                ) {
                    // We're loading data directly into the existing table, so make sure it has
                    // the right schema. Also, if a raw table migration requested a soft reset,
                    // do that here.
                    log.info { "Executing soft-reset on final table of stream $stream" }
                    storageOperations.softResetFinalTable(stream)
                }
                return NO_SUFFIX
            }
        }
    }

    private fun prepareFinalTableForOverwrite(
        initialStatus: DestinationInitialStatus<DestinationState>
    ): String {
        val stream = initialStatus.streamConfig
        if (!initialStatus.isFinalTableEmpty || initialStatus.isSchemaMismatch) {
            // We want to overwrite an existing, non-empty (or mismatched) table, so write into a
            // tmp table and swap it in at the end of the sync.
            storageOperations.createFinalTable(stream, TMP_OVERWRITE_TABLE_SUFFIX, true)
            log.info {
                "Using temp final table for table ${stream.id.finalName}, this will be overwritten at end of sync"
            }
            return TMP_OVERWRITE_TABLE_SUFFIX
        }

        log.info {
            "Final table for stream ${stream.id.finalName} is empty and matches the expected v2 format, writing to table directly"
        }
        return NO_SUFFIX
    }

    /** Writing records is destination-specific: direct inserts vs. staging, depending on format. */
    abstract override fun writeRecords(
        streamConfig: StreamConfig,
        stream: Stream<PartialAirbyteMessage>
    )

    override fun finalizeTable(streamConfig: StreamConfig, syncSummary: StreamSyncSummary) {
        // Legacy behavior: if recordsWritten is not tracked, assume it could be non-zero.
        val shouldRunFinalizer =
            syncSummary.recordsWritten.map { it > 0 }.orElse(true) ||
                initialRawTableStatus.hasUnprocessedRecords
        if (!shouldRunFinalizer) {
            log.info {
                "Skipping typing and deduping for stream ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} " +
                    "because it had no records during this sync and no unprocessed records from a previous sync."
            }
            return
        }

        storageOperations.typeAndDedupe(
            streamConfig,
            initialRawTableStatus.maxProcessedTimestamp,
            finalTmpTableSuffix
        )

        // Delete the staging directory; the implementation decides whether this is needed or a no-op.
        storageOperations.cleanupStage(streamConfig.id)

        // For OVERWRITE, a soft reset during prepare would waste a T+D pass, so we skip it there.
        // Instead, we type-and-dedupe into a suffixed table and swap it into place here when a
        // schema mismatch forced the use of a tmp table.
        if (streamConfig.destinationSyncMode == DestinationSyncMode.OVERWRITE) {
            storageOperations.overwriteFinalTable(streamConfig, finalTmpTableSuffix)
        }
    }
}
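As a rough illustration of how a concrete destination plugs into this abstract class, the hypothetical subclass below supplies only the record-writing step. It is not part of the commit, and it assumes the same package and imports as the file above; a real implementation would buffer or insert the records instead of merely counting them.

// Sketch only: a hypothetical concrete stream operation built on AbstractStreamOperation.
class LoggingStreamOperation<DestinationState : MinimumDestinationState>(
    storageOperations: StorageOperations,
    destinationInitialStatus: DestinationInitialStatus<DestinationState>,
) : AbstractStreamOperation<DestinationState>(storageOperations, destinationInitialStatus) {
    private val log = KotlinLogging.logger {}

    override fun writeRecords(
        streamConfig: StreamConfig,
        stream: Stream<PartialAirbyteMessage>
    ) {
        // A real implementation would serialize the records into a buffer and hand it to
        // StorageOperations.writeToStage, or insert them into the destination directly.
        val count = stream.count()
        log.info { "Would write $count records for stream ${streamConfig.id.finalName}" }
    }
}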
@@ -0,0 +1,111 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.operation

import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.cdk.integrations.destination.operation.SyncOperation
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil as exceptions
import io.airbyte.commons.concurrency.CompletableFutures.allOf
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.Companion as tdutils
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
import io.airbyte.protocol.models.v0.StreamDescriptor
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.stream.Stream
import org.apache.commons.lang3.concurrent.BasicThreadFactory

class DefaultConnectorOperation<DestinationState : MinimumDestinationState>(
    private val parsedCatalog: ParsedCatalog,
    private val destinationHandler: DestinationHandler<DestinationState>,
    private val defaultNamespace: String,
    private val streamOperationsFactory: StreamOperationsFactory<DestinationState>,
    private val migrations: List<Migration<DestinationState>>,
    private val executorService: ExecutorService =
        Executors.newFixedThreadPool(
            10,
            BasicThreadFactory.Builder().namingPattern("sync-operations-%d").build(),
        )
) : SyncOperation {
    companion object {
        // Declared in the companion object so the logger is available during instantiation (init).
        private val log = KotlinLogging.logger {}
    }

    private val streamOpsMap: Map<StreamId, StreamOperation<DestinationState>>

    init {
        streamOpsMap = createPerStreamOpClients()
    }

    private fun createPerStreamOpClients(): Map<StreamId, StreamOperation<DestinationState>> {
        log.info { "Preparing required schemas and tables for all streams" }
        val streamsInitialStates = destinationHandler.gatherInitialState(parsedCatalog.streams)

        // Commit destination states and run raw-table migrations before creating per-stream clients.
        val postMigrationInitialStates =
            tdutils.executeRawTableMigrations(
                executorService,
                destinationHandler,
                migrations,
                streamsInitialStates
            )

        val initializationFutures =
            postMigrationInitialStates
                .map {
                    CompletableFuture.supplyAsync(
                        { Pair(it.streamConfig.id, streamOperationsFactory.createInstance(it)) },
                        executorService,
                    )
                }
                .toList()
        val futuresResult = allOf(initializationFutures).toCompletableFuture().get()
        val result =
            exceptions.getResultsOrLogAndThrowFirst(
                "Following exceptions occurred during sync initialization",
                futuresResult,
            )
        return result.toMap()
    }

    override fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
        val streamConfig =
            parsedCatalog.getStream(descriptor.namespace ?: defaultNamespace, descriptor.name)
        streamOpsMap[streamConfig.id]?.writeRecords(streamConfig, stream)
    }

    override fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>) {
        // Only call finalizeTable for streams that have a sync summary; the rest are skipped.
        val finalizeFutures =
            streamSyncSummaries.entries
                .map {
                    CompletableFuture.supplyAsync(
                        {
                            val streamConfig =
                                parsedCatalog.getStream(
                                    it.key.namespace ?: defaultNamespace,
                                    it.key.name,
                                )
                            streamOpsMap[streamConfig.id]?.finalizeTable(streamConfig, it.value)
                        },
                        executorService,
                    )
                }
                .toList()
        val futuresResult = allOf(finalizeFutures).toCompletableFuture().join()
        exceptions.getResultsOrLogAndThrowFirst(
            "Following exceptions occurred while finalizing the sync",
            futuresResult,
        )
        log.info { "Cleaning up sync operation thread pools" }
        executorService.shutdown()
    }
}
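To see how the pieces fit together, a destination could assemble the sync-level operation roughly as below. This is a sketch, not code from the commit: the helper function, the "public" default namespace, and the reliance on the default executor are all placeholder choices.

// Sketch only: wiring DefaultConnectorOperation together. All choices here are placeholders.
fun <S : MinimumDestinationState> buildSyncOperation(
    parsedCatalog: ParsedCatalog,
    destinationHandler: DestinationHandler<S>,
    streamOperationsFactory: StreamOperationsFactory<S>,
    migrations: List<Migration<S>>,
): SyncOperation =
    DefaultConnectorOperation(
        parsedCatalog = parsedCatalog,
        destinationHandler = destinationHandler,
        // Streams without an explicit namespace fall back to this value (placeholder).
        defaultNamespace = "public",
        streamOperationsFactory = streamOperationsFactory,
        migrations = migrations,
        // executorService is omitted, so the default 10-thread "sync-operations" pool is used.
    )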
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.operation

import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
import io.airbyte.protocol.models.v0.DestinationSyncMode
import java.time.Instant
import java.util.Optional

interface StorageOperations {
    /*
     * ==================== Staging Operations ================================
     */

    /**
     * Prepare the staging area, which could mean creating object storage paths, temp tables, or
     * file storage.
     */
    fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode)

    /** Delete previously staged data, using deterministic information from [streamId]. */
    fun cleanupStage(streamId: StreamId)

    /** Copy data from the provided buffer into the stage. */
    fun writeToStage(streamId: StreamId, buffer: SerializableBuffer)

    /*
     * ==================== Final Table Operations ================================
     */

    /** Create the final schema extracted from [StreamId]. */
    fun createFinalSchema(streamId: StreamId)

    /** Create the final table extracted from [StreamId]. */
    fun createFinalTable(streamConfig: StreamConfig, suffix: String, replace: Boolean)

    /** Reset the final table using a temp table, or ALTER the existing table's columns. */
    fun softResetFinalTable(streamConfig: StreamConfig)

    /**
     * Attempt to atomically swap the final table (name and namespace extracted from [StreamId]).
     * The mechanism is destination-specific, e.g. INSERT INTO ... SELECT * followed by DROP TABLE,
     * or CREATE OR REPLACE ... SELECT * followed by DROP TABLE.
     */
    fun overwriteFinalTable(streamConfig: StreamConfig, tmpTableSuffix: String)

    /**
     * Transform raw records into typed, deduplicated rows in the final table (optionally a
     * suffixed final table), picking up from [maxProcessedTimestamp].
     */
    fun typeAndDedupe(
        streamConfig: StreamConfig,
        maxProcessedTimestamp: Optional<Instant>,
        finalTableSuffix: String
    )
}
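Purely as an illustration of this contract (and not part of the commit), a print-only stub can help visualize the call ordering driven by AbstractStreamOperation: prepareStage, createFinalSchema, and createFinalTable at startup; writeToStage during the sync; then typeAndDedupe, cleanupStage, and possibly overwriteFinalTable at finalization. The class below is hypothetical and assumes the same imports as the interface above.

// Sketch only: a hypothetical print-only stub of StorageOperations, for illustration.
class LoggingStorageOperations : StorageOperations {
    override fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode) =
        println("prepareStage(${streamId.finalName}, $destinationSyncMode)")

    override fun cleanupStage(streamId: StreamId) = println("cleanupStage(${streamId.finalName})")

    override fun writeToStage(streamId: StreamId, buffer: SerializableBuffer) =
        println("writeToStage(${streamId.finalName})")

    override fun createFinalSchema(streamId: StreamId) =
        println("createFinalSchema(${streamId.finalName})")

    override fun createFinalTable(streamConfig: StreamConfig, suffix: String, replace: Boolean) =
        println("createFinalTable(${streamConfig.id.finalName}$suffix, replace=$replace)")

    override fun softResetFinalTable(streamConfig: StreamConfig) =
        println("softResetFinalTable(${streamConfig.id.finalName})")

    override fun overwriteFinalTable(streamConfig: StreamConfig, tmpTableSuffix: String) =
        println("overwriteFinalTable(${streamConfig.id.finalName}, suffix=$tmpTableSuffix)")

    override fun typeAndDedupe(
        streamConfig: StreamConfig,
        maxProcessedTimestamp: Optional<Instant>,
        finalTableSuffix: String
    ) = println("typeAndDedupe(${streamConfig.id.finalName}$finalTableSuffix, since=$maxProcessedTimestamp)")
}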
@@ -0,0 +1,18 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.operation

import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import java.util.stream.Stream

/** Operations on individual streams. */
interface StreamOperation<T> {

    fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>)

    fun finalizeTable(streamConfig: StreamConfig, syncSummary: StreamSyncSummary)
}
@@ -0,0 +1,18 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.operation

import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus

interface StreamOperationsFactory<DestinationState> {

    /**
     * Create an instance with required dependencies injected using a concrete factory
     * implementation.
     */
    fun createInstance(
        destinationInitialStatus: DestinationInitialStatus<DestinationState>
    ): StreamOperation<DestinationState>
}
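A concrete factory is typically a thin binder that closes over destination-specific dependencies and hands them to each per-stream operation. The sketch below is illustrative only: LoggingStreamOperation refers to the hypothetical subclass sketched earlier on this page, and the class assumes the relevant imports from the files above.

// Sketch only: a hypothetical factory binding a StorageOperations instance into each stream operation.
class LoggingStreamOperationsFactory<S : MinimumDestinationState>(
    private val storageOperations: StorageOperations,
) : StreamOperationsFactory<S> {
    override fun createInstance(
        destinationInitialStatus: DestinationInitialStatus<S>
    ): StreamOperation<S> =
        LoggingStreamOperation(storageOperations, destinationInitialStatus)
}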
