# Lakeflow Declarative Pipelines Integration Tests Using Expectations
This example pipeline demonstrates the basics of writing integration checks with expectations on a small project.

Please check out the following resources for more information.

- [Manage data quality with pipeline expectations](https://docs.databricks.com/en/delta-live-tables/expectations.html#manage-data-quality-with-pipeline-expectations)

- [Expectation recommendations and advanced patterns](https://docs.databricks.com/en/delta-live-tables/expectation-patterns.html#expectation-recommendations-and-advanced-patterns)

- [Applying software development & DevOps best practices to Delta Live Table pipelines](https://www.databricks.com/blog/applying-software-development-devops-best-practices-delta-live-table-pipelines)

## Obtain Configuration Variable for the Target Environment
This notebook reads the `target` configuration variable, which is set in the pipeline settings for the **dev**, **stage**, and **prod** targets.

- If the target is **dev** or **stage**, run all integration tests.
- If the target is **prod**, run only the column value range integration test.

In [0]:
import dlt

## Read the deployment target (dev, stage, or prod) from the pipeline configuration
target = spark.conf.get("target")

### Create a Dictionary for Integration Test Values

Create a dictionary containing the expected values for the integration tests in the **dev** and **stage** environments. This is one straightforward approach among several.

For more information, refer to the [Portable and Reusable Expectations](https://docs.databricks.com/en/delta-live-tables/expectation-patterns.html#portable-and-reusable-expectations) documentation.



In [0]:
## Expected validation metrics for each table, keyed by deployment target.
target_integration_tests_validation = {
    'dev': {
        'filtered_taxis': {
            'total_rows': 20581
        }
    },
    'stage': {
        'filtered_taxis': {
            'total_rows': 20581
        }
    }
}


## For dev and stage, look up the expected total row count for the filtered_taxis table
if target in ('dev', 'stage'):
    total_expected = target_integration_tests_validation[target]['filtered_taxis']['total_rows']
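In the cell above, `total_expected` is only defined for **dev** and **stage**. A minimal sketch of a safer lookup, assuming the same nested dictionary shape as `target_integration_tests_validation`; the helper `get_expected_value` is hypothetical and returns `None` when no value is defined for a target, table, or metric.

```python
# Same nested structure as target_integration_tests_validation in the notebook.
validation = {
    "dev": {"filtered_taxis": {"total_rows": 20581}},
    "stage": {"filtered_taxis": {"total_rows": 20581}},
}


def get_expected_value(validation: dict, target: str, table: str, metric: str):
    """Return the expected metric for a table in a target, or None if none is defined.

    Hypothetical helper: returning None (instead of leaving a variable undefined)
    lets the caller decide explicitly whether to skip a test.
    """
    return validation.get(target, {}).get(table, {}).get(metric)


print(get_expected_value(validation, "dev", "filtered_taxis", "total_rows"))   # 20581
print(get_expected_value(validation, "prod", "filtered_taxis", "total_rows"))  # None
```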


### Create a Function to Count the Total Number of Rows in a Table
The `test_count_table_total_rows` function creates a materialized view that counts the rows in the specified table and fails the pipeline update if the count does not equal the expected value.

In [0]:
def test_count_table_total_rows(table_name, total_count, target):
    '''
    Count the number of rows in the specified table and compare the result with the expected value for dev or stage data.
    Fail the update if the count does not match the expected value.
    '''
    ## Register a materialized view that holds a single row with the table's row count
    @dlt.table(
        name=f"TEST_{target}_{table_name}_total_rows_verification",
        comment=f"Confirms all rows were ingested from the {target} raw data to {table_name}"
    )
    ## Fail the update if the row count differs from the expected value
    @dlt.expect_all_or_fail({"valid count": f"total_rows = {total_count}"})
    def count_table_total_rows():
        return spark.sql(f"""
            SELECT COUNT(*) AS total_rows FROM LIVE.{table_name}
        """)
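The count check can be approximated outside a pipeline to see how the `total_rows = {total_count}` constraint behaves. This is a minimal sketch that swaps in SQLite (via Python's built-in `sqlite3`) for Spark SQL, so it only approximates the pipeline's behavior; `check_total_rows` is a hypothetical helper, and the sample table is made up.

```python
import sqlite3


def check_total_rows(conn: sqlite3.Connection, table_name: str, total_count: int) -> int:
    """Count rows in a table and raise if the count constraint is violated.

    Mirrors the shape of the notebook's f"total_rows = {total_count}" expectation,
    but evaluates it with SQLite rather than Spark SQL.
    """
    constraint = f"total_rows = {total_count}"
    # Count rows in a subquery, then keep only results that violate the constraint.
    violations = conn.execute(
        f"SELECT total_rows FROM (SELECT COUNT(*) AS total_rows FROM {table_name}) "
        f"WHERE NOT ({constraint})"
    ).fetchall()
    if violations:
        raise RuntimeError(
            f"Expectation 'valid count' failed: got {violations[0][0]} rows, "
            f"expected {total_count}"
        )
    return total_count


# Made-up sample table with three rows.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE filtered_taxis (fare_amount REAL)")
conn.executemany("INSERT INTO filtered_taxis VALUES (?)", [(12.5,), (8.0,), (21.0,)])
print(check_total_rows(conn, "filtered_taxis", 3))  # 3
```

As in the notebook, the table name is interpolated directly into SQL, which is only acceptable for trusted, code-defined names.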

### Create a Function to Confirm the Column Value Ranges in the Materialized View
The `test_filtered_taxi_table_columns` function creates a materialized view that checks the values in the **pickup_zip**, **dropoff_zip**, **trip_distance**, and **fare_amount** columns of **filtered_taxis**, and fails the pipeline update if any row falls outside the expected ranges.

In [0]:
def test_filtered_taxi_table_columns(target):
    '''
    Check the ranges of values in the pickup_zip, dropoff_zip, trip_distance, and fare_amount columns of the filtered_taxis table.

    This confirms that every row in the table has valid values for these columns.
    '''
    ## Set expectations for the columns
    check_columns = {
        "valid pickup zip": "pickup_zip > 7000 AND pickup_zip < 12000",
        "valid dropoff zip": "dropoff_zip > 7000 AND dropoff_zip < 12000",
        "valid trip distance": "trip_distance >= 0",
        "valid fare amount": "fare_amount < 30"  ## Matches the query filter used to create the filtered_taxis table in the pipeline.
    }

    @dlt.table(name=f"TEST_{target}_filtered_taxis_columns",
               comment="Check column ranges in the filtered_taxis table")
    ## Fail the update if any row violates any of the expectations
    @dlt.expect_all_or_fail(check_columns)
    def filtered_taxi_table_columns():
        return (
            dlt.read("filtered_taxis")
            .select("pickup_zip", "dropoff_zip", "trip_distance", "fare_amount")
        )
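To see which rows would trip each column expectation, the same constraint strings can be evaluated locally. This is a minimal sketch that swaps in SQLite (Python's built-in `sqlite3`) for the pipeline's SQL engine; `find_violations` is a hypothetical helper and the sample rows are made up. Rows where a constraint evaluates to NULL are not counted here, and this sketch makes no claim about how the pipeline treats NULLs.

```python
import sqlite3

# Same constraint strings as check_columns in the notebook.
check_columns = {
    "valid pickup zip": "pickup_zip > 7000 AND pickup_zip < 12000",
    "valid dropoff zip": "dropoff_zip > 7000 AND dropoff_zip < 12000",
    "valid trip distance": "trip_distance >= 0",
    "valid fare amount": "fare_amount < 30",
}


def find_violations(conn: sqlite3.Connection, table_name: str, expectations: dict) -> dict:
    """Return {expectation name: number of violating rows} for each failed expectation."""
    failed = {}
    for name, constraint in expectations.items():
        # Count rows for which the constraint is false.
        (count,) = conn.execute(
            f"SELECT COUNT(*) FROM {table_name} WHERE NOT ({constraint})"
        ).fetchone()
        if count:
            failed[name] = count
    return failed


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE filtered_taxis "
    "(pickup_zip INTEGER, dropoff_zip INTEGER, trip_distance REAL, fare_amount REAL)"
)
conn.executemany(
    "INSERT INTO filtered_taxis VALUES (?, ?, ?, ?)",
    # The second made-up row has an out-of-range dropoff zip and fare.
    [(10001, 10002, 1.2, 9.5), (10003, 6000, 0.8, 45.0)],
)
failed = find_violations(conn, "filtered_taxis", check_columns)
print(failed)  # {'valid dropoff zip': 1, 'valid fare amount': 1}
```

With `expect_all_or_fail`, any non-empty result like this corresponds to a failed update; reporting the counts per expectation name helps pinpoint which rule broke.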

### Execute the Specified Integration Tests
Register the integration tests that apply to the current target: both tests for **dev** and **stage**, and only the column value range test for **prod**.

In [0]:
## Run the specified tests based on the target environment (dev, stage, or prod)

if target in ('dev', 'stage'):  ## Dynamic integration tests for dev or stage tables
    test_count_table_total_rows('filtered_taxis', total_expected, target)
    test_filtered_taxi_table_columns(target)

elif target == 'prod':  ## Only test the column value ranges in production.
    test_filtered_taxi_table_columns(target)