From 0296c431692811fa6dc1ad427adfeb53dba101ff Mon Sep 17 00:00:00 2001 From: Akash Kulkarni <113392464+akashkulk@users.noreply.github.com> Date: Wed, 10 Apr 2024 15:46:15 -0700 Subject: [PATCH] [Db analytics] : add message for data type serialization error (#36981) --- airbyte-cdk/java/airbyte-cdk/README.md | 1 + .../io/airbyte/cdk/db/DbAnalyticsUtils.kt | 8 +++++ .../AbstractJdbcCompatibleSourceOperations.kt | 32 ++++++++++++------- .../src/main/resources/version.properties | 2 +- .../connectors/source-postgres/build.gradle | 2 +- .../connectors/source-postgres/metadata.yaml | 2 +- .../ctid/CtidPostgresSourceOperations.java | 7 +++- docs/integrations/sources/postgres.md | 1 + 8 files changed, 39 insertions(+), 16 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index cb2629750e264..1a0d4536ff121 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -144,6 +144,7 @@ Maven and Gradle will automatically reference the correct (pinned) version of th | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.29.13 | 2024-04-10 | [\#36981](https://github.com/airbytehq/airbyte/pull/36981) | DB sources : Emit analytics for data type serialization errors. | | 0.29.11 | 2024-04-10 | [\#36865](https://github.com/airbytehq/airbyte/pull/36865) | Sources : Remove noisy log line. | | 0.29.10 | 2024-04-10 | [\#36805](https://github.com/airbytehq/airbyte/pull/36805) | Destinations: Enhance CatalogParser name collision handling; add DV2 tests for long identifiers | | 0.29.9 | 2024-04-09 | [\#36047](https://github.com/airbytehq/airbyte/pull/36047) | Destinations: CDK updates for raw-only destinations | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/DbAnalyticsUtils.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/DbAnalyticsUtils.kt index eaac7acf8aba7..ffe7897b79e2f 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/DbAnalyticsUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/DbAnalyticsUtils.kt @@ -12,9 +12,17 @@ import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage */ object DbAnalyticsUtils { const val CDC_CURSOR_INVALID_KEY: String = "db-sources-cdc-cursor-invalid" + const val DATA_TYPES_SERIALIZATION_ERROR_KEY = "db-sources-data-serialization-error" @JvmStatic fun cdcCursorInvalidMessage(): AirbyteAnalyticsTraceMessage { return AirbyteAnalyticsTraceMessage().withType(CDC_CURSOR_INVALID_KEY).withValue("1") } + + @JvmStatic + fun dataTypesSerializationErrorMessage(): AirbyteAnalyticsTraceMessage { + return AirbyteAnalyticsTraceMessage() + .withType(DATA_TYPES_SERIALIZATION_ERROR_KEY) + .withValue("1") + } } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt index ae05f815c8f05..db78b4ab3b8b2 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt @@ -7,8 +7,9 @@ import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.node.ObjectNode import io.airbyte.cdk.db.DataTypeUtils +import io.airbyte.cdk.db.DbAnalyticsUtils.dataTypesSerializationErrorMessage import io.airbyte.cdk.db.JdbcCompatibleSourceOperations -import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations.Companion.LOGGER +import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility import io.airbyte.commons.json.Jsons import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange @@ -40,18 +41,25 @@ abstract class AbstractJdbcCompatibleSourceOperations : for (i in 1..columnCount) { val columnName = queryContext.metaData.getColumnName(i) + val columnTypeName = queryContext.metaData.getColumnTypeName(i) try { // convert to java types that will convert into reasonable json. copyToJsonField(queryContext, i, jsonNode) } catch (e: java.lang.Exception) { - LOGGER.info("Failed to serialize column: {}, with error {}", columnName, e.message) + LOGGER.info( + "Failed to serialize column: {}, of type {}, with error {}", + columnName, + columnTypeName, + e.message + ) + AirbyteTraceMessageUtility.emitAnalyticsTrace(dataTypesSerializationErrorMessage()) metaChanges.add( AirbyteRecordMessageMetaChange() .withField(columnName) .withChange(AirbyteRecordMessageMetaChange.Change.NULLED) .withReason( - AirbyteRecordMessageMetaChange.Reason.SOURCE_SERIALIZATION_ERROR - ) + AirbyteRecordMessageMetaChange.Reason.SOURCE_SERIALIZATION_ERROR, + ), ) } } @@ -166,8 +174,8 @@ abstract class AbstractJdbcCompatibleSourceOperations : columnName, DataTypeUtils.returnNullIfInvalid( { resultSet.getDouble(index) }, - { d: Double? -> java.lang.Double.isFinite(d!!) } - ) + { d: Double? -> java.lang.Double.isFinite(d!!) }, + ), ) } @@ -182,8 +190,8 @@ abstract class AbstractJdbcCompatibleSourceOperations : columnName, DataTypeUtils.returnNullIfInvalid( { resultSet.getFloat(index) }, - { f: Float? -> java.lang.Float.isFinite(f!!) } - ) + { f: Float? -> java.lang.Float.isFinite(f!!) }, + ), ) } @@ -226,7 +234,7 @@ abstract class AbstractJdbcCompatibleSourceOperations : ) { node.put( columnName, - DateTimeConverter.convertToTime(getObject(resultSet, index, LocalTime::class.java)) + DateTimeConverter.convertToTime(getObject(resultSet, index, LocalTime::class.java)), ) } @@ -241,8 +249,8 @@ abstract class AbstractJdbcCompatibleSourceOperations : node.put( columnName, DateTimeConverter.convertToTimestamp( - getObject(resultSet, index, LocalDateTime::class.java) - ) + getObject(resultSet, index, LocalDateTime::class.java), + ), ) } catch (e: Exception) { // for backward compatibility @@ -450,7 +458,7 @@ abstract class AbstractJdbcCompatibleSourceOperations : val localDate = timestamptz.toLocalDate() node.put( columnName, - resolveEra(localDate, timestamptz.format(DataTypeUtils.TIMESTAMPTZ_FORMATTER)) + resolveEra(localDate, timestamptz.format(DataTypeUtils.TIMESTAMPTZ_FORMATTER)), ) } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index dff7ca91c4b30..410b365cbed72 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.29.11 +version=0.29.13 diff --git a/airbyte-integrations/connectors/source-postgres/build.gradle b/airbyte-integrations/connectors/source-postgres/build.gradle index 896d4d41a9dd7..e9eda04cbaca6 100644 --- a/airbyte-integrations/connectors/source-postgres/build.gradle +++ b/airbyte-integrations/connectors/source-postgres/build.gradle @@ -12,7 +12,7 @@ java { } airbyteJavaConnector { - cdkVersionRequired = '0.29.11' + cdkVersionRequired = '0.29.13' features = ['db-sources', 'datastore-postgres'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/source-postgres/metadata.yaml b/airbyte-integrations/connectors/source-postgres/metadata.yaml index 71119ccdb739e..bd100e97024d3 100644 --- a/airbyte-integrations/connectors/source-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/source-postgres/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 - dockerImageTag: 3.3.24 + dockerImageTag: 3.3.25 dockerRepository: airbyte/source-postgres documentationUrl: https://docs.airbyte.com/integrations/sources/postgres githubIssueLabel: source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidPostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidPostgresSourceOperations.java index 4047f0c582507..a9d43c2464df5 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidPostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidPostgresSourceOperations.java @@ -4,8 +4,11 @@ package io.airbyte.integrations.source.postgres.ctid; +import static io.airbyte.cdk.db.DbAnalyticsUtils.dataTypesSerializationErrorMessage; + import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.cdk.db.jdbc.AirbyteRecordData; +import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.source.postgres.PostgresSourceOperations; import io.airbyte.integrations.source.postgres.cdc.PostgresCdcConnectorMetadataInjector; @@ -45,6 +48,7 @@ public RowDataWithCtid recordWithCtid(final ResultSet queryContext) throws SQLEx final List metaChanges = new ArrayList<>(); for (int i = 1; i <= columnCount; i++) { final String columnName = metadata.getColumnName(i); + final String columnTypeName = metadata.getColumnTypeName(i); try { if (columnName.equalsIgnoreCase(CTID)) { ctid = queryContext.getString(i); @@ -54,7 +58,8 @@ public RowDataWithCtid recordWithCtid(final ResultSet queryContext) throws SQLEx // convert to java types that will convert into reasonable json. copyToJsonField(queryContext, i, jsonNode); } catch (Exception e) { - LOGGER.info("Failed to serialize column: {}, with error {}", columnName, e.getMessage()); + LOGGER.info("Failed to serialize column: {}, of type {}, with error {}", columnName, columnTypeName, e.getMessage()); + AirbyteTraceMessageUtility.emitAnalyticsTrace(dataTypesSerializationErrorMessage()); metaChanges.add( new AirbyteRecordMessageMetaChange() .withField(columnName) diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index 1975fce7c1695..c33cdd27f87e4 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -292,6 +292,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | Version | Date | Pull Request | Subject | |---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.3.25 | 2024-04-10 | [36981](https://github.com/airbytehq/airbyte/pull/36981) | Track latest CDK | | 3.3.24 | 2024-04-10 | [36865](https://github.com/airbytehq/airbyte/pull/36865) | Track latest CDK | | 3.3.23 | 2024-04-02 | [36759](https://github.com/airbytehq/airbyte/pull/36759) | Track latest CDK | | 3.3.22 | 2024-04-01 | [36739](https://github.com/airbytehq/airbyte/pull/36739) | Fix useLocalCdk flag. |