Skip to content

Commit

Permalink
Destinations V2: clean up AirbyteType code (#28430)
Browse files Browse the repository at this point in the history
* general cleanup - move stuff around, add more comments

* guarantee `getAirbyteProtocolType` won't handle array values for `type`

* rename OneOf to Union

* simplify union ordering logic

* update testChooseUnion

* fix docs typos

* Automated Commit - Format and Process Resources Changes

* address comments

* Automated Commit - Format and Process Resources Changes

---------

Co-authored-by: cynthiaxyin <cynthiaxyin@users.noreply.github.com>
  • Loading branch information
cynthiaxyin and cynthiaxyin committed Jul 20, 2023
1 parent 225bfc4 commit 7e4797d
Show file tree
Hide file tree
Showing 15 changed files with 504 additions and 564 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.typing_deduping;

import com.fasterxml.jackson.databind.JsonNode;

/**
* Protocol types are ordered by precedence in the case of a Union that contains multiple types.
* Priority is given to wider scope types over narrower ones. (Note that because of dedup logic in
* {@link AirbyteType#fromJsonSchema(JsonNode)}, at most one string or date/time type can exist in a
* Union.)
*/
public enum AirbyteProtocolType implements AirbyteType {

STRING,
DATE,
TIME_WITHOUT_TIMEZONE,
TIME_WITH_TIMEZONE,
TIMESTAMP_WITHOUT_TIMEZONE,
TIMESTAMP_WITH_TIMEZONE,
NUMBER,
INTEGER,
BOOLEAN,
UNKNOWN;

private static AirbyteProtocolType matches(final String type) {
try {
return AirbyteProtocolType.valueOf(type.toUpperCase());
} catch (final IllegalArgumentException e) {
LOGGER.error(String.format("Could not find matching AirbyteProtocolType for \"%s\": %s", type, e));
return UNKNOWN;
}
}

// Extracts the appropriate protocol type from the representative JSON
protected static AirbyteProtocolType fromJson(final JsonNode node) {
// JSON could be a string (ex: "number")
if (node.isTextual()) {
return matches(node.asText());
}

// or, JSON could be a node with fields
final JsonNode propertyType = node.get("type");
final JsonNode airbyteType = node.get("airbyte_type");
final JsonNode format = node.get("format");

if (AirbyteType.nodeMatches(propertyType, "boolean")) {
return BOOLEAN;
} else if (AirbyteType.nodeMatches(propertyType, "integer")) {
return INTEGER;
} else if (AirbyteType.nodeMatches(propertyType, "number")) {
return AirbyteType.nodeMatches(airbyteType, "integer") ? INTEGER : NUMBER;
} else if (AirbyteType.nodeMatches(propertyType, "string")) {
if (AirbyteType.nodeMatches(format, "date")) {
return DATE;
} else if (AirbyteType.nodeMatches(format, "time")) {
if (AirbyteType.nodeMatches(airbyteType, "time_without_timezone")) {
return TIME_WITHOUT_TIMEZONE;
} else if (AirbyteType.nodeMatches(airbyteType, "time_with_timezone")) {
return TIME_WITH_TIMEZONE;
}
} else if (AirbyteType.nodeMatches(format, "date-time")) {
if (AirbyteType.nodeMatches(airbyteType, "timestamp_without_timezone")) {
return TIMESTAMP_WITHOUT_TIMEZONE;
} else if (airbyteType == null || AirbyteType.nodeMatches(airbyteType, "timestamp_with_timezone")) {
return TIMESTAMP_WITH_TIMEZONE;
}
} else {
return STRING;
}
}

return UNKNOWN;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,67 +6,36 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.OneOf;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.UnsupportedOneOf;
import com.fasterxml.jackson.databind.node.TextNode;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public sealed interface AirbyteType permits Array,OneOf,Struct,UnsupportedOneOf,AirbyteProtocolType {
public sealed interface AirbyteType permits AirbyteProtocolType,Struct,Array,UnsupportedOneOf,Union {

Logger LOGGER = LoggerFactory.getLogger(AirbyteTypeUtils.class);
Logger LOGGER = LoggerFactory.getLogger(AirbyteType.class);

/**
* The most common call pattern is probably to use this method on the stream schema, verify that
* it's an {@link Struct} schema, and then call {@link Struct#properties()} to get the columns.
* <p>
* If the top-level schema is not an object, then we can't really do anything with it, and should
* probably fail the sync. (but see also {@link OneOf#asColumns()}).
* probably fail the sync. (but see also {@link Union#asColumns()}).
*/
static AirbyteType fromJsonSchema(final JsonNode schema) {
try {
final JsonNode topLevelType = schema.get("type");
if (topLevelType != null) {
if (topLevelType.isTextual()) {
if (AirbyteTypeUtils.nodeIsType(topLevelType, "object")) {
if (nodeMatches(topLevelType, "object")) {
return getStruct(schema);
} else if (AirbyteTypeUtils.nodeIsType(topLevelType, "array")) {
} else if (nodeMatches(topLevelType, "array")) {
return getArray(schema);
}
} else if (topLevelType.isArray()) {
final List<String> typeOptions = new ArrayList<>();
topLevelType.elements().forEachRemaining(element -> {
// ignore "null" type and remove duplicates
String type = element.asText("");
if (!"null".equals(type) && !typeOptions.contains(type)) {
typeOptions.add(element.asText());
}
});

// we encounter an array of types that actually represents a single type rather than a OneOf
if (typeOptions.size() == 1) {
if (typeOptions.get(0).equals("object")) {
return getStruct(schema);
} else if (typeOptions.get(0).equals("array")) {
return getArray(schema);
} else {
return AirbyteTypeUtils.getAirbyteProtocolType(schema);
}
}

final List<AirbyteType> options = typeOptions.stream().map(typeOption -> {
// Recurse into a schema that forces a specific one of each option
JsonNode schemaClone = schema.deepCopy();
// schema is guaranteed to be an object here, because we know it has a `type` key
((ObjectNode) schemaClone).put("type", typeOption);
return fromJsonSchema(schemaClone);
}).toList();
return new OneOf(options);
return fromArrayJsonSchema(schema, topLevelType);
}
} else if (schema.hasNonNull("oneOf")) {
final List<AirbyteType> options = new ArrayList<>();
Expand All @@ -78,13 +47,20 @@ static AirbyteType fromJsonSchema(final JsonNode schema) {
// This is for backwards-compatibility with legacy normalization.
return getStruct(schema);
}
return AirbyteTypeUtils.getAirbyteProtocolType(schema);
return AirbyteProtocolType.fromJson(schema);
} catch (final Exception e) {
LOGGER.error("Exception parsing JSON schema {}: {}; returning UNKNOWN.", schema, e);
return AirbyteProtocolType.UNKNOWN;
}
}

static boolean nodeMatches(final JsonNode node, final String value) {
if (node == null || !node.isTextual()) {
return false;
}
return node.equals(TextNode.valueOf(value));
}

private static Struct getStruct(final JsonNode schema) {
final LinkedHashMap<String, AirbyteType> propertiesMap = new LinkedHashMap<>();
final JsonNode properties = schema.get("properties");
Expand All @@ -107,87 +83,38 @@ private static Array getArray(final JsonNode schema) {
}
}

enum AirbyteProtocolType implements AirbyteType {

STRING,
NUMBER,
INTEGER,
BOOLEAN,
TIMESTAMP_WITH_TIMEZONE,
TIMESTAMP_WITHOUT_TIMEZONE,
TIME_WITH_TIMEZONE,
TIME_WITHOUT_TIMEZONE,
DATE,
UNKNOWN;
private static AirbyteType fromArrayJsonSchema(final JsonNode schema, final JsonNode array) {
final List<String> typeOptions = new ArrayList<>();
array.elements().forEachRemaining(element -> {
// ignore "null" type and remove duplicates
final String type = element.asText("");
if (!"null".equals(type) && !typeOptions.contains(type)) {
typeOptions.add(element.asText());
}
});

public static AirbyteProtocolType matches(final String type) {
try {
return AirbyteProtocolType.valueOf(type.toUpperCase());
} catch (final IllegalArgumentException e) {
LOGGER.error(String.format("Could not find matching AirbyteProtocolType for \"%s\": %s", type, e));
return UNKNOWN;
// we encounter an array of types that actually represents a single type rather than a Union
if (typeOptions.size() == 1) {
if (typeOptions.get(0).equals("object")) {
return getStruct(schema);
} else if (typeOptions.get(0).equals("array")) {
return getArray(schema);
} else {
return AirbyteProtocolType.fromJson(getTrimmedJsonSchema(schema, typeOptions.get(0)));
}
}

// Recurse into a schema that forces a specific one of each option
final List<AirbyteType> options = typeOptions.stream().map(typeOption -> fromJsonSchema(getTrimmedJsonSchema(schema, typeOption))).toList();
return new Union(options);
}

/**
* @param properties Use LinkedHashMap to preserve insertion order.
*/
record Struct(LinkedHashMap<String, AirbyteType> properties) implements AirbyteType {

}

record Array(AirbyteType items) implements AirbyteType {

}

/**
* Represents a {oneOf: [...]} schema.
* <p>
* This is purely a legacy type that we should eventually delete. See also {@link OneOf}.
*/
record UnsupportedOneOf(List<AirbyteType> options) implements AirbyteType {

}

/**
* Represents a {type: [a, b, ...]} schema. This is theoretically equivalent to {oneOf: [{type: a},
* {type: b}, ...]} but legacy normalization only handles the {type: [...]} schemas.
* <p>
* Eventually we should:
* <ol>
* <li>Announce a breaking change to handle both oneOf styles the same</li>
* <li>Test against some number of API sources to verify that they won't break badly</li>
* <li>Update {@link AirbyteType#fromJsonSchema(JsonNode)} to parse both styles into
* SupportedOneOf</li>
* <li>Delete UnsupportedOneOf</li>
* </ol>
*/
record OneOf(List<AirbyteType> options) implements AirbyteType {

/**
* This is a hack to handle weird schemas like {type: [object, string]}. If a stream's top-level
* schema looks like this, we still want to be able to extract the object properties (i.e. treat it
* as though the string option didn't exist).
*
* @throws IllegalArgumentException if we cannot extract columns from this schema
*/
public LinkedHashMap<String, AirbyteType> asColumns() {
final long numObjectOptions = options.stream().filter(o -> o instanceof Struct).count();
if (numObjectOptions > 1) {
LOGGER.error("Can't extract columns from a schema with multiple object options");
return new LinkedHashMap<>();
}

return (options.stream().filter(o -> o instanceof Struct).findFirst())
.map(o -> ((Struct) o).properties())
.orElseGet(() -> {
LOGGER.error("Can't extract columns from a schema with no object options");
return new LinkedHashMap<>();
});
}

// Duplicates the JSON schema but keeps only one type
private static JsonNode getTrimmedJsonSchema(final JsonNode schema, final String type) {
final JsonNode schemaClone = schema.deepCopy();
// schema is guaranteed to be an object here, because we know it has a `type` key
((ObjectNode) schemaClone).put("type", type);
return schemaClone;
}

}

0 comments on commit 7e4797d

Please sign in to comment.