Skip to content

Commit

Permalink
[SPEC] Allow nested struct fields in SchemaDatasetFacet
Browse files Browse the repository at this point in the history
Signed-off-by: Martynov Maxim <martinov_m_s_@mail.ru>
  • Loading branch information
dolfinus committed Apr 3, 2024
1 parent 9cb16d0 commit c0a0115
Show file tree
Hide file tree
Showing 14 changed files with 195 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,12 @@ public TypeName visit(ArrayResolvedType arrayType) {

@Override
public TypeName visit(TypeResolver.EnumResolvedType enumType) {
return ClassName.get(containerClass, enumType.getParentName() + "." + enumType.getName());
return ClassName.get(containerClass, enumType.getParentName() + "." + enumType.getName());
}

@Override
public TypeName visit(TypeResolver.RefResolvedType refType) {
  // A by-name reference: the referenced type lives in the same generated
  // container class, so resolve it against containerClass by simple name.
  String referencedName = refType.getName();
  return ClassName.get(containerClass, referencedName);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
public class TypeResolver {

private Map<String, ObjectResolvedType> types = new HashMap<>();
private Set<String> referencedTypes = new HashSet<>();
private Set<String> baseTypes = new HashSet<>();

private Map<URL, ResolvedType> rootResolvedTypePerURL = new HashMap<URL, TypeResolver.ResolvedType>();
Expand Down Expand Up @@ -75,7 +74,6 @@ public ResolvedType visit(ObjectType objectType) {
List<Field> properties = objectType.getProperties();
currentObjectName = currentName;
List<ResolvedField> resolvedFields = new ArrayList<>(properties.size());
resolvedFields.addAll(resolveFields(properties));
ObjectResolvedType objectResolvedType = new ObjectResolvedType(
container,
asList(objectType),
Expand All @@ -88,6 +86,7 @@ public ResolvedType visit(ObjectType objectType) {
if (types.put(key, objectResolvedType) != null) {
throw new RuntimeException("Duplicated type: " + objectResolvedType.getName());
};
resolvedFields.addAll(resolveFields(properties));
return objectResolvedType;
}

Expand All @@ -104,28 +103,6 @@ private List<ResolvedField> resolveFields(List<Field> properties) {
currentName = previousCurrentName + currentProperty;
ResolvedField resolvedField = new ResolvedField(property, visit(property.getType()));
resolvedFields.add(resolvedField);
referencedTypes.add(resolvedField.getType().accept(new ResolvedTypeVisitor<String>() {
@Override
public String visit(PrimitiveResolvedType primitiveType) {
return primitiveType.getName();
}

@Override
public String visit(ObjectResolvedType objectType) {
return objectType.getName();
}

@Override
public String visit(ArrayResolvedType arrayType) {
return visit(arrayType.items);
}

@Override
public String visit(EnumResolvedType enumType) {
return enumType.getName();
}

}));
}
currentName = previousCurrentName;
return resolvedFields;
Expand Down Expand Up @@ -164,7 +141,18 @@ public ResolvedType visit(AllOfType allOfType) {
ResolvedType additionalPropertiesType = null;
Set<ObjectResolvedType> parents = new LinkedHashSet<>();
for (Type child : children) {
ObjectResolvedType resolvedChildType = (ObjectResolvedType) visit(child);
ResolvedType childType = visit(child);
ObjectResolvedType resolvedChildType;

if (childType instanceof RefResolvedType) {
RefResolvedType refType = (RefResolvedType) childType;
resolvedChildType = types.get(refType.getFullName());
if (resolvedChildType == null) {
throw new RuntimeException("Unknown type: " + refType.getFullName());
}
} else {
resolvedChildType = (ObjectResolvedType) childType;
}
List<ObjectType> objectTypes = resolvedChildType.getObjectTypes();
if (!currentName.equals(resolvedChildType.getName())) {
// base interface
Expand Down Expand Up @@ -208,7 +196,7 @@ public ResolvedType visit(RefType refType) {
}
String key = refContainer + "." + typeName;
if (types.containsKey(key)) {
return types.get(key);
return new RefResolvedType(refContainer, typeName);
}
if (anchorIndex > 0) {
throw new RuntimeException("This ref should have been resolved already: " + refContainer + " " + refType.getPointer() + " => "+ key + " keys: " + types.keySet());
Expand Down Expand Up @@ -283,6 +271,8 @@ interface ResolvedTypeVisitor<T> {

T visit(EnumResolvedType enumType);

T visit(RefResolvedType refType);

default T visit(ResolvedType type) {
try {
return type == null ? null : type.accept(this);
Expand All @@ -303,6 +293,8 @@ public static class DefaultResolvedTypeVisitor<T> implements ResolvedTypeVisitor

public T visit(EnumResolvedType enumResolvedType) { return null; }

public T visit(RefResolvedType refType) { return null; }

}

static class PrimitiveResolvedType implements ResolvedType {
Expand Down Expand Up @@ -338,6 +330,14 @@ public boolean equals(Object o) {
return primitiveType.equals(that.primitiveType);
}

@Override
public String toString() {
  // Debug form: include the format component only when one is present.
  String format = getFormat();
  return format == null
      ? "PrimitiveType{" + getName() + "}"
      : "PrimitiveType{name: " + getName() + ", format: " + format + "}";
}

@Override
public int hashCode() {
return Objects.hash(primitiveType);
Expand Down Expand Up @@ -474,6 +474,52 @@ public int hashCode() {
}
}

/**
 * A by-name reference to a type registered (or being registered) in the resolver's type map.
 * Instead of returning the target {@code ObjectResolvedType} directly, the resolver hands out
 * a {@code RefResolvedType} and looks the target up later via {@link #getFullName()} — this
 * permits nested/self-referencing struct fields whose target is not fully resolved yet.
 */
static class RefResolvedType implements ResolvedType {

  // Immutable components of the lookup key "container.name"; equals/hashCode
  // depend on both, so they must never change after construction.
  private final String container;
  private final String name;

  /**
   * @param container namespace/container the referenced type belongs to
   * @param name simple name of the referenced type
   */
  public RefResolvedType(String container, String name) {
    this.container = container;
    this.name = name;
  }

  /** Returns the fully qualified lookup key, {@code container + "." + name}. */
  public String getFullName() {
    return container + "." + name;
  }

  /** Returns the simple (unqualified) name of the referenced type. */
  public String getName() {
    return name;
  }

  @Override
  public String toString() {
    return "RefResolvedType{" + getFullName() + "}";
  }

  @Override
  public <T> T accept(ResolvedTypeVisitor<T> visitor) {
    return visitor.visit(this);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    RefResolvedType that = (RefResolvedType) o;
    return container.equals(that.container) && name.equals(that.name);
  }

  @Override
  public int hashCode() {
    return Objects.hash(container, name);
  }
}

static class ArrayResolvedType implements ResolvedType {

private final ArrayType arrayType;
Expand Down Expand Up @@ -505,6 +551,11 @@ public boolean equals(Object o) {
return arrayType.equals(that.arrayType) && items.equals(that.items);
}

@Override
public String toString() {
  // Debug representation showing both the raw array type and its resolved item type.
  StringBuilder rendered = new StringBuilder("ArrayResolvedType{type: ");
  rendered.append(arrayType.toString());
  rendered.append(", items: ");
  rendered.append(items.toString());
  rendered.append("}");
  return rendered.toString();
}

@Override
public int hashCode() {
return Objects.hash(arrayType, items);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ public static OpenLineage.SchemaDatasetFacet convert(OpenLineage openLineage, Sc
.forEach(
avroField -> {
fields.add(
openLineage.newSchemaDatasetFacetFields(
avroField.name(), getTypeName(avroField.schema()), avroField.doc()));
openLineage.newSchemaDatasetFacetFieldBuilder()
.name(avroField.name())
.type(getTypeName(avroField.schema()))
.description(avroField.doc())
.build());
});

return builder.fields(fields).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ public static OpenLineage.SchemaDatasetFacet getSchema(OpenLineageContext contex
field ->
context
.getOpenLineage()
.newSchemaDatasetFacetFields(
field.name(), field.type().typeId().name(), field.doc()))
.newSchemaDatasetFacetFieldBuilder()
.name(field.name())
.type(field.type().typeId().name())
.description(field.doc())
.build())
.collect(Collectors.toList());
return context.getOpenLineage().newSchemaDatasetFacet(fields);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ void testApply() {
OpenLineage.SchemaDatasetFacet schemaDatasetFacet =
openLineage.newSchemaDatasetFacet(
Collections.singletonList(
openLineage.newSchemaDatasetFacetFields("a", "INTEGER", "desc")));
openLineage.newSchemaDatasetFacetFieldBuilder()
.name("a")
.type("INTEGER")
.description("desc")));
ExampleLineageProvider provider =
new ExampleLineageProvider("name", "namespace", schemaDatasetFacet);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ public class ColumnLevelLineageDeltaTest {
OpenLineage.SchemaDatasetFacet schemaDatasetFacet =
openLineage.newSchemaDatasetFacet(
Arrays.asList(
openLineage.newSchemaDatasetFacetFieldsBuilder().name("a").type("int").build(),
openLineage.newSchemaDatasetFacetFieldsBuilder().name("b").type("int").build()));
openLineage.newSchemaDatasetFacetFieldBuilder().name("a").type("int").build(),
openLineage.newSchemaDatasetFacetFieldBuilder().name("b").type("int").build()));

@BeforeEach
@SneakyThrows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ class ColumnLevelLineageHiveTest {
OpenLineage.SchemaDatasetFacet schemaDatasetFacet =
openLineage.newSchemaDatasetFacet(
Arrays.asList(
openLineage.newSchemaDatasetFacetFieldsBuilder().name("a").type("int").build(),
openLineage.newSchemaDatasetFacetFieldsBuilder().name("b").type("int").build()));
openLineage.newSchemaDatasetFacetFieldBuilder().name("a").type("int").build(),
openLineage.newSchemaDatasetFacetFieldBuilder().name("b").type("int").build()));

@BeforeAll
@SneakyThrows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ class ColumnLevelLineageIcebergTest {
OpenLineage.SchemaDatasetFacet schemaDatasetFacet =
openLineage.newSchemaDatasetFacet(
Arrays.asList(
openLineage.newSchemaDatasetFacetFieldsBuilder().name("a").type(INT_TYPE).build(),
openLineage.newSchemaDatasetFacetFieldsBuilder().name("b").type(INT_TYPE).build()));
openLineage.newSchemaDatasetFacetFieldBuilder().name("a").type(INT_TYPE).build(),
openLineage.newSchemaDatasetFacetFieldBuilder().name("b").type(INT_TYPE).build()));
StructType structTypeSchema =
new StructType(
new StructField[] {
Expand Down Expand Up @@ -186,8 +186,8 @@ void testUnaryExpression() {
OpenLineage.SchemaDatasetFacet outputSchema =
openLineage.newSchemaDatasetFacet(
Arrays.asList(
openLineage.newSchemaDatasetFacetFieldsBuilder().name("c").type(INT_TYPE).build(),
openLineage.newSchemaDatasetFacetFieldsBuilder().name("d").type(INT_TYPE).build()));
openLineage.newSchemaDatasetFacetFieldBuilder().name("c").type(INT_TYPE).build(),
openLineage.newSchemaDatasetFacetFieldBuilder().name("d").type(INT_TYPE).build()));

LogicalPlan plan = LastQueryExecutionSparkEventListener.getLastExecutedLogicalPlan().get();
when(queryExecution.optimizedPlan()).thenReturn(plan);
Expand All @@ -207,8 +207,8 @@ void testEmptyColumnLineageFacet() {
OpenLineage.SchemaDatasetFacet wrongSchema =
openLineage.newSchemaDatasetFacet(
Arrays.asList(
openLineage.newSchemaDatasetFacetFieldsBuilder().name("x").type(INT_TYPE).build(),
openLineage.newSchemaDatasetFacetFieldsBuilder().name("y").type(INT_TYPE).build()));
openLineage.newSchemaDatasetFacetFieldBuilder().name("x").type(INT_TYPE).build(),
openLineage.newSchemaDatasetFacetFieldBuilder().name("y").type(INT_TYPE).build()));

LogicalPlan plan = LastQueryExecutionSparkEventListener.getLastExecutedLogicalPlan().get();
when(queryExecution.optimizedPlan()).thenReturn(plan);
Expand Down Expand Up @@ -257,9 +257,9 @@ void testReadWriteParquetDataset() {
OpenLineage.SchemaDatasetFacet outputSchema =
openLineage.newSchemaDatasetFacet(
Arrays.asList(
openLineage.newSchemaDatasetFacetFieldsBuilder().name("id").type("long").build(),
openLineage.newSchemaDatasetFacetFieldBuilder().name("id").type("long").build(),
openLineage
.newSchemaDatasetFacetFieldsBuilder()
.newSchemaDatasetFacetFieldBuilder()
.name("col2")
.type("long")
.build()));
Expand All @@ -281,8 +281,8 @@ void testBinaryAndComplexExpression() {
OpenLineage.SchemaDatasetFacet outputSchema =
openLineage.newSchemaDatasetFacet(
Arrays.asList(
openLineage.newSchemaDatasetFacetFieldsBuilder().name("c").type("string").build(),
openLineage.newSchemaDatasetFacetFieldsBuilder().name("d").type("string").build()));
openLineage.newSchemaDatasetFacetFieldBuilder().name("c").type("string").build(),
openLineage.newSchemaDatasetFacetFieldBuilder().name("d").type("string").build()));

LogicalPlan plan = LastQueryExecutionSparkEventListener.getLastExecutedLogicalPlan().get();
when(queryExecution.optimizedPlan()).thenReturn(plan);
Expand All @@ -308,7 +308,7 @@ void testJoinQuery() {
OpenLineage.SchemaDatasetFacet outputSchema =
openLineage.newSchemaDatasetFacet(
Arrays.asList(
openLineage.newSchemaDatasetFacetFieldsBuilder().name("c").type(INT_TYPE).build()));
openLineage.newSchemaDatasetFacetFieldBuilder().name("c").type(INT_TYPE).build()));

LogicalPlan plan = LastQueryExecutionSparkEventListener.getLastExecutedLogicalPlan().get();
when(queryExecution.optimizedPlan()).thenReturn(plan);
Expand All @@ -328,7 +328,7 @@ void testAggregateQuery() {
OpenLineage.SchemaDatasetFacet outputSchema =
openLineage.newSchemaDatasetFacet(
Arrays.asList(
openLineage.newSchemaDatasetFacetFieldsBuilder().name("a").type(INT_TYPE).build()));
openLineage.newSchemaDatasetFacetFieldBuilder().name("a").type(INT_TYPE).build()));

LogicalPlan plan = LastQueryExecutionSparkEventListener.getLastExecutedLogicalPlan().get();
when(queryExecution.optimizedPlan()).thenReturn(plan);
Expand All @@ -349,8 +349,8 @@ void testCTEQuery() {
OpenLineage.SchemaDatasetFacet outputSchema =
openLineage.newSchemaDatasetFacet(
Arrays.asList(
openLineage.newSchemaDatasetFacetFieldsBuilder().name("c").type(INT_TYPE).build(),
openLineage.newSchemaDatasetFacetFieldsBuilder().name("d").type(INT_TYPE).build()));
openLineage.newSchemaDatasetFacetFieldBuilder().name("c").type(INT_TYPE).build(),
openLineage.newSchemaDatasetFacetFieldBuilder().name("d").type(INT_TYPE).build()));

LogicalPlan plan = LastQueryExecutionSparkEventListener.getLastExecutedLogicalPlan().get();
when(queryExecution.optimizedPlan()).thenReturn(plan);
Expand Down Expand Up @@ -380,8 +380,8 @@ void testJobWithCachedDataset() {
OpenLineage.SchemaDatasetFacet outputSchema =
openLineage.newSchemaDatasetFacet(
Arrays.asList(
openLineage.newSchemaDatasetFacetFieldsBuilder().name("e").type(INT_TYPE).build(),
openLineage.newSchemaDatasetFacetFieldsBuilder().name("f").type(INT_TYPE).build()));
openLineage.newSchemaDatasetFacetFieldBuilder().name("e").type(INT_TYPE).build(),
openLineage.newSchemaDatasetFacetFieldBuilder().name("f").type(INT_TYPE).build()));

LogicalPlan plan = LastQueryExecutionSparkEventListener.getLastExecutedLogicalPlan().get();
when(queryExecution.optimizedPlan()).thenReturn(plan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ private static List<OpenLineage.SchemaDatasetFacetFields> transformFields(
for (StructField field : fields) {
list.add(
openLineage
.newSchemaDatasetFacetFieldsBuilder()
.newSchemaDatasetFacetFieldBuilder()
.name(field.name())
.type(field.dataType().typeName())
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ class ColumnLevelLineageBuilderTest {
OpenLineage.SchemaDatasetFacet schema =
openLineage.newSchemaDatasetFacet(
Arrays.asList(
openLineage.newSchemaDatasetFacetFieldsBuilder().name("a").type("int").build(),
openLineage.newSchemaDatasetFacetFieldsBuilder().name("b").type("int").build()));
openLineage.newSchemaDatasetFacetFieldBuilder().name("a").type("int").build(),
openLineage.newSchemaDatasetFacetFieldBuilder().name("b").type("int").build()));
ColumnLevelLineageBuilder builder = new ColumnLevelLineageBuilder(schema, context);

ExprId rootExprId = mock(ExprId.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void testCustomCollectorsAreApplied() {
openLineage.newSchemaDatasetFacet(
Arrays.asList(
openLineage
.newSchemaDatasetFacetFieldsBuilder()
.newSchemaDatasetFacetFieldBuilder()
.name(OUTPUT_COL_NAME)
.type("string")
.build()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void testLoadBuilderWithGlobalFacet() {
openLineage.newSchemaDatasetFacet(
Collections.singletonList(
openLineage
.newSchemaDatasetFacetFieldsBuilder()
.newSchemaDatasetFacetFieldBuilder()
.name("user_id")
.type("int64")
.build()));
Expand Down

0 comments on commit c0a0115

Please sign in to comment.