Commit 634aa3d

šŸ› Destination BigQuery(Denormalized): Fix processing arrays of records (#5261)

* [4487] Fix processing arrays of records
DoNotPanicUA committed Aug 10, 2021
1 parent 40c12a9 commit 634aa3d
Showing 4 changed files with 287 additions and 2 deletions.
Dockerfile

@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
 
 RUN tar xf ${APPLICATION}.tar --strip-components=1
 
-LABEL io.airbyte.version=0.1.2
+LABEL io.airbyte.version=0.1.3
 LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
@@ -97,9 +97,14 @@ protected JsonNode formatData(FieldList fields, JsonNode root) {
           .collect(Collectors.toMap(namingResolver::getIdentifier,
               key -> formatData(fields.get(namingResolver.getIdentifier(key)).getSubFields(), root.get(key)))));
     } else if (root.isArray()) {
+      // Arrays can have only one field
+      Field arrayField = fields.get(0);
+      // If this is an array of records, we should use its subfields
+      FieldList subFields = (arrayField.getSubFields() == null || arrayField.getSubFields().isEmpty() ? fields : arrayField.getSubFields());
       final JsonNode items = Jsons.jsonNode(MoreIterators.toList(root.elements()).stream()
-          .map(p -> formatData(fields, p))
+          .map(p -> formatData(subFields, p))
           .collect(Collectors.toList()));
 
       // "Array of Array of" (nested arrays) is not permitted by BigQuery ("Array of Record of Array of"
       // is).
       // Turn all "Array of" into "Array of Record of" instead.
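
For context, here is a minimal sketch (not part of the commit) of the schema shape this fix targets, written against the BigQuery Java client. The field names mirror the `permissions` fixture in the new test below; the class name and `main` scaffolding are illustrative only. Before the change, elements of an array of records were recursively formatted against the outer field list, so lookups for the record's own fields (`domain`, `grants`) could not resolve; the fix recurses with the array field's subfields instead.

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.LegacySQLTypeName;

class ArrayOfRecordsSketch {

  public static void main(String[] args) {
    // "permissions" is an ARRAY of RECORD{domain STRING, grants ARRAY<STRING>}.
    Field permissions = Field
        .newBuilder("permissions", LegacySQLTypeName.RECORD,
            Field.of("domain", LegacySQLTypeName.STRING),
            Field.newBuilder("grants", LegacySQLTypeName.STRING).setMode(Field.Mode.REPEATED).build())
        .setMode(Field.Mode.REPEATED)
        .build();
    FieldList fields = FieldList.of(permissions);

    // Pre-fix: each array element was formatted against `fields`, which only
    // contains "permissions" itself, so fields.get("domain") would throw.
    // Post-fix: recurse with the element schema instead.
    FieldList subFields = fields.get(0).getSubFields();
    System.out.println(subFields.get("domain").getType()); // STRING
    System.out.println(subFields.get("grants").getMode()); // REPEATED
  }
}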
New test class: BigQueryDenormalizedDestinationTest

@@ -0,0 +1,279 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.destination.bigquery;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.SyncMode;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class BigQueryDenormalizedDestinationTest {

private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");

private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedDestinationTest.class);

private static final String BIG_QUERY_CLIENT_CHUNK_SIZE = "big_query_client_buffer_size_mb";
private static final Instant NOW = Instant.now();
private static final String USERS_STREAM_NAME = "users";
private static final AirbyteMessage MESSAGE_USERS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(getData())
.withEmittedAt(NOW.toEpochMilli()));

private JsonNode config;

private BigQuery bigquery;
private Dataset dataset;
private ConfiguredAirbyteCatalog catalog;

private boolean tornDown = true;

@BeforeEach
void setup(TestInfo info) throws IOException {
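// Spec-only tests do not need BigQuery credentials or a live dataset.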
if (info.getDisplayName().equals("testSpec()")) {
return;
}

if (!Files.exists(CREDENTIALS_PATH)) {
throw new IllegalStateException(
"Must provide path to a big query credentials file. By default {module-root}/config/credentials.json. Override by setting setting path with the CREDENTIALS_PATH constant.");
}
final String credentialsJsonString = new String(Files.readAllBytes(CREDENTIALS_PATH));
final JsonNode credentialsJson = Jsons.deserialize(credentialsJsonString);

final String projectId = credentialsJson.get(BigQueryDestination.CONFIG_PROJECT_ID).asText();
final ServiceAccountCredentials credentials = ServiceAccountCredentials.fromStream(new ByteArrayInputStream(credentialsJsonString.getBytes()));
bigquery = BigQueryOptions.newBuilder()
.setProjectId(projectId)
.setCredentials(credentials)
.build()
.getService();

final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8);
final String datasetLocation = "EU";
MESSAGE_USERS1.getRecord().setNamespace(datasetId);

catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream()
.withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getSchema()))
.withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE)));

final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build();
dataset = bigquery.create(datasetInfo);

config = Jsons.jsonNode(ImmutableMap.builder()
.put(BigQueryDestination.CONFIG_PROJECT_ID, projectId)
.put(BigQueryDestination.CONFIG_CREDS, credentialsJsonString)
.put(BigQueryDestination.CONFIG_DATASET_ID, datasetId)
.put(BigQueryDestination.CONFIG_DATASET_LOCATION, datasetLocation)
.put(BIG_QUERY_CLIENT_CHUNK_SIZE, 10)
.build());

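// Register a shutdown hook so the dataset is still cleaned up if the JVM exits before tearDown() runs.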
tornDown = false;
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
if (!tornDown) {
tearDownBigQuery();
}
}));

}

@AfterEach
void tearDown(TestInfo info) {
if (info.getDisplayName().equals("testSpec()")) {
return;
}

tearDownBigQuery();
}

private void tearDownBigQuery() {
// allows deletion of a dataset that has contents
final BigQuery.DatasetDeleteOption option = BigQuery.DatasetDeleteOption.deleteContents();

final boolean success = bigquery.delete(dataset.getDatasetId(), option);
if (success) {
LOGGER.info("BQ Dataset " + dataset + " deleted...");
} else {
LOGGER.info("BQ Dataset cleanup for " + dataset + " failed!");
}

tornDown = true;
}

@Test
void testNestedWrite() throws Exception {
final BigQueryDestination destination = new BigQueryDenormalizedDestination();
final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector);

consumer.accept(MESSAGE_USERS1);
consumer.close();

final List<JsonNode> usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME);
final JsonNode expectedUsersJson = MESSAGE_USERS1.getRecord().getData();
assertEquals(1, usersActual.size());
final JsonNode resultJson = usersActual.get(0);
assertEquals(extractJsonValues(expectedUsersJson, "name"), extractJsonValues(resultJson, "name"));
assertEquals(extractJsonValues(expectedUsersJson, "grants"), extractJsonValues(resultJson, "grants"));
assertEquals(extractJsonValues(expectedUsersJson, "domain"), extractJsonValues(resultJson, "domain"));

}

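// Collects every string value of attributeName anywhere in the tree, descending into
// arrays and unwrapping nested objects through their "value" field.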
private Set<String> extractJsonValues(JsonNode node, String attributeName) {
List<JsonNode> valuesNode = node.findValues(attributeName);
Set<String> resultSet = new HashSet<>();
valuesNode.forEach(jsonNode -> {
if (jsonNode.isArray()) {
jsonNode.forEach(arrayNodeValue -> resultSet.add(arrayNodeValue.textValue()));
} else if (jsonNode.isObject()) {
resultSet.addAll(extractJsonValues(jsonNode, "value"));
} else {
resultSet.add(jsonNode.textValue());
}
});

return resultSet;
}

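// TO_JSON_STRING(t) serializes each full row, nested REPEATED RECORD fields included,
// so results can be compared as JSON without per-column handling.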
private List<JsonNode> retrieveRecordsAsJson(String tableName) throws Exception {
QueryJobConfiguration queryConfig =
QueryJobConfiguration
.newBuilder(
String.format("select TO_JSON_STRING(t) as jsonValue from %s.%s t;", dataset.getDatasetId().getDataset(), tableName.toLowerCase()))
.setUseLegacySql(false).build();

return StreamSupport
.stream(BigQueryUtils.executeQuery(bigquery, queryConfig).getLeft().getQueryResults().iterateAll().spliterator(), false)
.map(v -> v.get("jsonValue").getStringValue())
.map(Jsons::deserialize)
.collect(Collectors.toList());
}

private JsonNode getSchema() {
return Jsons.deserialize(
"{\n"
+ " \"type\": [\n"
+ " \"object\"\n"
+ " ],\n"
+ " \"properties\": {\n"
+ " \"name\": {\n"
+ " \"type\": [\n"
+ " \"string\"\n"
+ " ]\n"
+ " },\n"
+ " \"permissions\": {\n"
+ " \"type\": [\n"
+ " \"array\"\n"
+ " ],\n"
+ " \"items\": {\n"
+ " \"type\": [\n"
+ " \"object\"\n"
+ " ],\n"
+ " \"properties\": {\n"
+ " \"domain\": {\n"
+ " \"type\": [\n"
+ " \"string\"\n"
+ " ]\n"
+ " },\n"
+ " \"grants\": {\n"
+ " \"type\": [\n"
+ " \"array\"\n"
+ " ],\n"
+ " \"items\": {\n"
+ " \"type\": [\n"
+ " \"string\"\n"
+ " ]\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ "}");

}

private static JsonNode getData() {
return Jsons.deserialize(
"{\n"
+ " \"name\": \"Andrii\",\n"
+ " \"permissions\": [\n"
+ " {\n"
+ " \"domain\": \"abs\",\n"
+ " \"grants\": [\n"
+ " \"admin\"\n"
+ " ]\n"
+ " },\n"
+ " {\n"
+ " \"domain\": \"tools\",\n"
+ " \"grants\": [\n"
+ " \"read\", \"write\"\n"
+ " ]\n"
+ " }\n"
+ " ]\n"
+ "}");

}

}
docs/integrations/destinations/bigquery.md (1 addition, 0 deletions)

@@ -124,6 +124,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into
 
 | Version | Date | Pull Request | Subject |
 | :--- | :--- | :--- | :--- |
+| 0.1.3 | 2021-08-07 | [#5261](https://github.com/airbytehq/airbyte/pull/5261) | šŸ› Destination BigQuery(Denormalized): Fix processing arrays of records |
 | 0.1.2 | 2021-07-30 | [#5125](https://github.com/airbytehq/airbyte/pull/5125) | Enable `additionalProperties` in spec.json |
 | 0.1.1 | 2021-06-21 | [#3555](https://github.com/airbytehq/airbyte/pull/3555) | Partial Success in BufferedStreamConsumer |
 | 0.1.0 | 2021-06-21 | [#4176](https://github.com/airbytehq/airbyte/pull/4176) | Destination using Typed Struct and Repeated fields |
