**Great Expectation (GX) Set up**

In [15]:
%run UtilityFunc

StatementMeta(, 10ed5b8b-5b31-46a9-b82a-8e383cebd009, 26, Finished, Available, Finished)

In [2]:
import great_expectations as gx
from great_expectations import expectations as gxe

StatementMeta(, 10ed5b8b-5b31-46a9-b82a-8e383cebd009, 9, Finished, Available, Finished)



**Create Data Context, Data Asset and Batch Definition**

In [3]:

# Set up GX environment
context = gx.get_context(mode="file")

# Add the Data Source to the Data Context
data_source = context.data_sources.add_spark(name="logistics")

# Add a Data Asset to the Data Source
data_asset = data_source.add_dataframe_asset("logistics_dataframe_asset")

# Add a Batch Definition to the Data Asset
batch_definition = data_asset.add_batch_definition_whole_dataframe(
    "logistics_batch"
)


StatementMeta(, 10ed5b8b-5b31-46a9-b82a-8e383cebd009, 10, Finished, Available, Finished)

**Create Expectation Suite and Expectations**

In [4]:
# Expectation Suite - Collection of Expectation

expectation_suite= gx.ExpectationSuite(name="logistics_expectation_suite")

# Add Suite to data context
expectation_suite = context.suites.add(expectation_suite)

# Create Expectations based on your domain knowledge of data

weight_expectation =  gxe.ExpectColumnValuesToBeBetween(
    column='weight_kg',
    min_value=0,
    meta={
        "title": "Check weight value is non-negative",
        "description":" The item weight should be positive or greater than zero"
        }
     )

quantity_expectation = gxe.ExpectColumnValuesToBeBetween(
    column='quantity', 
    min_value=0,
    meta={
        "title": "Check quantity value is non-negative",
        "description":" The quantity of items ordered should be positive or greater than zero"
        }
    )

tracking_expectation = gxe.ExpectColumnValueLengthsToEqual(
    column='tracking_number', 
    value =18,
    meta={
        "title": "Check tracking ID has valid length (18)",
        "description":" The tracking ID should have 18 characters"
        }
    )

carrier_expectation = gxe.ExpectColumnValuesToNotBeNull(
    column ='carrier',
    meta={
        "title": "Check carrier is present",
        "description":" The carrier/courier should be present and not null"
        }
    )


# Add expectations to Expectation Suite
expectation_suite.add_expectation(weight_expectation)
expectation_suite.add_expectation(quantity_expectation)
expectation_suite.add_expectation(tracking_expectation)
expectation_suite.add_expectation(carrier_expectation)

StatementMeta(, 10ed5b8b-5b31-46a9-b82a-8e383cebd009, 11, Finished, Available, Finished)

ExpectColumnValuesToNotBeNull(id='b6be0206-9705-4692-b532-21b9960b675d', meta={'title': 'Check carrier is present', 'description': ' The carrier/courier should be present and not null'}, notes=None, result_format=<ResultFormat.BASIC: 'BASIC'>, description=None, catch_exceptions=True, rendered_content=None, severity=<FailureSeverity.CRITICAL: 'critical'>, windows=None, batch_id=None, column='carrier', mostly=1, row_condition=None, condition_parser=None)

**Create Validation Definition**

In [5]:
# Add Validation Definition
validation_definition_name = 'logistics_val_def'

validation_def = gx.ValidationDefinition(data=batch_definition, suite=expectation_suite, name = validation_definition_name)

# Save the Validation Definition to your Data Context.
validation_def = context.validation_definitions.add(validation_def)

StatementMeta(, 10ed5b8b-5b31-46a9-b82a-8e383cebd009, 12, Finished, Available, Finished)

**Configure Data Docs**

In [6]:
base_directory = "uncommitted/data_docs/local_site/"  # this is the default path (relative to the root folder of the Data Context) but can be changed as required
site_config = {
    "class_name": "SiteBuilder",
    "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
    "store_backend": {
        "class_name": "TupleFilesystemStoreBackend",
        "base_directory": base_directory,
    },
}

site_name = "my_data_docs_site"
context.add_data_docs_site(site_name=site_name, site_config=site_config)

context.build_data_docs(site_names=site_name)

# Navigate here to see build
# !ls gx/uncommitted/data_docs/local_site/

StatementMeta(, 10ed5b8b-5b31-46a9-b82a-8e383cebd009, 13, Finished, Available, Finished)

{'my_data_docs_site': 'file:///mnt/var/hadoop/tmp/nm-local-dir/usercache/trusted-service-user/appcache/application_1759313574241_0001/container_1759313574241_0001_01_000001/gx/uncommitted/data_docs/local_site/index.html'}

**Create Action( Update data Docs) and Checkpoint**

In [7]:
# Create Action to be used by checkpoint
action = [
    # This Action updates the Data Docs static website with the Validation
    #   Results after the Checkpoint is run.
    gx.checkpoint.actions.UpdateDataDocsAction(
        name="update_all_data_docs",
        site_names =[site_name]
    ),
]

# Create checkpoint
checkpoint_name = "my_checkpoint"
checkpoint = gx.Checkpoint(
    name=checkpoint_name,
    validation_definitions=[validation_def],
    actions=action,
    result_format={"result_format": "COMPLETE"},
)

# Add checkpoint to data context
context.checkpoints.add(checkpoint)

StatementMeta(, 10ed5b8b-5b31-46a9-b82a-8e383cebd009, 14, Finished, Available, Finished)

Checkpoint(name='my_checkpoint', validation_definitions=[ValidationDefinition(name='logistics_val_def', data=BatchDefinition(id=UUID('8c36fe0f-cdce-4569-b098-422fc63859c9'), name='logistics_batch', partitioner=None), suite={
  "name": "logistics_expectation_suite",
  "id": "2bfd851e-a8f7-4d76-9953-9a0a53ed7dff",
  "expectations": [
    {
      "type": "expect_column_values_to_be_between",
      "kwargs": {
        "column": "weight_kg",
        "min_value": 0.0
      },
      "meta": {
        "description": " The item weight should be positive or greater than zero",
        "title": "Check weight value is non-negative"
      },
      "id": "5cef5da8-d715-458a-9d01-36c500c660b9",
      "severity": "critical"
    },
    {
      "type": "expect_column_values_to_be_between",
      "kwargs": {
        "column": "quantity",
        "min_value": 0.0
      },
      "meta": {
        "description": " The quantity of items ordered should be positive or greater than zero",
        "title": "Chec

**Load data into Spark dataframe from lakehouse**

In [8]:
workspace = "Logistics"
lakehouse = "Shipments_LH"
table = "logistics"

df = spark.read.format("csv") \
        .options(header=True, inferSchema=True) \
        .load(f"abfss://{workspace}@onelake.dfs.fabric.microsoft.com/{lakehouse}.Lakehouse/Files/{table}.csv")

StatementMeta(, 10ed5b8b-5b31-46a9-b82a-8e383cebd009, 15, Finished, Available, Finished)

**Run validations by running Checkpoint on batch parameter (loaded data)**

In [9]:
# Run Checkpoint and Save validation result
result = checkpoint.run(batch_parameters={"dataframe": df})

# Can copy gx files to lakehouse to leverage built expectation suites and docs for new data points
# !cp gx /lakehouse/default/Files

StatementMeta(, 10ed5b8b-5b31-46a9-b82a-8e383cebd009, 16, Finished, Available, Finished)

Calculating Metrics:   0%|          | 0/38 [00:00<?, ?it/s]

StatementMeta(, 10ed5b8b-5b31-46a9-b82a-8e383cebd009, 17, Finished, Available, Finished)

**Use Validation result to clean data**
- Get clean data
- Get failed data

In [11]:
# Use GX result to clean data
cleaned_df, failed_df = cleanData(df, result) # Called from UtilityFunc notebook

StatementMeta(, 10ed5b8b-5b31-46a9-b82a-8e383cebd009, 19, Finished, Available, Finished)

Final combined SQL:
 ((weight_kg IS NOT NULL) AND (NOT (weight_kg >= 0.0))) OR ((quantity IS NOT NULL) AND (NOT (quantity >= 0.0))) OR ((tracking_number IS NOT NULL) AND (NOT (length(tracking_number) = 18.0))) OR (NOT (carrier IS NOT NULL))


**Write clean and validated data to target Lakehouse** (_Typically would be written to Silver or Gold tables_)

In [12]:
cleaned_df.write.format("delta") \
        .mode("overwrite") \
        .save(f"abfss://{workspace}@onelake.dfs.fabric.microsoft.com/{lakehouse}.Lakehouse/Tables/shipments")

StatementMeta(, 10ed5b8b-5b31-46a9-b82a-8e383cebd009, 20, Finished, Available, Finished)

**Get report from the data**

In [36]:
report = summarizeReport(result) # Called from UtilityFunc notebook

StatementMeta(, 10ed5b8b-5b31-46a9-b82a-8e383cebd009, 76, Finished, Available, Finished)

**Pass Validation report as exit value to item that calls the Validation notebook**

In [None]:
mssparkutils.notebook.exit(report)