Skip to content

Commit

Permalink
fix kotlin warnings in db-sources CDK submodule (#37482)
Browse files Browse the repository at this point in the history
fixing kotlin warnings in CDK db-sources submodule
  • Loading branch information
stephane-airbyte committed Apr 23, 2024
1 parent 5d5b1e3 commit 9413578
Show file tree
Hide file tree
Showing 41 changed files with 142 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ internal constructor(
val config = parseConfig(parsed!!.getConfigPath())
validateConfig(integration.spec()!!.connectionSpecification, config, "READ")
val catalog =
parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog::class.java)
parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog::class.java)!!
val stateOptional =
parsed.getStatePath().map { path: Path? -> parseConfig(path) }
try {
Expand All @@ -201,7 +201,7 @@ internal constructor(
(integration as Destination).isV2Destination
)
val catalog =
parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog::class.java)
parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog::class.java)!!

try {
destination!!
Expand Down Expand Up @@ -517,7 +517,7 @@ internal constructor(
return Jsons.deserialize(IOs.readFile(path))
}

private fun <T> parseConfig(path: Path?, klass: Class<T>): T {
private fun <T> parseConfig(path: Path?, klass: Class<T>): T? {
val jsonNode = parseConfig(path)
return Jsons.`object`(jsonNode, klass)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2354,7 +2354,7 @@ abstract class DestinationAcceptanceTest {
}

private fun <V0, V1> convertProtocolObject(v1: V1, klass: Class<V0>): V0 {
return Jsons.`object`(Jsons.jsonNode(v1), klass)
return Jsons.`object`(Jsons.jsonNode(v1), klass)!!
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ class LocalAirbyteDestination(private val dest: Destination) : AirbyteDestinatio
Jsons.`object`(
Jsons.jsonNode(destinationConfig.catalog),
ConfiguredAirbyteCatalog::class.java
)
)!!
) { Destination::defaultOutputRecordCollector }
consumer!!.start()
}

@Throws(Exception::class)
override fun accept(message: io.airbyte.protocol.models.AirbyteMessage) {
consumer!!.accept(Jsons.`object`(Jsons.jsonNode(message), AirbyteMessage::class.java))
consumer!!.accept(Jsons.`object`(Jsons.jsonNode(message), AirbyteMessage::class.java)!!)
}

override fun notifyEndOfInput() {
Expand Down
8 changes: 0 additions & 8 deletions airbyte-cdk/java/airbyte-cdk/db-sources/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,6 @@ plugins {
id "com.github.eirnym.js2p" version "1.0"
}

java {
// TODO: rewrite code to avoid javac wornings in the first place
compileJava {
options.compilerArgs += "-Xlint:-try,-rawtypes,-unchecked,-removal,-this-escape"
}
}

compileKotlin.compilerOptions.allWarningsAsErrors = false
compileTestFixturesKotlin.compilerOptions.allWarningsAsErrors = false
compileTestKotlin.compilerOptions.allWarningsAsErrors = false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ class AirbyteDebeziumHandler<T>(
private val targetPosition: CdcTargetPosition<T>,
private val trackSchemaHistory: Boolean,
private val firstRecordWaitTime: Duration,
private val subsequentRecordWaitTime: Duration,
private val queueSize: Int,
private val addDbNameToOffsetState: Boolean
) {
Expand Down Expand Up @@ -106,8 +105,7 @@ class AirbyteDebeziumHandler<T>(
targetPosition,
{ publisher.hasClosed() },
DebeziumShutdownProcedure(queue, { publisher.close() }, { publisher.hasClosed() }),
firstRecordWaitTime,
subsequentRecordWaitTime
firstRecordWaitTime
)

val syncCheckpointDuration =
Expand All @@ -134,13 +132,13 @@ class AirbyteDebeziumHandler<T>(
// not used
// at all thus we will pass in null.
val iterator: SourceStateIterator<ChangeEventWithMetadata> =
SourceStateIterator<ChangeEventWithMetadata>(
SourceStateIterator(
eventIterator,
null,
messageProducer!!,
messageProducer,
StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration)
)
return AutoCloseableIterators.fromIterator<AirbyteMessage>(iterator)
return AutoCloseableIterators.fromIterator(iterator)
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class AirbyteFileOffsetBackingStore(
}

fun persist(cdcState: JsonNode?) {
@Suppress("unchecked_cast")
val mapAsString: Map<String, String> =
if (cdcState != null)
Jsons.`object`(cdcState, MutableMap::class.java) as Map<String, String>
Expand Down Expand Up @@ -130,7 +131,7 @@ class AirbyteFileOffsetBackingStore(

if (obj !is HashMap<*, *>)
throw ConnectException("Expected HashMap but found " + obj.javaClass)
val raw = obj as Map<ByteArray?, ByteArray?>
@Suppress("unchecked_cast") val raw = obj as Map<ByteArray?, ByteArray?>
val data: MutableMap<ByteBuffer?, ByteBuffer?> = HashMap()
for ((key1, value1) in raw) {
val key = if ((key1 != null)) ByteBuffer.wrap(key1) else null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,12 @@ class AirbyteSchemaHistoryStorage(
}

private fun persist(schemaHistory: SchemaHistory<Optional<JsonNode>>?) {
if (schemaHistory!!.schema!!.isEmpty) {
if (schemaHistory!!.schema.isEmpty) {
return
}
val fileAsString = Jsons.`object`(schemaHistory.schema!!.get(), String::class.java)
val fileAsString = Jsons.`object`(schemaHistory.schema.get(), String::class.java)

if (fileAsString == null || fileAsString.isEmpty()) {
if (fileAsString.isNullOrEmpty()) {
return
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,13 @@ class DebeziumMessageProducer<T>(
}
}

if (checkpointOffsetToSend.size == 1 && !message!!.isSnapshotEvent) {
if (checkpointOffsetToSend.size == 1 && !message.isSnapshotEvent) {
if (targetPosition.isEventAheadOffset(checkpointOffsetToSend, message)) {
shouldEmitStateMessage = true
}
}

return eventConverter.toAirbyteMessage(message!!)
return eventConverter.toAirbyteMessage(message)
}

override fun createFinalStateMessage(stream: ConfiguredAirbyteStream?): AirbyteStateMessage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ class DebeziumRecordIterator<T>(
private val publisherStatusSupplier: Supplier<Boolean>,
private val debeziumShutdownProcedure: DebeziumShutdownProcedure<ChangeEvent<String?, String?>>,
private val firstRecordWaitTime: Duration,
subsequentRecordWaitTime: Duration?
) : AbstractIterator<ChangeEventWithMetadata>(), AutoCloseableIterator<ChangeEventWithMetadata> {
private val heartbeatEventSourceField: MutableMap<Class<out ChangeEvent<*, *>?>, Field?> =
HashMap(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class DebeziumShutdownProcedure<T>(
Executors.newSingleThreadExecutor { r: Runnable? ->
val thread = Thread(r, "queue-data-transfer-thread")
thread.uncaughtExceptionHandler =
Thread.UncaughtExceptionHandler { t: Thread?, e: Throwable? -> exception = e }
Thread.UncaughtExceptionHandler { _: Thread, e: Throwable -> exception = e }
thread
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package io.airbyte.cdk.integrations.debezium.internals

import io.debezium.config.Configuration
import io.debezium.embedded.KafkaConnectUtil
import java.lang.Boolean
import java.util.*
import kotlin.String
import org.apache.kafka.connect.json.JsonConverter
Expand Down Expand Up @@ -91,6 +90,6 @@ interface DebeziumStateUtil {

/** Configuration for offset state key/value converters. */
val INTERNAL_CONVERTER_CONFIG: Map<String, String?> =
java.util.Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, Boolean.FALSE.toString())
java.util.Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false.toString())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class RelationalDbDebeziumEventConverter(
) : DebeziumEventConverter {
override fun toAirbyteMessage(event: ChangeEventWithMetadata): AirbyteMessage {
val debeziumEvent = event.eventValueAsJson()
val before: JsonNode = debeziumEvent!!.get(DebeziumEventConverter.Companion.BEFORE_EVENT)
val before: JsonNode = debeziumEvent.get(DebeziumEventConverter.Companion.BEFORE_EVENT)
val after: JsonNode = debeziumEvent.get(DebeziumEventConverter.Companion.AFTER_EVENT)
val source: JsonNode = debeziumEvent.get(DebeziumEventConverter.Companion.SOURCE_EVENT)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,8 @@ abstract class AbstractJdbcSource<Datatype>(
}

@Throws(SQLException::class)
public override fun createDatabase(sourceConfig: JsonNode): JdbcDatabase {
return createDatabase(sourceConfig, JdbcDataSourceUtils.DEFAULT_JDBC_PARAMETERS_DELIMITER)
public override fun createDatabase(config: JsonNode): JdbcDatabase {
return createDatabase(config, JdbcDataSourceUtils.DEFAULT_JDBC_PARAMETERS_DELIMITER)
}

@Throws(SQLException::class)
Expand All @@ -634,7 +634,7 @@ abstract class AbstractJdbcSource<Datatype>(
// Create the data source
val dataSource =
create(
if (jdbcConfig!!.has(JdbcUtils.USERNAME_KEY))
if (jdbcConfig.has(JdbcUtils.USERNAME_KEY))
jdbcConfig[JdbcUtils.USERNAME_KEY].asText()
else null,
if (jdbcConfig.has(JdbcUtils.PASSWORD_KEY))
Expand All @@ -643,7 +643,7 @@ abstract class AbstractJdbcSource<Datatype>(
driverClassName,
jdbcConfig[JdbcUtils.JDBC_URL_KEY].asText(),
connectionProperties,
getConnectionTimeout(connectionProperties!!)
getConnectionTimeout(connectionProperties)
)
// Record the data source so that it can be closed.
dataSources.add(dataSource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ class JdbcPrivilegeDto(
}
}

override fun equals(o: Any?): Boolean {
if (this === o) {
override fun equals(other: Any?): Boolean {
if (this === other) {
return true
}
if (o == null || javaClass != o.javaClass) {
if (other == null || javaClass != other.javaClass) {
return false
}
val that = o as JdbcPrivilegeDto
val that = other as JdbcPrivilegeDto
return (Objects.equal(grantee, that.grantee) &&
Objects.equal(tableName, that.tableName) &&
Objects.equal(schemaName, that.schemaName) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ protected constructor(driverClassName: String) :
continue
}
val cursorType =
table.fields!!
table.fields
.stream()
.filter { info: CommonField<DataType> -> info.name == cursorField.get() }
.map { obj: CommonField<DataType> -> obj.type }
Expand Down Expand Up @@ -300,7 +300,7 @@ protected constructor(driverClassName: String) :
val systemNameSpaces = excludedInternalNameSpaces
val systemViews = excludedViews
val discoveredTables = discoverInternal(database)
return (if (systemNameSpaces == null || systemNameSpaces.isEmpty()) discoveredTables
return (if (systemNameSpaces.isEmpty()) discoveredTables
else
discoveredTables
.stream()
Expand Down Expand Up @@ -425,7 +425,7 @@ protected constructor(driverClassName: String) :
val cursorInfo = stateManager!!.getCursorInfo(pair)

val airbyteMessageIterator: AutoCloseableIterator<AirbyteMessage>
if (cursorInfo!!.map { it.cursor }.isPresent) {
if (cursorInfo.map { it.cursor }.isPresent) {
airbyteMessageIterator =
getIncrementalStream(
database,
Expand All @@ -452,7 +452,7 @@ protected constructor(driverClassName: String) :
)
}

val cursorType = getCursorType(airbyteStream, cursorField)
getCursorType(airbyteStream, cursorField)

val messageProducer =
CursorStateMessageProducer(stateManager, cursorInfo.map { it.cursor })
Expand Down Expand Up @@ -662,13 +662,6 @@ protected constructor(driverClassName: String) :
protected abstract fun getAirbyteType(columnType: DataType): JsonSchemaType

protected abstract val excludedInternalNameSpaces: Set<String>
/**
* Get list of system namespaces(schemas) in order to exclude them from the `discover`
* result list.
*
* @return set of system namespaces(schemas) to be excluded
*/
get

protected open val excludedViews: Set<String>
/**
Expand Down Expand Up @@ -722,12 +715,6 @@ protected constructor(driverClassName: String) :
): Map<String, MutableList<String>>

protected abstract val quoteString: String?
/**
* Returns quote symbol of the database
*
* @return quote symbol
*/
get

/**
* Read all data from a table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ class CursorInfo(
return this
}

override fun equals(o: Any?): Boolean {
if (this === o) {
override fun equals(other: Any?): Boolean {
if (this === other) {
return true
}
if (o == null || javaClass != o.javaClass) {
if (other == null || javaClass != other.javaClass) {
return false
}
val that = o as CursorInfo
val that = other as CursorInfo
return originalCursorField == that.originalCursorField &&
originalCursor == that.originalCursor &&
originalCursorRecordCount == that.originalCursorRecordCount &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ object DbSourceDiscoverUtil {
tableInfo.fields
)
.withSupportedSyncModes(
if (tableInfo.cursorFields != null && tableInfo.cursorFields.isEmpty())
if (tableInfo.cursorFields.isEmpty())
Lists.newArrayList(SyncMode.FULL_REFRESH)
else Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,7 @@ object RelationalDbQueryUtils {
fun prettyPrintConfiguredAirbyteStreamList(streamList: List<ConfiguredAirbyteStream>): String {
return streamList
.stream()
.map { s: ConfiguredAirbyteStream ->
"%s.%s".formatted(s.stream.namespace, s.stream.name)
}
.map { s: ConfiguredAirbyteStream -> "${s.stream.namespace}.${s.stream.name}" }
.collect(Collectors.joining(", "))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ class StateDecoratingIterator(
}

companion object {
private val LOGGER: Logger = LoggerFactory.getLogger(StateDecoratingIterator::class.java)
private val LOGGER: Logger =
LoggerFactory.getLogger(@Suppress("deprecation") StateDecoratingIterator::class.java)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ class GlobalStateManager(
if (airbyteStateMessage!!.type == AirbyteStateMessage.AirbyteStateType.GLOBAL) {
return Jsons.`object`(airbyteStateMessage.global.sharedState, CdcState::class.java)
} else {
val legacyState = Jsons.`object`(airbyteStateMessage.data, DbState::class.java)
val legacyState: DbState? =
Jsons.`object`(airbyteStateMessage.data, DbState::class.java)
return legacyState?.cdcState
}
}
Expand All @@ -114,7 +115,8 @@ class GlobalStateManager(
}
.collect(Collectors.toSet())
} else {
val legacyState = Jsons.`object`(airbyteStateMessage.data, DbState::class.java)
val legacyState: DbState? =
Jsons.`object`(airbyteStateMessage.data, DbState::class.java)
return if (legacyState != null)
extractNamespacePairsFromDbStreamState(legacyState.streams)
else emptySet<AirbyteStreamNameNamespacePair>()
Expand Down Expand Up @@ -157,7 +159,7 @@ class GlobalStateManager(
return@Supplier Jsons.`object`<DbState>(
airbyteStateMessage.data,
DbState::class.java
)
)!!
.streams
.stream()
.map<AirbyteStreamState?> { s: DbStreamState ->
Expand Down
Loading

0 comments on commit 9413578

Please sign in to comment.