In [1]:
#   ___              _     ___                  _        _   _
#  / __|_ _ ___ __ _| |_  | __|_ ___ __  ___ __| |_ __ _| |_(_)___ _ _  ___
# | (_ | '_/ -_) _` |  _| | _|\ \ / '_ \/ -_) _|  _/ _` |  _| / _ \ ' \(_-<
#  \___|_| \___\__,_|\__| |___/_\_\ .__/\___\__|\__\__,_|\__|_\___/_||_/__/
#                                 |_|

In [2]:
import great_expectations as gx
from great_expectations.checkpoint import Checkpoint
from great_expectations.core.expectation_suite import ExpectationSuite
from great_expectations.data_context import FileDataContext

# Create a DataContext as an entry point to the GX Python API
context = gx.get_context(project_root_dir='.')
print(context)

{
  "checkpoint_store_name": "checkpoint_store",
  "config_variables_file_path": "uncommitted/config_variables.yml",
  "config_version": 4.0,
  "data_context_id": "d27e374d-865c-4512-9f92-3a4a38cb9c6b",
  "data_docs_sites": {
    "local_site": {
      "class_name": "SiteBuilder",
      "show_how_to_buttons": true,
      "store_backend": {
        "class_name": "TupleFilesystemStoreBackend",
        "base_directory": "uncommitted/data_docs/local_site/"
      },
      "site_index_builder": {
        "class_name": "DefaultSiteIndexBuilder"
      }
    }
  },
  "expectations_store_name": "expectations_store",
  "fluent_datasources": {},
  "plugins_directory": "plugins/",
  "stores": {
    "expectations_store": {
      "class_name": "ExpectationsStore",
      "store_backend": {
        "class_name": "TupleFilesystemStoreBackend",
        "base_directory": "expectations/"
      }
    },
    "validation_results_store": {
      "class_name": "ValidationResultsStore",
      "store_backend": {
 

In [3]:
# Connect to PostgreSQL OpenAQ_DWH Database
datasource_name = "dainy-openaq-quality"
my_connection_string = (
    "postgresql+psycopg2://postgres:123456@localhost:5432/OpenAQ_DWH"
)

pg_datasource = context.data_sources.add_postgres(
    name=datasource_name, connection_string=my_connection_string
)

In [4]:
# Verify Postgres connection
print(context.data_sources.get(datasource_name))

connection_string: postgresql+psycopg2://postgres:123456@localhost:5432/OpenAQ_DWH
id: a5748ff5-4a5c-4795-b507-a214cc5a7fcc
name: dainy-openaq-quality
type: postgres



In [5]:
# Add Data Asset to the data source (pg_datasource)
asset_name = "openaq_measurement_data"
database_table_name="stg_measurement_by_sensors"
pg_datasource.add_table_asset(
    name=asset_name, table_name=database_table_name, schema_name="staging"
)

TableAsset(name='openaq_measurement_data', type='table', id=UUID('2eb93e69-673a-452a-be74-963ccfe3c8f5'), order_by=[], batch_metadata={}, batch_definitions=[], table_name='stg_measurement_by_sensors', schema_name='staging')

In [6]:
# Create batch request as data asset
batch_request = pg_datasource.get_asset("openaq_measurement_data")

In [7]:
# Retrieve the full table of data asset
full_table_batch_definition = batch_request.add_batch_definition_whole_table(
    name="FULL_TABLE"
)

In [8]:
# Verify if the batch request successfully collect the data from datasource
full_table_batch = full_table_batch_definition.get_batch()
full_table_batch.head(15)

Calculating Metrics: 100%|██████████| 1/1 [00:00<00:00,  6.17it/s]


    sensor_id         sensor_name  parameter_id    parameter_name  \
0    11357396         pm1 Âµg/mÂ³            19               pm1   
1    11357396         pm1 Âµg/mÂ³            19               pm1   
2    11357395        pm10 Âµg/mÂ³             1              pm10   
3    11357396         pm1 Âµg/mÂ³            19               pm1   
4    11357395        pm10 Âµg/mÂ³             1              pm10   
5    11357424        pm25 Âµg/mÂ³             2              pm25   
6    11357396         pm1 Âµg/mÂ³            19               pm1   
7    11357395        pm10 Âµg/mÂ³             1              pm10   
8    11357424        pm25 Âµg/mÂ³             2              pm25   
9    11357398  relativehumidity %            98  relativehumidity   
10   11357396         pm1 Âµg/mÂ³            19               pm1   
11   11357395        pm10 Âµg/mÂ³             1              pm10   
12   11357424        pm25 Âµg/mÂ³             2              pm25   
13   11357398  relativehumidity % 

In [9]:
# Create an Expectation Suite.
expectation_suite_name = "validate_openaq_measurement"
suite = gx.ExpectationSuite(name=expectation_suite_name)

In [10]:
# Add some expectations
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="sensor_name"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="parameter_id"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="parameter_name"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="parameter_units"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="measurement_datetime_utc"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="summary_min"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="summary_max"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="summary_avg"))

suite.add_expectation(gx.expectations.ExpectColumnValuesToBeUnique(
    column="sensor_id"
))

suite.add_expectation(gx.expectations.ExpectColumnValuesToBeInSet(
    column="parameter_name",
    value_set=["pm1", "pm10", "pm25", "temperature", "relativehumidity"]
))

suite.add_expectation(gx.expectations.ExpectColumnValuesToBeOfType(
    column="measurement_value",
    type_="FLOAT"
))

suite.add_expectation(gx.expectations.ExpectColumnValuesToBeOfType(
    column="summary_avg",
    type_="FLOAT"
))

suite.add_expectation(gx.expectations.ExpectColumnValuesToBeBetween(
    column="latitude",
    min_value=10,
    max_value=11
))

suite.add_expectation(gx.expectations.ExpectColumnValuesToBeBetween(
    column="longitude",
    min_value=106.60000,
    max_value=106.70000
))

ExpectColumnValuesToBeBetween(id=None, meta=None, notes=None, result_format=<ResultFormat.BASIC: 'BASIC'>, description=None, catch_exceptions=True, rendered_content=None, windows=None, batch_id=None, column='longitude', mostly=1, row_condition=None, condition_parser=None, min_value=106.6, max_value=106.7, strict_min=False, strict_max=False)

In [11]:
# Add the Expectation to the Context
context.suites.add(suite)

{
  "name": "validate_openaq_measurement",
  "id": "50e2d136-0e62-4311-be50-e20118979a80",
  "expectations": [
    {
      "type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "sensor_name"
      },
      "meta": {},
      "id": "b78261dc-2022-4598-801b-b6a4914df12b"
    },
    {
      "type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "parameter_id"
      },
      "meta": {},
      "id": "27fcdd40-3717-4b49-8c57-a7c089d708b9"
    },
    {
      "type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "parameter_name"
      },
      "meta": {},
      "id": "f330fb1f-636d-4366-8da7-84f7d380ed0c"
    },
    {
      "type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "parameter_units"
      },
      "meta": {},
      "id": "8ff0efaf-8a69-48bc-a3b4-6e105b220d41"
    },
    {
      "type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "measureme

In [12]:
# Validating the data
validation_results = full_table_batch.validate(suite, result_format="BASIC")
print(validation_results)

Calculating Metrics: 100%|██████████| 67/67 [00:00<00:00, 704.65it/s]

{
  "success": false,
  "results": [
    {
      "success": true,
      "expectation_config": {
        "type": "expect_column_values_to_not_be_null",
        "kwargs": {
          "batch_id": "dainy-openaq-quality-openaq_measurement_data",
          "column": "sensor_name"
        },
        "meta": {},
        "id": "b78261dc-2022-4598-801b-b6a4914df12b"
      },
      "result": {
        "element_count": 15,
        "unexpected_count": 0,
        "unexpected_percent": 0.0,
        "partial_unexpected_list": []
      },
      "meta": {},
      "exception_info": {
        "raised_exception": false,
        "exception_traceback": null,
        "exception_message": null
      }
    },
    {
      "success": true,
      "expectation_config": {
        "type": "expect_column_values_to_not_be_null",
        "kwargs": {
          "batch_id": "dainy-openaq-quality-openaq_measurement_data",
          "column": "parameter_id"
        },
        "meta": {},
        "id": "27fcdd40-3717-4b49-8c5




In [13]:
# Create validation_definition
definition_name = "openaq_validation_definition"
expectation_suite = context.suites.get(name=expectation_suite_name)
validation_definition = gx.ValidationDefinition(
    data=full_table_batch_definition, suite=expectation_suite, name=definition_name
)

validation_definition = context.validation_definitions.add(validation_definition)

In [15]:
# Create validation definitions
validation_definitions = [context.validation_definitions.get("openaq_validation_definition")]

In [None]:
# Create custom action logic
action_list = [
    
]

In [None]:
# Create Checkpoint
checkpoint_name = "staging_openaq_asset_checkpoint"
checkpoint = gx.Checkpoint(
    name=checkpoint_name,
    validation_definitions=validation_definitions,
    actions=action_list,
    result_format={"result_format": "COMPLETE"},
)