
Commit

🐛 Destination BQ Denormalized: handle null values in fields described by a `$ref` schema (#7804)
sherifnada committed Nov 10, 2021
1 parent e26418b commit cf126ce
Showing 9 changed files with 64 additions and 43 deletions.
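Overview of the fix: when a stream's JSON schema describes a field via a `$ref` definition, this destination stores that field as stringified JSON. Previously the record formatter called `data.get(key).toString()` unconditionally, which throws a NullPointerException when an incoming record omits the field entirely (Jackson returns a Java null for a missing key) and turns an explicit JSON null into the literal string "null". This commit guards the conversion so null or missing values pass through untouched. Below is a minimal, self-contained Java sketch of that behaviour; the class, method, and variable names are illustrative only and are not part of the connector.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Set;

public class RefFieldStringifierSketch {

  private static final ObjectMapper MAPPER = new ObjectMapper();

  // Stringify each $ref-described field, but only when the record carries a real (non-null) value for it.
  static ObjectNode stringifyRefFields(final ObjectNode data, final Set<String> refFields) {
    refFields.forEach(key -> {
      if (data.get(key) != null && !data.get(key).isNull()) {
        data.put(key, data.get(key).toString());
      }
    });
    return data;
  }

  public static void main(final String[] args) throws Exception {
    final ObjectNode withValue = (ObjectNode) MAPPER.readTree("{\"users\":{\"name\":\"John\",\"surname\":\"Adams\"}}");
    final ObjectNode withNull = (ObjectNode) MAPPER.readTree("{\"users\":null}");
    final ObjectNode empty = (ObjectNode) MAPPER.readTree("{}");

    System.out.println(stringifyRefFields(withValue, Set.of("users"))); // {"users":"{\"name\":\"John\",\"surname\":\"Adams\"}"}
    System.out.println(stringifyRefFields(withNull, Set.of("users")));  // {"users":null} -- left as-is
    System.out.println(stringifyRefFields(empty, Set.of("users")));     // {} -- left as-is
  }
}

The same guard appears verbatim in the formatRecord diff below, and the new test messages MESSAGE_USERS6 ({"users":null}) and EMPTY_MESSAGE ({}) exercise exactly these two cases.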
@@ -11,7 +11,7 @@
- name: BigQuery (denormalized typed struct)
destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
dockerRepository: airbyte/destination-bigquery-denormalized
dockerImageTag: 0.1.9
dockerImageTag: 0.1.10
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- name: Cassandra
destinationDefinitionId: 707456df-6f4f-4ced-b5c6-03f73bcad1c5
@@ -254,7 +254,7 @@
- "overwrite"
- "append"
- "append_dedup"
- dockerImage: "airbyte/destination-bigquery-denormalized:0.1.9"
- dockerImage: "airbyte/destination-bigquery-denormalized:0.1.10"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
connectionSpecification:
5 changes: 3 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -3428,7 +3428,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mixpanel:0.1.3"
- dockerImage: "airbyte/source-mixpanel:0.1.5"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mixpanel"
connectionSpecification:
@@ -3474,7 +3474,8 @@
start_date:
type: "string"
description: "The default value to use if no bookmark exists for an endpoint.\
\ Default is 1 year ago."
\ If this option is not set, the connector will replicate data from up\
\ to one year ago by default."
examples:
- "2021-11-16"
pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}(T[0-9]{2}:[0-9]{2}:[0-9]{2}Z)?$"
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
@@ -43,7 +43,8 @@ public class BigQueryDenormalizedDestination extends BigQueryDestination {
private static final String TYPE_FIELD = "type";
private static final String FORMAT_FIELD = "format";
private static final String REF_DEFINITION_KEY = "$ref";
private static final Set<String> fieldsContainRefDefinitionValue = new HashSet<>();

private final Set<String> fieldsContainRefDefinitionValue = new HashSet<>();

@Override
protected String getTargetTableName(final String streamName) {
@@ -73,7 +74,7 @@ protected Schema getBigQuerySchema(final JsonNode jsonSchema) {
return com.google.cloud.bigquery.Schema.of(fieldList);
}

private static List<Field> getSchemaFields(final BigQuerySQLNameTransformer namingResolver, final JsonNode jsonSchema) {
private List<Field> getSchemaFields(final BigQuerySQLNameTransformer namingResolver, final JsonNode jsonSchema) {
Preconditions.checkArgument(jsonSchema.isObject() && jsonSchema.has(PROPERTIES_FIELD));
final ObjectNode properties = (ObjectNode) jsonSchema.get(PROPERTIES_FIELD);
List<Field> tmpFields = Jsons.keys(properties).stream()
@@ -96,7 +97,7 @@ private static List<Field> getSchemaFields(final BigQuerySQLNameTransformer nami
 * Currently, Airbyte doesn't support parsing values by a $ref key definition.
 * The issue tracking this: <a href="https://github.com/airbytehq/airbyte/issues/7725">7725</a>
*/
private static Consumer<String> addToRefList(ObjectNode properties) {
private Consumer<String> addToRefList(ObjectNode properties) {
return key -> {
if (properties.get(key).has(REF_DEFINITION_KEY)) {
fieldsContainRefDefinitionValue.add(key);
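A note on the hunks above: fieldsContainRefDefinitionValue, getSchemaFields and addToRefList change from static to instance members. The commit itself does not state why, but the practical effect is that the set of $ref field names collected while building the BigQuery schema is scoped to one destination instance rather than shared and accumulated across every destination instance in the same JVM.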
@@ -63,7 +63,11 @@ protected JsonNode formatRecord(final Schema schema, final AirbyteRecordMessage
// replace ObjectNode with TextNode for fields with $ref definition key
// Do not need to iterate through all JSON Object nodes, only first nesting object.
if (!fieldsWithRefDefinition.isEmpty()) {
fieldsWithRefDefinition.forEach(key -> data.put(key, data.get(key).toString()));
fieldsWithRefDefinition.forEach(key -> {
if (data.get(key) != null && !data.get(key).isNull()) {
data.put(key, data.get(key).toString());
}
});
}
data.put(JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString());
data.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, formattedEmittedAt);
@@ -9,6 +9,7 @@
import static org.junit.jupiter.params.provider.Arguments.arguments;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
@@ -32,17 +33,21 @@
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.SyncMode;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.assertj.core.util.Sets;
import org.joda.time.DateTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -57,33 +62,20 @@
class BigQueryDenormalizedDestinationTest {

private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");
private static final Set<String> AIRBYTE_METADATA_FIELDS = Set.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, JavaBaseConstants.COLUMN_NAME_AB_ID);

private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedDestinationTest.class);

private static final String BIG_QUERY_CLIENT_CHUNK_SIZE = "big_query_client_buffer_size_mb";
private static final Instant NOW = Instant.now();
private static final String USERS_STREAM_NAME = "users";
private static final AirbyteMessage MESSAGE_USERS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(getData())
.withEmittedAt(NOW.toEpochMilli()));
private static final AirbyteMessage MESSAGE_USERS2 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(getDataWithEmptyObjectAndArray())
.withEmittedAt(NOW.toEpochMilli()));
private static final AirbyteMessage MESSAGE_USERS3 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(getDataWithFormats())
.withEmittedAt(NOW.toEpochMilli()));
private static final AirbyteMessage MESSAGE_USERS4 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(getDataWithJSONDateTimeFormats())
.withEmittedAt(NOW.toEpochMilli()));
private static final AirbyteMessage MESSAGE_USERS5 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(getDataWithJSONWithReference())
.withEmittedAt(NOW.toEpochMilli()));

private static final AirbyteMessage MESSAGE_USERS1 = createRecordMessage(USERS_STREAM_NAME, getData());
private static final AirbyteMessage MESSAGE_USERS2 = createRecordMessage(USERS_STREAM_NAME, getDataWithEmptyObjectAndArray());
private static final AirbyteMessage MESSAGE_USERS3 = createRecordMessage(USERS_STREAM_NAME, getDataWithFormats());
private static final AirbyteMessage MESSAGE_USERS4 = createRecordMessage(USERS_STREAM_NAME, getDataWithJSONDateTimeFormats());
private static final AirbyteMessage MESSAGE_USERS5 = createRecordMessage(USERS_STREAM_NAME, getDataWithJSONWithReference());
private static final AirbyteMessage MESSAGE_USERS6 = createRecordMessage(USERS_STREAM_NAME, Jsons.deserialize("{\"users\":null}"));
private static final AirbyteMessage EMPTY_MESSAGE = createRecordMessage(USERS_STREAM_NAME, Jsons.deserialize("{}"));

private JsonNode config;

@@ -122,6 +114,8 @@ void setup(final TestInfo info) throws IOException {
MESSAGE_USERS3.getRecord().setNamespace(datasetId);
MESSAGE_USERS4.getRecord().setNamespace(datasetId);
MESSAGE_USERS5.getRecord().setNamespace(datasetId);
MESSAGE_USERS6.getRecord().setNamespace(datasetId);
EMPTY_MESSAGE.getRecord().setNamespace(datasetId);

final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build();
dataset = bigquery.create(datasetInfo);
@@ -258,12 +252,20 @@ void testJsonReferenceDefinition() throws Exception {
final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector);

consumer.accept(MESSAGE_USERS5);
consumer.accept(MESSAGE_USERS6);
consumer.accept(EMPTY_MESSAGE);
consumer.close();

final List<JsonNode> usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME);
final JsonNode resultJson = usersActual.get(0);
assertEquals(usersActual.size(), 1);
assertEquals(extractJsonValues(resultJson, "users"), Set.of("{\"name\":\"John\",\"surname\":\"Adams\"}"));
final Set<String> actual =
retrieveRecordsAsJson(USERS_STREAM_NAME).stream().flatMap(x -> extractJsonValues(x, "users").stream()).collect(Collectors.toSet());

final Set<String> expected = Sets.set(
"{\"name\":\"John\",\"surname\":\"Adams\"}",
null // we expect one record to have not had the users field set
);

assertEquals(2, actual.size());
assertEquals(expected, actual);
}
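(Side note on the assertion above: the expected set is created with org.assertj.core.util.Sets.set instead of java.util.Set.of because Set.of rejects null elements, and the expected result here deliberately contains null for the record whose users field was never set.)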

private Set<String> extractJsonValues(final JsonNode node, final String attributeName) {
@@ -282,6 +284,13 @@ private Set<String> extractJsonValues(final JsonNode node, final String attribut
return resultSet;
}

private JsonNode removeAirbyteMetadataFields(JsonNode record) {
for (String airbyteMetadataField : AIRBYTE_METADATA_FIELDS) {
((ObjectNode) record).remove(airbyteMetadataField);
}
return record;
}

private List<JsonNode> retrieveRecordsAsJson(final String tableName) throws Exception {
final QueryJobConfiguration queryConfig =
QueryJobConfiguration
@@ -294,6 +303,7 @@ private List<JsonNode> retrieveRecordsAsJson(final String tableName) throws Exce
.stream(BigQueryUtils.executeQuery(bigquery, queryConfig).getLeft().getQueryResults().iterateAll().spliterator(), false)
.map(v -> v.get("jsonValue").getStringValue())
.map(Jsons::deserialize)
.map(this::removeAirbyteMetadataFields)
.collect(Collectors.toList());
}

@@ -304,4 +314,10 @@ private static Stream<Arguments> schemaAndDataProvider() {
arguments(getSchema(), MESSAGE_USERS2));
}

private static AirbyteMessage createRecordMessage(String stream, JsonNode data) {
return new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(stream)
.withData(data)
.withEmittedAt(NOW.toEpochMilli()));
}
}
@@ -5,6 +5,7 @@
package io.airbyte.integrations.destination.bigquery.util;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;

public class BigQueryDenormalizedTestDataUtils {
@@ -203,22 +204,19 @@ public static JsonNode getDataWithJSONDateTimeFormats() {
}

public static JsonNode getDataWithJSONWithReference() {
return Jsons.deserialize(
"{\n"
+ " \"users\" :{\n"
+ " \"name\": \"John\",\n"
+ " \"surname\": \"Adams"
+"\"\n"
+ " }\n"
+ "}");
return Jsons.jsonNode(
ImmutableMap.of("users", ImmutableMap.of(
"name", "John",
"surname", "Adams"
)));
}

public static JsonNode getSchemaWithReferenceDefinition() {
return Jsons.deserialize(
"{ \n"
+ " \"type\" : [ \"null\", \"object\" ],\n"
+ " \"properties\" : {\n"
+" \"users\": {\n"
+ " \"users\": {\n"
+ " \"$ref\": \"#/definitions/users_\"\n"
+
" }\n"
1 change: 1 addition & 0 deletions docs/integrations/destinations/bigquery.md
@@ -169,6 +169,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.10 | 2021-11-09 | [\#7804](https://github.com/airbytehq/airbyte/pull/7804) | Handle null values in fields described by a $ref definition |
| 0.1.9 | 2021-11-08 | [\#7736](https://github.com/airbytehq/airbyte/issues/7736) | Fixed the handling of ObjectNodes with $ref definition key |
| 0.1.8 | 2021-10-27 | [\#7413](https://github.com/airbytehq/airbyte/issues/7413) | Fixed DATETIME conversion for BigQuery |
| 0.1.7 | 2021-10-26 | [\#7240](https://github.com/airbytehq/airbyte/issues/7240) | Output partitioned/clustered tables |
