# Spark dataframe validation with Great Expectations
Created by Will Needham, [Learn Microsoft Fabric](https://youtube.com/@learnmicrosoftfabric) 

**Goal: show you a simple way to set up Great Expectations to validate a Spark DataFrame.**

_Note: the approach shown in this notebook is what I would call 'GX Lite'. GX is a vast library with many features, which can be overwhelming for beginners. I developed the approach below to strip out much of that complexity and make it as easy as possible to get started with GX. Once you have the basics, it is worth learning how to set up Data Sources, Expectation Suites, Checkpoints etc., as these unlock more of the advanced GX features, such as Actions and Data Docs._

In this example, we validate Spark DataFrame objects. The idea is to validate a Spark DataFrame after we have transformed or cleaned a dataset, and before we write it to the next layer of our pipeline.

## Prerequisites
This notebook assumes you have run the first notebook, and have a Lakehouse table named 'raw_customers'.  

## Define Lakehouse table name to be validated 
We use a parameter cell in Fabric to parameterize the notebook (i.e. so we can embed this validation notebook in a Fabric Data Pipeline).

In [None]:
table_name_to_be_validated = 'raw_customers'
output_table_name = 'validated_customers'

## Install GX (if not installed at the workspace level)

In [None]:
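# Note: the SparkDFDataset class used below belongs to the legacy GX API;
# if it is missing in your installed release, you may need to pin an older version.
%pip install -q great_expectations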

## Get, transform/clean a Lakehouse table
I define a parsing function just to show where it would sit in the workflow. The actual parsing/cleaning is not implemented here, as it depends on your data and requirements.

In [None]:
def parse_and_clean_df(df): 
    ''' Input: Spark dataframe in Raw state, 
    Function: clean and transform your raw DF depending on business logic/ dataset (not implemented in this example) 
    Return: clean_df (of type Spark Dataframe)
    '''
    clean_df = df 
    return clean_df


# read in the raw table 
df = spark.sql(f"SELECT * FROM Marketing_Preparation.{table_name_to_be_validated}")

# (optionally) clean and parse the dataframe, depending on the dataset/ requirements
cleaned_df = parse_and_clean_df(df) 

## Validate the table/ Spark dataframe

In [None]:
from datetime import datetime 
import great_expectations as gx

# initialize a GX spark dataframe dataset for validation, passing in your cleaned_df
gx_df = gx.dataset.sparkdf_dataset.SparkDFDataset(cleaned_df)

# add expectations to the dataset
gx_df.expect_column_to_exist('SubscriptionDate')
gx_df.expect_column_to_exist('record_creation_date')
gx_df.expect_column_values_to_be_between(column='SubscriptionDate', min_value=datetime.strptime('2020-01-01', '%Y-%m-%d'))
gx_df.expect_column_values_to_be_between(column='SubscriptionDate', max_value=datetime.now())
# use a raw string so that '\.' is not treated as an (invalid) Python string escape
gx_df.expect_column_values_to_match_regex('Email', regex=r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$')
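
Before relying on a regex expectation, it can help to try the pattern on a few values. The sketch below uses Python's built-in `re` module with the same email pattern as above. The sample values are made up for illustration. Spark evaluates the expectation with its own regex engine, so treat this as a quick sanity check rather than a guarantee of identical behaviour.

```python
import re

# Same email pattern as in the expectation, written as a raw string
EMAIL_PATTERN = re.compile(r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$')

# Hypothetical sample values, for illustration only
samples = ['jane.doe@example.com', 'no-at-sign.example.com', 'bob@localhost']

for value in samples:
    # match() anchors at the start; the pattern's '$' anchors the end
    is_valid = bool(EMAIL_PATTERN.match(value))
    print(f'{value}: {is_valid}')
```

Only the first value matches here: the second has no `@`, and the third has no dot after the domain name.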



## Run the validator

In [None]:
#run validation
validation_results = gx_df.validate()

## Handle results

This section has deliberately been left minimal, because how you wish to handle the outcome of your validation results is up to you.
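
As one possible starting point for custom logging, here is a minimal sketch that lists the expectations that failed. It works on a plain dictionary and assumes the shape I would expect from `validation_results.to_json_dict()`: a `results` list whose items carry `success`, `expectation_config` and `result` keys. Check this against the output of your GX version. The helper name `summarize_failures` and the sample data are hypothetical.

```python
def summarize_failures(results_dict):
    """Return one short line per failed expectation."""
    lines = []
    for item in results_dict.get('results', []):
        if item.get('success'):
            continue  # only report failures
        config = item.get('expectation_config', {})
        # table-level expectations have no 'column' kwarg
        column = config.get('kwargs', {}).get('column', '<table-level>')
        unexpected = item.get('result', {}).get('unexpected_count')
        lines.append(
            f"{config.get('expectation_type')} on {column}: unexpected_count={unexpected}"
        )
    return lines


# Hand-built sample (assumed shape, not real validation output)
sample = {
    'success': False,
    'results': [
        {
            'success': True,
            'expectation_config': {
                'expectation_type': 'expect_column_to_exist',
                'kwargs': {'column': 'SubscriptionDate'},
            },
            'result': {},
        },
        {
            'success': False,
            'expectation_config': {
                'expectation_type': 'expect_column_values_to_match_regex',
                'kwargs': {'column': 'Email'},
            },
            'result': {'unexpected_count': 3},
        },
    ],
}

for line in summarize_failures(sample):
    print(line)
```

A helper like this could be called from `handle_failure()` and its output written to a log or to a central validation results table.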



In [None]:
from datetime import datetime
from pyspark.sql.functions import lit

def write_sparkdf_to_lakehouse_table(spark_df, output_table_name) -> None:
    ''' Input: spark_df (Spark DataFrame), output_table_name (str)
    Function: Adds a validation_passed_date field and writes the DataFrame to a Lakehouse table.
    Output: None
    '''
    time_now = datetime.now()
    # withColumn returns a new DataFrame, so the result must be assigned before writing
    stamped_df = spark_df.withColumn('validation_passed_date', lit(time_now))
    stamped_df.write.format("delta").mode("overwrite").save(f'Tables/{output_table_name}')


In [None]:
def handle_success() -> None:
    ''' Function to handle a successful validation run
    This could include: 
    1) moving file to a validation_passed folder path
    2) write validation results to a central validation results lakehouse 
    '''
    write_sparkdf_to_lakehouse_table(cleaned_df, output_table_name) 
     

def handle_failure() -> None: 
    ''' Function to handle a failed validation run
    This could include: 
    1) custom logging
    2) write validation results to a central validation results lakehouse 
    '''
    pass
    
if validation_results.success:
    handle_success() 
else: 
    handle_failure() 