Merge branch 'develop' into m/_/clean_up_regex_for_data_assistants
William Shin committed Oct 25, 2022
2 parents c4bc15b + 82ccff2 commit e16a080
Showing 17 changed files with 2,515 additions and 20 deletions.
38 changes: 38 additions & 0 deletions great_expectations/core/batch_spec.py
@@ -1,5 +1,6 @@
import logging
from abc import ABCMeta
from typing import List

from great_expectations.core.id_dict import BatchSpec
from great_expectations.exceptions import InvalidBatchIdError, InvalidBatchSpecError
@@ -107,3 +108,40 @@ def query(self):
    @query.setter
    def query(self, query) -> None:
        self["query"] = query


class GlueDataCatalogBatchSpec(BatchSpec):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        if "database_name" not in self:
            raise InvalidBatchSpecError(
                "GlueDataCatalogBatchSpec requires a database_name"
            )
        if "table_name" not in self:
            raise InvalidBatchSpecError(
                "GlueDataCatalogBatchSpec requires a table_name"
            )

    @property
    def reader_method(self) -> str:
        return "table"

    @property
    def database_name(self) -> str:
        return self["database_name"]

    @property
    def table_name(self) -> str:
        return self["table_name"]

    @property
    def path(self) -> str:
        return f"{self.database_name}.{self.table_name}"

    @property
    def reader_options(self) -> dict:
        return self.get("reader_options", {})

    @property
    def partitions(self) -> List[str]:
        return self.get("partitions", [])
69 changes: 62 additions & 7 deletions great_expectations/data_context/types/base.py
@@ -382,6 +382,20 @@ class Meta:
    schema_name = fields.String(required=False, allow_none=True)
    batch_spec_passthrough = fields.Dict(required=False, allow_none=True)

"""
Necessary addition for AWS Glue Data Catalog assets.
By using AWS Glue Data Catalog, we need to have both database and table names.
The partitions are optional, it must match the partitions defined in the table
and it is used to create batch identifiers that allows the validation of a single
partition. Example: if we have two partitions (year, month), specifying these would
create one batch id per combination of year and month. The connector gets the partition
values from the AWS Glue Data Catalog.
"""
database_name = fields.String(required=False, allow_none=True)
partitions = fields.List(
cls_or_instance=fields.Str(), required=False, allow_none=True
)

    # Necessary addition for Cloud assets
    table_name = fields.String(required=False, allow_none=True)
    type = fields.String(required=False, allow_none=True)
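
To show how the new asset-level fields above (database_name, partitions) combine with the existing table_name field, here is a hedged sketch of an asset entry for a Glue-backed data connector; the asset, database, and table names are illustrative placeholders, and the surrounding datasource configuration is not part of this diff.

# Illustrative asset entry exercising the fields accepted by the asset schema above.
assets = {
    "orders_by_month": {
        "database_name": "sales_db",      # required for Glue Data Catalog assets
        "table_name": "orders",           # required for Glue Data Catalog assets
        "partitions": ["year", "month"],  # optional; must match the table's partition keys
    }
}
# Per the docstring above, listing both partitions yields one batch identifier per
# (year, month) combination found in the AWS Glue Data Catalog.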
@@ -670,9 +684,16 @@ class Meta:
    introspection_directives = fields.Dict(required=False, allow_none=True)
    batch_spec_passthrough = fields.Dict(required=False, allow_none=True)

    # AWS Glue Data Catalog
    glue_introspection_directives = fields.Dict(required=False, allow_none=True)
    catalog_id = fields.String(required=False, allow_none=True)
    partitions = fields.List(
        cls_or_instance=fields.Str(), required=False, allow_none=True
    )
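
Similarly, a hedged sketch of a data-connector-level block using the three new keys above (glue_introspection_directives, catalog_id, partitions); the class name comes from this commit, while the catalog id value and the shape of the directives dict are assumptions made purely for illustration.

# Illustrative data connector block; values are placeholders.
data_connector_config = {
    "class_name": "InferredAssetAWSGlueDataCatalogDataConnector",
    "catalog_id": "123456789012",  # typically the AWS account id owning the Glue Data Catalog
    "glue_introspection_directives": {"database_name": "sales_db"},  # assumed directive shape
    "partitions": ["year", "month"],
}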

    # noinspection PyUnusedLocal
    @validates_schema  # noqa: C901
    def validate_schema(self, data, **kwargs):  # noqa: C901 - complexity 16
        # If a class_name begins with the dollar sign ("$"), then it is assumed to be a variable name to be substituted.
        if data["class_name"][0] == "$":
            return
@@ -810,15 +831,11 @@ def validate_schema(self, data, **kwargs):
                """
            )
        if (
            "include_schema_name" in data
            or "splitter_method" in data
            or "splitter_kwargs" in data
            or "sampling_method" in data
            or "sampling_kwargs" in data
            or "skip_inapplicable_tables" in data
        ) and not (
            data["class_name"]
@@ -830,6 +847,44 @@ def validate_schema(self, data, **kwargs):
            raise ge_exceptions.InvalidConfigError(
                f"""Your current configuration uses one or more keys in a data connector that are required only by an
SQL type of the data connector (your data connector is "{data['class_name']}"). Please update your configuration to
continue.
                """
            )
        if (
            "data_asset_name_prefix" in data
            or "data_asset_name_suffix" in data
            or "excluded_tables" in data
            or "included_tables" in data
        ) and not (
            data["class_name"]
            in [
                "InferredAssetSqlDataConnector",
                "ConfiguredAssetSqlDataConnector",
                "InferredAssetAWSGlueDataCatalogDataConnector",
                "ConfiguredAssetAWSGlueDataCatalogDataConnector",
            ]
        ):
            raise ge_exceptions.InvalidConfigError(
                f"""Your current configuration uses one or more keys in a data connector that are required only by an
SQL/GlueCatalog type of the data connector (your data connector is "{data['class_name']}"). Please update your configuration to
continue.
                """
            )

        if (
            "partitions" in data
            or "catalog_id" in data
            or "glue_introspection_directives" in data
        ) and not (
            data["class_name"]
            in [
                "InferredAssetAWSGlueDataCatalogDataConnector",
                "ConfiguredAssetAWSGlueDataCatalogDataConnector",
            ]
        ):
            raise ge_exceptions.InvalidConfigError(
                f"""Your current configuration uses one or more keys in a data connector that are required only by a
GlueCatalog type of the data connector (your data connector is "{data['class_name']}"). Please update your configuration to
continue.
                """
            )
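
The three checks above gate SQL- and Glue-specific keys by class_name. As a hedged sketch of what the last check rejects, the configuration below is deliberately inconsistent; loading it through the enclosing data connector schema would raise ge_exceptions.InvalidConfigError.

# Deliberately invalid: "partitions" is a Glue-only key, but the class_name is not one of
# the two AWS Glue Data Catalog connectors, so the final check above raises an error.
bad_data_connector_config = {
    "class_name": "InferredAssetSqlDataConnector",
    "partitions": ["year"],
}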
6 changes: 6 additions & 0 deletions great_expectations/datasource/data_connector/__init__.py
@@ -39,3 +39,9 @@
)
from .configured_asset_dbfs_data_connector import ConfiguredAssetDBFSDataConnector
from .inferred_asset_dbfs_data_connector import InferredAssetDBFSDataConnector
from .configured_asset_aws_glue_data_catalog_data_connector import (
    ConfiguredAssetAWSGlueDataCatalogDataConnector,
)
from .inferred_asset_aws_glue_data_catalog_data_connector import (
    InferredAssetAWSGlueDataCatalogDataConnector,
)
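
With the exports above, the new connector classes become importable directly from the data_connector package; a trivial sketch, not part of this commit:

from great_expectations.datasource.data_connector import (
    ConfiguredAssetAWSGlueDataCatalogDataConnector,
    InferredAssetAWSGlueDataCatalogDataConnector,
)

# Both classes now sit alongside the existing SQL and file-path data connectors
# in the package's public surface.
print(ConfiguredAssetAWSGlueDataCatalogDataConnector.__name__)
print(InferredAssetAWSGlueDataCatalogDataConnector.__name__)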
