Commit
🐛 Destination S3: avro and parquet formats have issues with JsonToAvroSchemaConverter (#8574)

* add namespace to avro record type

* refactoring

* Add unit tests

* added CHANGELOG

* fix typo in method name

* fix typo in method name

* fix for jdk 17

* created recursive key path collection

* refactoring

* format code

* cleanup Dockerfile

* refactoring

* removed unneeded test case

* updated namespace generation

* removed unneeded method from AvroNameTransformer

* resolved merge conflicts

* removed unused imports

* reformat the code

* bump version

* bump BigQuery Denormalized version

Co-authored-by: vmaltsev <vitalii.maltsev@globallogic.com>
Co-authored-by: Oleksandr Sheheda <alexandrshegeda@gmail.com>
3 people committed Dec 21, 2021
1 parent fd716d7 commit a3100cb
Showing 17 changed files with 484 additions and 41 deletions.
@@ -2,7 +2,7 @@
   "destinationDefinitionId": "079d5540-f236-4294-ba7c-ade8fd918496",
   "name": "BigQuery (denormalized typed struct)",
   "dockerRepository": "airbyte/destination-bigquery-denormalized",
-  "dockerImageTag": "0.1.8",
+  "dockerImageTag": "0.2.1",
   "documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery",
   "icon": "bigquery.svg"
 }
@@ -2,7 +2,7 @@
   "destinationDefinitionId": "4816b78f-1489-44c1-9060-4b19d5fa9362",
   "name": "S3",
   "dockerRepository": "airbyte/destination-s3",
-  "dockerImageTag": "0.1.14",
+  "dockerImageTag": "0.2.2",
   "documentationUrl": "https://docs.airbyte.io/integrations/destinations/s3",
   "icon": "s3.svg"
 }
@@ -2,7 +2,7 @@
   "destinationDefinitionId": "ca8f6566-e555-4b40-943a-545bf123117a",
   "name": "Google Cloud Storage (GCS)",
   "dockerRepository": "airbyte/destination-gcs",
-  "dockerImageTag": "0.1.14",
+  "dockerImageTag": "0.1.17",
   "documentationUrl": "https://docs.airbyte.io/integrations/destinations/gcs",
   "icon": "googlecloudstorage.svg"
 }
@@ -19,7 +19,7 @@
 - name: BigQuery (denormalized typed struct)
   destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
   dockerRepository: airbyte/destination-bigquery-denormalized
-  dockerImageTag: 0.1.11
+  dockerImageTag: 0.2.1
   documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
   icon: bigquery.svg
 - name: Cassandra
@@ -60,7 +60,7 @@
 - name: Google Cloud Storage (GCS)
   destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
   dockerRepository: airbyte/destination-gcs
-  dockerImageTag: 0.1.16
+  dockerImageTag: 0.1.17
   documentationUrl: https://docs.airbyte.io/integrations/destinations/gcs
   icon: googlecloudstorage.svg
 - name: Google PubSub
@@ -167,7 +167,7 @@
 - name: S3
   destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
   dockerRepository: airbyte/destination-s3
-  dockerImageTag: 0.2.1
+  dockerImageTag: 0.2.2
   documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
   icon: s3.svg
 - name: SFTP-JSON
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
 
 RUN tar xf ${APPLICATION}.tar --strip-components=1
 
-LABEL io.airbyte.version=0.2.0
+LABEL io.airbyte.version=0.2.1
 LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-gcs/Dockerfile
@@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
 
 RUN tar xf ${APPLICATION}.tar --strip-components=1
 
-LABEL io.airbyte.version=0.1.16
+LABEL io.airbyte.version=0.1.17
 LABEL io.airbyte.name=airbyte/destination-gcs
@@ -64,7 +64,7 @@ public GcsAvroWriter(final GcsDestinationConfig config,
 
     Schema schema = (airbyteSchema == null ? GcsUtils.getDefaultAvroSchema(stream.getName(), stream.getNamespace(), true)
         : new JsonToAvroSchemaConverter().getAvroSchema(airbyteSchema, stream.getName(),
-            stream.getNamespace(), true, false, false));
+            stream.getNamespace(), true, false, false, true));
     LOGGER.info("Avro schema : {}", schema);
     final String outputFilename = BaseGcsWriter.getOutputFilename(uploadTimestamp, S3Format.AVRO);
     objectKey = String.join("/", outputPrefix, outputFilename);
@@ -41,7 +41,7 @@ public S3Writer create(final GcsDestinationConfig config,
       return new GcsAvroWriter(config, s3Client, configuredStream, uploadTimestamp, AvroConstants.JSON_CONVERTER, stream.getJsonSchema());
     } else {
       final JsonToAvroSchemaConverter schemaConverter = new JsonToAvroSchemaConverter();
-      final Schema avroSchema = schemaConverter.getAvroSchema(stream.getJsonSchema(), stream.getName(), stream.getNamespace(), true);
+      final Schema avroSchema = schemaConverter.getAvroSchema(stream.getJsonSchema(), stream.getName(), stream.getNamespace(), true, true);
 
       LOGGER.info("Avro schema for stream {}: {}", stream.getName(), avroSchema.toString(false));
       return new GcsParquetWriter(config, s3Client, configuredStream, uploadTimestamp, avroSchema, AvroConstants.JSON_CONVERTER);
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-s3/Dockerfile
@@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
 
 RUN tar xf ${APPLICATION}.tar --strip-components=1
 
-LABEL io.airbyte.version=0.2.1
+LABEL io.airbyte.version=0.2.2
 LABEL io.airbyte.name=airbyte/destination-s3
@@ -4,6 +4,8 @@
 
 package io.airbyte.integrations.destination.s3.avro;
 
+import static io.airbyte.integrations.destination.s3.util.AvroRecordHelper.obtainPaths;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.base.Preconditions;
@@ -46,6 +48,7 @@ public class JsonToAvroSchemaConverter {
       .addToSchema(Schema.create(Schema.Type.LONG));
 
   private final Map<String, String> standardizedNames = new HashMap<>();
+  private final Map<JsonNode, String> jsonNodePathMap = new HashMap<>();
 
   static List<JsonSchemaType> getNonNullTypes(final String fieldName, final JsonNode fieldDefinition) {
     return getTypes(fieldName, fieldDefinition).stream()
@@ -96,8 +99,9 @@ public Map<String, String> getStandardizedNames() {
   public Schema getAvroSchema(final JsonNode jsonSchema,
                               final String name,
                               @Nullable final String namespace,
-                              final boolean appendAirbyteFields) {
-    return getAvroSchema(jsonSchema, name, namespace, appendAirbyteFields, true, true);
+                              final boolean appendAirbyteFields,
+                              final boolean isRootNode) {
+    return getAvroSchema(jsonSchema, name, namespace, appendAirbyteFields, true, true, isRootNode);
   }
 
   /**
@@ -108,9 +112,13 @@ public Schema getAvroSchema(final JsonNode jsonSchema,
                               @Nullable final String namespace,
                               final boolean appendAirbyteFields,
                               final boolean appendExtraProps,
-                              final boolean addStringToLogicalTypes) {
+                              final boolean addStringToLogicalTypes,
+                              final boolean isRootNode) {
     final String stdName = AvroConstants.NAME_TRANSFORMER.getIdentifier(name);
     RecordBuilder<Schema> builder = SchemaBuilder.record(stdName);
+    if (isRootNode) {
+      obtainPaths("", jsonSchema, jsonNodePathMap);
+    }
     if (!stdName.equals(name)) {
       standardizedNames.put(name, stdName);
       LOGGER.warn("Schema name contains illegal character(s) and is standardized: {} -> {}", name,
@@ -226,7 +234,7 @@ Schema getSingleFieldType(final String fieldName,
             String.format("Array field %s has invalid items property: %s", fieldName, items));
       }
     }
-      case OBJECT -> fieldSchema = getAvroSchema(fieldDefinition, fieldName, null, false, appendExtraProps, addStringToLogicalTypes);
+      case OBJECT -> fieldSchema = getAvroSchema(fieldDefinition, fieldName, jsonNodePathMap.get(fieldDefinition), false, appendExtraProps, addStringToLogicalTypes, false);
       default -> throw new IllegalStateException(
           String.format("Unexpected type for field %s: %s", fieldName, fieldType));
     }
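For context, a minimal usage sketch (not part of the commit) of the updated five-argument getAvroSchema overload: the root call now passes isRootNode = true so the converter pre-computes the JsonNode-to-parent-path map once, and nested object fields are converted with isRootNode = false using a namespace looked up from jsonNodePathMap (see the OBJECT case above). The stream name "users" and the JSON schema below are made-up placeholders.

// Hypothetical usage sketch, not part of the commit. Assumes the Airbyte
// destination-s3 module is on the classpath; schema and stream name are invented.
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter;
import org.apache.avro.Schema;

public class SchemaConversionExample {

  public static void main(String[] args) {
    final JsonNode jsonSchema = Jsons.deserialize("""
        {
          "type": "object",
          "properties": {
            "id": { "type": "integer" },
            "profile": {
              "type": "object",
              "properties": { "name": { "type": "string" } }
            }
          }
        }
        """);

    // New 5-arg overload: the last argument is isRootNode. The root call computes
    // the JsonNode -> parent-path map once; nested objects are converted recursively
    // with isRootNode = false and take their parent path as the Avro record namespace.
    final Schema avroSchema = new JsonToAvroSchemaConverter()
        .getAvroSchema(jsonSchema, "users", null, true, true);
    System.out.println(avroSchema.toString(true));
  }
}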
@@ -5,11 +5,16 @@
 package io.airbyte.integrations.destination.s3.util;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import io.airbyte.commons.util.MoreIterators;
 import io.airbyte.integrations.base.JavaBaseConstants;
 import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
 import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Helper methods for unit tests. This is needed by multiple modules, so it is in the src directory.
@@ -18,7 +23,7 @@ public class AvroRecordHelper {
 
   public static JsonFieldNameUpdater getFieldNameUpdater(final String streamName, final String namespace, final JsonNode streamSchema) {
     final JsonToAvroSchemaConverter schemaConverter = new JsonToAvroSchemaConverter();
-    schemaConverter.getAvroSchema(streamSchema, streamName, namespace, true);
+    schemaConverter.getAvroSchema(streamSchema, streamName, namespace, true, true);
     return new JsonFieldNameUpdater(schemaConverter.getStandardizedNames());
   }
 
@@ -47,4 +52,32 @@ public static JsonNode pruneAirbyteJson(final JsonNode input) {
     return output;
   }
 
+  public static void obtainPaths(String currentPath, JsonNode jsonNode, Map<JsonNode, String> jsonNodePathMap) {
+    if (jsonNode.isObject()) {
+      ObjectNode objectNode = (ObjectNode) jsonNode;
+      Iterator<Map.Entry<String, JsonNode>> iter = objectNode.fields();
+      String pathPrefix = currentPath.isEmpty() ? "" : currentPath + "/";
+      String[] pathFieldsArray = currentPath.split("/");
+      String parent = Arrays.stream(pathFieldsArray)
+          .filter(x -> !x.equals("items"))
+          .filter(x -> !x.equals("properties"))
+          .filter(x -> !x.equals(pathFieldsArray[pathFieldsArray.length - 1]))
+          .collect(Collectors.joining("."));
+      if (!parent.isEmpty()) {
+        jsonNodePathMap.put(jsonNode, parent);
+      }
+      while (iter.hasNext()) {
+        Map.Entry<String, JsonNode> entry = iter.next();
+        obtainPaths(pathPrefix + entry.getKey(), entry.getValue(), jsonNodePathMap);
+      }
+    } else if (jsonNode.isArray()) {
+      ArrayNode arrayNode = (ArrayNode) jsonNode;
+
+      for (int i = 0; i < arrayNode.size(); i++) {
+        String arrayPath = currentPath + "/" + i;
+        obtainPaths(arrayPath, arrayNode.get(i), jsonNodePathMap);
+      }
+    }
+  }
+
 }
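And a hedged sketch of what the new obtainPaths helper computes on its own: it walks a JSON schema and maps nested object nodes to the dotted path of their enclosing fields (e.g. "user", "user.address"), skipping the structural "properties" and "items" keys. The sample schema below is invented for illustration and is not from the commit.

// Illustrative sketch only. Demonstrates the JsonNode -> parent-path map produced
// by AvroRecordHelper.obtainPaths; the sample schema is made up for this example.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.integrations.destination.s3.util.AvroRecordHelper;
import java.util.HashMap;
import java.util.Map;

public class ObtainPathsExample {

  public static void main(String[] args) throws Exception {
    final JsonNode schema = new ObjectMapper().readTree("""
        {
          "type": "object",
          "properties": {
            "user": {
              "type": "object",
              "properties": {
                "address": {
                  "type": "object",
                  "properties": { "street": { "type": "string" } }
                }
              }
            }
          }
        }
        """);

    final Map<JsonNode, String> jsonNodePathMap = new HashMap<>();
    AvroRecordHelper.obtainPaths("", schema, jsonNodePathMap);

    // Nodes nested under "user" map to "user"; nodes nested under "user.address"
    // map to "user.address". These values become the Avro record namespaces for
    // nested object fields in JsonToAvroSchemaConverter.
    jsonNodePathMap.values().forEach(System.out::println);
  }
}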
@@ -37,7 +37,7 @@ public S3Writer create(final S3DestinationConfig config,
       LOGGER.info("Json schema for stream {}: {}", stream.getName(), stream.getJsonSchema());
 
       final JsonToAvroSchemaConverter schemaConverter = new JsonToAvroSchemaConverter();
-      final Schema avroSchema = schemaConverter.getAvroSchema(stream.getJsonSchema(), stream.getName(), stream.getNamespace(), true);
+      final Schema avroSchema = schemaConverter.getAvroSchema(stream.getJsonSchema(), stream.getName(), stream.getNamespace(), true, true);
 
       LOGGER.info("Avro schema for stream {}: {}", stream.getName(), avroSchema.toString(false));
 
@@ -111,7 +111,7 @@ public void testJsonAvroConversion(final String schemaName,
                                      final JsonNode avroSchema,
                                      final JsonNode avroObject)
       throws Exception {
-    final Schema actualAvroSchema = SCHEMA_CONVERTER.getAvroSchema(jsonSchema, schemaName, namespace, appendAirbyteFields);
+    final Schema actualAvroSchema = SCHEMA_CONVERTER.getAvroSchema(jsonSchema, schemaName, namespace, appendAirbyteFields, true);
     assertEquals(
         avroSchema,
         Jsons.deserialize(actualAvroSchema.toString()),
