Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUGFIX] Fix copying of CheckpointConfig for substitution and printing purposes #3759

87 changes: 19 additions & 68 deletions great_expectations/checkpoint/checkpoint.py
Expand Up @@ -12,11 +12,7 @@
from great_expectations.checkpoint.util import get_substituted_validation_dict
from great_expectations.core import RunIdentifier
from great_expectations.core.async_executor import AsyncExecutor, AsyncResult
from great_expectations.core.batch import (
BatchRequest,
RuntimeBatchRequest,
get_batch_request_dict,
)
from great_expectations.core.batch import BatchRequest, get_batch_request_dict
from great_expectations.core.util import get_datetime_string_from_strftime_format
from great_expectations.data_asset import DataAsset
from great_expectations.data_context.types.base import CheckpointConfig
Expand Down Expand Up @@ -124,7 +120,6 @@ def action_list(self) -> List[Dict]:
def ge_cloud_id(self) -> UUID:
return self._config.ge_cloud_id

# TODO: (Rob) should we type the big validation dicts for better validation/prevent duplication
def get_substituted_config(
self,
config: Optional[Union[CheckpointConfig, dict]] = None,
Expand All @@ -136,67 +131,20 @@ def get_substituted_config(
if isinstance(config, dict):
config = CheckpointConfig(**config)

if any(runtime_kwargs.values()):
config.update(runtime_kwargs=runtime_kwargs)

substituted_config: Union[CheckpointConfig, dict]
template_name = runtime_kwargs.get("template_name") or config.template_name

template_name = config.template_name
if not template_name:
if (
config.batch_request is not None
and config.batch_request.get("runtime_parameters") is not None
and config.batch_request["runtime_parameters"].get("batch_data")
is not None
):
batch_data = config.batch_request["runtime_parameters"].pop(
"batch_data"
)
substituted_config = copy.deepcopy(config)
substituted_config.batch_request["runtime_parameters"][
"batch_data"
] = batch_data
elif len(config.validations) > 0:
batch_data_list = []
for val in config.validations:
if (
val.get("batch_request") is not None
and val["batch_request"].get("runtime_parameters") is not None
and val["batch_request"]["runtime_parameters"].get("batch_data")
is not None
):
batch_data_list.append(
val["batch_request"]["runtime_parameters"].pop("batch_data")
)
else:
batch_data_list.append(None)
substituted_config = copy.deepcopy(config)
for idx, val in enumerate(substituted_config.validations):
if (
val.get("batch_request") is not None
and val["batch_request"].get("runtime_parameters") is not None
and batch_data_list[idx] is not None
):
val["batch_request"]["runtime_parameters"][
"batch_data"
] = batch_data_list[idx]

for idx, val in enumerate(config.validations):
if (
val.get("batch_request") is not None
and val["batch_request"].get("runtime_parameters") is not None
and batch_data_list[idx] is not None
):
val["batch_request"]["runtime_parameters"][
"batch_data"
] = batch_data_list[idx]
else:
substituted_config = copy.deepcopy(config)

if any(runtime_kwargs.values()):
substituted_config.update(runtime_kwargs=runtime_kwargs)

substituted_config = copy.deepcopy(config)
self._substituted_config = substituted_config
else:
checkpoint = self.data_context.get_checkpoint(name=template_name)
template_config = checkpoint.config
checkpoint: Checkpoint = self.data_context.get_checkpoint(
name=template_name
)
template_config: CheckpointConfig = checkpoint.config

if template_config.config_version != config.config_version:
raise ge_exceptions.CheckpointError(
Expand All @@ -207,7 +155,7 @@ def get_substituted_config(
if template_config.template_name is not None:
substituted_config = self.get_substituted_config(config=template_config)
else:
substituted_config = template_config
substituted_config = copy.deepcopy(template_config)

# merge template with config
substituted_config.update(
Expand All @@ -217,8 +165,10 @@ def get_substituted_config(
# don't replace _substituted_config if already exists
if self._substituted_config is None:
self._substituted_config = substituted_config

if self.data_context.ge_cloud_mode:
return substituted_config

return self._substitute_config_variables(config=substituted_config)

def _substitute_config_variables(
Expand Down Expand Up @@ -260,9 +210,9 @@ def _run_validation(
substituted_runtime_config=substituted_runtime_config,
validation_dict=validation_dict,
)
batch_request: Union[
BatchRequest, RuntimeBatchRequest
] = substituted_validation_dict.get("batch_request")
batch_request: BatchRequest = substituted_validation_dict.get(
"batch_request"
)
expectation_suite_name: str = substituted_validation_dict.get(
"expectation_suite_name"
)
Expand Down Expand Up @@ -377,13 +327,13 @@ def run(
"template_name": template_name,
"run_name_template": run_name_template,
"expectation_suite_name": expectation_suite_name,
"expectation_suite_ge_cloud_id": expectation_suite_ge_cloud_id,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why was this line removed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it was just moved to the bottom (new line 336)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@donaldheppner I simply moved it to a "more logical place" congruent with the order these arguments were supplied on input to the method.

"batch_request": batch_request,
"action_list": action_list,
"evaluation_parameters": evaluation_parameters,
"runtime_configuration": runtime_configuration,
"validations": validations,
"profilers": profilers,
"expectation_suite_ge_cloud_id": expectation_suite_ge_cloud_id,
}
substituted_runtime_config: CheckpointConfig = self.get_substituted_config(
runtime_kwargs=runtime_kwargs
Expand All @@ -410,10 +360,11 @@ def run(
with AsyncExecutor(
self.data_context.concurrency, max_workers=len(validations)
) as async_executor:
# noinspection PyUnresolvedReferences
async_validation_operator_results: List[
AsyncResult[ValidationOperatorResult]
] = []
if len(validations) != 0:
if len(validations) > 0:
for idx, validation_dict in enumerate(validations):
self._run_validation(
substituted_runtime_config=substituted_runtime_config,
Expand Down
117 changes: 26 additions & 91 deletions great_expectations/checkpoint/types/checkpoint_result.py
@@ -1,8 +1,12 @@
import json
from copy import deepcopy
from typing import Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Tuple, Union

from great_expectations.core.batch import BatchRequest
from great_expectations.core.batch import (
delete_runtime_parameters_batch_data_references_from_config,
get_runtime_parameters_batch_data_references_from_config,
restore_runtime_parameters_batch_data_references_into_config,
)
from great_expectations.core.expectation_validation_result import (
ExpectationSuiteValidationResult,
)
Expand Down Expand Up @@ -284,97 +288,28 @@ def _list_validation_statistics(self) -> Dict[ValidationResultIdentifier, dict]:
}
return self._validation_statistics

def to_json_dict(self):
json_dict = {}
batch_data_list = []
batch_data = None
if len(self.checkpoint_config.validations) > 0:
for val in self.checkpoint_config["validations"]:
if (
val.get("batch_request") is not None
and val["batch_request"].get("runtime_parameters") is not None
and val["batch_request"]["runtime_parameters"].get("batch_data")
is not None
):
batch_data_list.append(
val["batch_request"]["runtime_parameters"].pop("batch_data")
)
else:
batch_data_list.append(None)
elif self.checkpoint_config.batch_request is not None:
if (
self.checkpoint_config.batch_request is not None
and self.checkpoint_config.batch_request.get("runtime_parameters")
is not None
and self.checkpoint_config.batch_request["runtime_parameters"].get(
"batch_data"
)
is not None
):
batch_data = self.checkpoint_config["batch_request"][
"runtime_parameters"
].pop("batch_data")

json_dict = checkpointResultSchema.dump(self)
if len(batch_data_list) > 0:
for idx, val in enumerate(json_dict["checkpoint_config"]["validations"]):
if batch_data_list[idx] is not None:
self.checkpoint_config["validations"][idx]["batch_request"][
"runtime_parameters"
]["batch_data"] = json_dict["checkpoint_config"]["validations"][
idx
][
"batch_request"
][
"runtime_parameters"
][
"batch_data"
] = batch_data_list[
idx
]
elif batch_data is not None:
self.checkpoint_config["batch_request"]["runtime_parameters"][
"batch_data"
] = json_dict["checkpoint_config"]["batch_request"]["runtime_parameters"][
"batch_data"
] = batch_data

return json_dict
def to_json_dict(self) -> dict:
return checkpointResultSchema.dump(self)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1


def __repr__(self):
serializeable_dict = self.to_json_dict()
if len(serializeable_dict["checkpoint_config"].get("validations")) > 0:
for val in serializeable_dict["checkpoint_config"]["validations"]:
if (val["batch_request"].get("runtime_parameters") is not None) and (
val["batch_request"]["runtime_parameters"].get("batch_data")
is not None
):
val["batch_request"]["runtime_parameters"]["batch_data"] = str(
type(val["batch_request"]["runtime_parameters"]["batch_data"])
)

if serializeable_dict.get("batch_request") is not None:
if (
serializeable_dict["checkpoint_config"]["batch_request"].get(
"runtime_parameters"
)
is not None
) and (
serializeable_dict["checkpoint_config"]["batch_request"][
"runtime_parameters"
].get("batch_data")
is not None
):
serializeable_dict["checkpoint_config"]["batch_request"][
"runtime_parameters"
]["batch_data"] = str(
type(
serializeable_dict["checkpoint_config"]["batch_request"][
"runtime_parameters"
]["batch_data"]
)
)

batch_data_references: Tuple[
Optional[Any], Optional[List[Any]]
] = get_runtime_parameters_batch_data_references_from_config(
alexsherstinsky marked this conversation as resolved.
Show resolved Hide resolved
config=self["checkpoint_config"]
)
delete_runtime_parameters_batch_data_references_from_config(
config=self["checkpoint_config"]
)
serializeable_dict: dict = self.to_json_dict()
restore_runtime_parameters_batch_data_references_into_config(
config=self["checkpoint_config"],
batch_data_references=batch_data_references,
)
restore_runtime_parameters_batch_data_references_into_config(
config=serializeable_dict["checkpoint_config"],
batch_data_references=batch_data_references,
replace_value_with_type_string=True,
)
return json.dumps(serializeable_dict, indent=2)


Expand Down
37 changes: 20 additions & 17 deletions great_expectations/checkpoint/util.py
Expand Up @@ -5,14 +5,15 @@
import ssl
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Optional
from typing import Optional, Union

import requests

import great_expectations.exceptions as ge_exceptions
from great_expectations.core.batch import BatchRequest, RuntimeBatchRequest
from great_expectations.core.util import nested_update
from great_expectations.data_context.types.base import CheckpointConfig
from great_expectations.util import filter_properties_dict

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -207,26 +208,11 @@ def send_email(
# TODO: <Alex>A common utility function should be factored out from DataContext.get_batch_list() for any purpose.</Alex>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this TODO still valid?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@donaldheppner Unfortunately, this comment is still valid -- we have not had a chance to do the work. I would like to leave it for now and remove only after we had a chance to go through the backlog formally. Thank you.

def get_runtime_batch_request(
substituted_runtime_config: CheckpointConfig,
validation_batch_request: Optional[dict] = None,
validation_batch_request: Optional[Union[dict, BatchRequest]] = None,
ge_cloud_mode: bool = False,
) -> Optional[BatchRequest]:
runtime_config_batch_request = substituted_runtime_config.batch_request

if (
(
runtime_config_batch_request is not None
and "runtime_parameters" in runtime_config_batch_request
)
or (
validation_batch_request is not None
and "runtime_parameters" in validation_batch_request
)
or (isinstance(validation_batch_request, RuntimeBatchRequest))
):
batch_request_class = RuntimeBatchRequest
else:
batch_request_class = BatchRequest

if runtime_config_batch_request is None and validation_batch_request is None:
return None

Expand All @@ -236,6 +222,16 @@ def get_runtime_batch_request(
if validation_batch_request is None:
validation_batch_request = {}

effective_batch_request: dict = dict(
**runtime_config_batch_request, **validation_batch_request
)
if "runtime_parameters" in effective_batch_request or isinstance(
validation_batch_request, RuntimeBatchRequest
):
batch_request_class = RuntimeBatchRequest
else:
batch_request_class = BatchRequest

Comment on lines +225 to +234
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much better!

if (
validation_batch_request.get("runtime_parameters") is not None
and validation_batch_request["runtime_parameters"].get("batch_data") is not None
Expand Down Expand Up @@ -265,6 +261,13 @@ def get_runtime_batch_request(
batch_identifiers["timestamp"] = str(datetime.datetime.now())
runtime_batch_request_dict["batch_identifiers"] = batch_identifiers

filter_properties_dict(
properties=runtime_batch_request_dict,
keep_fields=batch_request_class.field_names,
clean_nulls=False,
inplace=True,
)

return batch_request_class(**runtime_batch_request_dict)


Expand Down