Normalization - BigQuery use json_extract_string_array for array of simple types (#13289)

Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
Co-authored-by: andrii.leonets <aleonets@gmail.com>
Co-authored-by: Andrii Leonets <30464745+DoNotPanicUA@users.noreply.github.com>
3 people committed Jun 10, 2022
1 parent 5df20b7 commit 2daaf5b
Showing 11 changed files with 131 additions and 27 deletions.
@@ -9,6 +9,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
+ import com.fasterxml.jackson.databind.node.ContainerNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
@@ -19,6 +20,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.DataTypeUtils;
import io.airbyte.db.SourceOperations;
+ import io.airbyte.db.util.JsonUtil;
import io.airbyte.protocol.models.JsonSchemaType;
import java.text.DateFormat;
import java.text.ParseException;
@@ -43,20 +45,19 @@ public JsonNode rowToJson(final BigQueryResultSet bigQueryResultSet) {
return jsonNode;
}

- private void fillObjectNode(final String fieldName, final StandardSQLTypeName fieldType, final FieldValue fieldValue, final ObjectNode node) {
+ private void fillObjectNode(final String fieldName, final StandardSQLTypeName fieldType, final FieldValue fieldValue, final ContainerNode<?> node) {
switch (fieldType) {
- case BOOL -> node.put(fieldName, fieldValue.getBooleanValue());
- case INT64 -> node.put(fieldName, fieldValue.getLongValue());
- case FLOAT64 -> node.put(fieldName, fieldValue.getDoubleValue());
- case NUMERIC -> node.put(fieldName, fieldValue.getNumericValue());
- case BIGNUMERIC -> node.put(fieldName, returnNullIfInvalid(fieldValue::getNumericValue));
- case STRING -> node.put(fieldName, fieldValue.getStringValue());
- case BYTES -> node.put(fieldName, fieldValue.getBytesValue());
- case DATE -> node.put(fieldName, toISO8601String(getDateValue(fieldValue, BIG_QUERY_DATE_FORMAT)));
- case DATETIME -> node.put(fieldName, toISO8601String(getDateValue(fieldValue, BIG_QUERY_DATETIME_FORMAT)));
- case TIMESTAMP -> node.put(fieldName, toISO8601String(fieldValue.getTimestampValue() / 1000));
- case TIME -> node.put(fieldName, fieldValue.getStringValue());
- default -> node.put(fieldName, fieldValue.getStringValue());
+ case BOOL -> JsonUtil.putBooleanValueIntoJson(node, fieldValue.getBooleanValue(), fieldName);
+ case INT64 -> JsonUtil.putLongValueIntoJson(node, fieldValue.getLongValue(), fieldName);
+ case FLOAT64 -> JsonUtil.putDoubleValueIntoJson(node, fieldValue.getDoubleValue(), fieldName);
+ case NUMERIC -> JsonUtil.putBigDecimalValueIntoJson(node, fieldValue.getNumericValue(), fieldName);
+ case BIGNUMERIC -> JsonUtil.putBigDecimalValueIntoJson(node, returnNullIfInvalid(fieldValue::getNumericValue), fieldName);
+ case STRING, TIME -> JsonUtil.putStringValueIntoJson(node, fieldValue.getStringValue(), fieldName);
+ case BYTES -> JsonUtil.putBytesValueIntoJson(node, fieldValue.getBytesValue(), fieldName);
+ case DATE -> JsonUtil.putStringValueIntoJson(node, toISO8601String(getDateValue(fieldValue, BIG_QUERY_DATE_FORMAT)), fieldName);
+ case DATETIME -> JsonUtil.putStringValueIntoJson(node, toISO8601String(getDateValue(fieldValue, BIG_QUERY_DATETIME_FORMAT)), fieldName);
+ case TIMESTAMP -> JsonUtil.putStringValueIntoJson(node, toISO8601String(fieldValue.getTimestampValue() / 1000), fieldName);
+ default -> JsonUtil.putStringValueIntoJson(node, fieldValue.getStringValue(), fieldName);
}
}

Expand All @@ -74,7 +75,7 @@ private void setJsonField(final Field field, final FieldValue fieldValue, final
final FieldList subFields = field.getSubFields();
// Array of primitive
if (subFields == null || subFields.isEmpty()) {
- fieldValue.getRepeatedValue().forEach(arrayFieldValue -> fillObjectNode(fieldName, fieldType, arrayFieldValue, arrayNode.addObject()));
+ fieldValue.getRepeatedValue().forEach(arrayFieldValue -> fillObjectNode(fieldName, fieldType, arrayFieldValue, arrayNode));
// Array of records
} else {
for (final FieldValue arrayFieldValue : fieldValue.getRepeatedValue()) {
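For context, a minimal Jackson sketch (not part of this commit; the class and field names are hypothetical) of why passing the ArrayNode itself, rather than arrayNode.addObject(), changes the shape produced for an array of primitives:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;

// Illustrative only; not part of the commit.
public class ArrayShapeSketch {
  public static void main(String[] args) {
    final ObjectMapper mapper = new ObjectMapper();

    // Old behavior: each primitive element was wrapped in a single-field object.
    final ArrayNode wrapped = mapper.createArrayNode();
    wrapped.addObject().put("test_column", "a");
    wrapped.addObject().put("test_column", "b");
    System.out.println(wrapped); // [{"test_column":"a"},{"test_column":"b"}]

    // New behavior: primitives are appended to the array node directly.
    final ArrayNode plain = mapper.createArrayNode();
    plain.add("a");
    plain.add("b");
    System.out.println(plain); // ["a","b"]
  }
}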
70 changes: 70 additions & 0 deletions airbyte-db/db-lib/src/main/java/io/airbyte/db/util/JsonUtil.java
@@ -0,0 +1,70 @@
package io.airbyte.db.util;

import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ContainerNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.math.BigDecimal;

public class JsonUtil {

public static void putBooleanValueIntoJson(final ContainerNode<?> node, final boolean value, final String fieldName) {
if (node instanceof ArrayNode) {
((ArrayNode) node).add(value);
} else if (node instanceof ObjectNode) {
((ObjectNode) node).put(fieldName, value);
} else {
throw new RuntimeException("Can't populate the node type : " + node.getClass().getName());
}
}

public static void putLongValueIntoJson(final ContainerNode<?> node, final long value, final String fieldName) {
if (node instanceof ArrayNode) {
((ArrayNode) node).add(value);
} else if (node instanceof ObjectNode) {
((ObjectNode) node).put(fieldName, value);
} else {
throw new RuntimeException("Can't populate the node type : " + node.getClass().getName());
}
}

public static void putDoubleValueIntoJson(final ContainerNode<?> node, final double value, final String fieldName) {
if (node instanceof ArrayNode) {
((ArrayNode) node).add(value);
} else if (node instanceof ObjectNode) {
((ObjectNode) node).put(fieldName, value);
} else {
throw new RuntimeException("Can't populate the node type : " + node.getClass().getName());
}
}

public static void putBigDecimalValueIntoJson(final ContainerNode<?> node, final BigDecimal value, final String fieldName) {
if (node instanceof ArrayNode) {
((ArrayNode) node).add(value);
} else if (node instanceof ObjectNode) {
((ObjectNode) node).put(fieldName, value);
} else {
throw new RuntimeException("Can't populate the node type : " + node.getClass().getName());
}
}

public static void putStringValueIntoJson(final ContainerNode<?> node, final String value, final String fieldName) {
if (node instanceof ArrayNode) {
((ArrayNode) node).add(value);
} else if (node instanceof ObjectNode) {
((ObjectNode) node).put(fieldName, value);
} else {
throw new RuntimeException("Can't populate the node type : " + node.getClass().getName());
}
}

public static void putBytesValueIntoJson(final ContainerNode<?> node, final byte[] value, final String fieldName) {
if (node instanceof ArrayNode) {
((ArrayNode) node).add(value);
} else if (node instanceof ObjectNode) {
((ObjectNode) node).put(fieldName, value);
} else {
throw new RuntimeException("Can't populate the node type : " + node.getClass().getName());
}
}

}
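A short usage sketch of the new helper (illustrative only, assuming Jackson's default ObjectMapper): JsonUtil dispatches on the concrete ContainerNode type, so the same call puts a named field on an ObjectNode but appends directly to an ArrayNode, ignoring the field name.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.db.util.JsonUtil;

// Illustrative only; not part of the commit.
public class JsonUtilUsageSketch {
  public static void main(String[] args) {
    final ObjectMapper mapper = new ObjectMapper();

    // ObjectNode: the field name becomes the key.
    final ObjectNode object = mapper.createObjectNode();
    JsonUtil.putStringValueIntoJson(object, "a", "test_column");
    System.out.println(object); // {"test_column":"a"}

    // ArrayNode: the field name is ignored and the value is appended.
    final ArrayNode array = mapper.createArrayNode();
    JsonUtil.putStringValueIntoJson(array, "a", "test_column");
    JsonUtil.putStringValueIntoJson(array, "b", "test_column");
    System.out.println(array); // ["a","b"]
  }
}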
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
@@ -28,5 +28,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

- LABEL io.airbyte.version=0.2.1
+ LABEL io.airbyte.version=0.2.2
LABEL io.airbyte.name=airbyte/normalization
@@ -236,3 +236,18 @@
{% macro clickhouse__json_extract_array(json_column, json_path_list, normalized_json_path) -%}
JSONExtractArrayRaw(assumeNotNull({{ json_column }}), {{ format_json_path(json_path_list) }})
{%- endmacro %}

+ {# json_extract_string_array ------------------------------------------------- #}
+
+ {% macro json_extract_string_array(json_column, json_path_list, normalized_json_path) -%}
+ {{ adapter.dispatch('json_extract_string_array')(json_column, json_path_list, normalized_json_path) }}
+ {%- endmacro %}
+
+ {% macro default__json_extract_string_array(json_column, json_path_list, normalized_json_path) -%}
+ json_extract_array({{ json_column }}, {{ format_json_path(json_path_list) }})
+ {%- endmacro %}
+
+ # https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions#json_extract_string_array
+ {% macro bigquery__json_extract_string_array(json_column, json_path_list, normalized_json_path) -%}
+ json_extract_string_array({{ json_column }}, {{ format_json_path(normalized_json_path) }})
+ {%- endmacro %}
@@ -455,6 +455,8 @@ def extract_json_column(property_name: str, json_column_name: str, definition: D
if "type" in definition:
if is_array(definition["type"]):
json_extract = jinja_call(f"json_extract_array({json_column_name}, {json_path}, {normalized_json_path})")
+ if is_simple_property(definition.get("items", {"type": "object"}).get("type", "object")):
+ json_extract = jinja_call(f"json_extract_string_array({json_column_name}, {json_path}, {normalized_json_path})")
elif is_object(definition["type"]):
json_extract = jinja_call(f"json_extract('{table_alias}', {json_column_name}, {json_path}, {normalized_json_path})")
elif is_simple_property(definition["type"]):
@@ -89,12 +89,13 @@ protected boolean compareJsonNodes(final JsonNode expectedValue, final JsonNode
return compareDateTimeValues(expectedValue.asText(), actualValue.asText());
} else if (isDateValue(expectedValue.asText())) {
return compareDateValues(expectedValue.asText(), actualValue.asText());
- } else if (expectedValue.isArray() && actualValue.isArray()) {
+ } else if (expectedValue.isArray()) {
return compareArrays(expectedValue, actualValue);
- } else if (expectedValue.isObject() && actualValue.isObject()) {
+ } else if (expectedValue.isObject()) {
compareObjects(expectedValue, actualValue);
return true;
} else {
+ LOGGER.warn("Default comparison method!");
return compareString(expectedValue, actualValue);
}
}
@@ -112,8 +112,7 @@ protected boolean supportBasicDataTypeTest() {

@Override
protected boolean supportArrayDataTypeTest() {
- // #13154 Normalization issue
- return false;
+ return true;
}

@Override
@@ -182,7 +181,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
final FieldList fields = queryResults.getSchema().getFields();
BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations();

- return Streams.stream(queryResults.iterateAll())
+ return Streams.stream(queryResults.iterateAll())
.map(fieldValues -> sourceOperations.rowToJson(new BigQueryResultSet(fieldValues, fields))).collect(Collectors.toList());
}

@@ -4,6 +4,8 @@

package io.airbyte.integrations.destination.bigquery;

+ import com.fasterxml.jackson.databind.JsonNode;
+ import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import java.time.LocalDate;
@@ -49,6 +51,19 @@ private LocalDateTime parseDateTime(String dateTimeValue) {
}
}

+ @Override
+ protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) {
+ if (destinationValue != null) {
+ if (destinationValue.matches(".+Z")) {
+ return ZonedDateTime.of(LocalDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(BIGQUERY_DATETIME_FORMAT)), ZoneOffset.UTC);
+ } else {
+ return ZonedDateTime.parse(destinationValue, getAirbyteDateTimeWithTzFormatter()).withZoneSameInstant(ZoneOffset.UTC);
+ }
+ } else {
+ return null;
+ }
+ }

@Override
protected boolean compareDateTimeValues(String expectedValue, String actualValue) {
var destinationDate = parseDateTime(actualValue);
@@ -70,11 +85,6 @@ protected boolean compareDateValues(String expectedValue, String actualValue) {
return expectedDate.equals(destinationDate);
}

- @Override
- protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) {
- return ZonedDateTime.of(LocalDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(BIGQUERY_DATETIME_FORMAT)), ZoneOffset.UTC);
- }

@Override
protected boolean compareDateTimeWithTzValues(String airbyteMessageValue, String destinationValue) {
// #13123 Normalization issue
@@ -92,4 +102,9 @@ private ZonedDateTime getBrokenDate() {
return ZonedDateTime.of(1583, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
}

+ @Override
+ protected void compareObjects(JsonNode expectedObject, JsonNode actualObject) {
+ JsonNode actualJsonNode = (actualObject.isTextual() ? Jsons.deserialize(actualObject.textValue()) : actualObject);
+ super.compareObjects(expectedObject, actualJsonNode);
+ }
}
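A hedged sketch of what the compareObjects override above accounts for: the destination can hand a nested record back as a serialized JSON string, so a textual node is deserialized before the object comparison runs. The literal values are made up; Jsons.deserialize appears in the diff above, while Jsons.jsonNode is assumed here only to build a textual node for the demo.

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;

// Illustrative only; not part of the commit.
public class TextualObjectSketch {
  public static void main(String[] args) {
    final JsonNode expected = Jsons.deserialize("{\"id\":1,\"name\":\"test\"}");
    // The destination may return the nested record as a plain JSON string (a textual node).
    final JsonNode actual = Jsons.jsonNode("{\"id\":1,\"name\":\"test\"}");

    // Mirrors the override above: unwrap textual nodes before comparing as objects.
    final JsonNode normalized = actual.isTextual() ? Jsons.deserialize(actual.textValue()) : actual;
    System.out.println(normalized.equals(expected)); // true
  }
}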
@@ -284,7 +284,7 @@ protected void initTests() {
.airbyteType(JsonSchemaType.STRING)
.createTablePatternSql(CREATE_SQL_PATTERN)
.addInsertValues("['a', 'b']")
.addExpectedValues("[{\"test_column\":\"a\"},{\"test_column\":\"b\"}]")
.addExpectedValues("[\"a\",\"b\"]")
.build());

addDataTypeTestData(
@@ -14,7 +14,7 @@
public class NormalizationRunnerFactory {

public static final String BASE_NORMALIZATION_IMAGE_NAME = "airbyte/normalization";
public static final String NORMALIZATION_VERSION = "0.2.1";
public static final String NORMALIZATION_VERSION = "0.2.2";

static final Map<String, ImmutablePair<String, DefaultNormalizationRunner.DestinationType>> NORMALIZATION_MAPPING =
ImmutableMap.<String, ImmutablePair<String, DefaultNormalizationRunner.DestinationType>>builder()
1 change: 1 addition & 0 deletions docs/understanding-airbyte/basic-normalization.md
@@ -352,6 +352,7 @@ Therefore, in order to "upgrade" to the desired normalization version, you need

| Airbyte Version | Normalization Version | Date | Pull Request | Subject |
|:----------------| :--- | :--- | :--- | :--- |
+ | | 0.2.2 | 2022-06-02 | [\#13289](https://github.com/airbytehq/airbyte/pull/13289) | BigQuery use `json_extract_string_array` for array of simple type elements |
| | 0.2.1 | 2022-05-17 | [\#12924](https://github.com/airbytehq/airbyte/pull/12924) | Fixed checking --event-buffer-size on old dbt crashed entrypoint.sh |
| | 0.2.0 | 2022-05-15 | [\#12745](https://github.com/airbytehq/airbyte/pull/12745) | Snowflake: add datetime without timezone |
| | 0.1.78 | 2022-05-06 | [\#12305](https://github.com/airbytehq/airbyte/pull/12305) | Mssql: use NVARCHAR and datetime2 by default |
