Skip to content

Commit

Permalink
enable spotbugs for db-sources submodule (#36705)
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Apr 3, 2024
1 parent 4d77401 commit fad0737
Show file tree
Hide file tree
Showing 14 changed files with 41 additions and 45 deletions.
3 changes: 0 additions & 3 deletions airbyte-cdk/java/airbyte-cdk/db-sources/build.gradle
Expand Up @@ -15,9 +15,6 @@ compileKotlin.compilerOptions.allWarningsAsErrors = false
compileTestFixturesKotlin.compilerOptions.allWarningsAsErrors = false
compileTestKotlin.compilerOptions.allWarningsAsErrors = false

spotbugsTestFixtures.enabled = false
spotbugsTest.enabled = false


// Convert yaml to java: relationaldb.models
jsonSchema2Pojo {
Expand Down
Expand Up @@ -47,7 +47,7 @@ interface CdcTargetPosition<T> {
* @param sourceOffset source offset params from heartbeat change event
* @return the heartbeat position in a heartbeat change event or null
*/
fun extractPositionFromHeartbeatOffset(sourceOffset: Map<String?, *>?): T
fun extractPositionFromHeartbeatOffset(sourceOffset: Map<String?, *>): T

/**
* This function checks if the event we are processing in the loop is already behind the offset
Expand Down
Expand Up @@ -19,11 +19,11 @@ import org.mockito.kotlin.mock
class DebeziumMessageProducerTest {
private var producer: DebeziumMessageProducer<*>? = null

lateinit var cdcStateHandler: CdcStateHandler
lateinit var targetPosition: CdcTargetPosition<Any>
lateinit var eventConverter: DebeziumEventConverter
lateinit var offsetManager: AirbyteFileOffsetBackingStore
lateinit var schemaHistoryManager: AirbyteSchemaHistoryStorage
var cdcStateHandler: CdcStateHandler = mock()
var targetPosition: CdcTargetPosition<Any> = mock()
var eventConverter: DebeziumEventConverter = mock()
var offsetManager: AirbyteFileOffsetBackingStore = mock()
var schemaHistoryManager: AirbyteSchemaHistoryStorage = mock()

@BeforeEach
fun setUp() {
Expand Down
Expand Up @@ -26,7 +26,7 @@ class DebeziumRecordIteratorTest {
}

override fun extractPositionFromHeartbeatOffset(
sourceOffset: Map<String?, *>?
sourceOffset: Map<String?, *>
): Long {
return sourceOffset!!["lsn"] as Long
}
Expand Down
Expand Up @@ -59,16 +59,16 @@ internal class DefaultJdbcSourceAcceptanceTest :

fun getConfigWithConnectionProperties(
psqlDb: PostgreSQLContainer<*>,
dbName: String?,
additionalParameters: String?
dbName: String,
additionalParameters: String
): JsonNode {
return Jsons.jsonNode(
ImmutableMap.builder<Any, Any?>()
.put(JdbcUtils.HOST_KEY, resolveHost(psqlDb))
.put(JdbcUtils.PORT_KEY, resolvePort(psqlDb))
.put(JdbcUtils.DATABASE_KEY, dbName)
.put(JdbcUtils.SCHEMAS_KEY, List.of(SCHEMA_NAME))
.put(JdbcUtils.USERNAME_KEY, psqlDb!!.username)
.put(JdbcUtils.USERNAME_KEY, psqlDb.username)
.put(JdbcUtils.PASSWORD_KEY, psqlDb.password)
.put(JdbcUtils.CONNECTION_PROPERTIES_KEY, additionalParameters)
.build()
Expand Down
Expand Up @@ -9,7 +9,7 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream

class SourceStateIteratorForTest<T>(
messageIterator: Iterator<T>,
stream: ConfiguredAirbyteStream,
stream: ConfiguredAirbyteStream?,
sourceStateMessageProducer: SourceStateMessageProducer<T>,
stateEmitFrequency: StateEmitFrequency
) :
Expand Down
Expand Up @@ -15,9 +15,9 @@ import org.mockito.kotlin.any
import org.mockito.kotlin.eq

class SourceStateIteratorTest {
lateinit var mockProducer: SourceStateMessageProducer<AirbyteMessage>
lateinit var messageIterator: Iterator<AirbyteMessage>
lateinit var stream: ConfiguredAirbyteStream
var mockProducer: SourceStateMessageProducer<AirbyteMessage> = mock()
var messageIterator: Iterator<AirbyteMessage> = mock()
var stream: ConfiguredAirbyteStream = mock()

var sourceStateIterator: SourceStateIteratorForTest<*>? = null

Expand Down
Expand Up @@ -25,7 +25,7 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory

abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
protected lateinit var testdb: T
protected var testdb: T = createTestDatabase()

protected fun createTableSqlFmt(): String {
return "CREATE TABLE %s.%s(%s);"
Expand Down
Expand Up @@ -39,7 +39,7 @@ import org.mockito.Mockito
"The static variables are updated in subclasses for convenience, and cannot be final."
)
abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
protected lateinit var testdb: T
protected var testdb: T = createTestDatabase()

protected fun streamName(): String {
return TABLE_NAME
Expand Down Expand Up @@ -314,8 +314,8 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
filteredCatalog.streams
.stream()
.filter { stream: AirbyteStream ->
TEST_SCHEMAS.stream().anyMatch { schemaName: String? ->
stream.namespace.startsWith(schemaName!!)
TEST_SCHEMAS.stream().anyMatch { schemaName: String ->
stream.namespace.startsWith(schemaName)
}
}
.collect(Collectors.toList())
Expand Down
Expand Up @@ -304,8 +304,8 @@ abstract class AbstractSourceConnectorTest {
protected fun runReadVerifyNumberOfReceivedMsgs(
catalog: ConfiguredAirbyteCatalog,
state: JsonNode?,
mapOfExpectedRecordsCount: MutableMap<String?, Int>
): Map<String?, Int> {
mapOfExpectedRecordsCount: MutableMap<String, Int>
): Map<String, Int> {
val sourceConfig =
WorkerSourceConfig()
.withSourceConnectionConfiguration(config)
Expand Down
Expand Up @@ -148,7 +148,7 @@ abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() {
val recordMessages =
allMessages!!
.stream()
.filter { m: AirbyteMessage? -> m!!.type == AirbyteMessage.Type.RECORD }
.filter { m: AirbyteMessage -> m.type == AirbyteMessage.Type.RECORD }
.toList()
val expectedValues: MutableMap<String?, MutableList<String?>?> = HashMap()
val missedValuesByStream: MutableMap<String?, ArrayList<MissedRecords>> = HashMap()
Expand Down
Expand Up @@ -47,7 +47,7 @@ class PythonSourceAcceptanceTest : SourceAcceptanceTest() {
get() = runExecutable(Command.GET_STATE)

@Throws(IOException::class)
override fun assertFullRefreshMessages(allMessages: List<AirbyteMessage?>?) {
override fun assertFullRefreshMessages(allMessages: List<AirbyteMessage>) {
val regexTests =
Streams.stream(
runExecutable(Command.GET_REGEX_TESTS).withArray<JsonNode>("tests").elements()
Expand All @@ -57,7 +57,7 @@ class PythonSourceAcceptanceTest : SourceAcceptanceTest() {
val stringMessages =
allMessages!!
.stream()
.map { `object`: AirbyteMessage? -> Jsons.serialize(`object`) }
.map { `object`: AirbyteMessage -> Jsons.serialize(`object`) }
.toList()
LOGGER.info("Running " + regexTests.size + " regex tests...")
regexTests.forEach(
Expand Down Expand Up @@ -155,7 +155,7 @@ class PythonSourceAcceptanceTest : SourceAcceptanceTest() {
private val LOGGER: Logger = LoggerFactory.getLogger(PythonSourceAcceptanceTest::class.java)
private const val OUTPUT_FILENAME = "output.json"

lateinit var IMAGE_NAME: String
var IMAGE_NAME: String = "dummy_image_name"
var PYTHON_CONTAINER_NAME: String? = null
}
}
Expand Up @@ -131,15 +131,15 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() {
@Test
@Throws(Exception::class)
fun testDiscover() {
val discoverOutput = runDiscover()
runDiscover()
val discoveredCatalog = lastPersistedCatalog
Assertions.assertNotNull(discoveredCatalog, "Expected discover to produce a catalog")
verifyCatalog(discoveredCatalog)
}

/** Override this method to check the actual catalog. */
@Throws(Exception::class)
protected fun verifyCatalog(catalog: AirbyteCatalog?) {
protected open fun verifyCatalog(catalog: AirbyteCatalog?) {
// do nothing by default
}

Expand Down Expand Up @@ -167,7 +167,7 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() {

/** Override this method to perform more specific assertion on the messages. */
@Throws(Exception::class)
protected open fun assertFullRefreshMessages(allMessages: List<AirbyteMessage?>?) {
protected open fun assertFullRefreshMessages(allMessages: List<AirbyteMessage>) {
// do nothing by default
}

Expand Down Expand Up @@ -248,8 +248,8 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() {
val stateMessages =
airbyteMessages
.stream()
.filter { m: AirbyteMessage? -> m!!.type == AirbyteMessage.Type.STATE }
.map { obj: AirbyteMessage? -> obj!!.state }
.filter { m: AirbyteMessage -> m.type == AirbyteMessage.Type.STATE }
.map { obj: AirbyteMessage -> obj.state }
.collect(Collectors.toList())
Assertions.assertFalse(
recordMessages.isEmpty(),
Expand Down Expand Up @@ -446,12 +446,12 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() {

@JvmStatic
protected fun filterRecords(
messages: Collection<AirbyteMessage?>?
messages: Collection<AirbyteMessage>
): List<AirbyteRecordMessage> {
return messages!!
return messages
.stream()
.filter { m: AirbyteMessage? -> m!!.type == AirbyteMessage.Type.RECORD }
.map { obj: AirbyteMessage? -> obj!!.record }
.filter { m: AirbyteMessage -> m.type == AirbyteMessage.Type.RECORD }
.map { obj: AirbyteMessage -> obj.record }
.collect(Collectors.toList())
}
}
Expand Down
Expand Up @@ -79,21 +79,20 @@ abstract class AbstractSourcePerformanceTest : AbstractSourceBasePerformanceTest
validateNumberOfReceivedMsgs(checkStatusMap)
}

protected fun validateNumberOfReceivedMsgs(checkStatusMap: Map<String?, Int?>?) {
protected fun validateNumberOfReceivedMsgs(checkStatusMap: Map<String, Int>) {
// Iterate through all streams map and check for streams where
val failedStreamsMap =
checkStatusMap!!
.entries
checkStatusMap.entries
.stream()
.filter { el: Map.Entry<String?, Int?> -> el.value != 0 }
.filter { el: Map.Entry<String, Int> -> el.value != 0 }
.collect(
Collectors.toMap(
Function { obj: Map.Entry<String?, Int?> -> obj.key },
Function { obj: Map.Entry<String?, Int?> -> obj.value }
Function { obj: Map.Entry<String, Int> -> obj.key },
Function { obj: Map.Entry<String, Int> -> obj.value }
)
)

if (!failedStreamsMap.isEmpty()) {
if (failedStreamsMap.isNotEmpty()) {
Assertions.fail<Any>("Non all messages were delivered. $failedStreamsMap")
}
c.info("Finished all checks, no issues found for {} of streams", checkStatusMap.size)
Expand All @@ -102,8 +101,8 @@ abstract class AbstractSourcePerformanceTest : AbstractSourceBasePerformanceTest
protected fun prepareMapWithExpectedRecords(
streamNumber: Int,
expectedRecordsNumberInEachStream: Int
): MutableMap<String?, Int> {
val resultMap: MutableMap<String?, Int> = HashMap() // streamName&expected records in stream
): MutableMap<String, Int> {
val resultMap: MutableMap<String, Int> = HashMap() // streamName&expected records in stream

for (currentStream in 0 until streamNumber) {
val streamName = String.format(testStreamNameTemplate, currentStream)
Expand Down

0 comments on commit fad0737

Please sign in to comment.