### Demo

Includes:
- Generating sample data
- Examples using various backends:
  - DuckDB (Polars, Pandas)
  - PySpark
  - Athena
  - Postgres
- Examples showcasing post-processing functionality:
  - Reports for Alerting (Text, Markdown, HTML)
  - Persist Error Cases (CSV, Parquet)

### Sample Data

The sample data includes the following data quality issues, which are subsequently tested for:
- Range check: age 150 is an implausible outlier
- Duplicates: Duplicate user_id 4
- Regex: email is invalid for user_id 3
- Business rule violation (multi-column): active users (`is_active = True`) must be in category A or B, and inactive users must be in category C; user_id 3 is inactive but in category B
- Business rule violation (multi-dataset): order_id 3 in "orders" references user_id 5, which does not exist in "users"

In [1]:
import polars as pl

# Users table, seeded with the data quality issues listed above:
#   user_id 2 -> age 150 (range check)
#   user_id 3 -> "invalid-email" (regex check); inactive but in category B (business rule)
#   user_id 4 -> appears twice (duplicate check)
df_users = pl.DataFrame(
    {
        "user_id": [1, 2, 3, 4, 4],
        "age": [25, 150, 22, 45, 30],
        "email": [
            "user1@example.com",
            "user2@example.com",
            "invalid-email",
            "user4@example.com",
            "user5@example.com",
        ],
        "score": [85.5, 92.0, 78.3, 88.7, 95.2],
        "is_active": [True, True, False, True, False],
        "category": ["A", "B", "B", "A", "C"],
    }
)

# Orders table: order_id 3 references user_id 5, which is absent from df_users.
# Dates are kept as plain strings here.
df_orders = pl.DataFrame(
    {
        "order_id": [1, 2, 3],
        "user_id": [1, 2, 5],
        "order_date": ["2021-01-01", "2021-01-02", "2021-01-03"],
        "order_amount": [100, 200, 300],
    }
)

### DuckDB (Polars), also works for Pandas

In [2]:
from sqldq import SQLDQ
import duckdb

# DuckDB setup: an in-memory connection with the Polars frames registered as
# the "users" and "orders" views.
con = duckdb.connect()
con.register("users", df_users)
con.register("orders", df_orders)
# Cap on failing rows collected per check (shows up as limit=20 in the results).
dq = SQLDQ.from_duckdb(con, default_max_rows=20)

# Checks
dq = (
    dq.add_check(
        name="Age between 18 and 80",
        failure_rows_query="SELECT * FROM users WHERE age NOT BETWEEN 18 AND 80",
    )
    .add_check(
        name="Duplicate user IDs",
        failure_rows_query="""
        WITH duplicates AS (
            SELECT user_id, COUNT(*) AS count
            FROM users
            GROUP BY user_id
            HAVING COUNT(*) > 1
        )
        SELECT * FROM users WHERE user_id IN (SELECT user_id FROM duplicates)""",
        columns=["user_id", "email"],
    )
    .add_check(
        name="Email format valid",
        # DuckDB string literals do not process backslash escapes, so the single
        # backslash reaches the regex engine and `\.` matches a literal dot.
        failure_rows_query=r"SELECT * FROM users WHERE email !~ '^[^@]+@[^@]+\.[^@]+'",
        columns=["email"],
    )
    .add_check(
        name="Missing user IDs in orders",
        # `SELECT *` over this join would return two `user_id` columns (one per
        # table). Selecting `o.*` keeps only the orders columns, matching the
        # Postgres version of this check.
        failure_rows_query="""SELECT o.* FROM orders o
        LEFT JOIN users u ON o.user_id = u.user_id
        WHERE u.user_id IS NULL""",
        columns=["order_id", "user_id"],
    )
    .add_check(
        name="Active users in category A or B only",
        failure_rows_query="""
        SELECT * FROM users
        WHERE is_active = TRUE AND category NOT IN ('A', 'B')""",
        columns=["user_id", "is_active", "category"],
    )
    .add_check(
        name="Inactive users in category C only",
        failure_rows_query="""SELECT * FROM users
        WHERE is_active = FALSE AND category != 'C'""",
        columns=["user_id", "is_active", "category"],
    )
)

# Execution
results = dq.execute()

results.get_summary()

{'total_checks': 6, 'passed_checks': 1, 'failed_checks': 5}

##### Programmatic Post-processing (Workflow)

In [3]:
# Decide whether downstream alerting is needed, based on whether any check failed.
print(
    "Checks failed. We should send a report."
    if results.has_failures()
    else "All checks passed"
)

Checks failed. We should send a report.


In [4]:
# Raw per-check results: a dict mapping each check name to its CheckResult
# (failed rows, row limit, error message), displayed below.
results.results

{'Age between 18 and 80': CheckResult(name='Age between 18 and 80', failed_rows=shape: (1, 6)
 ┌─────────┬─────┬───────────────────┬───────┬───────────┬──────────┐
 │ user_id ┆ age ┆ email             ┆ score ┆ is_active ┆ category │
 │ ---     ┆ --- ┆ ---               ┆ ---   ┆ ---       ┆ ---      │
 │ i64     ┆ i64 ┆ str               ┆ f64   ┆ bool      ┆ str      │
 ╞═════════╪═════╪═══════════════════╪═══════╪═══════════╪══════════╡
 │ 2       ┆ 150 ┆ user2@example.com ┆ 92.0  ┆ true      ┆ B        │
 └─────────┴─────┴───────────────────┴───────┴───────────┴──────────┘, limit=20, error_message=None),
 'Duplicate user IDs': CheckResult(name='Duplicate user IDs', failed_rows=shape: (2, 2)
 ┌─────────┬───────────────────┐
 │ user_id ┆ email             │
 │ ---     ┆ ---               │
 │ i64     ┆ str               │
 ╞═════════╪═══════════════════╡
 │ 4       ┆ user4@example.com │
 │ 4       ┆ user5@example.com │
 └─────────┴───────────────────┘, limit=20, error_message=None),


In [5]:
# Walk every check and print how many failing rows it captured.
for check_name, outcome in results.results.items():
    print(f"Check: {check_name}")
    print(f"Failed rows count:\n{outcome.failure_count}\n")

Check: Age between 18 and 80
Failed rows count:
1

Check: Duplicate user IDs
Failed rows count:
2

Check: Email format valid
Failed rows count:
1

Check: Missing user IDs in orders
Failed rows count:
1

Check: Active users in category A or B only
Failed rows count:
0

Check: Inactive users in category C only
Failed rows count:
1



##### Reporting

In [6]:
# Summary counts as a plain dict: total, passed and failed checks.
print(results.get_summary())

{'total_checks': 6, 'passed_checks': 1, 'failed_checks': 5}


In [7]:
# Display results: the default report has one pass/fail line per check
# followed by a one-line summary.
report = results.report()
print(report)

Age between 18 and 80: ❌

Duplicate user IDs: ❌

Email format valid: ❌

Missing user IDs in orders: ❌

Active users in category A or B only: ✅

Inactive users in category C only: ❌

Summary: 6 checks, 5 failed


In [8]:
# Plain-text report: summary header first, then only the failing checks,
# each followed by a preview of its failing rows.
report = results.report(
    output_format="text",
    include_rows=True,
    include_summary_header=True,
    fail_only=True,
)
print(report)

Data Quality Check Summary
Total Checks: 6
Passed: 1
Failed: 5


Age between 18 and 80: ❌ (up to 20 failures recorded)
↳ Failed rows (max 10 shown):
shape: (1, 6)
┌─────────┬─────┬───────────────────┬───────┬───────────┬──────────┐
│ user_id ┆ age ┆ email             ┆ score ┆ is_active ┆ category │
│ ---     ┆ --- ┆ ---               ┆ ---   ┆ ---       ┆ ---      │
│ i64     ┆ i64 ┆ str               ┆ f64   ┆ bool      ┆ str      │
╞═════════╪═════╪═══════════════════╪═══════╪═══════════╪══════════╡
│ 2       ┆ 150 ┆ user2@example.com ┆ 92.0  ┆ true      ┆ B        │
└─────────┴─────┴───────────────────┴───────┴───────────┴──────────┘

Duplicate user IDs: ❌ (up to 20 failures recorded)
↳ Failed rows (max 10 shown):
shape: (2, 2)
┌─────────┬───────────────────┐
│ user_id ┆ email             │
│ ---     ┆ ---               │
│ i64     ┆ str               │
╞═════════╪═══════════════════╡
│ 4       ┆ user4@example.com │
│ 4       ┆ user5@example.com │
└─────────┴───────────────────┘

E




In [9]:
# The same report options rendered as Markdown (kept in `report`, not printed).
report = results.report(
    output_format="markdown",
    include_rows=True,
    include_summary_header=True,
    fail_only=True,
)

In [10]:
# The same report options rendered as HTML; failing rows become HTML tables.
report = results.report(
    output_format="html",
    include_rows=True,
    include_summary_header=True,
    fail_only=True,
)
print(report)

<h2>Data Quality Check Summary</h2>
<p><strong>Total Checks:</strong> 6</p>
<p><strong>Passed:</strong> 1</p>
<p><strong>Failed:</strong> 5</p>
<hr>

<h3>Age between 18 and 80: ❌ <b>up to 20 failures recorded</b></h3><p><b>Failed rows (max 10 shown):</b></p><table border="1" class="dataframe">
  <thead>
    <tr style="text-align: right;">
      <th>user_id</th>
      <th>age</th>
      <th>email</th>
      <th>score</th>
      <th>is_active</th>
      <th>category</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>2</td>
      <td>150</td>
      <td>user2@example.com</td>
      <td>92.0</td>
      <td>True</td>
      <td>B</td>
    </tr>
  </tbody>
</table>

<h3>Duplicate user IDs: ❌ <b>up to 20 failures recorded</b></h3><p><b>Failed rows (max 10 shown):</b></p><table border="1" class="dataframe">
  <thead>
    <tr style="text-align: right;">
      <th>user_id</th>
      <th>email</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>4</td>
      <td>user4@example.com</td>
    </t

In [11]:
# Fixed, colon-free timestamp used as a file-name prefix for the exports below
# (presumably hard-coded to keep demo output paths stable across runs).
timestamp = "2025-01-01T12-34-56"

In [12]:
# Export reports in text, Markdown and HTML form (all including failing rows).
from pathlib import Path

# Create the target directory up front so the export does not depend on
# ./demo/reports already existing.
Path("./demo/reports").mkdir(parents=True, exist_ok=True)

results.export_report(f"./demo/reports/{timestamp}_dq_report.txt", output_format="text", include_rows=True)
results.export_report(f"./demo/reports/{timestamp}_dq_report.md", output_format="markdown", include_rows=True)
results.export_report(f"./demo/reports/{timestamp}_dq_report.html", output_format="html", include_rows=True)

##### Persisting results

In [13]:
# Local exports of the failing rows, once as CSV and once as Parquet.
from pathlib import Path

# Create the target directory up front so the export does not depend on
# ./demo/exports already existing.
Path("./demo/exports").mkdir(parents=True, exist_ok=True)

results.export_failed_rows(f"./demo/exports/{timestamp}_dq_fails_1", output_format="csv")
results.export_failed_rows(f"./demo/exports/{timestamp}_dq_fails_2", output_format="parquet")


In [None]:
# S3
import boto3
from datetime import datetime, timezone

session = boto3.Session(profile_name="default")

bucket = "your-bucket"  # Replace with your S3 bucket name
# UTC run stamp in the same colon-free layout as `timestamp`. datetime.isoformat()
# would put ':' into the object key, a character AWS lists as possibly needing
# special handling in S3 keys, and would use naive local time.
run_stamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S")
prefix = f"dq-export-{run_stamp}"

results.export_failed_rows(f"s3://{bucket}/{prefix}/{timestamp}_dq_fails_1",
                           output_format="csv",
                           boto3_session=session)
results.export_failed_rows(f"s3://{bucket}/{prefix}/{timestamp}_dq_fails_2",
                           output_format="parquet",
                           boto3_session=session)

### PySpark

In [15]:

from pyspark.sql import SparkSession

# Spark session for the demo, with adaptive query execution switched off.
spark = (
    SparkSession.builder
    .appName("sqldq_Example")
    .config("spark.sql.adaptive.enabled", "false")
    .getOrCreate()
)

# Expose the Polars sample data to Spark SQL as temp views, converting via pandas.
df_users_spark = spark.createDataFrame(df_users.to_pandas())
df_users_spark.createOrReplaceTempView("users")

df_orders_spark = spark.createDataFrame(df_orders.to_pandas())
df_orders_spark.createOrReplaceTempView("orders")

# Default cap on failing rows collected per check for this backend.
dq = SQLDQ.from_pyspark(spark, default_max_rows=15)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/13 02:02:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/07/13 02:02:37 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [16]:
# Checks for the Spark backend. Spark SQL uses RLIKE for regex matching, and its
# parser unescapes string literals (see the email check).
dq = (
    dq.add_check(
        name="Age between 18 and 80",
        failure_rows_query="SELECT * FROM users WHERE age < 18 OR age > 80",
    )
    .add_check(
        name="Duplicate user IDs",
        failure_rows_query="""
        WITH duplicates AS (
            SELECT user_id, COUNT(*) AS count
            FROM users
            GROUP BY user_id
            HAVING COUNT(*) > 1
        )
        SELECT * FROM users WHERE user_id IN (SELECT user_id FROM duplicates)""",
        columns=["user_id", "email"],
    )
    .add_check(
        name="Email format valid",
        # Spark's SQL parser unescapes backslashes in string literals (unless
        # spark.sql.parser.escapedStringLiterals is enabled) and drops the
        # backslash of unknown escapes. A literal '\.' would therefore reach
        # the regex as '.', matching any character. Writing '\\.' in the SQL
        # text delivers '\.' to the regex, i.e. a literal dot.
        failure_rows_query=r"SELECT * FROM users WHERE NOT (email RLIKE '^[^@]+@[^@]+\\.[^@]+')",
        columns=["email"],
    )
    .add_check(
        name="Score within range",
        failure_rows_query="SELECT * FROM users WHERE score < 0 OR score > 100",
    )
    .add_check(
        name="Active users in category A or B only",
        failure_rows_query="""
        SELECT * FROM users
        WHERE is_active = TRUE AND category NOT IN ('A', 'B')""",
        columns=["user_id", "is_active", "category"],
    )
    .add_check(
        name="Inactive users in category C only",
        failure_rows_query="""SELECT * FROM users
        WHERE is_active = FALSE AND category != 'C'""",
        columns=["user_id", "is_active", "category"],
    )
)

# Execute checks
result = dq.execute()

# Display results
report = result.report()
print(report)

                                                                                

Age between 18 and 80: ❌

Duplicate user IDs: ❌

Email format valid: ❌

Score within range: ✅

Active users in category A or B only: ✅

Inactive users in category C only: ❌

Summary: 6 checks, 4 failed


### Postgres

In [17]:
import os

import psycopg2

# Connect to PostgreSQL. Each setting can be overridden through the usual libpq
# environment variables (PGHOST, PGDATABASE, PGUSER, PGPASSWORD), so real
# credentials need not be written into the notebook. The fallbacks are the
# demo values used previously.
conn = psycopg2.connect(
    host=os.environ.get("PGHOST", "postgres"),
    database=os.environ.get("PGDATABASE", "sqldq_test"),
    user=os.environ.get("PGUSER", "admin"),
    password=os.environ.get("PGPASSWORD", "admin"),
)

dq = SQLDQ.from_postgresql(conn, default_max_rows=25)

In [18]:
# Register the Postgres checks one call at a time. Rebinding `dq` to the value
# returned by each add_check is equivalent to the fluent chain used elsewhere.
dq = dq.add_check(
    name="Age between 18 and 80",
    failure_rows_query="SELECT * FROM users WHERE age < 18 OR age > 80",
)
# Unlike the other backends, this variant reports one row per duplicated
# user_id together with its count, rather than every duplicated user row.
dq = dq.add_check(
    name="Duplicate user IDs",
    failure_rows_query="""
                    WITH counts AS(
                        SELECT user_id, COUNT(*) AS count
                        FROM users
                        GROUP BY user_id
                    )
                    SELECT * FROM counts
                    WHERE count > 1""",
    columns=["user_id", "count"],
)
dq = dq.add_check(
    name="Email format valid",
    failure_rows_query=r"SELECT * FROM users WHERE email !~ '^[^@]+@[^@]+\.[^@]+'",
    columns=["email"],
)
dq = dq.add_check(
    name="Missing user IDs in orders",
    failure_rows_query="""SELECT o.order_id, o.user_id FROM orders o 
                    LEFT JOIN users u ON o.user_id = u.user_id 
                    WHERE u.user_id IS NULL""",
    columns=["order_id", "user_id"],
)
dq = dq.add_check(
    name="Active users in category A or B only",
    failure_rows_query="""
                    SELECT * FROM users
                    WHERE is_active = TRUE AND category NOT IN ('A', 'B')""",
    columns=["user_id", "is_active", "category"],
)
dq = dq.add_check(
    name="Inactive users in category C only",
    failure_rows_query="""SELECT * FROM users 
                    WHERE is_active = FALSE AND category != 'C'""",
    columns=["user_id", "is_active", "category"],
)

# Execute checks
result = dq.execute()

# Display results
report = result.report()
print(report)

Age between 18 and 80: ❌

Duplicate user IDs: ❌

Email format valid: ❌

Missing user IDs in orders: ❌

Active users in category A or B only: ✅

Inactive users in category C only: ❌

Summary: 6 checks, 5 failed


In [19]:
# Close the Postgres connection. The check results already fetched by
# dq.execute() stay accessible afterwards (see the next cell).
conn.close()

In [20]:
# Per-check results from the Postgres run; note that `score` comes back as a
# decimal column here rather than f64.
result.results

{'Age between 18 and 80': CheckResult(name='Age between 18 and 80', failed_rows=shape: (1, 6)
 ┌─────────┬─────┬───────────────────┬──────────────┬───────────┬──────────┐
 │ user_id ┆ age ┆ email             ┆ score        ┆ is_active ┆ category │
 │ ---     ┆ --- ┆ ---               ┆ ---          ┆ ---       ┆ ---      │
 │ i64     ┆ i64 ┆ str               ┆ decimal[4,2] ┆ bool      ┆ str      │
 ╞═════════╪═════╪═══════════════════╪══════════════╪═══════════╪══════════╡
 │ 2       ┆ 150 ┆ user2@example.com ┆ 92.00        ┆ true      ┆ B        │
 └─────────┴─────┴───────────────────┴──────────────┴───────────┴──────────┘, limit=25, error_message=None),
 'Duplicate user IDs': CheckResult(name='Duplicate user IDs', failed_rows=shape: (1, 2)
 ┌─────────┬───────┐
 │ user_id ┆ count │
 │ ---     ┆ ---   │
 │ i64     ┆ i64   │
 ╞═════════╪═══════╡
 │ 4       ┆ 2     │
 └─────────┴───────┘, limit=25, error_message=None),
 'Email format valid': CheckResult(name='Email format valid', failed

### Athena

You can use the following SQL to create the sample tables in Athena:

```sql
-- CREATE DATABASE
CREATE DATABASE IF NOT EXISTS sqldq_demo;

-- Change to the database you just created

-- USERS TABLE

DROP TABLE IF EXISTS users;

CREATE TABLE users (
    user_id INT,
    age INT,
    email STRING,
    score DECIMAL(5,2),
    is_active BOOLEAN,
    category STRING
)
LOCATION 's3://your-bucket/iceberg/users/'  -- TODO: replace with your bucket
TBLPROPERTIES (
    'table_type'='ICEBERG',
    'format'='PARQUET'
);

INSERT INTO users VALUES 
    (1, 25, 'user1@example.com', 85.5, true, 'A'),
    (2, 150, 'user2@example.com', 92.0, true, 'B'),
    (3, 22, 'invalid-email', 78.3, false, 'B'),
    (4, 45, 'user4@example.com', 88.7, true, 'A'),
    (4, 30, 'user5@example.com', 95.2, false, 'C');

-- ORDERS TABLE

DROP TABLE IF EXISTS orders;

CREATE TABLE orders (
    order_id INT,
    user_id INT,
    order_date DATE,
    order_amount DECIMAL(10,2)
)
LOCATION 's3://your-bucket/iceberg/orders/'  -- TODO: replace with your bucket
TBLPROPERTIES (
    'table_type'='ICEBERG',
    'format'='PARQUET'
);

INSERT INTO orders VALUES 
    (1, 1, DATE '2021-01-01', 100.00),
    (2, 2, DATE '2021-01-02', 200.00),
    (3, 5, DATE '2021-01-03', 300.00);

```

In [None]:
import boto3

session = boto3.Session(profile_name="default")
dq = SQLDQ.from_athena(
    database="sqldq_demo",  # Replace with your Athena database name in case you changed it
    workgroup="your_athena_workgroup",  # Replace with your Athena workgroup (likely it's "primary")
    boto3_session=session,
    default_max_rows=30,
)

dq = (
    dq.add_check(
        name="Age between 18 and 80",
        failure_rows_query="SELECT * FROM users WHERE age < 18 OR age > 80",
    )
    .add_check(
        name="Email format valid",
        # Athena (Presto/Trino) string literals do not treat backslash as an
        # escape character, so the regex receives exactly what is written here.
        # A single '\.' matches a literal dot. The previous '\\.' required a
        # literal backslash, so it flagged every address without one.
        failure_rows_query=r"SELECT * FROM users WHERE NOT regexp_like(email, '^[^@]+@[^@]+\.[^@]+')",
        columns=["email"],
    )
    .add_check(
        name="Active users in category A or B only",
        failure_rows_query="""
        SELECT * FROM users
        WHERE is_active = TRUE AND category NOT IN ('A', 'B')""",
        columns=["user_id", "is_active", "category"],
    )
    .add_check(
        name="Inactive users in category C only",
        failure_rows_query="""SELECT * FROM users
        WHERE is_active = FALSE AND category != 'C'""",
        columns=["user_id", "is_active", "category"],
    )
)

In [None]:
# Run the Athena checks and print the default pass/fail report.
result = dq.execute()
report = result.report()
print(report)