Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix downstream recursion #2181

Merged
merged 1 commit into from
Oct 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 21 additions & 8 deletions api/src/main/java/marquez/db/ColumnLineageDao.java
Expand Up @@ -99,19 +99,28 @@ void doUpsertColumnLineageRow(
@SqlQuery(
"""
WITH RECURSIVE
column_lineage_latest AS (
SELECT DISTINCT ON (output_dataset_field_uuid, input_dataset_field_uuid) *
FROM column_lineage
WHERE created_at <= :createdAtUntil
ORDER BY output_dataset_field_uuid, input_dataset_field_uuid, updated_at DESC, updated_at
),
dataset_fields_view AS (
SELECT d.namespace_name as namespace_name, d.name as dataset_name, df.name as field_name, df.type, df.uuid
FROM dataset_fields df
INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid
),
column_lineage_recursive AS (
(
SELECT DISTINCT ON (output_dataset_field_uuid, input_dataset_field_uuid) *, 0 as depth
FROM column_lineage
WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :createdAtUntil
ORDER BY output_dataset_field_uuid, input_dataset_field_uuid, updated_at DESC, updated_at
SELECT
*,
0 as depth,
false as is_cycle,
ARRAY[ROW(output_dataset_field_uuid, input_dataset_field_uuid)] as path -- path and is_cycle mechanism as describe here https://www.postgresql.org/docs/current/queries-with.html (CYCLE clause not available in postgresql 12)
FROM column_lineage_latest
WHERE output_dataset_field_uuid IN (<datasetFieldUuids>)
)
UNION
UNION ALL
SELECT
adjacent_node.output_dataset_version_uuid,
adjacent_node.output_dataset_field_uuid,
Expand All @@ -121,27 +130,31 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :crea
adjacent_node.transformation_type,
adjacent_node.created_at,
adjacent_node.updated_at,
node.depth + 1 as depth
FROM column_lineage adjacent_node, column_lineage_recursive node
node.depth + 1 as depth,
ROW(adjacent_node.input_dataset_field_uuid, adjacent_node.output_dataset_field_uuid) = ANY(path) as is_cycle,
path || ROW(adjacent_node.input_dataset_field_uuid, adjacent_node.output_dataset_field_uuid) as path
FROM column_lineage_latest adjacent_node, column_lineage_recursive node
WHERE (
(node.input_dataset_field_uuid = adjacent_node.output_dataset_field_uuid) --upstream lineage
OR (:withDownstream AND adjacent_node.input_dataset_field_uuid = node.output_dataset_field_uuid) --optional downstream lineage
)
AND node.depth < :depth
AND NOT is_cycle
)
SELECT
output_fields.namespace_name,
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
clr.transformation_description,
clr.transformation_type,
clr.created_at,
clr.updated_at
FROM column_lineage_recursive clr
INNER JOIN dataset_fields_view output_fields ON clr.output_dataset_field_uuid = output_fields.uuid -- hidden datasets will be filtered
LEFT JOIN dataset_fields_view input_fields ON clr.input_dataset_field_uuid = input_fields.uuid
WHERE NOT clr.is_cycle
GROUP BY
output_fields.namespace_name,
output_fields.dataset_name,
Expand Down
Expand Up @@ -71,7 +71,7 @@ private Lineage toLineage(Set<ColumnLineageNodeData> lineageNodeData) {
DatasetFieldId.of(i.getNamespace(), i.getDataset(), i.getField())))
.forEach(
inputNodeId -> {
graphNodes.put(inputNodeId, Node.datasetField().id(inputNodeId));
graphNodes.putIfAbsent(inputNodeId, Node.datasetField().id(inputNodeId));
Optional.ofNullable(outEdges.get(inputNodeId))
.ifPresentOrElse(
nodeEdges -> nodeEdges.add(nodeId),
Expand Down
Expand Up @@ -318,6 +318,15 @@ public void testGetLineageWithDownstream() {
.filter(c -> c.getId().asDatasetFieldId().getFieldName().getValue().equals("col_d"))
.findAny())
.isPresent();

ColumnLineageNodeData nodeData_C =
(ColumnLineageNodeData)
lineage.getGraph().stream()
.filter(c -> c.getId().asDatasetFieldId().getFieldName().getValue().equals("col_c"))
.findAny()
.get()
.getData();
assertThat(nodeData_C.getInputFields()).hasSize(2);
}

@Test
Expand Down