# **Validating Data with Great Expectations**

## **Install & Set up Great Expectations**

In [None]:
# Set up the environment with the required libraries

import great_expectations as gx

import sys
import os
import pandas as pd
import numpy as np

from sqlalchemy import create_engine
from sqlalchemy import text

from datetime import datetime
from dotenv import load_dotenv

In [None]:
# Load environment variables from a local .env file (python-dotenv), so the
# Postgres settings read with os.getenv() in the connection cell are available.

load_dotenv()

In [None]:
# Retrieve the Great Expectations Data Context.
# The context root directory can be overridden with the GX_CONTEXT_ROOT_DIR
# environment variable (e.g. set in .env), so the notebook also runs on other
# machines. Without that variable, the original hard-coded path is used.

path_to_context_root_dir = os.getenv(
    "GX_CONTEXT_ROOT_DIR",
    'C:/Users/xxx/Documents/Projects/xxx/great_expectations',
)

fact_trades_context = gx.get_context(context_root_dir = path_to_context_root_dir)

fact_trades_context

In [None]:
# Connect to DataSource: assemble the SQLAlchemy connection string from the
# environment variables loaded from .env.
from urllib.parse import quote_plus

pg_datasource_add = os.getenv("POSTGRES_ADDRESS")
pg_datasource_db = os.getenv("DATABASE")
pg_datasource_name = os.getenv("POSTGRES_USERNAME")
pg_datasource_pw = os.getenv("POSTGRES_PASSWORD")

# Fail fast with a clear message, rather than building a URL that contains the text "None".
missing_pg_env_vars = [
    env_name
    for env_name, env_value in (
        ("POSTGRES_ADDRESS", pg_datasource_add),
        ("DATABASE", pg_datasource_db),
        ("POSTGRES_USERNAME", pg_datasource_name),
        ("POSTGRES_PASSWORD", pg_datasource_pw),
    )
    if env_value is None
]
if missing_pg_env_vars:
    raise RuntimeError("Missing environment variable(s): " + ", ".join(missing_pg_env_vars))

# Percent-encode the credentials. Characters such as '@', ':', '/' or '%' in a user
# name or password would otherwise corrupt the URL.
postgresql_connection_string = (
    f"postgresql+psycopg2://{quote_plus(pg_datasource_name)}:{quote_plus(pg_datasource_pw)}"
    f"@{pg_datasource_add}/{pg_datasource_db}"
)

#### **DataAsset From SQL Source**

In [None]:
# The connection string embeds the database password, so it is deliberately not
# echoed here: notebook outputs are saved together with the file.

# Instantiate a Datasource: GX's representation of the data available in the PostgreSQL database.
pg_datasource = fact_trades_context.sources.add_or_update_postgres(name = "datateam_datasource",
                                                       connection_string = postgresql_connection_string)
# NOTE(review): the Datasource's repr may also include the connection string.
# Confirm before displaying it in a shared notebook.
pg_datasource

In [None]:
# Register a Data Asset (a discrete set of data) on the Postgres Datasource.
# Here the table of interest for the analysis is public.fact_trades.

data_asset_fact_trades = pg_datasource.add_table_asset(
    name="fact_trades",
    table_name="fact_trades",
    schema_name="public",
)

#### **DataAsset From Pandas Source**

In [None]:
# However, for this project we pull the Postgres table into a pandas DataFrame first,
# and then create the Data Asset from that DataFrame.

# SQLAlchemy engine for the connection string built above.
engine = create_engine(postgresql_connection_string)

# Open a connection and ask SQLAlchemy to stream results instead of pre-buffering them.
conn = engine.connect()
conn = conn.execution_options(stream_results=True)

In [None]:
# Read the public.fact_trades table into a pandas DataFrame over the open connection.
df_fact_trades = pd.read_sql_table(
    table_name="fact_trades",
    con=conn,
    schema="public",
)

In [None]:
# Preview the first rows of the DataFrame loaded from Postgres.
df_fact_trades.head()

In [None]:
# Register the pandas DataFrame with the Data Context as a Data Asset.
# add_or_update_pandas (rather than add_pandas) keeps this cell re-runnable against a
# context that already contains this datasource. This matches the add_or_update_postgres
# call used for the SQL source.
pandas_datasource = fact_trades_context.sources.add_or_update_pandas(name= "pandas_fact_trades_datasource")
df_name = 'pandas_fact_trades'

data_asset_pandas_fact_trades = pandas_datasource.add_dataframe_asset(name = df_name)

In [None]:
# Retrieve the already-defined pandas Data Asset from the Data Context
# (datasource "pandas_fact_trades_datasource", asset "pandas_fact_trades") and display it.
data_asset_pandas_fact_trades = fact_trades_context.get_datasource("pandas_fact_trades_datasource").get_asset("pandas_fact_trades")
data_asset_pandas_fact_trades

In [None]:
# List the Data Asset names available in the Data Context.
# To restrict the listing to one datasource, pass its name instead:
# all_dataassets = fact_trades_context.get_available_data_asset_names("pandas_fact_trades_datasource")
all_dataassets = fact_trades_context.get_available_data_asset_names()
all_dataassets

## **Create Expectation Suite**

### **Build Batch (Batch Requests)**

#### **1. SQL Source**

In [None]:
# Build a Batch Request for the SQL table asset. No options are passed, so all
# available Batches are requested.
batch_request_fact_trades = data_asset_fact_trades.build_batch_request()
batch_request_fact_trades

In [None]:
# The options dictionary can be used to limit the Batches returned by a Batch Request. Omitting the options dictionary will result in all available Batches being returned. (Like subsetting the data)

# To inspect the allowed keys for the options dictionary, you can do the following:

# options = data_asset_fact_trades.batch_request_options
# print(options)

In [None]:
# Verify that the correct Batches were returned
batches_fact_trades = data_asset_fact_trades.get_batch_list_from_batch_request(batch_request_fact_trades)

# Because Batch definitions are quite verbose, it is easiest to determine what data the Batch Request
# will return by printing just the batch_spec of each Batch.
for batch in batches_fact_trades:
    print(batch.batch_spec)

#### **2. Pandas Source**

In [None]:
# Build a Batch Request for the pandas Data Asset, supplying the DataFrame read from Postgres.
batch_request_pandas_fact_trades = data_asset_pandas_fact_trades.build_batch_request(dataframe = df_fact_trades)
batch_request_pandas_fact_trades

In [None]:
# Verify that the correct Batches were returned for the pandas Batch Request.
batches_pandas_fact_trades = data_asset_pandas_fact_trades.get_batch_list_from_batch_request(batch_request_pandas_fact_trades)

# Because Batch definitions are quite verbose, it is easiest to determine what data the Batch Request
# will return by printing just the batch_spec of each Batch.
for batch in batches_pandas_fact_trades:
    print(batch.batch_spec)

### **Auto-Generate Expectations**

#### **1. Data Assistant**

In [None]:
# Columns the Onboarding Data Assistant should ignore.
# exclude_column_names expects column NAMES, i.e. strings; bare identifiers such as
# column1 are undefined names and raise NameError.
# TODO: replace these placeholders with the real column names to exclude.
exclude_column_fact_trades = ['column1', 'column5', 'column7', 'column9']

In [None]:
# Run the Onboarding Data Assistant on the pandas batch, skipping the excluded columns.
# Its result holds the computed Metrics and the auto-generated Expectations.
data_assistant_result_fact_trades = fact_trades_context.assistants.onboarding.run(
    batch_request = batch_request_pandas_fact_trades,
    exclude_column_names = exclude_column_fact_trades
)

##### *View Auto-Generated Expectation Suite*

In [None]:
# Display the Data Assistant result (the auto-generated Expectation Suite).
data_assistant_result_fact_trades

In [None]:
# To see all Metrics computed by the Onboarding Data Assistant, grouped by domain
metrics_fact_trades = data_assistant_result_fact_trades.metrics_by_domain
metrics_fact_trades

In [None]:
# Plot the Metrics computed by the Onboarding Data Assistant.
metrics_plot_fact_trades = data_assistant_result_fact_trades.plot_metrics()

In [None]:
# View the auto-created Expectations grouped by Expectation Type.
# expectation_suite_name must be a string: the name given to the suite built from the
# Data Assistant result. Passing the result object itself is not a valid suite name.

exp_by_exptype_fact_trades = data_assistant_result_fact_trades.show_expectations_by_expectation_type(
    expectation_suite_name = "fact_trades_onboarding_assistant_suite")

In [None]:
# View the auto-created Expectations grouped by Domain.
# expectation_suite_name must be a string: the name given to the suite built from the
# Data Assistant result. Passing the result object itself is not a valid suite name.

exp_by_domain_fact_trades = data_assistant_result_fact_trades.show_expectations_by_domain_type(
    expectation_suite_name = "fact_trades_onboarding_assistant_suite")

In [None]:
# Where applicable - plot the expectations produced and the associated metrics calculated by the onboarding data assistant
expectation_metrics_plot_fact_trades = data_assistant_result_fact_trades.plot_expectations_and_metrics()

# !!! Note !!!

# If no Expectation was produced by the Data Assistant for a given Metric,
# neither the Expectation nor the Metric will be visualized by the plot_expectations_and_metrics() method.

#### **2. Pandas Profiler**

In [None]:
# Connect SQLAlchemy to PostgreSQL for the profiling section.
# NOTE(review): this rebinds `engine` and `conn`, which were created in an earlier cell.
# The previous connection is never explicitly closed. Consider reusing it, or calling
# conn.close() before reconnecting.
engine = create_engine(postgresql_connection_string)
conn = engine.connect().execution_options(stream_results=True)

In [None]:
# Load fact_trades into pandas for profiling. The connection is opened in a `with`
# block so it is closed as soon as the read finishes; an inline engine.connect() is
# never closed.
# NOTE(review): this reads schema 'transactions', while the Data Asset and the earlier
# read use schema 'public'. Confirm which schema holds fact_trades.
with engine.connect() as profiling_conn:
    pandas_fact_trades = pd.read_sql_table('fact_trades', profiling_conn, schema = 'transactions')

In [None]:
# Summary statistics for every column (include='all' also covers non-numeric columns).
pandas_fact_trades.describe(include='all')

In [None]:
# Generate a profiling report for the DataFrame.
# NOTE(review): ProfileReport is not imported anywhere in this notebook. It presumably
# comes from ydata-profiling (`from ydata_profiling import ProfileReport`, formerly
# pandas_profiling). Add that import, or this cell raises NameError.
profiler_fact_trades = ProfileReport(pandas_fact_trades)
profiler_fact_trades

### **Define Expectation Suite**

In [None]:
# Create the expectation suite (or update it if it already exists). At this point it
# carries only its name; Expectations are added through the validator below.

expectation_suite_name_fact_trades = "fact_trades_expectations"

expectation_suite_fact_trades = fact_trades_context.add_or_update_expectation_suite(
    expectation_suite_name=expectation_suite_name_fact_trades,
)

In [None]:
# Instantiate a Validator that binds the SQL batch request to the expectation suite,
# so Expectations can be defined and evaluated interactively.
validator_fact_trades = fact_trades_context.get_validator(
    batch_request=batch_request_fact_trades,
    expectation_suite_name=expectation_suite_name_fact_trades,
)

In [None]:
# Preview the first rows of the validator's batch.
validator_fact_trades.head()

In [None]:
# List the column names of the validator's batch.
validator_fact_trades.columns()

#### **Table Expectations**

In [None]:
# Check the table's columns against the expected column set.
# A comma is required after 'actual_trade_vol_mt'. Without it, Python concatenates the
# adjacent literals into the bogus column name 'actual_trade_vol_mtis_deleted'.
# NOTE(review): later cells also set expectations on trade_is_on_behalf, requested_price,
# asset_type, deal_timestamp, trade_timestamp, processed_inventory and
# is_contract_note_sent, none of which are in this set. Confirm the set is complete.
table_expectations = [
    validator_fact_trades.expect_table_columns_to_match_set(column_set = [
        'transaction_price', 'discount', 'client_type', 'user_type',
        'client_id', 'total_weight_kg', 'currency', 'currency_code',
        'client_country_code', 'client_country', 'system_name', 'request_id',
        'requested_volume', 'client_email', 'fee_per_trade', 'actual_trade_vol_mt',
        'is_deleted', 'transaction_units', 'vat_value', 'transaction_type',
        'actual_trade_value', 'trade_date', 'client_phone', 'discount_type',
        'deduction', 'client_fullname', 'asset_code', 'trade_status',
        'units_volume', 'requested_units', 'transaction_id', 'asset_name', 'asset_location',
    ]),
]

#### **Columns Expectations**

##### **1. In-Built Expectations**

In [None]:
# expect_column_values_to_not_be_null:
# every row must have a value in each of these columns.

non_null_column = [
    'transaction_price', 'client_id', 'currency', 'currency_code',
    'client_country_code', 'client_country', 'trade_is_on_behalf', 'requested_price',
    'asset_type', 'system_name', 'request_id', 'requested_volume',
    'client_email', 'fee_per_trade', 'is_deleted', 'transaction_type',
    'trade_date', 'client_phone', 'deduction', 'client_fullname',
    'asset_code', 'trade_status', 'units_volume', 'requested_units',
    'client_type', 'user_type', 'transaction_id', 'asset_name',
    'asset_location', 'deal_timestamp', 'trade_timestamp',
]

for column in non_null_column:
    validator_fact_trades.expect_column_values_to_not_be_null(
        column=column,
    )

In [None]:
# expect_column_values_to_not_be_null (conditional):
# these columns only need a value on the rows selected by each row_condition,
# which is written in GX's experimental condition-parser syntax.

non_null_column_cond_matched = ['total_weight_kg','actual_trade_vol_mt','transaction_units',
                                'vat_value','actual_trade_value',]

non_null_column_cond_delivered = ['processed_inventory','is_contract_note_sent',]

# NOTE(review): asset_location is also in the unconditional non-null list, so this
# conditional check adds nothing while that list contains it.
non_null_column_cond_sec_type = ['asset_location',]


for column in non_null_column_cond_matched:
    validator_fact_trades.expect_column_values_to_not_be_null(column = column,
                                                                    condition_parser = 'great_expectations__experimental__',
                                                                    row_condition = 'col("trade_status") == "Merged"')
for column in non_null_column_cond_delivered:
    # An empty column name (col("")) cannot match any column. "Merged" is a
    # trade_status value, so trade_status is assumed to be the intended column. TODO confirm.
    validator_fact_trades.expect_column_values_to_not_be_null(column = column,
                                                                    condition_parser = 'great_expectations__experimental__',
                                                                    row_condition = 'col("asset_type")=="Options" & col("trade_status")=="Merged"')

for column in non_null_column_cond_sec_type:
    validator_fact_trades.expect_column_values_to_not_be_null(column = column,
                                                                    condition_parser = 'great_expectations__experimental__',
                                                                    row_condition = 'col("asset_type")=="Futures" | col("asset_type")=="Options"')

In [None]:
# expect_column_values_to_be_unique:
# no value may appear more than once in these columns.

unique_column = ['transaction_id']

for column in unique_column:
    validator_fact_trades.expect_column_values_to_be_unique(
        column=column,
    )

In [None]:
# expect_column_values_to_be_of_type
# The type names are SQL spellings, because the validator is bound to the SQL batch
# request. The spellings for a pandas datasource are given in the trailing comments.

text_column = ['client_type','user_type','currency','asset_type','system_name','request_id',
               'transaction_type','asset_code','trade_status','transaction_id','asset_name',]

int_column = ['requested_volume','units_volume','requested_units',]

float_column = ['transaction_price','requested_price','fee_per_trade',]

bool_column = ['trade_is_on_behalf','is_deleted',]

timestamp_column = ['trade_date','deal_timestamp','trade_timestamp']

# Expectations are added group by group, in this order.
for type_name, columns_of_type in (
    ('TEXT', text_column),            # pandas: 'STR'
    ('INTEGER', int_column),          # pandas: 'INT'
    ('FLOAT', float_column),
    ('BOOLEAN', bool_column),         # pandas: 'BOOL'
    ('TIMESTAMP', timestamp_column),
):
    for column in columns_of_type:
        validator_fact_trades.expect_column_values_to_be_of_type(
            column=column,
            type_=type_name,
            mostly=1.0,
        )

In [None]:
# expect_column_values_to_match_regex
# The patterns are raw strings. In a normal string, '\d' and '\.' are invalid escape
# sequences, which trigger DeprecationWarning (SyntaxWarning on Python 3.12+).
# The pattern text itself is unchanged.

tids = ['transaction_id']

emails = ['client_email']

# transaction_id must be exactly 12 digits.
for column in tids:
    validator_fact_trades.expect_column_values_to_match_regex(column = column,
                                                                    regex = r'^\d{12}$',
                                                                    mostly = 1.0)

# Basic e-mail shape: local part, '@', then dot-separated domain labels.
for column in emails:
    validator_fact_trades.expect_column_values_to_match_regex(column = column,
                                                                    regex = r'^[a-zA-Z0-9.!#$%&*+/=?^_`{|}~-]+@[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)*$',
                                                                    mostly = 1.0)

In [None]:
# expect_column_values_to_be_between
# Each timestamp column must fall strictly between a per-column lower bound and "now".

# Current time as a timezone-aware UTC timestamp. A naive datetime.now() is local
# wall-clock time, and pd.to_datetime(..., utc=True) below would label it as UTC,
# shifting the upper bound by the machine's UTC offset.
now = pd.Timestamp.now(tz='UTC')

# column name -> [upper bound, lower bound]
# The keys must be the bare column names. A stray comma inside the string
# (e.g. "trade_timestamp,") names no real column.
value_range_column = {"trade_date": [now, pd.to_datetime('2020-01-09'),],
                      "trade_timestamp": [now, pd.to_datetime('2020-01-06T00:00:00.000000+00:00'),],
                      "deal_timestamp": [now, pd.to_datetime('2020-02-20T00:00:00.000000+00:00'),],}

for column, date in value_range_column.items():
    validator_fact_trades.expect_column_values_to_be_between(column = column,
                                                                   max_value = pd.to_datetime(date[0], utc=True), min_value = pd.to_datetime(date[1], utc=True),
                                                                   strict_max = True, strict_min = True,
                                                                   mostly = 1.0)

In [None]:
# expect_column_values_to_be_in_set
# Every value in each column must be one of the listed allowed values.

categorical_columns = {"trade_status": ['Open','Partial-Merge','Merged','Cancelled'],
                       "currency": ['US Dollars','Pounds', 'Euro'],
                       "currency_code": ['USD', 'GBP', 'EUR'],
                       "system_name": ['BASE', 'EXTERNAL', 'NEXT'],
                       "units_volume": [10, 1000],
                       "transaction_type": ['Disperse','Purchase'],
                       "client_type": ['Intermediary','Advocate','Trader'],
                       "user_type": ['Personal','Business'],
                       "client_country": ['United States of America','Canada','Japan',
                                          'United Kingdom of Great Britain and Northern Ireland'],
                       "client_country_code": ['US','CA','JP','UK'],
                       "asset_type": ['Futures','Forwards','Options'],
                       "asset_name": ['asset1','asset2','asset3','asset4','asset5','asset6','asset7','asset8','asset9','asset10',],
                       "asset_code": ['ASS78', 'HSS87', 'JSS98', 'KSS09', 'LSS10', 'MSS11', 'NSS12', 'OSS13', 'PSS14', 'QSS15',],
                      }

for column, distinct in categorical_columns.items():
    validator_fact_trades.expect_column_values_to_be_in_set(column = column,
                                                            value_set = distinct,
                                                            mostly = 1.0)

##### **2. Custom Expectations**

In [None]:
# Import custom expectations

from expectations.expect_column_values_to_be_between_quartile_limits_by_category import ExpectColumnValuesToBeBetweenQuartileLimitsByCategory
from expectations.expect_column_values_to_be_between_quartile_limits import ExpectColumnValuesToBeBetweenQuartileLimits

In [None]:
# expect_column_values_to_be_between_quartile_limits_by_category (custom Expectation):
# each price column (column_A) is checked per asset_code category (column_B).
# NOTE(review): the exact limits are defined by the custom Expectation's implementation.

quartile_limit = ['transaction_price','requested_price',]

for column in quartile_limit:
    validator_fact_trades.expect_column_values_to_be_between_quartile_limits_by_category(
        column_A=column,
        column_B='asset_code',
        mostly=1.0,
    )

In [None]:
# expect_column_values_to_be_between_quartile_limits (custom Expectation),
# applied to each price column over the whole table.

quartile_limit = ['transaction_price','requested_price',]

for column in quartile_limit:
    validator_fact_trades.expect_column_values_to_be_between_quartile_limits(
        column=column,
        mostly=1.0,
    )

#### **Save Expectation Suite**

In [None]:
# Save Expectation Suite
# Persist the suite held by the validator. discard_failed_expectations=False keeps
# Expectations that did not pass on this batch.

validator_fact_trades.save_expectation_suite(discard_failed_expectations = False)

In [None]:
# Reload the suite from the Data Context to inspect what was saved.
expectation_suite_fact_trades = fact_trades_context.get_expectation_suite(expectation_suite_name = expectation_suite_name_fact_trades)

expectation_suite_fact_trades

In [None]:
# Save the expectation suite
# In the interactive workflow, an Expectation Suite will be configured to include Expectations as they are defined, but will not be saved to an Expectation Store until you issue the command for it to be.
# This saves the suite object loaded from the context in the previous cell.

saved_expectation_suite_fact_trades = fact_trades_context.save_expectation_suite(expectation_suite_fact_trades, discard_failed_expectations=False)

## **Edit Expectation Suite (If required)**

In [None]:
# Import required Library

from great_expectations.core.expectation_suite import ExpectationConfiguration

In [None]:
# Edited Expectation(s) expressed as ExpectationConfiguration objects.
config = [
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "auto": True,
            "column": "xxx",  # placeholder column name
            "domain": "column",
            "min_value": 1,
            "max_value": 4,
            # 'max_value': 6, this is what was changed, as an example
            "mostly": 1.0,
            "strict_max": False,
            "strict_min": False,
        },
    ),
]

In [None]:
# Add each edited Expectation to the fact_trades suite.
# The items of `config` are already ExpectationConfiguration objects, so they are
# passed as-is. Wrapping one again would hand the object to the expectation_type
# parameter. The suite object in this notebook is expectation_suite_fact_trades.
for exp in config:
    expectation_suite_fact_trades.add_expectation(exp)

In [None]:
# Alternative form: describe each Expectation as a plain dict of
# ExpectationConfiguration keyword arguments (expectation_type, kwargs).
# `name = value` pairs are not allowed inside a list literal (SyntaxError).
or_config = [
    {
        "expectation_type": "expect_column_values_to_be_between",
        "kwargs": {
            "auto": True,
            "column": "xxx",
            "domain": "column",
            "min_value": 1,
            "max_value": 4,
            # 'max_value': 6, this is what was changed, as an example
            "mostly": 1.0,
            "strict_max": False,
            "strict_min": False,
        },
    },
]

In [None]:
# Add each entry of or_config to the fact_trades suite.
# Dict entries hold ExpectationConfiguration keyword arguments, so they are unpacked
# with **. Entries that are already ExpectationConfiguration objects are used as-is.
for or_exp in or_config:
    if isinstance(or_exp, ExpectationConfiguration):
        expectation_suite_fact_trades.add_expectation(or_exp)
    else:
        expectation_suite_fact_trades.add_expectation(ExpectationConfiguration(**or_exp))

In [None]:
# Confirm edit: look up the suite's Expectation on the same domain
# (expectation type + column) and compare it with the edited configuration.

config_to_search = ExpectationConfiguration(
    expectation_type="expect_column_values_to_be_between",
    kwargs={"column": "xxx"},
)
found_expectation = expectation_suite_fact_trades.find_expectations(config_to_search, match_type="domain")

# `config` is already a list of ExpectationConfiguration objects, so the lists are compared
# directly. Wrapping it as [config] would nest it in a second list, which can never match.
assert found_expectation == config

## **Validate Data**

In [None]:
from great_expectations.checkpoint import Checkpoint

In [None]:
# Create and Store Checkpoint
# A Checkpoint pairs the validations to run (batch request + expectation suite) with
# the actions executed afterwards. This one stores the validation result, stores
# evaluation parameters, and updates Data Docs.

checkpoint_name = "checkpoint_transactions"

checkpoint_transactions = Checkpoint(
    name = checkpoint_name,
    # strftime-style template used to name each run
    run_name_template = "%Y%m%d-%H%M%S-transactions",
    data_context = fact_trades_context,
    action_list = [
        {
            "name": "store_validation_result",
            "action": {"class_name": "StoreValidationResultAction"},
        },
        {
            "name": "store_evaluation_params",
            "action": {"class_name": "StoreEvaluationParametersAction"},
        },
        {
            "name": "update_data_docs",
            "action": {"class_name": "UpdateDataDocsAction" }
        },
        # Optional email notification on failure (disabled).
        # NOTE: the ${...} values are config-variable placeholders (see the comment below);
        # they are not valid Python, so adapt them before uncommenting this entry.
        # {
            # "name": "send_email_on_validation_result",
            # "action": {"class_name": "EmailAction",
                      # "notify_on" : "failure",
                        ## put the actual following information in the uncommitted/config_variables.yml file
                       ## or pass in as environment variable
        # renderer: {module_name: great_expectations.render.renderer.email_renderer, class_name: EmailRenderer},
                      # "smtp_address": ${smtp_address},
                       # "smtp_port": ${smtp_port},
                       # "sender_login": ${email_address},
                       # "sender_password": ${sender_password},
                       # "sender_alias": ${sender_alias}, # useful to send an email as an alias
                       # "receiver_emails": ${receiver_emails},
                       # "use_tls": False,
                       # "use_ssl": True}
        # },
    ],
    # Validate the SQL batch request against the fact_trades suite.
    validations = [
                   {"batch_request": batch_request_fact_trades,
                    "expectation_suite_name": expectation_suite_name_fact_trades}
                  ],
    # Optional: a more detailed result format (see the keys below).
    # runtime_configuration = {"result_format": {"result_format": "COMPLETE",
    #                                            "unexpected_index_column_names": ["pk_column"],
    #                                            "return_unexpected_index_query": True}
    #                         }
)

In [None]:
# Create and Store Checkpoint: alternative, minimal definition. This overwrites
# checkpoint_transactions from the previous cell.
# A Checkpoint requires the Data Context it runs against, and it needs at least one
# validation (batch request + suite) for run() to have anything to validate.
# NOTE(review): no action_list is given, so presumably validation results are not
# stored and Data Docs are not updated. Use the previous cell's definition for that.

checkpoint_name = "checkpoint_transactions"

checkpoint_transactions = Checkpoint(name = checkpoint_name,
                                     data_context = fact_trades_context,
                                     config_version = 3.0,
                                     run_name_template = "%Y%m%d-%H%M%S-transactions",
                                     validations = [
                                         {"batch_request": batch_request_fact_trades,
                                          "expectation_suite_name": expectation_suite_name_fact_trades},
                                     ],)

In [None]:
# Display the checkpoint.
checkpoint_transactions

In [None]:
# Run checkpoint to test auto-created expectation suite and validate data

checkpoint_transactions_result = checkpoint_transactions.run()
# checkpoint_transactions_result

# NOTE(review): this asserts that validation FAILED, so it raises AssertionError when
# every expectation passes. Confirm that a failing run is really what is expected here.
assert checkpoint_transactions_result["success"] is False

In [None]:
# To run validation directly from the data context (1):
# store a checkpoint named checkpoint_name in the Data Context, built from the validator.
# Alternative: register the Checkpoint object built above instead:
# fact_trades_context.add_or_update_checkpoint(checkpoint = checkpoint_transactions)
checkpoint_name = "checkpoint_transactions"
fact_trades_context.add_or_update_checkpoint(name = checkpoint_name,
                                                   validator = validator_fact_trades
                                                  )

In [None]:
# To update the stored checkpoint's validations.
# The undefined placeholders (checkpoint_name_comx, xxx) are replaced with the
# checkpoint name, SQL batch request and suite name defined earlier in this notebook.
# Substitute other values here to point the checkpoint at different data.

fact_trades_context.add_or_update_checkpoint(
    name = checkpoint_name,
    validations = [
        {
            "batch_request": batch_request_fact_trades,
            "expectation_suite_name": expectation_suite_name_fact_trades,
        },
    ],
)

In [None]:
# To run validation directly from the data context (2): run the stored checkpoint.

validation_transactions = fact_trades_context.run_checkpoint(
    checkpoint_name = checkpoint_name)

# Report the outcome. A non-zero exit status tells a calling shell or scheduler that
# the run FAILED, so sys.exit(1) belongs on the failure path only.
# NOTE: inside Jupyter, sys.exit raises SystemExit, which the kernel reports instead
# of terminating.
if not validation_transactions["success"]:
    print("Validation failed!")
    sys.exit(1)
print("Validation succeeded!")

In [None]:
# Run the stored checkpoint from an Airflow task, naming the GX run after the DAG run.
# NOTE(review): assumes the task exposes the DAG run id as AIRFLOW_CTX_DAG_RUN_ID in
# its environment (as Airflow does for Python/Bash operators). Confirm for your setup.
# Outside Airflow the value is None and no explicit run name is passed.
airflow_run_id = os.getenv("AIRFLOW_CTX_DAG_RUN_ID")

# checkpoint_name must be the stored checkpoint's NAME (a string), not the Checkpoint
# object. The `: CheckpointResult` annotation is omitted because CheckpointResult is
# not imported, and a module-level annotation is evaluated at runtime.
airflow_checkpoint_run_result = fact_trades_context.run_checkpoint(
    checkpoint_name = checkpoint_name,
    # Optional: validate a specific batch instead, e.g.
    # batch_request = {
    #     "runtime_parameters": {
    #         "batch_data": my_data_frame,
    #     },
    #     "data_connector_query": {
    #         "batch_filter_parameters": {
    #             "airflow_run_id": airflow_run_id,
    #         }
    #     },
    # },
    run_name=airflow_run_id,
)

In [None]:
# To view an HTML representation of the checkpoint's validation results
validation_result_transactions = fact_trades_context.view_validation_result(validation_transactions)

## **Generate Report**

In [None]:
# Build the Data Docs so they reflect the validations run above
# (Data Docs are also created automatically after the validator runs).
fact_trades_context.build_data_docs()

In [None]:
# Open the Data Docs site.
fact_trades_context.open_data_docs()

In [None]:
# Save the expectation suite
# In the interactive workflow, an Expectation Suite will be configured to include Expectations as they are defined, but will not be saved to an Expectation Store until you issue the command for it to be.
# NOTE: this repeats the save from the "Edit Expectation Suite" section, so edits made to
# expectation_suite_fact_trades since then are persisted as well.

saved_expectation_suite_fact_trades = fact_trades_context.save_expectation_suite(expectation_suite_fact_trades, discard_failed_expectations=False)