In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, array, when, array_remove, lit, size, array_contains



Specify the files to be compared and the primary key column.

In [2]:
    # Inputs for the comparison: the "expected" (reference) CSV, the "actual" CSV under test,
    # and the column used as the join key when records are compared one-to-one.
    # NOTE(review): both paths are absolute and machine-specific; adjust them before running elsewhere.
    expected_file_path = '/Users/archanagajula/IdeaProjects/pyspark_data_assertion/data/expected_data.csv'
    actual_file_path = '/Users/archanagajula/IdeaProjects/pyspark_data_assertion/data/actual_data.csv'
    # Name of the column that identifies a record in both files.
    primary_key_column = 'vendor_id'

Start the Spark session.

In [3]:
    # Reuse an already-running session if there is one, otherwise create a new one.
    spark = SparkSession.builder.getOrCreate()
    # Restrict Spark's log output to ERROR level so cell outputs stay readable.
    spark.sparkContext.setLogLevel("ERROR")

Read the expected and actual files.

In [4]:
    # Load both comma-separated files, treating the first line of each as the column names.
    # inferSchema is left at its default, so (per the Spark CSV reader docs) values are not
    # type-cast on load.
    actual_df = spark.read.options(sep=',', header=True).csv(actual_file_path)
    expected_df = spark.read.options(sep=',', header=True).csv(expected_file_path)

In [5]:
    # Total vs. distinct row counts per file; a distinct count lower than the total
    # means the file contains fully duplicated rows.
    # Each count() is a separate Spark action.
    print("Count of all records from expected:", expected_df.count())
    print("Distinct Count of records from expected:", expected_df.distinct().count())
    print("Count of all records from actual:", actual_df.count())
    print("Distinct Count of records from actual:", actual_df.distinct().count())

Count of all records from expected: 7
Distinct Count of records from expected: 7
Count of all records from actual: 7
Distinct Count of records from actual: 7


In [6]:
    # Rows that occur in both files (set intersection of the two DataFrames).
    common_records_df = expected_df.intersect(actual_df)
    print("Count of records that are in both:", common_records_df.count())

Count of records that are in both: 4


In [7]:
    # Rows of expected that have no identical row in actual.
    only_expected_df = expected_df.subtract(actual_df)

    # Rows of actual that have no identical row in expected.
    only_actual_df = actual_df.subtract(expected_df)

In [8]:
    # Number of expected rows without an identical counterpart in actual.
    print('Count of records that are only in expected:', only_expected_df.count())

Count of records that are only in expected: 3


In [9]:
    # Number of actual rows without an identical counterpart in expected.
    print('Count of records that are only in actual:', only_actual_df.count())

Count of records that are only in actual: 3


In [10]:
    print("Some non common records from expected:")
    # Anti-join on the primary key: keep the expected-only rows whose key does not occur
    # among the actual-only rows, i.e. records missing from actual altogether rather than
    # records that exist on both sides with differing values.
    expected_non_matching_records = (
        only_expected_df
        .join(only_actual_df, on=primary_key_column, how='left_anti')
        .select(only_expected_df["*"])
    )
    # Show at most five rows, ordered by key ascending, without truncating cell values.
    expected_non_matching_records.sort(primary_key_column).show(n=5, truncate=False)

Some non common records from expected:
+---------+-------------------+-------------------+---------------+-------------+------------+------------------+------------------+-------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|vendor_id|pickup_datetime    |dropoff_datetime   |passenger_count|trip_distance|rate_code_id|store_and_fwd_flag|pickup_location_id|dropoff_location_id|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+---------+-------------------+-------------------+---------------+-------------+------------+------------------+------------------+-------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|7        |2019-01-27 18:49:51|2019-01-27 19:49:51|1              |1.3          |1           |N                 |144               |45           

In [11]:
    print("Some non common records from actual:")
    # Anti-join on the primary key: keep the actual-only rows whose key does not occur
    # among the expected-only rows, i.e. records missing from expected altogether rather
    # than records that exist on both sides with differing values.
    actual_non_matching_records = (
        only_actual_df
        .join(only_expected_df, on=primary_key_column, how='left_anti')
        .select(only_actual_df["*"])
    )
    # Show at most five rows, ordered by key ascending, without truncating cell values.
    actual_non_matching_records.sort(primary_key_column).show(n=5, truncate=False)

Some non common records from actual:
+---------+-------------------+-------------------+---------------+-------------+------------+------------------+------------------+-------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|vendor_id|pickup_datetime    |dropoff_datetime   |passenger_count|trip_distance|rate_code_id|store_and_fwd_flag|pickup_location_id|dropoff_location_id|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+---------+-------------------+-------------------+---------------+-------------+------------+------------------+------------------+-------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|8        |2019-01-26 18:20:17|2019-01-26 18:39:27|1              |1.3          |1           |N                 |144               |45             

Compare records column-wise and collect the names of the non-matching columns into an array.

In [12]:
    # One expression per column: yields the column's name when the expected and actual
    # values differ, and "" otherwise.
    # The comparison is null-safe on purpose: a plain `!=` evaluates to NULL when either
    # side is NULL, which made `when(...)` fall through to "" and silently hid every
    # mismatch where a value is missing on only one side.
    mismatch_columns = [
        when(~expected_df[c].eqNullSafe(actual_df[c]), lit(c)).otherwise("")
        for c in expected_df.columns
    ]
    # Pack the per-column results into an array and strip the "" placeholders, leaving
    # only the names of the columns that differ for each key.
    select_exp = [col(primary_key_column), array_remove(array(*mismatch_columns), "").alias("mismatch_columns")]
    # Pair up expected and actual records that share the same primary key.
    mismatch_columns_df = expected_df.join(actual_df,
                                           on=primary_key_column, how='inner').select(select_exp)

In [13]:
    # A non-empty mismatch array means at least one column differs for that key;
    # an empty array means every compared column agreed.
    print('Count of records having mismatch:', mismatch_columns_df.where(size(col("mismatch_columns")) > 0).count())
    print('Count of records matching:', mismatch_columns_df.where(size(col("mismatch_columns")) == 0).count())

Count of records having mismatch: 2
Count of records matching: 4


In [14]:
    # List each key that has differences together with the names of the differing columns.
    mismatch_columns_df.where(size(col("mismatch_columns")) > 0).show(truncate=False)

+---------+--------------------------------+
|vendor_id|mismatch_columns                |
+---------+--------------------------------+
|3        |[passenger_count, trip_distance]|
|6        |[pickup_datetime]               |
+---------+--------------------------------+

