In [106]:
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame as DF
from pyspark.sql import functions as f
from pyspark.sql.window import Window
from datetime import datetime
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
import sys,os

In [107]:
# Single SparkSession shared by every cell below; getOrCreate() reuses an
# already-active session instead of starting a second one.
sp = (
    SparkSession.builder
    .appName("Test")
    .getOrCreate()
)

In [108]:
# Explicit schema shared by the source and target reads: four nullable string
# fields. numberrange is also read as text (not IntegerType), so its values are
# compared as the raw strings found in the files.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ('name', 'phone', 'email', 'numberrange')
])


In [109]:
# Source extract: the first line is treated as a header and columns are typed
# by the explicit schema rather than inferred.
df_src = sp.read.csv(
    "./data_scr.csv",
    schema=schema,
    header=True,
)
df_src.printSchema()

root
 |-- name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- email: string (nullable = true)
 |-- numberrange: string (nullable = true)



In [110]:
# Synthetic row key: the schema has no key column, so each side is numbered in
# the same sort order and rows that receive the same id are compared.
# NOTE(review): if a sort column itself differs between source and target
# (e.g. a changed name), rows can reorder and be paired with the wrong
# partner; prefer a real business key if the data has one.
# numberrange is the final tie-breaker so rows tied on name/phone/email get a
# deterministic id; row_number() order among tied rows is otherwise arbitrary.
# NOTE(review): there is no partitionBy, so Spark will likely evaluate this
# window in a single partition; acceptable for small extracts only.
wind = Window.orderBy(df_src.name, df_src.phone, df_src.email, df_src.numberrange)
df_src = df_src.withColumn('id', f.row_number().over(wind))

In [111]:
# Confirm the synthetic id column is present, then preview the first rows.
df_src.printSchema()
df_src.show(n=5)

root
 |-- name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- email: string (nullable = true)
 |-- numberrange: string (nullable = true)
 |-- id: integer (nullable = false)

+--------------+--------------+--------------------+-----------+---+
|          name|         phone|               email|numberrange| id|
+--------------+--------------+--------------------+-----------+---+
| Aladdin Simon|1-355-186-8775|risus.a.ultricies...|          0|  1|
|   Alec Hinton|(741) 177-1030|hendrerit.consect...|          4|  2|
|  Alexa Barker|1-245-418-8957|parturient.montes...|          3|  3|
|Alfreda Guerra|(761) 258-5778|aliquam.ultrices....|          0|  4|
|Alvin Cummings|(354) 588-7652|eros.proin.ultric...|          2|  5|
+--------------+--------------+--------------------+-----------+---+
only showing top 5 rows



In [112]:
# Target extract, read with exactly the same options and schema as the source
# so the two frames line up column for column.
df_tgt = sp.read.csv(
    "./tgt_data.csv",
    schema=schema,
    header=True,
)
df_tgt.printSchema()

root
 |-- name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- email: string (nullable = true)
 |-- numberrange: string (nullable = true)



In [113]:
# Number target rows with the same sort order as the source so ids pair up.
# NOTE(review): a value change in a sort column can shift ids and pair
# unrelated rows; prefer a real business key if one exists.
# numberrange breaks ties on name/phone/email so the numbering is
# deterministic; row_number() order among tied rows is otherwise arbitrary.
# NOTE(review): no partitionBy, so Spark will likely use a single partition.
wind = Window.orderBy(df_tgt.name, df_tgt.phone, df_tgt.email, df_tgt.numberrange)
df_tgt = df_tgt.withColumn('id', f.row_number().over(wind))
df_tgt.show(5)

+--------------+--------------+--------------------+-----------+---+
|          name|         phone|               email|numberrange| id|
+--------------+--------------+--------------------+-----------+---+
| Aladdin Simon|1-355-186-8775|rissus.a.ultricie...|          0|  1|
|   Alec Hinton|(741) 177-1030|hendrerit.consect...|          4|  2|
|  Alexa Barker|1-245-418-8957|parturient.montes...|          3|  3|
|Alfreda Guerra|(761) 258-5778|aliquam.ultrices....|          0|  4|
|Alvin Cummings|(354) 588-7652|eros.proin.ultric...|          2|  5|
+--------------+--------------+--------------------+-----------+---+
only showing top 5 rows



In [114]:
def col_check(df1,df2):
    """Return the column names of ``df1`` that do not appear in ``df2``.

    Only the ``.columns`` attribute of each argument is read. The result keeps
    the order (and any repetition) of ``df1.columns``; an empty list means
    every ``df1`` column exists in ``df2``.
    """
    other_columns = df2.columns
    missing = []
    for column_name in df1.columns:
        if column_name not in other_columns:
            missing.append(column_name)
    return missing

In [115]:
# Schema parity in both directions. Only a cell's last expression is displayed,
# so both results are kept and shown together instead of discarding the first.
cols_only_in_src = col_check(df_src, df_tgt)
cols_only_in_tgt = col_check(df_tgt, df_src)
cols_only_in_src, cols_only_in_tgt

[]

In [116]:
def count_chk(df1,df2):
    """Return True when ``df1`` and ``df2`` report the same row count.

    ``df1.count()`` is evaluated before ``df2.count()``.
    """
    rows_first = df1.count()
    rows_second = df2.count()
    return rows_first == rows_second

In [117]:
# True when the source and target extracts contain the same number of rows.
count_chk(df1=df_src, df2=df_tgt)

True

In [118]:
def verify_count(col_s,col_t,df_s,df_t,PK_s,PK_t):
    """Count how one column compares between paired source and target rows.

    Rows of ``df_s`` and ``df_t`` are inner-joined on every key pair
    ``df_s[PK_s[i]] == df_t[PK_t[i]]``. For the joined rows, ``col_s``
    (source value) is compared with ``col_t`` (target value).

    Args:
        col_s: Column of ``df_s`` to compare.
        col_t: Column of ``df_t`` to compare.
        df_s: Source DataFrame.
        df_t: Target DataFrame.
        PK_s: Source key column names (one or more).
        PK_t: Target key column names, paired positionally with ``PK_s``.

    Returns:
        A one-row DataFrame with the integer counts 'Both Null',
        's Null t not null', 's not Null t null', 'match' and 'mismatch'.

    Raises:
        ValueError: if ``PK_s`` is empty or ``PK_s``/``PK_t`` differ in length.
    """
    import functools
    import operator

    if not PK_s or len(PK_s) != len(PK_t):
        raise ValueError(
            "PK_s and PK_t must be non-empty and of equal length, got %r and %r"
            % (PK_s, PK_t)
        )
    # AND together one equality per key pair, so composite keys of any width
    # take part in the join.
    con = functools.reduce(
        operator.and_,
        (df_s[k_s] == df_t[k_t] for k_s, k_t in zip(PK_s, PK_t)),
    )
    joined = df_s.join(df_t, con, 'inner')
    # A comparison with NULL yields NULL, which when() treats as not matched.
    # Rows with a null on either side therefore land only in the three null
    # buckets, and the five counts add up to the number of joined rows.
    return joined.agg(
        f.sum(f.when(col_s.isNull() & col_t.isNull(), 1).otherwise(0)).alias('Both Null'),
        f.sum(f.when(col_s.isNull() & col_t.isNotNull(), 1).otherwise(0)).alias('s Null t not null'),
        f.sum(f.when(col_s.isNotNull() & col_t.isNull(), 1).otherwise(0)).alias('s not Null t null'),
        f.sum(f.when(col_s == col_t, 1).otherwise(0)).alias('match'),
        f.sum(f.when(col_s != col_t, 1).otherwise(0)).alias('mismatch'),
    )

In [119]:
def validate_count(df_s,df_tgt,PK_s=('id',),PK_t=('id',)):
    """Print a null/match/mismatch summary for each column of ``df_s``.

    For every compared column, the column name is printed, followed by the
    one-row summary from ``verify_count``, with rows paired on ``PK_s``/``PK_t``.

    Args:
        df_s: Source DataFrame.
        df_tgt: Target DataFrame; each compared column of ``df_s`` is looked
            up here by the same name.
        PK_s: Source key column names used to pair rows (default ``('id',)``).
        PK_t: Target key column names, paired positionally with ``PK_s``.

    Returns:
        None; results are only displayed.
    """
    # A key with the same name on both sides is forced equal by the join, so
    # comparing it would always report 100% matches; skip such columns.
    same_name_keys = {k_s for k_s, k_t in zip(PK_s, PK_t) if k_s == k_t}
    for col_name in df_s.columns:
        if col_name in same_name_keys:
            continue
        print(col_name)
        verify_count(df_s[col_name], df_tgt[col_name], df_s, df_tgt, PK_s=PK_s, PK_t=PK_t).show()
        
        

In [120]:
# Per-column null/match/mismatch summary, rows paired on the synthetic id.
validate_count(df_s=df_src, df_tgt=df_tgt)

name
+---------+-----------------+-----------------+-----+--------+
|Both Null|s Null t not null|s not Null t null|match|mismatch|
+---------+-----------------+-----------------+-----+--------+
|        0|                0|                0|   99|       1|
+---------+-----------------+-----------------+-----+--------+

phone
+---------+-----------------+-----------------+-----+--------+
|Both Null|s Null t not null|s not Null t null|match|mismatch|
+---------+-----------------+-----------------+-----+--------+
|        0|                0|                0|  100|       0|
+---------+-----------------+-----------------+-----+--------+

email
+---------+-----------------+-----------------+-----+--------+
|Both Null|s Null t not null|s not Null t null|match|mismatch|
+---------+-----------------+-----------------+-----+--------+
|        0|                0|                0|   98|       2|
+---------+-----------------+-----------------+-----+--------+

numberrange
+---------+------------

Functions for row-level data verification between source and target

In [121]:
def verify_data(col_s,col_t,df_s,df_t,PK_s,PK_t,col_name=None):
    """List the paired rows where one column differs between source and target.

    Rows of ``df_s`` and ``df_t`` are inner-joined on every key pair
    ``df_s[PK_s[i]] == df_t[PK_t[i]]``. Then ``col_s`` (source value) is
    compared with ``col_t`` (target value).

    Args:
        col_s: Column of ``df_s`` to compare.
        col_t: Column of ``df_t`` to compare.
        df_s: Source DataFrame.
        df_t: Target DataFrame.
        PK_s: Source key column names (one or more).
        PK_t: Target key column names, paired positionally with ``PK_s``.
        col_name: Optional label for the two value columns in the output.
            When omitted, ``str(col_s)`` / ``str(col_t)`` is used.

    Returns:
        Tuple ``(df_Mismatch, df_s_null_t_not_null, df_s_not_null_t_null)``:
        rows whose values are both non-null and differ, rows null only in the
        source, and rows null only in the target. Each DataFrame selects the
        target key column(s), then the source value aliased
        ``"source <label>"`` and the target value aliased ``"Tgt <label>"``.

    Raises:
        ValueError: if ``PK_s`` is empty or ``PK_s``/``PK_t`` differ in length.
    """
    import functools
    import operator

    if not PK_s or len(PK_s) != len(PK_t):
        raise ValueError(
            "PK_s and PK_t must be non-empty and of equal length, got %r and %r"
            % (PK_s, PK_t)
        )
    # One equality per key pair, ANDed together, so composite keys of any width work.
    con = functools.reduce(
        operator.and_,
        (df_s[k_s] == df_t[k_t] for k_s, k_t in zip(PK_s, PK_t)),
    )
    joined = df_s.join(df_t, con, 'inner')

    # Target key columns must be looked up by the *target* key names.
    key_cols = [df_t[k_t] for k_t in PK_t]
    src_label = "source " + (col_name if col_name is not None else str(col_s))
    tgt_label = "Tgt " + (col_name if col_name is not None else str(col_t))

    def _rows_where(predicate):
        # Keys first, then the source and target values side by side.
        return joined.where(predicate).select(
            *key_cols, col_s.alias(src_label), col_t.alias(tgt_label)
        )

    # `!=` with a NULL operand is NULL, so null rows only appear in the two null frames.
    df_Mismatch = _rows_where(col_s != col_t)
    df_s_null_t_not_null = _rows_where(col_s.isNull() & col_t.isNotNull())
    df_s_not_null_t_null = _rows_where(col_s.isNotNull() & col_t.isNull())
    return df_Mismatch, df_s_null_t_not_null, df_s_not_null_t_null

In [122]:
def validate_data(df_s,df_tgt,PK_s=('id',),PK_t=('id',)):
    """Collect row-level differences for every compared column of ``df_s``.

    Args:
        df_s: Source DataFrame.
        df_tgt: Target DataFrame; each compared column of ``df_s`` is looked
            up here by the same name.
        PK_s: Source key column names used to pair rows (default ``('id',)``).
        PK_t: Target key column names, paired positionally with ``PK_s``.

    Returns:
        Dict mapping each compared column name (in ``df_s.columns`` order) to
        the tuple of DataFrames returned by ``verify_data`` for that column.
    """
    # A key with the same name on both sides is forced equal by the join, so
    # it can never produce a difference; leave it out.
    same_name_keys = {k_s for k_s, k_t in zip(PK_s, PK_t) if k_s == k_t}
    return {
        col_name: verify_data(df_s[col_name], df_tgt[col_name], df_s, df_tgt, PK_s=PK_s, PK_t=PK_t)
        for col_name in df_s.columns
        if col_name not in same_name_keys
    }

In [123]:
# For each compared column, print its name and show only the diff frames that
# actually contain rows.
dc = validate_data(df_src, df_tgt)
for col_name, diff_frames in dc.items():
    print(col_name)
    for diff_df in diff_frames:
        # head(1) needs at most one row; count() would evaluate the whole diff
        # just to test for emptiness before show() evaluates it again.
        if diff_df.head(1):
            diff_df.show()

name
+---+---------------------+------------------+
| id|source Column<'name'>|Tgt Column<'name'>|
+---+---------------------+------------------+
| 94|        Vivien Martin|    Viviend Martin|
+---+---------------------+------------------+

phone
email
+---+----------------------+--------------------+
| id|source Column<'email'>| Tgt Column<'email'>|
+---+----------------------+--------------------+
|  1|  risus.a.ultricies...|rissus.a.ultricie...|
|  9|  lorem.vitae@hotma...|NULL.vitae@hotmai...|
+---+----------------------+--------------------+

numberrange
+---+----------------------------+-------------------------+
| id|source Column<'numberrange'>|Tgt Column<'numberrange'>|
+---+----------------------------+-------------------------+
| 81|                           8|                     NULL|
+---+----------------------------+-------------------------+

id
