Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Fixing rounding of numeric values for async destinations #31083

Merged
merged 4 commits into from Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion airbyte-cdk/java/airbyte-cdk/README.md
Expand Up @@ -155,7 +155,8 @@ MavenLocal debugging steps:
### Java CDK

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.1.6 | 2023-10-10 | [\#31083](https://github.com/airbytehq/airbyte/pull/31083) | Fix precision of numeric values in async destinations |
| 0.1.5 | 2023-10-09 | [\#31196](https://github.com/airbytehq/airbyte/pull/31196) | Update typo in CDK (CDN_LSN -> CDC_LSN) |
| 0.1.4 | 2023-10-06 | [\#31139](https://github.com/airbytehq/airbyte/pull/31139) | Reduce async buffer |
| 0.1.1 | 2023-09-28 | [\#30835](https://github.com/airbytehq/airbyte/pull/30835) | JDBC destinations now avoid staging area name collisions by using the raw table name as the stage name. (previously we used the stream name as the stage name) |
Expand Down
Expand Up @@ -134,7 +134,7 @@ public void accept(final String messageString, final Integer sizeInBytes) throws
public static PartialAirbyteMessage deserializeAirbyteMessage(final String messageString) {
// TODO: (ryankfu) plumb in the serialized AirbyteStateMessage to match AirbyteRecordMessage code
// parity. https://github.com/airbytehq/airbyte/issues/27530 for additional context
final var partial = Jsons.tryDeserialize(messageString, PartialAirbyteMessage.class)
final var partial = Jsons.tryDeserializeExact(messageString, PartialAirbyteMessage.class)
.orElseThrow(() -> new RuntimeException("Unable to deserialize PartialAirbyteMessage."));

final var msgType = partial.getType();
Expand Down
@@ -1 +1 @@
version=0.1.5
version=0.1.6
Expand Up @@ -35,6 +35,7 @@
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.io.IOException;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -239,6 +240,22 @@ void deserializeAirbyteMessageWithAirbyteRecord() {
assertEquals(airbyteRecordString, partial.getSerialized());
}

@Test
void deserializeAirbyteMessageWithBigDecimalAirbyteRecord() {
final JsonNode payload = Jsons.jsonNode(Map.of(
"foo", new BigDecimal("1234567890.1234567890")));
final AirbyteMessage airbyteMessage = new AirbyteMessage()
.withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withStream(STREAM_NAME)
.withNamespace(SCHEMA_NAME)
.withData(payload));
final String serializedAirbyteMessage = Jsons.serialize(airbyteMessage);
final String airbyteRecordString = Jsons.serialize(payload);
final PartialAirbyteMessage partial = AsyncStreamConsumer.deserializeAirbyteMessage(serializedAirbyteMessage);
assertEquals(airbyteRecordString, partial.getSerialized());
}

@Test
void deserializeAirbyteMessageWithEmptyAirbyteRecord() {
final Map emptyMap = Map.of();
Expand Down
Expand Up @@ -121,6 +121,14 @@ public static <T> Optional<T> tryDeserialize(final String jsonString, final Clas
}
}

public static <T> Optional<T> tryDeserializeExact(final String jsonString, final Class<T> klass) {
try {
return Optional.of(OBJECT_MAPPER_EXACT.readValue(jsonString, klass));
} catch (final Throwable e) {
return Optional.empty();
}
}

public static Optional<JsonNode> tryDeserialize(final String jsonString) {
try {
return Optional.of(OBJECT_MAPPER.readTree(jsonString));
Expand Down
Expand Up @@ -25,5 +25,5 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=2.1.0
LABEL io.airbyte.version=2.1.1
LABEL io.airbyte.name=airbyte/destination-bigquery
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.1.4'
cdkVersionRequired = '0.1.6'
features = ['db-destinations']
useLocalCdk = false
}
Expand Down
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.1.0
dockerImageTag: 2.1.1
dockerRepository: airbyte/destination-bigquery
githubIssueLabel: destination-bigquery
icon: bigquery.svg
Expand Down
Expand Up @@ -17,7 +17,7 @@ class CdkImportTest {
*/
@Test
void cdkVersionShouldMatch() {
assertEquals("0.1.4", CDKConstants.VERSION.replace("-SNAPSHOT", ""));
assertEquals("0.1.6", CDKConstants.VERSION.replace("-SNAPSHOT", ""));
}

}
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.1.4'
cdkVersionRequired = '0.1.6'
features = ['db-destinations']
useLocalCdk = false
}
Expand Down
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerImageTag: 0.6.8
dockerImageTag: 0.6.9
dockerRepository: airbyte/destination-redshift
githubIssueLabel: destination-redshift
icon: redshift.svg
Expand Down
Expand Up @@ -29,5 +29,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1

ENV ENABLE_SENTRY true

LABEL io.airbyte.version=3.2.0
LABEL io.airbyte.version=3.2.1
LABEL io.airbyte.name=airbyte/destination-snowflake
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.1.4'
cdkVersionRequired = '0.1.6'
features = ['db-destinations']
useLocalCdk = false
}
Expand Down
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerImageTag: 3.2.0
dockerImageTag: 3.2.1
dockerRepository: airbyte/destination-snowflake
githubIssueLabel: destination-snowflake
icon: snowflake.svg
Expand Down
7 changes: 4 additions & 3 deletions docs/integrations/destinations/bigquery.md
Expand Up @@ -88,7 +88,7 @@ Airbyte converts any invalid characters into `_` characters when writing data. H
## Data type map

| Airbyte type | BigQuery type |
| :---------------------------------- | :------------ |
|:------------------------------------|:--------------|
| DATE | DATE |
| STRING (BASE64) | STRING |
| NUMBER | FLOAT |
Expand Down Expand Up @@ -125,8 +125,9 @@ Now that you have set up the BigQuery destination connector, check out the follo
## Changelog

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :--------------------------------------------------------- | :-------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 2.1.0 | 2023-10-09 | [\#31149](https://github.com/airbytehq/airbyte/pull/31149) | No longer fail syncs when PKs are null - try do dedupe anyway |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.1.1 | 2023-10-10 | [\#31083](https://github.com/airbytehq/airbyte/pull/31083) | Fix precision of numeric values in async destinations |
| 2.1.0 | 2023-10-09 | [\#31149](https://github.com/airbytehq/airbyte/pull/31149) | No longer fail syncs when PKs are null - try do dedupe anyway |
| 2.0.26 | 2023-10-09 | [\#31198](https://github.com/airbytehq/airbyte/pull/31198) | Clarify configuration groups |
| 2.0.25 | 2023-10-09 | [\#31185](https://github.com/airbytehq/airbyte/pull/31185) | Increase staging file upload timeout to 5 minutes |
| 2.0.24 | 2023-10-06 | [\#31139](https://github.com/airbytehq/airbyte/pull/31139) | Bump CDK version |
Expand Down
7 changes: 4 additions & 3 deletions docs/integrations/destinations/redshift.md
Expand Up @@ -141,7 +141,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c
## Data type mapping

| Redshift Type | Airbyte Type | Notes |
| :-------------------- | :------------------------ | :---- |
|:----------------------|:--------------------------|:------|
| `boolean` | `boolean` | |
| `int` | `integer` | |
| `float` | `number` | |
Expand All @@ -155,7 +155,8 @@ Each stream will be output into its own raw table in Redshift. Each table will c
## Changelog

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :--------------------------------------------------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.6.9 | 2023-10-10 | [\#31083](https://github.com/airbytehq/airbyte/pull/31083) | Fix precision of numeric values in async destinations |
| 0.6.8 | 2023-10-10 | [\#31218](https://github.com/airbytehq/airbyte/pull/31218) | Clarify configuration groups |
| 0.6.7 | 2023-10-06 | [\#31153](https://github.com/airbytehq/airbyte/pull/31153) | Increase jvm GC retries |
| 0.6.6 | 2023-10-06 | [\#31129](https://github.com/airbytehq/airbyte/pull/31129) | Reduce async buffer size |
Expand All @@ -180,7 +181,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c
| 0.3.55 | 2023-01-26 | [\#20631](https://github.com/airbytehq/airbyte/pull/20631) | Added support for destination checkpointing with staging |
| 0.3.54 | 2023-01-18 | [\#21087](https://github.com/airbytehq/airbyte/pull/21087) | Wrap Authentication Errors as Config Exceptions |
| 0.3.53 | 2023-01-03 | [\#17273](https://github.com/airbytehq/airbyte/pull/17273) | Flatten JSON arrays to fix maximum size check for SUPER field |
| 0.3.52 | 2022-12-30 | [\#20879](https://github.com/airbytehq/airbyte/pull/20879) | Added configurable parameter for number of file buffers (⛔ this version has a bug and will not work; use `0.3.56` instead) |
| 0.3.52 | 2022-12-30 | [\#20879](https://github.com/airbytehq/airbyte/pull/20879) | Added configurable parameter for number of file buffers (⛔ this version has a bug and will not work; use `0.3.56` instead) |
| 0.3.51 | 2022-10-26 | [\#18434](https://github.com/airbytehq/airbyte/pull/18434) | Fix empty S3 bucket path handling |
| 0.3.50 | 2022-09-14 | [\#15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage |
| 0.3.49 | 2022-09-01 | [\#16243](https://github.com/airbytehq/airbyte/pull/16243) | Fix Json to Avro conversion when there is field name clash from combined restrictions (`anyOf`, `oneOf`, `allOf` fields) |
Expand Down