In [None]:
import numpy as np
import pandas as pd
import duckdb
import sqlalchemy
from pandas_gbq import read_gbq

In [None]:
import great_expectations as gx
from great_expectations import expectations as gxe

import pprint
import os

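<div class="alert alert-block alert-info">
<b>Data testing</b>: Great Expectations checks on the olist dimension and fact tables loaded from BigQuery.
</div>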

In [None]:
# Data context plus the customers dimension pulled from BigQuery into pandas.
context = gx.get_context()

project_id = "projectm2-aiess"
query = "SELECT * FROM olist.dim_customers"
df_customers = read_gbq(query, project_id=project_id)

# Pandas data source for the customers table. The dataframe is not attached
# here; it is handed over later through batch parameters.
data_source_name = "olist.dim_customers"
data_source = context.data_sources.add_pandas(
    name=data_source_name,
)

In [None]:
# Dataframe asset on the customers data source (holds no data by itself).
data_asset_name = "olist.dim_customers_asset"
data_asset = data_source.add_dataframe_asset(
    name=data_asset_name,
)


In [None]:
# Whole-dataframe batch definition: one batch is the complete in-memory frame,
# supplied through the "dataframe" batch parameter when the batch is requested.
batch_definition_name = "batch_customers_dataframe"
batch_definition = data_asset.add_batch_definition_whole_dataframe(
    batch_definition_name
)

batch_parameters = dict(dataframe=df_customers)
new_batch = batch_definition.get_batch(
    batch_parameters=batch_parameters,
)

In [None]:
# Preview the first 4 rows of the customers batch as a quick sanity check.
print(new_batch.head(4))

In [None]:
# Schema check for the customers dimension: customer_sid must exist and be the
# first column (column_index pins its position, not just its presence).
suite_name = "schema_dim_customers_expectation"
suite = gx.ExpectationSuite(name=suite_name)
# Keep the suite object returned by the context (as the other cells do) so the
# expectation is added to the registered suite.
suite = context.suites.add_or_update(suite)

schema_dim_customers_expectation = gx.expectations.ExpectColumnToExist(
    column="customer_sid", column_index=0
)
suite.add_expectation(schema_dim_customers_expectation)

definition_name = "schema_dim_customers_definition"
validation_definition = gx.ValidationDefinition(
    data=batch_definition, suite=suite, name=definition_name
)

# Previously the definition was built but never run, so the customers check
# never executed. Dataframe assets keep no data, so the frame is passed again here.
validation_results = validation_definition.run(
    batch_parameters={"dataframe": df_customers}
)
print("Validation results for olist.dim_customers:")
print(validation_results)

In [None]:
# Type checks for the remaining dimension tables. For each table: pull it from
# BigQuery, register a pandas data source + dataframe asset, build a suite of
# ExpectColumnValuesToBeOfType expectations and run it.
context = gx.get_context()
# GBQ table -> {column: expected type} checked with ExpectColumnValuesToBeOfType.
# NOTE(review): "string"/"integer" are compared against the pandas dtypes that
# read_gbq produces (e.g. object vs string, Int64 vs int64); confirm they match.
gbq_tables_with_columns_and_types = {
    "olist.dim_payments": {"payment_sid": "string"},
    "olist.dim_reviews": {"review_sid": "string"},
    "olist.dim_geolocation": {"geolocation_zip_code_prefix": "string"},
    "olist.dim_items": {"item_sid": "string"},
    "olist.dim_date": {"date_sid": "integer"},
}

# Results per table; `validation_results` alone only keeps the last iteration.
validation_results_by_table = {}

for table_name, expected_columns in gbq_tables_with_columns_and_types.items():
    # Query the table from GBQ
    query = f"SELECT * FROM {table_name}"
    df_table = read_gbq(query, project_id="projectm2-aiess")

    # Unique per-table names for data source and asset
    data_source_name = f"{table_name}_data_source"
    asset_name = f"{table_name}_asset"

    data_source = context.data_sources.add_pandas(name=data_source_name)
    data_asset = data_source.add_dataframe_asset(name=asset_name)

    batch_definition_name = table_name
    batch_definition = data_asset.add_batch_definition_whole_dataframe(batch_definition_name)

    # Get the batch and print the first few rows
    batch_parameters = {"dataframe": df_table}
    batch = batch_definition.get_batch(batch_parameters=batch_parameters)
    print(f"Batch for {table_name}:")
    print(batch.head(4))

    # Expectation suite registered with the context
    suite_name = f"{table_name}_expectation_suite"
    suite = gx.ExpectationSuite(name=suite_name)
    suite = context.suites.add(suite)

    for column, column_type in expected_columns.items():
        expectation = gx.expectations.ExpectColumnValuesToBeOfType(
            column=column, type_=column_type
        )
        suite.add_expectation(expectation)

    definition_name = f"{table_name}_validation_definition"
    validation_definition = gx.ValidationDefinition(
        data=batch_definition, suite=suite, name=definition_name
    )

    # Dataframe assets keep no data, so the frame must be supplied again at run
    # time; a bare run() cannot build the batch.
    validation_results = validation_definition.run(batch_parameters=batch_parameters)
    validation_results_by_table[table_name] = validation_results
    print(f"Validation results for {table_name}:")
    print(validation_results)

In [None]:
# Schema check for the fact table: pull olist.fct_orders from BigQuery, register
# it with GX and check that the four dimension keys exist at the expected
# column positions (column_index pins position, not just presence).
context = gx.get_context()

# Query the fact table from GBQ
project_id = "projectm2-aiess"
fact_table_name = "olist.fct_orders"
query = f"SELECT * FROM {fact_table_name}"
df_fact_table = read_gbq(query, project_id=project_id)

# Unique names for data source and asset
data_source_name = f"{fact_table_name}_data_source"
asset_name = f"{fact_table_name}_asset"

data_source = context.data_sources.add_pandas(name=data_source_name)
data_asset = data_source.add_dataframe_asset(name=asset_name)

batch_definition_name = fact_table_name
batch_definition = data_asset.add_batch_definition_whole_dataframe(batch_definition_name)

# Get the batch and print the first few rows
batch_parameters = {"dataframe": df_fact_table}
batch = batch_definition.get_batch(batch_parameters=batch_parameters)
print(f"Batch for {fact_table_name}:")
print(batch.head(4))

# Column existence + position expectations for the foreign keys
schema_fct_orders_expectation_1 = gx.expectations.ExpectColumnToExist(
    column="payment_sid", column_index=0
)
schema_fct_orders_expectation_2 = gx.expectations.ExpectColumnToExist(
    column="review_sid", column_index=1
)
schema_fct_orders_expectation_3 = gx.expectations.ExpectColumnToExist(
    column="item_sid", column_index=2
)
schema_fct_orders_expectation_4 = gx.expectations.ExpectColumnToExist(
    column="customer_sid", column_index=3
)

# Suite for the fact table schema validation
suite_name = "schema_fct_orders_expectation"
suite = gx.ExpectationSuite(name=suite_name)
suite = context.suites.add(suite)

suite.add_expectation(schema_fct_orders_expectation_1)
suite.add_expectation(schema_fct_orders_expectation_2)
suite.add_expectation(schema_fct_orders_expectation_3)
suite.add_expectation(schema_fct_orders_expectation_4)

definition_name = "schema_fct_orders_definition"
validation_definition = gx.ValidationDefinition(
    data=batch_definition, suite=suite, name=definition_name
)

# Dataframe assets keep no data, so the frame must be supplied again at run time.
validation_results = validation_definition.run(batch_parameters=batch_parameters)
print(f"Validation results for {fact_table_name}:")
print(validation_results)

In [None]:
# Delivery-time checks, run against df_fact_table (loaded by the fct_orders cell).
# NOTE(review): confirm df_fact_table actually carries the four delivery columns;
# the "df_delivery" names suggest a separate delivery dataframe may have been intended.
context = gx.get_context()
suite_name = "df_delivery_expectation_suite"
suite = gx.ExpectationSuite(name=suite_name)
suite = context.suites.add(suite)

delivery_timedelta_columns = ["actual_delivery_time", "estimated_delivery_time"]
delivery_minutes_columns = [
    "actual_delivery_time_minutes",
    "estimated_delivery_time_minutes",
]
delivery_columns = delivery_timedelta_columns + delivery_minutes_columns

# Every delivery column must exist...
for column in delivery_columns:
    suite.add_expectation(gx.expectations.ExpectColumnToExist(column=column))

# ...and be fully populated.
for column in delivery_columns:
    suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column=column))

# Durations in minutes must be strictly positive. gx.expectations has no
# ExpectColumnValuesToBeGreaterThan; a strict lower bound on
# ExpectColumnValuesToBeBetween expresses "> 0".
for column in delivery_minutes_columns:
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToBeBetween(
            column=column, min_value=0, strict_min=True
        )
    )

# Column data types
for column in delivery_timedelta_columns:
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToBeOfType(
            column=column, type_="timedelta64[ns]"
        )
    )
for column in delivery_minutes_columns:
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToBeOfType(column=column, type_="float")
    )

# Plausibility range; 10000 minutes is roughly 6.9 days.
for column in delivery_minutes_columns:
    suite.add_expectation(
        gx.expectations.ExpectColumnValuesToBeBetween(
            column=column, min_value=0, max_value=10000
        )
    )

# Cross-column consistency: actual <= estimated, expressed as estimated >= actual.
# (ExpectMulticolumnValuesToBeInSet with a `condition` does not exist in GX.)
suite.add_expectation(
    gx.expectations.ExpectColumnPairValuesAToBeGreaterThanB(
        column_A="estimated_delivery_time_minutes",
        column_B="actual_delivery_time_minutes",
        or_equal=True,
    )
)

# Register a dedicated data source/asset on this context instead of reusing the
# data_asset left over from the previous cell (hidden cross-cell state).
data_source = context.data_sources.add_pandas(name="df_delivery_data_source")
data_asset = data_source.add_dataframe_asset(name="df_delivery_asset")
batch_definition = data_asset.add_batch_definition_whole_dataframe("df_delivery_batch")
batch_parameters = {"dataframe": df_fact_table}
batch = batch_definition.get_batch(batch_parameters=batch_parameters)

# ValidationDefinition expects a BatchDefinition (not a Batch); the dataframe
# itself is supplied at run time through batch_parameters.
validation_definition = gx.ValidationDefinition(
    data=batch_definition, suite=suite, name="df_delivery_validation"
)
validation_results = validation_definition.run(batch_parameters=batch_parameters)
print(validation_results)