Skip to content

Commit

Permalink
[SPEC] Allow nested struct fields in SchemaDatasetFacet
Browse files Browse the repository at this point in the history
Signed-off-by: Martynov Maxim <martinov_m_s_@mail.ru>
  • Loading branch information
dolfinus committed Apr 3, 2024
1 parent afcf225 commit 6d4ee3b
Show file tree
Hide file tree
Showing 24 changed files with 215 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,12 @@ public TypeName visit(ArrayResolvedType arrayType) {

@Override
public TypeName visit(TypeResolver.EnumResolvedType enumType) {
return ClassName.get(containerClass, enumType.getParentName() + "." + enumType.getName());
return ClassName.get(containerClass, enumType.getParentName() + "." + enumType.getName());
}

@Override
public TypeName visit(TypeResolver.RefResolvedType refType) {
return ClassName.get(containerClass, refType.getName());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
public class TypeResolver {

private Map<String, ObjectResolvedType> types = new HashMap<>();
private Set<String> referencedTypes = new HashSet<>();
private Set<String> baseTypes = new HashSet<>();

private Map<URL, ResolvedType> rootResolvedTypePerURL = new HashMap<URL, TypeResolver.ResolvedType>();
Expand Down Expand Up @@ -75,7 +74,6 @@ public ResolvedType visit(ObjectType objectType) {
List<Field> properties = objectType.getProperties();
currentObjectName = currentName;
List<ResolvedField> resolvedFields = new ArrayList<>(properties.size());
resolvedFields.addAll(resolveFields(properties));
ObjectResolvedType objectResolvedType = new ObjectResolvedType(
container,
asList(objectType),
Expand All @@ -88,6 +86,7 @@ public ResolvedType visit(ObjectType objectType) {
if (types.put(key, objectResolvedType) != null) {
throw new RuntimeException("Duplicated type: " + objectResolvedType.getName());
};
resolvedFields.addAll(resolveFields(properties));
return objectResolvedType;
}

Expand All @@ -104,28 +103,6 @@ private List<ResolvedField> resolveFields(List<Field> properties) {
currentName = previousCurrentName + currentProperty;
ResolvedField resolvedField = new ResolvedField(property, visit(property.getType()));
resolvedFields.add(resolvedField);
referencedTypes.add(resolvedField.getType().accept(new ResolvedTypeVisitor<String>() {
@Override
public String visit(PrimitiveResolvedType primitiveType) {
return primitiveType.getName();
}

@Override
public String visit(ObjectResolvedType objectType) {
return objectType.getName();
}

@Override
public String visit(ArrayResolvedType arrayType) {
return visit(arrayType.items);
}

@Override
public String visit(EnumResolvedType enumType) {
return enumType.getName();
}

}));
}
currentName = previousCurrentName;
return resolvedFields;
Expand Down Expand Up @@ -164,7 +141,18 @@ public ResolvedType visit(AllOfType allOfType) {
ResolvedType additionalPropertiesType = null;
Set<ObjectResolvedType> parents = new LinkedHashSet<>();
for (Type child : children) {
ObjectResolvedType resolvedChildType = (ObjectResolvedType) visit(child);
ResolvedType childType = visit(child);
ObjectResolvedType resolvedChildType;

if (childType instanceof RefResolvedType) {
RefResolvedType refType = (RefResolvedType) childType;
resolvedChildType = types.get(refType.getFullName());
if (resolvedChildType == null) {
throw new RuntimeException("Unknown type: " + refType.getFullName());
}
} else {
resolvedChildType = (ObjectResolvedType) childType;
}
List<ObjectType> objectTypes = resolvedChildType.getObjectTypes();
if (!currentName.equals(resolvedChildType.getName())) {
// base interface
Expand Down Expand Up @@ -208,7 +196,7 @@ public ResolvedType visit(RefType refType) {
}
String key = refContainer + "." + typeName;
if (types.containsKey(key)) {
return types.get(key);
return new RefResolvedType(refContainer, typeName);
}
if (anchorIndex > 0) {
throw new RuntimeException("This ref should have been resolved already: " + refContainer + " " + refType.getPointer() + " => "+ key + " keys: " + types.keySet());
Expand Down Expand Up @@ -283,6 +271,8 @@ interface ResolvedTypeVisitor<T> {

T visit(EnumResolvedType enumType);

T visit(RefResolvedType refType);

default T visit(ResolvedType type) {
try {
return type == null ? null : type.accept(this);
Expand All @@ -303,6 +293,8 @@ public static class DefaultResolvedTypeVisitor<T> implements ResolvedTypeVisitor

public T visit(EnumResolvedType enumResolvedType) { return null; }

public T visit(RefResolvedType refType) { return null; }

}

static class PrimitiveResolvedType implements ResolvedType {
Expand Down Expand Up @@ -338,6 +330,14 @@ public boolean equals(Object o) {
return primitiveType.equals(that.primitiveType);
}

@Override
public String toString() {
if (getFormat() == null) {
return "PrimitiveType{" + getName() + "}";
}
return "PrimitiveType{name: " + getName() + ", format: " + getFormat() + "}";
}

@Override
public int hashCode() {
return Objects.hash(primitiveType);
Expand Down Expand Up @@ -474,6 +474,52 @@ public int hashCode() {
}
}

/**
 * A lazy, by-name reference to a type that is already registered in the
 * resolver's {@code types} map. Returned when resolving a {@code $ref} whose
 * target is known (see {@code visit(RefType)}), so that nested or
 * self-referencing struct fields do not force eager resolution; consumers
 * look the real {@code ObjectResolvedType} up via {@link #getFullName()}
 * (as done in {@code visit(AllOfType)}).
 *
 * <p>Instances are immutable value objects: equality and hash code are based
 * solely on {@code container} and {@code name}.
 */
static class RefResolvedType implements ResolvedType {

  /** Container (outer type / schema file scope) the referenced type lives in. */
  private final String container;
  /** Simple name of the referenced type within its container. */
  private final String name;

  public RefResolvedType(String container, String name) {
    this.container = container;
    this.name = name;
  }

  /**
   * Returns the registry key of the referenced type, {@code "container.name"} —
   * the same key format used when types are stored in the resolver's map.
   */
  public String getFullName() {
    return container + "." + name;
  }

  /** Returns the simple (unqualified) name of the referenced type. */
  public String getName() {
    return name;
  }

  @Override
  public String toString() {
    return "RefResolvedType{" + getFullName() + "}";
  }

  @Override
  public <T> T accept(ResolvedTypeVisitor<T> visitor) {
    return visitor.visit(this);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    RefResolvedType that = (RefResolvedType) o;
    return container.equals(that.container) && name.equals(that.name);
  }

  @Override
  public int hashCode() {
    return Objects.hash(container, name);
  }
}

static class ArrayResolvedType implements ResolvedType {

private final ArrayType arrayType;
Expand Down Expand Up @@ -505,6 +551,11 @@ public boolean equals(Object o) {
return arrayType.equals(that.arrayType) && items.equals(that.items);
}

@Override
public String toString() {
return "ArrayResolvedType{type: " + arrayType.toString() + ", items: " + items.toString() + "}";
}

@Override
public int hashCode() {
return Objects.hash(arrayType, items);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,18 @@ public class AvroSchemaUtils {
*/
public static OpenLineage.SchemaDatasetFacet convert(OpenLineage openLineage, Schema avroSchema) {
OpenLineage.SchemaDatasetFacetBuilder builder = openLineage.newSchemaDatasetFacetBuilder();
List<OpenLineage.SchemaDatasetFacetFields> fields = new LinkedList<>();
List<OpenLineage.SchemaDatasetFacetField> fields = new LinkedList<>();

avroSchema.getFields().stream()
.forEach(
avroField -> {
fields.add(
openLineage.newSchemaDatasetFacetFields(
avroField.name(), getTypeName(avroField.schema()), avroField.doc()));
openLineage
.newSchemaDatasetFacetFieldBuilder()
.name(avroField.name())
.type(getTypeName(avroField.schema()))
.description(avroField.doc())
.build());
});

return builder.fields(fields).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package io.openlineage.flink.utils;

import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineage.SchemaDatasetFacetFields;
import io.openlineage.client.OpenLineage.SchemaDatasetFacetField;
import io.openlineage.flink.api.OpenLineageContext;
import java.util.List;
import java.util.stream.Collectors;
Expand All @@ -15,14 +15,17 @@
public class IcebergUtils {

public static OpenLineage.SchemaDatasetFacet getSchema(OpenLineageContext context, Table table) {
List<SchemaDatasetFacetFields> fields =
List<SchemaDatasetFacetField> fields =
table.schema().columns().stream()
.map(
field ->
context
.getOpenLineage()
.newSchemaDatasetFacetFields(
field.name(), field.type().typeId().name(), field.doc()))
.newSchemaDatasetFacetFieldBuilder()
.name(field.name())
.type(field.type().typeId().name())
.description(field.doc())
.build())
.collect(Collectors.toList());
return context.getOpenLineage().newSchemaDatasetFacet(fields);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void testConvert() {
OpenLineage.SchemaDatasetFacet schemaDatasetFacet =
AvroSchemaUtils.convert(openLineage, schema);

List<OpenLineage.SchemaDatasetFacetFields> fields = schemaDatasetFacet.getFields();
List<OpenLineage.SchemaDatasetFacetField> fields = schemaDatasetFacet.getFields();

assertEquals(3, fields.size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ void testApply() {

List<OpenLineage.InputDataset> inputDatasets =
flinkKafkaConsumerVisitor.apply(flinkKafkaConsumer);
List<OpenLineage.SchemaDatasetFacetFields> fields =
List<OpenLineage.SchemaDatasetFacetField> fields =
inputDatasets.get(0).getFacets().getSchema().getFields();

assertEquals(2, inputDatasets.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void testApply() {
when(wrapper.getAvroSchema()).thenReturn(Optional.of(schema));

List<OpenLineage.InputDataset> inputDatasets = flinkKafkaConsumerVisitor.apply(kafkaConsumer);
List<OpenLineage.SchemaDatasetFacetFields> fields =
List<OpenLineage.SchemaDatasetFacetField> fields =
inputDatasets.get(0).getFacets().getSchema().getFields();

assertEquals(2, inputDatasets.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void testApply() {
List<OpenLineage.InputDataset> inputDatasets = hybridSourceVisitor.apply(hybridSource);
assertEquals(3, inputDatasets.size());
// Validate Iceberg
List<OpenLineage.SchemaDatasetFacetFields> icebergFields =
List<OpenLineage.SchemaDatasetFacetField> icebergFields =
inputDatasets.get(0).getFacets().getSchema().getFields();

assertEquals("table", inputDatasets.get(0).getName());
Expand All @@ -118,7 +118,7 @@ void testApply() {
assertEquals("topic1", inputDatasets.get(1).getName());
assertEquals("kafka://server1;server2", inputDatasets.get(1).getNamespace());

List<OpenLineage.SchemaDatasetFacetFields> kafkaFields =
List<OpenLineage.SchemaDatasetFacetField> kafkaFields =
inputDatasets.get(1).getFacets().getSchema().getFields();
assertEquals(1, kafkaFields.size());
assertEquals("a", kafkaFields.get(0).getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ void testApply() {
when(wrapper.getTable()).thenReturn(Optional.of(table));

List<OutputDataset> outputDatasets = sinkVisitor.apply(sink);
List<OpenLineage.SchemaDatasetFacetFields> fields =
List<OpenLineage.SchemaDatasetFacetField> fields =
outputDatasets.get(0).getFacets().getSchema().getFields();

assertEquals(1, outputDatasets.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ void testApply(Object sourceObject, Class sourceClass) {
when(wrapper.getTable()).thenReturn(table);

List<OpenLineage.InputDataset> inputDatasets = icebergSourceVisitor.apply(sourceObject);
List<OpenLineage.SchemaDatasetFacetFields> fields =
List<OpenLineage.SchemaDatasetFacetField> fields =
inputDatasets.get(0).getFacets().getSchema().getFields();

assertEquals(1, inputDatasets.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void testApply() {
when(wrapper.getAvroSchema()).thenReturn(Optional.of(schema));

OpenLineage.OutputDataset outputDataset = visitor.apply(kafkaSink).get(0);
List<OpenLineage.SchemaDatasetFacetFields> fields =
List<OpenLineage.SchemaDatasetFacetField> fields =
outputDataset.getFacets().getSchema().getFields();

assertEquals("topic", outputDataset.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ void testApply() {
when(wrapper.getAvroSchema()).thenReturn(Optional.of(schema));

List<OpenLineage.InputDataset> inputDatasets = kafkaSourceVisitor.apply(kafkaSource);
List<OpenLineage.SchemaDatasetFacetFields> fields =
List<OpenLineage.SchemaDatasetFacetField> fields =
inputDatasets.get(0).getFacets().getSchema().getFields();

assertEquals(2, inputDatasets.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ void testApply() {
OpenLineage.SchemaDatasetFacet schemaDatasetFacet =
openLineage.newSchemaDatasetFacet(
Collections.singletonList(
openLineage.newSchemaDatasetFacetFields("a", "INTEGER", "desc")));
openLineage
.newSchemaDatasetFacetFieldBuilder()
.name("a")
.type("INTEGER")
.description("desc")));
ExampleLineageProvider provider =
new ExampleLineageProvider("name", "namespace", schemaDatasetFacet);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ public class ColumnLevelLineageDeltaTest {
OpenLineage.SchemaDatasetFacet schemaDatasetFacet =
openLineage.newSchemaDatasetFacet(
Arrays.asList(
openLineage.newSchemaDatasetFacetFieldsBuilder().name("a").type("int").build(),
openLineage.newSchemaDatasetFacetFieldsBuilder().name("b").type("int").build()));
openLineage.newSchemaDatasetFacetFieldBuilder().name("a").type("int").build(),
openLineage.newSchemaDatasetFacetFieldBuilder().name("b").type("int").build()));

@BeforeEach
@SneakyThrows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ class ColumnLevelLineageHiveTest {
OpenLineage.SchemaDatasetFacet schemaDatasetFacet =
openLineage.newSchemaDatasetFacet(
Arrays.asList(
openLineage.newSchemaDatasetFacetFieldsBuilder().name("a").type("int").build(),
openLineage.newSchemaDatasetFacetFieldsBuilder().name("b").type("int").build()));
openLineage.newSchemaDatasetFacetFieldBuilder().name("a").type("int").build(),
openLineage.newSchemaDatasetFacetFieldBuilder().name("b").type("int").build()));

@BeforeAll
@SneakyThrows
Expand Down

0 comments on commit 6d4ee3b

Please sign in to comment.