## Run the validation on new data from configuration files
In the last notebook, we set up the Great Expectations configuration for Data Validation in Microsoft Fabric. 

We ran it once to generate the complete Data Context and then saved it to the Lakehouse Files area.

This notebook is built to be embedded within a data pipeline. 

It:
- validates Bronze Lakehouse tables and, if the data passes all validation checks, writes it to a Silver table,
- uses a predefined Great Expectations Data Context,
- performs a checkpoint validation,
- writes the Checkpoint Results to a Lakehouse table,
- raises an Exception if validation fails, which can be handled within the Data Pipeline.
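
The control flow described in the list above can be sketched in plain Python, independent of Spark and Great Expectations. This is a minimal sketch, not the notebook's actual code: `ValidationFailedError`, `validation_gate`, and the three callables are hypothetical names introduced only for illustration.

```python
from typing import Callable


class ValidationFailedError(Exception):
    """Hypothetical exception type raised when a validation run fails."""


def validation_gate(
    run_validation: Callable[[], bool],
    log_result: Callable[[bool], None],
    promote_to_silver: Callable[[], None],
) -> None:
    """Log every validation outcome, then either promote the data or raise."""
    success = run_validation()
    log_result(success)  # the outcome is logged whether or not validation passed
    if not success:
        # Raising makes the notebook fail, so the calling pipeline can react
        raise ValidationFailedError(
            "Bronze data failed validation; nothing was written to Silver."
        )
    promote_to_silver()


# Usage with stand-in callables that only record what happened
events = []
validation_gate(
    run_validation=lambda: True,
    log_result=lambda ok: events.append(("logged", ok)),
    promote_to_silver=lambda: events.append(("promoted",)),
)
print(events)
```

Logging before raising means that failed runs still leave a record in the validation log.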

#### Step 1: Re-initialise the Data Context from Lakehouse Files

In [10]:
from great_expectations.data_context import FileDataContext

path_to_local_context = '/lakehouse/default/Files'

# initialise the data context from the Lakehouse Files
context = FileDataContext.create(project_root_dir=path_to_local_context)


    - No action was taken.
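
Because this notebook relies on a Data Context that an earlier notebook saved, it can help to confirm that the configuration file is present before loading it. The sketch below assumes the context lives in a `gx` or `great_expectations` folder under the project root and that its configuration file is named `great_expectations.yml`; these names depend on the Great Expectations version in use, and `find_context_config` is a hypothetical helper, not part of the library.

```python
from pathlib import Path
from typing import Optional, Sequence


def find_context_config(
    project_root: str,
    folder_names: Sequence[str] = ("gx", "great_expectations"),  # assumed folder names
) -> Optional[Path]:
    """Return the path to great_expectations.yml under project_root, or None."""
    for folder in folder_names:
        candidate = Path(project_root) / folder / "great_expectations.yml"
        if candidate.is_file():
            return candidate
    return None


# Report whether a saved context is visible from the default Lakehouse mount
config_path = find_context_config("/lakehouse/default/Files")
if config_path is None:
    print("No saved context found; check the Lakehouse Files path.")
else:
    print(f"Found saved context configuration at {config_path}")
```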



#### Step 2: Get a fresh dataframe and build a new batch request

In [46]:
# get a fresh load of data
bronze_dataframe = spark.sql("SELECT * FROM TutorialLakehouse.historic_weather_data LIMIT 1000")

# get my data asset from the data context
my_asset = context.get_datasource("my_spark_datasource").get_asset("my_df_asset")

# create a new batch request from the new data and keep a reference to it
batch_request = my_asset.build_batch_request(dataframe=bronze_dataframe)

#### Step 3: Re-run the checkpoint with the new context

In [37]:
# pass the batch request explicitly so the checkpoint validates the fresh dataframe
results = context.run_checkpoint(checkpoint_name="my_checkpoint", batch_request=batch_request)


#### Step 4: Handle results
Now that we have results, we want to:
- log the results in a validation log Lakehouse table
- if success, write to Silver Table
- if failure, throw an exception to be handled by data pipeline 

In [53]:
import pandas as pd
from datetime import datetime


def load_to_silver(validated_bronze):
    # overwrite the Silver table with the validated Bronze data
    validated_bronze.write.format("delta").mode("overwrite").save("Tables/silver_historic_weather_data")


def parse_and_load_checkpoint_result(results):
    # run_results is keyed by validation; this checkpoint has one, so take the first entry
    first_run_key = next(iter(results['run_results']))
    validation_results = results['run_results'][first_run_key]['validation_result']
    success = validation_results['success']
    statistics = validation_results['statistics']

    # one-row summary of the run; run_time is trimmed to whole seconds before parsing
    restructured = {
        "run_name": [results['run_id']['run_name']],
        "run_time": [datetime.strptime(results['run_id']['run_time'][:19], "%Y-%m-%dT%H:%M:%S")],
        "validation_result": [success],
        "evaluated_expectations": [statistics['evaluated_expectations']],
        "successful_expectations": [statistics['successful_expectations']],
        "unsuccessful_expectations": [statistics['unsuccessful_expectations']],
        "success_percent": [statistics['success_percent']],
        "expectation_suite_name": [validation_results['meta']['expectation_suite_name']]
    }

    # append the summary row to the validation log table
    pandas_df = pd.DataFrame(restructured)
    spark_df = spark.createDataFrame(pandas_df)
    spark_df.write.format("delta").mode("append").save("Tables/validation_results")

    return success


success = parse_and_load_checkpoint_result(results.to_json_dict())

if success:
    load_to_silver(bronze_dataframe)
else:
    # fail the notebook so the Data Pipeline can react to the failed validation
    raise Exception("Data validation failed. Handle the exception in the data pipeline.")
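
The parsing logic can be tried without a Spark session by running it against a hand-written dictionary. In the sketch below, `summarise_checkpoint_result` is a hypothetical Spark-free variant of the parsing step, and every value in `sample` is made up for illustration; only the key names mirror those read from `results.to_json_dict()` in the cell above.

```python
from datetime import datetime


def summarise_checkpoint_result(results: dict) -> dict:
    """Flatten the first validation result of a checkpoint run into one row."""
    first_key = next(iter(results["run_results"]))
    validation = results["run_results"][first_key]["validation_result"]
    stats = validation["statistics"]
    return {
        "run_name": results["run_id"]["run_name"],
        # keep only the first 19 characters (whole seconds) of the timestamp
        "run_time": datetime.strptime(
            results["run_id"]["run_time"][:19], "%Y-%m-%dT%H:%M:%S"
        ),
        "validation_result": validation["success"],
        "evaluated_expectations": stats["evaluated_expectations"],
        "successful_expectations": stats["successful_expectations"],
        "unsuccessful_expectations": stats["unsuccessful_expectations"],
        "success_percent": stats["success_percent"],
        "expectation_suite_name": validation["meta"]["expectation_suite_name"],
    }


# Hand-written stand-in; all values are invented for illustration only
sample = {
    "run_id": {"run_name": "example_run", "run_time": "2024-01-01T12:00:00.000000+00:00"},
    "run_results": {
        "example_key": {
            "validation_result": {
                "success": False,
                "statistics": {
                    "evaluated_expectations": 4,
                    "successful_expectations": 3,
                    "unsuccessful_expectations": 1,
                    "success_percent": 75.0,
                },
                "meta": {"expectation_suite_name": "example_suite"},
            }
        }
    },
}

row = summarise_checkpoint_result(sample)
print(row["validation_result"], row["success_percent"])
```

Keeping the flattening separate from the Delta write makes it possible to unit test the parsing on its own.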






