Rename File format related classes to be agnostic of S3 (#37442)
gisripa committed Apr 20, 2024
1 parent 7adfb7a commit f493c7b
Showing 55 changed files with 253 additions and 242 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -144,6 +144,7 @@ Maven and Gradle will automatically reference the correct (pinned) version of th

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| 0.30.6 | 2024-04-19 | [\#37442](https://github.com/airbytehq/airbyte/pull/37442) | Destinations: Rename File format related classes to be agnostic of S3 |
| 0.30.3 | 2024-04-12 | [\#37106](https://github.com/airbytehq/airbyte/pull/37106) | Destinations: Simplify constructors in `AsyncStreamConsumer` |
| 0.30.2 | 2024-04-12 | [\#36926](https://github.com/airbytehq/airbyte/pull/36926) | Destinations: Remove `JdbcSqlOperations#formatData`; misc changes for java interop |
| 0.30.1 | 2024-04-11 | [\#36919](https://github.com/airbytehq/airbyte/pull/36919) | Fix regression in sources conversion of null values |
@@ -1 +1 @@
-version=0.30.5
+version=0.30.6
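Taken together, the hunks below are a mechanical rename of the format-related classes in the Java CDK's `destination.s3` package: behavior and packages stay the same, only the S3-specific names go away. As a migration aid, here is the import-level mapping, reconstructed from the renames visible in this diff (the full commit touches 55 files, so this list may not be exhaustive):

```kotlin
// Before this commit (S3-specific names):
// import io.airbyte.cdk.integrations.destination.s3.S3Format
// import io.airbyte.cdk.integrations.destination.s3.S3FormatConfig
// import io.airbyte.cdk.integrations.destination.s3.S3FormatConfigs.getS3FormatConfig
// import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig
// import io.airbyte.cdk.integrations.destination.s3.csv.S3CsvFormatConfig
// import io.airbyte.cdk.integrations.destination.s3.parquet.S3ParquetFormatConfig

// After (storage-agnostic names):
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.UploadFormatConfig
import io.airbyte.cdk.integrations.destination.s3.UploadFormatConfigFactory.getUploadFormatConfig
import io.airbyte.cdk.integrations.destination.s3.avro.UploadAvroFormatConfig
import io.airbyte.cdk.integrations.destination.s3.csv.UploadCsvFormatConfig
import io.airbyte.cdk.integrations.destination.s3.parquet.UploadParquetFormatConfig
```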
@@ -15,9 +15,9 @@ import io.airbyte.cdk.integrations.destination.gcs.credential.GcsCredentialType
import io.airbyte.cdk.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig
import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfig
import io.airbyte.cdk.integrations.destination.s3.S3DestinationConstants
-import io.airbyte.cdk.integrations.destination.s3.S3FormatConfig
-import io.airbyte.cdk.integrations.destination.s3.S3FormatConfigs.getS3FormatConfig
import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations
+import io.airbyte.cdk.integrations.destination.s3.UploadFormatConfig
+import io.airbyte.cdk.integrations.destination.s3.UploadFormatConfigFactory.getUploadFormatConfig

/**
* Currently we always reuse the S3 client for GCS. So the GCS config extends from the S3 config.
@@ -28,7 +28,7 @@ class GcsDestinationConfig(
bucketPath: String,
bucketRegion: String?,
val gcsCredentialConfig: GcsCredentialConfig,
-formatConfig: S3FormatConfig
+formatConfig: UploadFormatConfig
) :
S3DestinationConfig(
GCS_ENDPOINT,
@@ -76,7 +76,7 @@ class GcsDestinationConfig(
config["gcs_bucket_path"].asText(),
config["gcs_bucket_region"].asText(),
GcsCredentialConfigs.getCredentialConfig(config),
-getS3FormatConfig(config)
+getUploadFormatConfig(config)
)
}
}
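For code that constructs a `GcsDestinationConfig` directly, only the type of the `formatConfig` argument changes. A minimal sketch, modeled on the test fixture further down in this commit (bucket, path, region, and credentials are placeholder values):

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig
import io.airbyte.cdk.integrations.destination.s3.avro.UploadAvroFormatConfig

// The last argument is now typed as UploadFormatConfig (was S3FormatConfig);
// any of the Upload*FormatConfig implementations can be passed here.
val config = GcsDestinationConfig(
    "fake-bucketName",
    "fake-bucketPath",
    "fake-bucketRegion",
    GcsHmacKeyCredentialConfig("fake-access-id", "fake-secret"),
    UploadAvroFormatConfig(ObjectMapper().createObjectNode()),
)
```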
@@ -10,10 +10,10 @@ import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.util.GcsUtils
import io.airbyte.cdk.integrations.destination.gcs.writer.BaseGcsWriter
-import io.airbyte.cdk.integrations.destination.s3.S3Format
+import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.avro.AvroRecordFactory
import io.airbyte.cdk.integrations.destination.s3.avro.JsonToAvroSchemaConverter
-import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig
+import io.airbyte.cdk.integrations.destination.s3.avro.UploadAvroFormatConfig
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter
@@ -63,8 +63,7 @@ constructor(
)
LOGGER.info("Avro schema for stream {}: {}", stream.name, schema!!.toString(false))

-val outputFilename: String =
-BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.AVRO)
+val outputFilename: String = getOutputFilename(uploadTimestamp, FileUploadFormat.AVRO)
outputPath = java.lang.String.join("/", outputPrefix, outputFilename)
fileLocation = String.format("gs://%s/%s", config.bucketName, outputPath)

@@ -84,7 +83,7 @@
// performant.
this.outputStream = uploadManager.multiPartOutputStreams[0]

-val formatConfig = config.formatConfig as S3AvroFormatConfig
+val formatConfig = config.formatConfig as UploadAvroFormatConfig
// The DataFileWriter always uses binary encoding.
// If json encoding is needed in the future, use the GenericDatumWriter directly.
this.dataFileWriter =
@@ -118,8 +117,8 @@ constructor(
uploadManager.abort()
}

-override val fileFormat: S3Format
-get() = S3Format.AVRO
+override val fileFormat: FileUploadFormat
+get() = FileUploadFormat.AVRO

companion object {
protected val LOGGER: Logger = LoggerFactory.getLogger(GcsAvroWriter::class.java)
@@ -9,10 +9,10 @@ import com.amazonaws.services.s3.AmazonS3
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.writer.BaseGcsWriter
-import io.airbyte.cdk.integrations.destination.s3.S3Format
+import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.csv.CsvSheetGenerator
import io.airbyte.cdk.integrations.destination.s3.csv.CsvSheetGenerator.Factory.create
-import io.airbyte.cdk.integrations.destination.s3.csv.S3CsvFormatConfig
+import io.airbyte.cdk.integrations.destination.s3.csv.UploadCsvFormatConfig
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter
@@ -43,11 +43,11 @@ class GcsCsvWriter(
override val outputPath: String

init {
-val formatConfig = config.formatConfig as S3CsvFormatConfig
+val formatConfig = config.formatConfig as UploadCsvFormatConfig
this.csvSheetGenerator = create(configuredStream.stream.jsonSchema, formatConfig)

val outputFilename: String =
-BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.CSV)
+BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, FileUploadFormat.CSV)
outputPath = java.lang.String.join("/", outputPrefix, outputFilename)
fileLocation = String.format("gs://%s/%s", config.bucketName, outputPath)

@@ -97,8 +97,8 @@ class GcsCsvWriter(
uploadManager.abort()
}

-override val fileFormat: S3Format
-get() = S3Format.CSV
+override val fileFormat: FileUploadFormat
+get() = FileUploadFormat.CSV

companion object {
private val LOGGER: Logger = LoggerFactory.getLogger(GcsCsvWriter::class.java)
@@ -11,7 +11,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.writer.BaseGcsWriter
-import io.airbyte.cdk.integrations.destination.s3.S3Format
+import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter
import io.airbyte.commons.jackson.MoreMappers
@@ -40,7 +40,7 @@ class GcsJsonlWriter(

init {
val outputFilename: String =
-BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.JSONL)
+BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, FileUploadFormat.JSONL)
outputPath = java.lang.String.join("/", outputPrefix, outputFilename)

fileLocation = String.format("gs://%s/%s", config.bucketName, outputPath)
@@ -84,8 +83,8 @@
uploadManager.abort()
}

-override val fileFormat: S3Format
-get() = S3Format.JSONL
+override val fileFormat: FileUploadFormat
+get() = FileUploadFormat.JSONL

companion object {
protected val LOGGER: Logger = LoggerFactory.getLogger(GcsJsonlWriter::class.java)
@@ -10,9 +10,9 @@ import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig
import io.airbyte.cdk.integrations.destination.gcs.util.GcsS3FileSystem
import io.airbyte.cdk.integrations.destination.gcs.writer.BaseGcsWriter
-import io.airbyte.cdk.integrations.destination.s3.S3Format
+import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.avro.AvroRecordFactory
-import io.airbyte.cdk.integrations.destination.s3.parquet.S3ParquetFormatConfig
+import io.airbyte.cdk.integrations.destination.s3.parquet.UploadParquetFormatConfig
import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
@@ -46,7 +46,7 @@ class GcsParquetWriter(

init {
val outputFilename: String =
-BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.PARQUET)
+BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, FileUploadFormat.PARQUET)
outputPath = java.lang.String.join("/", outputPrefix, outputFilename)
LOGGER.info(
"Storage path for stream '{}': {}/{}",
@@ -62,7 +62,7 @@

LOGGER.info("Full GCS path for stream '{}': {}", stream.name, path)

-val formatConfig = config.formatConfig as S3ParquetFormatConfig
+val formatConfig = config.formatConfig as UploadParquetFormatConfig
val hadoopConfig = getHadoopConfig(config)
this.parquetWriter =
AvroParquetWriter.builder<GenericData.Record>(
@@ -102,8 +102,8 @@
}
}

-override val fileFormat: S3Format
-get() = S3Format.PARQUET
+override val fileFormat: FileUploadFormat
+get() = FileUploadFormat.PARQUET

companion object {
private val LOGGER: Logger = LoggerFactory.getLogger(GcsParquetWriter::class.java)
@@ -7,8 +7,8 @@ import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.DeleteObjectsRequest
import com.amazonaws.services.s3.model.HeadBucketRequest
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
+import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.S3DestinationConstants
-import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.util.S3OutputPathHelper.getOutputPrefix
import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter
import io.airbyte.protocol.models.v0.AirbyteStream
@@ -128,7 +128,7 @@ protected constructor(
private val LOGGER: Logger = LoggerFactory.getLogger(BaseGcsWriter::class.java)

// Filename: <upload-date>_<upload-millis>_0.<format-extension>
-fun getOutputFilename(timestamp: Timestamp, format: S3Format): String {
+fun getOutputFilename(timestamp: Timestamp, format: FileUploadFormat): String {
val formatter: DateFormat =
SimpleDateFormat(S3DestinationConstants.YYYY_MM_DD_FORMAT_STRING)
formatter.timeZone = TimeZone.getTimeZone("UTC")
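Only the `format` parameter's type changes here; the filename scheme described in the comment is untouched. For illustration, a self-contained sketch of that scheme, assuming `YYYY_MM_DD_FORMAT_STRING` is the pattern `"yyyy_MM_dd"` and that the enum carries its own file extension (both are assumptions, not confirmed by this diff):

```kotlin
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.TimeZone

// Hypothetical stand-in for FileUploadFormat; only the extension matters here.
enum class FileUploadFormat(val fileExtension: String) {
    AVRO("avro"), CSV("csv"), JSONL("jsonl"), PARQUET("parquet")
}

// Filename: <upload-date>_<upload-millis>_0.<format-extension>
fun getOutputFilename(timestamp: Timestamp, format: FileUploadFormat): String {
    val formatter = SimpleDateFormat("yyyy_MM_dd") // assumed YYYY_MM_DD_FORMAT_STRING
    formatter.timeZone = TimeZone.getTimeZone("UTC")
    return "${formatter.format(timestamp)}_${timestamp.time}_0.${format.fileExtension}"
}

// Example: getOutputFilename(Timestamp(1713571200000L), FileUploadFormat.AVRO)
//          == "2024_04_20_1713571200000_0.avro"
```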
@@ -4,7 +4,7 @@
package io.airbyte.cdk.integrations.destination.gcs

import io.airbyte.cdk.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig
-import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig
+import io.airbyte.cdk.integrations.destination.s3.avro.UploadAvroFormatConfig
import io.airbyte.commons.json.Jsons
import io.airbyte.commons.resources.MoreResources
import java.io.IOException
@@ -30,9 +30,9 @@ internal class GcsDestinationConfigTest {
Assertions.assertEquals("test_secret", hmacKeyConfig.hmacKeySecret)

val formatConfig = config.formatConfig
-Assertions.assertTrue(formatConfig is S3AvroFormatConfig)
+Assertions.assertTrue(formatConfig is UploadAvroFormatConfig)

-val avroFormatConfig = formatConfig as S3AvroFormatConfig
+val avroFormatConfig = formatConfig as UploadAvroFormatConfig
Assertions.assertEquals("deflate-5", avroFormatConfig.codecFactory.toString())
}
}
@@ -7,7 +7,7 @@ import com.amazonaws.services.s3.internal.Constants
import com.google.common.collect.Lists
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.util.ConfigTestUtils
-import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig.Companion.parseCodecConfig
+import io.airbyte.cdk.integrations.destination.s3.avro.UploadAvroFormatConfig.Companion.parseCodecConfig
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
import io.airbyte.commons.json.Jsons
@@ -9,7 +9,7 @@ import com.google.common.collect.Lists
import io.airbyte.cdk.integrations.base.DestinationConfig.Companion.initialize
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig
-import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig
+import io.airbyte.cdk.integrations.destination.s3.avro.UploadAvroFormatConfig
import io.airbyte.commons.json.Jsons
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
@@ -34,7 +34,7 @@ internal class GcsAvroWriterTest {
"fake-bucketPath",
"fake-bucketRegion",
GcsHmacKeyCredentialConfig("fake-access-id", "fake-secret"),
-S3AvroFormatConfig(ObjectMapper().createObjectNode())
+UploadAvroFormatConfig(ObjectMapper().createObjectNode())
),
Mockito.mock(AmazonS3::class.java, Mockito.RETURNS_DEEP_STUBS),
ConfiguredAirbyteStream()
@@ -4,7 +4,7 @@
package io.airbyte.cdk.integrations.destination.gcs

import com.fasterxml.jackson.databind.JsonNode
-import io.airbyte.cdk.integrations.destination.s3.S3Format
+import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.avro.JsonSchemaType
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
import io.airbyte.cdk.integrations.standardtest.destination.argproviders.NumberDataTypeTestArgumentProvider
@@ -25,8 +25,8 @@ import org.junit.jupiter.api.Assertions
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ArgumentsSource

-abstract class GcsAvroParquetDestinationAcceptanceTest(s3Format: S3Format) :
-GcsDestinationAcceptanceTest(s3Format) {
+abstract class GcsAvroParquetDestinationAcceptanceTest(fileUploadFormat: FileUploadFormat) :
+GcsDestinationAcceptanceTest(fileUploadFormat) {
override fun getProtocolVersion() = ProtocolVersion.V1

@ParameterizedTest
@@ -5,7 +5,7 @@ package io.airbyte.cdk.integrations.destination.gcs

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectReader
-import io.airbyte.cdk.integrations.destination.s3.S3Format
+import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.avro.AvroConstants
import io.airbyte.cdk.integrations.destination.s3.util.AvroRecordHelper.getFieldNameUpdater
import io.airbyte.cdk.integrations.destination.s3.util.AvroRecordHelper.pruneAirbyteJson
@@ -20,7 +20,7 @@ import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericDatumReader

abstract class GcsBaseAvroDestinationAcceptanceTest :
-GcsAvroParquetDestinationAcceptanceTest(S3Format.AVRO) {
+GcsAvroParquetDestinationAcceptanceTest(FileUploadFormat.AVRO) {
override val formatConfig: JsonNode?
get() =
Jsons.deserialize(
@@ -7,7 +7,7 @@ import com.amazonaws.services.s3.model.S3Object
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.integrations.base.JavaBaseConstants
-import io.airbyte.cdk.integrations.destination.s3.S3Format
+import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.util.Flattening
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
import io.airbyte.commons.json.Jsons
@@ -21,7 +21,8 @@ import org.apache.commons.csv.CSVFormat
import org.apache.commons.csv.CSVRecord
import org.apache.commons.csv.QuoteMode

-abstract class GcsBaseCsvDestinationAcceptanceTest : GcsDestinationAcceptanceTest(S3Format.CSV) {
+abstract class GcsBaseCsvDestinationAcceptanceTest :
+GcsDestinationAcceptanceTest(FileUploadFormat.CSV) {
override fun getProtocolVersion() = ProtocolVersion.V1

override val formatConfig: JsonNode?
@@ -6,7 +6,7 @@ package io.airbyte.cdk.integrations.destination.gcs
import com.amazonaws.services.s3.model.S3Object
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.base.JavaBaseConstants
-import io.airbyte.cdk.integrations.destination.s3.S3Format
+import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.standardtest.destination.ProtocolVersion
import io.airbyte.commons.json.Jsons
import java.io.BufferedReader
@@ -19,7 +19,7 @@ import kotlin.collections.List
import kotlin.collections.MutableList

abstract class GcsBaseJsonlDestinationAcceptanceTest :
-GcsDestinationAcceptanceTest(S3Format.JSONL) {
+GcsDestinationAcceptanceTest(FileUploadFormat.JSONL) {
override fun getProtocolVersion() = ProtocolVersion.V1

override val formatConfig: JsonNode?
@@ -6,7 +6,7 @@ package io.airbyte.cdk.integrations.destination.gcs
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectReader
import io.airbyte.cdk.integrations.destination.gcs.parquet.GcsParquetWriter
-import io.airbyte.cdk.integrations.destination.s3.S3Format
+import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.avro.AvroConstants
import io.airbyte.cdk.integrations.destination.s3.parquet.S3ParquetWriter.Companion.getHadoopConfig
import io.airbyte.cdk.integrations.destination.s3.util.AvroRecordHelper.getFieldNameUpdater
@@ -25,7 +25,7 @@ import org.apache.parquet.avro.AvroReadSupport
import org.apache.parquet.hadoop.ParquetReader

abstract class GcsBaseParquetDestinationAcceptanceTest :
-GcsAvroParquetDestinationAcceptanceTest(S3Format.PARQUET) {
+GcsAvroParquetDestinationAcceptanceTest(FileUploadFormat.PARQUET) {
override fun getProtocolVersion() = ProtocolVersion.V1

override val formatConfig: JsonNode?
