### Installation

In [0]:
#!pip install great_expectations
!pip install 'great_expectations[spark]' # this install all python-spark dependencies, see output

Collecting great_expectations[spark]
  Downloading great_expectations-1.5.4-py3-none-any.whl (4.9 MB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/4.9 MB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/4.9 MB[0m [31m49.1 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m4.9/4.9 MB[0m [31m84.9 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.9/4.9 MB[0m [31m58.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting ruamel.yaml>=0.16
  Downloading ruamel.yaml-0.18.14-py3-none-any.whl (118 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/118.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m118.6/118.6 kB[0m [31m19.8 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting altair<5.0.0,>=4.2.1
  Downloading altair-4.2.

In [0]:
# Restart the notebook's Python process after the install cell above
# (presumably so the newly installed package is the one that gets imported — confirm for your runtime).
dbutils.library.restartPython()

In [0]:
import great_expectations as gx
# Print the imported version to confirm which release is actually in use.
print(gx.__version__)

1.5.4


### Set up environment + connect to data

* Context
* Read spark DF
* Add data source / data asset
* Define batch

In [0]:
# Define GX entry point: the Data Context. Every later cell goes through this
# object to register data sources, suites, validation definitions and checkpoints.
context = gx.get_context()

INFO:great_expectations.data_context.types.base:Created temporary directory '/tmp/tmplvhbr_qo' for ephemeral docs site


In [0]:
# Load the table to validate through the notebook's Spark session.
df = spark.read.table("catalog.schema.table") # replace with your own Unity Catalog table
# Sanity check: show the type of the object that will be handed to GX.
print(type(df))

<class 'pyspark.sql.dataframe.DataFrame'>


In [0]:
# Connect GX to the Spark DataFrame loaded above.

# Names under which each object is registered in the Data Context
data_source_name = "plants_map_src"
data_asset_name = "plants map df"
batch_definition_name = "test_batch"

# The DataFrame itself is handed over at run time through the batch parameters
batch_parameters = {"dataframe": df}

# Register the chain: Spark data source -> dataframe asset -> whole-dataframe batch definition
data_source = context.data_sources.add_spark(name=data_source_name)
data_asset = data_source.add_dataframe_asset(name=data_asset_name)
batch_definition = data_asset.add_batch_definition_whole_dataframe(batch_definition_name)

# Resolve the batch definition against the Spark DataFrame
batch = batch_definition.get_batch(batch_parameters=batch_parameters)

### Test Expectation

In [0]:
# Test a single Expectation directly against the batch, before any suite exists:
# every value in the "plant_code" column is expected to be unique.
expectation = gx.expectations.ExpectColumnValuesToBeUnique(column="plant_code")
validation_results = batch.validate(expectation)
# Prints the full result document; use `validation_results.success` for only the pass/fail flag.
print(validation_results)

Calculating Metrics:   0%|          | 0/12 [00:00<?, ?it/s]

{
  "success": true,
  "expectation_config": {
    "type": "expect_column_values_to_be_unique",
    "kwargs": {
      "batch_id": "plants_map_src-plants map df",
      "column": "plant_code"
    },
    "meta": {}
  },
  "result": {
    "element_count": 308,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0,
    "partial_unexpected_counts": []
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}


### Configure Expectation Suite (group of Expectations)

In [0]:
# Build an empty Expectation Suite and register it with the Data Context in one step.
# The value returned by `add` is kept as `suite` and used by the following cells.
suite_name = "plants_suite"
suite = context.suites.add(gx.ExpectationSuite(name=suite_name))

In [0]:
# Add the previously created Expectation (uniqueness of "plant_code") to the Expectation Suite.
# The call is the last expression of the cell, so its return value is displayed as the cell output.
suite.add_expectation(expectation)

ExpectColumnValuesToBeUnique(id='af8e07c5-11a8-4062-95fe-eed59f3da433', meta=None, notes=None, result_format=<ResultFormat.BASIC: 'BASIC'>, description=None, catch_exceptions=True, rendered_content=None, windows=None, batch_id=None, column='plant_code', mostly=1, row_condition=None, condition_parser=None)

In [0]:
# Extend the suite with a second Expectation: "plant_code" must not contain nulls.
not_null_expectation = gx.expectations.ExpectColumnValuesToNotBeNull(column="plant_code")
suite.add_expectation(not_null_expectation)

ExpectColumnValuesToNotBeNull(id='57a61960-f44a-4f3c-93cf-f967561e6f34', meta=None, notes=None, result_format=<ResultFormat.BASIC: 'BASIC'>, description=None, catch_exceptions=True, rendered_content=None, windows=None, batch_id=None, column='plant_code', mostly=1, row_condition=None, condition_parser=None)

In [0]:
# Look the Expectation Suite up by name in the Data Context and print its stored definition
existing_suite_name = "plants_suite"  # replace this with the name of your Expectation Suite
expectation_suite = context.suites.get(name=existing_suite_name)
print(expectation_suite)

{
  "name": "plants_suite",
  "id": "cdc7a972-39b3-4147-8726-8d30874809dd",
  "expectations": [
    {
      "type": "expect_column_values_to_be_unique",
      "kwargs": {
        "column": "plant_code"
      },
      "meta": {},
      "id": "af8e07c5-11a8-4062-95fe-eed59f3da433"
    },
    {
      "type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "plant_code"
      },
      "meta": {},
      "id": "57a61960-f44a-4f3c-93cf-f967561e6f34"
    }
  ],
  "meta": {
    "great_expectations_version": "1.5.4"
  },
  "notes": null
}


### Run Validation

In [0]:
# Walk data source -> asset -> batch definition to fetch the Batch Definition
# that describes the data to associate with the Expectation Suite.
registered_source = context.data_sources.get(data_source_name)
registered_asset = registered_source.get_asset(data_asset_name)
batch_definition = registered_asset.get_batch_definition(batch_definition_name)

# Pair that Batch Definition with the suite and register the pairing
# as a Validation Definition in the Data Context.
validation_definition_name = "plants_validation_definition"
validation_definition = context.validation_definitions.add(
    gx.ValidationDefinition(
        data=batch_definition,
        suite=expectation_suite,
        name=validation_definition_name,
    )
)

In [0]:
# Retrieve the Validation Definition from the Data Context by the name it was registered under.
validation_definition = context.validation_definitions.get(validation_definition_name)

In [0]:
# Run the Validation Definition.
# The DataFrame is passed again through `batch_parameters` ({"dataframe": df}) at run time.
validation_results = validation_definition.run(batch_parameters=batch_parameters)
# Print the result document for the whole suite.
print(validation_results)

Calculating Metrics:   0%|          | 0/15 [00:00<?, ?it/s]

{
  "success": true,
  "results": [
    {
      "success": true,
      "expectation_config": {
        "type": "expect_column_values_to_be_unique",
        "kwargs": {
          "batch_id": "plants_map_src-plants map df",
          "column": "plant_code"
        },
        "meta": {},
        "id": "af8e07c5-11a8-4062-95fe-eed59f3da433"
      },
      "result": {
        "element_count": 308,
        "unexpected_count": 0,
        "unexpected_percent": 0.0,
        "partial_unexpected_list": [],
        "missing_count": 0,
        "missing_percent": 0.0,
        "unexpected_percent_total": 0.0,
        "unexpected_percent_nonmissing": 0.0,
        "partial_unexpected_counts": []
      },
      "meta": {},
      "exception_info": {
        "raised_exception": false,
        "exception_traceback": null,
        "exception_message": null
      }
    },
    {
      "success": true,
      "expectation_config": {
        "type": "expect_column_values_to_not_be_null",
        "kwargs": {
    

### Checkpoint w/Action

In [0]:
# Retrieve existing validation definitions.
# The lookup reuses `validation_definition_name` (set when the Validation Definition
# was created) instead of repeating the string literal, so the two cannot drift apart.
validation_definitions = [context.validation_definitions.get(validation_definition_name)]

In [0]:
# Actions the Checkpoint will automate after validation.
# Here: a Microsoft Teams notification configured with notify_on="failure".
teams_failure_alert = gx.checkpoint.MicrosoftTeamsNotificationAction(
    name="send_teams_notification_on_failed_expectations",
    teams_webhook="your_teams_webhook",  # placeholder; NOTE(review): avoid committing a real webhook URL
    notify_on="failure",
)
action_list = [teams_failure_alert]

In [0]:
# Create a Checkpoint that bundles the validation definitions with the actions
# to run on their results. result_format "COMPLETE" is requested for the results.
checkpoint_name = "plants_checkpoint"
checkpoint = gx.Checkpoint(
    name=checkpoint_name,
    validation_definitions=validation_definitions,
    actions=action_list,
    result_format={"result_format": "COMPLETE"},
)

# Add the Checkpoint to the Data Context for possible reuse.
# The returned object is rebound to `checkpoint`, matching how the suite and the
# validation definition are handled earlier in this notebook; it also keeps the
# cell from echoing the object's repr as output.
checkpoint = context.checkpoints.add(checkpoint)

In [0]:
# Run Checkpoint, passing the DataFrame through the same `batch_parameters` used earlier.
checkpoint_result = checkpoint.run(batch_parameters=batch_parameters)

Calculating Metrics:   0%|          | 0/15 [00:00<?, ?it/s]

In [0]:
# Print the result object returned by the Checkpoint run.
print(checkpoint_result)

### Interesting use cases:

* Try using SQL to define a custom Expectation
* Try defining a multi-source Expectation

Doc reference: https://docs.greatexpectations.io/docs/core/introduction/gx_overview/