Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix(ingest/bigquery): Improve memory usage of lineage extraction #7326

Merged
merged 7 commits into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,8 @@ def generate_lineage(
db_views: Dict[str, List[BigqueryView]],
) -> Iterable[MetadataWorkUnit]:
logger.info(f"Generate lineage for {project_id}")
lineage = self.lineage_extractor.calculate_lineage_for_project(project_id)

for dataset in db_tables.keys():
for table in db_tables[dataset]:
dataset_urn = self.gen_dataset_urn(dataset, project_id, table.name)
Expand All @@ -660,6 +662,7 @@ def generate_lineage(
dataset_name=dataset,
table=table,
platform=self.platform,
lineage_metadata=lineage,
)
if lineage_info:
yield from self.gen_lineage(dataset_urn, lineage_info)
Expand All @@ -671,8 +674,10 @@ def generate_lineage(
dataset_name=dataset,
table=view,
platform=self.platform,
lineage_metadata=lineage,
)
yield from self.gen_lineage(dataset_urn, lineage_info)
if lineage_info:
yield from self.gen_lineage(dataset_urn, lineage_info)

def generate_usage_statistics(
self,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import collections
import logging
import textwrap
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Set, Tuple, Union

import humanfriendly
Expand Down Expand Up @@ -82,7 +81,6 @@ class BigqueryLineageExtractor:
def __init__(self, config: BigQueryV2Config, report: BigQueryV2Report):
self.config = config
self.report = report
self.lineage_metadata: Dict[str, Set[str]] = defaultdict(set)
self.loaded_project_ids: List[str] = []

def error(self, log: logging.Logger, key: str, reason: str) -> None:
Expand Down Expand Up @@ -663,10 +661,13 @@ def _compute_bigquery_lineage(self, project_id: str) -> Dict[str, Set[str]]:
return lineage_metadata

def get_upstream_tables(
self, bq_table: str, tables_seen: List[str] = []
self,
bq_table: str,
lineage_metadata: Dict[str, Set[str]],
tables_seen: List[str] = [],
) -> Set[BigQueryTableRef]:
upstreams: Set[BigQueryTableRef] = set()
for ref_table in self.lineage_metadata[str(bq_table)]:
for ref_table in lineage_metadata[str(bq_table)]:
upstream_table = BigQueryTableRef.from_string_name(ref_table)
if upstream_table.is_temporary_table(
[self.config.temp_table_dataset_prefix]
Expand All @@ -678,56 +679,63 @@ def get_upstream_tables(
)
continue
tables_seen.append(ref_table)
if ref_table in self.lineage_metadata:
if ref_table in lineage_metadata:
upstreams = upstreams.union(
self.get_upstream_tables(ref_table, tables_seen=tables_seen)
self.get_upstream_tables(
ref_table,
lineage_metadata=lineage_metadata,
tables_seen=tables_seen,
)
)
else:
upstreams.add(upstream_table)

return upstreams

# Compute the lineage mapping for a single project via
# `_compute_bigquery_lineage`, record how long the computation took in
# `self.report.lineage_extraction_sec` (keyed by `project_id`), and return the
# mapping to the caller.
def calculate_lineage_for_project(self, project_id: str) -> Dict[str, Set[str]]:
# PerfTimer is used as a context manager around the lineage computation.
with PerfTimer() as timer:
lineage = self._compute_bigquery_lineage(project_id)

# The elapsed time is rounded to two decimal places before being stored.
# NOTE(review): indentation is lost in this view, so it is unclear whether
# this assignment sits inside or after the `with` body — confirm in the file.
self.report.lineage_extraction_sec[project_id] = round(
timer.elapsed_seconds(), 2
)

return lineage

def get_upstream_lineage_info(
self,
project_id: str,
dataset_name: str,
table: Union[BigqueryTable, BigqueryView],
platform: str,
lineage_metadata: Dict[str, Set[str]],
) -> Optional[Tuple[UpstreamLineageClass, Dict[str, str]]]:
table_identifier = BigqueryTableIdentifier(project_id, dataset_name, table.name)

if table_identifier.project_id not in self.loaded_project_ids:
with PerfTimer() as timer:
self.lineage_metadata.update(
self._compute_bigquery_lineage(table_identifier.project_id)
)
self.report.lineage_extraction_sec[table_identifier.project_id] = round(
timer.elapsed_seconds(), 2
)
self.loaded_project_ids.append(table_identifier.project_id)

if self.config.lineage_parse_view_ddl and isinstance(table, BigqueryView):
for table_id in self.parse_view_lineage(project_id, dataset_name, table):
if table_identifier.get_table_name() in self.lineage_metadata:
self.lineage_metadata[
if table_identifier.get_table_name() in lineage_metadata:
lineage_metadata[
str(
BigQueryTableRef(table_identifier).get_sanitized_table_ref()
)
].add(str(BigQueryTableRef(table_id).get_sanitized_table_ref()))
else:
self.lineage_metadata[
lineage_metadata[
str(
BigQueryTableRef(table_identifier).get_sanitized_table_ref()
)
] = {str(BigQueryTableRef(table_id).get_sanitized_table_ref())}

bq_table = BigQueryTableRef.from_bigquery_table(table_identifier)
if str(bq_table) in self.lineage_metadata:
if str(bq_table) in lineage_metadata:
upstream_list: List[UpstreamClass] = []
# Sorting the list of upstream lineage events in order to avoid creating multiple aspects in backend
# even if the lineage is same but the order is different.
for upstream_table in sorted(
self.get_upstream_tables(str(bq_table), tables_seen=[])
self.get_upstream_tables(
str(bq_table), lineage_metadata, tables_seen=[]
)
):
upstream_table_class = UpstreamClass(
mce_builder.make_dataset_urn_with_platform_instance(
Expand Down
25 changes: 17 additions & 8 deletions metadata-ingestion/tests/unit/test_bigquery_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@ def test_simple_upstream_table_generation():
}
)
source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))
source.lineage_extractor.lineage_metadata = {str(a): {str(b)}}
upstreams = source.lineage_extractor.get_upstream_tables(str(a), [])
lineage_metadata = {str(a): {str(b)}}
upstreams = source.lineage_extractor.get_upstream_tables(
str(a), lineage_metadata, []
)
assert list(upstreams) == [b]


Expand All @@ -116,8 +118,11 @@ def test_upstream_table_generation_with_temporary_table_without_temp_upstream():
}
)
source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))
source.lineage_extractor.lineage_metadata = {str(a): {str(b)}}
upstreams = source.lineage_extractor.get_upstream_tables(str(a), [])

lineage_metadata = {str(a): {str(b)}}
upstreams = source.lineage_extractor.get_upstream_tables(
str(a), lineage_metadata, []
)
assert list(upstreams) == []


Expand Down Expand Up @@ -146,11 +151,13 @@ def test_upstream_table_generation_with_temporary_table_with_temp_upstream():
}
)
source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))
source.lineage_extractor.lineage_metadata = {
lineage_metadata = {
str(a): {str(b)},
str(b): {str(c)},
}
upstreams = source.lineage_extractor.get_upstream_tables(str(a), [])
upstreams = source.lineage_extractor.get_upstream_tables(
str(a), lineage_metadata, []
)
assert list(upstreams) == [c]


Expand Down Expand Up @@ -187,12 +194,14 @@ def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstr
}
)
source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))
source.lineage_extractor.lineage_metadata = {
lineage_metadata = {
str(a): {str(b)},
str(b): {str(c), str(d)},
str(d): {str(e)},
}
upstreams = source.lineage_extractor.get_upstream_tables(str(a), [])
upstreams = source.lineage_extractor.get_upstream_tables(
str(a), lineage_metadata, []
)
assert list(upstreams).sort() == [c, e].sort()


Expand Down