metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
@@ -1340,11 +1340,25 @@ def _gen_lineage_for_downstream(
                 upstreams.setdefault(upstream, query.query_id)
 
             for lineage_info in query.column_lineage:
-                for upstream_ref in lineage_info.upstreams:
-                    cll[lineage_info.downstream.column].setdefault(
-                        SchemaFieldUrn(upstream_ref.table, upstream_ref.column),
-                        query.query_id,
-                    )
+                if (
+                    not lineage_info.downstream.column
+                    or not lineage_info.downstream.column.strip()
+                ):
+                    logger.debug(
+                        f"Skipping lineage entry with empty downstream column in query {query.query_id}"
+                    )
+                    continue
+
+                for upstream_ref in lineage_info.upstreams:
+                    if upstream_ref.column and upstream_ref.column.strip():
+                        cll[lineage_info.downstream.column].setdefault(
+                            SchemaFieldUrn(upstream_ref.table, upstream_ref.column),
+                            query.query_id,
+                        )
+                    else:
+                        logger.debug(
+                            f"Skipping empty column reference in lineage for query {query.query_id}"
+                        )
 
         # Finally, we can build our lineage edge.
         required_queries = OrderedSet[QueryId]()
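For reference, a minimal standalone sketch of the guard behavior introduced above. This is not the aggregator code itself: build_cll and its (table, column) tuples are hypothetical stand-ins for ColumnLineageInfo/ColumnRef. It shows the two rules the diff adds: an entry whose downstream column is empty or whitespace-only is skipped entirely, and individual empty upstream column references are dropped.

from collections import defaultdict
from typing import Dict, List, Tuple

ColumnRef = Tuple[str, str]  # (table, column) -- hypothetical stand-in


def build_cll(
    column_lineage: List[Tuple[str, List[ColumnRef]]], query_id: str
) -> Dict[str, Dict[ColumnRef, str]]:
    """Downstream column -> {upstream ref -> first query id that produced it}."""
    cll: Dict[str, Dict[ColumnRef, str]] = defaultdict(dict)
    for downstream_column, upstream_refs in column_lineage:
        if not downstream_column or not downstream_column.strip():
            continue  # skip the whole entry when the downstream column is empty/whitespace
        for table, column in upstream_refs:
            if column and column.strip():
                cll[downstream_column].setdefault((table, column), query_id)
            # else: drop the empty upstream column reference
    return cll


print(
    dict(
        build_cll(
            [
                ("col_a", [("source_table", "col_a")]),
                ("col_b", [("source_table", "col_b"), ("source_table", "")]),
                ("", [("source_table", "col_c")]),  # skipped: empty downstream column
            ],
            query_id="q1",
        )
    )
)
# {'col_a': {('source_table', 'col_a'): 'q1'}, 'col_b': {('source_table', 'col_b'): 'q1'}}
# i.e. only col_a and col_b survive, mirroring the first golden file below.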
53 changes: 53 additions & 0 deletions metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_empty_column_in_snowflake_lineage_golden.json
@@ -0,0 +1,53 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.target_table,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
},
"created": {
"time": 20000,
"actor": "urn:li:corpuser:_ingestion"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.source_table,PROD)",
"type": "TRANSFORMED",
"query": "urn:li:query:e0ac0a087b1c788497e6ae3ed1abf6538c66f80a0b1e6fad83219813d6c95b27"
}
],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.source_table,PROD),col_a)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.target_table,PROD),col_a)"
],
"confidenceScore": 1.0,
"query": "urn:li:query:e0ac0a087b1c788497e6ae3ed1abf6538c66f80a0b1e6fad83219813d6c95b27"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.source_table,PROD),col_b)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.target_table,PROD),col_b)"
],
"confidenceScore": 1.0,
"query": "urn:li:query:e0ac0a087b1c788497e6ae3ed1abf6538c66f80a0b1e6fad83219813d6c95b27"
}
]
}
}
}
]
27 changes: 27 additions & 0 deletions metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_empty_downstream_column_in_snowflake_lineage_golden.json
@@ -0,0 +1,27 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.target_table,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
},
"created": {
"time": 20000,
"actor": "urn:li:corpuser:_ingestion"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.source_table,PROD)",
"type": "TRANSFORMED",
"query": "urn:li:query:7bfef29f2b8442ffa6e240bfeb06b78bbf971b25dff80a9d2ba31cf5aef55e2e"
}
]
}
}
}
]
41 changes: 41 additions & 0 deletions metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_partial_empty_downstream_column_in_snowflake_lineage_golden.json
@@ -0,0 +1,41 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.empty_downstream,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
},
"created": {
"time": 20000,
"actor": "urn:li:corpuser:_ingestion"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.empty_upstream,PROD)",
"type": "TRANSFORMED",
"query": "urn:li:query:9138bff85215113e471f5869fabf99797c37181899f5fb145ed05846437426d2"
}
],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.empty_upstream,PROD),title)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,dev.public.empty_downstream,PROD),TITLE_DOWNSTREAM)"
],
"confidenceScore": 1.0,
"query": "urn:li:query:9138bff85215113e471f5869fabf99797c37181899f5fb145ed05846437426d2"
}
]
}
}
}
]
165 changes: 165 additions & 0 deletions metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
@@ -1132,3 +1132,168 @@ def test_diamond_problem(pytestconfig: pytest.Config, tmp_path: pathlib.Path) ->
pytestconfig.rootpath
/ "tests/unit/sql_parsing/aggregator_goldens/test_diamond_problem_golden.json",
)


@freeze_time(FROZEN_TIME)
def test_empty_column_in_snowflake_lineage(
pytestconfig: pytest.Config, tmp_path: pathlib.Path
) -> None:
"""Test that column lineage with empty string column names doesn't cause errors.

Note: Uses KnownQueryLineageInfo instead of ObservedQuery since empty column names from
external systems would require mocking _run_sql_parser().
"""
aggregator = SqlParsingAggregator(
platform="snowflake",
generate_lineage=True,
generate_usage_statistics=False,
generate_operations=False,
)

downstream_urn = DatasetUrn("snowflake", "dev.public.target_table").urn()
upstream_urn = DatasetUrn("snowflake", "dev.public.source_table").urn()

known_query_lineage = KnownQueryLineageInfo(
query_text="insert into target_table (col_a, col_b, col_c) select col_a, col_b, col_c from source_table",
downstream=downstream_urn,
upstreams=[upstream_urn],
column_lineage=[
ColumnLineageInfo(
downstream=DownstreamColumnRef(table=downstream_urn, column="col_a"),
upstreams=[ColumnRef(table=upstream_urn, column="col_a")],
),
ColumnLineageInfo(
downstream=DownstreamColumnRef(table=downstream_urn, column="col_b"),
upstreams=[
ColumnRef(table=upstream_urn, column="col_b"),
ColumnRef(table=upstream_urn, column=""),
],
),
ColumnLineageInfo(
downstream=DownstreamColumnRef(table=downstream_urn, column="col_c"),
upstreams=[
ColumnRef(table=upstream_urn, column=""),
],
),
],
timestamp=_ts(20),
query_type=QueryType.INSERT,
)
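# Expected outcome (see test_empty_column_in_snowflake_lineage_golden.json above):
# col_a and col_b keep fine-grained lineage, col_b's empty upstream ref is dropped,
# and col_c gets no fine-grained lineage because its only upstream ref is empty.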

aggregator.add_known_query_lineage(known_query_lineage)

mcpws = [mcp for mcp in aggregator.gen_metadata()]
lineage_mcpws = [mcpw for mcpw in mcpws if mcpw.aspectName == "upstreamLineage"]
out_path = tmp_path / "mcpw.json"
write_metadata_file(out_path, lineage_mcpws)

mce_helpers.check_golden_file(
pytestconfig,
out_path,
RESOURCE_DIR / "test_empty_column_in_snowflake_lineage_golden.json",
)


@freeze_time(FROZEN_TIME)
def test_empty_downstream_column_in_snowflake_lineage(
pytestconfig: pytest.Config, tmp_path: pathlib.Path
) -> None:
"""Test that column lineage with empty downstream column names doesn't cause errors.

Note: Uses KnownQueryLineageInfo instead of ObservedQuery since empty column names from
external systems would require mocking _run_sql_parser().
"""
aggregator = SqlParsingAggregator(
platform="snowflake",
generate_lineage=True,
generate_usage_statistics=False,
generate_operations=False,
)

downstream_urn = DatasetUrn("snowflake", "dev.public.target_table").urn()
upstream_urn = DatasetUrn("snowflake", "dev.public.source_table").urn()

known_query_lineage = KnownQueryLineageInfo(
query_text='create table target_table as select $1 as "", $2 as " " from source_table',
downstream=downstream_urn,
upstreams=[upstream_urn],
column_lineage=[
ColumnLineageInfo(
downstream=DownstreamColumnRef(table=downstream_urn, column=""),
upstreams=[ColumnRef(table=upstream_urn, column="col_a")],
),
ColumnLineageInfo(
downstream=DownstreamColumnRef(table=downstream_urn, column=" "),
upstreams=[ColumnRef(table=upstream_urn, column="col_b")],
),
],
timestamp=_ts(20),
query_type=QueryType.CREATE_TABLE_AS_SELECT,
)
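# Expected outcome (see test_empty_downstream_column_in_snowflake_lineage_golden.json
# above): both downstream columns are empty/whitespace-only, so only the table-level
# upstream edge is emitted and no fineGrainedLineages are produced.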

aggregator.add_known_query_lineage(known_query_lineage)

mcpws = [mcp for mcp in aggregator.gen_metadata()]
lineage_mcpws = [mcpw for mcpw in mcpws if mcpw.aspectName == "upstreamLineage"]
out_path = tmp_path / "mcpw.json"
write_metadata_file(out_path, lineage_mcpws)

mce_helpers.check_golden_file(
pytestconfig,
out_path,
RESOURCE_DIR / "test_empty_downstream_column_in_snowflake_lineage_golden.json",
)


@freeze_time(FROZEN_TIME)
def test_partial_empty_downstream_column_in_snowflake_lineage(
pytestconfig: pytest.Config, tmp_path: pathlib.Path
) -> None:
"""Test that column lineage with mix of empty and valid downstream columns works correctly.

Note: Uses KnownQueryLineageInfo instead of ObservedQuery since empty column names from
external systems would require mocking _run_sql_parser().
"""
aggregator = SqlParsingAggregator(
platform="snowflake",
generate_lineage=True,
generate_usage_statistics=False,
generate_operations=False,
)

downstream_urn = DatasetUrn("snowflake", "dev.public.empty_downstream").urn()
upstream_urn = DatasetUrn("snowflake", "dev.public.empty_upstream").urn()

known_query_lineage = KnownQueryLineageInfo(
query_text='create table empty_downstream as select $1 as "", $2 as "TITLE_DOWNSTREAM" from empty_upstream',
downstream=downstream_urn,
upstreams=[upstream_urn],
column_lineage=[
ColumnLineageInfo(
downstream=DownstreamColumnRef(table=downstream_urn, column=""),
upstreams=[ColumnRef(table=upstream_urn, column="name")],
),
ColumnLineageInfo(
downstream=DownstreamColumnRef(
table=downstream_urn, column="TITLE_DOWNSTREAM"
),
upstreams=[ColumnRef(table=upstream_urn, column="title")],
),
],
timestamp=_ts(20),
query_type=QueryType.CREATE_TABLE_AS_SELECT,
)
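# Expected outcome (see test_partial_empty_downstream_column_in_snowflake_lineage_golden.json
# above): the empty-named downstream column is skipped, so only the
# title -> TITLE_DOWNSTREAM fine-grained lineage survives.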

aggregator.add_known_query_lineage(known_query_lineage)

mcpws = [mcp for mcp in aggregator.gen_metadata()]
lineage_mcpws = [mcpw for mcpw in mcpws if mcpw.aspectName == "upstreamLineage"]
out_path = tmp_path / "mcpw.json"
write_metadata_file(out_path, lineage_mcpws)

mce_helpers.check_golden_file(
pytestconfig,
out_path,
RESOURCE_DIR
/ "test_partial_empty_downstream_column_in_snowflake_lineage_golden.json",
)