In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [0]:
%run /validation/SetUpBook

Out[6]: [FileInfo(path='dbfs:/FileStore/', name='FileStore/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/Orders/', name='Orders/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/People/', name='People/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/Returns/', name='Returns/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/databricks-datasets/', name='databricks-datasets/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/databricks-results/', name='databricks-results/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/mnt/', name='mnt/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/project/', name='project/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/stage.stage_store/', name='stage.stage_store/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/user/', name='user/', size=0, modificationTime=0)]

In [0]:
reference_df = spark.read.format('csv').option("header", True).option("inferSchema", True).load(reference_ds)

In [0]:
reference_df.show(10)

+---+----------------+-------------+-----------+-------------+-------------+----------+----------+
| Id|SrcContainerName|SrcFolderName|SrcFileName|   SrcColumns|SrcColumnType| CreatedAt|updated_at|
+---+----------------+-------------+-----------+-------------+-------------+----------+----------+
|  1|      xyenta-stg|       Orders|Orders_data|       Row ID|      integer|08-12-2023|11-12-2023|
|  2|      xyenta-stg|       Orders|Orders_data|     Order ID|       string|08-12-2023|11-12-2023|
|  3|      xyenta-stg|       Orders|Orders_data|   Order Date|         date|08-12-2023|11-12-2023|
|  4|      xyenta-stg|       Orders|Orders_data|    Ship Date|         date|08-12-2023|11-12-2023|
|  5|      xyenta-stg|       Orders|Orders_data|    Ship Mode|       string|08-12-2023|11-12-2023|
|  6|      xyenta-stg|       Orders|Orders_data|  Customer ID|       string|08-12-2023|11-12-2023|
|  7|      xyenta-stg|       Orders|Orders_data|Customer Name|       string|08-12-2023|11-12-2023|
|  8|     

In [0]:
orders_df = spark.read.format('csv').option('header', True).option('inferSchema', True).load(orders_path)

returns_df = spark.read.format('csv').option('header', True).option('inferSchema', True).load(returns_path)

In [0]:
orders_df.printSchema()

root
 |-- Row ID: integer (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: date (nullable = true)
 |-- Ship Date: date (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Postal Code: integer (nullable = true)
 |-- Market: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Profit: double (nullable = true)
 |-- Shipping Cost: double (nullable = true)
 |-- Order Priority: string (nullable = true)



In [0]:
#columns comparision 

def column_comparision(df, filename, ref_df):
    """
        df : pass the dataframe to which we need to validate
        filename : pass this parameter to get the required/related information about particular file from ref_df
        ref_df : pass the reference_df 
    """
    #get the columns from both df and ref_df
    df_columns = set(df.columns)

    ref_filtered = ref_df.filter(col('SrcFileName') == filename)

    ref_columns = set(ref_filtered.select("SrcColumns").rdd.flatMap(lambda x:x).collect())

    #check for missing columns
    missing_columns = ref_columns - df_columns

    if missing_columns :
        print(f"Column names do not match. Missing columns: {missing_columns}")
        return False
    else:
        print(f"columns match for {filename}.")
        return True


In [0]:
#for orders_data
column_comparision(df=orders_df, filename='Orders_data', ref_df=reference_df)

#for returns_data
column_comparision(df= returns_df, filename='Returns_data', ref_df= reference_df)

columns match for Orders_data.
columns match for Returns_data.
Out[19]: True

In [0]:
#Schema comparision
def schema_comparision(df, filename, ref_df):
    #filtered accoridng to filename and iterate through list and get the SrcColumns and SrcColumnTypes
    ref_schema_filtered = ref_df.filter(col("SrcFileName") == filename)

    for x in ref_schema_filtered.collect():

        columnNames = x['SrcColumns']
        #converting str to list to avoid getting the only characters from list at time of returning or printing missied columns
        columnNamesList = [x.strip() for x in columnNames.split(",")]

        refDataTypes = x['SrcColumnType'] 

        refTypeList = [x.strip() for x in refDataTypes.split(",")]
        #get the schema of the dataframe to which we need to target
        # dataFrameTypes = df.schema[columnNames].dataType.simpleString()
        dataFrameTypes = [field.dataType.simpleString() for field in df.schema.fields]
        # print(dataFrameTypes2)

        missmatchedcolumns = [[col_names, df_types, ref_types] for col_names, df_types, ref_types in zip(columnNamesList, dataFrameTypes,refTypeList) if df_types != ref_types]

        # print(missmatchedcolumns)

        if missmatchedcolumns:

            print("Schema mismatch for the following columns:")
            for col_name, df_type, ref_type in missmatchedcolumns:

                print(f"Column: {col_name}, DataFrame Type: {df_type}, Reference Type: {ref_type}")
            return False
       
    print("All columns schema matched")
    return True


In [0]:
#for Returns_data
schema_comparision(df= returns_df, filename='Returns_data', ref_df= reference_df)

All columns schema matched
Out[31]: True