In [None]:
# Import libraries
import great_expectations as gx
from great_expectations.core.batch import RuntimeBatchRequest
import pandas as pd
import os

# Report which Great Expectations release was imported into this kernel
print(f"Great Expectations version: {gx.__version__}")

## 1. Initialize Great Expectations Context

In [None]:
# Obtain the Data Context that the rest of the notebook works against
context = gx.get_context()

# Each entry returned by list_datasources() is indexed by its "name" key;
# print one bullet per configured datasource.
print("\nAvailable datasources:")
for datasource_config in context.list_datasources():
    datasource_label = datasource_config['name']
    print(f"  - {datasource_label}")

## 2. Create a Simple Expectation Suite

In [None]:
# Load the expectation suite if it already exists, otherwise create it
from great_expectations.exceptions import DataContextError

suite_name = "example_suite"

try:
    suite = context.get_expectation_suite(suite_name)
    print(f"Loaded existing suite: {suite_name}")
except DataContextError:
    # Only a failed lookup should lead to creation. A bare `except:` would also
    # swallow KeyboardInterrupt and unrelated store/connection errors and then
    # overwrite a suite that may in fact exist.
    # NOTE(review): DataContextError is what the 0.x Data Context raises for a
    # missing suite - confirm against the installed Great Expectations version.
    suite = context.create_expectation_suite(
        expectation_suite_name=suite_name,
        overwrite_existing=True
    )
    print(f"Created new suite: {suite_name}")

print(f"\nExpectations in suite: {len(suite.expectations)}")

## 3. Add Expectations

Define data quality rules for your tables. The `id` column used below is a placeholder; replace it with a column that exists in your data.

In [None]:
# Example expectations, declared as plain dicts and registered on the suite
from great_expectations.core.expectation_configuration import ExpectationConfiguration

expectations = [
    {
        # The table must contain at least one row
        "expectation_type": "expect_table_row_count_to_be_between",
        "kwargs": {
            "min_value": 1,
        }
    },
    {
        "expectation_type": "expect_column_values_to_not_be_null",
        "kwargs": {
            "column": "id"  # Replace with your column
        }
    },
    {
        "expectation_type": "expect_column_values_to_be_unique",
        "kwargs": {
            "column": "id"  # Replace with your column
        }
    }
]

for expectation in expectations:
    # add_expectation() takes an ExpectationConfiguration object, not
    # expectation_type=/kwargs= keyword arguments, so wrap each dict first.
    suite.add_expectation(ExpectationConfiguration(**expectation))

# Persist the updated suite through the Data Context
context.save_expectation_suite(suite)
print(f"Added {len(expectations)} expectations to suite")

## 4. Validate Data with Trino Datasource

Query Iceberg tables through Trino and validate the query results against the expectation suite.

In [None]:
# Example: Create a batch request for Trino
# The table to validate is named once here and reused for both the asset name
# and the query, so the two cannot drift apart. Change it to your own table.
trino_table = "iceberg.default.example_table"

batch_request = RuntimeBatchRequest(
    datasource_name="trino_datasource",
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name=trino_table,
    # Only the first 1000 rows are pulled for validation
    runtime_parameters={"query": f"SELECT * FROM {trino_table} LIMIT 1000"},
    batch_identifiers={"default_identifier_name": "example_batch"}
)

# Note: This will fail if the table doesn't exist
# Uncomment to run:
# validator = context.get_validator(
#     batch_request=batch_request,
#     expectation_suite_name=suite_name
# )
# results = validator.validate()
# print(results)

## 5. Create a Checkpoint for Automation

In [None]:
# Create the checkpoint unless one with the same name is already registered
checkpoint_name = "example_checkpoint"

checkpoint_config = {
    "name": checkpoint_name,
    "config_version": 1.0,
    "class_name": "SimpleCheckpoint",
    "run_name_template": "%Y%m%d-%H%M%S-example",
}

# Check for existence explicitly instead of catching every exception: a bare
# `except:` would report "already exists" for any failure, including an
# invalid checkpoint configuration.
# NOTE(review): assumes list_checkpoints() yields checkpoint names as strings,
# as the listing loop below implies - confirm for your context type.
if checkpoint_name in context.list_checkpoints():
    print(f"Checkpoint {checkpoint_name} already exists")
else:
    context.add_checkpoint(**checkpoint_config)
    print(f"Created checkpoint: {checkpoint_name}")

# List checkpoints
print("\nAvailable checkpoints:")
for cp in context.list_checkpoints():
    print(f"  - {cp}")

## 6. Generate Data Documentation

In [None]:
# Ask the Data Context to build its data docs
context.build_data_docs()

# Tell the reader where to look for the docs (one message per line)
docs_messages = (
    "Data documentation generated!",
    "\nAccess data docs at:",
    "  Local: /opt/great_expectations/uncommitted/data_docs/local_site/index.html",
    "  S3: Check MinIO console at http://localhost:9001",
)
for docs_message in docs_messages:
    print(docs_message)

## 7. Test PostgreSQL Connection

In [None]:
# Test PostgreSQL datasource
from urllib.parse import quote

from sqlalchemy import create_engine, text
import os

# Get credentials from environment.
# NOTE(review): the fallback values below look like development-stack
# defaults; set the POSTGRES_* variables for any real deployment.
pg_host = os.getenv('POSTGRES_HOST', 'postgresql')
pg_port = os.getenv('POSTGRES_PORT', '5432')
pg_db = os.getenv('POSTGRES_DB', 'datalyptica')
pg_user = os.getenv('POSTGRES_USER', 'datalyptica')
pg_pass = os.getenv('POSTGRES_PASSWORD', 'datalyptica123')

# Percent-encode the credentials so characters such as '@', ':' or '/' in a
# user name or password cannot corrupt the connection URL.
conn_string = (
    f"postgresql://{quote(pg_user, safe='')}:{quote(pg_pass, safe='')}"
    f"@{pg_host}:{pg_port}/{pg_db}"
)
engine = create_engine(conn_string)

# Test connection
try:
    with engine.connect() as conn:
        # SQLAlchemy 2.x no longer executes plain strings; wrap SQL in text()
        version = conn.execute(text("SELECT version()")).scalar()
        print(f"PostgreSQL version: {version}")
finally:
    # Release pooled connections whether or not the check succeeded
    engine.dispose()

print("\n✅ PostgreSQL connection successful!")

## 8. Test Trino Connection

In [None]:
# Test Trino datasource
from trino.dbapi import connect
import os

# Connection settings come from the environment, with the original values as
# defaults (the user was previously hard-coded to 'admin').
trino_host = os.getenv('TRINO_HOST', 'trino')
trino_port = int(os.getenv('TRINO_PORT', '8080'))
trino_user = os.getenv('TRINO_USER', 'admin')

conn = connect(
    host=trino_host,
    port=trino_port,
    user=trino_user,
    catalog='iceberg',
    schema='default'
)

try:
    cursor = conn.cursor()
    cursor.execute("SELECT version()")
    version = cursor.fetchone()[0]
    print(f"Trino version: {version}")

    # List schemas
    cursor.execute("SHOW SCHEMAS FROM iceberg")
    schemas = cursor.fetchall()
    print(f"\nAvailable Iceberg schemas: {len(schemas)}")
    for schema in schemas:
        # Each row is a sequence; the schema name is its first element
        print(f"  - {schema[0]}")
finally:
    # Always close the connection, even if one of the queries fails
    conn.close()

print("\n✅ Trino connection successful!")

## 9. Example: Validate a Sample DataFrame

In [None]:
# Build a 100-row demo frame; every column is derived from the ids 1..100
user_ids = range(1, 101)
sample_df = pd.DataFrame({
    'id': user_ids,
    'name': [f'User_{user_id}' for user_id in user_ids],
    'age': [20 + (user_id % 50) for user_id in user_ids],
    'email': [f'user{user_id}@example.com' for user_id in user_ids]
})

print(f"Sample DataFrame shape: {sample_df.shape}")
print("\nFirst few rows:")
print(sample_df.head())

# Wrap the in-memory frame in a runtime batch request for the pandas datasource
from great_expectations.core.batch import RuntimeBatchRequest

sample_request_kwargs = dict(
    datasource_name="s3_pandas_datasource",
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="sample_data",
    runtime_parameters={"batch_data": sample_df},
    batch_identifiers={"default_identifier_name": "sample_batch"},
)
runtime_batch_request = RuntimeBatchRequest(**sample_request_kwargs)

print("\n✅ Sample data created for validation")

## Next Steps

1. **Create your own expectation suites** for specific tables
2. **Set up checkpoints** for automated validation
3. **Integrate with CI/CD** pipelines
4. **Schedule validations** using Airflow or cron
5. **Explore data docs** for insights and documentation

## Resources

- [Great Expectations Documentation](https://docs.greatexpectations.io/)
- [Expectation Gallery](https://greatexpectations.io/expectations/)
- Datalyptica Documentation: `/opt/great_expectations/config/`