In [1]:
# Install the package if it is not already installed
# pip install great-expectations -q

In [2]:
import json
import os
from datetime import datetime, timezone

import great_expectations as gx
import great_expectations.expectations as gxe
import pandas as pd

## Define Data Context

In [3]:
# Obtain the Data Context used by every later cell.
# The mode must be one of: "ephemeral", "file", "cloud"; this notebook uses "ephemeral".
context = gx.get_context(mode="ephemeral")

# Show which context class was returned, then verify it matches the chosen mode
# (the possible class names are FileDataContext, EphemeralDataContext, CloudDataContext).
context_class_name = type(context).__name__
print(context_class_name)
assert context_class_name == "EphemeralDataContext"

EphemeralDataContext


## Define Data Source

In [4]:
# Name under which the pandas Data Source is registered in the Data Context.
data_source_name = "canada_data_source"

# Register the pandas Data Source on the Data Context created above.
data_source = context.data_sources.add_pandas(name=data_source_name)

# Sanity check: the returned Data Source carries the requested name.
assert data_source.name == data_source_name

## Define Data Asset

In [5]:
# Look up the Data Source by name so the new asset is attached to it.
data_source_name = "canada_data_source"
data_source = context.data_sources.get(data_source_name)

# Name under which the dataframe Data Asset is registered.
data_asset_name = "canada_cleaned_data_asset"

# Add a dataframe Data Asset to the Data Source.
data_asset = data_source.add_dataframe_asset(name=data_asset_name)

# Sanity check: the returned Data Asset carries the requested name.
assert data_asset.name == data_asset_name

## Define Batch Definition

In [6]:
# Retrieve the Data Asset through the Data Context by Data Source and asset name.
data_source_name = "canada_data_source"
data_asset_name = "canada_cleaned_data_asset"
data_asset = context.data_sources.get(data_source_name).get_asset(data_asset_name)

# Name under which the Batch Definition is registered on the asset.
batch_definition_name = "canada_cleaned_batch_definition"

# Add a whole-dataframe Batch Definition to the Data Asset.
batch_definition = data_asset.add_batch_definition_whole_dataframe(
    batch_definition_name
)

# Sanity check: the returned Batch Definition carries the requested name.
assert batch_definition.name == batch_definition_name

In [7]:
# Retrieve the dataframe Batch Definition by name through the Data Context.
# NOTE(review): this rebinds `batch_definition`, which the previous cell already
# assigned; the lookup is kept so this cell does not rely on that binding.
data_source_name = "canada_data_source"
data_asset_name = "canada_cleaned_data_asset"
batch_definition_name = "canada_cleaned_batch_definition"
batch_definition = (
    context.data_sources.get(data_source_name)
    .get_asset(data_asset_name)
    .get_batch_definition(batch_definition_name)
)

## Define Expectations

In [None]:
# Build the list of expectations that the cleaned dataset is validated against.
# The expectation order and keyword arguments are the same as in the original
# hand-written list; only the construction is table-driven.

# Meta attached to expectations whose failure is counted as critical by the
# summary cell (which reads meta["importance"]).
REQUIRED_META = {"importance": "required"}

# Column names in the exact order the dataset is expected to have.
# NOTE (original author's remark, translated): the real data is currently
# missing "View" — TODO confirm whether this still applies.
expected_columns = [
    "City", "Province", "Latitude", "Longitude", "Price",
    "Bedrooms", "Bathrooms", "Acreage", "Property Type",
    "Square Footage", "Garage", "Parking", "Basement",
    "Exterior", "Fireplace", "Heating", "Flooring", "Roof",
    "Waterfront", "Sewer", "Pool", "Garden", "View", "Balcony",
]

# Expected dtype name for each type-checked column.
expected_dtypes = {
    "City": "object",
    "Province": "object",
    "Latitude": "float64",
    "Longitude": "float64",
    "Price": "float64",
    "Bedrooms": "float64",
    "Bathrooms": "float64",
    "Acreage": "float64",
    "Property Type": "object",
    "Square Footage": "float64",
}

# min_value / max_value arguments for each range-checked column.
value_ranges = {
    "Bedrooms": {"min_value": 0, "max_value": 37},
    "Bathrooms": {"min_value": 0, "max_value": 26},
    "Price": {"min_value": 50000, "max_value": 58800000},
    "Latitude": {"min_value": 40.0, "max_value": 66.0},
    "Longitude": {"min_value": -140.0, "max_value": -50.0},
    "Acreage": {"min_value": 0.0},
}

# Columns that must not contain nulls.
critical_columns = [
    "City", "Province", "Latitude", "Longitude", "Price",
    "Bedrooms", "Bathrooms", "Acreage", "Property Type",
    "Square Footage", "Garage", "Parking", "Fireplace",
    "Waterfront", "Sewer", "Pool", "Garden", "Balcony",
]

# Boolean-style columns restricted to "Yes" / "No".
yes_no_columns = ["Fireplace", "Waterfront", "Pool", "Garden", "Balcony"]

# Allowed values for the "View" column.
view_values = ['Valley', 'City', 'River', 'Downtown', 'Lake']

expectations = []

# 1. Expect exact column count (derived from the column list so the two
#    checks cannot drift apart; currently 24).
expectations.append(
    gxe.ExpectTableColumnCountToEqual(
        meta=dict(REQUIRED_META),
        value=len(expected_columns),
    )
)

# 2. Expect column names to match, in order.
expectations.append(
    gxe.ExpectTableColumnsToMatchOrderedList(
        meta=dict(REQUIRED_META),
        column_list=expected_columns,
    )
)

# 3. Expect data types.
expectations.extend(
    gxe.ExpectColumnValuesToBeOfType(
        meta=dict(REQUIRED_META), column=column_name, type_=dtype_name
    )
    for column_name, dtype_name in expected_dtypes.items()
)

# 4. Expect ranges and value constraints (no meta, so these count as optional).
expectations.extend(
    gxe.ExpectColumnValuesToBeBetween(column=column_name, **bounds)
    for column_name, bounds in value_ranges.items()
)

# 5. Expect non-null for key columns.
expectations.extend(
    gxe.ExpectColumnValuesToNotBeNull(meta=dict(REQUIRED_META), column=column_name)
    for column_name in critical_columns
)

# 6. Expect boolean-style fields to contain only "Yes" or "No".
expectations.extend(
    gxe.ExpectColumnValuesToBeInSet(column=column_name, value_set=["Yes", "No"])
    for column_name in yes_no_columns
)

# 7. Expect the "View" field to stay within a fixed value set.
expectations.append(
    gxe.ExpectColumnValuesToBeInSet(column="View", value_set=view_values)
)

## Define Batch Parameters

In [None]:
# Load the cleaned dataset and wrap it as batch parameters for the
# whole-dataframe Batch Definition.

# Directory containing the dataset, taken from the DATA_PATH environment variable.
DATA_DIR = os.environ.get('DATA_PATH')
if not DATA_DIR:
    # Fail fast: without this check an unset variable would silently produce
    # the path "None/cleaned_canada.csv" and a confusing file-not-found error.
    raise RuntimeError(
        "Environment variable DATA_PATH is not set; "
        "it must point to the directory containing cleaned_canada.csv"
    )

# Full path of the CSV file to validate.
DATA_PATH = os.path.join(DATA_DIR, 'cleaned_canada.csv')
cleaned_dataframe = pd.read_csv(DATA_PATH, index_col=False)

# The dataframe is handed to the Batch Definition under the "dataframe" key.
cleaned_batch_parameters = {"dataframe": cleaned_dataframe}

  cleaned_dataframe = pd.read_csv(DATA_PATH, index_col=False)


In [10]:
# Obtain the Batch to validate by passing the batch parameters
# (the loaded dataframe) to the Batch Definition.
cleaned_batch = batch_definition.get_batch(
    batch_parameters=cleaned_batch_parameters,
)

## Validate expectations

In [11]:
# Validate the batch against every expectation, keeping each result in
# `results` (same order as `expectations`) for the summary cell.
results = []
for expectation in expectations:
    validation_result = cleaned_batch.validate(expectation)
    results.append(validation_result)

    # Only failures are reported; successful validations stay silent.
    if validation_result["success"]:
        continue
    print("❌ Validation failed:")
    print(validation_result)

Calculating Metrics: 100%|██████████| 3/3 [00:00<00:00, 2192.91it/s]




❌ Validation failed:
{
  "success": false,
  "expectation_config": {
    "type": "expect_table_column_count_to_equal",
    "kwargs": {
      "batch_id": "canada_data_source-canada_cleaned_data_asset",
      "value": 25
    },
    "meta": {
      "importance": "required"
    }
  },
  "result": {
    "observed_value": 24
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}


Calculating Metrics: 100%|██████████| 2/2 [00:00<00:00, 1211.88it/s]
Calculating Metrics: 100%|██████████| 1/1 [00:00<00:00, 802.12it/s] 
Calculating Metrics: 100%|██████████| 1/1 [00:00<00:00, 1204.22it/s]
Calculating Metrics: 100%|██████████| 1/1 [00:00<00:00, 1683.11it/s]
Calculating Metrics: 100%|██████████| 1/1 [00:00<00:00, 1584.55it/s]
Calculating Metrics: 100%|██████████| 1/1 [00:00<00:00, 1680.41it/s]
Calculating Metrics: 100%|██████████| 1/1 [00:00<00:00, 1402.78it/s]
Calculating Metrics: 100%|██████████| 1/1 [00:00<00:00, 1683.11it/s]
Calculating Metrics: 100%|██████████| 1/1 [00:00<00:00, 1618.17it/s]
Calculating Metrics: 100%|██████████| 1/1 [00:00<00:00, 1685.14it/s]
Calculating Metrics: 100%|██████████| 1/1 [00:00<00:00, 1718.98it/s]
Calculating Metrics: 100%|██████████| 10/10 [00:00<00:00, 879.71it/s]
Calculating Metrics: 100%|██████████| 10/10 [00:00<00:00, 816.66it/s]
Calculating Metrics: 100%|██████████| 10/10 [00:00<00:00, 897.00it/s]


❌ Validation failed:
{
  "success": false,
  "expectation_config": {
    "type": "expect_column_values_to_be_between",
    "kwargs": {
      "batch_id": "canada_data_source-canada_cleaned_data_asset",
      "column": "Price",
      "min_value": 50000.0,
      "max_value": 58800000.0
    },
    "meta": {}
  },
  "result": {
    "element_count": 45763,
    "unexpected_count": 2591,
    "unexpected_percent": 5.661779166575618,
    "partial_unexpected_list": [
      29900.0,
      40000.0,
      34900.0,
      29000.0,
      40000.0,
      35000.0,
      44900.0,
      44500.0,
      49900.0,
      49900.0,
      36000.0,
      49000.0,
      19900.0,
      36800.0,
      37400.0,
      37400.0,
      38000.0,
      39000.0,
      39900.0,
      39000.0
    ],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 5.661779166575618,
    "unexpected_percent_nonmissing": 5.661779166575618,
    "partial_unexpected_counts": [
      {
        "value": 37400.0,
     

Calculating Metrics: 100%|██████████| 10/10 [00:00<00:00, 908.76it/s]
Calculating Metrics: 100%|██████████| 10/10 [00:00<00:00, 918.01it/s]
Calculating Metrics: 100%|██████████| 10/10 [00:00<00:00, 931.38it/s]
Calculating Metrics: 100%|██████████| 8/8 [00:00<00:00, 3642.47it/s]
Calculating Metrics: 100%|██████████| 8/8 [00:00<00:00, 2956.60it/s]
Calculating Metrics: 100%|██████████| 8/8 [00:00<00:00, 2934.11it/s]
Calculating Metrics: 100%|██████████| 8/8 [00:00<00:00, 3861.71it/s]
Calculating Metrics: 100%|██████████| 8/8 [00:00<00:00, 3835.67it/s]
Calculating Metrics: 100%|██████████| 10/10 [00:00<00:00, 688.38it/s]
Calculating Metrics: 100%|██████████| 10/10 [00:00<00:00, 683.60it/s]
Calculating Metrics: 100%|██████████| 10/10 [00:00<00:00, 695.71it/s]
Calculating Metrics: 100%|██████████| 10/10 [00:00<00:00, 723.67it/s]
Calculating Metrics: 100%|██████████| 10/10 [00:00<00:00, 735.69it/s]
Calculating Metrics: 100%|██████████| 10/10 [00:00<00:00, 2332.11it/s]


In [12]:
# Summarise the validation results collected in `results` (one entry per
# expectation) and write the summary to a JSON file.
passed = []
failed = []
critical_failed = 0
optional_failed = 0

for r in results:
    config = r.get("expectation_config", {})
    expectation_type = config.get("type", "unknown_expectation")
    expectation_kwargs = config.get("kwargs", {})
    # Expectations without an explicit "importance" in their meta count as optional.
    importance = config.get("meta", {}).get("importance", "optional")

    if r.get("success", False):
        passed.append(expectation_type)
        continue

    failed.append({
        "expectation_type": expectation_type,
        # Identifies which column a column-level expectation failed on;
        # None when the expectation has no "column" kwarg.
        "column": expectation_kwargs.get("column"),
        "expected_value": expectation_kwargs.get("value"),
        "observed_value": r.get("result", {}).get("observed_value"),
        "importance": importance,
    })
    if importance == "required":
        critical_failed += 1
    else:
        optional_failed += 1

# Timezone-aware replacement for the deprecated datetime.utcnow(); tzinfo is
# stripped before formatting so the string keeps its "<naive ISO timestamp>Z" shape.
run_time = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

summary = {
    "total_tests": len(results),
    "passed_tests": len(passed),
    "failed_tests": len(failed),
    "failed_expectations": failed,
    "critical_failed_tests": critical_failed,
    "optional_failed_tests": optional_failed,
    # Guard against division by zero when no expectations were run.
    "passed_percentage": round(100 * len(passed) / len(results), 2) if results else 0.0,
    "run_time": run_time,
    "batch_info": {
        # Public property instead of the private `_datasource_name` attribute.
        "datasource": cleaned_batch.batch_definition.datasource_name,
        "data_asset": cleaned_batch.batch_definition.data_asset_name
    }
}

# Save to JSON
output_path = "/tmp/gx_validation_summary.json"
with open(output_path, "w", encoding="utf-8") as f:
    json.dump(summary, f, indent=2)

print(f"Validation summary written to {output_path}")


Validation summary written to /tmp/gx_validation_summary.json
