# Great Expectations (Pandas + Slack Notification)

Use Great Expectations Core v1.0+ in a code-first workflow to validate a dataset and send a Slack alert when validation fails.

In [1]:
!pip install great_expectations pandas



In [2]:
import pandas as pd
import great_expectations as gx
from great_expectations.expectations import (
    ExpectColumnValuesToNotBeNull,
    ExpectColumnValuesToBeUnique,
    ExpectColumnValuesToBeBetween,
    ExpectColumnValuesToBeInSet
)

# Read the data
df = pd.read_csv("/content/drive/MyDrive/Colab Data/Amazon Sale Report.csv", low_memory=False)
print(df.info())


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 128975 entries, 0 to 128974
Data columns (total 24 columns):
 #   Column              Non-Null Count   Dtype  
---  ------              --------------   -----  
 0   index               128975 non-null  int64  
 1   Order ID            128975 non-null  object 
 2   Date                128975 non-null  object 
 3   Status              128975 non-null  object 
 4   Fulfilment          128975 non-null  object 
 5   Sales Channel       128975 non-null  object 
 6   ship-service-level  128975 non-null  object 
 7   Style               128975 non-null  object 
 8   SKU                 128975 non-null  object 
 9   Category            128975 non-null  object 
 10  Size                128975 non-null  object 
 11  ASIN                128975 non-null  object 
 12  Courier Status      122103 non-null  object 
 13  Qty                 128975 non-null  int64  
 14  currency            121180 non-null  object 
 15  Amount              121180 non-nul

  return datetime.utcnow().replace(tzinfo=utc)


In [3]:
# Let's clear the column names
# 'Order ID' -> 'order_id'
df.columns = [c.replace(' ', '_').lower() for c in df.columns]

# Examine the diagram
print(df.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 128975 entries, 0 to 128974
Data columns (total 24 columns):
 #   Column              Non-Null Count   Dtype  
---  ------              --------------   -----  
 0   index               128975 non-null  int64  
 1   order_id            128975 non-null  object 
 2   date                128975 non-null  object 
 3   status              128975 non-null  object 
 4   fulfilment          128975 non-null  object 
 5   sales_channel_      128975 non-null  object 
 6   ship-service-level  128975 non-null  object 
 7   style               128975 non-null  object 
 8   sku                 128975 non-null  object 
 9   category            128975 non-null  object 
 10  size                128975 non-null  object 
 11  asin                128975 non-null  object 
 12  courier_status      122103 non-null  object 
 13  qty                 128975 non-null  int64  
 14  currency            121180 non-null  object 
 15  amount              121180 non-nul

  return datetime.utcnow().replace(tzinfo=utc)


In [4]:
print(df['status'].value_counts())

status
Shipped                          77804
Shipped - Delivered to Buyer     28769
Cancelled                        18332
Shipped - Returned to Seller      1953
Shipped - Picked Up                973
Pending                            658
Pending - Waiting for Pick Up      281
Shipped - Returning to Seller      145
Shipped - Out for Delivery          35
Shipped - Rejected by Buyer         11
Shipping                             8
Shipped - Lost in Transit            5
Shipped - Damaged                    1
Name: count, dtype: int64


  return datetime.utcnow().replace(tzinfo=utc)


In [5]:
# 1. Context start
context = gx.get_context()

# 2. Create an Expectation Suite
suite = context.suites.add(gx.ExpectationSuite(name="amazon_orders_suite"))

# 3. Add Expectations
suite.add_expectation(ExpectColumnValuesToNotBeNull(column="order_id"))
suite.add_expectation(ExpectColumnValuesToBeUnique(column="order_id"))
suite.add_expectation(ExpectColumnValuesToBeBetween(column="qty", min_value=0))
suite.add_expectation(ExpectColumnValuesToBeBetween(column="amount", min_value=0))

# Permitted set for status
allowed_status = [
    "Shipped",
    "Shipped - Delivered to Buyer",
    "Cancelled",
    "Shipped - Returned to Seller",
    "Shipped - Picked Up",
    "Pending",
    "Pending - Waiting for Pick Up",
    "Shipped - Returning to Seller",
    "Shipped - Out for Delivery",
    "Shipped - Rejected by Buyer",
    "Shipping",
    "Shipped - Lost in Transit",
    "Shipped - Damaged"
]

# Add expectation
suite.add_expectation(
    ExpectColumnValuesToBeInSet(
        column="status",
        value_set=allowed_status
    )
)

INFO:great_expectations.data_context.types.base:Created temporary directory '/tmp/tmpp93iuja1' for ephemeral docs site


ExpectColumnValuesToBeInSet(id='57785d10-bf5f-4d82-9fad-fce2ae6b4b56', meta=None, notes=None, result_format=<ResultFormat.BASIC: 'BASIC'>, description=None, catch_exceptions=True, rendered_content=None, severity=<FailureSeverity.CRITICAL: 'critical'>, windows=None, batch_id=None, column='status', mostly=1, row_condition=None, condition_parser=None, value_set=['Shipped', 'Shipped - Delivered to Buyer', 'Cancelled', 'Shipped - Returned to Seller', 'Shipped - Picked Up', 'Pending', 'Pending - Waiting for Pick Up', 'Shipped - Returning to Seller', 'Shipped - Out for Delivery', 'Shipped - Rejected by Buyer', 'Shipping', 'Shipped - Lost in Transit', 'Shipped - Damaged'])

In [7]:
import requests

# 1. Data Source
try:
    data_source = context.data_sources.get(name="my_source")
except:
    data_source = context.data_sources.add_pandas(name="my_source")

# 2. Data Asset
try:
    data_asset = data_source.get_asset(name="my_asset")
except:
    data_asset = data_source.add_dataframe_asset(name="my_asset")

# 3. Batch Definition
# GX v1.0+
try:
    batch_definition = data_asset.get_batch_definition_daily(name="my_batch_def")
except:

    batch_definition = data_asset.add_batch_definition_whole_dataframe(name="my_batch_def")

# 4. Validation Definition
try:
    validation_def = context.validation_definitions.get(name="amazon_val")
except:
    validation_def = context.validation_definitions.add(
        gx.ValidationDefinition(
            name="amazon_val",
            data=batch_definition,
            suite=suite
        )
    )


results = validation_def.run(batch_parameters={"dataframe": df})

# Slack Notification Function
def send_slack_alert(results, webhook_url):
    stats = results.statistics
    failures = [res.expectation_config.type for res in results.results if not res.success]

    summary = (
        f"üö® *GX Validation Failed!*\n"
        f"‚úÖ Successful: {stats['successful_expectations']}\n"
        f"‚ùå Unsuccessful: {stats['unsuccessful_expectations']}\n"
        f"üõ† Errors: {', '.join(failures)}"
    )

    requests.post(webhook_url, json={"text": summary})

# If unsuccessful, send to Slack
if not results.success:
    MY_WEBHOOK_URL = "YOUR_WEBHOOK_URL_HERE"
    send_slack_alert(results, MY_WEBHOOK_URL)
    print("An error has been detected and a Slack notification has been sent!")
else:
    print("All checks were passed successfully.")

  return datetime.utcnow().replace(tzinfo=utc)


Calculating Metrics:   0%|          | 0/37 [00:00<?, ?it/s]

  return datetime.utcnow().replace(tzinfo=utc)


An error has been detected and a Slack notification has been sent!
