In [None]:
# Complete Professional Workflow for Great Expectations
# This demonstrates the logical flow: Data Assistants -> Validation -> Checkpoints -> Documentation

import great_expectations as gx
import pandas as pd
import json
from pathlib import Path
from datetime import datetime


In [None]:
# Initialize the Great Expectations Data Context
context = gx.get_context()

# Step 1: Connect to Data Sources
# PostgreSQL connection
# NOTE(review): these look like the public workshop demo credentials -- confirm,
# and never hardcode real secrets in a notebook (read them from the environment).
connection_string = "postgresql+psycopg2://try_gx:try_gx@postgres.workshops.greatexpectations.io/gx_example_db"

# Reuse the datasource when an earlier run already registered it, otherwise
# create it. Only lookup failures are caught: a bare `except:` would also
# swallow KeyboardInterrupt and unrelated errors.
try:
    postgres_source = context.get_datasource('postgres_professional')
except (ValueError, LookupError):
    postgres_source = context.sources.add_postgres('postgres_professional', connection_string=connection_string)

# Same reuse-or-create pattern for the table asset on that datasource.
try:
    postgres_asset = postgres_source.get_asset('nyc_taxi_professional')
except LookupError:
    postgres_asset = postgres_source.add_table_asset(name='nyc_taxi_professional', table_name='nyc_taxi_data')

# Build a batch request for the asset and keep the first batch it resolves to.
postgres_batch_request = postgres_asset.build_batch_request()
postgres_batch_list = postgres_asset.get_batch_list_from_batch_request(postgres_batch_request)
postgres_batch = postgres_batch_list[0]

# Pandas data source (disabled; kept as an optional alternative to PostgreSQL)
# df = pd.read_csv("https://raw.githubusercontent.com/great-expectations/gx_tutorials/main/data/yellow_tripdata_sample_2019-01.csv")
# try:
#     pandas_source = context.get_datasource('pandas_professional')
# except (ValueError, LookupError):
#     pandas_source = context.sources.add_pandas('pandas_professional')

# try:
#     pandas_asset = pandas_source.get_asset('taxi_dataframe_professional')
# except LookupError:
#     pandas_asset = pandas_source.add_dataframe_asset(name='taxi_dataframe_professional')

# pandas_batch_request = pandas_asset.build_batch_request(dataframe=df)
# pandas_batch_list = pandas_asset.get_batch_list_from_batch_request(pandas_batch_request)
# pandas_batch = pandas_batch_list[0]

print("Data sources connected successfully")
print(f"PostgreSQL batch: {postgres_batch.id}")
# print(f"Pandas batch: {pandas_batch.id}")

"""
## Data Assistants in Great Expectations

Data Assistants in Great Expectations are tools designed to automate the process of profiling and understanding your data. They analyze datasets to generate useful insights and create initial sets of expectations, which are rules or assertions about your data's properties.

**Main types of Data Assistants:**
- **Missingness Data Assistant:** Profiles your data to detect missing values and suggests expectations related to nullity and completeness for each column.
- **Onboarding Data Assistant:** Provides a broad overview of your dataset, including data types, cardinality, and basic statistics, and generates a comprehensive set of expectations to help you get started with data quality monitoring.

Data Assistants are typically run before manual validation, as they help you quickly establish a baseline of data quality checks and understand the structure and issues in your data.

For more details, see the [Great Expectations documentation](https://docs.greatexpectations.io/docs/oss/guides/expectations/data_assistants/overview/) (Great Expectations, n.d.).

**Reference:**  
Great Expectations. (n.d.). Data Assistants Overview. https://docs.greatexpectations.io/docs/oss/guides/expectations/data_assistants/overview/
"""


In [None]:
# Step 2: Run Data Assistants FIRST (before any validation)
# Create the expectation suite the profiling validator is attached to.
# add_or_update (instead of add) keeps this cell re-runnable: plain
# add_expectation_suite raises once a suite with this name already exists.
postgres_profiling_suite = context.add_or_update_expectation_suite(
    expectation_suite_name='postgres_auto_profiling'
)
# pandas_profiling_suite = context.add_or_update_expectation_suite(expectation_suite_name='pandas_auto_profiling')

# Create validators for Data Assistants
postgres_validator = context.get_validator(
    batch_request=postgres_batch_request,
    expectation_suite_name='postgres_auto_profiling'
)

# pandas_validator = context.get_validator(
#     batch_request=pandas_batch_request,
#     expectation_suite_name='pandas_auto_profiling'
# )

# Columns left out of profiling; both assistants share the same exclusion list.
profiling_excluded_columns = ['pickup_datetime', 'dropoff_datetime']

# Run Data Assistants
print("\nRunning Data Assistants for automatic profiling...")

# Missingness Data Assistant for PostgreSQL
postgres_missingness_result = context.assistants.missingness.run(
    validator=postgres_validator,
    exclude_column_names=profiling_excluded_columns
)
# Turn the assistant's findings into a named suite and persist it so the
# checkpoint step can refer to it by name.
postgres_missingness_suite = postgres_missingness_result.get_expectation_suite(
    expectation_suite_name='postgres_missingness_final'
)
context.save_expectation_suite(postgres_missingness_suite)

# Onboarding Data Assistant for PostgreSQL
postgres_onboarding_result = context.assistants.onboarding.run(
    validator=postgres_validator,
    exclude_column_names=profiling_excluded_columns
)
postgres_onboarding_suite = postgres_onboarding_result.get_expectation_suite(
    expectation_suite_name='postgres_onboarding_final'
)
context.save_expectation_suite(postgres_onboarding_suite)

# # Missingness Data Assistant for Pandas
# pandas_missingness_result = context.assistants.missingness.run(
#     validator=pandas_validator,
#     exclude_column_names=['pickup_datetime', 'dropoff_datetime']
# )
# pandas_missingness_suite = pandas_missingness_result.get_expectation_suite(
#     expectation_suite_name='pandas_missingness_final'
# )
# context.save_expectation_suite(pandas_missingness_suite)

# # Onboarding Data Assistant for Pandas
# pandas_onboarding_result = context.assistants.onboarding.run(
#     validator=pandas_validator,
#     exclude_column_names=['pickup_datetime', 'dropoff_datetime']
# )
# pandas_onboarding_suite = pandas_onboarding_result.get_expectation_suite(
#     expectation_suite_name='pandas_onboarding_final'
# )
# context.save_expectation_suite(pandas_onboarding_suite)


print(f"- PostgreSQL Missingness: {len(postgres_missingness_suite.expectations)} expectations")
print(f"- PostgreSQL Onboarding: {len(postgres_onboarding_suite.expectations)} expectations")
# print(f"- Pandas Missingness: {len(pandas_missingness_suite.expectations)} expectations")
# print(f"- Pandas Onboarding: {len(pandas_onboarding_suite.expectations)} expectations")


In [None]:
# Step 3: Add Manual Business Rules
# add_or_update keeps the cell re-runnable; add_expectation_suite raises when
# a suite with this name already exists from an earlier run.
manual_suite = context.add_or_update_expectation_suite(
    expectation_suite_name='manual_business_rules_professional'
)
manual_validator = context.get_validator(
    batch_request=postgres_batch_request,
    expectation_suite_name='manual_business_rules_professional'
)

# Add specific business expectations
# Each expect_* call validates against the current batch and records the
# expectation on the validator's suite.
manual_validator.expect_column_values_to_be_between(
    column='passenger_count', min_value=1, max_value=6
)
manual_validator.expect_column_values_to_be_between(
    column='fare_amount', min_value=0
)
manual_validator.expect_column_values_to_not_be_null(
    column='trip_distance'
)
manual_validator.expect_column_values_to_be_between(
    column='trip_distance', min_value=0, max_value=100
)

# Keep rules that the current batch violates. By default save_expectation_suite
# discards failed expectations, which would silently drop exactly the business
# rules the data breaks and make the later checkpoint unable to fail on them.
manual_validator.save_expectation_suite(discard_failed_expectations=False)

# Reload the persisted suite so the count reflects what was actually saved.
manual_suite = context.get_expectation_suite('manual_business_rules_professional')
print(f"\nManual business rules added: {len(manual_suite.expectations)} expectations")

In [None]:
# Step 4: Create and Run Checkpoints
print("\nCreating and running checkpoints...")

# One row per checkpoint:
# (checkpoint name, suite name, batch request, datasource name, asset name)
checkpoints_to_run = [
    ('postgres_missingness_cp', 'postgres_missingness_final', postgres_batch_request, postgres_source.name, postgres_asset.name),
    ('postgres_onboarding_cp', 'postgres_onboarding_final', postgres_batch_request, postgres_source.name, postgres_asset.name),
    # ('pandas_missingness_cp', 'pandas_missingness_final', pandas_batch_request, pandas_source.name, pandas_asset.name),
    # ('pandas_onboarding_cp', 'pandas_onboarding_final', pandas_batch_request, pandas_source.name, pandas_asset.name),
    ('manual_business_cp', 'manual_business_rules_professional', postgres_batch_request, postgres_source.name, postgres_asset.name)
]

# Maps checkpoint name -> the result object returned by running it.
checkpoint_results = {}

# Register (or refresh) each checkpoint, then execute it right away.
# Note: batch_req is unpacked but unused; the checkpoint's batch request is
# rebuilt from the datasource and asset names instead.
for cp_name, suite_name, batch_req, source_name, asset_name in checkpoints_to_run:
    checkpoint = context.add_or_update_checkpoint(
        name=cp_name,
        config_version=1.0,
        class_name='Checkpoint',
        run_name_template=f'%Y%m%d-%H%M%S-{cp_name}',
        expectation_suite_name=suite_name,
        batch_request={'datasource_name': source_name, 'data_asset_name': asset_name},
        # After validating: store the validation result, then update Data Docs.
        action_list=[
            {'name': 'store_validation_result', 'action': {'class_name': 'StoreValidationResultAction'}},
            {'name': 'update_data_docs', 'action': {'class_name': 'UpdateDataDocsAction'}},
        ],
    )

    result = context.run_checkpoint(checkpoint_name=cp_name)
    checkpoint_results[cp_name] = result

# One status line per checkpoint.
print("\nCheckpoint Results:")
for name, result in checkpoint_results.items():
    if result.success:
        status = "PASSED"
    else:
        status = "FAILED"
    print(f"  {name}: {status}")

In [None]:
# Step 5: Save Results and Generate Report
# Collect the run's key facts: when it ran, which datasource/asset was used,
# how many expectations each suite holds, and whether each checkpoint passed.
results_summary = {
    "generation_time": datetime.now().isoformat(),
    "data_sources": {
        "postgres": {"source": postgres_source.name, "asset": postgres_asset.name},
        # "pandas": {"source": pandas_source.name, "asset": pandas_asset.name}
    },
    "expectation_suites": {
        "postgres_missingness": len(postgres_missingness_suite.expectations),
        "postgres_onboarding": len(postgres_onboarding_suite.expectations),
        # "pandas_missingness": len(pandas_missingness_suite.expectations),
        # "pandas_onboarding": len(pandas_onboarding_suite.expectations),
        "manual_business_rules": len(manual_suite.expectations)
    },
    # bool() keeps the values plain JSON-serializable booleans.
    "checkpoint_results": {name: bool(result.success) for name, result in checkpoint_results.items()}
}

# Actually persist the summary: previously the dict was built but never saved,
# despite the step title. The file is written to the current working directory.
results_path = Path("gx_workflow_results.json")
results_path.write_text(json.dumps(results_summary, indent=2), encoding="utf-8")
print(f"\nResults summary saved to {results_path.resolve()}")

In [None]:
# Step 6: Build and Open Data Docs
# Announce each step, then run it: first build the Data Docs site, then open
# it in the browser.
for announcement, docs_step in (
    ("\nBuilding Data Docs...", context.build_data_docs),
    ("\nOpening Data Docs in browser...", context.open_data_docs),
):
    print(announcement)
    docs_step()

print("\nWorkflow complete! Data Docs are now open in your browser.")