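## Use Great Expectations for Data Validation

This notebook validates the 2022 NYC Yellow Taxi trip records with Great Expectations and reports the validation results to Azure Monitor.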

In [1]:
%pip install great-expectations

StatementMeta(, , -1, Finished, Available)

Collecting great-expectations
  Downloading great_expectations-0.18.0-py3-none-any.whl (5.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.4/5.4 MB[0m [31m93.4 MB/s[0m eta [36m0:00:00[0mta [36m0:00:01[0m
Collecting tzlocal>=1.2
  Downloading tzlocal-5.2-py3-none-any.whl (17 kB)
Collecting notebook>=6.4.10
  Downloading notebook-7.0.6-py3-none-any.whl (4.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.0/4.0 MB[0m [31m166.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pydantic>=1.9.2
  Downloading pydantic-2.4.2-py3-none-any.whl (395 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m395.8/395.8 kB[0m [31m107.4 MB/s[0m eta [36m0:00:00[0m
Collecting makefun<2,>=1.7.0
  Downloading makefun-1.15.1-py2.py3-none-any.whl (22 kB)
Collecting jsonpatch>=1.22
  Downloading jsonpatch-1.33-py2.py3-none-any.whl (12 kB)
Collecting ruamel.yaml<0.17.18,>=0.16
  Downloading ruamel.yaml-0.17.17-py3-none-any.whl (109 kB)
[2K     




In [2]:
# Azure Monitor exporter for Python logging (provides AzureLogHandler, used in the last cell to ship the validation summary).
# NOTE(review): the version is unpinned, and the OpenCensus-based Azure exporters are deprecated in favour of
# azure-monitor-opentelemetry — consider pinning a known-good version for reproducible runs.
%pip install opencensus-ext-azure

StatementMeta(, , -1, Finished, Available)


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0[0m[39;49m -> [0m[32;49m23.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49m/nfs4/pyenv-afae7582-6a6c-4d25-95cf-5ff324a1e805/bin/python -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.





### Prepare Data Source

In [3]:
import pandas as pd

# Load the Yellow Taxi Trip Records parquet file from the lakehouse staging zone into a pandas dataframe.
TAXI_PARQUET_PATH = "/lakehouse/default/Files/yellow_taxi_tripdata_2022.parquet"
# NOTE(review): the preview shows an 'Unnamed: 0' column, which looks like an index leaked from an
# earlier CSV export — confirm whether it should be dropped before validation.
pd_df = pd.read_parquet(TAXI_PARQUET_PATH, engine="pyarrow")
pd_df.head()

StatementMeta(, e962d86e-701c-403f-8df4-cc8d5d4e5650, 17, Finished, Available)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2022-01-01 00:35:40,2022-01-01 00:53:29,2.0,3.8,1.0,N,142,236,1,14.5,3.0,0.5,3.65,0.0,0.3,21.95,2.5,0.0
1,1,2022-01-01 00:33:43,2022-01-01 00:42:07,1.0,2.1,1.0,N,236,42,1,8.0,0.5,0.5,4.0,0.0,0.3,13.3,0.0,0.0
2,2,2022-01-01 00:53:21,2022-01-01 01:02:19,1.0,0.97,1.0,N,166,166,1,7.5,0.5,0.5,1.76,0.0,0.3,10.56,0.0,0.0
3,2,2022-01-01 00:25:21,2022-01-01 00:35:23,1.0,1.09,1.0,N,114,68,2,8.0,0.5,0.5,0.0,0.0,0.3,11.8,2.5,0.0
4,2,2022-01-01 00:36:48,2022-01-01 01:14:20,1.0,4.3,1.0,N,68,163,1,23.5,0.5,0.5,3.0,0.0,0.3,30.3,2.5,0.0


## 1. Configure Data Context
https://docs.greatexpectations.io/docs/terms/data_context

In [4]:
from ruamel import yaml
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    DatasourceConfig,
    FilesystemStoreBackendDefaults,
)

# Root directory handed to FilesystemStoreBackendDefaults: the Great Expectations stores and the
# generated data docs are written beneath this folder in the lakehouse.
VALIDATION_ROOT = "/lakehouse/default/Files/validation"
work_path = VALIDATION_ROOT

StatementMeta(, e962d86e-701c-403f-8df4-cc8d5d4e5650, 18, Finished, Available)

In [5]:
# Runtime data connector: batches are handed over in memory at request time and are tagged
# with an "environment" batch identifier.
runtime_connector = {
    "module_name": "great_expectations.datasource.data_connector",
    "class_name": "RuntimeDataConnector",
    "batch_identifiers": ["environment"],
}

# The execution engine decides which kind of dataframe is validated:
# "PandasExecutionEngine" for pandas, "SparkDFExecutionEngine" for Spark.
transformed_source = DatasourceConfig(
    class_name="Datasource",
    execution_engine={"class_name": "PandasExecutionEngine"},
    data_connectors={"transformed_data_connector": runtime_connector},
)

data_context_config = DataContextConfig(
    datasources={"transformed_data_source": transformed_source},
    store_backend_defaults=FilesystemStoreBackendDefaults(root_directory=work_path),
)

# Data context built from the in-code configuration above (no great_expectations.yml on disk needed).
context = BaseDataContext(project_config=data_context_config)

StatementMeta(, e962d86e-701c-403f-8df4-cc8d5d4e5650, 19, Finished, Available)




## 2. Create a Batch Request from the DataFrame
https://docs.greatexpectations.io/docs/terms/batch

In [6]:
# Wrap the in-memory pandas dataframe as a runtime batch. The "environment" key corresponds to the
# batch identifier declared on transformed_data_connector in the data context config.
batch_spec = dict(
    datasource_name="transformed_data_source",
    data_connector_name="transformed_data_connector",
    data_asset_name="nyctaxi_data",
    runtime_parameters={"batch_data": pd_df},
    batch_identifiers={"environment": "stage"},
)
batch_request = RuntimeBatchRequest(**batch_spec)

StatementMeta(, e962d86e-701c-403f-8df4-cc8d5d4e5650, 20, Finished, Available)

## 3. Define the Expectation Suite and its Data Expectations
https://docs.greatexpectations.io/docs/terms/expectation_suite

In [7]:
expectation_suite_name = "Nyctaxi_data_suite_basic"
context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)

validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
    datasource_name="transformed_data_source",
    data_connector_name="transformed_data_connector",
    data_asset_name="nyctaxi_data",
)

# Expectations added to the suite, as (validator method name, keyword arguments), applied in order.
# Run validator.list_available_expectation_types() to see which expectation methods exist.
basic_expectations = [
    ("expect_column_values_to_be_between", {"column": "passenger_count", "min_value": 0, "max_value": 10}),
    ("expect_column_values_to_not_be_null", {"column": "passenger_count"}),
    ("expect_column_values_to_not_be_null", {"column": "trip_distance"}),
    ("expect_column_values_to_be_of_type", {"column": "store_and_fwd_flag", "type_": "object"}),
    ("expect_column_values_to_not_be_null", {"column": "fare_amount"}),
    ("expect_column_values_to_be_of_type", {"column": "payment_type", "type_": "int"}),
]
for method_name, expectation_kwargs in basic_expectations:
    getattr(validator, method_name)(**expectation_kwargs)

# To validate without a checkpoint, uncomment:
# validator.validate()
# Keep every expectation in the saved suite, including those that failed on this batch.
validator.save_expectation_suite(discard_failed_expectations=False)

StatementMeta(, e962d86e-701c-403f-8df4-cc8d5d4e5650, 21, Finished, Available)


  warn(



Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

## 4. Configure a Checkpoint and run the Expectation Suite with it
https://docs.greatexpectations.io/docs/terms/checkpoint

In [8]:
my_checkpoint_name = "Nyctaxi Data"

# SimpleCheckpoint with no stored validations: the batch request / suite pair is passed in at run time,
# which is why test_yaml_config reports an empty "validations" attribute.
checkpoint_config = dict(
    name=my_checkpoint_name,
    config_version=1.0,
    class_name="SimpleCheckpoint",
    run_name_template="%Y%m%d-%H%M%S-my-run-name-template",
)

# Dry-run the configuration (prints instantiation diagnostics), then register it with the context.
checkpoint_yaml = yaml.dump(checkpoint_config, default_flow_style=False)
my_checkpoint = context.test_yaml_config(checkpoint_yaml)
context.add_or_update_checkpoint(**checkpoint_config)

# Run the checkpoint, supplying the expectation suite and the batch to validate it against.
run_validations = [
    {"batch_request": batch_request, "expectation_suite_name": expectation_suite_name},
]
checkpoint_result = context.run_checkpoint(
    checkpoint_name=my_checkpoint_name,
    validations=run_validations,
)

StatementMeta(, e962d86e-701c-403f-8df4-cc8d5d4e5650, 22, Finished, Available)

Attempting to instantiate class from config...
	Instantiating as a SimpleCheckpoint, since class_name is SimpleCheckpoint
	Successfully instantiated SimpleCheckpoint


Checkpoint class name: SimpleCheckpoint
Your current Checkpoint configuration has an empty or missing "validations" attribute.  This
means you must either update your Checkpoint configuration or provide an appropriate validations
list programmatically (i.e., when your Checkpoint is run).
                    
  warn(



Calculating Metrics:   0%|          | 0/23 [00:00<?, ?it/s]

## Report Data Quality Metrics to Azure Monitor

### Build Data Docs

In [9]:
# Render the HTML data docs; the returned mapping is {site name: index.html URL}.
data_docs_urls = context.build_data_docs()
data_docs_urls

StatementMeta(, e962d86e-701c-403f-8df4-cc8d5d4e5650, 23, Finished, Available)

{'local_site': 'file:///lakehouse/default/Files/05_validation/uncommitted/data_docs/local_site/index.html'}

In [10]:
# Local import: `json` is otherwise first imported in a later cell, which breaks a top-down run.
import json

# Persist the checkpoint result next to the other validation artifacts under work_path.
# Serialise through to_json_dict() so the .json file is guaranteed to be valid JSON;
# str(checkpoint_result) is a display representation and is not guaranteed to be JSON.
validation_results_path = f"{work_path}/validation_results.json"
with open(validation_results_path, "w", encoding="utf-8") as f:
    json.dump(checkpoint_result.to_json_dict(), f, indent=2)

StatementMeta(, e962d86e-701c-403f-8df4-cc8d5d4e5650, 24, Finished, Available)

### Send Data Quality Logs to Azure Monitor

In [11]:
import json

# Flatten the checkpoint result into a single dict of custom dimensions for Azure Monitor.
result_dic = checkpoint_result.to_json_dict()
# NOTE(review): only the first run result is reported; this checkpoint is run with a single validation.
key_name = next(iter(result_dic['run_results']))
validation_result = result_dic['run_results'][key_name]['validation_result']
results = validation_result['results']
active_batch = validation_result['meta']['active_batch_definition']

checks = {
    'check_name': checkpoint_result['checkpoint_config']['name'],
    'data_asset_name': active_batch['data_asset_name'],
    'datasource_name': active_batch['datasource_name'],
}

for result in results:
    expectation_config = result['expectation_config']
    # Expectations without a single "column" kwarg (e.g. table-level ones) are labelled "table"
    # instead of raising KeyError.
    # NOTE(review): two expectations of the same type on the same target share a key; the last one wins.
    target = expectation_config['kwargs'].get('column', 'table')
    validation_name = expectation_config['expectation_type'] + "_on_" + target
    checks[validation_name] = result['success']
    if not result['success']:
        # Attach the raw result payload so failures can be diagnosed from the log record alone.
        checks['failure_info_on_' + validation_name] = json.dumps(result['result'])

# AzureLogHandler reads the "custom_dimensions" key of `extra` as the record's custom properties.
properties = {'custom_dimensions': checks}

StatementMeta(, e962d86e-701c-403f-8df4-cc8d5d4e5650, 25, Finished, Available)

In [12]:
# Report Data Quality Metrics to Azure Monitor using the python Azure Monitor OpenCensus exporter
import logging
import os
import time
from opencensus.ext.azure.log_exporter import AzureLogHandler

# Read the connection string from the standard Application Insights environment variable rather than
# committing it to the notebook; the placeholder is only a fallback.
AZURE_MONITOR_SECRET = os.environ.get(
    "APPLICATIONINSIGHTS_CONNECTION_STRING",
    "InstrumentationKey=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
)
# Seconds to wait so the background exporter can ship the record before the cell finishes or raises.
EXPORT_WAIT_SECONDS = 16

# You can find the related logs in the Application Insights resource (featuresinsightsxxxx):
# click 'Logs' and check the relevant output logs by querying the 'traces' table.
logger = logging.getLogger(__name__)
# Re-running this cell must not stack another exporter on the same logger; otherwise every record
# would be sent once per previous run.
for stale_handler in [h for h in logger.handlers if isinstance(h, AzureLogHandler)]:
    logger.removeHandler(stale_handler)
logger.addHandler(AzureLogHandler(connection_string=AZURE_MONITOR_SECRET))

validation_passed = checkpoint_result.success is True
if validation_passed:
    logger.setLevel(logging.INFO)
    logger.info('verifychecks', extra=properties)
else:
    logger.setLevel(logging.ERROR)
    logger.error('verifychecks', extra=properties)

# Wait on BOTH paths. Previously the failure path raised before this wait, so the error record
# could be lost if the session ended right after the exception.
time.sleep(EXPORT_WAIT_SECONDS)

if not validation_passed:
    raise RuntimeError(
            "The Great Expectations validation failed. Check "
            "the logs or the Great Expectations data docs for more information.")

StatementMeta(, e962d86e-701c-403f-8df4-cc8d5d4e5650, 26, Finished, Available)

verifychecks


RuntimeError: The Great Expectations validation failed. Check the logs or the Great Expectations data docs for more information.