Skip to content

Commit

Permalink
fix(ingest/bigquery): Improve memory usage of lineage extraction (dat…
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored and Oleg Ruban committed Feb 28, 2023
1 parent 0a3f97c commit e225037
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,8 @@ def generate_lineage(
db_views: Dict[str, List[BigqueryView]],
) -> Iterable[MetadataWorkUnit]:
logger.info(f"Generate lineage for {project_id}")
lineage = self.lineage_extractor.calculate_lineage_for_project(project_id)

for dataset in db_tables.keys():
for table in db_tables[dataset]:
dataset_urn = self.gen_dataset_urn(dataset, project_id, table.name)
Expand All @@ -660,6 +662,7 @@ def generate_lineage(
dataset_name=dataset,
table=table,
platform=self.platform,
lineage_metadata=lineage,
)
if lineage_info:
yield from self.gen_lineage(dataset_urn, lineage_info)
Expand All @@ -671,8 +674,10 @@ def generate_lineage(
dataset_name=dataset,
table=view,
platform=self.platform,
lineage_metadata=lineage,
)
yield from self.gen_lineage(dataset_urn, lineage_info)
if lineage_info:
yield from self.gen_lineage(dataset_urn, lineage_info)

def generate_usage_statistics(
self,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import collections
import logging
import textwrap
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Set, Tuple, Union

import humanfriendly
Expand Down Expand Up @@ -82,7 +81,6 @@ class BigqueryLineageExtractor:
def __init__(self, config: BigQueryV2Config, report: BigQueryV2Report):
self.config = config
self.report = report
self.lineage_metadata: Dict[str, Set[str]] = defaultdict(set)
self.loaded_project_ids: List[str] = []

def error(self, log: logging.Logger, key: str, reason: str) -> None:
Expand Down Expand Up @@ -663,10 +661,13 @@ def _compute_bigquery_lineage(self, project_id: str) -> Dict[str, Set[str]]:
return lineage_metadata

def get_upstream_tables(
self, bq_table: str, tables_seen: List[str] = []
self,
bq_table: str,
lineage_metadata: Dict[str, Set[str]],
tables_seen: List[str] = [],
) -> Set[BigQueryTableRef]:
upstreams: Set[BigQueryTableRef] = set()
for ref_table in self.lineage_metadata[str(bq_table)]:
for ref_table in lineage_metadata[str(bq_table)]:
upstream_table = BigQueryTableRef.from_string_name(ref_table)
if upstream_table.is_temporary_table(
[self.config.temp_table_dataset_prefix]
Expand All @@ -678,56 +679,63 @@ def get_upstream_tables(
)
continue
tables_seen.append(ref_table)
if ref_table in self.lineage_metadata:
if ref_table in lineage_metadata:
upstreams = upstreams.union(
self.get_upstream_tables(ref_table, tables_seen=tables_seen)
self.get_upstream_tables(
ref_table,
lineage_metadata=lineage_metadata,
tables_seen=tables_seen,
)
)
else:
upstreams.add(upstream_table)

return upstreams

def calculate_lineage_for_project(self, project_id: str) -> Dict[str, Set[str]]:
"""Compute the lineage mapping for one project and record the timing.

Delegates to ``self._compute_bigquery_lineage(project_id)`` and stores the
timer's ``elapsed_seconds()`` reading, rounded to two decimals, in
``self.report.lineage_extraction_sec[project_id]``.

Returns the mapping produced by ``_compute_bigquery_lineage`` unchanged, so
the caller owns it rather than this extractor instance.
"""
# NOTE(review): indentation is not visible in this view, so it is unclear
# whether the report update below sits inside or after the ``with`` block;
# confirm against the real file before restructuring.
with PerfTimer() as timer:
lineage = self._compute_bigquery_lineage(project_id)

self.report.lineage_extraction_sec[project_id] = round(
timer.elapsed_seconds(), 2
)

return lineage

def get_upstream_lineage_info(
self,
project_id: str,
dataset_name: str,
table: Union[BigqueryTable, BigqueryView],
platform: str,
lineage_metadata: Dict[str, Set[str]],
) -> Optional[Tuple[UpstreamLineageClass, Dict[str, str]]]:
table_identifier = BigqueryTableIdentifier(project_id, dataset_name, table.name)

if table_identifier.project_id not in self.loaded_project_ids:
with PerfTimer() as timer:
self.lineage_metadata.update(
self._compute_bigquery_lineage(table_identifier.project_id)
)
self.report.lineage_extraction_sec[table_identifier.project_id] = round(
timer.elapsed_seconds(), 2
)
self.loaded_project_ids.append(table_identifier.project_id)

if self.config.lineage_parse_view_ddl and isinstance(table, BigqueryView):
for table_id in self.parse_view_lineage(project_id, dataset_name, table):
if table_identifier.get_table_name() in self.lineage_metadata:
self.lineage_metadata[
if table_identifier.get_table_name() in lineage_metadata:
lineage_metadata[
str(
BigQueryTableRef(table_identifier).get_sanitized_table_ref()
)
].add(str(BigQueryTableRef(table_id).get_sanitized_table_ref()))
else:
self.lineage_metadata[
lineage_metadata[
str(
BigQueryTableRef(table_identifier).get_sanitized_table_ref()
)
] = {str(BigQueryTableRef(table_id).get_sanitized_table_ref())}

bq_table = BigQueryTableRef.from_bigquery_table(table_identifier)
if str(bq_table) in self.lineage_metadata:
if str(bq_table) in lineage_metadata:
upstream_list: List[UpstreamClass] = []
# Sort the upstream lineage events so that the backend does not create multiple aspects
# when the lineage is the same but the order is different.
for upstream_table in sorted(
self.get_upstream_tables(str(bq_table), tables_seen=[])
self.get_upstream_tables(
str(bq_table), lineage_metadata, tables_seen=[]
)
):
upstream_table_class = UpstreamClass(
mce_builder.make_dataset_urn_with_platform_instance(
Expand Down
25 changes: 17 additions & 8 deletions metadata-ingestion/tests/unit/test_bigquery_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@ def test_simple_upstream_table_generation():
}
)
source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))
source.lineage_extractor.lineage_metadata = {str(a): {str(b)}}
upstreams = source.lineage_extractor.get_upstream_tables(str(a), [])
lineage_metadata = {str(a): {str(b)}}
upstreams = source.lineage_extractor.get_upstream_tables(
str(a), lineage_metadata, []
)
assert list(upstreams) == [b]


Expand All @@ -116,8 +118,11 @@ def test_upstream_table_generation_with_temporary_table_without_temp_upstream():
}
)
source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))
source.lineage_extractor.lineage_metadata = {str(a): {str(b)}}
upstreams = source.lineage_extractor.get_upstream_tables(str(a), [])

lineage_metadata = {str(a): {str(b)}}
upstreams = source.lineage_extractor.get_upstream_tables(
str(a), lineage_metadata, []
)
assert list(upstreams) == []


Expand Down Expand Up @@ -146,11 +151,13 @@ def test_upstream_table_generation_with_temporary_table_with_temp_upstream():
}
)
source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))
source.lineage_extractor.lineage_metadata = {
lineage_metadata = {
str(a): {str(b)},
str(b): {str(c)},
}
upstreams = source.lineage_extractor.get_upstream_tables(str(a), [])
upstreams = source.lineage_extractor.get_upstream_tables(
str(a), lineage_metadata, []
)
assert list(upstreams) == [c]


Expand Down Expand Up @@ -187,12 +194,14 @@ def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstr
}
)
source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))
source.lineage_extractor.lineage_metadata = {
lineage_metadata = {
str(a): {str(b)},
str(b): {str(c), str(d)},
str(d): {str(e)},
}
upstreams = source.lineage_extractor.get_upstream_tables(str(a), [])
upstreams = source.lineage_extractor.get_upstream_tables(
str(a), lineage_metadata, []
)
assert list(upstreams).sort() == [c, e].sort()


Expand Down

0 comments on commit e225037

Please sign in to comment.