Skip to content

Commit

Permalink
馃帀 Destination S3: support anyOf allOf and oneOf (#4613)
Browse files Browse the repository at this point in the history
* Support combined restrictions in json schema

* Bump s3 version

* Add more test cases

* Update changelog

* Add more test cases

* Update documentation

* Format code
  • Loading branch information
tuliren committed Jul 7, 2021
1 parent 3aebe55 commit 646aae1
Show file tree
Hide file tree
Showing 9 changed files with 248 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "4816b78f-1489-44c1-9060-4b19d5fa9362",
"name": "S3",
"dockerRepository": "airbyte/destination-s3",
"dockerImageTag": "0.1.7",
"dockerImageTag": "0.1.8",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/s3"
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
- destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
name: S3
dockerRepository: airbyte/destination-s3
dockerImageTag: 0.1.7
dockerImageTag: 0.1.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
name: Redshift
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-s3/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.name=airbyte/destination-s3
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public enum JsonSchemaType {
BOOLEAN("boolean", true, Schema.Type.BOOLEAN),
NULL("null", true, Schema.Type.NULL),
OBJECT("object", false, Schema.Type.RECORD),
ARRAY("array", false, Schema.Type.ARRAY);
ARRAY("array", false, Schema.Type.ARRAY),
COMBINED("combined", false, Schema.Type.UNION);

private final String jsonSchemaType;
private final boolean isPrimitive;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.airbyte.integrations.destination.s3.avro;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.integrations.base.JavaBaseConstants;
Expand All @@ -34,6 +35,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -64,23 +66,46 @@ public class JsonToAvroSchemaConverter {

private final Map<String, String> standardizedNames = new HashMap<>();

static List<JsonSchemaType> getNonNullTypes(String fieldName, JsonNode typeProperty) {
return getTypes(fieldName, typeProperty).stream()
static List<JsonSchemaType> getNonNullTypes(String fieldName, JsonNode fieldDefinition) {
return getTypes(fieldName, fieldDefinition).stream()
.filter(type -> type != JsonSchemaType.NULL).collect(Collectors.toList());
}

static List<JsonSchemaType> getTypes(String fieldName, JsonNode typeProperty) {
if (typeProperty == null) {
static List<JsonSchemaType> getTypes(String fieldName, JsonNode fieldDefinition) {
Optional<JsonNode> combinedRestriction = getCombinedRestriction(fieldDefinition);
if (combinedRestriction.isPresent()) {
return Collections.singletonList(JsonSchemaType.COMBINED);
}

JsonNode typeProperty = fieldDefinition.get("type");
if (typeProperty == null || typeProperty.isNull()) {
throw new IllegalStateException(String.format("Field %s has no type", fieldName));
} else if (typeProperty.isArray()) {
}

if (typeProperty.isArray()) {
return MoreIterators.toList(typeProperty.elements()).stream()
.map(s -> JsonSchemaType.fromJsonSchemaType(s.asText()))
.collect(Collectors.toList());
} else if (typeProperty.isTextual()) {
}

if (typeProperty.isTextual()) {
return Collections.singletonList(JsonSchemaType.fromJsonSchemaType(typeProperty.asText()));
} else {
throw new IllegalStateException("Unexpected type: " + typeProperty);
}

throw new IllegalStateException("Unexpected type: " + typeProperty);
}

static Optional<JsonNode> getCombinedRestriction(JsonNode fieldDefinition) {
if (fieldDefinition.has("anyOf")) {
return Optional.of(fieldDefinition.get("anyOf"));
}
if (fieldDefinition.has("allOf")) {
return Optional.of(fieldDefinition.get("allOf"));
}
if (fieldDefinition.has("oneOf")) {
return Optional.of(fieldDefinition.get("oneOf"));
}
return Optional.empty();
}

public Map<String, String> getStandardizedNames() {
Expand Down Expand Up @@ -141,33 +166,27 @@ public Schema getAvroSchema(JsonNode jsonSchema,
return assembler.endRecord();
}

Schema getSingleFieldType(String fieldName,
JsonSchemaType fieldType,
JsonNode fieldDefinition,
boolean canBeComposite) {
Schema getSingleFieldType(String fieldName, JsonSchemaType fieldType, JsonNode fieldDefinition) {
Preconditions
.checkState(fieldType != JsonSchemaType.NULL, "Null types should have been filtered out");
Preconditions
.checkState(canBeComposite || fieldType.isPrimitive(), "Field %s has invalid type %s",
fieldName, fieldType);

Schema fieldSchema;
switch (fieldType) {
case STRING, NUMBER, INTEGER, BOOLEAN -> fieldSchema = Schema.create(fieldType.getAvroType());
case COMBINED -> {
Optional<JsonNode> combinedRestriction = getCombinedRestriction(fieldDefinition);
List<Schema> unionTypes = getSchemasFromTypes(fieldName, (ArrayNode) combinedRestriction.get());
fieldSchema = Schema.createUnion(unionTypes);
}
case ARRAY -> {
JsonNode items = fieldDefinition.get("items");
Preconditions.checkNotNull(items, "Array field %s misses the items property.", fieldName);

if (items.isObject()) {
fieldSchema = Schema
.createArray(getNullableFieldTypes(String.format("%s.items", fieldName), items));
fieldSchema = Schema.createArray(getNullableFieldTypes(String.format("%s.items", fieldName), items));
} else if (items.isArray()) {
List<Schema> arrayElementTypes = MoreIterators.toList(items.elements())
.stream()
.flatMap(itemDefinition -> getNonNullTypes(fieldName, itemDefinition.get("type")).stream()
.map(type -> getSingleFieldType(fieldName, type, itemDefinition, false)))
.distinct()
.collect(Collectors.toList());
arrayElementTypes.add(0, Schema.create(Schema.Type.NULL));
List<Schema> arrayElementTypes = getSchemasFromTypes(fieldName, (ArrayNode) items);
arrayElementTypes.add(0, Schema.create(Type.NULL));
fieldSchema = Schema.createArray(Schema.createUnion(arrayElementTypes));
} else {
throw new IllegalStateException(
Expand All @@ -181,15 +200,30 @@ Schema getSingleFieldType(String fieldName,
return fieldSchema;
}

List<Schema> getSchemasFromTypes(String fieldName, ArrayNode types) {
return MoreIterators.toList(types.elements())
.stream()
.flatMap(definition -> getNonNullTypes(fieldName, definition).stream().flatMap(type -> {
Schema singleFieldSchema = getSingleFieldType(fieldName, type, definition);
if (singleFieldSchema.isUnion()) {
return singleFieldSchema.getTypes().stream();
} else {
return Stream.of(singleFieldSchema);
}
}))
.distinct()
.collect(Collectors.toList());
}

/**
* @param fieldDefinition - Json schema field definition. E.g. { type: "number" }.
*/
Schema getNullableFieldTypes(String fieldName, JsonNode fieldDefinition) {
// Filter out null types, which will be added back in the end.
List<Schema> nonNullFieldTypes = getNonNullTypes(fieldName, fieldDefinition.get("type"))
List<Schema> nonNullFieldTypes = getNonNullTypes(fieldName, fieldDefinition)
.stream()
.flatMap(fieldType -> {
Schema singleFieldSchema = getSingleFieldType(fieldName, fieldType, fieldDefinition, true);
Schema singleFieldSchema = getSingleFieldType(fieldName, fieldType, fieldDefinition);
if (singleFieldSchema.isUnion()) {
return singleFieldSchema.getTypes().stream();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.airbyte.integrations.destination.s3.avro;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
Expand All @@ -47,15 +48,27 @@ public void testGetSingleTypes() {
JsonNode input1 = Jsons.deserialize("{ \"type\": \"number\" }");
assertEquals(
Collections.singletonList(JsonSchemaType.NUMBER),
JsonToAvroSchemaConverter.getTypes("field", input1.get("type")));
JsonToAvroSchemaConverter.getTypes("field", input1));
}

@Test
public void testGetUnionTypes() {
JsonNode input2 = Jsons.deserialize("{ \"type\": [\"null\", \"string\"] }");
assertEquals(
Lists.newArrayList(JsonSchemaType.NULL, JsonSchemaType.STRING),
JsonToAvroSchemaConverter.getTypes("field", input2.get("type")));
JsonToAvroSchemaConverter.getTypes("field", input2));
}

@Test
public void testNoCombinedRestriction() {
JsonNode input1 = Jsons.deserialize("{ \"type\": \"number\" }");
assertTrue(JsonToAvroSchemaConverter.getCombinedRestriction(input1).isEmpty());
}

@Test
public void testWithCombinedRestriction() {
JsonNode input2 = Jsons.deserialize("{ \"anyOf\": [{ \"type\": \"string\" }, { \"type\": \"integer\" }] }");
assertTrue(JsonToAvroSchemaConverter.getCombinedRestriction(input2).isPresent());
}

public static class GetFieldTypeTestCaseProvider implements ArgumentsProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,5 +253,132 @@
}
]
}
},
{
"schemaName": "field_with_combined_restriction",
"namespace": "namespace8",
"appendAirbyteFields": false,
"jsonSchema": {
"properties": {
"created_at": {
"anyOf": [
{
"type": "string",
"format": "date-time"
},
{
"type": ["null", "string"]
},
{
"type": "integer"
}
]
}
}
},
"avroSchema": {
"type": "record",
"name": "field_with_combined_restriction",
"namespace": "namespace8",
"fields": [
{
"name": "created_at",
"type": ["null", "string", "int"],
"default": null
}
]
}
},
{
"schemaName": "record_with_combined_restriction_field",
"namespace": "namespace9",
"appendAirbyteFields": false,
"jsonSchema": {
"properties": {
"user": {
"type": "object",
"properties": {
"created_at": {
"anyOf": [
{
"type": "string",
"format": "date-time"
},
{
"type": ["null", "string"]
},
{
"type": "integer"
}
]
}
}
}
}
},
"avroSchema": {
"type": "record",
"name": "record_with_combined_restriction_field",
"namespace": "namespace9",
"fields": [
{
"name": "user",
"type": [
"null",
{
"type": "record",
"name": "user",
"namespace": "",
"fields": [
{
"name": "created_at",
"type": ["null", "string", "int"],
"default": null
}
]
}
],
"default": null
}
]
}
},
{
"schemaName": "array_with_combined_restriction_field",
"namespace": "namespace10",
"appendAirbyteFields": false,
"jsonSchema": {
"properties": {
"identifiers": {
"type": "array",
"items": [
{
"oneOf": [{ "type": "integer" }, { "type": "string" }]
},
{
"type": "boolean"
}
]
}
}
},
"avroSchema": {
"type": "record",
"name": "array_with_combined_restriction_field",
"namespace": "namespace10",
"fields": [
{
"name": "identifiers",
"type": [
"null",
{
"type": "array",
"items": ["null", "int", "string", "boolean"]
}
],
"default": null
}
]
}
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -103,5 +103,26 @@
]
}
]
},
{
"fieldName": "any_of_field",
"jsonFieldSchema": {
"anyOf": [{ "type": "string" }, { "type": "integer" }]
},
"avroFieldType": ["null", "string", "int"]
},
{
"fieldName": "all_of_field",
"jsonFieldSchema": {
"allOf": [{ "type": "string" }, { "type": "integer" }]
},
"avroFieldType": ["null", "string", "int"]
},
{
"fieldName": "one_of_field",
"jsonFieldSchema": {
"oneOf": [{ "type": "string" }, { "type": "integer" }]
},
"avroFieldType": ["null", "string", "int"]
}
]

0 comments on commit 646aae1

Please sign in to comment.