
Commit 815c00f
Running lineage extraction after metadata extraction
Adding table creation/alter time to the DatasetProperties aspect
Fixing the BigQuery permissions doc
treff7es committed Dec 6, 2022
1 parent ed9110e commit 815c00f
Showing 3 changed files with 345 additions and 62 deletions.
metadata-ingestion/docs/sources/bigquery/bigquery_pre.md (2 additions, 0 deletions)
@@ -34,6 +34,8 @@ If you have multiple projects in your BigQuery setup, the role should be granted
 | `bigquery.datasets.getIamPolicy` | Read a dataset's IAM permissions.                                                                            | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
 | `bigquery.tables.list`           | List BigQuery tables.                                                                                        | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
 | `bigquery.tables.get`            | Retrieve metadata for a table.                                                                               | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
+| `bigquery.routines.get`          | Get routines. Needed to retrieve table metadata from the system tables.                                      | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
+| `bigquery.routines.list`         | List routines. Needed to retrieve table metadata from the system tables.                                     | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
 | `resourcemanager.projects.get`   | Retrieve project names and metadata.                                                                         | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
 | `bigquery.jobs.listAll`          | List all jobs (queries) submitted by any user. Needed for lineage extraction.                                | Lineage Extraction/Usage Extraction | [roles/bigquery.resourceViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.resourceViewer) |
 | `logging.logEntries.list`        | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. | Lineage Extraction/Usage Extraction | [roles/logging.privateLogViewer](https://cloud.google.com/logging/docs/access-control#logging.privateLogViewer) |
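As a quick sanity check for the permissions above, here is a minimal sketch using the google-cloud-bigquery client. This is an illustration, not part of the DataHub ingestion code; `my-project` is a placeholder, and Application Default Credentials are assumed to be configured for the service account being tested.

```python
from google.cloud import bigquery

client = bigquery.Client(project="my-project")

for dataset in client.list_datasets():              # needs dataset read access
    ref = dataset.reference
    tables = list(client.list_tables(ref))          # bigquery.tables.list
    routines = list(client.list_routines(ref))      # bigquery.routines.list
    if tables:
        client.get_table(tables[0].reference)       # bigquery.tables.get
    if routines:
        client.get_routine(routines[0].reference)   # bigquery.routines.get
    print(ref.dataset_id, len(tables), len(routines))
```

If any of these calls raises a 403, the service account is missing the corresponding permission from the table above.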
@@ -67,7 +67,11 @@
 from datahub.ingestion.source.state.stateful_ingestion_base import (
     StatefulIngestionSourceBase,
 )
-from datahub.metadata.com.linkedin.pegasus2avro.common import Status, SubTypes
+from datahub.metadata.com.linkedin.pegasus2avro.common import (
+    Status,
+    SubTypes,
+    TimeStamp,
+)
 from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
     DatasetProperties,
     UpstreamLineage,
@@ -568,6 +572,31 @@ def _process_project(
                 )
                 continue
 
+        if self.config.include_table_lineage:
+            logger.info(f"Generate lineage for {project_id}")
+            for dataset in self.db_tables[project_id]:
+                for table in self.db_tables[project_id][dataset]:
+                    dataset_urn = self.gen_dataset_urn(dataset, project_id, table.name)
+                    lineage_info = self.lineage_extractor.get_upstream_lineage_info(
+                        project_id=project_id,
+                        dataset_name=dataset,
+                        table=table,
+                        platform=self.platform,
+                    )
+                    if lineage_info:
+                        yield from self.gen_lineage(dataset_urn, lineage_info)
+
+            for dataset in self.db_views[project_id]:
+                for view in self.db_views[project_id][dataset]:
+                    dataset_urn = self.gen_dataset_urn(dataset, project_id, view.name)
+                    lineage_info = self.lineage_extractor.get_upstream_lineage_info(
+                        project_id=project_id,
+                        dataset_name=dataset,
+                        table=view,
+                        platform=self.platform,
+                    )
+                    yield from self.gen_lineage(dataset_urn, lineage_info)
+
         if self.config.include_usage_statistics:
             logger.info(f"Generate usage for {project_id}")
             tables: Dict[str, List[str]] = {}
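The structural change in `_process_project` above is ordering: the metadata pass fills `self.db_tables`/`self.db_views`, and lineage generation now runs afterwards over those caches instead of per-table during extraction. A minimal, self-contained sketch of that two-phase shape (toy names and data, not the actual DataHub classes):

```python
from typing import Dict, Iterable, List


class TwoPhaseSource:
    """Toy illustration: extract metadata first, then lineage from the cache."""

    def __init__(self) -> None:
        self.db_tables: Dict[str, List[str]] = {}  # dataset -> table names

    def process_project(self, project_id: str) -> Iterable[str]:
        # Phase 1: metadata extraction fills the cache.
        for dataset, table in [("sales", "orders"), ("sales", "customers")]:
            self.db_tables.setdefault(dataset, []).append(table)
            yield f"metadata:{project_id}.{dataset}.{table}"

        # Phase 2: lineage runs only after every table is known,
        # so upstream references can be resolved against the cache.
        for dataset, tables in self.db_tables.items():
            for table in tables:
                yield f"lineage:{project_id}.{dataset}.{table}"


for wu in TwoPhaseSource().process_project("my-project"):
    print(wu)
```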
@@ -642,18 +671,8 @@ def _process_table(
                 f"Table doesn't have any column or unable to get columns for table: {table_identifier}"
             )
 
-        lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None
-
-        if self.config.include_table_lineage:
-            lineage_info = self.lineage_extractor.get_upstream_lineage_info(
-                project_id=project_id,
-                dataset_name=schema_name,
-                table=table,
-                platform=self.platform,
-            )
-
         table_workunits = self.gen_table_dataset_workunits(
-            table, project_id, schema_name, lineage_info
+            table, project_id, schema_name
         )
         for wu in table_workunits:
             self.report.report_workunit(wu)
@@ -679,18 +698,7 @@ def _process_view(
             conn, table_identifier, column_limit=self.config.column_limit
         )
 
-        lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None
-        if self.config.include_table_lineage:
-            lineage_info = self.lineage_extractor.get_upstream_lineage_info(
-                project_id=project_id,
-                dataset_name=dataset_name,
-                table=view,
-                platform=self.platform,
-            )
-
-        view_workunits = self.gen_view_dataset_workunits(
-            view, project_id, dataset_name, lineage_info
-        )
+        view_workunits = self.gen_view_dataset_workunits(view, project_id, dataset_name)
         for wu in view_workunits:
             self.report.report_workunit(wu)
             yield wu
@@ -718,7 +726,6 @@ def gen_table_dataset_workunits(
         table: BigqueryTable,
         project_id: str,
         dataset_name: str,
-        lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]],
     ) -> Iterable[MetadataWorkUnit]:
         custom_properties: Dict[str, str] = {}
         if table.expires:
@@ -761,7 +768,6 @@ def gen_table_dataset_workunits(
             project_id=project_id,
             dataset_name=dataset_name,
             sub_type="table",
-            lineage_info=lineage_info,
             tags_to_add=tags_to_add,
             custom_properties=custom_properties,
         )
@@ -771,15 +777,13 @@ def gen_view_dataset_workunits(
         table: BigqueryView,
         project_id: str,
         dataset_name: str,
-        lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]],
     ) -> Iterable[MetadataWorkUnit]:
 
         yield from self.gen_dataset_workunits(
             table=table,
             project_id=project_id,
             dataset_name=dataset_name,
             sub_type="view",
-            lineage_info=lineage_info,
         )
 
         view = cast(BigqueryView, table)
@@ -802,7 +806,6 @@ def gen_dataset_workunits(
         project_id: str,
         dataset_name: str,
         sub_type: str,
-        lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None,
         tags_to_add: Optional[List[str]] = None,
         custom_properties: Optional[Dict[str, str]] = None,
     ) -> Iterable[MetadataWorkUnit]:
@@ -819,43 +822,14 @@ def gen_dataset_workunits(
 
         yield self.gen_schema_metadata(dataset_urn, table, str(datahub_dataset_name))
 
-        if lineage_info is not None:
-            upstream_lineage, upstream_column_props = lineage_info
-        else:
-            upstream_column_props = {}
-            upstream_lineage = None
-
-        if upstream_lineage is not None:
-            if self.config.incremental_lineage:
-                patch_builder: DatasetPatchBuilder = DatasetPatchBuilder(
-                    urn=dataset_urn
-                )
-                for upstream in upstream_lineage.upstreams:
-                    patch_builder.add_upstream_lineage(upstream)
-
-                lineage_workunits = [
-                    MetadataWorkUnit(
-                        id=f"upstreamLineage-for-{dataset_urn}",
-                        mcp_raw=mcp,
-                    )
-                    for mcp in patch_builder.build()
-                ]
-            else:
-                lineage_workunits = [
-                    wrap_aspect_as_workunit(
-                        "dataset", dataset_urn, "upstreamLineage", upstream_lineage
-                    )
-                ]
-
-            for wu in lineage_workunits:
-                yield wu
-                self.report.report_workunit(wu)
-
         dataset_properties = DatasetProperties(
             name=datahub_dataset_name.get_table_display_name(),
             description=table.comment,
             qualifiedName=str(datahub_dataset_name),
-            customProperties={**upstream_column_props},
+            created=TimeStamp(time=int(table.created.timestamp() * 1000)),
+            lastModified=TimeStamp(time=int(table.last_altered.timestamp() * 1000))
+            if table.last_altered is not None
+            else None,
         )
         if custom_properties:
             dataset_properties.customProperties.update(custom_properties)
@@ -895,6 +869,41 @@ def gen_dataset_workunits(
             urn=dataset_urn,
         )
 
+    def gen_lineage(
+        self,
+        dataset_urn: str,
+        lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None,
+    ) -> Iterable[MetadataWorkUnit]:
+        if lineage_info is None:
+            return
+
+        upstream_lineage, upstream_column_props = lineage_info
+        if upstream_lineage is not None:
+            if self.config.incremental_lineage:
+                patch_builder: DatasetPatchBuilder = DatasetPatchBuilder(
+                    urn=dataset_urn
+                )
+                for upstream in upstream_lineage.upstreams:
+                    patch_builder.add_upstream_lineage(upstream)
+
+                lineage_workunits = [
+                    MetadataWorkUnit(
+                        id=f"upstreamLineage-for-{dataset_urn}",
+                        mcp_raw=mcp,
+                    )
+                    for mcp in patch_builder.build()
+                ]
+            else:
+                lineage_workunits = [
+                    wrap_aspect_as_workunit(
+                        "dataset", dataset_urn, "upstreamLineage", upstream_lineage
+                    )
+                ]
+
+            for wu in lineage_workunits:
+                yield wu
+                self.report.report_workunit(wu)
+
     def gen_tags_aspect_workunit(
         self, dataset_urn: str, tags_to_add: List[str]
     ) -> MetadataWorkUnit:
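When `incremental_lineage` is enabled, the new `gen_lineage` method emits MCP patches via `DatasetPatchBuilder`, so upstream edges are appended rather than overwriting the whole `upstreamLineage` aspect. A hedged sketch of that builder used in isolation (the URNs are placeholders; it assumes the `DatasetPatchBuilder`/`Upstream` APIs exactly as they appear in the diff above):

```python
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    Upstream,
)
from datahub.specific.dataset import DatasetPatchBuilder

downstream_urn = make_dataset_urn("bigquery", "my-project.sales.orders_enriched", "PROD")
upstream_urn = make_dataset_urn("bigquery", "my-project.sales.orders", "PROD")

# Build a patch that adds one upstream edge to the downstream dataset.
patch_builder = DatasetPatchBuilder(urn=downstream_urn)
patch_builder.add_upstream_lineage(
    Upstream(dataset=upstream_urn, type=DatasetLineageType.TRANSFORMED)
)
for mcp in patch_builder.build():
    print(mcp)  # in the source these are wrapped as MetadataWorkUnits and emitted
```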
