Skip to content

Commit

Permalink
convert destination-bigquery to kotlin CDK (#36899)
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Apr 25, 2024
1 parent 993aece commit c4ad3d9
Show file tree
Hide file tree
Showing 44 changed files with 251 additions and 209 deletions.
2 changes: 2 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Expand Up @@ -144,6 +144,8 @@ Maven and Gradle will automatically reference the correct (pinned) version of th

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.30.11 | 2024-04-25 | [\#36899](https://github.com/airbytehq/airbyte/pull/36899) | changes for bigQuery destination. |
| 0.30.10 | 2024-04-24 | [\#37541](https://github.com/airbytehq/airbyte/pull/37541) | remove excessive logging |
| 0.30.9 | 2024-04-24 | [\#37477](https://github.com/airbytehq/airbyte/pull/37477) | remove unnecessary logs
| 0.30.7 | 2024-04-23 | [\#37477](https://github.com/airbytehq/airbyte/pull/37477) | fix kotlin warnings in core CDK submodule
| 0.30.7 | 2024-04-23 | [\#37484](https://github.com/airbytehq/airbyte/pull/37484) | fix kotlin warnings in dependencies CDK submodule |
Expand Down
Expand Up @@ -152,6 +152,7 @@ class AirbyteExceptionHandler : Thread.UncaughtExceptionHandler {
}
}

@JvmStatic
fun addAllStringsInConfigForDeinterpolation(node: JsonNode) {
if (node.isTextual) {
addStringForDeinterpolation(node.asText())
Expand Down
Expand Up @@ -35,8 +35,8 @@ object JavaBaseConstants {
// Meta was introduced later, so to avoid triggering raw table soft-reset in v1->v2
// use this column list.
@JvmField
val V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META: Set<String> =
java.util.Set.of(
val V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META: List<String> =
java.util.List.of(
COLUMN_NAME_AB_RAW_ID,
COLUMN_NAME_AB_EXTRACTED_AT,
COLUMN_NAME_AB_LOADED_AT,
Expand All @@ -56,4 +56,9 @@ object JavaBaseConstants {
java.util.List.of(COLUMN_NAME_AB_RAW_ID, COLUMN_NAME_AB_EXTRACTED_AT, COLUMN_NAME_AB_META)

const val DEFAULT_AIRBYTE_INTERNAL_NAMESPACE: String = "airbyte_internal"
enum class DestinationColumns(val rawColumns: List<String>) {
V2_WITH_META(JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES),
V2_WITHOUT_META(JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES_WITHOUT_META),
LEGACY(JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS)
}
}
Expand Up @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong
import java.util.function.Consumer
import java.util.stream.Collectors
import kotlin.jvm.optionals.getOrNull
import org.jetbrains.annotations.VisibleForTesting

private val logger = KotlinLogging.logger {}

Expand All @@ -41,7 +42,10 @@ private val logger = KotlinLogging.logger {}
* memory limit governed by [GlobalMemoryManager]. Record writing is decoupled via [FlushWorkers].
* See the other linked class for more detail.
*/
class AsyncStreamConsumer(
open class AsyncStreamConsumer
@VisibleForTesting
@JvmOverloads
constructor(
outputRecordCollector: Consumer<AirbyteMessage>,
private val onStart: OnStartFunction,
private val onClose: OnCloseFunction,
Expand All @@ -51,7 +55,8 @@ class AsyncStreamConsumer(
private val defaultNamespace: Optional<String>,
private val flushFailure: FlushFailure = FlushFailure(),
workerPool: ExecutorService = Executors.newFixedThreadPool(5),
private val airbyteMessageDeserializer: AirbyteMessageDeserializer,
private val airbyteMessageDeserializer: AirbyteMessageDeserializer =
AirbyteMessageDeserializer(),
) : SerializedAirbyteMessageConsumer {
private val bufferEnqueue: BufferEnqueue = bufferManager.bufferEnqueue
private val flushWorkers: FlushWorkers =
Expand Down
Expand Up @@ -14,6 +14,11 @@ import io.airbyte.protocol.models.v0.StreamDescriptor
*
* The map of StreamSyncSummaries MUST be non-null, but MAY be empty. Streams not present in the map
* will be treated as equivalent to [StreamSyncSummary.DEFAULT].
*
* The @JvmSuppressWildcards is here so that the 2nd parameter of accept stays a java
* Map<StreamDescriptor, StreamSyncSummary> rather than becoming a Map<StreamDescriptor, ? extends
* StreamSyncSummary>
*/
fun interface OnCloseFunction :
CheckedBiConsumer<Boolean, Map<StreamDescriptor, StreamSyncSummary>, Exception>
CheckedBiConsumer<
Boolean, @JvmSuppressWildcards Map<StreamDescriptor, StreamSyncSummary>, Exception>
@@ -1 +1 @@
version=0.30.10
version=0.30.11
Expand Up @@ -1469,7 +1469,7 @@ abstract class DestinationAcceptanceTest {
}

/** Whether the destination should be tested against different namespaces. */
protected fun supportNamespaceTest(): Boolean {
protected open fun supportNamespaceTest(): Boolean {
return false
}

Expand All @@ -1485,7 +1485,7 @@ abstract class DestinationAcceptanceTest {
* normalized namespace when testCaseId = "S3A-1". Find the testCaseId in
* "namespace_test_cases.json".
*/
protected fun assertNamespaceNormalization(
protected open fun assertNamespaceNormalization(
testCaseId: String?,
expectedNormalizedNamespace: String?,
actualNormalizedNamespace: String?
Expand Down
Expand Up @@ -61,9 +61,7 @@ class LocalAirbyteDestination(private val dest: Destination) : AirbyteDestinatio
return isClosed
}

override fun getExitValue(): Int {
return 0
}
override val exitValue = 0

override fun attemptRead(): Optional<io.airbyte.protocol.models.AirbyteMessage> {
return Optional.empty()
Expand Down
Expand Up @@ -28,7 +28,7 @@ open class AdvancedTestDataComparator : TestDataComparator {
return java.util.List.of(identifier)
}

protected fun compareObjects(expectedObject: JsonNode, actualObject: JsonNode) {
protected open fun compareObjects(expectedObject: JsonNode, actualObject: JsonNode) {
if (!areBothEmpty(expectedObject, actualObject)) {
LOGGER.info("Expected Object : {}", expectedObject)
LOGGER.info("Actual Object : {}", actualObject)
Expand Down
Expand Up @@ -72,7 +72,7 @@ interface AirbyteDestination : CheckedConsumer<AirbyteMessage, Exception>, AutoC
* @return exit code of the destination process
* @throws IllegalStateException if the destination process has not exited
*/
fun getExitValue(): Int
abstract val exitValue: Int

/**
* Attempts to read an AirbyteMessage from the Destination.
Expand Down
Expand Up @@ -51,25 +51,19 @@ constructor(
private var messageIterator: Iterator<AirbyteMessage?>? = null

private var exitValueIsSet = false
private var exitValue: Int = 0
override fun getExitValue(): Int {
Preconditions.checkState(
destinationProcess != null,
"Destination process is null, cannot retrieve exit value."
)
Preconditions.checkState(
!destinationProcess!!.isAlive,
"Destination process is still alive, cannot retrieve exit value."
)

if (!exitValueIsSet) {
exitValueIsSet = true
exitValue = destinationProcess!!.exitValue()
override val exitValue: Int
get() {
Preconditions.checkState(
destinationProcess != null,
"Destination process is null, cannot retrieve exit value."
)
Preconditions.checkState(
!destinationProcess!!.isAlive,
"Destination process is still alive, cannot retrieve exit value."
)
return destinationProcess!!.exitValue()
}

return exitValue
}

@Throws(IOException::class, TestHarnessException::class)
override fun start(
destinationConfig: WorkerDestinationConfig,
Expand Down
Expand Up @@ -70,6 +70,7 @@ class GcsDestinationConfig(
companion object {
private const val GCS_ENDPOINT = "https://storage.googleapis.com"

@JvmStatic
fun getGcsDestinationConfig(config: JsonNode): GcsDestinationConfig {
return GcsDestinationConfig(
config["gcs_bucket_name"].asText(),
Expand Down
Expand Up @@ -7,9 +7,12 @@ import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.commons.json.Jsons
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.Instant
import java.util.*

private val logger = KotlinLogging.logger {}

/**
* A CsvSheetGenerator that produces data in the format expected by JdbcSqlOperations. See
* JdbcSqlOperations#createTableQuery.
Expand All @@ -24,14 +27,12 @@ import java.util.*
*/
class StagingDatabaseCsvSheetGenerator
@JvmOverloads
constructor(private val useDestinationsV2Columns: Boolean = false) : CsvSheetGenerator {
// TODO is this even used anywhere?
private var header: List<String> =
if (this.useDestinationsV2Columns) JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES
else JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS

constructor(
private val destinationColumns: JavaBaseConstants.DestinationColumns =
JavaBaseConstants.DestinationColumns.LEGACY
) : CsvSheetGenerator {
override fun getHeaderRow(): List<String> {
return header
return destinationColumns.rawColumns
}

override fun getDataRow(id: UUID, recordMessage: AirbyteRecordMessage): List<Any> {
Expand All @@ -53,16 +54,19 @@ constructor(private val useDestinationsV2Columns: Boolean = false) : CsvSheetGen
emittedAt: Long,
formattedAirbyteMetaString: String
): List<Any> {
return if (useDestinationsV2Columns) {
java.util.List.of<Any>(
id,
Instant.ofEpochMilli(emittedAt),
"",
formattedString,
formattedAirbyteMetaString
)
} else {
java.util.List.of<Any>(id, formattedString, Instant.ofEpochMilli(emittedAt))
return when (destinationColumns) {
JavaBaseConstants.DestinationColumns.LEGACY ->
listOf(id, formattedString, Instant.ofEpochMilli(emittedAt))
JavaBaseConstants.DestinationColumns.V2_WITH_META ->
listOf(
id,
Instant.ofEpochMilli(emittedAt),
"",
formattedString,
formattedAirbyteMetaString
)
JavaBaseConstants.DestinationColumns.V2_WITHOUT_META ->
listOf(id, Instant.ofEpochMilli(emittedAt), "", formattedString)
}
}
}
Expand Up @@ -4,6 +4,7 @@
package io.airbyte.cdk.integrations.destination.staging

import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.destination.async.function.DestinationFlushFunction
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
import io.airbyte.cdk.integrations.destination.jdbc.WriteConfig
Expand Down Expand Up @@ -39,7 +40,7 @@ internal class AsyncFlush(
// the batch size, the AsyncFlusher will flush in smaller batches which allows for memory to be
// freed earlier similar to a sliding window effect
override val optimalBatchSizeBytes: Long,
private val useDestinationsV2Columns: Boolean
private val destinationColumns: JavaBaseConstants.DestinationColumns
) : DestinationFlushFunction {

@Throws(Exception::class)
Expand All @@ -49,7 +50,7 @@ internal class AsyncFlush(
writer =
CsvSerializedBuffer(
FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX),
StagingDatabaseCsvSheetGenerator(useDestinationsV2Columns),
StagingDatabaseCsvSheetGenerator(destinationColumns),
true
)

Expand Down
Expand Up @@ -6,6 +6,7 @@ package io.airbyte.cdk.integrations.destination.staging
import com.fasterxml.jackson.databind.JsonNode
import com.google.common.base.Preconditions
import io.airbyte.cdk.db.jdbc.JdbcDatabase
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer
Expand Down Expand Up @@ -46,7 +47,7 @@ private constructor(
private val typerDeduper: TyperDeduper?,
private val parsedCatalog: ParsedCatalog?,
private val defaultNamespace: String?,
private val useDestinationsV2Columns: Boolean,
private val destinationColumns: JavaBaseConstants.DestinationColumns,
// Optional fields
private val bufferMemoryLimit: Optional<Long>,
private val optimalBatchSizeBytes: Long,
Expand All @@ -68,7 +69,8 @@ private constructor(
var typerDeduper: TyperDeduper? = null
var parsedCatalog: ParsedCatalog? = null
var defaultNamespace: String? = null
var useDestinationsV2Columns: Boolean = false
var destinationColumns: JavaBaseConstants.DestinationColumns =
JavaBaseConstants.DestinationColumns.LEGACY

// Optional fields
private var bufferMemoryLimit = Optional.empty<Long>()
Expand Down Expand Up @@ -104,7 +106,7 @@ private constructor(
typerDeduper,
parsedCatalog,
defaultNamespace,
useDestinationsV2Columns,
destinationColumns,
bufferMemoryLimit,
optimalBatchSizeBytes,
(if (dataTransformer != null) dataTransformer else IdentityDataTransformer())!!
Expand All @@ -118,13 +120,7 @@ private constructor(
val stagingOperations = this.stagingOperations!!

val writeConfigs: List<WriteConfig> =
createWriteConfigs(
namingResolver,
config,
catalog,
parsedCatalog,
useDestinationsV2Columns
)
createWriteConfigs(namingResolver, config, catalog, parsedCatalog, destinationColumns)
val streamDescToWriteConfig: Map<StreamDescriptor, WriteConfig> =
streamDescToWriteConfig(writeConfigs)
val flusher =
Expand All @@ -136,7 +132,7 @@ private constructor(
typerDeduperValve,
typerDeduper,
optimalBatchSizeBytes,
useDestinationsV2Columns
destinationColumns
)
return AsyncStreamConsumer(
outputRecordCollector!!,
Expand Down Expand Up @@ -181,7 +177,7 @@ private constructor(
typerDeduper: TyperDeduper,
parsedCatalog: ParsedCatalog?,
defaultNamespace: String?,
useDestinationsV2Columns: Boolean
destinationColumns: JavaBaseConstants.DestinationColumns
): Builder {
val builder = Builder()
builder.outputRecordCollector = outputRecordCollector
Expand All @@ -195,7 +191,7 @@ private constructor(
builder.typerDeduper = typerDeduper
builder.parsedCatalog = parsedCatalog
builder.defaultNamespace = defaultNamespace
builder.useDestinationsV2Columns = useDestinationsV2Columns
builder.destinationColumns = destinationColumns
return builder
}

Expand Down Expand Up @@ -263,20 +259,20 @@ private constructor(
config: JsonNode?,
catalog: ConfiguredAirbyteCatalog?,
parsedCatalog: ParsedCatalog?,
useDestinationsV2Columns: Boolean
destinationColumns: JavaBaseConstants.DestinationColumns
): List<WriteConfig> {
return catalog!!
.streams
.stream()
.map(toWriteConfig(namingResolver, config, parsedCatalog, useDestinationsV2Columns))
.map(toWriteConfig(namingResolver, config, parsedCatalog, destinationColumns))
.toList()
}

private fun toWriteConfig(
namingResolver: NamingConventionTransformer?,
config: JsonNode?,
parsedCatalog: ParsedCatalog?,
useDestinationsV2Columns: Boolean
destinationColumns: JavaBaseConstants.DestinationColumns
): Function<ConfiguredAirbyteStream, WriteConfig> {
return Function<ConfiguredAirbyteStream, WriteConfig> { stream: ConfiguredAirbyteStream
->
Expand All @@ -289,15 +285,19 @@ private constructor(

val outputSchema: String
val tableName: String
if (useDestinationsV2Columns) {
val streamId = parsedCatalog!!.getStream(abStream.namespace, streamName).id
outputSchema = streamId.rawNamespace!!
tableName = streamId.rawName!!
} else {
outputSchema =
getOutputSchema(abStream, config!!["schema"].asText(), namingResolver)
tableName =
@Suppress("deprecation") namingResolver!!.getRawTableName(streamName)
when (destinationColumns) {
JavaBaseConstants.DestinationColumns.V2_WITH_META,
JavaBaseConstants.DestinationColumns.V2_WITHOUT_META -> {
val streamId = parsedCatalog!!.getStream(abStream.namespace, streamName).id
outputSchema = streamId.rawNamespace!!
tableName = streamId.rawName!!
}
JavaBaseConstants.DestinationColumns.LEGACY -> {
outputSchema =
getOutputSchema(abStream, config!!["schema"].asText(), namingResolver)
tableName =
@Suppress("deprecation") namingResolver!!.getRawTableName(streamName)
}
}
val tmpTableName =
@Suppress("deprecation") namingResolver!!.getTmpTableName(streamName)
Expand Down

0 comments on commit c4ad3d9

Please sign in to comment.