In [93]:
# !pip install --upgrade pip

In [94]:
# !pip install pyspark great_expectations==1.1.3

In [95]:
import great_expectations as gx
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, TimestampType
from datetime import datetime
import re

In [96]:
print(f'Great Expectations version {gx.__version__} is installed')

class ExpectationGenerator:
    def __init__(self, rule: dict) -> None:
        self.rule = rule
        self.fieldName = rule['field_name']
        self.meta = dict(
            expectation=rule['expectation'],
            action=rule['expectation_action'],
            operator=rule['operator']
        )
        
    def required(self):
        return gx.expectations.ExpectColumnValuesToNotBeNull(
            column=self.fieldName,
            meta=self.meta
        )
    
    def choice(self, *value_set):
        value_set = list(value_set)
        return gx.expectations.ExpectColumnValuesToBeInSet(
            column=self.fieldName,
            value_set=value_set,
            meta=self.meta
        )
    
    def string_format(self, regex: str):
        return gx.expectations.ExpectColumnValuesToMatchRegex(
            column=self.fieldName,
            regex=regex,
            meta=self.meta
        )

Great Expectations version 1.0.3 is installed


In [97]:
spark = SparkSession.builder.appName("GE_Test").getOrCreate()

In [98]:
def create_sample_data():
    schema = StructType([
        StructField("user_id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("email", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("signup_date", TimestampType(), True)
    ])

    data = [
        (1, "John Doe", "john@email.com", 25, datetime(2023, 1, 1)),
        (2, "Jensen Nguyen", "jane@email.com", 30, datetime(2023, 2, 1)),
        (3, "Bob Smith", "invalid-email", -5, datetime(2024, 1, 1)),
        (4, "Alice Brown", "alice@email.com", 40, datetime(2023, 3, 1)),
        (5, "", None, None, None),
        (6, "Jensen Nguyen 2", None, 30, datetime(2023, 2, 1))
    ]
    return spark.createDataFrame(data, schema=schema)

# Create test data
df = create_sample_data()

df.show(truncate=False)

+-------+---------------+---------------+----+-------------------+
|user_id|name           |email          |age |signup_date        |
+-------+---------------+---------------+----+-------------------+
|1      |John Doe       |john@email.com |25  |2023-01-01 00:00:00|
|2      |Jensen Nguyen  |jane@email.com |30  |2023-02-01 00:00:00|
|3      |Bob Smith      |invalid-email  |-5  |2024-01-01 00:00:00|
|4      |Alice Brown    |alice@email.com|40  |2023-03-01 00:00:00|
|5      |               |null           |null|null               |
|6      |Jensen Nguyen 2|null           |30  |2023-02-01 00:00:00|
+-------+---------------+---------------+----+-------------------+



In [99]:
# Initialize Great Expectations context
context = gx.get_context()

# Create rules for testing
rules = [
    {
        'field_name': 'name',
        'expectation': 'required',
        'expectation_action': 'warn',
        'operator': 'not_null'
    },
    {
        'field_name': 'email',
        'expectation': 'required and string_format(^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$)',
        'expectation_action': 'warn',
        'operator': 'not_null'
    }
]

# Create and add expectations
suite = context.suites.add(
    gx.core.expectation_suite.ExpectationSuite(
        name="new_test_suite"
    )
)

# Before adding expectations
print('suite', suite)

suite {
  "name": "new_test_suite",
  "id": "53778fc4-ae6a-45da-920e-f50532b0bb8e",
  "expectations": [],
  "meta": {
    "great_expectations_version": "1.0.3"
  },
  "notes": null
}


In [100]:
# for rule in rules:
#     generator = ExpectationGenerator(rule)
#     if 'required' in rule['expectation']:
#         expectation = generator.required()
#     elif 'string_format' in rule['expectation']:
#         regex = rule['expectation'].split('(')[1].rstrip(')')
#         expectation = generator.string_format(regex)
#     suite.add_expectation(expectation)
    
# # After adding expectations
# print('suite', suite)

for rule in rules:
    generator = ExpectationGenerator(rule)
    raw_expectation = rule['expectation']
    
    # Add required validation if present
    if 'required' in raw_expectation:
        suite.add_expectation(generator.required())
    
    # Add string format validation if present
    if 'string_format' in raw_expectation:
        pattern = r'string_format\((.*?)\)'
        match = re.search(pattern, raw_expectation)
        if match:
            regex = match.group(1)
            suite.add_expectation(generator.string_format(regex))

In [101]:
# Create batch definition
batch_definition = context.data_sources.add_spark(
    name="my_spark_datasource"
).add_dataframe_asset(
    name="my_data_asset"
).add_batch_definition_whole_dataframe(
    name="my_batch"
)

# print('batch_definition', batch_definition)

In [102]:
# Create validation definition
validation_definition = context.validation_definitions.add(
    gx.core.validation_definition.ValidationDefinition(
        name="my_validation",
        data=batch_definition,
        suite=suite
    )
)

# After creating validation definition
# print('validation_definition', validation_definition)

In [103]:
# Create and run checkpoint
checkpoint = context.checkpoints.add(
    gx.checkpoint.checkpoint.Checkpoint(
        name="my_checkpoint",
        validation_definitions=[validation_definition]
    )
)

results = checkpoint.run(
    batch_parameters={"dataframe": df}
)

# Print results
print("\nValidation Results:")
print("==================")

# # Get the first (and only) validation result from run_results
# validation_result = list(results.run_results.values())[0]
# print("\nDetailed Results:")
# for result in validation_result['results']:
#     print(f"\nExpectation: {result['expectation_config']['type']}")
#     print(f"Column: {result['expectation_config']['kwargs']['column']}")
#     print(f"Success: {result['success']}")
#     print(f"Unexpected count: {result['result'].get('unexpected_count', 0)}")
#     if not result['success']:
#         print(f"Unexpected values: {result['result'].get('partial_unexpected_list', [])}")

Calculating Metrics:   0%|          | 0/22 [00:00<?, ?it/s]


Validation Results:


In [106]:
# Get the validation results and create a DataFrame
validation_result = list(results.run_results.values())[0]
results_data = []

for result in validation_result['results']:
    result_dict = {
        'expectation_type': result['expectation_config']['type'],
        'column': result['expectation_config']['kwargs']['column'],
        'status': result['success'],
        'unexpected_count': result['result'].get('unexpected_count', 0)
    }
    
    # Add unexpected values and user_ids if present and the validation failed
    if not result['success']:
        unexpected_values = result['result'].get('partial_unexpected_list', [])
        
        # Get the user_ids for failed records by matching the values
        column_name = result['expectation_config']['kwargs']['column']
        if result['expectation_config']['type'] == 'expect_column_values_to_not_be_null':
            # For null values
            user_ids = df.filter(F.col(column_name).isNull()).select('user_id').rdd.map(lambda x: x[0]).collect()
        else:
            # For invalid values (like invalid-email)
            user_ids = df.filter(F.col(column_name).isin(unexpected_values)).select('user_id').rdd.map(lambda x: x[0]).collect()
        
        # Create a row for each unexpected value and user_id pair
        for unexpected_value, user_id in zip(unexpected_values, user_ids):
            row_dict = result_dict.copy()
            row_dict['unexpected_value'] = unexpected_value
            row_dict['user_id'] = user_id
            results_data.append(row_dict)
    else:
        # For successful validations, add a single row with empty values
        result_dict['unexpected_value'] = None
        result_dict['user_id'] = None
        results_data.append(result_dict)

# Create DataFrame from results
results_df = pd.DataFrame(results_data)

# Display the results DataFrame
print("\nResults DataFrame:")
results_df


Results DataFrame:


Unnamed: 0,expectation_type,column,status,unexpected_count,unexpected_value,user_id
0,expect_column_values_to_not_be_null,name,True,0,,
1,expect_column_values_to_not_be_null,email,False,2,,5.0
2,expect_column_values_to_not_be_null,email,False,2,,6.0
3,expect_column_values_to_match_regex,email,False,1,invalid-email,3.0
