In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql import types

In [0]:
%run "../ETL Testing/Setup_file"

In [0]:
# Preview the two metadata frames (presumably defined by the %run setup cell):
# the first two rows of the source column catalogue, then the untruncated
# target list with each target name, file location, source table name and the
# reference frame it was derived from.
print('Source_details_df')
src_details_df.show(n=2)

print('Target_details_df')
tgt_details_df.show(truncate=False)

Source_details_df
+---+-------------+------------+----------+-------------+----------+----------+
| Id|SrcSchemaName|SrcTableName|SrcColumns|SrcColumnType| CreatedAt|updated_at|
+---+-------------+------------+----------+-------------+----------+----------+
|  1|       Orders| Orders_data|    Row ID|          int|08-12-2023|11-12-2023|
|  2|       Orders| Orders_data|  Order ID|       string|08-12-2023|11-12-2023|
+---+-------------+------------+----------+-------------+----------+----------+
only showing top 2 rows

Target_details_df
+------------+---------------------------------------+------------+------------+
|TargetdfName|FileLocation                           |SrcTableName|refdfName   |
+------------+---------------------------------------+------------+------------+
|orders_df   |dbfs:/FileStore/tables/Orders_data.csv |Orders_data |reference_df|
|returns_df  |dbfs:/FileStore/tables/Returns_data.csv|Returns_data|reference_df|
+------------+---------------------------------------+

In [0]:
#column comparison

def column_comparison(src_details_df, tgt_details_df, spark):
    """Print column-name differences between source metadata and target CSVs.

    For every row of ``tgt_details_df`` the CSV at ``FileLocation`` is read and
    its header names are compared with the ``SrcColumns`` values that
    ``src_details_df`` lists for the same ``SrcTableName``. Names are compared
    case-insensitively and with surrounding whitespace removed. One message is
    printed per table and direction; a failure on one table is reported and the
    remaining tables are still checked.

    Args:
        src_details_df: DataFrame with at least the columns ``SrcTableName``
            and ``SrcColumns``.
        tgt_details_df: DataFrame with the columns ``SrcTableName``,
            ``FileLocation`` and ``TargetdfName``.
        spark: Session object used to read the target CSV files.

    Returns:
        ``False`` if the target list could not be collected or if comparing
        any table raised an exception; ``None`` otherwise. Missing columns are
        only printed and do not influence the return value.
    """
    try:
        tgt_details = tgt_details_df.select(
            'SrcTableName',
            'FileLocation',
            'TargetdfName'
        ).collect()
    except Exception as e:
        print(f'An error occured: {str(e)}')
        return False

    had_error = False

    for row in tgt_details:
        src_table = row['SrcTableName']
        tgt_path = row['FileLocation']
        tgt_name = row['TargetdfName']

        # Guard each table separately so one unreadable file does not hide the
        # results for the tables that follow it.
        try:
            # Only the header names are needed here, so inferSchema (which costs
            # an extra pass over the data) is deliberately not requested.
            tgt_df = spark.read.format('csv') \
                        .option('header', True) \
                        .load(tgt_path)

            tgt_cols = {name.strip().lower() for name in tgt_df.columns}

            src_rows = src_details_df.filter(
                F.col('SrcTableName') == src_table
            ).select('SrcColumns').collect()

            # Rows without a column name carry nothing to compare; skipping
            # them avoids failing on None.
            src_cols = {
                src_row['SrcColumns'].strip().lower()
                for src_row in src_rows
                if src_row['SrcColumns'] is not None
            }

            missing_cols_src = tgt_cols - src_cols
            missing_cols_tgt = src_cols - tgt_cols

            if missing_cols_src:
                print(f"Columns are missing in the source - {src_table}. Missing columns are : {missing_cols_src}")

            if missing_cols_tgt:
                print(f"Columns are missing in the target - {tgt_name}. Missing columns are : {missing_cols_tgt}")

            if not missing_cols_src and not missing_cols_tgt:
                print(f'No missing columns between Src {src_table} and tgt {tgt_name}')

        except Exception as e:
            print(f'An error occured while comparing Src {src_table} and tgt {tgt_name}: {str(e)}')
            had_error = True

    return False if had_error else None

In [0]:
# column_comparison(src_details_df, tgt_details_df, spark)

In [0]:
# schema comparison - data types
def schema_comparison(src_details_df, tgt_details_df, spark):
    """Print column name/type differences between source metadata and target CSVs.

    For every row of ``tgt_details_df`` the CSV at ``FileLocation`` is read
    with schema inference, and the resulting (column name, data type) pairs are
    compared with the (``SrcColumns``, ``SrcColumnType``) pairs that
    ``src_details_df`` lists for the same ``SrcTableName``. Both sides are
    lower-cased and trimmed before comparison. A failure on one table is
    reported and the remaining tables are still checked.

    Args:
        src_details_df: DataFrame with at least the columns ``SrcTableName``,
            ``SrcColumns`` and ``SrcColumnType``.
        tgt_details_df: DataFrame with the columns ``SrcTableName``,
            ``FileLocation`` and ``TargetdfName``.
        spark: Session object used to read the target CSV files.

    Returns:
        ``False`` if the target list could not be collected or if comparing
        any table raised an exception; ``None`` otherwise. Schema mismatches
        are only printed and do not influence the return value.
    """
    try:
        # One collected record per target to check.
        tgt_details = tgt_details_df.select(
            'SrcTableName',
            'FileLocation',
            'TargetdfName'
        ).collect()
    except Exception as e:
        print(f'An error occured: {str(e)}')
        return False

    had_error = False

    for row in tgt_details:
        src_table = row['SrcTableName']
        tgt_path = row['FileLocation']
        tgt_name = row['TargetdfName']

        # Guard each table separately so one unreadable file does not hide the
        # results for the tables that follow it.
        try:
            tgt_df = spark.read.format('csv') \
                        .option('header', True) \
                        .option('inferSchema', True) \
                        .load(tgt_path)

            # Source side: normalised (name, type) pairs for this table only.
            src_rows = src_details_df.filter(
                F.col('SrcTableName') == src_table
            ).select(
                F.trim(F.lower('SrcColumns')),
                F.trim(F.lower('SrcColumnType'))
            ).collect()
            final_src_schema = {(src_row[0], src_row[1]) for src_row in src_rows}

            # Target side: the schema is already available on the driver, so it
            # is normalised in plain Python instead of going through another
            # DataFrame and its RDD.
            final_tgt_schema = {
                (field.name.lower().strip(), field.dataType.simpleString().lower().strip())
                for field in tgt_df.schema
            }

            if final_src_schema == final_tgt_schema:
                print(f'Schemas are a match between Src - {src_table} and Tgt - {tgt_name}')
                continue

            print(f'Mismatch in schemas between Src - {src_table} and Tgt - {tgt_name}')

            # Pairs present in the source but missing or different in the target.
            src_only_mismatch = final_src_schema - final_tgt_schema

            # Pairs present in the target but missing or different in the source.
            tgt_only_mismatch = final_tgt_schema - final_src_schema

            # Sets have no stable iteration order; sort so repeated runs print
            # the same report. str() keeps None entries sortable.
            def report_order(pair):
                return (str(pair[0]), str(pair[1]))

            if src_only_mismatch:
                print(f'Mismatch or missing schemas in Source - {src_table}:  ')
                for col_name, dtype in sorted(src_only_mismatch, key=report_order):
                    print(f'{col_name}:{dtype}')

            if tgt_only_mismatch:
                print(f'Mismatch or missing schemas in Target - {tgt_name}:')
                for col_name, dtype in sorted(tgt_only_mismatch, key=report_order):
                    print(f'{col_name}:{dtype}')

        except Exception as e:
            print(f'An error occured while comparing Src - {src_table} and Tgt - {tgt_name}: {str(e)}')
            had_error = True

    return False if had_error else None


In [0]:
# Run the data-type comparison for every target listed in tgt_details_df.
# NOTE(review): the recorded output reports market:int on the source side, but
# the metadata shown further down lists Market as string until the override
# cell rewrites it - so this output appears to come from an out-of-order run.
# Re-run from a fresh kernel to confirm.
schema_comparison(
    src_details_df=src_details_df,
    tgt_details_df=tgt_details_df,
    spark=spark,
)

Mismatch in schemas between Src - Orders_data and Tgt - orders_df
Mismatch or missing schemas in Source - Orders_data:  
ship mode:int
order date:string
Mismatch or missing schemas in Target - orders_df:
ship mode:string
order date:date
Mismatch in schemas between Src - Returns_data and Tgt - returns_df
Mismatch or missing schemas in Source - Returns_data:  
market:int
Mismatch or missing schemas in Target - returns_df:
market:string


## Testing Blocks

In [0]:
# Inspect the metadata rows recorded for the Returns_data source table.
src_details_df.where(F.col('SrcTableName') == 'Returns_data').show()

+---+-------------+------------+----------+-------------+----------+----------+
| Id|SrcSchemaName|SrcTableName|SrcColumns|SrcColumnType| CreatedAt|updated_at|
+---+-------------+------------+----------+-------------+----------+----------+
| 25|      Returns|Returns_data|  Returned|       string|08-12-2023|11-12-2023|
| 26|      Returns|Returns_data|  Order ID|       string|08-12-2023|11-12-2023|
| 27|      Returns|Returns_data|    Market|       string|08-12-2023|11-12-2023|
+---+-------------+------------+----------+-------------+----------+----------+



In [0]:
# Set the recorded type of the Returns_data / Market row to 'int'; every other
# row keeps its existing SrcColumnType.
# NOTE(review): this rebinds the shared src_details_df, so any comparison run
# afterwards sees the altered type - presumably done to exercise the mismatch
# path. Confirm, and reload the metadata before relying on later results.
src_details_df = src_details_df.withColumn(
    'SrcColumnType',
    F.when(
        (F.col('SrcColumns') == 'Market') & (F.col('SrcTableName') == 'Returns_data'),
        F.lit('int'),
    ).otherwise(F.col('SrcColumnType')),
)

In [0]:
# Show the metadata row for Returns_data / Market to check its recorded type.
# Alternative condition for the Orders_data / Ship Mode row, kept for reference:
#  (F.col('SrcTableName') == 'Orders_data') & (F.col('SrcColumns') == 'Ship Mode')
src_details_df.where(
    (F.col('SrcTableName') == 'Returns_data') & (F.col('SrcColumns') == 'Market')
).show()

+---+-------------+------------+----------+-------------+----------+----------+
| Id|SrcSchemaName|SrcTableName|SrcColumns|SrcColumnType| CreatedAt|updated_at|
+---+-------------+------------+----------+-------------+----------+----------+
| 27|      Returns|Returns_data|    Market|          int|08-12-2023|11-12-2023|
+---+-------------+------------+----------+-------------+----------+----------+

