Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENHANCEMENT] Restore cli functionality for legacy checkpoints #2511

Merged
merged 18 commits into from
Mar 16, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 11 additions & 1 deletion great_expectations/checkpoint/checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,12 @@ def run(
):
batches_to_validate = self._get_batches_to_validate(self.batches)

if self.validation_operator_name:
if (
self.validation_operator_name
and self.data_context.validation_operators.get(
self.validation_operator_name
)
):
results = self.data_context.run_validation_operator(
self.validation_operator_name,
assets_to_validate=batches_to_validate,
Expand All @@ -570,6 +575,11 @@ def run(
**kwargs,
)
else:
if self.validation_operator_name:
logger.warning(
f'Could not find Validation Operator "{self.validation_operator_name}" when '
f'running Checkpoint "{self.name}". Using default action_list_operator.'
)
results = self._run_default_validation_operator(
assets_to_validate=batches_to_validate,
run_id=run_id,
Expand Down
90 changes: 31 additions & 59 deletions great_expectations/cli/v012/checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,54 +83,41 @@ def checkpoint():
default=None,
help="The project's great_expectations directory.",
)
@click.option("--legacy/--non-legacy", default=True)
@mark.cli_as_experimental
def checkpoint_new(checkpoint, suite, directory, datasource, legacy):
def checkpoint_new(checkpoint, suite, directory, datasource):
"""Create a new checkpoint for easy deployments. (Experimental)"""
if legacy:
suite_name = suite
usage_event = "cli.checkpoint.new"
context = toolkit.load_data_context_with_error_handling(directory)
ge_config_version = context.get_config().config_version
if ge_config_version >= 3:
cli_message(
f"""<red>The `checkpoint new` CLI command is not yet implemented for Great Expectations config versions >= 3.</red>"""
)
toolkit.send_usage_message(context, usage_event, success=False)
sys.exit(1)
suite_name = suite
usage_event = "cli.checkpoint.new"
context = toolkit.load_data_context_with_error_handling(directory)

_verify_checkpoint_does_not_exist(context, checkpoint, usage_event)
suite: ExpectationSuite = toolkit.load_expectation_suite(
context, suite_name, usage_event
)
datasource = toolkit.select_datasource(context, datasource_name=datasource)
if datasource is None:
toolkit.send_usage_message(context, usage_event, success=False)
sys.exit(1)
_, _, _, batch_kwargs = toolkit.get_batch_kwargs(context, datasource.name)

_ = context.add_checkpoint(
name=checkpoint,
**{
"class_name": "LegacyCheckpoint",
"validation_operator_name": "action_list_operator",
"batches": [
{
"batch_kwargs": dict(batch_kwargs),
"expectation_suite_names": [suite.expectation_suite_name],
}
],
},
)
_verify_checkpoint_does_not_exist(context, checkpoint, usage_event)
suite: ExpectationSuite = toolkit.load_expectation_suite(
context, suite_name, usage_event
)
datasource = toolkit.select_datasource(context, datasource_name=datasource)
if datasource is None:
toolkit.send_usage_message(context, usage_event, success=False)
sys.exit(1)
_, _, _, batch_kwargs = toolkit.get_batch_kwargs(context, datasource.name)

_ = context.add_checkpoint(
name=checkpoint,
**{
"class_name": "LegacyCheckpoint",
"batches": [
{
"batch_kwargs": dict(batch_kwargs),
"expectation_suite_names": [suite.expectation_suite_name],
}
],
},
)

cli_message(
f"""<green>A checkpoint named `{checkpoint}` was added to your project!</green>
- To run this checkpoint run `great_expectations checkpoint run {checkpoint}`"""
)
toolkit.send_usage_message(context, usage_event, success=True)
# TODO: <Rob>Rob</Rob> Add flow for new style checkpoints
else:
pass
cli_message(
f"""<green>A checkpoint named `{checkpoint}` was added to your project!</green>
roblim marked this conversation as resolved.
Show resolved Hide resolved
- To run this checkpoint run `great_expectations checkpoint run {checkpoint}`"""
roblim marked this conversation as resolved.
Show resolved Hide resolved
)
toolkit.send_usage_message(context, usage_event, success=True)


def _verify_checkpoint_does_not_exist(
Expand Down Expand Up @@ -218,14 +205,6 @@ def checkpoint_run(checkpoint, directory):
directory=directory, from_cli_upgrade_command=False
)

ge_config_version = context.get_config().config_version
if ge_config_version >= 3:
cli_message(
f"""<red>The `checkpoint run` CLI command is not yet implemented for Great Expectations config versions >= 3.</red>"""
)
toolkit.send_usage_message(context, usage_event, success=False)
sys.exit(1)

checkpoint: Checkpoint = toolkit.load_checkpoint(
context,
checkpoint,
Expand Down Expand Up @@ -300,13 +279,6 @@ def checkpoint_script(checkpoint, directory):
"""
context = toolkit.load_data_context_with_error_handling(directory)
usage_event = "cli.checkpoint.script"
ge_config_version = context.get_config().config_version
if ge_config_version >= 3:
cli_message(
f"""<red>The `checkpoint script` CLI command is not yet implemented for Great Expectations config versions >= 3.</red>"""
)
toolkit.send_usage_message(context, usage_event, success=False)
sys.exit(1)

# Attempt to load the checkpoint and deal with errors
_ = toolkit.load_checkpoint(context, checkpoint, usage_event)
Expand Down
18 changes: 3 additions & 15 deletions great_expectations/cli/v012/checkpoint_script_template.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
This is a basic generated Great Expectations script that runs a checkpoint.

A checkpoint is a list of one or more batches paired with one or more
Internally, a checkpoint is a list of one or more batches paired with one or more
roblim marked this conversation as resolved.
Show resolved Hide resolved
Expectation Suites and a configurable Validation Operator.

Checkpoints can be run directly without this script using the
Expand Down Expand Up @@ -31,20 +31,8 @@
context = DataContext("{1}")
checkpoint = context.get_checkpoint("{0}")

# load batches of data
batches_to_validate = []
for batch in checkpoint.batches:
batch_kwargs = batch["batch_kwargs"]
for suite_name in batch["expectation_suite_names"]:
suite = context.get_expectation_suite(suite_name)
batch = context.get_batch(batch_kwargs, suite)
batches_to_validate.append(batch)

# run the validation operator
results = context.run_validation_operator(
checkpoint.validation_operator_name,
assets_to_validate=batches_to_validate,
)
# run the Checkpoint
results = checkpoint.run()

# take action based on results
if not results["success"]:
Expand Down
147 changes: 7 additions & 140 deletions tests/cli/v012/test_checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ def test_checkpoint_run_on_checkpoint_with_empty_suite_list_raises_error_with_ge
def test_checkpoint_run_on_non_existent_validation_operator_with_ge_config_v2(
mock_emit, caplog, titanic_data_context_stats_enabled_config_version_2
):
# checkpoint should still run using fallback action_list_operator
context = titanic_data_context_stats_enabled_config_version_2
root_dir = context.root_directory
csv_path = os.path.join(root_dir, "..", "data", "Titanic.csv")
Expand Down Expand Up @@ -716,18 +717,16 @@ def test_checkpoint_run_on_non_existent_validation_operator_with_ge_config_v2(
catch_exceptions=False,
)
stdout = result.stdout
assert result.exit_code == 1
assert result.exit_code == 0

assert (
f"No validation operator `foo` was found in your project. Please verify this in your great_expectations.yml"
in stdout
)
assert "Validation succeeded!" in stdout
usage_emits = mock_emit.call_args_list

assert mock_emit.call_count == 3
assert mock_emit.call_count == 4
assert usage_emits[0][0][0]["success"] is True
assert usage_emits[1][0][0]["success"] is False
assert usage_emits[2][0][0]["success"] is False
assert usage_emits[1][0][0]["success"] is True
assert usage_emits[2][0][0]["success"] is True
assert usage_emits[3][0][0]["success"] is True

assert_no_logging_messages_or_tracebacks(
my_caplog=caplog,
Expand Down Expand Up @@ -1097,138 +1096,6 @@ def test_checkpoint_script_happy_path_executable_failed_validation_with_ge_confi
assert "Validation failed!" in output


@mock.patch(
    "great_expectations.core.usage_statistics.usage_statistics.UsageStatisticsHandler.emit"
)
def test_checkpoint_new_with_ge_config_3_raises_error(
    mock_emit, caplog, titanic_data_context_stats_enabled
):
    """`checkpoint new` must refuse to run and report the failure.

    Invokes the CLI against the project directory of the
    ``titanic_data_context_stats_enabled`` fixture (presumably a config
    version >= 3 project, going by the test name -- confirm against the
    fixture) and checks the exit code, the user-facing message, the emitted
    usage-statistics events, and that nothing unexpected was logged.
    """
    root_dir = titanic_data_context_stats_enabled.root_directory
    # Discard any events recorded so far so only this invocation's events
    # are counted below.
    mock_emit.reset_mock()

    command = f"checkpoint new foo not_a_suite -d {root_dir}"
    result = CliRunner(mix_stderr=False).invoke(
        cli, command, catch_exceptions=False
    )

    expected_message = "The `checkpoint new` CLI command is not yet implemented for Great Expectations config versions >= 3."
    assert result.exit_code == 1
    assert expected_message in result.stdout

    # One event for building the data context, one for the failed command.
    context_init_event = {
        "event_payload": {},
        "event": "data_context.__init__",
        "success": True,
    }
    failed_command_event = {
        "event": "cli.checkpoint.new",
        "event_payload": {"api_version": "v2"},
        "success": False,
    }
    assert mock_emit.call_count == 2
    assert mock_emit.call_args_list == [
        mock.call(context_init_event),
        mock.call(failed_command_event),
    ]

    assert_no_logging_messages_or_tracebacks(
        my_caplog=caplog,
        click_result=result,
        allowed_deprecation_message=VALIDATION_OPERATORS_DEPRECATION_MESSAGE,
    )


@mock.patch(
    "great_expectations.core.usage_statistics.usage_statistics.UsageStatisticsHandler.emit"
)
def test_checkpoint_run_with_ge_config_3_raises_error(
    mock_emit, caplog, titanic_data_context_stats_enabled
):
    """`checkpoint run` must refuse to run and report the failure.

    Invokes the CLI against the project directory of the
    ``titanic_data_context_stats_enabled`` fixture (presumably a config
    version >= 3 project, going by the test name -- confirm against the
    fixture) and checks the exit code, the user-facing message, the emitted
    usage-statistics events, and that nothing unexpected was logged.
    """
    root_dir = titanic_data_context_stats_enabled.root_directory
    # Discard any events recorded so far so only this invocation's events
    # are counted below.
    mock_emit.reset_mock()

    command = f"checkpoint run my_checkpoint -d {root_dir}"
    result = CliRunner(mix_stderr=False).invoke(
        cli, command, catch_exceptions=False
    )

    expected_message = "The `checkpoint run` CLI command is not yet implemented for Great Expectations config versions >= 3."
    assert result.exit_code == 1
    assert expected_message in result.stdout

    # One event for building the data context, one for the failed command.
    context_init_event = {
        "event_payload": {},
        "event": "data_context.__init__",
        "success": True,
    }
    failed_command_event = {
        "event": "cli.checkpoint.run",
        "event_payload": {"api_version": "v2"},
        "success": False,
    }
    assert mock_emit.call_count == 2
    assert mock_emit.call_args_list == [
        mock.call(context_init_event),
        mock.call(failed_command_event),
    ]

    assert_no_logging_messages_or_tracebacks(
        my_caplog=caplog,
        click_result=result,
        allowed_deprecation_message=VALIDATION_OPERATORS_DEPRECATION_MESSAGE,
    )


@mock.patch(
    "great_expectations.core.usage_statistics.usage_statistics.UsageStatisticsHandler.emit"
)
def test_checkpoint_script_with_ge_config_3_raises_error(
    mock_emit, caplog, titanic_data_context_stats_enabled
):
    """`checkpoint script` must refuse to run and report the failure.

    Invokes the CLI against the project directory of the
    ``titanic_data_context_stats_enabled`` fixture (presumably a config
    version >= 3 project, going by the test name -- confirm against the
    fixture) and checks the exit code, the user-facing message, the emitted
    usage-statistics events, and that nothing unexpected was logged.
    """
    root_dir = titanic_data_context_stats_enabled.root_directory
    # Discard any events recorded so far so only this invocation's events
    # are counted below.
    mock_emit.reset_mock()

    command = f"checkpoint script my_checkpoint -d {root_dir}"
    result = CliRunner(mix_stderr=False).invoke(
        cli, command, catch_exceptions=False
    )

    expected_message = "The `checkpoint script` CLI command is not yet implemented for Great Expectations config versions >= 3."
    assert result.exit_code == 1
    assert expected_message in result.stdout

    # One event for building the data context, one for the failed command.
    context_init_event = {
        "event_payload": {},
        "event": "data_context.__init__",
        "success": True,
    }
    failed_command_event = {
        "event": "cli.checkpoint.script",
        "event_payload": {"api_version": "v2"},
        "success": False,
    }
    assert mock_emit.call_count == 2
    assert mock_emit.call_args_list == [
        mock.call(context_init_event),
        mock.call(failed_command_event),
    ]

    assert_no_logging_messages_or_tracebacks(
        my_caplog=caplog,
        click_result=result,
        allowed_deprecation_message=VALIDATION_OPERATORS_DEPRECATION_MESSAGE,
    )


def _write_checkpoint_dict_to_file(bad, checkpoint_file_path):
yaml = YAML()
with open(checkpoint_file_path, "w") as f:
Expand Down