enabling spotbugs for gcs-destinations submodule (#36703)
stephane-airbyte committed Apr 3, 2024
1 parent f82847c commit c38c3be
Showing 12 changed files with 49 additions and 48 deletions.
@@ -178,8 +178,8 @@ abstract class DestinationAcceptanceTest {
@Throws(Exception::class)
protected abstract fun retrieveRecords(
testEnv: TestDestinationEnv?,
streamName: String?,
namespace: String?,
streamName: String,
namespace: String,
streamSchema: JsonNode
): List<JsonNode>

@@ -1454,7 +1454,7 @@ abstract class DestinationAcceptanceTest {
false
)
val destinationOutput =
retrieveRecords(testEnv, stream.name, getDefaultSchema(config), stream.jsonSchema)
retrieveRecords(testEnv, stream.name, getDefaultSchema(config)!!, stream.jsonSchema)
// Remove state message
secondSyncMessagesWithNewFields.removeIf {
airbyteMessage: io.airbyte.protocol.models.v0.AirbyteMessage ->
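The signature change above is the pattern this commit applies throughout the CDK test fixtures: the acceptance-test hooks now take non-null streamName and namespace parameters, and any remaining nullability is resolved once at the call site with !! instead of inside every override. A minimal, self-contained sketch of that pattern in Kotlin (the class and method names here are hypothetical, not the actual CDK API):

import com.fasterxml.jackson.databind.JsonNode

abstract class AcceptanceHarnessSketch {
    // The hook now declares non-null parameters, so overrides no longer need
    // null checks or !! on streamName / namespace.
    protected abstract fun retrieveRecords(
        streamName: String,
        namespace: String,
        streamSchema: JsonNode
    ): List<JsonNode>

    // Still nullable, mirroring getDefaultSchema(config) in the diff above.
    protected abstract fun defaultSchema(config: JsonNode): String?

    // The caller asserts non-null once, rather than every override doing so.
    fun verify(config: JsonNode, streamName: String, schema: JsonNode): List<JsonNode> =
        retrieveRecords(streamName, defaultSchema(config)!!, schema)
}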
2 changes: 0 additions & 2 deletions airbyte-cdk/java/airbyte-cdk/gcs-destinations/build.gradle
@@ -20,8 +20,6 @@ compileTestFixturesKotlin {
}
}

spotbugsTestFixtures.enabled = false

dependencies {
implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-cdk-dependencies')
implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-cdk-core')
@@ -90,13 +90,14 @@ abstract class GcsAvroParquetDestinationAcceptanceTest(s3Format: S3Format) :
else fieldDefinition["type"]
val airbyteTypeProperty = fieldDefinition["airbyte_type"]
val airbyteTypePropertyText = airbyteTypeProperty?.asText()
return Arrays.stream(JsonSchemaType.entries.toTypedArray())
return JsonSchemaType.entries
.toTypedArray()
.filter { value: JsonSchemaType ->
value.jsonSchemaType == typeProperty.asText() &&
compareAirbyteTypes(airbyteTypePropertyText, value)
}
.map(JsonSchemaType::avroType)
.collect(Collectors.toSet())
.map { it.avroType }
.toSet()
}

private fun compareAirbyteTypes(
@@ -126,8 +127,8 @@ abstract class GcsAvroParquetDestinationAcceptanceTest(s3Format: S3Format) :

@Throws(Exception::class)
protected abstract fun retrieveDataTypesFromPersistedFiles(
streamName: String?,
namespace: String?
streamName: String,
namespace: String
): Map<String?, Set<Schema.Type?>?>

protected fun getTypes(record: GenericData.Record): Map<String, Set<Schema.Type>> {
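The first hunk above also swaps a java.util.stream pipeline over JsonSchemaType for plain Kotlin collection operations on the enum's entries. A self-contained before/after sketch of that idiom, using a hypothetical enum in place of JsonSchemaType (the entries property requires Kotlin 1.9+):

import java.util.Arrays
import java.util.stream.Collectors

// Hypothetical stand-in for JsonSchemaType.
enum class SchemaTypeSketch(val jsonType: String, val avroType: String) {
    STRING("string", "string"),
    INTEGER("integer", "long"),
    NUMBER("number", "double")
}

// Before: a Java stream bridged from Kotlin.
fun avroTypesViaStreams(jsonType: String): Set<String> =
    Arrays.stream(SchemaTypeSketch.entries.toTypedArray())
        .filter { it.jsonType == jsonType }
        .map { it.avroType }
        .collect(Collectors.toSet())

// After: the same result with Kotlin collection operations, as in the diff.
fun avroTypesViaKotlin(jsonType: String): Set<String> =
    SchemaTypeSketch.entries
        .filter { it.jsonType == jsonType }
        .map { it.avroType }
        .toSet()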
@@ -35,17 +35,17 @@ abstract class GcsBaseAvroDestinationAcceptanceTest :
@Throws(Exception::class)
override fun retrieveRecords(
testEnv: TestDestinationEnv?,
streamName: String?,
namespace: String?,
streamName: String,
namespace: String,
streamSchema: JsonNode
): List<JsonNode> {
val nameUpdater = getFieldNameUpdater(streamName!!, namespace, streamSchema)
val nameUpdater = getFieldNameUpdater(streamName, namespace, streamSchema)

val objectSummaries = getAllSyncedObjects(streamName, namespace)
val objectSummaries = getAllSyncedObjects(streamName, namespace!!)
val jsonRecords: MutableList<JsonNode> = LinkedList()

for (objectSummary in objectSummaries!!) {
val `object` = s3Client!!.getObject(objectSummary!!.bucketName, objectSummary.key)
for (objectSummary in objectSummaries) {
val `object` = s3Client.getObject(objectSummary.bucketName, objectSummary.key)
DataFileReader<GenericData.Record>(
SeekableByteArrayInput(`object`.objectContent.readAllBytes()),
GenericDatumReader<GenericData.Record>()
@@ -67,8 +67,8 @@ abstract class GcsBaseAvroDestinationAcceptanceTest :

@Throws(Exception::class)
override fun retrieveDataTypesFromPersistedFiles(
streamName: String?,
namespace: String?
streamName: String,
namespace: String
): Map<String?, Set<Schema.Type?>?> {
val objectSummaries = getAllSyncedObjects(streamName, namespace)
val resultDataTypes: MutableMap<String?, Set<Schema.Type?>?> = HashMap()
@@ -40,8 +40,8 @@ abstract class GcsBaseCsvDestinationAcceptanceTest : GcsDestinationAcceptanceTes
@Throws(IOException::class)
override fun retrieveRecords(
testEnv: TestDestinationEnv?,
streamName: String?,
namespace: String?,
streamName: String,
namespace: String,
streamSchema: JsonNode
): List<JsonNode> {
val objectSummaries = getAllSyncedObjects(streamName, namespace)
@@ -36,8 +36,8 @@ abstract class GcsBaseJsonlDestinationAcceptanceTest :
@Throws(IOException::class)
override fun retrieveRecords(
testEnv: TestDestinationEnv?,
streamName: String?,
namespace: String?,
streamName: String,
namespace: String,
streamSchema: JsonNode
): List<JsonNode> {
val objectSummaries = getAllSyncedObjects(streamName, namespace)
@@ -37,17 +37,17 @@ abstract class GcsBaseParquetDestinationAcceptanceTest :
@Throws(IOException::class, URISyntaxException::class)
override fun retrieveRecords(
testEnv: TestDestinationEnv?,
streamName: String?,
namespace: String?,
streamName: String,
namespace: String,
streamSchema: JsonNode
): List<JsonNode> {
val nameUpdater = getFieldNameUpdater(streamName!!, namespace, streamSchema)
val nameUpdater = getFieldNameUpdater(streamName, namespace, streamSchema)

val objectSummaries = getAllSyncedObjects(streamName, namespace)
val jsonRecords: MutableList<JsonNode> = LinkedList()

for (objectSummary in objectSummaries!!) {
val `object` = s3Client!!.getObject(objectSummary!!.bucketName, objectSummary.key)
for (objectSummary in objectSummaries) {
val `object` = s3Client!!.getObject(objectSummary.bucketName, objectSummary.key)
val uri = URI(String.format("s3a://%s/%s", `object`.bucketName, `object`.key))
val path = Path(uri)
val hadoopConfig = GcsParquetWriter.getHadoopConfig(config)
@@ -73,14 +73,14 @@

@Throws(Exception::class)
override fun retrieveDataTypesFromPersistedFiles(
streamName: String?,
namespace: String?
streamName: String,
namespace: String
): Map<String?, Set<Schema.Type?>?> {
val objectSummaries = getAllSyncedObjects(streamName, namespace)
val resultDataTypes: MutableMap<String?, Set<Schema.Type?>?> = HashMap()

for (objectSummary in objectSummaries!!) {
val `object` = s3Client!!.getObject(objectSummary!!.bucketName, objectSummary.key)
for (objectSummary in objectSummaries) {
val `object` = s3Client!!.getObject(objectSummary.bucketName, objectSummary.key)
val uri = URI(String.format("s3a://%s/%s", `object`.bucketName, `object`.key))
val path = Path(uri)
val hadoopConfig = getHadoopConfig(config)
@@ -30,6 +30,7 @@ import org.joda.time.DateTime
import org.joda.time.DateTimeZone
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.mockito.Mockito.mock
import org.slf4j.Logger
import org.slf4j.LoggerFactory

@@ -46,9 +47,10 @@ import org.slf4j.LoggerFactory
abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format) :
DestinationAcceptanceTest() {
protected var configJson: JsonNode? = null
protected lateinit var config: GcsDestinationConfig
protected lateinit var s3Client: AmazonS3
protected lateinit var nameTransformer: NamingConventionTransformer
// Not a big fan of those mocks(). Here to make spotbugs happy
protected var config: GcsDestinationConfig = mock()
protected var s3Client: AmazonS3 = mock()
protected var nameTransformer: NamingConventionTransformer = mock()
protected var s3StorageOperations: S3StorageOperations? = null

protected val baseConfigJson: JsonNode
@@ -96,23 +98,23 @@ abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format

/** Helper method to retrieve all synced objects inside the configured bucket path. */
protected fun getAllSyncedObjects(
streamName: String?,
namespace: String?
streamName: String,
namespace: String
): List<S3ObjectSummary> {
val namespaceStr = nameTransformer!!.getNamespace(namespace!!)
val streamNameStr = nameTransformer!!.getIdentifier(streamName!!)
val namespaceStr = nameTransformer.getNamespace(namespace)
val streamNameStr = nameTransformer.getIdentifier(streamName)
val outputPrefix =
s3StorageOperations!!.getBucketObjectPath(
namespaceStr,
streamNameStr,
DateTime.now(DateTimeZone.UTC),
config!!.pathFormat!!
config.pathFormat!!
)
// the child folder contains a non-deterministic epoch timestamp, so use the parent folder
val parentFolder = outputPrefix.substring(0, outputPrefix.lastIndexOf("/") + 1)
val objectSummaries =
s3Client
.listObjects(config!!.bucketName, parentFolder)
.listObjects(config.bucketName, parentFolder)
.objectSummaries
.stream()
.filter { o: S3ObjectSummary -> o.key.contains("$streamNameStr/") }
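As the in-code comment in the hunk above notes, GcsDestinationAcceptanceTest also replaces its lateinit var fields with Mockito-mock defaults, presumably so the fields are assigned at construction time and SpotBugs stops flagging them; setup code is still expected to overwrite them with real instances. A minimal sketch of that initialization pattern (the interface and field names are hypothetical, and it assumes a Mockito version whose zero-argument mock() overload infers the type from the assignment target, as the import added in the diff suggests):

import org.mockito.Mockito.mock

// Hypothetical stand-in for AmazonS3 / NamingConventionTransformer.
interface StorageClientSketch {
    fun listKeys(prefix: String): List<String>
}

abstract class MockBackedTestSketch {
    // A mock default gives the field a non-null value at construction time;
    // setUp() is expected to replace it with a real client before use.
    protected var client: StorageClientSketch = mock()
}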
@@ -39,8 +39,8 @@ abstract class S3BaseAvroDestinationAcceptanceTest protected constructor() :
@Throws(Exception::class)
override fun retrieveRecords(
testEnv: TestDestinationEnv?,
streamName: String?,
namespace: String?,
streamName: String,
namespace: String,
streamSchema: JsonNode
): List<JsonNode> {
val nameUpdater =
@@ -36,8 +36,8 @@ abstract class S3BaseCsvDestinationAcceptanceTest : S3DestinationAcceptanceTest(
@Throws(IOException::class)
override fun retrieveRecords(
testEnv: TestDestinationEnv?,
streamName: String?,
namespace: String?,
streamName: String,
namespace: String,
streamSchema: JsonNode
): List<JsonNode> {
val objectSummaries = getAllSyncedObjects(streamName, namespace)
@@ -35,8 +35,8 @@ abstract class S3BaseJsonlDestinationAcceptanceTest protected constructor() :
@Throws(IOException::class)
override fun retrieveRecords(
testEnv: TestDestinationEnv?,
streamName: String?,
namespace: String?,
streamName: String,
namespace: String,
streamSchema: JsonNode
): List<JsonNode> {
val objectSummaries = getAllSyncedObjects(streamName, namespace)
@@ -29,8 +29,8 @@ abstract class S3BaseParquetDestinationAcceptanceTest protected constructor() :
@Throws(IOException::class, URISyntaxException::class)
override fun retrieveRecords(
testEnv: TestDestinationEnv?,
streamName: String?,
namespace: String?,
streamName: String,
namespace: String,
streamSchema: JsonNode
): List<JsonNode> {
val nameUpdater =
