Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): bigquery - Running lineage extraction after metadata extraction #6653

Merged
merged 2 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions metadata-ingestion/docs/sources/bigquery/bigquery_pre.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ If you have multiple projects in your BigQuery setup, the role should be granted
| `bigquery.datasets.getIamPolicy` | Read a dataset's IAM permissions.                                                                           | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.tables.list`           | List BigQuery tables.                                                                                       | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.tables.get`           | Retrieve metadata for a table.                                                                               | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.routines.get`           | Get Routines. Needs to retrieve metadata for a table from system table.                                                                                       | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.routines.list`           | List Routines. Needs to retrieve metadata for a table from system table                                                                               | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `resourcemanager.projects.get`   | Retrieve project names and metadata.                                                                         | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.jobs.listAll`         | List all jobs (queries) submitted by any user. Needs for Lineage extraction.                                 | Lineage Extraction/Usage extraction | [roles/bigquery.resourceViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.resourceViewer) |
| `logging.logEntries.list`       | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. | Lineage Extraction/Usage extraction | [roles/logging.privateLogViewer](https://cloud.google.com/logging/docs/access-control#logging.privateLogViewer) |
Expand Down
5 changes: 5 additions & 0 deletions metadata-ingestion/src/datahub/configuration/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,8 @@ class LineageConfig(ConfigModel):
default=True,
description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run.",
)

sql_parser_use_external_process: bool = Field(
default=False,
description="When enabled, sql parser will run in isolated in a separate process. This can affect processing time but can protect from sql parser's mem leak.",
)
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionSourceBase,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import Status, SubTypes
from datahub.metadata.com.linkedin.pegasus2avro.common import (
Status,
SubTypes,
TimeStamp,
)
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetProperties,
UpstreamLineage,
Expand Down Expand Up @@ -568,6 +572,31 @@ def _process_project(
)
continue

if self.config.include_table_lineage:
logger.info(f"Generate lineage for {project_id}")
for dataset in self.db_tables[project_id]:
for table in self.db_tables[project_id][dataset]:
dataset_urn = self.gen_dataset_urn(dataset, project_id, table.name)
lineage_info = self.lineage_extractor.get_upstream_lineage_info(
project_id=project_id,
dataset_name=dataset,
table=table,
platform=self.platform,
)
if lineage_info:
yield from self.gen_lineage(dataset_urn, lineage_info)

for dataset in self.db_views[project_id]:
for view in self.db_views[project_id][dataset]:
dataset_urn = self.gen_dataset_urn(dataset, project_id, view.name)
lineage_info = self.lineage_extractor.get_upstream_lineage_info(
project_id=project_id,
dataset_name=dataset,
table=view,
platform=self.platform,
)
yield from self.gen_lineage(dataset_urn, lineage_info)

if self.config.include_usage_statistics:
logger.info(f"Generate usage for {project_id}")
tables: Dict[str, List[str]] = {}
Expand Down Expand Up @@ -642,18 +671,8 @@ def _process_table(
f"Table doesn't have any column or unable to get columns for table: {table_identifier}"
)

lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None

if self.config.include_table_lineage:
lineage_info = self.lineage_extractor.get_upstream_lineage_info(
project_id=project_id,
dataset_name=schema_name,
table=table,
platform=self.platform,
)

table_workunits = self.gen_table_dataset_workunits(
table, project_id, schema_name, lineage_info
table, project_id, schema_name
)
for wu in table_workunits:
self.report.report_workunit(wu)
Expand All @@ -679,18 +698,12 @@ def _process_view(
conn, table_identifier, column_limit=self.config.column_limit
)

lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None
if self.config.include_table_lineage:
lineage_info = self.lineage_extractor.get_upstream_lineage_info(
project_id=project_id,
dataset_name=dataset_name,
table=view,
platform=self.platform,
)
if dataset_name not in self.db_views[project_id]:
self.db_views[project_id][dataset_name] = []

view_workunits = self.gen_view_dataset_workunits(
view, project_id, dataset_name, lineage_info
)
self.db_views[project_id][dataset_name].append(view)

view_workunits = self.gen_view_dataset_workunits(view, project_id, dataset_name)
for wu in view_workunits:
self.report.report_workunit(wu)
yield wu
Expand Down Expand Up @@ -718,7 +731,6 @@ def gen_table_dataset_workunits(
table: BigqueryTable,
project_id: str,
dataset_name: str,
lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]],
) -> Iterable[MetadataWorkUnit]:
custom_properties: Dict[str, str] = {}
if table.expires:
Expand Down Expand Up @@ -761,7 +773,6 @@ def gen_table_dataset_workunits(
project_id=project_id,
dataset_name=dataset_name,
sub_type="table",
lineage_info=lineage_info,
tags_to_add=tags_to_add,
custom_properties=custom_properties,
)
Expand All @@ -771,15 +782,13 @@ def gen_view_dataset_workunits(
table: BigqueryView,
project_id: str,
dataset_name: str,
lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]],
) -> Iterable[MetadataWorkUnit]:

yield from self.gen_dataset_workunits(
table=table,
project_id=project_id,
dataset_name=dataset_name,
sub_type="view",
lineage_info=lineage_info,
)

view = cast(BigqueryView, table)
Expand All @@ -802,7 +811,6 @@ def gen_dataset_workunits(
project_id: str,
dataset_name: str,
sub_type: str,
lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None,
tags_to_add: Optional[List[str]] = None,
custom_properties: Optional[Dict[str, str]] = None,
) -> Iterable[MetadataWorkUnit]:
Expand All @@ -819,43 +827,14 @@ def gen_dataset_workunits(

yield self.gen_schema_metadata(dataset_urn, table, str(datahub_dataset_name))

if lineage_info is not None:
upstream_lineage, upstream_column_props = lineage_info
else:
upstream_column_props = {}
upstream_lineage = None

if upstream_lineage is not None:
if self.config.incremental_lineage:
patch_builder: DatasetPatchBuilder = DatasetPatchBuilder(
urn=dataset_urn
)
for upstream in upstream_lineage.upstreams:
patch_builder.add_upstream_lineage(upstream)

lineage_workunits = [
MetadataWorkUnit(
id=f"upstreamLineage-for-{dataset_urn}",
mcp_raw=mcp,
)
for mcp in patch_builder.build()
]
else:
lineage_workunits = [
wrap_aspect_as_workunit(
"dataset", dataset_urn, "upstreamLineage", upstream_lineage
)
]

for wu in lineage_workunits:
yield wu
self.report.report_workunit(wu)

dataset_properties = DatasetProperties(
name=datahub_dataset_name.get_table_display_name(),
description=table.comment,
qualifiedName=str(datahub_dataset_name),
customProperties={**upstream_column_props},
created=TimeStamp(time=int(table.created.timestamp() * 1000)),
lastModified=TimeStamp(time=int(table.last_altered.timestamp() * 1000))
if table.last_altered is not None
else None,
)
if custom_properties:
dataset_properties.customProperties.update(custom_properties)
Expand Down Expand Up @@ -895,6 +874,41 @@ def gen_dataset_workunits(
urn=dataset_urn,
)

def gen_lineage(
self,
dataset_urn: str,
lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None,
) -> Iterable[MetadataWorkUnit]:
if lineage_info is None:
return

upstream_lineage, upstream_column_props = lineage_info
if upstream_lineage is not None:
if self.config.incremental_lineage:
patch_builder: DatasetPatchBuilder = DatasetPatchBuilder(
urn=dataset_urn
)
for upstream in upstream_lineage.upstreams:
patch_builder.add_upstream_lineage(upstream)

lineage_workunits = [
MetadataWorkUnit(
id=f"upstreamLineage-for-{dataset_urn}",
mcp_raw=mcp,
)
for mcp in patch_builder.build()
]
else:
lineage_workunits = [
wrap_aspect_as_workunit(
"dataset", dataset_urn, "upstreamLineage", upstream_lineage
)
]

for wu in lineage_workunits:
yield wu
self.report.report_workunit(wu)

def gen_tags_aspect_workunit(
self, dataset_urn: str, tags_to_add: List[str]
) -> MetadataWorkUnit:
Expand Down Expand Up @@ -1133,8 +1147,6 @@ def get_views_for_dataset(

views = self.db_views.get(project_id)

# get all views for database failed,
# falling back to get views for schema
if not views:
return BigQueryDataDictionary.get_views_for_dataset(
conn, project_id, dataset_name, self.config.profiling.enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,9 @@ def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[st
# in the references. There is no distinction between direct/base objects accessed. So doing sql parsing
# to ensure we only use direct objects accessed for lineage
try:
parser = BigQuerySQLParser(e.query)
parser = BigQuerySQLParser(
e.query, self.config.sql_parser_use_external_process
)
referenced_objs = set(
map(lambda x: x.split(".")[-1], parser.get_tables())
)
Expand Down Expand Up @@ -468,7 +470,9 @@ def parse_view_lineage(
parsed_tables = set()
if view.ddl:
try:
parser = BigQuerySQLParser(view.ddl)
parser = BigQuerySQLParser(
view.ddl, self.config.sql_parser_use_external_process
)
tables = parser.get_tables()
except Exception as ex:
logger.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
class BigQuerySQLParser(SQLParser):
parser: SQLParser

def __init__(self, sql_query: str) -> None:
def __init__(self, sql_query: str, use_external_process: bool = False) -> None:
super().__init__(sql_query)

self._parsed_sql_query = self.parse_sql_query(sql_query)
self.parser = SqlLineageSQLParser(self._parsed_sql_query)
self.parser = SqlLineageSQLParser(self._parsed_sql_query, use_external_process)

def parse_sql_query(self, sql_query: str) -> str:
sql_query = BigQuerySQLParser._parse_bigquery_comment_sign(sql_query)
Expand Down