Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MAINTENANCE] Improve get validator functionality #4661

Merged
merged 14 commits into from Apr 6, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
78 changes: 46 additions & 32 deletions great_expectations/data_context/data_context.py
Expand Up @@ -1759,6 +1759,8 @@ def get_validator(
data_connector_name: Optional[str] = None,
data_asset_name: Optional[str] = None,
*,
batch: Optional[Batch] = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@abegong I believe that this should be batch_list: List[Batch] -- do you agree? At least we need some way to accept a list of Batch objects, even if we must support the passing of a single Batch object. Thanks

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For usability's sake, I think it's important to accept a single Batch as an argument.

We could also add the option to pass a batch_list.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@abegong I agree -- and, if possible, would prefer for both to be supported in the same pull request. Thanks!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've made this change

batch_list: Optional[List[Batch]] = None,
batch_request: Optional[BatchRequestBase] = None,
batch_request_list: List[Optional[BatchRequestBase]] = None,
batch_data: Optional[Any] = None,
Expand Down Expand Up @@ -1796,10 +1798,10 @@ def get_validator(
expectation_suite_ge_cloud_id is not None,
]
)
!= 1
> 1
):
raise ValueError(
f"Exactly one of expectation_suite_name,{'expectation_suite_ge_cloud_id,' if self.ge_cloud_mode else ''} expectation_suite, or create_expectation_suite_with_name must be specified"
f"No more than one of expectation_suite_name,{'expectation_suite_ge_cloud_id,' if self.ge_cloud_mode else ''} expectation_suite, or create_expectation_suite_with_name can be specified"
)

if expectation_suite_ge_cloud_id is not None:
Expand All @@ -1816,43 +1818,55 @@ def get_validator(
if (
sum(
bool(x)
for x in [batch_request is not None, batch_request_list is not None]
for x in [
batch is not None,
batch_list is not None,
batch_request is not None,
batch_request_list is not None,
]
)
> 1
):
raise ValueError(
"Only one of batch_request or batch_request_list may be specified"
"No more than one of batch, batch_list, batch_request, or batch_request_list can be specified"
)

if not batch_request_list:
batch_request_list = [batch_request]

batch_list: List = []
for batch_request in batch_request_list:
batch_list.extend(
self.get_batch_list(
datasource_name=datasource_name,
data_connector_name=data_connector_name,
data_asset_name=data_asset_name,
batch_request=batch_request,
batch_data=batch_data,
data_connector_query=data_connector_query,
batch_identifiers=batch_identifiers,
limit=limit,
index=index,
custom_filter_function=custom_filter_function,
sampling_method=sampling_method,
sampling_kwargs=sampling_kwargs,
splitter_method=splitter_method,
splitter_kwargs=splitter_kwargs,
runtime_parameters=runtime_parameters,
query=query,
path=path,
batch_filter_parameters=batch_filter_parameters,
batch_spec_passthrough=batch_spec_passthrough,
**kwargs,
if batch_list:
pass

elif batch:
batch_list: List = [batch]

else:
batch_list: List = []
if not batch_request_list:
batch_request_list = [batch_request]

for batch_request in batch_request_list:
batch_list.extend(
self.get_batch_list(
datasource_name=datasource_name,
data_connector_name=data_connector_name,
data_asset_name=data_asset_name,
batch_request=batch_request,
batch_data=batch_data,
data_connector_query=data_connector_query,
batch_identifiers=batch_identifiers,
limit=limit,
index=index,
custom_filter_function=custom_filter_function,
sampling_method=sampling_method,
sampling_kwargs=sampling_kwargs,
splitter_method=splitter_method,
splitter_kwargs=splitter_kwargs,
runtime_parameters=runtime_parameters,
query=query,
path=path,
batch_filter_parameters=batch_filter_parameters,
batch_spec_passthrough=batch_spec_passthrough,
**kwargs,
)
)
)

return self.get_validator_using_batch_list(
expectation_suite=expectation_suite,
Expand Down
58 changes: 58 additions & 0 deletions tests/conftest.py
Expand Up @@ -51,6 +51,11 @@
from great_expectations.rule_based_profiler.config.base import (
ruleBasedProfilerConfigSchema,
)
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
DataContextConfig,
InMemoryStoreBackendDefaults,
)
from great_expectations.rule_based_profiler.parameter_builder.simple_date_format_string_parameter_builder import (
DEFAULT_CANDIDATE_STRINGS,
)
Expand Down Expand Up @@ -4251,3 +4256,56 @@ def multibatch_generic_csv_generator_context(monkeypatch, empty_data_context):
}
]
return context

def build_in_memory_runtime_context():
data_context_config: DataContextConfig = DataContextConfig(
datasources={
"pandas_datasource": {
"execution_engine": {
"class_name": "PandasExecutionEngine",
"module_name": "great_expectations.execution_engine",
},
"class_name": "Datasource",
"module_name": "great_expectations.datasource",
"data_connectors": {
"runtime_data_connector": {
"class_name": "RuntimeDataConnector",
"batch_identifiers": [
"id_key_0",
"id_key_1",
],
}
},
},
"spark_datasource": {
"execution_engine": {
"class_name": "SparkDFExecutionEngine",
"module_name": "great_expectations.execution_engine",
},
"class_name": "Datasource",
"module_name": "great_expectations.datasource",
"data_connectors": {
"runtime_data_connector": {
"class_name": "RuntimeDataConnector",
"batch_identifiers": [
"id_key_0",
"id_key_1",
],
}
},
},
},
expectations_store_name="expectations_store",
validations_store_name="validations_store",
evaluation_parameter_store_name="evaluation_parameter_store",
checkpoint_store_name="checkpoint_store",
store_backend_defaults=InMemoryStoreBackendDefaults(),
)

context: BaseDataContext = BaseDataContext(project_config=data_context_config)

return context

@pytest.fixture
def in_memory_runtime_context():
    """Provide a fresh, fully in-memory runtime DataContext for each test."""
    context = build_in_memory_runtime_context()
    return context
101 changes: 101 additions & 0 deletions tests/data_context/test_data_context.py
Expand Up @@ -1788,6 +1788,107 @@ def test_get_validator_with_attach_expectation_suite(
)
assert my_validator.expectation_suite_name == "A_expectation_suite"

def test_get_validator_without_expectation_suite(in_memory_runtime_context):
    """get_validator() with only a Batch must fall back to a default suite.

    No expectation-suite argument is passed, so the returned Validator should
    carry an auto-created suite named "default".
    """
    context = in_memory_runtime_context

    batch = context.get_batch(
        batch_request=RuntimeBatchRequest(
            datasource_name="pandas_datasource",
            data_connector_name="runtime_data_connector",
            data_asset_name="my_data_asset",
            runtime_parameters={"batch_data": pd.DataFrame({"x": range(10)})},
            batch_identifiers={
                "id_key_0": "id_0_value_a",
                "id_key_1": "id_1_value_a",
            },
        )
    )

    my_validator = context.get_validator(batch=batch)
    # isinstance rather than `type(...) ==` (per review feedback): tolerant of
    # ExpectationSuite subclasses and the conventional way to type-check.
    assert isinstance(my_validator.get_expectation_suite(), ExpectationSuite)
    assert my_validator.expectation_suite_name == "default"

def test_get_validator_with_batch(in_memory_runtime_context):
    """A single Batch passed via `batch=` yields a validator over that batch."""
    context = in_memory_runtime_context

    my_batch = context.get_batch(
        batch_request=RuntimeBatchRequest(
            datasource_name="pandas_datasource",
            data_connector_name="runtime_data_connector",
            data_asset_name="my_data_asset",
            runtime_parameters={"batch_data": pd.DataFrame({"x": range(10)})},
            batch_identifiers={
                "id_key_0": "id_0_value_a",
                "id_key_1": "id_1_value_a",
            },
        )
    )

    my_validator = context.get_validator(
        batch=my_batch,
        create_expectation_suite_with_name="A_expectation_suite",
    )
    # The original test only checked "does not raise"; assert the validator
    # actually wraps the one supplied batch and the requested suite name.
    assert len(my_validator.batches) == 1
    assert my_validator.expectation_suite_name == "A_expectation_suite"

def test_get_validator_with_batch_list(in_memory_runtime_context):
    """get_validator(batch_list=...) should wrap every supplied batch."""
    context = in_memory_runtime_context

    # Two runtime batches differing only in data and identifier suffix.
    batch_specs = [
        ({"x": range(10)}, "a"),
        ({"y": range(10)}, "b"),
    ]
    my_batch_list = [
        context.get_batch(
            batch_request=RuntimeBatchRequest(
                datasource_name="pandas_datasource",
                data_connector_name="runtime_data_connector",
                data_asset_name="my_data_asset",
                runtime_parameters={"batch_data": pd.DataFrame(data)},
                batch_identifiers={
                    "id_key_0": f"id_0_value_{suffix}",
                    "id_key_1": f"id_1_value_{suffix}",
                },
            )
        )
        for data, suffix in batch_specs
    ]

    my_validator = context.get_validator(
        batch_list=my_batch_list,
        create_expectation_suite_with_name="A_expectation_suite",
    )
    assert len(my_validator.batches) == 2


def test_get_batch_multiple_datasources_do_not_scan_all(
data_context_with_bad_datasource,
Expand Down
61 changes: 0 additions & 61 deletions tests/expectations/test_expectation_arguments.py
Expand Up @@ -17,11 +17,6 @@
from great_expectations.core.usage_statistics.usage_statistics import (
UsageStatisticsHandler,
)
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
DataContextConfig,
InMemoryStoreBackendDefaults,
)
from great_expectations.validator.validator import Validator

logger = logging.getLogger(__name__)
Expand All @@ -35,62 +30,6 @@
"Unable to load pyspark; install optional spark dependency for support."
)


def build_in_memory_runtime_context():
data_context_config: DataContextConfig = DataContextConfig(
datasources={
"pandas_datasource": {
"execution_engine": {
"class_name": "PandasExecutionEngine",
"module_name": "great_expectations.execution_engine",
},
"class_name": "Datasource",
"module_name": "great_expectations.datasource",
"data_connectors": {
"runtime_data_connector": {
"class_name": "RuntimeDataConnector",
"batch_identifiers": [
"id_key_0",
"id_key_1",
],
}
},
},
"spark_datasource": {
"execution_engine": {
"class_name": "SparkDFExecutionEngine",
"module_name": "great_expectations.execution_engine",
},
"class_name": "Datasource",
"module_name": "great_expectations.datasource",
"data_connectors": {
"runtime_data_connector": {
"class_name": "RuntimeDataConnector",
"batch_identifiers": [
"id_key_0",
"id_key_1",
],
}
},
},
},
expectations_store_name="expectations_store",
validations_store_name="validations_store",
evaluation_parameter_store_name="evaluation_parameter_store",
checkpoint_store_name="checkpoint_store",
store_backend_defaults=InMemoryStoreBackendDefaults(),
)

context: BaseDataContext = BaseDataContext(project_config=data_context_config)

return context


@pytest.fixture
def in_memory_runtime_context():
return build_in_memory_runtime_context()


@pytest.fixture
def test_pandas_df():
df: pd.DataFrame = pd.DataFrame(
Expand Down