Skip to content

Commit

Permalink
Merge branch 'aj/ci/re-enable-on-demand-connector-tests' of https://g…
Browse files Browse the repository at this point in the history
…ithub.com/airbytehq/airbyte into aj/ci/re-enable-on-demand-connector-tests
  • Loading branch information
aaronsteers committed Apr 22, 2024
2 parents 75c8bd8 + 116628a commit 977b84b
Show file tree
Hide file tree
Showing 257 changed files with 10,083 additions and 3,044 deletions.
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/2-issue-docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ body:
- type: textarea
id: description
attributes:
label: Revelant information
label: Relevant information
description: Please give any additional information you have and steps to reproduce the problem.
- type: textarea
id: logs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ body:
- type: textarea
id: description
attributes:
label: Revelant information
label: Relevant information
description: Please give any additional information you have and steps to reproduce the problem.
- type: textarea
id: logs
Expand Down
88 changes: 88 additions & 0 deletions .github/ISSUE_TEMPLATE/4-issue-abctl.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
name: 🐛 [abctl] Report an issue with the abctl tool
description: Use this template when you experience an issue with the abctl tool
labels: [type/bug, area/abctl, needs-triage]
body:
- type: markdown
attributes:
value: >
<p align="center">
<a target="_blank" href="https://airbyte.com">
<image>
<source srcset="https://raw.githubusercontent.com/airbytehq/airbyte/master/.github/octavia-issue-template.svg">
<img alt="octavia-welcome" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/.github/octavia-issue-template.svg" width="auto" height="120">
</image>
</a>
</p>
- type: markdown
attributes:
value: |
Thanks for taking the time to fill out this bug report...
Make sure to update this issue with a concise title and provide all information you have to
help us debug the problem together. Issues not following the template will be closed.
- type: textarea
id: problem
attributes:
label: What happened?
description: Please give any additional information you have and steps to reproduce the problem.
validations:
required: true
- type: textarea
id: expected
attributes:
label: What did you expect to happen?
validations:
required: true
- type: textarea
id: abctlVersion
attributes:
label: Abctl Version
value: |
<details>
```console
$ abctl version
# paste output here
```
</details>
validations:
required: true
- type: textarea
id: dockerVersion
attributes:
label: Docker Version
value: |
<details>
```console
$ docker version
# paste output here
```
</details>
validations:
required: true
- type: textarea
id: osVersion
attributes:
label: OS Version
value: |
<details>
```console
# On Linux:
$ cat /etc/os-release
# paste output here
# On Mac:
$ uname -a
# paste output here
# On Windows:
C:\> wmic os get Caption, Version, BuildNumber, OSArchitecture
# paste output here
```
</details>
validations:
required: true
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ Maven and Gradle will automatically reference the correct (pinned) version of th

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.30.6 | 2024-04-19 | [\#37442](https://github.com/airbytehq/airbyte/pull/37442) | Destinations: Rename File format related classes to be agnostic of S3 |
| 0.30.3 | 2024-04-12 | [\#37106](https://github.com/airbytehq/airbyte/pull/37106) | Destinations: Simplify constructors in `AsyncStreamConsumer` |
| 0.30.2 | 2024-04-12 | [\#36926](https://github.com/airbytehq/airbyte/pull/36926) | Destinations: Remove `JdbcSqlOperations#formatData`; misc changes for java interop |
| 0.30.1 | 2024-04-11 | [\#36919](https://github.com/airbytehq/airbyte/pull/36919) | Fix regression in sources conversion of null values |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,30 @@ object DataTypeUtils {
}
}

/**
 * Convenience overload that treats every produced value as valid: the supplier's
 * result is returned as-is, and only a [SQLException] raised while producing the
 * value yields `null`.
 */
@JvmStatic
fun <T> throwExceptionIfInvalid(valueProducer: DataTypeSupplier<T>): T? =
    throwExceptionIfInvalid(valueProducer, Function { _: T? -> true })

/**
 * Produces a value via [valueProducer] and maps it to `null` when it is invalid.
 *
 * Some edge-case values (e.g. Infinity, NaN) have no Java or JSON equivalent and
 * throw a [SQLException] when parsed; callers want those parsed as `null` instead.
 * This helper centralizes that error-handling boilerplate.
 *
 * @param valueProducer supplies the value; may throw [SQLException].
 * @param isValidFn predicate deciding whether the produced value is acceptable.
 * @return the produced value, or `null` if production threw a [SQLException] or
 *   [isValidFn] rejected the value.
 */
@JvmStatic
fun <T> throwExceptionIfInvalid(
    valueProducer: DataTypeSupplier<T>,
    isValidFn: Function<T?, Boolean>
): T? {
    return try {
        val value = valueProducer.apply()
        // Map rejected values directly to null rather than throwing a SQLException
        // only to catch it immediately below (exceptions are not control flow).
        if (isValidFn.apply(value)) value else null
    } catch (e: SQLException) {
        // Unparseable edge-case values (Infinity, NaN, ...) are coerced to null.
        null
    }
}

@JvmStatic
fun toISO8601StringWithMicroseconds(instant: Instant): String {
val dateWithMilliseconds = dateFormatMillisPattern.format(Date.from(instant))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
// convert to java types that will convert into reasonable json.
copyToJsonField(queryContext, i, jsonNode)
} catch (e: java.lang.Exception) {
jsonNode.putNull(columnName)
LOGGER.info(
"Failed to serialize column: {}, of type {}, with error {}",
columnName,
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.30.5
version=0.30.7
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import io.airbyte.cdk.integrations.destination.gcs.credential.GcsCredentialType
import io.airbyte.cdk.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig
import io.airbyte.cdk.integrations.destination.s3.S3DestinationConfig
import io.airbyte.cdk.integrations.destination.s3.S3DestinationConstants
import io.airbyte.cdk.integrations.destination.s3.S3FormatConfig
import io.airbyte.cdk.integrations.destination.s3.S3FormatConfigs.getS3FormatConfig
import io.airbyte.cdk.integrations.destination.s3.S3StorageOperations
import io.airbyte.cdk.integrations.destination.s3.UploadFormatConfig
import io.airbyte.cdk.integrations.destination.s3.UploadFormatConfigFactory.getUploadFormatConfig

/**
* Currently we always reuse the S3 client for GCS. So the GCS config extends from the S3 config.
Expand All @@ -28,7 +28,7 @@ class GcsDestinationConfig(
bucketPath: String,
bucketRegion: String?,
val gcsCredentialConfig: GcsCredentialConfig,
formatConfig: S3FormatConfig
formatConfig: UploadFormatConfig
) :
S3DestinationConfig(
GCS_ENDPOINT,
Expand Down Expand Up @@ -76,7 +76,7 @@ class GcsDestinationConfig(
config["gcs_bucket_path"].asText(),
config["gcs_bucket_region"].asText(),
GcsCredentialConfigs.getCredentialConfig(config),
getS3FormatConfig(config)
getUploadFormatConfig(config)
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.util.GcsUtils
import io.airbyte.cdk.integrations.destination.gcs.writer.BaseGcsWriter
import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.avro.AvroRecordFactory
import io.airbyte.cdk.integrations.destination.s3.avro.JsonToAvroSchemaConverter
import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig
import io.airbyte.cdk.integrations.destination.s3.avro.UploadAvroFormatConfig
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter
Expand Down Expand Up @@ -63,8 +63,7 @@ constructor(
)
LOGGER.info("Avro schema for stream {}: {}", stream.name, schema!!.toString(false))

val outputFilename: String =
BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.AVRO)
val outputFilename: String = getOutputFilename(uploadTimestamp, FileUploadFormat.AVRO)
outputPath = java.lang.String.join("/", outputPrefix, outputFilename)
fileLocation = String.format("gs://%s/%s", config.bucketName, outputPath)

Expand All @@ -84,7 +83,7 @@ constructor(
// performant.
this.outputStream = uploadManager.multiPartOutputStreams[0]

val formatConfig = config.formatConfig as S3AvroFormatConfig
val formatConfig = config.formatConfig as UploadAvroFormatConfig
// The DataFileWriter always uses binary encoding.
// If json encoding is needed in the future, use the GenericDatumWriter directly.
this.dataFileWriter =
Expand Down Expand Up @@ -118,8 +117,8 @@ constructor(
uploadManager.abort()
}

override val fileFormat: S3Format
get() = S3Format.AVRO
override val fileFormat: FileUploadFormat
get() = FileUploadFormat.AVRO

companion object {
protected val LOGGER: Logger = LoggerFactory.getLogger(GcsAvroWriter::class.java)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import com.amazonaws.services.s3.AmazonS3
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.writer.BaseGcsWriter
import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.csv.CsvSheetGenerator
import io.airbyte.cdk.integrations.destination.s3.csv.CsvSheetGenerator.Factory.create
import io.airbyte.cdk.integrations.destination.s3.csv.S3CsvFormatConfig
import io.airbyte.cdk.integrations.destination.s3.csv.UploadCsvFormatConfig
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter
Expand Down Expand Up @@ -43,11 +43,11 @@ class GcsCsvWriter(
override val outputPath: String

init {
val formatConfig = config.formatConfig as S3CsvFormatConfig
val formatConfig = config.formatConfig as UploadCsvFormatConfig
this.csvSheetGenerator = create(configuredStream.stream.jsonSchema, formatConfig)

val outputFilename: String =
BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.CSV)
BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, FileUploadFormat.CSV)
outputPath = java.lang.String.join("/", outputPrefix, outputFilename)
fileLocation = String.format("gs://%s/%s", config.bucketName, outputPath)

Expand Down Expand Up @@ -97,8 +97,8 @@ class GcsCsvWriter(
uploadManager.abort()
}

override val fileFormat: S3Format
get() = S3Format.CSV
override val fileFormat: FileUploadFormat
get() = FileUploadFormat.CSV

companion object {
private val LOGGER: Logger = LoggerFactory.getLogger(GcsCsvWriter::class.java)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import io.airbyte.cdk.integrations.base.JavaBaseConstants
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.writer.BaseGcsWriter
import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter
import io.airbyte.commons.jackson.MoreMappers
Expand Down Expand Up @@ -40,7 +40,7 @@ class GcsJsonlWriter(

init {
val outputFilename: String =
BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.JSONL)
BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, FileUploadFormat.JSONL)
outputPath = java.lang.String.join("/", outputPrefix, outputFilename)

fileLocation = String.format("gs://%s/%s", config.bucketName, outputPath)
Expand Down Expand Up @@ -84,8 +84,8 @@ class GcsJsonlWriter(
uploadManager.abort()
}

override val fileFormat: S3Format
get() = S3Format.JSONL
override val fileFormat: FileUploadFormat
get() = FileUploadFormat.JSONL

companion object {
protected val LOGGER: Logger = LoggerFactory.getLogger(GcsJsonlWriter::class.java)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig
import io.airbyte.cdk.integrations.destination.gcs.util.GcsS3FileSystem
import io.airbyte.cdk.integrations.destination.gcs.writer.BaseGcsWriter
import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.avro.AvroRecordFactory
import io.airbyte.cdk.integrations.destination.s3.parquet.S3ParquetFormatConfig
import io.airbyte.cdk.integrations.destination.s3.parquet.UploadParquetFormatConfig
import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
Expand Down Expand Up @@ -46,7 +46,7 @@ class GcsParquetWriter(

init {
val outputFilename: String =
BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, S3Format.PARQUET)
BaseGcsWriter.Companion.getOutputFilename(uploadTimestamp, FileUploadFormat.PARQUET)
outputPath = java.lang.String.join("/", outputPrefix, outputFilename)
LOGGER.info(
"Storage path for stream '{}': {}/{}",
Expand All @@ -62,7 +62,7 @@ class GcsParquetWriter(

LOGGER.info("Full GCS path for stream '{}': {}", stream.name, path)

val formatConfig = config.formatConfig as S3ParquetFormatConfig
val formatConfig = config.formatConfig as UploadParquetFormatConfig
val hadoopConfig = getHadoopConfig(config)
this.parquetWriter =
AvroParquetWriter.builder<GenericData.Record>(
Expand Down Expand Up @@ -102,8 +102,8 @@ class GcsParquetWriter(
}
}

override val fileFormat: S3Format
get() = S3Format.PARQUET
override val fileFormat: FileUploadFormat
get() = FileUploadFormat.PARQUET

companion object {
private val LOGGER: Logger = LoggerFactory.getLogger(GcsParquetWriter::class.java)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.DeleteObjectsRequest
import com.amazonaws.services.s3.model.HeadBucketRequest
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.S3DestinationConstants
import io.airbyte.cdk.integrations.destination.s3.S3Format
import io.airbyte.cdk.integrations.destination.s3.util.S3OutputPathHelper.getOutputPrefix
import io.airbyte.cdk.integrations.destination.s3.writer.DestinationFileWriter
import io.airbyte.protocol.models.v0.AirbyteStream
Expand Down Expand Up @@ -128,7 +128,7 @@ protected constructor(
private val LOGGER: Logger = LoggerFactory.getLogger(BaseGcsWriter::class.java)

// Filename: <upload-date>_<upload-millis>_0.<format-extension>
fun getOutputFilename(timestamp: Timestamp, format: S3Format): String {
fun getOutputFilename(timestamp: Timestamp, format: FileUploadFormat): String {
val formatter: DateFormat =
SimpleDateFormat(S3DestinationConstants.YYYY_MM_DD_FORMAT_STRING)
formatter.timeZone = TimeZone.getTimeZone("UTC")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package io.airbyte.cdk.integrations.destination.gcs

import io.airbyte.cdk.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig
import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig
import io.airbyte.cdk.integrations.destination.s3.avro.UploadAvroFormatConfig
import io.airbyte.commons.json.Jsons
import io.airbyte.commons.resources.MoreResources
import java.io.IOException
Expand All @@ -30,9 +30,9 @@ internal class GcsDestinationConfigTest {
Assertions.assertEquals("test_secret", hmacKeyConfig.hmacKeySecret)

val formatConfig = config.formatConfig
Assertions.assertTrue(formatConfig is S3AvroFormatConfig)
Assertions.assertTrue(formatConfig is UploadAvroFormatConfig)

val avroFormatConfig = formatConfig as S3AvroFormatConfig
val avroFormatConfig = formatConfig as UploadAvroFormatConfig
Assertions.assertEquals("deflate-5", avroFormatConfig.codecFactory.toString())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.amazonaws.services.s3.internal.Constants
import com.google.common.collect.Lists
import io.airbyte.cdk.integrations.destination.gcs.GcsDestinationConfig
import io.airbyte.cdk.integrations.destination.gcs.util.ConfigTestUtils
import io.airbyte.cdk.integrations.destination.s3.avro.S3AvroFormatConfig.Companion.parseCodecConfig
import io.airbyte.cdk.integrations.destination.s3.avro.UploadAvroFormatConfig.Companion.parseCodecConfig
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory
import io.airbyte.cdk.integrations.destination.s3.util.StreamTransferManagerFactory.create
import io.airbyte.commons.json.Jsons
Expand Down
Loading

0 comments on commit 977b84b

Please sign in to comment.