Skip to content

Commit

Permalink
fix compiler warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Mar 27, 2024
1 parent 295d35b commit 6d91dbc
Show file tree
Hide file tree
Showing 98 changed files with 7,524 additions and 3,716 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ object DSLContextFactory {
driverClassName: String,
jdbcConnectionString: String?,
dialect: SQLDialect?,
connectionProperties: Map<String?, String?>?,
connectionProperties: Map<String, String>?,
connectionTimeout: Duration?
): DSLContext {
return DSL.using(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ object DataSourceFactory {
password: String?,
driverClassName: String,
jdbcConnectionString: String?,
connectionProperties: Map<String?, String?>?,
connectionProperties: Map<String, String>?,
connectionTimeout: Duration?
): DataSource {
return DataSourceBuilder(username, password, driverClassName, jdbcConnectionString)
Expand Down Expand Up @@ -100,7 +100,7 @@ object DataSourceFactory {
port: Int,
database: String?,
driverClassName: String,
connectionProperties: Map<String?, String?>?
connectionProperties: Map<String, String>?
): DataSource {
return DataSourceBuilder(username, password, driverClassName, host, port, database)
.withConnectionProperties(connectionProperties)
Expand Down Expand Up @@ -152,7 +152,7 @@ object DataSourceFactory {
private var password: String?,
private var driverClassName: String
) {
private var connectionProperties: Map<String?, String?> = java.util.Map.of()
private var connectionProperties: Map<String, String> = java.util.Map.of()
private var database: String? = null
private var host: String? = null
private var jdbcUrl: String? = null
Expand Down Expand Up @@ -185,7 +185,7 @@ object DataSourceFactory {
}

fun withConnectionProperties(
connectionProperties: Map<String?, String?>?
connectionProperties: Map<String, String>?
): DataSourceBuilder {
if (connectionProperties != null) {
this.connectionProperties = connectionProperties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import org.slf4j.LoggerFactory

/** Implementation of source operations with standard JDBC types. */
class JdbcSourceOperations :
AbstractJdbcCompatibleSourceOperations<JDBCType?>(), SourceOperations<ResultSet, JDBCType?> {
AbstractJdbcCompatibleSourceOperations<JDBCType>(), SourceOperations<ResultSet, JDBCType> {
protected fun safeGetJdbcType(columnTypeInt: Int): JDBCType {
return try {
JDBCType.valueOf(columnTypeInt)
Expand Down Expand Up @@ -65,7 +65,7 @@ class JdbcSourceOperations :
preparedStatement: PreparedStatement,
parameterIndex: Int,
cursorFieldType: JDBCType?,
value: String
value: String?
) {
when (cursorFieldType) {
JDBCType.TIMESTAMP -> setTimestamp(preparedStatement, parameterIndex, value)
Expand All @@ -80,12 +80,12 @@ class JdbcSourceOperations :
JDBCType.TINYINT,
JDBCType.SMALLINT -> setShortInt(preparedStatement, parameterIndex, value!!)
JDBCType.INTEGER -> setInteger(preparedStatement, parameterIndex, value!!)
JDBCType.BIGINT -> setBigInteger(preparedStatement, parameterIndex, value)
JDBCType.BIGINT -> setBigInteger(preparedStatement, parameterIndex, value!!)
JDBCType.FLOAT,
JDBCType.DOUBLE -> setDouble(preparedStatement, parameterIndex, value!!)
JDBCType.REAL -> setReal(preparedStatement, parameterIndex, value!!)
JDBCType.NUMERIC,
JDBCType.DECIMAL -> setDecimal(preparedStatement, parameterIndex, value)
JDBCType.DECIMAL -> setDecimal(preparedStatement, parameterIndex, value!!)
JDBCType.CHAR,
JDBCType.NCHAR,
JDBCType.NVARCHAR,
Expand Down Expand Up @@ -147,7 +147,7 @@ class JdbcSourceOperations :
return JdbcUtils.ALLOWED_CURSOR_TYPES.contains(type)
}

override fun getAirbyteType(jdbcType: JDBCType?): JsonSchemaType {
override fun getAirbyteType(jdbcType: JDBCType): JsonSchemaType {
return when (jdbcType) {
JDBCType.BIT,
JDBCType.BOOLEAN -> JsonSchemaType.BOOLEAN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import java.util.*

abstract class JdbcConnector
protected constructor(@JvmField protected val driverClassName: String) : BaseConnector() {
protected fun getConnectionTimeout(connectionProperties: Map<String?, String?>): Duration {
protected fun getConnectionTimeout(connectionProperties: Map<String, String>): Duration {
return getConnectionTimeout(connectionProperties, driverClassName)
}

Expand All @@ -37,7 +37,7 @@ protected constructor(@JvmField protected val driverClassName: String) : BaseCon
* @return DataSourceBuilder class used to create dynamic fields for DataSource
*/
fun getConnectionTimeout(
connectionProperties: Map<String?, String?>,
connectionProperties: Map<String, String>,
driverClassName: String?
): Duration {
val parsedConnectionTimeout =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ interface Source : Integration {
@Throws(Exception::class)
fun read(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog?,
catalog: ConfiguredAirbyteCatalog,
state: JsonNode?
): AutoCloseableIterator<AirbyteMessage>

Expand All @@ -65,7 +65,7 @@ interface Source : Integration {
@Throws(Exception::class)
fun readStreams(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog?,
catalog: ConfiguredAirbyteCatalog,
state: JsonNode?
): Collection<AutoCloseableIterator<AirbyteMessage>>? {
return List.of(read(config, catalog, state))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ abstract class SpecModifyingSource(private val source: Source) : Source {
@Throws(Exception::class)
override fun read(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog?,
catalog: ConfiguredAirbyteCatalog,
state: JsonNode?
): AutoCloseableIterator<AirbyteMessage> {
return source.read(config, catalog, state)
Expand All @@ -44,7 +44,7 @@ abstract class SpecModifyingSource(private val source: Source) : Source {
@Throws(Exception::class)
override fun readStreams(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog?,
catalog: ConfiguredAirbyteCatalog,
state: JsonNode?
): Collection<AutoCloseableIterator<AirbyteMessage>>? {
return source.readStreams(config, catalog, state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class SshWrappedSource : Source {
@Throws(Exception::class)
override fun read(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog?,
catalog: ConfiguredAirbyteCatalog,
state: JsonNode?
): AutoCloseableIterator<AirbyteMessage> {
val tunnel: SshTunnel = SshTunnel.Companion.getInstance(config, hostKey, portKey)
Expand All @@ -97,7 +97,7 @@ class SshWrappedSource : Source {
@Throws(Exception::class)
override fun readStreams(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog?,
catalog: ConfiguredAirbyteCatalog,
state: JsonNode?
): Collection<AutoCloseableIterator<AirbyteMessage>>? {
val tunnel: SshTunnel = SshTunnel.Companion.getInstance(config, hostKey, portKey)
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.28.10
version=0.28.11
6 changes: 6 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/db-sources/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ java {
}
}

compileKotlin.compilerOptions.allWarningsAsErrors = false
compileTestFixturesKotlin.compilerOptions.allWarningsAsErrors = false
compileTestKotlin.compilerOptions.allWarningsAsErrors = false


// Convert yaml to java: relationaldb.models
jsonSchema2Pojo {
sourceType = SourceType.YAMLSCHEMA
Expand Down Expand Up @@ -53,4 +58,5 @@ dependencies {
testImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:datastore-postgres'))

testImplementation 'uk.org.webcompere:system-stubs-jupiter:2.0.1'
testImplementation 'org.mockito.kotlin:mockito-kotlin:5.2.1'
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,43 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.SyncMode
import io.debezium.engine.ChangeEvent
import io.debezium.engine.DebeziumEngine
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Duration
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.*
import java.util.concurrent.LinkedBlockingQueue
import org.slf4j.Logger
import org.slf4j.LoggerFactory

/**
* This class acts as the bridge between Airbyte DB connectors and debezium. If a DB connector wants
* to use debezium for CDC, it should use this class
*/
class AirbyteDebeziumHandler<T>(private val config: JsonNode,
private val targetPosition: CdcTargetPosition<T>,
private val trackSchemaHistory: Boolean,
private val firstRecordWaitTime: Duration,
private val subsequentRecordWaitTime: Duration,
private val queueSize: Int,
private val addDbNameToOffsetState: Boolean) {
internal inner class CapacityReportingBlockingQueue<E>(capacity: Int) : LinkedBlockingQueue<E>(capacity) {
class AirbyteDebeziumHandler<T>(
private val config: JsonNode,
private val targetPosition: CdcTargetPosition<T>,
private val trackSchemaHistory: Boolean,
private val firstRecordWaitTime: Duration,
private val subsequentRecordWaitTime: Duration,
private val queueSize: Int,
private val addDbNameToOffsetState: Boolean
) {
internal inner class CapacityReportingBlockingQueue<E>(capacity: Int) :
LinkedBlockingQueue<E>(capacity) {
private var lastReport: Instant? = null

private fun reportQueueUtilization() {
if (lastReport == null || Duration.between(lastReport, Instant.now()).compareTo(Companion.REPORT_DURATION) > 0) {
LOGGER.info("CDC events queue size: {}. remaining {}", this.size, this.remainingCapacity())
synchronized(this) {
lastReport = Instant.now()
}
if (
lastReport == null ||
Duration.between(lastReport, Instant.now())
.compareTo(Companion.REPORT_DURATION) > 0
) {
LOGGER.info(
"CDC events queue size: {}. remaining {}",
this.size,
this.remainingCapacity()
)
synchronized(this) { lastReport = Instant.now() }
}
}

Expand All @@ -55,44 +64,62 @@ class AirbyteDebeziumHandler<T>(private val config: JsonNode,
reportQueueUtilization()
return super.poll()
}

companion object {
private val REPORT_DURATION: Duration = Duration.of(10, ChronoUnit.SECONDS)
}
}

fun getIncrementalIterators(debeziumPropertiesManager: DebeziumPropertiesManager,
eventConverter: DebeziumEventConverter,
cdcSavedInfoFetcher: CdcSavedInfoFetcher,
cdcStateHandler: CdcStateHandler): AutoCloseableIterator<AirbyteMessage> {
fun getIncrementalIterators(
debeziumPropertiesManager: DebeziumPropertiesManager,
eventConverter: DebeziumEventConverter,
cdcSavedInfoFetcher: CdcSavedInfoFetcher,
cdcStateHandler: CdcStateHandler
): AutoCloseableIterator<AirbyteMessage> {
LOGGER.info("Using CDC: {}", true)
LOGGER.info("Using DBZ version: {}", DebeziumEngine::class.java.getPackage().implementationVersion)
val offsetManager: AirbyteFileOffsetBackingStore = AirbyteFileOffsetBackingStore.Companion.initializeState(
LOGGER.info(
"Using DBZ version: {}",
DebeziumEngine::class.java.getPackage().implementationVersion
)
val offsetManager: AirbyteFileOffsetBackingStore =
AirbyteFileOffsetBackingStore.Companion.initializeState(
cdcSavedInfoFetcher.savedOffset,
if (addDbNameToOffsetState) Optional.ofNullable<String>(config[JdbcUtils.DATABASE_KEY].asText()) else Optional.empty<String>())
val schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage?> = if (trackSchemaHistory
) Optional.of<AirbyteSchemaHistoryStorage?>(AirbyteSchemaHistoryStorage.Companion.initializeDBHistory(
cdcSavedInfoFetcher.savedSchemaHistory, cdcStateHandler.compressSchemaHistoryForState()))
else Optional.empty<AirbyteSchemaHistoryStorage>()
if (addDbNameToOffsetState)
Optional.ofNullable<String>(config[JdbcUtils.DATABASE_KEY].asText())
else Optional.empty<String>()
)
val schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage> =
if (trackSchemaHistory)
Optional.of<AirbyteSchemaHistoryStorage?>(
AirbyteSchemaHistoryStorage.Companion.initializeDBHistory(
cdcSavedInfoFetcher.savedSchemaHistory,
cdcStateHandler.compressSchemaHistoryForState()
)
)
else Optional.empty<AirbyteSchemaHistoryStorage>()
val publisher = DebeziumRecordPublisher(debeziumPropertiesManager)
val queue: CapacityReportingBlockingQueue<ChangeEvent<String?, String?>> = CapacityReportingBlockingQueue<ChangeEvent<String, String>>(queueSize)
val queue: CapacityReportingBlockingQueue<ChangeEvent<String?, String?>> =
CapacityReportingBlockingQueue(queueSize)
publisher.start(queue, offsetManager, schemaHistoryManager)
// handle state machine around pub/sub logic.
val eventIterator: AutoCloseableIterator<ChangeEventWithMetadata> = DebeziumRecordIterator(
val eventIterator: AutoCloseableIterator<ChangeEventWithMetadata> =
DebeziumRecordIterator(
queue,
targetPosition,
{ publisher.hasClosed() },
DebeziumShutdownProcedure(queue, { publisher.close() }, { publisher.hasClosed() }),
firstRecordWaitTime,
subsequentRecordWaitTime)
subsequentRecordWaitTime
)

val syncCheckpointDuration = if (config.has(DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY)
) Duration.ofSeconds(config[DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY].asLong())
else DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION
val syncCheckpointRecords = if (config.has(DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY)
) config[DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY].asLong()
else DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS.toLong()
return AutoCloseableIterators.fromIterator(DebeziumStateDecoratingIterator(
val syncCheckpointDuration =
if (config.has(DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY))
Duration.ofSeconds(
config[DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY].asLong()
)
else DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION
val syncCheckpointRecords =
if (config.has(DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY))
config[DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS_PROPERTY].asLong()
else DebeziumIteratorConstants.SYNC_CHECKPOINT_RECORDS.toLong()
return AutoCloseableIterators.fromIterator(
DebeziumStateDecoratingIterator(
eventIterator,
cdcStateHandler,
targetPosition,
Expand All @@ -101,11 +128,14 @@ class AirbyteDebeziumHandler<T>(private val config: JsonNode,
trackSchemaHistory,
schemaHistoryManager.orElse(null),
syncCheckpointDuration,
syncCheckpointRecords))
syncCheckpointRecords
)
)
}

companion object {
private val LOGGER: Logger = LoggerFactory.getLogger(AirbyteDebeziumHandler::class.java)
private val REPORT_DURATION: Duration = Duration.of(10, ChronoUnit.SECONDS)

/**
* We use 10000 as capacity cause the default queue size and batch size of debezium is :
Expand All @@ -115,8 +145,10 @@ class AirbyteDebeziumHandler<T>(private val config: JsonNode,
const val QUEUE_CAPACITY: Int = 10000

fun isAnyStreamIncrementalSyncMode(catalog: ConfiguredAirbyteCatalog): Boolean {
return catalog.streams.stream().map { obj: ConfiguredAirbyteStream -> obj.syncMode }
.anyMatch { syncMode: SyncMode -> syncMode == SyncMode.INCREMENTAL }
return catalog.streams
.stream()
.map { obj: ConfiguredAirbyteStream -> obj.syncMode }
.anyMatch { syncMode: SyncMode -> syncMode == SyncMode.INCREMENTAL }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ interface CdcMetadataInjector<T> {
* https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-create-events
*
* @param event is the actual record which contains data and would be written to the destination
* @param source contains the metadata about the record and we need to extract that metadata and add
* it to the event before writing it to destination
* @param source contains the metadata about the record and we need to extract that metadata and
* add it to the event before writing it to destination
*/
fun addMetaData(event: ObjectNode?, source: JsonNode?)

fun addMetaDataToRowsFetchedOutsideDebezium(record: ObjectNode?, transactionTimestamp: String?, metadataToAdd: T) {
fun addMetaDataToRowsFetchedOutsideDebezium(
record: ObjectNode?,
transactionTimestamp: String?,
metadataToAdd: T
) {
throw RuntimeException("Not Supported")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ import java.util.*
interface CdcSavedInfoFetcher {
val savedOffset: JsonNode?

val savedSchemaHistory: AirbyteSchemaHistoryStorage.SchemaHistory<Optional<JsonNode?>?>?
val savedSchemaHistory: AirbyteSchemaHistoryStorage.SchemaHistory<Optional<JsonNode>>?
}

0 comments on commit 6d91dbc

Please sign in to comment.