BigQuery Denormalized: Cover arrays only if they are nested (#14023)
* stop covering every array; cover only arrays of arrays (a BigQuery restriction)

* add test with nested arrays and update existing tests

* [14058] fix datetime arrays

* [11109] cover only arrays of arrays with an object, instead of every array

* [14058] fix datetime format failure for arrays of objects containing datetime fields

* enable Array and Array+Object DATs

* reopen Issue #11166 and disable functionality

* Improve the tests by moving the common part to Utils

* Add tests to check `Array of arrays` cases

* Increase version

* Doc

* format

* review update:
- update the comment about the reopened issue
- add a test case with multiple array sub-values
- fix nested arrays with datetime
- add test case for nested arrays with datetime

* fix date formatting

* disable testAnyOf test and upd comments

* remove some code duplication in the tests

* [14668] cover the BigQuery inheritance limitation with tests

* Make the GCS implementation run the same tests as the standard implementation

* Make a common format for returned date values so one test covers both DateTime and Timestamp columns

* [15363] add backward compatibility for existing connections.

* Populate the stream config and messages with the tablespace; it is now required during processing.

* Compare only fields from the stream config

* Rework BigQueryUploaderFactory and UploaderConfig so the array formatter can be chosen before the temporary table is created

* Compare fields

* remove extra logging

* fix project:dataset format of the datasetId

* missing import

* remove debug logging

* fix log messages

* format

* 4 > 3
DoNotPanicUA committed Sep 8, 2022
1 parent 81e8a51 commit 58f18c4
Showing 33 changed files with 1,370 additions and 811 deletions.
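What the change does: BigQuery rejects a REPEATED field nested directly inside another REPEATED field, so the connector stores an inner array as a RECORD with a single big_query_array field. Before this commit every array was wrapped that way; now only arrays of arrays are. Below is a minimal sketch of the wrapping idea (the class and wrapInObject helper are hypothetical illustrations, not the connector's API; the real logic presumably lives in the DefaultArrayFormatter introduced by this commit):

// Illustrative sketch only -- class and helper names are hypothetical, not the connector's API.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class NestedArrayWrappingSketch {

  private static final ObjectMapper MAPPER = new ObjectMapper();

  // Wrap an array schema node in an object schema with a single "big_query_array"
  // property, mirroring NESTED_ARRAY_FIELD in the formatter below.
  static ObjectNode wrapInObject(final JsonNode arraySchema) {
    final ObjectNode wrapper = MAPPER.createObjectNode();
    wrapper.putArray("type").add("object");
    wrapper.putObject("properties").set("big_query_array", arraySchema.deepCopy());
    return wrapper;
  }

  public static void main(final String[] args) throws Exception {
    // An array of arrays of strings: illegal as a directly nested REPEATED field in BigQuery.
    final ObjectNode nested = (ObjectNode) MAPPER.readTree(
        "{\"type\":[\"array\"],\"items\":{\"type\":[\"array\"],\"items\":{\"type\":[\"string\"]}}}");
    // Only the inner array is wrapped; the outer array maps to a REPEATED field as-is.
    nested.set("items", wrapInObject(nested.get("items")));
    System.out.println(MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(nested));
  }
}

Running the sketch prints the rewritten schema: the outer array keeps its type, while the inner array becomes an object with a single big_query_array property.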
io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java
@@ -4,22 +4,37 @@

package io.airbyte.integrations.destination.bigquery;

import static com.google.cloud.bigquery.Field.Mode.REPEATED;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Table;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
import io.airbyte.integrations.destination.bigquery.formatter.DefaultBigQueryDenormalizedRecordFormatter;
import io.airbyte.integrations.destination.bigquery.formatter.GcsBigQueryDenormalizedRecordFormatter;
import io.airbyte.integrations.destination.bigquery.formatter.arrayformater.LegacyArrayFormatter;
import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader;
import io.airbyte.integrations.destination.bigquery.uploader.BigQueryUploaderFactory;
import io.airbyte.integrations.destination.bigquery.uploader.UploaderType;
import io.airbyte.integrations.destination.bigquery.uploader.config.UploaderConfig;
import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter;
import io.airbyte.protocol.models.AirbyteStream;
import java.io.IOException;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BigQueryDenormalizedDestination extends BigQueryDestination {

private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedDestination.class);

@Override
protected String getTargetTableName(final String streamName) {
// This BigQuery destination does not write to a staging "raw" table but directly to a normalized
@@ -67,6 +82,119 @@ protected Function<String, String> getTargetTableNameTransformer(final BigQueryS
return namingResolver::getIdentifier;
}

@Override
protected void putStreamIntoUploaderMap(AirbyteStream stream,
UploaderConfig uploaderConfig,
Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMap)
throws IOException {
Table existingTable =
uploaderConfig.getBigQuery().getTable(uploaderConfig.getConfigStream().getStream().getNamespace(), uploaderConfig.getTargetTableName());
BigQueryRecordFormatter formatter = uploaderConfig.getFormatter();

if (existingTable != null) {
LOGGER.info("Target table already exists. Checking could we use the default destination processing.");
if (!compareSchemas((formatter.getBigQuerySchema()), existingTable.getDefinition().getSchema())) {
((DefaultBigQueryDenormalizedRecordFormatter) formatter).setArrayFormatter(new LegacyArrayFormatter());
LOGGER.warn("Existing target table has different structure with the new destination processing. Trying legacy implementation.");
} else {
LOGGER.info("Existing target table {} has equal structure with the destination schema. Using the default array processing.",
stream.getName());
}
} else {
LOGGER.info("Target table is not created yet. The default destination processing will be used.");
}

AbstractBigQueryUploader<?> uploader = BigQueryUploaderFactory.getUploader(uploaderConfig);
uploaderMap.put(
AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream),
uploader);
}

/**
* Compare the calculated BigQuery schema with the existing schema of the table. Note: only fields
* from the calculated schema are compared, so manually created fields in the table are ignored.
*
* @param expectedSchema BigQuery schema of the table, calculated from the stream schema config
* @param existingSchema BigQuery schema of the existing table (created by a previous run)
* @return true if the calculated fields match those in the existing table
*/
private boolean compareSchemas(com.google.cloud.bigquery.Schema expectedSchema, @Nullable com.google.cloud.bigquery.Schema existingSchema) {
if (expectedSchema != null && existingSchema == null) {
LOGGER.warn("Existing schema is null when we expect {}", expectedSchema);
return false;
} else if (expectedSchema == null && existingSchema == null) {
LOGGER.info("Existing and expected schemas are null.");
return true;
} else if (expectedSchema == null) {
LOGGER.warn("Expected schema is null when we have existing schema {}", existingSchema);
return false;
}

var expectedFields = expectedSchema.getFields();
var existingFields = existingSchema.getFields();

for (Field expectedField : expectedFields) {
var existingField = existingFields.get(expectedField.getName());
if (isDifferenceBetweenFields(expectedField, existingField)) {
LOGGER.warn("Expected field {} is different from existing field {}", expectedField, existingField);
return false;
}
}

LOGGER.info("Existing and expected schemas are equal.");
return true;
}

private boolean isDifferenceBetweenFields(Field expectedField, Field existingField) {
if (existingField == null) {
return true;
} else {
return !expectedField.getType().equals(existingField.getType())
|| !compareRepeatedMode(expectedField, existingField)
|| !compareSubFields(expectedField, existingField);
}
}

/**
* Compare field modes. A field can have one of four modes: NULLABLE, REQUIRED, REPEATED, or null.
* Only a REPEATED mode difference is critical: the check fails only if at least one mode is
* REPEATED and the other is not.
*
* @param expectedField expected field structure
* @param existingField existing field structure
* @return true if there is no critical difference in the field modes
*/
private boolean compareRepeatedMode(Field expectedField, Field existingField) {
var expectedMode = expectedField.getMode();
var existingMode = existingField.getMode();

if (expectedMode != null && expectedMode.equals(REPEATED) || existingMode != null && existingMode.equals(REPEATED)) {
return expectedMode != null && expectedMode.equals(existingMode);
} else {
return true;
}
}

private boolean compareSubFields(Field expectedField, Field existingField) {
var expectedSubFields = expectedField.getSubFields();
var existingSubFields = existingField.getSubFields();

if (expectedSubFields == null || expectedSubFields.isEmpty()) {
return true;
} else if (existingSubFields == null || existingSubFields.isEmpty()) {
return false;
} else {
for (Field expectedSubField : expectedSubFields) {
var existingSubField = existingSubFields.get(expectedSubField.getName());
if (isDifferenceBetweenFields(expectedSubField, existingSubField)) {
return false;
}
}
return true;
}
}

public static void main(final String[] args) throws Exception {
final Destination destination = new BigQueryDenormalizedDestination();
new IntegrationRunner(destination).run(args);
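A side note on the schema comparison above: only a one-sided REPEATED mode counts as a critical difference, since that is what decides between the default and the legacy array representation. Here is a small self-contained sketch of that rule (the field names and types are invented for illustration; Field comes from the google-cloud-bigquery client used above):

// Hypothetical usage sketch for the compareRepeatedMode logic above.
import static com.google.cloud.bigquery.Field.Mode.NULLABLE;
import static com.google.cloud.bigquery.Field.Mode.REPEATED;

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.StandardSQLTypeName;

public class RepeatedModeCheckSketch {

  // Same rule as compareRepeatedMode: a difference matters only when exactly one
  // side is REPEATED; NULLABLE, REQUIRED, and null are interchangeable here.
  static boolean modesCompatible(final Field expected, final Field existing) {
    final var expectedMode = expected.getMode();
    final var existingMode = existing.getMode();
    if (expectedMode == REPEATED || existingMode == REPEATED) {
      return expectedMode != null && expectedMode.equals(existingMode);
    }
    return true;
  }

  public static void main(final String[] args) {
    final Field repeated = Field.newBuilder("tags", StandardSQLTypeName.STRING).setMode(REPEATED).build();
    final Field nullable = Field.newBuilder("tags", StandardSQLTypeName.STRING).setMode(NULLABLE).build();
    System.out.println(modesCompatible(repeated, repeated)); // true
    System.out.println(modesCompatible(repeated, nullable)); // false: only one side is REPEATED
  }
}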
io/airbyte/integrations/destination/bigquery/formatter/DefaultBigQueryDenormalizedRecordFormatter.java
@@ -8,7 +8,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
-import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Field.Builder;
@@ -26,6 +25,8 @@
import io.airbyte.integrations.destination.bigquery.BigQueryUtils;
import io.airbyte.integrations.destination.bigquery.JsonSchemaFormat;
import io.airbyte.integrations.destination.bigquery.JsonSchemaType;
import io.airbyte.integrations.destination.bigquery.formatter.arrayformater.ArrayFormatter;
import io.airbyte.integrations.destination.bigquery.formatter.arrayformater.DefaultArrayFormatter;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.io.IOException;
import java.util.Collections;
@@ -42,26 +43,39 @@ public class DefaultBigQueryDenormalizedRecordFormatter extends DefaultBigQueryR

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultBigQueryDenormalizedRecordFormatter.class);

-public static final String NESTED_ARRAY_FIELD = "big_query_array";
-protected static final String PROPERTIES_FIELD = "properties";
-private static final String TYPE_FIELD = "type";
+public static final String PROPERTIES_FIELD = "properties";
+public static final String TYPE_FIELD = "type";
private static final String ALL_OF_FIELD = "allOf";
private static final String ANY_OF_FIELD = "anyOf";
private static final String ARRAY_ITEMS_FIELD = "items";
private static final String FORMAT_FIELD = "format";
private static final String AIRBYTE_TYPE = "airbyte_type";
private static final String REF_DEFINITION_KEY = "$ref";
private static final ObjectMapper mapper = new ObjectMapper();

protected ArrayFormatter arrayFormatter;

public DefaultBigQueryDenormalizedRecordFormatter(final JsonNode jsonSchema, final StandardNameTransformer namingResolver) {
super(jsonSchema, namingResolver);
}

private ArrayFormatter getArrayFormatter() {
if (arrayFormatter == null) {
arrayFormatter = new DefaultArrayFormatter();
}
return arrayFormatter;
}

public void setArrayFormatter(ArrayFormatter arrayFormatter) {
this.arrayFormatter = arrayFormatter;
this.jsonSchema = formatJsonSchema(this.originalJsonSchema.deepCopy());
this.bigQuerySchema = getBigQuerySchema(jsonSchema);
}

@Override
protected JsonNode formatJsonSchema(final JsonNode jsonSchema) {
-var modifiedJsonSchema = formatAllOfAndAnyOfFields(namingResolver, jsonSchema);
-populateEmptyArrays(modifiedJsonSchema);
-surroundArraysByObjects(modifiedJsonSchema);
+var modifiedJsonSchema = jsonSchema.deepCopy(); // formatAllOfAndAnyOfFields(namingResolver, jsonSchema) is disabled: issue #5912 was reopened by PR #11166
+getArrayFormatter().populateEmptyArrays(modifiedJsonSchema);
+getArrayFormatter().surroundArraysByObjects(modifiedJsonSchema);
return modifiedJsonSchema;
}

@@ -76,53 +90,6 @@ private JsonNode formatAllOfAndAnyOfFields(final StandardNameTransformer namingR
return modifiedSchema;
}

-private List<JsonNode> findArrays(final JsonNode node) {
-if (node != null) {
-return node.findParents(TYPE_FIELD).stream()
-.filter(
-jsonNode -> {
-final JsonNode type = jsonNode.get(TYPE_FIELD);
-if (type.isArray()) {
-final ArrayNode typeNode = (ArrayNode) type;
-for (final JsonNode arrayTypeNode : typeNode) {
-if (arrayTypeNode.isTextual() && arrayTypeNode.textValue().equals("array")) {
-return true;
-}
-}
-} else if (type.isTextual()) {
-return jsonNode.asText().equals("array");
-}
-return false;
-})
-.collect(Collectors.toList());
-} else {
-return Collections.emptyList();
-}
-}
-
-private void populateEmptyArrays(final JsonNode node) {
-findArrays(node).forEach(jsonNode -> {
-if (!jsonNode.has(ARRAY_ITEMS_FIELD)) {
-final ObjectNode nodeToChange = (ObjectNode) jsonNode;
-nodeToChange.putObject(ARRAY_ITEMS_FIELD).putArray(TYPE_FIELD).add("string");
-}
-});
-}
-
-private void surroundArraysByObjects(final JsonNode node) {
-findArrays(node).forEach(
-jsonNode -> {
-final JsonNode arrayNode = jsonNode.deepCopy();
-
-final ObjectNode newNode = (ObjectNode) jsonNode;
-newNode.removeAll();
-newNode.putArray(TYPE_FIELD).add("object");
-newNode.putObject(PROPERTIES_FIELD).set(NESTED_ARRAY_FIELD, arrayNode);
-
-surroundArraysByObjects(arrayNode.get(ARRAY_ITEMS_FIELD));
-});
-}

@Override
public JsonNode formatRecord(final AirbyteRecordMessage recordMessage) {
// Bigquery represents TIMESTAMP to the microsecond precision, so we convert to microseconds then
@@ -153,25 +120,32 @@ protected void addAirbyteColumns(final ObjectNode data, final AirbyteRecordMessa
data.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, formattedEmittedAt);
}

-protected JsonNode formatData(final FieldList fields, final JsonNode root) {
+private JsonNode formatData(final FieldList fields, final JsonNode root) {
// handles empty objects and arrays
if (fields == null) {
return root;
}
-formatDateTimeFields(fields, root);
+JsonNode formattedData;
if (root.isObject()) {
-return getObjectNode(fields, root);
+formattedData = getObjectNode(fields, root);
} else if (root.isArray()) {
-return getArrayNode(fields, root);
+formattedData = getArrayNode(fields, root);
} else {
-return root;
+formattedData = root;
}
+formatDateTimeFields(fields, formattedData);
+
+return formattedData;
}

protected void formatDateTimeFields(final FieldList fields, final JsonNode root) {
final List<String> dateTimeFields = BigQueryUtils.getDateTimeFieldsFromSchema(fields);
if (!dateTimeFields.isEmpty() && !root.isNull()) {
-BigQueryUtils.transformJsonDateTimeToBigDataFormat(dateTimeFields, (ObjectNode) root);
+if (root.isArray()) {
+root.forEach(jsonNode -> BigQueryUtils.transformJsonDateTimeToBigDataFormat(dateTimeFields, jsonNode));
+} else {
+BigQueryUtils.transformJsonDateTimeToBigDataFormat(dateTimeFields, root);
+}
}
}

@@ -185,11 +159,11 @@ private JsonNode getArrayNode(final FieldList fields, final JsonNode root) {
} else {
subFields = arrayField.getSubFields();
}
-final JsonNode items = Jsons.jsonNode(MoreIterators.toList(root.elements()).stream()
+List<JsonNode> arrayItems = MoreIterators.toList(root.elements()).stream()
.map(p -> formatData(subFields, p))
-.collect(Collectors.toList()));
+.toList();

-return Jsons.jsonNode(ImmutableMap.of(NESTED_ARRAY_FIELD, items));
+return getArrayFormatter().formatArrayItems(arrayItems);
}

private JsonNode getObjectNode(final FieldList fields, final JsonNode root) {
io/airbyte/integrations/destination/bigquery/formatter/arrayformater/ArrayFormatter.java
@@ -0,0 +1,18 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.bigquery.formatter.arrayformater;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.List;

public interface ArrayFormatter {

void populateEmptyArrays(final JsonNode node);

void surroundArraysByObjects(final JsonNode node);

JsonNode formatArrayItems(final List<JsonNode> arrayItems);

}
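For reference, here is a sketch of how a legacy-style implementation of this interface could look, reconstructed from the methods removed from DefaultBigQueryDenormalizedRecordFormatter above (it wraps every array, not only nested ones). It is an approximation for illustration, not the shipped LegacyArrayFormatter:

// Approximation of a legacy-style ArrayFormatter, reconstructed from the removed
// methods above; not the actual LegacyArrayFormatter shipped with the connector.
package io.airbyte.integrations.destination.bigquery.formatter.arrayformater;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.List;

public class LegacyStyleArrayFormatterSketch implements ArrayFormatter {

  private static final ObjectMapper MAPPER = new ObjectMapper();
  private static final String NESTED_ARRAY_FIELD = "big_query_array";

  @Override
  public void populateEmptyArrays(final JsonNode node) {
    // Give item-less array schemas a default string item type, as the removed code did.
    findArrays(node).forEach(array -> {
      if (!array.has("items")) {
        ((ObjectNode) array).putObject("items").putArray("type").add("string");
      }
    });
  }

  @Override
  public void surroundArraysByObjects(final JsonNode node) {
    // Legacy behaviour: wrap every array in an object with one NESTED_ARRAY_FIELD property.
    findArrays(node).forEach(array -> {
      final JsonNode arrayNode = array.deepCopy();
      final ObjectNode newNode = (ObjectNode) array;
      newNode.removeAll();
      newNode.putArray("type").add("object");
      newNode.putObject("properties").set(NESTED_ARRAY_FIELD, arrayNode);
      surroundArraysByObjects(arrayNode.get("items"));
    });
  }

  @Override
  public JsonNode formatArrayItems(final List<JsonNode> arrayItems) {
    // Legacy record shape: {"big_query_array": [...]}.
    final ObjectNode result = MAPPER.createObjectNode();
    result.putArray(NESTED_ARRAY_FIELD).addAll(arrayItems);
    return result;
  }

  // Collect every schema node whose "type" is or contains "array".
  private List<JsonNode> findArrays(final JsonNode node) {
    if (node == null) {
      return List.of();
    }
    return node.findParents("type").stream()
        .filter(n -> {
          final JsonNode type = n.get("type");
          if (type.isArray()) {
            for (final JsonNode t : type) {
              if (t.isTextual() && "array".equals(t.textValue())) {
                return true;
              }
            }
            return false;
          }
          return type.isTextual() && "array".equals(type.textValue());
        })
        .toList();
  }
}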
