Skip to content

Commit

Permalink
enable spotbugs for db-destinatino CDK submodule
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Apr 3, 2024
1 parent 5a11e42 commit 23df75f
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 54 deletions.
3 changes: 0 additions & 3 deletions airbyte-cdk/java/airbyte-cdk/db-destinations/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ compileKotlin.compilerOptions.allWarningsAsErrors = false
compileTestFixturesKotlin.compilerOptions.allWarningsAsErrors = false
compileTestKotlin.compilerOptions.allWarningsAsErrors = false

spotbugsTest.enabled = false
spotbugsTestFixtures.enabled = false

dependencies {
api 'org.apache.commons:commons-csv:1.10.0'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory

abstract class DestinationAcceptanceTest {
protected lateinit var TEST_SCHEMAS: HashSet<String>
protected var TEST_SCHEMAS: HashSet<String> = HashSet()

private lateinit var testEnv: TestDestinationEnv

Expand Down Expand Up @@ -1606,13 +1606,11 @@ abstract class DestinationAcceptanceTest {
val actualStateMessage =
destinationOutput
.stream()
.filter { m: io.airbyte.protocol.models.v0.AirbyteMessage? ->
m!!.type == io.airbyte.protocol.models.v0.AirbyteMessage.Type.STATE
}
.filter { it.type == Type.STATE }
.findFirst()
.map { msg: io.airbyte.protocol.models.v0.AirbyteMessage? ->
.map { msg: AirbyteMessage ->
// Modify state message to remove destination stats.
val clone = msg!!.state
val clone = msg.state
clone.destinationStats = null
msg.state = clone
msg
Expand All @@ -1628,10 +1626,10 @@ abstract class DestinationAcceptanceTest {
@Throws(Exception::class)
private fun runSync(
config: JsonNode,
messages: List<io.airbyte.protocol.models.v0.AirbyteMessage>,
catalog: io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog,
messages: List<AirbyteMessage>,
catalog: ConfiguredAirbyteCatalog,
runNormalization: Boolean
): List<io.airbyte.protocol.models.v0.AirbyteMessage?> {
): List<AirbyteMessage> {
val destinationConfig =
WorkerDestinationConfig()
.withConnectionId(UUID.randomUUID())
Expand Down Expand Up @@ -1664,11 +1662,10 @@ abstract class DestinationAcceptanceTest {
)
destination.notifyEndOfInput()

val destinationOutput: MutableList<io.airbyte.protocol.models.v0.AirbyteMessage?> =
ArrayList()
val destinationOutput: MutableList<AirbyteMessage> = ArrayList()
while (!destination.isFinished()) {
destination.attemptRead().ifPresent { m: io.airbyte.protocol.models.AirbyteMessage ->
destinationOutput.add(convertProtocolObject(m, AirbyteMessage::class.java))
destination.attemptRead().ifPresent {
destinationOutput.add(convertProtocolObject(it, AirbyteMessage::class.java))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ object TestingNamespaces {
generateSuffix()
}

fun generateFromOriginal(toOverwrite: String?, oldPrefix: String?, newPrefix: String?): String {
return toOverwrite!!.replace(oldPrefix!!, newPrefix!!)
fun generateFromOriginal(toOverwrite: String, oldPrefix: String, newPrefix: String): String {
return toOverwrite.replace(oldPrefix, newPrefix)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
private fun insertRecords(
tableName: Name,
columnNames: List<String>,
records: List<JsonNode>?,
records: List<JsonNode>,
vararg columnsToParseJson: String
) {
var insert =
Expand All @@ -69,7 +69,7 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
.map { columnName: String? -> DSL.field(DSL.quotedName(columnName)) }
.toList()
)
for (record in records!!) {
for (record in records) {
insert =
insert.values(
columnNames
Expand Down Expand Up @@ -103,10 +103,10 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
}

@Throws(Exception::class)
override fun createRawTable(streamId: StreamId?) {
override fun createRawTable(streamId: StreamId) {
database.execute(
dslContext
.createTable(DSL.name(streamId!!.rawNamespace, streamId.rawName))
.createTable(DSL.name(streamId.rawNamespace, streamId.rawName))
.column(COLUMN_NAME_AB_RAW_ID, SQLDataType.VARCHAR(36).nullable(false))
.column(COLUMN_NAME_AB_EXTRACTED_AT, timestampWithTimeZoneType.nullable(false))
.column(COLUMN_NAME_AB_LOADED_AT, timestampWithTimeZoneType)
Expand All @@ -117,10 +117,10 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
}

@Throws(Exception::class)
override fun createV1RawTable(v1RawTable: StreamId?) {
override fun createV1RawTable(v1RawTable: StreamId) {
database.execute(
dslContext
.createTable(DSL.name(v1RawTable!!.rawNamespace, v1RawTable.rawName))
.createTable(DSL.name(v1RawTable.rawNamespace, v1RawTable.rawName))
.column(COLUMN_NAME_AB_ID, SQLDataType.VARCHAR(36).nullable(false))
.column(COLUMN_NAME_EMITTED_AT, timestampWithTimeZoneType.nullable(false))
.column(COLUMN_NAME_DATA, structType.nullable(false))
Expand All @@ -129,9 +129,9 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
}

@Throws(Exception::class)
public override fun insertRawTableRecords(streamId: StreamId?, records: List<JsonNode>?) {
public override fun insertRawTableRecords(streamId: StreamId, records: List<JsonNode>) {
insertRecords(
DSL.name(streamId!!.rawNamespace, streamId.rawName),
DSL.name(streamId.rawNamespace, streamId.rawName),
JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES,
records,
COLUMN_NAME_DATA,
Expand All @@ -140,9 +140,9 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
}

@Throws(Exception::class)
override fun insertV1RawTableRecords(streamId: StreamId?, records: List<JsonNode>?) {
override fun insertV1RawTableRecords(streamId: StreamId, records: List<JsonNode>) {
insertRecords(
DSL.name(streamId!!.rawNamespace, streamId.rawName),
DSL.name(streamId.rawNamespace, streamId.rawName),
LEGACY_RAW_TABLE_COLUMNS,
records,
COLUMN_NAME_DATA
Expand All @@ -152,14 +152,14 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
@Throws(Exception::class)
override fun insertFinalTableRecords(
includeCdcDeletedAt: Boolean,
streamId: StreamId?,
streamId: StreamId,
suffix: String?,
records: List<JsonNode>?
records: List<JsonNode>
) {
val columnNames =
if (includeCdcDeletedAt) FINAL_TABLE_COLUMN_NAMES_CDC else FINAL_TABLE_COLUMN_NAMES
insertRecords(
DSL.name(streamId!!.finalNamespace, streamId.finalName + suffix),
DSL.name(streamId.finalNamespace, streamId.finalName + suffix),
columnNames,
records,
COLUMN_NAME_AB_META,
Expand All @@ -170,7 +170,7 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
}

@Throws(Exception::class)
override fun dumpRawTableRecords(streamId: StreamId?): List<JsonNode> {
override fun dumpRawTableRecords(streamId: StreamId): List<JsonNode> {
return database.queryJsons(
dslContext
.selectFrom(DSL.name(streamId!!.rawNamespace, streamId.rawName))
Expand All @@ -179,7 +179,7 @@ abstract class JdbcSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
}

@Throws(Exception::class)
override fun dumpFinalTableRecords(streamId: StreamId?, suffix: String?): List<JsonNode> {
override fun dumpFinalTableRecords(streamId: StreamId, suffix: String?): List<JsonNode> {
return database.queryJsons(
dslContext
.selectFrom(DSL.name(streamId!!.finalNamespace, streamId.finalName + suffix))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,13 @@ abstract class JdbcTypingDedupingTest : BaseTypingDedupingTest() {
}

@Throws(Exception::class)
override fun dumpRawTableRecords(
streamNamespace: String?,
streamName: String?
): List<JsonNode> {
override fun dumpRawTableRecords(streamNamespace: String?, streamName: String): List<JsonNode> {
var streamNamespace = streamNamespace
if (streamNamespace == null) {
streamNamespace = getDefaultSchema(config!!)
}
val tableName =
concatenateRawTableName(
streamNamespace,
Names.toAlphanumericAndUnderscore(streamName!!)
)
concatenateRawTableName(streamNamespace, Names.toAlphanumericAndUnderscore(streamName))
val schema = rawSchema
return database!!.queryJsons(DSL.selectFrom(DSL.name(schema, tableName)).sql)
}
Expand All @@ -109,14 +103,14 @@ abstract class JdbcTypingDedupingTest : BaseTypingDedupingTest() {
}

@Throws(Exception::class)
override fun teardownStreamAndNamespace(streamNamespace: String?, streamName: String?) {
override fun teardownStreamAndNamespace(streamNamespace: String?, streamName: String) {
var streamNamespace = streamNamespace
if (streamNamespace == null) {
streamNamespace = getDefaultSchema(config!!)
}
database!!.execute(
DSL.dropTableIfExists(
DSL.name(rawSchema, concatenateRawTableName(streamNamespace, streamName!!))
DSL.name(rawSchema, concatenateRawTableName(streamNamespace, streamName))
)
.sql
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,23 +96,23 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
@Throws(Exception::class) protected abstract fun createNamespace(namespace: String?)

/** Create a raw table using the StreamId's rawTableId. */
@Throws(Exception::class) protected abstract fun createRawTable(streamId: StreamId?)
@Throws(Exception::class) protected abstract fun createRawTable(streamId: StreamId)

/** Creates a raw table in the v1 format */
@Throws(Exception::class) protected abstract fun createV1RawTable(v1RawTable: StreamId?)
@Throws(Exception::class) protected abstract fun createV1RawTable(v1RawTable: StreamId)

@Throws(Exception::class)
protected abstract fun insertRawTableRecords(streamId: StreamId?, records: List<JsonNode>?)
protected abstract fun insertRawTableRecords(streamId: StreamId, records: List<JsonNode>)

@Throws(Exception::class)
protected abstract fun insertV1RawTableRecords(streamId: StreamId?, records: List<JsonNode>?)
protected abstract fun insertV1RawTableRecords(streamId: StreamId, records: List<JsonNode>)

@Throws(Exception::class)
protected abstract fun insertFinalTableRecords(
includeCdcDeletedAt: Boolean,
streamId: StreamId?,
streamId: StreamId,
suffix: String?,
records: List<JsonNode>?
records: List<JsonNode>
)

/**
Expand All @@ -125,11 +125,11 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
* destination as a string.
*/
@Throws(Exception::class)
protected abstract fun dumpRawTableRecords(streamId: StreamId?): List<JsonNode>
protected abstract fun dumpRawTableRecords(streamId: StreamId): List<JsonNode>

@Throws(Exception::class)
protected abstract fun dumpFinalTableRecords(
streamId: StreamId?,
streamId: StreamId,
suffix: String?
): List<JsonNode>

Expand Down Expand Up @@ -1574,7 +1574,7 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
}

@Throws(Exception::class)
protected fun dumpV1RawTableRecords(streamId: StreamId?): List<JsonNode> {
protected fun dumpV1RawTableRecords(streamId: StreamId): List<JsonNode> {
return dumpRawTableRecords(streamId)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ abstract class BaseTypingDedupingTest {
@Throws(Exception::class)
protected abstract fun dumpRawTableRecords(
streamNamespace: String?,
streamName: String?
streamName: String
): List<JsonNode>

/**
Expand Down Expand Up @@ -136,7 +136,7 @@ abstract class BaseTypingDedupingTest {
* airbyte.<streamNamespace>_<streamName>; DROP SCHEMA IF EXISTS <streamNamespace>`.
*/
@Throws(Exception::class)
protected abstract fun teardownStreamAndNamespace(streamNamespace: String?, streamName: String?)
protected abstract fun teardownStreamAndNamespace(streamNamespace: String?, streamName: String)

protected abstract val sqlGenerator: SqlGenerator
get
Expand Down

0 comments on commit 23df75f

Please sign in to comment.