Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPEC] Allow nested struct fields in SchemaDatasetFacet #2548

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,12 @@ public TypeName visit(ArrayResolvedType arrayType) {

@Override
public TypeName visit(TypeResolver.EnumResolvedType enumType) {
return ClassName.get(containerClass, enumType.getParentName() + "." + enumType.getName());
return ClassName.get(containerClass, enumType.getParentName() + "." + enumType.getName());
}

@Override
public TypeName visit(TypeResolver.RefResolvedType refType) {
return ClassName.get(containerClass, refType.getName());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
public class TypeResolver {

private Map<String, ObjectResolvedType> types = new HashMap<>();
private Set<String> referencedTypes = new HashSet<>();
private Set<String> baseTypes = new HashSet<>();

private Map<URL, ResolvedType> rootResolvedTypePerURL = new HashMap<URL, TypeResolver.ResolvedType>();
Expand Down Expand Up @@ -75,7 +74,6 @@ public ResolvedType visit(ObjectType objectType) {
List<Field> properties = objectType.getProperties();
currentObjectName = currentName;
List<ResolvedField> resolvedFields = new ArrayList<>(properties.size());
resolvedFields.addAll(resolveFields(properties));
ObjectResolvedType objectResolvedType = new ObjectResolvedType(
container,
asList(objectType),
Expand All @@ -88,6 +86,8 @@ public ResolvedType visit(ObjectType objectType) {
if (types.put(key, objectResolvedType) != null) {
throw new RuntimeException("Duplicated type: " + objectResolvedType.getName());
};
// hack to resolve fields of self-referencing type
resolvedFields.addAll(resolveFields(properties));
return objectResolvedType;
}

Expand All @@ -104,28 +104,6 @@ private List<ResolvedField> resolveFields(List<Field> properties) {
currentName = previousCurrentName + currentProperty;
ResolvedField resolvedField = new ResolvedField(property, visit(property.getType()));
resolvedFields.add(resolvedField);
referencedTypes.add(resolvedField.getType().accept(new ResolvedTypeVisitor<String>() {
@Override
public String visit(PrimitiveResolvedType primitiveType) {
return primitiveType.getName();
}

@Override
public String visit(ObjectResolvedType objectType) {
return objectType.getName();
}

@Override
public String visit(ArrayResolvedType arrayType) {
return visit(arrayType.items);
}

@Override
public String visit(EnumResolvedType enumType) {
return enumType.getName();
}

}));
}
currentName = previousCurrentName;
return resolvedFields;
Expand Down Expand Up @@ -164,7 +142,18 @@ public ResolvedType visit(AllOfType allOfType) {
ResolvedType additionalPropertiesType = null;
Set<ObjectResolvedType> parents = new LinkedHashSet<>();
for (Type child : children) {
ObjectResolvedType resolvedChildType = (ObjectResolvedType) visit(child);
ResolvedType childType = visit(child);
ObjectResolvedType resolvedChildType;

if (childType instanceof RefResolvedType) {
RefResolvedType refType = (RefResolvedType) childType;
resolvedChildType = types.get(refType.getFullName());
if (resolvedChildType == null) {
throw new RuntimeException("Unknown type: " + refType.getFullName());
}
} else {
resolvedChildType = (ObjectResolvedType) childType;
}
List<ObjectType> objectTypes = resolvedChildType.getObjectTypes();
if (!currentName.equals(resolvedChildType.getName())) {
// base interface
Expand Down Expand Up @@ -208,7 +197,7 @@ public ResolvedType visit(RefType refType) {
}
String key = refContainer + "." + typeName;
if (types.containsKey(key)) {
return types.get(key);
pawel-big-lebowski marked this conversation as resolved.
Show resolved Hide resolved
return new RefResolvedType(refContainer, typeName);
}
if (anchorIndex > 0) {
throw new RuntimeException("This ref should have been resolved already: " + refContainer + " " + refType.getPointer() + " => "+ key + " keys: " + types.keySet());
Expand Down Expand Up @@ -283,6 +272,8 @@ interface ResolvedTypeVisitor<T> {

T visit(EnumResolvedType enumType);

T visit(RefResolvedType refType);

default T visit(ResolvedType type) {
try {
return type == null ? null : type.accept(this);
Expand All @@ -303,6 +294,8 @@ public static class DefaultResolvedTypeVisitor<T> implements ResolvedTypeVisitor

public T visit(EnumResolvedType enumResolvedType) { return null; }

public T visit(RefResolvedType refType) { return null; }

}

static class PrimitiveResolvedType implements ResolvedType {
Expand Down Expand Up @@ -338,6 +331,14 @@ public boolean equals(Object o) {
return primitiveType.equals(that.primitiveType);
}

@Override
public String toString() {
if (getFormat() == null) {
return "PrimitiveType{" + getName() + "}";
}
return "PrimitiveType{name: " + getName() + ", format: " + getFormat() + "}";
}

@Override
public int hashCode() {
return Objects.hash(primitiveType);
Expand Down Expand Up @@ -474,6 +475,52 @@ public int hashCode() {
}
}

static class RefResolvedType implements ResolvedType {

private String container;
private String name;

public RefResolvedType(String container, String name) {
this.container = container;
this.name = name;
}

public String getFullName() {
return container + "." + name;
}

public String getName() {
return name;
}

@Override
public String toString() {
return "RefResolvedType{" + getFullName() + "}";
}

@Override
public <T> T accept(ResolvedTypeVisitor<T> visitor) {
return visitor.visit(this);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
RefResolvedType that = (RefResolvedType) o;
return container.equals(that.container) && name.equals(that.name);
}

@Override
public int hashCode() {
return Objects.hash(container, name);
}
}

static class ArrayResolvedType implements ResolvedType {

private final ArrayType arrayType;
Expand Down Expand Up @@ -505,6 +552,11 @@ public boolean equals(Object o) {
return arrayType.equals(that.arrayType) && items.equals(that.items);
}

@Override
public String toString() {
return "ArrayResolvedType{type: " + arrayType.toString() + ", items: " + items.toString() + "}";
}

@Override
public int hashCode() {
return Objects.hash(arrayType, items);
Expand Down
108 changes: 94 additions & 14 deletions client/java/src/test/java/io/openlineage/client/OpenLineageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.openlineage.client.OpenLineage.Run;
import io.openlineage.client.OpenLineage.RunEvent;
import io.openlineage.client.OpenLineage.RunFacets;
import io.openlineage.client.OpenLineage.SchemaDatasetFacet;
import java.net.URI;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
Expand Down Expand Up @@ -248,6 +249,81 @@ void factory() throws JsonProcessingException {
.build())
.build())
.build());

SchemaDatasetFacet schemaFacet =
ol.newSchemaDatasetFacetBuilder()
.fields(
Arrays.asList(
ol.newSchemaDatasetFacetFieldsBuilder().name("user_id").type("int64").build(),
ol.newSchemaDatasetFacetFieldsBuilder()
.name("phones")
.type("array")
.description("List of phone numbers")
.fields(
Arrays.asList(
ol.newSchemaDatasetFacetFieldsBuilder()
.name("_element")
.type("string")
.build()))
.build(),
ol.newSchemaDatasetFacetFieldsBuilder()
.name("addresses")
.type("struct")
.description("Has customer completed activation process")
.fields(
Arrays.asList(
ol.newSchemaDatasetFacetFieldsBuilder()
.name("type")
.type("string")
.description("Address type, g.e. home, work, etc.")
.build(),
ol.newSchemaDatasetFacetFieldsBuilder()
.name("country")
.type("string")
.description("Country name")
.build(),
ol.newSchemaDatasetFacetFieldsBuilder()
.name("zip")
.type("string")
.description("Zip code")
.build(),
ol.newSchemaDatasetFacetFieldsBuilder()
.name("state")
.type("string")
.description("State name")
.build(),
ol.newSchemaDatasetFacetFieldsBuilder()
.name("street")
.type("string")
.description("Street name")
.build()))
.build(),
ol.newSchemaDatasetFacetFieldsBuilder()
.name("custom_properties")
.type("map")
.fields(
Arrays.asList(
ol.newSchemaDatasetFacetFieldsBuilder()
.name("key")
.type("string")
.build(),
ol.newSchemaDatasetFacetFieldsBuilder()
.name("value")
.type("union")
.fields(
Arrays.asList(
ol.newSchemaDatasetFacetFieldsBuilder()
.name("_0")
.type("string")
.build(),
ol.newSchemaDatasetFacetFieldsBuilder()
.name("_1")
.type("int64")
.build()))
.build()))
.build()))
.build();

List<OutputDataset> outputs =
Arrays.asList(
ol.newOutputDatasetBuilder()
Expand All @@ -256,6 +332,7 @@ void factory() throws JsonProcessingException {
.facets(
ol.newDatasetFacetsBuilder()
.version(ol.newDatasetVersionDatasetFacet("output-version"))
.schema(schemaFacet)
.build())
.outputFacets(
ol.newOutputDatasetOutputFacetsBuilder()
Expand Down Expand Up @@ -300,30 +377,33 @@ void factory() throws JsonProcessingException {

DataQualityMetricsInputDatasetFacet dq =
inputDataset.getInputFacets().getDataQualityMetrics();
assertEquals((Long) 10L, dq.getRowCount());
assertEquals((Long) 20L, dq.getBytes());
assertEquals((Long) 5L, dq.getFileCount());
assertEquals(10L, dq.getRowCount());
assertEquals(20L, dq.getBytes());
assertEquals(5L, dq.getFileCount());
DataQualityMetricsInputDatasetFacetColumnMetricsAdditional colMetrics =
dq.getColumnMetrics().getAdditionalProperties().get("mycol");
assertEquals((Double) 10D, colMetrics.getCount());
assertEquals((Long) 10L, colMetrics.getDistinctCount());
assertEquals((Double) 30D, colMetrics.getMax());
assertEquals((Double) 5D, colMetrics.getMin());
assertEquals((Long) 1L, colMetrics.getNullCount());
assertEquals((Double) 3000D, colMetrics.getSum());
assertEquals((Double) 52D, colMetrics.getQuantiles().getAdditionalProperties().get("25"));
assertEquals(10D, colMetrics.getCount());
assertEquals(10L, colMetrics.getDistinctCount());
assertEquals(30D, colMetrics.getMax());
assertEquals(5D, colMetrics.getMin());
assertEquals(1L, colMetrics.getNullCount());
assertEquals(3000D, colMetrics.getSum());
assertEquals(52D, colMetrics.getQuantiles().getAdditionalProperties().get("25"));

assertEquals(1, runStateUpdate.getOutputs().size());
OutputDataset outputDataset = runStateUpdate.getOutputs().get(0);
assertEquals("ons", outputDataset.getNamespace());
assertEquals("output", outputDataset.getName());
assertEquals("output-version", outputDataset.getFacets().getVersion().getDatasetVersion());

assertEquals(roundTrip(json), roundTrip(mapper.writeValueAsString(read)));
assertEquals((Long) 10L, outputDataset.getOutputFacets().getOutputStatistics().getRowCount());
assertEquals((Long) 20L, outputDataset.getOutputFacets().getOutputStatistics().getSize());
assertEquals((Long) 5L, outputDataset.getOutputFacets().getOutputStatistics().getFileCount());
SchemaDatasetFacet outputDatasetSchema = outputDataset.getFacets().getSchema();
assertEquals(outputDatasetSchema, schemaFacet);

assertEquals(10L, outputDataset.getOutputFacets().getOutputStatistics().getRowCount());
assertEquals(20L, outputDataset.getOutputFacets().getOutputStatistics().getSize());
assertEquals(5L, outputDataset.getOutputFacets().getOutputStatistics().getFileCount());

assertEquals(roundTrip(json), roundTrip(mapper.writeValueAsString(read)));
assertEquals(json, mapper.writeValueAsString(read));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ public static OpenLineage.SchemaDatasetFacet convert(OpenLineage openLineage, Sc
.forEach(
avroField -> {
fields.add(
openLineage.newSchemaDatasetFacetFields(
avroField.name(), getTypeName(avroField.schema()), avroField.doc()));
openLineage
.newSchemaDatasetFacetFieldsBuilder()
.name(avroField.name())
.type(getTypeName(avroField.schema()))
.description(avroField.doc())
.build());
});

return builder.fields(fields).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ public static OpenLineage.SchemaDatasetFacet getSchema(OpenLineageContext contex
field ->
context
.getOpenLineage()
.newSchemaDatasetFacetFields(
field.name(), field.type().typeId().name(), field.doc()))
.newSchemaDatasetFacetFieldsBuilder()
.name(field.name())
.type(field.type().typeId().name())
.description(field.doc())
.build())
.collect(Collectors.toList());
return context.getOpenLineage().newSchemaDatasetFacet(fields);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ void testApply() {
OpenLineage.SchemaDatasetFacet schemaDatasetFacet =
openLineage.newSchemaDatasetFacet(
Collections.singletonList(
openLineage.newSchemaDatasetFacetFields("a", "INTEGER", "desc")));
openLineage
.newSchemaDatasetFacetFieldsBuilder()
.name("a")
.type("INTEGER")
.description("desc")
.build()));
ExampleLineageProvider provider =
new ExampleLineageProvider("name", "namespace", schemaDatasetFacet);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

class SaveIntoDataSourceCommandVisitorTest {

SaveIntoDataSourceCommand command = mock(SaveIntoDataSourceCommand.class);;
SaveIntoDataSourceCommand command = mock(SaveIntoDataSourceCommand.class);
OpenLineageContext context = mock(OpenLineageContext.class);
SaveIntoDataSourceCommandVisitor visitor = new SaveIntoDataSourceCommandVisitor(context);
DatasetFactory datasetFactory = mock(DatasetFactory.class);
Expand Down