feat(ingest/unity): GE Profiling (datahub-project#8951)
asikowitz authored and Salman-Apptware committed Dec 15, 2023
1 parent a48e75e commit 49faac9
Showing 18 changed files with 449 additions and 211 deletions.
4 changes: 4 additions & 0 deletions docs/how/updating-datahub.md
@@ -12,6 +12,10 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- #9257: The Python SDK urn types are now autogenerated. The new classes are largely backwards compatible with the previous, manually written classes, but many older methods are now deprecated in favor of a more uniform interface. The only breaking change is that the signature of the direct constructor, e.g. `TagUrn("tag", ["tag_name"])`, is no longer supported; the simpler `TagUrn("tag_name")` should be used instead.
The canonical place to import the urn classes from is `datahub.metadata.urns.*`. Other import paths, like `datahub.utilities.urns.corpuser_urn.CorpuserUrn` are retained for backwards compatibility, but are considered deprecated.
- #9286: The `DataHubRestEmitter.emit` method no longer returns anything. It previously returned a tuple of timestamps.
- #8951: A Great Expectations-based profiler has been added for the Unity Catalog source.
To use the old profiler, set `method: analyze` under the `profiling` section in your recipe.
To use the new profiler, set `method: ge`. Profiling is disabled by default; to enable it,
you must specify one of these methods.
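
For illustration, a minimal sketch of what the new option looks like when a recipe is driven from Python via `Pipeline.create`; the workspace URL, token, warehouse id, and the local `datahub-rest` sink are placeholder assumptions, and the `profiling` keys simply mirror the starter recipe changed later in this commit.

```python
from datahub.ingestion.run.pipeline import Pipeline

# Placeholder values: substitute your workspace URL, token, and SQL warehouse id.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "unity-catalog",
            "config": {
                "workspace_url": "https://my-workspace.cloud.databricks.com",
                "token": "<token>",
                "profiling": {
                    "method": "ge",  # use "analyze" to keep the old ANALYZE-based profiler
                    "enabled": True,
                    "warehouse_id": "<warehouse_id>",
                },
            },
        },
        "sink": {"type": "datahub-rest", "config": {"server": "http://localhost:8080"}},
    }
)
pipeline.run()
pipeline.raise_from_status()
```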

### Potential Downtime

@@ -15,7 +15,8 @@
* [Privileges documentation](https://docs.databricks.com/data-governance/unity-catalog/manage-privileges/privileges.html)
+ To ingest your workspace's notebooks and respective lineage, your service principal must have `CAN_READ` privileges on the folders containing the notebooks you want to ingest: [guide](https://docs.databricks.com/en/security/auth-authz/access-control/workspace-acl.html#folder-permissions).
+ To `include_usage_statistics` (enabled by default), your service principal must have `CAN_MANAGE` permissions on any SQL Warehouses you want to ingest: [guide](https://docs.databricks.com/security/auth-authz/access-control/sql-endpoint-acl.html).
+ To ingest `profiling` information with `call_analyze` (enabled by default), your service principal must have ownership or `MODIFY` privilege on any tables you want to profile.
+ To ingest `profiling` information with `method: ge`, you need `SELECT` privileges on all profiled tables.
+ To ingest `profiling` information with `method: analyze` and `call_analyze: true` (enabled by default), your service principal must have ownership or `MODIFY` privilege on any tables you want to profile.
* Alternatively, you can run [ANALYZE TABLE](https://docs.databricks.com/sql/language-manual/sql-ref-syntax-aux-analyze-table.html) yourself on any tables you want to profile, then set `call_analyze` to `false` (see the sketch just after this list).
You will still need the `SELECT` privilege on those tables to fetch the results.
- Check the starter recipe below and replace `workspace_url` and `token` with your information from the previous steps.
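
A hedged sketch of that manual `ANALYZE TABLE` route, using the same `databricks-sql-connector` package this commit pins to >=2.8.0; the hostname, warehouse `http_path`, token, and table name are placeholders.

```python
from databricks import sql

# Placeholder connection details: substitute your workspace hostname, warehouse http_path, and token.
with sql.connect(
    server_hostname="my-workspace.cloud.databricks.com",
    http_path="/sql/1.0/warehouses/<warehouse_id>",
    access_token="<token>",
) as connection:
    with connection.cursor() as cursor:
        # Compute the column statistics that a later profiling run with call_analyze: false will read.
        cursor.execute(
            "ANALYZE TABLE my_catalog.my_schema.my_table COMPUTE STATISTICS FOR ALL COLUMNS"
        )
```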
52 changes: 33 additions & 19 deletions metadata-ingestion/docs/sources/databricks/unity-catalog_recipe.yml
@@ -2,29 +2,43 @@ source:
type: unity-catalog
config:
workspace_url: https://my-workspace.cloud.databricks.com
token: "mygenerated_databricks_token"
#metastore_id_pattern:
# deny:
# - 11111-2222-33333-44-555555
#catalog_pattern:
# allow:
# - my-catalog
#schema_pattern:
# deny:
# - information_schema
#table_pattern:
# allow:
# - test.lineagedemo.dinner
# First you have to create domains on Datahub by following this guide -> https://datahubproject.io/docs/domains/#domains-setup-prerequisites-and-permissions
#domain:
# urn:li:domain:1111-222-333-444-555:
# allow:
# - main.*
token: "<token>"
include_metastore: false
include_ownership: true
profiling:
method: "ge"
enabled: true
warehouse_id: "<warehouse_id>"
profile_table_level_only: false
max_wait_secs: 60
pattern:
deny:
- ".*\\.unwanted_schema"

# profiling:
# method: "analyze"
# enabled: true
# warehouse_id: "<warehouse_id>"
# profile_table_level_only: true
# call_analyze: true

# catalogs: ["my_catalog"]
# schema_pattern:
# deny:
# - information_schema
# table_pattern:
# allow:
# - my_catalog.my_schema.my_table
# First you have to create domains on Datahub by following this guide -> https://datahubproject.io/docs/domains/#domains-setup-prerequisites-and-permissions
# domain:
# urn:li:domain:1111-222-333-444-555:
# allow:
# - main.*

stateful_ingestion:
enabled: true

pipeline_name: acme-corp-unity


# sink configs if needed
5 changes: 3 additions & 2 deletions metadata-ingestion/setup.py
@@ -262,7 +262,8 @@
"databricks-sdk>=0.9.0",
"pyspark~=3.3.0",
"requests",
"databricks-sql-connector",
# Version 2.4.0 includes sqlalchemy dialect, 2.8.0 includes some bug fixes
"databricks-sql-connector>=2.8.0",
}

mysql = sql_common | {"pymysql>=1.0.2"}
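
For context on this pin: the new GE profiler presumably reaches the warehouse through the SQLAlchemy dialect that ships with `databricks-sql-connector` 2.4.0+, which is also why `sql_common` is added to the `unity-catalog` extra in the next hunk. A rough sketch of that dialect in isolation, with hostname, token, warehouse id, catalog, and schema all as placeholders:

```python
from sqlalchemy import create_engine, inspect

# Placeholder URL parts: workspace hostname, access token, warehouse http_path, catalog, schema.
engine = create_engine(
    "databricks://token:<token>@my-workspace.cloud.databricks.com"
    "?http_path=/sql/1.0/warehouses/<warehouse_id>&catalog=my_catalog&schema=my_schema"
)
print(inspect(engine).get_table_names())  # tables visible to the profiler in that schema
```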
@@ -393,7 +394,7 @@
"powerbi": microsoft_common | {"lark[regex]==1.1.4", "sqlparse"} | sqlglot_lib,
"powerbi-report-server": powerbi_report_server,
"vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.1"},
"unity-catalog": databricks | sqllineage_lib,
"unity-catalog": databricks | sql_common | sqllineage_lib,
"fivetran": snowflake_common,
}

@@ -183,7 +183,7 @@ def get_workunits(
return
yield from self.generate_profile_workunits(
profile_requests,
self.config.profiling.max_workers,
max_workers=self.config.profiling.max_workers,
platform=self.platform,
profiler_args=self.get_profile_args(),
)
@@ -27,6 +27,7 @@

import sqlalchemy as sa
import sqlalchemy.sql.compiler
from great_expectations.core.profiler_types_mapping import ProfilerTypeMapping
from great_expectations.core.util import convert_to_json_serializable
from great_expectations.data_context import AbstractDataContext, BaseDataContext
from great_expectations.data_context.types.base import (
@@ -77,8 +78,26 @@
SNOWFLAKE = "snowflake"
BIGQUERY = "bigquery"
REDSHIFT = "redshift"
DATABRICKS = "databricks"
TRINO = "trino"

# Type names for Databricks, to match Title Case types in sqlalchemy
ProfilerTypeMapping.INT_TYPE_NAMES.append("Integer")
ProfilerTypeMapping.INT_TYPE_NAMES.append("SmallInteger")
ProfilerTypeMapping.INT_TYPE_NAMES.append("BigInteger")
ProfilerTypeMapping.FLOAT_TYPE_NAMES.append("Float")
ProfilerTypeMapping.FLOAT_TYPE_NAMES.append("Numeric")
ProfilerTypeMapping.STRING_TYPE_NAMES.append("String")
ProfilerTypeMapping.STRING_TYPE_NAMES.append("Text")
ProfilerTypeMapping.STRING_TYPE_NAMES.append("Unicode")
ProfilerTypeMapping.STRING_TYPE_NAMES.append("UnicodeText")
ProfilerTypeMapping.BOOLEAN_TYPE_NAMES.append("Boolean")
ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("Date")
ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("DateTime")
ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("Time")
ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("Interval")
ProfilerTypeMapping.BINARY_TYPE_NAMES.append("LargeBinary")

# The reason for this wacky structure is quite fun. GE basically assumes that
# the config structures were generated directly from YML and further assumes that
# they can be `deepcopy`'d without issue. The SQLAlchemy engine and connection
@@ -697,6 +716,9 @@ def generate_dataset_profile( # noqa: C901 (complexity)
1, unique_count / non_null_count
)

if not profile.rowCount:
continue

self._get_dataset_column_sample_values(column_profile, column)

if (
@@ -1172,7 +1194,7 @@ def _get_ge_dataset(
},
)

if platform == BIGQUERY:
if platform == BIGQUERY or platform == DATABRICKS:
# This is done as GE makes the name as DATASET.TABLE
# but we want it to be PROJECT.DATASET.TABLE instead for multi-project setups
name_parts = pretty_name.split(".")
@@ -59,8 +59,7 @@ def get_workunits(

yield from self.generate_profile_workunits(
profile_requests,
self.config.profiling.max_workers,
db,
max_workers=self.config.profiling.max_workers,
platform=self.platform,
profiler_args=self.get_profile_args(),
)
@@ -62,8 +62,7 @@ def get_workunits(

yield from self.generate_profile_workunits(
profile_requests,
self.config.profiling.max_workers,
database.name,
max_workers=self.config.profiling.max_workers,
platform=self.platform,
profiler_args=self.get_profile_args(),
)
@@ -69,8 +69,8 @@ def __init__(
def generate_profile_workunits(
self,
requests: List[TableProfilerRequest],
*,
max_workers: int,
db_name: Optional[str] = None,
platform: Optional[str] = None,
profiler_args: Optional[Dict] = None,
) -> Iterable[MetadataWorkUnit]:
@@ -98,7 +98,7 @@ def generate_profile_workunits(
return

# Otherwise, if column level profiling is enabled, use GE profiler.
ge_profiler = self.get_profiler_instance(db_name)
ge_profiler = self.get_profiler_instance()

for ge_profiler_request, profile in ge_profiler.generate_profiles(
ge_profile_requests, max_workers, platform, profiler_args
@@ -149,12 +149,18 @@ def get_profile_request(
profile_table_level_only = self.config.profiling.profile_table_level_only
dataset_name = self.get_dataset_name(table.name, schema_name, db_name)
if not self.is_dataset_eligible_for_profiling(
dataset_name, table.last_altered, table.size_in_bytes, table.rows_count
dataset_name,
last_altered=table.last_altered,
size_in_bytes=table.size_in_bytes,
rows_count=table.rows_count,
):
# Profile only table level if dataset is filtered from profiling
# due to size limits alone
if self.is_dataset_eligible_for_profiling(
dataset_name, table.last_altered, 0, 0
dataset_name,
last_altered=table.last_altered,
size_in_bytes=None,
rows_count=None,
):
profile_table_level_only = True
else:
@@ -199,9 +205,7 @@ def get_inspectors(self) -> Iterable[Inspector]:
inspector = inspect(conn)
yield inspector

def get_profiler_instance(
self, db_name: Optional[str] = None
) -> "DatahubGEProfiler":
def get_profiler_instance(self) -> "DatahubGEProfiler":
logger.debug(f"Getting profiler instance from {self.platform}")
url = self.config.get_sql_alchemy_url()

@@ -221,9 +225,10 @@ def get_profiler_instance(
def is_dataset_eligible_for_profiling(
self,
dataset_name: str,
last_altered: Optional[datetime],
size_in_bytes: Optional[int],
rows_count: Optional[int],
*,
last_altered: Optional[datetime] = None,
size_in_bytes: Optional[int] = None,
rows_count: Optional[int] = None,
) -> bool:
dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,
@@ -6,7 +6,7 @@

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.unity.config import UnityCatalogProfilerConfig
from datahub.ingestion.source.unity.config import UnityCatalogAnalyzeProfilerConfig
from datahub.ingestion.source.unity.proxy import UnityCatalogApiProxy
from datahub.ingestion.source.unity.proxy_types import (
ColumnProfile,
@@ -23,8 +23,8 @@


@dataclass
class UnityCatalogProfiler:
config: UnityCatalogProfilerConfig
class UnityCatalogAnalyzeProfiler:
config: UnityCatalogAnalyzeProfilerConfig
report: UnityCatalogReport
proxy: UnityCatalogApiProxy
dataset_urn_builder: Callable[[TableReference], str]