Module imports

In [1]:
import great_expectations as gx
import pandas as pd
from pyspark.sql import SparkSession

Disable GX analytics (they require internet access)

In [2]:
# Switch off GX analytics; this runs before any GX context is created in this notebook.
from great_expectations.analytics.config import ENV_CONFIG
ENV_CONFIG.gx_analytics_enabled = False

Create a Spark session and load a Spark DataFrame from a Hive table hosted on S3

In [3]:
# Get the active Spark session, or create one with the app name "gx_source".
spark = (
    SparkSession.builder
    .appName("gx_source")
    .getOrCreate()
)

# Columns to keep from the Hive table.
selected_columns = [
    "incident_number",
    "date_time_of_call",
    "cal_year",
    "type_of_incident",
    "hourly_notional_cost",
    "final_description",
]
animal_rescue_table = spark.read.table("dapcats.animal_rescue_hive")
raw_df = animal_rescue_table.select(selected_columns)

# Preview the first five rows.
raw_df.show(5)

Setting spark.hadoop.yarn.resourcemanager.principal to christoffer.soderberg
Hive Session ID = f7a79c1b-1a60-4cf6-bf06-73e0f71d55d6
[Stage 1:>                                                          (0 + 1) / 1]

+---------------+-----------------+--------+----------------+--------------------+--------------------+
|incident_number|date_time_of_call|cal_year|type_of_incident|hourly_notional_cost|   final_description|
+---------------+-----------------+--------+----------------+--------------------+--------------------+
|       21319091| 06/02/2009 14:42|    2009| Special Service|                 255|ASSIST RSPCA WITH...|
|       21401091| 06/02/2009 17:14|    2009| Special Service|                 255|CAT TRAPPED BETWE...|
|       21458091| 06/02/2009 19:29|    2009| Special Service|                 255|CAT TRAPPED BETWE...|
|       50107091| 26/03/2009 16:04|    2009| Special Service|                 255|DISTRESSED CAT ST...|
|       73250091| 02/05/2009 16:00|    2009| Special Service|                 260|CAT TRAPPED IN BA...|
+---------------+-----------------+--------+----------------+--------------------+--------------------+
only showing top 5 rows



                                                                                

Create a GX context

In [4]:
# Data Context: the GX object the following cells use to register the data
# source, the expectation suite and the validation definition.
context = gx.get_context()

Connect to data: add a Spark Data Source, create a DataFrame Data Asset and add a Batch Definition (a description of how data should be retrieved - in our case by reading a whole DataFrame object), then retrieve a Batch from it

In [5]:
# Register a Spark data source and attach a DataFrame asset to it.
data_source = context.data_sources.add_spark("spark")
data_asset = data_source.add_dataframe_asset(name="spark dataframe asset")

# The batch definition treats the whole DataFrame as a single batch.
batch_definition = data_asset.add_batch_definition_whole_dataframe("batch definition")

# The DataFrame itself is supplied when the batch is requested, not when the
# asset is defined; the same parameters are reused when running validation.
batch_parameters = dict(dataframe=raw_df)
batch = batch_definition.get_batch(batch_parameters=batch_parameters)

Create an Expectation Suite (an object that contains one or more Expectations about the data)

In [6]:
# Thresholds used by the expectations below, named so they are easy to tune.
MAX_CAL_YEAR = 2024
MIN_HOURLY_COST = 200  # £ per hour
MAX_HOURLY_COST = 300  # £ per hour

# Register an empty suite with the context, then attach expectations to it.
suite = context.suites.add(
    gx.core.expectation_suite.ExpectationSuite(name="expectations")
)

# Expect incident_number to be unique
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="incident_number")
)

# Expect no data from after year 2024 (only an upper bound is set)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="cal_year",
        max_value=MAX_CAL_YEAR,
    )
)

# Expect hourly costs to lie between £200 per hour and £300 per hour
# (both bounds inclusive). This stays the last expression so the cell still
# displays the added expectation.
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="hourly_notional_cost",
        min_value=MIN_HOURLY_COST,
        max_value=MAX_HOURLY_COST,
    )
)

ExpectColumnValuesToBeBetween(id='919da8e2-2d9c-4f31-b9a4-daf9ac918793', meta=None, notes=None, result_format=<ResultFormat.BASIC: 'BASIC'>, description=None, catch_exceptions=True, rendered_content=None, windows=None, batch_id=None, column='hourly_notional_cost', mostly=1, row_condition=None, condition_parser=None, min_value=200.0, max_value=300.0, strict_min=False, strict_max=False)

Create a Validation Definition that connects the Batch of data with the Expectation Suite

In [7]:
# Pair the batch definition (which data) with the suite (which checks) ...
definition_to_register = gx.core.validation_definition.ValidationDefinition(
    name="validation definition",
    data=batch_definition,
    suite=suite,
)

# ... and register the pairing with the context.
validation_definition = context.validation_definitions.add(definition_to_register)

Run the Validation Definition and display the results

In [8]:
# Run the validation, supplying the DataFrame through batch_parameters.
validation_results = validation_definition.run(batch_parameters=batch_parameters)

# Print the full result, including the per-expectation details.
print(validation_results)

Calculating Metrics: 100%|██████████| 30/30 [00:17<00:00,  1.69it/s]            

{
  "success": false,
  "results": [
    {
      "success": true,
      "expectation_config": {
        "type": "expect_column_values_to_be_unique",
        "kwargs": {
          "batch_id": "spark-spark dataframe asset",
          "column": "incident_number"
        },
        "meta": {},
        "id": "b7509dd1-4c82-4edd-a403-087a83686e3a"
      },
      "result": {
        "element_count": 5898,
        "unexpected_count": 0,
        "unexpected_percent": 0.0,
        "partial_unexpected_list": [],
        "missing_count": 0,
        "missing_percent": 0.0,
        "unexpected_percent_total": 0.0,
        "unexpected_percent_nonmissing": 0.0,
        "partial_unexpected_counts": []
      },
      "meta": {},
      "exception_info": {
        "raised_exception": false,
        "exception_traceback": null,
        "exception_message": null
      }
    },
    {
      "success": true,
      "expectation_config": {
        "type": "expect_column_values_to_be_between",
        "kwargs": {


