From 815c00fdb0161a5b1cd5551bb7e8c074445d2362 Mon Sep 17 00:00:00 2001
From: treff7es
Date: Tue, 6 Dec 2022 15:00:29 +0100
Subject: [PATCH] Running lineage extraction after metadata extraction

Adding table creation/alter time to the datasetproperties
Fixing bigquery permissions doc
---
 .../docs/sources/bigquery/bigquery_pre.md     |   2 +
 .../ingestion/source/bigquery_v2/bigquery.py  | 133 +++++----
 .../ingestion/source/bigquery_v2/profiler2.py | 272 ++++++++++++++++++
 3 files changed, 345 insertions(+), 62 deletions(-)
 create mode 100644 metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler2.py

diff --git a/metadata-ingestion/docs/sources/bigquery/bigquery_pre.md b/metadata-ingestion/docs/sources/bigquery/bigquery_pre.md
index c204fcb6449d5..2217e4185d4b7 100644
--- a/metadata-ingestion/docs/sources/bigquery/bigquery_pre.md
+++ b/metadata-ingestion/docs/sources/bigquery/bigquery_pre.md
@@ -34,6 +34,8 @@ If you have multiple projects in your BigQuery setup, the role should be granted
 | `bigquery.datasets.getIamPolicy` | Read a dataset's IAM permissions.                                              | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
 | `bigquery.tables.list`           | List BigQuery tables.                                                          | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
 | `bigquery.tables.get`            | Retrieve metadata for a table.                                                 | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
+| `bigquery.routines.get`          | Get routines. Needed to retrieve metadata for a table from the system tables.  | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
+| `bigquery.routines.list`         | List routines. Needed to retrieve metadata for a table from the system tables. | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
 | `resourcemanager.projects.get`   | Retrieve project names and metadata.                                           | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
 | `bigquery.jobs.listAll`          | List all jobs (queries) submitted by any user. Needs for Lineage extraction.   | Lineage Extraction/Usage extraction | [roles/bigquery.resourceViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.resourceViewer) |
 | `logging.logEntries.list`        | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. 
| Lineage Extraction/Usage extraction | [roles/logging.privateLogViewer](https://cloud.google.com/logging/docs/access-control#logging.privateLogViewer) | diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 8066df165a01f..a48c05a7b8fe3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -67,7 +67,11 @@ from datahub.ingestion.source.state.stateful_ingestion_base import ( StatefulIngestionSourceBase, ) -from datahub.metadata.com.linkedin.pegasus2avro.common import Status, SubTypes +from datahub.metadata.com.linkedin.pegasus2avro.common import ( + Status, + SubTypes, + TimeStamp, +) from datahub.metadata.com.linkedin.pegasus2avro.dataset import ( DatasetProperties, UpstreamLineage, @@ -568,6 +572,31 @@ def _process_project( ) continue + if self.config.include_table_lineage: + logger.info(f"Generate lineage for {project_id}") + for dataset in self.db_tables[project_id]: + for table in self.db_tables[project_id][dataset]: + dataset_urn = self.gen_dataset_urn(dataset, project_id, table.name) + lineage_info = self.lineage_extractor.get_upstream_lineage_info( + project_id=project_id, + dataset_name=dataset, + table=table, + platform=self.platform, + ) + if lineage_info: + yield from self.gen_lineage(dataset_urn, lineage_info) + + for dataset in self.db_views[project_id]: + for view in self.db_views[project_id][dataset]: + dataset_urn = self.gen_dataset_urn(dataset, project_id, view.name) + lineage_info = self.lineage_extractor.get_upstream_lineage_info( + project_id=project_id, + dataset_name=dataset, + table=view, + platform=self.platform, + ) + yield from self.gen_lineage(dataset_urn, lineage_info) + if self.config.include_usage_statistics: logger.info(f"Generate usage for {project_id}") tables: Dict[str, List[str]] = {} @@ -642,18 +671,8 @@ def _process_table( f"Table doesn't have any column or unable to get columns for table: {table_identifier}" ) - lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None - - if self.config.include_table_lineage: - lineage_info = self.lineage_extractor.get_upstream_lineage_info( - project_id=project_id, - dataset_name=schema_name, - table=table, - platform=self.platform, - ) - table_workunits = self.gen_table_dataset_workunits( - table, project_id, schema_name, lineage_info + table, project_id, schema_name ) for wu in table_workunits: self.report.report_workunit(wu) @@ -679,18 +698,7 @@ def _process_view( conn, table_identifier, column_limit=self.config.column_limit ) - lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None - if self.config.include_table_lineage: - lineage_info = self.lineage_extractor.get_upstream_lineage_info( - project_id=project_id, - dataset_name=dataset_name, - table=view, - platform=self.platform, - ) - - view_workunits = self.gen_view_dataset_workunits( - view, project_id, dataset_name, lineage_info - ) + view_workunits = self.gen_view_dataset_workunits(view, project_id, dataset_name) for wu in view_workunits: self.report.report_workunit(wu) yield wu @@ -718,7 +726,6 @@ def gen_table_dataset_workunits( table: BigqueryTable, project_id: str, dataset_name: str, - lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]], ) -> Iterable[MetadataWorkUnit]: custom_properties: Dict[str, str] = {} if table.expires: @@ -761,7 +768,6 @@ def gen_table_dataset_workunits( project_id=project_id, 
dataset_name=dataset_name, sub_type="table", - lineage_info=lineage_info, tags_to_add=tags_to_add, custom_properties=custom_properties, ) @@ -771,7 +777,6 @@ def gen_view_dataset_workunits( table: BigqueryView, project_id: str, dataset_name: str, - lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]], ) -> Iterable[MetadataWorkUnit]: yield from self.gen_dataset_workunits( @@ -779,7 +784,6 @@ def gen_view_dataset_workunits( project_id=project_id, dataset_name=dataset_name, sub_type="view", - lineage_info=lineage_info, ) view = cast(BigqueryView, table) @@ -802,7 +806,6 @@ def gen_dataset_workunits( project_id: str, dataset_name: str, sub_type: str, - lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None, tags_to_add: Optional[List[str]] = None, custom_properties: Optional[Dict[str, str]] = None, ) -> Iterable[MetadataWorkUnit]: @@ -819,43 +822,14 @@ def gen_dataset_workunits( yield self.gen_schema_metadata(dataset_urn, table, str(datahub_dataset_name)) - if lineage_info is not None: - upstream_lineage, upstream_column_props = lineage_info - else: - upstream_column_props = {} - upstream_lineage = None - - if upstream_lineage is not None: - if self.config.incremental_lineage: - patch_builder: DatasetPatchBuilder = DatasetPatchBuilder( - urn=dataset_urn - ) - for upstream in upstream_lineage.upstreams: - patch_builder.add_upstream_lineage(upstream) - - lineage_workunits = [ - MetadataWorkUnit( - id=f"upstreamLineage-for-{dataset_urn}", - mcp_raw=mcp, - ) - for mcp in patch_builder.build() - ] - else: - lineage_workunits = [ - wrap_aspect_as_workunit( - "dataset", dataset_urn, "upstreamLineage", upstream_lineage - ) - ] - - for wu in lineage_workunits: - yield wu - self.report.report_workunit(wu) - dataset_properties = DatasetProperties( name=datahub_dataset_name.get_table_display_name(), description=table.comment, qualifiedName=str(datahub_dataset_name), - customProperties={**upstream_column_props}, + created=TimeStamp(time=int(table.created.timestamp() * 1000)), + lastModified=TimeStamp(time=int(table.last_altered.timestamp() * 1000)) + if table.last_altered is not None + else None, ) if custom_properties: dataset_properties.customProperties.update(custom_properties) @@ -895,6 +869,41 @@ def gen_dataset_workunits( urn=dataset_urn, ) + def gen_lineage( + self, + dataset_urn: str, + lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None, + ) -> Iterable[MetadataWorkUnit]: + if lineage_info is None: + return + + upstream_lineage, upstream_column_props = lineage_info + if upstream_lineage is not None: + if self.config.incremental_lineage: + patch_builder: DatasetPatchBuilder = DatasetPatchBuilder( + urn=dataset_urn + ) + for upstream in upstream_lineage.upstreams: + patch_builder.add_upstream_lineage(upstream) + + lineage_workunits = [ + MetadataWorkUnit( + id=f"upstreamLineage-for-{dataset_urn}", + mcp_raw=mcp, + ) + for mcp in patch_builder.build() + ] + else: + lineage_workunits = [ + wrap_aspect_as_workunit( + "dataset", dataset_urn, "upstreamLineage", upstream_lineage + ) + ] + + for wu in lineage_workunits: + yield wu + self.report.report_workunit(wu) + def gen_tags_aspect_workunit( self, dataset_urn: str, tags_to_add: List[str] ) -> MetadataWorkUnit: diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler2.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler2.py new file mode 100644 index 0000000000000..5d01aad177932 --- /dev/null +++ 
b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler2.py
@@ -0,0 +1,272 @@
+import dataclasses
+import datetime
+import logging
+from typing import Dict, Iterable, List, Optional, Tuple, cast
+
+from dateutil.relativedelta import relativedelta
+from sqlalchemy import create_engine, inspect
+from sqlalchemy.engine.reflection import Inspector
+
+from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
+from datahub.emitter.mcp_builder import wrap_aspect_as_workunit
+from datahub.ingestion.api.common import WorkUnit
+from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier
+from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
+from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
+from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
+    BigqueryColumn,
+    BigqueryTable,
+)
+from datahub.ingestion.source.ge_data_profiler import (
+    DatahubGEProfiler,
+    GEProfilerRequest,
+)
+from datahub.ingestion.source.sql.sql_generic_profiler import (
+    GenericProfiler,
+    TableProfilerRequest,
+)
+from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProfile
+from datahub.metadata.schema_classes import DatasetProfileClass
+
+logger = logging.getLogger(__name__)
+
+
+@dataclasses.dataclass
+class BigqueryProfilerRequest(GEProfilerRequest):
+    table: BigqueryTable
+    profile_table_level_only: bool = False
+
+
+class BigqueryProfiler(GenericProfiler):
+    def __init__(self, config: BigQueryV2Config, report: BigQueryV2Report) -> None:
+        super().__init__(config, report, "bigquery")
+        self.config = config
+        self.report = report
+
+    @staticmethod
+    def get_partition_range_from_partition_id(
+        partition_id: str, partition_datetime: Optional[datetime.datetime]
+    ) -> Tuple[datetime.datetime, datetime.datetime]:
+        partition_range_map: Dict[int, Tuple[relativedelta, str]] = {
+            4: (relativedelta(years=1), "%Y"),
+            6: (relativedelta(months=1), "%Y%m"),
+            8: (relativedelta(days=1), "%Y%m%d"),
+            10: (relativedelta(hours=1), "%Y%m%d%H"),
+        }
+
+        duration: relativedelta
+        if partition_range_map.get(len(partition_id)):
+            (delta, format) = partition_range_map[len(partition_id)]
+            duration = delta
+            if not partition_datetime:
+                partition_datetime = datetime.datetime.strptime(partition_id, format)
+        else:
+            raise ValueError(
+                f"Check your partition_id {partition_id}. It must be yearly/monthly/daily/hourly."
+            )
+        upper_bound_partition_datetime = partition_datetime + duration
+        return partition_datetime, upper_bound_partition_datetime
+
+    def generate_partition_profiler_query(
+        self,
+        project: str,
+        schema: str,
+        table: BigqueryTable,
+        partition_datetime: Optional[datetime.datetime],
+    ) -> Tuple[Optional[str], Optional[str]]:
+        """
+        Returns the partition id if the table is partitioned or sharded, and generates a custom partition query
+        for partitioned tables. 
+        See more about partitioned tables at https://cloud.google.com/bigquery/docs/partitioned-tables
+        """
+        logger.debug(
+            f"generate partition profiler query for project: {project} schema: {schema} and table {table.name}, partition_datetime: {partition_datetime}"
+        )
+        partition = table.max_partition_id
+        if partition:
+            partition_where_clause: str
+
+            if not table.time_partitioning:
+                partition_column: Optional[BigqueryColumn] = None
+                for column in table.columns:
+                    if column.is_partition_column:
+                        partition_column = column
+                        break
+                if partition_column:
+                    partition_where_clause = f"{partition_column.name} >= {partition}"
+                else:
+                    logger.warning(
+                        f"Partitioned table {table.name} without a partition column"
+                    )
+                    return None, None
+            else:
+                logger.debug(
+                    f"{table.name} is partitioned and partition column is {partition}"
+                )
+                try:
+                    (
+                        partition_datetime,
+                        upper_bound_partition_datetime,
+                    ) = self.get_partition_range_from_partition_id(
+                        partition, partition_datetime
+                    )
+                except ValueError as e:
+                    logger.error(
+                        f"Unable to get partition range for partition id: {partition}; it failed with exception {e}"
+                    )
+                    self.report.invalid_partition_ids[
+                        f"{schema}.{table.name}"
+                    ] = partition
+                    return None, None
+
+                partition_column_type: str = "DATE"
+                for c in table.columns:
+                    if c.is_partition_column:
+                        partition_column_type = c.data_type
+
+                if table.time_partitioning.type_ in ("DAY", "MONTH", "YEAR"):
+                    partition_where_clause = f"`{table.time_partitioning.field}` BETWEEN {partition_column_type}('{partition_datetime}') AND {partition_column_type}('{upper_bound_partition_datetime}')"
+                elif table.time_partitioning.type_ in ("HOUR",):
+                    partition_where_clause = f"`{table.time_partitioning.field}` BETWEEN '{partition_datetime}' AND '{upper_bound_partition_datetime}'"
+                else:
+                    logger.warning(
+                        f"Unsupported partition type {table.time_partitioning.type_}"
+                    )
+                    return None, None
+            custom_sql = """
+SELECT
+    *
+FROM
+    `{table_catalog}.{table_schema}.{table_name}`
+WHERE
+    {partition_where_clause}
+            """.format(
+                table_catalog=project,
+                table_schema=schema,
+                table_name=table.name,
+                partition_where_clause=partition_where_clause,
+            )
+
+            return (partition, custom_sql)
+        if table.max_shard_id:
+            # For sharded tables we only return the shard id (used as the partition id); no custom query is needed.
+            return table.max_shard_id, None
+
+        return None, None
+
+    def get_workunits(
+        self, tables: Dict[str, Dict[str, List[BigqueryTable]]]
+    ) -> Iterable[WorkUnit]:
+
+        # Collect profile requests per project and run them through the GE profiler. 
+        for project in tables.keys():
+            if not self.config.project_id_pattern.allowed(project):
+                continue
+            profile_requests = []
+
+            for dataset in tables[project]:
+                if not self.config.schema_pattern.allowed(dataset):
+                    continue
+
+                for table in tables[project][dataset]:
+                    # Emit the profile work unit
+                    profile_request = self.get_bigquery_profile_request(
+                        project=project, dataset=dataset, table=table
+                    )
+                    if profile_request is not None:
+                        profile_requests.append(profile_request)
+
+            if len(profile_requests) == 0:
+                continue
+            profile_requests = cast(List[TableProfilerRequest], profile_requests)
+            for request, profile in self.generate_profiles(
+                profile_requests,
+                self.config.profiling.max_workers,
+                platform=self.platform,
+                profiler_args=self.get_profile_args(),
+            ):
+                if request is None or profile is None:
+                    continue
+
+                request = cast(BigqueryProfilerRequest, request)
+                profile.sizeInBytes = request.table.size_in_bytes
+                # If the table is partitioned we profile only one partition
+                # (by default the last one), but at the table level we take
+                # rows_count from the table metadata so the reported row count
+                # covers the whole table even though column stats cover one partition.
+                if profile.partitionSpec and profile.partitionSpec.partition:
+                    profile.rowCount = request.table.rows_count
+
+                dataset_name = request.pretty_name
+                dataset_urn = make_dataset_urn_with_platform_instance(
+                    self.platform,
+                    dataset_name,
+                    self.config.platform_instance,
+                    self.config.env,
+                )
+                wu = wrap_aspect_as_workunit(
+                    "dataset",
+                    dataset_urn,
+                    "datasetProfile",
+                    profile,
+                )
+                self.report.report_workunit(wu)
+                yield wu
+
+    def get_bigquery_profile_request(
+        self, project: str, dataset: str, table: BigqueryTable
+    ) -> Optional[BigqueryProfilerRequest]:
+        skip_profiling = False
+        profile_table_level_only = self.config.profiling.profile_table_level_only
+        dataset_name = BigqueryTableIdentifier(
+            project_id=project, dataset=dataset, table=table.name
+        ).get_table_name()
+        if not self.is_dataset_eligible_for_profiling(
+            dataset_name, table.last_altered, table.size_in_bytes, table.rows_count
+        ):
+            profile_table_level_only = True
+            self.report.num_tables_not_eligible_profiling[dataset] = (
+                self.report.num_tables_not_eligible_profiling.get(dataset, 0) + 1
+            )
+
+        if not table.columns:
+            skip_profiling = True
+
+        if skip_profiling:
+            if self.config.profiling.report_dropped_profiles:
+                self.report.report_dropped(f"profile of {dataset_name}")
+            return None
+        (partition, custom_sql) = self.generate_partition_profiler_query(
+            project, dataset, table, self.config.profiling.partition_datetime
+        )
+
+        if partition is None and table.time_partitioning:
+            self.report.report_warning(
+                "profile skipped as partitioned table is empty or partition id was invalid",
+                dataset_name,
+            )
+            return None
+
+        if (
+            partition is not None
+            and not self.config.profiling.partition_profiling_enabled
+        ):
+            logger.debug(
+                f"Skipping profiling of {dataset_name} partition {partition} because profiling.partition_profiling_enabled is disabled"
+            )
+            return None
+
+        self.report.report_entity_profiled(dataset_name)
+        logger.debug(f"Preparing profiling request for {dataset_name}")
+        profile_request = BigqueryProfilerRequest(
+            pretty_name=dataset_name,
+            batch_kwargs=dict(
+                schema=project,
+                table=f"{dataset}.{table.name}",
+                custom_sql=custom_sql,
+                partition=partition,
+            ),
+            table=table,
+            profile_table_level_only=profile_table_level_only,
+        )
+        return profile_request
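
Reviewer note: a minimal usage sketch (illustrative only, not part of the patch) of the partition-id helper added in profiler2.py above. It assumes nothing beyond the module path and static method introduced by this change.

import datetime

from datahub.ingestion.source.bigquery_v2.profiler2 import BigqueryProfiler

# A daily partition id ("%Y%m%d") maps to a one-day range starting at that date.
start, end = BigqueryProfiler.get_partition_range_from_partition_id("20221206", None)
assert start == datetime.datetime(2022, 12, 6)
assert end == datetime.datetime(2022, 12, 7)

# An hourly partition id ("%Y%m%d%H") maps to a one-hour range.
start, end = BigqueryProfiler.get_partition_range_from_partition_id("2022120615", None)
assert end - start == datetime.timedelta(hours=1)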