Skip to content

Commit

Permalink
enabling spotbugs for gcs-destinations submodule
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Apr 3, 2024
1 parent 23df75f commit 47c584c
Show file tree
Hide file tree
Showing 12 changed files with 49 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ abstract class DestinationAcceptanceTest {
@Throws(Exception::class)
protected abstract fun retrieveRecords(
testEnv: TestDestinationEnv?,
streamName: String?,
namespace: String?,
streamName: String,
namespace: String,
streamSchema: JsonNode
): List<JsonNode>

Expand Down Expand Up @@ -1454,7 +1454,7 @@ abstract class DestinationAcceptanceTest {
false
)
val destinationOutput =
retrieveRecords(testEnv, stream.name, getDefaultSchema(config), stream.jsonSchema)
retrieveRecords(testEnv, stream.name, getDefaultSchema(config)!!, stream.jsonSchema)
// Remove state message
secondSyncMessagesWithNewFields.removeIf {
airbyteMessage: io.airbyte.protocol.models.v0.AirbyteMessage ->
Expand Down
2 changes: 0 additions & 2 deletions airbyte-cdk/java/airbyte-cdk/gcs-destinations/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ compileTestFixturesKotlin {
}
}

spotbugsTestFixtures.enabled = false

dependencies {
implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-cdk-dependencies')
implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-cdk-core')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,14 @@ abstract class GcsAvroParquetDestinationAcceptanceTest(s3Format: S3Format) :
else fieldDefinition["type"]
val airbyteTypeProperty = fieldDefinition["airbyte_type"]
val airbyteTypePropertyText = airbyteTypeProperty?.asText()
return Arrays.stream(JsonSchemaType.entries.toTypedArray())
return JsonSchemaType.entries
.toTypedArray()
.filter { value: JsonSchemaType ->
value.jsonSchemaType == typeProperty.asText() &&
compareAirbyteTypes(airbyteTypePropertyText, value)
}
.map(JsonSchemaType::avroType)
.collect(Collectors.toSet())
.map { it.avroType }
.toSet()
}

private fun compareAirbyteTypes(
Expand Down Expand Up @@ -126,8 +127,8 @@ abstract class GcsAvroParquetDestinationAcceptanceTest(s3Format: S3Format) :

@Throws(Exception::class)
protected abstract fun retrieveDataTypesFromPersistedFiles(
streamName: String?,
namespace: String?
streamName: String,
namespace: String
): Map<String?, Set<Schema.Type?>?>

protected fun getTypes(record: GenericData.Record): Map<String, Set<Schema.Type>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,17 @@ abstract class GcsBaseAvroDestinationAcceptanceTest :
@Throws(Exception::class)
override fun retrieveRecords(
testEnv: TestDestinationEnv?,
streamName: String?,
namespace: String?,
streamName: String,
namespace: String,
streamSchema: JsonNode
): List<JsonNode> {
val nameUpdater = getFieldNameUpdater(streamName!!, namespace, streamSchema)
val nameUpdater = getFieldNameUpdater(streamName, namespace, streamSchema)

val objectSummaries = getAllSyncedObjects(streamName, namespace)
val objectSummaries = getAllSyncedObjects(streamName, namespace!!)
val jsonRecords: MutableList<JsonNode> = LinkedList()

for (objectSummary in objectSummaries!!) {
val `object` = s3Client!!.getObject(objectSummary!!.bucketName, objectSummary.key)
for (objectSummary in objectSummaries) {
val `object` = s3Client.getObject(objectSummary.bucketName, objectSummary.key)
DataFileReader<GenericData.Record>(
SeekableByteArrayInput(`object`.objectContent.readAllBytes()),
GenericDatumReader<GenericData.Record>()
Expand All @@ -67,8 +67,8 @@ abstract class GcsBaseAvroDestinationAcceptanceTest :

@Throws(Exception::class)
override fun retrieveDataTypesFromPersistedFiles(
streamName: String?,
namespace: String?
streamName: String,
namespace: String
): Map<String?, Set<Schema.Type?>?> {
val objectSummaries = getAllSyncedObjects(streamName, namespace)
val resultDataTypes: MutableMap<String?, Set<Schema.Type?>?> = HashMap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ abstract class GcsBaseCsvDestinationAcceptanceTest : GcsDestinationAcceptanceTes
@Throws(IOException::class)
override fun retrieveRecords(
testEnv: TestDestinationEnv?,
streamName: String?,
namespace: String?,
streamName: String,
namespace: String,
streamSchema: JsonNode
): List<JsonNode> {
val objectSummaries = getAllSyncedObjects(streamName, namespace)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ abstract class GcsBaseJsonlDestinationAcceptanceTest :
@Throws(IOException::class)
override fun retrieveRecords(
testEnv: TestDestinationEnv?,
streamName: String?,
namespace: String?,
streamName: String,
namespace: String,
streamSchema: JsonNode
): List<JsonNode> {
val objectSummaries = getAllSyncedObjects(streamName, namespace)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@ abstract class GcsBaseParquetDestinationAcceptanceTest :
@Throws(IOException::class, URISyntaxException::class)
override fun retrieveRecords(
testEnv: TestDestinationEnv?,
streamName: String?,
namespace: String?,
streamName: String,
namespace: String,
streamSchema: JsonNode
): List<JsonNode> {
val nameUpdater = getFieldNameUpdater(streamName!!, namespace, streamSchema)
val nameUpdater = getFieldNameUpdater(streamName, namespace, streamSchema)

val objectSummaries = getAllSyncedObjects(streamName, namespace)
val jsonRecords: MutableList<JsonNode> = LinkedList()

for (objectSummary in objectSummaries!!) {
val `object` = s3Client!!.getObject(objectSummary!!.bucketName, objectSummary.key)
for (objectSummary in objectSummaries) {
val `object` = s3Client!!.getObject(objectSummary.bucketName, objectSummary.key)
val uri = URI(String.format("s3a://%s/%s", `object`.bucketName, `object`.key))
val path = Path(uri)
val hadoopConfig = GcsParquetWriter.getHadoopConfig(config)
Expand All @@ -73,14 +73,14 @@ abstract class GcsBaseParquetDestinationAcceptanceTest :

@Throws(Exception::class)
override fun retrieveDataTypesFromPersistedFiles(
streamName: String?,
namespace: String?
streamName: String,
namespace: String
): Map<String?, Set<Schema.Type?>?> {
val objectSummaries = getAllSyncedObjects(streamName, namespace)
val resultDataTypes: MutableMap<String?, Set<Schema.Type?>?> = HashMap()

for (objectSummary in objectSummaries!!) {
val `object` = s3Client!!.getObject(objectSummary!!.bucketName, objectSummary.key)
for (objectSummary in objectSummaries) {
val `object` = s3Client!!.getObject(objectSummary.bucketName, objectSummary.key)
val uri = URI(String.format("s3a://%s/%s", `object`.bucketName, `object`.key))
val path = Path(uri)
val hadoopConfig = getHadoopConfig(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.joda.time.DateTime
import org.joda.time.DateTimeZone
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.mockito.Mockito.mock
import org.slf4j.Logger
import org.slf4j.LoggerFactory

Expand All @@ -46,9 +47,10 @@ import org.slf4j.LoggerFactory
abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format) :
DestinationAcceptanceTest() {
protected var configJson: JsonNode? = null
protected lateinit var config: GcsDestinationConfig
protected lateinit var s3Client: AmazonS3
protected lateinit var nameTransformer: NamingConventionTransformer
// Not a big fan of those mocks(). Here to make spotbugs happy
protected var config: GcsDestinationConfig = mock()
protected var s3Client: AmazonS3 = mock()
protected var nameTransformer: NamingConventionTransformer = mock()
protected var s3StorageOperations: S3StorageOperations? = null

protected val baseConfigJson: JsonNode
Expand Down Expand Up @@ -96,23 +98,23 @@ abstract class GcsDestinationAcceptanceTest(protected val outputFormat: S3Format

/** Helper method to retrieve all synced objects inside the configured bucket path. */
protected fun getAllSyncedObjects(
streamName: String?,
namespace: String?
streamName: String,
namespace: String
): List<S3ObjectSummary> {
val namespaceStr = nameTransformer!!.getNamespace(namespace!!)
val streamNameStr = nameTransformer!!.getIdentifier(streamName!!)
val namespaceStr = nameTransformer.getNamespace(namespace)
val streamNameStr = nameTransformer.getIdentifier(streamName)
val outputPrefix =
s3StorageOperations!!.getBucketObjectPath(
namespaceStr,
streamNameStr,
DateTime.now(DateTimeZone.UTC),
config!!.pathFormat!!
config.pathFormat!!
)
// the child folder contains a non-deterministic epoch timestamp, so use the parent folder
val parentFolder = outputPrefix.substring(0, outputPrefix.lastIndexOf("/") + 1)
val objectSummaries =
s3Client
.listObjects(config!!.bucketName, parentFolder)
.listObjects(config.bucketName, parentFolder)
.objectSummaries
.stream()
.filter { o: S3ObjectSummary -> o.key.contains("$streamNameStr/") }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ abstract class S3BaseAvroDestinationAcceptanceTest protected constructor() :
@Throws(Exception::class)
override fun retrieveRecords(
testEnv: TestDestinationEnv?,
streamName: String?,
namespace: String?,
streamName: String,
namespace: String,
streamSchema: JsonNode
): List<JsonNode> {
val nameUpdater =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ abstract class S3BaseCsvDestinationAcceptanceTest : S3DestinationAcceptanceTest(
@Throws(IOException::class)
override fun retrieveRecords(
testEnv: TestDestinationEnv?,
streamName: String?,
namespace: String?,
streamName: String,
namespace: String,
streamSchema: JsonNode
): List<JsonNode> {
val objectSummaries = getAllSyncedObjects(streamName, namespace)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ abstract class S3BaseJsonlDestinationAcceptanceTest protected constructor() :
@Throws(IOException::class)
override fun retrieveRecords(
testEnv: TestDestinationEnv?,
streamName: String?,
namespace: String?,
streamName: String,
namespace: String,
streamSchema: JsonNode
): List<JsonNode> {
val objectSummaries = getAllSyncedObjects(streamName, namespace)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ abstract class S3BaseParquetDestinationAcceptanceTest protected constructor() :
@Throws(IOException::class, URISyntaxException::class)
override fun retrieveRecords(
testEnv: TestDestinationEnv?,
streamName: String?,
namespace: String?,
streamName: String,
namespace: String,
streamSchema: JsonNode
): List<JsonNode> {
val nameUpdater =
Expand Down

0 comments on commit 47c584c

Please sign in to comment.