### Integrating with Apache Airflow
**Description**: Integrate Great Expectations with Apache Airflow to run data quality checks automatically in your DAG.

**Steps**:
1. Install Airflow (if you haven't already): `pip install apache-airflow`
2. Airflow DAG Integration:
    - Create a DAG file containing the code in the cell below.
3. Deploy and Test:
    - Place this file in your Airflow DAGs directory and start your Airflow scheduler.
    - Open the Airflow UI and trigger the DAG to see it run your expectations.

In [1]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import great_expectations as gx
import pandas as pd
import os
# Task-level settings handed to the DAG through ``default_args``.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2023, 1, 1),
    retries=1,
)

# ``schedule=None`` gives the DAG no schedule and ``catchup=False`` disables
# catch-up runs, so it only runs when triggered.
dag = DAG(
    dag_id='great_expectations_data_quality_check',
    default_args=default_args,
    schedule=None,
    catchup=False,
)
def run_great_expectations_validation():
    """Validate a small sample dataset with Great Expectations.

    Used as the ``python_callable`` of the Airflow task defined in this file.
    The function:

    1. writes a hard-coded sample DataFrame to ``airflow_data_quality.csv``
       (a relative path, so it lands in the current working directory);
    2. looks up the ``airflow_pandas_datasource`` datasource and adds it when
       the lookup raises ``DatasourceNotFoundError``;
    3. loads the ``airflow_data_quality_expectations`` suite, or creates it
       and populates it with column-existence and not-null expectations when
       the lookup raises ``ExpectationSuiteNotFoundError``;
    4. validates the CSV, prints the result and builds Data Docs;
    5. removes the CSV again, even when one of the steps above raised.

    Returns:
        None. The validation result is only printed; a failed validation is
        not converted into an exception, so it does not fail the task.

    NOTE(review): ``context.suites.get`` looks like it belongs to a newer
    Great Expectations API than ``context.create_expectation_suite`` and the
    data-connector based ``BatchRequest`` used below -- confirm that every
    call here exists in the installed Great Expectations version.
    """
    context = gx.get_context()

    # Sample rows containing gaps: a missing name, a missing age, and an
    # e-mail column with one empty and one malformed value.
    data = {'Name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve', None],
            'Email': ['alice@example.com', 'bob@example.com', 'charlie@example.com', 'david.example', 'eve@example.com', ''],
            'Age': [25, 30, 22, None, 28, 31]}
    df = pd.DataFrame(data)

    csv_file_path = 'airflow_data_quality.csv'
    datasource_name = 'airflow_pandas_datasource'
    data_connector_name = 'default_inferred_data_connector_name'
    data_asset_name = 'airflow_data_quality.csv'
    expectation_suite_name = 'airflow_data_quality_expectations'

    df.to_csv(csv_file_path, index=False)
    try:
        # Register the datasource only when it is not known to the context yet.
        try:
            context.get_datasource(datasource_name)
        except gx.exceptions.DatasourceNotFoundError:
            context.add_pandas_csv_datasource(
                name=datasource_name,
                base_directory=os.path.dirname(os.path.abspath(csv_file_path)),
                batching_regex=r"(.+)\.csv",
            )
            print(f"Datasource '{datasource_name}' added.")
        else:
            print(f"Datasource '{datasource_name}' already exists.")

        # Reuse the stored suite when there is one; otherwise create an empty
        # suite and remember that it still needs its expectations.
        try:
            expectation_suite = context.suites.get(expectation_suite_name)
        except gx.exceptions.ExpectationSuiteNotFoundError:
            expectation_suite = context.create_expectation_suite(
                expectation_suite_name=expectation_suite_name, overwrite_existing=True
            )
            suite_is_new = True
        else:
            print(f"Loaded existing Expectation Suite: {expectation_suite_name}")
            suite_is_new = False

        # The validator is built the same way for a new or an existing suite.
        validator = context.get_validator(
            batch_request=gx.core.batch_request.BatchRequest(
                datasource_name=datasource_name,
                data_connector_name=data_connector_name,
                data_asset_name=data_asset_name,
                batch_spec_passthrough={"reader_method": "csv", "path_or_buf": csv_file_path},
            ),
            expectation_suite=expectation_suite,
        )

        if suite_is_new:
            for column in ('Name', 'Email', 'Age'):
                validator.expect_column_to_exist(column)
            validator.expect_column_values_to_not_be_null('Email')
            validator.save_expectation_suite()
            print(f"Created and saved Expectation Suite: {expectation_suite_name}")

        print(f"Using validator for data asset: {validator.active_batch_request.data_asset_name}")
        validation_result = validator.validate()
        print("\nValidation Results:")
        print(validation_result)
        context.build_data_docs(validation_result_list=[validation_result])
        print("Data Docs generated. Check your Airflow logs for the Data Docs path.")
    finally:
        # Always clean up the temporary CSV so a failed run does not leave
        # the file behind in the working directory.
        os.remove(csv_file_path)
# The DAG's validation task: calls run_great_expectations_validation().
run_ge_task = PythonOperator(
    dag=dag,
    task_id='run_great_expectations_validation',
    python_callable=run_great_expectations_validation,
)

[2025-05-13T07:38:36.268+0000] {_docs_decorators.py:115} INFO - Skipping registering function get_context because it does not have a class
  from .autonotebook import tqdm as notebook_tqdm

[2025-05-13T07:38:37.899+0000] {_docs_decorators.py:109} INFO - Skipping registering function DataSourceManager._register_add_datasource.<locals>.crud_method_info because it is a closure
[2025-05-13T07:38:37.900+0000] {_docs_decorators.py:109} INFO - Skipping registering function DataSourceManager._register_update_datasource.<locals>.crud_method_info because it is a closure
[2025-05-13T07:38:37.901+0000] {_docs_decorators.py:109} INFO - Skipping registering function DataSourceManager._register_add_or_update_datasource.<locals>.crud_method_info because it is a closure
[[34m2025-05-13T07:38:37.902+0000[0m] {[34m_docs_decorators.py:[0m109} INFO[0m - Skipping registering function DataSourceMa