Skip to content

Aip 99 llmdataqualityoperator#62963

Draft
cetingokhan wants to merge 6 commits intoapache:mainfrom
cetingokhan:aip-99-llmdataqualityoperator
Draft

Aip 99 llmdataqualityoperator#62963
cetingokhan wants to merge 6 commits intoapache:mainfrom
cetingokhan:aip-99-llmdataqualityoperator

Conversation

@cetingokhan
Copy link
Contributor

AIP - 99 LLMDataQualityOperator

This pull request introduces a new LLMDataQualityOperator for generating and executing data-quality checks using natural language prompts and LLMs, along with supporting utilities for database/schema introspection and example usage. The changes add a robust operator for data-quality validation, enable schema context resolution for both relational and object-storage sources.

How It Works

Plan Generation (LLM-backed): The operator accepts a prompts dict mapping check names to natural-language expectations (e.g. "email_nulls": "Less than 5% of emails should be null"). It introspects the target database schema and sends prompts + schema context to the configured LLM, which produces a DQPlan — a set of optimised SQL query groups.

Plan Caching: Generated plans are serialised and stored in Airflow Variable (key: dq_plan__<sha256[:16]>). Cache key is computed from a sorted serialisation of prompts + prompt_version, making it order-independent and version-bumped when prompts change semantically. This avoids redundant LLM calls on rerun.

Execution: Each SQL group is executed against the target DB via a DbApiHook. Results are collected per check name into a results_map.

Validation: Each metric value is passed to the corresponding callable in validators. A check passes if no validator is provided (metrics are collected but not gated) or if the validator returns True. Failures record the reason.

Dry Run Mode: When dry_run=True, the plan is generated/cached but not executed.


Was generative AI tooling used to co-author this PR?
  • Yes
    Cloude Sonnet 4.6 & Gemini 3.1 Pro
    Filled some of methods scope and tests created via copilot

@cetingokhan
Copy link
Contributor Author

cetingokhan commented Mar 5, 2026

Sample Task Log;

.......
[2026-03-05T17:41:42.539957Z] INFO - Registered object store for schema: s3://
[2026-03-05T17:41:42.918575Z] INFO - Registered data source format parquet for table: sales_data
[2026-03-05T17:41:42.920314Z] INFO - Using schema context:
Table: sales_data
Columns: price: int64
area: int64
bedrooms: int64
bathrooms: int64
stories: int64
mainroad: string_view
guestroom: string_view
basement: string_view
hotwaterheating: string_view
airconditioning: string_view
parking: int64
prefarea: string_view
furnishingstatus: string_view
[2026-03-05T17:41:42.947772Z] INFO - DQ plan cache miss — generating via LLM (key: 'dq_plan_v8_535ba38ccff9be9a').
[2026-03-05T17:41:42.948151Z] INFO - Using system prompt:
You are a data-quality SQL expert.

Given a set of named data-quality checks and a database schema, produce a DQPlan that minimises the number of SQL queries executed.

PRIMARY RULE — combine everything on the same table into ONE SELECT:
  All checks that query the same table MUST be merged into a single SELECT
  statement. Each check becomes one output column in that statement.

  CORRECT (one query for two checks on the same table):
    SELECT
      (COUNT(*) FILTER (WHERE email IS NULL) * 100.0 / COUNT(*)) AS null_email_pct,
      COUNT(*) FILTER (WHERE bathrooms < 0)                      AS invalid_bathrooms
    FROM customers

  WRONG (two separate queries for the same table):
    SELECT COUNT(*) FILTER (WHERE email IS NULL) AS null_email_count FROM customers
    SELECT COUNT(*) FILTER (WHERE bathrooms < 0) AS invalid_bathrooms FROM customers

GROUPING STRATEGY:
  Assign a group_id that describes the table being queried (e.g. "sales_data_checks").
  Only split into multiple groups when the checks genuinely require different tables
  or subqueries that cannot be expressed as columns of a single SELECT.

OUTPUT RULES:
  1. Each output column must be aliased to exactly the metric_key of its check.
     Example: ... AS null_email_pct
  2. Each check_name must exactly match the key in the prompts dict.
  3. metric_key values must be valid SQL column aliases (snake_case, no spaces).
  4. Generates only SELECT queries — no INSERT, UPDATE, DELETE, DROP, or DDL.
  5. Use SQL syntax.
  6. Each check must appear in exactly ONE group.
  7. Return a valid DQPlan object. No extra commentary.

Available schema:
Table: sales_data
Columns: price: int64
area: int64
bedrooms: int64
bathrooms: int64
stories: int64
mainroad: string_view
guestroom: string_view
basement: string_view
hotwaterheating: string_view
airconditioning: string_view
parking: int64
prefarea: string_view
furnishingstatus: string_view

[2026-03-05T17:41:42.948518Z] INFO - Using user message:
Generate a DQPlan for the following data-quality checks.

IMPORTANT: All checks that query the same table MUST be combined into a single SELECT with one output column per check.

Checks:
  - check_name="null_mainroad": Check the percentage of rows where mainroad is NULL
  - check_name="invalid_bathrooms": Count rows where bathrooms is negative or NULL
  - check_name="bathroom_data": Count rows where bathrooms is greater than 2
  - check_name="furnishingstatus_values": furnishingstatus column contains only 'furnished', 'semi-furnished', or 'unfurnished'
......
[2026-03-05T17:41:49.436353Z] INFO - Registered data source format parquet for table: sales_data
[2026-03-05T17:41:49.438619Z] INFO - Executing query: SELECT
  (COUNT(*) FILTER (WHERE mainroad IS NULL) * 100.0 / COUNT(*)) AS null_mainroad,
  COUNT(*) FILTER (WHERE bathrooms IS NULL OR bathrooms < 0) AS invalid_bathrooms,
  COUNT(*) FILTER (WHERE bathrooms > 2) AS bathroom_data,
  COUNT(*) FILTER (WHERE furnishingstatus NOT IN ('furnished', 'semi-furnished', 'unfurnished')) AS furnishingstatus_values
FROM sales_data
[2026-03-05T17:41:49.617786Z] INFO - All 4 data-quality check(s) passed.
[2026-03-05T17:41:49.619021Z] INFO - Pushing xcom ti=RuntimeTaskInstance(id=UUID('019cbf17-4648-7359-ab77-9e24023d0c69'), task_id='validate_sales_events', dag_id='example_llm_dq_s3_parquet', run_id='manual__2026-03-05T17:41:39.517609+00:00', try_number=1, dag_version_id=UUID('019cbf09-ae3e-7f97-afb3-cc5ec0a4160f'), map_index=-1, hostname='b483f76e2940', context_carrier={}, task=<Task(LLMDataQualityOperator): validate_sales_events>, bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0, start_date=datetime.datetime(2026, 3, 5, 17, 41, 40, 355724, tzinfo=datetime.timezone.utc), end_date=None, state=<TaskInstanceState.RUNNING: 'running'>, is_mapped=False, rendered_map_index=None, sentry_integration='') 
[2026-03-05T17:41:49.669975Z] INFO - Task instance in success state
[2026-03-05T17:41:49.670192Z] INFO -  Previous state of the Task instance: TaskInstanceState.RUNNING
[2026-03-05T17:41:49.670349Z] INFO - Task operator:<Task(LLMDataQualityOperator): validate_sales_events>

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant