Connector operations by responsibility
gisripa committed May 10, 2024
1 parent 3525225 commit 726b1fe
Showing 5 changed files with 209 additions and 0 deletions.
@@ -0,0 +1,14 @@
package io.airbyte.cdk.integrations.destination.operation

import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.protocol.models.v0.StreamDescriptor
import java.util.stream.Stream

/** Connector sync operations */
interface SyncOperation {

    /**
     * Called by the destination flush function to write a batch of records for the stream
     * identified by [descriptor].
     */
    fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>)

    /** Finalize all streams at the end of the sync, using the provided per-stream summaries. */
    fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>)
}
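For orientation, a destination's flush callback would simply delegate to this interface. A minimal sketch of that delegation, reusing the imports above (the wrapper class name is an assumption, not part of this commit):

class SyncOperationDelegatingFlush(private val syncOperation: SyncOperation) {
    // Invoked once per flushed batch, keyed by stream descriptor.
    fun flush(descriptor: StreamDescriptor, records: Stream<PartialAirbyteMessage>) {
        syncOperation.flushStream(descriptor, records)
    }

    // Invoked exactly once at the end of the sync with per-stream summaries.
    fun close(summaries: Map<StreamDescriptor, StreamSyncSummary>) {
        syncOperation.finalizeStreams(summaries)
    }
}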
@@ -0,0 +1,112 @@
package io.airbyte.integrations.base.destination.operation

import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.cdk.integrations.destination.operation.SyncOperation
import io.airbyte.cdk.integrations.util.ConnectorExceptionUtil as exceptions
import io.airbyte.commons.concurrency.CompletableFutures.allOf
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtil.Companion as tdutils
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
import io.airbyte.protocol.models.v0.StreamDescriptor
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.stream.Stream
import org.apache.commons.lang3.concurrent.BasicThreadFactory

class DefaultConnectorOperation<DestinationState : MinimumDestinationState>(
private val parsedCatalog: ParsedCatalog,
private val destinationHandler: DestinationHandler<DestinationState>,
private val defaultNamespace: String,
private val streamOperationsFactory: StreamOperationsFactory<DestinationState>,
private val migrations: List<Migration<DestinationState>>,
private val executorService: ExecutorService =
Executors.newFixedThreadPool(
10,
BasicThreadFactory.Builder().namingPattern("sync-operations-%d").build(),
)
) : SyncOperation {
companion object {
        // Keep the logger in the companion object so it is available while init runs
        // during construction.
private val log = KotlinLogging.logger {}
}

    private val streamOpsMap: Map<StreamId, StreamOperation<DestinationState>> =
        createPerStreamOpClients()

private fun createPerStreamOpClients(): Map<StreamId, StreamOperation<DestinationState>> {
log.info { "Preparing required schemas and tables for all streams" }
val streamsInitialStates = destinationHandler.gatherInitialState(parsedCatalog.streams)

        // Commit destination states and run raw-table migrations before any per-stream setup.
val postMigrationInitialStates =
tdutils.executeRawTableMigrations(
executorService,
destinationHandler,
migrations,
streamsInitialStates
)

val initializationFutures =
postMigrationInitialStates
.map {
CompletableFuture.supplyAsync(
{
Pair(
it.streamConfig.id,
streamOperationsFactory.createInstance(it)
)
},
executorService,
)
}
.toList()
val futuresResult = allOf(initializationFutures).toCompletableFuture().get()
val result =
exceptions.getResultsOrLogAndThrowFirst(
"Following exceptions occurred during sync initialization",
futuresResult,
)
return result.toMap()
}

override fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) {
val streamConfig =
parsedCatalog.getStream(descriptor.namespace ?: defaultNamespace, descriptor.name)
streamOpsMap[streamConfig.id]?.writeRecords(streamConfig, stream)
}

override fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>) {
        // Only finalize streams that reported a sync summary; the rest are skipped.
val finalizeFutures =
streamSyncSummaries.entries
.map {
CompletableFuture.supplyAsync(
{
val streamConfig =
parsedCatalog.getStream(
it.key.namespace ?: defaultNamespace,
it.key.name,
)
streamOpsMap[streamConfig.id]?.finalizeTable(streamConfig, it.value)
},
executorService,
)
}
.toList()
val futuresResult = allOf(finalizeFutures).toCompletableFuture().join()
exceptions.getResultsOrLogAndThrowFirst(
"Following exceptions occurred while finalizing the sync",
futuresResult,
)
log.info { "Cleaning up sync operation thread pools" }
executorService.shutdown()
}
}
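A hedged sketch of how a connector might wire this class up; the defaultNamespace value, the empty migrations list, and the helper function name are illustrative assumptions, not part of this commit:

fun <S : MinimumDestinationState> buildSyncOperation(
    parsedCatalog: ParsedCatalog,
    destinationHandler: DestinationHandler<S>,
    streamOperationsFactory: StreamOperationsFactory<S>,
): SyncOperation =
    DefaultConnectorOperation(
        parsedCatalog = parsedCatalog,
        destinationHandler = destinationHandler,
        defaultNamespace = "public", // assumption: the connector's configured default schema
        streamOperationsFactory = streamOperationsFactory,
        migrations = emptyList(),    // assumption: no destination-state migrations needed
        // executorService defaults to the 10-thread "sync-operations-%d" pool above
    )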
@@ -0,0 +1,56 @@
package io.airbyte.integrations.base.destination.operation

import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
import io.airbyte.protocol.models.v0.DestinationSyncMode
import java.time.Instant
import java.util.Optional

interface StorageOperations {
/*
* ==================== Staging Operations ================================
*/

    /**
     * Prepare the staging area, which could mean creating object storage, temp tables, or file
     * storage.
     */
fun prepareStage(streamId: StreamId, destinationSyncMode: DestinationSyncMode)

/** Delete previously staged data, using deterministic information from streamId. */
fun cleanupStage(streamId: StreamId)

/** Copy data from provided buffer into stage. */
fun writeToStage(streamId: StreamId, buffer: SerializableBuffer)

/*
* ==================== Final Table Operations ================================
*/

    /** Create the final schema (namespace) extracted from [StreamId]. */
fun createFinalSchema(streamId: StreamId)

    /** Create the final table for the given [StreamConfig], optionally replacing an existing one. */
fun createFinalTable(streamConfig: StreamConfig, suffix: String, replace: Boolean)

    /**
     * Reset the final table, either by rebuilding it through a temp table or by ALTERing the
     * existing table's columns in place.
     */
fun softResetFinalTable(streamConfig: StreamConfig)

    /**
     * Attempt to atomically swap the final table (name and namespace extracted from [StreamId]).
     * The mechanism is destination specific: e.g. INSERT INTO ... SELECT * followed by DROP
     * TABLE, or CREATE OR REPLACE ... AS SELECT * followed by DROP TABLE.
     */
fun overwriteFinalTable(streamConfig: StreamConfig, tmpTableSuffix: String)

    /**
     * Copy raw records into the final table (identified by [finalTableSuffix]), applying typing
     * and deduplication; [maxProcessedTimestamp] lets the destination restrict the operation to
     * raw records not yet processed.
     */
fun typeAndDedupe(
streamConfig: StreamConfig,
maxProcessedTimestamp: Optional<Instant>,
finalTableSuffix: String
)
}
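Taken together, these methods imply a per-stream call order. An illustrative sketch for an OVERWRITE sync, reusing this file's imports (the helper function name and suffix value are assumptions, not part of this commit):

fun overwriteOneStream(
    ops: StorageOperations,
    streamConfig: StreamConfig,
    buffer: SerializableBuffer,
) {
    val tmpSuffix = "_airbyte_tmp" // assumption: destination-chosen temp-table suffix
    ops.prepareStage(streamConfig.id, DestinationSyncMode.OVERWRITE)
    ops.createFinalSchema(streamConfig.id)
    ops.createFinalTable(streamConfig, tmpSuffix, replace = true)
    ops.writeToStage(streamConfig.id, buffer)        // in practice, once per flushed buffer
    ops.typeAndDedupe(streamConfig, Optional.empty(), tmpSuffix)
    ops.overwriteFinalTable(streamConfig, tmpSuffix) // atomically swap tmp into place
    ops.cleanupStage(streamConfig.id)
}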
@@ -0,0 +1,14 @@
package io.airbyte.integrations.base.destination.operation

import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import java.util.stream.Stream

/** Operations on individual streams. */
interface StreamOperation<T> {

    /** Write a batch of records belonging to the given stream. */
    fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>)

    /** Run end-of-sync finalization (e.g. typing and deduping) for the given stream. */
    fun finalizeTable(streamConfig: StreamConfig, syncSummary: StreamSyncSummary)
}
@@ -0,0 +1,13 @@
package io.airbyte.integrations.base.destination.operation

import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus

interface StreamOperationsFactory<DestinationState> {

    /**
     * Create a [StreamOperation] instance, with its required dependencies injected by the
     * concrete factory implementation.
     */
    fun createInstance(
        destinationInitialStatus: DestinationInitialStatus<DestinationState>
    ): StreamOperation<DestinationState>
}
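A hedged sketch of a concrete factory; the class name and the anonymous StreamOperation body are illustrative only, and the imports (StreamConfig, StreamSyncSummary, PartialAirbyteMessage, MinimumDestinationState, java.util.stream.Stream) are assumed from the sibling files above:

class NoOpStreamOperationsFactory<S : MinimumDestinationState> : StreamOperationsFactory<S> {
    override fun createInstance(
        destinationInitialStatus: DestinationInitialStatus<S>
    ): StreamOperation<S> =
        object : StreamOperation<S> {
            override fun writeRecords(
                streamConfig: StreamConfig,
                stream: Stream<PartialAirbyteMessage>
            ) {
                // A real implementation would route these records to the staging/storage
                // layer, configured from destinationInitialStatus (e.g. table existence).
                stream.forEach { }
            }

            override fun finalizeTable(
                streamConfig: StreamConfig,
                syncSummary: StreamSyncSummary
            ) {
                // A real implementation would run typing & deduping and swap temp tables here.
            }
        }
}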
