In [1]:
import great_expectations as gx
from great_expectations.core import RunIdentifier
from great_expectations.checkpoint.actions import UpdateDataDocsAction
from datetime import datetime
import pandas as pd
import os
import numpy as np

In [2]:
#getting the data and context
context = gx.get_context() #not specified the mode name as file as we have already did that in our test file and get_context() will automatically look for the context in the current directory
#context = gx.get_context(mode="file", project_root_dir=os.getcwd()) #if you want to specify the mode and project root directory
df = pd.read_csv("Test_data.csv")

In [3]:
# Specifying Names - Data Source, Data Asset, Batch, Expecation Suite, Validation, Checkpoint
data_source_name = "risk_analytics"
data_asset_name = "delq_history"
batch_name = "delq_history_batch"
expectation_suite_name = "delq_history_quality_suite" 
batch_parameters = {"dataframe": df}
checkpoint_name = "delq_history_checkpoint"
validation_definition_name = "delq_history_validation"
run_id = RunIdentifier(run_name=f"Delq History Validation")

In [7]:
data_source = context.data_sources.get(data_source_name)
type(data_source)  # should be <class 'great_expectations.datasource.data_source.DataSource'>

great_expectations.datasource.fluent.pandas_datasource.PandasDatasource

In [4]:
# adding data source,  data asset, batch to the context
data_source = context.data_sources.add_pandas(name=data_source_name)
data_asset = data_source.add_dataframe_asset(name=data_asset_name)
batch_definition = data_asset.add_batch_definition_whole_dataframe(name=batch_name)

DataContextError: Can not write the fluent datasource risk_analytics because a datasource of that name already exists in the data context.

In [11]:
# Creating an expectation suite and adding it to the context
delq_suite = gx.ExpectationSuite(name=expectation_suite_name)
context.suites.add(delq_suite)

{
  "name": "delq_history_quality_suite",
  "id": "72ea19b8-992e-4ede-9ef9-dd327ee9b6a0",
  "expectations": [],
  "meta": {
    "great_expectations_version": "1.5.5"
  },
  "notes": null
}

In [12]:
# Creating Expectations and add them to the expectation suite
# Expecation 1: Column 'Region' Should be in a set of values 
    # We will use the value counts of the 'REGION' column to create a set of expected values
    # and then use that set to create an expectation that the 'REGION' column should only contain those values.
    # This is useful for ensuring that the 'REGION' column does not contain unexpected values.
region_set = set(df['REGION'].value_counts().index)
region_set.remove("OTHER")
region_expectation = gx.expectations.ExpectColumnValuesToBeInSet( 
        column="REGION",
        value_set=region_set,
        result_format="COMPLETE",
        mostly=0.99
    )
delq_suite.add_expectation(region_expectation)

# Expecation 2: Columns which should not be null 
not_null_columns = ["REGION", "CERTNUM","DTAPPREC"]
for column in not_null_columns:
    expectation_name = f"not_null_expectation_{column}"
    expectation_name = gx.expectations.ExpectColumnValuesToNotBeNull(
        column=column,
        result_format="COMPLETE",
        mostly=0.99
    )
    delq_suite.add_expectation(expectation_name)

# Expecation 3: Columns which should be not null with a condition 
condition_columns = ["CURRENT_OUTSTANDING_BAL_AMT", "CPA_REMAINING_AMORTIZATION"]
for column in condition_columns:
    expectation_name = f"not_null_condition_expectation_{column}"
    expectation_name = gx.expectations.ExpectColumnValuesToNotBeNull(
        column=column,
        result_format="COMPLETE",
        mostly=0.99,
        row_condition= 'CURRENT_OUTSTANDING_BAL_AMT > 0'
    )
    delq_suite.add_expectation(expectation_name)

# Expecation 4: Column 'CURRENT_OUTSTANDING_BAL_AMT' should be greater than 0 when 'CPA_REMAINING_AMORTIZATION' is greater than 0
outstanding_condition_expectation = gx.expectations.ExpectColumnValuesToNotBeNull(
        column="CURRENT_OUTSTANDING_BAL_AMT",
        result_format="COMPLETE",
        mostly=0.99,
        row_condition='CPA_REMAINING_AMORTIZATION > 0'
    )
delq_suite.add_expectation(outstanding_condition_expectation)

# Expecation 5: Outstanding balance should be less than or equal to 1500000
outstanding_balance_expectation = gx.expectations.ExpectColumnValuesToBeBetween(
        column="CURRENT_OUTSTANDING_BAL_AMT",
        min_value=  0,
        strict_min = True,
        max_value=1500000,
        result_format="COMPLETE",
        mostly=0.99
    )

In [31]:
delq_suite.expectations

[ExpectColumnValuesToBeInSet(id='028de4d7-b367-445b-844b-470545b01b24', meta=None, notes=None, result_format=<ResultFormat.COMPLETE: 'COMPLETE'>, description=None, catch_exceptions=True, rendered_content=None, windows=None, batch_id=None, column='REGION', mostly=0.99, row_condition=None, condition_parser='pandas', value_set=['GTA', 'PACIFIC', 'ATLANTIC', 'PRAIRIES', 'QUEBEC', 'ONTARIO', 'ALBERTA']),
 ExpectColumnValuesToNotBeNull(id='bf9fffdb-2abc-42ef-9158-253490f6900b', meta=None, notes=None, result_format=<ResultFormat.COMPLETE: 'COMPLETE'>, description=None, catch_exceptions=True, rendered_content=None, windows=None, batch_id=None, column='REGION', mostly=0.99, row_condition=None, condition_parser='pandas'),
 ExpectColumnValuesToNotBeNull(id='8235255c-4aff-4671-8c84-3c8ff55c4180', meta=None, notes=None, result_format=<ResultFormat.COMPLETE: 'COMPLETE'>, description=None, catch_exceptions=True, rendered_content=None, windows=None, batch_id=None, column='CERTNUM', mostly=0.99, row_co

In [23]:
validation_definition = gx.ValidationDefinition(
    name=validation_definition_name,
    suite=delq_suite,
    data =batch_definition)

context.validation_definitions.add(validation_definition)

ValidationDefinition(name='delq_history_validation', data=BatchDefinition(id=UUID('8a63d1e7-333a-41ed-811c-e0a568265ea4'), name='delq_history_batch', partitioner=None), suite={
  "name": "delq_history_quality_suite",
  "id": "72ea19b8-992e-4ede-9ef9-dd327ee9b6a0",
  "expectations": [
    {
      "type": "expect_column_values_to_be_in_set",
      "kwargs": {
        "result_format": "COMPLETE",
        "column": "REGION",
        "mostly": 0.99,
        "value_set": [
          "GTA",
          "PACIFIC",
          "ATLANTIC",
          "PRAIRIES",
          "QUEBEC",
          "ONTARIO",
          "ALBERTA"
        ]
      },
      "meta": {},
      "id": "028de4d7-b367-445b-844b-470545b01b24"
    },
    {
      "type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "result_format": "COMPLETE",
        "column": "REGION",
        "mostly": 0.99
      },
      "meta": {},
      "id": "bf9fffdb-2abc-42ef-9158-253490f6900b"
    },
    {
      "type": "expect_column_value

In [24]:
# Specifying the action to take when the validation fails
action_list = [UpdateDataDocsAction(
    name="Update Data Docs for delq history",
)]
checkpoint = gx.Checkpoint(
    name=checkpoint_name,
    validation_definitions=[validation_definition],
    actions=action_list,
    result_format= {"result_format": "COMPLETE"})

context.checkpoints.add(checkpoint)


Checkpoint(name='delq_history_checkpoint', validation_definitions=[ValidationDefinition(name='delq_history_validation', data=BatchDefinition(id=UUID('8a63d1e7-333a-41ed-811c-e0a568265ea4'), name='delq_history_batch', partitioner=None), suite={
  "name": "delq_history_quality_suite",
  "id": "72ea19b8-992e-4ede-9ef9-dd327ee9b6a0",
  "expectations": [
    {
      "type": "expect_column_values_to_be_in_set",
      "kwargs": {
        "result_format": "COMPLETE",
        "column": "REGION",
        "mostly": 0.99,
        "value_set": [
          "GTA",
          "PACIFIC",
          "ATLANTIC",
          "PRAIRIES",
          "QUEBEC",
          "ONTARIO",
          "ALBERTA"
        ]
      },
      "meta": {},
      "id": "028de4d7-b367-445b-844b-470545b01b24"
    },
    {
      "type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "result_format": "COMPLETE",
        "column": "REGION",
        "mostly": 0.99
      },
      "meta": {},
      "id": "bf9fffdb-2abc-42ef

In [33]:
# Running the checkpoint
checkpoint_result = checkpoint.run(run_id=run_id, batch_parameters=batch_parameters)

# Displaying the result of the checkpoint run
print(f"Result of the run: {checkpoint_result.success}")

Calculating Metrics: 100%|██████████| 49/49 [00:06<00:00,  7.46it/s]


Result of the run: True


In [30]:
for exp in delq_suite.expectations:
    exp.condition_parser= "pandas"
delq_suite.save()

In [7]:
# Modifying the expecatio suite to include description for few excepeation with conditions 
delq_suite = context.suites.get(expectation_suite_name)
for exp in delq_suite.expectations:
    if isinstance(exp, gx.expectations.ExpectColumnValuesToNotBeNull) and exp.row_condition:
        exp.description = f"Column '{exp.column}' should not be null when {exp.row_condition}"
    elif isinstance(exp, gx.expectations.ExpectColumnValuesToBeBetween):
        exp.description = f"Column '{exp.column}' should be between {exp.min_value} and {exp.max_value}"
delq_suite.save()

In [8]:
delq_suite.expectations

[ExpectColumnValuesToBeInSet(id='028de4d7-b367-445b-844b-470545b01b24', meta={}, notes=None, result_format=<ResultFormat.COMPLETE: 'COMPLETE'>, description=None, catch_exceptions=True, rendered_content=None, windows=None, batch_id=None, column='REGION', mostly=0.99, row_condition=None, condition_parser='pandas', value_set=['GTA', 'PACIFIC', 'ATLANTIC', 'PRAIRIES', 'QUEBEC', 'ONTARIO', 'ALBERTA']),
 ExpectColumnValuesToNotBeNull(id='bf9fffdb-2abc-42ef-9158-253490f6900b', meta={}, notes=None, result_format=<ResultFormat.COMPLETE: 'COMPLETE'>, description=None, catch_exceptions=True, rendered_content=None, windows=None, batch_id=None, column='REGION', mostly=0.99, row_condition=None, condition_parser='pandas'),
 ExpectColumnValuesToNotBeNull(id='8235255c-4aff-4671-8c84-3c8ff55c4180', meta={}, notes=None, result_format=<ResultFormat.COMPLETE: 'COMPLETE'>, description=None, catch_exceptions=True, rendered_content=None, windows=None, batch_id=None, column='CERTNUM', mostly=0.99, row_conditio

# Running it after the checkpoints are setup 

In [3]:
batch_parameters = {"dataframe": df}
checkpoint_name = "delq_history_checkpoint"
run_id = RunIdentifier(run_name=f"Delq History Validation")

In [4]:
checkpoint = context.checkpoints.get(checkpoint_name)
checkpoint_result = checkpoint.run(run_id=run_id, batch_parameters=batch_parameters)
print(f"Checkpoint run result: {checkpoint_result.success}")

Calculating Metrics: 100%|██████████| 49/49 [00:06<00:00,  7.70it/s]


Checkpoint run result: True
