From cf126ced9b107a351160c0dd6315e580640930b8 Mon Sep 17 00:00:00 2001 From: "Sherif A. Nada" Date: Tue, 9 Nov 2021 21:53:41 -0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Destination=20BQ=20Denormalized:?= =?UTF-8?q?=20handle=20null=20values=20in=20fields=20described=20by=20a=20?= =?UTF-8?q?`$ref`=20schema=20(#7804)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../seed/destination_definitions.yaml | 2 +- .../resources/seed/destination_specs.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 5 +- .../Dockerfile | 2 +- .../BigQueryDenormalizedDestination.java | 7 +- .../BigQueryDenormalizedRecordConsumer.java | 6 +- .../BigQueryDenormalizedDestinationTest.java | 66 ++++++++++++------- .../BigQueryDenormalizedTestDataUtils.java | 16 ++--- docs/integrations/destinations/bigquery.md | 1 + 9 files changed, 64 insertions(+), 43 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 3595780cb24d..6b917df23f33 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -11,7 +11,7 @@ - name: BigQuery (denormalized typed struct) destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496 dockerRepository: airbyte/destination-bigquery-denormalized - dockerImageTag: 0.1.9 + dockerImageTag: 0.1.10 documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery - name: Cassandra destinationDefinitionId: 707456df-6f4f-4ced-b5c6-03f73bcad1c5 diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 6dacee970edb..0e52d85a3e79 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -254,7 +254,7 @@ - "overwrite" - "append" - "append_dedup" -- dockerImage: "airbyte/destination-bigquery-denormalized:0.1.9" +- dockerImage: "airbyte/destination-bigquery-denormalized:0.1.10" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery" connectionSpecification: diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index f5836bc0141b..ed6768f7e5a2 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -3428,7 +3428,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-mixpanel:0.1.3" +- dockerImage: "airbyte/source-mixpanel:0.1.5" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/mixpanel" connectionSpecification: @@ -3474,7 +3474,8 @@ start_date: type: "string" description: "The default value to use if no bookmark exists for an endpoint.\ - \ Default is 1 year ago." + \ If this option is not set, the connector will replicate data from up\ + \ to one year ago by default." examples: - "2021-11-16" pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}(T[0-9]{2}:[0-9]{2}:[0-9]{2}Z)?$" diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile index 225216fb63bf..fd3d2e2e268e 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.9 +LABEL io.airbyte.version=0.1.10 LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java index a256807210c3..88ab0c24f78c 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java @@ -43,7 +43,8 @@ public class BigQueryDenormalizedDestination extends BigQueryDestination { private static final String TYPE_FIELD = "type"; private static final String FORMAT_FIELD = "format"; private static final String REF_DEFINITION_KEY = "$ref"; - private static final Set fieldsContainRefDefinitionValue = new HashSet<>(); + + private final Set fieldsContainRefDefinitionValue = new HashSet<>(); @Override protected String getTargetTableName(final String streamName) { @@ -73,7 +74,7 @@ protected Schema getBigQuerySchema(final JsonNode jsonSchema) { return com.google.cloud.bigquery.Schema.of(fieldList); } - private static List getSchemaFields(final BigQuerySQLNameTransformer namingResolver, final JsonNode jsonSchema) { + private List getSchemaFields(final BigQuerySQLNameTransformer namingResolver, final JsonNode jsonSchema) { Preconditions.checkArgument(jsonSchema.isObject() && jsonSchema.has(PROPERTIES_FIELD)); final ObjectNode properties = (ObjectNode) jsonSchema.get(PROPERTIES_FIELD); List tmpFields = Jsons.keys(properties).stream() @@ -96,7 +97,7 @@ private static List getSchemaFields(final BigQuerySQLNameTransformer nami * Currently, AirByte doesn't support parsing value by $ref key definition. * The issue to track this 7725 */ - private static Consumer addToRefList(ObjectNode properties) { + private Consumer addToRefList(ObjectNode properties) { return key -> { if (properties.get(key).has(REF_DEFINITION_KEY)) { fieldsContainRefDefinitionValue.add(key); diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java index 4b5b7a4bcb1f..e7128b9778fd 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java @@ -63,7 +63,11 @@ protected JsonNode formatRecord(final Schema schema, final AirbyteRecordMessage // replace ObjectNode with TextNode for fields with $ref definition key // Do not need to iterate through all JSON Object nodes, only first nesting object. if (!fieldsWithRefDefinition.isEmpty()) { - fieldsWithRefDefinition.forEach(key -> data.put(key, data.get(key).toString())); + fieldsWithRefDefinition.forEach(key -> { + if (data.get(key) != null && !data.get(key).isNull()){ + data.put(key, data.get(key).toString()); + } + }); } data.put(JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString()); data.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, formattedEmittedAt); diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java index 3fdb3dbb2a58..b57b9c60f40b 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java @@ -9,6 +9,7 @@ import static org.junit.jupiter.params.provider.Arguments.arguments; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; @@ -32,17 +33,21 @@ import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.DestinationSyncMode; import io.airbyte.protocol.models.SyncMode; + import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.time.Instant; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; + +import org.assertj.core.util.Sets; import org.joda.time.DateTime; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -57,33 +62,20 @@ class BigQueryDenormalizedDestinationTest { private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json"); + private static final Set AIRBYTE_METADATA_FIELDS = Set.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, JavaBaseConstants.COLUMN_NAME_AB_ID); private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedDestinationTest.class); private static final String BIG_QUERY_CLIENT_CHUNK_SIZE = "big_query_client_buffer_size_mb"; private static final Instant NOW = Instant.now(); private static final String USERS_STREAM_NAME = "users"; - private static final AirbyteMessage MESSAGE_USERS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) - .withData(getData()) - .withEmittedAt(NOW.toEpochMilli())); - private static final AirbyteMessage MESSAGE_USERS2 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) - .withData(getDataWithEmptyObjectAndArray()) - .withEmittedAt(NOW.toEpochMilli())); - private static final AirbyteMessage MESSAGE_USERS3 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) - .withData(getDataWithFormats()) - .withEmittedAt(NOW.toEpochMilli())); - private static final AirbyteMessage MESSAGE_USERS4 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) - .withData(getDataWithJSONDateTimeFormats()) - .withEmittedAt(NOW.toEpochMilli())); - private static final AirbyteMessage MESSAGE_USERS5 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME) - .withData(getDataWithJSONWithReference()) - .withEmittedAt(NOW.toEpochMilli())); - + private static final AirbyteMessage MESSAGE_USERS1 = createRecordMessage(USERS_STREAM_NAME, getData()); + private static final AirbyteMessage MESSAGE_USERS2 = createRecordMessage(USERS_STREAM_NAME, getDataWithEmptyObjectAndArray()); + private static final AirbyteMessage MESSAGE_USERS3 = createRecordMessage(USERS_STREAM_NAME, getDataWithFormats()); + private static final AirbyteMessage MESSAGE_USERS4 = createRecordMessage(USERS_STREAM_NAME, getDataWithJSONDateTimeFormats()); + private static final AirbyteMessage MESSAGE_USERS5 = createRecordMessage(USERS_STREAM_NAME, getDataWithJSONWithReference()); + private static final AirbyteMessage MESSAGE_USERS6 = createRecordMessage(USERS_STREAM_NAME, Jsons.deserialize("{\"users\":null}")); + private static final AirbyteMessage EMPTY_MESSAGE = createRecordMessage(USERS_STREAM_NAME, Jsons.deserialize("{}")); private JsonNode config; @@ -122,6 +114,8 @@ void setup(final TestInfo info) throws IOException { MESSAGE_USERS3.getRecord().setNamespace(datasetId); MESSAGE_USERS4.getRecord().setNamespace(datasetId); MESSAGE_USERS5.getRecord().setNamespace(datasetId); + MESSAGE_USERS6.getRecord().setNamespace(datasetId); + EMPTY_MESSAGE.getRecord().setNamespace(datasetId); final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build(); dataset = bigquery.create(datasetInfo); @@ -258,12 +252,20 @@ void testJsonReferenceDefinition() throws Exception { final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector); consumer.accept(MESSAGE_USERS5); + consumer.accept(MESSAGE_USERS6); + consumer.accept(EMPTY_MESSAGE); consumer.close(); - final List usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME); - final JsonNode resultJson = usersActual.get(0); - assertEquals(usersActual.size(), 1); - assertEquals(extractJsonValues(resultJson, "users"), Set.of("{\"name\":\"John\",\"surname\":\"Adams\"}")); + final Set actual = + retrieveRecordsAsJson(USERS_STREAM_NAME).stream().flatMap(x -> extractJsonValues(x, "users").stream()).collect(Collectors.toSet()); + + final Set expected = Sets.set( + "{\"name\":\"John\",\"surname\":\"Adams\"}", + null // we expect one record to have not had the users field set + ); + + assertEquals(2, actual.size()); + assertEquals(expected, actual); } private Set extractJsonValues(final JsonNode node, final String attributeName) { @@ -282,6 +284,13 @@ private Set extractJsonValues(final JsonNode node, final String attribut return resultSet; } + private JsonNode removeAirbyteMetadataFields(JsonNode record) { + for (String airbyteMetadataField : AIRBYTE_METADATA_FIELDS) { + ((ObjectNode) record).remove(airbyteMetadataField); + } + return record; + } + private List retrieveRecordsAsJson(final String tableName) throws Exception { final QueryJobConfiguration queryConfig = QueryJobConfiguration @@ -294,6 +303,7 @@ private List retrieveRecordsAsJson(final String tableName) throws Exce .stream(BigQueryUtils.executeQuery(bigquery, queryConfig).getLeft().getQueryResults().iterateAll().spliterator(), false) .map(v -> v.get("jsonValue").getStringValue()) .map(Jsons::deserialize) + .map(this::removeAirbyteMetadataFields) .collect(Collectors.toList()); } @@ -304,4 +314,10 @@ private static Stream schemaAndDataProvider() { arguments(getSchema(), MESSAGE_USERS2)); } + private static AirbyteMessage createRecordMessage(String stream, JsonNode data) { + return new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(stream) + .withData(data) + .withEmittedAt(NOW.toEpochMilli())); + } } diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java index 995de73ab7c1..a506d8c467c5 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.destination.bigquery.util; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; public class BigQueryDenormalizedTestDataUtils { @@ -203,14 +204,11 @@ public static JsonNode getDataWithJSONDateTimeFormats() { } public static JsonNode getDataWithJSONWithReference() { - return Jsons.deserialize( - "{\n" - + " \"users\" :{\n" - + " \"name\": \"John\",\n" - + " \"surname\": \"Adams" - +"\"\n" - + " }\n" - + "}"); + return Jsons.jsonNode( + ImmutableMap.of("users", ImmutableMap.of( + "name", "John", + "surname", "Adams" + ))); } public static JsonNode getSchemaWithReferenceDefinition() { @@ -218,7 +216,7 @@ public static JsonNode getSchemaWithReferenceDefinition() { "{ \n" + " \"type\" : [ \"null\", \"object\" ],\n" + " \"properties\" : {\n" - +" \"users\": {\n" + + " \"users\": {\n" + " \"$ref\": \"#/definitions/users_\"\n" + " }\n" diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index a394a09e74fe..99873e08b91c 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -169,6 +169,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.10 | 2021-11-09 | [\#7804](https://github.com/airbytehq/airbyte/pull/7804) | handle null values in fields described by a $ref definition | | 0.1.9 | 2021-11-08 | [\#7736](https://github.com/airbytehq/airbyte/issues/7736) | Fixed the handling of ObjectNodes with $ref definition key | | 0.1.8 | 2021-10-27 | [\#7413](https://github.com/airbytehq/airbyte/issues/7413) | Fixed DATETIME conversion for BigQuery | | 0.1.7 | 2021-10-26 | [\#7240](https://github.com/airbytehq/airbyte/issues/7240) | Output partitioned/clustered tables |