In [25]:
from snowflake.snowpark.session import Session
from configs.config import snowflake_conn_prop_local as snowflake_conn_prop
# Open the Snowpark session from the connection properties imported above
# (configs.config.snowflake_conn_prop_local). Later cells read this
# module-level `session` as a global, so this cell must run first.
session = Session.builder.configs(snowflake_conn_prop).create()

In [32]:
from snowflake.snowpark.functions import col
import pandas as pd
import json

def _parse_expectation_parameters(raw):
    """Decode one PARAMETERS cell into a dict.

    Args:
        raw: The cell value: a JSON string, an already-decoded dict, or a
            missing value (None / NaN / blank string).

    Returns:
        dict: The decoded parameters; an empty dict when the cell is missing.

    Raises:
        ValueError: If the cell is not valid JSON (json.JSONDecodeError is a
            ValueError subclass), is of an unsupported type, or decodes to
            something other than a JSON object.
    """
    if isinstance(raw, dict):
        return raw
    # Missing cells show up as None, or NaN once pandas has coerced the column.
    if raw is None or (isinstance(raw, float) and raw != raw):
        return {}
    if isinstance(raw, (str, bytes, bytearray)) and not raw.strip():
        return {}
    try:
        decoded = json.loads(raw)
    except TypeError as exc:
        raise ValueError(f"unsupported PARAMETERS type {type(raw).__name__}") from exc
    if decoded is None:  # JSON literal `null`
        return {}
    if not isinstance(decoded, dict):
        raise ValueError(f"expected a JSON object, got {type(decoded).__name__}")
    return decoded


def _describe_expectation(table_name, column_value, expectation, parameter_dict):
    """Build the one-line report for a single expectation row.

    A COLUMN_NAME of the literal string "NONE" marks a table-level
    expectation; every other value is treated as a column name.
    """
    if column_value == "NONE":
        if expectation == 'expect_table_row_count_to_be_between':
            min_value = parameter_dict.get("min")
            max_value = parameter_dict.get("max")
            return (f"Validating the table '{table_name}' with expectation '{expectation}' "
                    f"and parameters (Min: {min_value}, Max: {max_value}).")
        return (f"Encountered an unrecognized table-level expectation type '{expectation}' "
                f"for table '{table_name}'.")

    if expectation in ('expect_column_min_to_be_between', 'expect_column_mean_to_be_between'):
        min_value = parameter_dict.get("min")
        max_value = parameter_dict.get("max")
        return (f"Validating the column '{column_value}' with expectation '{expectation}' "
                f"and parameters (Min: {min_value}, Max: {max_value}).")
    if expectation == 'expect_column_values_to_be_in_set':
        expected_values = parameter_dict.get("expectedValues", [])
        return (f"Validating the column '{column_value}' with expectation '{expectation}' "
                f"and expected values {expected_values}.")
    return f"Encountered an unrecognized expectation type '{expectation}' for column '{column_value}'."


def validate_table_columns(table_name):
    """Print the expectations registered for a table.

    Reads the rows of CITIBIKE_2.VALIDATION.EXPECTATIONS whose TABLE_NAME
    equals ``table_name.upper()`` and prints one line per row describing the
    expectation and its parameters. Nothing is evaluated against the data;
    this only reports what is configured.

    Relies on the notebook-level Snowpark ``session`` and on ``col``, ``pd``
    and ``json`` being imported in the enclosing module.

    Args:
        table_name (str): Table to look up; compared upper-cased, but echoed
            in messages exactly as given.

    Returns:
        None. All results are written to stdout.
    """
    required_columns = ['COLUMN_NAME', 'EXPECTATION', 'PARAMETERS']

    # `session` is the Snowpark session created at notebook level.
    expectation_rows = (
        session.table("CITIBIKE_2.VALIDATION.EXPECTATIONS")
        .filter(col("TABLE_NAME") == table_name.upper())
        .collect()
    )
    df = pd.DataFrame(expectation_rows)

    # Guard: nothing to report when no rows matched or the schema is unexpected.
    if df.empty or not all(name in df.columns for name in required_columns):
        print("The DataFrame is either empty or does not contain the necessary columns.")
        return

    for column_value, expectation, parameter in df[required_columns].itertuples(index=False, name=None):
        try:
            parameter_dict = _parse_expectation_parameters(parameter)
        except ValueError as error:
            # One bad PARAMETERS cell must not abort the report for the other rows.
            target = f"table '{table_name}'" if column_value == "NONE" else f"column '{column_value}'"
            print(f"Skipping expectation '{expectation}' for {target}: invalid PARAMETERS ({error}).")
            continue
        print(_describe_expectation(table_name, column_value, expectation, parameter_dict))

In [33]:
# Print the expectations configured for the CONTACTS table.
validate_table_columns("CONTACTS")

Validating the column 'AGE' with expectation 'expect_column_mean_to_be_between' and parameters (Min: 30, Max: 32141253312).
Validating the table 'CONTACTS' with expectation 'expect_table_row_count_to_be_between' and parameters (Min: 0, Max: 10).
