feat(ingest/unity): GE Profiling #8951

Merged: 18 commits, Dec 6, 2023
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
@@ -253,6 +253,7 @@
"databricks-sdk>=0.9.0",
"pyspark",
"requests",
"databricks-sql-connector>=2.8.0", # Only added in 2.4.0, bug fixes since
}

mysql = sql_common | {"pymysql>=1.0.2"}
@@ -199,7 +199,7 @@ def generate_wu_from_profile_requests(
table_profile_requests = cast(List[TableProfilerRequest], profile_requests)
for request, profile in self.generate_profiles(
table_profile_requests,
self.config.profiling.max_workers,
max_workers=self.config.profiling.max_workers,
platform=self.platform,
profiler_args=self.get_profile_args(),
):
@@ -27,6 +27,7 @@

import sqlalchemy as sa
import sqlalchemy.sql.compiler
from great_expectations.core.profiler_types_mapping import ProfilerTypeMapping
from great_expectations.core.util import convert_to_json_serializable
from great_expectations.data_context import AbstractDataContext, BaseDataContext
from great_expectations.data_context.types.base import (
@@ -77,8 +78,26 @@
SNOWFLAKE = "snowflake"
BIGQUERY = "bigquery"
REDSHIFT = "redshift"
DATABRICKS = "databricks"
TRINO = "trino"

# Type names for Databricks, to match Title Case types in sqlalchemy
ProfilerTypeMapping.INT_TYPE_NAMES.append("Integer")
ProfilerTypeMapping.INT_TYPE_NAMES.append("SmallInteger")
ProfilerTypeMapping.INT_TYPE_NAMES.append("BigInteger")
ProfilerTypeMapping.FLOAT_TYPE_NAMES.append("Float")
ProfilerTypeMapping.FLOAT_TYPE_NAMES.append("Numeric")
ProfilerTypeMapping.STRING_TYPE_NAMES.append("String")
ProfilerTypeMapping.STRING_TYPE_NAMES.append("Text")
ProfilerTypeMapping.STRING_TYPE_NAMES.append("Unicode")
ProfilerTypeMapping.STRING_TYPE_NAMES.append("UnicodeText")
ProfilerTypeMapping.BOOLEAN_TYPE_NAMES.append("Boolean")
ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("Date")
ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("DateTime")
ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("Time")
ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("Interval")
ProfilerTypeMapping.BINARY_TYPE_NAMES.append("LargeBinary")

# The reason for this wacky structure is quite fun. GE basically assumes that
# the config structures were generated directly from YML and further assumes that
# they can be `deepcopy`'d without issue. The SQLAlchemy engine and connection
@@ -329,6 +348,7 @@ def _should_ignore_column(self, sqlalchemy_type: sa.types.TypeEngine) -> bool:

@_run_with_query_combiner
def _get_column_type(self, column_spec: _SingleColumnSpec, column: str) -> None:
# logger.info(f"{column} {self.dataset.columns} {self.dataset.columns[0]}")
Collaborator: let's remove this line.

column_spec.type_ = BasicDatasetProfilerBase._get_column_type(
self.dataset, column
)
@@ -665,6 +685,9 @@ def generate_dataset_profile( # noqa: C901 (complexity)
1, unique_count / non_null_count
)

if not profile.rowCount:
continue

self._get_dataset_column_sample_values(column_profile, column)

if (
@@ -1121,7 +1144,7 @@ def _get_ge_dataset(
},
)

if platform == BIGQUERY:
if platform == BIGQUERY or platform == DATABRICKS:
# This is done because GE builds the name as DATASET.TABLE,
# but we want PROJECT.DATASET.TABLE instead for multi-project setups
name_parts = pretty_name.split(".")
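For context on the ProfilerTypeMapping changes in this file, here is a minimal sketch of what the module-level appends accomplish (illustrative only; it repeats one of the appends from the hunk above):

from great_expectations.core.profiler_types_mapping import ProfilerTypeMapping

# GE decides whether to compute numeric, string, or datetime metrics for a column by
# matching the column type's name against these lists. The Databricks SQLAlchemy
# dialect reports Title Case names (e.g. "BigInteger") that are not in GE's defaults,
# so the appends register them once at import time.
ProfilerTypeMapping.INT_TYPE_NAMES.append("BigInteger")
assert "BigInteger" in ProfilerTypeMapping.INT_TYPE_NAMES  # now treated as an integer column

Without these registrations, GE would classify such columns as unknown types and skip most column-level metrics.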
@@ -74,8 +74,7 @@ def get_workunits(
table_profile_requests = cast(List[TableProfilerRequest], profile_requests)
for request, profile in self.generate_profiles(
table_profile_requests,
self.config.profiling.max_workers,
db,
max_workers=self.config.profiling.max_workers,
platform=self.platform,
profiler_args=self.get_profile_args(),
):
@@ -87,8 +87,7 @@ def get_workunits(

for request, profile in self.generate_profiles(
table_profile_requests,
self.config.profiling.max_workers,
database.name,
max_workers=self.config.profiling.max_workers,
platform=self.platform,
profiler_args=self.get_profile_args(),
):
@@ -67,7 +67,6 @@ def generate_profiles(
self,
requests: List[TableProfilerRequest],
max_workers: int,
Collaborator: we can force these to be kwargs for clarity.

Suggested change:
- max_workers: int,
+ *, max_workers: int,

db_name: Optional[str] = None,
platform: Optional[str] = None,
profiler_args: Optional[Dict] = None,
) -> Iterable[Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]]:
@@ -92,7 +91,7 @@
return

# Otherwise, if column level profiling is enabled, use GE profiler.
ge_profiler = self.get_profiler_instance(db_name)
ge_profiler = self.get_profiler_instance()
yield from ge_profiler.generate_profiles(
ge_profile_requests, max_workers, platform, profiler_args
)
@@ -108,9 +107,7 @@ def get_inspectors(self) -> Iterable[Inspector]:
inspector = inspect(conn)
yield inspector

def get_profiler_instance(
self, db_name: Optional[str] = None
) -> "DatahubGEProfiler":
def get_profiler_instance(self) -> "DatahubGEProfiler":
logger.debug(f"Getting profiler instance from {self.platform}")
url = self.config.get_sql_alchemy_url()

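Circling back to the keyword-only suggestion above, a minimal sketch of what the bare `*` marker does (illustrative signature, not the actual generate_profiles definition):

from typing import List, Optional

def generate_profiles(requests: List[str], *, max_workers: int, platform: Optional[str] = None) -> None:
    # Parameters after the bare `*` must be passed by keyword, so call sites have to
    # spell out max_workers=... and platform=..., which prevents the positional
    # mix-ups this PR removes (e.g. a db_name argument landing in the max_workers slot).
    ...

generate_profiles([], max_workers=10, platform="databricks")  # OK
# generate_profiles([], 10, "databricks")  # TypeError: takes 1 positional argument

This is why the call sites in this PR switched to max_workers=self.config.profiling.max_workers.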
@@ -6,7 +6,7 @@

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.unity.config import UnityCatalogProfilerConfig
from datahub.ingestion.source.unity.config import UnityCatalogAnalyzeProfilerConfig
from datahub.ingestion.source.unity.proxy import UnityCatalogApiProxy
from datahub.ingestion.source.unity.proxy_types import (
ColumnProfile,
@@ -23,8 +23,8 @@


@dataclass
class UnityCatalogProfiler:
config: UnityCatalogProfilerConfig
class UnityCatalogAnalyzeProfiler:
config: UnityCatalogAnalyzeProfilerConfig
report: UnityCatalogReport
proxy: UnityCatalogApiProxy
dataset_urn_builder: Callable[[TableReference], str]
73 changes: 52 additions & 21 deletions metadata-ingestion/src/datahub/ingestion/source/unity/config.py
@@ -1,14 +1,19 @@
import os
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Union
from urllib.parse import urlparse

import pydantic
from pydantic import Field
from typing_extensions import Literal

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.configuration.validate_field_removal import pydantic_removed_field
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.source.ge_data_profiler import DATABRICKS
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, make_sqlalchemy_uri
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StatefulStaleMetadataRemovalConfig,
)
@@ -23,25 +28,13 @@
)


class UnityCatalogProfilerConfig(ConfigModel):
# TODO: Reduce duplicate code with DataLakeProfilerConfig, GEProfilingConfig, SQLAlchemyConfig
enabled: bool = Field(
default=False, description="Whether profiling should be done."
)
operation_config: OperationConfig = Field(
default_factory=OperationConfig,
description="Experimental feature. To specify operation configs.",
)
class UnityCatalogConfig(ConfigModel):
method: str
Collaborator: how does this show up in the docs? Does this need a Field(description="docs")?

Collaborator (author): Looks like it doesn't show up at all. I'll add a description, but in general our docs support for discriminated unions is not very good -- we don't show which type supports which options. I'll update example recipes to help here.
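As a rough illustration of the discriminated union being discussed, a minimal sketch with stand-in models (not the actual Unity Catalog config classes) of how the method field selects the profiling config at parse time:

from typing import Literal, Union
from pydantic import BaseModel, Field

class AnalyzeProfiling(BaseModel):
    method: Literal["analyze"] = "analyze"
    call_analyze: bool = True

class GEProfiling(BaseModel):
    method: Literal["ge"] = "ge"
    max_workers: int = 10

class SourceConfig(BaseModel):
    # pydantic picks the concrete profiling model by looking only at the "method" key,
    # so a recipe just sets profiling.method to "ge" or "analyze" to switch modes.
    profiling: Union[GEProfiling, AnalyzeProfiling] = Field(
        default=GEProfiling(), discriminator="method"
    )

cfg = SourceConfig.parse_obj({"profiling": {"method": "analyze", "call_analyze": False}})
assert isinstance(cfg.profiling, AnalyzeProfiling)

With a discriminator, validation errors also only mention the variant that matched, which keeps recipe error messages readable.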


warehouse_id: Optional[str] = Field(
default=None, description="SQL Warehouse id, for running profiling queries."
)

profile_table_level_only: bool = Field(
default=False,
description="Whether to perform profiling at table-level only or include column-level profiling as well.",
)

pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description=(
@@ -51,6 +44,24 @@ class UnityCatalogProfilerConfig(ConfigModel):
),
)


class UnityCatalogAnalyzeProfilerConfig(UnityCatalogConfig):
method: Literal["analyze"] = "analyze"

# TODO: Reduce duplicate code with DataLakeProfilerConfig, GEProfilingConfig, SQLAlchemyConfig
Collaborator: yes please.

enabled: bool = Field(
default=False, description="Whether profiling should be done."
)
operation_config: OperationConfig = Field(
default_factory=OperationConfig,
description="Experimental feature. To specify operation configs.",
)

profile_table_level_only: bool = Field(
default=False,
description="Whether to perform profiling at table-level only or include column-level profiling as well.",
)

call_analyze: bool = Field(
default=True,
description=(
@@ -82,7 +93,12 @@ def include_columns(self):
return not self.profile_table_level_only


class UnityCatalogGEProfilerConfig(UnityCatalogConfig, GEProfilingConfig):
method: Literal["ge"] = "ge"


class UnityCatalogSourceConfig(
SQLCommonConfig,
StatefulIngestionConfigBase,
BaseUsageConfig,
DatasetSourceConfigMixin,
@@ -122,10 +138,6 @@ class UnityCatalogSourceConfig(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for tables to filter in ingestion. Specify regex to match the entire table name in `catalog.schema.table` format. e.g. to match all tables starting with customer in Customer catalog and public schema, use the regex `Customer\\.public\\.customer.*`.",
)
domain: Dict[str, AllowDenyPattern] = Field(
default=dict(),
description='Attach domains to catalogs, schemas or tables during ingestion using regex patterns. Domain key can be a guid like *urn:li:domain:ec428203-ce86-4db3-985d-5a8ee6df32ba* or a string like "Marketing".) If you provide strings, then datahub will attempt to resolve this name to a guid, and will error out if this fails. There can be multiple domain keys specified.',
)

include_table_lineage: bool = pydantic.Field(
default=True,
@@ -156,15 +168,34 @@ class UnityCatalogSourceConfig(
description="Generate usage statistics.",
)

profiling: UnityCatalogProfilerConfig = Field(
default=UnityCatalogProfilerConfig(), description="Data profiling configuration"
profiling: Union[UnityCatalogGEProfilerConfig, UnityCatalogAnalyzeProfilerConfig] = Field( # type: ignore
default=UnityCatalogGEProfilerConfig(),
description="Data profiling configuration",
discriminator="method",
)

scheme: str = DATABRICKS

def get_sql_alchemy_url(self):
return make_sqlalchemy_uri(
scheme=self.scheme,
username="token",
password=self.token,
at=urlparse(self.workspace_url).netloc,
db=None,
uri_opts={
"http_path": f"/sql/1.0/warehouses/{self.profiling.warehouse_id}"
Collaborator: This assumes use of a SQL warehouse. Looks like this may take different formats: http-path is the HTTP path either to a Databricks SQL endpoint (e.g. /sql/1.0/endpoints/1234567890abcdef) or to a Databricks Runtime interactive cluster (e.g. /sql/protocolv1/o/1234567890123456/1234-123456-slid123). Ref: https://pypi.org/project/databricks-sql-connector/

Collaborator (author): Hmm, good point. It's going to be a bit annoying to support both profilers :|

Collaborator (author): Gonna hold off on this... want to get this in before I never get to it again.

},
)

def is_profiling_enabled(self) -> bool:
return self.profiling.enabled and is_profiling_enabled(
self.profiling.operation_config
)

def is_ge_profiling(self) -> bool:
return self.profiling.method == "ge"

stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field(
default=None, description="Unity Catalog Stateful Ingestion Config."
)
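Following the http_path discussion above, a sketch of the SQLAlchemy URI that get_sql_alchemy_url produces for the GE profiler (hand-rolled here for illustration; the real code goes through make_sqlalchemy_uri, and the host, token, and warehouse id below are made up):

from urllib.parse import urlparse

# Stand-in values; in the source these come from UnityCatalogSourceConfig.
workspace_url = "https://adb-1234567890123456.7.azuredatabricks.net"
token = "dapi-example-token"  # the username is literally "token" for Databricks PATs
warehouse_id = "abcdef1234567890"

host = urlparse(workspace_url).netloc
# Shape used by the databricks-sql-connector SQLAlchemy dialect. As the review notes,
# http_path could instead point at an interactive cluster
# (e.g. /sql/protocolv1/o/<org-id>/<cluster-id>), which this PR does not handle.
uri = f"databricks://token:{token}@{host}?http_path=/sql/1.0/warehouses/{warehouse_id}"
print(uri)

Since the GE profiler connects through this URI, profiling.warehouse_id is effectively required when method is "ge".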
Expand Down