Skip to content

Commit

Permalink
Connector operations by responsibility
Browse files Browse the repository at this point in the history
  • Loading branch information
gisripa committed May 17, 2024
1 parent d2e9c92 commit 4dd2733
Show file tree
Hide file tree
Showing 23 changed files with 1,737 additions and 299 deletions.
4 changes: 2 additions & 2 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,12 @@ corresponds to that version.
### Java CDK

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.35.6 | 2024-05-17 | [\#38107](https://github.com/airbytehq/airbyte/pull/38107) | New interfaces for Destination connectors to plug into AsyncStreamConsumer |
| 0.35.5 | 2024-05-17 | [\#38204](https://github.com/airbytehq/airbyte/pull/38204) | add assume-role authentication to s3 |
| 0.35.2 | 2024-05-13 | [\#38104](https://github.com/airbytehq/airbyte/pull/38104) | Handle transient error messages |
| 0.35.0 | 2024-05-13 | [\#38127](https://github.com/airbytehq/airbyte/pull/38127) | Destinations: Populate generation/sync ID on StreamConfig |
| 0.34.4 | 2024-05-10 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | make sure the exceptionHandler always terminates |
| 0.34.3 | 2024-05-10 | [\#38095](https://github.com/airbytehq/airbyte/pull/38095) | Minor changes for databricks connector |
| 0.34.1 | 2024-05-07 | [\#38030](https://github.com/airbytehq/airbyte/pull/38030) | Add support for transient errors |
| 0.34.0 | 2024-05-01 | [\#37712](https://github.com/airbytehq/airbyte/pull/37712) | Destinations: Remove incremental T+D |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ object JavaBaseConstants {

const val DEFAULT_AIRBYTE_INTERNAL_NAMESPACE: String = "airbyte_internal"
/**
 * The known raw-table column layouts. Each constant carries the ordered list of raw column
 * names used when writing records in that format.
 */
enum class DestinationColumns(val rawColumns: List<String>) {
    // Sibling constants are referenced directly (no JavaBaseConstants. prefix) because this
    // enum is nested inside the object that declares them.
    V2_WITH_META(V2_RAW_TABLE_COLUMN_NAMES),
    V2_WITHOUT_META(V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META),
    LEGACY(LEGACY_RAW_TABLE_COLUMNS)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import java.util.*
* destinations framework; new implementations should always provide this information). If this
* value is empty, consumers should assume that the sync wrote nonzero records for this stream.
*/
class StreamSyncSummary(val recordsWritten: Optional<Long>) {
data class StreamSyncSummary(val recordsWritten: Optional<Long>) {

companion object {
@JvmField val DEFAULT: StreamSyncSummary = StreamSyncSummary(Optional.empty())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.destination.operation

import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.protocol.models.v0.StreamDescriptor
import java.util.stream.Stream

/**
 * Destination connector sync-level operations. Any initialization required for the connector
 * should be done as part of instantiation/init blocks.
 */
interface SyncOperation {

    /**
     * This function is a shim for
     * [io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction]. After
     * control returns from this method, callers should assume the data is committed to durable
     * storage, and may send back any State message acknowledgements.
     *
     * @param descriptor identifies the stream whose records are being flushed
     * @param stream the records to persist for that stream
     */
    fun flushStream(descriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>)

    /**
     * Finalize streams, which could involve typing/deduping or any other housekeeping tasks
     * required.
     *
     * @param streamSyncSummaries per-stream summary of what was written during this sync
     */
    fun finalizeStreams(streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>)
}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.35.5
version=0.35.6
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.destination.staging

import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.destination.record_buffer.FileBuffer
import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.csv.CsvSerializedBuffer
import io.airbyte.cdk.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator

/**
 * Factory producing a single-use [SerializableBuffer] for staging writes; the buffer is expected
 * to be closed after one use. [io.airbyte.cdk.integrations.destination.s3.SerializedBufferFactory]
 * is very similar and the two should eventually be unified, but that factory doesn't fit our DV2
 * staging destinations, which mostly support CSV only.
 */
object StagingSerializedBufferFactory {

    /**
     * Build a buffer for the given upload format and column layout. Only CSV is currently
     * supported; any other format hits [TODO] and throws.
     */
    fun initializeBuffer(
        fileUploadFormat: FileUploadFormat,
        destinationColumns: JavaBaseConstants.DestinationColumns
    ): SerializableBuffer =
        when (fileUploadFormat) {
            FileUploadFormat.CSV ->
                CsvSerializedBuffer(
                    FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX),
                    StagingDatabaseCsvSheetGenerator(destinationColumns),
                    true,
                )
            else -> TODO("Only CSV is supported for Staging format")
        }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.destination.staging.operation

import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.cdk.integrations.destination.record_buffer.SerializableBuffer
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.staging.StagingSerializedBufferFactory
import io.airbyte.commons.json.Jsons
import io.airbyte.integrations.base.destination.operation.AbstractStreamOperation
import io.airbyte.integrations.base.destination.operation.StorageOperation
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.stream.Stream
import org.apache.commons.io.FileUtils

/**
 * Stream operation for staging-based destinations: serializes records into a staging buffer
 * (CSV today) and hands the buffer to the [StorageOperation] for upload to the stage. Prepare
 * and finalize behavior is inherited from [AbstractStreamOperation].
 */
class StagingStreamOperations<DestinationState : MinimumDestinationState>(
    private val storageOperation: StorageOperation<SerializableBuffer>,
    destinationInitialStatus: DestinationInitialStatus<DestinationState>,
    private val fileUploadFormat: FileUploadFormat,
    private val destinationColumns: JavaBaseConstants.DestinationColumns,
    disableTypeDedupe: Boolean = false
) :
    AbstractStreamOperation<DestinationState, SerializableBuffer>(
        storageOperation,
        destinationInitialStatus,
        disableTypeDedupe
    ) {

    private val log = KotlinLogging.logger {}

    override fun writeRecords(streamConfig: StreamConfig, stream: Stream<PartialAirbyteMessage>) {
        // One buffer per call; `use` guarantees it is closed even if serialization fails.
        StagingSerializedBufferFactory.initializeBuffer(fileUploadFormat, destinationColumns)
            .use { buffer ->
                stream.forEach { message ->
                    buffer.accept(
                        // serialized/record are expected to be populated for RECORD messages
                        message.serialized!!,
                        Jsons.serialize(message.record!!.meta),
                        message.record!!.emittedAt
                    )
                }
                buffer.flush()
                log.info {
                    "Buffer flush complete for stream ${streamConfig.id.originalName} (${FileUtils.byteCountToDisplaySize(buffer.byteCount)}) to staging"
                }
                storageOperation.writeToStage(streamConfig.id, buffer)
            }
    }
}
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/typing-deduping/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ dependencies {
testFixturesApi testFixtures(project(':airbyte-cdk:java:airbyte-cdk:airbyte-cdk-core'))
testFixturesImplementation 'org.mockito.kotlin:mockito-kotlin:5.2.1'
testImplementation 'org.mockito.kotlin:mockito-kotlin:5.2.1'
testImplementation "io.mockk:mockk:1.13.11"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.operation

import io.airbyte.cdk.integrations.destination.StreamSyncSummary
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus
import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState
import io.airbyte.protocol.models.v0.DestinationSyncMode
import io.github.oshai.kotlinlogging.KotlinLogging
import java.util.Optional
import java.util.stream.Stream

/**
 * Shared scaffolding for per-stream destination operations. On construction it prepares the stage
 * and (unless disabled) the final table for this sync; [finalizeTable] runs end-of-sync work
 * (typing/deduping and OVERWRITE table swaps). Subclasses supply only [writeRecords], which is
 * destination/format specific.
 */
abstract class AbstractStreamOperation<DestinationState : MinimumDestinationState, Data>(
    private val storageOperation: StorageOperation<Data>,
    destinationInitialStatus: DestinationInitialStatus<DestinationState>,
    private val disableTypeDedupe: Boolean = false
) : StreamOperation<DestinationState> {
    private val log = KotlinLogging.logger {}

    // State maintained to make decisions between async calls.
    // Suffix of the final table we write into: "" when writing directly, or the tmp suffix when
    // an OVERWRITE sync writes into a temp table that is swapped in during finalizeTable.
    private val finalTmpTableSuffix: String
    private val initialRawTableStatus: InitialRawTableStatus =
        destinationInitialStatus.initialRawTableStatus

    /**
     * After running any sync setup code, we may update the destination state. This field holds that
     * updated destination state.
     */
    final override val updatedDestinationState: DestinationState

    init {
        val stream = destinationInitialStatus.streamConfig
        storageOperation.prepareStage(stream.id, stream.destinationSyncMode)
        if (!disableTypeDedupe) {
            storageOperation.createFinalNamespace(stream.id)
            // Prepare final tables based on sync mode.
            finalTmpTableSuffix = prepareFinalTable(destinationInitialStatus)
        } else {
            log.info { "Typing and deduping disabled, skipping final table initialization" }
            finalTmpTableSuffix = NO_SUFFIX
        }
        // Any soft reset requested by the initial state has been handled above, so clear the flag.
        updatedDestinationState = destinationInitialStatus.destinationState.withSoftReset(false)
    }

    companion object {
        private const val NO_SUFFIX = ""
        private const val TMP_OVERWRITE_TABLE_SUFFIX = "_airbyte_tmp"
    }

    /**
     * Create or repair the final table as needed for this sync, and return the suffix of the
     * table that records should be written to ([NO_SUFFIX] for the real final table,
     * [TMP_OVERWRITE_TABLE_SUFFIX] for a temp table used by OVERWRITE syncs).
     */
    private fun prepareFinalTable(
        initialStatus: DestinationInitialStatus<DestinationState>
    ): String {
        val stream = initialStatus.streamConfig
        // No special handling if final table doesn't exist, just create and return
        if (!initialStatus.isFinalTablePresent) {
            log.info {
                "Final table does not exist for stream ${initialStatus.streamConfig.id.finalName}, creating."
            }
            storageOperation.createFinalTable(stream, NO_SUFFIX, false)
            return NO_SUFFIX
        }

        log.info { "Final Table exists for stream ${stream.id.finalName}" }
        // The table already exists. Decide whether we're writing to it directly, or
        // using a tmp table.
        when (stream.destinationSyncMode) {
            DestinationSyncMode.OVERWRITE -> return prepareFinalTableForOverwrite(initialStatus)
            DestinationSyncMode.APPEND,
            DestinationSyncMode.APPEND_DEDUP -> {
                if (
                    initialStatus.isSchemaMismatch ||
                        initialStatus.destinationState.needsSoftReset()
                ) {
                    // We're loading data directly into the existing table.
                    // Make sure it has the right schema.
                    // Also, if a raw table migration wants us to do a soft reset, do that
                    // here.
                    log.info { "Executing soft-reset on final table of stream $stream" }
                    storageOperation.softResetFinalTable(stream)
                }
                return NO_SUFFIX
            }
        }
    }

    /**
     * OVERWRITE-specific final table preparation: decide whether we can write straight into the
     * existing table (empty + matching schema) or must write into a suffixed tmp table that is
     * swapped in at the end of the sync.
     */
    private fun prepareFinalTableForOverwrite(
        initialStatus: DestinationInitialStatus<DestinationState>
    ): String {
        val stream = initialStatus.streamConfig
        if (!initialStatus.isFinalTableEmpty || initialStatus.isSchemaMismatch) {
            // overwrite an existing tmp table if needed.
            storageOperation.createFinalTable(stream, TMP_OVERWRITE_TABLE_SUFFIX, true)
            log.info {
                "Using temp final table for table ${stream.id.finalName}, this will be overwritten at end of sync"
            }
            // We want to overwrite an existing table. Write into a tmp table.
            // We'll overwrite the table at the
            // end of the sync.
            return TMP_OVERWRITE_TABLE_SUFFIX
        }

        log.info {
            "Final Table for stream ${stream.id.finalName} is empty and matches the expected v2 format, writing to table directly"
        }
        return NO_SUFFIX
    }

    /** Write records will be destination type specific, Insert vs staging based on format */
    abstract override fun writeRecords(
        streamConfig: StreamConfig,
        stream: Stream<PartialAirbyteMessage>
    )

    override fun finalizeTable(streamConfig: StreamConfig, syncSummary: StreamSyncSummary) {
        // Delete staging directory, implementation will handle if it has to do it or not or a No-OP
        storageOperation.cleanupStage(streamConfig.id)
        if (disableTypeDedupe) {
            log.info {
                "Typing and deduping disabled, skipping final table finalization. " +
                    "Raw records can be found at ${streamConfig.id.rawNamespace}.${streamConfig.id.rawName}"
            }
            return
        }

        val isNotOverwriteSync = streamConfig.destinationSyncMode != DestinationSyncMode.OVERWRITE
        // Legacy logic: if recordsWritten was not tracked (empty Optional), assume it could be
        // non-zero. For OVERWRITE syncs, we don't need to look at old unprocessed records.
        val shouldRunTypingDeduping =
            syncSummary.recordsWritten.map { it > 0 }.orElse(true) ||
                (initialRawTableStatus.hasUnprocessedRecords && isNotOverwriteSync)
        if (!shouldRunTypingDeduping) {
            log.info {
                "Skipping typing and deduping for stream ${streamConfig.id.originalNamespace}.${streamConfig.id.originalName} " +
                    "because it had no records during this sync and no unprocessed records from a previous sync."
            }
        } else {
            // In overwrite mode, we want to read all the raw records. Typically, this is equivalent
            // to filtering on timestamp, but might as well be explicit.
            val timestampFilter =
                if (isNotOverwriteSync) {
                    initialRawTableStatus.maxProcessedTimestamp
                } else {
                    Optional.empty()
                }
            storageOperation.typeAndDedupe(streamConfig, timestampFilter, finalTmpTableSuffix)
        }

        // For OVERWRITE syncs, doing a soft reset in prepare would be wasteful, so T+D ran against
        // the suffixed tmp table instead (when one was created for a schema mismatch); swap it into
        // place here.
        if (
            streamConfig.destinationSyncMode == DestinationSyncMode.OVERWRITE &&
                finalTmpTableSuffix.isNotBlank()
        ) {
            storageOperation.overwriteFinalTable(streamConfig, finalTmpTableSuffix)
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.operation

import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.cdk.integrations.destination.operation.SyncOperation
import io.airbyte.protocol.models.v0.StreamDescriptor
import java.util.stream.Stream

/**
 * Default [DestinationFlushFunction] that simply delegates each flush to the provided
 * [SyncOperation].
 */
class DefaultFlush(
    override val optimalBatchSizeBytes: Long,
    private val syncOperation: SyncOperation
) : DestinationFlushFunction {
    override fun flush(streamDescriptor: StreamDescriptor, stream: Stream<PartialAirbyteMessage>) =
        syncOperation.flushStream(streamDescriptor, stream)
}

0 comments on commit 4dd2733

Please sign in to comment.