Skip to content

Commit

Permalink
enable spotbugs for s3-destinations submodule
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Apr 2, 2024
1 parent af00bdf commit 0eace0e
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 35 deletions.
3 changes: 0 additions & 3 deletions airbyte-cdk/java/airbyte-cdk/s3-destinations/build.gradle
Expand Up @@ -32,9 +32,6 @@ compileKotlin {
}
}

spotbugsTest.enabled = false
spotbugsTestFixtures.enabled = false

dependencies {
implementation project(':airbyte-cdk:java:airbyte-cdk:dependencies')
implementation project(':airbyte-cdk:java:airbyte-cdk:core')
Expand Down
Expand Up @@ -16,11 +16,11 @@ class JsonSchemaTypeTest {
@ParameterizedTest
@ArgumentsSource(JsonSchemaTypeProvider::class)
fun testFromJsonSchemaType(
type: String?,
type: String,
airbyteType: String?,
expectedJsonSchemaType: JsonSchemaType?
) {
Assertions.assertEquals(expectedJsonSchemaType, fromJsonSchemaType(type!!, airbyteType))
Assertions.assertEquals(expectedJsonSchemaType, fromJsonSchemaType(type, airbyteType))
}

class JsonSchemaTypeProvider : ArgumentsProvider {
Expand Down
Expand Up @@ -115,9 +115,9 @@ abstract class S3AvroParquetDestinationAcceptanceTest protected constructor(s3Fo

@Throws(Exception::class)
protected abstract fun retrieveDataTypesFromPersistedFiles(
streamName: String?,
namespace: String?
): Map<String?, Set<Schema.Type?>?>
streamName: String,
namespace: String
): Map<String, Set<Schema.Type>>

protected fun getTypes(record: GenericData.Record): Map<String, Set<Schema.Type>> {
val fieldList =
Expand Down
Expand Up @@ -44,7 +44,7 @@ abstract class S3BaseAvroDestinationAcceptanceTest protected constructor() :
streamSchema: JsonNode
): List<JsonNode> {
val nameUpdater =
AvroRecordHelper.getFieldNameUpdater(streamName!!, namespace, streamSchema)
AvroRecordHelper.getFieldNameUpdater(streamName, namespace, streamSchema)

val objectSummaries = getAllSyncedObjects(streamName, namespace)
val jsonRecords: MutableList<JsonNode> = LinkedList()
Expand Down Expand Up @@ -75,11 +75,11 @@ abstract class S3BaseAvroDestinationAcceptanceTest protected constructor() :

@Throws(Exception::class)
override fun retrieveDataTypesFromPersistedFiles(
streamName: String?,
namespace: String?
): Map<String?, Set<Schema.Type?>?> {
streamName: String,
namespace: String
): Map<String, Set<Schema.Type>> {
val objectSummaries = getAllSyncedObjects(streamName, namespace)
val resultDataTypes: MutableMap<String?, Set<Schema.Type?>?> = HashMap()
val resultDataTypes: MutableMap<String, Set<Schema.Type>> = HashMap()

for (objectSummary in objectSummaries) {
val `object` = s3Client!!.getObject(objectSummary.bucketName, objectSummary.key)
Expand Down
Expand Up @@ -34,13 +34,13 @@ abstract class S3BaseParquetDestinationAcceptanceTest protected constructor() :
streamSchema: JsonNode
): List<JsonNode> {
val nameUpdater =
AvroRecordHelper.getFieldNameUpdater(streamName!!, namespace, streamSchema)
AvroRecordHelper.getFieldNameUpdater(streamName, namespace, streamSchema)

val objectSummaries = getAllSyncedObjects(streamName, namespace)
val jsonRecords: MutableList<JsonNode> = LinkedList()

for (objectSummary in objectSummaries!!) {
val `object` = s3Client!!.getObject(objectSummary!!.bucketName, objectSummary.key)
for (objectSummary in objectSummaries) {
val `object` = s3Client!!.getObject(objectSummary.bucketName, objectSummary.key)
val uri = URI(String.format("s3a://%s/%s", `object`.bucketName, `object`.key))
val path = Path(uri)
val hadoopConfig = S3ParquetWriter.getHadoopConfig(s3DestinationConfig)
Expand Down Expand Up @@ -68,14 +68,14 @@ abstract class S3BaseParquetDestinationAcceptanceTest protected constructor() :

@Throws(Exception::class)
override fun retrieveDataTypesFromPersistedFiles(
streamName: String?,
namespace: String?
): Map<String?, Set<Schema.Type?>?> {
streamName: String,
namespace: String
): Map<String, Set<Schema.Type>> {
val objectSummaries = getAllSyncedObjects(streamName, namespace)
val resultDataTypes: MutableMap<String?, Set<Schema.Type?>?> = HashMap()
val resultDataTypes: MutableMap<String, Set<Schema.Type>> = HashMap()

for (objectSummary in objectSummaries!!) {
val `object` = s3Client!!.getObject(objectSummary!!.bucketName, objectSummary.key)
for (objectSummary in objectSummaries) {
val `object` = s3Client!!.getObject(objectSummary.bucketName, objectSummary.key)
val uri = URI(String.format("s3a://%s/%s", `object`.bucketName, `object`.key))
val path = Path(uri)
val hadoopConfig = S3ParquetWriter.getHadoopConfig(s3DestinationConfig)
Expand All @@ -87,7 +87,7 @@ abstract class S3BaseParquetDestinationAcceptanceTest protected constructor() :
var record: GenericData.Record?
while ((parquetReader.read().also { record = it }) != null) {
val actualDataTypes = getTypes(record!!)
resultDataTypes.putAll(actualDataTypes!!)
resultDataTypes.putAll(actualDataTypes)
}
}
}
Expand Down
Expand Up @@ -23,6 +23,7 @@ import java.util.stream.Collectors
import org.apache.commons.lang3.RandomStringUtils
import org.joda.time.DateTime
import org.joda.time.DateTimeZone
import org.mockito.Mockito.mock
import org.slf4j.Logger
import org.slf4j.LoggerFactory

Expand All @@ -40,9 +41,9 @@ abstract class S3DestinationAcceptanceTest
protected constructor(protected val outputFormat: S3Format) : DestinationAcceptanceTest() {
protected val secretFilePath: String = "secrets/config.json"
protected var configJson: JsonNode? = null
protected lateinit var s3DestinationConfig: S3DestinationConfig
protected var s3DestinationConfig: S3DestinationConfig = mock()
protected var s3Client: AmazonS3? = null
protected lateinit var s3nameTransformer: NamingConventionTransformer
protected var s3nameTransformer: NamingConventionTransformer = mock()
protected var s3StorageOperations: S3StorageOperations? = null

protected val baseConfigJson: JsonNode
Expand All @@ -68,23 +69,23 @@ protected constructor(protected val outputFormat: S3Format) : DestinationAccepta

/** Helper method to retrieve all synced objects inside the configured bucket path. */
protected fun getAllSyncedObjects(
streamName: String?,
namespace: String?
streamName: String,
namespace: String
): List<S3ObjectSummary> {
val namespaceStr = s3nameTransformer!!.getNamespace(namespace!!)
val streamNameStr = s3nameTransformer!!.getIdentifier(streamName!!)
val namespaceStr = s3nameTransformer.getNamespace(namespace)
val streamNameStr = s3nameTransformer.getIdentifier(streamName)
val outputPrefix =
s3StorageOperations!!.getBucketObjectPath(
namespaceStr,
streamNameStr,
DateTime.now(DateTimeZone.UTC),
s3DestinationConfig!!.pathFormat!!
s3DestinationConfig.pathFormat!!
)
// the child folder contains a non-deterministic epoch timestamp, so use the parent folder
val parentFolder = outputPrefix.substring(0, outputPrefix.lastIndexOf("/") + 1)
val objectSummaries =
s3Client!!
.listObjects(s3DestinationConfig!!.bucketName, parentFolder)
.listObjects(s3DestinationConfig.bucketName, parentFolder)
.objectSummaries
.stream()
.filter { o: S3ObjectSummary -> o.key.contains("$streamNameStr/") }
Expand Down Expand Up @@ -141,7 +142,7 @@ protected constructor(protected val outputFormat: S3Format) : DestinationAccepta
val keysToDelete: MutableList<DeleteObjectsRequest.KeyVersion> = LinkedList()
val objects =
s3Client!!
.listObjects(s3DestinationConfig!!.bucketName, s3DestinationConfig!!.bucketPath)
.listObjects(s3DestinationConfig.bucketName, s3DestinationConfig.bucketPath)
.objectSummaries
for (`object` in objects) {
keysToDelete.add(DeleteObjectsRequest.KeyVersion(`object`.key))
Expand All @@ -150,12 +151,12 @@ protected constructor(protected val outputFormat: S3Format) : DestinationAccepta
if (keysToDelete.size > 0) {
LOGGER.info(
"Tearing down test bucket path: {}/{}",
s3DestinationConfig!!.bucketName,
s3DestinationConfig!!.bucketPath
s3DestinationConfig.bucketName,
s3DestinationConfig.bucketPath
)
val result =
s3Client!!.deleteObjects(
DeleteObjectsRequest(s3DestinationConfig!!.bucketName).withKeys(keysToDelete)
DeleteObjectsRequest(s3DestinationConfig.bucketName).withKeys(keysToDelete)
)
LOGGER.info("Deleted {} file(s).", result.deletedObjects.size)
}
Expand Down

0 comments on commit 0eace0e

Please sign in to comment.