# Setting PySpark

In [1]:
import pyspark
pyspark_version = pyspark.__version__
from pyspark.sql import SparkSession


class StartSpark:

    ####################################################################################################################
    def __init__(self, app_name: str = 'PySparkDf'):
        super().__init__()
        self._appName = app_name

    ####################################################################################################################
    def initialize(self):

        # --------------------------------------------------------------------------------------------------------------
        spark = (SparkSession
                 .builder
                 .master("local[*]")
                 .config('spark.driver.host', 'localhost')
                 .appName(self._appName)
                 .getOrCreate()
                 )
        pyspark_version_info_1 = str(pyspark_version)
        pyspark_version_info = f'Version = {pyspark_version_info_1} '.ljust(19, ' ')
        spark.sparkContext.setLogLevel("ERROR")
        # --------------------------------------------------------------------------------------------------------------
        print(f"""{' ' * 9} _____      _____                  _      _   _____               _       """)
        print(f"""{' ' * 9}| ___ \    /  ___|                | |    | | | ___ \             | |      """)
        print(f"""{' ' * 9}| |_/ /   _\ `--. _ __   __ _ _ __| | __ | | | |_/ /___  __ _  __| |_   _ """)
        print(f"""{' ' * 9}|  __/ | | |`--. \ '_ \ / _` | '__| |/ / | | |    // _ \/ _` |/ _` | | | |""")
        print(f"""{' ' * 9}| |  | |_| /\__/ / |_) | (_| | |  |   <  | | | |\ \  __/ (_| | (_| | |_| |""")
        print(f"""{' ' * 9}\_|   \__, \____/| .__/ \__,_|_|  |_|\_\ | | \_| \_\___|\__,_|\__,_|\__, |""")
        print(f"""{' ' * 9}       __/ |     | |                     | |                         __/ |""")
        print(f"""{' ' * 9}      |___/      |_| {pyspark_version_info} |_|                        |___/ """)
        # --------------------------------------------------------------------------------------------------------------
        
        return spark

---

# Starting PySpark

In [4]:
spark = StartSpark().initialize()

          _____      _____                  _      _   _____               _       
         | ___ \    /  ___|                | |    | | | ___ \             | |      
         | |_/ /   _\ `--. _ __   __ _ _ __| | __ | | | |_/ /___  __ _  __| |_   _ 
         |  __/ | | |`--. \ '_ \ / _` | '__| |/ / | | |    // _ \/ _` |/ _` | | | |
         | |  | |_| /\__/ / |_) | (_| | |  |   <  | | | |\ \  __/ (_| | (_| | |_| |
         \_|   \__, \____/| .__/ \__,_|_|  |_|\_\ | | \_| \_\___|\__,_|\__,_|\__, |
                __/ |     | |                     | |                         __/ |
               |___/      |_| Version = 3.5.0     |_|                        |___/ 


---
# Importing functions needed

In [12]:
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, TimestampType, DateType, IntegerType, MapType, LongType
from pyspark.sql.functions import when, col, lit, asc, desc, monotonically_increasing_id
from pyspark.sql.dataframe import DataFrame as SparkDF
from datetime import datetime

---
# Defining DFs

In [None]:
schema = StructType([
    StructField('col1', StringType(), True),
    StructField('col2', StringType(), True),
    StructField('col3', StringType(), True),
    StructField('col4', StringType(), True),
])

data = [tuple([f'a_{x}_{str(y).zfill(4)}' for x in range(1,5)]) for y in range(1,1000000)]

df_1 = spark.createDataFrame(data=data, schema=schema).cache()
df_2 = df_1

df_2 = df_2.withColumn('col2', when(col('col2').like('%_02%'), 'DIFF').otherwise(col('col2')))
df_2 = df_2.withColumn('col3', when(col('col3').like('%_03%'), 'DIFF').otherwise(col('col3')))

---
# Manual Testing (testing validation options)

In [None]:
def lower_df_columns(df: SparkDF, lower_var: bool = True):
    if lower_var:
        df = df.toDF(*[_col.lower() for _col in df.columns])
        return df
    else:
        df = df.toDF(*[_col.upper() for _col in df.columns])
        return df

In [None]:
# Normalizing columns names for easier analysis
df_1 = lower_df_columns(df_1)
df_2 = lower_df_columns(df_2, lower_var=True)

In [None]:
# checking for col number difference
assert len(df_1.columns) == len(df_2.columns), 'Diff cols'
assert list(set(df_1.columns) - set(df_2.columns)) == [], 'Columns with different names'

In [None]:
# checking for row number difference
assert df_1.count() == df_2.count(), 'Diff rows'

In [None]:
# Checking for differences in data
diff_dfs = df_1.subtract(df_2).count()
# assert diff_dfs == 0, f'There\'s difference in the data {diff_dfs}'

In [None]:
dict_final = {}
keep_temp = 0
for _col in df_1.columns:
    if not keep_temp == diff_dfs:
        temp = df_1.select(_col).subtract(df_2.select(_col)).count()
    else:
        temp = 0
    keep_temp += temp
    if temp > 0:
        dict_final[_col] = temp

In [None]:
for _col in dict_final.keys():
    print(f"{_col} = {dict_final.get(_col)}")

---

# Class (expanding the validation)

In [7]:
class GetDfDifferences:
    """
    Class for getting differences between two dataframes.
    
    df1: Spark Dataframe
    the dataframe to be compared
    
    df2: Spark Dataframe
    the new dataframe to be compared
    
    return: Spark Dataframe containing differences between two dataframes
    """
    
    # Defining the schema that will be used to report the analysis _____________________________________________________
    SCHEMA = StructType([
        StructField('DateRefProcess',    TimestampType(),                     True),
        StructField('StartProcess',      TimestampType(),                     True),
        StructField('EndProcess',        TimestampType(),                     True),
        StructField('Status',            StringType(),                        True),
        StructField('ColsDiff',          BooleanType(),                       True),
        StructField('CountDiffBool',     BooleanType(),                       True),
        StructField('CountDiffComplete', StringType(),                        True),
        StructField('DataDiffBool',      BooleanType(),                       True),
        StructField('DataDiffTotalRows', IntegerType(),                       True),
        StructField('DataDiffCols',      MapType(StringType(), StringType()), True),
    ])
    
    # ##################################################################################################################
    def __init__(self, df1: SparkDF, df2: SparkDF):
        self.df1 = self.lower_columns(df1)
        self.df2 = self.lower_columns(df2)
        
        # Caching the Dataframes in order to improve performance _______________________________________________________
        self.df1.cache()
        self.df2.cache()
        
        self.data_diff = None
        self.start = datetime.now()
        
    # ##################################################################################################################
    def check_count(self) -> bool:
        return self.df1.count() == self.df2.count()
        
    # ##################################################################################################################
    def check_cols(self) -> bool:
        return sorted(self.df1.columns) == sorted(self.df2.columns)
    
    # ##################################################################################################################
    @staticmethod
    def lower_columns(df: SparkDF) -> SparkDF:
        df = df.toDF(*[column for column in df.columns])
        return df.select(sorted(df.columns))
    
    # ##################################################################################################################
    def get_data_diff(self) -> None:
        columns = self.df1.columns
        self.data_diff = self.df1.select(columns).subtract(self.df2.select(columns)).count()
    
    # ##################################################################################################################
    def get_diff_cols(self) -> dict:
        self.get_data_diff()
        
        if self.data_diff > 0:
            return_dict = {}            
            for column in sorted(self.df1.columns):
                temp = self.df1.select(column).subtract(self.df2.select(column)).count()
                return_dict[column] = temp
            
            return return_dict
    
    # ##################################################################################################################
    def run(self) -> SparkDF:
        
        # Basic validation _____________________________________________________________________________________________
        check_cols = self.check_cols()
        check_count = self.check_count()
        
        # Checking for data validation _________________________________________________________________________________
        col_dict = self.get_diff_cols() if (check_cols and check_count) else None
        
        # Preparing report DF that might be used in case of a log table for validation multiples tables after migration 
        data_return = [
            (
            self.start,
            self.start,
            datetime.now(),
            'OK' if (check_cols and check_count and self.data_diff == 0) else 'NOT OK',
            not check_cols,
            not check_count,
            f"df1: {self.df1.count()} | df2: {self.df2.count()}" if not check_count else "No differences in count.",
            (self.data_diff > 0) if (check_cols and check_count) else False,
            self.data_diff,
            {_key: col_dict.get(_key) for _key in col_dict.keys() if col_dict.get(_key) > 0} if (check_cols and check_count and self.data_diff >0) else None,
            )
        ]
        
        # Un-persisting the Dataframes _________________________________________________________________________________
        self.df1.unpersist()
        self.df2.unpersist()
        
        # Validating if the Dataframes were indeed un-persisted ________________________________________________________
        assert self.df1.is_cached is False, 'DF1 is cached'
        assert self.df2.is_cached is False, 'DF2 is cached'
        
        # Deleting the variables created for the Dataframes since this is the last part of the class ___________________
        del self.df1, self.df2
        
        # Well the end ...
        return spark.createDataFrame(data=data_return, schema=self.SCHEMA).withColumn('DateRefProcess', col('DateRefProcess').cast(DateType()))

---
# Expanding testing

In [44]:
import numpy as np
from tqdm.notebook import tqdm

# Defining a dict with the patterns for data creation and testing over the class
generate_control = {
    'num_dfs': 50,
    'num_rows_in_df': 500000,
    'dfs_col_removed': [0, 2, 32, 47],
    'col_removed': [('col1',), ('col2',), ('col2',), ('col2',)],
    'dfs_count': [0, 2, 3, 8, 40, 45],
    'dfs_with_diffs': [0, 4, 5, 25, 30],
    'dfs_diff_cols': [('col1', 'col2'), ('col3', 'col4', 'col1'), ('col1',), ('col3', 'col4'), ('col1', 'col2')],
}

# Schema for the Dataframes
schema = StructType([
    StructField('col1', StringType(), True),
    StructField('col2', StringType(), True),
    StructField('col3', StringType(), True),
    StructField('col4', StringType(), True),
])

for df_generated in tqdm(range(generate_control.get('num_dfs'))):
    start = datetime.now()
    
    new_data = [tuple([f'a_{x*(df_generated + 1)}_{str(y*(df_generated + 1)).zfill(4)}' for x in range(1,5)]) for y in range(1, int(generate_control.get('num_rows_in_df') * round(1 + np.random.choice([x for x in range(1, 20)], 1)[0]/100, 3)))]
    
    df1_gen_new = spark.createDataFrame(data=new_data, schema=schema)
    df1_gen_new.cache()
    df2_gen_new = df1_gen_new
    df2_gen_new.cache()
    
    # Creating count diff
    if df_generated in generate_control.get('dfs_count'):
        df2_gen_new = df2_gen_new.unionAll(df2_gen_new.limit(np.random.choice([x for x in range(1, 200)], 1)[0]))
    
    # Creating data diff
    if df_generated in generate_control.get('dfs_with_diffs'):
        for column in generate_control.get('dfs_diff_cols')[generate_control.get('dfs_with_diffs').index(df_generated)]:
            df2_gen_new = df2_gen_new.withColumn(column, when(col(column).like(f'%_00{int("".join([x for x in column if x.isdigit()])) * np.random.choice([1,11], 1)[0]}0%'), 'DIFF').otherwise(col(column)))
            
    # Creating columns diff
    if df_generated in generate_control.get('dfs_col_removed'):
        for column in generate_control.get('col_removed')[generate_control.get('dfs_col_removed').index(df_generated)]:
            df2_gen_new = df2_gen_new.drop(column)
    
    end = datetime.now()
    
    # Creating the report table
    if df_generated == 0:
        df_return = GetDfDifferences(df1_gen_new, df2_gen_new).run().withColumn('iteration', lit(df_generated)).withColumn('start_creation', lit(start)).withColumn('end_creation', lit(end))
        
    else:
        df_return = df_return.unionAll(GetDfDifferences(df1_gen_new, df2_gen_new).run().withColumn('iteration', lit(df_generated)).withColumn('start_creation', lit(start)).withColumn('end_creation', lit(end)))
    
    df1_gen_new.unpersist()
    df2_gen_new.unpersist()
    
    del df1_gen_new, df2_gen_new
    

  0%|          | 0/50 [00:00<?, ?it/s]





                                                                                

In [45]:
df_return = df_return.withColumn('process_time_seconds', (col('EndProcess').cast(LongType()) - col('StartProcess').cast(LongType())))
df_return = df_return.withColumn('creation_time_seconds', (col('end_creation').cast(LongType()) - col('start_creation').cast(LongType())))
df_return.cache()

DataFrame[DateRefProcess: date, StartProcess: timestamp, EndProcess: timestamp, Status: string, ColsDiff: boolean, CountDiffBool: boolean, CountDiffComplete: string, DataDiffBool: boolean, DataDiffTotalRows: int, DataDiffCols: map<string,string>, iteration: int, start_creation: timestamp, end_creation: timestamp, process_time_seconds: bigint, creation_time_seconds: bigint]

In [47]:
df_return.is_cached

True

In [49]:
df_return.show(truncate=False)

+--------------+--------------------------+--------------------------+------+--------+-------------+-------------------------+------------+-----------------+--------------------------------------+---------+--------------------------+--------------------------+--------------------+---------------------+
|DateRefProcess|StartProcess              |EndProcess                |Status|ColsDiff|CountDiffBool|CountDiffComplete        |DataDiffBool|DataDiffTotalRows|DataDiffCols                          |iteration|start_creation            |end_creation              |process_time_seconds|creation_time_seconds|
+--------------+--------------------------+--------------------------+------+--------+-------------+-------------------------+------------+-----------------+--------------------------------------+---------+--------------------------+--------------------------+--------------------+---------------------+
|2024-06-23    |2024-06-23 17:07:33.699222|2024-06-23 17:07:40.041624|NOT OK|true    |tr

---
## Report showing tables that are OK

In [50]:
df_return.where(col('Status') == 'OK').orderBy(*[desc('process_time_seconds'), asc('iteration')]).withColumn('count', monotonically_increasing_id() + 1).select(['count', 'iteration'] + [column for column in df_return.columns if column not in ['iteration', 'count']]).show(100, truncate=False)

                                                                                

+-----+---------+--------------+--------------------------+--------------------------+------+--------+-------------+------------------------+------------+-----------------+------------+--------------------------+--------------------------+--------------------+---------------------+
|count|iteration|DateRefProcess|StartProcess              |EndProcess                |Status|ColsDiff|CountDiffBool|CountDiffComplete       |DataDiffBool|DataDiffTotalRows|DataDiffCols|start_creation            |end_creation              |process_time_seconds|creation_time_seconds|
+-----+---------+--------------+--------------------------+--------------------------+------+--------+-------------+------------------------+------------+-----------------+------------+--------------------------+--------------------------+--------------------+---------------------+
|1    |1        |2024-06-23    |2024-06-23 17:07:41.986051|2024-06-23 17:07:45.177189|OK    |false   |false        |No differences in count.|false     

---
## Report showing tables that aren't OK

In [51]:
(df_return.where(col('Status') != 'OK').orderBy(*[asc('ColsDiff'), asc('CountDiffBool')])
          .withColumn('count', monotonically_increasing_id() + 1)
          .select(['count', 'iteration'] + [column for column in df_return.columns if column not in ['iteration', 'count']])
          .show(truncate=False))

+-----+---------+--------------+--------------------------+--------------------------+------+--------+-------------+-------------------------+------------+-----------------+--------------------------------------+--------------------------+--------------------------+--------------------+---------------------+
|count|iteration|DateRefProcess|StartProcess              |EndProcess                |Status|ColsDiff|CountDiffBool|CountDiffComplete        |DataDiffBool|DataDiffTotalRows|DataDiffCols                          |start_creation            |end_creation              |process_time_seconds|creation_time_seconds|
+-----+---------+--------------+--------------------------+--------------------------+------+--------+-------------+-------------------------+------------+-----------------+--------------------------------------+--------------------------+--------------------------+--------------------+---------------------+
|1    |4        |2024-06-23    |2024-06-23 17:07:52.411497|2024-06-23 

---
## Report showing tables with only differences on data

In [52]:
(df_return.where((col('Status') != 'OK') & (col('ColsDiff') == False) & (col('CountDiffBool') == False))
          .orderBy(*[desc('process_time_seconds'), asc('iteration')])
          .withColumn('count', monotonically_increasing_id()).select(['count', 'iteration'] + [column for column in df_return.columns if column not in ['iteration', 'count']])
          .show(truncate=False))

+-----+---------+--------------+--------------------------+--------------------------+------+--------+-------------+------------------------+------------+-----------------+--------------------------------------+--------------------------+--------------------------+--------------------+---------------------+
|count|iteration|DateRefProcess|StartProcess              |EndProcess                |Status|ColsDiff|CountDiffBool|CountDiffComplete       |DataDiffBool|DataDiffTotalRows|DataDiffCols                          |start_creation            |end_creation              |process_time_seconds|creation_time_seconds|
+-----+---------+--------------+--------------------------+--------------------------+------+--------+-------------+------------------------+------------+-----------------+--------------------------------------+--------------------------+--------------------------+--------------------+---------------------+
|0    |30       |2024-06-23    |2024-06-23 17:09:30.877831|2024-06-23 17:

---
## Report showing tables with differences on count and columns

In [53]:
df_return.where((col('Status') != 'OK') & (col('ColsDiff') == True) & (col('CountDiffBool') == True)).show(truncate=False)

+--------------+--------------------------+--------------------------+------+--------+-------------+-------------------------+------------+-----------------+------------+---------+--------------------------+--------------------------+--------------------+---------------------+
|DateRefProcess|StartProcess              |EndProcess                |Status|ColsDiff|CountDiffBool|CountDiffComplete        |DataDiffBool|DataDiffTotalRows|DataDiffCols|iteration|start_creation            |end_creation              |process_time_seconds|creation_time_seconds|
+--------------+--------------------------+--------------------------+------+--------+-------------+-------------------------+------------+-----------------+------------+---------+--------------------------+--------------------------+--------------------+---------------------+
|2024-06-23    |2024-06-23 17:07:33.699222|2024-06-23 17:07:40.041624|NOT OK|true    |true         |df1: 524999 | df2: 525151|false       |NULL             |NULL     

                                                                                

In [54]:
df_return.where((col('Status') != 'OK') & (col('ColsDiff') == True)).show(truncate=False)

+--------------+--------------------------+--------------------------+------+--------+-------------+-------------------------+------------+-----------------+------------+---------+--------------------------+--------------------------+--------------------+---------------------+
|DateRefProcess|StartProcess              |EndProcess                |Status|ColsDiff|CountDiffBool|CountDiffComplete        |DataDiffBool|DataDiffTotalRows|DataDiffCols|iteration|start_creation            |end_creation              |process_time_seconds|creation_time_seconds|
+--------------+--------------------------+--------------------------+------+--------+-------------+-------------------------+------------+-----------------+------------+---------+--------------------------+--------------------------+--------------------+---------------------+
|2024-06-23    |2024-06-23 17:07:33.699222|2024-06-23 17:07:40.041624|NOT OK|true    |true         |df1: 524999 | df2: 525151|false       |NULL             |NULL     

                                                                                

In [55]:
df_return.where((col('Status') != 'OK') & (col('ColsDiff') == False) & (col('CountDiffBool') == True)).show(truncate=False)

+--------------+--------------------------+--------------------------+------+--------+-------------+-------------------------+------------+-----------------+------------+---------+--------------------------+--------------------------+--------------------+---------------------+
|DateRefProcess|StartProcess              |EndProcess                |Status|ColsDiff|CountDiffBool|CountDiffComplete        |DataDiffBool|DataDiffTotalRows|DataDiffCols|iteration|start_creation            |end_creation              |process_time_seconds|creation_time_seconds|
+--------------+--------------------------+--------------------------+------+--------+-------------+-------------------------+------------+-----------------+------------+---------+--------------------------+--------------------------+--------------------+---------------------+
|2024-06-23    |2024-06-23 17:07:49.875247|2024-06-23 17:07:50.945812|NOT OK|false   |true         |df1: 524999 | df2: 525103|false       |NULL             |NULL     